Thomas před 1 rokem
rodič
revize
5fbf2203ff
7 změnil soubory, kde provedl 244 přidání a 183 odebrání
  1. 0 0
      src/collection/bam.rs
  2. 236 0
      src/collection/mod.rs
  3. 0 2
      src/collection/pod5.rs
  4. 0 0
      src/collection/vcf.rs
  5. 1 4
      src/commands/dorado.rs
  6. 3 177
      src/lib.rs
  7. 4 0
      src/runners.rs

+ 0 - 0
src/bam.rs → src/collection/bam.rs


+ 236 - 0
src/collection/mod.rs

@@ -0,0 +1,236 @@
+use std::path::PathBuf;
+
+use hashbrown::HashMap;
+use log::{info, warn};
+
+use crate::{
+    callers::{
+        clairs::{ClairS, ClairSConfig},
+        deep_variant::{DeepVariant, DeepVariantConfig},
+        nanomonsv::{NanomonSV, NanomonSVConfig},
+    },
+    collection::{
+        pod5::{FlowCellCase, Pod5Type},
+        vcf::Vcf,
+    },
+    commands::dorado::Dorado,
+    config::Config,
+    runners::Run,
+};
+use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
+
+pub mod bam;
+pub mod pod5;
+pub mod vcf;
+
+#[derive(Debug)]
+pub struct Collections {
+    pub pod5: Pod5Collection,
+    pub bam: BamCollection,
+    pub vcf: VcfCollection,
+    pub tasks: Vec<CollectionsTasks>,
+}
+
+impl Collections {
+    pub fn new(pod_dir: &str, corrected_fc_path: &str, result_dir: &str) -> anyhow::Result<Self> {
+        let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
+        let bam = BamCollection::new(result_dir);
+        let vcf = VcfCollection::new(result_dir);
+
+        Ok(Self {
+            pod5,
+            bam,
+            vcf,
+            tasks: Vec::new(),
+        })
+    }
+
+    pub fn todo(&mut self) {
+        info!("Looking for base calling tasks...");
+        let mut to_demux = Vec::new();
+
+        // let bams_acquisitions_ids = self.bam.by_acquisition_id();
+        for run in self.pod5.runs.iter() {
+            for fc in run.flowcells.iter() {
+                let acq_id = fc.pod5_info.acquisition_id.clone();
+                for case in fc.cases.iter() {
+                    let bams_ids: Vec<String> = self
+                        .bam
+                        .get(&case.id, &case.time_point)
+                        .iter()
+                        .flat_map(|b| {
+                            b.composition
+                                .iter()
+                                .map(|c| c.0.clone())
+                                .collect::<Vec<String>>()
+                        })
+                        .filter(|id| *id == acq_id)
+                        .collect();
+                    if bams_ids.is_empty() {
+                        match fc.pod5_type {
+                            Pod5Type::Raw => to_demux.push(case.clone()),
+                            Pod5Type::Demuxed => {
+                                self.tasks.push(CollectionsTasks::Align(case.clone()))
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        // Group for muxed and push task with all cases
+        let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
+        for case in to_demux {
+            grouped.entry(case.pod_dir.clone()).or_default().push(case);
+        }
+        grouped
+            .into_values()
+            .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
+
+        // Variant calling
+        self.vcf.sort_by_id();
+        let mut vcf_by_ids: HashMap<String, Vec<Vcf>> = HashMap::new();
+        self.vcf.vcfs.iter().for_each(|v| {
+            vcf_by_ids.entry(v.id.clone()).or_default().push(v.clone());
+        });
+
+        vcf_by_ids.iter().for_each(|(k, v)| {
+            if v.len() != 5 {
+                if let (Some(diag), Some(mrd)) = (
+                    self.bam.get(k, "diag").first(),
+                    self.bam.get(k, "mrd").first(),
+                ) {
+                    println!("{v:#?}");
+                    let diag_bam = diag.path.to_str().unwrap().to_string();
+                    let mrd_bam = mrd.path.to_str().unwrap().to_string();
+
+                    let has_clairs = v.iter().any(|v| v.caller == "clairs");
+                    let has_clairs_indel = v.iter().any(|v| v.caller == "clairs_indel");
+                    if !has_clairs || !has_clairs_indel {
+                        self.tasks.push(CollectionsTasks::ClairS {
+                            id: k.to_string(),
+                            diag_bam: diag_bam.clone(),
+                            mrd_bam: mrd_bam.clone(),
+                            config: ClairSConfig::default(),
+                        });
+                    }
+                    if !v
+                        .iter()
+                        .any(|v| v.caller == "DeepVariant" && v.time_point == "diag")
+                    {
+                        self.tasks.push(CollectionsTasks::DeepVariant {
+                            id: k.to_string(),
+                            time_point: "diag".to_string(),
+                            bam: diag_bam.clone(),
+                            config: DeepVariantConfig::default(),
+                        })
+                    }
+                    if !v
+                        .iter()
+                        .any(|v| v.caller == "DeepVariant" && v.time_point == "mrd")
+                    {
+                        self.tasks.push(CollectionsTasks::DeepVariant {
+                            id: k.to_string(),
+                            time_point: "mrd".to_string(),
+                            bam: mrd_bam.clone(),
+                            config: DeepVariantConfig::default(),
+                        })
+                    }
+                    if !v.iter().any(|v| v.caller == "nanomonsv") {
+                        self.tasks.push(CollectionsTasks::NanomonSV {
+                            id: k.to_string(),
+                            diag_bam: diag_bam.clone(),
+                            mrd_bam: mrd_bam.clone(),
+                            config: NanomonSVConfig::default(),
+                        })
+                    }
+                };
+            }
+        });
+    }
+
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        self.tasks.reverse();
+        if self.tasks.is_empty() {
+            self.todo();
+            if self.tasks.is_empty() {
+                return Ok(());
+            } else {
+                self.run()?;
+            }
+        } else {
+            let n_tasks = self.tasks.len();
+            warn!("{n_tasks} tasks to run");
+            let mut i = 1;
+            while let Some(task) = self.tasks.pop() {
+                warn!("Running {i}/{n_tasks}");
+                info!("{task:#?}");
+                task.run()?;
+                i += 1;
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub enum CollectionsTasks {
+    Align(FlowCellCase),
+    DemuxAlign(Vec<FlowCellCase>),
+    DeepVariant {
+        id: String,
+        time_point: String,
+        bam: String,
+        config: DeepVariantConfig,
+    },
+    ClairS {
+        id: String,
+        diag_bam: String,
+        mrd_bam: String,
+        config: ClairSConfig,
+    },
+    NanomonSV {
+        id: String,
+        diag_bam: String,
+        mrd_bam: String,
+        config: NanomonSVConfig,
+    },
+}
+
+impl Run for CollectionsTasks {
+    fn run(self) -> anyhow::Result<()> {
+        match self {
+            CollectionsTasks::Align(case) => {
+                Dorado::init(case.clone(), Config::default())?.run_pipe()?;
+            }
+            CollectionsTasks::DemuxAlign(cases) => {
+                Dorado::from_mux(cases, Config::default())?;
+            }
+            CollectionsTasks::DeepVariant {
+                id,
+                time_point,
+                bam,
+                config,
+            } => {
+                DeepVariant::new(&id, &time_point, &bam, config).run();
+            }
+            CollectionsTasks::ClairS {
+                id,
+                diag_bam,
+                mrd_bam,
+                config,
+            } => {
+                ClairS::new(&id, &diag_bam, &mrd_bam, config).run();
+            }
+            CollectionsTasks::NanomonSV {
+                id,
+                diag_bam,
+                mrd_bam,
+                config,
+            } => {
+                NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run();
+            }
+        }
+        Ok(())
+    }
+}

+ 0 - 2
src/pod5.rs → src/collection/pod5.rs

@@ -4,14 +4,12 @@ use csv::ReaderBuilder;
 use glob::glob;
 use log::{info, warn};
 use pandora_lib_pod5::Pod5Info;
-use rayon::iter::ParallelBridge;
 use serde::Deserialize;
 use std::{
     fmt::Display,
     fs::{self, File, Metadata},
     os::unix::fs::MetadataExt,
     path::PathBuf,
-    usize,
 };
 use hashbrown::HashMap;
 use rayon::prelude::*;

+ 0 - 0
src/vcf.rs → src/collection/vcf.rs


+ 1 - 4
src/commands/dorado.rs

@@ -9,11 +9,8 @@ use duct::cmd;
 use log::{info, warn};
 use uuid::Uuid;
 
-use crate::{config::Config, pod5::FlowCellCase};
+use crate::{config::Config, collection::pod5::FlowCellCase};
 
-pub trait Run {
-    fn run(self) -> anyhow::Result<()>;
-}
 
 #[derive(Debug, Clone)]
 pub struct DoradoParams {

+ 3 - 177
src/lib.rs

@@ -1,188 +1,15 @@
-use std::{collections::HashMap, path::PathBuf};
-
-use bam::BamCollection;
-use callers::{clairs::{ClairS, ClairSConfig}, deep_variant::{DeepVariant, DeepVariantConfig}, nanomonsv::{NanomonSV, NanomonSVConfig}};
-use commands::dorado::{Dorado, Run};
-use config::Config;
-use log::{info, warn};
-use pod5::{FlowCellCase, Pod5Collection, Pod5Type};
-use vcf::VcfCollection;
-
-use crate::vcf::Vcf;
-
-pub mod bam;
 pub mod commands;
 pub mod config;
 pub mod modkit;
-pub mod pod5;
-mod vcf;
 pub mod callers;
 pub mod runners;
-
-#[derive(Debug)]
-pub struct Collections {
-    pub pod5: Pod5Collection,
-    pub bam: BamCollection,
-    pub vcf: VcfCollection,
-    pub tasks: Vec<CollectionsTasks>,
-}
-
-impl Collections {
-    pub fn new(pod_dir: &str, corrected_fc_path: &str, result_dir: &str) -> anyhow::Result<Self> {
-        let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
-        let bam = BamCollection::new(result_dir);
-        let vcf = VcfCollection::new(result_dir);
-
-        Ok(Self {
-            pod5,
-            bam,
-            vcf,
-            tasks: Vec::new(),
-        })
-    }
-
-    pub fn todo(&mut self) {
-        info!("Looking for base calling tasks...");
-        let mut to_demux = Vec::new();
-
-        // let bams_acquisitions_ids = self.bam.by_acquisition_id();
-        for run in self.pod5.runs.iter() {
-            for fc in run.flowcells.iter() {
-                let acq_id = fc.pod5_info.acquisition_id.clone();
-                for case in fc.cases.iter() {
-                    let bams_ids: Vec<String> = self
-                        .bam
-                        .get(&case.id, &case.time_point)
-                        .iter()
-                        .flat_map(|b| {
-                            b.composition
-                                .iter()
-                                .map(|c| c.0.clone())
-                                .collect::<Vec<String>>()
-                        })
-                        .filter(|id| *id == acq_id)
-                        .collect();
-                    if bams_ids.is_empty() {
-                        match fc.pod5_type {
-                            Pod5Type::Raw => to_demux.push(case.clone()),
-                            Pod5Type::Demuxed => {
-                                self.tasks.push(CollectionsTasks::Align(case.clone()))
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        // Group for muxed and push task with all cases
-        let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
-        for case in to_demux {
-            grouped
-                .entry(case.pod_dir.clone())
-                .or_default()
-                .push(case);
-        }
-        grouped
-            .into_values()
-            .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
-
-        // Variant calling
-        self.vcf.sort_by_id();
-        let mut vcf_by_ids: HashMap<String, Vec<Vcf>> = HashMap::new();
-        self.vcf.vcfs.iter().for_each(|v| {
-            vcf_by_ids.entry(v.id.clone()).or_default().push(v.clone());
-        });
-
-        vcf_by_ids.iter().for_each(|(k, v)| {
-            if v.len() != 5 {
-                if let (Some(diag), Some(mrd)) = (self.bam.get(k, "diag").first(), self.bam.get(k, "mrd").first()) {
-
-                    println!("{v:#?}");
-                    let diag_bam = diag.path.to_str().unwrap().to_string();
-                    let mrd_bam = mrd.path.to_str().unwrap().to_string();
-
-                    let has_clairs = v.iter().any(|v| v.caller == "clairs");
-                    let has_clairs_indel = v.iter().any(|v| v.caller == "clairs_indel");
-                    if !has_clairs || !has_clairs_indel {
-                        self.tasks.push(CollectionsTasks::ClairS { id: k.to_string(), diag_bam: diag_bam.clone(), mrd_bam: mrd_bam.clone(), config: ClairSConfig::default() });
-                    }
-                    if !v.iter().any(|v| v.caller == "DeepVariant" && v.time_point == "diag") {
-                        self.tasks.push(CollectionsTasks::DeepVariant { id: k.to_string(), time_point: "diag".to_string(), bam: diag_bam.clone(), config: DeepVariantConfig::default() })
-                    }
-                    if !v.iter().any(|v| v.caller == "DeepVariant" && v.time_point == "mrd") {
-                        self.tasks.push(CollectionsTasks::DeepVariant { id: k.to_string(), time_point: "mrd".to_string(), bam: mrd_bam.clone(), config: DeepVariantConfig::default() })
-                    }
-                    if !v.iter().any(|v| v.caller == "nanomonsv") {
-                        self.tasks.push(CollectionsTasks::NanomonSV { id: k.to_string(), diag_bam: diag_bam.clone(), mrd_bam: mrd_bam.clone(), config: NanomonSVConfig::default() })
-                    }
-                };
-            }
-        });
-        
-    }
-
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        self.tasks.reverse();
-        if self.tasks.is_empty() {
-            self.todo();
-            if self.tasks.is_empty() {
-                return Ok(());
-            } else {
-                self.run()?;
-            }
-        } else {
-            let n_tasks = self.tasks.len();
-            warn!("{n_tasks} tasks to run");
-            let mut i = 1;
-            while let Some(task) = self.tasks.pop() {
-                warn!("Running {i}/{n_tasks}");
-                info!("{task:#?}");
-                task.run()?;
-                i += 1;
-            }
-        }
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-pub enum CollectionsTasks {
-    Align(FlowCellCase),
-    DemuxAlign(Vec<FlowCellCase>),
-    DeepVariant {id: String, time_point: String, bam: String, config: DeepVariantConfig},
-    ClairS {id: String, diag_bam: String, mrd_bam: String, config: ClairSConfig},
-    NanomonSV {id: String, diag_bam: String, mrd_bam: String, config: NanomonSVConfig},
-}
-
-impl Run for CollectionsTasks {
-    fn run(self) -> anyhow::Result<()> {
-        match self {
-            CollectionsTasks::Align(case) => {
-                Dorado::init(case.clone(), Config::default())?.run_pipe()?;
-            }
-            CollectionsTasks::DemuxAlign(cases) => {
-                Dorado::from_mux(cases, Config::default())?;
-            },
-            CollectionsTasks::DeepVariant { id, time_point, bam, config } => {
-                DeepVariant::new(&id, &time_point, &bam, config).run();
-            },
-            CollectionsTasks::ClairS { id, diag_bam, mrd_bam, config } => {
-                ClairS::new(&id, &diag_bam, &mrd_bam, config).run();
-            },
-            CollectionsTasks::NanomonSV { id, diag_bam, mrd_bam, config } => {
-                NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run();
-            },
-        }
-        Ok(())
-    }
-}
+pub mod collection;
 
 #[cfg(test)]
 mod tests {
-    use self::{callers::deep_variant::DeepVariantConfig, commands::dorado};
-
+    use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{bam::BamType, callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}};
+    use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}, collection::{bam::{self, BamType}, vcf::VcfCollection, Collections}, commands::dorado::Dorado};
 
     #[test]
     fn it_works() {
@@ -304,5 +131,4 @@ mod tests {
         collections.run()?;
         Ok(())
     }
-
 }

+ 4 - 0
src/runners.rs

@@ -7,6 +7,10 @@ use std::{
 
 use log::{info, warn};
 
+pub trait Run {
+    fn run(self) -> anyhow::Result<()>;
+}
+
 #[derive(Debug, Default)]
 pub struct DockerRun {
     pub args: Vec<String>,