Thomas 1 week ago
parent
commit
78f31bc229

+ 10 - 4
pandora-config.example.toml

@@ -13,6 +13,12 @@ result_dir = "/mnt/beegfs02/scratch/t_steimle/data/wgs"
 # Temporary directory.
 tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
 
+# Run cache directory.
+run_cache_dir = "/home/t_steimle/data/prom_runs"
+
+# Software threads
+threads = 5
+
 # Singularity bin
 singularity_bin = "module load singularity-ce && singularity"
 
@@ -374,7 +380,7 @@ dorado_basecall_arg = "-x 'cuda:all' sup,5mC_5hmC"
 dorado_should_realign = false
 
 # Dorado aligner threads number
-dorado_aligner_threads = 20
+dorado_aligner_threads = 10
 
 # Reference FASTA used for alignment.
 ref_fa = "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa"
@@ -386,14 +392,14 @@ ref_mmi = ""
 samtools_bin = "/mnt/beegfs02/scratch/t_steimle/tools/samtools"
 
 # Threads for `samtools view`.
-samtools_view_threads = 40
+samtools_view_threads = 10
 
 # Threads for `samtools sort`.
-samtools_sort_threads = 40
+samtools_sort_threads = 20
 
 # Threads for `samtools merge`.
 samtools_merge_threads = 40
 
 # Threads for `samtools split`.
-samtools_split_threads = 40
+samtools_split_threads = 20
 

+ 6 - 9
src/callers/deep_variant.rs

@@ -15,8 +15,7 @@ use crate::{
         vcf::Vcf,
     },
     commands::{
-        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
-        SlurmRunner,
+        SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, run_many_sbatch
     },
     config::Config,
     helpers::{is_file_older, remove_dir_if_exists},
@@ -757,15 +756,15 @@ pub fn run_deepvariant_quartered_sbatch_with_merge(
     }
 
     // 3) run them concurrently on Slurm
-    // let outputs = run_many_sbatch(jobs)?;
+    let outputs = run_many_sbatch(jobs)?;
 
     // 4) merge PASS VCFs
-    // merge_deepvariant_parts(&base, n_parts)?;
+    merge_deepvariant_parts(&base, n_parts)?;
 
     // we don’t really have a single merged CapturedOutput;
     // return an empty one or synthesize something from `outputs`.
-    // Ok(outputs)
-    Ok(vec![])
+    Ok(outputs)
+    // Ok(vec![])
 }
 
 /// Merges PASS-filtered VCFs from parallel DeepVariant runs.
@@ -874,10 +873,8 @@ mod tests {
     fn deepvariant_run() -> anyhow::Result<()> {
         test_init();
         let config = Config::default();
-        let u = DeepVariant::initialize("34528", "norm", &config)?;
-        info!("{u}");
 
-        let _ = run_deepvariant_quartered_sbatch_with_merge("36167", "norm", &config)?;
+        let _ = run_deepvariant_quartered_sbatch_with_merge("34528", "norm", &config)?;
         Ok(())
     }
 }

+ 6 - 0
src/collection/bam_stats.rs

@@ -1322,8 +1322,14 @@ mod tests {
         test_init();
 
         let config = Config::default();
+        let stats = WGSBamStats::open("34528", "norm", &config)?;
+        println!("{stats}");
         let stats = WGSBamStats::open("36167", "norm", &config)?;
         println!("{stats}");
+        let stats = WGSBamStats::open("36434", "norm", &config)?;
+        println!("{stats}");
+        let stats = WGSBamStats::open("36480", "norm", &config)?;
+        println!("{stats}");
 
         Ok(())
     }

+ 4 - 12
src/collection/minknow.rs

@@ -1,4 +1,4 @@
-use std::{collections::BTreeMap, str::FromStr};
+use std::{collections::BTreeMap, path::Path, str::FromStr};
 
 use anyhow::Context;
 use ordered_float::OrderedFloat;
