Browse Source

Add run method

Thomas 2 weeks ago
parent
commit
8e8ec39f18
4 changed files with 349 additions and 67 deletions
  1. 163 53
      src/collection/bam_stats.rs
  2. 133 0
      src/collection/pod5.rs
  3. 8 5
      src/commands/samtools.rs
  4. 45 9
      src/pipes/somatic_slurm.rs

+ 163 - 53
src/collection/bam_stats.rs

@@ -143,8 +143,8 @@ impl WGSBamStats {
             info!("Saved stats to: {}", json_path.display());
 
             let qnames_path = Self::qnames_path_from_bam(bam_path);
-            qnames.save(qnames_path)?;
-            info!("Saved qnames to: {}", json_path.display());
+            qnames.save(&qnames_path)?;
+            info!("Saved qnames to: {}", qnames_path.display());
             Ok(stats)
         } else {
             info!("Loading cached stats from: {}", json_path.display());
@@ -155,19 +155,7 @@ impl WGSBamStats {
     /// Derive JSON path from BAM path
     /// e.g., /path/to/34528_norm_hs1.bam -> /path/to/34528_norm_bam_stats.json
     fn json_path_from_bam(bam_path: &Path) -> PathBuf {
-        let dir = bam_path.parent().unwrap_or(Path::new("."));
-        let stem = bam_path
-            .file_stem()
-            .map(|s| s.to_string_lossy())
-            .unwrap_or_default();
-
-        // "34528_norm_hs1" -> "34528_norm"
-        let parts: Vec<&str> = stem.split('_').collect();
-        let prefix = if parts.len() >= 2 {
-            format!("{}_{}", parts[0], parts[1])
-        } else {
-            stem.to_string()
-        };
+        let (dir, prefix) = bam_prefix(bam_path);
 
         dir.join(format!("{}_bam_stats.json", prefix))
     }
@@ -175,19 +163,7 @@ impl WGSBamStats {
     /// Derive JSON path from BAM path
     /// e.g., /path/to/34528_norm_hs1.bam -> /path/to/34528_norm_bam_stats.json
     pub fn qnames_path_from_bam(bam_path: &Path) -> PathBuf {
-        let dir = bam_path.parent().unwrap_or(Path::new("."));
-        let stem = bam_path
-            .file_stem()
-            .map(|s| s.to_string_lossy())
-            .unwrap_or_default();
-
-        // "34528_norm_hs1" -> "34528_norm"
-        let parts: Vec<&str> = stem.split('_').collect();
-        let prefix = if parts.len() >= 2 {
-            format!("{}_{}", parts[0], parts[1])
-        } else {
-            stem.to_string()
-        };
+        let (dir, prefix) = bam_prefix(bam_path);
 
         dir.join(format!("{}_bam_qnames.bin", prefix))
     }
@@ -197,8 +173,38 @@ impl WGSBamStats {
     /// - BAM is newer than JSON
     pub fn open(case_id: &str, time: &str, config: &Config) -> anyhow::Result<Self> {
         let bam_path = PathBuf::from(config.solo_bam(case_id, time));
+        let json_path = Self::json_path(case_id, time, config);
+        Self::from_bam_with_cache(&bam_path, &json_path, config)
+    }
 
-        Self::from_bam(&bam_path, config)
+    fn from_bam_with_cache(
+        bam_path: &Path,
+        json_path: &Path,
+        config: &Config,
+    ) -> anyhow::Result<Self> {
+        let should_recompute = if !json_path.exists() {
+            info!("No cached stats found, computing...");
+            true
+        } else if Self::is_bam_newer(bam_path, json_path)? {
+            info!("BAM is newer than cached stats, recomputing...");
+            true
+        } else {
+            false
+        };
+
+        if should_recompute {
+            let (stats, qnames) = Self::from_bam_path(bam_path, config)?;
+            if let Some(parent) = json_path.parent() {
+                fs::create_dir_all(parent)?;
+            }
+            stats.save_json(json_path)?;
+            let qnames_path = Self::qnames_path_from_bam(bam_path); // or case/time version
+            qnames.save(&qnames_path)?;
+            Ok(stats)
+        } else {
+            info!("Loading cached stats from: {}", json_path.display());
+            Self::load_json(json_path)
+        }
     }
 
     /// Check if BAM file is newer than JSON cache
@@ -496,12 +502,29 @@ impl WGSBamStats {
     }
 }
 
+fn bam_prefix(bam_path: &Path) -> (PathBuf, String) {
+    let dir = bam_path.parent().unwrap_or(Path::new(".")).to_path_buf();
+    let stem = bam_path
+        .file_stem()
+        .map(|s| s.to_string_lossy())
+        .unwrap_or_default();
+
+    let parts: Vec<&str> = stem.split('_').collect();
+    let prefix = if parts.len() >= 2 {
+        format!("{}_{}", parts[0], parts[1])
+    } else {
+        stem.to_string()
+    };
+    (dir, prefix)
+}
+
 pub fn median_from_hist(hist: &BTreeMap<u64, u64>, n_reads: u64) -> u64 {
     if n_reads == 0 {
         return 0;
     }
 
-    let mid = n_reads / 2;
+    // 1-based median index
+    let mid = (n_reads + 1) / 2;
     let mut cum = 0u64;
 
     for (&len, &count) in hist {
@@ -573,7 +596,8 @@ pub fn fmt_histogram(
     } else {
         let min_len = data.first().map(|(l, _)| *l).unwrap_or(0);
         let max_len = data.last().map(|(l, _)| *l).unwrap_or(0);
-        let bin_size = ((max_len - min_len) / 20).max(LEN_BIN);
+        let span = max_len.saturating_sub(min_len);
+        let bin_size = (span / 20).max(LEN_BIN);
 
         let mut bins: Vec<(u64, u64, u64)> = Vec::new();
         let mut current_start = min_len;
@@ -623,12 +647,13 @@ pub fn fmt_histogram(
 
 /// Truncate string with ellipsis if too long
 fn truncate_str(s: &str, max_len: usize) -> String {
-    if s.len() <= max_len {
+    if s.chars().count() <= max_len {
         s.to_string()
     } else if max_len <= 3 {
-        s[..max_len].to_string()
+        s.chars().take(max_len).collect()
     } else {
-        format!("{}...", &s[..max_len - 3])
+        let prefix: String = s.chars().take(max_len - 3).collect();
+        format!("{prefix}...")
     }
 }
 
@@ -790,14 +815,44 @@ impl fmt::Display for WGSBamStats {
     }
 }
 
+fn decode_uuid(qname: &str) -> Option<Vec<u8>> {
+    let clean = qname.replace('-', "");
+    if clean.len() != 32 {
+        return None;
+    }
+    hex::decode(&clean).ok()
+}
+
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct QNameSet {
     pub qnames: FxHashSet<Vec<u8>>, // 16 raw UUID bytes each
 }
 
 impl QNameSet {
+    pub fn qnames_path_from_bam(bam_path: &Path) -> PathBuf {
+        let dir = bam_path.parent().unwrap_or(Path::new("."));
+        let stem = bam_path
+            .file_stem()
+            .map(|s| s.to_string_lossy())
+            .unwrap_or_default();
+
+        // "34528_norm_hs1" -> "34528_norm"
+        let parts: Vec<&str> = stem.split('_').collect();
+        let prefix = if parts.len() >= 2 {
+            format!("{}_{}", parts[0], parts[1])
+        } else {
+            stem.to_string()
+        };
+
+        dir.join(format!("{}_bam_qnames.bin", prefix))
+    }
+
     /// Load QNAMEs from a BAM file into memory (no stats, no disk write)
-    pub fn from_bam_in_memory(bam_path: impl AsRef<Path>, bam_min_mapq: u8, n_threads: usize) -> anyhow::Result<Self> {
+    pub fn from_bam_in_memory(
+        bam_path: impl AsRef<Path>,
+        bam_min_mapq: u8,
+        n_threads: usize,
+    ) -> anyhow::Result<Self> {
         let mut bam = rust_htslib::bam::Reader::from_path(bam_path.as_ref())?;
         rust_htslib::bam::Read::set_threads(&mut bam, n_threads)?;
         let mut qs = Self::default();
@@ -805,26 +860,84 @@ impl QNameSet {
         for rec in rust_htslib::bam::Read::rc_records(&mut bam) {
             let r = rec?;
             let flags = r.flags();
-            if flags & SKIP_FLAGS != 0 { continue; }
-            if flags & DUP_FLAG != 0 { continue; }
-            if r.mapq() < bam_min_mapq { continue; }
+            if flags & SKIP_FLAGS != 0 {
+                continue;
+            }
+            if flags & DUP_FLAG != 0 {
+                continue;
+            }
+            if r.mapq() < bam_min_mapq {
+                continue;
+            }
             qs.add_qname_bytes(r.qname());
         }
         Ok(qs)
     }
 
-    /// Add a new QNAME (hex or dashed UUID string), store as 16 bytes
-    pub fn add(&mut self, qname: &str) -> bool {
-        let clean = qname.replace('-', "");
-        if clean.len() != 32 {
-            return false;
+    /// Load from cache if present and up-to-date; otherwise build from BAM and save.
+    pub fn load_or_create(
+        bam_path: impl AsRef<Path>,
+        bam_min_mapq: u8,
+        n_threads: usize,
+    ) -> anyhow::Result<Self> {
+        use std::fs;
+
+        // Normalize bam_path to a PathBuf so we can reuse it
+        let bam_path = bam_path.as_ref().to_path_buf();
+        let cache = Self::qnames_path_from_bam(&bam_path);
+
+        // Remove cache if older than BAM
+        if cache.exists() {
+            let cb = fs::metadata(&cache)?;
+            let ba = fs::metadata(&bam_path)?;
+            let tb = |m: &fs::Metadata| m.modified().ok();
+
+            if let (Some(tcb), Some(tba)) = (tb(&cb), tb(&ba)) {
+                if tcb < tba {
+                    fs::remove_file(&cache)?;
+                }
+            }
         }
-        match hex::decode(&clean) {
-            Ok(bytes) => self.qnames.insert(bytes),
-            Err(_) => false,
+
+        // If cache exists → load it
+        if cache.exists() {
+            return Self::load(&cache);
+        }
+
+        // Otherwise → build from BAM
+        let qs = Self::from_bam_in_memory(&bam_path, bam_min_mapq, n_threads)?;
+
+        // Save atomic: write to tmp then rename
+        if let Some(parent) = cache.parent() {
+            fs::create_dir_all(parent)?;
+        }
+
+        let tmp = cache.with_extension("tmp");
+        qs.clone().save(&tmp)?;
+        fs::rename(&tmp, &cache)?;
+
+        Ok(qs)
+    }
+
+    /// Merge `other` into `self`. Keep existing and insert only new 16-byte QNAMEs.
+    pub fn merge(&mut self, other: &Self) {
+        for q in &other.qnames {
+            self.qnames.insert(q.clone());
         }
     }
 
+    /// Merge two sets into a new one, returning the result
+    pub fn merged(&self, other: &Self) -> Self {
+        let mut out = self.clone();
+        out.merge(other);
+        out
+    }
+
+    /// Add a new QNAME (hex or dashed UUID string), store as 16 bytes
+    pub fn add(&mut self, qname: &str) -> bool {
+        decode_uuid(qname).is_some_and(|bytes| self.qnames.insert(bytes))
+    }
+
     /// Add QNAME directly from BAM record (`&[u8]`), expect ASCII UUID
     pub fn add_qname_bytes(&mut self, qname: &[u8]) -> bool {
         match std::str::from_utf8(qname) {
@@ -834,7 +947,7 @@ impl QNameSet {
     }
 
     /// Save raw bytes (binary format), 16 bytes per QNAME
-    pub fn save(self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+    pub fn save(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
         let mut f = fs::File::create(path.as_ref()).context("write QNameSet")?;
         for q in &self.qnames {
             f.write_all(q)?;
@@ -856,12 +969,9 @@ impl QNameSet {
     }
 
     pub fn exists(&self, qname: &str) -> bool {
-        let clean = qname.replace('-', "");
-        if let Ok(bytes) = hex::decode(&clean) {
-            self.qnames.contains(&bytes)
-        } else {
-            false
-        }
+        decode_uuid(qname)
+            .map(|bytes| self.qnames.contains(&bytes))
+            .unwrap_or(false)
     }
 
     pub fn len(&self) -> usize {

+ 133 - 0
src/collection/pod5.rs

@@ -220,6 +220,10 @@ impl Pod5sRun {
         })
     }
 
+    pub fn add_id_input(&mut self, id_input: IdInput) {
+        self.cases.push(id_input);
+    }
+
     /// Compute summary statistics for the collection.
     pub fn stats(&self) -> Pod5sFlowCellStats {
         if self.pod5s.is_empty() {
@@ -438,6 +442,118 @@ impl Pod5sRuns {
         let data = std::fs::read_to_string(path)?;
         Ok(serde_json::from_str(&data)?)
     }
+
+    /// Add a run from an ONT run directory.
+    ///
+    /// Layout assumed:
+    /// - `bam_pass/` → attached to `bams_pass` if present.
+    /// - `pod5_pass/barcode*/*.pod5` and `pod5_recovered/*.pod5` → added to `pod5s`.
+    pub fn add_run_dir<P: AsRef<Path>>(&mut self, run_dir: P) -> anyhow::Result<()> {
+        let run_dir = run_dir.as_ref();
+
+        // --- collect POD5 directories ---
+
+        let mut pod_dirs: Vec<PathBuf> = Vec::new();
+
+        // pod5_pass/barcode*/ subdirectories
+        let pod5_pass = run_dir.join("pod5_pass");
+        if pod5_pass.is_dir() {
+            for entry in fs::read_dir(&pod5_pass)? {
+                let entry = entry?;
+                if entry.file_type()?.is_dir() {
+                    let name = entry.file_name();
+                    if name.to_string_lossy().starts_with("barcode") {
+                        pod_dirs.push(entry.path());
+                    }
+                }
+            }
+        }
+
+        // pod5_recovered/
+        let pod5_recovered = run_dir.join("pod5_recovered");
+        if pod5_recovered.is_dir() {
+            pod_dirs.push(pod5_recovered);
+        }
+
+        if pod_dirs.is_empty() {
+            anyhow::bail!(
+                "No POD5 directories (pod5_pass/barcode*/ or pod5_recovered/) found under {}",
+                run_dir.display()
+            );
+        }
+
+        // --- bam_pass directory (optional) ---
+
+        let bam_pass_dir = run_dir.join("bam_pass");
+        let bams_pass = if bam_pass_dir.is_dir() {
+            Some(bam_pass_dir)
+        } else {
+            None
+        };
+
+        // Build the new run from all POD5 directories
+        let mut new_run = Pod5sRun::load_from_dirs(&pod_dirs)?;
+        new_run.bams_pass = bams_pass;
+
+        // --- merge logic identical to `add_from_dir` ---
+
+        if let Some(existing) = self.data.iter_mut().find(|r| {
+            r.run_id == new_run.run_id
+                && r.flow_cell_id == new_run.flow_cell_id
+                && r.sequencing_kit == new_run.sequencing_kit
+        }) {
+            // merge bams_pass
+            match (&existing.bams_pass, &new_run.bams_pass) {
+                (Some(old), Some(new)) if old != new => {
+                    anyhow::bail!(
+                        "Conflicting bam_pass for run {} (flowcell {}): \
+                         existing='{}', new='{}'",
+                        existing.run_id,
+                        existing.flow_cell_id,
+                        old.display(),
+                        new.display()
+                    );
+                }
+                (None, Some(new)) => {
+                    existing.bams_pass = Some(new.clone());
+                }
+                _ => {
+                    // (Some, None) or (None, None) or (Some, Some equal): nothing to do
+                }
+            }
+
+            // merge pod5s by file name, skipping duplicates
+            use std::collections::HashSet;
+
+            let mut existing_names: HashSet<String> = existing
+                .pod5s
+                .iter()
+                .filter_map(|p| p.path.file_name().map(|n| n.to_string_lossy().into_owned()))
+                .collect();
+
+            new_run.pod5s.retain(|p| {
+                if let Some(name_os) = p.path.file_name() {
+                    let name = name_os.to_string_lossy().to_string();
+                    if existing_names.contains(&name) {
+                        false
+                    } else {
+                        existing_names.insert(name);
+                        true
+                    }
+                } else {
+                    true
+                }
+            });
+
+            existing.pod5s.extend(new_run.pod5s);
+
+            Ok(())
+        } else {
+            // No matching run: add as new entry
+            self.data.push(new_run);
+            Ok(())
+        }
+    }
 }
 
 #[cfg(test)]
@@ -458,6 +574,23 @@ mod tests {
 
         println!("{stats}");
 
+        Ok(())
+    }
+    #[test]
+    fn load_prom_run() -> anyhow::Result<()> {
+        test_init();
+
+        let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";
+
+        let mut runs = Pod5sRuns::new();
+        runs.add_run_dir(dir)?;
+
+        let stats = runs.data[0].stats();
+
+        println!("{runs:#?}");
+
+        println!("{stats}");
+
         Ok(())
     }
 }

+ 8 - 5
src/commands/samtools.rs

@@ -134,10 +134,8 @@ impl super::Command for SamtoolsMerge {
         }
 
         // Load qname sets
-        let into_qnames_path = WGSBamStats::qnames_path_from_bam(&self.into);
-
-        let into_qset = QNameSet::load(&into_qnames_path)
-            .with_context(|| format!("Failed to load qnames for {}", self.into.display()))?;
+        let mut into_qset = QNameSet::load_or_create(&self.into, self.config.bam_min_mapq, 4)
+            .with_context(|| format!("Failed to load or create qnames for {}", self.into.display()))?;
 
         let bam_qset = QNameSet::from_bam_in_memory(&self.bam, self.config.bam_min_mapq, 4)?;
 
@@ -203,6 +201,11 @@ impl super::Command for SamtoolsMerge {
             self.tmp_bam_index.display()
         ))?;
 
+        // Merge qnames
+
+        into_qset.merge(&bam_qset);
+        into_qset.save(QNameSet::qnames_path_from_bam(&self.into))?;
+
         Ok(())
     }
 
@@ -385,8 +388,8 @@ impl super::Command for SamtoolsSort {
             "{} sort -@ {} -o {} {}",
             self.samtools.display(),
             self.threads,
-            self.input_bam.display(),
             self.output_bam.display(),
+            self.input_bam.display(),
         )
     }
 }

+ 45 - 9
src/pipes/somatic_slurm.rs

@@ -507,6 +507,27 @@ pub fn basecall_align_split(
 
     info!("Alignments done ✅ (run: {})", run.run_id);
 
+    for (_key, aligned_bams) in case_bam_map.iter_mut() {
+        for bam in aligned_bams.iter_mut() {
+            let sorted_bam = bam.with_extension("sorted.bam");
+
+            info!(
+                "  Sorting chunk BAM {} → {}",
+                bam.display(),
+                sorted_bam.display()
+            );
+
+            let mut sort_cmd = SamtoolsSort::from_config(config, &bam, &sorted_bam);
+            SlurmRunner::run(&mut sort_cmd)?;
+
+            // remove unsorted chunk
+            fs::remove_file(&bam)?;
+
+            // replace path in case_bam_map with sorted version
+            *bam = sorted_bam;
+        }
+    }
+
     // ---------------------------------------------------------------------
     // Step 5: Merge and organize final BAMs
     // ---------------------------------------------------------------------
@@ -621,15 +642,30 @@ mod tests {
 
         let config = Config::default();
 
-        let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
-        for run in runs.data {
-            basecall_align_split(
-                &run,
-                &config,
-                format!("{}/outputs", TEST_DIR.as_str()).into(),
-                10 * 1024 * 1024,
-            )?;
-        }
+        // let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
+        // for run in runs.data {
+        //     basecall_align_split(
+        //         &run,
+        //         &config,
+        //         format!("{}/outputs", TEST_DIR.as_str()).into(),
+        //         10 * 1024 * 1024,
+        //     )?;
+        // }
+
+        let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";
+
+        let mut runs = Pod5sRuns::new();
+        runs.add_run_dir(dir)?;
+
+        runs.data[0].add_id_input(IdInput { case_id: "TEST_02".to_string(), sample_type: "NOR".to_string(), barcode: "NB02".to_string(), flow_cell_id: "".to_string() });
+        runs.data[0].add_id_input(IdInput { case_id: "TEST_04".to_string(), sample_type: "NOR".to_string(), barcode: "NB04".to_string(), flow_cell_id: "".to_string() });
+
+        basecall_align_split(
+            &runs.data[0],
+            &config,
+            format!("{}/outputs", TEST_DIR.as_str()).into(),
+            10 * 1024 * 1024,
+        )?;
 
         Ok(())
     }