Browse Source

context + dedup tasks

Thomas 1 year ago
parent
commit
2073c9ecf8

+ 1 - 1
src/callers/clairs.rs

@@ -45,7 +45,7 @@ impl ClairS {
         let output_vcf = format!("{output_dir}/output.vcf.gz");
         let output_indel = format!("{output_dir}/indel.vcf.gz");
         let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz",);
-        let indel_vcf_passed = format!("{output_dir}/${id}_diag_clairs_indel_PASSED.vcf.gz");
+        let indel_vcf_passed = format!("{output_dir}/{id}_diag_clairs_indel_PASSED.vcf.gz");
 
         let log_dir = format!("{}/{}/log/ClairS", config.result_dir, id);
         Self {

+ 7 - 2
src/collection/bam.rs

@@ -79,7 +79,9 @@ impl Bam {
                 return Err(anyhow!("Cramino file missing {cramino_path}"));
             }
             let mut cramino = Cramino::default().with_result_path(&cramino_path);
-            cramino.parse_results()?;
+            cramino
+                .parse_results()
+                .context(format!("Error while parsing cramino for {cramino_path}"))?;
 
             if let Some(cramino) = cramino.results {
                 Some(cramino)
@@ -90,7 +92,10 @@ impl Bam {
             None
         };
 
-        let composition = pandora_lib_pileup::bam_compo(path.to_string_lossy().as_ref(), 20000)?;
+        let composition =
+            pandora_lib_pileup::bam_compo(path.to_string_lossy().as_ref(), 20000).context(
+                format!("Error while reading BAM composition for {}", path.display()),
+            )?;
 
         Ok(Self {
             path,

+ 104 - 53
src/collection/mod.rs

@@ -1,9 +1,5 @@
-use std::{
-    fs,
-    path::{Path, PathBuf},
-};
+use std::{collections::HashMap, fmt, fs, path::Path};
 
-use hashbrown::HashMap;
 use log::{info, warn};
 
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
@@ -13,14 +9,14 @@ use crate::{
         deep_variant::{DeepVariant, DeepVariantConfig},
         nanomonsv::{NanomonSV, NanomonSVConfig},
     },
-    collection::pod5::{FlowCellCase, Pod5Type},
-    commands::dorado::Dorado,
+    collection::pod5::FlowCellCase,
+    commands::dorado::Dorado as BasecallAlign,
     config::Config,
 };
 
 pub mod bam;
 pub mod pod5;
-pub mod somatic_variants;
+pub mod variants;
 pub mod vcf;
 
 #[derive(Debug)]
@@ -67,44 +63,45 @@ impl Collections {
 
     pub fn todo(&mut self, min_diag_cov: f32, min_mrd_cov: f32) {
         info!("Looking for base calling tasks...");
-        let mut to_demux = Vec::new();
-
-        for run in self.pod5.runs.iter() {
-            for fc in run.flowcells.iter() {
-                let acq_id = fc.pod5_info.acquisition_id.clone();
-                for case in fc.cases.iter() {
-                    let bams_ids: Vec<String> = self
-                        .bam
-                        .get(&case.id, &case.time_point)
-                        .iter()
-                        .flat_map(|b| {
-                            b.composition
-                                .iter()
-                                .map(|c| c.0.clone())
-                                .collect::<Vec<String>>()
-                        })
-                        .filter(|id| *id == acq_id)
-                        .collect();
-                    if bams_ids.is_empty() {
-                        match fc.pod5_type {
-                            Pod5Type::Raw => to_demux.push(case.clone()),
-                            Pod5Type::Demuxed => {
-                                self.tasks.push(CollectionsTasks::Align(case.clone()))
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        // Group for muxed and push task with all cases
-        let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
-        for case in to_demux {
-            grouped.entry(case.pod_dir.clone()).or_default().push(case);
-        }
-        grouped
-            .into_values()
-            .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
+        let mut tasks = Vec::new();
+        // let mut to_demux = Vec::new();
+        //
+        // for run in self.pod5.runs.iter() {
+        //     for fc in run.flowcells.iter() {
+        //         let acq_id = fc.pod5_info.acquisition_id.clone();
+        //         for case in fc.cases.iter() {
+        //             let bams_ids: Vec<String> = self
+        //                 .bam
+        //                 .get(&case.id, &case.time_point)
+        //                 .iter()
+        //                 .flat_map(|b| {
+        //                     b.composition
+        //                         .iter()
+        //                         .map(|c| c.0.clone())
+        //                         .collect::<Vec<String>>()
+        //                 })
+        //                 .filter(|id| *id == acq_id)
+        //                 .collect();
+        //             if bams_ids.is_empty() {
+        //                 match fc.pod5_type {
+        //                     Pod5Type::Raw => to_demux.push(case.clone()),
+        //                     Pod5Type::Demuxed => {
+        //                         tasks.push(CollectionsTasks::Align(case.clone()))
+        //                     }
+        //                 }
+        //             }
+        //         }
+        //     }
+        // }
+        //
+        // // Group for muxed and push task with all cases
+        // let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
+        // for case in to_demux {
+        //     grouped.entry(case.pod_dir.clone()).or_default().push(case);
+        // }
+        // grouped
+        //     .into_values()
+        //     .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
 
         // Remove VCF anterior to BAM
         let vcf_by_id = self.vcf.group_by_id();
@@ -176,7 +173,7 @@ impl Collections {
                         if !caller_time.contains(&("clairs", "diag"))
                             || !caller_time.contains(&("clairs_indel", "diag"))
                         {
-                            self.tasks.push(CollectionsTasks::ClairS {
+                            tasks.push(CollectionsTasks::ClairS {
                                 id: id.to_string(),
                                 diag_bam: diag.path.to_str().unwrap().to_string(),
                                 mrd_bam: mrd.path.to_str().unwrap().to_string(),
@@ -184,7 +181,7 @@ impl Collections {
                             });
                         }
                         if !caller_time.contains(&("DeepVariant", "diag")) {
-                            self.tasks.push(CollectionsTasks::DeepVariant {
+                            tasks.push(CollectionsTasks::DeepVariant {
                                 id: id.to_string(),
                                 time_point: "diag".to_string(),
                                 bam: diag.path.to_str().unwrap().to_string(),
@@ -192,7 +189,7 @@ impl Collections {
                             });
                         }
                         if !caller_time.contains(&("DeepVariant", "mrd")) {
-                            self.tasks.push(CollectionsTasks::DeepVariant {
+                            tasks.push(CollectionsTasks::DeepVariant {
                                 id: id.to_string(),
                                 time_point: "mrd".to_string(),
                                 bam: mrd.path.to_str().unwrap().to_string(),
@@ -200,7 +197,7 @@ impl Collections {
                             });
                         }
                         if !caller_time.contains(&("nanomonsv", "diag")) {
-                            self.tasks.push(CollectionsTasks::NanomonSV {
+                            tasks.push(CollectionsTasks::NanomonSV {
                                 id: id.to_string(),
                                 diag_bam: diag.path.to_str().unwrap().to_string(),
                                 mrd_bam: mrd.path.to_str().unwrap().to_string(),
@@ -211,6 +208,12 @@ impl Collections {
                 }
             }
         });
+        let mut hs = HashMap::new();
+        tasks.into_iter().for_each(|t| {
+            hs.insert(t.to_string(), t);
+        });
+
+        self.tasks = hs.into_values().collect();
     }
 
     pub fn run(&mut self) -> anyhow::Result<()> {
@@ -265,10 +268,10 @@ impl CollectionsTasks {
     pub fn run(self) -> anyhow::Result<()> {
         match self {
             CollectionsTasks::Align(case) => {
-                Dorado::init(case.clone(), Config::default())?.run_pipe()?;
+                BasecallAlign::init(case.clone(), Config::default())?.run_pipe()?;
             }
             CollectionsTasks::DemuxAlign(cases) => {
-                Dorado::from_mux(cases, Config::default())?;
+                BasecallAlign::from_mux(cases, Config::default())?;
             }
             CollectionsTasks::DeepVariant {
                 id,
@@ -299,6 +302,54 @@ impl CollectionsTasks {
     }
 }
 
+// Implement Display for CollectionsTasks
+impl fmt::Display for CollectionsTasks {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        use CollectionsTasks::*;
+
+        match self {
+            Align(case) => write!(f, "Align task  with: {:#?}", case),
+            DemuxAlign(cases) => write!(f, "DemuxAlign task with: {:#?}", cases),
+            DeepVariant {
+                id,
+                time_point,
+                bam,
+                ..
+            } => {
+                write!(
+                    f,
+                    "DeepVariant task with id: {}, time_point: {}, bam: {}",
+                    id, time_point, bam
+                )
+            }
+            ClairS {
+                id,
+                diag_bam,
+                mrd_bam,
+                ..
+            } => {
+                write!(
+                    f,
+                    "ClairS task with id: {}, diag_bam: {}, mrd_bam: {}",
+                    id, diag_bam, mrd_bam
+                )
+            }
+            NanomonSV {
+                id,
+                diag_bam,
+                mrd_bam,
+                ..
+            } => {
+                write!(
+                    f,
+                    "NanomonSV task with id: {}, diag_bam: {}, mrd_bam: {}",
+                    id, diag_bam, mrd_bam
+                )
+            }
+        }
+    }
+}
+
 pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
     let mut last_n = Vec::new();
     loop {
@@ -318,7 +369,7 @@ pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
             && last_n[last_n.len() - 1] == n_tasks
             && last_n[last_n.len() - 2] == n_tasks
         {
-            warn!("Tasks stalled");
+            warn!("Tasks don't progress");
             break;
         }
         last_n.push(n_tasks);

+ 0 - 27
src/collection/somatic_variants.rs

@@ -1,27 +0,0 @@
-use std::path::PathBuf;
-
-use glob::glob;
-use log::warn;
-use rayon::prelude::*;
-
-pub struct SomaticVarCollection {
-    pub somatic_var: Vec<PathBuf>,
-}
-
-impl SomaticVarCollection {
-    pub fn new(result_dir: &str) -> anyhow::Result<Self> {
-        let pattern = format!("{}/*/*/*_variants.bytes.gz", result_dir);
-        let somatic_var = glob(&pattern)
-            .expect("Failed to read glob pattern")
-            .par_bridge()
-            .filter_map(|entry| {
-                match entry {
-                    Ok(path) => return Some(path),
-                    Err(e) => warn!("Error: {:?}", e),
-                }
-                None
-            })
-            .collect();
-        Ok(SomaticVarCollection { somatic_var })
-    }
-}

+ 53 - 0
src/collection/variants.rs

@@ -0,0 +1,53 @@
+use std::{fs::Metadata, path::PathBuf};
+
+use glob::glob;
+use log::warn;
+use rayon::prelude::*;
+
+pub struct VariantsCollection {
+    pub data: Vec<VariantsCase>,
+}
+
+#[derive(Debug)]
+pub struct VariantsCase {
+    pub path: PathBuf,
+    pub id: String,
+    pub file_metadata: Metadata,
+}
+
+impl VariantsCase {
+    pub fn new(path: PathBuf) -> anyhow::Result<Self> {
+        let id = path
+            .parent()
+            .unwrap()
+            .parent()
+            .unwrap()
+            .file_name()
+            .unwrap()
+            .to_string_lossy()
+            .to_string();
+        let file_metadata = path.metadata()?;
+        Ok(VariantsCase { path, id, file_metadata })
+    }
+}
+
+impl VariantsCollection {
+    pub fn new(result_dir: &str) -> anyhow::Result<Self> {
+        let pattern = format!("{}/*/*/*_variants.bytes.gz", result_dir);
+        let data = glob(&pattern)
+            .expect("Failed to read glob pattern")
+            .par_bridge()
+            .filter_map(|entry| {
+                match entry {
+                    Ok(path) => match VariantsCase::new(path) {
+                        Ok(vc) => return Some(vc),
+                        Err(err) => warn!("{err}"),
+                    },
+                    Err(e) => warn!("Error: {:?}", e),
+                }
+                None
+            })
+            .collect();
+        Ok(VariantsCollection { data })
+    }
+}

+ 1 - 1
src/collection/vcf.rs

@@ -126,7 +126,7 @@ impl VcfCollection {
 
 pub fn n_variants(path: &str) -> anyhow::Result<u64> {
     let csi_src = format!("{path}.csi");
-    let index = csi::read(csi_src)?;
+    let index = csi::read(csi_src).context(format!("can't read index of {path}"))?;
 
     let mut n = 0;
     for reference_sequence in index.reference_sequences() {

+ 20 - 1
src/commands/bcftools.rs

@@ -1,3 +1,6 @@
+use std::fs;
+use uuid::Uuid;
+
 use crate::runners::{run_wait, CommandRun, RunReport};
 
 #[derive(Debug)]
@@ -20,6 +23,21 @@ pub fn bcftools_keep_pass(
     output: &str,
     config: BcftoolsConfig,
 ) -> anyhow::Result<RunReport> {
+    let tmp_file = format!("/tmp/{}", Uuid::new_v4());
+
+    // First sort
+    let mut cmd_run = CommandRun::new(
+        &config.bin,
+        &[
+            "sort",
+            input,
+            "-o",
+            &tmp_file,
+        ],
+    );
+    let _ = run_wait(&mut cmd_run)?;
+
+    // Then filter
     let mut cmd_run = CommandRun::new(
         &config.bin,
         &[
@@ -29,11 +47,12 @@ pub fn bcftools_keep_pass(
             &config.threads.to_string(),
             "-i",
             "FILTER='PASS'",
-            input,
+            &tmp_file,
             "-o",
             output,
         ],
     );
     let res = run_wait(&mut cmd_run)?;
+    fs::remove_file(tmp_file)?;
     Ok(res)
 }

+ 18 - 1
src/lib.rs

@@ -19,7 +19,7 @@ lazy_static! {
 mod tests {
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}, collection::{bam::{self, BamType}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
+    use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}, collection::{bam::{self, BamType}, run_tasks, variants::VariantsCollection, vcf::VcfCollection, Collections, CollectionsConfig}, commands::{bcftools::{bcftools_keep_pass, BcftoolsConfig}, dorado::Dorado}};
 
     #[test]
     fn it_works() {
@@ -125,6 +125,7 @@ mod tests {
         )?;
         collections.todo(15.0, 10.0);
         println!("{:#?}", collections.tasks);
+        println!("{}", collections.tasks.len());
         Ok(())
     }
 
@@ -132,4 +133,20 @@ mod tests {
     fn run_t() -> anyhow::Result<()> {
         run_tasks(CollectionsConfig::default())
     }
+
+    #[test_log::test]
+    fn somatic() -> anyhow::Result<()> {
+        let variants_collection = VariantsCollection::new("/data/longreads_basic_pipe")?;
+        variants_collection.data.iter().for_each(|v| println!("{}\t{}", v.id, v.path.display()));
+        Ok(())
+    }
+
+    #[test_log::test]
+    fn bcftools_pass() {
+        let config = BcftoolsConfig::default();
+        let id = "RICCO";
+        let i = format!("/data/longreads_basic_pipe/{id}/diag/nanomonsv/{id}_diag.nanomonsv.result.vcf");
+        let o = format!("/data/longreads_basic_pipe/{id}/diag/nanomonsv/{id}_diag_nanomonsv_PASSED.vcf.gz");
+        bcftools_keep_pass(&i, &o, config).unwrap();
+    }
 }