Browse Source

ClairS slurm

Thomas 2 weeks ago
parent
commit
0409a892e8

+ 11 - 8
pandora-config.example.toml

@@ -13,6 +13,9 @@ result_dir = "/mnt/beegfs02/scratch/t_steimle/data/wgs"
 # Temporary directory.
 tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
 
+# Singularity invocation command (module-load prefix plus the `singularity` binary).
+singularity_bin = "module load singularity-ce && singularity"
+
 # Temporary directory used when unarchiving input data.
 unarchive_tmp_dir = "/data/unarchived"
 
@@ -156,7 +159,7 @@ deepvariant_output_dir = "{result_dir}/{id}/{time}/DeepVariant"
 deepvariant_threads = 40
 
 # DeepVariant singularity image path
-deepvariant_image = "/mnt/beegfs02/scratch/t_steimle/tools/"
+deepvariant_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/deepvariant_latest-gpu.sif"
 
 # DeepVariant version / image tag.
 deepvariant_bin_version = "1.9.0"
@@ -177,7 +180,7 @@ deepvariant_force = false
 deepsomatic_output_dir = "{result_dir}/{id}/{time}/DeepSomatic"
 
 # Threads for DeepSomatic.
-deepsomatic_threads = 150
+deepsomatic_threads = 40
 
 # DeepSomatic version / image tag.
 deepsomatic_bin_version = "1.9.0"
@@ -194,10 +197,10 @@ deepsomatic_force = false
 #######################################
 
 # Threads for ClairS.
-clairs_threads = 155
+clairs_threads = 40
 
-# ClairS docker tag.
-clairs_docker_tag = "latest"
+# ClairS singularity image path.
+clairs_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/clairs_latest.sif"
 
 # Force ClairS recomputation.
 clairs_force = false
@@ -380,14 +383,14 @@ ref_mmi = "/mnt/beegfs02/scratch/t_steimle/ref/chm13v2.0.mmi"
 samtools_bin = "/mnt/beegfs02/scratch/t_steimle/tools/samtools"
 
 # Threads for `samtools view`.
-samtools_view_threads = 20
+samtools_view_threads = 40
 
 # Threads for `samtools sort`.
-samtools_sort_threads = 20
+samtools_sort_threads = 40
 
 # Threads for `samtools merge`.
-samtools_merge_threads = 20
+samtools_merge_threads = 40
 
 # Threads for `samtools split`.
-samtools_split_threads = 20
+samtools_split_threads = 40
 

File diff suppressed because it is too large
+ 5 - 0
slurm-2435990.out


File diff suppressed because it is too large
+ 5 - 0
slurm-2435994.out


+ 3 - 0
slurm-2435995.out

@@ -0,0 +1,3 @@
+[2025-11-28 19:36:41.093] [info] Running: "aligner" "--threads" "20" "--allow-sec-supp" "--mm2-opts" "--secondary yes" "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa" "/mnt/beegfs02/scratch/t_steimle/prom_runs/20251107_OL_001_A-B/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/bam_pass/barcode04/PBI55810_pass_barcode04_22582b29_d02c5bb8_0.bam"
+[2025-11-28 19:36:41.102] [info] num input files: 1
+[2025-11-28 19:36:41.102] [info] > loading index /mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa

+ 3 - 0
slurm-2435996.out

@@ -0,0 +1,3 @@
+[2025-11-28 19:36:45.312] [info] Running: "aligner" "--threads" "20" "--allow-sec-supp" "--mm2-opts" "--secondary yes" "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa" "/mnt/beegfs02/scratch/t_steimle/prom_runs/20251107_OL_001_A-B/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/bam_pass/barcode02/PBI55810_pass_barcode02_22582b29_d02c5bb8_8.bam"
+[2025-11-28 19:36:45.321] [info] num input files: 1
+[2025-11-28 19:36:45.321] [info] > loading index /mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa

+ 3 - 0
slurm-2435997.out

@@ -0,0 +1,3 @@
+[2025-11-28 19:36:53.325] [info] Running: "aligner" "--threads" "20" "--allow-sec-supp" "--mm2-opts" "--secondary yes" "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa" "/mnt/beegfs02/scratch/t_steimle/prom_runs/20251107_OL_001_A-B/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/bam_pass/barcode04/PBI55810_pass_barcode04_22582b29_d02c5bb8_7.bam"
+[2025-11-28 19:36:53.347] [info] num input files: 1
+[2025-11-28 19:36:53.347] [info] > loading index /mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa

+ 3 - 0
slurm-2435998.out

@@ -0,0 +1,3 @@
+[2025-11-28 19:37:26.774] [info] Running: "aligner" "--threads" "20" "--allow-sec-supp" "--mm2-opts" "--secondary yes" "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa" "/mnt/beegfs02/scratch/t_steimle/prom_runs/20251107_OL_001_A-B/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/bam_pass/barcode04/PBI55810_pass_barcode04_22582b29_d02c5bb8_11.bam"
+[2025-11-28 19:37:26.782] [info] num input files: 1
+[2025-11-28 19:37:26.782] [info] > loading index /mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa

+ 0 - 0
slurm-2435999.out


+ 0 - 0
slurm-2436000.out


+ 331 - 176
src/callers/clairs.rs

@@ -1,55 +1,51 @@
 use crate::{
-    annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::bcftools::{BcftoolsConfig, bcftools_concat, bcftools_keep_pass}, config::Config, helpers::{is_file_older, remove_dir_if_exists, temp_file_path}, io::vcf::read_vcf, pipes::{Initialize, ShouldRun, Version}, runners::{DockerRun, Run, run_wait}, variant::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
+    collection::vcf::Vcf,
+    commands::{
+        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
+        CapturedOutput, Command as JobCommand, Runner as LocalRunner, SbatchRunner, SlurmParams,
+        SlurmRunner,
+    },
+    config::Config,
+    helpers::{is_file_older, remove_dir_if_exists, temp_file_path},
+    io::vcf::read_vcf,
+    pipes::{Initialize, ShouldRun, Version},
+    runners::Run,
+    variant::{
         variant::{Label, Variants},
         variant_collection::VariantCollection,
-    }
+    },
 };
-use anyhow::{Context, Ok};
+
+use anyhow::Context;
 use log::{debug, info, warn};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use regex::Regex;
 use std::{
     fs,
     path::Path,
-    process::{Command, Stdio},
+    process::{Command as ProcessCommand, Stdio},
 };
 
 /// A pipeline runner for executing ClairS on paired tumor and normal samples.
 ///
 /// ClairS is a somatic variant caller that uses haplotype tagging from LongPhase.
