
Add run! and run_many! macros

Thomas · 1 week ago · commit 5e4e7c0275

+ 4 - 1
pandora-config.example.toml

@@ -13,6 +13,9 @@ result_dir = "/mnt/beegfs02/scratch/t_steimle/data/wgs"
 # Temporary directory.
 tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
 
+# Whether to use Slurm as the runner (true) or run locally (false).
+slurm_runner = true
+
 # Run cache directory.
 run_cache_dir = "/home/t_steimle/data/prom_runs"
 
@@ -168,7 +171,7 @@ deepvariant_output_dir = "{result_dir}/{id}/{time}/DeepVariant"
 deepvariant_threads = 20
 
 # DeepVariant singularity image path
-deepvariant_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/deepvariant_latest-gpu.sif"
+deepvariant_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/deepvariant_latest.sif"
 
 # DeepVariant version / image tag.
 deepvariant_bin_version = "1.9.0"

+ 0 - 9
slurm-2507862.out

File diff suppressed because it is too large


+ 196 - 216
src/callers/clairs.rs

@@ -1,18 +1,171 @@
+//! # ClairS Somatic Variant Calling Pipeline
+//!
+//! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS),
+//! a deep learning-based somatic variant caller designed for long-read sequencing data
+//! (ONT and PacBio HiFi).
+//!
+//! ## Overview
+//!
+//! ClairS performs somatic variant calling on paired tumor-normal samples using:
+//!
+//! - Haplotype-aware variant calling with LongPhase integration
+//! - Separate SNV and indel calling pipelines
+//! - Clair3-based germline variant detection on the normal sample
+//!
+//! ## Execution Modes
+//!
+//! Execution mode (local vs Slurm) is selected via `config.slurm_runner`:
+//!
+//! - **Single run** ([`Run::run`](crate::runners::Run)) — One job covering the whole genome
+//! - **Chunked** ([`run_clairs_chunked_with_merge`]) — Parallel execution across genome regions
+//!
+//! ### Chunked Parallel Execution
+//!
+//! For large genomes, the chunked mode provides significant speedup:
+//!
+//! ```text
+//! ┌─────────────────────────────────────────────────────────────────┐
+//! │                     Genome Splitting                            │
+//! │  chr1:1-50M  │  chr1:50M-100M  │  chr2:1-50M  │  ...  │  chrN   │
+//! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘
+//!        │                │                │                   │
+//!        ▼                ▼                ▼                   ▼
+//! ┌────────────┐  ┌────────────┐  ┌────────────┐       ┌────────────┐
+//! │  ClairS    │  │  ClairS    │  │  ClairS    │  ...  │  ClairS    │
+//! │  Part 1    │  │  Part 2    │  │  Part 3    │       │  Part N    │
+//! └─────┬──────┘  └─────┬──────┘  └─────┬──────┘       └─────┬──────┘
+//!       │               │               │                   │
+//!       ▼               ▼               ▼                   ▼
+//! ┌────────────┐  ┌────────────┐  ┌────────────┐       ┌────────────┐
+//! │ Postprocess│  │ Postprocess│  │ Postprocess│  ...  │ Postprocess│
+//! │ SNV+Indel  │  │ SNV+Indel  │  │ SNV+Indel  │       │ SNV+Indel  │
+//! │ → PASS     │  │ → PASS     │  │ → PASS     │       │ → PASS     │
+//! └─────┬──────┘  └─────┬──────┘  └─────┬──────┘       └─────┬──────┘
+//!       │               │               │                   │
+//!       └───────────────┴───────┬───────┴───────────────────┘
+//!                               ▼
+//!                    ┌─────────────────────┐
+//!                    │   bcftools concat   │
+//!                    │   (somatic PASS)    │
+//!                    └──────────┬──────────┘
+//!                               ▼
+//!                    ┌─────────────────────┐
+//!                    │   Final VCF Output  │
+//!                    └─────────────────────┘
+//! ```
+//!
+//! ## Output Files
+//!
+//! Somatic variants (PASS only):
+//! ```text
+//! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz
+//! ```
+//!
+//! Germline variants (PASS only):
+//! ```text
+//! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz
+//! ```
+//!
+//! ## Post-processing Pipeline
+//!
+//! Each ClairS run (or part) undergoes automatic post-processing:
+//!
+//! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants
+//! 2. **Germline**: Filter PASS variants from Clair3 germline output
+//!
+//! ## Usage
+//!
+//! ### Basic Execution
+//!
+//! ```ignore
+//! use crate::config::Config;
+//! use crate::pipes::clairs::ClairS;
+//! use crate::pipes::Initialize;
+//! use crate::runners::Run;
+//!
+//! let config = Config::default();
+//! let mut clairs = ClairS::initialize("sample_001", &config)?;
+//! clairs.run()?;
+//! # Ok::<(), anyhow::Error>(())
+//! ```
+//!
+//! ### Chunked Parallel Execution (Recommended for WGS)
+//!
+//! ```ignore
+//! use crate::config::Config;
+//! use crate::pipes::clairs::run_clairs_chunked_with_merge;
+//!
+//! let config = Config::default();
+//! let outputs = run_clairs_chunked_with_merge("sample_001", &config, 20)?;
+//! # Ok::<(), anyhow::Error>(())
+//! ```
+//!
+//! ### Loading Variants for Downstream Analysis
+//!
+//! ```ignore
+//! use crate::annotation::Annotations;
+//! use crate::variant::variant::Variants;
+//!
+//! let annotations = Annotations::new();
+//! let somatic = clairs.variants(&annotations)?;
+//! let germline = clairs.germline(&annotations)?;
+//!
+//! println!("Somatic: {} variants", somatic.variants.len());
+//! println!("Germline: {} variants", germline.variants.len());
+//! # Ok::<(), anyhow::Error>(())
+//! ```
+//!
+//! ## Configuration Requirements
+//!
+//! The following [`Config`](crate::config::Config) fields must be set:
+//!
+//! - `singularity_bin` — Path to Singularity executable
+//! - `clairs_image` — ClairS container image path
+//! - `reference` — Reference genome FASTA
+//! - `clairs_threads` — CPU threads per job
+//! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.)
+//! - `clairs_force` — Force re-run even if outputs exist
+//! - `slurm_runner` — Use Slurm (`true`) or local (`false`) execution
+//!
+//! ## Implemented Traits
+//!
+//! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation
+//! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating
+//! - [`JobCommand`](crate::commands::Command) — Command string generation
+//! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support
+//! - [`LocalBatchRunner`](crate::commands::LocalBatchRunner) — Local execution with batch-style output capture
+//! - [`SlurmRunner`](crate::commands::SlurmRunner) — Slurm execution
+//! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission
+//! - [`Run`](crate::runners::Run) — Unified execution interface
+//! - [`Variants`](crate::variant::variant::Variants) — Somatic variant loading
+//! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category
+//! - [`Label`](crate::variant::variant::Label) — Human-readable identifier
+//! - [`Version`](crate::pipes::Version) — Tool version extraction
+//!
+//! ## Dependencies
+//!
+//! External tools (containerized or system):
+//!
+//! - **ClairS** — Somatic variant calling
+//! - **bcftools** — VCF concatenation and filtering
+//!
+//! ## References
+//!
+//! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
+//! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
 use crate::{
     annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
     collection::vcf::Vcf,
     commands::{
         bcftools::{BcftoolsConcat, BcftoolsKeepPass},
-        run_many_sbatch, CapturedOutput, Command as JobCommand, Runner as LocalRunner,
-        SbatchRunner, SlurmParams, SlurmRunner,
+        CapturedOutput, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner,
+        SlurmParams, SlurmRunner,
     },
     config::Config,
     helpers::{
-        get_genome_sizes, is_file_older, remove_dir_if_exists, split_genome_into_n_regions,
+        get_genome_sizes, is_file_older, remove_dir_if_exists, split_genome_into_n_regions_exact,
         temp_file_path,
     },
     io::vcf::read_vcf,
     pipes::{Initialize, ShouldRun, Version},
+    run, run_many,
     runners::Run,
     variant::{
         variant::{Label, Variants},
@@ -182,9 +335,10 @@ impl JobCommand for ClairS {
 }
 
 impl LocalRunner for ClairS {}
+impl LocalBatchRunner for ClairS {}
 
-impl SbatchRunner for ClairS {
-    fn slurm_params(&self) -> SlurmParams {
+impl SlurmRunner for ClairS {
+    fn slurm_args(&self) -> Vec<String> {
         SlurmParams {
             job_name: Some(format!("clairs_{}", self.id)),
             cpus_per_task: Some(self.config.clairs_threads as u32),
@@ -192,16 +346,25 @@ impl SbatchRunner for ClairS {
             partition: Some("batch".into()),
             gres: None,
         }
+        .to_args()
     }
+}
 
-    fn sbatch_extra_args(&self) -> Vec<String> {
-        Vec::new()
+impl SbatchRunner for ClairS {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some(format!("clairs_{}", self.id)),
+            cpus_per_task: Some(self.config.clairs_threads as u32),
+            mem: Some("60G".into()),
+            partition: Some("batch".into()),
+            gres: None,
+        }
     }
 }
 
 impl Run for ClairS {
     fn run(&mut self) -> anyhow::Result<()> {
-        self.run_local()?;
+        run!(&self.config, self)?;
+        self.postprocess()?;
         Ok(())
     }
 }
@@ -221,15 +384,14 @@ impl ClairS {
         }
     }
 
-    /// Post-processes ClairS output locally (concat SNV+indel, filter PASS).
-    fn postprocess_local(&self) -> anyhow::Result<()> {
-        self.process_germline_local()?;
-        self.process_somatic_local()?;
+    /// Post-processes ClairS output (concat SNV+indel, filter PASS).
+    fn postprocess(&self) -> anyhow::Result<()> {
+        self.process_germline()?;
+        self.process_somatic()?;
         Ok(())
     }
 
-    /// Processes germline VCF (PASS filter only).
-    fn process_germline_local(&self) -> anyhow::Result<()> {
+    fn process_germline(&self) -> anyhow::Result<()> {
         let germline_passed = self.germline_passed_vcf_path();
 
         if Path::new(&germline_passed).exists() {
@@ -240,12 +402,11 @@ impl ClairS {
             return Ok(());
         }
 
-        // Input path depends on whether this is a part run
         let germline_input = match self.part_index {
             Some(_) => {
                 let output_dir = self.part_output_dir();
-                let b = self.config.clairs_germline_normal_vcf(&self.id);
-                let base_name = Path::new(&b)
+                let base = self.config.clairs_germline_normal_vcf(&self.id);
+                let base_name = Path::new(&base)
                     .file_name()
                     .expect("Germline VCF should have filename")
                     .to_string_lossy();
@@ -262,7 +423,7 @@ impl ClairS {
         let mut cmd =
             BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
 
-        let report = <BcftoolsKeepPass as LocalRunner>::run(&mut cmd)
+        let report = run!(&self.config, &mut cmd)
             .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
 
         report
@@ -272,8 +433,7 @@ impl ClairS {
         Ok(())
     }
 
-    /// Processes somatic VCFs (concat SNV+indel, then PASS filter).
-    fn process_somatic_local(&self) -> anyhow::Result<()> {
+    fn process_somatic(&self) -> anyhow::Result<()> {
         let passed_vcf = self.somatic_passed_vcf_path();
 
         if Path::new(&passed_vcf).exists() {
@@ -309,7 +469,6 @@ impl ClairS {
             self.id, self.part_index
         );
 
-        // Create temp file for intermediate concat result
         let tmp_file = temp_file_path(".vcf.gz")?;
         let tmp_path = tmp_file
             .to_str()
@@ -323,7 +482,7 @@ impl ClairS {
             &tmp_path,
         );
 
-        let report = <BcftoolsConcat as LocalRunner>::run(&mut concat)
+        let report = run!(&self.config, &mut concat)
             .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
 
         report
@@ -334,7 +493,7 @@ impl ClairS {
         let mut keep_pass =
             BcftoolsKeepPass::from_config(&self.config, tmp_path.clone(), passed_vcf.clone());
 
-        let report = <BcftoolsKeepPass as LocalRunner>::run(&mut keep_pass)
+        let report = run!(&self.config, &mut keep_pass)
             .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
 
         report
@@ -346,163 +505,6 @@ impl ClairS {
         Ok(())
     }
 
-    /// Processes germline VCF via Slurm.
-    fn process_germline_sbatch(&self) -> anyhow::Result<()> {
-        let germline_passed = self.germline_passed_vcf_path();
-
-        if Path::new(&germline_passed).exists() {
-            debug!(
-                "ClairS germline PASS VCF already exists for {} part {:?}",
-                self.id, self.part_index
-            );
-            return Ok(());
-        }
-
-        let germline_input = match self.part_index {
-            Some(_) => {
-                let output_dir = self.part_output_dir();
-                let germline_file = self.config.clairs_germline_normal_vcf(&self.id);
-                let base_name = Path::new(&germline_file)
-                    .file_name()
-                    .expect("Germline VCF should have filename")
-                    .to_string_lossy();
-                format!("{output_dir}/{base_name}")
-            }
-            None => self.config.clairs_germline_normal_vcf(&self.id),
-        };
-
-        info!(
-            "Filtering germline PASS variants via Slurm for {} part {:?}",
-            self.id, self.part_index
-        );
-
-        let mut cmd =
-            BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
-
-        let report = SlurmRunner::run(&mut cmd)
-            .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
-
-        report
-            .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir))
-            .context("Failed to save germline PASS logs")?;
-
-        Ok(())
-    }
-
-    /// Processes somatic VCFs via Slurm.
-    fn process_somatic_sbatch(&self) -> anyhow::Result<()> {
-        let passed_vcf = self.somatic_passed_vcf_path();
-
-        if Path::new(&passed_vcf).exists() {
-            debug!(
-                "ClairS somatic PASS VCF already exists for {} part {:?}",
-                self.id, self.part_index
-            );
-            return Ok(());
-        }
-
-        let output_dir = self.part_output_dir();
-        let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id);
-
-        let snv_vcf = format!(
-            "{}/{}",
-            output_dir,
-            Path::new(&snv_vcf_base)
-                .file_name()
-                .expect("SNV VCF should have filename")
-                .to_string_lossy()
-        );
-        let indel_vcf = format!(
-            "{}/{}",
-            output_dir,
-            Path::new(&indel_vcf_base)
-                .file_name()
-                .expect("Indel VCF should have filename")
-                .to_string_lossy()
-        );
-
-        info!(
-            "Concatenating and filtering somatic variants via Slurm for {} part {:?}",
-            self.id, self.part_index
-        );
-
-        let tmp_file = temp_file_path(".vcf.gz")?;
-        let tmp_path = tmp_file
-            .to_str()
-            .context("Temp path not valid UTF-8")?
-            .to_string();
-
-        // Concat SNV + indel
-        let mut concat = BcftoolsConcat::from_config(
-            &self.config,
-            vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
-            &tmp_path,
-        );
-
-        let report = SlurmRunner::run(&mut concat)
-            .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
-
-        report
-            .save_to_file(format!("{}/bcftools_concat_", self.log_dir))
-            .context("Failed to save concat logs")?;
-
-        // Filter PASS
-        let mut keep_pass =
-            BcftoolsKeepPass::from_config(&self.config, tmp_path.clone(), passed_vcf.clone());
-
-        let report = SlurmRunner::run(&mut keep_pass)
-            .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
-
-        report
-            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
-            .context("Failed to save PASS filter logs")?;
-
-        fs::remove_file(&tmp_path).context("Failed to remove temporary concat VCF")?;
-
-        Ok(())
-    }
-
-    /// Post-processes ClairS output via Slurm.
-    fn postprocess_sbatch(&self) -> anyhow::Result<()> {
-        self.process_germline_sbatch()?;
-        self.process_somatic_sbatch()?;
-        Ok(())
-    }
-
-    /// Runs ClairS locally with post-processing.
-    pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
-        if !self.should_run() {
-            debug!(
-                "ClairS output already up-to-date for {}, skipping.",
-                self.id
-            );
-            return Ok(CapturedOutput::default());
-        }
-
-        info!("Running ClairS locally for {}", self.id);
-        let out = <Self as LocalRunner>::run(self)?;
-
-        self.postprocess_local()?;
-        Ok(out)
-    }
-
-    /// Runs ClairS via Slurm with post-processing.
-    pub fn run_sbatch(&mut self) -> anyhow::Result<CapturedOutput> {
-        if !self.should_run() {
-            debug!(
-                "ClairS output already up-to-date for {}, skipping.",
-                self.id
-            );
-            return Ok(CapturedOutput::default());
-        }
-
-        info!("Submitting ClairS via sbatch for {}", self.id);
-        let out = <Self as SbatchRunner>::run(self)?;
-
-        self.postprocess_sbatch()?;
-        Ok(out)
-    }
-
     /// Returns the per-part output directory.
     fn part_output_dir(&self) -> String {
         let base_dir = self.config.clairs_output_dir(&self.id);
@@ -709,7 +711,7 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
     );
 
     let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
-    SlurmRunner::run(&mut concat).context("Failed to run bcftools concat for ClairS parts")?;
+    run!(&base.config, &mut concat).context("Failed to run bcftools concat for ClairS parts")?;
 
     fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?;
 
@@ -721,7 +723,6 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
     Ok(())
 }
 
 /// Merges N chunked ClairS germline PASS VCFs into the final output.
 fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
     let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
 
@@ -753,7 +754,7 @@ fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<
     );
 
     let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
-    SlurmRunner::run(&mut concat)
+    run!(&base.config, &mut concat)
         .context("Failed to run bcftools concat for ClairS germline parts")?;
 
     fs::rename(&final_tmp, &final_passed_vcf)
@@ -767,26 +768,10 @@ fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<
     Ok(())
 }
 
-/// Runs ClairS in parallel chunks via Slurm, then merges results.
-///
-/// # Steps
-///
-/// 1. Split genome into N regions
-/// 2. Submit parallel Slurm jobs (one per region)
-/// 3. Post-process each part (concat SNV+indel, filter PASS)
-/// 4. Merge all part PASS VCFs into final output
-/// 5. Process germline VCF (from first part or full run)
+/// Runs ClairS in parallel chunks, then merges results.
 ///
-/// # Arguments
-///
-/// * `id` - Sample identifier
-/// * `config` - Pipeline configuration
-/// * `n_parts` - Target number of parallel jobs
-///
-/// # Returns
-///
-/// Vector of captured outputs from each Slurm job.
-pub fn run_clairs_chunked_sbatch_with_merge(
+/// Execution mode (local vs Slurm) is determined by `config.slurm_runner`.
+pub fn run_clairs_chunked_with_merge(
     id: &str,
     config: &Config,
     n_parts: usize,
@@ -800,15 +785,16 @@ pub fn run_clairs_chunked_sbatch_with_merge(
         return Ok(Vec::new());
     }
 
-    // Get genome sizes from normal BAM header
     let normal_bam = config.normal_bam(id);
     let reader = bam::Reader::from_path(&normal_bam)
         .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
     let header = bam::Header::from_template(reader.header());
     let genome_sizes = get_genome_sizes(&header)?;
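+    // One ClairS job per region: flatten the per-chunk region lists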
+    let regions = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
+        .into_iter()
+        .flatten()
+        .collect::<Vec<String>>();
 
-    // Split genome into regions
-    let regions = split_genome_into_n_regions(&genome_sizes, n_parts);
     let actual_n_parts = regions.len();
 
     info!(
@@ -816,7 +802,6 @@ pub fn run_clairs_chunked_sbatch_with_merge(
         actual_n_parts, id
     );
 
-    // Build jobs
     let mut jobs = Vec::with_capacity(actual_n_parts);
     for (i, region) in regions.into_iter().enumerate() {
         let mut job = base.clone();
@@ -827,18 +812,13 @@ pub fn run_clairs_chunked_sbatch_with_merge(
         jobs.push(job);
     }
 
-    // Run all parts via Slurm
-    let outputs = run_many_sbatch(jobs.clone())?;
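+    // Run all parts in parallel (locally or via Slurm, per `config.slurm_runner`)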
+    let outputs = run_many!(config, jobs.clone())?;
 
-    // Post-process each part (creates .pass.vcf.gz files for both somatic and germline)
     for job in &jobs {
-        job.postprocess_sbatch()?;
+        job.postprocess()?;
     }
 
-    // Merge somatic PASS VCFs
     merge_clairs_parts(&base, actual_n_parts)?;
-
-    // Merge germline PASS VCFs
     merge_clairs_germline_parts(&base, actual_n_parts)?;
 
     info!(
@@ -869,10 +849,10 @@ mod tests {
     fn clairs_run() -> anyhow::Result<()> {
         test_init();
         let config = Config::default();
-        let clairs = ClairS::initialize("34528", &config)?;
-        info!("{clairs}");
 
-        let outputs = run_clairs_chunked_sbatch_with_merge("34528", &config, 5)?;
+        let outputs = run_clairs_chunked_with_merge("34528", &config, 20)?;
         info!("Completed with {} job outputs", outputs.len());
 
         Ok(())

+ 165 - 265
src/callers/deep_variant.rs

@@ -1,3 +1,47 @@
+//! # DeepVariant Variant Calling Pipeline
+//!
+//! This module provides a pipeline runner for [DeepVariant](https://github.com/google/deepvariant),
+//! a deep learning-based variant caller.
+//!
+//! ## Overview
+//!
+//! DeepVariant performs variant calling on a single sample BAM file using:
+//!
+//! - CNN-based variant classification
+//! - Haploid calling for sex chromosomes (configurable by karyotype)
+//! - Containerized execution via Singularity/Apptainer
+//!
+//! ## Execution Modes
+//!
+//! Execution mode is automatically selected via `config.slurm_runner`:
+//!
+//! - **Local** — Single-node execution
+//! - **Slurm** — HPC job submission
+//!
+//! Both modes support chunked parallel execution via [`run_deepvariant_chunked_with_merge`].
+//!
+//! ## Output Files
+//!
+//! PASS-filtered variants:
+//! ```text
+//! {result_dir}/{id}/deepvariant/{id}_{time_point}_DeepVariant.pass.vcf.gz
+//! ```
+//!
+//! ## Usage
+//!
+//! ```ignore
+//! use crate::config::Config;
+//! use crate::pipes::deepvariant::run_deepvariant_chunked_with_merge;
+//!
+//! let config = Config::default();
+//! let outputs = run_deepvariant_chunked_with_merge("sample_001", "norm", &config, 30)?;
+//! # Ok::<(), anyhow::Error>(())
+//! ```
+//!
+//! ## References
+//!
+//! - [DeepVariant GitHub repository](https://github.com/google/deepvariant)
+//! - [DeepVariant publication (Nature Biotechnology, 2018)](https://doi.org/10.1038/nbt.4235)
 use anyhow::Context;
 use log::{debug, info};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
@@ -17,7 +61,7 @@ use crate::{
     },
     commands::{
         bcftools::{BcftoolsConcat, BcftoolsKeepPass},
-        run_many_sbatch, SlurmRunner,
+        LocalBatchRunner, SlurmRunner,
     },
     config::Config,
     helpers::{
@@ -25,8 +69,8 @@ use crate::{
     },
     io::vcf::read_vcf,
     pipes::{InitializeSolo, ShouldRun, Version},
+    run, run_many,
     runners::Run,
-    slurm_helpers::max_gpu_per_node,
     variant::{
         variant::{Label, Variants},
         variant_collection::VariantCollection,
@@ -36,7 +80,7 @@ use crate::{
 use crate::commands::{
     CapturedOutput,
     Command as JobCommand, // your trait
-    Runner as LocalRunner,
+    LocalRunner,
     SbatchRunner,
     SlurmParams,
 };
@@ -126,8 +170,6 @@ impl InitializeSolo for DeepVariant {
 
         let karyotype = WGSBamStats::open(&id, &time_point, config)?.karyotype()?;
 
-        // WGSBamStats::load_j
-
         info!("Initializing DeepVariant for {id} {time_point}.");
 
         let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
@@ -239,7 +281,7 @@ impl JobCommand for DeepVariant {
         };
 
         format!(
-            "{singularity_bin} exec --nv \
+            "{singularity_bin} exec \
             --bind /mnt:/mnt \
             --bind /home/t_steimle:/home/t_steimle \
             --bind {output_dir}:/output \
@@ -276,177 +318,34 @@ impl JobCommand for DeepVariant {
     }
 }
 
-impl LocalRunner for DeepVariant {
-    // default shell() from trait ("bash") is fine
-}
+impl LocalRunner for DeepVariant {}
+impl LocalBatchRunner for DeepVariant {}
 
+impl SlurmRunner for DeepVariant {
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
+            cpus_per_task: Some(self.config.deepvariant_threads.into()),
+            mem: Some("40G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
 impl SbatchRunner for DeepVariant {
-    /// Returns Slurm parameters optimized for GPU-accelerated DeepVariant.
-    ///
-    /// Automatically selects between H100 and A100 GPUs based on availability,
-    /// preferring H100 for better performance.
-    ///
-    /// # Panics
-    ///
-    /// Panics if neither H100 nor A100 GPUs are configured in Slurm.
-    /// TODO: returning `Result`.
     fn slurm_params(&self) -> SlurmParams {
-        let gpu = if let (Some(h100_av), Some(a100_av)) =
-            (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
-        {
-            if h100_av > 0 {
-                "h100"
-            } else if a100_av > 0 {
-                "a100"
-            } else {
-                "h100" // Queue for H100 if none immediately available
-            }
-        } else {
-            panic!("Are you running slurm with a100 and h100 GPU ?");
-        };
         SlurmParams {
             job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
             cpus_per_task: Some(self.config.deepvariant_threads.into()),
-            mem: Some("60G".into()),
-            partition: Some("gpgpuq".into()),
-            gres: Some(format!("gpu:{gpu}:1")),
+            mem: Some("40G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
         }
     }
-
-    fn sbatch_extra_args(&self) -> Vec<String> {
-        // if you want anything like --time=... here
-        Vec::new()
-    }
 }
 
 impl DeepVariant {
-    /// Filters variants to keep only PASS entries using local execution.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if bcftools execution or log saving fails.
-    fn filter_pass_local(&self) -> anyhow::Result<()> {
-        let output_vcf = self.output_vcf_path();
-        let vcf_passed = self.passed_vcf_path();
-
-        // Skip if already exists (note: should never be the case if Should run was used)
-        if Path::new(&vcf_passed).exists() {
-            debug!("PASS VCF already exists: {vcf_passed}");
-            return Ok(());
-        }
-
-        info!(
-            "Filtering PASS variants locally for {} {} (part: {:?})",
-            self.id, self.time_point, self.part_index
-        );
-
-        let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
-        let report = <BcftoolsKeepPass as LocalRunner>::run(&mut cmd)?;
-
-        report
-            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
-            .context("Can't save bcftools PASS logs")?;
-
-        Ok(())
-    }
-
-    /// Runs DeepVariant locally on the current machine.
-    ///
-    /// Executes the full pipeline:
-    /// 1. Checks if run is necessary via [`ShouldRun::should_run`]
-    /// 2. Runs DeepVariant via Singularity
-    /// 3. Filters to PASS variants only
-    ///
-    /// # Returns
-    ///
-    /// Captured stdout/stderr from the DeepVariant process, or an empty
-    /// [`CapturedOutput`] if skipped.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if DeepVariant execution or PASS filtering fails.
-    pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
-        if !self.should_run() {
-            debug!(
-                "DeepVariant output already up-to-date for {} {}, skipping.",
-                self.id, self.time_point
-            );
-            return Ok(CapturedOutput::default());
-        }
-
-        info!(
-            "Running DeepVariant locally for {} {}",
-            self.id, self.time_point
-        );
-        let out = <Self as LocalRunner>::run(self)?;
-
-        // local bcftools keep PASS
-        self.filter_pass_local()?;
-
-        Ok(out)
-    }
-
-    /// Filters variants to keep only PASS entries via Slurm.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if Slurm job submission or log saving fails.
-    fn filter_pass_sbatch(&self) -> anyhow::Result<()> {
-        let output_vcf = self.output_vcf_path();
-        let vcf_passed = self.passed_vcf_path();
-
-        if Path::new(&vcf_passed).exists() {
-            return Ok(());
-        }
-
-        info!(
-            "Filtering PASS variants via Slurm for {} {} (part: {:?})",
-            self.id, self.time_point, self.part_index
-        );
-
-        let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
-        let report = SlurmRunner::run(&mut cmd)?;
-
-        report
-            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
-            .context("Can't save bcftools PASS logs")?;
-
-        Ok(())
-    }
-
-    /// Runs DeepVariant via Slurm batch submission.
-    ///
-    /// Submits the job and waits for completion (tailing slurm output).
-    /// Automatically runs PASS filtering after variant calling completes.
-    ///
-    /// # Returns
-    ///
-    /// Captured output from the Slurm job, or empty if skipped.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if job submission, execution, or filtering fails.
-    pub fn run_sbatch(&mut self) -> anyhow::Result<CapturedOutput> {
-        if !self.should_run() {
-            debug!(
-                "DeepVariant output already up-to-date for {} {}, skipping.",
-                self.id, self.time_point
-            );
-            return Ok(CapturedOutput::default());
-        }
-
-        info!(
-            "Submitting DeepVariant via sbatch for {} {}",
-            self.id, self.time_point
-        );
-        let out = <Self as SbatchRunner>::run(self)?;
-
-        // Slurm bcftools keep PASS
-        self.filter_pass_sbatch()?;
-
-        Ok(out)
-    }
-
     /// Returns the per-part output directory.
     ///
     /// For partitioned runs, creates an isolated subdirectory to prevent
@@ -494,11 +393,32 @@ impl DeepVariant {
             }
         }
     }
+
+    /// Filters variants to keep only PASS entries.
+    fn filter_pass(&self) -> anyhow::Result<()> {
+        let output_vcf = self.output_vcf_path();
+        let vcf_passed = self.passed_vcf_path();
+
+        info!(
+            "Filtering PASS variants for {} {} (part: {:?})",
+            self.id, self.time_point, self.part_index
+        );
+
+        let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
+        let report = run!(&self.config, &mut cmd)?;
+
+        report
+            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
+            .context("Can't save bcftools PASS logs")?;
+
+        Ok(())
+    }
 }
 
 impl Run for DeepVariant {
     fn run(&mut self) -> anyhow::Result<()> {
-        self.run_local()?;
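+        // Dispatch to local or Slurm execution per `config.slurm_runner`,
+        // then keep only PASS variants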
+        run!(&self.config, self)?;
+        self.filter_pass()?;
         Ok(())
     }
 }
@@ -695,100 +615,9 @@ fn parse_deepvariant_version(output: &str) -> anyhow::Result<String> {
         .to_string())
 }
 
-/// Run DeepVariant in 4 sbatch jobs (by regions), then merge their PASS VCFs
-/// into the final deepvariant_solo_passed_vcf().
-pub fn run_deepvariant_quartered_sbatch_with_merge(
-    id: &str,
-    time_point: &str,
-    config: &Config,
-) -> anyhow::Result<Vec<CapturedOutput>> {
-    let n_parts = 4;
-    let base = DeepVariant::initialize(id, time_point, config)?;
-
-    // if already up-to-date, nothing to do
-    if !base.should_run() {
-        debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping.");
-        return Ok(Vec::new());
-    }
-
-    let bam_path = config.solo_bam(id, time_point);
-
-    // 1) split regions into 4 chunks
-    let reader = bam::Reader::from_path(&bam_path)
-        .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
-    let header = bam::Header::from_template(reader.header());
-    let genome_sizes = get_genome_sizes(&header)?;
-
-    // Split genome into regions
-    let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
-        .into_iter()
-        .map(|v| v.join(" "))
-        .collect::<Vec<String>>();
-
-    let n_parts = region_chunks.len();
-
-    // Build parallel jobs
-    let mut jobs = Vec::with_capacity(n_parts);
-    for (i, regions) in region_chunks.into_iter().enumerate() {
-        let mut job = base.clone();
-        job.regions = regions;
-        job.part_index = Some(i + 1);
-        job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
-        info!("Planned job:\n{job}");
-        jobs.push(job);
-    }
-
-    // 3) run them concurrently on Slurm
-    let outputs = run_many_sbatch(jobs)?;
-
-    // 4) merge PASS VCFs
-    merge_deepvariant_parts(&base, n_parts)?;
-
-    // we don’t really have a single merged CapturedOutput;
-    // return an empty one or synthesize something from `outputs`.
-    Ok(outputs)
-    // Ok(vec![])
-}
-
-/// Merges PASS-filtered VCFs from parallel DeepVariant runs.
-///
-/// Uses `bcftools concat` to combine part VCFs in order, then atomically
-/// renames the result to the final output path. All part directories are
-/// preserved, including VCF stats reports and intermediate files.
-///
-/// # Arguments
-///
-/// * `base` - Base DeepVariant instance (without `part_index`)
-/// * `n_parts` - Number of parts to merge
-///
-/// # Directory Structure After Completion
-///
-/// ```text
-/// {output_dir}/
-/// ├── part1/
-/// │   ├── {id}_{time_point}_DeepVariant.part1.vcf.gz
-/// │   ├── {id}_{time_point}_DeepVariant.part1.pass.vcf.gz
-/// │   ├── {id}_{time_point}_DeepVariant.part1.vcf.gz.visual_report.html
-/// │   └── {id}_{time_point}_DeepVariant_part1_logs/
-/// ├── part2/
-/// │   └── ...
-/// ├── part3/
-/// │   └── ...
-/// ├── part4/
-/// │   └── ...
-/// └── {id}_{time_point}_DeepVariant.pass.vcf.gz  # Final merged output
-/// ```
-///
-/// # Errors
-///
-/// Returns an error if:
-/// - Any part VCF is missing
-/// - `bcftools concat` fails
-/// - File rename fails
 fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
     let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
 
-    // Collect and verify all part PASS VCFs exist
     for i in 1..=n_parts {
         let mut dv = base.clone();
         dv.part_index = Some(i);
@@ -802,13 +631,11 @@ fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result
         part_pass_paths.push(PathBuf::from(part_pass));
     }
 
-    // Final merged PASS VCF path
     let final_passed_vcf = base
         .config
         .deepvariant_solo_passed_vcf(&base.id, &base.time_point);
     let final_tmp = format!("{final_passed_vcf}.tmp");
 
-    // Ensure output directory exists
     if let Some(parent) = Path::new(&final_passed_vcf).parent() {
         fs::create_dir_all(parent)?;
     }
@@ -818,12 +645,10 @@ fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result
         n_parts, final_passed_vcf
     );
 
-    // Concatenate via bcftools
     let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
+    run!(&base.config, &mut concat)
+        .context("Failed to run bcftools concat for DeepVariant parts")?;
 
-    SlurmRunner::run(&mut concat).context("Failed to run bcftools concat for DeepVariant parts")?;
-
-    // Atomic rename to final path
     fs::rename(&final_tmp, &final_passed_vcf)
         .context("Failed to rename merged DeepVariant PASS VCF")?;
 
@@ -835,6 +660,79 @@ fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result
     Ok(())
 }
 
+/// Runs DeepVariant in parallel chunks, then merges results.
+///
+/// Execution mode (local vs Slurm) is determined by `config.slurm_runner`.
+pub fn run_deepvariant_chunked_with_merge(
+    id: &str,
+    time_point: &str,
+    config: &Config,
+    n_parts: usize,
+) -> anyhow::Result<Vec<CapturedOutput>> {
+    anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
+
+    let base = DeepVariant::initialize(id, time_point, config)?;
+
+    if !base.should_run() {
+        debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping.");
+        return Ok(Vec::new());
+    }
+
+    let bam_path = config.solo_bam(id, time_point);
+    let reader = bam::Reader::from_path(&bam_path)
+        .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
+    let header = bam::Header::from_template(reader.header());
+    let genome_sizes = get_genome_sizes(&header)?;
+
+    let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
+        .into_iter()
+        .map(|v| v.join(" "))
+        .collect::<Vec<String>>();
+
+    let actual_n_parts = region_chunks.len();
+
+    info!(
+        "Running DeepVariant in {} parallel parts for {} {}",
+        actual_n_parts, id, time_point
+    );
+
+    let mut jobs = Vec::with_capacity(actual_n_parts);
+    for (i, regions) in region_chunks.into_iter().enumerate() {
+        let mut job = base.clone();
+        job.regions = regions;
+        job.part_index = Some(i + 1);
+        job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
+        info!("Planned DeepVariant job:\n{job}");
+        jobs.push(job);
+    }
+
+    // Run all DeepVariant jobs
+    let outputs = run_many!(config, jobs.clone())?;
+
+    // Filter PASS variants for each part
+    info!(
+        "Filtering PASS variants for all {} parts in parallel",
+        actual_n_parts
+    );
+    let filter_jobs: Vec<_> = jobs
+        .iter()
+        .map(|job| {
+            BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
+        })
+        .collect();
+    run_many!(config, filter_jobs)?;
+
+    // Merge PASS VCFs
+    merge_deepvariant_parts(&base, actual_n_parts)?;
+
+    info!(
+        "DeepVariant completed for {} {}: {} parts merged",
+        id, time_point, actual_n_parts
+    );
+
+    Ok(outputs)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::helpers::test_init;
@@ -857,7 +755,9 @@ mod tests {
         test_init();
         let config = Config::default();
 
-        let _ = run_deepvariant_quartered_sbatch_with_merge("34528", "norm", &config)?;
+        for id in ["36167", "36434", "36480"] {
+            let _ = run_deepvariant_chunked_with_merge(id, "norm", &config, 30)?;
+        }
         Ok(())
     }
 }

+ 66 - 35
src/collection/bam_stats.rs

@@ -67,9 +67,21 @@ pub struct KaryotypeCall {
 impl fmt::Display for KaryotypeCall {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         writeln!(f, "Karyotype call: {}", self.karyotype)?;
-        writeln!(f, "  Autosomal mean coverage: {:.2}x", self.autosomal_mean_cov)?;
-        writeln!(f, "  chrX coverage: {:.2}x (ratio: {:.3})", self.chrx_cov, self.chrx_ratio)?;
-        write!(f, "  chrY coverage: {:.2}x (ratio: {:.3})", self.chry_cov, self.chry_ratio)
+        writeln!(
+            f,
+            "  Autosomal mean coverage: {:.2}x",
+            self.autosomal_mean_cov
+        )?;
+        writeln!(
+            f,
+            "  chrX coverage: {:.2}x (ratio: {:.3})",
+            self.chrx_cov, self.chrx_ratio
+        )?;
+        write!(
+            f,
+            "  chrY coverage: {:.2}x (ratio: {:.3})",
+            self.chry_cov, self.chry_ratio
+        )
     }
 }
 
@@ -337,7 +349,7 @@ pub struct WGSBamStats {
     pub flag_stats: FlagStats,
 }
 
-impl WGSBamStats { 
+impl WGSBamStats {
     /// Opens BAM stats from cache or computes from BAM if needed.
     ///
     /// Uses the BAM file path to derive the cache location.
@@ -394,10 +406,6 @@ impl WGSBamStats {
         Ok(stats)
     }
 
-    // =========================================================================
-    // Karyotype Inference
-    // =========================================================================
-
     /// Infers biological karyotype (XX or XY) from sex chromosome coverage.
     ///
     /// # Algorithm
@@ -405,8 +413,8 @@ impl WGSBamStats {
     /// 1. Compute mean coverage across autosomes (chr1-22)
     /// 2. Compute chrX and chrY coverage ratios relative to autosomal mean
     /// 3. Classify:
-    ///    - chrY ratio < 0.1 and chrX ratio > 0.7 → XX
-    ///    - chrY ratio > 0.3 and chrX ratio 0.3-0.7 → XY
+    ///    - chrY ratio < 0.05 and chrX ratio > 0.75 → XX
+    ///    - chrY ratio > 0.1 and chrX ratio 0.35-0.65 → XY
+    ///    - chrY ratio > 0.05 with chrX ratio 0.35-0.65 → XY (logged as borderline)
     ///    - Otherwise → Error (ambiguous)
     ///
     /// # Expected Ratios
@@ -414,7 +422,7 @@ impl WGSBamStats {
     /// | Karyotype | chrX ratio | chrY ratio |
     /// |-----------|------------|------------|
     /// | XX        | ~1.0       | ~0.0       |
-    /// | XY        | ~0.5       | ~0.5       |
+    /// | XY        | ~0.5       | 0.1-0.5    | (Y often has lower coverage)
     ///
     /// # Errors
     ///
@@ -429,33 +437,46 @@ impl WGSBamStats {
             })
             .filter(|&cov| cov > 0.0)
             .collect();
-
         anyhow::ensure!(
             !autosomal_covs.is_empty(),
             "No autosomal coverage data available for karyotype inference"
         );
-
         let autosomal_mean_cov = autosomal_covs.iter().sum::<f64>() / autosomal_covs.len() as f64;
-
         anyhow::ensure!(
             autosomal_mean_cov > 0.0,
             "Autosomal mean coverage is zero, cannot infer karyotype"
         );
-
         let chrx_cov = self.get_contig_coverage("chrX")?;
         let chry_cov = self.get_contig_coverage("chrY")?;
-
         let chrx_ratio = chrx_cov / autosomal_mean_cov;
         let chry_ratio = chry_cov / autosomal_mean_cov;
 
-        let karyotype = if chry_ratio < 0.1 && chrx_ratio > 0.7 {
+        // Adjusted thresholds based on real-world data
+        const XX_CHRX_MIN: f64 = 0.75; // XX: high X coverage
+        const XX_CHRY_MAX: f64 = 0.05; // XX: minimal/no Y
+        const XY_CHRX_MIN: f64 = 0.35; // XY: X around 0.5 (allow wider range)
+        const XY_CHRX_MAX: f64 = 0.65; // XY: X around 0.5 (allow wider range)
+        const XY_CHRY_MIN: f64 = 0.1; // XY: Y present (lowered from 0.3)
+
+        let karyotype = if chry_ratio < XX_CHRY_MAX && chrx_ratio > XX_CHRX_MIN {
+            // Clear XX: high X, essentially no Y
             Karyotype::XX
-        } else if chry_ratio > 0.3 && (0.3..0.7).contains(&chrx_ratio) {
+        } else if chry_ratio > XY_CHRY_MIN && (XY_CHRX_MIN..=XY_CHRX_MAX).contains(&chrx_ratio) {
+            // Clear XY: X around 0.5, Y present
+            Karyotype::XY
+        } else if chry_ratio > 0.05 && (XY_CHRX_MIN..=XY_CHRX_MAX).contains(&chrx_ratio) {
+            // Borderline XY: X in male range, Y detectable but low coverage
+            log::warn!(
+                "Low Y chromosome coverage detected (ratio={:.3}), but X ratio ({:.3}) \
+             and Y presence suggest XY karyotype",
+                chry_ratio,
+                chrx_ratio
+            );
             Karyotype::XY
         } else {
             anyhow::bail!(
                 "Ambiguous karyotype: chrX ratio = {:.3}, chrY ratio = {:.3}. \
-                 Expected XX (chrX ~1.0, chrY ~0.0) or XY (chrX ~0.5, chrY ~0.5)",
+             Expected XX (chrX ~1.0, chrY ~0.0) or XY (chrX ~0.5, chrY >0.1)",
                 chrx_ratio,
                 chry_ratio
             );
@@ -618,10 +639,10 @@ impl WGSBamStats {
 
         let mut bam = rust_htslib::bam::Reader::from_path(bam_path)
             .with_context(|| format!("Failed to open BAM: {}", bam_path.display()))?;
-        
+
         let header_view = bam.header().clone();
         let header = rust_htslib::bam::Header::from_template(&header_view);
-        
+
         bam.set_threads(bam_n_threads as usize)
             .context("Failed to set BAM reader threads")?;
 
@@ -759,7 +780,14 @@ impl WGSBamStats {
                     0.0
                 };
 
-                (*tid, contig_len, contig, mapped_sum, mean_cov, coverage_ratio)
+                (
+                    *tid,
+                    contig_len,
+                    contig,
+                    mapped_sum,
+                    mean_cov,
+                    coverage_ratio,
+                )
             })
             .collect();
 
@@ -769,7 +797,10 @@ impl WGSBamStats {
         let mut by_rg: Vec<RGStats> = rg_stats
             .into_iter()
             .map(|(h, (count, yield_sum))| RGStats {
-                rg_id: rg_names.get(&h).cloned().unwrap_or_else(|| "unknown".into()),
+                rg_id: rg_names
+                    .get(&h)
+                    .cloned()
+                    .unwrap_or_else(|| "unknown".into()),
                 n_reads: count,
                 mapped_yield: yield_sum,
                 mean_read_length: if count > 0 {
@@ -785,7 +816,10 @@ impl WGSBamStats {
         let mut by_fn: Vec<FnStats> = fn_stats
             .into_iter()
             .map(|(h, (count, yield_sum))| FnStats {
-                filename: fn_names.get(&h).cloned().unwrap_or_else(|| "unknown".into()),
+                filename: fn_names
+                    .get(&h)
+                    .cloned()
+                    .unwrap_or_else(|| "unknown".into()),
                 n_reads: count,
                 mapped_yield: yield_sum,
                 mean_read_length: if count > 0 {
@@ -894,7 +928,6 @@ pub fn n50_from_hist(hist: &BTreeMap<u64, u64>, mapped_yield: u64) -> u64 {
     0
 }
 
-
 // =============================================================================
 // Display Implementations
 // =============================================================================
@@ -1015,10 +1048,7 @@ impl fmt::Display for WGSBamStats {
             };
             let bar_len = (*count as f64 / max_flag as f64 * 30.0) as usize;
             let bar: String = "█".repeat(bar_len);
-            writeln!(
-                f,
-                "    {name:<14} │{bar:<30}│ {count:>12} ({pct:>6.2}%)"
-            )?;
+            writeln!(f, "    {name:<14} │{bar:<30}│ {count:>12} ({pct:>6.2}%)")?;
         }
 
         // Read groups
@@ -1197,11 +1227,7 @@ impl QNameSet {
 
     /// Returns the intersection of two sets with overlap fraction.
     pub fn intersect(&self, other: &Self) -> (Self, f64) {
-        let intersection: FxHashSet<_> = self
-            .qnames
-            .intersection(&other.qnames)
-            .cloned()
-            .collect();
+        let intersection: FxHashSet<_> = self.qnames.intersection(&other.qnames).cloned().collect();
 
         let frac = if self.qnames.is_empty() {
             0.0
@@ -1209,7 +1235,12 @@ impl QNameSet {
             intersection.len() as f64 / self.qnames.len() as f64
         };
 
-        (Self { qnames: intersection }, frac)
+        (
+            Self {
+                qnames: intersection,
+            },
+            frac,
+        )
     }
 
     /// Saves the set to a binary file (16 bytes per QNAME).

+ 927 - 289
src/collection/prom_run.rs

File diff suppressed because it is too large


+ 40 - 10
src/commands/bcftools.rs

@@ -101,7 +101,8 @@ impl super::Command for BcftoolsKeepPass {
     }
 }
 
-impl super::Runner for BcftoolsKeepPass {}
+impl super::LocalRunner for BcftoolsKeepPass {}
+impl super::LocalBatchRunner for BcftoolsKeepPass {}
 
 impl super::SlurmRunner for BcftoolsKeepPass {
     /// Slurm resource request for the `bcftools keep pass` job.
@@ -109,7 +110,7 @@ impl super::SlurmRunner for BcftoolsKeepPass {
         SlurmParams {
             job_name: Some("bcftools_keep_pass".into()),
             cpus_per_task: Some(self.threads as u32),
-            mem: Some("45G".into()),
+            mem: Some("30G".into()),
             partition: Some("shortq".into()),
             gres: None,
         }
@@ -117,6 +118,18 @@ impl super::SlurmRunner for BcftoolsKeepPass {
     }
 }
 
+impl super::SbatchRunner for BcftoolsKeepPass {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("bcftools_keep_pass".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("30G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
 /// Sorts, normalizes and keeps only *precise* `FILTER=PASS` variants.
 ///
 /// Differs from [`BcftoolsKeepPass`] by excluding imprecise calls using
@@ -196,7 +209,7 @@ impl super::Command for BcftoolsKeepPassPrecise {
     }
 }
 
-impl super::Runner for BcftoolsKeepPassPrecise {}
+impl super::LocalRunner for BcftoolsKeepPassPrecise {}
 
 impl super::SlurmRunner for BcftoolsKeepPassPrecise {
     /// Slurm resource request for the `bcftools keep pass precise` job.
@@ -215,7 +228,9 @@ impl super::SlurmRunner for BcftoolsKeepPassPrecise {
 /// Concatenates multiple VCF/BCF files with `bcftools concat`.
 ///
 /// Builds a temporary list file containing all inputs, then runs
-/// `bcftools concat -a -D -f list --write-index` to produce `output`.
+/// `bcftools concat` piped through `bcftools sort` to ensure proper ordering.
+///
+/// The output is automatically indexed after concatenation and sorting.
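+///
+/// An illustrative expansion of the generated shell line (binary, thread
+/// count and paths hypothetical):
+///
+/// ```text
+/// set -o pipefail; bcftools concat --threads 4 -a -D -f parts.list -Oz | \
+///   bcftools sort -Oz -o merged.vcf.gz && \
+///   bcftools index --threads 4 merged.vcf.gz && \
+///   rm -f parts.list
+/// ```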
 #[derive(Debug)]
 pub struct BcftoolsConcat {
     /// Path to the `bcftools` binary.
@@ -277,10 +292,13 @@ impl super::Command for BcftoolsConcat {
         Ok(())
     }
 
-    /// Returns the shell command that runs `bcftools concat` and removes the list file.
+    /// Returns the shell command that runs `bcftools concat`, sorts, and indexes the output.
     fn cmd(&self) -> String {
         format!(
-            "{bin} concat --write-index --threads {threads} -a -D -f {list} -Oz -o {output} && rm -f {list}",
+            "set -o pipefail; {bin} concat --threads {threads} -a -D -f {list} -Oz | \
+             {bin} sort -Oz -o {output} && \
+             {bin} index --threads {threads} {output} && \
+             rm -f {list}",
             bin = self.bin,
             threads = self.threads,
             list = self.tmp_list.display(),
@@ -289,7 +307,7 @@ impl super::Command for BcftoolsConcat {
     }
 }
 
-impl super::Runner for BcftoolsConcat {}
+impl super::LocalRunner for BcftoolsConcat {}
 
 impl super::SlurmRunner for BcftoolsConcat {
     // Slurm resource request for the `bcftools concat` job.
@@ -297,7 +315,7 @@ impl super::SlurmRunner for BcftoolsConcat {
         SlurmParams {
             job_name: Some("bcftools_concat".into()),
             cpus_per_task: Some(self.threads as u32),
-            mem: Some("45G".into()),
+            mem: Some("30G".into()),
             partition: Some("shortq".into()),
             gres: None,
         }
@@ -305,6 +323,18 @@ impl super::SlurmRunner for BcftoolsConcat {
     }
 }
 
+impl super::SbatchRunner for BcftoolsConcat {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("bcftools_concat".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("30G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
 /// Indexes a VCF/BCF file with `bcftools index`.
 #[derive(Debug)]
 pub struct BcftoolsIndex {
@@ -347,7 +377,7 @@ impl super::Command for BcftoolsIndex {
     }
 }
 
-impl super::Runner for BcftoolsIndex {}
+impl super::LocalRunner for BcftoolsIndex {}
 impl super::SlurmRunner for BcftoolsIndex {
     /// Slurm resource request for the `bcftools index` job.
     fn slurm_args(&self) -> Vec<String> {
@@ -418,7 +448,7 @@ impl super::Command for BcftoolsCompress {
     }
 }
 
-impl super::Runner for BcftoolsCompress {}
+impl super::LocalRunner for BcftoolsCompress {}
 impl super::SlurmRunner for BcftoolsCompress {
     /// Slurm resource request for the `bcftools compress` job.
     fn slurm_args(&self) -> Vec<String> {

+ 5 - 0
src/commands/dorado.rs

@@ -170,6 +170,8 @@ impl super::SlurmRunner for DoradoBasecall {
     }
 }
 
+impl super::LocalRunner for DoradoBasecall {}
+
 /// Run Dorado alignment using a reference FASTA and an input BAM.
 ///
 /// This command:
@@ -251,6 +253,9 @@ impl super::Command for DoradoAlign {
     }
 }
 
+impl super::LocalRunner for DoradoAlign {}
+impl super::LocalBatchRunner for DoradoAlign {}
+
 impl super::SlurmRunner for DoradoAlign {
     /// Default Slurm parameters for running the Dorado aligner.
     fn slurm_args(&self) -> Vec<String> {

+ 228 - 11
src/commands/mod.rs

@@ -11,7 +11,7 @@ use std::{
     io::{self, BufReader, Read, Seek, SeekFrom, Write},
     path::{Path, PathBuf},
     process::{Command as ProcessCommand, Stdio},
-    sync::{Arc, Mutex, mpsc},
+    sync::{mpsc, Arc, Mutex},
     thread,
     time::Duration,
 };
@@ -40,7 +40,7 @@ pub trait Command {
 /// ```ignore
 /// my_command.run()?;
 /// ```
-pub trait Runner: Command {
+pub trait LocalRunner: Command {
     /// The shell binary used to run commands.  
     /// Override if you want to use `sh` or something else.
     fn shell(&self) -> &str {
@@ -157,6 +157,7 @@ pub trait Runner: Command {
 
 use anyhow::Context;
 use log::info;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
@@ -403,9 +404,6 @@ impl SlurmParams {
         args
     }
 }
-fn printable(s: &str) -> String {
-    s.chars().filter(|c| !c.is_control()).collect()
-}
 
 pub trait SbatchRunner: Command {
     fn sbatch_bin(&self) -> &str {
@@ -490,10 +488,15 @@ pub trait SbatchRunner: Command {
                             let mut remaining = Vec::new();
                             if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
                                 let s = String::from_utf8_lossy(&remaining);
-                                s.split("\n")
-                                    .map(printable)
-                                    .filter(|v| !v.is_empty())
-                                    .for_each(|v| info!("{job_id_clone}: {v}"));
+                                let cleaned = clean_output(&s);
+
+                                for line in cleaned.lines() {
+                                    let line = line.trim();
+                                    if !line.is_empty() && !is_progress_line(line) {
+                                        info!("{}: {}", job_id_clone, line);
+                                    }
+                                }
+
                                 if let Ok(mut c) = captured_stdout_clone.lock() {
                                     c.push_str(&s);
                                 }
@@ -523,8 +526,15 @@ pub trait SbatchRunner: Command {
                                     let bytes = &buf[..n];
                                     // forward to terminal
                                     let s = String::from_utf8_lossy(bytes);
-                                    s.split("\n").for_each(|v| info!("{job_id_clone}: {v}"));
-                                    // print!("{s}");
+                                    let cleaned = clean_output(&s);
+
+                                    // Log non-progress lines
+                                    for line in cleaned.lines() {
+                                        let line = line.trim();
+                                        if !line.is_empty() && !is_progress_line(line) {
+                                            info!("{}: {}", job_id_clone, line);
+                                        }
+                                    }
                                     io::stdout().flush().ok();
                                     // capture
                                     if let Ok(mut c) = captured_stdout_clone.lock() {
@@ -624,6 +634,38 @@ pub trait SbatchRunner: Command {
     }
 }
 
+#[macro_export]
+macro_rules! run {
+    ($cfg:expr, $cmd:expr) => {{
+        if $cfg.slurm_runner {
+            $crate::commands::SlurmRunner::run($cmd)
+        } else {
+            $crate::commands::LocalRunner::run($cmd)
+        }
+    }};
+}
+
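The new `run!` macro makes the runner a runtime decision. A minimal sketch of a call site, mirroring the updated samtools tests further down in this diff (the `index_bam` wrapper and its error message are illustrative, not part of the commit):

```rust
use anyhow::Result;

// Sketch mirroring the updated tests in src/commands/samtools.rs: the same
// call site now runs through Slurm or locally depending on config.slurm_runner.
fn index_bam(config: &Config, bam: String) -> Result<()> {
    let mut idx = SamtoolsIndex {
        bin: config.align.samtools_bin.clone(),
        threads: config.align.samtools_view_threads,
        bam,
    };
    // Expands to SlurmRunner::run(&mut idx) or LocalRunner::run(&mut idx).
    let captured = run!(config, &mut idx)?;
    anyhow::ensure!(captured.stderr.is_empty(), "samtools index wrote to stderr");
    Ok(())
}
```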
+/// Clean output by handling carriage returns properly.
+///
+/// When a line contains \r (carriage return), only the text after the last \r is
+/// kept, simulating terminal behavior where \r returns the cursor to column one.
+fn clean_output(s: &str) -> String {
+    s.split('\n')
+        .map(|line| {
+            // For lines with \r, keep only the last segment (what's visible after all \r overwrites)
+            line.split('\r').next_back().unwrap_or("")
+        })
+        .collect::<Vec<_>>()
+        .join("\n")
+}
+
+/// Check if a line is a progress indicator that should be filtered from logs.
+fn is_progress_line(line: &str) -> bool {
+    line.starts_with("> Output records written:")
+        || line.starts_with("> Processing")
+        || (line.starts_with('>') && line.contains("records"))
+}
+
 use std::collections::HashMap;
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
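A minimal sketch of the intended behavior of the two helpers above; since both are private, a check like this would live in the `#[cfg(test)]` module of `src/commands/mod.rs` (the test name and sample strings are illustrative):

```rust
#[test]
fn carriage_returns_and_progress_lines() {
    // \r-overwritten progress: only the final segment of each line survives.
    let raw = "sorting...\rsorting... 50%\rsorting... done\n> Output records written: 1000\nfinished";
    assert_eq!(
        clean_output(raw),
        "sorting... done\n> Output records written: 1000\nfinished"
    );
    // Progress counters are filtered from the logs; ordinary lines are kept.
    assert!(is_progress_line("> Output records written: 1000"));
    assert!(!is_progress_line("finished"));
}
```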
@@ -777,6 +819,181 @@ where
     Ok(results)
 }
 
+/// Local batch runner: executes the command directly on the local machine,
+/// mimicking `SbatchRunner` (output redirected to a file and tailed asynchronously).
+pub trait LocalBatchRunner: Command {
+    fn shell(&self) -> &str {
+        "bash"
+    }
+
+    /// Output file pattern. Use `{}` as a placeholder for the job ID (first UUID segment).
+    fn output_pattern(&self) -> &str {
+        "local-{}.out"
+    }
+
+    fn run(&mut self) -> anyhow::Result<CapturedOutput> {
+        self.init()?;
+
+        let job_id = Uuid::new_v4()
+            .to_string()
+            .split('-')
+            .next()
+            .unwrap_or("local")
+            .to_string();
+        let out_file = self.output_pattern().replace("{}", &job_id);
+
+        info!("Running local batch job: {job_id}");
+
+        let out_file_handle =
+            File::create(&out_file).with_context(|| format!("failed to create output file: {out_file}"))?;
+
+        let out_file_write = out_file_handle
+            .try_clone()
+            .context("failed to clone file handle for stderr")?;
+
+        let mut child = ProcessCommand::new(self.shell())
+            .arg("-c")
+            .arg(self.cmd())
+            .stdout(Stdio::from(out_file_handle))
+            .stderr(Stdio::from(out_file_write))
+            .spawn()
+            .context("failed to spawn local batch command")?;
+
+        let (stop_tx, stop_rx) = mpsc::channel::<()>();
+        let out_path = out_file.clone();
+        let captured_stdout = Arc::new(Mutex::new(String::new()));
+        let captured_stdout_clone = Arc::clone(&captured_stdout);
+        let job_id_clone = job_id.clone();
+
+        let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
+            let mut pos: u64 = 0;
+
+            loop {
+                if stop_rx.try_recv().is_ok() {
+                    if let Ok(mut f) = File::open(&out_path) {
+                        if f.seek(SeekFrom::Start(pos)).is_ok() {
+                            let mut remaining = Vec::new();
+                            if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
+                                let s = String::from_utf8_lossy(&remaining);
+                                let cleaned = clean_output(&s);
+
+                                for line in cleaned.lines() {
+                                    let line = line.trim();
+                                    if !line.is_empty() && !is_progress_line(line) {
+                                        info!("{}: {}", job_id_clone, line);
+                                    }
+                                }
+
+                                if let Ok(mut c) = captured_stdout_clone.lock() {
+                                    c.push_str(&s);
+                                }
+                            }
+                        }
+                    }
+                    break;
+                }
+
+                if let Ok(mut f) = File::open(&out_path) {
+                    if f.seek(SeekFrom::Start(pos)).is_ok() {
+                        let mut buf = [0u8; 8192];
+                        loop {
+                            match f.read(&mut buf) {
+                                Ok(0) => break,
+                                Ok(n) => {
+                                    pos += n as u64;
+                                    let s = String::from_utf8_lossy(&buf[..n]);
+                                    let cleaned = clean_output(&s);
+
+                                    for line in cleaned.lines() {
+                                        let line = line.trim();
+                                        if !line.is_empty() && !is_progress_line(line) {
+                                            info!("{}: {}", job_id_clone, line);
+                                        }
+                                    }
+
+                                    if let Ok(mut c) = captured_stdout_clone.lock() {
+                                        c.push_str(&s);
+                                    }
+                                }
+                                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
+                                Err(_) => break,
+                            }
+                        }
+                    }
+                }
+
+                thread::sleep(Duration::from_millis(100));
+            }
+
+            Ok(())
+        });
+
+        let status = child.wait().context("failed to wait on local batch command")?;
+
+        let _ = stop_tx.send(());
+        match tail_handle.join() {
+            Ok(Ok(())) => {}
+            Ok(Err(e)) => return Err(e.context("tail thread failed")),
+            Err(_) => anyhow::bail!("tail thread panicked"),
+        }
+
+        if !status.success() {
+            anyhow::bail!("local batch command failed with status: {status}");
+        }
+
+        self.clean_up()?;
+
+        let stdout = match Arc::try_unwrap(captured_stdout) {
+            Ok(mutex) => mutex.into_inner().unwrap_or_default(),
+            Err(arc) => arc.lock().unwrap_or_else(|e| e.into_inner()).clone(),
+        };
+
+        let _ = std::fs::remove_file(&out_file);
+
+        Ok(CapturedOutput {
+            stdout,
+            stderr: String::new(),
+            slurm_epilog: None,
+        })
+    }
+}
+
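Opting a command into this trait is a one-liner when the defaults fit. A hypothetical sketch (`FastqcJob` is not part of this commit and is assumed to implement `Command` elsewhere):

```rust
struct FastqcJob {
    bam: String,
}

// Marker impl: inherits bash as the shell and "local-{}.out" as the log file.
// Override output_pattern() to redirect per-job logs, e.g. into a logs/ dir.
impl LocalBatchRunner for FastqcJob {
    fn output_pattern(&self) -> &str {
        "logs/fastqc-{}.out"
    }
}
```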
+pub fn run_many_local_batch<T>(jobs: Vec<T>, threads: usize) -> anyhow::Result<Vec<CapturedOutput>>
+where
+    T: LocalBatchRunner + Send,
+{
+    // Build a dedicated thread pool bounded to the requested number of parallel jobs
+    let pool = rayon::ThreadPoolBuilder::new()
+        .num_threads(threads)
+        .build()
+        .context("failed to build thread pool")?;
+
+    pool.install(|| {
+        jobs.into_par_iter()
+            .map(|mut job| LocalBatchRunner::run(&mut job))
+            .collect()
+    })
+}
+
+/// Macro to run multiple batch commands in parallel, either via Slurm or locally.
+///
+/// Usage:
+///
+/// ```ignore
+/// let results = run_many!(cfg, jobs)?;
+/// ```
+#[macro_export]
+macro_rules! run_many {
+    ($cfg:expr, $jobs:expr) => {{
+        if $cfg.slurm_runner {
+            $crate::commands::run_many_sbatch($jobs)
+        } else {
+            $crate::commands::run_many_local_batch($jobs, $cfg.threads.into())
+        }
+    }};
+}
+
+
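A sketch of the intended fan-out; `chunk_bams` and `make_sort_job` are hypothetical placeholders for per-chunk job construction:

```rust
// One SamtoolsSort per chunk; SamtoolsSort implements both SbatchRunner
// and LocalBatchRunner, so the macro can dispatch either way.
let jobs: Vec<SamtoolsSort> = chunk_bams.iter().map(make_sort_job).collect();

// Expands to run_many_sbatch(jobs) when cfg.slurm_runner is set, otherwise
// to run_many_local_batch(jobs, cfg.threads.into()).
let outputs = run_many!(cfg, jobs)?;
for out in &outputs {
    info!("captured {} bytes of stdout", out.stdout.len());
}
```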
 #[cfg(test)]
 mod tests {
     use super::{Command, SbatchRunner, SlurmParams};

+ 41 - 45
src/commands/samtools.rs

@@ -1,3 +1,5 @@
+//! Wrappers around `samtools` subcommands used by the pipeline
+//! (index, merge, sort, split).
 use std::{
     fs,
     path::{Path, PathBuf},
@@ -9,7 +11,7 @@ use uuid::Uuid;
 
 use crate::{
     collection::bam_stats::{QNameSet, WGSBamStats},
-    commands::{SbatchRunner, SlurmParams},
+    commands::{LocalBatchRunner, SbatchRunner, SlurmParams},
     config::Config,
 };
 
@@ -38,14 +40,14 @@ impl super::Command for SamtoolsIndex {
     }
 }
 
-impl super::Runner for SamtoolsIndex {}
+impl super::LocalRunner for SamtoolsIndex {}
 
 impl super::SlurmRunner for SamtoolsIndex {
     fn slurm_args(&self) -> Vec<String> {
         SlurmParams {
             job_name: Some("samtools_index".into()),
             cpus_per_task: Some(self.threads as u32),
-            mem: Some("60G".into()),
+            mem: Some("20G".into()),
             partition: Some("shortq".into()),
             gres: None,
         }
@@ -53,6 +55,18 @@ impl super::SlurmRunner for SamtoolsIndex {
     }
 }
 
+impl super::SbatchRunner for SamtoolsIndex {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("samtools_index".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("20G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
 /// Wrapper around a `samtools merge` invocation used to append one BAM into
 /// another while preserving read group (RG) uniqueness.
 ///
@@ -243,7 +257,7 @@ impl super::Command for SamtoolsMerge {
     }
 }
 
-impl super::Runner for SamtoolsMerge {}
+impl super::LocalRunner for SamtoolsMerge {}
 
 impl super::SlurmRunner for SamtoolsMerge {
     fn slurm_args(&self) -> Vec<String> {
@@ -299,10 +313,7 @@ impl super::Command for SamtoolsMergeMany {
         }
 
         if self.into.exists() {
-            anyhow::bail!(
-                "Destination BAM already exists: {}",
-                self.into.display()
-            );
+            anyhow::bail!("Destination BAM already exists: {}", self.into.display());
         }
 
         // Create output directory if needed
@@ -316,11 +327,7 @@ impl super::Command for SamtoolsMergeMany {
 
         // Check all input BAMs exist
         for bam in &self.bams {
-            anyhow::ensure!(
-                bam.exists(),
-                "BAM file doesn't exist: {}",
-                bam.display()
-            );
+            anyhow::ensure!(bam.exists(), "BAM file doesn't exist: {}", bam.display());
         }
 
         // Write BAM list file
@@ -334,8 +341,7 @@ impl super::Command for SamtoolsMergeMany {
             .with_context(|| format!("Failed to create BAM list file: {}", list_file.display()))?;
 
         for bam in &self.bams {
-            writeln!(file, "{}", bam.display())
-                .context("Failed to write to BAM list file")?;
+            writeln!(file, "{}", bam.display()).context("Failed to write to BAM list file")?;
         }
 
         self.bam_list_file = Some(list_file);
@@ -344,11 +350,8 @@ impl super::Command for SamtoolsMergeMany {
         if self.bams.len() > 1 {
             self.check_qname_collisions()?;
         } else {
-            let qset = QNameSet::load_or_create(
-                &self.bams[0],
-                self.config.bam_min_mapq,
-                self.threads,
-            )?;
+            let qset =
+                QNameSet::load_or_create(&self.bams[0], self.config.bam_min_mapq, self.threads)?;
             qset.save(WGSBamStats::qnames_path_from_bam(&self.into))?;
         }
 
@@ -389,20 +392,13 @@ impl SamtoolsMergeMany {
         let mut iter = self.bams.iter();
         let first_bam = iter.next().expect("checked non-empty");
 
-        let mut union_qset = QNameSet::load_or_create(
-            first_bam,
-            self.config.bam_min_mapq,
-            self.threads,
-        )
-        .with_context(|| format!("Failed to load qnames for {}", first_bam.display()))?;
+        let mut union_qset =
+            QNameSet::load_or_create(first_bam, self.config.bam_min_mapq, self.threads)
+                .with_context(|| format!("Failed to load qnames for {}", first_bam.display()))?;
 
         for bam in iter {
-            let bam_qset = QNameSet::load_or_create(
-                bam,
-                self.config.bam_min_mapq,
-                self.threads,
-            )
-            .with_context(|| format!("Failed to load qnames for {}", bam.display()))?;
+            let bam_qset = QNameSet::load_or_create(bam, self.config.bam_min_mapq, self.threads)
+                .with_context(|| format!("Failed to load qnames for {}", bam.display()))?;
 
             let (intersection, frac) = union_qset.intersect(&bam_qset);
             let n_shared = intersection.len();
@@ -420,14 +416,15 @@ impl SamtoolsMergeMany {
         }
 
         let qnames_path = WGSBamStats::qnames_path_from_bam(&self.into);
-        union_qset.save(&qnames_path)
+        union_qset
+            .save(&qnames_path)
             .with_context(|| format!("Failed to save union qnames to {}", qnames_path.display()))?;
 
         Ok(())
     }
 }
 
-impl super::Runner for SamtoolsMergeMany {}
+impl super::LocalRunner for SamtoolsMergeMany {}
 
 impl super::SlurmRunner for SamtoolsMergeMany {
     fn slurm_args(&self) -> Vec<String> {
@@ -442,7 +439,6 @@ impl super::SlurmRunner for SamtoolsMergeMany {
     }
 }
 
-
 #[derive(Debug)]
 pub struct SamtoolsSplit {
     /// Path to the `samtools` executable.
@@ -499,7 +495,7 @@ impl super::Command for SamtoolsSplit {
     }
 }
 
-impl super::Runner for SamtoolsSplit {}
+impl super::LocalRunner for SamtoolsSplit {}
 
 impl super::SlurmRunner for SamtoolsSplit {
     fn slurm_args(&self) -> Vec<String> {
@@ -547,7 +543,7 @@ impl SamtoolsSort {
             slurm_params: SlurmParams {
                 job_name: Some("samtools_sort".into()),
                 cpus_per_task: Some(threads as u32),
-                mem: Some("60G".into()),
+                mem: Some("30G".into()),
                 partition: Some("shortq".into()),
                 gres: None,
             },
@@ -580,7 +576,7 @@ impl super::Command for SamtoolsSort {
     }
 }
 
-impl super::Runner for SamtoolsSort {}
+impl super::LocalRunner for SamtoolsSort {}
 
 impl super::SlurmRunner for SamtoolsSort {
     fn slurm_args(&self) -> Vec<String> {
@@ -589,11 +585,13 @@ impl super::SlurmRunner for SamtoolsSort {
 }
 
 impl SbatchRunner for SamtoolsSort {
-    fn sbatch_extra_args(&self) -> Vec<String> {
-        self.slurm_params.to_args()
+    fn slurm_params(&self) -> SlurmParams {
+        self.slurm_params.clone()
     }
 }
 
+impl LocalBatchRunner for SamtoolsSort {}
+
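For reference, a sketch of what the new `slurm_params()` returns for a sort job (values taken from the constructor above; the exact flag spelling produced by `to_args()` is presumed, not shown in this diff):

```rust
let params = SlurmParams {
    job_name: Some("samtools_sort".into()),
    cpus_per_task: Some(8), // one per requested thread
    mem: Some("30G".into()),
    partition: Some("shortq".into()),
    gres: None,
};
// Rendered into sbatch arguments by to_args(), presumably along the lines
// of --job-name=samtools_sort --cpus-per-task=8 --mem=30G --partition=shortq.
let sbatch_flags: Vec<String> = params.to_args();
```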
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -603,7 +601,7 @@ mod tests {
         commands::{run_many_sbatch, SlurmRunner},
         config::Config,
         helpers::test_init,
-        TEST_DIR,
+        run, TEST_DIR,
     };
 
     #[test]
@@ -630,17 +628,16 @@ mod tests {
             threads: config.align.samtools_view_threads,
             bam: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
         };
-        let captured_output = SlurmRunner::run(&mut idx)?;
+        let captured_output = run!(config, &mut idx)?;
         assert_eq!(captured_output.stderr, String::new());
 
         info!("Indexing BAM 2.");
         let mut idx = SamtoolsIndex {
             bin: config.align.samtools_bin.clone(),
-
             threads: config.align.samtools_view_threads,
             bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
         };
-        let captured_output = SlurmRunner::run(&mut idx)?;
+        let captured_output = run!(config, &mut idx)?;
         assert_eq!(captured_output.stderr, String::new());
 
         info!("Mergin both BAM.");
@@ -653,7 +650,6 @@ mod tests {
         let captured_output = SlurmRunner::run(&mut idx)?;
         assert_eq!(captured_output.stderr, String::new());
 
-        // TODO: add number of reads verification use bam_compo
         Ok(())
     }
 

+ 3 - 0
src/config.rs

@@ -25,6 +25,9 @@ pub struct Config {
     /// Run cache directory.
     pub run_cache_dir: String,
 
+    /// Whether to submit jobs through Slurm instead of running them locally.
+    pub slurm_runner: bool,
+
     /// Software threads
     pub threads: u8,
 

+ 59 - 3
src/helpers.rs

@@ -10,7 +10,8 @@ use std::{
     fmt, fs,
     iter::Sum,
     ops::{Add, Div},
-    path::{Path, PathBuf},
+    path::{Path, PathBuf},
+    sync::atomic::AtomicBool,
 };
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
@@ -794,6 +794,60 @@ impl Drop for TempDirGuard {
     }
 }
 
+/// Guard that tracks temporary files and cleans them up on drop unless disarmed.
+pub struct TempFileGuard {
+    files: Vec<PathBuf>,
+    disarmed: AtomicBool,
+}
+
+impl TempFileGuard {
+    pub fn new() -> Self {
+        Self {
+            files: Vec::new(),
+            disarmed: AtomicBool::new(false),
+        }
+    }
+
+    /// Registers a temporary file for cleanup.
+    pub fn track(&mut self, path: PathBuf) {
+        self.files.push(path);
+    }
+
+    /// Registers multiple temporary files for cleanup.
+    pub fn track_many(&mut self, paths: impl IntoIterator<Item = PathBuf>) {
+        self.files.extend(paths);
+    }
+
+    /// Disarms the guard, preventing cleanup on drop.
+    /// Call this when the pipeline succeeds.
+    pub fn disarm(&self) {
+        self.disarmed.store(true, std::sync::atomic::Ordering::SeqCst);
+    }
+
+    /// Manually clean up all tracked files.
+    pub fn cleanup(&self) {
+        for file in &self.files {
+            if file.exists() {
+                if let Err(e) = remove_bam_with_index(file) {
+                    warn!("Failed to clean up temp file {}: {}", file.display(), e);
+                }
+            }
+        }
+    }
+}
+
+impl Drop for TempFileGuard {
+    fn drop(&mut self) {
+        if !self.disarmed.load(std::sync::atomic::Ordering::SeqCst) {
+            warn!(
+                "Pipeline failed or panicked, cleaning up {} temporary files",
+                self.files.len()
+            );
+            self.cleanup();
+        }
+    }
+}
+
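A usage sketch for the guard; `produce_chunk`, `out_dir`, and `n_parts` are illustrative placeholders:

```rust
let mut guard = TempFileGuard::new();
for part in 0..n_parts {
    let part_bam = out_dir.join(format!("part_{part}.bam"));
    guard.track(part_bam.clone());
    // An early `?` return drops the guard while still armed, removing every
    // tracked BAM (and its index, via remove_bam_with_index).
    produce_chunk(part, &part_bam)?;
}
// Every chunk landed: disarm so the outputs survive the drop.
guard.disarm();
```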
 /// Extracts genome sizes from BAM header.
 pub fn get_genome_sizes(
     header: &rust_htslib::bam::Header,
@@ -974,8 +1028,10 @@ mod tests {
     fn split_g() -> anyhow::Result<()> {
         test_init();
         let n_parts = 4;
-        let reader = bam::Reader::from_path("/mnt/beegfs02/scratch/t_steimle/data/wgs/34528/norm/34528_norm_hs1.bam")
-            .with_context(|| format!("Failed to open BAM"))?;
+        let reader = bam::Reader::from_path(
+            "/mnt/beegfs02/scratch/t_steimle/data/wgs/34528/norm/34528_norm_hs1.bam",
+        )
+        .context("Failed to open BAM")?;
         let header = bam::Header::from_template(reader.header());
         let genome_sizes = get_genome_sizes(&header)?;
 

Some files were not shown because too many files changed in this diff