BcfTools Slurm

Thomas 2 weeks ago
parent
commit
0d26c83b3e

+ 12 - 0
pandora-config.example.toml

@@ -10,6 +10,9 @@ pod_dir = "/data/run_data"
 # Root directory where all results will be written.
 result_dir = "/mnt/beegfs02/scratch/t_steimle/data/wgs"
 
+# Temporary directory.
+tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
+
 # Temporary directory used when unarchiving input data.
 unarchive_tmp_dir = "/data/unarchived"
 
@@ -266,6 +269,15 @@ severus_output_dir = "{result_dir}/{id}/diag/severus"
 # {result_dir}, {id}, {time}
 severus_solo_output_dir = "{result_dir}/{id}/{time}/severus"
 
+#######################################
+# Bcftools configuration
+#######################################
+
+# Path to bcftools binary.
+bcftools_bin = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/bcftools"
+
+# Threads for bcftools.
+bcftools_threads = 30
 
 #######################################
 # Longphase configuration

File diff suppressed because it is too large
+ 0 - 5
slurm-2415516.out


+ 18 - 31
src/callers/deep_variant.rs

@@ -4,17 +4,14 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use regex::Regex;
 use std::{
     fs,
-    path::Path,
+    path::{Path, PathBuf},
     process::{Command as ProcessCommand, Stdio},
 };
 
 use crate::{
     annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
     collection::vcf::Vcf,
-    commands::{
-        bcftools::{bcftools_keep_pass, BcftoolsConfig},
-        run_many_sbatch,
-    },
+    commands::{SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, run_many_sbatch},
     config::Config,
     helpers::{is_file_older, remove_dir_if_exists},
     io::vcf::read_vcf,
@@ -196,7 +193,9 @@ impl JobCommand for DeepVariant {
                 self.id, self.time_point, self.part_index
             );
 
-            let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default())?;
+            let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
+            let report = SlurmRunner::run(&mut cmd)?;
+
             report
                 .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
                 .context("Can't save bcftools PASS logs")?;
@@ -452,7 +451,7 @@ pub fn run_deepvariant_quartered_sbatch_with_merge(
     }
 
     // 3) run them concurrently on Slurm
-    let outputs = run_many_sbatch(jobs)?;
+    let _outputs = run_many_sbatch(jobs)?;
 
     // 4) merge PASS VCFs
     merge_deepvariant_parts(&base, 4)?;
@@ -467,55 +466,43 @@ pub fn run_deepvariant_quartered_sbatch_with_merge(
 }
 
 fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
-    let mut part_pass_paths = Vec::with_capacity(n_parts);
+    let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
 
+    // Collect all part PASS VCFs
     for i in 1..=n_parts {
         let mut dv = base.clone();
         dv.part_index = Some(i);
         let part_pass = dv.passed_vcf_path();
+
         if !Path::new(&part_pass).exists() {
             anyhow::bail!("Missing part PASS VCF: {part_pass}");
         }
-        part_pass_paths.push(part_pass);
+
+        part_pass_paths.push(PathBuf::from(part_pass));
     }
 
+    // Final merged PASS VCF path
     let final_passed_vcf = base
         .config
         .deepvariant_solo_passed_vcf(&base.id, &base.time_point);
     let final_tmp = format!("{final_passed_vcf}.tmp");
 
+    // Ensure output directory exists
     fs::create_dir_all(
         Path::new(&final_passed_vcf)
             .parent()
             .unwrap_or_else(|| Path::new(".")),
     )?;
 
-    // bcftools concat parts -> tmp file, then index and rename
-    let concat_cmd = format!(
-        "bcftools concat -O z -o {out_tmp} {parts} && \
-         bcftools index -t {out_tmp}",
-        out_tmp = final_tmp,
-        parts = part_pass_paths.join(" ")
-    );
-
-    let out = ProcessCommand::new("bash")
-        .arg("-lc")
-        .arg(concat_cmd)
-        .stdout(Stdio::piped())
-        .stderr(Stdio::piped())
-        .output()
-        .context("failed to run bcftools concat for DeepVariant parts")?;
+    // Run bcftools concat via our Command/SlurmRunner abstraction
+    let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
 
-    if !out.status.success() {
-        let mut log = String::from_utf8_lossy(&out.stdout).to_string();
-        log.push_str(&String::from_utf8_lossy(&out.stderr));
-        anyhow::bail!("bcftools concat failed: {}\n{}", out.status, log);
-    }
+    let _ = SlurmRunner::run(&mut concat)
+        .context("failed to run bcftools concat for DeepVariant parts")?;
 
-    // move tmp → final
+    // Move tmp → final
     fs::rename(&final_tmp, &final_passed_vcf)
         .context("failed to rename merged DeepVariant PASS VCF")?;
 
     Ok(())
 }
-

+ 21 - 24
src/callers/mod.rs

@@ -1,27 +1,24 @@
-use deep_variant::DeepVariant;
-use nanomonsv::NanomonSV;
-use clairs::ClairS;
 
-pub mod clairs;
+// pub mod clairs;
 pub mod deep_variant;
-pub mod nanomonsv;
-pub mod savana;
-pub mod severus;
-pub mod deep_somatic;
+// pub mod nanomonsv;
+// pub mod savana;
+// pub mod severus;
+// pub mod deep_somatic;
 
-#[derive(Debug)]
-pub enum CallersSolo {
-    DeepVariant(DeepVariant),
-    NanomonSV(NanomonSV),
-}
-
-#[derive(Debug)]
-pub enum CallersSomatic {
-    ClairS(ClairS)
-}
-
-#[derive(Debug)]
-pub enum Callers {
-    CallersSomatic,
-    CallersSolo,
-}
+// #[derive(Debug)]
+// pub enum CallersSolo {
+//     DeepVariant(DeepVariant),
+//     NanomonSV(NanomonSV),
+// }
+//
+// #[derive(Debug)]
+// pub enum CallersSomatic {
+//     ClairS(ClairS)
+// }
+//
+// #[derive(Debug)]
+// pub enum Callers {
+//     CallersSomatic,
+//     CallersSolo,
+// }

+ 1 - 3
src/collection/vcf.rs

@@ -8,8 +8,6 @@ use std::{collections::HashMap, fs::Metadata, os::unix::fs::MetadataExt, path::P
 use noodles_csi as csi;
 use num_format::{Locale, ToFormattedString};
 
-use crate::commands::bcftools::{bcftools_index, BcftoolsConfig};
-
 #[derive(Debug, Clone)]
 pub struct Vcf {
     pub id: String,
@@ -33,7 +31,7 @@ impl Vcf {
         let caller = stem_splt[2..stem_splt.len() - 1].join("_");
 
         if !PathBuf::from(format!("{}.csi", path.display())).exists() {
-            bcftools_index(&path.display().to_string(), &BcftoolsConfig::default())?;
+            anyhow::bail!("CSI index for VCF doesn't exist.")
         }
 
         let n_variants = n_variants(path.to_str().context("Can't convert path to str")?)?;

+ 664 - 183
src/commands/bcftools.rs

@@ -1,195 +1,676 @@
+//! Helpers to run common `bcftools` operations behind the generic
+//! `Command` / `Runner` / `SlurmRunner` traits.
+//!
+//! All commands are configured from a [`Config`] and expose a `from_config`
+//! constructor plus a generated shell `cmd()` string suitable for local
+//! or Slurm execution.
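+//!
+//! A minimal usage sketch (illustrative only: the paths are placeholders and the
+//! crate-local imports are assumed to be in scope):
+//!
+//! ```ignore
+//! use crate::{commands::{bcftools::BcftoolsKeepPass, SlurmRunner}, config::Config};
+//!
+//! let config = Config::default();
+//! // Sort, normalize and keep only FILTER=PASS records, running the job on Slurm.
+//! let mut keep = BcftoolsKeepPass::from_config(&config, "in.vcf.gz", "pass.vcf.gz");
+//! let report = SlurmRunner::run(&mut keep)?;
+//! println!("{report}");
+//! ```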
+
 use anyhow::Context;
 use log::info;
-use std::{fs, path::Path};
+use std::{
+    fs,
+    path::{Path, PathBuf},
+};
 use uuid::Uuid;
 
-use crate::runners::{run_wait, CommandRun, RunReport};
+use crate::commands::SlurmParams;
+use crate::config::Config;
+
+/// Sorts, normalizes and keeps only `FILTER=PASS` variants with `bcftools`.
+///
+/// The command sequence is:
+///
+/// 1. `bcftools sort` on `input`
+/// 2. `bcftools norm` with `--atom-overlaps .`
+/// 3. `bcftools view -i "FILTER='PASS'" -Oz --write-index`
+///
+/// The final output is a BGZF-compressed VCF (`.vcf.gz`) plus its index.
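+///
+/// The generated shell command looks roughly like the following (thread count and
+/// paths are placeholders; see `cmd()` below for the exact string):
+///
+/// ```text
+/// bcftools sort in.vcf.gz -o <tmp_sort> && \
+/// bcftools norm --threads 30 -a --atom-overlaps . <tmp_sort> -o <tmp_norm> && \
+/// bcftools view --write-index --threads 30 -i "FILTER='PASS'" <tmp_norm> -Oz -o pass.vcf.gz
+/// ```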
+#[derive(Debug)]
+pub struct BcftoolsKeepPass {
+    /// Path to the `bcftools` binary.
+    pub bin: String,
+    /// Number of threads to pass to `bcftools` where supported.
+    pub threads: u8,
+    /// Input VCF/BCF file.
+    pub input: PathBuf,
+    /// Output VCF/BCF file with only `FILTER=PASS` records.
+    pub output: PathBuf,
+    /// Temporary dir from a global [`Config`].
+    pub tmp_dir: PathBuf,
+    /// Temporary sorted VCF/BCF path.
+    tmp_sort: PathBuf,
+    /// Temporary normalized VCF/BCF path.
+    tmp_norm: PathBuf,
+}
+
+impl BcftoolsKeepPass {
+    /// Builds a [`BcftoolsKeepPass`] from a global [`Config`].
+    ///
+    /// The `input` and `output` paths are not accessed at construction time;
+    /// they are validated in [`Command::init`].
+    pub fn from_config(config: &Config, input: impl AsRef<Path>, output: impl AsRef<Path>) -> Self {
+        Self {
+            bin: config.bcftools_bin.clone(),
+            threads: config.bcftools_threads,
+            input: input.as_ref().into(),
+            output: output.as_ref().into(),
+            tmp_dir: config.tmp_dir.clone().into(),
+            tmp_sort: PathBuf::new(),
+            tmp_norm: PathBuf::new(),
+        }
+    }
+}
+
+impl super::Command for BcftoolsKeepPass {
+    /// Validates the input file and allocates random temporary paths.
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.input.exists() {
+            anyhow::bail!("File doesn't exist {}", self.input.display());
+        }
+        self.tmp_sort = self.tmp_dir.join(Uuid::new_v4().to_string());
+        self.tmp_norm = self.tmp_dir.join(Uuid::new_v4().to_string());
+        Ok(())
+    }
+
+    /// Returns the shell command that performs sort, norm and `FILTER=PASS` view.
+    fn cmd(&self) -> String {
+        format!(
+            "{bin} sort {input} -o {tmp_sort} && \
+             {bin} norm --threads {threads} -a --atom-overlaps . {tmp_sort} -o {tmp_norm} && \
+             {bin} view --write-index --threads {threads} -i \"FILTER='PASS'\" {tmp_norm} -Oz -o {output}",
+            bin = self.bin,
+            threads = self.threads,
+            input = self.input.display(),
+            tmp_sort = self.tmp_sort.display(),
+            tmp_norm = self.tmp_norm.display(),
+            output = self.output.display(),
+        )
+    }
+
+    /// Removes temporary files created during [`Command::init`] / [`Command::cmd`].
+    fn clean_up(&self) -> anyhow::Result<()> {
+        if self.tmp_sort.exists() {
+            fs::remove_file(&self.tmp_sort)
+                .with_context(|| format!("Failed to remove {}", self.tmp_sort.display()))?;
+        }
+        if self.tmp_norm.exists() {
+            fs::remove_file(&self.tmp_norm)
+                .with_context(|| format!("Failed to remove {}", self.tmp_norm.display()))?;
+        }
+        Ok(())
+    }
+}
+
+impl super::Runner for BcftoolsKeepPass {}
+
+impl super::SlurmRunner for BcftoolsKeepPass {
+    /// Slurm resource request for the `bcftools keep pass` job.
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("bcftools_keep_pass".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("45G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+/// Sorts, normalizes and keeps only *precise* `FILTER=PASS` variants.
+///
+/// Differs from [`BcftoolsKeepPass`] by excluding imprecise calls using
+/// `-e "INFO/IMPRECISE==1 || FILTER!='PASS'"`.
+///
+/// The final output is a BGZF-compressed VCF (`.vcf.gz`) plus its index.
+#[derive(Debug)]
+pub struct BcftoolsKeepPassPrecise {
+    /// Path to the `bcftools` binary.
+    pub bin: String,
+    /// Number of threads to pass to `bcftools` where supported.
+    pub threads: u8,
+    /// Input VCF/BCF file.
+    pub input: PathBuf,
+    /// Output VCF/BCF file with precise `FILTER=PASS` records.
+    pub output: PathBuf,
+    /// Temporary dir from a global [`Config`].
+    pub tmp_dir: PathBuf,
+    /// Temporary sorted VCF/BCF path.
+    tmp_sort: PathBuf,
+    /// Temporary normalized VCF/BCF path.
+    tmp_norm: PathBuf,
+}
+
+impl BcftoolsKeepPassPrecise {
+    /// Builds a [`BcftoolsKeepPassPrecise`] from a global [`Config`].
+    pub fn from_config(config: &Config, input: impl AsRef<Path>, output: impl AsRef<Path>) -> Self {
+        Self {
+            bin: config.bcftools_bin.clone(),
+            threads: config.bcftools_threads,
+            input: input.as_ref().into(),
+            output: output.as_ref().into(),
+            tmp_dir: config.tmp_dir.clone().into(),
+            tmp_sort: PathBuf::new(),
+            tmp_norm: PathBuf::new(),
+        }
+    }
+}
+
+impl super::Command for BcftoolsKeepPassPrecise {
+    /// Validates the input file and allocates random temporary paths.
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.input.exists() {
+            anyhow::bail!("File doesn't exist {}", self.input.display());
+        }
+        self.tmp_sort = self.tmp_dir.join(Uuid::new_v4().to_string());
+        self.tmp_norm = self.tmp_dir.join(Uuid::new_v4().to_string());
+        Ok(())
+    }
+
+    /// Returns the shell command that performs sort, norm and precise PASS filtering.
+    fn cmd(&self) -> String {
+        format!(
+            "{bin} sort {input} -o {tmp_sort} && \
+             {bin} norm --threads {threads} -a --atom-overlaps . {tmp_sort} -o {tmp_norm} && \
+             {bin} view --write-index --threads {threads} -e \"INFO/IMPRECISE==1 || FILTER!='PASS'\" {tmp_norm} -Oz -o {output}",
+            bin = self.bin,
+            threads = self.threads,
+            input = self.input.display(),
+            tmp_sort = self.tmp_sort.display(),
+            tmp_norm = self.tmp_norm.display(),
+            output = self.output.display(),
+        )
+    }
+
+    /// Removes temporary files created during [`Command::init`] / [`Command::cmd`].
+    fn clean_up(&self) -> anyhow::Result<()> {
+        if self.tmp_sort.exists() {
+            fs::remove_file(&self.tmp_sort)
+                .with_context(|| format!("Failed to remove {}", self.tmp_sort.display()))?;
+        }
+        if self.tmp_norm.exists() {
+            fs::remove_file(&self.tmp_norm)
+                .with_context(|| format!("Failed to remove {}", self.tmp_norm.display()))?;
+        }
+        Ok(())
+    }
+}
+
+impl super::Runner for BcftoolsKeepPassPrecise {}
+
+impl super::SlurmRunner for BcftoolsKeepPassPrecise {
+    /// Slurm resource request for the `bcftools keep pass precise` job.
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("bcftools_keep_pass_precise".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("45G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+/// Concatenates multiple VCF/BCF files with `bcftools concat`.
+///
+/// Builds a temporary list file containing all inputs, then runs
+/// `bcftools concat -a -D -f list --write-index` to produce `output`.
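+///
+/// The generated shell command looks roughly like the following (thread count,
+/// list file and output paths are placeholders; see `cmd()` below):
+///
+/// ```text
+/// bcftools concat --write-index --threads 30 -a -D -f <list> -Oz -o merged.vcf.gz && rm -f <list>
+/// ```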
+#[derive(Debug)]
+pub struct BcftoolsConcat {
+    /// Path to the `bcftools` binary.
+    pub bin: String,
+    /// Number of threads to pass to `bcftools`.
+    pub threads: u8,
+    /// List of input VCF/BCF files to concatenate.
+    pub inputs: Vec<PathBuf>,
+    /// Output concatenated VCF/BCF file.
+    pub output: PathBuf,
+    /// Temporary dir from a global [`Config`].
+    pub tmp_dir: PathBuf,
+    /// Temporary list file path for `bcftools concat -f`.
+    tmp_list: PathBuf,
+}
+
+impl BcftoolsConcat {
+    /// Builds a [`BcftoolsConcat`] from a global [`Config`].
+    ///
+    /// Input paths are converted to [`PathBuf`] immediately, but existence
+    /// is validated in [`Command::init`].
+    pub fn from_config(
+        config: &Config,
+        inputs: Vec<impl AsRef<Path>>,
+        output: impl AsRef<Path>,
+    ) -> Self {
+        Self {
+            bin: config.bcftools_bin.clone(),
+            threads: config.bcftools_threads,
+            inputs: inputs.into_iter().map(|p| p.as_ref().into()).collect(),
+            output: output.as_ref().into(),
+            tmp_dir: config.tmp_dir.clone().into(),
+            tmp_list: PathBuf::new(),
+        }
+    }
+}
+
+impl super::Command for BcftoolsConcat {
+    /// Validates input VCFs and writes the temporary concat list file.
+    fn init(&mut self) -> anyhow::Result<()> {
+        if self.inputs.is_empty() {
+            anyhow::bail!("No VCFs provided");
+        }
+        for p in &self.inputs {
+            if !p.exists() {
+                anyhow::bail!("VCF doesn't exist: {}", p.display());
+            }
+        }
+        self.tmp_list = self.tmp_dir.join(Uuid::new_v4().to_string());
+        let list = self
+            .inputs
+            .iter()
+            .map(|p| p.display().to_string())
+            .collect::<Vec<_>>()
+            .join("\n");
+
+        info!("Writing bcftools concat list: {}", self.tmp_list.display());
+        fs::write(&self.tmp_list, list)?;
+        Ok(())
+    }
+
+    /// Returns the shell command that runs `bcftools concat` and removes the list file.
+    fn cmd(&self) -> String {
+        format!(
+            "{bin} concat --write-index --threads {threads} -a -D -f {list} -Oz -o {output} && rm -f {list}",
+            bin = self.bin,
+            threads = self.threads,
+            list = self.tmp_list.display(),
+            output = self.output.display(),
+        )
+    }
+}
+
+impl super::Runner for BcftoolsConcat {}
+
+impl super::SlurmRunner for BcftoolsConcat {
+    /// Slurm resource request for the `bcftools concat` job.
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("bcftools_concat".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("45G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+/// Indexes a VCF/BCF file with `bcftools index`.
+#[derive(Debug)]
+pub struct BcftoolsIndex {
+    /// Path to the `bcftools` binary.
+    pub bin: String,
+    /// Number of threads to pass to `bcftools index`.
+    pub threads: u8,
+    /// VCF/BCF file to index.
+    pub vcf: PathBuf,
+}
+
+impl BcftoolsIndex {
+    /// Builds a [`BcftoolsIndex`] from a global [`Config`].
+    pub fn from_config(config: &Config, vcf: impl AsRef<Path>) -> Self {
+        Self {
+            bin: config.bcftools_bin.clone(),
+            threads: config.bcftools_threads,
+            vcf: vcf.as_ref().into(),
+        }
+    }
+}
+
+impl super::Command for BcftoolsIndex {
+    /// Validates that the VCF/BCF file exists.
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.vcf.exists() {
+            anyhow::bail!("VCF doesn't exist: {}", self.vcf.display());
+        }
+        Ok(())
+    }
+
+    /// Returns the shell command that runs `bcftools index`.
+    fn cmd(&self) -> String {
+        format!(
+            "{bin} index --threads {threads} {vcf}",
+            bin = self.bin,
+            threads = self.threads,
+            vcf = self.vcf.display()
+        )
+    }
+}
+
+impl super::Runner for BcftoolsIndex {}
+impl super::SlurmRunner for BcftoolsIndex {
+    /// Slurm resource request for the `bcftools index` job.
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("bcftools_index".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("45G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
 
+/// Compresses a VCF/BCF file to BGZF using `bcftools view -Oz`.
 #[derive(Debug)]
-pub struct BcftoolsConfig {
+pub struct BcftoolsCompress {
+    /// Path to the `bcftools` binary.
     pub bin: String,
+    /// Number of threads to pass to `bcftools`.
     pub threads: u8,
+    /// Input uncompressed VCF/BCF file.
+    pub in_vcf: PathBuf,
+    /// Output compressed VCF/BCF (`.vcf.gz` / `.bcf`) file.
+    pub out_vcf: PathBuf,
 }
 
-impl Default for BcftoolsConfig {
-    fn default() -> Self {
+impl BcftoolsCompress {
+    /// Builds a [`BcftoolsCompress`] from a global [`Config`].
+    ///
+    /// Existence of `in_vcf` and non-existence of `out_vcf` are enforced
+    /// in [`Command::init`].
+    pub fn from_config(
+        config: &Config,
+        in_vcf: impl AsRef<Path>,
+        out_vcf: impl AsRef<Path>,
+    ) -> Self {
         Self {
-            bin: "/data/tools/bcftools-1.21/bcftools".to_string(),
-            threads: 20,
-        }
-    }
-}
-
-pub fn bcftools_keep_pass(
-    input: &str,
-    output: &str,
-    config: BcftoolsConfig,
-) -> anyhow::Result<RunReport> {
-    if !Path::new(input).exists() {
-        anyhow::bail!("File doesnt exist {input}")
-    }
-    // First sort
-    let tmp_file = format!("/tmp/{}", Uuid::new_v4());
-    let mut cmd_run = CommandRun::new(&config.bin, &["sort", input, "-o", &tmp_file]);
-    let _ = run_wait(&mut cmd_run)?;
-
-    // 2. norm
-    let tmp2_file = format!("/tmp/{}", Uuid::new_v4());
-    let mut cmd_run = CommandRun::new(
-        &config.bin,
-        &[
-            "norm",
-            "--threads",
-            &config.threads.to_string(),
-            "-a",
-            "--atom-overlaps",
-            ".",
-            &tmp_file,
-            "-o",
-            &tmp2_file,
-        ],
-    );
-    let _ = run_wait(&mut cmd_run)?;
-    fs::remove_file(tmp_file)?;
-
-    // Then filter
-    let mut cmd_run = CommandRun::new(
-        &config.bin,
-        &[
-            "view",
-            "--write-index",
-            "--threads",
-            &config.threads.to_string(),
-            "-i",
-            "FILTER='PASS'",
-            &tmp2_file,
-            "-o",
-            output,
-        ],
-    );
-    let res = run_wait(&mut cmd_run)?;
-    fs::remove_file(tmp2_file)?;
-    Ok(res)
-}
-
-pub fn bcftools_keep_pass_precise(
-    input: &str,
-    output: &str,
-    config: BcftoolsConfig,
-) -> anyhow::Result<RunReport> {
-    if !Path::new(input).exists() {
-        anyhow::bail!("File doesnt exist {input}")
-    }
-    // First sort
-    let tmp_file = format!("/tmp/{}", Uuid::new_v4());
-    let mut cmd_run = CommandRun::new(&config.bin, &["sort", input, "-o", &tmp_file]);
-    let _ = run_wait(&mut cmd_run)?;
-
-    // 2. norm
-    let tmp2_file = format!("/tmp/{}", Uuid::new_v4());
-    let mut cmd_run = CommandRun::new(
-        &config.bin,
-        &[
-            "norm",
-            "--threads",
-            &config.threads.to_string(),
-            "-a",
-            "--atom-overlaps",
-            ".",
-            &tmp_file,
-            "-o",
-            &tmp2_file,
-        ],
-    );
-    let _ = run_wait(&mut cmd_run)?;
-    fs::remove_file(tmp_file)?;
-
-    // Then filter
-    let mut cmd_run = CommandRun::new(
-        &config.bin,
-        &[
-            "view",
-            "--write-index",
-            "--threads",
-            &config.threads.to_string(),
-            "-e",
-            "INFO/IMPRECISE==1 || FILTER!=\"PASS\"",
-            &tmp2_file,
-            "-o",
-            output,
-        ],
-    );
-    let res = run_wait(&mut cmd_run)?;
-    fs::remove_file(tmp2_file)?;
-    Ok(res)
-}
-
-pub fn bcftools_concat(
-    inputs: Vec<String>,
-    output: &str,
-    config: BcftoolsConfig,
-) -> anyhow::Result<RunReport> {
-    info!("Concatening vcf with bcftools: {}", inputs.join(", "));
-    let tmp_file = format!("/tmp/{}", Uuid::new_v4());
-    fs::write(&tmp_file, inputs.join("\n"))?;
-
-    let args = [
-        "concat",
-        "--write-index",
-        "--threads",
-        &config.threads.to_string(),
-        "-a",
-        "-D",
-        "-f",
-        &tmp_file,
-        "-o",
-        output,
-    ];
-    // Then filter
-    let mut cmd_run = CommandRun::new(&config.bin, &args);
-    let res = run_wait(&mut cmd_run)
-        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
-    fs::remove_file(tmp_file)?;
-    Ok(res)
-}
-
-pub fn bcftools_keep_only_in_a(
-    a: &str,
-    b: &str,
-    out: &str,
-    config: &BcftoolsConfig,
-) -> anyhow::Result<()> {
-    let args = ["isec", "-C", "-w", "1", a, b, "-o", out];
-    let mut cmd_run = CommandRun::new(&config.bin, &args);
-    let _ = run_wait(&mut cmd_run)
-        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
-    Ok(())
-}
-
-pub fn bcftools_index(vcf: &str, config: &BcftoolsConfig) -> anyhow::Result<()> {
-    let args = ["index", "--threads", &config.threads.to_string(), vcf];
-    let mut cmd_run = CommandRun::new(&config.bin, &args);
-    let _ = run_wait(&mut cmd_run)
-        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
-    Ok(())
-}
-
-pub fn bcftools_compress(
-    in_vcf: &str,
-    out_vcf: &str,
-    config: &BcftoolsConfig,
-) -> anyhow::Result<()> {
-    let args = [
-        "view",
-        "--threads",
-        &config.threads.to_string(),
-        in_vcf,
-        "-Oz",
-        "-o",
-        out_vcf,
-    ];
-    let mut cmd_run = CommandRun::new(&config.bin, &args);
-    let _ = run_wait(&mut cmd_run)
-        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
-    Ok(())
+            bin: config.bcftools_bin.clone(),
+            threads: config.bcftools_threads,
+            in_vcf: in_vcf.as_ref().into(),
+            out_vcf: out_vcf.as_ref().into(),
+        }
+    }
+}
+
+impl super::Command for BcftoolsCompress {
+    /// Validates input/output VCF paths before running the command.
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.in_vcf.exists() {
+            anyhow::bail!("VCF doesn't exist: {}", self.in_vcf.display());
+        }
+        if self.out_vcf.exists() {
+            anyhow::bail!("Output VCF exists: {}", self.out_vcf.display());
+        }
+        Ok(())
+    }
+
+    /// Returns the shell command that compresses the VCF with `bcftools view -Oz`.
+    fn cmd(&self) -> String {
+        format!(
+            "{bin} view --threads {threads} {in_vcf} -Oz -o {out_vcf}",
+            bin = self.bin,
+            threads = self.threads,
+            in_vcf = self.in_vcf.display(),
+            out_vcf = self.out_vcf.display()
+        )
+    }
+}
+
+impl super::Runner for BcftoolsCompress {}
+impl super::SlurmRunner for BcftoolsCompress {
+    /// Slurm resource request for the `bcftools compress` job.
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("bcftools_compress".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("45G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::{
+        fs,
+        path::{Path, PathBuf},
+    };
+
+    use log::info;
+    use uuid::Uuid;
+
+    use crate::{
+        commands::SlurmRunner,
+        config::Config,
+        helpers::{test_init, TempDirGuard},
+    };
+
+    /// Small valid VCF with 3 variants and full header.
+    fn write_small_vcf(path: &Path) -> anyhow::Result<()> {
+        let vcf = "\
+##fileformat=VCFv4.2
+##contig=<ID=1,length=1000000>
+##INFO=<ID=IMPRECISE,Number=1,Type=Integer,Description=\"Imprecise call indicator\">
+##FORMAT=<ID=GT,Number=1,Type=String,Description=\"Genotype\">
+##FILTER=<ID=q10,Description=\"Quality <10\">
+#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ts1
+1\t100\t.\tA\tC\t50\tPASS\t.\tGT\t0/1
+1\t200\t.\tG\tT\t50\tq10\t.\tGT\t0/1
+1\t300\t.\tT\tG\t50\tPASS\tIMPRECISE=1\tGT\t0/1
+";
+        if let Some(parent) = path.parent() {
+            fs::create_dir_all(parent)?;
+        }
+        info!("Writing small test VCF to {}", path.display());
+        fs::write(path, vcf)?;
+        Ok(())
+    }
+
+    #[test]
+    fn bcftools_compress_index_slurm() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+        let base = PathBuf::from(&config.tmp_dir).join(Uuid::new_v4().to_string());
+        fs::create_dir_all(&base)?;
+        let _guard = TempDirGuard::new(base.clone());
+
+        let raw = base.join("raw.vcf");
+        let raw_gz = base.join("raw.vcf.gz");
+
+        write_small_vcf(&raw)?;
+
+        // 1) compress raw.vcf -> raw.vcf.gz
+        let mut compress = BcftoolsCompress::from_config(&config, &raw, &raw_gz);
+        let out_compress = SlurmRunner::run(&mut compress)?;
+        println!("{out_compress}");
+        assert!(
+            raw_gz.exists(),
+            "compressed VCF should exist after compress"
+        );
+
+        // 2) index raw.vcf.gz
+        let mut index = BcftoolsIndex::from_config(&config, &raw_gz);
+        let out_index = SlurmRunner::run(&mut index)?;
+        println!("{out_index}");
+
+        let tbi = PathBuf::from(format!("{}.tbi", raw_gz.display()));
+        let csi = PathBuf::from(format!("{}.csi", raw_gz.display()));
+        assert!(
+            tbi.exists() || csi.exists(),
+            "index file (.tbi or .csi) should exist"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn bcftools_compress_index_then_keep_pass_slurm() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+        let base = PathBuf::from(&config.tmp_dir).join(Uuid::new_v4().to_string());
+        fs::create_dir_all(&base)?;
+        let _guard = TempDirGuard::new(base.clone());
+
+        let raw = base.join("raw.vcf");
+        let raw_gz = base.join("raw.vcf.gz");
+        let pass_vcf_gz = base.join("pass.vcf.gz");
+
+        write_small_vcf(&raw)?;
+
+        // 1) compress raw.vcf -> raw.vcf.gz
+        let mut compress = BcftoolsCompress::from_config(&config, &raw, &raw_gz);
+        let out_compress = SlurmRunner::run(&mut compress)?;
+        println!("{out_compress}");
+        assert!(
+            raw_gz.exists(),
+            "compressed VCF should exist after compress"
+        );
+
+        // 2) index raw.vcf.gz
+        let mut index = BcftoolsIndex::from_config(&config, &raw_gz);
+        let out_index = SlurmRunner::run(&mut index)?;
+        println!("{out_index}");
+
+        let raw_tbi = PathBuf::from(format!("{}.tbi", raw_gz.display()));
+        let raw_csi = PathBuf::from(format!("{}.csi", raw_gz.display()));
+        assert!(
+            raw_tbi.exists() || raw_csi.exists(),
+            "index file (.tbi or .csi) for raw.vcf.gz should exist"
+        );
+
+        // 3) keep only FILTER=PASS from raw.vcf.gz -> pass.vcf.gz (with index)
+        let mut keep = BcftoolsKeepPass::from_config(&config, &raw_gz, &pass_vcf_gz);
+        let out_keep = SlurmRunner::run(&mut keep)?;
+        println!("{out_keep}");
+
+        assert!(
+            pass_vcf_gz.exists(),
+            "PASS-only BGZF VCF should exist after keep-pass step"
+        );
+        let pass_tbi = PathBuf::from(format!("{}.tbi", pass_vcf_gz.display()));
+        let pass_csi = PathBuf::from(format!("{}.csi", pass_vcf_gz.display()));
+        assert!(
+            pass_tbi.exists() || pass_csi.exists(),
+            "index for pass.vcf.gz should exist (written by --write-index)"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn bcftools_compress_index_then_keep_pass_precise_slurm() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+        let base = PathBuf::from(&config.tmp_dir).join(Uuid::new_v4().to_string());
+        fs::create_dir_all(&base)?;
+        let _guard = TempDirGuard::new(base.clone());
+
+        let raw = base.join("raw.vcf");
+        let raw_gz = base.join("raw.vcf.gz");
+        let pass_precise_vcf_gz = base.join("pass_precise.vcf.gz");
+
+        write_small_vcf(&raw)?;
+
+        // 1) compress raw.vcf -> raw.vcf.gz
+        let mut compress = BcftoolsCompress::from_config(&config, &raw, &raw_gz);
+        let out_compress = SlurmRunner::run(&mut compress)?;
+        println!("{out_compress}");
+        assert!(
+            raw_gz.exists(),
+            "compressed VCF should exist after compress"
+        );
+
+        // 2) index raw.vcf.gz
+        let mut index = BcftoolsIndex::from_config(&config, &raw_gz);
+        let out_index = SlurmRunner::run(&mut index)?;
+        println!("{out_index}");
+
+        let tbi = PathBuf::from(format!("{}.tbi", raw_gz.display()));
+        let csi = PathBuf::from(format!("{}.csi", raw_gz.display()));
+        assert!(
+            tbi.exists() || csi.exists(),
+            "index file (.tbi or .csi) should exist"
+        );
+
+        // 3) keep only precise FILTER=PASS from raw.vcf.gz
+        let mut keep_precise =
+            BcftoolsKeepPassPrecise::from_config(&config, &raw_gz, &pass_precise_vcf_gz);
+        let out_keep_precise = SlurmRunner::run(&mut keep_precise)?;
+        println!("{out_keep_precise}");
+        assert!(pass_precise_vcf_gz.exists());
+        let pp_tbi = PathBuf::from(format!("{}.tbi", pass_precise_vcf_gz.display()));
+        let pp_csi = PathBuf::from(format!("{}.csi", pass_precise_vcf_gz.display()));
+        assert!(pp_tbi.exists() || pp_csi.exists());
+
+        Ok(())
+    }
+
+    #[test]
+    fn bcftools_concat_compress_index_slurm() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+        let base = PathBuf::from(&config.tmp_dir).join(Uuid::new_v4().to_string());
+        fs::create_dir_all(&base)?;
+        let _guard = TempDirGuard::new(base.clone());
+
+        let raw1 = base.join("a.vcf");
+        let raw2 = base.join("b.vcf");
+        let raw3 = base.join("c.vcf");
+        let gz1 = base.join("a.vcf.gz");
+        let gz2 = base.join("b.vcf.gz");
+        let gz3 = base.join("c.vcf.gz");
+        let concat_gz = base.join("concat.vcf.gz");
+
+        // write three small raw VCFs
+        write_small_vcf(&raw1)?;
+        write_small_vcf(&raw2)?;
+        write_small_vcf(&raw3)?;
+
+        // 1) compress each raw VCF
+        let mut c1 = BcftoolsCompress::from_config(&config, &raw1, &gz1);
+        let mut c2 = BcftoolsCompress::from_config(&config, &raw2, &gz2);
+        let mut c3 = BcftoolsCompress::from_config(&config, &raw3, &gz3);
+        SlurmRunner::run(&mut c1)?;
+        SlurmRunner::run(&mut c2)?;
+        SlurmRunner::run(&mut c3)?;
+        assert!(
+            gz1.exists() && gz2.exists() && gz3.exists(),
+            "all compressed VCFs should exist"
+        );
+
+        // 2) index each compressed VCF
+        let mut i1 = BcftoolsIndex::from_config(&config, &gz1);
+        let mut i2 = BcftoolsIndex::from_config(&config, &gz2);
+        let mut i3 = BcftoolsIndex::from_config(&config, &gz3);
+        SlurmRunner::run(&mut i1)?;
+        SlurmRunner::run(&mut i2)?;
+        SlurmRunner::run(&mut i3)?;
+        // just basic existence checks; index suffix may be .tbi or .csi
+        assert!(
+            gz1.with_extension("vcf.gz.tbi").exists()
+                || gz1.with_extension("vcf.gz.csi").exists()
+                || PathBuf::from(format!("{}.tbi", gz1.display())).exists()
+                || PathBuf::from(format!("{}.csi", gz1.display())).exists()
+        );
+
+        // 3) concat compressed VCFs (bcftools can concat bgzip VCFs)
+        let mut concat = BcftoolsConcat::from_config(&config, vec![&gz1, &gz2, &gz3], &concat_gz);
+        let out_concat = SlurmRunner::run(&mut concat)?;
+        println!("{out_concat}");
+        assert!(concat_gz.exists(), "concatenated BGZF VCF should exist");
+
+        let concat_tbi = PathBuf::from(format!("{}.tbi", concat_gz.display()));
+        let concat_csi = PathBuf::from(format!("{}.csi", concat_gz.display()));
+        assert!(
+            concat_tbi.exists() || concat_csi.exists(),
+            "index for concatenated VCF should exist"
+        );
+
+        Ok(())
+    }
 }

+ 70 - 10
src/commands/mod.rs

@@ -1,17 +1,12 @@
 pub mod bcftools;
 pub mod dorado;
-pub mod longphase;
+// pub mod longphase;
 pub mod modkit;
 pub mod samtools;
-pub mod wakhan;
+// pub mod wakhan;
 
 use std::{
-    fs::File,
-    io::{self, BufReader, Read, Seek, SeekFrom, Write},
-    process::{Command as ProcessCommand, Stdio},
-    sync::{mpsc, Arc, Mutex},
-    thread,
-    time::Duration,
+    fmt::{self, Display, Formatter},
+    fs::{self, File},
+    io::{self, BufReader, Read, Seek, SeekFrom, Write},
+    path::Path,
+    process::{Command as ProcessCommand, Stdio},
+    sync::{Arc, Mutex, mpsc},
+    thread,
+    time::Duration,
 };
 
 /// A helper trait for commands that should be run through Slurm.
@@ -155,15 +150,58 @@ pub trait Runner: Command {
 
 use anyhow::Context;
 use log::info;
+use serde::{Deserialize, Serialize};
 
 /// Captured process output while it was being streamed live.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Serialize, Deserialize)]
 pub struct CapturedOutput {
     pub stdout: String,
     pub stderr: String,
     pub slurm_epilog: Option<SlurmEpilog>,
 }
 
+impl CapturedOutput {
+    /// Save struct as JSON to `path` (creates parent dirs if needed).
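+    ///
+    /// Illustrative call (the log path is a placeholder):
+    ///
+    /// ```ignore
+    /// let report = SlurmRunner::run(&mut cmd)?;
+    /// report.save_to_file("logs/bcftools_pass_report.json")?;
+    /// ```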
+    pub fn save_to_file(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let path = path.as_ref();
+        if let Some(p) = path.parent() {
+            fs::create_dir_all(p).with_context(|| format!("Failed to create {}", p.display()))?;
+        }
+        let json = serde_json::to_string(self)?;
+        fs::write(path, json)
+            .with_context(|| format!("Failed to write {}", path.display()))?;
+        Ok(())
+    }
+}
+
+impl Display for CapturedOutput {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        // 1. Write STDOUT
+        writeln!(f, "--- STANDARD OUTPUT ---")?;
+        if self.stdout.is_empty() {
+            writeln!(f, "[No standard output was captured.]")?;
+        } else {
+            writeln!(f, "{}", self.stdout)?;
+        }
+
+        // 2. Write STDERR
+        writeln!(f, "\n--- STANDARD ERROR ---")?;
+        if self.stderr.is_empty() {
+            writeln!(f, "[No standard error was captured.]")?;
+        } else {
+            writeln!(f, "{}", self.stderr)?;
+        }
+
+        // 3. Write SlurmEpilog if present
+        if let Some(epilog) = &self.slurm_epilog {
+            // Using a separate line break before the epilog for clarity
+            writeln!(f, "\n{}", epilog)?;
+        }
+
+        Ok(())
+    }
+}
+
 /// Super-trait adding a Slurm `run()` method on top of [`Command`].
 pub trait SlurmRunner: Command {
     /// Slurm launcher binary. Defaults to `srun`.
@@ -559,7 +597,7 @@ pub trait SbatchRunner: Command {
 
 use std::collections::HashMap;
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct SlurmEpilog {
     pub job_id: String,
     pub cluster: String,
@@ -639,6 +677,28 @@ impl SlurmEpilog {
     }
 }
 
+impl Display for SlurmEpilog {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        writeln!(f, "--- SLURM JOB EPILOG (ID: {}) ---", self.job_id)?;
+        writeln!(f, "  Cluster: {}", self.cluster)?;
+        writeln!(f, "  User: {} (Group: {})", self.user, self.group)?;
+        writeln!(f, "  Nodelist: {}", self.nodelist)?;
+        writeln!(f, "  Cores Used: {}", self.cores)?;
+        writeln!(f)?;
+        writeln!(f, "  --- Timestamps ---")?;
+        writeln!(f, "  Started At: {}", self.job_started_at)?;
+        writeln!(f, "  Ended At:   {}", self.job_ended_at)?;
+        writeln!(f, "  Wall Time:  {}", self.wall_clock_time)?;
+        writeln!(f)?;
+        writeln!(f, "  --- Resource Utilization ---")?;
+        writeln!(f, "  CPU Utilized:   {}", self.cpu_utilized)?;
+        writeln!(f, "  CPU Efficiency: {}", self.cpu_efficiency)?;
+        writeln!(f, "  Memory Utilized:  {}", self.memory_utilized)?;
+        writeln!(f, "  Memory Efficiency:{}", self.memory_efficiency)?;
+        writeln!(f, "------------------------------------")
+    }
+}
+
 /// Split output into job output and epilog
 pub fn split_output(full_output: &str) -> (String, Option<SlurmEpilog>) {
     if let Some(epilog_start) = full_output.find("----------------------------------------------") {

+ 9 - 4
src/commands/samtools.rs

@@ -1,5 +1,4 @@
 use std::{
-    collections::HashSet,
     fs,
     path::{Path, PathBuf},
 };
@@ -9,7 +8,7 @@ use log::info;
 use uuid::Uuid;
 
 use crate::{
-    collection::bam_stats::{QNameSet, WGSBamStats},
+    collection::bam_stats::QNameSet,
     commands::{SbatchRunner, SlurmParams},
     config::Config,
 };
@@ -135,7 +134,12 @@ impl super::Command for SamtoolsMerge {
 
         // Load qname sets
         let mut into_qset = QNameSet::load_or_create(&self.into, self.config.bam_min_mapq, 4)
-            .with_context(|| format!("Failed to load or create qnames for {}", self.into.display()))?;
+            .with_context(|| {
+                format!(
+                    "Failed to load or create qnames for {}",
+                    self.into.display()
+                )
+            })?;
 
         let bam_qset = QNameSet::from_bam_in_memory(&self.bam, self.config.bam_min_mapq, 4)?;
 
@@ -143,7 +147,8 @@ impl super::Command for SamtoolsMerge {
         let (intersection, frac_into) = into_qset.intersect(&bam_qset);
         let n_shared = intersection.len();
 
-        if n_shared > 0 {
+        // Relaxed to tolerate recovery reads.
+        if n_shared > 10_000 {
             anyhow::bail!(
                 "Cannot merge: {} and {} share {} read name(s) ({:.4} fraction of `into`)",
                 self.into.display(),

+ 10 - 0
src/config.rs

@@ -19,6 +19,9 @@ pub struct Config {
     /// Root directory where all results will be written.
     pub result_dir: String,
 
+    /// Temporary directory.
+    pub tmp_dir: String,
+
     /// Temporary directory used when unarchiving input data.
     pub unarchive_tmp_dir: String,
 
@@ -243,6 +246,13 @@ pub struct Config {
     /// Placeholders: `{result_dir}`, `{id}`, `{time}`.
     pub severus_solo_output_dir: String,
 
+    // === Bcftools configuration ===
+    /// Path to Bcftools binary.
+    pub bcftools_bin: String,
+
+    /// Number of threads for Bcftools.
+    pub bcftools_threads: u8,
+
     // === Longphase configuration ===
     /// Path to longphase binary.
     pub longphase_bin: String,

+ 27 - 2
src/helpers.rs

@@ -1,7 +1,7 @@
 use anyhow::Context;
 use bitcode::{Decode, Encode};
 use glob::glob;
-use log::{debug, warn};
+use log::{debug, error, warn};
 use serde::{Deserialize, Serialize};
 use std::{
     cmp::Ordering,
@@ -704,7 +704,6 @@ pub fn detect_repetition(s: &str) -> Repeat {
     Repeat::None
 }
 
-
 pub fn test_init() {
     let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
         .try_init();
@@ -747,4 +746,30 @@ pub fn human_size(bytes: u64) -> String {
         format!("{:.2} {}", size, UNITS[unit])
     }
 }
+// A RAII (Resource Acquisition Is Initialization) guard for cleaning up temporary directories.
+pub struct TempDirGuard {
+    path: PathBuf,
+}
+
+impl TempDirGuard {
+    pub fn new(path: PathBuf) -> Self {
+        Self { path }
+    }
+}
 
+// The core cleanup logic: executed automatically when the TempDirGuard is dropped.
+impl Drop for TempDirGuard {
+    fn drop(&mut self) {
+        if self.path.exists() {
+            // Log the attempt. Using eprintln/warn! ensures the error is noted,
+            // but the program does not panic.
+            if let Err(e) = fs::remove_dir_all(&self.path) {
+                error!(
+                    "Failed to remove temporary directory '{}': {}",
+                    self.path.display(),
+                    e
+                );
+            }
+        }
+    }
+}
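+
+// Illustrative usage sketch (the scratch path is a placeholder): create a unique
+// directory, keep the guard alive for the scope of the work, and the directory is
+// removed automatically when the guard is dropped.
+//
+//     let scratch = PathBuf::from("/tmp").join(Uuid::new_v4().to_string());
+//     fs::create_dir_all(&scratch)?;
+//     let _guard = TempDirGuard::new(scratch.clone());
+//     // ... write temporary files under `scratch` ...
+//     // `_guard` dropped here -> `scratch` is deleted.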

+ 182 - 202
src/lib.rs

@@ -166,38 +166,20 @@ mod tests {
         vep::{VepLine, VEP},
         Annotations,
     };
-    use callers::{
-        savana::{Savana, SavanaReadCounts},
-        severus::{Severus, SeverusSolo},
-    };
+
     use collection::bam::{counts_at, counts_ins_at, nt_pileup, WGSBam};
-    use commands::longphase::{
-        LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase,
-    };
     use functions::assembler::{Assembler, AssemblerConfig};
     use helpers::estimate_shannon_entropy;
     use io::bed::read_bed;
     use log::{error, info};
-    use pipes::somatic::SomaticPipe;
     use positions::{overlaps_par, GenomePosition, GenomeRange};
     use rayon::prelude::*;
-    use runners::Run;
-    use variant::{
-        variant::{Variants, VcfVariant},
-        variant_collection,
-    };
+    use variant::{variant::VcfVariant, variant_collection};
 
     use self::{/* collection::pod5::{FlowCellCase, Pod5Collection}, */ config::Config};
     use super::*;
     use crate::{
         annotation::Annotation,
-        callers::{
-            clairs::ClairS,
-            deep_somatic::DeepSomatic,
-            deep_variant::DeepVariant,
-            nanomonsv::{NanomonSV, NanomonSVSolo},
-            savana::SavanaCN,
-        },
         collection::{
             bam::{self},
             flowcells::{scan_archive, FlowCells},
@@ -205,7 +187,6 @@ mod tests {
         },
         helpers::find_files,
         io::{bed::bedrow_overlaps_par, dict::read_dict, gff::features_ranges},
-        pipes::{somatic::const_stats, Initialize, InitializeSolo, ShouldRun, Version},
         positions::{merge_overlapping_genome_ranges, range_intersection_par, sort_ranges},
         scan::scan::somatic_scan,
         variant::{
@@ -308,34 +289,34 @@ mod tests {
     //     ClairS::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run()
     // }
 
-    #[test]
-    fn nanomonsv() -> anyhow::Result<()> {
-        init();
-        let id = "HAMROUNE";
-        NanomonSV::initialize(id, Config::default())?.run()
-    }
-
-    #[test]
-    fn nanomonsv_version() -> anyhow::Result<()> {
-        init();
-        let v = NanomonSV::version(&Config::default())?;
-        println!("NanomonSV version: {v}");
-        let v = DeepVariant::version(&Config::default())?;
-        println!("DeepVariant version: {v}");
-        let v = Savana::version(&Config::default())?;
-        println!("Savana version: {v}");
-        let v = DeepSomatic::version(&Config::default())?;
-        println!("DeepSomatic version: {v}");
-        let v = ClairS::version(&Config::default())?;
-        println!("ClairS version: {v}");
-        Ok(())
-    }
-
-    #[test]
-    fn nanomonsv_solo() -> anyhow::Result<()> {
-        init();
-        NanomonSVSolo::initialize("LAKHDHAR", "diag", Config::default())?.run()
-    }
+    // #[test]
+    // fn nanomonsv() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "HAMROUNE";
+    //     NanomonSV::initialize(id, Config::default())?.run()
+    // }
+    //
+    // #[test]
+    // fn nanomonsv_version() -> anyhow::Result<()> {
+    //     init();
+    //     let v = NanomonSV::version(&Config::default())?;
+    //     println!("NanomonSV version: {v}");
+    //     let v = DeepVariant::version(&Config::default())?;
+    //     println!("DeepVariant version: {v}");
+    //     let v = Savana::version(&Config::default())?;
+    //     println!("Savana version: {v}");
+    //     let v = DeepSomatic::version(&Config::default())?;
+    //     println!("DeepSomatic version: {v}");
+    //     let v = ClairS::version(&Config::default())?;
+    //     println!("ClairS version: {v}");
+    //     Ok(())
+    // }
+    //
+    // #[test]
+    // fn nanomonsv_solo() -> anyhow::Result<()> {
+    //     init();
+    //     NanomonSVSolo::initialize("LAKHDHAR", "diag", Config::default())?.run()
+    // }
 
     #[test]
     fn run_assemblers() -> anyhow::Result<()> {
@@ -367,28 +348,28 @@ mod tests {
     //     Ok(())
     // }
 
-    #[test]
-    fn run_severus() -> anyhow::Result<()> {
-        init();
-        Severus::initialize("BANGA", Config::default())?.run()
-    }
-
-    #[test]
-    fn run_severus_solo() -> anyhow::Result<()> {
-        init();
-        SeverusSolo::initialize("LAKHDHAR", "diag", Config::default())?.run()
-    }
-
-    #[test]
-    fn check_versions() -> anyhow::Result<()> {
-        init();
-        let config = Config::default();
-        let v = Savana::version(&config)?;
-        info!("Savanna version {v}");
-        let v = Severus::version(&config)?;
-        info!("Severus version {v}");
-        Ok(())
-    }
+    // #[test]
+    // fn run_severus() -> anyhow::Result<()> {
+    //     init();
+    //     Severus::initialize("BANGA", Config::default())?.run()
+    // }
+    //
+    // #[test]
+    // fn run_severus_solo() -> anyhow::Result<()> {
+    //     init();
+    //     SeverusSolo::initialize("LAKHDHAR", "diag", Config::default())?.run()
+    // }
+    //
+    // #[test]
+    // fn check_versions() -> anyhow::Result<()> {
+    //     init();
+    //     let config = Config::default();
+    //     let v = Savana::version(&config)?;
+    //     info!("Savanna version {v}");
+    //     let v = Severus::version(&config)?;
+    //     info!("Severus version {v}");
+    //     Ok(())
+    // }
 
     // #[test]
     // fn run_deepvariant() -> anyhow::Result<()> {
@@ -396,38 +377,38 @@ mod tests {
     //     DeepVariant::initialize("HAMROUNE", "diag", Config::default())?.run()
     // }
 
-    #[test]
-    fn run_clairs() -> anyhow::Result<()> {
-        init();
-        ClairS::initialize("ADJAGBA", Config::default())?.run()
-    }
-
-    #[test]
-    fn run_longphase() -> anyhow::Result<()> {
-        init();
-        let id = "BECERRA";
-        let diag_bam = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1.bam");
-        let vcf = format!("/data/longreads_basic_pipe/{id}/diag/ClairS/clair3_normal_tumoral_germline_output.vcf.gz");
-        let mrd_bam = format!("/data/longreads_basic_pipe/{id}/mrd/{id}_mrd_hs1.bam");
-
-        LongphaseHap::new(id, &diag_bam, &vcf, LongphaseConfig::default()).run()?;
-        LongphaseHap::new(id, &mrd_bam, &vcf, LongphaseConfig::default()).run()
-    }
+    // #[test]
+    // fn run_clairs() -> anyhow::Result<()> {
+    //     init();
+    //     ClairS::initialize("ADJAGBA", Config::default())?.run()
+    // }
 
-    #[test]
-    fn run_longphase_modcall() -> anyhow::Result<()> {
-        init();
-        let id = "ADJAGBA";
-        let time = "diag";
-        LongphaseModcallSolo::initialize(id, time, Config::default())?.run()
-    }
+    // #[test]
+    // fn run_longphase() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "BECERRA";
+    //     let diag_bam = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1.bam");
+    //     let vcf = format!("/data/longreads_basic_pipe/{id}/diag/ClairS/clair3_normal_tumoral_germline_output.vcf.gz");
+    //     let mrd_bam = format!("/data/longreads_basic_pipe/{id}/mrd/{id}_mrd_hs1.bam");
+    //
+    //     LongphaseHap::new(id, &diag_bam, &vcf, LongphaseConfig::default()).run()?;
+    //     LongphaseHap::new(id, &mrd_bam, &vcf, LongphaseConfig::default()).run()
+    // }
 
-    #[test]
-    fn run_longphase_phase() -> anyhow::Result<()> {
-        init();
-        let id = "ADJAGBA";
-        LongphasePhase::initialize(id, Config::default())?.run()
-    }
+    // #[test]
+    // fn run_longphase_modcall() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "ADJAGBA";
+    //     let time = "diag";
+    //     LongphaseModcallSolo::initialize(id, time, Config::default())?.run()
+    // }
+    //
+    // #[test]
+    // fn run_longphase_phase() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "ADJAGBA";
+    //     LongphasePhase::initialize(id, Config::default())?.run()
+    // }
 
     #[test]
     fn snv_parse() -> anyhow::Result<()> {
@@ -787,49 +768,49 @@ mod tests {
     }
 
-    #[test]
+    // #[test]
-    fn variant_load_deepvariant() -> anyhow::Result<()> {
-        init();
-        let id = "ADJAGBA";
-        let time = "diag";
-        let dv = DeepVariant::initialize(id, time, Config::default())?;
-        let annotations = Annotations::default();
-        let variant_collection = dv.variants(&annotations)?;
-        println!(
-            "Deepvariant for {id} {time}: variants {} {}",
-            variant_collection.variants.len(),
-            variant_collection.vcf.n_variants
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn variant_load_clairs() -> anyhow::Result<()> {
-        init();
-        let id = "ELKLIFI";
-        let clairs = NanomonSV::initialize(id, Config::default())?;
-        let u = clairs.should_run();
-        info!("should_run {u:?}");
-        // let annotations = Annotations::default();
-        // let variant_collection = clairs.variants(&annotations)?;
-        // println!("ClairS for {id}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants);
-        Ok(())
-    }
+    // fn variant_load_deepvariant() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "ADJAGBA";
+    //     let time = "diag";
+    //     let dv = DeepVariant::initialize(id, time, Config::default())?;
+    //     let annotations = Annotations::default();
+    //     let variant_collection = dv.variants(&annotations)?;
+    //     println!(
+    //         "Deepvariant for {id} {time}: variants {} {}",
+    //         variant_collection.variants.len(),
+    //         variant_collection.vcf.n_variants
+    //     );
+    //     Ok(())
+    // }
 
-    #[test]
-    fn variant_load_nanomonsv() -> anyhow::Result<()> {
-        init();
-        let id = "ADJAGBA";
-        let nanomonsv = NanomonSV::initialize(id, Config::default())?;
-        let annotations = Annotations::default();
-        let variant_collection = nanomonsv.variants(&annotations)?;
-        println!(
-            "NanomonSV for {id}: variants {} {}",
-            variant_collection.variants.len(),
-            variant_collection.vcf.n_variants
-        );
-        println!("{:?}", variant_collection.variants.first());
-        Ok(())
-    }
+    // #[test]
+    // fn variant_load_clairs() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "ELKLIFI";
+    //     let clairs = NanomonSV::initialize(id, Config::default())?;
+    //     let u = clairs.should_run();
+    //     info!("should_run {u:?}");
+    //     // let annotations = Annotations::default();
+    //     // let variant_collection = clairs.variants(&annotations)?;
+    //     // println!("ClairS for {id}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants);
+    //     Ok(())
+    // }
+    //
+    // #[test]
+    // fn variant_load_nanomonsv() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "ADJAGBA";
+    //     let nanomonsv = NanomonSV::initialize(id, Config::default())?;
+    //     let annotations = Annotations::default();
+    //     let variant_collection = nanomonsv.variants(&annotations)?;
+    //     println!(
+    //         "NanomonSV for {id}: variants {} {}",
+    //         variant_collection.variants.len(),
+    //         variant_collection.vcf.n_variants
+    //     );
+    //     println!("{:?}", variant_collection.variants.first());
+    //     Ok(())
+    // }
 
     // #[test]
     // fn variant_load_clairs_germline() -> anyhow::Result<()> {
@@ -841,7 +822,6 @@ mod tests {
     //     println!("ClairS for {id}: variants {} {}", germline_variant_collection.variants.len(), germline_variant_collection.vcf.n_variants);
     //     Ok(())
     // }
-
     #[test]
     fn overlaps() {
         init();
@@ -1020,17 +1000,17 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn savana_cn() -> anyhow::Result<()> {
-        init();
-        let id = "CAMARA";
-        // let s = SavanaCopyNumber::load_id(id, Config::default())?;
-        let s = SavanaReadCounts::load_id(id, Config::default())?;
-        println!("tumoral reads: {}", s.n_tumoral_reads());
-        println!("normal reads: {}", s.n_normal_reads());
-        println!("tumoral:\n{:#?}", s.norm_chr_counts());
-        Ok(())
-    }
+    // #[test]
+    // fn savana_cn() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "CAMARA";
+    //     // let s = SavanaCopyNumber::load_id(id, Config::default())?;
+    //     let s = SavanaReadCounts::load_id(id, Config::default())?;
+    //     println!("tumoral reads: {}", s.n_tumoral_reads());
+    //     println!("normal reads: {}", s.n_normal_reads());
+    //     println!("tumoral:\n{:#?}", s.norm_chr_counts());
+    //     Ok(())
+    // }
 
     #[test]
     fn load_bam() -> anyhow::Result<()> {
@@ -1049,20 +1029,20 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn somatic_cases() -> anyhow::Result<()> {
-        init();
-        let id = "PASSARD";
-        let config = Config {
-            somatic_pipe_force: true,
-            ..Default::default()
-        };
-        match SomaticPipe::initialize(id, config)?.run() {
-            Ok(_) => (),
-            Err(e) => error!("{id} {e}"),
-        };
-        Ok(())
-    }
+    // #[test]
+    // fn somatic_cases() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "PASSARD";
+    //     let config = Config {
+    //         somatic_pipe_force: true,
+    //         ..Default::default()
+    //     };
+    //     match SomaticPipe::initialize(id, config)?.run() {
+    //         Ok(_) => (),
+    //         Err(e) => error!("{id} {e}"),
+    //     };
+    //     Ok(())
+    // }
 
     #[test]
     fn load_variants() -> anyhow::Result<()> {
@@ -1174,41 +1154,41 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn constit_stats() {
-        init();
-        let id = "ADJAGBA";
-        let config = Config::default();
-
-        let _ = const_stats(id.to_string(), config);
-    }
-
-    #[test]
-    fn test_bnd() -> anyhow::Result<()> {
-        init();
-        let id = "COIFFET";
-        let config = Config::default();
+    // #[test]
+    // fn constit_stats() {
+    //     init();
+    //     let id = "ADJAGBA";
+    //     let config = Config::default();
+    //
+    //     let _ = const_stats(id.to_string(), config);
+    // }
 
-        let annotations = Annotations::default();
-        let s = Savana::initialize(id, config)?.variants(&annotations)?;
-        s.variants.iter().for_each(|e| {
-            if let Ok(bnd) = e.bnd_desc() {
-                println!("{}\t{}\t{}", e.position, e.reference, e.alternative);
-                println!("{:#?}", bnd);
-            }
-        });
-        Ok(())
-    }
+    // #[test]
+    // fn test_bnd() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "COIFFET";
+    //     let config = Config::default();
+    //
+    //     let annotations = Annotations::default();
+    //     let s = Savana::initialize(id, config)?.variants(&annotations)?;
+    //     s.variants.iter().for_each(|e| {
+    //         if let Ok(bnd) = e.bnd_desc() {
+    //             println!("{}\t{}\t{}", e.position, e.reference, e.alternative);
+    //             println!("{:#?}", bnd);
+    //         }
+    //     });
+    //     Ok(())
+    // }
 
-    #[test]
-    fn parse_savana_seg() {
-        init();
-        let r = SavanaCN::parse_file("ADJAGBA", &config::Config::default())
-            .unwrap()
-            .segments;
-        println!("{} lines", r.len());
-        println!("{:#?}", r.first().unwrap());
-    }
+    // #[test]
+    // fn parse_savana_seg() {
+    //     init();
+    //     let r = SavanaCN::parse_file("ADJAGBA", &config::Config::default())
+    //         .unwrap()
+    //         .segments;
+    //     println!("{} lines", r.len());
+    //     println!("{:#?}", r.first().unwrap());
+    // }
 
     #[test]
     fn whole_scan() -> anyhow::Result<()> {

+ 1 - 1
src/pipes/mod.rs

@@ -2,7 +2,7 @@ use std::path::Path;
 
 use crate::{config::Config};
 
-pub mod somatic;
+// pub mod somatic;
 pub mod somatic_slurm;
 
 

+ 16 - 34
src/pipes/somatic_slurm.rs

@@ -1,18 +1,15 @@
 use anyhow::{bail, Context};
-use log::{error, info, warn};
+use log::{info, warn};
 use std::{collections::HashMap, fs, path::PathBuf};
 use uuid::Uuid;
 
 use crate::{
     collection::{flowcells::IdInput, pod5::Pod5sRun},
     commands::{
-        dorado::{DoradoAlign, DoradoBasecall},
-        run_many_sbatch,
-        samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSort, SamtoolsSplit},
-        SlurmRunner,
+        SlurmRunner,
+        dorado::{DoradoAlign, DoradoBasecall},
+        run_many_sbatch,
+        samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSort, SamtoolsSplit},
     },
     config::Config,
-    helpers::{extract_barcode, list_files_with_ext},
+    helpers::{TempDirGuard, extract_barcode, list_files_with_ext},
 };
 
 /// Normalize barcode strings for matching.
@@ -80,34 +77,6 @@ fn collect_bams_recursively(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
     Ok(bams)
 }
 
-// A RAII (Resource Acquisition Is Initialization) guard for cleaning up temporary directories.
-pub struct TempDirGuard {
-    path: PathBuf,
-}
-
-impl TempDirGuard {
-    pub fn new(path: PathBuf) -> Self {
-        Self { path }
-    }
-}
-
-// The core cleanup logic: executed automatically when the TempDirGuard is dropped.
-impl Drop for TempDirGuard {
-    fn drop(&mut self) {
-        if self.path.exists() {
-            // Log the attempt. Using eprintln/warn! ensures the error is noted,
-            // but the program does not panic.
-            if let Err(e) = fs::remove_dir_all(&self.path) {
-                error!(
-                    "Failed to remove temporary directory '{}': {}",
-                    self.path.display(),
-                    e
-                );
-            }
-        }
-    }
-}
-
 /// Orchestrates the full Nanopore sequencing pipeline: $\text{POD5} \rightarrow \text{Unaligned BAM} \rightarrow \text{Aligned, Merged, Sorted, and Indexed BAM}$ per case.
 ///
 /// This function manages the entire process for a single sequencing run, employing a six-step plan
@@ -527,6 +496,19 @@ pub fn basecall_align_split(
 
         for bam in aligned_bams {
             if final_bam_path.exists() {
+                let mut index_cmd = SamtoolsIndex {
+                    bin: config.align.samtools_bin.clone(),
+                    threads: config.align.samtools_view_threads,
+                    bam: bam.to_string_lossy().to_string(),
+                };
+                SlurmRunner::run(&mut index_cmd)?;
+                let mut index_cmd = SamtoolsIndex {
+                    bin: config.align.samtools_bin.clone(),
+                    threads: config.align.samtools_view_threads,
+                    bam: final_bam_path.to_string_lossy().to_string(),
+                };
+                SlurmRunner::run(&mut index_cmd)?;
+
                 // Merge into existing - clean_up() removes source bam
                 let mut merge_cmd = SamtoolsMerge::from_config(config, bam, &final_bam_path);
                 SlurmRunner::run(&mut merge_cmd)?;

Some files were not shown because too many files changed in this diff