use std::{ collections::HashMap, fmt, fs::{self, metadata}, path::{Path, PathBuf}, thread, time::SystemTime, }; use anyhow::Context; use chrono::{DateTime, Utc}; use glob::glob; use log::{error, info, warn}; use modbases::{Dmr, ModBasesCollection, ModType}; use pandora_lib_variants::variants::Variants; use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection}; use crate::{ callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::NanomonSV}, collection::pod5::FlowCellCase, commands::{ dorado::Dorado as BasecallAlign, modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig}, }, config::Config, functions::{ assembler::{Assembler, AssemblerConfig}, variants::{RunVariantsAgg, VariantsConfig}, }, runners::Run, scan::scan::{par_whole_scan, par_whole_scan_local}, }; pub mod bam; pub mod modbases; pub mod pod5; pub mod vcf; #[derive(Debug, Clone)] pub struct CollectionsConfig { pub pod_dir: String, pub corrected_fc_path: String, pub result_dir: String, pub dict_file: String, pub min_diag_cov: f32, pub min_mrd_cov: f32, pub id_black_list: Vec, } impl Default for CollectionsConfig { fn default() -> Self { Self { pod_dir: "/data/run_data".to_string(), corrected_fc_path: "/data/flow_cells.tsv".to_string(), result_dir: "/data/longreads_basic_pipe".to_string(), dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(), min_diag_cov: 11.0, min_mrd_cov: 10.0, id_black_list: Vec::default(), } } } #[derive(Debug)] pub struct Collections { pub config: CollectionsConfig, pub pod5: Pod5Collection, pub bam: BamCollection, pub vcf: VcfCollection, pub modbases: ModBasesCollection, pub tasks: Vec, } impl Collections { pub fn new(config: CollectionsConfig) -> anyhow::Result { let CollectionsConfig { pod_dir, corrected_fc_path, result_dir, .. } = &config; let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?; // let pod5 = Pod5Collection::default(); let bam = BamCollection::new(result_dir); let vcf = VcfCollection::new(result_dir); let modbases = ModBasesCollection::new(result_dir); Ok(Self { pod5, bam, vcf, modbases, tasks: Vec::new(), config, }) } pub fn todo(&mut self) -> anyhow::Result<()> { info!("Looking for base calling tasks..."); let mut tasks = Vec::new(); let mut to_demux = Vec::new(); for run in self.pod5.runs.iter() { for fc in run.flowcells.iter() { let acq_id = fc.pod5_info.acquisition_id.clone(); for case in fc.cases.iter() { let bams_ids: Vec = self .bam .get(&case.id, &case.time_point) .iter() .flat_map(|b| { b.composition .iter() .map(|c| c.0.clone()) .collect::>() }) .filter(|id| *id == acq_id) .collect(); if bams_ids.is_empty() { match fc.pod5_type { pod5::Pod5Type::Raw => to_demux.push(case.clone()), pod5::Pod5Type::Demuxed => { tasks.push(CollectionsTasks::Align(case.clone())) } } } } } } // Group for muxed and push task with all cases let mut grouped: HashMap> = HashMap::new(); for case in to_demux { grouped.entry(case.pod_dir.clone()).or_default().push(case); } grouped .into_values() .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data))); // de novo // tasks.extend(self.todo_assembler()?); // Remove VCF anterior to BAM // let vcf_by_id = self.vcf.group_by_id(); // vcf_by_id.iter().for_each(|(id, vcfs)| { // if let (Some(diag), Some(mrd)) = ( // self.bam.get(id, "diag").first(), // self.bam.get(id, "mrd").first(), // ) { // let diag_modified = diag.modified; // let mrd_modified = mrd.modified; // let mut rm_paths: Vec<&Path> = vcfs // .iter() // .flat_map(|vcf| { // let vcf_mod: DateTime = vcf // .file_metadata // .modified() // .expect("Can't read VCF modified time.") // .into(); // // // For somatic caller erase if one bam (diag or mrd) is more recent. // if vcf.caller != "DeepVariant" { // if vcf_mod < diag_modified || vcf_mod < mrd_modified { // vec![vcf.path.parent().unwrap()] // } else { // vec![] // } // } else if (vcf.time_point == "diag" && vcf_mod < diag_modified) // || (vcf.time_point == "mrd" && vcf_mod < mrd_modified) // { // vec![vcf.path.parent().unwrap()] // } else { // vec![] // } // }) // .collect(); // rm_paths.sort(); // rm_paths.dedup(); // rm_paths // .iter() // .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir.")) // } // }); // Variant calling info!("Looking for variant calling tasks..."); // self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| { // if let (Some(diag), Some(mrd)) = ( // self.bam.get(&id, "diag").first(), // self.bam.get(&id, "mrd").first(), // ) { // if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) { // if diag_cramino.mean_coverage >= self.config.min_diag_cov.into() // && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into() // && !self.config.id_black_list.contains(&id) // { // let caller_time: Vec<(&str, &str)> = vcf_by_id // .iter() // .filter(|(i, _)| *i == id) // .flat_map(|(_, vcfs)| { // vcfs.iter() // .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str())) // }) // .collect(); // // if !caller_time.contains(&("clairs", "diag")) // || !caller_time.contains(&("clairs_indel", "diag")) // { // tasks.push(CollectionsTasks::ClairS { // id: id.to_string(), // diag_bam: diag.path.to_str().unwrap().to_string(), // mrd_bam: mrd.path.to_str().unwrap().to_string(), // config: ClairSConfig::default(), // }); // } // if !caller_time.contains(&("DeepVariant", "diag")) { // tasks.push(CollectionsTasks::DeepVariant { // id: id.to_string(), // time_point: "diag".to_string(), // bam: diag.path.to_str().unwrap().to_string(), // config: DeepVariantConfig::default(), // }); // } // if !caller_time.contains(&("DeepVariant", "mrd")) { // tasks.push(CollectionsTasks::DeepVariant { // id: id.to_string(), // time_point: "mrd".to_string(), // bam: mrd.path.to_str().unwrap().to_string(), // config: DeepVariantConfig::default(), // }); // } // if !caller_time.contains(&("nanomonsv", "diag")) { // tasks.push(CollectionsTasks::NanomonSV { // id: id.to_string(), // diag_bam: diag.path.to_str().unwrap().to_string(), // mrd_bam: mrd.path.to_str().unwrap().to_string(), // config: NanomonSVConfig::default(), // }); // } // } // } // } // }); // Tasks dedup let mut hs = HashMap::new(); tasks.into_iter().for_each(|t| { hs.insert(t.to_string(), t); }); self.tasks = hs.into_values().collect(); // Variants DeepVariant // self.tasks.extend(self.todo_deepvariants()); // Variants ClairS // self.tasks.extend(self.todo_clairs()); // Variants Nanomonsv // self.tasks.extend(self.todo_nanomonsv()); // Variants aggregation // self.tasks.extend(self.todo_variants_agg()?); // ModPileup // self.tasks.extend(self.todo_mod_pileup()); // DMR C diag vs mrd // self.tasks.extend(self.todo_dmr_c_diag_mrd()); // Tasks sorting self.tasks.sort_by_cached_key(|task| task.get_order()); Ok(()) } pub fn tasks_dedup(&mut self) { let mut hs = HashMap::new(); self.tasks.clone().into_iter().for_each(|t| { hs.insert(t.to_string(), t); }); self.tasks = hs.into_values().collect(); } pub fn todo_bam_count(&mut self, config: &Config) -> anyhow::Result<()> { // Whole scan for wgs_bam in self .bam .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov) { let id = wgs_bam.id.as_str(); let count_dir = match wgs_bam.time_point.as_str() { "diag" => config.tumoral_dir_count(id), "mrd" => config.normal_dir_count(id), _ => anyhow::bail!("Unknown bam time point {}", wgs_bam.time_point), }; if PathBuf::from(&count_dir).exists() { let dir_mod: DateTime = fs::metadata(&count_dir)?.modified()?.into(); if wgs_bam.modified > dir_mod { fs::remove_dir_all(&count_dir)?; } } if !PathBuf::from(&count_dir).exists() { self.tasks.push(CollectionsTasks::CountBam { bam_path: wgs_bam.path.to_string_lossy().to_string(), count_dir, config: config.clone(), }); } } Ok(()) } // No pair needed pub fn todo_assembler(&self) -> anyhow::Result> { let mut tasks = Vec::new(); let config = AssemblerConfig::default(); for b in &self.bam.bams { let assemblies_dir = format!( "{}/{}/{}/{}", config.result_dir, b.id, b.time_point, config.output_dir_name ); if !Path::new(&assemblies_dir).exists() { tasks.push(CollectionsTasks::Assemble { id: b.id.clone(), time_point: b.time_point.clone(), config: config.clone(), }); continue; } let pattern = format!("{assemblies_dir}/*/*.bam"); let mut mtimes: Vec = glob(&pattern)? .filter_map(|entry| entry.ok()) .filter_map(|path| metadata(path).ok()?.modified().ok()) .collect(); if mtimes.is_empty() { tasks.push(CollectionsTasks::Assemble { id: b.id.clone(), time_point: b.time_point.clone(), config: config.clone(), }); continue; } mtimes.sort_unstable(); mtimes.dedup(); let max_mtime: DateTime = mtimes.last().context("No modified time")?.to_owned().into(); if b.modified > max_mtime { tasks.push(CollectionsTasks::Assemble { id: b.id.clone(), time_point: b.time_point.clone(), config: config.clone(), }); } } Ok(tasks) } pub fn todo_deepvariants(&self) -> Vec { self.bam .bams .iter() .filter_map(|b| { if self.vcf.vcfs.iter().any(|v| { v.caller == "DeepVariant" && v.id == b.id && v.time == b.time_point && v.modified().unwrap_or_default() > b.modified }) { None } else { Some(CollectionsTasks::DeepVariant { id: b.id.clone(), time_point: b.time_point.clone(), bam: b.path.to_string_lossy().to_string(), }) } }) .collect() } pub fn todo_clairs(&self) -> Vec { self.bam_pairs() .iter() .filter_map(|(diag, mrd)| { if self.vcf.vcfs.iter().any(|v| { v.caller == "clairs" && v.id == diag.id && v.time == diag.time_point && (v.modified().unwrap_or_default() > diag.modified || v.modified().unwrap_or_default() > mrd.modified) }) { None } else { Some(CollectionsTasks::ClairS { id: diag.id.clone(), diag_bam: diag.path.to_string_lossy().to_string(), mrd_bam: mrd.path.to_string_lossy().to_string(), }) } }) .collect() } pub fn run_clairs(&self) -> anyhow::Result<()> { for task in self.todo_clairs() { match task.run() { Ok(_) => info!("done"), Err(e) => warn!("{e}"), } } Ok(()) } pub fn todo_nanomonsv(&self) -> Vec { self.bam_pairs() .iter() .filter_map(|(diag, mrd)| { if self.vcf.vcfs.iter().any(|v| { v.caller == "nanomonsv" && v.id == diag.id && v.time == diag.time_point && (v.modified().unwrap_or_default() > diag.modified || v.modified().unwrap_or_default() > mrd.modified) }) { None } else { Some(CollectionsTasks::NanomonSV { id: diag.id.clone(), }) } }) .collect() } pub fn todo_mod_pileup(&self) -> Vec { let config = ModkitConfig::default(); self.bam .bams .iter() .filter_map(|b| { if self.modbases.modbases.iter().any(|mb| { mb.id == b.id && mb.time_point == b.time_point && mb.pileup_modif > b.modified }) { None } else { Some(CollectionsTasks::ModPileup { bam: b.path.clone(), config: config.clone(), }) } }) .collect() } pub fn todo_dmr_c_diag_mrd(&self) -> Vec { let config = ModkitConfig::default(); self.bam .ids() .iter() .filter_map(|id| { if let Ok((diag, mrd)) = self.modbases.get_diag_mrd(id, ModType::Mod5mC5hmC) { let dmr: Vec<&Dmr> = diag .dmr_files .iter() .filter(|dmr| dmr.base == "C" && dmr.vs == "mrd") .collect(); if dmr.len() == 1 { let dmr = dmr.first().unwrap(); if let Ok(metadata) = dmr.path.metadata() { if let Ok(modif) = metadata.modified() { let m: DateTime = modif.into(); if diag.pileup_modif > m || mrd.pileup_modif > m { return Some(CollectionsTasks::DMRCDiagMrd { id: id.clone(), config: config.clone(), }); } } } None } else { Some(CollectionsTasks::DMRCDiagMrd { id: id.clone(), config: config.clone(), }) } } else { None } }) .collect() } /// Generates pairs of diagnostic and MRD BAM files. /// /// This function performs the following steps: /// 1. Extracts and deduplicates IDs from all BAM files. /// 2. For each unique ID, attempts to find a pair of BAM files: /// - One labeled as "diag" (diagnostic) /// - One labeled as "mrd" (minimal residual disease) /// 3. Returns pairs where both "diag" and "mrd" BAMs are found. /// /// # Returns /// /// * `Vec<(bam::Bam, bam::Bam)>` - A vector of tuples, each containing a pair of BAM files /// (diagnostic and MRD) for a unique ID. /// pub fn bam_pairs(&self) -> Vec<(bam::WGSBam, bam::WGSBam)> { let mut ids: Vec = self.bam.bams.iter().map(|b| b.id.clone()).collect(); ids.sort(); ids.dedup(); ids.iter() .filter_map(|id| { match ( self.bam.get(id, "diag").first(), self.bam.get(id, "mrd").first(), ) { (Some(&diag), Some(&mrd)) => Some((diag.clone(), mrd.clone())), _ => None, } }) .collect() } /// Aggregates variant tasks based on BAM pairs and VCF files. /// /// This function performs the following operations: /// 1. Iterates through BAM pairs (DIAG/MRD). /// 2. Checks for the existence of a _constit.bytes.gz file for each pair. /// 3. If the file exists, compares its modification time with VCF files. /// 4. Creates variant tasks if the file is older than one of VCF or if it doesn't exist. /// /// # Arguments /// /// * `self` - The struct instance containing BAM pairs and VCF information. /// /// # Returns /// /// * `anyhow::Result>` - A Result containing a vector of `CollectionsTasks::Variants` /// if successful, or an error if file metadata cannot be accessed. /// // pub fn todo_variants_agg(&self) -> anyhow::Result> { // let mut tasks = Vec::new(); // let config = VariantsConfig::default(); // let vcfs_ids = self.vcf.group_by_id(); // for pair in &self.bam_pairs() { // if self.config.id_black_list.contains(&pair.0.id) { // continue; // } // let const_path = format!( // "{}/{}/diag/{}_constit.bytes.gz", // &config.result_dir, &pair.0.id, &pair.0.id // ); // let constit = Path::new(&const_path); // // if constit.exists() { // let vcfs: Vec<_> = vcfs_ids.iter().filter(|(id, _)| id == &pair.0.id).collect(); // if let Some((_, vcfs)) = vcfs.first() { // let mtime = constit // .metadata() // .context(format!("Can't access file metadata {const_path}."))? // .mtime(); // let n_new = vcfs // .iter() // .filter(|vcf| mtime < vcf.file_metadata.mtime()) // .count(); // if n_new > 0 { // tasks.push(CollectionsTasks::SomaticVariants { // id: pair.0.id.clone(), // config: config.clone(), // }); // } // } // } else { // tasks.push(CollectionsTasks::SomaticVariants { // id: pair.0.id.clone(), // config: config.clone(), // }); // } // } // Ok(tasks) // } /// Runs all tasks in the collection. /// /// This method attempts to execute each task in the collection. /// /// # Returns /// /// Returns `Ok(())` if the process completes without any critical errors, even if /// individual tasks fail. /// /// # Errors /// /// This function will return an error if: /// - Fetching todo tasks fails when the initial task list is empty. /// - Any critical error occurs during the execution process. /// /// Note that individual task failures do not cause this method to return an error. pub fn run(&mut self) -> anyhow::Result<()> { if self.tasks.is_empty() { self.todo().context("Failed to fetch todo tasks")?; if self.tasks.is_empty() { info!("No tasks to run"); return Ok(()); } } let n_tasks = self.tasks.len(); warn!("{n_tasks} tasks to run"); let mut completed_tasks = Vec::new(); for (i, task) in self.tasks.iter().enumerate() { warn!("Running task {}/{}", i + 1, n_tasks); info!("{task}"); match task.clone().run() { Ok(_) => { info!("Task completed successfully"); completed_tasks.push(i); } Err(err) => error!("Task failed: {}", err), } } // Remove completed tasks for &index in completed_tasks.iter().rev() { self.tasks.remove(index); } info!( "{} tasks completed, {} tasks remaining", completed_tasks.len(), self.tasks.len() ); Ok(()) } pub fn run_deepvariant(&mut self) -> anyhow::Result<()> { let tasks = self.todo_deepvariants(); let n_tasks = tasks.len(); warn!("{n_tasks} tasks to run"); for (i, tasks_chunk) in tasks.chunks_exact(2).enumerate() { match tasks_chunk { [a, b] => { warn!("Running task {}/{} and {}/{n_tasks}", i + 1, n_tasks, i + 2); info!("{a} and {b}"); let a = if let CollectionsTasks::DeepVariant { id, time_point, bam, .. } = a { CollectionsTasks::DeepVariant { id: id.to_string(), time_point: time_point.to_string(), bam: bam.to_string(), } } else { anyhow::bail!("Err") }; let b = if let CollectionsTasks::DeepVariant { id, time_point, bam, .. } = b { CollectionsTasks::DeepVariant { id: id.to_string(), time_point: time_point.to_string(), bam: bam.to_string(), } } else { anyhow::bail!("Err"); }; let handle1 = thread::spawn(|| a.run()); let handle2 = thread::spawn(|| b.run()); let _ = handle1.join().unwrap(); let _ = handle2.join().unwrap(); } [a] => { info!("Single task: ({})", a); let _ = a.clone().run(); } _ => (), } } Ok(()) } } #[derive(Clone, Debug)] pub enum CollectionsTasks { Align(FlowCellCase), DemuxAlign(Vec), CountBam { bam_path: String, count_dir: String, config: Config, }, Assemble { id: String, time_point: String, config: AssemblerConfig, }, ModPileup { bam: PathBuf, config: ModkitConfig, }, DMRCDiagMrd { id: String, config: ModkitConfig, }, DeepVariant { id: String, time_point: String, bam: String, }, ClairS { id: String, diag_bam: String, mrd_bam: String, }, NanomonSV { id: String, }, SomaticVariants { id: String, config: VariantsConfig, }, } impl CollectionsTasks { pub fn run(self) -> anyhow::Result<()> { match self { CollectionsTasks::Align(case) => { BasecallAlign::init(case.clone(), Config::default())?.run_pipe() } CollectionsTasks::DemuxAlign(cases) => { BasecallAlign::from_mux(cases, Config::default()) } CollectionsTasks::ModPileup { bam, config } => bed_methyl(bam, &config), CollectionsTasks::DeepVariant { id, time_point, .. } => { DeepVariant::initialize(&id, &time_point, Config::default())?.run() } CollectionsTasks::ClairS { id, .. } => { ClairS::initialize(&id, Config::default())?.run() } CollectionsTasks::NanomonSV { id, .. } => { NanomonSV::initialize(&id, Config::default())?.run() } CollectionsTasks::CountBam { bam_path, count_dir, config, } => par_whole_scan(&count_dir, &bam_path, &config), CollectionsTasks::SomaticVariants { id, config } => { RunVariantsAgg::new(id, config).run() } CollectionsTasks::Assemble { id, time_point, config, } => Assembler::new(id, time_point, config).run(), CollectionsTasks::DMRCDiagMrd { id, config } => dmr_c_mrd_diag(&id, &config), } } pub fn get_order(&self) -> u8 { match self { CollectionsTasks::Align(_) => 0, CollectionsTasks::DemuxAlign(_) => 1, CollectionsTasks::ModPileup { .. } => 2, CollectionsTasks::DMRCDiagMrd { .. } => 3, CollectionsTasks::CountBam { .. } => 4, CollectionsTasks::Assemble { .. } => 5, CollectionsTasks::DeepVariant { .. } => 6, CollectionsTasks::ClairS { .. } => 7, CollectionsTasks::NanomonSV { .. } => 8, CollectionsTasks::SomaticVariants { .. } => 9, } } } // Implement Display for CollectionsTasks impl fmt::Display for CollectionsTasks { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use CollectionsTasks::*; match self { Align(case) => write!( f, "Alignment task for: {} {} {} {}", case.id, case.time_point, case.barcode, case.pod_dir.display() ), DemuxAlign(cases) => write!( f, "Demultiplex and alignment task for: {}", cases .iter() .map(|c| format!("{} {} {}", c.id, c.time_point, c.barcode)) .collect::>() .join(", ") ), DeepVariant { id, time_point, bam, .. } => { write!( f, "DeepVariant calling task for {} {}, from bam: {}", id, time_point, bam ) } ClairS { id, diag_bam, mrd_bam, .. } => { write!( f, "ClairS calling task for {}, with diag_bam: {}, mrd_bam: {}", id, diag_bam, mrd_bam ) } NanomonSV { id } => { write!(f, "NanomonSV calling task for {id}") } CountBam { bam_path, count_dir, .. } => write!(f, "Whole bam count for bam: {bam_path} into {count_dir}"), SomaticVariants { id, .. } => write!(f, "Variants aggregation for {}", id), Assemble { id, time_point, .. } => { write!(f, "De novo assemblage for {} {}", id, time_point) } ModPileup { bam, .. } => write!(f, "ModPileup for {}", bam.display()), DMRCDiagMrd { id, .. } => write!(f, "DMR C methylation diag vs mrd for {id}"), } } } pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> { let mut last_n = Vec::with_capacity(3); let mut consecutive_same_count = 0; loop { let mut collection = Collections::new(config.clone()).context("Failed to create new Collections")?; collection.todo().context("Failed to get todo tasks")?; collection .tasks .iter() .for_each(|t| info!("Planned task: {t}")); let n_tasks = collection.tasks.len(); if n_tasks == 0 { info!("All results are up to date"); break; } if last_n.len() >= 2 && last_n.iter().rev().take(2).all(|&x| x == n_tasks) { consecutive_same_count += 1; if consecutive_same_count >= 2 { error!("Tasks are not progressing"); break; } } else { consecutive_same_count = 0; } last_n.push(n_tasks); if last_n.len() > 3 { last_n.remove(0); } collection.run().context("Failed to run collection tasks")?; } Ok(()) } pub trait Initialize: Sized { fn initialize(id: &str, config: Config) -> anyhow::Result; } pub trait InitializeSolo: Sized { fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result; } pub trait HasOutputs { fn has_output(&self, id: &str) -> (bool, bool); } pub trait Version { fn version(config: &Config) -> anyhow::Result; } pub trait LoadVariants { fn load_variants(&self) -> anyhow::Result; } pub fn exists_all(paths: Vec<&str>) -> anyhow::Result<()> { for path in paths.iter() { if !Path::new(path).exists() { anyhow::bail!("{path} should exist") } } Ok(()) }