Thomas 1 year ago
parent
commit
768f5e4c83
3 changed files with 231 additions and 107 deletions
  1. 187 104
      src/collection/mod.rs
  2. 2 1
      src/collection/modbases.rs
  3. 42 2
      src/lib.rs

+ 187 - 104
src/collection/mod.rs

@@ -82,7 +82,8 @@ impl Collections {
             result_dir,
             ..
         } = &config;
-        let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
+        // let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
+        let pod5 = Pod5Collection::default();
         let bam = BamCollection::new(result_dir);
         let vcf = VcfCollection::new(result_dir);
         let modbases = ModBasesCollection::new(result_dir);
@@ -174,106 +175,106 @@ impl Collections {
         tasks.extend(self.todo_assembler()?);
 
         // Remove VCF anterior to BAM
-        let vcf_by_id = self.vcf.group_by_id();
-        vcf_by_id.iter().for_each(|(id, vcfs)| {
-            if let (Some(diag), Some(mrd)) = (
-                self.bam.get(id, "diag").first(),
-                self.bam.get(id, "mrd").first(),
-            ) {
-                let diag_modified = diag.modified;
-                let mrd_modified = mrd.modified;
-                let mut rm_paths: Vec<&Path> = vcfs
-                    .iter()
-                    .flat_map(|vcf| {
-                        let vcf_mod: DateTime<Utc> = vcf
-                            .file_metadata
-                            .modified()
-                            .expect("Can't read VCF modified time.")
-                            .into();
-
-                        // For somatic caller erase if one bam (diag or mrd) is more recent.
-                        if vcf.caller != "DeepVariant" {
-                            if vcf_mod < diag_modified || vcf_mod < mrd_modified {
-                                vec![vcf.path.parent().unwrap()]
-                            } else {
-                                vec![]
-                            }
-                        } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
-                            || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
-                        {
-                            vec![vcf.path.parent().unwrap()]
-                        } else {
-                            vec![]
-                        }
-                    })
-                    .collect();
-                rm_paths.sort();
-                rm_paths.dedup();
-                rm_paths
-                    .iter()
-                    .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
-            }
-        });
+        // let vcf_by_id = self.vcf.group_by_id();
+        // vcf_by_id.iter().for_each(|(id, vcfs)| {
+        //     if let (Some(diag), Some(mrd)) = (
+        //         self.bam.get(id, "diag").first(),
+        //         self.bam.get(id, "mrd").first(),
+        //     ) {
+        //         let diag_modified = diag.modified;
+        //         let mrd_modified = mrd.modified;
+        //         let mut rm_paths: Vec<&Path> = vcfs
+        //             .iter()
+        //             .flat_map(|vcf| {
+        //                 let vcf_mod: DateTime<Utc> = vcf
+        //                     .file_metadata
+        //                     .modified()
+        //                     .expect("Can't read VCF modified time.")
+        //                     .into();
+        //
+        //                 // For somatic caller erase if one bam (diag or mrd) is more recent.
+        //                 if vcf.caller != "DeepVariant" {
+        //                     if vcf_mod < diag_modified || vcf_mod < mrd_modified {
+        //                         vec![vcf.path.parent().unwrap()]
+        //                     } else {
+        //                         vec![]
+        //                     }
+        //                 } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
+        //                     || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
+        //                 {
+        //                     vec![vcf.path.parent().unwrap()]
+        //                 } else {
+        //                     vec![]
+        //                 }
+        //             })
+        //             .collect();
+        //         rm_paths.sort();
+        //         rm_paths.dedup();
+        //         rm_paths
+        //             .iter()
+        //             .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
+        //     }
+        // });
 
         // Variant calling
         info!("Looking for variant calling tasks...");
-        self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
-            if let (Some(diag), Some(mrd)) = (
-                self.bam.get(&id, "diag").first(),
-                self.bam.get(&id, "mrd").first(),
-            ) {
-                if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
-                    if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
-                        && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
-                        && !self.config.id_black_list.contains(&id)
-                    {
-                        let caller_time: Vec<(&str, &str)> = vcf_by_id
-                            .iter()
-                            .filter(|(i, _)| *i == id)
-                            .flat_map(|(_, vcfs)| {
-                                vcfs.iter()
-                                    .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
-                            })
-                            .collect();
-
-                        if !caller_time.contains(&("clairs", "diag"))
-                            || !caller_time.contains(&("clairs_indel", "diag"))
-                        {
-                            tasks.push(CollectionsTasks::ClairS {
-                                id: id.to_string(),
-                                diag_bam: diag.path.to_str().unwrap().to_string(),
-                                mrd_bam: mrd.path.to_str().unwrap().to_string(),
-                                config: ClairSConfig::default(),
-                            });
-                        }
-                        if !caller_time.contains(&("DeepVariant", "diag")) {
-                            tasks.push(CollectionsTasks::DeepVariant {
-                                id: id.to_string(),
-                                time_point: "diag".to_string(),
-                                bam: diag.path.to_str().unwrap().to_string(),
-                                config: DeepVariantConfig::default(),
-                            });
-                        }
-                        if !caller_time.contains(&("DeepVariant", "mrd")) {
-                            tasks.push(CollectionsTasks::DeepVariant {
-                                id: id.to_string(),
-                                time_point: "mrd".to_string(),
-                                bam: mrd.path.to_str().unwrap().to_string(),
-                                config: DeepVariantConfig::default(),
-                            });
-                        }
-                        if !caller_time.contains(&("nanomonsv", "diag")) {
-                            tasks.push(CollectionsTasks::NanomonSV {
-                                id: id.to_string(),
-                                diag_bam: diag.path.to_str().unwrap().to_string(),
-                                mrd_bam: mrd.path.to_str().unwrap().to_string(),
-                                config: NanomonSVConfig::default(),
-                            });
-                        }
-                    }
-                }
-            }
-        });
+        // self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
+        //     if let (Some(diag), Some(mrd)) = (
+        //         self.bam.get(&id, "diag").first(),
+        //         self.bam.get(&id, "mrd").first(),
+        //     ) {
+        //         if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
+        //             if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
+        //                 && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
+        //                 && !self.config.id_black_list.contains(&id)
+        //             {
+        //                 let caller_time: Vec<(&str, &str)> = vcf_by_id
+        //                     .iter()
+        //                     .filter(|(i, _)| *i == id)
+        //                     .flat_map(|(_, vcfs)| {
+        //                         vcfs.iter()
+        //                             .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
+        //                     })
+        //                     .collect();
+        //
+        //                 if !caller_time.contains(&("clairs", "diag"))
+        //                     || !caller_time.contains(&("clairs_indel", "diag"))
+        //                 {
+        //                     tasks.push(CollectionsTasks::ClairS {
+        //                         id: id.to_string(),
+        //                         diag_bam: diag.path.to_str().unwrap().to_string(),
+        //                         mrd_bam: mrd.path.to_str().unwrap().to_string(),
+        //                         config: ClairSConfig::default(),
+        //                     });
+        //                 }
+        //                 if !caller_time.contains(&("DeepVariant", "diag")) {
+        //                     tasks.push(CollectionsTasks::DeepVariant {
+        //                         id: id.to_string(),
+        //                         time_point: "diag".to_string(),
+        //                         bam: diag.path.to_str().unwrap().to_string(),
+        //                         config: DeepVariantConfig::default(),
+        //                     });
+        //                 }
+        //                 if !caller_time.contains(&("DeepVariant", "mrd")) {
+        //                     tasks.push(CollectionsTasks::DeepVariant {
+        //                         id: id.to_string(),
+        //                         time_point: "mrd".to_string(),
+        //                         bam: mrd.path.to_str().unwrap().to_string(),
+        //                         config: DeepVariantConfig::default(),
+        //                     });
+        //                 }
+        //                 if !caller_time.contains(&("nanomonsv", "diag")) {
+        //                     tasks.push(CollectionsTasks::NanomonSV {
+        //                         id: id.to_string(),
+        //                         diag_bam: diag.path.to_str().unwrap().to_string(),
+        //                         mrd_bam: mrd.path.to_str().unwrap().to_string(),
+        //                         config: NanomonSVConfig::default(),
+        //                     });
+        //                 }
+        //             }
+        //         }
+        //     }
+        // });
 
         // Tasks dedup
         let mut hs = HashMap::new();
