| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767 |
- //! # PromRun Module
- //!
- //! Import, validation, and processing pipeline for Oxford Nanopore PromethION sequencing runs.
- //!
- //! This module provides the [`PromRun`] struct which orchestrates the complete workflow from
- //! raw MinKNOW output ingestion through alignment, sorting, and per-case BAM finalization.
- //!
- //! ## Supported Kit Types
- //!
- //! - **Multiplexed** (`SQK-NBD114-24`, `SQK-NBD112`, `RBK`): BAMs demultiplexed by barcode
- //! - **Non-multiplexed** (`SQK-LSK114`, `SQK-LSK109`): Single-sample runs
- //!
- //! ## Pipeline Stages
- //!
- //! 1. **Import** — Recursive directory scan for BAM, POD5, sample sheets, pore activity logs
- //! 2. **Validation** — Verify BAMs are unaligned (no reference sequences in header)
- //! 3. **Case mapping** — Associate BAMs with cases via barcode extraction or single-case assignment
- //! 4. **Alignment** — Parallel Dorado alignment against reference genome
- //! 5. **Sort/Index** — Coordinate sort and index via samtools (Slurm batch)
- //! 6. **Finalize** — Merge chunks per case, atomic replacement of existing outputs
- //!
- //! ## Directory Structure
- //!
- //! Files are discovered recursively; typical MinKNOW layouts:
- //!
- //! ```text
- //! run_dir/
- //! ├── sample_sheet_*.csv # Required
- //! ├── pore_activity_*.csv # Optional
- //! ├── pod5_pass/barcode*/*.pod5 # Multiplexed
- //! ├── bam_pass/barcode*/*.bam # Multiplexed
- //! ├── pod5/*.pod5 # Non-multiplexed
- //! └── bam/*.bam # Non-multiplexed
- //! ```
- //!
- //! ## Example
- //!
- //! ```rust,ignore
- //! use crate::config::Config;
- //! use crate::collection::promrun::PromRun;
- //!
- //! let config = Config::default();
- //!
- //! // Import and process
- //! let mut run = PromRun::from_dir("/data/prom/20240101_run", &config)?;
- //! run.cases.push(IdInput { case_id: "CASE001".into(), sample_type: "TUM".into(), barcode: "barcode01".into() });
- //! run.process_bams(&config)?;
- //!
- //! // Or load cached
- //! let run = PromRun::open("protocol_run_id", &config)?;
- //! ```
- use std::{
- collections::{BTreeMap, HashMap},
- fmt,
- fs::{self, File},
- io::{BufReader, BufWriter},
- path::{Path, PathBuf},
- time::{Duration, Instant},
- };
- use anyhow::{bail, Context};
- use chrono::{DateTime, Utc};
- use log::{info, warn};
- use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
- use rust_htslib::bam::{self, Read};
- use rustc_hash::FxHashSet;
- use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- use crate::{
- collection::{
- bam_stats::WGSBamStats,
- flowcells::IdInput,
- minknow::{parse_pore_activity_from_reader, MinKnowSampleSheet, PoreStateEntry},
- pod5::Pod5,
- },
- commands::{
- dorado::DoradoAlign,
- modkit::ModkitSummary,
- samtools::{SamtoolsIndex, SamtoolsMergeMany, SamtoolsSort},
- },
- config::Config,
- helpers::{get_genome_sizes, list_files_recursive, remove_bam_with_index, TempFileGuard},
- io::bam::read_sm_tag_or_inject,
- locker::SampleLock,
- pipes::InitializeSolo,
- run, run_many,
- };
- /// Represent a complete ONT PromethION sequencing run with all associated files.
- ///
- /// Represents an imported run directory containing BAM files, POD5 raw signal
- /// files, sample sheets, and pore activity logs. Provides JSON serialization
- /// for caching parsed metadata.
- ///
- /// # Directory Structure
- ///
- /// The importer recursively scans for files, supporting various MinKNOW output layouts:
- ///
- /// ```text
- /// run_dir/
- /// ├── sample_sheet_*.csv # Required: MinKNOW sample sheet
- /// ├── pore_activity_*.csv # Optional: pore state logs
- /// │
- /// │ # POD5 files (any structure supported):
- /// ├── pod5/ # Non-multiplexed runs
- /// │ └── *.pod5
- /// ├── pod5_pass/ # Multiplexed runs (pass reads)
- /// │ └── barcode*/
- /// │ └── *.pod5
- /// ├── pod5_fail/ # Failed reads (optional)
- /// │ └── *.pod5
- /// │
- /// │ # BAM files (any structure supported):
- /// ├── bam/ # Non-multiplexed runs
- /// │ └── *.bam
- /// └── bam_pass/ # Multiplexed runs
- /// └── barcode*/
- /// └── *.bam
- /// ```
- ///
- /// Files are discovered recursively regardless of subdirectory naming.
- ///
- /// # Example
- ///
- /// ```rust,ignore
- /// let config = Config::default();
- ///
- /// // Import a new run
- /// let run = PromRun::from_dir("/data/runs/20240101_run", &config)?;
- ///
- /// // Load a cached run
- /// let run = PromRun::open("protocol_run_id_here", &config)?;
- ///
- /// println!("Flowcell: {}", run.flow_cell_id);
- /// println!("BAM files: {}", run.bams.len());
- /// println!("POD5 files: {}", run.pod5s.len());
- /// ```
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct PromRun {
- /// Root directory of the sequencing run.
- pub dir: PathBuf,
- /// Timestamp when this run was imported/parsed.
- pub import_date: DateTime<Utc>,
- /// Unique identifier for the protocol run (from sample sheet).
- pub protocol_run_id: String,
- /// Flowcell position identifier (e.g., device slot or port).
- pub position_id: String,
- /// Flowcell barcode/identifier (e.g., `FAB12345`).
- pub flow_cell_id: String,
- /// Sample ID from the sample sheet.
- pub sample_id: String,
- /// Experiment ID from the sample sheet.
- pub experiment_id: String,
- /// Flowcell product code (e.g., `FLO-MIN106`, `FLO-PRO002`).
- pub flow_cell_product_code: String,
- /// Library preparation kit identifier (e.g., `SQK-LSK114`).
- pub kit: String,
- /// Associated case identifiers for clinical/research tracking.
- pub cases: Vec<IdInput>,
- /// All POD5 raw signal files found in the run directory.
- pub pod5s: Vec<Pod5>,
- /// All BAM files found in the run directory.
- pub bams: Vec<PromBam>,
- /// Pore activity/state log entries, if available.
- pub pore_activity: Option<Vec<PoreStateEntry>>,
- }
- impl PromRun {
- /// Imports a sequencing run from a directory.
- ///
- /// Recursively scans the directory for BAM files, POD5 files, sample sheets,
- /// and pore activity logs. Parses all files in parallel and caches the result.
- ///
- /// # Arguments
- ///
- /// * `dir` - Path to the run directory (must exist and be a directory)
- /// * `config` - Application configuration (provides thread count and cache paths)
- ///
- /// # Returns
- ///
- /// Returns `Ok(PromRun)` on success, or an error if:
- /// - `dir` is not a directory
- /// - No sample sheet (`sample_sheet_*.csv`) is found
- /// - Sample sheet parsing fails
- /// - Thread pool creation fails
- ///
- ///
- /// # Performance
- ///
- /// BAM and POD5 file parsing is parallelized using Rayon with the thread
- /// count from `config.threads`.
- ///
- /// # Example
- ///
- /// ```rust,ignore
- /// let config = Config::default();
- /// let run = PromRun::from_dir("/data/runs/my_run", &config)?;
- /// println!("Imported {} BAM files", run.bams.len());
- /// ```
- pub fn from_dir(dir: impl AsRef<Path>) -> anyhow::Result<Self> {
- let dir = dir.as_ref().to_path_buf();
- if !dir.is_dir() {
- anyhow::bail!(
- "Failed to import Run: input path is not a directory: {}",
- dir.display()
- );
- }
- // Collect file paths by type
- let mut bam_paths = Vec::new();
- let mut sample_sheet_path = None;
- let mut pod5_paths = Vec::new();
- let mut pore_activity_path = None;
- for file in list_files_recursive(&dir) {
- let name = file.file_name().and_then(|s| s.to_str()).unwrap_or("");
- let ext = file.extension().and_then(|e| e.to_str()).unwrap_or("");
- match ext {
- "bam" => bam_paths.push(file),
- "pod5" => pod5_paths.push(file),
- "csv" => {
- if name.starts_with("sample_sheet") {
- sample_sheet_path = Some(file);
- } else if name.starts_with("pore_activity") {
- pore_activity_path = Some(file);
- }
- }
- _ => {}
- }
- }
- // Parse required sample sheet
- let sample_sheet = sample_sheet_path
- .ok_or_else(|| anyhow::anyhow!("No sample_sheet_*.csv found in {}", dir.display()))
- .and_then(MinKnowSampleSheet::from_path)?;
- // Parse optional pore activity
- let pore_activity = pore_activity_path
- .and_then(|path| File::open(path).ok())
- .and_then(|mut reader| parse_pore_activity_from_reader(&mut reader).ok());
- // Parse BAM files in parallel
- let bams: Vec<PromBam> = bam_paths
- .par_iter()
- .filter_map(|p| match PromBam::from_path(p) {
- Ok(bam) => Some(bam),
- Err(e) => {
- log::warn!("Failed to parse BAM {}: {}", p.display(), e);
- None
- }
- })
- .collect();
- // Parse POD5 files in parallel
- let pod5s: Vec<Pod5> = pod5_paths
- .par_iter()
- .filter_map(|p| match Pod5::from_path(p) {
- Ok(pod5) => Some(pod5),
- Err(e) => {
- log::warn!("Failed to parse POD5 {}: {}", p.display(), e);
- None
- }
- })
- .collect();
- let prom_run = Self {
- dir,
- import_date: Utc::now(),
- protocol_run_id: sample_sheet.protocol_run_id,
- position_id: sample_sheet.position_id,
- flow_cell_id: sample_sheet.flow_cell_id,
- sample_id: sample_sheet.sample_id,
- experiment_id: sample_sheet.experiment_id,
- flow_cell_product_code: sample_sheet.flow_cell_product_code,
- kit: sample_sheet.kit,
- cases: Vec::new(),
- pod5s,
- bams,
- pore_activity,
- };
- // Cache to disk
- // prom_run.save_json(prom_run.cache_path(config))?;
- Ok(prom_run)
- }
- /// Partitions BAMs into unaligned and already-aligned groups.
- ///
- /// Returns (unaligned, already_aligned) BAM lists.
- fn partition_bams_by_alignment<'a>(
- &self,
- pass_bams: &[&'a PromBam],
- ) -> (Vec<&'a PromBam>, Vec<&'a PromBam>) {
- let results: Vec<_> = pass_bams
- .par_iter()
- .map(|&bam| {
- let is_aligned = match bam::Reader::from_path(&bam.path) {
- Ok(reader) => {
- let header = bam::Header::from_template(reader.header());
- match get_genome_sizes(&header) {
- Ok(sizes) => !sizes.is_empty(),
- Err(_) => {
- warn!(
- "Failed to parse header for {}, assuming unaligned",
- bam.path.display()
- );
- false
- }
- }
- }
- Err(e) => {
- warn!("Failed to open BAM {}: {}", bam.path.display(), e);
- false
- }
- };
- (bam, is_aligned)
- })
- .collect();
- let mut unaligned = Vec::new();
- let mut aligned = Vec::new();
- for (bam, is_aligned) in results {
- if is_aligned {
- aligned.push(bam);
- } else {
- unaligned.push(bam);
- }
- }
- (unaligned, aligned)
- }
- /// Maps BAM files to cases based on kit type (multiplexed vs non-multiplexed).
- ///
- /// Returns a mapping from BAM path to case, plus a list of unmatched BAMs.
- fn map_bams_to_cases<'a>(
- &'a self,
- pass_bams: &[&PromBam],
- kit_type: KitType,
- ) -> anyhow::Result<(HashMap<PathBuf, &'a IdInput>, Vec<PathBuf>)> {
- let mut bam_to_case: HashMap<PathBuf, &IdInput> = HashMap::new();
- let mut unmatched: Vec<PathBuf> = Vec::new();
- match kit_type {
- KitType::NonMultiplexed => {
- if self.cases.len() != 1 {
- bail!("Non-multiplexed run requires exactly one case");
- }
- let single_case = self.cases.first().ok_or_else(|| {
- anyhow::anyhow!("Non-multiplexed run requires exactly one case")
- })?;
- for bam in pass_bams {
- bam_to_case.insert(bam.path.clone(), single_case);
- }
- }
- KitType::Multiplexed => {
- // Construct map(normalized barcode) -> IdInput
- let barcode_to_case: HashMap<String, &IdInput> = self
- .cases
- .iter()
- .filter(|c| !c.barcode.is_empty())
- .map(|c| (normalize_barcode(&c.barcode), c))
- .collect();
- for bam in pass_bams {
- let filename = bam.path.file_name().and_then(|n| n.to_str()).unwrap_or("");
- let barcode_opt = extract_and_normalize_barcode(filename).or_else(|| {
- bam.path
- .parent()
- .and_then(|p| p.file_name())
- .and_then(|n| n.to_str())
- .and_then(extract_and_normalize_barcode)
- });
- match barcode_opt.and_then(|bc| barcode_to_case.get(&bc)) {
- Some(case) => {
- bam_to_case.insert(bam.path.clone(), *case);
- }
- None => {
- unmatched.push(bam.path.clone());
- }
- }
- }
- }
- }
- Ok((bam_to_case, unmatched))
- }
- /// Process BAM files from a PromethION run: align, sort, merge, and index per case.
- ///
- /// This method handles both multiplexed (SQK-NBD114-24) and non-multiplexed (SQK-LSK114)
- /// sequencing runs. It skips basecalling and works directly with existing unaligned BAMs.
- ///
- /// # Workflow
- ///
- /// 1. **Validate cases** based on kit type (multiplexed vs non-multiplexed)
- /// 2. **Map BAMs to cases** using barcode extraction or single-case assignment
- /// 3. **Align BAMs in parallel** using Dorado
- /// 4. **Sort aligned chunks in parallel**
- /// 5. **Merge and finalize** per case with sorting and indexing
- ///
- /// # Kit Types
- ///
- /// - **SQK-NBD114-24** (multiplexed): BAMs in `bam_pass/barcodeXX/`, matched by barcode
- /// - **SQK-LSK114** (non-multiplexed): All BAMs in `bam_pass/` assigned to single case
- ///
- /// # Arguments
- ///
- /// * `config` - Application configuration with alignment settings and paths
- ///
- /// # Returns
- ///
- /// Returns `Ok(())` on success, or an error if validation or processing fails.
- ///
- /// # Errors
- ///
- /// This function will return an error if:
- /// - No cases are associated with the run
- /// - Non-multiplexed run has != 1 case
- /// - No BAMs are available for processing
- /// - BAM files are already aligned
- /// - External commands fail (Dorado, samtools, Slurm)
- pub fn process_bams(&self, config: &Config) -> anyhow::Result<()> {
- let lock_dir = format!("{}/locks", config.result_dir);
- let _lock = SampleLock::acquire(&lock_dir, &self.protocol_run_id, "process_bams")
- .with_context(|| format!("Cannot start Processing BAMs for {}", self.dir.display()))?;
- let pipeline_start = Timer::start();
- let mut metrics = PipelineMetrics::default();
- let tmp_dir = PathBuf::from(&config.tmp_dir);
- let mut temp_guard = TempFileGuard::new();
- // =====================================================================
- // Step 1: Validate preconditions
- // =====================================================================
- if self.cases.is_empty() {
- bail!(
- "Run {} has no associated cases. Add cases before processing.",
- self.protocol_run_id
- );
- }
- let kit_type = KitType::detect(&self.kit)?;
- if kit_type == KitType::NonMultiplexed && self.cases.len() != 1 {
- bail!(
- "Non-multiplexed run (kit: {}) requires exactly 1 case, found {}",
- self.kit,
- self.cases.len()
- );
- }
- info!(
- "Processing PromRun: {} (flowcell: {}, kit: {} [{:?}], {} BAMs, {} cases)",
- self.protocol_run_id,
- self.flow_cell_id,
- self.kit,
- kit_type,
- self.bams.len(),
- self.cases.len()
- );
- // =====================================================================
- // Step 2: Filter and validate BAMs
- // =====================================================================
- let validation_start = Timer::start();
- // Filter out BAMs already merged into final outputs
- let candidate_bams = self.filter_already_merged(config);
- if candidate_bams.is_empty() {
- info!(
- "🎉 Run {}: all BAMs already merged — nothing to do",
- self.protocol_run_id
- );
- return Ok(());
- }
- let pass_bams = filter_pass_bams(&candidate_bams, kit_type);
- if pass_bams.is_empty() {
- bail!("No BAM files found in bam_pass directories");
- }
- info!("Filtered to {} BAM files from bam_pass", pass_bams.len());
- info!("Step 1/5: Validating BAMs are unaligned");
- let (unaligned_bams, aligned_bams) = self.partition_bams_by_alignment(&pass_bams);
- if !aligned_bams.is_empty() {
- info!(
- "Found {} already-aligned BAMs — will skip alignment for these",
- aligned_bams.len()
- );
- }
- if unaligned_bams.is_empty() && aligned_bams.is_empty() {
- bail!("No valid BAM files to process");
- }
- metrics.validation_duration = validation_start.elapsed();
- metrics.bams_processed = pass_bams.len();
- metrics.bytes_input = pass_bams.iter().map(|b| b.bam_size).sum();
- info!(
- "✅ {} unaligned, {} already aligned",
- unaligned_bams.len(),
- aligned_bams.len()
- );
- // =====================================================================
- // Step 3: Map BAMs to cases
- // =====================================================================
- info!("Step 2/5: Mapping BAM files to cases");
- // Map unaligned BAMs (need alignment)
- let (unaligned_bam_to_case, unmatched_unaligned) =
- self.map_bams_to_cases(&unaligned_bams, kit_type)?;
- // Map aligned BAMs (skip alignment, go straight to sort/merge)
- let (aligned_bam_to_case, unmatched_aligned) =
- self.map_bams_to_cases(&aligned_bams, kit_type)?;
- let total_mapped = unaligned_bam_to_case.len() + aligned_bam_to_case.len();
- let total_unmatched = unmatched_unaligned.len() + unmatched_aligned.len();
- if total_mapped == 0 {
- bail!("No BAMs were successfully mapped to cases");
- }
- info!(
- "✅ Mapped {} BAMs to cases ({} unmatched)",
- total_mapped, total_unmatched
- );
- // =====================================================================
- // Step 4: Prepare and run alignment
- // =====================================================================
- let align_start = Timer::start();
- let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new();
- // Add already-aligned BAMs directly to the map (no alignment needed)
- for (bam_path, case) in &aligned_bam_to_case {
- let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
- let key = format!("{}_{}", case.case_id, sample_type_dir);
- case_bam_map.entry(key).or_default().push(bam_path.clone());
- }
- if !unaligned_bam_to_case.is_empty() {
- let (align_jobs, aligned_case_map) =
- prepare_alignment_jobs(&unaligned_bam_to_case, &tmp_dir, config)?;
- // Track temp files for cleanup
- for bams in aligned_case_map.values() {
- temp_guard.track_many(bams.clone());
- }
- info!("Step 3/5: Running {} alignment jobs", align_jobs.len());
- run_many!(config, align_jobs).context("Alignment batch failed")?;
- self.verify_outputs_exist(&aligned_case_map, "alignment")?;
- // Merge newly aligned BAMs into the case map
- for (key, bams) in aligned_case_map {
- case_bam_map.entry(key).or_default().extend(bams);
- }
- info!("✅ Alignments completed");
- } else {
- info!("Step 3/5: Skipping alignment — all BAMs already aligned");
- }
- metrics.alignment_duration = align_start.elapsed();
- // =====================================================================
- // Step 5: Sort and index chunks
- // =====================================================================
- info!("Step 4/5: Sorting and indexing chunks");
- let sort_start = Timer::start();
- let sorted_bam_map = sort_and_index_chunks(&case_bam_map, config)?;
- // Update guard to track sorted files instead
- temp_guard.clear();
- for bams in sorted_bam_map.values() {
- temp_guard.track_many(bams.clone());
- }
- metrics.sort_index_duration = sort_start.elapsed();
- info!("✅ Sorted {} chunk BAMs", sorted_bam_map.len());
- // =====================================================================
- // Step 6: Merge and finalize per case
- // =====================================================================
- info!("Step 5/5: Finalizing per-case BAMs");
- let finalize_start = Timer::start();
- let finalized = finalize_case_bams(&self.cases, &sorted_bam_map, &tmp_dir, config)?;
- metrics.finalize_duration = finalize_start.elapsed();
- metrics.cases_finalized = finalized.len();
- metrics.bytes_output = finalized
- .iter()
- .filter_map(|p| fs::metadata(p).ok())
- .map(|m| m.len())
- .sum();
- // Success — disarm cleanup
- temp_guard.disarm();
- metrics.total_duration = pipeline_start.elapsed();
- info!("{metrics}");
- // TODO: save metrics
- info!(
- "🎉 Pipeline completed for run: {} (flowcell: {})",
- self.protocol_run_id, self.flow_cell_id
- );
- Ok(())
- }
- fn verify_outputs_exist(
- &self,
- case_bam_map: &HashMap<String, Vec<PathBuf>>,
- stage: &str,
- ) -> anyhow::Result<()> {
- let missing: Vec<_> = case_bam_map
- .values()
- .flatten()
- .filter(|p| !p.exists())
- .collect();
- if !missing.is_empty() {
- bail!(
- "{} failed: {} output file(s) not created. First missing: {}",
- stage,
- missing.len(),
- missing[0].display()
- );
- }
- Ok(())
- }
- /// Serializes this run to a JSON file.
- ///
- /// # Arguments
- ///
- /// * `path` - Output file path (will be created or overwritten)
- ///
- /// # Errors
- ///
- /// Returns an error if file creation or JSON serialization fails.
- pub fn save_json(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
- let path = path.as_ref();
- let file = BufWriter::new(
- File::create(path)
- .with_context(|| format!("Failed to create JSON file: {}", path.display()))?,
- );
- serde_json::to_writer_pretty(file, self)
- .with_context(|| format!("Failed to serialize PromRun to {}", path.display()))?;
- Ok(())
- }
- /// Deserializes a run from a JSON file.
- ///
- /// # Arguments
- ///
- /// * `path` - Path to a previously saved JSON cache file
- ///
- /// # Errors
- ///
- /// Returns an error if the file cannot be read or parsed.
- pub fn load_json(path: impl AsRef<Path>) -> anyhow::Result<Self> {
- let path = path.as_ref();
- let file = BufReader::new(
- File::open(path)
- .with_context(|| format!("Failed to open JSON file: {}", path.display()))?,
- );
- let data = serde_json::from_reader(file)
- .with_context(|| format!("Failed to parse PromRun from {}", path.display()))?;
- Ok(data)
- }
- /// Returns the cache file path for this run.
- ///
- /// The cache path is `{config.run_cache_dir}/{protocol_run_id}.json`.
- #[must_use]
- pub fn cache_path(&self, config: &Config) -> PathBuf {
- let mut path = PathBuf::from(&config.run_cache_dir);
- path.push(format!("{}.json", self.protocol_run_id));
- path
- }
- /// Opens a previously cached run by its protocol run ID.
- ///
- /// # Arguments
- ///
- /// * `run_id` - The protocol run ID (filename stem in the cache directory)
- /// * `config` - Application configuration with `run_cache_dir`
- ///
- /// # Example
- ///
- /// ```rust,ignore
- /// let run = PromRun::open("abc123-def456", &config)?;
- /// ```
- pub fn open(run_id: &str, config: &Config) -> anyhow::Result<Self> {
- let mut path = PathBuf::from(&config.run_cache_dir);
- path.push(format!("{run_id}.json"));
- Self::load_json(path)
- }
- /// Returns the total size of all BAM files in bytes.
- #[must_use]
- pub fn total_bam_size(&self) -> u64 {
- self.bams.iter().map(|b| b.bam_size).sum()
- }
- /// Returns the total size of all POD5 files in bytes.
- #[must_use]
- pub fn total_pod5_size(&self) -> u64 {
- self.pod5s.iter().map(|p| p.file_size).sum()
- }
- /// Returns BAMs not yet merged into any case's final output.
- ///
- /// For each case, checks which source files are already in the final BAM
- /// and excludes them from processing.
- fn filter_already_merged(&self, config: &Config) -> Vec<&PromBam> {
- // Collect all source filenames already merged across all cases
- let mut all_existing_sources: FxHashSet<String> = FxHashSet::default();
- for case in &self.cases {
- let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
- if !final_bam_path.exists() {
- info!(
- "No existing final BAM for case {} — all matching inputs are new",
- case.case_id
- );
- continue;
- }
- match WGSBamStats::from_bam(&final_bam_path, config) {
- Ok(stats) => {
- let sources = stats.source_filenames();
- info!(
- "Case {} has {} existing source files",
- case.case_id,
- sources.len()
- );
- all_existing_sources.extend(sources);
- }
- Err(e) => {
- warn!(
- "Could not load stats for case {} ({}): {}",
- case.case_id,
- final_bam_path.display(),
- e
- );
- }
- }
- }
- if all_existing_sources.is_empty() {
- info!(
- "Run {}: no existing sources found, all {} BAMs are new",
- self.protocol_run_id,
- self.bams.len()
- );
- return self.bams.iter().collect();
- }
- let (new_bams, skipped): (Vec<_>, Vec<_>) = self.bams.iter().partition(|bam| {
- let filename = bam.path.file_name().and_then(|n| n.to_str()).unwrap_or("");
- !all_existing_sources.contains(filename)
- });
- if !skipped.is_empty() {
- info!(
- "Run {}: skipping {} BAM(s) already merged: {}{}",
- self.protocol_run_id,
- skipped.len(),
- skipped
- .iter()
- .take(3)
- .filter_map(|b| b.path.file_name())
- .map(|n| n.to_string_lossy())
- .collect::<Vec<_>>()
- .join(", "),
- if skipped.len() > 3 { "..." } else { "" }
- );
- }
- info!(
- "Run {}: {} new BAM(s) to process",
- self.protocol_run_id,
- new_bams.len()
- );
- new_bams
- }
- }
- /// Kit type classification for workflow branching.
- #[derive(Debug, Clone, Copy, PartialEq, Eq)]
- pub enum KitType {
- Multiplexed,
- NonMultiplexed,
- }
- impl KitType {
- /// Detects kit type from kit name string.
- ///
- /// Known multiplexed kits: NBD114, NBD112, RBK
- /// Known non-multiplexed kits: LSK114, LSK109
- pub fn detect(kit: &str) -> anyhow::Result<Self> {
- const MULTIPLEXED_PATTERNS: &[&str] = &["NBD114", "NBD112", "RBK"];
- const NON_MULTIPLEXED_PATTERNS: &[&str] = &["LSK114", "LSK109"];
- let kit_upper = kit.to_uppercase();
- for pattern in MULTIPLEXED_PATTERNS {
- if kit_upper.contains(pattern) {
- return Ok(Self::Multiplexed);
- }
- }
- for pattern in NON_MULTIPLEXED_PATTERNS {
- if kit_upper.contains(pattern) {
- return Ok(Self::NonMultiplexed);
- }
- }
- bail!(
- "Unknown kit type '{}'. Expected one of: {:?} (multiplexed) or {:?} (non-multiplexed)",
- kit,
- MULTIPLEXED_PATTERNS,
- NON_MULTIPLEXED_PATTERNS
- )
- }
- }
- /// Statistics for files in a single directory.
- #[derive(Default)]
- struct DirStats {
- pod5_count: usize,
- pod5_size: u64,
- bam_count: usize,
- bam_size: u64,
- }
- impl fmt::Display for PromRun {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- /// Converts bytes to GiB.
- fn to_gib(bytes: u64) -> f64 {
- bytes as f64 / 1024.0_f64.powi(3)
- }
- let mut dir_stats: BTreeMap<String, DirStats> = BTreeMap::new();
- // Aggregate POD5 stats by directory
- for pod5 in &self.pod5s {
- let rel = pod5.path.strip_prefix(&self.dir).unwrap_or(&pod5.path);
- let dir = rel.parent().unwrap_or(Path::new("."));
- let key = dir.display().to_string();
- let entry = dir_stats.entry(key).or_default();
- entry.pod5_count += 1;
- entry.pod5_size += pod5.file_size;
- }
- // Aggregate BAM stats by directory
- for bam in &self.bams {
- let rel = bam.path.strip_prefix(&self.dir).unwrap_or(&bam.path);
- let dir = rel.parent().unwrap_or(Path::new("."));
- let key = dir.display().to_string();
- let entry = dir_stats.entry(key).or_default();
- entry.bam_count += 1;
- entry.bam_size += bam.bam_size;
- }
- // Header information
- writeln!(f, "📦 PromRun")?;
- writeln!(f, " dir : {}", self.dir.display())?;
- writeln!(f, " imported : {}", self.import_date)?;
- writeln!(f, " run id : {}", self.protocol_run_id)?;
- writeln!(f, " position : {}", self.position_id)?;
- writeln!(
- f,
- " flow cell : {} ({})",
- self.flow_cell_id, self.flow_cell_product_code
- )?;
- writeln!(f, " sample id : {}", self.sample_id)?;
- writeln!(f, " experiment : {}", self.experiment_id)?;
- writeln!(f, " kit : {}", self.kit)?;
- writeln!(f, " cases : {}", self.cases.len())?;
- match &self.pore_activity {
- Some(pa) => writeln!(f, " pore act. : {} entries", pa.len())?,
- None => writeln!(f, " pore act. : none")?,
- }
- writeln!(f)?;
- writeln!(
- f,
- "📁 Files by directory (relative to {})",
- self.dir.display()
- )?;
- if dir_stats.is_empty() {
- writeln!(f, " (no files)")?;
- return Ok(());
- }
- for (dir, stats) in dir_stats {
- writeln!(f, " - {dir}")?;
- if stats.pod5_count > 0 {
- writeln!(
- f,
- " POD5 : {:>3} files, {:6.2} GiB",
- stats.pod5_count,
- to_gib(stats.pod5_size)
- )?;
- }
- if stats.bam_count > 0 {
- writeln!(
- f,
- " BAM : {:>3} files, {:6.2} GiB",
- stats.bam_count,
- to_gib(stats.bam_size)
- )?;
- }
- }
- Ok(())
- }
- }
- /// Map sample type to output directory name.
- fn map_sample_type_to_dir(sample_type: &str, config: &Config) -> String {
- let normalized = sample_type.to_lowercase();
- match normalized.as_str() {
- "nor" | "normal" | "mrd" => config.normal_name.clone(),
- _ => config.tumoral_name.clone(),
- }
- }
- /// Metadata extracted from an ONT PromethION BAM header.
- ///
- /// Wraps standard SAM header tags specifically for MinKNOW outputs.
- ///
- /// # Parsed Tags
- /// * `@RG` (Read Group): Extracts run ID, basecall model, flowcell ID, and sample info.
- /// * `@PG` (Program): Extracts MinKNOW version and GPU telemetry.
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct PromBam {
- /// Absolute path to the BAM file.
- pub path: PathBuf,
- /// Last modification timestamp of the file.
- pub modified: DateTime<Utc>,
- /// File size in bytes.
- pub bam_size: u64,
- /// Unique run identifier from the `@RG DS:runid=` field.
- pub run_id: String,
- /// Basecalling model name (e.g., `dna_r10.4.1_e8.2_400bps_sup@v4.2.0`).
- pub basecall_model: String,
- /// Modified base detection models, space-separated if multiple.
- pub modbase_models: String,
- /// Instrument model (e.g., `PromethION`, `P2I`).
- pub instrument: String,
- /// Flowcell barcode identifier (e.g., `PBI55810`).
- pub flowcell_id: String,
- /// Sample identifier from the sample sheet.
- pub sample: String,
- /// Library identifier.
- pub library: String,
- /// Full read group ID string from `@RG ID:`.
- pub read_group_id: String,
- /// Sequencing timestamp in ISO 8601 format from `@RG DT:`.
- pub timestamp: String,
- /// GPU information from MinKNOW `@PG DS:`.
- pub gpu_info: String,
- /// MinKNOW version.
- pub minknow_version: String,
- }
- impl PromBam {
- /// Parses BAM header metadata from the file at the given path.
- ///
- /// Opens the BAM file, reads the SAM header, and extracts ONT-specific
- /// metadata from `@RG` and `@PG` records.
- ///
- /// # Arguments
- ///
- /// * `path` - Path to a BAM file (must be readable and valid BAM format)
- ///
- /// # Returns
- ///
- /// Returns `Ok(PromBam)` with populated fields, or an error if:
- /// - The file cannot be opened or read
- /// - The file is not a valid BAM format
- /// - The header cannot be parsed as UTF-8
- ///
- /// # Notes
- ///
- /// - Fields may be empty strings if the corresponding header tags are missing
- /// - Only processes `@RG` records with `PL:ONT` (Oxford Nanopore platform)
- /// - Only processes `@PG` records with `PN:minknow`
- ///
- /// # Example
- ///
- /// ```rust,ignore
- /// let bam = PromBam::from_path("sample.bam")?;
- /// assert!(!bam.run_id.is_empty());
- /// ```
- pub fn from_path(path: impl AsRef<Path>) -> anyhow::Result<Self> {
- let path = path.as_ref().to_path_buf();
- // Retrieve file metadata
- let meta = fs::metadata(&path)
- .with_context(|| format!("Failed to read metadata for {}", path.display()))?;
- let modified: DateTime<Utc> = meta
- .modified()
- .map(DateTime::<Utc>::from)
- .with_context(|| format!("Failed to get modification time for {}", path.display()))?;
- let bam_size = meta.len();
- // Open BAM and read header
- let reader = bam::Reader::from_path(&path)
- .with_context(|| format!("Failed to open BAM file: {}", path.display()))?;
- let header = reader.header().clone();
- let text =
- std::str::from_utf8(header.as_bytes()).context("BAM header contains invalid UTF-8")?;
- // Initialize with defaults
- let mut prom = PromBam {
- path: path.clone(),
- modified,
- bam_size,
- run_id: String::new(),
- basecall_model: String::new(),
- modbase_models: String::new(),
- instrument: String::new(),
- flowcell_id: String::new(),
- sample: String::new(),
- library: String::new(),
- read_group_id: String::new(),
- timestamp: String::new(),
- gpu_info: String::new(),
- minknow_version: String::new(),
- };
- let mut found_ont_rg = false;
- let mut found_other_rg = false;
- // Parse header lines
- for line in text.lines() {
- if !line.starts_with('@') {
- continue;
- }
- let mut fields = line.split('\t');
- let Some(tag) = fields.next() else {
- continue;
- };
- let tag = tag.trim_start_matches('@');
- // Build key-value map from tab-separated fields
- let kv: Vec<(&str, &str)> = fields
- .filter_map(|f| {
- let mut it = f.splitn(2, ':');
- Some((it.next()?, it.next().unwrap_or("")))
- })
- .collect();
- match tag {
- "PG" => Self::parse_pg_record(&kv, &mut prom),
- "RG" => {
- if Self::parse_rg_record(&kv, &mut prom) {
- found_ont_rg = true;
- } else {
- found_other_rg = true;
- }
- }
- _ => {}
- }
- }
- if !found_ont_rg && found_other_rg {
- warn!(
- "BAM {} contains non-ONT read groups but no ONT read groups. \
- This may not be a Nanopore BAM file.",
- path.display()
- );
- }
- if !found_ont_rg && !found_other_rg {
- warn!(
- "BAM {} contains no read groups. Metadata will be incomplete.",
- path.display()
- );
- }
- Ok(prom)
- }
- /// Parses `@PG` (Program) header record for MinKNOW-specific fields.
- fn parse_pg_record(kv: &[(&str, &str)], prom: &mut PromBam) {
- // Only process MinKNOW program records
- let is_minknow = kv.iter().any(|(k, v)| *k == "PN" && *v == "minknow");
- if !is_minknow {
- return;
- }
- for (k, v) in kv {
- match *k {
- "DS" => prom.gpu_info = (*v).to_string(),
- "VN" => prom.minknow_version = (*v).to_string(),
- _ => {}
- }
- }
- }
- /// Parses `@RG` (Read Group) header record for ONT-specific fields.
- fn parse_rg_record(kv: &[(&str, &str)], prom: &mut PromBam) -> bool {
- // Only process ONT platform read groups
- let platform = kv
- .iter()
- .find(|(k, _)| *k == "PL")
- .map(|(_, v)| *v)
- .unwrap_or("");
- if platform != "ONT" {
- return false;
- }
- // Parse composite DS field: runid=... basecall_model=... modbase_models=...
- if let Some((_, ds_raw)) = kv.iter().find(|(k, _)| *k == "DS") {
- for part in ds_raw.split_whitespace() {
- if let Some(val) = part.strip_prefix("runid=") {
- prom.run_id = val.to_string();
- } else if let Some(val) = part.strip_prefix("basecall_model=") {
- prom.basecall_model = val.to_string();
- } else if let Some(val) = part.strip_prefix("modbase_models=") {
- // Remove commas from modbase_models list
- prom.modbase_models = val.replace(',', " ");
- }
- }
- }
- // Extract standard SAM tags
- for (k, v) in kv {
- match *k {
- "ID" => prom.read_group_id = (*v).to_string(),
- "DT" => prom.timestamp = (*v).to_string(),
- "LB" => prom.library = (*v).to_string(),
- "PM" => prom.instrument = (*v).to_string(),
- "PU" => prom.flowcell_id = (*v).to_string(),
- "SM" => prom.sample = (*v).to_string(),
- _ => {}
- }
- }
- true
- }
- }
- impl fmt::Display for PromBam {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "PromBam")?;
- writeln!(f, " Path: {}", self.path.display())?;
- writeln!(f, " Modified: {}", self.modified)?;
- writeln!(
- f,
- " BamSize: {:.2} GiB",
- self.bam_size as f64 / (1024.0_f64.powi(3))
- )?;
- writeln!(f)?;
- writeln!(f, " Run ID: {}", self.run_id)?;
- writeln!(f, " Basecaller: {}", self.basecall_model)?;
- writeln!(f, " Modbase Models: {}", self.modbase_models)?;
- writeln!(f)?;
- writeln!(f, " Instrument: {}", self.instrument)?;
- writeln!(f, " Flowcell ID: {}", self.flowcell_id)?;
- writeln!(f, " Sample: {}", self.sample)?;
- writeln!(f, " Library: {}", self.library)?;
- writeln!(f)?;
- writeln!(f, " RG ID: {}", self.read_group_id)?;
- writeln!(f, " Timestamp: {}", self.timestamp)?;
- writeln!(f)?;
- writeln!(f, " GPU: {}", self.gpu_info)?;
- writeln!(f, " MinKNOW Ver: {}", self.minknow_version)
- }
- }
- /// Filters BAMs to only include those from bam_pass directories.
- fn filter_pass_bams<'a>(bams: &[&'a PromBam], kit_type: KitType) -> Vec<&'a PromBam> {
- bams.iter()
- .filter(|bam| {
- let p = bam.path.to_string_lossy();
- if p.contains("bam_fail") {
- return false;
- }
- match kit_type {
- KitType::Multiplexed => p.contains("bam_pass"),
- KitType::NonMultiplexed => p.contains("/bam/") || p.contains("bam_pass"),
- }
- })
- .copied()
- .collect()
- }
- /// Builds the final BAM path for a case.
- fn build_final_bam_path(case: &IdInput, config: &Config) -> anyhow::Result<PathBuf> {
- let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
- if let Some(parent) = final_bam_path.parent() {
- fs::create_dir_all(parent)
- .with_context(|| format!("Failed to create output directory: {}", parent.display()))?;
- }
- Ok(final_bam_path)
- }
- /// Merges multiple chunk BAMs into a single BAM.
- ///
- /// If only one chunk exists, returns it directly (no merge needed).
- fn merge_chunk_bams(
- chunk_bams: &[PathBuf],
- case: &IdInput,
- sample_type_dir: &str,
- tmp_dir: &Path,
- config: &Config,
- ) -> anyhow::Result<PathBuf> {
- if chunk_bams.len() == 1 {
- return Ok(chunk_bams[0].clone());
- }
- let merged_path = tmp_dir.join(format!(
- "{}_{}_{}_merged.bam",
- Uuid::new_v4(),
- case.case_id,
- sample_type_dir
- ));
- info!(
- " Merging {} chunks for case {} → {}",
- chunk_bams.len(),
- case.case_id,
- merged_path.file_name().unwrap().to_string_lossy()
- );
- let mut merge_cmd =
- SamtoolsMergeMany::from_config(merged_path.clone(), chunk_bams.to_vec(), config);
- run!(config, &mut merge_cmd).with_context(|| {
- format!(
- "Failed to merge {} chunk BAMs for case {}",
- chunk_bams.len(),
- case.case_id
- )
- })?;
- if !merged_path.exists() {
- bail!(
- "Merge produced no output for case {}: expected {}",
- case.case_id,
- merged_path.display()
- );
- }
- Ok(merged_path)
- }
- /// Atomically replaces a destination file with a source file.
- ///
- /// Uses rename for atomicity. Both files must be on the same filesystem.
- fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> {
- if destination.exists() {
- let backup = destination.with_extension("bam.backup");
- fs::rename(destination, &backup).with_context(|| {
- format!(
- "Failed to backup {} to {}",
- destination.display(),
- backup.display()
- )
- })?;
- if let Err(e) = fs::rename(source, destination) {
- // Attempt restore, but propagate original error regardless
- if let Err(restore_err) = fs::rename(&backup, destination) {
- warn!(
- "Failed to restore backup {} after failed replacement: {}",
- backup.display(),
- restore_err
- );
- }
- return Err(e).with_context(|| {
- format!(
- "Failed to replace {} with {}",
- destination.display(),
- source.display()
- )
- });
- }
- // Remove backup and its orphaned index
- if let Err(e) = remove_bam_with_index(&backup) {
- log::warn!("Failed to clean up backup file {}: {}", backup.display(), e);
- }
- } else {
- fs::rename(source, destination).with_context(|| {
- format!(
- "Failed to move {} to {}",
- source.display(),
- destination.display()
- )
- })?;
- }
- Ok(())
- }
- /// Indexes a BAM file using samtools.
- fn index_bam(bam: &Path, config: &Config) -> anyhow::Result<()> {
- let mut index_cmd = SamtoolsIndex::from_config(config, bam.to_string_lossy().as_ref());
- run!(config, &mut index_cmd)?;
- let index_path = PathBuf::from(format!("{}.bai", bam.display()));
- if !index_path.exists() {
- bail!("Index file not created: {}", index_path.display());
- }
- Ok(())
- }
- /// Merges a new BAM into an existing final BAM atomically.
- ///
- /// Assumes both BAMs are coordinate-sorted. Skips re-sorting since
- /// samtools merge preserves sort order from sorted inputs.
- fn merge_into_existing_final(
- source: &Path,
- destination: &Path,
- tmp_dir: &Path,
- config: &Config,
- ) -> anyhow::Result<()> {
- info!(
- " Merging into existing final BAM: {}",
- destination.display()
- );
- index_bam(source, config)
- .with_context(|| format!("Failed to index source BAM: {}", source.display()))?;
- index_bam(destination, config)
- .with_context(|| format!("Failed to index destination BAM: {}", destination.display()))?;
- let temp_merged = tmp_dir.join(format!(".merging_{}.bam", Uuid::new_v4()));
- let input_bams = vec![source.to_path_buf(), destination.to_path_buf()];
- let mut merge_cmd = SamtoolsMergeMany::from_config(temp_merged.clone(), input_bams, config);
- run!(config, &mut merge_cmd)
- .with_context(|| "Failed to merge source into existing final BAM")?;
- // No sort needed — merge preserves coordinate order from sorted inputs
- atomic_replace(&temp_merged, destination)?;
- index_bam(destination, config)
- .with_context(|| format!("Failed to index final BAM: {}", destination.display()))?;
- if let Err(e) = remove_bam_with_index(source) {
- log::warn!(
- "Failed to clean up merged source {}: {}",
- source.display(),
- e
- );
- }
- Ok(())
- }
- /// Creates a new final BAM from a merged per-case BAM.
- ///
- /// Source is already sorted (from sort_and_index_chunks), so only
- /// move and index are needed.
- fn create_new_final(
- source: &Path,
- destination: &Path,
- case: &IdInput,
- config: &Config,
- ) -> anyhow::Result<()> {
- info!(" Creating new final BAM: {}", destination.display());
- // Source already sorted — just move and index
- atomic_replace(source, destination)?;
- // Adding SM tag in BAM without (if not demultiplexed if so it should be the barcode name)
- // let _ = read_sm_tag_or_inject(
- // &destination.to_string_lossy(),
- // &format!("{}_{}", case.case_id, case.sample_type),
- // config,
- // )?;
- index_bam(destination, config)
- .with_context(|| format!("Failed to index final BAM: {}", destination.display()))?;
- Ok(())
- }
- type AlignJobsCase = (Vec<DoradoAlign>, HashMap<String, Vec<PathBuf>>);
- /// Prepares alignment jobs and builds the case-to-BAM mapping.
- fn prepare_alignment_jobs(
- bam_to_case: &HashMap<PathBuf, &IdInput>,
- tmp_dir: &Path,
- config: &Config,
- ) -> anyhow::Result<AlignJobsCase> {
- let mut align_jobs = Vec::new();
- let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new();
- for (bam_path, case) in bam_to_case {
- let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
- let key = format!("{}_{}", case.case_id, sample_type_dir);
- let aligned_bam = tmp_dir.join(format!(
- "{}_{}_{}_{}.bam",
- Uuid::new_v4(),
- case.case_id,
- sample_type_dir,
- bam_path.file_stem().unwrap().to_string_lossy()
- ));
- align_jobs.push(DoradoAlign::from_config(
- config,
- bam_path,
- aligned_bam.clone(),
- ));
- case_bam_map.entry(key).or_default().push(aligned_bam);
- }
- Ok((align_jobs, case_bam_map))
- }
- /// Sorts and indexes all aligned chunk BAMs in parallel via Slurm.
- ///
- /// Returns an updated map with sorted BAM paths replacing the original aligned paths.
- fn sort_and_index_chunks(
- case_bam_map: &HashMap<String, Vec<PathBuf>>,
- config: &Config,
- ) -> anyhow::Result<HashMap<String, Vec<PathBuf>>> {
- let mut sort_jobs = Vec::new();
- let mut original_to_sorted: HashMap<PathBuf, PathBuf> = HashMap::new();
- // Sort job for each BAM.
- for bams in case_bam_map.values() {
- for bam in bams {
- let sorted_bam = bam.with_extension("sorted.bam");
- original_to_sorted.insert(bam.clone(), sorted_bam.clone());
- sort_jobs.push(SamtoolsSort::from_config(config, bam, &sorted_bam));
- }
- }
- info!("Submitting {} sort jobs", sort_jobs.len());
- run_many!(config, sort_jobs).context("Sort batch failed")?;
- // Verify sorted BAM exists.
- let missing: Vec<_> = original_to_sorted
- .values()
- .filter(|p| !p.exists())
- .collect();
- if !missing.is_empty() {
- bail!(
- "Sorting failed: {} file(s) not created. First missing: {}",
- missing.len(),
- missing[0].display()
- );
- }
- // Index every sorted BAM.
- let index_jobs: Vec<SamtoolsIndex> = original_to_sorted
- .values()
- .map(|sorted| SamtoolsIndex::from_config(config, sorted.to_string_lossy().as_ref()))
- .collect();
- info!("Submitting {} index jobs", index_jobs.len());
- run_many!(config, index_jobs).context("Sort batch failed")?;
- // Verify index exists for each BAM.
- let missing_indices: Vec<_> = original_to_sorted
- .values()
- .map(|p| PathBuf::from(format!("{}.bai", p.display())))
- .filter(|p| !p.exists())
- .collect();
- if !missing_indices.is_empty() {
- bail!(
- "Indexing failed: {} index file(s) not created. First missing: {}",
- missing_indices.len(),
- missing_indices[0].display()
- );
- }
- // Remove input BAMs.
- for original in original_to_sorted.keys() {
- if let Err(e) = fs::remove_file(original) {
- log::warn!(
- "Failed to remove unsorted BAM {}: {}",
- original.display(),
- e
- );
- }
- }
- // Construct the new map(case_id) -> BAMs
- let sorted_map: HashMap<String, Vec<PathBuf>> = case_bam_map
- .iter()
- .map(|(key, bams)| {
- let sorted_bams: Vec<PathBuf> = bams
- .iter()
- .filter_map(|b| original_to_sorted.get(b).cloned())
- .collect();
- (key.clone(), sorted_bams)
- })
- .collect();
- Ok(sorted_map)
- }
- /// Finalizes per-case BAM files by merging chunks and creating indexed outputs.
- fn finalize_case_bams(
- cases: &[IdInput],
- sorted_bam_map: &HashMap<String, Vec<PathBuf>>,
- tmp_dir: &Path,
- config: &Config,
- ) -> anyhow::Result<Vec<PathBuf>> {
- let mut finalized = Vec::new();
- for case in cases {
- let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
- let key = format!("{}_{}", case.case_id, sample_type_dir);
- let Some(chunk_bams) = sorted_bam_map.get(&key) else {
- info!(
- "No aligned BAMs found for case {} ({}) — skipping",
- case.case_id, sample_type_dir
- );
- continue;
- };
- if chunk_bams.is_empty() {
- continue;
- }
- let final_bam_path = build_final_bam_path(case, config)?;
- info!(
- "Finalizing case {} ({}): {} chunks → {}",
- case.case_id,
- sample_type_dir,
- chunk_bams.len(),
- final_bam_path.display()
- );
- let merged_case_bam =
- merge_chunk_bams(chunk_bams, case, &sample_type_dir, tmp_dir, config)?;
- if final_bam_path.exists() {
- merge_into_existing_final(&merged_case_bam, &final_bam_path, tmp_dir, config)?;
- } else {
- create_new_final(&merged_case_bam, &final_bam_path, case, config)?;
- }
- for chunk in chunk_bams {
- if chunk.exists() {
- if let Err(e) = remove_bam_with_index(chunk) {
- log::warn!("Failed to clean up temp chunk {}: {}", chunk.display(), e);
- }
- }
- }
- let stats = WGSBamStats::open(&case.case_id, &case.sample_type, config)?;
- ModkitSummary::initialize(&case.case_id, &case.sample_type, config)?.run()?;
- info!("✅ Output: {}\n{stats}", final_bam_path.display());
- finalized.push(final_bam_path);
- }
- Ok(finalized)
- }
- /// Normalize barcode strings for matching.
- ///
- /// Handles various formats:
- /// - "barcode01" → "1"
- /// - "NB01" → "1"
- /// - "01" → "1"
- /// - "BC01" → "1"
- fn normalize_barcode(barcode: &str) -> String {
- let lower = barcode.to_lowercase();
- // Strip known prefixes
- let stripped = lower
- .trim_start_matches("barcode")
- .trim_start_matches("nb")
- .trim_start_matches("bc")
- .trim_start_matches('_')
- .trim_start_matches('-')
- .trim();
- // Parse as number to remove leading zeros
- if let Ok(num) = stripped.parse::<u32>() {
- return num.to_string();
- }
- // Fallback: return cleaned string
- stripped.to_string()
- }
- lazy_static! {
- static ref BARCODE_PATTERNS: [regex::Regex; 3] = [
- regex::Regex::new(r"(?i)barcode(\d+)").unwrap(),
- regex::Regex::new(r"(?i)nb(\d+)").unwrap(),
- regex::Regex::new(r"(?i)bc(\d+)").unwrap(),
- ];
- }
- /// Extract barcode number from various filename patterns.
- ///
- /// Recognizes:
- /// - `*_barcode01_*`
- /// - `*_NB01_*`
- /// - `barcode01/`
- fn extract_and_normalize_barcode(text: &str) -> Option<String> {
- for pattern in BARCODE_PATTERNS.iter() {
- if let Some(caps) = pattern.captures(text) {
- if let Some(num_str) = caps.get(1) {
- if let Ok(num) = num_str.as_str().parse::<u32>() {
- return Some(num.to_string());
- }
- }
- }
- }
- None
- }
- #[derive(Debug, Clone, Default, Serialize, Deserialize)]
- pub struct PipelineMetrics {
- pub validation_duration: Duration,
- pub alignment_duration: Duration,
- pub sort_index_duration: Duration,
- pub finalize_duration: Duration,
- pub total_duration: Duration,
- pub bams_processed: usize,
- pub cases_finalized: usize,
- pub bytes_input: u64,
- pub bytes_output: u64,
- }
- impl PipelineMetrics {
- pub fn throughput_mbps(&self) -> f64 {
- if self.total_duration.as_secs_f64() == 0.0 {
- return 0.0;
- }
- (self.bytes_input as f64 / 1_000_000.0) / self.total_duration.as_secs_f64()
- }
- pub fn save(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
- let file = BufWriter::new(File::create(path.as_ref())?);
- serde_json::to_writer_pretty(file, self)?;
- Ok(())
- }
- }
- impl std::fmt::Display for PipelineMetrics {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- writeln!(f, "📊 Pipeline Metrics")?;
- writeln!(f, " Validation: {:>8.2?}", self.validation_duration)?;
- writeln!(f, " Alignment: {:>8.2?}", self.alignment_duration)?;
- writeln!(f, " Sort/Index: {:>8.2?}", self.sort_index_duration)?;
- writeln!(f, " Finalize: {:>8.2?}", self.finalize_duration)?;
- writeln!(f, " ─────────────────────")?;
- writeln!(f, " Total: {:>8.2?}", self.total_duration)?;
- writeln!(f)?;
- writeln!(f, " BAMs processed: {:>6}", self.bams_processed)?;
- writeln!(f, " Cases finalized: {:>6}", self.cases_finalized)?;
- writeln!(
- f,
- " Input: {:>8.2} GiB",
- self.bytes_input as f64 / 1024.0_f64.powi(3)
- )?;
- writeln!(
- f,
- " Output: {:>8.2} GiB",
- self.bytes_output as f64 / 1024.0_f64.powi(3)
- )?;
- writeln!(f, " Throughput: {:>8.2} MB/s", self.throughput_mbps())
- }
- }
- struct Timer(Instant);
- impl Timer {
- fn start() -> Self {
- Self(Instant::now())
- }
- fn elapsed(&self) -> Duration {
- self.0.elapsed()
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use log::info;
- use crate::helpers::test_init;
- #[test]
- fn prom_run_process() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- let dir = "/mnt/beegfs02/scratch/t_steimle/data/prom/20251121_001_01_CD/03/20251121_1531_P2I-00461-B_PBI56020_efa567ea";
- let prom_run = PromRun::from_dir(dir)?;
- let prom = PromRun::open(&prom_run.protocol_run_id, &config)?;
- info!("{prom}");
- prom.process_bams(&config)?;
- Ok(())
- }
- }
|