Thomas 1 месяц назад
Родитель
Commit
8ca894aad4
2 измененных файлов с 125 добавлено и 33 удалено
  1. 6 6
      src/collection/bam_stats.rs
  2. 119 27
      src/collection/prom_run.rs

+ 6 - 6
src/collection/bam_stats.rs

@@ -883,6 +883,11 @@ impl WGSBamStats {
             .map(|(_, _, _, _, mean_cov, _)| *mean_cov)
             .map(|(_, _, _, _, mean_cov, _)| *mean_cov)
             .ok_or_else(|| anyhow::anyhow!("{} coverage data not found", contig_name))
             .ok_or_else(|| anyhow::anyhow!("{} coverage data not found", contig_name))
     }
     }
+
+    /// Collects the `filename` of every entry in `by_fn` into a set.
+    ///
+    /// Each name is cloned, so the returned set owns its strings and does not
+    /// borrow from `self`. Duplicate names collapse into a single entry.
+    pub fn source_filenames(&self) -> FxHashSet<String> {
+        let mut names = FxHashSet::default();
+        names.extend(self.by_fn.iter().map(|entry| entry.filename.clone()));
+        names
+    }
 }
 }
 
 
 // =============================================================================
 // =============================================================================
@@ -1353,12 +1358,7 @@ mod tests {
         test_init();
         test_init();
 
 
         let config = Config::default();
         let config = Config::default();
-        // let stats = WGSBamStats::open("34528", "norm", &config)?;
-        // println!("{stats}");
-        // let stats = WGSBamStats::open("36167", "norm", &config)?;
-        // println!("{stats}");
-        // let stats = WGSBamStats::open("36434", "norm", &config)?;
-        // println!("{stats}");
+
         let stats = WGSBamStats::open("CHAHA", "diag", &config)?;
         let stats = WGSBamStats::open("CHAHA", "diag", &config)?;
         println!("{stats}");
         println!("{stats}");
         let stats = WGSBamStats::open("DUMCO", "diag", &config)?;
         let stats = WGSBamStats::open("DUMCO", "diag", &config)?;

+ 119 - 27
src/collection/prom_run.rs

@@ -67,6 +67,7 @@ use rayon::{
     ThreadPoolBuilder,
     ThreadPoolBuilder,
 };
 };
 use rust_htslib::bam::{self, Read};
 use rust_htslib::bam::{self, Read};
+use rustc_hash::FxHashSet;
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 use uuid::Uuid;
 
 
@@ -416,32 +417,6 @@ impl PromRun {
         Ok((bam_to_case, unmatched))
         Ok((bam_to_case, unmatched))
     }
     }
 
 
-    /// Filters BAMs to only include those from bam_pass directories.
-    fn filter_pass_bams(&self) -> Vec<&PromBam> {
-        self.bams
-            .iter()
-            .filter(|bam| {
-                let path_str = bam.path.to_string_lossy();
-                let is_fail = path_str.contains("bam_fail");
-
-                if is_fail {
-                    info!("Skipping failed read BAM: {}", bam.path.display());
-                    return false;
-                }
-
-                let is_pass = path_str.contains("bam_pass");
-                if !is_pass {
-                    warn!(
-                        "BAM path ambiguous (not in bam_pass or bam_fail), including: {}",
-                        bam.path.display()
-                    );
-                }
-
-                true
-            })
-            .collect()
-    }
-
     /// Process BAM files from a PromethION run: align, sort, merge, and index per case.
     /// Process BAM files from a PromethION run: align, sort, merge, and index per case.
     ///
     ///
     /// This method handles both multiplexed (SQK-NBD114-24) and non-multiplexed (SQK-LSK114)
     /// This method handles both multiplexed (SQK-NBD114-24) and non-multiplexed (SQK-LSK114)