@@ -282,11 +283,20 @@ impl Collections {
         });
         self.tasks = hs.into_values().collect();
 
+        // Variants DeepVariant
+        self.tasks.extend(self.todo_deepvariants());
+
+        // Variants ClairS
+        self.tasks.extend(self.todo_clairs());
+
+        // Variants Nanomonsv
+        self.tasks.extend(self.todo_nanomonsv());
+        
         // Variants aggregation
         self.tasks.extend(self.todo_variants_agg()?);
 
         // ModPileup
-        self.tasks.extend(self.todo_mod_pileup());
+        // self.tasks.extend(self.todo_mod_pileup());
 
         // DMR C diag vs mrd
         self.tasks.extend(self.todo_dmr_c_diag_mrd());
@@ -354,6 +364,74 @@ impl Collections {
         Ok(tasks)
     }
 
+    pub fn todo_deepvariants(&self) -> Vec<CollectionsTasks> {
+        let config = DeepVariantConfig::default();
+        self.bam
+            .bams
+            .iter()
+            .filter_map(|b| {
+                if self.vcf.vcfs.iter().any(|v| {
+                    v.caller == "DeepVariant"
+                        && v.id == b.id
+                        && v.time_point == b.time_point
+                        && v.modified().unwrap_or_default() > b.modified
+                }) {
+                    None
+                } else {
+                    Some(CollectionsTasks::DeepVariant {
+                        config: config.clone(),
+                        id: b.id.clone(),
+                        time_point: b.time_point.clone(),
+                        bam: b.path.to_string_lossy().to_string(),
+                    })
+                }
+            })
+            .collect()
+    }
+
+    pub fn todo_clairs(&self) -> Vec<CollectionsTasks> {
+        let config = ClairSConfig::default();
+        self.bam_pairs().iter().filter_map(|(diag, mrd)| {
+            if self.vcf.vcfs.iter().any(|v| {
+                v.caller == "clairs"
+                    && v.id == diag.id
+                    && v.time_point == diag.time_point
+                    && (v.modified().unwrap_or_default() > diag.modified
+                        || v.modified().unwrap_or_default() > mrd.modified)
+            }) {
+                None
+            } else {
+                Some(CollectionsTasks::ClairS {
+                    config: config.clone(),
+                    id: diag.id.clone(),
+                    diag_bam: diag.path.to_string_lossy().to_string(),
+                    mrd_bam: mrd.path.to_string_lossy().to_string(),
+                })
+            }
+        }).collect()
+    }
+
+    pub fn todo_nanomonsv(&self) -> Vec<CollectionsTasks> {
+        let config = NanomonSVConfig::default();
+        self.bam_pairs().iter().filter_map(|(diag, mrd)| {
+            if self.vcf.vcfs.iter().any(|v| {
+                v.caller == "nanomonsv"
+                    && v.id == diag.id
+                    && v.time_point == diag.time_point
+                    && (v.modified().unwrap_or_default() > diag.modified
+                        || v.modified().unwrap_or_default() > mrd.modified)
+            }) {
+                None
+            } else {
+                Some(CollectionsTasks::NanomonSV {
+                    config: config.clone(),
+                    id: diag.id.clone(),
+                    diag_bam: diag.path.to_string_lossy().to_string(),
+                    mrd_bam: mrd.path.to_string_lossy().to_string(),
+                })
+            }
+        }).collect()
+    }
     pub fn todo_mod_pileup(&self) -> Vec<CollectionsTasks> {
         let config = ModkitConfig::default();
         self.bam
@@ -400,12 +478,13 @@ impl Collections {
                                 }
                             }
                         }
