| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934 |
- use std::{
- collections::HashMap,
- fmt,
- fs::{self, metadata},
- path::{Path, PathBuf},
- thread,
- time::SystemTime,
- };
- use anyhow::Context;
- use chrono::{DateTime, Utc};
- use glob::glob;
- use log::{error, info, warn};
- use modbases::{Dmr, ModBasesCollection, ModType};
- use pandora_lib_variants::variants::Variants;
- use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
- use crate::{
- callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::NanomonSV},
- collection::pod5::FlowCellCase,
- commands::{
- dorado::Dorado as BasecallAlign,
- modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig},
- },
- config::Config,
- functions::{
- assembler::{Assembler, AssemblerConfig},
- variants::{RunVariantsAgg, VariantsConfig},
- },
- runners::Run, scan::scan::{par_whole_scan, par_whole_scan_local},
- };
- pub mod bam;
- pub mod modbases;
- pub mod pod5;
- pub mod vcf;
- #[derive(Debug, Clone)]
- pub struct CollectionsConfig {
- pub pod_dir: String,
- pub corrected_fc_path: String,
- pub result_dir: String,
- pub dict_file: String,
- pub min_diag_cov: f32,
- pub min_mrd_cov: f32,
- pub id_black_list: Vec<String>,
- }
- impl Default for CollectionsConfig {
- fn default() -> Self {
- Self {
- pod_dir: "/data/run_data".to_string(),
- corrected_fc_path: "/data/flow_cells.tsv".to_string(),
- result_dir: "/data/longreads_basic_pipe".to_string(),
- dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(),
- min_diag_cov: 11.0,
- min_mrd_cov: 10.0,
- id_black_list: Vec::default(),
- }
- }
- }
- #[derive(Debug)]
- pub struct Collections {
- pub config: CollectionsConfig,
- pub pod5: Pod5Collection,
- pub bam: BamCollection,
- pub vcf: VcfCollection,
- pub modbases: ModBasesCollection,
- pub tasks: Vec<CollectionsTasks>,
- }
- impl Collections {
- pub fn new(config: CollectionsConfig) -> anyhow::Result<Self> {
- let CollectionsConfig {
- pod_dir,
- corrected_fc_path,
- result_dir,
- ..
- } = &config;
- let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
- // let pod5 = Pod5Collection::default();
- let bam = BamCollection::new(result_dir);
- let vcf = VcfCollection::new(result_dir);
- let modbases = ModBasesCollection::new(result_dir);
- Ok(Self {
- pod5,
- bam,
- vcf,
- modbases,
- tasks: Vec::new(),
- config,
- })
- }
- pub fn todo(&mut self) -> anyhow::Result<()> {
- info!("Looking for base calling tasks...");
- let mut tasks = Vec::new();
- let mut to_demux = Vec::new();
- for run in self.pod5.runs.iter() {
- for fc in run.flowcells.iter() {
- let acq_id = fc.pod5_info.acquisition_id.clone();
- for case in fc.cases.iter() {
- let bams_ids: Vec<String> = self
- .bam
- .get(&case.id, &case.time_point)
- .iter()
- .flat_map(|b| {
- b.composition
- .iter()
- .map(|c| c.0.clone())
- .collect::<Vec<String>>()
- })
- .filter(|id| *id == acq_id)
- .collect();
- if bams_ids.is_empty() {
- match fc.pod5_type {
- pod5::Pod5Type::Raw => to_demux.push(case.clone()),
- pod5::Pod5Type::Demuxed => {
- tasks.push(CollectionsTasks::Align(case.clone()))
- }
- }
- }
- }
- }
- }
- // Group for muxed and push task with all cases
- let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
- for case in to_demux {
- grouped.entry(case.pod_dir.clone()).or_default().push(case);
- }
- grouped
- .into_values()
- .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
- // de novo
- // tasks.extend(self.todo_assembler()?);
- // Remove VCF anterior to BAM
- // let vcf_by_id = self.vcf.group_by_id();
- // vcf_by_id.iter().for_each(|(id, vcfs)| {
- // if let (Some(diag), Some(mrd)) = (
- // self.bam.get(id, "diag").first(),
- // self.bam.get(id, "mrd").first(),
- // ) {
- // let diag_modified = diag.modified;
- // let mrd_modified = mrd.modified;
- // let mut rm_paths: Vec<&Path> = vcfs
- // .iter()
- // .flat_map(|vcf| {
- // let vcf_mod: DateTime<Utc> = vcf
- // .file_metadata
- // .modified()
- // .expect("Can't read VCF modified time.")
- // .into();
- //
- // // For somatic caller erase if one bam (diag or mrd) is more recent.
- // if vcf.caller != "DeepVariant" {
- // if vcf_mod < diag_modified || vcf_mod < mrd_modified {
- // vec![vcf.path.parent().unwrap()]
- // } else {
- // vec![]
- // }
- // } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
- // || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
- // {
- // vec![vcf.path.parent().unwrap()]
- // } else {
- // vec![]
- // }
- // })
- // .collect();
- // rm_paths.sort();
- // rm_paths.dedup();
- // rm_paths
- // .iter()
- // .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
- // }
- // });
- // Variant calling
- info!("Looking for variant calling tasks...");
- // self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
- // if let (Some(diag), Some(mrd)) = (
- // self.bam.get(&id, "diag").first(),
- // self.bam.get(&id, "mrd").first(),
- // ) {
- // if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
- // if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
- // && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
- // && !self.config.id_black_list.contains(&id)
- // {
- // let caller_time: Vec<(&str, &str)> = vcf_by_id
- // .iter()
- // .filter(|(i, _)| *i == id)
- // .flat_map(|(_, vcfs)| {
- // vcfs.iter()
- // .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
- // })
- // .collect();
- //
- // if !caller_time.contains(&("clairs", "diag"))
- // || !caller_time.contains(&("clairs_indel", "diag"))
- // {
- // tasks.push(CollectionsTasks::ClairS {
- // id: id.to_string(),
- // diag_bam: diag.path.to_str().unwrap().to_string(),
- // mrd_bam: mrd.path.to_str().unwrap().to_string(),
- // config: ClairSConfig::default(),
- // });
- // }
- // if !caller_time.contains(&("DeepVariant", "diag")) {
- // tasks.push(CollectionsTasks::DeepVariant {
- // id: id.to_string(),
- // time_point: "diag".to_string(),
- // bam: diag.path.to_str().unwrap().to_string(),
- // config: DeepVariantConfig::default(),
- // });
- // }
- // if !caller_time.contains(&("DeepVariant", "mrd")) {
- // tasks.push(CollectionsTasks::DeepVariant {
- // id: id.to_string(),
- // time_point: "mrd".to_string(),
- // bam: mrd.path.to_str().unwrap().to_string(),
- // config: DeepVariantConfig::default(),
- // });
- // }
- // if !caller_time.contains(&("nanomonsv", "diag")) {
- // tasks.push(CollectionsTasks::NanomonSV {
- // id: id.to_string(),
- // diag_bam: diag.path.to_str().unwrap().to_string(),
- // mrd_bam: mrd.path.to_str().unwrap().to_string(),
- // config: NanomonSVConfig::default(),
- // });
- // }
- // }
- // }
- // }
- // });
- // Tasks dedup
- let mut hs = HashMap::new();
- tasks.into_iter().for_each(|t| {
- hs.insert(t.to_string(), t);
- });
- self.tasks = hs.into_values().collect();
- // Variants DeepVariant
- // self.tasks.extend(self.todo_deepvariants());
- // Variants ClairS
- // self.tasks.extend(self.todo_clairs());
- // Variants Nanomonsv
- // self.tasks.extend(self.todo_nanomonsv());
- // Variants aggregation
- // self.tasks.extend(self.todo_variants_agg()?);
- // ModPileup
- // self.tasks.extend(self.todo_mod_pileup());
- // DMR C diag vs mrd
- // self.tasks.extend(self.todo_dmr_c_diag_mrd());
- // Tasks sorting
- self.tasks.sort_by_cached_key(|task| task.get_order());
- Ok(())
- }
- pub fn tasks_dedup(&mut self) {
- let mut hs = HashMap::new();
- self.tasks.clone().into_iter().for_each(|t| {
- hs.insert(t.to_string(), t);
- });
- self.tasks = hs.into_values().collect();
- }
- pub fn todo_bam_count(&mut self, config: &Config) -> anyhow::Result<()> {
- // Whole scan
- for wgs_bam in self
- .bam
- .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
- {
- let id = wgs_bam.id.as_str();
- let count_dir = match wgs_bam.time_point.as_str() {
- "diag" => config.tumoral_dir_count(id),
- "mrd" => config.normal_dir_count(id),
- _ => anyhow::bail!("Unknown bam time point {}", wgs_bam.time_point),
- };
- if PathBuf::from(&count_dir).exists() {
- let dir_mod: DateTime<Utc> = fs::metadata(&count_dir)?.modified()?.into();
- if wgs_bam.modified > dir_mod {
- fs::remove_dir_all(&count_dir)?;
- }
- }
- if !PathBuf::from(&count_dir).exists() {
- self.tasks.push(CollectionsTasks::CountBam {
- bam_path: wgs_bam.path.to_string_lossy().to_string(),
- count_dir,
- config: config.clone(),
- });
- }
- }
- Ok(())
- }
- // No pair needed
- pub fn todo_assembler(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
- let mut tasks = Vec::new();
- let config = AssemblerConfig::default();
- for b in &self.bam.bams {
- let assemblies_dir = format!(
- "{}/{}/{}/{}",
- config.result_dir, b.id, b.time_point, config.output_dir_name
- );
- if !Path::new(&assemblies_dir).exists() {
- tasks.push(CollectionsTasks::Assemble {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- config: config.clone(),
- });
- continue;
- }
- let pattern = format!("{assemblies_dir}/*/*.bam");
- let mut mtimes: Vec<SystemTime> = glob(&pattern)?
- .filter_map(|entry| entry.ok())
- .filter_map(|path| metadata(path).ok()?.modified().ok())
- .collect();
- if mtimes.is_empty() {
- tasks.push(CollectionsTasks::Assemble {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- config: config.clone(),
- });
- continue;
- }
- mtimes.sort_unstable();
- mtimes.dedup();
- let max_mtime: DateTime<Utc> =
- mtimes.last().context("No modified time")?.to_owned().into();
- if b.modified > max_mtime {
- tasks.push(CollectionsTasks::Assemble {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- config: config.clone(),
- });
- }
- }
- Ok(tasks)
- }
- pub fn todo_deepvariants(&self) -> Vec<CollectionsTasks> {
- self.bam
- .bams
- .iter()
- .filter_map(|b| {
- if self.vcf.vcfs.iter().any(|v| {
- v.caller == "DeepVariant"
- && v.id == b.id
- && v.time == b.time_point
- && v.modified().unwrap_or_default() > b.modified
- }) {
- None
- } else {
- Some(CollectionsTasks::DeepVariant {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- bam: b.path.to_string_lossy().to_string(),
- })
- }
- })
- .collect()
- }
- pub fn todo_clairs(&self) -> Vec<CollectionsTasks> {
- self.bam_pairs()
- .iter()
- .filter_map(|(diag, mrd)| {
- if self.vcf.vcfs.iter().any(|v| {
- v.caller == "clairs"
- && v.id == diag.id
- && v.time == diag.time_point
- && (v.modified().unwrap_or_default() > diag.modified
- || v.modified().unwrap_or_default() > mrd.modified)
- }) {
- None
- } else {
- Some(CollectionsTasks::ClairS {
- id: diag.id.clone(),
- diag_bam: diag.path.to_string_lossy().to_string(),
- mrd_bam: mrd.path.to_string_lossy().to_string(),
- })
- }
- })
- .collect()
- }
- pub fn run_clairs(&self) -> anyhow::Result<()> {
- for task in self.todo_clairs() {
- match task.run() {
- Ok(_) => info!("done"),
- Err(e) => warn!("{e}"),
- }
- }
- Ok(())
- }
- pub fn todo_nanomonsv(&self) -> Vec<CollectionsTasks> {
- self.bam_pairs()
- .iter()
- .filter_map(|(diag, mrd)| {
- if self.vcf.vcfs.iter().any(|v| {
- v.caller == "nanomonsv"
- && v.id == diag.id
- && v.time == diag.time_point
- && (v.modified().unwrap_or_default() > diag.modified
- || v.modified().unwrap_or_default() > mrd.modified)
- }) {
- None
- } else {
- Some(CollectionsTasks::NanomonSV {
- id: diag.id.clone(),
- })
- }
- })
- .collect()
- }
- pub fn todo_mod_pileup(&self) -> Vec<CollectionsTasks> {
- let config = ModkitConfig::default();
- self.bam
- .bams
- .iter()
- .filter_map(|b| {
- if self.modbases.modbases.iter().any(|mb| {
- mb.id == b.id && mb.time_point == b.time_point && mb.pileup_modif > b.modified
- }) {
- None
- } else {
- Some(CollectionsTasks::ModPileup {
- bam: b.path.clone(),
- config: config.clone(),
- })
- }
- })
- .collect()
- }
- pub fn todo_dmr_c_diag_mrd(&self) -> Vec<CollectionsTasks> {
- let config = ModkitConfig::default();
- self.bam
- .ids()
- .iter()
- .filter_map(|id| {
- if let Ok((diag, mrd)) = self.modbases.get_diag_mrd(id, ModType::Mod5mC5hmC) {
- let dmr: Vec<&Dmr> = diag
- .dmr_files
- .iter()
- .filter(|dmr| dmr.base == "C" && dmr.vs == "mrd")
- .collect();
- if dmr.len() == 1 {
- let dmr = dmr.first().unwrap();
- if let Ok(metadata) = dmr.path.metadata() {
- if let Ok(modif) = metadata.modified() {
- let m: DateTime<Utc> = modif.into();
- if diag.pileup_modif > m || mrd.pileup_modif > m {
- return Some(CollectionsTasks::DMRCDiagMrd {
- id: id.clone(),
- config: config.clone(),
- });
- }
- }
- }
- None
- } else {
- Some(CollectionsTasks::DMRCDiagMrd {
- id: id.clone(),
- config: config.clone(),
- })
- }
- } else {
- None
- }
- })
- .collect()
- }
- /// Generates pairs of diagnostic and MRD BAM files.
- ///
- /// This function performs the following steps:
- /// 1. Extracts and deduplicates IDs from all BAM files.
- /// 2. For each unique ID, attempts to find a pair of BAM files:
- /// - One labeled as "diag" (diagnostic)
- /// - One labeled as "mrd" (minimal residual disease)
- /// 3. Returns pairs where both "diag" and "mrd" BAMs are found.
- ///
- /// # Returns
- ///
- /// * `Vec<(bam::Bam, bam::Bam)>` - A vector of tuples, each containing a pair of BAM files
- /// (diagnostic and MRD) for a unique ID.
- ///
- pub fn bam_pairs(&self) -> Vec<(bam::WGSBam, bam::WGSBam)> {
- let mut ids: Vec<String> = self.bam.bams.iter().map(|b| b.id.clone()).collect();
- ids.sort();
- ids.dedup();
- ids.iter()
- .filter_map(|id| {
- match (
- self.bam.get(id, "diag").first(),
- self.bam.get(id, "mrd").first(),
- ) {
- (Some(&diag), Some(&mrd)) => Some((diag.clone(), mrd.clone())),
- _ => None,
- }
- })
- .collect()
- }
- /// Aggregates variant tasks based on BAM pairs and VCF files.
- ///
- /// This function performs the following operations:
- /// 1. Iterates through BAM pairs (DIAG/MRD).
- /// 2. Checks for the existence of a _constit.bytes.gz file for each pair.
- /// 3. If the file exists, compares its modification time with VCF files.
- /// 4. Creates variant tasks if the file is older than one of VCF or if it doesn't exist.
- ///
- /// # Arguments
- ///
- /// * `self` - The struct instance containing BAM pairs and VCF information.
- ///
- /// # Returns
- ///
- /// * `anyhow::Result<Vec<CollectionsTasks>>` - A Result containing a vector of `CollectionsTasks::Variants`
- /// if successful, or an error if file metadata cannot be accessed.
- ///
- // pub fn todo_variants_agg(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
- // let mut tasks = Vec::new();
- // let config = VariantsConfig::default();
- // let vcfs_ids = self.vcf.group_by_id();
- // for pair in &self.bam_pairs() {
- // if self.config.id_black_list.contains(&pair.0.id) {
- // continue;
- // }
- // let const_path = format!(
- // "{}/{}/diag/{}_constit.bytes.gz",
- // &config.result_dir, &pair.0.id, &pair.0.id
- // );
- // let constit = Path::new(&const_path);
- //
- // if constit.exists() {
- // let vcfs: Vec<_> = vcfs_ids.iter().filter(|(id, _)| id == &pair.0.id).collect();
- // if let Some((_, vcfs)) = vcfs.first() {
- // let mtime = constit
- // .metadata()
- // .context(format!("Can't access file metadata {const_path}."))?
- // .mtime();
- // let n_new = vcfs
- // .iter()
- // .filter(|vcf| mtime < vcf.file_metadata.mtime())
- // .count();
- // if n_new > 0 {
- // tasks.push(CollectionsTasks::SomaticVariants {
- // id: pair.0.id.clone(),
- // config: config.clone(),
- // });
- // }
- // }
- // } else {
- // tasks.push(CollectionsTasks::SomaticVariants {
- // id: pair.0.id.clone(),
- // config: config.clone(),
- // });
- // }
- // }
- // Ok(tasks)
- // }
- /// Runs all tasks in the collection.
- ///
- /// This method attempts to execute each task in the collection.
- ///
- /// # Returns
- ///
- /// Returns `Ok(())` if the process completes without any critical errors, even if
- /// individual tasks fail.
- ///
- /// # Errors
- ///
- /// This function will return an error if:
- /// - Fetching todo tasks fails when the initial task list is empty.
- /// - Any critical error occurs during the execution process.
- ///
- /// Note that individual task failures do not cause this method to return an error.
- pub fn run(&mut self) -> anyhow::Result<()> {
- if self.tasks.is_empty() {
- self.todo().context("Failed to fetch todo tasks")?;
- if self.tasks.is_empty() {
- info!("No tasks to run");
- return Ok(());
- }
- }
- let n_tasks = self.tasks.len();
- warn!("{n_tasks} tasks to run");
- let mut completed_tasks = Vec::new();
- for (i, task) in self.tasks.iter().enumerate() {
- warn!("Running task {}/{}", i + 1, n_tasks);
- info!("{task}");
- match task.clone().run() {
- Ok(_) => {
- info!("Task completed successfully");
- completed_tasks.push(i);
- }
- Err(err) => error!("Task failed: {}", err),
- }
- }
- // Remove completed tasks
- for &index in completed_tasks.iter().rev() {
- self.tasks.remove(index);
- }
- info!(
- "{} tasks completed, {} tasks remaining",
- completed_tasks.len(),
- self.tasks.len()
- );
- Ok(())
- }
- pub fn run_deepvariant(&mut self) -> anyhow::Result<()> {
- let tasks = self.todo_deepvariants();
- let n_tasks = tasks.len();
- warn!("{n_tasks} tasks to run");
- for (i, tasks_chunk) in tasks.chunks_exact(2).enumerate() {
- match tasks_chunk {
- [a, b] => {
- warn!("Running task {}/{} and {}/{n_tasks}", i + 1, n_tasks, i + 2);
- info!("{a} and {b}");
- let a = if let CollectionsTasks::DeepVariant {
- id,
- time_point,
- bam,
- ..
- } = a
- {
- CollectionsTasks::DeepVariant {
- id: id.to_string(),
- time_point: time_point.to_string(),
- bam: bam.to_string(),
- }
- } else {
- anyhow::bail!("Err")
- };
- let b = if let CollectionsTasks::DeepVariant {
- id,
- time_point,
- bam,
- ..
- } = b
- {
- CollectionsTasks::DeepVariant {
- id: id.to_string(),
- time_point: time_point.to_string(),
- bam: bam.to_string(),
- }
- } else {
- anyhow::bail!("Err");
- };
- let handle1 = thread::spawn(|| a.run());
- let handle2 = thread::spawn(|| b.run());
- let _ = handle1.join().unwrap();
- let _ = handle2.join().unwrap();
- }
- [a] => {
- info!("Single task: ({})", a);
- let _ = a.clone().run();
- }
- _ => (),
- }
- }
- Ok(())
- }
- }
- #[derive(Clone, Debug)]
- pub enum CollectionsTasks {
- Align(FlowCellCase),
- DemuxAlign(Vec<FlowCellCase>),
- CountBam {
- bam_path: String,
- count_dir: String,
- config: Config,
- },
- Assemble {
- id: String,
- time_point: String,
- config: AssemblerConfig,
- },
- ModPileup {
- bam: PathBuf,
- config: ModkitConfig,
- },
- DMRCDiagMrd {
- id: String,
- config: ModkitConfig,
- },
- DeepVariant {
- id: String,
- time_point: String,
- bam: String,
- },
- ClairS {
- id: String,
- diag_bam: String,
- mrd_bam: String,
- },
- NanomonSV {
- id: String,
- },
- SomaticVariants {
- id: String,
- config: VariantsConfig,
- },
- }
- impl CollectionsTasks {
- pub fn run(self) -> anyhow::Result<()> {
- match self {
- CollectionsTasks::Align(case) => {
- BasecallAlign::init(case.clone(), Config::default())?.run_pipe()
- }
- CollectionsTasks::DemuxAlign(cases) => {
- BasecallAlign::from_mux(cases, Config::default())
- }
- CollectionsTasks::ModPileup { bam, config } => bed_methyl(bam, &config),
- CollectionsTasks::DeepVariant { id, time_point, .. } => {
- DeepVariant::initialize(&id, &time_point, Config::default())?.run()
- }
- CollectionsTasks::ClairS { id, .. } => {
- ClairS::initialize(&id, Config::default())?.run()
- }
- CollectionsTasks::NanomonSV { id, .. } => {
- NanomonSV::initialize(&id, Config::default())?.run()
- }
- CollectionsTasks::CountBam {
- bam_path,
- count_dir,
- config,
- } => par_whole_scan(&count_dir, &bam_path, &config),
- CollectionsTasks::SomaticVariants { id, config } => {
- RunVariantsAgg::new(id, config).run()
- }
- CollectionsTasks::Assemble {
- id,
- time_point,
- config,
- } => Assembler::new(id, time_point, config).run(),
- CollectionsTasks::DMRCDiagMrd { id, config } => dmr_c_mrd_diag(&id, &config),
- }
- }
- pub fn get_order(&self) -> u8 {
- match self {
- CollectionsTasks::Align(_) => 0,
- CollectionsTasks::DemuxAlign(_) => 1,
- CollectionsTasks::ModPileup { .. } => 2,
- CollectionsTasks::DMRCDiagMrd { .. } => 3,
- CollectionsTasks::CountBam { .. } => 4,
- CollectionsTasks::Assemble { .. } => 5,
- CollectionsTasks::DeepVariant { .. } => 6,
- CollectionsTasks::ClairS { .. } => 7,
- CollectionsTasks::NanomonSV { .. } => 8,
- CollectionsTasks::SomaticVariants { .. } => 9,
- }
- }
- }
- // Implement Display for CollectionsTasks
- impl fmt::Display for CollectionsTasks {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- use CollectionsTasks::*;
- match self {
- Align(case) => write!(
- f,
- "Alignment task for: {} {} {} {}",
- case.id,
- case.time_point,
- case.barcode,
- case.pod_dir.display()
- ),
- DemuxAlign(cases) => write!(
- f,
- "Demultiplex and alignment task for: {}",
- cases
- .iter()
- .map(|c| format!("{} {} {}", c.id, c.time_point, c.barcode))
- .collect::<Vec<String>>()
- .join(", ")
- ),
- DeepVariant {
- id,
- time_point,
- bam,
- ..
- } => {
- write!(
- f,
- "DeepVariant calling task for {} {}, from bam: {}",
- id, time_point, bam
- )
- }
- ClairS {
- id,
- diag_bam,
- mrd_bam,
- ..
- } => {
- write!(
- f,
- "ClairS calling task for {}, with diag_bam: {}, mrd_bam: {}",
- id, diag_bam, mrd_bam
- )
- }
- NanomonSV { id } => {
- write!(f, "NanomonSV calling task for {id}")
- }
- CountBam {
- bam_path,
- count_dir,
- ..
- } => write!(f, "Whole bam count for bam: {bam_path} into {count_dir}"),
- SomaticVariants { id, .. } => write!(f, "Variants aggregation for {}", id),
- Assemble { id, time_point, .. } => {
- write!(f, "De novo assemblage for {} {}", id, time_point)
- }
- ModPileup { bam, .. } => write!(f, "ModPileup for {}", bam.display()),
- DMRCDiagMrd { id, .. } => write!(f, "DMR C methylation diag vs mrd for {id}"),
- }
- }
- }
- pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
- let mut last_n = Vec::with_capacity(3);
- let mut consecutive_same_count = 0;
- loop {
- let mut collection =
- Collections::new(config.clone()).context("Failed to create new Collections")?;
- collection.todo().context("Failed to get todo tasks")?;
- collection
- .tasks
- .iter()
- .for_each(|t| info!("Planned task: {t}"));
- let n_tasks = collection.tasks.len();
- if n_tasks == 0 {
- info!("All results are up to date");
- break;
- }
- if last_n.len() >= 2 && last_n.iter().rev().take(2).all(|&x| x == n_tasks) {
- consecutive_same_count += 1;
- if consecutive_same_count >= 2 {
- error!("Tasks are not progressing");
- break;
- }
- } else {
- consecutive_same_count = 0;
- }
- last_n.push(n_tasks);
- if last_n.len() > 3 {
- last_n.remove(0);
- }
- collection.run().context("Failed to run collection tasks")?;
- }
- Ok(())
- }
- pub trait Initialize: Sized {
- fn initialize(id: &str, config: Config) -> anyhow::Result<Self>;
- }
- pub trait InitializeSolo: Sized {
- fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self>;
- }
- pub trait HasOutputs {
- fn has_output(&self, id: &str) -> (bool, bool);
- }
- pub trait Version {
- fn version(config: &Config) -> anyhow::Result<String>;
- }
- pub trait LoadVariants {
- fn load_variants(&self) -> anyhow::Result<Variants>;
- }
- pub fn exists_all(paths: Vec<&str>) -> anyhow::Result<()> {
- for path in paths.iter() {
- if !Path::new(path).exists() {
- anyhow::bail!("{path} should exist")
- }
- }
- Ok(())
- }
|