|
@@ -1,3 +1,6 @@
|
|
|
|
|
+//! NanomonSV structural variant caller orchestration (paired and solo).
|
|
|
|
|
+//!
|
|
|
|
|
+//! Runs parse/get and PASS filtering through the shared runner interfaces (local/Slurm) using the global `Config`.
|
|
|
use rayon::prelude::*;
|
|
use rayon::prelude::*;
|
|
|
use std::{
|
|
use std::{
|
|
|
fs::{self},
|
|
fs::{self},
|
|
@@ -11,12 +14,16 @@ use log::{debug, error, info, warn};
|
|
|
use crate::{
|
|
use crate::{
|
|
|
annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
|
|
annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
|
|
|
collection::vcf::Vcf,
|
|
collection::vcf::Vcf,
|
|
|
- commands::bcftools::{BcftoolsConfig, bcftools_concat, bcftools_keep_pass},
|
|
|
|
|
|
|
+ commands::{
|
|
|
|
|
+ bcftools::{BcftoolsConcat, BcftoolsKeepPass},
|
|
|
|
|
+ CapturedOutput, Command as JobCommand, LocalRunner, SlurmParams, SlurmRunner,
|
|
|
|
|
+ },
|
|
|
config::Config,
|
|
config::Config,
|
|
|
helpers::{is_file_older, remove_dir_if_exists},
|
|
helpers::{is_file_older, remove_dir_if_exists},
|
|
|
io::vcf::read_vcf,
|
|
io::vcf::read_vcf,
|
|
|
pipes::{Initialize, InitializeSolo, ShouldRun, Version},
|
|
pipes::{Initialize, InitializeSolo, ShouldRun, Version},
|
|
|
- runners::{CommandRun, Run, RunReport, run_wait},
|
|
|
|
|
|
|
+ run,
|
|
|
|
|
+ runners::Run,
|
|
|
variant::{
|
|
variant::{
|
|
|
variant::{Label, Variants},
|
|
variant::{Label, Variants},
|
|
|
variant_collection::VariantCollection,
|
|
variant_collection::VariantCollection,
|
|
@@ -31,6 +38,31 @@ pub struct NanomonSV {
|
|
|
pub id: String,
|
|
pub id: String,
|
|
|
pub log_dir: String,
|
|
pub log_dir: String,
|
|
|
pub config: Config,
|
|
pub config: Config,
|
|
|
|
|
+
|
|
|
|
|
+ // Command args and threads used by the shared runner.
|
|
|
|
|
+ job_args: Vec<String>,
|
|
|
|
|
+ threads: u8,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl JobCommand for NanomonSV {
|
|
|
|
|
+ fn cmd(&self) -> String {
|
|
|
|
|
+ format!("{} {}", self.config.nanomonsv_bin, self.job_args.join(" "))
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl LocalRunner for NanomonSV {}
|
|
|
|
|
+
|
|
|
|
|
+impl SlurmRunner for NanomonSV {
|
|
|
|
|
+ fn slurm_args(&self) -> Vec<String> {
|
|
|
|
|
+ SlurmParams {
|
|
|
|
|
+ job_name: Some(format!("nanomonsv_{}", self.id)),
|
|
|
|
|
+ cpus_per_task: Some(self.threads as u32),
|
|
|
|
|
+ mem: Some("60G".into()),
|
|
|
|
|
+ partition: Some("shortq".into()),
|
|
|
|
|
+ gres: None,
|
|
|
|
|
+ }
|
|
|
|
|
+ .to_args()
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
impl Initialize for NanomonSV {
|
|
impl Initialize for NanomonSV {
|
|
@@ -45,7 +77,7 @@ impl Initialize for NanomonSV {
|
|
|
///
|
|
///
|
|
|
/// # Returns
|
|
/// # Returns
|
|
|
/// A fully prepared `NanomonSV` instance ready to run.
|
|
/// A fully prepared `NanomonSV` instance ready to run.
|
|
|
- fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
|
|
|
|
|
|
|
+ fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
|
|
|
let id = id.to_string();
|
|
let id = id.to_string();
|
|
|
info!("Initialize Nanomonsv for {id}.");
|
|
info!("Initialize Nanomonsv for {id}.");
|
|
|
|
|
|
|
@@ -54,7 +86,9 @@ impl Initialize for NanomonSV {
|
|
|
let nanomonsv = Self {
|
|
let nanomonsv = Self {
|
|
|
id,
|
|
id,
|
|
|
log_dir,
|
|
log_dir,
|
|
|
- config,
|
|
|
|
|
|
|
+ config: config.clone(),
|
|
|
|
|
+ job_args: Vec::new(),
|
|
|
|
|
+ threads: config.nanomonsv_threads,
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
if nanomonsv.config.nanomonsv_force {
|
|
if nanomonsv.config.nanomonsv_force {
|
|
@@ -137,7 +171,7 @@ impl Run for NanomonSV {
|
|
|
.context(format!(
|
|
.context(format!(
|
|
|
"Error while running NanomonSV get for {mrd_result_vcf}"
|
|
"Error while running NanomonSV get for {mrd_result_vcf}"
|
|
|
))?;
|
|
))?;
|
|
|
- report.save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))?;
|
|
|
|
|
|
|
+ report.save_to_file(format!("{}/nanomonsv_get_mrd_", self.log_dir))?;
|
|
|
} else {
|
|
} else {
|
|
|
debug!(
|
|
debug!(
|
|
|
"NanomonSV `get` results already exists for {} normal, skipping execution.",
|
|
"NanomonSV `get` results already exists for {} normal, skipping execution.",
|
|
@@ -157,7 +191,7 @@ impl Run for NanomonSV {
|
|
|
.context(format!(
|
|
.context(format!(
|
|
|
"Error while running NanomonSV get for {diag_result_vcf}"
|
|
"Error while running NanomonSV get for {diag_result_vcf}"
|
|
|
))?;
|
|
))?;
|
|
|
- report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir))?;
|
|
|
|
|
|
|
+ report.save_to_file(format!("{}/nanomonsv_get_diag_", self.log_dir))?;
|
|
|
} else {
|
|
} else {
|
|
|
debug!(
|
|
debug!(
|
|
|
"NanomonSV `get` results already exists for {} tumoral, skipping execution.",
|
|
"NanomonSV `get` results already exists for {} tumoral, skipping execution.",
|
|
@@ -166,11 +200,12 @@ impl Run for NanomonSV {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if !Path::new(&vcf_passed).exists() {
|
|
if !Path::new(&vcf_passed).exists() {
|
|
|
|
|
+ let mut keep =
|
|
|
|
|
+ BcftoolsKeepPass::from_config(&self.config, &diag_result_vcf, &vcf_passed);
|
|
|
let report =
|
|
let report =
|
|
|
- bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default())
|
|
|
|
|
- .context(format!("Can't index {vcf_passed}"))?;
|
|
|
|
|
|
|
+ run!(&self.config, &mut keep).context(format!("Can't index {vcf_passed}"))?;
|
|
|
report
|
|
report
|
|
|
- .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
|
|
|
|
|
|
|
+ .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
|
|
|
.context("Failed to save report")?;
|
|
.context("Failed to save report")?;
|
|
|
} else {
|
|
} else {
|
|
|
debug!(
|
|
debug!(
|
|
@@ -197,14 +232,16 @@ impl Version for NanomonSV {
|
|
|
/// # Errors
|
|
/// # Errors
|
|
|
/// Returns an error if command execution fails or "Version " not found in output.
|
|
/// Returns an error if command execution fails or "Version " not found in output.
|
|
|
fn version(config: &Config) -> anyhow::Result<String> {
|
|
fn version(config: &Config) -> anyhow::Result<String> {
|
|
|
- let args = ["--version"];
|
|
|
|
|
- let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
|
|
|
|
|
-
|
|
|
|
|
- let report = run_wait(&mut cmd_run).context(format!(
|
|
|
|
|
- "Error while running `NanomonSV {}`",
|
|
|
|
|
- args.join(" ")
|
|
|
|
|
- ))?;
|
|
|
|
|
- let log = report.log;
|
|
|
|
|
|
|
+ let mut runner = NanomonSV {
|
|
|
|
|
+ id: "version".to_string(),
|
|
|
|
|
+ log_dir: config.tmp_dir.clone(),
|
|
|
|
|
+ config: config.clone(),
|
|
|
|
|
+ job_args: vec!["--version".into()],
|
|
|
|
|
+ threads: 1,
|
|
|
|
|
+ };
|
|
|
|
|
+ let out = run!(&runner.config, &mut runner)
|
|
|
|
|
+ .context("Error while running `NanomonSV --version`")?;
|
|
|
|
|
+ let log = format!("{}{}", out.stdout, out.stderr);
|
|
|
let start = log
|
|
let start = log
|
|
|
.find("stdout: nanomonsv ")
|
|
.find("stdout: nanomonsv ")
|
|
|
.context("Failed to find 'stdout: nanomonsv ' in the log")?;
|
|
.context("Failed to find 'stdout: nanomonsv ' in the log")?;
|
|
@@ -284,7 +321,7 @@ impl InitializeSolo for NanomonSVSolo {
|
|
|
///
|
|
///
|
|
|
/// # Errors
|
|
/// # Errors
|
|
|
/// Returns an error if directory creation fails.
|
|
/// Returns an error if directory creation fails.
|
|
|
- fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
|
|
|
|
|
|
|
+ fn initialize(id: &str, time: &str, config: &Config) -> anyhow::Result<Self> {
|
|
|
let id = id.to_string();
|
|
let id = id.to_string();
|
|
|
info!("Initialize Nanomonsv solo for {id} {time}.");
|
|
info!("Initialize Nanomonsv solo for {id} {time}.");
|
|
|
let log_dir = format!("{}/{}/log/nanomonsv_solo", config.result_dir, &id);
|
|
let log_dir = format!("{}/{}/log/nanomonsv_solo", config.result_dir, &id);
|
|
@@ -308,7 +345,7 @@ impl InitializeSolo for NanomonSVSolo {
|
|
|
out_dir,
|
|
out_dir,
|
|
|
log_dir,
|
|
log_dir,
|
|
|
vcf_passed,
|
|
vcf_passed,
|
|
|
- config,
|
|
|
|
|
|
|
+ config: config.clone(),
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -352,11 +389,11 @@ impl Run for NanomonSVSolo {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if !Path::new(&self.vcf_passed).exists() {
|
|
if !Path::new(&self.vcf_passed).exists() {
|
|
|
- let report =
|
|
|
|
|
- bcftools_keep_pass(&result_vcf, &self.vcf_passed, BcftoolsConfig::default())
|
|
|
|
|
- .context(format!(
|
|
|
|
|
- "Error while running bcftools keep PASS for {result_vcf}"
|
|
|
|
|
- ))?;
|
|
|
|
|
|
|
+ let mut keep =
|
|
|
|
|
+ BcftoolsKeepPass::from_config(&self.config, &result_vcf, &self.vcf_passed);
|
|
|
|
|
+ let report = run!(&self.config, &mut keep).context(format!(
|
|
|
|
|
+ "Error while running bcftools keep PASS for {result_vcf}"
|
|
|
|
|
+ ))?;
|
|
|
|
|
|
|
|
let log_file = format!("{}/bcftools_pass_", self.log_dir);
|
|
let log_file = format!("{}/bcftools_pass_", self.log_dir);
|
|
|
report
|
|
report
|
|
@@ -423,17 +460,26 @@ impl Variants for NanomonSVSolo {
|
|
|
///
|
|
///
|
|
|
/// # Errors
|
|
/// # Errors
|
|
|
/// Returns an error if command execution fails.
|
|
/// Returns an error if command execution fails.
|
|
|
-pub fn nanomonsv_parse(bam: &str, out_prefix: &str, config: &Config) -> anyhow::Result<RunReport> {
|
|
|
|
|
|
|
+pub fn nanomonsv_parse(
|
|
|
|
|
+ bam: &str,
|
|
|
|
|
+ out_prefix: &str,
|
|
|
|
|
+ config: &Config,
|
|
|
|
|
+) -> anyhow::Result<CapturedOutput> {
|
|
|
let args = vec![
|
|
let args = vec![
|
|
|
- "parse",
|
|
|
|
|
- "--reference_fasta",
|
|
|
|
|
- &config.reference,
|
|
|
|
|
- bam,
|
|
|
|
|
- out_prefix,
|
|
|
|
|
|
|
+ "parse".to_string(),
|
|
|
|
|
+ "--reference_fasta".to_string(),
|
|
|
|
|
+ config.reference.clone(),
|
|
|
|
|
+ bam.to_string(),
|
|
|
|
|
+ out_prefix.to_string(),
|
|
|
];
|
|
];
|
|
|
- let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
|
|
|
|
|
- let res = run_wait(&mut cmd_run)?;
|
|
|
|
|
- Ok(res)
|
|
|
|
|
|
|
+ let mut runner = NanomonSV {
|
|
|
|
|
+ id: "nanomonsv".to_string(),
|
|
|
|
|
+ log_dir: config.tmp_dir.clone(),
|
|
|
|
|
+ config: config.clone(),
|
|
|
|
|
+ job_args: args,
|
|
|
|
|
+ threads: config.nanomonsv_threads,
|
|
|
|
|
+ };
|
|
|
|
|
+ run!(config, &mut runner)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Executes the NanomonSV `parse` step in parallel for both diagnostic and MRD BAMs.
|
|
/// Executes the NanomonSV `parse` step in parallel for both diagnostic and MRD BAMs.
|
|
@@ -504,7 +550,7 @@ fn spawn_parse_thread(
|
|
|
.with_context(|| format!("Failed to parse BAM: {bam}"))?;
|
|
.with_context(|| format!("Failed to parse BAM: {bam}"))?;
|
|
|
|
|
|
|
|
report
|
|
report
|
|
|
- .save_to_file(&format!("{log_dir}/nanomonsv_parse_{bam}_"))
|
|
|
|
|
|
|
+ .save_to_file(format!("{log_dir}/nanomonsv_parse_{bam}_"))
|
|
|
.with_context(|| format!("Failed to save report for BAM: {bam}"))?;
|
|
.with_context(|| format!("Failed to save report for BAM: {bam}"))?;
|
|
|
|
|
|
|
|
Ok(())
|
|
Ok(())
|
|
@@ -529,29 +575,34 @@ pub fn nanomonsv_get(
|
|
|
ctrl_bam: Option<&str>,
|
|
ctrl_bam: Option<&str>,
|
|
|
ctrl_prefix: Option<&str>,
|
|
ctrl_prefix: Option<&str>,
|
|
|
config: &Config,
|
|
config: &Config,
|
|
|
-) -> anyhow::Result<RunReport> {
|
|
|
|
|
|
|
+) -> anyhow::Result<CapturedOutput> {
|
|
|
let threads = config.nanomonsv_threads.to_string();
|
|
let threads = config.nanomonsv_threads.to_string();
|
|
|
- let mut args = vec!["get"];
|
|
|
|
|
|
|
+ let mut args: Vec<String> = vec!["get".into()];
|
|
|
|
|
|
|
|
if let (Some(ctrl_bam), Some(ctrl_prefix)) = (ctrl_bam, ctrl_prefix) {
|
|
if let (Some(ctrl_bam), Some(ctrl_prefix)) = (ctrl_bam, ctrl_prefix) {
|
|
|
args.extend(vec![
|
|
args.extend(vec![
|
|
|
- "--control_prefix",
|
|
|
|
|
- ctrl_prefix,
|
|
|
|
|
- "--control_bam",
|
|
|
|
|
- ctrl_bam,
|
|
|
|
|
|
|
+ "--control_prefix".into(),
|
|
|
|
|
+ ctrl_prefix.to_string(),
|
|
|
|
|
+ "--control_bam".into(),
|
|
|
|
|
+ ctrl_bam.to_string(),
|
|
|
])
|
|
])
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
args.extend(vec![
|
|
args.extend(vec![
|
|
|
- "--process",
|
|
|
|
|
- &threads,
|
|
|
|
|
- out_prefix,
|
|
|
|
|
- bam,
|
|
|
|
|
- &config.reference,
|
|
|
|
|
|
|
+ "--process".into(),
|
|
|
|
|
+ threads,
|
|
|
|
|
+ out_prefix.to_string(),
|
|
|
|
|
+ bam.to_string(),
|
|
|
|
|
+ config.reference.clone(),
|
|
|
]);
|
|
]);
|
|
|
- let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
|
|
|
|
|
- let res = run_wait(&mut cmd_run)?;
|
|
|
|
|
- Ok(res)
|
|
|
|
|
|
|
+ let mut runner = NanomonSV {
|
|
|
|
|
+ id: "nanomonsv".to_string(),
|
|
|
|
|
+ log_dir: config.tmp_dir.clone(),
|
|
|
|
|
+ config: config.clone(),
|
|
|
|
|
+ job_args: args,
|
|
|
|
|
+ threads: config.nanomonsv_threads,
|
|
|
|
|
+ };
|
|
|
|
|
+ run!(config, &mut runner)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Creates a panel of normals (PON) from MRD NanomonSV results.
|
|
/// Creates a panel of normals (PON) from MRD NanomonSV results.
|
|
@@ -562,7 +613,12 @@ pub fn nanomonsv_get(
|
|
|
/// Returns an error if directory traversal, filtering, or concatenation fails.
|
|
/// Returns an error if directory traversal, filtering, or concatenation fails.
|
|
|
pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> {
|
|
pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> {
|
|
|
let mut passed_mrd = Vec::new();
|
|
let mut passed_mrd = Vec::new();
|
|
|
- for mrd_dir in find_nanomonsv_dirs(&PathBuf::from(&config.result_dir), "mrd", 0, 3) {
|
|
|
|
|
|
|
+ for mrd_dir in find_nanomonsv_dirs(
|
|
|
|
|
+ &PathBuf::from(&config.result_dir),
|
|
|
|
|
+ &config.normal_name,
|
|
|
|
|
+ 0,
|
|
|
|
|
+ 3,
|
|
|
|
|
+ ) {
|
|
|
let mut passed = None;
|
|
let mut passed = None;
|
|
|
let mut passed_csi = None;
|
|
let mut passed_csi = None;
|
|
|
let mut result = None;
|
|
let mut result = None;
|
|
@@ -589,36 +645,34 @@ pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<(
|
|
|
(Some(p), None, None) => {
|
|
(Some(p), None, None) => {
|
|
|
let output = replace_filename_suffix(
|
|
let output = replace_filename_suffix(
|
|
|
&p,
|
|
&p,
|
|
|
- "_mrd.nanomonsv.result.vcf",
|
|
|
|
|
- "_mrd_nanomonsv_PASSED.vcf.gz",
|
|
|
|
|
|
|
+ &format!("_{}.nanomonsv.result.vcf", config.normal_name),
|
|
|
|
|
+ &format!("_{}_nanomonsv_PASSED.vcf.gz", config.normal_name),
|
|
|
);
|
|
);
|
|
|
info!("Do pass for {} to {}", p.display(), output.display());
|
|
info!("Do pass for {} to {}", p.display(), output.display());
|
|
|
|
|
|
|
|
- if let Err(r) = bcftools_keep_pass(
|
|
|
|
|
- p.to_str().unwrap(),
|
|
|
|
|
- output.to_str().unwrap(),
|
|
|
|
|
- BcftoolsConfig::default(),
|
|
|
|
|
- ) {
|
|
|
|
|
|
|
+ let mut keep = BcftoolsKeepPass::from_config(config, p, &output);
|
|
|
|
|
+ if let Err(r) = run!(config, &mut keep) {
|
|
|
error!("{r}");
|
|
error!("{r}");
|
|
|
} else {
|
|
} else {
|
|
|
passed_mrd.push(output);
|
|
passed_mrd.push(output);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- (Some(_), Some(p), None) => warn!("Do csi for {}", p.display()),
|
|
|
|
|
|
|
+ (Some(_), Some(p), None) => warn!("Prossing csi for {}", p.display()),
|
|
|
(Some(_), Some(p), Some(_)) => passed_mrd.push(p),
|
|
(Some(_), Some(p), Some(_)) => passed_mrd.push(p),
|
|
|
_ => {} // All files found
|
|
_ => {} // All files found
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
println!("{} vcf to concat", passed_mrd.len());
|
|
println!("{} vcf to concat", passed_mrd.len());
|
|
|
- bcftools_concat(
|
|
|
|
|
|
|
+ let mut concat = BcftoolsConcat::from_config(
|
|
|
|
|
+ config,
|
|
|
passed_mrd
|
|
passed_mrd
|
|
|
.iter()
|
|
.iter()
|
|
|
.map(|p| p.to_string_lossy().to_string())
|
|
.map(|p| p.to_string_lossy().to_string())
|
|
|
.collect(),
|
|
.collect(),
|
|
|
pon_path,
|
|
pon_path,
|
|
|
- BcftoolsConfig::default(),
|
|
|
|
|
- )?;
|
|
|
|
|
|
|
+ );
|
|
|
|
|
+ run!(config, &mut concat)?;
|
|
|
|
|
|
|
|
Ok(())
|
|
Ok(())
|
|
|
}
|
|
}
|