+                        None
+                    } else {
+                        Some(CollectionsTasks::DMRCDiagMrd {
+                            id: id.clone(),
+                            config: config.clone(),
+                        })
                     }
-
-                    Some(CollectionsTasks::DMRCDiagMrd {
-                        id: id.clone(),
-                        config: config.clone(),
-                    })
                 } else {
                     None
                 }
@@ -749,6 +828,10 @@ pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
         let mut collection =
             Collections::new(config.clone()).context("Failed to create new Collections")?;
         collection.todo().context("Failed to get todo tasks")?;
+        collection
+            .tasks
+            .iter()
+            .for_each(|t| info!("Planned task: {t}"));
 
         let n_tasks = collection.tasks.len();
 

+ 2 - 1
src/collection/modbases.rs

@@ -79,7 +79,8 @@ impl ModBases {
                         .file_stem()
                         .unwrap_or_default()
                         .to_string_lossy()
-                        .to_string();
+                        .to_string()
+                        .replace(".bed", "");
                     match Dmr::from_str(&filename) {
                         Ok(mut dmr) => {
                             dmr.path = path.clone();

+ 42 - 2
src/lib.rs

@@ -22,9 +22,9 @@ mod tests {
     use std::{fs, path::Path};
 
     use callers::nanomonsv::nanomonsv_create_pon;
-    use commands::modkit::{bed_methyl, ModkitConfig};
+    use commands::modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig};
     use log::info;
-    use rayon::prelude::*;
+    use rayon::{prelude::*, ThreadPoolBuilder};
 
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
@@ -255,6 +255,46 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn todo_deepv() -> anyhow::Result<()> {
+        init();
+        let collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        collections.todo_deepvariants().iter().for_each(|t| info!("{t}"));
+        Ok(())
+    }
+
+    #[test]
+    fn todo_clairs() -> anyhow::Result<()> {
+        init();
+        let collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        collections.todo_clairs().iter().for_each(|t| info!("{t}"));
+        Ok(())
+    }
+
+    #[test]
+    fn run_dmr_par() -> anyhow::Result<()> {
+        init();
+        let collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        let tasks = collections.todo_dmr_c_diag_mrd();
+        tasks.iter().for_each(|t| info!("{t}"));
+        let len = tasks.len();
+        // let pool = ThreadPoolBuilder::new().num_threads(10).build().unwrap();
+        // pool.install(|| {
+        //     tasks.par_iter().enumerate().for_each(|(i, t)| {
+        //         let config = ModkitConfig {threads: 2,  ..Default::default() };
+        //         if let collection::CollectionsTasks::DMRCDiagMrd { id, .. } = t { let _ = dmr_c_mrd_diag(id, &config); }
+        //         println!("⚡ {i}/{len}");
+        //     });
+        // });
+        Ok(())
+    }
+
     #[test]
     fn run_mod_par() -> anyhow::Result<()> {
         init();