Thomas 1 anno fa
parent
commit
092d2ae2dd
4 ha cambiato i file con 161 aggiunte e 102 eliminazioni
  1. 2 2
      src/callers/clairs.rs
  2. 153 85
      src/collection/mod.rs
  3. 1 1
      src/commands/bcftools.rs
  4. 5 14
      src/lib.rs

+ 2 - 2
src/callers/clairs.rs

@@ -43,9 +43,9 @@ impl ClairS {
     pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: ClairSConfig) -> Self {
         let output_dir = format!("{}/{}/diag/ClairS", config.result_dir, id);
         let output_vcf = format!("{output_dir}/output.vcf.gz");
-        let output_indel = format!("${output_dir}/indel.vcf.gz");
+        let output_indel = format!("{output_dir}/indel.vcf.gz");
         let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz",);
-        let indel_vcf_passed = format!("${output_dir}/${id}_diag_clairs_indel_PASSED.vcf.gz");
+        // Path of the "<id>_diag_clairs_indel_PASSED" VCF inside the ClairS output directory.
+        // `format!` captures `{id}` by itself; a `$` in front of it would be copied verbatim.
+        let indel_vcf_passed = format!("{output_dir}/{id}_diag_clairs_indel_PASSED.vcf.gz");
 
         let log_dir = format!("{}/{}/log/ClairS", config.result_dir, id);
         Self {

+ 153 - 85
src/collection/mod.rs

@@ -1,4 +1,7 @@
-use std::path::PathBuf;
+use std::{
+    fs,
+    path::{Path, PathBuf},
+};
 
 use hashbrown::HashMap;
 use log::{info, warn};
@@ -17,9 +20,29 @@ use crate::{
 
 pub mod bam;
 pub mod pod5;
-pub mod vcf;
 pub mod somatic_variants;
+pub mod vcf;
+
+/// Settings consumed by `run_tasks`: three paths handed to `Collections::new` and two
+/// coverage thresholds handed to `Collections::todo`.
+#[derive(Debug)]
+pub struct CollectionsConfig {
+    /// First argument of `Collections::new`.
+    /// NOTE(review): presumably the directory holding the POD5 runs — confirm.
+    pub pod_dir: String,
+    /// Second argument of `Collections::new`.
+    /// NOTE(review): presumably a TSV describing corrected flow cells — confirm.
+    pub corrected_fc_path: String,
+    /// Third argument of `Collections::new`.
+    /// NOTE(review): presumably the root directory of pipeline results — confirm.
+    pub result_dir: String,
+    /// Passed to `Collections::todo`; variant-calling tasks are only queued when the
+    /// "diag" BAM's cramino mean coverage is at least this value.
+    pub min_diag_cov: f32,
+    /// Passed to `Collections::todo`; variant-calling tasks are only queued when the
+    /// "mrd" BAM's cramino mean coverage is at least this value.
+    pub min_mrd_cov: f32,
+}
 
+impl Default for CollectionsConfig {
+    /// Builds the stock configuration: the three `/data/...` locations below and minimum
+    /// mean coverages of 15.0 ("diag") and 10.0 ("mrd").
+    fn default() -> Self {
+        let pod_dir = String::from("/data/run_data");
+        let corrected_fc_path = String::from("/data/flow_cells.tsv");
+        let result_dir = String::from("/data/longreads_basic_pipe");
+
+        Self {
+            pod_dir,
+            corrected_fc_path,
+            result_dir,
+            min_diag_cov: 15.0,
+            min_mrd_cov: 10.0,
+        }
+    }
+}
 #[derive(Debug)]
 pub struct Collections {
     pub pod5: Pod5Collection,
@@ -42,7 +65,7 @@ impl Collections {
         })
     }
 
-    pub fn todo(&mut self) {
+    pub fn todo(&mut self, min_diag_cov: f32, min_mrd_cov: f32) {
         info!("Looking for base calling tasks...");
         let mut to_demux = Vec::new();
 
@@ -83,101 +106,118 @@ impl Collections {
             .into_values()
             .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
 
+        // Remove VCF anterior to BAM
+        let vcf_by_id = self.vcf.group_by_id();
+        vcf_by_id.iter().for_each(|(id, vcfs)| {
+            if let (Some(diag), Some(mrd)) = (
+                self.bam.get(id, "diag").first(),
+                self.bam.get(id, "mrd").first(),
+            ) {
+                let diag_modified = diag
+                    .file_metadata
+                    .modified()
+                    .expect("Can't read Bam modified time.");
+                let mrd_modified = mrd
+                    .file_metadata
+                    .modified()
+                    .expect("Can't read Bam modified time.");
+                let mut rm_paths: Vec<&Path> = vcfs
+                    .iter()
+                    .flat_map(|vcf| {
+                        let vcf_mod = vcf
+                            .file_metadata
+                            .modified()
+                            .expect("Can't read VCF modified time.");
+
+                        // For somatic caller erase if one bam (diag or mrd) is more recent.
+                        if vcf.caller != "DeepVariant" {
+                            if vcf_mod < diag_modified || vcf_mod < mrd_modified {
+                                vec![vcf.path.parent().unwrap()]
+                            } else {
+                                vec![]
+                            }
+                        } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
+                            || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
+                        {
+                            vec![vcf.path.parent().unwrap()]
+                        } else {
+                            vec![]
+                        }
+                    })
+                    .collect();
+                rm_paths.sort();
+                rm_paths.dedup();
+                println!("{rm_paths:#?}");
+                rm_paths
+                    .iter()
+                    .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
+            }
+        });
+
         // Variant calling
         info!("Looking for variant calling tasks...");
-        let mut looked_ids = Vec::new();
-        for (id, vcfs) in self.vcf.group_by_id() {
+        self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
             if let (Some(diag), Some(mrd)) = (
                 self.bam.get(&id, "diag").first(),
                 self.bam.get(&id, "mrd").first(),
             ) {
-                let caller_time: Vec<(&str, &str)> = vcfs
-                    .iter()
-                    .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
-                    .collect();
+                if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
+                    if diag_cramino.mean_coverage >= min_diag_cov.into()
+                        && mrd_cramino.mean_coverage >= min_mrd_cov.into()
+                    {
+                        let caller_time: Vec<(&str, &str)> = vcf_by_id
+                            .iter()
+                            .filter(|(i, _)| *i == id)
+                            .flat_map(|(_, vcfs)| {
+                                vcfs.iter()
+                                    .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
+                            })
+                            .collect();
 
-                if !caller_time.contains(&("clairs", "diag"))
-                    || !caller_time.contains(&("clairs_indel", "diag"))
-                {
-                    self.tasks.push(CollectionsTasks::ClairS {
-                        id: id.to_string(),
-                        diag_bam: diag.path.to_str().unwrap().to_string(),
-                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
-                        config: ClairSConfig::default(),
-                    });
-                }
-                if !caller_time.contains(&("DeepVariant", "diag")) {
-                    self.tasks.push(CollectionsTasks::DeepVariant {
-                        id: id.to_string(),
-                        time_point: "diag".to_string(),
-                        bam: diag.path.to_str().unwrap().to_string(),
-                        config: DeepVariantConfig::default(),
-                    });
-                }
-                if !caller_time.contains(&("DeepVariant", "mrd")) {
-                    self.tasks.push(CollectionsTasks::DeepVariant {
-                        id: id.to_string(),
-                        time_point: "mrd".to_string(),
-                        bam: mrd.path.to_str().unwrap().to_string(),
-                        config: DeepVariantConfig::default(),
-                    });
-                }
-                if !caller_time.contains(&("nanomonsv", "diag")) {
-                    self.tasks.push(CollectionsTasks::NanomonSV {
-                        id: id.to_string(),
-                        diag_bam: diag.path.to_str().unwrap().to_string(),
-                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
-                        config: NanomonSVConfig::default(),
-                    });
+                        if !caller_time.contains(&("clairs", "diag"))
+                            || !caller_time.contains(&("clairs_indel", "diag"))
+                        {
+                            self.tasks.push(CollectionsTasks::ClairS {
+                                id: id.to_string(),
+                                diag_bam: diag.path.to_str().unwrap().to_string(),
+                                mrd_bam: mrd.path.to_str().unwrap().to_string(),
+                                config: ClairSConfig::default(),
+                            });
+                        }
+                        if !caller_time.contains(&("DeepVariant", "diag")) {
+                            self.tasks.push(CollectionsTasks::DeepVariant {
+                                id: id.to_string(),
+                                time_point: "diag".to_string(),
+                                bam: diag.path.to_str().unwrap().to_string(),
+                                config: DeepVariantConfig::default(),
+                            });
+                        }
+                        if !caller_time.contains(&("DeepVariant", "mrd")) {
+                            self.tasks.push(CollectionsTasks::DeepVariant {
+                                id: id.to_string(),
+                                time_point: "mrd".to_string(),
+                                bam: mrd.path.to_str().unwrap().to_string(),
+                                config: DeepVariantConfig::default(),
+                            });
+                        }
+                        if !caller_time.contains(&("nanomonsv", "diag")) {
+                            self.tasks.push(CollectionsTasks::NanomonSV {
+                                id: id.to_string(),
+                                diag_bam: diag.path.to_str().unwrap().to_string(),
+                                mrd_bam: mrd.path.to_str().unwrap().to_string(),
+                                config: NanomonSVConfig::default(),
+                            });
+                        }
+                    }
                 }
-                looked_ids.push(id.clone());
             }
-        }
-
-        // ids without any vcf
-        self.bam
-            .bams
-            .iter()
-            .map(|b| b.id.clone())
-            .filter(|id| !looked_ids.contains(id))
-            .for_each(|id| {
-                if let (Some(diag), Some(mrd)) = (
-                    self.bam.get(&id, "diag").first(),
-                    self.bam.get(&id, "mrd").first(),
-                ) {
-                    self.tasks.push(CollectionsTasks::ClairS {
-                        id: id.to_string(),
-                        diag_bam: diag.path.to_str().unwrap().to_string(),
-                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
-                        config: ClairSConfig::default(),
-                    });
-
-                    self.tasks.push(CollectionsTasks::DeepVariant {
-                        id: id.to_string(),
-                        time_point: "diag".to_string(),
-                        bam: diag.path.to_str().unwrap().to_string(),
-                        config: DeepVariantConfig::default(),
-                    });
-                    self.tasks.push(CollectionsTasks::DeepVariant {
-                        id: id.to_string(),
-                        time_point: "mrd".to_string(),
-                        bam: mrd.path.to_str().unwrap().to_string(),
-                        config: DeepVariantConfig::default(),
-                    });
-                    self.tasks.push(CollectionsTasks::NanomonSV {
-                        id: id.to_string(),
-                        diag_bam: diag.path.to_str().unwrap().to_string(),
-                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
-                        config: NanomonSVConfig::default(),
-                    });
-                }
-            });
+        });
     }
 
     pub fn run(&mut self) -> anyhow::Result<()> {
-        self.tasks.reverse();
+        // self.tasks.reverse();
         if self.tasks.is_empty() {
-            self.todo();
+            self.todo(15.0, 10.0);
             if self.tasks.is_empty() {
                 return Ok(());
             } else {
@@ -259,3 +299,31 @@ impl CollectionsTasks {
         Ok(())
     }
 }
+
+/// Rebuilds the collections and runs their tasks until nothing is left or progress stops.
+///
+/// Every pass creates a fresh `Collections` from the paths in `config`, fills its task list
+/// with `todo` (using the coverage thresholds in `config`) and then calls `run`.
+///
+/// The loop ends, returning `Ok(())`, when either
+/// * a pass finds no task, or
+/// * the number of tasks is the same as in the two previous passes (stalled).
+///
+/// # Errors
+///
+/// Propagates any error returned by `Collections::new` or `Collections::run`.
+pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
+    // Task counts of the passes that have already been run, oldest first.
+    let mut last_n = Vec::new();
+    loop {
+        let mut collection = Collections::new(
+            &config.pod_dir,
+            &config.corrected_fc_path,
+            &config.result_dir,
+        )?;
+        collection.todo(config.min_diag_cov, config.min_mrd_cov);
+        if collection.tasks.is_empty() {
+            warn!("All results are up to date");
+            break;
+        }
+        let n_tasks = collection.tasks.len();
+        warn!("{n_tasks} tasks to run");
+        // Two earlier passes with this exact count means running again would not help.
+        // `ends_with` is false while fewer than two counts are recorded, so no index can
+        // go out of bounds and the stall is caught on the first pass that can prove it.
+        if last_n.ends_with(&[n_tasks, n_tasks]) {
+            warn!("Tasks stalled");
+            break;
+        }
+        last_n.push(n_tasks);
+        collection.run()?;
+    }
+    Ok(())
+}

+ 1 - 1
src/commands/bcftools.rs

@@ -9,7 +9,7 @@ pub struct BcftoolsConfig {
 impl Default for BcftoolsConfig {
     fn default() -> Self {
         Self {
-            bin: "/data/tools/bcftools-1.18/bcftools".to_string(),
+            bin: "bcftools".to_string(),
             threads: 20,
         }
     }

+ 5 - 14
src/lib.rs

@@ -19,7 +19,7 @@ lazy_static! {
 mod tests {
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}, collection::{bam::{self, BamType}, vcf::VcfCollection, Collections}, commands::dorado::Dorado};
+    use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}, collection::{bam::{self, BamType}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
 
     #[test]
     fn it_works() {
@@ -91,7 +91,7 @@ mod tests {
             result_dir: "/data/test".to_string(),
             ..DeepVariantConfig::default()
         };
-        let _dv = DeepVariant::new("test_a", "diag", "/data/test_data/subset.bam", config).run();
+        DeepVariant::new("test_a", "diag", "/data/test_data/subset.bam", config).run();
         Ok(())
     }
 
@@ -123,22 +123,13 @@ mod tests {
             "/data/flow_cells.tsv",
             "/data/longreads_basic_pipe",
         )?;
-        collections.todo();
+        collections.todo(15.0, 10.0);
         println!("{:#?}", collections.tasks);
-        // collections.run()?;
         Ok(())
     }
 
     #[test_log::test]
-    fn run_tasks() -> anyhow::Result<()> {
-        let mut collections = Collections::new(
-            "/data/run_data",
-            "/data/flow_cells.tsv",
-            "/data/longreads_basic_pipe",
-        )?;
-        collections.todo();
-        println!("{:#?}", collections.tasks);
-        collections.run()?;
-        Ok(())
+    fn run_t() -> anyhow::Result<()> {
+        run_tasks(CollectionsConfig::default())
     }
 }