@@ -519,7 +494,19 @@ impl PromRun {
         // =====================================================================
         // =====================================================================
 
 
         let validation_start = Timer::start();
         let validation_start = Timer::start();
-        let pass_bams = self.filter_pass_bams();
+
+        // Filter out BAMs already merged into final outputs
+        let candidate_bams = self.filter_already_merged(config);
+
+        if candidate_bams.is_empty() {
+            info!(
+                "🎉 Run {}: all BAMs already merged — nothing to do",
+                self.protocol_run_id
+            );
+            return Ok(());
+        }
+
+        let pass_bams = filter_pass_bams(&candidate_bams);
 
 
         if pass_bams.is_empty() {
         if pass_bams.is_empty() {
             bail!("No BAM files found in bam_pass directories");
             bail!("No BAM files found in bam_pass directories");
@@ -768,6 +755,85 @@ impl PromRun {
     pub fn total_pod5_size(&self) -> u64 {
     pub fn total_pod5_size(&self) -> u64 {
         self.pod5s.iter().map(|p| p.file_size).sum()
         self.pod5s.iter().map(|p| p.file_size).sum()
     }
     }
+
+    /// Returns the BAMs of this run that are not yet recorded as sources of a final BAM.
+    ///
+    /// For every case in `self.cases` whose final BAM (path given by
+    /// `config.solo_bam`) exists on disk, the source filenames reported by
+    /// [`WGSBamStats::from_bam`] are gathered into one set shared across all cases.
+    /// A BAM from `self.bams` is kept when its file name is absent from that set.
+    ///
+    /// Failing to load the stats of a case only emits a warning: no source names are
+    /// recorded for that case, so none of its inputs are filtered out on its behalf.
+    ///
+    /// A BAM whose path has no file name, or whose name is not valid UTF-8, cannot be
+    /// compared against the recorded names and is therefore always kept.
+    fn filter_already_merged(&self, config: &Config) -> Vec<&PromBam> {
+        // Collect all source filenames already merged across all cases
+        let mut all_existing_sources: FxHashSet<String> = FxHashSet::default();
+
+        for case in &self.cases {
+            let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
+
+            if !final_bam_path.exists() {
+                info!(
+                    "No existing final BAM for case {} — all matching inputs are new",
+                    case.case_id
+                );
+                continue;
+            }
+
+            match WGSBamStats::from_bam(&final_bam_path, config) {
+                Ok(stats) => {
+                    let sources = stats.source_filenames();
+                    info!(
+                        "Case {} has {} existing source files",
+                        case.case_id,
+                        sources.len()
+                    );
+                    all_existing_sources.extend(sources);
+                }
+                // Best effort: an unreadable final BAM must not abort the whole run.
+                Err(e) => {
+                    warn!(
+                        "Could not load stats for case {} ({}): {}",
+                        case.case_id,
+                        final_bam_path.display(),
+                        e
+                    );
+                }
+            }
+        }
+
+        if all_existing_sources.is_empty() {
+            info!(
+                "Run {}: no existing sources found, all {} BAMs are new",
+                self.protocol_run_id,
+                self.bams.len()
+            );
+            return self.bams.iter().collect();
+        }
+
+        // `true` sends a BAM to `new_bams`, `false` to `skipped`.
+        let (new_bams, skipped): (Vec<_>, Vec<_>) = self.bams.iter().partition(|bam| {
+            match bam.path.file_name().and_then(|n| n.to_str()) {
+                Some(name) => !all_existing_sources.contains(name),
+                // No comparable name: never treat it as merged (avoids matching an
+                // empty-string placeholder against the recorded sources).
+                None => true,
+            }
+        });
+
+        if !skipped.is_empty() {
+            // Only the first three names are listed; "..." marks a truncated list.
+            info!(
+                "Run {}: skipping {} BAM(s) already merged: {}{}",
+                self.protocol_run_id,
+                skipped.len(),
+                skipped
+                    .iter()
+                    .take(3)
+                    .filter_map(|b| b.path.file_name())
+                    .map(|n| n.to_string_lossy())
+                    .collect::<Vec<_>>()
+                    .join(", "),
+                if skipped.len() > 3 { "..." } else { "" }
+            );
+        }
+
+        info!(
+            "Run {}: {} new BAM(s) to process",
+            self.protocol_run_id,
+            new_bams.len()
+        );
+
+        new_bams
+    }
 }
 }
 
 
 /// Kit type classification for workflow branching.
 /// Kit type classification for workflow branching.
@@ -1164,6 +1230,32 @@ impl fmt::Display for PromBam {
     }
     }
 }
 }
 
 
+/// Drops BAMs located under a `bam_fail` path and returns the rest, in input order.
+///
+/// The decision is a substring test on the lossily-decoded path:
+/// - a path containing `bam_fail` is logged at info level and excluded;
+/// - a path containing neither `bam_fail` nor `bam_pass` is kept, with a warning;
+/// - any other path is kept silently.
+fn filter_pass_bams<'a>(bams: &[&'a PromBam]) -> Vec<&'a PromBam> {
+    let mut kept = Vec::new();
+
+    for &bam in bams {
+        let location = bam.path.to_string_lossy();
+
+        // Failed-read BAMs are never processed.
+        if location.contains("bam_fail") {
+            info!("Skipping failed read BAM: {}", bam.path.display());
+            continue;
+        }
+
+        // Unknown layout: keep the file but make the ambiguity visible.
+        if !location.contains("bam_pass") {
+            warn!(
+                "BAM path ambiguous (not in bam_pass or bam_fail), including: {}",
+                bam.path.display()
+            );
+        }
+
+        kept.push(bam);
+    }
+
+    kept
+}
+
 /// Builds the final BAM path for a case.
 /// Builds the final BAM path for a case.
 fn build_final_bam_path(case: &IdInput, config: &Config) -> anyhow::Result<PathBuf> {
 fn build_final_bam_path(case: &IdInput, config: &Config) -> anyhow::Result<PathBuf> {
     let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
     let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));