//! # PromRun Module //! //! Import, validation, and processing pipeline for Oxford Nanopore PromethION sequencing runs. //! //! This module provides the [`PromRun`] struct which orchestrates the complete workflow from //! raw MinKNOW output ingestion through alignment, sorting, and per-case BAM finalization. //! //! ## Supported Kit Types //! //! - **Multiplexed** (`SQK-NBD114-24`, `SQK-NBD112`, `RBK`): BAMs demultiplexed by barcode //! - **Non-multiplexed** (`SQK-LSK114`, `SQK-LSK109`): Single-sample runs //! //! ## Pipeline Stages //! //! 1. **Import** — Recursive directory scan for BAM, POD5, sample sheets, pore activity logs //! 2. **Validation** — Verify BAMs are unaligned (no reference sequences in header) //! 3. **Case mapping** — Associate BAMs with cases via barcode extraction or single-case assignment //! 4. **Alignment** — Parallel Dorado alignment against reference genome //! 5. **Sort/Index** — Coordinate sort and index via samtools (Slurm batch) //! 6. **Finalize** — Merge chunks per case, atomic replacement of existing outputs //! //! ## Directory Structure //! //! Files are discovered recursively; typical MinKNOW layouts: //! //! ```text //! run_dir/ //! ├── sample_sheet_*.csv # Required //! ├── pore_activity_*.csv # Optional //! ├── pod5_pass/barcode*/*.pod5 # Multiplexed //! ├── bam_pass/barcode*/*.bam # Multiplexed //! ├── pod5/*.pod5 # Non-multiplexed //! └── bam/*.bam # Non-multiplexed //! ``` //! //! ## Example //! //! ```rust,ignore //! use crate::config::Config; //! use crate::collection::promrun::PromRun; //! //! let config = Config::default(); //! //! // Import and process //! let mut run = PromRun::from_dir("/data/prom/20240101_run", &config)?; //! run.cases.push(IdInput { case_id: "CASE001".into(), sample_type: "TUM".into(), barcode: "barcode01".into() }); //! run.process_bams(&config)?; //! //! // Or load cached //! let run = PromRun::open("protocol_run_id", &config)?; //! ``` use std::{ collections::{BTreeMap, HashMap}, fmt, fs::{self, File}, io::{BufReader, BufWriter}, path::{Path, PathBuf}, time::{Duration, Instant}, }; use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use log::{info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use rust_htslib::bam::{self, Read}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::{ collection::{ bam_stats::WGSBamStats, flowcells::IdInput, minknow::{parse_pore_activity_from_reader, MinKnowSampleSheet, PoreStateEntry}, pod5::Pod5, }, commands::{ dorado::DoradoAlign, modkit::ModkitSummary, samtools::{SamtoolsIndex, SamtoolsMergeMany, SamtoolsSort}, }, config::Config, helpers::{get_genome_sizes, list_files_recursive, remove_bam_with_index, TempFileGuard}, io::bam::read_sm_tag_or_inject, locker::SampleLock, pipes::InitializeSolo, run, run_many, }; /// Represent a complete ONT PromethION sequencing run with all associated files. /// /// Represents an imported run directory containing BAM files, POD5 raw signal /// files, sample sheets, and pore activity logs. Provides JSON serialization /// for caching parsed metadata. /// /// # Directory Structure /// /// The importer recursively scans for files, supporting various MinKNOW output layouts: /// /// ```text /// run_dir/ /// ├── sample_sheet_*.csv # Required: MinKNOW sample sheet /// ├── pore_activity_*.csv # Optional: pore state logs /// │ /// │ # POD5 files (any structure supported): /// ├── pod5/ # Non-multiplexed runs /// │ └── *.pod5 /// ├── pod5_pass/ # Multiplexed runs (pass reads) /// │ └── barcode*/ /// │ └── *.pod5 /// ├── pod5_fail/ # Failed reads (optional) /// │ └── *.pod5 /// │ /// │ # BAM files (any structure supported): /// ├── bam/ # Non-multiplexed runs /// │ └── *.bam /// └── bam_pass/ # Multiplexed runs /// └── barcode*/ /// └── *.bam /// ``` /// /// Files are discovered recursively regardless of subdirectory naming. /// /// # Example /// /// ```rust,ignore /// let config = Config::default(); /// /// // Import a new run /// let run = PromRun::from_dir("/data/runs/20240101_run", &config)?; /// /// // Load a cached run /// let run = PromRun::open("protocol_run_id_here", &config)?; /// /// println!("Flowcell: {}", run.flow_cell_id); /// println!("BAM files: {}", run.bams.len()); /// println!("POD5 files: {}", run.pod5s.len()); /// ``` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PromRun { /// Root directory of the sequencing run. pub dir: PathBuf, /// Timestamp when this run was imported/parsed. pub import_date: DateTime, /// Unique identifier for the protocol run (from sample sheet). pub protocol_run_id: String, /// Flowcell position identifier (e.g., device slot or port). pub position_id: String, /// Flowcell barcode/identifier (e.g., `FAB12345`). pub flow_cell_id: String, /// Sample ID from the sample sheet. pub sample_id: String, /// Experiment ID from the sample sheet. pub experiment_id: String, /// Flowcell product code (e.g., `FLO-MIN106`, `FLO-PRO002`). pub flow_cell_product_code: String, /// Library preparation kit identifier (e.g., `SQK-LSK114`). pub kit: String, /// Associated case identifiers for clinical/research tracking. pub cases: Vec, /// All POD5 raw signal files found in the run directory. pub pod5s: Vec, /// All BAM files found in the run directory. pub bams: Vec, /// Pore activity/state log entries, if available. pub pore_activity: Option>, } impl PromRun { /// Imports a sequencing run from a directory. /// /// Recursively scans the directory for BAM files, POD5 files, sample sheets, /// and pore activity logs. Parses all files in parallel and caches the result. /// /// # Arguments /// /// * `dir` - Path to the run directory (must exist and be a directory) /// * `config` - Application configuration (provides thread count and cache paths) /// /// # Returns /// /// Returns `Ok(PromRun)` on success, or an error if: /// - `dir` is not a directory /// - No sample sheet (`sample_sheet_*.csv`) is found /// - Sample sheet parsing fails /// - Thread pool creation fails /// /// /// # Performance /// /// BAM and POD5 file parsing is parallelized using Rayon with the thread /// count from `config.threads`. /// /// # Example /// /// ```rust,ignore /// let config = Config::default(); /// let run = PromRun::from_dir("/data/runs/my_run", &config)?; /// println!("Imported {} BAM files", run.bams.len()); /// ``` pub fn from_dir(dir: impl AsRef) -> anyhow::Result { let dir = dir.as_ref().to_path_buf(); if !dir.is_dir() { anyhow::bail!( "Failed to import Run: input path is not a directory: {}", dir.display() ); } // Collect file paths by type let mut bam_paths = Vec::new(); let mut sample_sheet_path = None; let mut pod5_paths = Vec::new(); let mut pore_activity_path = None; for file in list_files_recursive(&dir) { let name = file.file_name().and_then(|s| s.to_str()).unwrap_or(""); let ext = file.extension().and_then(|e| e.to_str()).unwrap_or(""); match ext { "bam" => bam_paths.push(file), "pod5" => pod5_paths.push(file), "csv" => { if name.starts_with("sample_sheet") { sample_sheet_path = Some(file); } else if name.starts_with("pore_activity") { pore_activity_path = Some(file); } } _ => {} } } // Parse required sample sheet let sample_sheet = sample_sheet_path .ok_or_else(|| anyhow::anyhow!("No sample_sheet_*.csv found in {}", dir.display())) .and_then(MinKnowSampleSheet::from_path)?; // Parse optional pore activity let pore_activity = pore_activity_path .and_then(|path| File::open(path).ok()) .and_then(|mut reader| parse_pore_activity_from_reader(&mut reader).ok()); // Parse BAM files in parallel let bams: Vec = bam_paths .par_iter() .filter_map(|p| match PromBam::from_path(p) { Ok(bam) => Some(bam), Err(e) => { log::warn!("Failed to parse BAM {}: {}", p.display(), e); None } }) .collect(); // Parse POD5 files in parallel let pod5s: Vec = pod5_paths .par_iter() .filter_map(|p| match Pod5::from_path(p) { Ok(pod5) => Some(pod5), Err(e) => { log::warn!("Failed to parse POD5 {}: {}", p.display(), e); None } }) .collect(); let prom_run = Self { dir, import_date: Utc::now(), protocol_run_id: sample_sheet.protocol_run_id, position_id: sample_sheet.position_id, flow_cell_id: sample_sheet.flow_cell_id, sample_id: sample_sheet.sample_id, experiment_id: sample_sheet.experiment_id, flow_cell_product_code: sample_sheet.flow_cell_product_code, kit: sample_sheet.kit, cases: Vec::new(), pod5s, bams, pore_activity, }; // Cache to disk // prom_run.save_json(prom_run.cache_path(config))?; Ok(prom_run) } /// Partitions BAMs into unaligned and already-aligned groups. /// /// Returns (unaligned, already_aligned) BAM lists. fn partition_bams_by_alignment<'a>( &self, pass_bams: &[&'a PromBam], ) -> (Vec<&'a PromBam>, Vec<&'a PromBam>) { let results: Vec<_> = pass_bams .par_iter() .map(|&bam| { let is_aligned = match bam::Reader::from_path(&bam.path) { Ok(reader) => { let header = bam::Header::from_template(reader.header()); match get_genome_sizes(&header) { Ok(sizes) => !sizes.is_empty(), Err(_) => { warn!( "Failed to parse header for {}, assuming unaligned", bam.path.display() ); false } } } Err(e) => { warn!("Failed to open BAM {}: {}", bam.path.display(), e); false } }; (bam, is_aligned) }) .collect(); let mut unaligned = Vec::new(); let mut aligned = Vec::new(); for (bam, is_aligned) in results { if is_aligned { aligned.push(bam); } else { unaligned.push(bam); } } (unaligned, aligned) } /// Maps BAM files to cases based on kit type (multiplexed vs non-multiplexed). /// /// Returns a mapping from BAM path to case, plus a list of unmatched BAMs. fn map_bams_to_cases<'a>( &'a self, pass_bams: &[&PromBam], kit_type: KitType, ) -> anyhow::Result<(HashMap, Vec)> { let mut bam_to_case: HashMap = HashMap::new(); let mut unmatched: Vec = Vec::new(); match kit_type { KitType::NonMultiplexed => { if self.cases.len() != 1 { bail!("Non-multiplexed run requires exactly one case"); } let single_case = self.cases.first().ok_or_else(|| { anyhow::anyhow!("Non-multiplexed run requires exactly one case") })?; for bam in pass_bams { bam_to_case.insert(bam.path.clone(), single_case); } } KitType::Multiplexed => { // Construct map(normalized barcode) -> IdInput let barcode_to_case: HashMap = self .cases .iter() .filter(|c| !c.barcode.is_empty()) .map(|c| (normalize_barcode(&c.barcode), c)) .collect(); for bam in pass_bams { let filename = bam.path.file_name().and_then(|n| n.to_str()).unwrap_or(""); let barcode_opt = extract_and_normalize_barcode(filename).or_else(|| { bam.path .parent() .and_then(|p| p.file_name()) .and_then(|n| n.to_str()) .and_then(extract_and_normalize_barcode) }); match barcode_opt.and_then(|bc| barcode_to_case.get(&bc)) { Some(case) => { bam_to_case.insert(bam.path.clone(), *case); } None => { unmatched.push(bam.path.clone()); } } } } } Ok((bam_to_case, unmatched)) } /// Process BAM files from a PromethION run: align, sort, merge, and index per case. /// /// This method handles both multiplexed (SQK-NBD114-24) and non-multiplexed (SQK-LSK114) /// sequencing runs. It skips basecalling and works directly with existing unaligned BAMs. /// /// # Workflow /// /// 1. **Validate cases** based on kit type (multiplexed vs non-multiplexed) /// 2. **Map BAMs to cases** using barcode extraction or single-case assignment /// 3. **Align BAMs in parallel** using Dorado /// 4. **Sort aligned chunks in parallel** /// 5. **Merge and finalize** per case with sorting and indexing /// /// # Kit Types /// /// - **SQK-NBD114-24** (multiplexed): BAMs in `bam_pass/barcodeXX/`, matched by barcode /// - **SQK-LSK114** (non-multiplexed): All BAMs in `bam_pass/` assigned to single case /// /// # Arguments /// /// * `config` - Application configuration with alignment settings and paths /// /// # Returns /// /// Returns `Ok(())` on success, or an error if validation or processing fails. /// /// # Errors /// /// This function will return an error if: /// - No cases are associated with the run /// - Non-multiplexed run has != 1 case /// - No BAMs are available for processing /// - BAM files are already aligned /// - External commands fail (Dorado, samtools, Slurm) pub fn process_bams(&self, config: &Config) -> anyhow::Result<()> { let lock_dir = format!("{}/locks", config.result_dir); let _lock = SampleLock::acquire(&lock_dir, &self.protocol_run_id, "process_bams") .with_context(|| format!("Cannot start Processing BAMs for {}", self.dir.display()))?; let pipeline_start = Timer::start(); let mut metrics = PipelineMetrics::default(); let tmp_dir = PathBuf::from(&config.tmp_dir); let mut temp_guard = TempFileGuard::new(); // ===================================================================== // Step 1: Validate preconditions // ===================================================================== if self.cases.is_empty() { bail!( "Run {} has no associated cases. Add cases before processing.", self.protocol_run_id ); } let kit_type = KitType::detect(&self.kit)?; if kit_type == KitType::NonMultiplexed && self.cases.len() != 1 { bail!( "Non-multiplexed run (kit: {}) requires exactly 1 case, found {}", self.kit, self.cases.len() ); } info!( "Processing PromRun: {} (flowcell: {}, kit: {} [{:?}], {} BAMs, {} cases)", self.protocol_run_id, self.flow_cell_id, self.kit, kit_type, self.bams.len(), self.cases.len() ); // ===================================================================== // Step 2: Filter and validate BAMs // ===================================================================== let validation_start = Timer::start(); // Filter out BAMs already merged into final outputs let candidate_bams = self.filter_already_merged(config); if candidate_bams.is_empty() { info!( "🎉 Run {}: all BAMs already merged — nothing to do", self.protocol_run_id ); return Ok(()); } let pass_bams = filter_pass_bams(&candidate_bams, kit_type); if pass_bams.is_empty() { bail!("No BAM files found in bam_pass directories"); } info!("Filtered to {} BAM files from bam_pass", pass_bams.len()); info!("Step 1/5: Validating BAMs are unaligned"); let (unaligned_bams, aligned_bams) = self.partition_bams_by_alignment(&pass_bams); if !aligned_bams.is_empty() { info!( "Found {} already-aligned BAMs — will skip alignment for these", aligned_bams.len() ); } if unaligned_bams.is_empty() && aligned_bams.is_empty() { bail!("No valid BAM files to process"); } metrics.validation_duration = validation_start.elapsed(); metrics.bams_processed = pass_bams.len(); metrics.bytes_input = pass_bams.iter().map(|b| b.bam_size).sum(); info!( "✅ {} unaligned, {} already aligned", unaligned_bams.len(), aligned_bams.len() ); // ===================================================================== // Step 3: Map BAMs to cases // ===================================================================== info!("Step 2/5: Mapping BAM files to cases"); // Map unaligned BAMs (need alignment) let (unaligned_bam_to_case, unmatched_unaligned) = self.map_bams_to_cases(&unaligned_bams, kit_type)?; // Map aligned BAMs (skip alignment, go straight to sort/merge) let (aligned_bam_to_case, unmatched_aligned) = self.map_bams_to_cases(&aligned_bams, kit_type)?; let total_mapped = unaligned_bam_to_case.len() + aligned_bam_to_case.len(); let total_unmatched = unmatched_unaligned.len() + unmatched_aligned.len(); if total_mapped == 0 { bail!("No BAMs were successfully mapped to cases"); } info!( "✅ Mapped {} BAMs to cases ({} unmatched)", total_mapped, total_unmatched ); // ===================================================================== // Step 4: Prepare and run alignment // ===================================================================== let align_start = Timer::start(); let mut case_bam_map: HashMap> = HashMap::new(); // Add already-aligned BAMs directly to the map (no alignment needed) for (bam_path, case) in &aligned_bam_to_case { let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config); let key = format!("{}_{}", case.case_id, sample_type_dir); case_bam_map.entry(key).or_default().push(bam_path.clone()); } if !unaligned_bam_to_case.is_empty() { let (align_jobs, aligned_case_map) = prepare_alignment_jobs(&unaligned_bam_to_case, &tmp_dir, config)?; // Track temp files for cleanup for bams in aligned_case_map.values() { temp_guard.track_many(bams.clone()); } info!("Step 3/5: Running {} alignment jobs", align_jobs.len()); run_many!(config, align_jobs).context("Alignment batch failed")?; self.verify_outputs_exist(&aligned_case_map, "alignment")?; // Merge newly aligned BAMs into the case map for (key, bams) in aligned_case_map { case_bam_map.entry(key).or_default().extend(bams); } info!("✅ Alignments completed"); } else { info!("Step 3/5: Skipping alignment — all BAMs already aligned"); } metrics.alignment_duration = align_start.elapsed(); // ===================================================================== // Step 5: Sort and index chunks // ===================================================================== info!("Step 4/5: Sorting and indexing chunks"); let sort_start = Timer::start(); let sorted_bam_map = sort_and_index_chunks(&case_bam_map, config)?; // Update guard to track sorted files instead temp_guard.clear(); for bams in sorted_bam_map.values() { temp_guard.track_many(bams.clone()); } metrics.sort_index_duration = sort_start.elapsed(); info!("✅ Sorted {} chunk BAMs", sorted_bam_map.len()); // ===================================================================== // Step 6: Merge and finalize per case // ===================================================================== info!("Step 5/5: Finalizing per-case BAMs"); let finalize_start = Timer::start(); let finalized = finalize_case_bams(&self.cases, &sorted_bam_map, &tmp_dir, config)?; metrics.finalize_duration = finalize_start.elapsed(); metrics.cases_finalized = finalized.len(); metrics.bytes_output = finalized .iter() .filter_map(|p| fs::metadata(p).ok()) .map(|m| m.len()) .sum(); // Success — disarm cleanup temp_guard.disarm(); metrics.total_duration = pipeline_start.elapsed(); info!("{metrics}"); // TODO: save metrics info!( "🎉 Pipeline completed for run: {} (flowcell: {})", self.protocol_run_id, self.flow_cell_id ); Ok(()) } fn verify_outputs_exist( &self, case_bam_map: &HashMap>, stage: &str, ) -> anyhow::Result<()> { let missing: Vec<_> = case_bam_map .values() .flatten() .filter(|p| !p.exists()) .collect(); if !missing.is_empty() { bail!( "{} failed: {} output file(s) not created. First missing: {}", stage, missing.len(), missing[0].display() ); } Ok(()) } /// Serializes this run to a JSON file. /// /// # Arguments /// /// * `path` - Output file path (will be created or overwritten) /// /// # Errors /// /// Returns an error if file creation or JSON serialization fails. pub fn save_json(&self, path: impl AsRef) -> anyhow::Result<()> { let path = path.as_ref(); let file = BufWriter::new( File::create(path) .with_context(|| format!("Failed to create JSON file: {}", path.display()))?, ); serde_json::to_writer_pretty(file, self) .with_context(|| format!("Failed to serialize PromRun to {}", path.display()))?; Ok(()) } /// Deserializes a run from a JSON file. /// /// # Arguments /// /// * `path` - Path to a previously saved JSON cache file /// /// # Errors /// /// Returns an error if the file cannot be read or parsed. pub fn load_json(path: impl AsRef) -> anyhow::Result { let path = path.as_ref(); let file = BufReader::new( File::open(path) .with_context(|| format!("Failed to open JSON file: {}", path.display()))?, ); let data = serde_json::from_reader(file) .with_context(|| format!("Failed to parse PromRun from {}", path.display()))?; Ok(data) } /// Returns the cache file path for this run. /// /// The cache path is `{config.run_cache_dir}/{protocol_run_id}.json`. #[must_use] pub fn cache_path(&self, config: &Config) -> PathBuf { let mut path = PathBuf::from(&config.run_cache_dir); path.push(format!("{}.json", self.protocol_run_id)); path } /// Opens a previously cached run by its protocol run ID. /// /// # Arguments /// /// * `run_id` - The protocol run ID (filename stem in the cache directory) /// * `config` - Application configuration with `run_cache_dir` /// /// # Example /// /// ```rust,ignore /// let run = PromRun::open("abc123-def456", &config)?; /// ``` pub fn open(run_id: &str, config: &Config) -> anyhow::Result { let mut path = PathBuf::from(&config.run_cache_dir); path.push(format!("{run_id}.json")); Self::load_json(path) } /// Returns the total size of all BAM files in bytes. #[must_use] pub fn total_bam_size(&self) -> u64 { self.bams.iter().map(|b| b.bam_size).sum() } /// Returns the total size of all POD5 files in bytes. #[must_use] pub fn total_pod5_size(&self) -> u64 { self.pod5s.iter().map(|p| p.file_size).sum() } /// Returns BAMs not yet merged into any case's final output. /// /// For each case, checks which source files are already in the final BAM /// and excludes them from processing. fn filter_already_merged(&self, config: &Config) -> Vec<&PromBam> { // Collect all source filenames already merged across all cases let mut all_existing_sources: FxHashSet = FxHashSet::default(); for case in &self.cases { let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type)); if !final_bam_path.exists() { info!( "No existing final BAM for case {} — all matching inputs are new", case.case_id ); continue; } match WGSBamStats::from_bam(&final_bam_path, config) { Ok(stats) => { let sources = stats.source_filenames(); info!( "Case {} has {} existing source files", case.case_id, sources.len() ); all_existing_sources.extend(sources); } Err(e) => { warn!( "Could not load stats for case {} ({}): {}", case.case_id, final_bam_path.display(), e ); } } } if all_existing_sources.is_empty() { info!( "Run {}: no existing sources found, all {} BAMs are new", self.protocol_run_id, self.bams.len() ); return self.bams.iter().collect(); } let (new_bams, skipped): (Vec<_>, Vec<_>) = self.bams.iter().partition(|bam| { let filename = bam.path.file_name().and_then(|n| n.to_str()).unwrap_or(""); !all_existing_sources.contains(filename) }); if !skipped.is_empty() { info!( "Run {}: skipping {} BAM(s) already merged: {}{}", self.protocol_run_id, skipped.len(), skipped .iter() .take(3) .filter_map(|b| b.path.file_name()) .map(|n| n.to_string_lossy()) .collect::>() .join(", "), if skipped.len() > 3 { "..." } else { "" } ); } info!( "Run {}: {} new BAM(s) to process", self.protocol_run_id, new_bams.len() ); new_bams } } /// Kit type classification for workflow branching. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum KitType { Multiplexed, NonMultiplexed, } impl KitType { /// Detects kit type from kit name string. /// /// Known multiplexed kits: NBD114, NBD112, RBK /// Known non-multiplexed kits: LSK114, LSK109 pub fn detect(kit: &str) -> anyhow::Result { const MULTIPLEXED_PATTERNS: &[&str] = &["NBD114", "NBD112", "RBK"]; const NON_MULTIPLEXED_PATTERNS: &[&str] = &["LSK114", "LSK109"]; let kit_upper = kit.to_uppercase(); for pattern in MULTIPLEXED_PATTERNS { if kit_upper.contains(pattern) { return Ok(Self::Multiplexed); } } for pattern in NON_MULTIPLEXED_PATTERNS { if kit_upper.contains(pattern) { return Ok(Self::NonMultiplexed); } } bail!( "Unknown kit type '{}'. Expected one of: {:?} (multiplexed) or {:?} (non-multiplexed)", kit, MULTIPLEXED_PATTERNS, NON_MULTIPLEXED_PATTERNS ) } } /// Statistics for files in a single directory. #[derive(Default)] struct DirStats { pod5_count: usize, pod5_size: u64, bam_count: usize, bam_size: u64, } impl fmt::Display for PromRun { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { /// Converts bytes to GiB. fn to_gib(bytes: u64) -> f64 { bytes as f64 / 1024.0_f64.powi(3) } let mut dir_stats: BTreeMap = BTreeMap::new(); // Aggregate POD5 stats by directory for pod5 in &self.pod5s { let rel = pod5.path.strip_prefix(&self.dir).unwrap_or(&pod5.path); let dir = rel.parent().unwrap_or(Path::new(".")); let key = dir.display().to_string(); let entry = dir_stats.entry(key).or_default(); entry.pod5_count += 1; entry.pod5_size += pod5.file_size; } // Aggregate BAM stats by directory for bam in &self.bams { let rel = bam.path.strip_prefix(&self.dir).unwrap_or(&bam.path); let dir = rel.parent().unwrap_or(Path::new(".")); let key = dir.display().to_string(); let entry = dir_stats.entry(key).or_default(); entry.bam_count += 1; entry.bam_size += bam.bam_size; } // Header information writeln!(f, "📦 PromRun")?; writeln!(f, " dir : {}", self.dir.display())?; writeln!(f, " imported : {}", self.import_date)?; writeln!(f, " run id : {}", self.protocol_run_id)?; writeln!(f, " position : {}", self.position_id)?; writeln!( f, " flow cell : {} ({})", self.flow_cell_id, self.flow_cell_product_code )?; writeln!(f, " sample id : {}", self.sample_id)?; writeln!(f, " experiment : {}", self.experiment_id)?; writeln!(f, " kit : {}", self.kit)?; writeln!(f, " cases : {}", self.cases.len())?; match &self.pore_activity { Some(pa) => writeln!(f, " pore act. : {} entries", pa.len())?, None => writeln!(f, " pore act. : none")?, } writeln!(f)?; writeln!( f, "📁 Files by directory (relative to {})", self.dir.display() )?; if dir_stats.is_empty() { writeln!(f, " (no files)")?; return Ok(()); } for (dir, stats) in dir_stats { writeln!(f, " - {dir}")?; if stats.pod5_count > 0 { writeln!( f, " POD5 : {:>3} files, {:6.2} GiB", stats.pod5_count, to_gib(stats.pod5_size) )?; } if stats.bam_count > 0 { writeln!( f, " BAM : {:>3} files, {:6.2} GiB", stats.bam_count, to_gib(stats.bam_size) )?; } } Ok(()) } } /// Map sample type to output directory name. fn map_sample_type_to_dir(sample_type: &str, config: &Config) -> String { let normalized = sample_type.to_lowercase(); match normalized.as_str() { "nor" | "normal" | "mrd" => config.normal_name.clone(), _ => config.tumoral_name.clone(), } } /// Metadata extracted from an ONT PromethION BAM header. /// /// Wraps standard SAM header tags specifically for MinKNOW outputs. /// /// # Parsed Tags /// * `@RG` (Read Group): Extracts run ID, basecall model, flowcell ID, and sample info. /// * `@PG` (Program): Extracts MinKNOW version and GPU telemetry. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PromBam { /// Absolute path to the BAM file. pub path: PathBuf, /// Last modification timestamp of the file. pub modified: DateTime, /// File size in bytes. pub bam_size: u64, /// Unique run identifier from the `@RG DS:runid=` field. pub run_id: String, /// Basecalling model name (e.g., `dna_r10.4.1_e8.2_400bps_sup@v4.2.0`). pub basecall_model: String, /// Modified base detection models, space-separated if multiple. pub modbase_models: String, /// Instrument model (e.g., `PromethION`, `P2I`). pub instrument: String, /// Flowcell barcode identifier (e.g., `PBI55810`). pub flowcell_id: String, /// Sample identifier from the sample sheet. pub sample: String, /// Library identifier. pub library: String, /// Full read group ID string from `@RG ID:`. pub read_group_id: String, /// Sequencing timestamp in ISO 8601 format from `@RG DT:`. pub timestamp: String, /// GPU information from MinKNOW `@PG DS:`. pub gpu_info: String, /// MinKNOW version. pub minknow_version: String, } impl PromBam { /// Parses BAM header metadata from the file at the given path. /// /// Opens the BAM file, reads the SAM header, and extracts ONT-specific /// metadata from `@RG` and `@PG` records. /// /// # Arguments /// /// * `path` - Path to a BAM file (must be readable and valid BAM format) /// /// # Returns /// /// Returns `Ok(PromBam)` with populated fields, or an error if: /// - The file cannot be opened or read /// - The file is not a valid BAM format /// - The header cannot be parsed as UTF-8 /// /// # Notes /// /// - Fields may be empty strings if the corresponding header tags are missing /// - Only processes `@RG` records with `PL:ONT` (Oxford Nanopore platform) /// - Only processes `@PG` records with `PN:minknow` /// /// # Example /// /// ```rust,ignore /// let bam = PromBam::from_path("sample.bam")?; /// assert!(!bam.run_id.is_empty()); /// ``` pub fn from_path(path: impl AsRef) -> anyhow::Result { let path = path.as_ref().to_path_buf(); // Retrieve file metadata let meta = fs::metadata(&path) .with_context(|| format!("Failed to read metadata for {}", path.display()))?; let modified: DateTime = meta .modified() .map(DateTime::::from) .with_context(|| format!("Failed to get modification time for {}", path.display()))?; let bam_size = meta.len(); // Open BAM and read header let reader = bam::Reader::from_path(&path) .with_context(|| format!("Failed to open BAM file: {}", path.display()))?; let header = reader.header().clone(); let text = std::str::from_utf8(header.as_bytes()).context("BAM header contains invalid UTF-8")?; // Initialize with defaults let mut prom = PromBam { path: path.clone(), modified, bam_size, run_id: String::new(), basecall_model: String::new(), modbase_models: String::new(), instrument: String::new(), flowcell_id: String::new(), sample: String::new(), library: String::new(), read_group_id: String::new(), timestamp: String::new(), gpu_info: String::new(), minknow_version: String::new(), }; let mut found_ont_rg = false; let mut found_other_rg = false; // Parse header lines for line in text.lines() { if !line.starts_with('@') { continue; } let mut fields = line.split('\t'); let Some(tag) = fields.next() else { continue; }; let tag = tag.trim_start_matches('@'); // Build key-value map from tab-separated fields let kv: Vec<(&str, &str)> = fields .filter_map(|f| { let mut it = f.splitn(2, ':'); Some((it.next()?, it.next().unwrap_or(""))) }) .collect(); match tag { "PG" => Self::parse_pg_record(&kv, &mut prom), "RG" => { if Self::parse_rg_record(&kv, &mut prom) { found_ont_rg = true; } else { found_other_rg = true; } } _ => {} } } if !found_ont_rg && found_other_rg { warn!( "BAM {} contains non-ONT read groups but no ONT read groups. \ This may not be a Nanopore BAM file.", path.display() ); } if !found_ont_rg && !found_other_rg { warn!( "BAM {} contains no read groups. Metadata will be incomplete.", path.display() ); } Ok(prom) } /// Parses `@PG` (Program) header record for MinKNOW-specific fields. fn parse_pg_record(kv: &[(&str, &str)], prom: &mut PromBam) { // Only process MinKNOW program records let is_minknow = kv.iter().any(|(k, v)| *k == "PN" && *v == "minknow"); if !is_minknow { return; } for (k, v) in kv { match *k { "DS" => prom.gpu_info = (*v).to_string(), "VN" => prom.minknow_version = (*v).to_string(), _ => {} } } } /// Parses `@RG` (Read Group) header record for ONT-specific fields. fn parse_rg_record(kv: &[(&str, &str)], prom: &mut PromBam) -> bool { // Only process ONT platform read groups let platform = kv .iter() .find(|(k, _)| *k == "PL") .map(|(_, v)| *v) .unwrap_or(""); if platform != "ONT" { return false; } // Parse composite DS field: runid=... basecall_model=... modbase_models=... if let Some((_, ds_raw)) = kv.iter().find(|(k, _)| *k == "DS") { for part in ds_raw.split_whitespace() { if let Some(val) = part.strip_prefix("runid=") { prom.run_id = val.to_string(); } else if let Some(val) = part.strip_prefix("basecall_model=") { prom.basecall_model = val.to_string(); } else if let Some(val) = part.strip_prefix("modbase_models=") { // Remove commas from modbase_models list prom.modbase_models = val.replace(',', " "); } } } // Extract standard SAM tags for (k, v) in kv { match *k { "ID" => prom.read_group_id = (*v).to_string(), "DT" => prom.timestamp = (*v).to_string(), "LB" => prom.library = (*v).to_string(), "PM" => prom.instrument = (*v).to_string(), "PU" => prom.flowcell_id = (*v).to_string(), "SM" => prom.sample = (*v).to_string(), _ => {} } } true } } impl fmt::Display for PromBam { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "PromBam")?; writeln!(f, " Path: {}", self.path.display())?; writeln!(f, " Modified: {}", self.modified)?; writeln!( f, " BamSize: {:.2} GiB", self.bam_size as f64 / (1024.0_f64.powi(3)) )?; writeln!(f)?; writeln!(f, " Run ID: {}", self.run_id)?; writeln!(f, " Basecaller: {}", self.basecall_model)?; writeln!(f, " Modbase Models: {}", self.modbase_models)?; writeln!(f)?; writeln!(f, " Instrument: {}", self.instrument)?; writeln!(f, " Flowcell ID: {}", self.flowcell_id)?; writeln!(f, " Sample: {}", self.sample)?; writeln!(f, " Library: {}", self.library)?; writeln!(f)?; writeln!(f, " RG ID: {}", self.read_group_id)?; writeln!(f, " Timestamp: {}", self.timestamp)?; writeln!(f)?; writeln!(f, " GPU: {}", self.gpu_info)?; writeln!(f, " MinKNOW Ver: {}", self.minknow_version) } } /// Filters BAMs to only include those from bam_pass directories. fn filter_pass_bams<'a>(bams: &[&'a PromBam], kit_type: KitType) -> Vec<&'a PromBam> { bams.iter() .filter(|bam| { let p = bam.path.to_string_lossy(); if p.contains("bam_fail") { return false; } match kit_type { KitType::Multiplexed => p.contains("bam_pass"), KitType::NonMultiplexed => p.contains("/bam/") || p.contains("bam_pass"), } }) .copied() .collect() } /// Builds the final BAM path for a case. fn build_final_bam_path(case: &IdInput, config: &Config) -> anyhow::Result { let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type)); if let Some(parent) = final_bam_path.parent() { fs::create_dir_all(parent) .with_context(|| format!("Failed to create output directory: {}", parent.display()))?; } Ok(final_bam_path) } /// Merges multiple chunk BAMs into a single BAM. /// /// If only one chunk exists, returns it directly (no merge needed). fn merge_chunk_bams( chunk_bams: &[PathBuf], case: &IdInput, sample_type_dir: &str, tmp_dir: &Path, config: &Config, ) -> anyhow::Result { if chunk_bams.len() == 1 { return Ok(chunk_bams[0].clone()); } let merged_path = tmp_dir.join(format!( "{}_{}_{}_merged.bam", Uuid::new_v4(), case.case_id, sample_type_dir )); info!( " Merging {} chunks for case {} → {}", chunk_bams.len(), case.case_id, merged_path.file_name().unwrap().to_string_lossy() ); let mut merge_cmd = SamtoolsMergeMany::from_config(merged_path.clone(), chunk_bams.to_vec(), config); run!(config, &mut merge_cmd).with_context(|| { format!( "Failed to merge {} chunk BAMs for case {}", chunk_bams.len(), case.case_id ) })?; if !merged_path.exists() { bail!( "Merge produced no output for case {}: expected {}", case.case_id, merged_path.display() ); } Ok(merged_path) } /// Atomically replaces a destination file with a source file. /// /// Uses rename for atomicity. Both files must be on the same filesystem. fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> { if destination.exists() { let backup = destination.with_extension("bam.backup"); fs::rename(destination, &backup).with_context(|| { format!( "Failed to backup {} to {}", destination.display(), backup.display() ) })?; if let Err(e) = fs::rename(source, destination) { // Attempt restore, but propagate original error regardless if let Err(restore_err) = fs::rename(&backup, destination) { warn!( "Failed to restore backup {} after failed replacement: {}", backup.display(), restore_err ); } return Err(e).with_context(|| { format!( "Failed to replace {} with {}", destination.display(), source.display() ) }); } // Remove backup and its orphaned index if let Err(e) = remove_bam_with_index(&backup) { log::warn!("Failed to clean up backup file {}: {}", backup.display(), e); } } else { fs::rename(source, destination).with_context(|| { format!( "Failed to move {} to {}", source.display(), destination.display() ) })?; } Ok(()) } /// Indexes a BAM file using samtools. fn index_bam(bam: &Path, config: &Config) -> anyhow::Result<()> { let mut index_cmd = SamtoolsIndex::from_config(config, bam.to_string_lossy().as_ref()); run!(config, &mut index_cmd)?; let index_path = PathBuf::from(format!("{}.bai", bam.display())); if !index_path.exists() { bail!("Index file not created: {}", index_path.display()); } Ok(()) } /// Merges a new BAM into an existing final BAM atomically. /// /// Assumes both BAMs are coordinate-sorted. Skips re-sorting since /// samtools merge preserves sort order from sorted inputs. fn merge_into_existing_final( source: &Path, destination: &Path, tmp_dir: &Path, config: &Config, ) -> anyhow::Result<()> { info!( " Merging into existing final BAM: {}", destination.display() ); index_bam(source, config) .with_context(|| format!("Failed to index source BAM: {}", source.display()))?; index_bam(destination, config) .with_context(|| format!("Failed to index destination BAM: {}", destination.display()))?; let temp_merged = tmp_dir.join(format!(".merging_{}.bam", Uuid::new_v4())); let input_bams = vec![source.to_path_buf(), destination.to_path_buf()]; let mut merge_cmd = SamtoolsMergeMany::from_config(temp_merged.clone(), input_bams, config); run!(config, &mut merge_cmd) .with_context(|| "Failed to merge source into existing final BAM")?; // No sort needed — merge preserves coordinate order from sorted inputs atomic_replace(&temp_merged, destination)?; index_bam(destination, config) .with_context(|| format!("Failed to index final BAM: {}", destination.display()))?; if let Err(e) = remove_bam_with_index(source) { log::warn!( "Failed to clean up merged source {}: {}", source.display(), e ); } Ok(()) } /// Creates a new final BAM from a merged per-case BAM. /// /// Source is already sorted (from sort_and_index_chunks), so only /// move and index are needed. fn create_new_final( source: &Path, destination: &Path, case: &IdInput, config: &Config, ) -> anyhow::Result<()> { info!(" Creating new final BAM: {}", destination.display()); // Source already sorted — just move and index atomic_replace(source, destination)?; // Adding SM tag in BAM without (if not demultiplexed if so it should be the barcode name) // let _ = read_sm_tag_or_inject( // &destination.to_string_lossy(), // &format!("{}_{}", case.case_id, case.sample_type), // config, // )?; index_bam(destination, config) .with_context(|| format!("Failed to index final BAM: {}", destination.display()))?; Ok(()) } type AlignJobsCase = (Vec, HashMap>); /// Prepares alignment jobs and builds the case-to-BAM mapping. fn prepare_alignment_jobs( bam_to_case: &HashMap, tmp_dir: &Path, config: &Config, ) -> anyhow::Result { let mut align_jobs = Vec::new(); let mut case_bam_map: HashMap> = HashMap::new(); for (bam_path, case) in bam_to_case { let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config); let key = format!("{}_{}", case.case_id, sample_type_dir); let aligned_bam = tmp_dir.join(format!( "{}_{}_{}_{}.bam", Uuid::new_v4(), case.case_id, sample_type_dir, bam_path.file_stem().unwrap().to_string_lossy() )); align_jobs.push(DoradoAlign::from_config( config, bam_path, aligned_bam.clone(), )); case_bam_map.entry(key).or_default().push(aligned_bam); } Ok((align_jobs, case_bam_map)) } /// Sorts and indexes all aligned chunk BAMs in parallel via Slurm. /// /// Returns an updated map with sorted BAM paths replacing the original aligned paths. fn sort_and_index_chunks( case_bam_map: &HashMap>, config: &Config, ) -> anyhow::Result>> { let mut sort_jobs = Vec::new(); let mut original_to_sorted: HashMap = HashMap::new(); // Sort job for each BAM. for bams in case_bam_map.values() { for bam in bams { let sorted_bam = bam.with_extension("sorted.bam"); original_to_sorted.insert(bam.clone(), sorted_bam.clone()); sort_jobs.push(SamtoolsSort::from_config(config, bam, &sorted_bam)); } } info!("Submitting {} sort jobs", sort_jobs.len()); run_many!(config, sort_jobs).context("Sort batch failed")?; // Verify sorted BAM exists. let missing: Vec<_> = original_to_sorted .values() .filter(|p| !p.exists()) .collect(); if !missing.is_empty() { bail!( "Sorting failed: {} file(s) not created. First missing: {}", missing.len(), missing[0].display() ); } // Index every sorted BAM. let index_jobs: Vec = original_to_sorted .values() .map(|sorted| SamtoolsIndex::from_config(config, sorted.to_string_lossy().as_ref())) .collect(); info!("Submitting {} index jobs", index_jobs.len()); run_many!(config, index_jobs).context("Sort batch failed")?; // Verify index exists for each BAM. let missing_indices: Vec<_> = original_to_sorted .values() .map(|p| PathBuf::from(format!("{}.bai", p.display()))) .filter(|p| !p.exists()) .collect(); if !missing_indices.is_empty() { bail!( "Indexing failed: {} index file(s) not created. First missing: {}", missing_indices.len(), missing_indices[0].display() ); } // Remove input BAMs. for original in original_to_sorted.keys() { if let Err(e) = fs::remove_file(original) { log::warn!( "Failed to remove unsorted BAM {}: {}", original.display(), e ); } } // Construct the new map(case_id) -> BAMs let sorted_map: HashMap> = case_bam_map .iter() .map(|(key, bams)| { let sorted_bams: Vec = bams .iter() .filter_map(|b| original_to_sorted.get(b).cloned()) .collect(); (key.clone(), sorted_bams) }) .collect(); Ok(sorted_map) } /// Finalizes per-case BAM files by merging chunks and creating indexed outputs. fn finalize_case_bams( cases: &[IdInput], sorted_bam_map: &HashMap>, tmp_dir: &Path, config: &Config, ) -> anyhow::Result> { let mut finalized = Vec::new(); for case in cases { let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config); let key = format!("{}_{}", case.case_id, sample_type_dir); let Some(chunk_bams) = sorted_bam_map.get(&key) else { info!( "No aligned BAMs found for case {} ({}) — skipping", case.case_id, sample_type_dir ); continue; }; if chunk_bams.is_empty() { continue; } let final_bam_path = build_final_bam_path(case, config)?; info!( "Finalizing case {} ({}): {} chunks → {}", case.case_id, sample_type_dir, chunk_bams.len(), final_bam_path.display() ); let merged_case_bam = merge_chunk_bams(chunk_bams, case, &sample_type_dir, tmp_dir, config)?; if final_bam_path.exists() { merge_into_existing_final(&merged_case_bam, &final_bam_path, tmp_dir, config)?; } else { create_new_final(&merged_case_bam, &final_bam_path, case, config)?; } for chunk in chunk_bams { if chunk.exists() { if let Err(e) = remove_bam_with_index(chunk) { log::warn!("Failed to clean up temp chunk {}: {}", chunk.display(), e); } } } let stats = WGSBamStats::open(&case.case_id, &case.sample_type, config)?; ModkitSummary::initialize(&case.case_id, &case.sample_type, config)?.run()?; info!("✅ Output: {}\n{stats}", final_bam_path.display()); finalized.push(final_bam_path); } Ok(finalized) } /// Normalize barcode strings for matching. /// /// Handles various formats: /// - "barcode01" → "1" /// - "NB01" → "1" /// - "01" → "1" /// - "BC01" → "1" fn normalize_barcode(barcode: &str) -> String { let lower = barcode.to_lowercase(); // Strip known prefixes let stripped = lower .trim_start_matches("barcode") .trim_start_matches("nb") .trim_start_matches("bc") .trim_start_matches('_') .trim_start_matches('-') .trim(); // Parse as number to remove leading zeros if let Ok(num) = stripped.parse::() { return num.to_string(); } // Fallback: return cleaned string stripped.to_string() } lazy_static! { static ref BARCODE_PATTERNS: [regex::Regex; 3] = [ regex::Regex::new(r"(?i)barcode(\d+)").unwrap(), regex::Regex::new(r"(?i)nb(\d+)").unwrap(), regex::Regex::new(r"(?i)bc(\d+)").unwrap(), ]; } /// Extract barcode number from various filename patterns. /// /// Recognizes: /// - `*_barcode01_*` /// - `*_NB01_*` /// - `barcode01/` fn extract_and_normalize_barcode(text: &str) -> Option { for pattern in BARCODE_PATTERNS.iter() { if let Some(caps) = pattern.captures(text) { if let Some(num_str) = caps.get(1) { if let Ok(num) = num_str.as_str().parse::() { return Some(num.to_string()); } } } } None } #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct PipelineMetrics { pub validation_duration: Duration, pub alignment_duration: Duration, pub sort_index_duration: Duration, pub finalize_duration: Duration, pub total_duration: Duration, pub bams_processed: usize, pub cases_finalized: usize, pub bytes_input: u64, pub bytes_output: u64, } impl PipelineMetrics { pub fn throughput_mbps(&self) -> f64 { if self.total_duration.as_secs_f64() == 0.0 { return 0.0; } (self.bytes_input as f64 / 1_000_000.0) / self.total_duration.as_secs_f64() } pub fn save(&self, path: impl AsRef) -> anyhow::Result<()> { let file = BufWriter::new(File::create(path.as_ref())?); serde_json::to_writer_pretty(file, self)?; Ok(()) } } impl std::fmt::Display for PipelineMetrics { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "📊 Pipeline Metrics")?; writeln!(f, " Validation: {:>8.2?}", self.validation_duration)?; writeln!(f, " Alignment: {:>8.2?}", self.alignment_duration)?; writeln!(f, " Sort/Index: {:>8.2?}", self.sort_index_duration)?; writeln!(f, " Finalize: {:>8.2?}", self.finalize_duration)?; writeln!(f, " ─────────────────────")?; writeln!(f, " Total: {:>8.2?}", self.total_duration)?; writeln!(f)?; writeln!(f, " BAMs processed: {:>6}", self.bams_processed)?; writeln!(f, " Cases finalized: {:>6}", self.cases_finalized)?; writeln!( f, " Input: {:>8.2} GiB", self.bytes_input as f64 / 1024.0_f64.powi(3) )?; writeln!( f, " Output: {:>8.2} GiB", self.bytes_output as f64 / 1024.0_f64.powi(3) )?; writeln!(f, " Throughput: {:>8.2} MB/s", self.throughput_mbps()) } } struct Timer(Instant); impl Timer { fn start() -> Self { Self(Instant::now()) } fn elapsed(&self) -> Duration { self.0.elapsed() } } #[cfg(test)] mod tests { use super::*; use log::info; use crate::helpers::test_init; #[test] fn prom_run_process() -> anyhow::Result<()> { test_init(); let config = Config::default(); let dir = "/mnt/beegfs02/scratch/t_steimle/data/prom/20251121_001_01_CD/03/20251121_1531_P2I-00461-B_PBI56020_efa567ea"; let prom_run = PromRun::from_dir(dir)?; let prom = PromRun::open(&prom_run.protocol_run_id, &config)?; info!("{prom}"); prom.process_bams(&config)?; Ok(()) } }