|
@@ -1,7 +1,54 @@
|
|
|
//! # PromRun Module
|
|
//! # PromRun Module
|
|
|
//!
|
|
//!
|
|
|
-//! Handles the import, parsing, and processing of Oxford Nanopore (ONT) PromethION sequencing runs.
|
|
|
|
|
-//! This module orchestrates the workflow from raw data ingestion (BAM/POD5) to alignment execution.
|
|
|
|
|
|
|
+//! Import, validation, and processing pipeline for Oxford Nanopore PromethION sequencing runs.
|
|
|
|
|
+//!
|
|
|
|
|
+//! This module provides the [`PromRun`] struct which orchestrates the complete workflow from
|
|
|
|
|
+//! raw MinKNOW output ingestion through alignment, sorting, and per-case BAM finalization.
|
|
|
|
|
+//!
|
|
|
|
|
+//! ## Supported Kit Types
|
|
|
|
|
+//!
|
|
|
|
|
+//! - **Multiplexed** (`SQK-NBD114-24`, `SQK-NBD112`, `RBK`): BAMs demultiplexed by barcode
|
|
|
|
|
+//! - **Non-multiplexed** (`SQK-LSK114`, `SQK-LSK109`): Single-sample runs
|
|
|
|
|
+//!
|
|
|
|
|
+//! ## Pipeline Stages
|
|
|
|
|
+//!
|
|
|
|
|
+//! 1. **Import** — Recursive directory scan for BAM, POD5, sample sheets, pore activity logs
|
|
|
|
|
+//! 2. **Validation** — Verify BAMs are unaligned (no reference sequences in header)
|
|
|
|
|
+//! 3. **Case mapping** — Associate BAMs with cases via barcode extraction or single-case assignment
|
|
|
|
|
+//! 4. **Alignment** — Parallel Dorado alignment against reference genome
|
|
|
|
|
+//! 5. **Sort/Index** — Coordinate sort and index via samtools (Slurm batch)
|
|
|
|
|
+//! 6. **Finalize** — Merge chunks per case, atomic replacement of existing outputs
|
|
|
|
|
+//!
|
|
|
|
|
+//! ## Directory Structure
|
|
|
|
|
+//!
|
|
|
|
|
+//! Files are discovered recursively; typical MinKNOW layouts:
|
|
|
|
|
+//!
|
|
|
|
|
+//! ```text
|
|
|
|
|
+//! run_dir/
|
|
|
|
|
+//! ├── sample_sheet_*.csv # Required
|
|
|
|
|
+//! ├── pore_activity_*.csv # Optional
|
|
|
|
|
+//! ├── pod5_pass/barcode*/*.pod5 # Multiplexed
|
|
|
|
|
+//! ├── bam_pass/barcode*/*.bam # Multiplexed
|
|
|
|
|
+//! ├── pod5/*.pod5 # Non-multiplexed
|
|
|
|
|
+//! └── bam/*.bam # Non-multiplexed
|
|
|
|
|
+//! ```
|
|
|
|
|
+//!
|
|
|
|
|
+//! ## Example
|
|
|
|
|
+//!
|
|
|
|
|
+//! ```rust,ignore
|
|
|
|
|
+//! use crate::config::Config;
|
|
|
|
|
+//! use crate::collection::promrun::PromRun;
|
|
|
|
|
+//!
|
|
|
|
|
+//! let config = Config::default();
|
|
|
|
|
+//!
|
|
|
|
|
+//! // Import and process
|
|
|
|
|
+//! let mut run = PromRun::from_dir("/data/prom/20240101_run", &config)?;
|
|
|
|
|
+//! run.cases.push(IdInput { case_id: "CASE001".into(), sample_type: "TUM".into(), barcode: "barcode01".into() });
|
|
|
|
|
+//! run.process_bams(&config)?;
|
|
|
|
|
+//!
|
|
|
|
|
+//! // Or load cached
|
|
|
|
|
+//! let run = PromRun::open("protocol_run_id", &config)?;
|
|
|
|
|
+//! ```
|
|
|
|
|
|
|
|
use std::{
|
|
use std::{
|
|
|
collections::{BTreeMap, HashMap},
|
|
collections::{BTreeMap, HashMap},
|
|
@@ -9,6 +56,7 @@ use std::{
|
|
|
fs::{self, File},
|
|
fs::{self, File},
|
|
|
io::{BufReader, BufWriter},
|
|
io::{BufReader, BufWriter},
|
|
|
path::{Path, PathBuf},
|
|
path::{Path, PathBuf},
|
|
|
|
|
+ time::{Duration, Instant},
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
use anyhow::{bail, Context};
|
|
use anyhow::{bail, Context};
|
|
@@ -24,13 +72,19 @@ use uuid::Uuid;
|
|
|
|
|
|
|
|
use crate::{
|
|
use crate::{
|
|
|
collection::{
|
|
collection::{
|
|
|
- bam_stats::WGSBamStats, flowcells::IdInput, minknow::{MinKnowSampleSheet, PoreStateEntry, parse_pore_activity_from_reader}, pod5::Pod5
|
|
|
|
|
|
|
+ bam_stats::WGSBamStats,
|
|
|
|
|
+ flowcells::IdInput,
|
|
|
|
|
+ minknow::{parse_pore_activity_from_reader, MinKnowSampleSheet, PoreStateEntry},
|
|
|
|
|
+ pod5::Pod5,
|
|
|
},
|
|
},
|
|
|
commands::{
|
|
commands::{
|
|
|
- SlurmRunner, dorado::DoradoAlign, run_many_sbatch, samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsMergeMany, SamtoolsSort}
|
|
|
|
|
|
|
+ dorado::DoradoAlign,
|
|
|
|
|
+ run_many_sbatch,
|
|
|
|
|
+ samtools::{SamtoolsIndex, SamtoolsMergeMany, SamtoolsSort},
|
|
|
|
|
+ SlurmRunner,
|
|
|
},
|
|
},
|
|
|
config::Config,
|
|
config::Config,
|
|
|
- helpers::{TempFileGuard, get_genome_sizes, list_files_recursive, remove_bam_with_index},
|
|
|
|
|
|
|
+ helpers::{get_genome_sizes, list_files_recursive, remove_bam_with_index, TempFileGuard},
|
|
|
run, run_many,
|
|
run, run_many,
|
|
|
};
|
|
};
|
|
|
|
|
|
|
@@ -422,6 +476,9 @@ impl PromRun {
|
|
|
/// - BAM files are already aligned
|
|
/// - BAM files are already aligned
|
|
|
/// - External commands fail (Dorado, samtools, Slurm)
|
|
/// - External commands fail (Dorado, samtools, Slurm)
|
|
|
pub fn process_bams(&self, config: &Config) -> anyhow::Result<()> {
|
|
pub fn process_bams(&self, config: &Config) -> anyhow::Result<()> {
|
|
|
|
|
+ let pipeline_start = Timer::start();
|
|
|
|
|
+ let mut metrics = PipelineMetrics::default();
|
|
|
|
|
+
|
|
|
let tmp_dir = PathBuf::from(&config.tmp_dir);
|
|
let tmp_dir = PathBuf::from(&config.tmp_dir);
|
|
|
let mut temp_guard = TempFileGuard::new();
|
|
let mut temp_guard = TempFileGuard::new();
|
|
|
|
|
|
|
@@ -460,6 +517,7 @@ impl PromRun {
|
|
|
// Step 2: Filter and validate BAMs
|
|
// Step 2: Filter and validate BAMs
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
|
|
|
|
|
|
|
|
+ let validation_start = Timer::start();
|
|
|
let pass_bams = self.filter_pass_bams();
|
|
let pass_bams = self.filter_pass_bams();
|
|
|
|
|
|
|
|
if pass_bams.is_empty() {
|
|
if pass_bams.is_empty() {
|
|
@@ -475,6 +533,11 @@ impl PromRun {
|
|
|
|
|
|
|
|
info!("Step 1/6: Validating BAMs are unaligned");
|
|
info!("Step 1/6: Validating BAMs are unaligned");
|
|
|
self.validate_bams_unaligned(&pass_bams, &pool)?;
|
|
self.validate_bams_unaligned(&pass_bams, &pool)?;
|
|
|
|
|
+
|
|
|
|
|
+ metrics.validation_duration = validation_start.elapsed();
|
|
|
|
|
+ metrics.bams_processed = pass_bams.len();
|
|
|
|
|
+ metrics.bytes_input = pass_bams.iter().map(|b| b.bam_size).sum();
|
|
|
|
|
+
|
|
|
info!("✅ All {} BAM files are unaligned", pass_bams.len());
|
|
info!("✅ All {} BAM files are unaligned", pass_bams.len());
|
|
|
|
|
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
@@ -503,6 +566,7 @@ impl PromRun {
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
|
|
|
|
|
|
info!("Step 3/6: Preparing alignment jobs");
|
|
info!("Step 3/6: Preparing alignment jobs");
|
|
|
|
|
+ let align_start = Timer::start();
|
|
|
let (align_jobs, case_bam_map) = prepare_alignment_jobs(&bam_to_case, &tmp_dir, config)?;
|
|
let (align_jobs, case_bam_map) = prepare_alignment_jobs(&bam_to_case, &tmp_dir, config)?;
|
|
|
|
|
|
|
|
// Track temp files for cleanup
|
|
// Track temp files for cleanup
|
|
@@ -511,9 +575,12 @@ impl PromRun {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
info!("Step 4/6: Running {} alignment jobs", align_jobs.len());
|
|
info!("Step 4/6: Running {} alignment jobs", align_jobs.len());
|
|
|
- run_many_sbatch(align_jobs).context("Alignment batch failed")?;
|
|
|
|
|
|
|
+ run_many!(config, align_jobs).context("Alignment batch failed")?;
|
|
|
|
|
|
|
|
self.verify_outputs_exist(&case_bam_map, "alignment")?;
|
|
self.verify_outputs_exist(&case_bam_map, "alignment")?;
|
|
|
|
|
+
|
|
|
|
|
+ metrics.alignment_duration = align_start.elapsed();
|
|
|
|
|
+
|
|
|
info!("✅ Alignments completed");
|
|
info!("✅ Alignments completed");
|
|
|
|
|
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
@@ -521,7 +588,17 @@ impl PromRun {
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
|
|
|
|
|
|
info!("Step 5/6: Sorting and indexing chunks");
|
|
info!("Step 5/6: Sorting and indexing chunks");
|
|
|
|
|
+ let sort_start = Timer::start();
|
|
|
let sorted_bam_map = sort_and_index_chunks(&case_bam_map, config)?;
|
|
let sorted_bam_map = sort_and_index_chunks(&case_bam_map, config)?;
|
|
|
|
|
+
|
|
|
|
|
+ // Update guard to track sorted files instead
|
|
|
|
|
+ temp_guard.clear();
|
|
|
|
|
+ for bams in sorted_bam_map.values() {
|
|
|
|
|
+ temp_guard.track_many(bams.clone());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ metrics.sort_index_duration = sort_start.elapsed();
|
|
|
|
|
+
|
|
|
info!("✅ Sorted {} chunk BAMs", sorted_bam_map.len());
|
|
info!("✅ Sorted {} chunk BAMs", sorted_bam_map.len());
|
|
|
|
|
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
@@ -529,11 +606,25 @@ impl PromRun {
|
|
|
// =====================================================================
|
|
// =====================================================================
|
|
|
|
|
|
|
|
info!("Step 6/6: Finalizing per-case BAMs");
|
|
info!("Step 6/6: Finalizing per-case BAMs");
|
|
|
- finalize_case_bams(&self.cases, &sorted_bam_map, &tmp_dir, config)?;
|
|
|
|
|
|
|
+ let finalize_start = Timer::start();
|
|
|
|
|
+
|
|
|
|
|
+ let finalized = finalize_case_bams(&self.cases, &sorted_bam_map, &tmp_dir, config)?;
|
|
|
|
|
+
|
|
|
|
|
+ metrics.finalize_duration = finalize_start.elapsed();
|
|
|
|
|
+ metrics.cases_finalized = finalized.len();
|
|
|
|
|
+ metrics.bytes_output = finalized
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .filter_map(|p| fs::metadata(p).ok())
|
|
|
|
|
+ .map(|m| m.len())
|
|
|
|
|
+ .sum();
|
|
|
|
|
|
|
|
// Success — disarm cleanup
|
|
// Success — disarm cleanup
|
|
|
temp_guard.disarm();
|
|
temp_guard.disarm();
|
|
|
|
|
|
|
|
|
|
+ metrics.total_duration = pipeline_start.elapsed();
|
|
|
|
|
+ info!("{metrics}");
|
|
|
|
|
+
|
|
|
|
|
+ // TODO: save metrics
|
|
|
info!(
|
|
info!(
|
|
|
"🎉 Pipeline completed for run: {} (flowcell: {})",
|
|
"🎉 Pipeline completed for run: {} (flowcell: {})",
|
|
|
self.protocol_run_id, self.flow_cell_id
|
|
self.protocol_run_id, self.flow_cell_id
|
|
@@ -565,602 +656,6 @@ impl PromRun {
|
|
|
Ok(())
|
|
Ok(())
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- pub fn process_bams_old(&self, config: &Config) -> anyhow::Result<()> {
|
|
|
|
|
- let tmp_dir = PathBuf::from(&config.tmp_dir);
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 1: Validate cases and determine kit type
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- if self.cases.is_empty() {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "Run {} (flowcell: {}) has no associated cases. Please add cases before processing.",
|
|
|
|
|
- self.protocol_run_id,
|
|
|
|
|
- self.flow_cell_id
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "Processing PromRun: {} (flowcell: {}, kit: {}, {} BAM files, {} cases)",
|
|
|
|
|
- self.protocol_run_id,
|
|
|
|
|
- self.flow_cell_id,
|
|
|
|
|
- self.kit,
|
|
|
|
|
- self.bams.len(),
|
|
|
|
|
- self.cases.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- // Filter BAMs to only include those from bam_pass directories (exclude bam_fail)
|
|
|
|
|
- let pass_bams: Vec<&PromBam> = self
|
|
|
|
|
- .bams
|
|
|
|
|
- .iter()
|
|
|
|
|
- .filter(|bam| {
|
|
|
|
|
- let path_str = bam.path.to_string_lossy();
|
|
|
|
|
- let is_pass = path_str.contains("bam_pass");
|
|
|
|
|
- let is_fail = path_str.contains("bam_fail");
|
|
|
|
|
-
|
|
|
|
|
- if is_fail {
|
|
|
|
|
- info!(" Skipping failed read BAM: {}", bam.path.display());
|
|
|
|
|
- false
|
|
|
|
|
- } else if is_pass {
|
|
|
|
|
- true
|
|
|
|
|
- } else {
|
|
|
|
|
- // If neither pass nor fail, be conservative and include it
|
|
|
|
|
- warn!(
|
|
|
|
|
- " BAM path ambiguous (not in bam_pass or bam_fail), including: {}",
|
|
|
|
|
- bam.path.display()
|
|
|
|
|
- );
|
|
|
|
|
- true
|
|
|
|
|
- }
|
|
|
|
|
- })
|
|
|
|
|
- // .take(2)
|
|
|
|
|
- .collect();
|
|
|
|
|
-
|
|
|
|
|
- if pass_bams.is_empty() {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "No BAM files found in bam_pass directories for run {}",
|
|
|
|
|
- self.protocol_run_id
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "Filtered to {} BAM files from bam_pass (excluded {} from bam_fail)",
|
|
|
|
|
- pass_bams.len(),
|
|
|
|
|
- self.bams.len() - pass_bams.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- // Determine if multiplexed based on kit name
|
|
|
|
|
- let is_multiplexed = self.kit.to_uppercase().contains("NBD114");
|
|
|
|
|
- let is_non_multiplexed = self.kit.to_uppercase().contains("LSK114");
|
|
|
|
|
-
|
|
|
|
|
- if !is_multiplexed && !is_non_multiplexed {
|
|
|
|
|
- warn!(
|
|
|
|
|
- "Kit '{}' is not recognized as SQK-NBD114-24 or SQK-LSK114. Attempting to detect from directory structure.",
|
|
|
|
|
- self.kit
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Validate case count for non-multiplexed runs
|
|
|
|
|
- if is_non_multiplexed && self.cases.len() != 1 {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "Non-multiplexed run (kit: {}) must have exactly 1 case, found {}",
|
|
|
|
|
- self.kit,
|
|
|
|
|
- self.cases.len()
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Log cases
|
|
|
|
|
- for case in &self.cases {
|
|
|
|
|
- info!(
|
|
|
|
|
- " Case: {} ({}) - barcode: {}",
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- case.sample_type,
|
|
|
|
|
- if case.barcode.is_empty() {
|
|
|
|
|
- "none"
|
|
|
|
|
- } else {
|
|
|
|
|
- &case.barcode
|
|
|
|
|
- }
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 2: Validate BAMs are unaligned
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!("Step 1/5: Validating BAM files are unaligned");
|
|
|
|
|
-
|
|
|
|
|
- // Build thread pool for parallel validation
|
|
|
|
|
- let pool = ThreadPoolBuilder::new()
|
|
|
|
|
- .num_threads(config.threads.into())
|
|
|
|
|
- .build()
|
|
|
|
|
- .context("Failed to build Rayon thread pool")?;
|
|
|
|
|
-
|
|
|
|
|
- // Check in parallel if any BAM appears to be aligned using safe header parsing
|
|
|
|
|
- let aligned_bams: Vec<_> = pool.install(|| {
|
|
|
|
|
- pass_bams
|
|
|
|
|
- .par_iter()
|
|
|
|
|
- .filter_map(|bam| {
|
|
|
|
|
- // Check if BAM has alignment info by looking for reference sequences in header
|
|
|
|
|
- match bam::Reader::from_path(&bam.path) {
|
|
|
|
|
- Ok(reader) => {
|
|
|
|
|
- let header = bam::Header::from_template(reader.header());
|
|
|
|
|
- // Use safe header parsing to check for reference sequences
|
|
|
|
|
- match get_genome_sizes(&header) {
|
|
|
|
|
- Ok(sizes) => {
|
|
|
|
|
- if !sizes.is_empty() {
|
|
|
|
|
- Some(bam.path.clone())
|
|
|
|
|
- } else {
|
|
|
|
|
- None
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- Err(_) => {
|
|
|
|
|
- warn!(
|
|
|
|
|
- "Failed to parse header for {}, assuming unaligned",
|
|
|
|
|
- bam.path.display()
|
|
|
|
|
- );
|
|
|
|
|
- None
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- Err(e) => {
|
|
|
|
|
- warn!("Failed to open BAM {}: {}", bam.path.display(), e);
|
|
|
|
|
- None
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- })
|
|
|
|
|
- .collect()
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- if !aligned_bams.is_empty() {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "Found {} BAM file(s) that appear to be already aligned. This method only processes unaligned BAMs. First aligned BAM: {}",
|
|
|
|
|
- aligned_bams.len(),
|
|
|
|
|
- aligned_bams[0].display()
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!("✅ All {} BAM files are unaligned", pass_bams.len());
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 3: Map BAMs to cases
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!("Step 2/5: Mapping BAM files to cases");
|
|
|
|
|
-
|
|
|
|
|
- let mut bam_to_case_map: HashMap<PathBuf, &IdInput> = HashMap::new();
|
|
|
|
|
- let mut unmatched_bams = Vec::new();
|
|
|
|
|
-
|
|
|
|
|
- if is_non_multiplexed {
|
|
|
|
|
- // Non-multiplexed: assign all BAMs to the single case
|
|
|
|
|
- let single_case = &self.cases[0];
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "Non-multiplexed run: assigning all {} BAMs to case {}",
|
|
|
|
|
- pass_bams.len(),
|
|
|
|
|
- single_case.case_id
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- for bam in &pass_bams {
|
|
|
|
|
- bam_to_case_map.insert(bam.path.clone(), single_case);
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- // Multiplexed: map by barcode
|
|
|
|
|
- info!("Multiplexed run: mapping BAMs by barcode");
|
|
|
|
|
-
|
|
|
|
|
- // Create normalized barcode → case mapping
|
|
|
|
|
- let mut barcode_to_case: HashMap<String, &IdInput> = HashMap::new();
|
|
|
|
|
-
|
|
|
|
|
- for case in &self.cases {
|
|
|
|
|
- if case.barcode.is_empty() {
|
|
|
|
|
- warn!(
|
|
|
|
|
- "Case {} has no barcode in multiplexed run - will not be matched",
|
|
|
|
|
- case.case_id
|
|
|
|
|
- );
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- let normalized = normalize_barcode(&case.barcode);
|
|
|
|
|
- barcode_to_case.insert(normalized.clone(), case);
|
|
|
|
|
- info!(
|
|
|
|
|
- " Barcode mapping: '{}' → '{}' (case: {})",
|
|
|
|
|
- case.barcode, normalized, case.case_id
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Match BAMs to cases by barcode
|
|
|
|
|
- for bam in &pass_bams {
|
|
|
|
|
- let bam_file_name =
|
|
|
|
|
- bam.path
|
|
|
|
|
- .file_name()
|
|
|
|
|
- .and_then(|n| n.to_str())
|
|
|
|
|
- .ok_or_else(|| {
|
|
|
|
|
- anyhow::anyhow!("Invalid BAM filename: {}", bam.path.display())
|
|
|
|
|
- })?;
|
|
|
|
|
-
|
|
|
|
|
- // Try to extract barcode from filename or parent directory
|
|
|
|
|
- let barcode_opt = extract_and_normalize_barcode(bam_file_name).or_else(|| {
|
|
|
|
|
- bam.path
|
|
|
|
|
- .parent()
|
|
|
|
|
- .and_then(|p| p.file_name())
|
|
|
|
|
- .and_then(|n| n.to_str())
|
|
|
|
|
- .and_then(extract_and_normalize_barcode)
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- if let Some(barcode_str) = barcode_opt {
|
|
|
|
|
- if let Some(case) = barcode_to_case.get(&barcode_str) {
|
|
|
|
|
- bam_to_case_map.insert(bam.path.clone(), case);
|
|
|
|
|
- info!(
|
|
|
|
|
- " Matched: {} → barcode {} → case {}",
|
|
|
|
|
- bam.path.file_name().unwrap().to_str().unwrap(),
|
|
|
|
|
- barcode_str,
|
|
|
|
|
- case.case_id
|
|
|
|
|
- );
|
|
|
|
|
- } else {
|
|
|
|
|
- warn!(
|
|
|
|
|
- "Barcode {} found in {} but no matching case",
|
|
|
|
|
- barcode_str,
|
|
|
|
|
- bam.path.display()
|
|
|
|
|
- );
|
|
|
|
|
- unmatched_bams.push(&bam.path);
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- warn!("No barcode extracted from: {}", bam.path.display());
|
|
|
|
|
- unmatched_bams.push(&bam.path);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if bam_to_case_map.is_empty() {
|
|
|
|
|
- bail!("No BAMs were successfully mapped to cases");
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "✅ Mapped {} BAMs to cases ({} unmatched)",
|
|
|
|
|
- bam_to_case_map.len(),
|
|
|
|
|
- unmatched_bams.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 4: Prepare alignment jobs (in parallel)
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "Step 3/5: Preparing alignment jobs for {} BAM files",
|
|
|
|
|
- bam_to_case_map.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- let mut align_jobs = Vec::new();
|
|
|
|
|
- let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new();
|
|
|
|
|
-
|
|
|
|
|
- for (bam_path, case) in bam_to_case_map {
|
|
|
|
|
- let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
|
|
|
|
|
-
|
|
|
|
|
- // Create output directory structure
|
|
|
|
|
- let case_output_dir = PathBuf::from(&config.result_dir)
|
|
|
|
|
- .join(&case.case_id)
|
|
|
|
|
- .join(&sample_type_dir);
|
|
|
|
|
-
|
|
|
|
|
- fs::create_dir_all(&case_output_dir).context(format!(
|
|
|
|
|
- "Failed to create directory: {}",
|
|
|
|
|
- case_output_dir.display()
|
|
|
|
|
- ))?;
|
|
|
|
|
-
|
|
|
|
|
- // Temporary aligned BAM path
|
|
|
|
|
- let aligned_bam = tmp_dir.join(format!(
|
|
|
|
|
- "{}_{}_{}_{}.bam",
|
|
|
|
|
- Uuid::new_v4(),
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- sample_type_dir,
|
|
|
|
|
- bam_path.file_stem().unwrap().to_str().unwrap()
|
|
|
|
|
- ));
|
|
|
|
|
-
|
|
|
|
|
- let job = DoradoAlign::from_config(config, &bam_path, aligned_bam.clone());
|
|
|
|
|
- align_jobs.push(job);
|
|
|
|
|
-
|
|
|
|
|
- // Track for later merging
|
|
|
|
|
- case_bam_map
|
|
|
|
|
- .entry(format!("{}_{}", case.case_id, sample_type_dir))
|
|
|
|
|
- .or_default()
|
|
|
|
|
- .push(aligned_bam);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!("✅ Prepared {} alignment jobs", align_jobs.len());
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 5: Run alignment in parallel
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "Step 4/5: Running alignment of {} BAM files in parallel",
|
|
|
|
|
- align_jobs.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- run_many!(config, align_jobs).context(format!(
|
|
|
|
|
- "Failed to run alignment batch for run {}",
|
|
|
|
|
- self.protocol_run_id
|
|
|
|
|
- ))?;
|
|
|
|
|
-
|
|
|
|
|
- // run_many_sbatch(align_jobs).context(format!(
|
|
|
|
|
- // "Failed to run alignment batch for run {}",
|
|
|
|
|
- // self.protocol_run_id
|
|
|
|
|
- // ))?;
|
|
|
|
|
-
|
|
|
|
|
- // Verify all aligned BAMs were created
|
|
|
|
|
- let mut missing_aligned = Vec::new();
|
|
|
|
|
- for bams in case_bam_map.values() {
|
|
|
|
|
- for bam in bams {
|
|
|
|
|
- if !bam.exists() {
|
|
|
|
|
- missing_aligned.push(bam.clone());
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if !missing_aligned.is_empty() {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "Alignment failed: {} output BAM file(s) were not created. First missing: {}",
|
|
|
|
|
- missing_aligned.len(),
|
|
|
|
|
- missing_aligned[0].display()
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!("✅ Alignments completed");
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 6: Sort aligned chunks using Slurm batch jobs
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!("Step 5/5: Sorting and indexing aligned chunks using Slurm");
|
|
|
|
|
-
|
|
|
|
|
- // Flatten all aligned BAMs from case_bam_map and prepare sort jobs
|
|
|
|
|
- let mut sort_jobs = Vec::new();
|
|
|
|
|
- let mut bam_to_sorted: HashMap<PathBuf, PathBuf> = HashMap::new();
|
|
|
|
|
-
|
|
|
|
|
- for (_key, bams) in case_bam_map.iter() {
|
|
|
|
|
- for bam in bams {
|
|
|
|
|
- let sorted_bam = bam.with_extension("sorted.bam");
|
|
|
|
|
- bam_to_sorted.insert(bam.clone(), sorted_bam.clone());
|
|
|
|
|
-
|
|
|
|
|
- let sort_cmd = SamtoolsSort::from_config(config, bam, &sorted_bam);
|
|
|
|
|
- sort_jobs.push(sort_cmd);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!("Submitting {} sort jobs in parallel", sort_jobs.len());
|
|
|
|
|
-
|
|
|
|
|
- run_many_sbatch(sort_jobs).context(format!(
|
|
|
|
|
- "Failed to run sorting batch for run {}",
|
|
|
|
|
- self.protocol_run_id
|
|
|
|
|
- ))?;
|
|
|
|
|
-
|
|
|
|
|
- // Verify all sorted BAMs were created
|
|
|
|
|
- let mut missing_sorted = Vec::new();
|
|
|
|
|
- for sorted_bam in bam_to_sorted.values() {
|
|
|
|
|
- if !sorted_bam.exists() {
|
|
|
|
|
- missing_sorted.push(sorted_bam.clone());
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if !missing_sorted.is_empty() {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "Sorting failed: {} sorted BAM file(s) were not created. First missing: {}",
|
|
|
|
|
- missing_sorted.len(),
|
|
|
|
|
- missing_sorted[0].display()
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "✅ Sorting completed for {} chunk BAMs",
|
|
|
|
|
- bam_to_sorted.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- // Index sorted chunks in parallel using Slurm
|
|
|
|
|
- let mut index_jobs = Vec::new();
|
|
|
|
|
-
|
|
|
|
|
- for sorted_bam in bam_to_sorted.values() {
|
|
|
|
|
- let index_cmd = SamtoolsIndex {
|
|
|
|
|
- bin: config.align.samtools_bin.clone(),
|
|
|
|
|
- threads: config.align.samtools_view_threads,
|
|
|
|
|
- bam: sorted_bam.to_string_lossy().to_string(),
|
|
|
|
|
- };
|
|
|
|
|
- index_jobs.push(index_cmd);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!("Submitting {} index jobs in parallel", index_jobs.len());
|
|
|
|
|
-
|
|
|
|
|
- run_many_sbatch(index_jobs).context(format!(
|
|
|
|
|
- "Failed to run indexing batch for run {}",
|
|
|
|
|
- self.protocol_run_id
|
|
|
|
|
- ))?;
|
|
|
|
|
-
|
|
|
|
|
- // Verify all index files (.bai) were created
|
|
|
|
|
- let mut missing_indices = Vec::new();
|
|
|
|
|
- for sorted_bam in bam_to_sorted.values() {
|
|
|
|
|
- let index_path = PathBuf::from(format!("{}.bai", sorted_bam.display()));
|
|
|
|
|
- if !index_path.exists() {
|
|
|
|
|
- missing_indices.push(index_path);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if !missing_indices.is_empty() {
|
|
|
|
|
- bail!(
|
|
|
|
|
- "Indexing failed: {} index file(s) were not created. First missing: {}",
|
|
|
|
|
- missing_indices.len(),
|
|
|
|
|
- missing_indices[0].display()
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- info!("✅ Indexed {} chunk BAMs", bam_to_sorted.len());
|
|
|
|
|
-
|
|
|
|
|
- // Remove unsorted BAMs and update case_bam_map with sorted paths
|
|
|
|
|
- for (original, _sorted) in bam_to_sorted.iter() {
|
|
|
|
|
- fs::remove_file(original).with_context(|| {
|
|
|
|
|
- format!("Failed to remove unsorted BAM: {}", original.display())
|
|
|
|
|
- })?;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Update case_bam_map with sorted paths
|
|
|
|
|
- for bams in case_bam_map.values_mut() {
|
|
|
|
|
- for bam in bams.iter_mut() {
|
|
|
|
|
- if let Some(sorted) = bam_to_sorted.get(bam) {
|
|
|
|
|
- *bam = sorted.clone();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Step 7: Merge and finalize per case
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!("Step 6/5: Merging, sorting, and indexing final BAM files per case");
|
|
|
|
|
-
|
|
|
|
|
- for case in &self.cases {
|
|
|
|
|
- let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
|
|
|
|
|
- let key = format!("{}_{}", case.case_id, sample_type_dir);
|
|
|
|
|
-
|
|
|
|
|
- let Some(aligned_bams) = case_bam_map.get(&key) else {
|
|
|
|
|
- info!("No aligned BAMs found for case {} - skipping", case.case_id);
|
|
|
|
|
- continue;
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- if aligned_bams.is_empty() {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- let final_bam_path = PathBuf::from(&config.result_dir)
|
|
|
|
|
- .join(&case.case_id)
|
|
|
|
|
- .join(&sample_type_dir)
|
|
|
|
|
- .join(format!(
|
|
|
|
|
- "{}_{}_{}.bam",
|
|
|
|
|
- case.case_id, sample_type_dir, config.reference_name
|
|
|
|
|
- ));
|
|
|
|
|
-
|
|
|
|
|
- fs::create_dir_all(final_bam_path.parent().unwrap())?;
|
|
|
|
|
-
|
|
|
|
|
- // -----------------------------------------------------------------
|
|
|
|
|
- // Merge all chunk BAMs for this case into a single per-case BAM
|
|
|
|
|
- // -----------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
- let case_merged_bam: PathBuf = if aligned_bams.len() == 1 {
|
|
|
|
|
- // Only one chunk: use it directly
|
|
|
|
|
- aligned_bams[0].clone()
|
|
|
|
|
- } else {
|
|
|
|
|
- let tmp_merged = tmp_dir.join(format!(
|
|
|
|
|
- "{}_{}_{}_merged.bam",
|
|
|
|
|
- Uuid::new_v4(),
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- sample_type_dir
|
|
|
|
|
- ));
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- " Merging {} chunk BAMs for case {} → {}",
|
|
|
|
|
- aligned_bams.len(),
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- tmp_merged.display()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- let mut merge_many_cmd = SamtoolsMergeMany::from_config(
|
|
|
|
|
- tmp_merged.clone(),
|
|
|
|
|
- aligned_bams.clone(),
|
|
|
|
|
- config,
|
|
|
|
|
- );
|
|
|
|
|
- SlurmRunner::run(&mut merge_many_cmd)?;
|
|
|
|
|
-
|
|
|
|
|
- // Remove chunk BAMs and their indices
|
|
|
|
|
- for bam in aligned_bams {
|
|
|
|
|
- if bam != &tmp_merged {
|
|
|
|
|
- remove_bam_with_index(bam).ok();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- tmp_merged
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- // -----------------------------------------------------------------
|
|
|
|
|
- // Merge the per-case BAM into the final BAM (if it already exists)
|
|
|
|
|
- // -----------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
- if final_bam_path.exists() {
|
|
|
|
|
- info!(
|
|
|
|
|
- " Final BAM already exists for case {}, merging into existing: {}",
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- final_bam_path.display()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- // Index both source and destination BAMs
|
|
|
|
|
- let mut index_cmd = SamtoolsIndex {
|
|
|
|
|
- bin: config.align.samtools_bin.clone(),
|
|
|
|
|
- threads: config.align.samtools_view_threads,
|
|
|
|
|
- bam: case_merged_bam.to_string_lossy().to_string(),
|
|
|
|
|
- };
|
|
|
|
|
- SlurmRunner::run(&mut index_cmd)?;
|
|
|
|
|
-
|
|
|
|
|
- let mut index_cmd = SamtoolsIndex {
|
|
|
|
|
- bin: config.align.samtools_bin.clone(),
|
|
|
|
|
- threads: config.align.samtools_view_threads,
|
|
|
|
|
- bam: final_bam_path.to_string_lossy().to_string(),
|
|
|
|
|
- };
|
|
|
|
|
- SlurmRunner::run(&mut index_cmd)?;
|
|
|
|
|
-
|
|
|
|
|
- // Merge into existing final BAM
|
|
|
|
|
- let mut merge_cmd =
|
|
|
|
|
- SamtoolsMerge::from_config(config, &case_merged_bam, &final_bam_path);
|
|
|
|
|
- SlurmRunner::run(&mut merge_cmd)?;
|
|
|
|
|
- } else {
|
|
|
|
|
- info!(
|
|
|
|
|
- " Creating new final BAM for case {} → {}",
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- final_bam_path.display()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- // Merged per-case BAM becomes the base final BAM
|
|
|
|
|
- fs::rename(&case_merged_bam, &final_bam_path)?;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // -----------------------------------------------------------------
|
|
|
|
|
- // Sort and index the final BAM
|
|
|
|
|
- // -----------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
- let sorted_tmp = final_bam_path.with_extension("sorted.bam");
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- " Sorting final BAM for case {} → {}",
|
|
|
|
|
- case.case_id,
|
|
|
|
|
- final_bam_path.display()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- let mut sort_cmd = SamtoolsSort::from_config(config, &final_bam_path, &sorted_tmp);
|
|
|
|
|
- SlurmRunner::run(&mut sort_cmd)?;
|
|
|
|
|
-
|
|
|
|
|
- // Replace unsorted BAM with sorted BAM
|
|
|
|
|
- fs::rename(&sorted_tmp, &final_bam_path)?;
|
|
|
|
|
-
|
|
|
|
|
- // Index the sorted final BAM
|
|
|
|
|
- let mut index_cmd = SamtoolsIndex {
|
|
|
|
|
- bin: config.align.samtools_bin.clone(),
|
|
|
|
|
- threads: config.align.samtools_view_threads,
|
|
|
|
|
- bam: final_bam_path.to_string_lossy().to_string(),
|
|
|
|
|
- };
|
|
|
|
|
- SlurmRunner::run(&mut index_cmd)?;
|
|
|
|
|
-
|
|
|
|
|
- info!(" ✅ Output: {}", final_bam_path.display());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
- // Final summary
|
|
|
|
|
- // =====================================================================
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "🎉 Pipeline completed for run: {} (flowcell: {})",
|
|
|
|
|
- self.protocol_run_id, self.flow_cell_id
|
|
|
|
|
- );
|
|
|
|
|
- info!(
|
|
|
|
|
- " Processed {} cases with {} final BAM files",
|
|
|
|
|
- self.cases.len(),
|
|
|
|
|
- case_bam_map.len()
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
/// Serializes this run to a JSON file.
|
|
/// Serializes this run to a JSON file.
|
|
|
///
|
|
///
|
|
|
/// # Arguments
|
|
/// # Arguments
|
|
@@ -1637,10 +1132,7 @@ impl fmt::Display for PromBam {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Builds the final BAM path for a case.
|
|
/// Builds the final BAM path for a case.
|
|
|
-fn build_final_bam_path(
|
|
|
|
|
- case: &IdInput,
|
|
|
|
|
- config: &Config,
|
|
|
|
|
-) -> anyhow::Result<PathBuf> {
|
|
|
|
|
|
|
+fn build_final_bam_path(case: &IdInput, config: &Config) -> anyhow::Result<PathBuf> {
|
|
|
let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
|
|
let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
|
|
|
|
|
|
|
|
if let Some(parent) = final_bam_path.parent() {
|
|
if let Some(parent) = final_bam_path.parent() {
|
|
@@ -1709,25 +1201,37 @@ fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> {
|
|
|
let backup = destination.with_extension("bam.backup");
|
|
let backup = destination.with_extension("bam.backup");
|
|
|
|
|
|
|
|
fs::rename(destination, &backup).with_context(|| {
|
|
fs::rename(destination, &backup).with_context(|| {
|
|
|
- format!("Failed to backup existing file: {}", destination.display())
|
|
|
|
|
|
|
+ format!(
|
|
|
|
|
+ "Failed to backup {} to {}",
|
|
|
|
|
+ destination.display(),
|
|
|
|
|
+ backup.display()
|
|
|
|
|
+ )
|
|
|
})?;
|
|
})?;
|
|
|
|
|
|
|
|
if let Err(e) = fs::rename(source, destination) {
|
|
if let Err(e) = fs::rename(source, destination) {
|
|
|
- fs::rename(&backup, destination).ok();
|
|
|
|
|
|
|
+ // Attempt restore, but propagate original error regardless
|
|
|
|
|
+ if let Err(restore_err) = fs::rename(&backup, destination) {
|
|
|
|
|
+ warn!(
|
|
|
|
|
+ "Failed to restore backup {} after failed replacement: {}",
|
|
|
|
|
+ backup.display(),
|
|
|
|
|
+ restore_err
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
return Err(e).with_context(|| {
|
|
return Err(e).with_context(|| {
|
|
|
format!(
|
|
format!(
|
|
|
- "Failed to rename {} to {}",
|
|
|
|
|
- source.display(),
|
|
|
|
|
- destination.display()
|
|
|
|
|
|
|
+ "Failed to replace {} with {}",
|
|
|
|
|
+ destination.display(),
|
|
|
|
|
+ source.display()
|
|
|
)
|
|
)
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Remove backup and its orphaned index
|
|
|
remove_bam_with_index(&backup).ok();
|
|
remove_bam_with_index(&backup).ok();
|
|
|
} else {
|
|
} else {
|
|
|
fs::rename(source, destination).with_context(|| {
|
|
fs::rename(source, destination).with_context(|| {
|
|
|
format!(
|
|
format!(
|
|
|
- "Failed to rename {} to {}",
|
|
|
|
|
|
|
+ "Failed to move {} to {}",
|
|
|
source.display(),
|
|
source.display(),
|
|
|
destination.display()
|
|
destination.display()
|
|
|
)
|
|
)
|
|
@@ -1736,7 +1240,6 @@ fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> {
|
|
|
|
|
|
|
|
Ok(())
|
|
Ok(())
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
/// Indexes a BAM file using samtools.
|
|
/// Indexes a BAM file using samtools.
|
|
|
fn index_bam(bam: &Path, config: &Config) -> anyhow::Result<()> {
|
|
fn index_bam(bam: &Path, config: &Config) -> anyhow::Result<()> {
|
|
|
let mut index_cmd = SamtoolsIndex {
|
|
let mut index_cmd = SamtoolsIndex {
|
|
@@ -1784,7 +1287,7 @@ fn merge_into_existing_final(
|
|
|
let input_bams = vec![source.to_path_buf(), destination.to_path_buf()];
|
|
let input_bams = vec![source.to_path_buf(), destination.to_path_buf()];
|
|
|
let mut merge_cmd = SamtoolsMergeMany::from_config(temp_merged.clone(), input_bams, config);
|
|
let mut merge_cmd = SamtoolsMergeMany::from_config(temp_merged.clone(), input_bams, config);
|
|
|
|
|
|
|
|
- SlurmRunner::run(&mut merge_cmd)
|
|
|
|
|
|
|
+ run!(config, &mut merge_cmd)
|
|
|
.with_context(|| "Failed to merge source into existing final BAM")?;
|
|
.with_context(|| "Failed to merge source into existing final BAM")?;
|
|
|
|
|
|
|
|
// No sort needed — merge preserves coordinate order from sorted inputs
|
|
// No sort needed — merge preserves coordinate order from sorted inputs
|
|
@@ -1803,11 +1306,7 @@ fn merge_into_existing_final(
|
|
|
///
|
|
///
|
|
|
/// Source is already sorted (from sort_and_index_chunks), so only
|
|
/// Source is already sorted (from sort_and_index_chunks), so only
|
|
|
/// move and index are needed.
|
|
/// move and index are needed.
|
|
|
-fn create_new_final(
|
|
|
|
|
- source: &Path,
|
|
|
|
|
- destination: &Path,
|
|
|
|
|
- config: &Config,
|
|
|
|
|
-) -> anyhow::Result<()> {
|
|
|
|
|
|
|
+fn create_new_final(source: &Path, destination: &Path, config: &Config) -> anyhow::Result<()> {
|
|
|
info!(" Creating new final BAM: {}", destination.display());
|
|
info!(" Creating new final BAM: {}", destination.display());
|
|
|
|
|
|
|
|
// Source already sorted — just move and index
|
|
// Source already sorted — just move and index
|
|
@@ -1943,7 +1442,8 @@ fn finalize_case_bams(
|
|
|
sorted_bam_map: &HashMap<String, Vec<PathBuf>>,
|
|
sorted_bam_map: &HashMap<String, Vec<PathBuf>>,
|
|
|
tmp_dir: &Path,
|
|
tmp_dir: &Path,
|
|
|
config: &Config,
|
|
config: &Config,
|
|
|
-) -> anyhow::Result<()> {
|
|
|
|
|
|
|
+) -> anyhow::Result<Vec<PathBuf>> {
|
|
|
|
|
+ let mut finalized = Vec::new();
|
|
|
for case in cases {
|
|
for case in cases {
|
|
|
let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
|
|
let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
|
|
|
let key = format!("{}_{}", case.case_id, sample_type_dir);
|
|
let key = format!("{}_{}", case.case_id, sample_type_dir);
|
|
@@ -1985,12 +1485,12 @@ fn finalize_case_bams(
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-
|
|
|
|
|
let stats = WGSBamStats::open(&case.case_id, &case.sample_type, config)?;
|
|
let stats = WGSBamStats::open(&case.case_id, &case.sample_type, config)?;
|
|
|
- info!(" ✅ Output: {}\n{stats}", final_bam_path.display());
|
|
|
|
|
|
|
+ info!("✅ Output: {}\n{stats}", final_bam_path.display());
|
|
|
|
|
+ finalized.push(final_bam_path);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- Ok(())
|
|
|
|
|
|
|
+ Ok(finalized)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Normalize barcode strings for matching.
|
|
/// Normalize barcode strings for matching.
|
|
@@ -2048,6 +1548,73 @@ fn extract_and_normalize_barcode(text: &str) -> Option<String> {
|
|
|
None
|
|
None
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
|
|
|
|
+pub struct PipelineMetrics {
|
|
|
|
|
+ pub validation_duration: Duration,
|
|
|
|
|
+ pub alignment_duration: Duration,
|
|
|
|
|
+ pub sort_index_duration: Duration,
|
|
|
|
|
+ pub finalize_duration: Duration,
|
|
|
|
|
+ pub total_duration: Duration,
|
|
|
|
|
+
|
|
|
|
|
+ pub bams_processed: usize,
|
|
|
|
|
+ pub cases_finalized: usize,
|
|
|
|
|
+ pub bytes_input: u64,
|
|
|
|
|
+ pub bytes_output: u64,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl PipelineMetrics {
|
|
|
|
|
+ pub fn throughput_mbps(&self) -> f64 {
|
|
|
|
|
+ if self.total_duration.as_secs_f64() == 0.0 {
|
|
|
|
|
+ return 0.0;
|
|
|
|
|
+ }
|
|
|
|
|
+ (self.bytes_input as f64 / 1_000_000.0) / self.total_duration.as_secs_f64()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn save(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
|
|
|
|
|
+ let file = BufWriter::new(File::create(path.as_ref())?);
|
|
|
|
|
+ serde_json::to_writer_pretty(file, self)?;
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl std::fmt::Display for PipelineMetrics {
|
|
|
|
|
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
+ writeln!(f, "📊 Pipeline Metrics")?;
|
|
|
|
|
+ writeln!(f, " Validation: {:>8.2?}", self.validation_duration)?;
|
|
|
|
|
+ writeln!(f, " Alignment: {:>8.2?}", self.alignment_duration)?;
|
|
|
|
|
+ writeln!(f, " Sort/Index: {:>8.2?}", self.sort_index_duration)?;
|
|
|
|
|
+ writeln!(f, " Finalize: {:>8.2?}", self.finalize_duration)?;
|
|
|
|
|
+ writeln!(f, " ─────────────────────")?;
|
|
|
|
|
+ writeln!(f, " Total: {:>8.2?}", self.total_duration)?;
|
|
|
|
|
+ writeln!(f)?;
|
|
|
|
|
+ writeln!(f, " BAMs processed: {:>6}", self.bams_processed)?;
|
|
|
|
|
+ writeln!(f, " Cases finalized: {:>6}", self.cases_finalized)?;
|
|
|
|
|
+ writeln!(
|
|
|
|
|
+ f,
|
|
|
|
|
+ " Input: {:>8.2} GiB",
|
|
|
|
|
+ self.bytes_input as f64 / 1024.0_f64.powi(3)
|
|
|
|
|
+ )?;
|
|
|
|
|
+ writeln!(
|
|
|
|
|
+ f,
|
|
|
|
|
+ " Output: {:>8.2} GiB",
|
|
|
|
|
+ self.bytes_output as f64 / 1024.0_f64.powi(3)
|
|
|
|
|
+ )?;
|
|
|
|
|
+ writeln!(f, " Throughput: {:>8.2} MB/s", self.throughput_mbps())
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+struct Timer(Instant);
|
|
|
|
|
+
|
|
|
|
|
+impl Timer {
|
|
|
|
|
+ fn start() -> Self {
|
|
|
|
|
+ Self(Instant::now())
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn elapsed(&self) -> Duration {
|
|
|
|
|
+ self.0.elapsed()
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
|
mod tests {
|
|
mod tests {
|
|
|
use super::*;
|
|
use super::*;
|