Full somatic POD5 basecalling and alignment based on case_id

Thomas · 2 weeks ago
commit 3c19136885
4 changed files with 458 additions and 60 deletions
  1. pandora-config.example.toml (+2 -2)
  2. src/commands/samtools.rs (+25 -2)
  3. src/pipes/somatic_slurm.rs (+425 -51)
  4. src/slurm_helpers.rs (+6 -5)

+ 2 - 2
pandora-config.example.toml

@@ -8,7 +8,7 @@
 pod_dir = "/data/run_data"
 
 # Root directory where all results will be written.
-result_dir = "/data/longreads_basic_pipe"
+result_dir = "/mnt/beegfs02/scratch/t_steimle/data/wgs"
 
 # Temporary directory used when unarchiving input data.
 unarchive_tmp_dir = "/data/unarchived"
@@ -68,7 +68,7 @@ panels = [
 tumoral_name = "diag"
 
 # Normal sample label.
-normal_name = "mrd"
+normal_name = "norm"
 
 # BAM tag name used for haplotagged reads.
 haplotagged_bam_tag_name = "HP"
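
For context, these two labels (together with `result_dir`) feed the per-case output layout that src/pipes/somatic_slurm.rs builds below. A minimal sketch of the path composition, with hypothetical case and reference names:

```rust
use std::path::PathBuf;

fn main() {
    // Hypothetical values; the real ones come from the run's cases and the config.
    let result_dir = "/mnt/beegfs02/scratch/t_steimle/data/wgs";
    let case_id = "CASE01";
    let sample_type = "norm"; // normal_name, or "diag" for the tumoral sample
    let reference = "hg38";   // assumed value of config.reference_name

    let final_bam = PathBuf::from(result_dir)
        .join(case_id)
        .join(sample_type)
        .join(format!("{case_id}_{sample_type}_{reference}.bam"));

    // → /mnt/beegfs02/scratch/t_steimle/data/wgs/CASE01/norm/CASE01_norm_hg38.bam
    println!("{}", final_bam.display());
}
```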

+ 25 - 2
src/commands/samtools.rs

@@ -89,6 +89,29 @@ pub struct SamtoolsMerge {
     tmp_bam_index: PathBuf,
 }
 
+impl SamtoolsMerge {
+    /// Create a new `SamtoolsMerge` from configuration.
+    ///
+    /// # Arguments
+    /// * `config` - Pipeline configuration containing samtools settings
+    /// * `bam` - Source BAM file to merge into the destination
+    /// * `into` - Destination BAM file (will be merged with `bam`)
+    ///
+    /// # Notes
+    /// - `tmp_bam` and `tmp_bam_index` are initialized as empty and will be set during `init()`
+    /// - Both BAMs must exist and be indexed before calling `init()`
+    pub fn from_config(config: &Config, bam: impl AsRef<Path>, into: impl AsRef<Path>) -> Self {
+        Self {
+            bin: config.align.samtools_bin.clone(),
+            threads: config.align.samtools_merge_threads,
+            bam: bam.as_ref().into(),
+            into: into.as_ref().into(),
+            tmp_bam: PathBuf::new(),
+            tmp_bam_index: PathBuf::new(),
+        }
+    }
+}
+
 impl super::Command for SamtoolsMerge {
     /// Prepare the filesystem and validate preconditions for the merge.
     ///
@@ -100,14 +123,14 @@ impl super::Command for SamtoolsMerge {
     fn init(&mut self) -> anyhow::Result<()> {
         // Collect RG IDs from destination BAM.
         let composition_a: Vec<String> =
-            bam_composition(self.into.to_string_lossy().as_ref(), 20000)?
+            bam_composition(self.into.to_string_lossy().as_ref(), 20_000)?
                 .iter()
                 .map(|(rg, _, _)| rg.clone())
                 .collect();
 
         // Collect RG IDs from source BAM.
         let composition_b: Vec<String> =
-            bam_composition(self.bam.to_string_lossy().as_ref(), 20000)?
+            bam_composition(self.bam.to_string_lossy().as_ref(), 20_000)?
                 .iter()
                 .map(|(rg, _, _)| rg.clone())
                 .collect();
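
For orientation, a minimal usage sketch of the new constructor, assuming hypothetical paths (the real call site is in src/pipes/somatic_slurm.rs below). Both BAMs are expected to exist and be indexed before the merge runs:

```rust
use std::path::Path;

use crate::{
    commands::{samtools::SamtoolsMerge, SlurmRunner},
    config::Config,
};

// Sketch only: the paths are illustrative, not part of this commit.
fn merge_into_case_bam(config: &Config) -> anyhow::Result<()> {
    let mut merge = SamtoolsMerge::from_config(
        config,
        Path::new("/tmp/aligned_new.bam"),                   // source BAM (hypothetical)
        Path::new("/data/CASE01/diag/CASE01_diag_hg38.bam"), // destination BAM (hypothetical)
    );
    // Submit via the Slurm runner, as the pipeline in this commit does.
    let _out = SlurmRunner::run(&mut merge)?;
    Ok(())
}
```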

+ 425 - 51
src/pipes/somatic_slurm.rs

@@ -1,101 +1,475 @@
-use anyhow::Context;
+use anyhow::{bail, Context};
+use log::{info, warn};
 use std::{collections::HashMap, fs, path::PathBuf};
 use uuid::Uuid;
 
-use log::{info, warn};
-
 use crate::{
+    collection::{flowcells::IdInput, pod5::Pod5sRun},
     commands::{
         dorado::{DoradoAlign, DoradoBasecall},
         run_many_sbatch,
-        samtools::SamtoolsSplit,
+        samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSplit},
         SlurmRunner,
     },
     config::Config,
     helpers::{extract_barcode, list_files_with_ext},
 };
 
-// const TEN_MB: u64 = 10 * 1024 * 1024;
+/// Normalize barcode strings for matching.
+///
+/// Converts various barcode formats to a canonical form:
+/// - "NB02", "nb02", "02", "2" → "2"
+/// - Strips leading zeros and "NB"/"nb" prefixes
+/// - Returns lowercase string for case-insensitive matching
+fn normalize_barcode(barcode: &str) -> String {
+    let normalized = barcode
+        .to_lowercase()
+        .trim_start_matches("nb")
+        .trim_start_matches("barcode")
+        .trim()
+        .to_string();
+
+    // Try to parse as number to remove leading zeros, then convert back
+    if let Ok(num) = normalized.parse::<u32>() {
+        num.to_string()
+    } else {
+        normalized
+    }
+}
+
+/// Extract and normalize barcode from filename.
+///
+/// Extracts the barcode number from the filename and normalizes it
+/// to match against IdInput barcodes.
+fn extract_and_normalize_barcode(filename: &str) -> Option<String> {
+    extract_barcode(filename).map(|num| num.to_string())
+}
+
+/// Map sample type to output directory name (normal vs tumor).
+///
+/// Maps various representations to either `normal_name` or `tumoral_name` from config:
+/// - "NOR", "nor", "normal", "MRD", "mrd" → normal_name
+/// - Everything else → tumoral_name
+fn map_sample_type_to_dir(sample_type: &str, config: &Config) -> String {
+    let normalized = sample_type.to_lowercase();
+    match normalized.as_str() {
+        "nor" | "normal" | "mrd" => config.normal_name.clone(),
+        _ => config.tumoral_name.clone(),
+    }
+}
+
+/// Basecall, align, and split POD5 files from a sequencing run into organized BAM files.
+///
+/// # Arguments
+/// * `run` - The Pod5sRun containing metadata and POD5 files to process
+/// * `config` - Pipeline configuration with paths and settings
+/// * `tmp_dir` - Temporary directory for intermediate files
+/// * `min_bam_size` - Minimum BAM file size to process (smaller files are skipped)
+///
+/// # Output Structure
+/// ```text
+/// {result_dir}/{case_id}/{sample_type_dir}/{case_id}_{sample_type_dir}_{reference_name}.bam
+/// ```
+/// Where `sample_type_dir` is either `normal_name` or `tumoral_name` from config.
+///
+/// # Process
+/// 1. Validates that run has associated cases
+/// 2. Basecalls all POD5 files in the run into a single BAM
+/// 3. Splits the BAM by read group (RG)
+/// 4. Maps barcodes to cases and aligns BAMs
+/// 5. Merges and organizes final BAMs per case
+/// 6. Indexes final BAMs
+/// 7. Cleans up intermediate files
 pub fn basecall_align_split(
-    pod5_dir: PathBuf,
+    run: &Pod5sRun,
+    config: &Config,
     tmp_dir: PathBuf,
     min_bam_size: u64,
 ) -> anyhow::Result<()> {
-    let config = Config::default();
+    // Validate run has cases
+    if run.cases.is_empty() {
+        bail!(
+            "Run {} (flowcell: {}) has no associated cases. Please add cases before processing.",
+            run.run_id,
+            run.flow_cell_id
+        );
+    }
+
+    info!(
+        "Processing run: {} (flowcell: {}, {} POD5 files, {} cases)",
+        run.run_id,
+        run.flow_cell_id,
+        run.pod5s.len(),
+        run.cases.len()
+    );
+
+    // Log cases for this run
+    for case in &run.cases {
+        info!(
+            "  Case: {} ({}) - barcode: {}",
+            case.case_id,
+            case.sample_type,
+            if case.barcode.is_empty() {
+                "none"
+            } else {
+                &case.barcode
+            }
+        );
+    }
+
+    // Create barcode → case mapping with normalized barcodes
+    let mut barcode_to_case: HashMap<String, &IdInput> = HashMap::new();
+    let mut no_barcode_cases: Vec<&IdInput> = Vec::new();
 
-    // Basecalling pod5 dir
+    for case in &run.cases {
+        if case.barcode.is_empty() {
+            no_barcode_cases.push(case);
+        } else {
+            let normalized = normalize_barcode(&case.barcode);
+            barcode_to_case.insert(normalized.clone(), case);
+            info!(
+                "  Barcode mapping: '{}' → '{}' (case: {})",
+                case.barcode, normalized, case.case_id
+            );
+        }
+    }
+
+    // Step 1: Basecalling
     let tmp_basecalled_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
-    info!("Basecalling into: {}", tmp_basecalled_bam.display());
-    let mut cmd = DoradoBasecall::from_config(&config, pod5_dir, tmp_basecalled_bam.clone());
+    info!(
+        "Step 1/4: Basecalling into: {}",
+        tmp_basecalled_bam.display()
+    );
+
+    let mut cmd = DoradoBasecall::from_config(config, run.dir.clone(), tmp_basecalled_bam.clone());
     let _out = SlurmRunner::run(&mut cmd)?;
-    info!("Basecalled ✅");
+    info!("Basecalled ✅ (run: {})", run.run_id);
 
-    // Splitting by RG
-    let tmp_split_dir = tmp_dir.join(format!("{}", Uuid::new_v4()));
-    info!("Split into: {}", tmp_split_dir.display());
-    let mut cmd = SamtoolsSplit::from_config(&config, &tmp_basecalled_bam, tmp_split_dir.clone());
+    // Step 2: Split by read group
+    let tmp_split_dir = tmp_dir.join(format!("split_{}", Uuid::new_v4()));
+    info!(
+        "Step 2/4: Splitting by read group into: {}",
+        tmp_split_dir.display()
+    );
+
+    let mut cmd = SamtoolsSplit::from_config(config, &tmp_basecalled_bam, tmp_split_dir.clone());
     let _out = SlurmRunner::run(&mut cmd)?;
-    fs::remove_file(tmp_basecalled_bam)?;
-    info!("Splitted ✅");
+    fs::remove_file(&tmp_basecalled_bam).context(format!(
+        "Failed to remove temporary basecalled BAM: {}",
+        tmp_basecalled_bam.display()
+    ))?;
+    info!("Split ✅ (run: {})", run.run_id);
 
-    let bams = list_files_with_ext(&tmp_split_dir, "bam").context(format!(
-        "error while listing bam files in: {}",
+    let split_bams = list_files_with_ext(&tmp_split_dir, "bam").context(format!(
+        "Error listing BAM files in: {}",
         tmp_split_dir.display()
     ))?;
 
-    // Aligments
+    // Step 3: Map BAMs to cases and prepare alignment
+    info!("Step 3/6: Mapping BAMs to cases and preparing alignment");
+
     let mut align_jobs = Vec::new();
-    let mut names_to_barcode = HashMap::new();
-    let mut no_barcode_bams = Vec::new();
-    for bam in bams.iter() {
-        if bam.metadata()?.len() > min_bam_size {
-            if let Some(Some(bam_file_name)) = bam.file_name().map(|name| name.to_str()) {
-                if let Some(barcode) = extract_barcode(bam_file_name) {
-                    let bam_out = tmp_dir.join(format!("{}_barcode{barcode}.bam", Uuid::new_v4()));
-                    names_to_barcode.insert(barcode, bam_out.clone());
-                    let job = DoradoAlign::from_config(&config, bam, bam_out);
-                    align_jobs.push(job);
-                } else {
-                    no_barcode_bams.push(bam);
-                    warn!("No barcode and no alignement for: {}", bam.display());
-                }
+    let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new(); // case_id → [aligned_bams]
+    let mut unmatched_bams = Vec::new();
+
+    for bam in split_bams.iter() {
+        let bam_size = bam.metadata()?.len();
+
+        if bam_size <= min_bam_size {
+            warn!(
+                "BAM size too small ({} bytes < {} bytes), skipping: {}",
+                bam_size,
+                min_bam_size,
+                bam.display()
+            );
+            continue;
+        }
+
+        let bam_file_name = bam
+            .file_name()
+            .and_then(|n| n.to_str())
+            .ok_or_else(|| anyhow::anyhow!("Invalid BAM filename: {}", bam.display()))?;
+
+        // Try to extract barcode and match to case
+        if let Some(barcode_str) = extract_and_normalize_barcode(bam_file_name) {
+            if let Some(case) = barcode_to_case.get(&barcode_str) {
+                // Found matching case!
+                let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
+
+                // Create output directory structure
+                let case_output_dir = PathBuf::from(&config.result_dir)
+                    .join(&case.case_id)
+                    .join(&sample_type_dir);
+
+                fs::create_dir_all(&case_output_dir).context(format!(
+                    "Failed to create directory: {}",
+                    case_output_dir.display()
+                ))?;
+
+                // Aligned BAM path (temporary, will be merged if multiple)
+                let aligned_bam = tmp_dir.join(format!(
+                    "{}_{}_{}_{}.bam",
+                    Uuid::new_v4(),
+                    case.case_id,
+                    sample_type_dir,
+                    barcode_str
+                ));
+
+                let job = DoradoAlign::from_config(config, bam, aligned_bam.clone());
+                align_jobs.push(job);
+
+                // Track this BAM for later merging
+                case_bam_map
+                    .entry(format!("{}_{}", case.case_id, sample_type_dir))
+                    .or_default()
+                    .push(aligned_bam);
+
+                info!(
+                    "  Matched barcode {} → case {} ({})",
+                    barcode_str, case.case_id, sample_type_dir
+                );
+            } else {
+                warn!(
+                    "Barcode {} found but no matching case in run {}",
+                    barcode_str, run.run_id
+                );
+                unmatched_bams.push(bam);
             }
         } else {
-            warn!("Low BAM size, will be removed: {}", bam.display());
+            warn!("No barcode extracted from: {}", bam.display());
+            unmatched_bams.push(bam);
         }
     }
+
+    // Handle cases without barcodes
+    if !no_barcode_cases.is_empty() && !unmatched_bams.is_empty() {
+        warn!(
+            "Found {} cases without barcodes and {} unmatched BAMs - manual association needed",
+            no_barcode_cases.len(),
+            unmatched_bams.len()
+        );
+    }
+
     let n_jobs = align_jobs.len();
-    info!("Running aligments of {n_jobs} BAMs.");
-    run_many_sbatch(align_jobs)
-        .context(format!("Failed to run aligment batch for {n_jobs} jobs"))?;
-    info!("Aligments of {n_jobs} BAMs done ✅.");
-
-    for (i, no_barcode_bam) in no_barcode_bams.iter().enumerate() {
-        info!("Keeping: {}", no_barcode_bam.display());
-        fs::copy(no_barcode_bam, tmp_dir.join(format!("no_barcode_{i}.bam")))?;
+    info!("Step 4/6: Running alignment of {} BAM files", n_jobs);
+
+    if n_jobs == 0 {
+        bail!(
+            "No BAM files to align - check barcode matching for run {}",
+            run.run_id
+        );
     }
 
-    fs::remove_dir_all(tmp_split_dir)?;
+    run_many_sbatch(align_jobs).context(format!(
+        "Failed to run alignment batch for {} jobs (run: {})",
+        n_jobs, run.run_id
+    ))?;
+
+    info!("Alignments done ✅ (run: {})", run.run_id);
+
+    // Step 5: Merge and organize final BAMs
+    info!("Step 5/6: Merging and organizing final BAM files");
+
+    for (case_key, aligned_bams) in case_bam_map.iter() {
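+        // Assumes `case_id` and the sample-type label contain no underscores,
+        // since the key was built as "{case_id}_{sample_type_dir}" above.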
+        let parts: Vec<&str> = case_key.split('_').collect();
+        if parts.len() < 2 {
+            warn!("Invalid case key format: {}", case_key);
+            continue;
+        }
+
+        let case_id = parts[0];
+        let sample_type_dir = parts[1];
+
+        let final_bam_path = PathBuf::from(&config.result_dir)
+            .join(case_id)
+            .join(sample_type_dir)
+            .join(format!(
+                "{}_{}_{}. bam",
+                case_id, sample_type_dir, config.reference_name
+            ));
+
+        // Check if a BAM already exists from previous runs
+        let existing_bam = if final_bam_path.exists() {
+            info!(
+                "  Found existing BAM for case {}: {}",
+                case_id,
+                final_bam_path.display()
+            );
+            Some(final_bam_path.clone())
+        } else {
+            None
+        };
+
+        // Determine which BAMs need to be merged
+        let mut all_bams_to_merge = aligned_bams.clone();
+        if let Some(ref existing) = existing_bam {
+            // Add existing BAM to the list
+            all_bams_to_merge.push(existing.clone());
+            info!(
+                "  Will merge {} new BAM(s) with existing BAM",
+                aligned_bams.len()
+            );
+        }
+
+        if all_bams_to_merge.len() == 1 {
+            // Single BAM and no existing - just move it
+            if existing_bam.is_none() {
+                fs::rename(&aligned_bams[0], &final_bam_path).context(format!(
+                    "Failed to move BAM to: {}",
+                    final_bam_path.display()
+                ))?;
+                info!("  Created: {}", final_bam_path.display());
+            } else {
+                // Existing BAM is already in place, nothing to do
+                info!("  Using existing BAM: {}", final_bam_path.display());
+            }
+        } else {
+            // Multiple BAMs - need to index first, then merge iteratively
+            info!(
+                "  Merging {} BAMs for case {} ({})",
+                all_bams_to_merge.len(),
+                case_id,
+                sample_type_dir
+            );
+
+            // Index all BAMs first
+            for bam in all_bams_to_merge.iter() {
+                // Check if index already exists
+                let index_path = PathBuf::from(format!("{}.bai", bam.display()));
+                if !index_path.exists() {
+                    info!("    Indexing: {}", bam.display());
+                    let mut index_cmd = SamtoolsIndex {
+                        bin: config.align.samtools_bin.clone(),
+                        threads: config.align.samtools_view_threads,
+                        bam: bam.to_string_lossy().to_string(),
+                    };
+                    SlurmRunner::run(&mut index_cmd).context(format!(
+                        "Failed to index BAM before merge: {}",
+                        bam.display()
+                    ))?;
+                } else {
+                    info!("    Index exists: {}", index_path.display());
+                }
+            }
+
+            // Start with the first BAM as base
+            let mut current_bam = all_bams_to_merge[0].clone();
+
+            // Merge remaining BAMs one by one into the current BAM
+            for bam_to_merge in all_bams_to_merge[1..].iter() {
+                info!(
+                    "    Merging {} into {}",
+                    bam_to_merge.display(),
+                    current_bam.display()
+                );
+
+                let mut merge_cmd = SamtoolsMerge::from_config(config, bam_to_merge, &current_bam);
+
+                SlurmRunner::run(&mut merge_cmd).context(format!(
+                    "Failed to merge BAM {} into {}",
+                    bam_to_merge.display(),
+                    current_bam.display()
+                ))?;
+
+                // After merge, the result is in current_bam (it was updated in place)
+                // The merge command handles cleanup of bam_to_merge
+            }
+
+            // Move final merged BAM to destination (or it's already there if merging into existing)
+            if current_bam != final_bam_path {
+                fs::rename(&current_bam, &final_bam_path).context(format!(
+                    "Failed to move merged BAM to: {}",
+                    final_bam_path.display()
+                ))?;
+            }
+
+            // Clean up individual aligned BAMs (but not the existing one if it was used)
+            // Note: SamtoolsMerge already cleaned up the source BAMs during merge
+            for bam in aligned_bams {
+                let _ = fs::remove_file(bam);
+                let _ = fs::remove_file(format!("{}.bai", bam.display()));
+            }
+
+            info!("  Created (merged): {}", final_bam_path.display());
+        }
+    }
+
+    // Step 6: Index all final BAMs
+    info!("Step 6/6: Indexing final BAM files");
+
+    for (case_key, _) in case_bam_map.iter() {
+        let parts: Vec<&str> = case_key.split('_').collect();
+        if parts.len() < 2 {
+            continue;
+        }
+
+        let case_id = parts[0];
+        let sample_type_dir = parts[1];
+
+        let final_bam_path = PathBuf::from(&config.result_dir)
+            .join(case_id)
+            .join(sample_type_dir)
+            .join(format!(
+                "{}_{}_{}. bam",
+                case_id, sample_type_dir, config.reference_name
+            ));
+
+        info!("  Indexing: {}", final_bam_path.display());
+        let mut index_cmd = SamtoolsIndex {
+            bin: config.align.samtools_bin.clone(),
+            threads: config.align.samtools_view_threads,
+            bam: final_bam_path.to_string_lossy().to_string(),
+        };
+        SlurmRunner::run(&mut index_cmd).context(format!(
+            "Failed to index final BAM: {}",
+            final_bam_path.display()
+        ))?;
+    }
+    info!("  Indexing done ✅");
+
+    // Step 7: Cleanup
+    info!("Cleaning up temporary files");
+    fs::remove_dir_all(&tmp_split_dir).context(format!(
+        "Failed to remove split directory: {}",
+        tmp_split_dir.display()
+    ))?;
+
+    info!(
+        "Pipeline completed ✅ for run: {} (flowcell: {})",
+        run.run_id, run.flow_cell_id
+    );
+    info!(
+        "  Processed {} cases with {} final BAM files",
+        run.cases.len(),
+        case_bam_map.len()
+    );
 
     Ok(())
 }
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use crate::collection::pod5::Pod5sRuns;
+    use crate::config::Config;
     use crate::helpers::test_init;
     use crate::TEST_DIR;
 
-    use super::*;
-
     #[test]
     fn slurm_pipe() -> anyhow::Result<()> {
         test_init();
 
-        basecall_align_split(
-            format!("{}/inputs/pod5/A", TEST_DIR.as_str()).into(),
-            format!("{}/outputs", TEST_DIR.as_str()).into(),
-            10 * 1024 * 1024,
-        )?;
+        let config = Config::default();
+
+        let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
+        for run in runs.data {
+            basecall_align_split(
+                &run,
+                &config,
+                format!("{}/outputs", TEST_DIR.as_str()).into(),
+                10 * 1024 * 1024,
+            )?;
+        }
 
         Ok(())
     }
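
As a quick sanity check on the normalization rules documented for `normalize_barcode`, a hedged test sketch (illustrative, not part of this commit) could look like:

```rust
#[cfg(test)]
mod barcode_norm_tests {
    use super::normalize_barcode;

    // Mirrors the examples in the `normalize_barcode` doc comment.
    #[test]
    fn normalizes_common_barcode_spellings() {
        assert_eq!(normalize_barcode("NB02"), "2");
        assert_eq!(normalize_barcode("nb02"), "2");
        assert_eq!(normalize_barcode("02"), "2");
        assert_eq!(normalize_barcode("2"), "2");
        // The "barcode" prefix is stripped as well, with leading zeros removed.
        assert_eq!(normalize_barcode("barcode07"), "7");
    }
}
```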

+ 6 - 5
src/slurm_helpers.rs

@@ -197,7 +197,8 @@ pub fn slurm_availables() -> anyhow::Result<Vec<NodeInfo>> {
 
 pub fn max_gpu_per_node(gpu_name: &str) -> Option<i32> {
     let nodes = slurm_availables().ok()?;
-    nodes.iter()
+    nodes
+        .iter()
         .filter(|n| n.gpu_names.iter().any(|g| g.eq_ignore_ascii_case(gpu_name)))
         .map(|n| n.gpu_free)
         .max()
@@ -233,10 +234,10 @@ mod tests {
             );
         }
 
-        let u  = max_gpu_per_node("h100");
-        println!("{u:#?}");
-                let u  = max_gpu_per_node("a100");
-        println!("{u:#?}");
+        let u = max_gpu_per_node("h100");
+        println!("H100 max available: {}", u.unwrap_or(0));
+        let u = max_gpu_per_node("a100");
+        println!("A100 max available: {}", u.unwrap_or(0));
 
         Ok(())
     }
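
A hedged sketch of how the helper might be consumed when sizing a job's GPU request; the `gpus_to_request` wrapper is illustrative only, not part of this repo:

```rust
// Cap a requested GPU count by what a single node currently reports free
// for that model, falling back to zero when no node matches.
fn gpus_to_request(gpu_name: &str, wanted: i32) -> i32 {
    match max_gpu_per_node(gpu_name) {
        Some(free) if free > 0 => wanted.min(free),
        _ => 0,
    }
}
```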