use std::{ collections::HashMap, fmt, fs::{self, metadata}, path::{Path, PathBuf}, time::SystemTime, }; use anyhow::Context; use chrono::{DateTime, Utc}; use glob::glob; use log::{info, warn}; use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection}; use crate::{ callers::{ clairs::{ClairS, ClairSConfig}, deep_variant::{DeepVariant, DeepVariantConfig}, nanomonsv::{NanomonSV, NanomonSVConfig}, }, collection::pod5::FlowCellCase, commands::dorado::Dorado as BasecallAlign, config::Config, functions::{ assembler::{Assembler, AssemblerConfig}, variants::{Variants, VariantsConfig}, whole_scan::{WholeScan, WholeScanConfig}, }, }; pub mod bam; pub mod pod5; pub mod variants; pub mod vcf; #[derive(Debug, Clone)] pub struct CollectionsConfig { pub pod_dir: String, pub corrected_fc_path: String, pub result_dir: String, pub dict_file: String, pub min_diag_cov: f32, pub min_mrd_cov: f32, } impl Default for CollectionsConfig { fn default() -> Self { Self { pod_dir: "/data/run_data".to_string(), corrected_fc_path: "/data/flow_cells.tsv".to_string(), result_dir: "/data/longreads_basic_pipe".to_string(), dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(), min_diag_cov: 12.0, min_mrd_cov: 10.0, } } } #[derive(Debug)] pub struct Collections { pub config: CollectionsConfig, pub pod5: Pod5Collection, pub bam: BamCollection, pub vcf: VcfCollection, pub tasks: Vec, } impl Collections { pub fn new(config: CollectionsConfig) -> anyhow::Result { let CollectionsConfig { pod_dir, corrected_fc_path, result_dir, .. } = &config; let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?; let bam = BamCollection::new(result_dir); let vcf = VcfCollection::new(result_dir); Ok(Self { pod5, bam, vcf, tasks: Vec::new(), config, }) } pub fn todo(&mut self) -> anyhow::Result<()> { info!("Looking for base calling tasks..."); let mut tasks = Vec::new(); let mut to_demux = Vec::new(); for run in self.pod5.runs.iter() { for fc in run.flowcells.iter() { let acq_id = fc.pod5_info.acquisition_id.clone(); for case in fc.cases.iter() { let bams_ids: Vec = self .bam .get(&case.id, &case.time_point) .iter() .flat_map(|b| { b.composition .iter() .map(|c| c.0.clone()) .collect::>() }) .filter(|id| *id == acq_id) .collect(); if bams_ids.is_empty() { match fc.pod5_type { pod5::Pod5Type::Raw => to_demux.push(case.clone()), pod5::Pod5Type::Demuxed => { tasks.push(CollectionsTasks::Align(case.clone())) } } } } } } // Group for muxed and push task with all cases let mut grouped: HashMap> = HashMap::new(); for case in to_demux { grouped.entry(case.pod_dir.clone()).or_default().push(case); } grouped .into_values() .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data))); // Whole scan for bam in self .bam .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov) { let config = WholeScanConfig::default(); let scan_dir = format!( "{}/{}/{}/{}", &config.result_dir, bam.id, bam.time_point, config.scan_dir ); if PathBuf::from(&scan_dir).exists() { let dir_mod: DateTime = fs::metadata(&scan_dir)?.modified()?.into(); if bam.modified > dir_mod { fs::remove_dir_all(&scan_dir)?; } } if !PathBuf::from(&scan_dir).exists() { tasks.push(CollectionsTasks::WholeScan { id: bam.id, time_point: bam.time_point, bam: bam .path .to_str() .context("Cant convert path to string")? .to_string(), config: WholeScanConfig::default(), }); } } // Remove VCF anterior to BAM let vcf_by_id = self.vcf.group_by_id(); vcf_by_id.iter().for_each(|(id, vcfs)| { if let (Some(diag), Some(mrd)) = ( self.bam.get(id, "diag").first(), self.bam.get(id, "mrd").first(), ) { let diag_modified = diag.modified; let mrd_modified = mrd.modified; let mut rm_paths: Vec<&Path> = vcfs .iter() .flat_map(|vcf| { let vcf_mod: DateTime = vcf .file_metadata .modified() .expect("Can't read VCF modified time.") .into(); // For somatic caller erase if one bam (diag or mrd) is more recent. if vcf.caller != "DeepVariant" { if vcf_mod < diag_modified || vcf_mod < mrd_modified { vec![vcf.path.parent().unwrap()] } else { vec![] } } else if (vcf.time_point == "diag" && vcf_mod < diag_modified) || (vcf.time_point == "mrd" && vcf_mod < mrd_modified) { vec![vcf.path.parent().unwrap()] } else { vec![] } }) .collect(); rm_paths.sort(); rm_paths.dedup(); rm_paths .iter() .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir.")) } }); // Variant calling info!("Looking for variant calling tasks..."); self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| { if let (Some(diag), Some(mrd)) = ( self.bam.get(&id, "diag").first(), self.bam.get(&id, "mrd").first(), ) { if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) { if diag_cramino.mean_coverage >= self.config.min_diag_cov.into() && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into() { let caller_time: Vec<(&str, &str)> = vcf_by_id .iter() .filter(|(i, _)| *i == id) .flat_map(|(_, vcfs)| { vcfs.iter() .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str())) }) .collect(); if !caller_time.contains(&("clairs", "diag")) || !caller_time.contains(&("clairs_indel", "diag")) { tasks.push(CollectionsTasks::ClairS { id: id.to_string(), diag_bam: diag.path.to_str().unwrap().to_string(), mrd_bam: mrd.path.to_str().unwrap().to_string(), config: ClairSConfig::default(), }); } if !caller_time.contains(&("DeepVariant", "diag")) { tasks.push(CollectionsTasks::DeepVariant { id: id.to_string(), time_point: "diag".to_string(), bam: diag.path.to_str().unwrap().to_string(), config: DeepVariantConfig::default(), }); } if !caller_time.contains(&("DeepVariant", "mrd")) { tasks.push(CollectionsTasks::DeepVariant { id: id.to_string(), time_point: "mrd".to_string(), bam: mrd.path.to_str().unwrap().to_string(), config: DeepVariantConfig::default(), }); } if !caller_time.contains(&("nanomonsv", "diag")) { tasks.push(CollectionsTasks::NanomonSV { id: id.to_string(), diag_bam: diag.path.to_str().unwrap().to_string(), mrd_bam: mrd.path.to_str().unwrap().to_string(), config: NanomonSVConfig::default(), }); } } } } }); // Variants aggregation // info!("Looking for variants aggregation tasks..."); // self.bam.bams.iter().filter(|b| b.time_point == "diag" ).for_each(|bam| { // let id = bam.id; // }); // de novo tasks.extend(self.todo_assembler()?); // Tasks sorting and dedup let mut hs = HashMap::new(); tasks.into_iter().for_each(|t| { hs.insert(t.to_string(), t); }); self.tasks = hs.into_values().collect(); Ok(()) } pub fn tasks_dedup(&mut self) { let mut hs = HashMap::new(); self.tasks.clone().into_iter().for_each(|t| { hs.insert(t.to_string(), t); }); self.tasks = hs.into_values().collect(); } pub fn todo_assembler(&mut self) -> anyhow::Result> { let mut tasks = Vec::new(); let config = AssemblerConfig::default(); for b in &self.bam.bams { let assemblies_dir = format!( "{}/{}/{}/{}", config.result_dir, b.id, b.time_point, config.output_dir_name ); if !Path::new(&assemblies_dir).exists() { tasks.push(CollectionsTasks::Assemble { id: b.id.clone(), time_point: b.time_point.clone(), config: config.clone(), }); continue; } let pattern = format!("{assemblies_dir}/*/*.bam"); let mut mtimes: Vec = glob(&pattern)? .filter_map(|entry| entry.ok()) .filter_map(|path| metadata(path).ok()?.modified().ok()) .collect(); if mtimes.is_empty() { tasks.push(CollectionsTasks::Assemble { id: b.id.clone(), time_point: b.time_point.clone(), config: config.clone(), }); continue; } mtimes.sort_unstable(); mtimes.dedup(); let max_mtime: DateTime = mtimes.last().context("No modified time")?.to_owned().into(); if b.modified > max_mtime { tasks.push(CollectionsTasks::Assemble { id: b.id.clone(), time_point: b.time_point.clone(), config: config.clone(), }); } } Ok(tasks) } pub fn run(&mut self) -> anyhow::Result<()> { // self.tasks.reverse(); if self.tasks.is_empty() { self.todo()?; if self.tasks.is_empty() { return Ok(()); } else { self.run()?; } } else { let n_tasks = self.tasks.len(); warn!("{n_tasks} tasks to run"); let mut i = 1; while let Some(task) = self.tasks.pop() { warn!("Running {i}/{n_tasks}"); info!("{task:#?}"); task.run()?; i += 1; } } Ok(()) } } use strum_macros::EnumOrd; #[derive(Debug, Clone, EnumOrd)] pub enum CollectionsTasks { Align(FlowCellCase), DemuxAlign(Vec), WholeScan { id: String, time_point: String, bam: String, config: WholeScanConfig, }, Assemble { id: String, time_point: String, config: AssemblerConfig, }, DeepVariant { id: String, time_point: String, bam: String, config: DeepVariantConfig, }, ClairS { id: String, diag_bam: String, mrd_bam: String, config: ClairSConfig, }, NanomonSV { id: String, diag_bam: String, mrd_bam: String, config: NanomonSVConfig, }, Variants { id: String, config: VariantsConfig, }, } impl CollectionsTasks { pub fn run(self) -> anyhow::Result<()> { match self { CollectionsTasks::Align(case) => { BasecallAlign::init(case.clone(), Config::default())?.run_pipe()?; } CollectionsTasks::DemuxAlign(cases) => { BasecallAlign::from_mux(cases, Config::default())?; } CollectionsTasks::DeepVariant { id, time_point, bam, config, } => { DeepVariant::new(&id, &time_point, &bam, config).run(); } CollectionsTasks::ClairS { id, diag_bam, mrd_bam, config, } => { ClairS::new(&id, &diag_bam, &mrd_bam, config).run(); } CollectionsTasks::NanomonSV { id, diag_bam, mrd_bam, config, } => { NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run(); } CollectionsTasks::WholeScan { id, time_point, bam, config, } => { WholeScan::new(id, time_point, bam, config)?.run()?; } CollectionsTasks::Variants { id, config } => { Variants::new(id, config).run()?; } CollectionsTasks::Assemble { id, time_point, config, } => { Assembler::new(id, time_point, config).run()?; } } Ok(()) } } // Implement Display for CollectionsTasks impl fmt::Display for CollectionsTasks { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use CollectionsTasks::*; match self { Align(case) => write!(f, "Align task with: {:#?}", case), DemuxAlign(cases) => write!(f, "DemuxAlign task with: {:#?}", cases), DeepVariant { id, time_point, bam, .. } => { write!( f, "DeepVariant task with id: {}, time_point: {}, bam: {}", id, time_point, bam ) } ClairS { id, diag_bam, mrd_bam, .. } => { write!( f, "ClairS task with id: {}, diag_bam: {}, mrd_bam: {}", id, diag_bam, mrd_bam ) } NanomonSV { id, diag_bam, mrd_bam, .. } => { write!( f, "NanomonSV task with id: {}, diag_bam: {}, mrd_bam: {}", id, diag_bam, mrd_bam ) } WholeScan { id, bam, .. } => write!(f, "Whole scan for id: {}, bam: {}", id, bam), Variants { id, .. } => write!(f, "Variants aggregation for id: {}", id), Assemble { id, time_point, .. } => { write!(f, "Assembly for id: {}, time point: {}", id, time_point) } } } } pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> { let mut last_n = Vec::new(); loop { let mut collection = Collections::new(config.clone())?; collection.todo()?; if collection.tasks.is_empty() { warn!("All results are update"); break; } let n_tasks = collection.tasks.len(); warn!("{n_tasks} tasks to run"); if last_n.len() > 2 && last_n[last_n.len() - 1] == n_tasks && last_n[last_n.len() - 2] == n_tasks { warn!("Tasks don't progress"); break; } last_n.push(n_tasks); collection.run()?; } Ok(()) }