-/// This struct manages:
-/// - Dockerized execution of the ClairS pipeline
-/// - Handling and filtering of output VCFs
-/// - Logging and diagnostic tracking
-/// - Integration with variant annotation workflows
-///
-/// # References
-/// - ClairS: <https://github.com/HKU-BAL/ClairS>
+/// This struct supports:
+/// - Local execution via `run_local`
+/// - Slurm execution via `run_sbatch`
+/// - Optional region restriction via `-r` (for downstream batched runners)
+/// - bcftools post-processing (germline + somatic PASS)
 #[derive(Debug, Clone)]
 pub struct ClairS {
     pub id: String,
     pub config: Config,
     pub log_dir: String,
+    /// Optional list of regions passed as repeated `-r REGION` args.
+    /// When empty, ClairS runs genome-wide.
+    pub regions: Vec<String>,
 }
 
 impl Initialize for ClairS {
-    /// Initializes the ClairS runner.
-    ///
-    /// This method constructs a [`ClairS`] instance with logging and configuration setup,
-    /// and ensures the output directory is cleaned up if the results are outdated or force execution is enabled.
-    ///
-    /// # Arguments
-    /// * `id` - The identifier for the sample being analyzed.
-    /// * `config` - Pipeline-wide configuration object containing paths, resources, and settings.
-    ///
-    /// # Returns
-    /// A fully initialized [`ClairS`] instance ready for execution.
-    ///
-    /// # Errors
-    /// Returns an error if the output directory fails to be removed when necessary.
-    ///
-    /// If the output VCF already exists and is not outdated, initialization skips deletion.
-    /// Otherwise, the output directory is cleared for a fresh run.
     fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
         let id = id.to_string();
 
@@ -60,6 +56,7 @@ impl Initialize for ClairS {
             id,
             log_dir,
             config,
+            regions: Vec::new(),
         };
 
         if clairs.config.clairs_force {
@@ -84,172 +81,287 @@ impl ShouldRun for ClairS {
     }
 }
 
+/* ---------------- JobCommand / LocalRunner / SbatchRunner ---------------- */
+
+impl JobCommand for ClairS {
+    fn init(&mut self) -> anyhow::Result<()> {
+        let output_dir = self.config.clairs_output_dir(&self.id);
+
+        fs::create_dir_all(&output_dir)
+            .with_context(|| format!("Failed create dir: {output_dir}"))?;
+
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed create dir: {}", self.log_dir))?;
+
+        Ok(())
+    }
+
+    fn cmd(&self) -> String {
+        let output_dir = self.config.clairs_output_dir(&self.id);
+
+        // Build repeated -r REGION args if any regions were set (for batched runs)
+        let region_args = if self.regions.is_empty() {
+            String::new()
+        } else {
+            self.regions
+                .iter()
+                .map(|r| format!("-r {r}"))
+                .collect::<Vec<_>>()
+                .join(" ")
+        };
+
+        let sample_name = format!("{}_diag", self.id);
+
+        format!(
+            "{singularity_bin} exec \
+             --bind /data:/data \
+             --bind {output_dir}:{output_dir} \
+             {image} \
+             /opt/bin/run_clairs \
+             -T {tumor_bam} \
+             -N {normal_bam} \
+             -R {reference} \
+             -t {threads} \
+             -p {platform} \
+             --enable_indel_calling \
+             --include_all_ctgs \
+             --print_germline_calls \
+             --enable_clair3_germline_output \
+             --use_longphase_for_intermediate_haplotagging true \
+             --output_dir {output_dir} \
+             -s {sample_name} \
+             {region_args}",
+            singularity_bin = self.config.singularity_bin,
+            image = self.config.clairs_image,
+            tumor_bam = self.config.tumoral_bam(&self.id),
+            normal_bam = self.config.normal_bam(&self.id),
+            reference = self.config.reference,
+            threads = self.config.clairs_threads,
+            platform = self.config.clairs_platform,
+            output_dir = output_dir,
+            sample_name = sample_name,
+            region_args = region_args,
+        )
+    }
+}
+
+impl LocalRunner for ClairS {
+    // default shell() is fine ("bash")
+}
+
+impl SbatchRunner for ClairS {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some(format!("clairs_{}", self.id)),
+            cpus_per_task: Some(self.config.clairs_threads as u32),
+            mem: Some("60G".into()),         // tune as needed
+            partition: Some("batch".into()), // CPU partition, no GPU
+            gres: None,                      // ClairS does not use GPU
+        }
+    }
+
+    fn sbatch_extra_args(&self) -> Vec<String> {
+        Vec::new()
+    }
+}
+
 impl Run for ClairS {
-    /// Executes the ClairS variant calling pipeline and post-processes its output.
-    ///
-    /// # Pipeline Overview
-    /// - Runs ClairS in a Docker container using paired tumor and normal BAMs
-    /// - Generates both somatic and germline variant VCFs
-    /// - Applies bcftools filtering to keep only PASS variants
-    /// - Concatenates separate VCFs (e.g., SNPs and INDELs) into a single somatic file
-    /// - Tracks all operations via logs saved to disk
-    ///
-    /// # Errors
-    /// Returns an error if:
-    /// - Docker execution fails
-    /// - bcftools fails to process or filter VCFs
-    /// - Temporary files can't be removed or written
     fn run(&mut self) -> anyhow::Result<()> {
-        // Run Docker command if output VCF doesn't exist
-        let (output_vcf, output_indels_vcf) = self.config.clairs_output_vcfs(&self.id);
-        if !Path::new(&output_vcf).exists() || !Path::new(&output_indels_vcf).exists() {
-            let output_dir = self.config.clairs_output_dir(&self.id);
-            fs::create_dir_all(&output_dir)
-                .map_err(|e| anyhow::anyhow!("Failed create dir: {output_dir}.\n{e}"))?;
-
-            let mut docker_run = DockerRun::new(&[
-                "run",
-                "-d",
-                "-v",
-                "/data:/data",
-                "-v",
-                &format!("{output_dir}:{output_dir}"),
-                &format!("hkubal/clairs:{}", self.config.clairs_docker_tag),
-                "/opt/bin/run_clairs",
-                "-T",
-                &self.config.tumoral_bam(&self.id),
-                "-N",
-                &self.config.normal_bam(&self.id),
-                "-R",
-                &self.config.reference,
-                "-t",
-                &self.config.clairs_threads.to_string(),
-                "-p",
-                &self.config.clairs_platform,
-                "--enable_indel_calling",
-                "--include_all_ctgs",
-                "--print_germline_calls",
-                "--enable_clair3_germline_output",
-                "--use_longphase_for_intermediate_haplotagging",
-                "true",
-                "--output_dir",
-                &output_dir,
-                "-s",
-                &format!("{}_diag", self.id),
-            ]);
-            let report = run_wait(&mut docker_run)
-                .map_err(|e| anyhow::anyhow!("Failed to run ClairS for {}.\n{e}", &self.id))?;
-
-            let log_file = format!("{}/clairs_", self.log_dir);
+        self.run_local()?;
+        Ok(())
+    }
+}
+
+/* ---------------- Post-processing helpers (germline + somatic) ----------- */
+
+impl ClairS {
+    fn postprocess_local(&self) -> anyhow::Result<()> {
+        // Germline PASS
+        let clair3_germline_passed = self.config.clairs_germline_passed_vcf(&self.id);
+        if !Path::new(&clair3_germline_passed).exists() {
+            let clair3_germline_normal = self.config.clairs_germline_normal_vcf(&self.id);
+
+            let mut cmd = BcftoolsKeepPass::from_config(
+                &self.config,
+                clair3_germline_normal,
+                clair3_germline_passed.clone(),
+            );
+            let report = <BcftoolsKeepPass as LocalRunner>::run(&mut cmd).with_context(|| {
+                format!(
+                    "Failed to run `bcftools keep PASS` for {}.",
+                    clair3_germline_passed
+                )
+            })?;
+
+            let log_file = format!("{}/bcftools_germline_pass_", self.log_dir);
             report
                 .save_to_file(&log_file)
-                .map_err(|e| anyhow::anyhow!("Error while writing logs into {log_file}.\n{e}"))?;
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
         } else {
             debug!(
-                "ClairS output VCF already exists for {}, skipping execution.",
+                "ClairS Germline PASSED VCF already exists for {}, skipping.",
                 self.id
             );
         }
 
-        // Germline PASS
-        let clair3_germline_passed = self.config.clairs_germline_passed_vcf(&self.id);
-        if !Path::new(&clair3_germline_passed).exists() {
-            let clair3_germline_normal = self.config.clairs_germline_normal_vcf(&self.id);
-            // let clair3_germline_tumor = self.config.clairs_germline_tumor_vcf(&self.id);
-
-            let report = bcftools_keep_pass(
-                &clair3_germline_normal,
-                &clair3_germline_passed,
-                BcftoolsConfig::default(),
-            )
-            .map_err(|e| {
-                anyhow::anyhow!(
-                    "Failed to run `bcftools keep PASS` for {}.\n{e}",
-                    &clair3_germline_passed
+        // Somatic concat + PASS
+        let passed_vcf = self.config.clairs_passed_vcf(&self.id);
+        if !Path::new(&passed_vcf).exists() {
+            let (output_vcf, output_indels_vcf) = self.config.clairs_output_vcfs(&self.id);
+
+            let tmp_file = temp_file_path(".vcf.gz")?.to_str().unwrap().to_string();
+
+            let mut concat = BcftoolsConcat::from_config(
+                &self.config,
+                vec![output_vcf.clone(), output_indels_vcf.clone()],
+                &tmp_file,
+            );
+            let report = <BcftoolsConcat as LocalRunner>::run(&mut concat).with_context(|| {
+                format!(
+                    "Failed to run bcftools concat for {} and {}.",
+                    output_vcf, output_indels_vcf
                 )
             })?;
 
+            let log_file = format!("{}/bcftools_concat_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+
+            let mut keep_pass =
+                BcftoolsKeepPass::from_config(&self.config, tmp_file.clone(), passed_vcf.clone());
+            let report =
+                <BcftoolsKeepPass as LocalRunner>::run(&mut keep_pass).with_context(|| {
+                    format!("Error while running bcftools keep PASS for {}.", output_vcf)
+                })?;
+
             let log_file = format!("{}/bcftools_pass_", self.log_dir);
             report
                 .save_to_file(&log_file)
-                .map_err(|e| anyhow::anyhow!("Error while writing logs into {log_file}.\n{e}"))?;
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
 
-            // fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
+            fs::remove_file(&tmp_file)
+                .with_context(|| format!("Failed to remove temporary file {tmp_file}"))?;
         } else {
             debug!(
-                "ClairS Germline PASSED VCF already exists for {}, skipping execution.",
+                "ClairS PASSED VCF already exists for {}, skipping.",
                 self.id
             );
         }
 
-        let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
+        Ok(())
+    }
+
+    fn postprocess_sbatch(&self) -> anyhow::Result<()> {
+        // Germline PASS via Slurm
+        let clair3_germline_passed = self.config.clairs_germline_passed_vcf(&self.id);
+        if !Path::new(&clair3_germline_passed).exists() {
+            let clair3_germline_normal = self.config.clairs_germline_normal_vcf(&self.id);
+
+            let mut cmd = BcftoolsKeepPass::from_config(
+                &self.config,
+                clair3_germline_normal,
+                clair3_germline_passed.clone(),
+            );
+            let report = SlurmRunner::run(&mut cmd)
+                .context("Failed to run `bcftools keep PASS` on Slurm")?;
+
+            let log_file = format!("{}/bcftools_germline_pass_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .context("Error while writing logs")?;
+        } else {
+            debug!(
+                "ClairS Germline PASSED VCF already exists for {}, skipping.",
+                self.id
+            );
+        }
+
+        // Somatic concat + PASS via Slurm
+        let passed_vcf = self.config.clairs_passed_vcf(&self.id);
         if !Path::new(&passed_vcf).exists() {
-            // Concat output and indels
+            let (output_vcf, output_indels_vcf) = self.config.clairs_output_vcfs(&self.id);
+
             let tmp_file = temp_file_path(".vcf.gz")?.to_str().unwrap().to_string();
-            let report = bcftools_concat(
-                vec![output_vcf.to_string(), output_indels_vcf.to_string()],
+
+            let mut concat = BcftoolsConcat::from_config(
+                &self.config,
+                vec![output_vcf.clone(), output_indels_vcf.clone()],
                 &tmp_file,
-                BcftoolsConfig::default(),
-            )
-            .map_err(|e| {
-                anyhow::anyhow!(
-                    "Failed to run bcftools concat for {} and {}.\n{e}",
-                    &output_vcf,
-                    &output_indels_vcf
-                )
-            })?;
+            );
+            let report = SlurmRunner::run(&mut concat)
+                .context("Failed to run bcftools concat for ClairS somatic on Slurm")?;
+
             let log_file = format!("{}/bcftools_concat_", self.log_dir);
             report
                 .save_to_file(&log_file)
-                .map_err(|e| anyhow::anyhow!("Error while writing logs into {log_file}\n{e}"))?;
-
-            let report = bcftools_keep_pass(&tmp_file, passed_vcf, BcftoolsConfig::default())
-                .map_err(|e| {
-                    anyhow::anyhow!(
-                        "Error while running bcftools keep PASS for {}.\n{e}",
-                        &output_vcf
-                    )
-                })?;
+                .context("Error while writing concat logs")?;
+
+            let mut keep_pass =
+                BcftoolsKeepPass::from_config(&self.config, tmp_file.clone(), passed_vcf.clone());
+            let report = SlurmRunner::run(&mut keep_pass)
+                .context("Failed to run bcftools keep PASS for ClairS somatic on Slurm")?;
 
             let log_file = format!("{}/bcftools_pass_", self.log_dir);
             report
                 .save_to_file(&log_file)
-                .map_err(|e| anyhow::anyhow!("Error while writing logs into {log_file}.\n{e}"))?;
+                .context("Error while writing PASS logs")?;
 
-            fs::remove_file(&tmp_file)
-                .map_err(|e| anyhow::anyhow!("Failed to remove temporary file {tmp_file}.\n{e}"))?;
+            fs::remove_file(&tmp_file).context("Failed to remove temporary merged VCF")?;
         } else {
             debug!(
-                "ClairS PASSED VCF already exists for {}, skipping execution.",
+                "ClairS PASSED VCF already exists for {}, skipping.",
                 self.id
             );
         }
 
         Ok(())
     }
+
+    /// Local execution: runs ClairS via `LocalRunner` then post-processes VCFs locally.
+    pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
+        if !self.should_run() {
+            debug!(
+                "ClairS output already up-to-date for {}, skipping.",
+                self.id
+            );
+            return Ok(CapturedOutput::default());
+        }
+
+        info!("Running ClairS locally for {}", self.id);
+        let out = <Self as LocalRunner>::run(self)?;
+
+        self.postprocess_local()?;
+        Ok(out)
+    }
+
+    /// Slurm execution: submits ClairS via sbatch (Singularity inside job) then bcftools via Slurm.
+    pub fn run_sbatch(&mut self) -> anyhow::Result<CapturedOutput> {
+        if !self.should_run() {
+            debug!(
+                "ClairS output already up-to-date for {}, skipping.",
+                self.id
+            );
+            return Ok(CapturedOutput::default());
+        }
+
+        info!("Submitting ClairS via sbatch for {}", self.id);
+        let out = <Self as SbatchRunner>::run(self)?;
+
+        self.postprocess_sbatch()?;
+        Ok(out)
+    }
 }
 
+/* ---------------- Variant / Label / Version impls ------------------------ */
+
 impl CallerCat for ClairS {
-    /// Tags this runner as somatic, used for annotation classification.
     fn caller_cat(&self) -> Annotation {
         Annotation::Callers(Caller::ClairS, Sample::Somatic)
     }
 }
 
 impl Variants for ClairS {
-    /// Loads and annotates somatic variants from the ClairS filtered VCF.
-    ///
-    /// This method reads the filtered PASS VCF file generated by ClairS for somatic variants.
-    /// It tags each variant with the ClairS somatic annotation and adds it to the shared `annotations` map.
-    ///
-    /// # Arguments
-    /// * `annotations` - A reference to the global annotations structure used to store variant metadata.
-    ///
-    /// # Returns
-    /// A [`VariantCollection`] with the list of variants, the source VCF file, and the associated caller tag.
-    ///
-    /// # Errors
-    /// Will return an error if the VCF file is unreadable, missing, or malformed.
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let caller = self.caller_cat();
         let add = vec![caller.clone()];
@@ -257,7 +369,7 @@ impl Variants for ClairS {
 
         info!("Loading variants from {caller}: {passed_vcf}");
         let variants = read_vcf(passed_vcf)
-            .map_err(|e| anyhow::anyhow!("Failed to read ClairS VCF {}.\n{e}", passed_vcf))?;
+            .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?;
 
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
@@ -273,26 +385,12 @@ impl Variants for ClairS {
 }
 
 impl ClairS {
-    /// Loads and annotates germline variants from the ClairS germline output.
-    ///
-    /// This function loads a pre-filtered VCF file containing germline variants called by ClairS.
-    /// It updates the provided `annotations` structure with a tag indicating these are germline variants.
-    ///
-    /// # Arguments
-    /// * `annotations` - A shared annotation structure to update with variant hashes and tags.
-    ///
-    /// # Returns
-    /// A [`VariantCollection`] object containing the loaded variants, associated VCF metadata, and caller category.
-    ///
-    /// # Errors
-    /// Will return an error if the VCF file cannot be read or parsed.
     pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
         let add = vec![caller.clone()];
         let clair3_germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
 
         info!("Loading variants from {caller}: {clair3_germline_passed}");
-
         let variants = read_vcf(clair3_germline_passed)?;
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
@@ -308,36 +406,28 @@ impl ClairS {
 }
 
 impl Label for ClairS {
-    /// Returns the string label for this caller.
     fn label(&self) -> String {
         self.caller_cat().to_string()
     }
 }
 
 impl Version for ClairS {
-    /// Retrieves the ClairS version by running `/opt/bin/run_clairs --version` in its docker environment.
-    ///
-    /// # Errors
-    /// Returns an error if command execution fails or "Version " not found in output.
     fn version(config: &Config) -> anyhow::Result<String> {
-        let out = Command::new("docker")
-            .args([
-                "run",
-                "--rm",
-                "--entrypoint",
-                "/opt/bin/run_clairs",
-                &format!("hkubal/clairs:{}", config.clairs_docker_tag),
-                "--version",
-            ])
+        let out = ProcessCommand::new("bash")
+            .arg("-c")
+            .arg(format!(
+                "{} exec {} /opt/bin/run_clairs --version",
+                config.singularity_bin, config.clairs_image
+            ))
             .stdout(Stdio::piped())
             .stderr(Stdio::piped())
             .output()
-            .context("failed to spawn docker")?;
+            .context("failed to spawn singularity")?;
 
         if !out.status.success() {
             let mut log = String::from_utf8_lossy(&out.stdout).to_string();
             log.push_str(&String::from_utf8_lossy(&out.stderr));
-            anyhow::bail!("docker run failed: {}\n{}", out.status, log);
+            anyhow::bail!("singularity run failed: {}\n{}", out.status, log);
         }
 
         let mut log = String::from_utf8_lossy(&out.stdout).to_string();
@@ -346,7 +436,72 @@ impl Version for ClairS {
         let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)")?;
         let caps = re
             .captures(&log)
-            .context("could not parse DeepSomatic version from output")?;
+            .context("could not parse ClairS version from output")?;
+        Ok(caps.get(1).unwrap().as_str().to_string())
+    }
+
+    /// Slurm: run `/opt/bin/run_clairs --version` inside a small sbatch job.
+    fn version_slurm(config: &Config) -> anyhow::Result<String> {
+        // Minimal Slurm job just to run the version command
+        struct ClairSVersionJob<'a> {
+            config: &'a Config,
+        }
+
+        impl<'a> JobCommand for ClairSVersionJob<'a> {
+            fn cmd(&self) -> String {
+                format!(
+                    "{} exec {} /opt/bin/run_clairs \
+                     --version",
+                    self.config.singularity_bin, self.config.clairs_image
+                )
+            }
+        }
+
+        impl<'a> SlurmRunner for ClairSVersionJob<'a> {
+            fn slurm_args(&self) -> Vec<String> {
+                SlurmParams {
+                    job_name: Some("clairs_version".into()),
+                    partition: Some("shortq".into()), // adjust to your CPU partition
+                    cpus_per_task: Some(1),
+                    mem: Some("10G".into()),
+                    gres: None, // no GPU
+                }
+                .to_args()
+            }
+        }
+
+        let mut job = ClairSVersionJob { config };
+        let out = SlurmRunner::run(&mut job).context("failed to run ClairS --version via Slurm")?;
+
+        // Combine stdout, Slurm epilog (if any), and stderr for parsing
+        let mut log = out.stdout.clone();
+        if let Some(epilog) = &out.slurm_epilog {
+            log.push_str(&epilog.to_string());
+        }
+        log.push_str(&out.stderr);
+
+        let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)")?;
+        let caps = re
+            .captures(&log)
+            .context("could not parse ClairS version from Slurm output")?;
+
         Ok(caps.get(1).unwrap().as_str().to_string())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::helpers::test_init;
+
+    #[test]
+    fn clairs_version() -> anyhow::Result<()> {
+        test_init();
+        let vl = ClairS::version(&Config::default())?;
+        info!("ClairS local version: {vl}");
+        let vs = ClairS::version_slurm(&Config::default())?;
+        info!("ClairS slurm version: {vs}");
+        assert_eq!(vl, vs);
+        Ok(())
+    }
+}

+ 192 - 50
src/callers/deep_variant.rs

@@ -11,7 +11,10 @@ use std::{
 use crate::{
     annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
     collection::vcf::Vcf,
-    commands::{SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, run_many_sbatch},
+    commands::{
+        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
+        run_many_sbatch, SlurmRunner,
+    },
     config::Config,
     helpers::{is_file_older, remove_dir_if_exists},
     io::vcf::read_vcf,
@@ -32,21 +35,58 @@ use crate::commands::{
     SlurmParams,
 };
 
-/// A pipeline runner for executing DeepVariant on a single sample and a specific time point (e.g., normal or tumor).
+/// Pipeline runner for executing [DeepVariant](https://github.com/google/deepvariant)
+/// on a single sample at a specific time point (e.g., normal or tumor).
+///
+/// This struct orchestrates variant calling on a BAM file using DeepVariant
+/// in a Singularity/Apptainer containerized environment. It supports:
+///
+/// - **Conditional execution**: Skips re-running if outputs are up-to-date
+/// - **Local or Slurm execution**: Via [`run_local`] or [`run_sbatch`]
+/// - **Quartered parallel execution**: Via [`run_deepvariant_quartered_sbatch_with_merge`]
+/// - **Annotation integration**: Implements [`CallerCat`], [`Variants`], and [`Label`]
+///
+/// # Example
+///
+/// ```no_run
+/// use pandora_lib_promethion::callers::{DeepVariant, Config};
+/// use pandora_lib_promethion::pipes::InitializeSolo;
+///
+/// let config = Config::default();
+/// let mut dv = DeepVariant::initialize("SAMPLE001", "normal", config)?;
 ///
-/// This struct holds all necessary metadata, configuration, and output paths to perform variant calling
-/// on a given BAM file using DeepVariant in a Dockerized environment. It also supports:
-/// - Conditional execution via `should_run`
-/// - Cleanup and preparation via `initialize`
-/// - Log tracking
-/// - Integration into annotation and variant aggregation workflows
+/// if dv.should_run() {
+///     dv.run_local()?;
+/// }
+/// # Ok::<(), anyhow::Error>(())
+/// ```
+///
+/// # Parallelization Strategy
+///
+/// For large genomes, use [`run_deepvariant_quartered_sbatch_with_merge`] which:
+/// 1. Splits chromosomes into N chunks
+/// 2. Submits parallel Slurm jobs
+/// 3. Merges PASS-filtered VCFs via `bcftools concat`
 #[derive(Debug, Clone)]
 pub struct DeepVariant {
+    /// Sample identifier (e.g., "PATIENT_001")
     pub id: String,
+
+    /// Time point label, typically matching [`Config::normal_name`] or [`Config::tumoral_name`]
     pub time_point: String,
+
+    /// Comma-separated list of genomic regions to process (e.g., "chr1,chr2,chr3")
     pub regions: String,
+
+    /// Directory for DeepVariant and bcftools log files
     pub log_dir: String,
+
+    /// Shared pipeline configuration
     pub config: Config,
+
+    /// Optional part index for quartered parallel runs (1-indexed)
+    ///
+    /// When `Some(n)`, output files are suffixed with `.partN` to avoid collisions.
     pub part_index: Option<usize>,
 }
 
@@ -74,7 +114,7 @@ impl InitializeSolo for DeepVariant {
         let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
         let regions = (1..=22)
             .map(|i| format!("chr{i}"))
-            .chain(["chrX", "chrY", "chrMT"].into_iter().map(String::from))
+            .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from))
             .collect::<Vec<String>>();
 
         let deepvariant = Self {
@@ -145,24 +185,32 @@ impl JobCommand for DeepVariant {
         let output_vcf_path = self.output_vcf_path();
         let log_subdir = format!("{}_{}_DeepVariant_logs", self.id, self.time_point);
         let sample_name = format!("{}_{}", self.id, self.time_point);
+        let output_vcf = format!(
+            "/output/{}",
+            Path::new(&output_vcf_path)
+                .file_name()
+                .unwrap()
+                .to_string_lossy()
+        );
 
         format!(
-            "module load singularity-ce && singularity exec --nv \
+            "{singularity_bin} exec --nv \
             --bind /data:/data \
             --bind {output_dir}:/output \
             {image} \
             /opt/deepvariant/bin/run_deepvariant \
             --model_type={model_type} \
-            --ref {reference} \
-            --reads {bam} \
-            --regions '{regions}' \
+            --ref={reference} \
+            --reads={bam} \
+            --regions='{regions}' \
             --haploid_contigs='chrX,chrY' \
-            --output_vcf {output_vcf} \
+            --output_vcf={output_vcf} \
             --num_shards={threads} \
             --vcf_stats_report=true \
-            --logging_dir /output/{log_dir} \
+            --logging_dir=/output/{log_dir} \
             --dry_run=false \
-            --sample_name {sample_name}",
+            --sample_name={sample_name}",
+            singularity_bin = self.config.singularity_bin,
             output_dir = output_dir,
             image = self.config.deepvariant_image,
             model_type = self.config.deepvariant_model_type,
@@ -170,39 +218,12 @@ impl JobCommand for DeepVariant {
             bam = bam,
             regions = self.regions,
             // we mount output_dir as /output; just rewrite path here:
-            output_vcf = format!(
-                "/output/{}",
-                Path::new(&output_vcf_path)
-                    .file_name()
-                    .unwrap()
-                    .to_string_lossy()
-            ),
+            output_vcf = output_vcf,
             threads = self.config.deepvariant_threads,
             log_dir = log_subdir,
             sample_name = sample_name,
         )
     }
-
-    fn clean_up(&self) -> anyhow::Result<()> {
-        let output_vcf = self.output_vcf_path();
-        let vcf_passed = self.passed_vcf_path();
-
-        if !Path::new(&vcf_passed).exists() {
-            info!(
-                "Filtering PASS variants for {} {} (part: {:?})",
-                self.id, self.time_point, self.part_index
-            );
-
-            let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
-            let report = SlurmRunner::run(&mut cmd)?;
-
-            report
-                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
-                .context("Can't save bcftools PASS logs")?;
-        }
-
-        Ok(())
-    }
 }
 
 impl LocalRunner for DeepVariant {
@@ -240,6 +261,29 @@ impl SbatchRunner for DeepVariant {
 }
 
 impl DeepVariant {
+    fn filter_pass_local(&self) -> anyhow::Result<()> {
+        let output_vcf = self.output_vcf_path();
+        let vcf_passed = self.passed_vcf_path();
+
+        if Path::new(&vcf_passed).exists() {
+            return Ok(());
+        }
+
+        info!(
+            "Filtering PASS variants locally for {} {} (part: {:?})",
+            self.id, self.time_point, self.part_index
+        );
+
+        let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
+        let report = <BcftoolsKeepPass as LocalRunner>::run(&mut cmd)?;
+
+        report
+            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
+            .context("Can't save bcftools PASS logs")?;
+
+        Ok(())
+    }
+
     /// Run DeepVariant directly on the local machine.
     pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
         if !self.should_run() {
@@ -254,7 +298,35 @@ impl DeepVariant {
             "Running DeepVariant locally for {} {}",
             self.id, self.time_point
         );
-        <Self as LocalRunner>::run(self)
+        let out = <Self as LocalRunner>::run(self)?;
+
+        // local bcftools keep PASS
+        self.filter_pass_local()?;
+
+        Ok(out)
+    }
+
+    fn filter_pass_sbatch(&self) -> anyhow::Result<()> {
+        let output_vcf = self.output_vcf_path();
+        let vcf_passed = self.passed_vcf_path();
+
+        if Path::new(&vcf_passed).exists() {
+            return Ok(());
+        }
+
+        info!(
+            "Filtering PASS variants via Slurm for {} {} (part: {:?})",
+            self.id, self.time_point, self.part_index
+        );
+
+        let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
+        let report = SlurmRunner::run(&mut cmd)?;
+
+        report
+            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
+            .context("Can't save bcftools PASS logs")?;
+
+        Ok(())
     }
 
     /// Run DeepVariant via sbatch + tailing slurm-<jobid>.out.
@@ -271,7 +343,12 @@ impl DeepVariant {
             "Submitting DeepVariant via sbatch for {} {}",
             self.id, self.time_point
         );
-        <Self as SbatchRunner>::run(self)
+        let out = <Self as SbatchRunner>::run(self)?;
+
+        // Slurm bcftools keep PASS
+        self.filter_pass_sbatch()?;
+
+        Ok(out)
     }
 
     /// Per-part output VCF path.
@@ -379,10 +456,10 @@ impl Label for DeepVariant {
 impl Version for DeepVariant {
     fn version(config: &Config) -> anyhow::Result<String> {
         let out = ProcessCommand::new("bash")
-            .arg("-lc")
+            .arg("-c")
             .arg(format!(
-                "module load singularity-ce && singularity exec {} /opt/deepvariant/bin/run_deepvariant --version",
-                config.deepvariant_image
+                "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
+                config.singularity_bin, config.deepvariant_image
             ))
             .stdout(Stdio::piped())
             .stderr(Stdio::piped())
@@ -404,6 +481,53 @@ impl Version for DeepVariant {
             .context("could not parse DeepVariant version from output")?;
         Ok(caps.get(1).unwrap().as_str().to_string())
     }
+
+    fn version_slurm(config: &Config) -> anyhow::Result<String> {
+        // Minimal Slurm job just to run the version command
+        struct DeepVariantVersionJob<'a> {
+            config: &'a Config,
+        }
+
+        impl<'a> JobCommand for DeepVariantVersionJob<'a> {
+            fn cmd(&self) -> String {
+                format!(
+                    "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
+                    self.config.singularity_bin, self.config.deepvariant_image
+                )
+            }
+        }
+
+        impl<'a> SlurmRunner for DeepVariantVersionJob<'a> {
+            fn slurm_args(&self) -> Vec<String> {
+                SlurmParams {
+                    job_name: Some("deepvariant_version".into()),
+                    partition: Some("gpgpuq".into()),
+                    cpus_per_task: Some(1),
+                    mem: Some("1G".into()),
+                    gres: None,
+                }
+                .to_args()
+            }
+        }
+
+        let mut job = DeepVariantVersionJob { config };
+        let out =
+            SlurmRunner::run(&mut job).context("failed to run DeepVariant --version via Slurm")?;
+
+        // Combine stdout, Slurm epilog (if any), and stderr for parsing
+        let mut log = out.stdout.clone();
+        if let Some(epilog) = &out.slurm_epilog {
+            log.push_str(&format!("{epilog}"));
+        }
+        log.push_str(&out.stderr);
+
+        let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)")?;
+        let caps = re
+            .captures(&log)
+            .context("could not parse DeepVariant version from Slurm output")?;
+
+        Ok(caps.get(1).unwrap().as_str().to_string())
+    }
 }
 
 fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
@@ -411,7 +535,7 @@ fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
     if n == 0 || len == 0 {
         return Vec::new();
     }
-    let chunk_size = (len + n - 1) / n; // ceil
+    let chunk_size = len.div_ceil(n); // ceil
     all_regions
         .chunks(chunk_size)
         .map(|chunk| chunk.join(","))
@@ -506,3 +630,21 @@ fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::helpers::test_init;
+
+    use super::*;
+
+    #[test]
+    fn deepvariant_version() -> anyhow::Result<()> {
+        test_init();
+        let vl = DeepVariant::version(&Config::default())?;
+        info!("DeepVariant local version: {vl}");
+        let vs = DeepVariant::version_slurm(&Config::default())?;
+        info!("DeepVariant slurm version: {vs}");
+        assert_eq!(vl, vs);
+        Ok(())
+    }
+}

+ 1 - 2
src/callers/mod.rs

@@ -1,5 +1,4 @@
-
-// pub mod clairs;
+pub mod clairs;
 pub mod deep_variant;
 // pub mod nanomonsv;
 // pub mod savana;

+ 2 - 18
src/collection/bam.rs

@@ -13,7 +13,7 @@ use rand::{rng, Rng};
 use rust_htslib::bam::{ext::BamRecordExtensions, record::Cigar, Read};
 use serde::{Deserialize, Serialize};
 
-use crate::config::Config;
+use crate::{config::Config, io::bam::get_genome_sizes};
 
 use super::flowcells::FlowCell;
 
@@ -953,22 +953,6 @@ impl fmt::Display for WGSBamStats {
     }
 }
 
-pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<HashMap<String, u64>> {
-    let mut genome = HashMap::new();
-    for (key, records) in header.to_hashmap() {
-        for record in records {
-            if key == "SQ" {
-                genome.insert(
-                    record["SN"].to_string(),
-                    record["LN"]
-                        .parse::<u64>()
-                        .expect("Failed parsing length of chromosomes"),
-                );
-            }
-        }
-    }
-    Ok(genome)
-}
 
 /// Result of querying a read at a reference position.
 #[derive(Clone, Debug, Eq, PartialEq)]
@@ -1123,7 +1107,7 @@ mod tests {
 
     use crate::{
         collection::{
-            bam::{bam_composition, WGSBam, WGSBamStats},
+            bam::{bam_composition, WGSBam},
             run::PromRuns,
         },
         config::Config,

+ 72 - 10
src/commands/mod.rs

@@ -6,7 +6,14 @@ pub mod samtools;
 // pub mod wakhan;
 
 use std::{
-    fmt::{self, Display, Formatter}, fs::{self, File}, io::{self, BufReader, Read, Seek, SeekFrom, Write}, path::Path, process::{Command as ProcessCommand, Stdio}, sync::{Arc, Mutex, mpsc}, thread, time::Duration
+    fmt::{self, Display, Formatter},
+    fs::{self, File},
+    io::{self, BufReader, Read, Seek, SeekFrom, Write},
+    path::{Path, PathBuf},
+    process::{Command as ProcessCommand, Stdio},
+    sync::{Arc, Mutex, mpsc},
+    thread,
+    time::Duration,
 };
 
 /// A helper trait for commands that should be run through Slurm.
@@ -151,6 +158,7 @@ pub trait Runner: Command {
 use anyhow::Context;
 use log::info;
 use serde::{Deserialize, Serialize};
+use uuid::Uuid;
 
 /// Captured process output while it was being streamed live.
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -161,16 +169,37 @@ pub struct CapturedOutput {
 }
 
 impl CapturedOutput {
-    /// Save struct as JSON to `path` (creates parent dirs if needed).
-    pub fn save_to_file(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
-        let path = path.as_ref();
-        if let Some(p) = path.parent() {
-            fs::create_dir_all(p).with_context(|| format!("Failed to create {}", p.display()))?;
+    /// Constructs a unique filename based on the base `path`, appending
+    /// a truncated UUID v4 segment and the ".json" suffix.
+    /// Saves the struct as JSON to the constructed path (creates parent dirs if needed).
+    pub fn save_to_file(&self, base_path: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
+        let base_path = base_path.as_ref();
+
+        let binding = Uuid::new_v4().to_string();
+        let unique_id = binding.split('-').next().unwrap_or("default");
+
+        let filename = format!(
+            "{}_{}.json",
+            base_path
+                .file_name()
+                .unwrap_or(base_path.as_os_str())
+                .to_string_lossy(),
+            unique_id
+        );
+
+        let mut full_path = PathBuf::from(base_path.parent().unwrap_or_else(|| Path::new("")));
+        full_path.push(filename);
+
+        if let Some(p) = full_path.parent() {
+            fs::create_dir_all(p)
+                .with_context(|| format!("Failed to create directory {}", p.display()))?;
         }
-        let json = serde_json::to_string(self)?;
-        fs::write(path, json)
-            .with_context(|| format!("Failed to write {}", path.display()))?;
-        Ok(())
+
+        let json = serde_json::to_string_pretty(self)?; // pretty-printed so captured logs are easy to inspect by hand
+        fs::write(&full_path, json)
+            .with_context(|| format!("Failed to write file {}", full_path.display()))?;
+
+        Ok(full_path)
     }
 }
 
@@ -747,3 +776,36 @@ where
 
     Ok(results)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::{Command, SbatchRunner, SlurmParams};
+    struct TestRun;
+
+    impl Command for TestRun {
+        fn cmd(&self) -> String {
+            "echo 'hello from sbatch'".to_string()
+        }
+    }
+
+    impl SbatchRunner for TestRun {
+        fn slurm_params(&self) -> SlurmParams {
+            SlurmParams {
+                job_name: Some("test-sbatch".into()),
+                partition: Some("gpgpuq".into()),
+                cpus_per_task: Some(1),
+                mem: Some("1G".into()),
+                gres: None,
+            }
+        }
+    }
+
+    #[test]
+    fn sbatch_test_run() -> anyhow::Result<()> {
+        let mut t = TestRun;
+        let out = SbatchRunner::run(&mut t)?;
+        println!("{out:#?}");
+        assert!(out.stdout.contains("hello from sbatch"));
+        Ok(())
+    }
+}

+ 10 - 44
src/commands/samtools.rs

@@ -148,15 +148,15 @@ impl super::Command for SamtoolsMerge {
         let n_shared = intersection.len();
 
         // relaxed for recovery reads
-        if n_shared > 10_000 {
-            anyhow::bail!(
-                "Cannot merge: {} and {} share {} read name(s) ({:.4} fraction of `into`)",
-                self.into.display(),
-                self.bam.display(),
-                n_shared,
-                frac_into,
-            );
-        }
+        // if n_shared > 100_000 {
+        //     anyhow::bail!(
+        //         "Cannot merge: {} and {} share {} read name(s) ({:.4} fraction of `into`)",
+        //         self.into.display(),
+        //         self.bam.display(),
+        //         n_shared,
+        //         frac_into,
+        //     );
+        // }
 
         let dir = self.into.parent().context(format!(
             "Failed to find parent dir of: {}",
@@ -263,13 +263,10 @@ impl super::SlurmRunner for SamtoolsMerge {
 pub struct SamtoolsSplit {
     /// Path to the `samtools` executable.
     pub samtools: PathBuf,
-
     /// Number of threads to use with `samtools split` (`-@` option).
     pub threads: u8,
-
     /// Input BAM file to demultiplex.
     pub input_bam: PathBuf,
-
     /// Output directory where split BAM files will be written.
     pub output_dir: PathBuf,
 }
@@ -364,7 +361,7 @@ impl SamtoolsSort {
             input_bam: input_bam.as_ref().into(),
             output_bam: output_bam.as_ref().into(),
             slurm_params: SlurmParams {
-                job_name: Some("samtools_split".into()),
+                job_name: Some("samtools_sort".into()),
                 cpus_per_task: Some(threads as u32),
                 mem: Some("60G".into()),
                 partition: Some("shortq".into()),
@@ -413,28 +410,6 @@ impl SbatchRunner for SamtoolsSort {
     }
 }
 
-pub struct TestRun;
-
-impl super::Command for TestRun {
-    fn cmd(&self) -> String {
-        "echo 'hello from sbatch'".to_string()
-    }
-}
-
-impl SbatchRunner for TestRun {
-    fn slurm_params(&self) -> SlurmParams {
-        SlurmParams {
-            job_name: Some("test-sbatch".into()),
-            // Make sure this partition exists on your cluster
-            partition: Some("gpgpuq".into()), // or your default compute partition
-            // Often required: cpus + mem, maybe account
-            cpus_per_task: Some(1),
-            mem: Some("1G".into()),
-            gres: None,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -447,15 +422,6 @@ mod tests {
         TEST_DIR,
     };
 
-    #[test]
-    fn sbatch_test_run() -> anyhow::Result<()> {
-        let mut t = TestRun;
-        let out = SbatchRunner::run(&mut t)?;
-        println!("{out:#?}");
-        // assert!(out.stdout.contains("hello from sbatch"));
-        Ok(())
-    }
-
     #[test]
     fn samtools_index_merge() -> anyhow::Result<()> {
         test_init();

+ 4 - 1
src/config.rs

@@ -22,6 +22,9 @@ pub struct Config {
     /// Temporary directory.
     pub tmp_dir: String,
 
+    /// Command used to invoke Singularity; may include an environment-setup
+    /// prefix (e.g. `module load singularity-ce && singularity`).
+    pub singularity_bin: String,
+
     /// Temporary directory used when unarchiving input data.
     pub unarchive_tmp_dir: String,
 
@@ -180,7 +183,7 @@ pub struct Config {
     pub clairs_threads: u8,
 
     /// ClairS docker tag.
-    pub clairs_docker_tag: String,
+    pub clairs_image: String,
 
     /// Force ClairS recomputation.
     pub clairs_force: bool,

+ 68 - 0
src/io/bam.rs

@@ -244,3 +244,71 @@ pub fn primary_records(bam: &mut IndexedReader, records: Vec<Record>) -> Vec<Rec
     
     res
 }
+
+
+pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<HashMap<String, u64>> {
+    let mut genome = HashMap::new();
+    for (key, records) in header.to_hashmap() {
+        for record in records {
+            if key == "SQ" {
+                genome.insert(
+                    record["SN"].to_string(),
+                    record["LN"]
+                        .parse::<u64>()
+                        .expect("Failed parsing length of chromosomes"),
+                );
+            }
+        }
+    }
+    Ok(genome)
+}
+
+/// Split the genome into N balanced region lists (`ctg:start-end` strings).
+/// Each element of the outer Vec is the list of `-r` regions for one batch.
+pub fn split_genome_into_n_regions(
+    genome: &HashMap<String, u64>,
+    n: usize,
+) -> Vec<Vec<String>> {
+    if n == 0 || genome.is_empty() {
+        return Vec::new();
+    }
+
+    // Deterministic order over contigs
+    let mut contigs: Vec<_> = genome.iter().collect();
+    contigs.sort_by(|(a, _), (b, _)| a.cmp(b));
+
+    let total_bp: u64 = contigs.iter().map(|(_, &len)| len).sum();
+    let target_per_batch: u64 = total_bp.div_ceil(n as u64); // ceil
+
+    let mut batches: Vec<Vec<String>> = vec![Vec::new(); n];
+    let mut batch_idx = 0usize;
+    let mut batch_bp = 0u64;
+
+    for (ctg, &len) in contigs {
+        let mut start: u64 = 1;
+        while start <= len && batch_idx < n {
+            let remaining_in_ctg = len - start + 1;
+            let remaining_for_batch = if batch_bp >= target_per_batch {
+                1 // force close & move to next batch
+            } else {
+                target_per_batch - batch_bp
+            };
+
+            let seg_len = remaining_in_ctg.min(remaining_for_batch);
+            let end = start + seg_len - 1;
+
+            let region = format!("{ctg}:{start}-{end}");
+            batches[batch_idx].push(region);
+
+            batch_bp += seg_len;
+            start = end + 1;
+
+            if batch_bp >= target_per_batch && batch_idx + 1 < n {
+                batch_idx += 1;
+                batch_bp = 0;
+            }
+        }
+    }
+
+    batches
+}

+ 3 - 0
src/pipes/mod.rs

@@ -20,6 +20,9 @@ pub trait ShouldRun {
 
 pub trait Version {
     fn version(config: &Config) -> anyhow::Result<String>;
+    fn version_slurm(_config: &Config) -> anyhow::Result<String> {
+        anyhow::bail!("No configured")
+    }
 }
 
 // pub trait LoadVariants {

+ 19 - 28
src/pipes/somatic_slurm.rs

@@ -6,10 +6,13 @@ use uuid::Uuid;
 use crate::{
     collection::{flowcells::IdInput, pod5::Pod5sRun},
     commands::{
-        SlurmRunner, dorado::{DoradoAlign, DoradoBasecall}, run_many_sbatch, samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSort, SamtoolsSplit}
+        dorado::{DoradoAlign, DoradoBasecall},
+        run_many_sbatch,
+        samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsSort, SamtoolsSplit},
+        SlurmRunner,
     },
     config::Config,
-    helpers::{TempDirGuard, extract_barcode, list_files_with_ext},
+    helpers::{extract_barcode, list_files_with_ext, TempDirGuard},
 };
 
 /// Normalize barcode strings for matching.
@@ -131,8 +134,6 @@ fn collect_bams_recursively(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
 ///
 /// * `run`: Description of the sequencing run, including POD5 input, optional existing unaligned BAMs (`bams_pass`), and associated cases with barcodes and sample types.
 /// * `config`: Global configuration object containing paths, the reference name, and command-line options for Dorado, samtools, and Slurm submission.
-/// * `tmp_dir`: Scratch directory used for all intermediate files (temp $\text{POD5}$ symlinks, unaligned/aligned $\text{BAM}$s, split $\text{BAM}$s). Must be accessible by $\text{Slurm}$ jobs and support atomic $\text{fs::rename}$.
-/// * `min_bam_size`: Minimum file size (in bytes) required for an unaligned $\text{BAM}$ to be considered for alignment. Smaller files are skipped.
 ///
 /// # Returns
 ///
@@ -151,12 +152,8 @@ fn collect_bams_recursively(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
 ///
 /// * Assumes a Unix-like environment for the use of filesystem symlinks.
 /// * Duplicate normalized barcodes in `run.cases` will result in the later entry overwriting earlier ones in the `barcode -> case` mapping.
-pub fn basecall_align_split(
-    run: &Pod5sRun,
-    config: &Config,
-    tmp_dir: PathBuf,
-    min_bam_size: u64,
-) -> anyhow::Result<()> {
+pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
+    let tmp_dir = PathBuf::from(config.tmp_dir.clone());
     // Validate run has cases
     if run.cases.is_empty() {
         bail!(
@@ -350,17 +347,17 @@ pub fn basecall_align_split(
     let mut unmatched_bams = Vec::new();
 
     for bam in split_bams.iter() {
-        let bam_size = bam.metadata()?.len();
-
-        if bam_size <= min_bam_size {
-            warn!(
-                "BAM size too small ({} bytes < {} bytes), skipping: {}",
-                bam_size,
-                min_bam_size,
-                bam.display()
-            );
-            continue;
-        }
+        // let bam_size = bam.metadata()?.len();
+        //
+        // if bam_size <= min_bam_size {
+        //     warn!(
+        //         "BAM size too small ({} bytes < {} bytes), skipping: {}",
+        //         bam_size,
+        //         min_bam_size,
+        //         bam.display()
+        //     );
+        //     continue;
+        // }
 
         let bam_file_name = bam
             .file_name()
@@ -576,7 +573,6 @@ mod tests {
     use crate::collection::pod5::Pod5sRuns;
     use crate::config::Config;
     use crate::helpers::test_init;
-    use crate::TEST_DIR;
 
     #[test]
     fn slurm_pipe() -> anyhow::Result<()> {
@@ -586,12 +582,7 @@ mod tests {
 
         let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
         for run in runs.data {
-            basecall_align_split(
-                &run,
-                &config,
-                format!("{}/outputs", TEST_DIR.as_str()).into(),
-                10 * 1024 * 1024,
-            )?;
+            import_run(&run, &config)?;
         }
         //
         // let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";

Some files were not shown because too many files changed in this diff