Thomas 1 year ago
parent
commit
a01bc92739
3 changed files with 38 additions and 20 deletions
  1. + 6 - 2
      src/collection/bam.rs
  2. + 15 - 14
      src/collection/mod.rs
  3. + 17 - 4
      src/lib.rs

+ 6 - 2
src/collection/bam.rs

@@ -141,12 +141,16 @@ impl BamCollection {
             .collect()
     }
 
-    pub fn by_id_completed(&self) -> Vec<Bam> {
+    pub fn by_id_completed(&self, min_diag_cov: f32, min_mrd_cov: f32) -> Vec<Bam> {
         self.bams
             .iter()
             .filter(|b| matches!(b.bam_type, BamType::WGS))
             .filter(|b| match &b.cramino {
-                Some(cramino) => cramino.mean_length >= 15.0,
+                Some(cramino) => match b.time_point.as_str() {
+                    "diag" => cramino.mean_length >= min_diag_cov as f64,
+                    "mrd" => cramino.mean_length >= min_mrd_cov as f64,
+                    _ => false
+                },
                 _ => false,
             })
             .cloned()

+ 15 - 14
src/collection/mod.rs

@@ -25,11 +25,12 @@ pub mod pod5;
 pub mod variants;
 pub mod vcf;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct CollectionsConfig {
     pub pod_dir: String,
     pub corrected_fc_path: String,
     pub result_dir: String,
+    pub dict_file: String,
     pub min_diag_cov: f32,
     pub min_mrd_cov: f32,
 }
@@ -40,6 +41,7 @@ impl Default for CollectionsConfig {
             pod_dir: "/data/run_data".to_string(),
             corrected_fc_path: "/data/flow_cells.tsv".to_string(),
             result_dir: "/data/longreads_basic_pipe".to_string(),
+            dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(),
             min_diag_cov: 15.0,
             min_mrd_cov: 10.0,
         }
@@ -47,7 +49,7 @@ impl Default for CollectionsConfig {
 }
 #[derive(Debug)]
 pub struct Collections {
-    pub result_dir: String,
+    pub config: CollectionsConfig,
     pub pod5: Pod5Collection,
     pub bam: BamCollection,
     pub vcf: VcfCollection,
@@ -55,7 +57,8 @@ pub struct Collections {
 }
 
 impl Collections {
-    pub fn new(pod_dir: &str, corrected_fc_path: &str, result_dir: &str) -> anyhow::Result<Self> {
+    pub fn new(config: CollectionsConfig) -> anyhow::Result<Self> {
+        let CollectionsConfig { pod_dir, corrected_fc_path, result_dir, .. } = &config;
         let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
         let bam = BamCollection::new(result_dir);
         let vcf = VcfCollection::new(result_dir);
@@ -65,11 +68,11 @@ impl Collections {
             bam,
             vcf,
             tasks: Vec::new(),
-            result_dir: result_dir.to_string(),
+            config,
         })
     }
 
-    pub fn todo(&mut self, min_diag_cov: f32, min_mrd_cov: f32) -> anyhow::Result<()> {
+    pub fn todo(&mut self) -> anyhow::Result<()> {
         info!("Looking for base calling tasks...");
 
         let mut tasks = Vec::new();
@@ -112,8 +115,8 @@ impl Collections {
         //     .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
 
         // Whole scan
-        for bam in self.bam.by_id_completed() {
-            let scan_dir = format!("{}/{}/{}/scan", self.result_dir, bam.id, bam.time_point);
+        for bam in self.bam.by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov) {
+            let scan_dir = format!("{}/{}/{}/scan", &self.config.result_dir, bam.id, bam.time_point);
             if PathBuf::from(&scan_dir).exists() {
                 let dir_mod = fs::metadata(&scan_dir)?.modified()?;
                 if bam.file_metadata.modified()? > dir_mod {
@@ -181,8 +184,8 @@ impl Collections {
                 self.bam.get(&id, "mrd").first(),
             ) {
                 if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
-                    if diag_cramino.mean_coverage >= min_diag_cov.into()
-                        && mrd_cramino.mean_coverage >= min_mrd_cov.into()
+                    if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
+                        && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
                     {
                         let caller_time: Vec<(&str, &str)> = vcf_by_id
                             .iter()
@@ -244,7 +247,7 @@ impl Collections {
     pub fn run(&mut self) -> anyhow::Result<()> {
         // self.tasks.reverse();
         if self.tasks.is_empty() {
-            self.todo(15.0, 10.0);
+            self.todo()?;
             if self.tasks.is_empty() {
                 return Ok(());
             } else {
@@ -379,11 +382,9 @@ pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
     let mut last_n = Vec::new();
     loop {
         let mut collection = Collections::new(
-            &config.pod_dir,
-            &config.corrected_fc_path,
-            &config.result_dir,
+            config.clone()
         )?;
-        collection.todo(config.min_diag_cov, config.min_mrd_cov);
+        collection.todo()?;
         if collection.tasks.is_empty() {
             warn!("All results are update");
             break;

+ 17 - 4
src/lib.rs

@@ -119,11 +119,9 @@ mod tests {
     #[test_log::test]
     fn todo() -> anyhow::Result<()> {
         let mut collections = Collections::new(
-            "/data/run_data",
-            "/data/flow_cells.tsv",
-            "/data/longreads_basic_pipe",
+            CollectionsConfig::default()
         )?;
-        collections.todo(15.0, 10.0);
+        collections.todo()?;
         println!("{:#?}", collections.tasks);
         println!("{}", collections.tasks.len());
         Ok(())
@@ -149,4 +147,19 @@ mod tests {
         let o = format!("/data/longreads_basic_pipe/{id}/diag/nanomonsv/{id}_diag_nanomonsv_PASSED.vcf.gz");
         bcftools_keep_pass(&i, &o, config).unwrap();
     }
+
+    #[test_log::test]
+    fn bam_ok() -> anyhow::Result<()> {
+        let  collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        let mut res: Vec<_> = collections.bam.by_id_completed(15.0, 10.0).iter().map(|b| {
+            (b.id.to_string(), b.time_point.to_string())
+        }).collect();
+        res.sort_by_key(|b| b.1.clone());
+        res.sort_by_key(|b| b.0.clone());
+        
+        res.iter().for_each(|(id, tp)| println!("{id}\t{tp}"));
+        Ok(())
+    }
 }