use rayon::prelude::*; use std::{ fs::{self}, path::{Path, PathBuf}, thread, }; use anyhow::Context; use log::{debug, error, info, warn}; use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat}, collection::{vcf::Vcf, Initialize, InitializeSolo}, commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig}, config::Config, io::vcf::read_vcf, runners::{run_wait, CommandRun, Run, RunReport}, variant::{ variant::{RunnerVariants, Variants}, variant_collection::VariantCollection, }, }; #[derive(Debug)] pub struct NanomonSV { pub id: String, pub diag_bam: String, pub mrd_bam: String, pub diag_out_dir: String, pub mrd_out_dir: String, pub log_dir: String, pub vcf_passed: String, pub config: Config, } impl Initialize for NanomonSV { fn initialize(id: &str, config: Config) -> anyhow::Result { let id = id.to_string(); info!("Initialize Nanomonsv for {id}."); let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id); if !Path::new(&log_dir).exists() { fs::create_dir_all(&log_dir) .context(format!("Failed to create {log_dir} directory"))?; } let diag_bam = config.tumoral_bam(&id); let mrd_bam = config.normal_bam(&id); let diag_out_dir = config.nanomonsv_output_dir(&id, "diag"); fs::create_dir_all(&diag_out_dir)?; let mrd_out_dir = config.nanomonsv_output_dir(&id, "mrd"); fs::create_dir_all(&mrd_out_dir)?; let vcf_passed = config.nanomonsv_passed_vcf(&id); Ok(Self { id, diag_bam, mrd_bam, diag_out_dir, mrd_out_dir, log_dir, vcf_passed, config, }) } } impl Run for NanomonSV { fn run(&mut self) -> anyhow::Result<()> { somatic_parse(self)?; let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id); let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id); let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf"); let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf"); if !Path::new(&mrd_result_vcf).exists() { info!("Nanomonsv get from normal bam: {}.", self.mrd_bam); let report = nanomonsv_get(&self.mrd_bam, &mrd_out_prefix, None, None, &self.config) .context(format!( "Error while running NanomonSV get for {mrd_result_vcf}" ))?; report.save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))?; } if !Path::new(&diag_result_vcf).exists() { info!("NanomonSV get from tumoral bam: {}.", self.diag_bam); let report = nanomonsv_get( &self.diag_bam, &diag_out_prefix, Some(&self.mrd_bam), Some(&mrd_out_prefix), &self.config, ) .context(format!( "Error while running NanomonSV get for {diag_result_vcf}" ))?; report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))?; } if !Path::new(&self.vcf_passed).exists() { let report = bcftools_keep_pass( &diag_result_vcf, &self.vcf_passed, BcftoolsConfig::default(), ) .context(format!("Can't index {}", self.vcf_passed))?; report .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,)) .context("Can't save report")?; } Ok(()) } } impl CallerCat for NanomonSV { fn caller_cat(&self) -> (Caller, Annotation) { (Caller::NanomonSV, Annotation::Somatic) } } impl Variants for NanomonSV { fn variants(&self, annotations: &Annotations) -> anyhow::Result { let (caller, category) = self.caller_cat(); let add = vec![Annotation::Callers(caller.clone()), category.clone()]; info!( "Loading variants from {} {}: {}", caller, category, self.vcf_passed ); let variants = read_vcf(&self.vcf_passed)?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), &add); }); info!( "{} {}, {} variants loaded.", caller, category, variants.len() ); Ok(VariantCollection { variants, vcf: Vcf::new(self.vcf_passed.clone().into())?, caller, category, }) } } impl RunnerVariants for NanomonSV {} #[derive(Debug)] pub struct NanomonSVSolo { pub id: String, pub bam: String, pub time_point: String, pub out_dir: String, pub log_dir: String, pub vcf_passed: String, pub config: Config, } impl InitializeSolo for NanomonSVSolo { fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result { let id = id.to_string(); info!("Initialize Nanomonsv solo for {id} {time}."); let log_dir = format!("{}/{}/log/nanomonsv_solo", config.result_dir, &id); if !Path::new(&log_dir).exists() { fs::create_dir_all(&log_dir) .context(format!("Failed to create {log_dir} directory"))?; } let out_dir = config.nanomonsv_solo_output_dir(&id, time); fs::create_dir_all(&out_dir)?; let bam = config.solo_bam(&id, time); let vcf_passed = config.nanomonsv_solo_passed_vcf(&id, time); Ok(Self { id, bam, time_point: time.to_string(), out_dir, log_dir, vcf_passed, config, }) } } impl Run for NanomonSVSolo { fn run(&mut self) -> anyhow::Result<()> { // Parse info!("Nanomonsv Parse"); let out_prefix = format!("{}/{}_{}", self.out_dir, self.id, self.time_point); let info_vcf = format!("{out_prefix}.bp_info.sorted.bed.gz"); if !Path::new(&info_vcf).exists() { let report = nanomonsv_parse(&self.bam, &out_prefix, &self.config).context(format!( "Error while running NanomonSV parse for {}", self.bam ))?; let log_file = format!("{}/nanomonsv_solo_parse_{}_", self.log_dir, self.time_point); report .save_to_file(&log_file) .context(format!("Can't write logs into {log_file}"))?; } // Get let result_vcf = format!("{out_prefix}.nanomonsv.result.vcf"); if !Path::new(&result_vcf).exists() { info!("Nanomonsv Get"); let report = nanomonsv_get(&self.bam, &out_prefix, None, None, &self.config).context( format!("Error while running NanomonSV get for {result_vcf}"), )?; let log_file = format!("{}/nanomonsv_solo_get_{}_", self.log_dir, self.time_point); report .save_to_file(&log_file) .context(format!("Error while writing log into {log_file}"))?; } if !Path::new(&self.vcf_passed).exists() { let report = bcftools_keep_pass(&result_vcf, &self.vcf_passed, BcftoolsConfig::default()) .context(format!( "Error while running bcftools keep PASS for {result_vcf}" ))?; let log_file = format!("{}/bcftools_pass_", self.log_dir); report .save_to_file(&log_file) .context(format!("Error while writing log into {log_file}"))?; } Ok(()) } } impl CallerCat for NanomonSVSolo { fn caller_cat(&self) -> (Caller, Annotation) { let Config { normal_name, tumoral_name, .. } = &self.config; let cat = if *normal_name == self.time_point { Annotation::SoloConstit } else if *tumoral_name == self.time_point { Annotation::SoloTumor } else { panic!("Error in time_point name: {}", self.time_point); }; (Caller::NanomonSVSolo, cat) } } impl RunnerVariants for NanomonSVSolo {} impl Variants for NanomonSVSolo { fn variants(&self, annotations: &Annotations) -> anyhow::Result { let (caller, category) = self.caller_cat(); let add = vec![Annotation::Callers(caller.clone()), category.clone()]; info!( "Loading variant from ClairS {} with annotations: {:?}", self.id, add ); let variants = read_vcf(&self.vcf_passed)?; variants.par_iter().for_each(|v| { let var_cat = v.alteration_category(); annotations.insert_update( v.hash(), &vec![ Annotation::Callers(caller.clone()), category.clone(), Annotation::AlterationCategory(var_cat), ], ); }); Ok(VariantCollection { variants, vcf: Vcf::new(self.vcf_passed.clone().into())?, caller, category, }) } } // Helper functions pub fn nanomonsv_parse(bam: &str, out_prefix: &str, config: &Config) -> anyhow::Result { let args = vec![ "parse", "--reference_fasta", &config.reference, bam, out_prefix, ]; let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args); let res = run_wait(&mut cmd_run)?; Ok(res) } pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> { let diag_out_prefix = format!("{}/{}_diag", s.diag_out_dir, s.id); let mrd_out_prefix = format!("{}/{}_mrd", s.mrd_out_dir, s.id); let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz"); let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz"); let mut threads_handles = Vec::new(); if !Path::new(&diag_info_vcf).exists() { let diag_bam = s.diag_bam.clone(); info!("Nanomonsv parse {diag_bam}."); let diag_out_prefix = diag_out_prefix.clone(); let config = s.config.clone(); let log_dir = s.log_dir.clone(); let handle = thread::spawn(move || { let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, &config).unwrap(); report .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_")) .unwrap(); }); threads_handles.push(handle); } else { debug!("Nanonmonsv parse results already exists: {diag_info_vcf}"); } if !Path::new(&mrd_info_vcf).exists() { let mrd_bam = s.mrd_bam.clone(); info!("Nanomonsv parse {mrd_bam}."); let mrd_out_prefix = mrd_out_prefix.clone(); let config = s.config.clone(); let log_dir = s.log_dir.clone(); let handle = thread::spawn(move || { let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, &config).unwrap(); report .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_")) .unwrap(); }); threads_handles.push(handle); } else { debug!("Nanonmonsv parse results already exists: {mrd_info_vcf}"); } // Wait for all threads to finish for handle in threads_handles { handle.join().expect("Thread panicked for nanomonsv parse"); } Ok(()) } pub fn nanomonsv_get( bam: &str, out_prefix: &str, ctrl_bam: Option<&str>, ctrl_prefix: Option<&str>, config: &Config, ) -> anyhow::Result { let threads = config.nanomonsv_threads.to_string(); let mut args = vec!["get"]; if let (Some(ctrl_bam), Some(ctrl_prefix)) = (ctrl_bam, ctrl_prefix) { args.extend(vec![ "--control_prefix", ctrl_prefix, "--control_bam", ctrl_bam, ]) } args.extend(vec![ "--process", &threads, out_prefix, bam, &config.reference, ]); let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args); let res = run_wait(&mut cmd_run)?; Ok(res) } pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> { let mut passed_mrd = Vec::new(); for mrd_dir in find_nanomonsv_dirs(&PathBuf::from(&config.result_dir), "mrd", 0, 3) { let mut passed = None; let mut passed_csi = None; let mut result = None; for entry in fs::read_dir(&mrd_dir) .with_context(|| format!("Failed to read directory: {}", mrd_dir.display()))? { let entry = entry.with_context(|| format!("Failed to read entry in: {}", mrd_dir.display()))?; let file_name = entry.file_name().to_string_lossy().to_string(); let path = entry.path(); if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz") { passed = Some(path); } else if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz.csi") { passed_csi = Some(path); } else if file_name.ends_with("_mrd.nanomonsv.result.vcf") { result = Some(path); } } match (result, passed.clone(), passed_csi) { (None, None, None) => error!("No result in {}", mrd_dir.display()), (Some(p), None, None) => { let output = replace_filename_suffix( &p, "_mrd.nanomonsv.result.vcf", "_mrd_nanomonsv_PASSED.vcf.gz", ); info!("Do pass for {} to {}", p.display(), output.display()); if let Err(r) = bcftools_keep_pass( p.to_str().unwrap(), output.to_str().unwrap(), BcftoolsConfig::default(), ) { error!("{r}"); } else { passed_mrd.push(output); } } (Some(_), Some(p), None) => warn!("Do csi for {}", p.display()), (Some(_), Some(p), Some(_)) => passed_mrd.push(p), _ => {} // All files found } } println!("{} vcf to concat", passed_mrd.len()); bcftools_concat( passed_mrd .iter() .map(|p| p.to_string_lossy().to_string()) .collect(), pon_path, BcftoolsConfig::default(), )?; Ok(()) } pub fn find_nanomonsv_dirs( root: &Path, time_point: &str, depth: usize, max_depth: usize, ) -> Vec { if depth >= max_depth { return Vec::new(); } let entries: Vec<_> = match fs::read_dir(root) { Ok(entries) => entries.filter_map(Result::ok).collect(), Err(_) => return Vec::new(), }; let mut result: Vec = entries .iter() .filter_map(|entry| { let path = entry.path(); if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) && path .to_string_lossy() .contains(&format!("{}/nanomonsv", time_point)) { Some(path) } else { None } }) .collect(); result.extend( entries .iter() .filter(|entry| entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false)) .flat_map(|dir| find_nanomonsv_dirs(&dir.path(), time_point, depth + 1, max_depth)), ); result } pub fn replace_filename_suffix(path: &Path, from: &str, to: &str) -> PathBuf { let file_name = path.file_name().and_then(|s| s.to_str()).unwrap_or(""); let new_file_name = file_name.replace(from, to); path.with_file_name(new_file_name) }