
Update Pod5sRun

Thomas · 2 weeks ago
commit 403d80572c
8 changed files with 1359 additions and 345 deletions
  1. Cargo.lock (+45 -2)
  2. Cargo.toml (+3 -0)
  3. src/collection/bam.rs (+4 -23)
  4. src/collection/bam_stats.rs (+910 -0)
  5. src/collection/mod.rs (+1 -0)
  6. src/collection/pod5.rs (+87 -95)
  7. src/commands/samtools.rs (+34 -39)
  8. src/pipes/somatic_slurm.rs (+275 -186)

+ 45 - 2
Cargo.lock

@@ -469,7 +469,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "regex",
- "rustc-hash",
+ "rustc-hash 1.1.0",
  "shlex",
  "syn 2.0.106",
  "which 4.4.2",
@@ -1828,6 +1828,12 @@ version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
 
+[[package]]
+name = "hex"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+
 [[package]]
 name = "home"
 version = "0.5.11"
@@ -2985,6 +2991,7 @@ dependencies = [
  "flatbuffers",
  "glob",
  "hashbrown 0.15.5",
+ "hex",
  "indicatif 0.17.11",
  "itertools 0.14.0",
  "lazy_static",
@@ -3004,6 +3011,7 @@ dependencies = [
  "regex",
  "rusqlite",
  "rust-htslib 0.50.0",
+ "rustc-hash 2.1.1",
  "semver 1.0.26",
  "serde",
  "serde_json",
@@ -3012,6 +3020,7 @@ dependencies = [
  "toml 0.9.8",
  "tracing",
  "uuid",
+ "walkdir",
 ]
 
 [[package]]
@@ -3797,6 +3806,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
 
+[[package]]
+name = "rustc-hash"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
 [[package]]
 name = "rustc_version"
 version = "0.1.7"
@@ -3949,6 +3964,15 @@ dependencies = [
  "bytemuck",
 ]
 
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "scoped_threadpool"
 version = "0.1.9"
@@ -4258,7 +4282,7 @@ dependencies = [
  "data-encoding",
  "debugid",
  "if_chain",
- "rustc-hash",
+ "rustc-hash 1.1.0",
  "rustc_version 0.2.3",
  "serde",
  "serde_json",
@@ -5041,6 +5065,16 @@ version = "0.9.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
 
+[[package]]
+name = "walkdir"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
+dependencies = [
+ "same-file",
+ "winapi-util",
+]
+
 [[package]]
 name = "wasi"
 version = "0.11.1+wasi-snapshot-preview1"
@@ -5189,6 +5223,15 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
 
+[[package]]
+name = "winapi-util"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
+dependencies = [
+ "windows-sys 0.60.2",
+]
+
 [[package]]
 name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"

+ 3 - 0
Cargo.toml

@@ -49,6 +49,9 @@ petgraph = "0.8.1"
 regex = "1.12.2"
 toml = "0.9.8"
 directories = "6.0.0"
+rustc-hash = "2.1.1"
+hex = "0.4.3"
+walkdir = "2.5.0"
 
 [profile.dev]
 opt-level = 0

+ 4 - 23
src/collection/bam.rs

@@ -725,7 +725,7 @@ impl WGSBamStats {
         let mut n_unmapped = 0u64;
         let mut n_duplicates = 0u64;
         let mut n_lowq = 0u64;
-        let mut mapped_lengths = Vec::new();
+        // let mut mapped_lengths = Vec::new();
         let mut lengths_by_tid: HashMap<i32, Vec<u64>> = HashMap::new();
 
         // Main scan loop: apply flag and MAPQ filters.
@@ -759,7 +759,7 @@ impl WGSBamStats {
             let start = r.pos();
             let end = r.reference_end();
             let len = if end > start { (end - start) as u64 } else { 0 };
-            mapped_lengths.push(len);
+            // mapped_lengths.push(len);
             lengths_by_tid.entry(r.tid()).or_default().push(len);
 
             if n_reads.is_multiple_of(500_000) {
@@ -773,6 +773,8 @@ impl WGSBamStats {
             0.0
         };
 
+        let mapped_lengths: Vec<u64> = lengths_by_tid.values().flatten().copied().collect();
+
         // Compute mean, median, N50 (single sort).
         let mut lens = mapped_lengths.clone();
         lens.sort_unstable();
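A note on the change above: the dedicated `mapped_lengths` vector is no longer filled during the scan; the flat list is rebuilt from `lengths_by_tid` afterwards. A minimal sketch (hypothetical values) showing that the flatten preserves the same multiset of lengths; only the order differs, which the subsequent `sort_unstable` makes irrelevant:

```rust
use std::collections::HashMap;

let mut lengths_by_tid: HashMap<i32, Vec<u64>> = HashMap::new();
lengths_by_tid.entry(0).or_default().extend([250u64, 100]);
lengths_by_tid.entry(1).or_default().push(300);

// Same collect as in the diff above.
let mut lens: Vec<u64> = lengths_by_tid.values().flatten().copied().collect();
lens.sort_unstable();
assert_eq!(lens, vec![100, 250, 300]);
```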
@@ -1128,27 +1130,6 @@ mod tests {
         helpers::test_init,
     };
 
-    #[test]
-    fn bam_stats() -> anyhow::Result<()> {
-        test_init();
-        let case_id = "HOULNE";
-        let time = "diag";
-
-        info!("WGSBamStats::new");
-
-        let r = WGSBamStats::new(case_id, time, &Config::default())?;
-        println!("{r}");
-        assert_eq!(r.all_records, 22417713);
-        let c = r
-            .karyotype
-            .iter()
-            .find_map(|e| if e.2 == "chr9" { Some(e.6) } else { None })
-            .unwrap();
-        assert_eq!(c, 0.8800890147031387);
-
-        Ok(())
-    }
-
     #[test]
     fn bam_new() -> anyhow::Result<()> {
         test_init();

+ 910 - 0
src/collection/bam_stats.rs

@@ -0,0 +1,910 @@
+use std::{
+    collections::BTreeMap,
+    fmt, fs,
+    hash::{Hash, Hasher},
+    io::{Read, Write},
+    path::{Path, PathBuf},
+};
+
+use anyhow::Context;
+use log::info;
+use rust_htslib::{
+    bam::{ext::BamRecordExtensions, record::Aux},
+    htslib::{BAM_FDUP, BAM_FQCFAIL, BAM_FSECONDARY, BAM_FSUPPLEMENTARY, BAM_FUNMAP},
+};
+use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
+use serde::{Deserialize, Serialize};
+
+use crate::config::Config;
+
+const SKIP_FLAGS: u16 = (BAM_FUNMAP | BAM_FSECONDARY | BAM_FQCFAIL | BAM_FSUPPLEMENTARY) as u16;
+const UNMAP_FLAG: u16 = BAM_FUNMAP as u16;
+const DUP_FLAG: u16 = BAM_FDUP as u16;
+const LEN_BIN: u64 = 10; // Bin size for length histogram
+
+#[inline(always)]
+fn hash_bytes(bytes: &[u8]) -> u64 {
+    let mut hasher = FxHasher::default();
+    bytes.hash(&mut hasher);
+    hasher.finish()
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct RGStats {
+    pub rg_id: String,
+    pub n_reads: u64,
+    pub mapped_yield: u64,
+    pub mean_read_length: f64,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct FnStats {
+    pub filename: String,
+    pub n_reads: u64,
+    pub mapped_yield: u64,
+    pub mean_read_length: f64,
+}
+
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct FlagStats {
+    pub paired: u64,
+    pub proper_pair: u64,
+    pub unmap: u64,
+    pub munmap: u64,
+    pub reverse: u64,
+    pub mreverse: u64,
+    pub read1: u64,
+    pub read2: u64,
+    pub secondary: u64,
+    pub qcfail: u64,
+    pub dup: u64,
+    pub supplementary: u64,
+}
+
+impl FlagStats {
+    #[inline(always)]
+    fn update(&mut self, flags: u16) {
+        self.paired += ((flags & 0x1) != 0) as u64;
+        self.proper_pair += ((flags & 0x2) != 0) as u64;
+        self.unmap += ((flags & 0x4) != 0) as u64;
+        self.munmap += ((flags & 0x8) != 0) as u64;
+        self.reverse += ((flags & 0x10) != 0) as u64;
+        self.mreverse += ((flags & 0x20) != 0) as u64;
+        self.read1 += ((flags & 0x40) != 0) as u64;
+        self.read2 += ((flags & 0x80) != 0) as u64;
+        self.secondary += ((flags & 0x100) != 0) as u64;
+        self.qcfail += ((flags & 0x200) != 0) as u64;
+        self.dup += ((flags & 0x400) != 0) as u64;
+        self.supplementary += ((flags & 0x800) != 0) as u64;
+    }
+
+    fn as_vec(&self) -> Vec<(&'static str, u64)> {
+        vec![
+            ("PAIRED", self.paired),
+            ("PROPER_PAIR", self.proper_pair),
+            ("UNMAP", self.unmap),
+            ("MUNMAP", self.munmap),
+            ("REVERSE", self.reverse),
+            ("MREVERSE", self.mreverse),
+            ("READ1", self.read1),
+            ("READ2", self.read2),
+            ("SECONDARY", self.secondary),
+            ("QCFAIL", self.qcfail),
+            ("DUP", self.dup),
+            ("SUPPLEMENTARY", self.supplementary),
+        ]
+    }
+}
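A quick sanity check of the branchless counters above (a sketch, not part of the commit): each flag bit is cast to `0`/`1` and added, so one `update` call touches all twelve counters without branching.

```rust
let mut stats = FlagStats::default();
stats.update(0x1 | 0x2 | 0x40 | 0x400); // paired, proper pair, read1, duplicate
stats.update(0x10);                     // reverse-strand primary
assert_eq!(stats.paired, 1);
assert_eq!(stats.dup, 1);
assert_eq!(stats.reverse, 1);
assert_eq!(stats.unmap, 0);
```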
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct WGSBamStats {
+    pub all_records: u64,
+    pub n_reads: u64,
+    pub mapped_fraction: f64,
+    pub n_unmapped: u64,
+    pub n_duplicates: u64,
+    pub n_lowq: u64,
+    pub mapped_yield: u64,
+    pub mean_read_length: f64,
+    pub median_read_length: u64,
+    pub global_coverage: f64,
+    pub karyotype: Vec<(i32, u64, String, u64, f64, f64)>,
+    pub n50: u64,
+    pub by_lengths: Vec<(u64, u64)>,
+    pub by_rg: Vec<RGStats>,
+    pub by_fn: Vec<FnStats>,
+    pub flag_stats: FlagStats,
+}
+
+impl WGSBamStats {
+    /// Open stats from BAM path (compute if needed)
+    pub fn from_bam(bam_path: impl AsRef<Path>, config: &Config) -> anyhow::Result<Self> {
+        let bam_path = bam_path.as_ref();
+        let json_path = Self::json_path_from_bam(bam_path);
+
+        let should_recompute = if !json_path.exists() {
+            info!("No cached stats found, computing...");
+            true
+        } else if Self::is_bam_newer(bam_path, &json_path)? {
+            info!("BAM is newer than cached stats, recomputing...");
+            true
+        } else {
+            false
+        };
+
+        if should_recompute {
+            let (stats, qnames) = Self::from_bam_path(bam_path, config)?;
+
+            if let Some(parent) = json_path.parent() {
+                fs::create_dir_all(parent)?;
+            }
+
+            stats.save_json(&json_path)?;
+            info!("Saved stats to: {}", json_path.display());
+
+            let qnames_path = Self::qnames_path_from_bam(bam_path);
+            qnames.save(&qnames_path)?;
+            info!("Saved qnames to: {}", qnames_path.display());
+            Ok(stats)
+        } else {
+            info!("Loading cached stats from: {}", json_path.display());
+            Self::load_json(&json_path)
+        }
+    }
+
+    /// Derive JSON path from BAM path
+    /// e.g., /path/to/34528_norm_hs1.bam -> /path/to/34528_norm_bam_stats.json
+    fn json_path_from_bam(bam_path: &Path) -> PathBuf {
+        let dir = bam_path.parent().unwrap_or(Path::new("."));
+        let stem = bam_path
+            .file_stem()
+            .map(|s| s.to_string_lossy())
+            .unwrap_or_default();
+
+        // "34528_norm_hs1" -> "34528_norm"
+        let parts: Vec<&str> = stem.split('_').collect();
+        let prefix = if parts.len() >= 2 {
+            format!("{}_{}", parts[0], parts[1])
+        } else {
+            stem.to_string()
+        };
+
+        dir.join(format!("{}_bam_stats.json", prefix))
+    }
+
+    /// Derive qnames path from BAM path
+    /// e.g., /path/to/34528_norm_hs1.bam -> /path/to/34528_norm_bam_qnames.bin
+    pub fn qnames_path_from_bam(bam_path: &Path) -> PathBuf {
+        let dir = bam_path.parent().unwrap_or(Path::new("."));
+        let stem = bam_path
+            .file_stem()
+            .map(|s| s.to_string_lossy())
+            .unwrap_or_default();
+
+        // "34528_norm_hs1" -> "34528_norm"
+        let parts: Vec<&str> = stem.split('_').collect();
+        let prefix = if parts.len() >= 2 {
+            format!("{}_{}", parts[0], parts[1])
+        } else {
+            stem.to_string()
+        };
+
+        dir.join(format!("{}_bam_qnames.bin", prefix))
+    }
+
+    /// Open stats from JSON cache, or compute and save if:
+    /// - JSON doesn't exist
+    /// - BAM is newer than JSON
+    pub fn open(case_id: &str, time: &str, config: &Config) -> anyhow::Result<Self> {
+        let bam_path = PathBuf::from(config.solo_bam(case_id, time));
+
+        Self::from_bam(&bam_path, config)
+    }
+
+    /// Check if BAM file is newer than JSON cache
+    fn is_bam_newer(bam_path: &Path, json_path: &Path) -> anyhow::Result<bool> {
+        let bam_modified = bam_path
+            .metadata()
+            .with_context(|| format!("Failed to get BAM metadata: {}", bam_path.display()))?
+            .modified()
+            .with_context(|| format!("Failed to get BAM modified time: {}", bam_path.display()))?;
+
+        let json_modified = json_path
+            .metadata()
+            .with_context(|| format!("Failed to get JSON metadata: {}", json_path.display()))?
+            .modified()
+            .with_context(|| {
+                format!("Failed to get JSON modified time: {}", json_path.display())
+            })?;
+
+        Ok(bam_modified > json_modified)
+    }
+
+    /// Get the JSON cache path for a case
+    fn json_path(case_id: &str, time: &str, config: &Config) -> PathBuf {
+        PathBuf::from(&config.result_dir)
+            .join(case_id)
+            .join(time)
+            .join(format!("{}_{}_bam_stats.json", case_id, time))
+    }
+
+    /// Get the Qnames cache path for a case
+    fn qnames_path(case_id: &str, time: &str, config: &Config) -> PathBuf {
+        PathBuf::from(&config.result_dir)
+            .join(case_id)
+            .join(time)
+            .join(format!("{}_{}_bam_qnames.bin", case_id, time))
+    }
+
+    /// Force recompute stats (ignore cache)
+    pub fn recompute(case_id: &str, time: &str, config: &Config) -> anyhow::Result<Self> {
+        info!("Recomputing stats for {} {}", case_id, time);
+        let bam_path = PathBuf::from(config.solo_bam(case_id, time));
+        let (stats, qnames) = Self::from_bam_path(&bam_path, config)?;
+        let json_path = Self::json_path(case_id, time, config);
+        let qnames_path = Self::qnames_path(case_id, time, config);
+
+        if let Some(parent) = json_path.parent() {
+            fs::create_dir_all(parent)?;
+        }
+
+        stats.save_json(&json_path)?;
+        info!("Saved stats to: {}", json_path.display());
+
+        qnames.save(&qnames_path)?;
+        info!("Saved qnames to: {}", qnames_path.display());
+        Ok(stats)
+    }
+
+    /// Save stats to JSON file
+    pub fn save_json(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let file = fs::File::create(path.as_ref())
+            .with_context(|| format!("Failed to create JSON file: {}", path.as_ref().display()))?;
+        let writer = std::io::BufWriter::new(file);
+        serde_json::to_writer_pretty(writer, self)
+            .with_context(|| format!("Failed to write JSON to: {}", path.as_ref().display()))?;
+        Ok(())
+    }
+
+    /// Load stats from JSON file
+    pub fn load_json(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let file = fs::File::open(path.as_ref())
+            .with_context(|| format!("Failed to open JSON file: {}", path.as_ref().display()))?;
+        let reader = std::io::BufReader::new(file);
+        let stats: Self = serde_json::from_reader(reader)
+            .with_context(|| format!("Failed to parse JSON from: {}", path.as_ref().display()))?;
+        Ok(stats)
+    }
+
+    /// Compute stats directly from BAM path
+    fn from_bam_path(bam_path: &Path, config: &Config) -> anyhow::Result<(Self, QNameSet)> {
+        use rust_htslib::bam::Read;
+
+        let bam_min_mapq = config.bam_min_mapq;
+        let bam_n_threads = config.bam_n_threads;
+
+        let mut bam = rust_htslib::bam::Reader::from_path(bam_path)?;
+        let h = bam.header().clone();
+        let header = rust_htslib::bam::Header::from_template(&h);
+        bam.set_threads(bam_n_threads as usize)?;
+
+        info!("Parsing BAM file: {}", bam_path.display());
+
+        let mut all_records = 0u64;
+        let mut qnames = QNameSet::default();
+        let mut n_reads = 0u64;
+        let mut n_unmapped = 0u64;
+        let mut n_duplicates = 0u64;
+        let mut n_lowq = 0u64;
+
+        let mut flag_stats = FlagStats::default();
+        let mut length_hist: FxHashMap<u64, u64> = FxHashMap::default();
+        let mut yield_by_tid: FxHashMap<i32, u64> = FxHashMap::default();
+
+        // Hash -> (count, yield)
+        let mut rg_stats: FxHashMap<u64, (u64, u64)> = FxHashMap::default();
+        let mut fn_stats: FxHashMap<u64, (u64, u64)> = FxHashMap::default();
+        // Hash -> name (only store once)
+        let mut rg_names: FxHashMap<u64, String> = FxHashMap::default();
+        let mut fn_names: FxHashMap<u64, String> = FxHashMap::default();
+
+        for rec in bam.rc_records() {
+            all_records += 1;
+            let r = rec.with_context(|| "failed to parse BAM record")?;
+            qnames.add_qname_bytes(r.qname()); // record every QNAME, before any filtering
+
+            let flags = r.flags();
+
+            flag_stats.update(flags);
+
+            if flags & SKIP_FLAGS != 0 {
+                n_unmapped += ((flags & UNMAP_FLAG) != 0) as u64;
+                continue;
+            }
+
+            if flags & DUP_FLAG != 0 {
+                n_duplicates += 1;
+                continue;
+            }
+
+            if r.mapq() < bam_min_mapq {
+                n_lowq += 1;
+                continue;
+            }
+
+            n_reads += 1;
+
+            let len = {
+                let start = r.pos();
+                let end = r.reference_end();
+                if end > start {
+                    (end - start) as u64
+                } else {
+                    0
+                }
+            };
+
+            *length_hist.entry(len / LEN_BIN * LEN_BIN).or_insert(0) += 1;
+            *yield_by_tid.entry(r.tid()).or_insert(0) += len;
+
+            if let Ok(Aux::String(rg)) = r.aux(b"RG") {
+                let h = hash_bytes(rg.as_bytes());
+                let entry = rg_stats.entry(h).or_insert((0, 0));
+                entry.0 += 1;
+                entry.1 += len;
+                rg_names.entry(h).or_insert_with(|| rg.to_string());
+            }
+
+            if let Ok(Aux::String(fn_tag)) = r.aux(b"fn") {
+                let h = hash_bytes(fn_tag.as_bytes());
+                let entry = fn_stats.entry(h).or_insert((0, 0));
+                entry.0 += 1;
+                entry.1 += len;
+                fn_names.entry(h).or_insert_with(|| fn_tag.to_string());
+            }
+
+            if n_reads.is_multiple_of(500_000) {
+                info!("{}: processed {n_reads} mapped reads", bam_path.display());
+            }
+        }
+
+        let mapped_yield: u64 = length_hist.iter().map(|(len, count)| len * count).sum();
+
+        let mapped_fraction = if all_records > 0 {
+            n_reads as f64 / all_records as f64
+        } else {
+            0.0
+        };
+
+        let mean_read_length = if n_reads > 0 {
+            mapped_yield as f64 / n_reads as f64
+        } else {
+            0.0
+        };
+
+        // Convert to BTreeMap for median/N50 calculation
+        let sorted_hist: BTreeMap<u64, u64> = length_hist.into_iter().collect();
+        let median_read_length = median_from_hist(&sorted_hist, n_reads);
+        let n50 = n50_from_hist(&sorted_hist, mapped_yield);
+
+        let genome = get_genome_sizes(&header)?;
+        let genome_size: u64 = genome.values().sum();
+        let global_coverage = if genome_size > 0 {
+            mapped_yield as f64 / genome_size as f64
+        } else {
+            0.0
+        };
+
+        let mut karyotype: Vec<_> = yield_by_tid
+            .iter()
+            .map(|(tid, &mapped_sum)| {
+                let contig = String::from_utf8(h.tid2name(*tid as u32).to_vec())
+                    .unwrap_or_else(|_| format!("tid_{}", tid));
+                let contig_len = genome.get(&contig).copied().unwrap_or(0);
+
+                let mean_cov = if contig_len > 0 {
+                    mapped_sum as f64 / contig_len as f64
+                } else {
+                    0.0
+                };
+
+                let coverage_ratio = if global_coverage > 0.0 {
+                    mean_cov / global_coverage
+                } else {
+                    0.0
+                };
+
+                (
+                    *tid,
+                    contig_len,
+                    contig,
+                    mapped_sum,
+                    mean_cov,
+                    coverage_ratio,
+                )
+            })
+            .collect();
+
+        karyotype.sort_unstable_by_key(|(tid, _, _, _, _, _)| *tid);
+
+        // Convert hash-based stats to final structs
+        let mut by_rg: Vec<RGStats> = rg_stats
+            .into_iter()
+            .map(|(h, (count, yield_sum))| {
+                let rg_id = rg_names
+                    .get(&h)
+                    .cloned()
+                    .unwrap_or_else(|| "unknown".into());
+                RGStats {
+                    rg_id,
+                    n_reads: count,
+                    mapped_yield: yield_sum,
+                    mean_read_length: if count > 0 {
+                        yield_sum as f64 / count as f64
+                    } else {
+                        0.0
+                    },
+                }
+            })
+            .collect();
+        by_rg.sort_by(|a, b| a.rg_id.cmp(&b.rg_id));
+
+        let mut by_fn: Vec<FnStats> = fn_stats
+            .into_iter()
+            .map(|(h, (count, yield_sum))| {
+                let filename = fn_names
+                    .get(&h)
+                    .cloned()
+                    .unwrap_or_else(|| "unknown".into());
+                FnStats {
+                    filename,
+                    n_reads: count,
+                    mapped_yield: yield_sum,
+                    mean_read_length: if count > 0 {
+                        yield_sum as f64 / count as f64
+                    } else {
+                        0.0
+                    },
+                }
+            })
+            .collect();
+        by_fn.sort_by(|a, b| a.filename.cmp(&b.filename));
+
+        let by_lengths: Vec<_> = sorted_hist.into_iter().collect();
+
+        Ok((
+            Self {
+                all_records,
+                n_reads,
+                mapped_fraction,
+                n_unmapped,
+                n_duplicates,
+                n_lowq,
+                mapped_yield,
+                mean_read_length,
+                median_read_length,
+                global_coverage,
+                karyotype,
+                n50,
+                by_lengths,
+                by_rg,
+                by_fn,
+                flag_stats,
+            },
+            qnames,
+        ))
+    }
+}
+
+/// Approximate median from a binned length histogram: the bin start containing
+/// the midpoint read.
+pub fn median_from_hist(hist: &BTreeMap<u64, u64>, n_reads: u64) -> u64 {
+    if n_reads == 0 {
+        return 0;
+    }
+
+    let mid = n_reads / 2;
+    let mut cum = 0u64;
+
+    for (&len, &count) in hist {
+        cum += count;
+        if cum >= mid {
+            return len;
+        }
+    }
+
+    0
+}
+
+/// N50 from a binned length histogram: walking bins from the longest down, the
+/// bin start at which cumulative yield first reaches half the total mapped yield.
+pub fn n50_from_hist(hist: &BTreeMap<u64, u64>, mapped_yield: u64) -> u64 {
+    if mapped_yield == 0 {
+        return 0;
+    }
+
+    let half = mapped_yield / 2;
+    let mut cum = 0u64;
+
+    for (&len, &count) in hist.iter().rev() {
+        cum += len * count;
+        if cum >= half {
+            return len;
+        }
+    }
+
+    0
+}
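Worked example for the two estimators above (hypothetical bins): six reads binned as `{10: 3, 20: 2, 40: 1}` give a total yield of `3*10 + 2*20 + 1*40 = 110` bp. The median is the bin start containing the midpoint read (a binned lower median, so `10` here rather than the exact `15`), and the N50 walks bins from the longest down until cumulative yield reaches half the total (`40 + 2*20 = 80 >= 55`, so `20`).

```rust
use std::collections::BTreeMap;

let hist: BTreeMap<u64, u64> = [(10u64, 3u64), (20, 2), (40, 1)].into_iter().collect();
assert_eq!(median_from_hist(&hist, 6), 10);
assert_eq!(n50_from_hist(&hist, 110), 20);
```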
+
+fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<FxHashMap<String, u64>> {
+    let mut sizes = FxHashMap::default();
+    for (_, records) in header.to_hashmap() {
+        for record in records {
+            if let (Some(sn), Some(ln)) = (record.get("SN"), record.get("LN")) {
+                if let Ok(len) = ln.parse::<u64>() {
+                    sizes.insert(sn.clone(), len);
+                }
+            }
+        }
+    }
+    Ok(sizes)
+}
+
+/// Display a horizontal histogram with ASCII bars
+pub fn fmt_histogram(
+    f: &mut fmt::Formatter<'_>,
+    data: &[(u64, u64)],
+    title: &str,
+    max_width: usize,
+) -> fmt::Result {
+    if data.is_empty() {
+        return Ok(());
+    }
+
+    writeln!(f)?;
+    writeln!(f, "  {}:", title)?;
+
+    let total: u64 = data.iter().map(|(_, c)| *c).sum();
+
+    // Group into bins for display (max ~20 rows)
+    let display_data: Vec<(String, u64, f64)> = if data.len() <= 25 {
+        data.iter()
+            .map(|(len, count)| {
+                let pct = 100.0 * *count as f64 / total as f64;
+                (format!("{:>6}", len), *count, pct)
+            })
+            .collect()
+    } else {
+        let min_len = data.first().map(|(l, _)| *l).unwrap_or(0);
+        let max_len = data.last().map(|(l, _)| *l).unwrap_or(0);
+        let bin_size = ((max_len - min_len) / 20).max(LEN_BIN);
+
+        let mut bins: Vec<(u64, u64, u64)> = Vec::new();
+        let mut current_start = min_len;
+        let mut current_count = 0u64;
+
+        for (len, count) in data {
+            if *len >= current_start + bin_size && current_count > 0 {
+                bins.push((current_start, current_start + bin_size - 1, current_count));
+                current_start += bin_size;
+                current_count = 0;
+            }
+            while *len >= current_start + bin_size {
+                current_start += bin_size;
+            }
+            current_count += count;
+        }
+        if current_count > 0 {
+            bins.push((current_start, current_start + bin_size - 1, current_count));
+        }
+
+        bins.iter()
+            .map(|(start, end, count)| {
+                let pct = 100.0 * *count as f64 / total as f64;
+                (format!("{:>6}-{:<6}", start, end), *count, pct)
+            })
+            .collect()
+    };
+
+    let max_count_display = display_data.iter().map(|(_, c, _)| *c).max().unwrap_or(1);
+
+    for (label, count, pct) in &display_data {
+        let bar_len = (*count as f64 / max_count_display as f64 * max_width as f64) as usize;
+        let bar: String = "█".repeat(bar_len);
+        writeln!(
+            f,
+            "    {} │{:<width$}│ {:>10} ({:>5.1}%)",
+            label,
+            bar,
+            count,
+            pct,
+            width = max_width
+        )?;
+    }
+
+    Ok(())
+}
+
+/// Truncate string with ellipsis if too long
+fn truncate_str(s: &str, max_len: usize) -> String {
+    if s.len() <= max_len {
+        s.to_string()
+    } else if max_len <= 3 {
+        // Collect chars rather than byte-slicing, so non-ASCII names can't panic.
+        s.chars().take(max_len).collect()
+    } else {
+        let head: String = s.chars().take(max_len - 3).collect();
+        format!("{}...", head)
+    }
+}
+
+impl fmt::Display for WGSBamStats {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "BAM statistics summary:")?;
+        writeln!(f, "  All BAM records:         {}", self.all_records)?;
+        writeln!(f, "  Unmapped reads:          {}", self.n_unmapped)?;
+        writeln!(f, "  Duplicate reads:         {}", self.n_duplicates)?;
+        writeln!(f, "  Low MAPQ reads:          {}", self.n_lowq)?;
+        writeln!(f, "  Counted (mapped) reads:  {}", self.n_reads)?;
+        writeln!(f, "  Mapped fraction:         {:.4}", self.mapped_fraction)?;
+        writeln!(
+            f,
+            "  Mapped yield [Gb]:       {:.2}",
+            self.mapped_yield as f64 / 1e9
+        )?;
+        writeln!(f, "  Mean read length [bp]:   {:.2}", self.mean_read_length)?;
+        writeln!(f, "  Median read length [bp]: {}", self.median_read_length)?;
+        writeln!(f, "  N50 [bp]:                {}", self.n50)?;
+        writeln!(f, "  Global mean coverage:    {:.2}x", self.global_coverage)?;
+
+        // Flag stats
+        writeln!(f)?;
+        writeln!(f, "  Flag statistics:")?;
+        let flag_data = self.flag_stats.as_vec();
+        let max_flag = flag_data.iter().map(|(_, c)| *c).max().unwrap_or(1);
+        for (name, count) in &flag_data {
+            let pct = if self.all_records > 0 {
+                100.0 * *count as f64 / self.all_records as f64
+            } else {
+                0.0
+            };
+            let bar_len = (*count as f64 / max_flag as f64 * 30.0) as usize;
+            let bar: String = "█".repeat(bar_len);
+            writeln!(
+                f,
+                "    {:<14} │{:<30}│ {:>12} ({:>6.2}%)",
+                name, bar, count, pct
+            )?;
+        }
+
+        // Read groups
+        writeln!(f)?;
+        writeln!(f, "  Read groups ({}):", self.by_rg.len())?;
+        if !self.by_rg.is_empty() {
+            let max_rg = self.by_rg.iter().map(|r| r.n_reads).max().unwrap_or(1);
+            let max_rg_len = self.by_rg.iter().map(|r| r.rg_id.len()).max().unwrap_or(40);
+
+            writeln!(
+                f,
+                "    {:<width$} {:>12} {:>14} {:>10}",
+                "RG",
+                "Reads",
+                "Yield [Mb]",
+                "MeanLen",
+                width = max_rg_len
+            )?;
+            for rg in &self.by_rg {
+                let bar_len = (rg.n_reads as f64 / max_rg as f64 * 20.0) as usize;
+                let bar: String = "▓".repeat(bar_len);
+                writeln!(
+                    f,
+                    "    {:<width$} {:>12} {:>14.2} {:>10.1} │{:<20}│",
+                    rg.rg_id,
+                    rg.n_reads,
+                    rg.mapped_yield as f64 / 1e6,
+                    rg.mean_read_length,
+                    bar,
+                    width = max_rg_len
+                )?;
+            }
+        }
+
+        // Source files
+        writeln!(f)?;
+        writeln!(f, "  Source files ({}):", self.by_fn.len())?;
+        if !self.by_fn.is_empty() {
+            let max_fn = self.by_fn.iter().map(|r| r.n_reads).max().unwrap_or(1);
+            writeln!(
+                f,
+                "    {:<30} {:>12} {:>14} {:>10}",
+                "Filename", "Reads", "Yield [Mb]", "MeanLen"
+            )?;
+            for fn_stat in &self.by_fn {
+                let bar_len = (fn_stat.n_reads as f64 / max_fn as f64 * 20.0) as usize;
+                let bar: String = "▓".repeat(bar_len);
+                writeln!(
+                    f,
+                    "    {:<30} {:>12} {:>14.2} {:>10.1} │{:<20}│",
+                    truncate_str(&fn_stat.filename, 30),
+                    fn_stat.n_reads,
+                    fn_stat.mapped_yield as f64 / 1e6,
+                    fn_stat.mean_read_length,
+                    bar
+                )?;
+            }
+        }
+
+        // Length histogram
+        fmt_histogram(f, &self.by_lengths, "Read length distribution [bp]", 40)?;
+
+        // Per-contig stats
+        writeln!(f)?;
+        writeln!(f, "  Per-contig stats:")?;
+        if !self.karyotype.is_empty() {
+            // Separate chrM from rest
+            let (chrm, regular): (Vec<_>, Vec<_>) = self
+                .karyotype
+                .iter()
+                .partition(|(_, _, contig, _, _, _)| contig == "chrM");
+
+            // Max coverage excluding chrM for histogram scaling
+            let max_cov = regular
+                .iter()
+                .map(|(_, _, _, _, c, _)| *c)
+                .fold(0.0f64, |a, b| a.max(b));
+
+            writeln!(
+                f,
+                "    {:<6} {:>12} {:<12} {:>14} {:>8}",
+                "TID", "Length", "Name", "MappedYield", "Cov"
+            )?;
+
+            // Display regular contigs with histogram
+            for (tid, contig_len, contig, mapped_sum, mean_cov, _) in &regular {
+                let bar_len = if max_cov > 0.0 {
+                    (*mean_cov / max_cov * 20.0) as usize
+                } else {
+                    0
+                };
+                let bar: String = "▒".repeat(bar_len);
+                writeln!(
+                    f,
+                    "    {:<6} {:>12} {:<12} {:>14} {:>7.1}x │{:<20}│",
+                    tid,
+                    contig_len,
+                    truncate_str(contig, 12),
+                    mapped_sum,
+                    mean_cov,
+                    bar
+                )?;
+            }
+
+            // Display chrM without histogram bar
+            for (tid, contig_len, contig, mapped_sum, mean_cov, _) in &chrm {
+                writeln!(
+                    f,
+                    "    {:<6} {:>12} {:<12} {:>14} {:>7.1}x",
+                    tid,
+                    contig_len,
+                    truncate_str(contig, 12),
+                    mapped_sum,
+                    mean_cov,
+                )?;
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct QNameSet {
+    pub qnames: FxHashSet<Vec<u8>>, // 16 raw UUID bytes each
+}
+
+impl QNameSet {
+    /// Load QNAMEs from a BAM file into memory (no stats, no disk write)
+    pub fn from_bam_in_memory(
+        bam_path: impl AsRef<Path>,
+        bam_min_mapq: u8,
+        n_threads: usize,
+    ) -> anyhow::Result<Self> {
+        let mut bam = rust_htslib::bam::Reader::from_path(bam_path.as_ref())?;
+        rust_htslib::bam::Read::set_threads(&mut bam, n_threads)?;
+        let mut qs = Self::default();
+
+        for rec in rust_htslib::bam::Read::rc_records(&mut bam) {
+            let r = rec?;
+            let flags = r.flags();
+            if flags & SKIP_FLAGS != 0 {
+                continue;
+            }
+            if flags & DUP_FLAG != 0 {
+                continue;
+            }
+            if r.mapq() < bam_min_mapq {
+                continue;
+            }
+            qs.add_qname_bytes(r.qname());
+        }
+        Ok(qs)
+    }
+
+    /// Add a new QNAME (hex or dashed UUID string), store as 16 bytes
+    pub fn add(&mut self, qname: &str) -> bool {
+        let clean = qname.replace('-', "");
+        if clean.len() != 32 {
+            return false;
+        }
+        match hex::decode(&clean) {
+            Ok(bytes) => self.qnames.insert(bytes),
+            Err(_) => false,
+        }
+    }
+
+    /// Add a QNAME directly from a BAM record (`&[u8]`); expects an ASCII UUID
+    pub fn add_qname_bytes(&mut self, qname: &[u8]) -> bool {
+        match std::str::from_utf8(qname) {
+            Ok(s) => self.add(s), // reuse the hex/dash parsing in add()
+            Err(_) => false,
+        }
+    }
+
+    /// Save raw bytes (binary format), 16 bytes per QNAME
+    pub fn save(self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let mut f = fs::File::create(path.as_ref()).context("write QNameSet")?;
+        for q in &self.qnames {
+            f.write_all(q)?;
+        }
+        Ok(())
+    }
+
+    /// Load raw bytes, assume 16 bytes per QNAME
+    pub fn load(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let mut buf = Vec::new();
+        fs::File::open(path.as_ref())?.read_to_end(&mut buf)?;
+        let mut qs = Self::default();
+
+        for chunk in buf.chunks_exact(16) {
+            qs.qnames.insert(chunk.to_vec());
+        }
+
+        Ok(qs)
+    }
+
+    pub fn exists(&self, qname: &str) -> bool {
+        let clean = qname.replace('-', "");
+        if let Ok(bytes) = hex::decode(&clean) {
+            self.qnames.contains(&bytes)
+        } else {
+            false
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.qnames.len()
+    }
+
+    pub fn clear(&mut self) {
+        self.qnames.clear()
+    }
+
+    /// Intersect two sets and return:
+    /// (intersection_set, overlapping_fraction)
+    pub fn intersect(&self, other: &Self) -> (Self, f64) {
+        let mut inter = Self::default();
+        for q in &self.qnames {
+            if other.qnames.contains(q) {
+                inter.qnames.insert(q.clone());
+            }
+        }
+
+        let frac = if self.qnames.is_empty() {
+            0.0
+        } else {
+            inter.qnames.len() as f64 / self.qnames.len() as f64
+        };
+
+        (inter, frac)
+    }
+}
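A round-trip sketch for the binary format above (assumes only the API shown; the temp path is illustrative): `save` consumes the set and writes 16 raw bytes per QNAME, and `load` re-chunks the file into the `FxHashSet`.

```rust
fn qname_roundtrip() -> anyhow::Result<()> {
    let mut qs = QNameSet::default();
    assert!(qs.add("550e8400-e29b-41d4-a716-446655440000")); // dashed UUID
    assert!(qs.exists("550e8400e29b41d4a716446655440000"));  // plain hex matches too

    let path = std::env::temp_dir().join("qnames_demo.bin");
    qs.clone().save(&path)?; // save() takes self by value
    let loaded = QNameSet::load(&path)?;
    assert_eq!(loaded.len(), 1);
    Ok(())
}
```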
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::helpers::test_init;
+
+    #[test]
+    fn bam_stats() -> anyhow::Result<()> {
+        test_init();
+
+        let config = Config::default();
+        let stats = WGSBamStats::open("36167", "norm", &config)?;
+        println!("{stats}");
+
+        Ok(())
+    }
+}
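Typical call flow for the new module, under the caching rules in `from_bam` (path hypothetical): the first call scans the BAM and writes `<prefix>_bam_stats.json` plus `<prefix>_bam_qnames.bin` next to it; later calls load the JSON unless the BAM's mtime is newer than the cache.

```rust
use crate::{collection::bam_stats::WGSBamStats, config::Config};

fn show_stats() -> anyhow::Result<()> {
    let config = Config::default();
    let stats = WGSBamStats::from_bam("/data/34528_norm_hs1.bam", &config)?;
    println!("{stats}"); // Display impl renders the flag/length/coverage histograms
    Ok(())
}
```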

+ 1 - 0
src/collection/mod.rs

@@ -5,3 +5,4 @@ pub mod modbases;
 pub mod pod5;
 pub mod run;
 pub mod vcf;
+pub(crate) mod bam_stats;

+ 87 - 95
src/collection/pod5.rs

@@ -102,95 +102,111 @@ pub struct Pod5sRun {
     pub sequencing_kit: String,
     pub cases: Vec<IdInput>,
     pub pod5s: Vec<Pod5>,
-    pub dir: PathBuf,
+    pub bams_pass: Option<PathBuf>,
 }
 
 impl Pod5sRun {
     /// Load all `.pod5` files from a directory and build a collection.
-    ///
-    /// The directory is scanned using `list_files_with_ext`.  
-    /// Each file is parsed using `Pod5::from_path`.
     pub fn load_from_dir<P: AsRef<Path>>(dir: P) -> anyhow::Result<Self> {
-        let pod_paths = list_files_with_ext(dir.as_ref(), "pod5")?;
-
-        if pod_paths.is_empty() {
-            anyhow::bail!(
-                "No .pod5 files found in directory: {}",
-                dir.as_ref().display()
-            );
-        }
+        Self::load_from_dirs(std::iter::once(dir))
+    }
 
-        let mut pod5s = Vec::with_capacity(pod_paths.len());
+    /// Load all `.pod5` files from multiple directories and build a collection.
+    ///
+    /// All files must share the same `run_id`, `flow_cell_id` and `sequencing_kit`.
+    pub fn load_from_dirs<P, I>(dirs: I) -> anyhow::Result<Self>
+    where
+        P: AsRef<Path>,
+        I: IntoIterator<Item = P>,
+    {
+        let mut pod5s = Vec::new();
         let mut flow_cell_id: Option<String> = None;
         let mut sequencing_kit: Option<String> = None;
         let mut run_id: Option<String> = None;
-        let mut skipped = 0;
-
-        for p in pod_paths.iter() {
-            let pod = match Pod5::from_path(&p) {
-                Ok(pod) => pod,
-                Err(e) => {
-                    // Log corrupted files at debug level
-                    log::debug!("Skipping corrupted POD5 file '{}': {}", p.display(), e);
-                    skipped += 1;
-                    continue;
-                }
-            };
+        let mut skipped = 0usize;
+        let mut any_pod_files = false;
 
-            // run_id uniqueness check
-            match &run_id {
-                None => run_id = Some(pod.protocol_run_id.clone()),
-                Some(exp) if &pod.protocol_run_id != exp => {
-                    anyhow::bail!(
-                        "Mixed run IDs: expected '{}', found '{}' (file: {})",
-                        exp,
-                        pod.protocol_run_id,
-                        pod.path.display()
-                    );
-                }
-                _ => {}
+        for dir in dirs {
+            let dir = dir.as_ref();
+            let pod_paths = list_files_with_ext(dir, "pod5")?;
+
+            if pod_paths.is_empty() {
+                log::debug!("No .pod5 files found in directory: {}", dir.display());
+                continue;
             }
 
-            // flow_cell_id uniqueness check
-            match &flow_cell_id {
-                None => flow_cell_id = Some(pod.flow_cell_id.clone()),
-                Some(exp) if &pod.flow_cell_id != exp => {
-                    anyhow::bail!(
-                        "Mixed flow cells: expected '{}', found '{}' (file: {})",
-                        exp,
-                        pod.flow_cell_id,
-                        pod.path.display()
-                    );
+            any_pod_files = true;
+
+            for p in pod_paths.iter() {
+                let pod = match Pod5::from_path(p) {
+                    Ok(pod) => pod,
+                    Err(e) => {
+                        log::debug!("Skipping corrupted POD5 file '{}': {}", p.display(), e);
+                        skipped += 1;
+                        continue;
+                    }
+                };
+
+                // run_id uniqueness check
+                match &run_id {
+                    None => run_id = Some(pod.protocol_run_id.clone()),
+                    Some(exp) if &pod.protocol_run_id != exp => {
+                        anyhow::bail!(
+                            "Mixed run IDs: expected '{}', found '{}' (file: {})",
+                            exp,
+                            pod.protocol_run_id,
+                            pod.path.display()
+                        );
+                    }
+                    _ => {}
+                }
+
+                // flow_cell_id uniqueness check
+                match &flow_cell_id {
+                    None => flow_cell_id = Some(pod.flow_cell_id.clone()),
+                    Some(exp) if &pod.flow_cell_id != exp => {
+                        anyhow::bail!(
+                            "Mixed flow cells: expected '{}', found '{}' (file: {})",
+                            exp,
+                            pod.flow_cell_id,
+                            pod.path.display()
+                        );
+                    }
+                    _ => {}
                 }
-                _ => {}
-            }
 
-            // sequencing_kit uniqueness check
-            match &sequencing_kit {
-                None => sequencing_kit = Some(pod.sequencing_kit.clone()),
-                Some(exp) if &pod.sequencing_kit != exp => {
-                    anyhow::bail!(
-                        "Mixed sequencing kits: expected '{}', found '{}' (file: {})",
-                        exp,
-                        pod.sequencing_kit,
-                        pod.path.display()
-                    );
+                // sequencing_kit uniqueness check
+                match &sequencing_kit {
+                    None => sequencing_kit = Some(pod.sequencing_kit.clone()),
+                    Some(exp) if &pod.sequencing_kit != exp => {
+                        anyhow::bail!(
+                            "Mixed sequencing kits: expected '{}', found '{}' (file: {})",
+                            exp,
+                            pod.sequencing_kit,
+                            pod.path.display()
+                        );
+                    }
+                    _ => {}
                 }
-                _ => {}
+
+                pod5s.push(pod);
             }
+        }
 
-            pod5s.push(pod);
+        if !any_pod_files {
+            anyhow::bail!("No .pod5 files found in any directory");
         }
 
-        let run_id = run_id.ok_or(anyhow::anyhow!("No valid pod5 files loaded"))?;
-        let flow_cell_id = flow_cell_id.ok_or(anyhow::anyhow!("No valid pod5 files loaded"))?;
-        let sequencing_kit = sequencing_kit.ok_or(anyhow::anyhow!("No valid pod5 files loaded"))?;
+        let run_id = run_id.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?;
+        let flow_cell_id =
+            flow_cell_id.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?;
+        let sequencing_kit =
+            sequencing_kit.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?;
 
         if skipped > 0 {
             log::debug!(
-                "Skipped {} corrupted POD5 file(s) in directory '{}'",
-                skipped,
-                dir.as_ref().display()
+                "Skipped {} corrupted POD5 file(s) across directories",
+                skipped
             );
         }
 
@@ -200,7 +216,7 @@ impl Pod5sRun {
             sequencing_kit,
             cases: Vec::new(),
             pod5s,
-            dir: dir.as_ref().into(),
+            bams_pass: None,
         })
     }
 
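Usage sketch for the new multi-directory loader (directories hypothetical): all files must agree on `run_id`, `flow_cell_id` and `sequencing_kit`, otherwise the loader bails with a "Mixed ..." error, and `bams_pass` starts out unset.

```rust
let run = Pod5sRun::load_from_dirs([
    "/runs/X/pod5_pass",
    "/runs/X/pod5_recovered",
])?;
assert!(run.bams_pass.is_none()); // populated elsewhere when reusing existing BAMs
```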
@@ -383,33 +399,10 @@ impl Pod5sRuns {
         Ok(())
     }
 
-    /// Load metadata JSON and restore each run via scanning its directory.
-    ///
-    /// Rebuilds `pod5s` by calling `load_from_dir` for each `dir`,
-    /// but preserves the `cases` from the saved JSON.
+    /// Load metadata from a saved JSON file.
     pub fn load_json<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
-        let raw: Pod5sRuns = serde_json::from_str(&fs::read_to_string(path)?)?;
-
-        let mut rebuilt = Pod5sRuns::new();
-        for saved_run in raw.data {
-            // Rebuild pod5s from directory
-            match Pod5sRun::load_from_dir(&saved_run.dir) {
-                Ok(mut fresh_run) => {
-                    // Preserve the cases from the saved JSON
-                    fresh_run.cases = saved_run.cases;
-                    rebuilt.data.push(fresh_run);
-                }
-                Err(e) => {
-                    // Log error but continue with other runs
-                    log::warn!(
-                        "Failed to reload run from '{}': {}",
-                        saved_run.dir.display(),
-                        e
-                    );
-                }
-            }
-        }
-        Ok(rebuilt)
+        let data = std::fs::read_to_string(path)?;
+        Ok(serde_json::from_str(&data)?)
     }
 }
 
@@ -424,7 +417,6 @@ mod tests {
         test_init();
 
         let dir = "/mnt/beegfs02/scratch/t_steimle/prom_runs/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/pod5_recovered";
-        let saved_runs = "~/data/seq_runs_cases.json";
 
         let flow_cell = Pod5sRun::load_from_dir(dir)?;
         println!("{:#?}", flow_cell.pod5s.first());

+ 34 - 39
src/commands/samtools.rs

@@ -1,4 +1,5 @@
 use std::{
+    collections::HashSet,
     fs,
     path::{Path, PathBuf},
 };
@@ -8,7 +9,7 @@ use log::info;
 use uuid::Uuid;
 
 use crate::{
-    collection::bam::bam_composition,
+    collection::bam_stats::{QNameSet, WGSBamStats},
     commands::{SbatchRunner, SlurmParams},
     config::Config,
 };
@@ -87,6 +88,8 @@ pub struct SamtoolsMerge {
     tmp_bam: PathBuf,
     /// Temporary location for the original `into` BAI (created in `init()`).
     tmp_bam_index: PathBuf,
+    /// Config
+    config: Config,
 }
 
 impl SamtoolsMerge {
@@ -108,6 +111,7 @@ impl SamtoolsMerge {
             into: into.as_ref().into(),
             tmp_bam: PathBuf::new(),
             tmp_bam_index: PathBuf::new(),
+            config: config.clone(),
         }
     }
 }
@@ -121,40 +125,34 @@ impl super::Command for SamtoolsMerge {
     /// - Ensures a BAI index exists for `into`.
     /// - Moves `into` and its BAI to temporary files in the same directory.
     fn init(&mut self) -> anyhow::Result<()> {
-        // Collect RG IDs from destination BAM.
-        let composition_a: Vec<String> =
-            bam_composition(self.into.to_string_lossy().as_ref(), 20_000)?
-                .iter()
-                .map(|(rg, _, _)| rg.clone())
-                .collect();
-
-        // Collect RG IDs from source BAM.
-        let composition_b: Vec<String> =
-            bam_composition(self.bam.to_string_lossy().as_ref(), 20_000)?
-                .iter()
-                .map(|(rg, _, _)| rg.clone())
-                .collect();
-
-        // Check for overlapping RGs between the two BAM files.
-        let n_id = composition_a
-            .iter()
-            .filter(|id| composition_b.contains(id))
-            .count();
-
-        if n_id > 0 {
-            anyhow::bail!(
-                "{} {} are already merged, reads with the same RG in the destination BAM.",
-                self.into.display(),
-                self.bam.display()
-            );
-        }
-
         if !self.into.exists() {
-            anyhow::bail!("BAM file doesn't exists for: {}", self.into.display());
+            anyhow::bail!("BAM file doesn't exist: {}", self.into.display());
         }
 
         if !self.bam.exists() {
-            anyhow::bail!("BAM file doesn't exists for: {}", self.bam.display());
+            anyhow::bail!("BAM file doesn't exist: {}", self.bam.display());
+        }
+
+        // Load qname sets
+        let into_qnames_path = WGSBamStats::qnames_path_from_bam(&self.into);
+
+        let into_qset = QNameSet::load(&into_qnames_path)
+            .with_context(|| format!("Failed to load qnames for {}", self.into.display()))?;
+
+        let bam_qset = QNameSet::from_bam_in_memory(&self.bam, self.config.bam_min_mapq, 4)?;
+
+        // Intersect qname sets to detect shared reads
+        let (intersection, frac_into) = into_qset.intersect(&bam_qset);
+        let n_shared = intersection.len();
+
+        if n_shared > 0 {
+            anyhow::bail!(
+                "Cannot merge: {} and {} share {} read name(s) ({:.4} fraction of `into`)",
+                self.into.display(),
+                self.bam.display(),
+                n_shared,
+                frac_into,
+            );
         }
 
         let dir = self.into.parent().context(format!(
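A tiny illustration of the guard above with in-memory sets (UUIDs arbitrary): `intersect` returns the shared QNAMEs plus the fraction they represent of the destination set, which is what the error message reports.

```rust
let mut into_set = QNameSet::default();
let mut bam_set = QNameSet::default();
into_set.add("550e8400-e29b-41d4-a716-446655440000");
into_set.add("6fa459ea-ee8a-3ca4-894e-db77e160355e");
bam_set.add("550e8400-e29b-41d4-a716-446655440000");

let (shared, frac) = into_set.intersect(&bam_set);
assert_eq!(shared.len(), 1);
assert!((frac - 0.5).abs() < f64::EPSILON); // 1 of 2 destination reads
```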
@@ -488,14 +486,11 @@ mod tests {
         assert_eq!(captured_output.stderr, String::new());
 
         info!("Merging both BAMs.");
-        let mut idx = SamtoolsMerge {
-            bin: config.align.samtools_bin,
-            threads: config.align.samtools_merge_threads,
-            bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()).into(),
-            into: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()).into(),
-            tmp_bam: "".into(),
-            tmp_bam_index: "".into(),
-        };
+        let mut idx = SamtoolsMerge::from_config(
+            &config,
+            format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
+        );
 
         let captured_output = SlurmRunner::run(&mut idx)?;
         assert_eq!(captured_output.stderr, String::new());

+ 275 - 186
src/pipes/somatic_slurm.rs

@@ -8,7 +8,7 @@ use crate::{
     commands::{
         dorado::{DoradoAlign, DoradoBasecall},
         run_many_sbatch,
-        samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSplit},
+        samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSort, SamtoolsSplit},
         SlurmRunner,
     },
     config::Config,
@@ -58,28 +58,88 @@ fn map_sample_type_to_dir(sample_type: &str, config: &Config) -> String {
     }
 }
 
-/// Basecall, align, and split POD5 files from a sequencing run into organized BAM files.
+use std::{collections::HashSet, path::Path};
+
+use walkdir::WalkDir;
+
+/// Recursively collect all `.bam` files under a root directory.
+fn collect_bams_recursively(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
+    let mut bams = Vec::new();
+
+    for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
+        let path = entry.path();
+        if !path.is_file() {
+            continue;
+        }
+        if path.extension().and_then(|e| e.to_str()) == Some("bam") {
+            bams.push(path.to_path_buf());
+        }
+    }
+
+    Ok(bams)
+}
+
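One design note on the helper above: `filter_map(Result::ok)` silently drops entries that could not be read (permissions, broken symlinks). A hedged variant of the same loop that logs instead, if that ever matters on shared storage:

```rust
for entry in WalkDir::new(root) {
    match entry {
        Ok(e) if e.file_type().is_file()
            && e.path().extension().and_then(|x| x.to_str()) == Some("bam") =>
        {
            bams.push(e.path().to_path_buf());
        }
        Ok(_) => {}
        Err(err) => log::warn!("skipping unreadable entry under {}: {err}", root.display()),
    }
}
```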
+/// Run the full Nanopore POD5 → aligned, sorted and indexed BAM pipeline for a single run.
+///
+/// This function orchestrates the end-to-end processing of a `Pod5sRun`,
+/// starting from raw POD5 files and producing per-case, per-sample-type
+/// coordinate-sorted, indexed BAM files in `config.result_dir`.
+///
+/// High-level steps:
+///
+/// 1. **Basecalling**  
+///    All POD5 files from the run are basecalled with `DoradoBasecall` into a
+///    single temporary BAM in `tmp_dir`.
+///
+/// 2. **Splitting by read group**  
+///    The basecalled BAM is split by read group using `SamtoolsSplit`, producing
+///    one BAM per barcode/read group in a temporary split directory.
+///
+/// 3. **Barcode → case mapping and alignment job setup**  
+///    Each split BAM is:
+///    - matched to a `IdInput` case via its (normalized) barcode,  
+///    - assigned to a per–case / per–sample-type output directory under
+///      `config.result_dir`, and  
+///    - turned into a `DoradoAlign` job that aligns the reads to the configured
+///      reference.  
+///
+///    The paths of aligned BAMs are tracked in `case_bam_map` for later merging.
+///    Split BAMs below `min_bam_size` are skipped.
+///
+/// 4. **Alignment**  
+///    All `DoradoAlign` jobs are submitted via `run_many_sbatch`. At the end of
+///    this step, each case / sample-type may have one or more temporary aligned
+///    BAMs in `tmp_dir`.
+///
+/// 5. **Merge, sort and index final BAMs**  
+///    For each case with a barcode:
+///    - all aligned BAMs for a given `(case_id, sample_type)` key are merged
+///      into a single BAM at:
+///      `config.result_dir/<case_id>/<sample_type_dir>/<case>_<sample_type>_<reference>.bam`  
+///      using `SamtoolsMerge`,
+///    - the merged BAM is coordinate-sorted with `SamtoolsSort`, and  
+///    - the sorted BAM is indexed with `SamtoolsIndex`.
+///
+/// Finally, all temporary split files and directories are removed.  
+/// The function fails early if:
+/// - the run has no cases,  
+/// - no BAMs are eligible for alignment, or  
+/// - any of the Slurm / samtools / filesystem operations fail.
 ///
 /// # Arguments
-/// * `run` - The Pod5sRun containing metadata and POD5 files to process
-/// * `config` - Pipeline configuration with paths and settings
-/// * `tmp_dir` - Temporary directory for intermediate files
-/// * `min_bam_size` - Minimum BAM file size to process (smaller files are skipped)
 ///
-/// # Output Structure
-/// ```
-/// {result_dir}/{case_id}/{sample_type_dir}/{case_id}_{sample_type_dir}_{reference_name}.bam
-/// ```
-/// Where `sample_type_dir` is either `normal_name` or `tumoral_name` from config.
+/// * `run` – Description of the POD5 run, associated cases and barcodes.
+/// * `config` – Global pipeline configuration (paths, samtools/dorado binaries,
+///   Slurm parameters, reference name, etc.).
+/// * `tmp_dir` – Directory for intermediate BAMs (basecalled, split and aligned).
+/// * `min_bam_size` – Minimum size (in bytes) for a split BAM to be kept and aligned.
+///
+/// # Errors
 ///
-/// # Process
-/// 1. Validates that run has associated cases
-/// 2. Basecalls all POD5 files in the run into a single BAM
-/// 3. Splits the BAM by read group (RG)
-/// 4. Maps barcodes to cases and aligns BAMs
-/// 5. Merges and organizes final BAMs per case
-/// 6. Indexes final BAMs
-/// 7. Cleans up intermediate files
+/// Returns an error if the run has no associated cases, if no BAM files are
+/// eligible for alignment, or if any external command (Dorado, samtools) or
+/// filesystem operation fails.
 pub fn basecall_align_split(
     run: &Pod5sRun,
     config: &Config,
@@ -134,39 +194,148 @@ pub fn basecall_align_split(
         }
     }
 
-    // Step 1: Basecalling
-    let tmp_basecalled_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
-    info!(
-        "Step 1/4: Basecalling into: {}",
-        tmp_basecalled_bam.display()
-    );
+    // ---------------------------------------------------------------------
+    // Step 1 & 2: Decide what to basecall, and collect existing BAMs
+    // ---------------------------------------------------------------------
 
-    let mut cmd = DoradoBasecall::from_config(config, run.dir.clone(), tmp_basecalled_bam.clone());
-    let _out = SlurmRunner::run(&mut cmd)?;
-    info!("Basecalled ✅ (run: {})", run.run_id);
+    let mut split_bams: Vec<PathBuf> = Vec::new(); // All BAMs used for step 3
 
-    // Step 2: Split by read group
-    let tmp_split_dir = tmp_dir.join(format!("split_{}", Uuid::new_v4()));
-    info!(
-        "Step 2/4: Splitting by read group into: {}",
-        tmp_split_dir.display()
-    );
+    // 1) Gather existing BAMs if bam_pass_root is provided
+    let mut existing_bam_stems: HashSet<String> = HashSet::new();
 
-    let mut cmd = SamtoolsSplit::from_config(config, &tmp_basecalled_bam, tmp_split_dir.clone());
-    let _out = SlurmRunner::run(&mut cmd)?;
-    fs::remove_file(&tmp_basecalled_bam).context(format!(
-        "Failed to remove temporary basecalled BAM: {}",
-        tmp_basecalled_bam.display()
-    ))?;
-    info!("Split ✅ (run: {})", run.run_id);
+    if let Some(ref root) = run.bams_pass {
+        info!("Using existing unaligned BAMs from: {}", root.display());
 
-    let split_bams = list_files_with_ext(&tmp_split_dir, "bam").context(format!(
-        "Error listing BAM files in: {}",
-        tmp_split_dir.display()
-    ))?;
+        let existing_bams = collect_bams_recursively(root).context(format!(
+            "Error collecting BAM files under bam_pass_root: {}",
+            root.display()
+        ))?;
+
+        for bam in &existing_bams {
+            if let Some(stem) = bam.file_stem().and_then(|s| s.to_str()) {
+                existing_bam_stems.insert(stem.to_owned());
+            }
+        }
+
+        info!(
+            "Found {} existing BAM file(s) under bam_pass_root",
+            existing_bams.len()
+        );
+
+        // These BAMs are already per-barcode, so we can treat them as "split" BAMs.
+        split_bams.extend(existing_bams);
+    }
+
+    // 2) Decide which POD5s still need basecalling
+    let mut pod5s_to_basecall: Vec<PathBuf> = Vec::new();
+
+    for pod in &run.pod5s {
+        let stem = match pod.path.file_stem().and_then(|s| s.to_str()) {
+            Some(s) => s,
+            None => {
+                warn!(
+                    "Could not determine stem for POD5 '{}', will basecall it",
+                    pod.path.display()
+                );
+                pod5s_to_basecall.push(pod.path.clone());
+                continue;
+            }
+        };
+
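+        // Matching is by file stem only: a hypothetical "PAW12345_pass_0.pod5"
+        // is skipped whenever some ".../PAW12345_pass_0.bam" was found under
+        // the bam_pass root, regardless of which subdirectory it sits in.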
+        if existing_bam_stems.contains(stem) {
+            info!(
+                "Found existing BAM for POD5 stem '{}', skipping basecalling for {}",
+                stem,
+                pod.path.display()
+            );
+        } else {
+            pod5s_to_basecall.push(pod.path.clone());
+        }
+    }
+
+    // 3) If there are POD5s to basecall, create a tmp POD5 dir and basecall + split
+    let mut tmp_split_dir: Option<PathBuf> = None;
+    let mut tmp_pod5_dir: Option<PathBuf> = None;
+
+    if !pod5s_to_basecall.is_empty() {
+        info!(
+            "Will basecall {} POD5 file(s) without corresponding BAM",
+            pod5s_to_basecall.len()
+        );
+
+        // Create temporary directory containing only the POD5s that need basecalling
+        let local_pod5_dir = tmp_dir.join(format!("pod5s_{}", Uuid::new_v4()));
+        fs::create_dir_all(&local_pod5_dir).context(format!(
+            "Failed to create temporary POD5 directory: {}",
+            local_pod5_dir.display()
+        ))?;
+
+        // Symlink (rather than copy) the POD5s into that dir so Dorado only
+        // sees files that still need basecalling; std::os::unix::fs::symlink
+        // is Unix-only, so this assumes a Linux host
+        for src in &pod5s_to_basecall {
+            let file_name = src
+                .file_name()
+                .ok_or_else(|| anyhow::anyhow!("Invalid POD5 filename: {}", src.display()))?;
+            let dst = local_pod5_dir.join(file_name);
+            std::os::unix::fs::symlink(src, &dst).with_context(|| {
+                format!(
+                    "Failed to symlink POD5 '{}' to '{}'",
+                    src.display(),
+                    dst.display()
+                )
+            })?;
+        }
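+        // On non-Unix targets this would need fs::copy (or a platform-specific
+        // symlink API) instead; the Slurm-based pipeline assumes Linux hosts.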
+
+        tmp_pod5_dir = Some(local_pod5_dir.clone());
+
+        let tmp_basecalled_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
+        info!(
+            "Step 1/5: Basecalling {} POD5 file(s) into: {}",
+            pod5s_to_basecall.len(),
+            tmp_basecalled_bam.display()
+        );
+
+        // DoradoBasecall must accept a directory of POD5s here
+        let mut cmd =
+            DoradoBasecall::from_config(config, local_pod5_dir.clone(), tmp_basecalled_bam.clone());
+        let _out = SlurmRunner::run(&mut cmd)?;
+        info!("Basecalled ✅ (run: {})", run.run_id);
+
+        // Split by read group
+        let local_split_dir = tmp_dir.join(format!("split_{}", Uuid::new_v4()));
+        info!(
+            "Step 2/5: Splitting basecalled BAM by read group into: {}",
+            local_split_dir.display()
+        );
 
+        let mut cmd =
+            SamtoolsSplit::from_config(config, &tmp_basecalled_bam, local_split_dir.clone());
+        let _out = SlurmRunner::run(&mut cmd)?;
+        fs::remove_file(&tmp_basecalled_bam).context(format!(
+            "Failed to remove temporary basecalled BAM: {}",
+            tmp_basecalled_bam.display()
+        ))?;
+        info!("Split ✅ (run: {})", run.run_id);
+
+        let basecalled_split_bams = list_files_with_ext(&local_split_dir, "bam").context(
+            format!("Error listing BAM files in: {}", local_split_dir.display()),
+        )?;
+
+        tmp_split_dir = Some(local_split_dir);
+
+        // Union: existing BAMs + newly split BAMs
+        split_bams.extend(basecalled_split_bams);
+    } else {
+        info!("All POD5s have corresponding BAMs; no basecalling will be performed.");
+    }
+
+    // ---------------------------------------------------------------------
     // Step 3: Map BAMs to cases and prepare alignment
-    info!("Step 3/6: Mapping BAMs to cases and preparing alignment");
+    // ---------------------------------------------------------------------
+
+    info!(
+        "Step 3/5: Mapping {} BAM(s) to cases and preparing alignment",
+        split_bams.len()
+    );
 
     let mut align_jobs = Vec::new();
     let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new(); // case_id → [aligned_bams]
@@ -251,7 +420,7 @@ pub fn basecall_align_split(
     }
 
     let n_jobs = align_jobs.len();
-    info!("Step 4/6: Running alignment of {} BAM files", n_jobs);
+    info!("Step 4/5: Running alignment of {} BAM files", n_jobs);
 
     if n_jobs == 0 {
         bail!(
@@ -267,172 +436,92 @@ pub fn basecall_align_split(
 
     info!("Alignments done ✅ (run: {})", run.run_id);
 
+    // ---------------------------------------------------------------------
     // Step 5: Merge and organize final BAMs
-    info!("Step 5/6: Merging and organizing final BAM files");
+    // ---------------------------------------------------------------------
 
-    for (case_key, aligned_bams) in case_bam_map.iter() {
-        let parts: Vec<&str> = case_key.split('_').collect();
-        if parts.len() < 2 {
-            warn!("Invalid case key format: {}", case_key);
+    info!("Step 5/5: Merging, sorting and indexing final BAM files");
+
+    for case in &run.cases {
+        if case.barcode.is_empty() {
             continue;
         }
 
-        let case_id = parts[0];
-        let sample_type_dir = parts[1];
+        let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
+        let key = format!("{}_{}", case.case_id, sample_type_dir);
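+        // Hypothetical example: case "CASE123" with a tumor sample yields the
+        // key "CASE123_tumor", matching how case_bam_map was keyed in step 3.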
+
+        let Some(aligned_bams) = case_bam_map.get(&key) else {
+            warn!("No aligned BAMs found for case {}", case.case_id);
+            continue;
+        };
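+        // Not fatal: a configured case may simply have produced no reads
+        // (and therefore no aligned BAMs) in this particular run.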
 
         let final_bam_path = PathBuf::from(&config.result_dir)
-            .join(case_id)
-            .join(sample_type_dir)
+            .join(&case.case_id)
+            .join(&sample_type_dir)
             .join(format!(
-                "{}_{}_{}. bam",
-                case_id, sample_type_dir, config.reference_name
+                "{}_{}_{}.bam",
+                case.case_id, sample_type_dir, config.reference_name
             ));
 
-        // Check if a BAM already exists from previous runs
-        let existing_bam = if final_bam_path.exists() {
-            info!(
-                "  Found existing BAM for case {}: {}",
-                case_id,
-                final_bam_path.display()
-            );
-            Some(final_bam_path.clone())
-        } else {
-            None
-        };
-
-        // Determine which BAMs need to be merged
-        let mut all_bams_to_merge = aligned_bams.clone();
-        if let Some(ref existing) = existing_bam {
-            // Add existing BAM to the list
-            all_bams_to_merge.push(existing.clone());
-            info!(
-                "  Will merge {} new BAM(s) with existing BAM",
-                aligned_bams.len()
-            );
-        }
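+        // parent() is always Some here: final_bam_path is built by joining
+        // result_dir/case_id/sample_type_dir, so the unwrap() cannot panic.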
+        fs::create_dir_all(final_bam_path.parent().unwrap()).context(format!(
+            "Failed to create result directory for: {}",
+            final_bam_path.display()
+        ))?;
 
-        if all_bams_to_merge.len() == 1 {
-            // Single BAM and no existing - just move it
-            if existing_bam.is_none() {
-                fs::rename(&aligned_bams[0], &final_bam_path).context(format!(
-                    "Failed to move BAM to: {}",
-                    final_bam_path.display()
-                ))?;
-                info!("  Created: {}", final_bam_path.display());
+        for bam in aligned_bams {
+            if final_bam_path.exists() {
+                // Merge into the existing base BAM; the merge command's
+                // clean_up() removes the source BAM afterwards
+                let mut merge_cmd = SamtoolsMerge::from_config(config, bam, &final_bam_path);
+                SlurmRunner::run(&mut merge_cmd)?;
             } else {
-                // Existing BAM is already in place, nothing to do
-                info!("  Using existing BAM: {}", final_bam_path.display());
-            }
-        } else {
-            // Multiple BAMs - need to index first, then merge iteratively
-            info!(
-                "  Merging {} BAMs for case {} ({})",
-                all_bams_to_merge.len(),
-                case_id,
-                sample_type_dir
-            );
-
-            // Index all BAMs first
-            for bam in all_bams_to_merge.iter() {
-                // Check if index already exists
-                let index_path = PathBuf::from(format!("{}.bai", bam.display()));
-                if !index_path.exists() {
-                    info!("    Indexing: {}", bam.display());
-                    let mut index_cmd = SamtoolsIndex {
-                        bin: config.align.samtools_bin.clone(),
-                        threads: config.align.samtools_view_threads,
-                        bam: bam.to_string_lossy().to_string(),
-                    };
-                    SlurmRunner::run(&mut index_cmd).context(format!(
-                        "Failed to index BAM before merge: {}",
-                        bam.display()
-                    ))?;
-                } else {
-                    info!("    Index exists: {}", index_path.display());
-                }
-            }
-
-            // Start with the first BAM as base
-            let mut current_bam = all_bams_to_merge[0].clone();
-
-            // Merge remaining BAMs one by one into the current BAM
-            for (i, bam_to_merge) in all_bams_to_merge[1..].iter().enumerate() {
-                info!(
-                    "    Merging {} into {}",
-                    bam_to_merge.display(),
-                    current_bam.display()
-                );
-
-                let mut merge_cmd = SamtoolsMerge::from_config(config, bam_to_merge, &current_bam);
-
-                SlurmRunner::run(&mut merge_cmd).context(format!(
-                    "Failed to merge BAM {} into {}",
-                    bam_to_merge.display(),
-                    current_bam.display()
-                ))?;
-
-                // After merge, the result is in current_bam (it was updated in place)
-                // The merge command handles cleanup of bam_to_merge
+                // First BAM becomes the base
+                fs::rename(bam, &final_bam_path)?;
+
+                // Index the base BAM up front, mirroring the previous
+                // implementation which indexed all inputs before merging.
+                // Note: samtools index requires coordinate-sorted input, so
+                // this assumes the aligned BAMs arrive sorted.
+                let mut index_cmd = SamtoolsIndex {
+                    bin: config.align.samtools_bin.clone(),
+                    threads: config.align.samtools_view_threads,
+                    bam: final_bam_path.to_string_lossy().to_string(),
+                };
+                SlurmRunner::run(&mut index_cmd)?;
             }
-
-            // Move final merged BAM to destination (or it's already there if merging into existing)
-            if current_bam != final_bam_path {
-                fs::rename(&current_bam, &final_bam_path).context(format!(
-                    "Failed to move merged BAM to: {}",
-                    final_bam_path.display()
-                ))?;
-            }
-
-            // Clean up individual aligned BAMs (but not the existing one if it was used)
-            // Note: SamtoolsMerge already cleaned up the source BAMs during merge
-            for bam in aligned_bams {
-                let _ = fs::remove_file(bam);
-                let _ = fs::remove_file(format!("{}.bai", bam.display()));
-            }
-
-            info!("  Created (merged): {}", final_bam_path.display());
         }
-    }
 
-    // Step 6: Index all final BAMs
-    info!("Step 6/6: Indexing final BAM files");
+        // Sort the merged final BAM
+        let sorted_tmp = final_bam_path.with_extension("sorted.bam");
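+        // NB: with_extension() replaces the trailing ".bam", so a hypothetical
+        // "CASE123_tumor_hg38.bam" becomes "CASE123_tumor_hg38.sorted.bam".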
 
-    for (case_key, _) in case_bam_map.iter() {
-        let parts: Vec<&str> = case_key.split('_').collect();
-        if parts.len() < 2 {
-            continue;
-        }
+        info!(
+            "  Sorting final BAM for case {} → {}",
+            case.case_id,
+            final_bam_path.display()
+        );
 
-        let case_id = parts[0];
-        let sample_type_dir = parts[1];
+        let mut sort_cmd = SamtoolsSort::from_config(config, &final_bam_path, &sorted_tmp);
+        SlurmRunner::run(&mut sort_cmd)?;
 
-        let final_bam_path = PathBuf::from(&config.result_dir)
-            .join(case_id)
-            .join(sample_type_dir)
-            .join(format!(
-                "{}_{}_{}. bam",
-                case_id, sample_type_dir, config.reference_name
-            ));
+        // Replace unsorted BAM with sorted BAM
+        fs::rename(&sorted_tmp, &final_bam_path)?;
 
-        info!("  Indexing: {}", final_bam_path.display());
+        // Index the (now sorted) final BAM
         let mut index_cmd = SamtoolsIndex {
             bin: config.align.samtools_bin.clone(),
             threads: config.align.samtools_view_threads,
             bam: final_bam_path.to_string_lossy().to_string(),
         };
-        SlurmRunner::run(&mut index_cmd).context(format!(
-            "Failed to index final BAM: {}",
-            final_bam_path.display()
-        ))?;
+        SlurmRunner::run(&mut index_cmd)?;
+
+        info!("  Output: {}", final_bam_path.display());
     }
-    info!("  Indexing done ✅");
 
-    // Step 7: Cleanup
+    // Clean up the temporary directories we created
     info!("Cleaning up temporary files");
-    fs::remove_dir_all(&tmp_split_dir).context(format!(
-        "Failed to remove split directory: {}",
-        tmp_split_dir.display()
-    ))?;
+    if let Some(d) = tmp_split_dir {
+        fs::remove_dir_all(&d)
+            .context(format!("Failed to remove split directory: {}", d.display()))?;
+    }
+    if let Some(d) = tmp_pod5_dir {
+        fs::remove_dir_all(&d).context(format!(
+            "Failed to remove temporary POD5 directory: {}",
+            d.display()
+        ))?;
+    }
 
     info!(
         "Pipeline completed ✅ for run: {} (flowcell: {})",