@@ -101,12 +101,13 @@ impl MinKnowSampleSheet {
     /// let sheet = MinKnowSampleSheet::from_path("samplesheet.csv")?;
     /// println!("Sample ID: {}", sheet.sample_id);
     /// ```
-    pub fn from_path(path: &str) -> anyhow::Result<Self> {
+    pub fn from_path(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let path = path.as_ref().to_path_buf();
         use std::fs::File;
         use std::io::{self, BufRead};
 
         let file =
-            File::open(path).map_err(|e| anyhow::anyhow!("Can't open file: {path}\n\t{e}"))?;
+            File::open(&path).map_err(|e| anyhow::anyhow!("Can't open file: {}\n\t{e}", path.display()))?;
         let reader = io::BufReader::new(file);
 
         let mut lines = reader.lines();
@@ -171,15 +172,6 @@ impl MinKnowSampleSheet {
 /// - The CSV is malformed or missing expected headers
 /// - A status value cannot be parsed into `NanoporeChannelStatus`
 ///
-/// # Example
-///
-/// ```rust
-/// let entries = load_channel_states("nanopore_data.csv")?;
-/// for entry in entries {
-///     println!("{:?}", entry);
-/// }
-/// ```
-///
 /// # Dependencies
 ///
 /// Requires the [`csv`](https://docs.rs/csv), [`serde`](https://docs.rs/serde), and [`serde_derive`](https://docs.rs/serde_derive) crates.

+ 1 - 0
src/collection/mod.rs

@@ -6,3 +6,4 @@ pub mod pod5;
 pub mod run;
 pub mod vcf;
 pub mod bam_stats;
+pub mod prom_run;

+ 736 - 0
src/collection/prom_run.rs

@@ -0,0 +1,736 @@
+//! # PromRun Module
+//!
+//! This module handles Oxford Nanopore Technologies (ONT) PromethION sequencing run data,
+//! including BAM file metadata extraction and run directory management.
+//!
+//! ## Overview
+//!
+//! The module provides two main structures:
+//! - [`PromBam`]: Represents metadata extracted from a single BAM file produced by MinKNOW
+//! - [`PromRun`]: Represents a complete sequencing run directory with all associated files
+//!
+//! ## Usage
+//!
+//! ```rust,ignore
+//! use crate::collection::prom::{PromBam, PromRun};
+//! use crate::config::Config;
+//!
+//! // Parse a single BAM file
+//! let bam = PromBam::from_path("/path/to/file.bam")?;
+//! println!("{}", bam);
+//!
+//! // Import an entire run directory
+//! let config = Config::default();
+//! let run = PromRun::from_dir("/path/to/run_dir", &config)?;
+//! println!("{}", run);
+//! ```
+
+use std::{
+    collections::BTreeMap,
+    fmt,
+    fs::{self, File},
+    io::{BufReader, BufWriter},
+    path::{Path, PathBuf},
+};
+
+use anyhow::Context;
+use chrono::{DateTime, Utc};
+use rayon::{
+    iter::{IntoParallelRefIterator, ParallelIterator},
+    ThreadPoolBuilder,
+};
+use rust_htslib::bam::{self, Read};
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    collection::{
+        flowcells::IdInput,
+        minknow::{parse_pore_activity_from_reader, MinKnowSampleSheet, PoreStateEntry},
+        pod5::Pod5,
+    },
+    config::Config,
+    helpers::list_files_recursive,
+};
+
+/// Metadata extracted from an ONT PromethION BAM file header.
+///
+/// This structure parses and stores relevant information from BAM headers
+/// produced by MinKNOW basecalling, including run identifiers, basecalling
+/// models, and instrument information.
+///
+/// # Header Parsing
+///
+/// The parser extracts information from two SAM header record types:
+/// - `@RG` (Read Group): Run ID, basecall model, sample info, timestamps
+/// - `@PG` (Program): MinKNOW version and GPU information
+///
+/// # Example
+///
+/// ```rust,ignore
+/// let bam = PromBam::from_path("/path/to/reads.bam")?;
+///
+/// println!("Run: {}", bam.run_id);
+/// println!("Sample: {}", bam.sample);
+/// println!("Basecaller: {}", bam.basecall_model);
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PromBam {
+    /// Absolute path to the BAM file.
+    pub path: PathBuf,
+
+    /// Last modification timestamp of the file.
+    pub modified: DateTime<Utc>,
+
+    /// File size in bytes.
+    pub bam_size: u64,
+
+    /// Unique run identifier from the `@RG DS:runid=` field.
+    pub run_id: String,
+
+    /// Basecalling model name (e.g., `dna_r10.4.1_e8.2_400bps_sup@v4.2.0`).
+    pub basecall_model: String,
+
+    /// Modified base detection models, space-separated if multiple.
+    pub modbase_models: String,
+
+    /// Instrument/device model (e.g., `PromethION`, `P2I`).
+    pub instrument: String,
+
+    /// Flowcell barcode identifier (e.g., `PBI55810`).
+    pub flowcell_id: String,
+
+    /// Sample identifier from the sample sheet.
+    pub sample: String,
+
+    /// Library identifier.
+    pub library: String,
+
+    /// Full read group ID string from `@RG ID:`.
+    pub read_group_id: String,
+
+    /// Sequencing timestamp in ISO 8601 format from `@RG DT:`.
+    pub timestamp: String,
+
+    /// GPU information from MinKNOW `@PG DS:` field.
+    pub gpu_info: String,
+
+    /// MinKNOW software version.
+    pub minknow_version: String,
+}
+
+impl PromBam {
+    /// Parses BAM header metadata from the file at the given path.
+    ///
+    /// Opens the BAM file, reads the SAM header, and extracts ONT-specific
+    /// metadata from `@RG` and `@PG` records.
+    ///
+    /// # Arguments
+    ///
+    /// * `path` - Path to a BAM file (must be readable and valid BAM format)
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(PromBam)` with populated fields, or an error if:
+    /// - The file cannot be opened or read
+    /// - The file is not a valid BAM format
+    /// - The header cannot be parsed as UTF-8
+    ///
+    /// # Notes
+    ///
+    /// - Fields may be empty strings if the corresponding header tags are missing
+    /// - Only processes `@RG` records with `PL:ONT` (Oxford Nanopore platform)
+    /// - Only processes `@PG` records with `PN:minknow`
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// let bam = PromBam::from_path("sample.bam")?;
+    /// assert!(!bam.run_id.is_empty());
+    /// ```
+    pub fn from_path(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let path = path.as_ref().to_path_buf();
+
+        // Retrieve file metadata
+        let meta = fs::metadata(&path)
+            .with_context(|| format!("Failed to read metadata for {}", path.display()))?;
+        let modified: DateTime<Utc> = meta
+            .modified()
+            .map(DateTime::<Utc>::from)
+            .with_context(|| format!("Failed to get modification time for {}", path.display()))?;
+        let bam_size = meta.len();
+
+        // Open BAM and read header
+        let reader = bam::Reader::from_path(&path)
+            .with_context(|| format!("Failed to open BAM file: {}", path.display()))?;
+
+        let header = reader.header().clone();
+        let text =
+            std::str::from_utf8(header.as_bytes()).context("BAM header contains invalid UTF-8")?;
+
+        // Initialize with defaults
+        let mut prom = PromBam {
+            path,
+            modified,
+            bam_size,
+            run_id: String::new(),
+            basecall_model: String::new(),
+            modbase_models: String::new(),
+            instrument: String::new(),
+            flowcell_id: String::new(),
+            sample: String::new(),
+            library: String::new(),
+            read_group_id: String::new(),
+            timestamp: String::new(),
+            gpu_info: String::new(),
+            minknow_version: String::new(),
+        };
+
+        // Parse header lines
+        for line in text.lines() {
+            if !line.starts_with('@') {
+                continue;
+            }
+
+            let mut fields = line.split('\t');
+            let Some(tag) = fields.next() else {
+                continue;
+            };
+            let tag = tag.trim_start_matches('@');
+
+            // Build key-value map from tab-separated fields
+            let kv: Vec<(&str, &str)> = fields
+                .filter_map(|f| {
+                    let mut it = f.splitn(2, ':');
+                    Some((it.next()?, it.next().unwrap_or("")))
+                })
+                .collect();
+
+            match tag {
+                "PG" => Self::parse_pg_record(&kv, &mut prom),
+                "RG" => Self::parse_rg_record(&kv, &mut prom),
+                _ => {}
+            }
+        }
+
+        Ok(prom)
+    }
+
+    /// Parses `@PG` (Program) header record for MinKNOW-specific fields.
+    fn parse_pg_record(kv: &[(&str, &str)], prom: &mut PromBam) {
+        // Only process MinKNOW program records
+        let is_minknow = kv.iter().any(|(k, v)| *k == "PN" && *v == "minknow");
+        if !is_minknow {
+            return;
+        }
+
+        for (k, v) in kv {
+            match *k {
+                "DS" => prom.gpu_info = (*v).to_string(),
+                "VN" => prom.minknow_version = (*v).to_string(),
+                _ => {}
+            }
+        }
+    }
+
+    /// Parses `@RG` (Read Group) header record for ONT-specific fields.
+    fn parse_rg_record(kv: &[(&str, &str)], prom: &mut PromBam) {
+        // Only process ONT platform read groups
+        let is_ont = kv.iter().any(|(k, v)| *k == "PL" && *v == "ONT");
+        if !is_ont {
+            return;
+        }
+
+        // Parse composite DS field: runid=... basecall_model=... modbase_models=...
+        if let Some((_, ds_raw)) = kv.iter().find(|(k, _)| *k == "DS") {
+            for part in ds_raw.split_whitespace() {
+                if let Some(val) = part.strip_prefix("runid=") {
+                    prom.run_id = val.to_string();
+                } else if let Some(val) = part.strip_prefix("basecall_model=") {
+                    prom.basecall_model = val.to_string();
+                } else if let Some(val) = part.strip_prefix("modbase_models=") {
+                    // Remove commas from modbase_models list
+                    prom.modbase_models = val.replace(',', " ");
+                }
+            }
+        }
+
+        // Extract standard SAM tags
+        for (k, v) in kv {
+            match *k {
+                "ID" => prom.read_group_id = (*v).to_string(),
+                "DT" => prom.timestamp = (*v).to_string(),
+                "LB" => prom.library = (*v).to_string(),
+                "PM" => prom.instrument = (*v).to_string(),
+                "PU" => prom.flowcell_id = (*v).to_string(),
+                "SM" => prom.sample = (*v).to_string(),
+                _ => {}
+            }
+        }
+    }
+}
+
+impl fmt::Display for PromBam {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "PromBam")?;
+        writeln!(f, "  Path:            {}", self.path.display())?;
+        writeln!(f, "  Modified:        {}", self.modified)?;
+        writeln!(
+            f,
+            "  BamSize:         {:.2} GiB",
+            self.bam_size as f64 / (1024.0_f64.powi(3))
+        )?;
+        writeln!(f)?;
+        writeln!(f, "  Run ID:          {}", self.run_id)?;
+        writeln!(f, "  Basecaller:      {}", self.basecall_model)?;
+        writeln!(f, "  Modbase Models:  {}", self.modbase_models)?;
+        writeln!(f)?;
+        writeln!(f, "  Instrument:      {}", self.instrument)?;
+        writeln!(f, "  Flowcell ID:     {}", self.flowcell_id)?;
+        writeln!(f, "  Sample:          {}", self.sample)?;
+        writeln!(f, "  Library:         {}", self.library)?;
+        writeln!(f)?;
+        writeln!(f, "  RG ID:           {}", self.read_group_id)?;
+        writeln!(f, "  Timestamp:       {}", self.timestamp)?;
+        writeln!(f)?;
+        writeln!(f, "  GPU:             {}", self.gpu_info)?;
+        writeln!(f, "  MinKNOW Ver:     {}", self.minknow_version)
+    }
+}
+
+/// A complete ONT PromethION sequencing run with all associated files.
+///
+/// Represents an imported run directory containing BAM files, POD5 raw signal
+/// files, sample sheets, and pore activity logs. Provides JSON serialization
+/// for caching parsed metadata.
+///
+/// # Directory Structure
+///
+/// The importer recursively scans for files, supporting various MinKNOW output layouts:
+///
+/// ```text
+/// run_dir/
+/// ├── sample_sheet_*.csv       # Required: MinKNOW sample sheet
+/// ├── pore_activity_*.csv      # Optional: pore state logs
+/// │
+/// │   # POD5 files (any structure supported):
+/// ├── pod5/                    # Non-multiplexed runs
+/// │   └── *.pod5
+/// ├── pod5_pass/               # Multiplexed runs (pass reads)
+/// │   └── barcode*/
+/// │       └── *.pod5
+/// ├── pod5_fail/               # Failed reads (optional)
+/// │   └── *.pod5
+/// │
+/// │   # BAM files (any structure supported):
+/// ├── bam/                     # Non-multiplexed runs
+/// │   └── *.bam
+/// └── bam_pass/                # Multiplexed runs
+///     └── barcode*/
+///         └── *.bam
+/// ```
+///
+/// Files are discovered recursively regardless of subdirectory naming.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// let config = Config::default();
+///
+/// // Import a new run
+/// let run = PromRun::from_dir("/data/runs/20240101_run", &config)?;
+///
+/// // Load a cached run
+/// let run = PromRun::open("protocol_run_id_here", &config)?;
+///
+/// println!("Flowcell: {}", run.flow_cell_id);
+/// println!("BAM files: {}", run.bams.len());
+/// println!("POD5 files: {}", run.pod5s.len());
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PromRun {
+    /// Root directory of the sequencing run.
+    pub dir: PathBuf,
+
+    /// Timestamp when this run was imported/parsed.
+    pub import_date: DateTime<Utc>,
+
+    /// Unique identifier for the protocol run (from sample sheet).
+    pub protocol_run_id: String,
+
+    /// Flowcell position identifier (e.g., device slot or port).
+    pub position_id: String,
+
+    /// Flowcell barcode/identifier (e.g., `FAB12345`).
+    pub flow_cell_id: String,
+
+    /// Sample ID from the sample sheet.
+    pub sample_id: String,
+
+    /// Experiment ID from the sample sheet.
+    pub experiment_id: String,
+
+    /// Flowcell product code (e.g., `FLO-MIN106`, `FLO-PRO002`).
+    pub flow_cell_product_code: String,
+
+    /// Library preparation kit identifier (e.g., `SQK-LSK114`).
+    pub kit: String,
+
+    /// Associated case identifiers for clinical/research tracking.
+    pub cases: Vec<IdInput>,
+
+    /// All POD5 raw signal files found in the run directory.
+    pub pod5s: Vec<Pod5>,
+
+    /// All BAM files found in the run directory.
+    pub bams: Vec<PromBam>,
+
+    /// Pore activity/state log entries, if available.
+    pub pore_activity: Option<Vec<PoreStateEntry>>,
+}
+
+impl PromRun {
+    /// Imports a sequencing run from a directory.
+    ///
+    /// Recursively scans the directory for BAM files, POD5 files, sample sheets,
+    /// and pore activity logs. Parses all files in parallel and caches the result.
+    ///
+    /// # Arguments
+    ///
+    /// * `dir` - Path to the run directory (must exist and be a directory)
+    /// * `config` - Application configuration (provides thread count and cache paths)
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(PromRun)` on success, or an error if:
+    /// - `dir` is not a directory
+    /// - No sample sheet (`sample_sheet_*.csv`) is found
+    /// - Sample sheet parsing fails
+    /// - Thread pool creation fails
+    ///
+    /// # Side Effects
+    ///
+    /// Automatically saves the parsed run to the JSON cache directory specified
+    /// in `config.run_cache_dir`.
+    ///
+    /// # Performance
+    ///
+    /// BAM and POD5 file parsing is parallelized using Rayon with the thread
+    /// count from `config.threads`.
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// let config = Config::default();
+    /// let run = PromRun::from_dir("/data/runs/my_run", &config)?;
+    /// println!("Imported {} BAM files", run.bams.len());
+    /// ```
+    pub fn from_dir(dir: impl AsRef<Path>, config: &Config) -> anyhow::Result<Self> {
+        let dir = dir.as_ref().to_path_buf();
+        if !dir.is_dir() {
+            anyhow::bail!(
+                "Failed to import Run: input path is not a directory: {}",
+                dir.display()
+            );
+        }
+
+        // Collect file paths by type
+        let mut bam_paths = Vec::new();
+        let mut sample_sheet_path = None;
+        let mut pod5_paths = Vec::new();
+        let mut pore_activity_path = None;
+
+        for file in list_files_recursive(&dir) {
+            let name = file.file_name().and_then(|s| s.to_str()).unwrap_or("");
+            let ext = file.extension().and_then(|e| e.to_str()).unwrap_or("");
+
+            match ext {
+                "bam" => bam_paths.push(file),
+                "pod5" => pod5_paths.push(file),
+                "csv" => {
+                    if name.starts_with("sample_sheet") {
+                        sample_sheet_path = Some(file);
+                    } else if name.starts_with("pore_activity") {
+                        pore_activity_path = Some(file);
+                    }
+                }
+                _ => {}
+            }
+        }
+
+        // Parse required sample sheet
+        let sample_sheet = sample_sheet_path
+            .ok_or_else(|| anyhow::anyhow!("No sample_sheet_*.csv found in {}", dir.display()))
+            .and_then(MinKnowSampleSheet::from_path)?;
+
+        // Parse optional pore activity
+        let pore_activity = pore_activity_path
+            .and_then(|path| File::open(path).ok())
+            .and_then(|mut reader| parse_pore_activity_from_reader(&mut reader).ok());
+
+        // Build thread pool for parallel parsing
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(config.threads.into())
+            .build()
+            .context("Failed to build Rayon thread pool")?;
+
+        // Parse BAM files in parallel
+        let bams: Vec<PromBam> = pool.install(|| {
+            bam_paths
+                .par_iter()
+                .filter_map(|p| match PromBam::from_path(p) {
+                    Ok(bam) => Some(bam),
+                    Err(e) => {
+                        log::warn!("Failed to parse BAM {}: {}", p.display(), e);
+                        None
+                    }
+                })
+                .collect()
+        });
+
+        // Parse POD5 files in parallel
+        let pod5s: Vec<Pod5> = pool.install(|| {
+            pod5_paths
+                .par_iter()
+                .filter_map(|p| match Pod5::from_path(p) {
+                    Ok(pod5) => Some(pod5),
+                    Err(e) => {
+                        log::warn!("Failed to parse POD5 {}: {}", p.display(), e);
+                        None
+                    }
+                })
+                .collect()
+        });
+
+        let prom_run = Self {
+            dir,
+            import_date: Utc::now(),
+            protocol_run_id: sample_sheet.protocol_run_id,
+            position_id: sample_sheet.position_id,
+            flow_cell_id: sample_sheet.flow_cell_id,
+            sample_id: sample_sheet.sample_id,
+            experiment_id: sample_sheet.experiment_id,
+            flow_cell_product_code: sample_sheet.flow_cell_product_code,
+            kit: sample_sheet.kit,
+            cases: Vec::new(),
+            pod5s,
+            bams,
+            pore_activity,
+        };
+
+        // Cache to disk
+        prom_run.save_json(prom_run.cache_path(config))?;
+
+        Ok(prom_run)
+    }
+
+    /// Serializes this run to a JSON file.
+    ///
+    /// # Arguments
+    ///
+    /// * `path` - Output file path (will be created or overwritten)
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if file creation or JSON serialization fails.
+    pub fn save_json(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let path = path.as_ref();
+        let file = BufWriter::new(
+            File::create(path)
+                .with_context(|| format!("Failed to create JSON file: {}", path.display()))?,
+        );
+        serde_json::to_writer_pretty(file, self)
+            .with_context(|| format!("Failed to serialize PromRun to {}", path.display()))?;
+        Ok(())
+    }
+
+    /// Deserializes a run from a JSON file.
+    ///
+    /// # Arguments
+    ///
+    /// * `path` - Path to a previously saved JSON cache file
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be read or parsed.
+    pub fn load_json(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let path = path.as_ref();
+        let file = BufReader::new(
+            File::open(path)
+                .with_context(|| format!("Failed to open JSON file: {}", path.display()))?,
+        );
+        let data = serde_json::from_reader(file)
+            .with_context(|| format!("Failed to parse PromRun from {}", path.display()))?;
+        Ok(data)
+    }
+
+    /// Returns the cache file path for this run.
+    ///
+    /// The cache path is `{config.run_cache_dir}/{protocol_run_id}.json`.
+    #[must_use]
+    pub fn cache_path(&self, config: &Config) -> PathBuf {
+        let mut path = PathBuf::from(&config.run_cache_dir);
+        path.push(format!("{}.json", self.protocol_run_id));
+        path
+    }
+
+    /// Opens a previously cached run by its protocol run ID.
+    ///
+    /// # Arguments
+    ///
+    /// * `run_id` - The protocol run ID (filename stem in the cache directory)
+    /// * `config` - Application configuration with `run_cache_dir`
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// let run = PromRun::open("abc123-def456", &config)?;
+    /// ```
+    pub fn open(run_id: &str, config: &Config) -> anyhow::Result<Self> {
+        let mut path = PathBuf::from(&config.run_cache_dir);
+        path.push(format!("{run_id}.json"));
+        Self::load_json(path)
+    }
+
+    /// Returns the total size of all BAM files in bytes.
+    #[must_use]
+    pub fn total_bam_size(&self) -> u64 {
+        self.bams.iter().map(|b| b.bam_size).sum()
+    }
+
+    /// Returns the total size of all POD5 files in bytes.
+    #[must_use]
+    pub fn total_pod5_size(&self) -> u64 {
+        self.pod5s.iter().map(|p| p.file_size).sum()
+    }
+}
+
+/// Statistics for files in a single directory.
+#[derive(Default)]
+struct DirStats {
+    pod5_count: usize,
+    pod5_size: u64,
+    bam_count: usize,
+    bam_size: u64,
+}
+
+impl fmt::Display for PromRun {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        /// Converts bytes to GiB.
+        fn to_gib(bytes: u64) -> f64 {
+            bytes as f64 / 1024.0_f64.powi(3)
+        }
+
+        let mut dir_stats: BTreeMap<String, DirStats> = BTreeMap::new();
+
+        // Aggregate POD5 stats by directory
+        for pod5 in &self.pod5s {
+            let rel = pod5.path.strip_prefix(&self.dir).unwrap_or(&pod5.path);
+            let dir = rel.parent().unwrap_or(Path::new("."));
+            let key = dir.display().to_string();
+
+            let entry = dir_stats.entry(key).or_default();
+            entry.pod5_count += 1;
+            entry.pod5_size += pod5.file_size;
+        }
+
+        // Aggregate BAM stats by directory
+        for bam in &self.bams {
+            let rel = bam.path.strip_prefix(&self.dir).unwrap_or(&bam.path);
+            let dir = rel.parent().unwrap_or(Path::new("."));
+            let key = dir.display().to_string();
+
+            let entry = dir_stats.entry(key).or_default();
+            entry.bam_count += 1;
+            entry.bam_size += bam.bam_size;
+        }
+
+        // Header information
+        writeln!(f, "📦 PromRun")?;
+        writeln!(f, "  dir        : {}", self.dir.display())?;
+        writeln!(f, "  imported   : {}", self.import_date)?;
+        writeln!(f, "  run id     : {}", self.protocol_run_id)?;
+        writeln!(f, "  position   : {}", self.position_id)?;
+        writeln!(
+            f,
+            "  flow cell  : {} ({})",
+            self.flow_cell_id, self.flow_cell_product_code
+        )?;
+        writeln!(f, "  sample id  : {}", self.sample_id)?;
+        writeln!(f, "  experiment : {}", self.experiment_id)?;
+        writeln!(f, "  kit        : {}", self.kit)?;
+        writeln!(f, "  cases      : {}", self.cases.len())?;
+
+        match &self.pore_activity {
+            Some(pa) => writeln!(f, "  pore act.  : {} entries", pa.len())?,
+            None => writeln!(f, "  pore act.  : none")?,
+        }
+
+        writeln!(f)?;
+        writeln!(
+            f,
+            "📁 Files by directory (relative to {})",
+            self.dir.display()
+        )?;
+
+        if dir_stats.is_empty() {
+            writeln!(f, "  (no files)")?;
+            return Ok(());
+        }
+
+        for (dir, stats) in dir_stats {
+            writeln!(f, "  - {dir}")?;
+            if stats.pod5_count > 0 {
+                writeln!(
+                    f,
+                    "      POD5 : {:>3} files, {:6.2} GiB",
+                    stats.pod5_count,
+                    to_gib(stats.pod5_size)
+                )?;
+            }
+            if stats.bam_count > 0 {
+                writeln!(
+                    f,
+                    "      BAM  : {:>3} files, {:6.2} GiB",
+                    stats.bam_count,
+                    to_gib(stats.bam_size)
+                )?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use log::info;
+
+    use crate::helpers::test_init;
+
+    #[test]
+    fn prom_run_bam() -> anyhow::Result<()> {
+        test_init();
+
+        let bam_file = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A/bam_pass/barcode02/PBI55810_pass_barcode02_22582b29_d02c5bb8_0.bam";
+        let bam = PromBam::from_path(bam_file)?;
+        info!("{bam}");
+
+        let bam_file = "/home/t_steimle/mnt/prom/20251121_001_01_CD/01/20251121_1531_P2I-00461-A_PBI52256_b1dd5673/bam_pass/PBI52256_pass_b1dd5673_414982db_0.bam";
+        let bam = PromBam::from_path(bam_file)?;
+        info!("{bam}");
+        Ok(())
+    }
+
+    #[test]
+    fn prom_run_import() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+
+        let dir = "/home/t_steimle/mnt/prom/20251121_001_01_CD/01/20251121_1531_P2I-00461-A_PBI52256_b1dd5673";
+
+        let prom_run = PromRun::from_dir(dir, &config)?;
+        info!("{prom_run}");
+        Ok(())
+    }
+}

+ 1 - 1
src/commands/dorado.rs

@@ -210,7 +210,7 @@ impl DoradoAlign {
             slurm_params: SlurmParams {
                 job_name: Some("dorado_align".into()),
                 cpus_per_task: Some(threads.into()),
-                mem: Some("60G".into()),
+                mem: Some("30G".into()),
                 partition: Some("shortq".into()),
                 gres: None,
             },

+ 83 - 86
src/commands/samtools.rs

@@ -258,10 +258,11 @@ impl super::SlurmRunner for SamtoolsMerge {
     }
 }
 
+use std::io::Write;
+
 /// Merges multiple BAM files into a single output BAM.
 ///
-/// Uses the first input BAM as the header source while including
-/// reads from all input BAMs.
+/// Uses a temporary file list (`-b` option) to avoid command-line length limits.
 pub struct SamtoolsMergeMany {
     /// Path to samtools binary
     pub bin: String,
@@ -273,17 +274,25 @@ pub struct SamtoolsMergeMany {
     pub bams: Vec<PathBuf>,
     /// Pipeline configuration
     pub config: Config,
+    /// Temporary file containing list of BAM paths
+    bam_list_file: Option<PathBuf>,
+}
+
+impl SamtoolsMergeMany {
+    /// Creates a new merge command from config defaults.
+    pub fn from_config(into: PathBuf, bams: Vec<PathBuf>, config: &Config) -> Self {
+        Self {
+            bin: config.align.samtools_bin.clone(),
+            threads: config.align.samtools_merge_threads as usize,
+            into,
+            bams,
+            config: config.clone(),
+            bam_list_file: None,
+        }
+    }
 }
 
 impl super::Command for SamtoolsMergeMany {
-    /// Prepares the filesystem and validates preconditions for the multi-merge.
-    ///
-    /// This method:
-    /// 1. Ensures at least one input BAM is provided
-    /// 2. Ensures all input BAMs exist
-    /// 3. Creates the output directory if needed
-    /// 4. Ensures no QNAME overlap across any pair of BAMs
-    /// 5. Saves the union QNAME set next to the output BAM
     fn init(&mut self) -> anyhow::Result<()> {
         if self.bams.is_empty() {
             anyhow::bail!("At least one input BAM is required");
@@ -305,7 +314,7 @@ impl super::Command for SamtoolsMergeMany {
             }
         }
 
-        // Check that all input BAMs exist
+        // Check all input BAMs exist
         for bam in &self.bams {
             anyhow::ensure!(
                 bam.exists(),
@@ -314,39 +323,86 @@ impl super::Command for SamtoolsMergeMany {
             );
         }
 
-        // Skip QNAME collision check if only one BAM
-        if self.bams.len() == 1 {
+        // Write BAM list file
+        let list_file = self
+            .into
+            .parent()
+            .unwrap_or(Path::new("."))
+            .join(format!(".bam_list_{}.txt", Uuid::new_v4()));
+
+        let mut file = fs::File::create(&list_file)
+            .with_context(|| format!("Failed to create BAM list file: {}", list_file.display()))?;
+
+        for bam in &self.bams {
+            writeln!(file, "{}", bam.display())
+                .context("Failed to write to BAM list file")?;
+        }
+
+        self.bam_list_file = Some(list_file);
+
+        // QNAME collision check
+        if self.bams.len() > 1 {
+            self.check_qname_collisions()?;
+        } else {
             let qset = QNameSet::load_or_create(
                 &self.bams[0],
                 self.config.bam_min_mapq,
                 self.threads,
             )?;
             qset.save(WGSBamStats::qnames_path_from_bam(&self.into))?;
-            return Ok(());
         }
 
-        // QNAME uniqueness check across all BAMs
+        Ok(())
+    }
+
+    fn cmd(&self) -> String {
+        let header_bam = self
+            .bams
+            .first()
+            .expect("bams should be non-empty in cmd()");
+
+        let list_file = self
+            .bam_list_file
+            .as_ref()
+            .expect("bam_list_file should be set after init()");
+
+        format!(
+            "{bin} merge -@ {threads} -c -p -h {header} -b {list} {output}",
+            bin = self.bin,
+            threads = self.threads,
+            header = header_bam.display(),
+            list = list_file.display(),
+            output = self.into.display(),
+        )
+    }
+
+    fn clean_up(&self) -> anyhow::Result<()> {
+        if let Some(ref list_file) = self.bam_list_file {
+            fs::remove_file(list_file).ok();
+        }
+        Ok(())
+    }
+}
+
+impl SamtoolsMergeMany {
+    fn check_qname_collisions(&self) -> anyhow::Result<()> {
         let mut iter = self.bams.iter();
-        let first_bam = iter.next().expect("checked non-empty above");
+        let first_bam = iter.next().expect("checked non-empty");
 
         let mut union_qset = QNameSet::load_or_create(
             first_bam,
             self.config.bam_min_mapq,
-            self.threads,  // Fixed: was hardcoded to 4
+            self.threads,
         )
-        .with_context(|| {
-            format!("Failed to load or create qnames for {}", first_bam.display())
-        })?;
+        .with_context(|| format!("Failed to load qnames for {}", first_bam.display()))?;
 
         for bam in iter {
             let bam_qset = QNameSet::load_or_create(
                 bam,
                 self.config.bam_min_mapq,
-                self.threads,  // Fixed: was hardcoded to 4
+                self.threads,
             )
-            .with_context(|| {
-                format!("Failed to load or create qnames for {}", bam.display())
-            })?;
+            .with_context(|| format!("Failed to load qnames for {}", bam.display()))?;
 
             let (intersection, frac) = union_qset.intersect(&bam_qset);
             let n_shared = intersection.len();
@@ -363,74 +419,14 @@ impl super::Command for SamtoolsMergeMany {
             union_qset.merge(&bam_qset);
         }
 
-        // Save the union QNAME set next to the OUTPUT BAM
         let qnames_path = WGSBamStats::qnames_path_from_bam(&self.into);
-        union_qset.save(&qnames_path).with_context(|| {
-            format!("Failed to save union qnames to {}", qnames_path.display())
-        })?;
+        union_qset.save(&qnames_path)
+            .with_context(|| format!("Failed to save union qnames to {}", qnames_path.display()))?;
 
         Ok(())
     }
-
-    /// Builds the `samtools merge` command line.
-    ///
-    /// # Command Format
-    ///
-    /// ```text
-    /// samtools merge -@ {threads} -c -p -h {header_bam} {output} {input1} {input2} ...
-    /// ```
-    ///
-    /// - `-@`: Number of compression threads
-    /// - `-c`: Combine @RG headers with colliding IDs (use first occurrence)
-    /// - `-p`: Combine @PG headers with colliding IDs (use first occurrence)
-    /// - `-h`: Use header from specified BAM (reads from that file are ignored for header)
-    ///
-    /// The first input BAM provides the header. All input BAMs (including the first)
-    /// contribute reads to the output.
-    fn cmd(&self) -> String {
-        let header_bam = self
-            .bams
-            .first()
-            .expect("bams should be non-empty in cmd()");
-
-        let inputs = self
-            .bams
-            .iter()
-            .map(|p| p.display().to_string())
-            .collect::<Vec<_>>()
-            .join(" ");
-
-        // -h header_bam: use header from first BAM
-        // inputs: all BAMs including first (for reads)
-        format!(
-            "{bin} merge -@ {threads} -c -p -h {header} {output} {inputs}",
-            bin = self.bin,
-            threads = self.threads,
-            header = header_bam.display(),
-            output = self.into.display(),
-            inputs = inputs,
-        )
-    }
-
-    fn clean_up(&self) -> anyhow::Result<()> {
-        Ok(())
-    }
-}
-
-impl SamtoolsMergeMany {
-    /// Creates a new merge command from config defaults.
-    pub fn from_config(into: PathBuf, bams: Vec<PathBuf>, config: &Config) -> Self {
-        Self {
-            bin: config.align.samtools_bin.clone(),
-            threads: config.bam_n_threads as usize,
-            into,
-            bams,
-            config: config.clone(),
-        }
-    }
 }
 
-
 impl super::Runner for SamtoolsMergeMany {}
 
 impl super::SlurmRunner for SamtoolsMergeMany {
@@ -446,6 +442,7 @@ impl super::SlurmRunner for SamtoolsMergeMany {
     }
 }
 
+
 #[derive(Debug)]
 pub struct SamtoolsSplit {
     /// Path to the `samtools` executable.

+ 6 - 0
src/config.rs

@@ -22,6 +22,12 @@ pub struct Config {
     /// Temporary directory.
     pub tmp_dir: String,
 
+    /// Run cache directory.
+    pub run_cache_dir: String,
+
+    /// Software threads
+    pub threads: u8,
+
     /// Singularity bin
     pub singularity_bin: String,
 

+ 41 - 0
src/helpers.rs

@@ -518,6 +518,25 @@ pub fn list_directories(dir_path: PathBuf) -> std::io::Result<Vec<String>> {
     Ok(directories)
 }
 
+pub fn list_files_recursive(root: impl AsRef<Path>) -> Vec<PathBuf> {
+    fn walk(dir: &Path, out: &mut Vec<PathBuf>) {
+        if let Ok(entries) = fs::read_dir(dir) {
+            for entry in entries.flatten() {
+                let path = entry.path();
+                if path.is_dir() {
+                    walk(&path, out);
+                } else if path.is_file() {
+                    out.push(path);
+                }
+            }
+        }
+    }
+
+    let mut files = Vec::new();
+    walk(root.as_ref(), &mut files);
+    files
+}
+
 /// Checks whether the modification time of `file1` is older than `file2`.
 ///
 /// If `rm` is `true` and `file1` is older, attempts to remove the directory containing `file1`.
@@ -839,3 +858,25 @@ pub fn split_genome_into_n_regions(
 
     regions
 }
+
+/// Removes a BAM file and its associated index file (.bai).
+pub fn remove_bam_with_index(bam: &Path) -> anyhow::Result<()> {
+    // Remove BAM
+    if bam.exists() {
+        fs::remove_file(bam)
+            .with_context(|| format!("Failed to remove BAM: {}", bam.display()))?;
+    }
+
+    // Remove index (.bam.bai or .bai)
+    let bai = bam.with_extension("bam.bai");
+    if bai.exists() {
+        fs::remove_file(&bai).ok(); // Don't fail if index doesn't exist
+    }
+
+    let bai_alt = bam.with_extension("bai");
+    if bai_alt.exists() {
+        fs::remove_file(&bai_alt).ok();
+    }
+
+    Ok(())
+}

+ 43 - 23
src/pipes/somatic_slurm.rs

@@ -12,7 +12,7 @@ use crate::{
         SlurmRunner,
     },
     config::Config,
-    helpers::{extract_barcode, list_files_with_ext, TempDirGuard},
+    helpers::{extract_barcode, list_files_with_ext, remove_bam_with_index, TempDirGuard},
 };
 
 /// Normalize barcode strings for matching.
@@ -264,7 +264,6 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
     let mut tmp_split_dir: Option<PathBuf> = None;
     let mut _temp_guards: Vec<TempDirGuard> = Vec::new();
 
-
     if !pod5s_to_basecall.is_empty() {
         info!(
             "Will basecall {} POD5 file(s) without corresponding BAM",
@@ -287,8 +286,9 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
                 .ok_or_else(|| anyhow::anyhow!("Invalid POD5 filename: {}", src.display()))?;
             let dst = local_pod5_dir.join(file_name);
             #[cfg(unix)]
-            std::os::unix::fs::symlink(src, &dst)
-                .with_context(|| format!("Failed to symlink {} → {}", src.display(), dst.display()))?;
+            std::os::unix::fs::symlink(src, &dst).with_context(|| {
+                format!("Failed to symlink {} → {}", src.display(), dst.display())
+            })?;
 
             #[cfg(not(unix))]
             fs::copy(src, &dst)
@@ -515,7 +515,7 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
             // Only one chunk: use it directly
             aligned_bams[0].clone()
         } else {
-            let tmp_case_merged = tmp_dir.join(format!(
+            let tmp_merged = tmp_dir.join(format!(
                 "{}_{}_{}_merged.bam",
                 Uuid::new_v4(),
                 case.case_id,
@@ -526,29 +526,21 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
                 "  Merging {} chunk BAMs for case {} → {}",
                 aligned_bams.len(),
                 case.case_id,
-                tmp_case_merged.display()
+                tmp_merged.display()
             );
 
-            let mut merge_many_cmd = SamtoolsMergeMany::from_config(
-                tmp_case_merged.clone(),
-                aligned_bams.clone(),
-                config,
-            );
+            let mut merge_many_cmd =
+                SamtoolsMergeMany::from_config(tmp_merged.clone(), aligned_bams.clone(), config);
             SlurmRunner::run(&mut merge_many_cmd)?;
 
-            // We can safely remove the chunk BAMs now (they were all inputs).
+            // Remove chunk BAMs and their indices
             for bam in aligned_bams {
-                if bam != &tmp_case_merged {
-                    if let Err(e) = fs::remove_file(bam) {
-                        warn!(
-                            "Failed to remove chunk BAM {} after merge: {e}",
-                            bam.display()
-                        );
-                    }
+                if bam != &tmp_merged {
+                    remove_bam_with_index(bam).ok();
                 }
             }
 
-            tmp_case_merged
+            tmp_merged
         };
 
         // -----------------------------------------------------------------
@@ -589,7 +581,7 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
                 final_bam_path.display()
             );
 
-            // First per-case BAM becomes the base final BAM.
+            // Merged per-case BAM becomes the base final BAM.
             fs::rename(&case_merged_bam, &final_bam_path)?;
         }
 
@@ -705,14 +697,42 @@ mod tests {
     use crate::helpers::test_init;
 
     #[test]
-    fn slurm_pipe() -> anyhow::Result<()> {
+    fn import_run_test() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+
+        let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";
+
+        let mut runs = Pod5sRuns::new();
+        runs.add_run_dir(dir)?;
+
+        runs.data[0].add_id_input(IdInput {
+            case_id: "TEST_02".to_string(),
+            sample_type: "NOR".to_string(),
+            barcode: "NB02".to_string(),
+            flow_cell_id: "".to_string(),
+        });
+        runs.data[0].add_id_input(IdInput {
+            case_id: "TEST_04".to_string(),
+            sample_type: "NOR".to_string(),
+            barcode: "NB04".to_string(),
+            flow_cell_id: "".to_string(),
+        });
+
+        import_run(&runs.data[0], &config)?;
+
+        Ok(())
+    }
+
+    #[test]
+    fn slurm_pipe_prod() -> anyhow::Result<()> {
         test_init();
 
         let config = Config::default();
 
         let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
         for run in runs.data {
-            if run.flow_cell_id != "PBI54633" {
+            if run.pod5s.len() != 2 {
                 continue;
             }
             info!("Importing run:\n{}", run);