Update DeepVariant for slurm

Thomas · 2 weeks ago
commit 59232e836c
8 changed files with 504 additions and 252 deletions
  1. +7 -4     pandora-config.example.toml
  2. +5 -0     slurm-2415516.out
  3. +315 -91  src/callers/deep_variant.rs
  4. +6 -2     src/commands/dorado.rs
  5. +3 -0     src/config.rs
  6. +5 -5     src/lib.rs
  7. +109 -149 src/pipes/somatic_slurm.rs
  8. +54 -1    src/slurm_helpers.rs

+ 7 - 4
pandora-config.example.toml

@@ -150,7 +150,10 @@ min_n_callers = 1
 deepvariant_output_dir = "{result_dir}/{id}/{time}/DeepVariant"
 
 # Threads for DeepVariant.
-deepvariant_threads = 150
+deepvariant_threads = 40
+
+# Path to the DeepVariant Singularity image.
+deepvariant_image = "/mnt/beegfs02/scratch/t_steimle/tools/"
 
 # DeepVariant version / image tag.
 deepvariant_bin_version = "1.9.0"
@@ -368,11 +371,11 @@ samtools_bin = "/mnt/beegfs02/scratch/t_steimle/tools/samtools"
 samtools_view_threads = 20
 
 # Threads for `samtools sort`.
-samtools_sort_threads = 48
+samtools_sort_threads = 20
 
 # Threads for `samtools merge`.
-samtools_merge_threads = 48
+samtools_merge_threads = 20
 
 # Threads for `samtools split`.
-samtools_split_threads = 48
+samtools_split_threads = 20
 

+ 5 - 0
slurm-2415516.out

File diff suppressed because it is too large

+ 315 - 91
src/callers/deep_variant.rs

@@ -5,24 +5,36 @@ use regex::Regex;
 use std::{
     fs,
     path::Path,
-    process::{Command, Stdio},
+    process::{Command as ProcessCommand, Stdio},
 };
 
 use crate::{
     annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
     collection::vcf::Vcf,
-    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    commands::{
+        bcftools::{bcftools_keep_pass, BcftoolsConfig},
+        run_many_sbatch,
+    },
     config::Config,
     helpers::{is_file_older, remove_dir_if_exists},
     io::vcf::read_vcf,
     pipes::{InitializeSolo, ShouldRun, Version},
-    runners::{run_wait, DockerRun, Run},
+    runners::Run,
+    slurm_helpers::max_gpu_per_node,
     variant::{
         variant::{Label, Variants},
         variant_collection::VariantCollection,
     },
 };
 
+use crate::commands::{
+    CapturedOutput,
+    Command as JobCommand, // job-command trait (init / cmd / clean_up)
+    Runner as LocalRunner,
+    SbatchRunner,
+    SlurmParams,
+};
+
 /// A pipeline runner for executing DeepVariant on a single sample and a specific time point (e.g., normal or tumor).
 ///
 /// This struct holds all necessary metadata, configuration, and output paths to perform variant calling
@@ -35,8 +47,10 @@ use crate::{
 pub struct DeepVariant {
     pub id: String,
     pub time_point: String,
+    pub regions: String,
     pub log_dir: String,
     pub config: Config,
+    pub part_index: Option<usize>,
 }
 
 impl InitializeSolo for DeepVariant {
@@ -61,12 +75,18 @@ impl InitializeSolo for DeepVariant {
         info!("Initializing DeepVariant for {id} {time_point}.");
 
         let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
+        let regions = (1..=22)
+            .map(|i| format!("chr{i}"))
+            .chain(["chrX", "chrY", "chrMT"].into_iter().map(String::from))
+            .collect::<Vec<String>>();
 
         let deepvariant = Self {
             id,
             time_point,
             log_dir,
             config,
+            regions: regions.join(","),
+            part_index: None,
         };
 
         if deepvariant.config.deepvariant_force {
@@ -104,93 +124,192 @@ impl ShouldRun for DeepVariant {
     }
 }
 
-impl Run for DeepVariant {
-    /// Executes DeepVariant inside Docker and filters PASS variants via bcftools.
-    /// Runs DeepVariant inside Docker and filters variants using bcftools.
-    ///
-    /// This function:
-    /// - Creates necessary output directories
-    /// - Executes DeepVariant through Docker if needed
-    /// - Filters for PASS variants
-    /// - Saves logs and handles caching logic via file existence
-    ///
-    /// # Errors
-    /// Returns an error if any external command or file operation fails.
-    fn run(&mut self) -> anyhow::Result<()> {
+impl JobCommand for DeepVariant {
+    fn init(&mut self) -> anyhow::Result<()> {
+        // output dir for DeepVariant
+        let output_dir = self
+            .config
+            .deepvariant_output_dir(&self.id, &self.time_point);
+
+        fs::create_dir_all(&output_dir).context(format!("Failed to create dir: {output_dir}"))?;
+
+        // log dir
+        fs::create_dir_all(&self.log_dir)
+            .context(format!("Failed to create dir: {}", self.log_dir))?;
+
+        Ok(())
+    }
+
+    fn cmd(&self) -> String {
         let bam = self.config.solo_bam(&self.id, &self.time_point);
-        let output_vcf = self
+        let output_dir = self
             .config
-            .deepvariant_solo_output_vcf(&self.id, &self.time_point);
+            .deepvariant_output_dir(&self.id, &self.time_point);
+        let output_vcf_path = self.output_vcf_path();
+        let log_subdir = format!("{}_{}_DeepVariant_logs", self.id, self.time_point);
+        let sample_name = format!("{}_{}", self.id, self.time_point);
 
-        // Run Docker command if output VCF doesn't exist
-        if !Path::new(&output_vcf).exists() {
-            let output_dir = self
-                .config
-                .deepvariant_output_dir(&self.id, &self.time_point);
-
-            fs::create_dir_all(&output_dir)
-                .context(format!("Failed to create dir: {output_dir}"))?;
-
-            let mut docker_run = DockerRun::new(&[
-                "run",
-                "-d",
-                "-v",
-                "/data:/data",
-                "-v",
-                &format!("{}:/output", output_dir),
-                &format!("google/deepvariant:{}", self.config.deepvariant_bin_version),
-                "/opt/deepvariant/bin/run_deepvariant",
-                &format!("--model_type={}", self.config.deepvariant_model_type),
-                "--ref",
-                &self.config.reference,
-                "--reads",
-                &bam,
-                "--output_vcf",
-                &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point),
-                // "--output_gvcf",
-                // &format!(
-                //     "/output/{}_{}_DeepVariant.g.vcf.gz",
-                //     self.id, self.time_point
-                // ),
-                &format!("--num_shards={}", self.config.deepvariant_threads),
-                "--logging_dir",
-                "--vcf_stats_report=true",
-                &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
-                "--dry_run=false",
-                "--sample_name",
-                &format!("{}_{}", self.id, self.time_point),
-            ]);
-
-            let report = run_wait(&mut docker_run).context(format!(
-                "Erreur while running DeepVariant for {} {}",
-                self.id, self.time_point
-            ))?;
+        format!(
+            "module load singularity-ce && singularity exec --nv \
+            --bind /data:/data \
+            --bind {output_dir}:/output \
+            {image} \
+            /opt/deepvariant/bin/run_deepvariant \
+            --model_type={model_type} \
+            --ref {reference} \
+            --reads {bam} \
+            --regions '{regions}' \
+            --haploid_contigs='chrX,chrY' \
+            --output_vcf {output_vcf} \
+            --num_shards={threads} \
+            --vcf_stats_report=true \
+            --logging_dir /output/{log_dir} \
+            --dry_run=false \
+            --sample_name {sample_name}",
+            output_dir = output_dir,
+            image = self.config.deepvariant_image,
+            model_type = self.config.deepvariant_model_type,
+            reference = self.config.reference,
+            bam = bam,
+            regions = self.regions,
+            // we mount output_dir as /output; just rewrite path here:
+            output_vcf = format!(
+                "/output/{}",
+                Path::new(&output_vcf_path)
+                    .file_name()
+                    .unwrap()
+                    .to_string_lossy()
+            ),
+            threads = self.config.deepvariant_threads,
+            log_dir = log_subdir,
+            sample_name = sample_name,
+        )
+    }
 
+    fn clean_up(&self) -> anyhow::Result<()> {
+        let output_vcf = self.output_vcf_path();
+        let vcf_passed = self.passed_vcf_path();
+
+        if !Path::new(&vcf_passed).exists() {
+            info!(
+                "Filtering PASS variants for {} {} (part: {:?})",
+                self.id, self.time_point, self.part_index
+            );
+
+            let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default())?;
             report
-                .save_to_file(&format!("{}/deepvariant_", self.log_dir))
-                .context("Can't save DeepVariant logs")?;
+                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
+                .context("Can't save bcftools PASS logs")?;
+        }
+
+        Ok(())
+    }
+}
+
+impl LocalRunner for DeepVariant {
+    // default shell() from trait ("bash") is fine
+}
+
+impl SbatchRunner for DeepVariant {
+    fn slurm_params(&self) -> SlurmParams {
+        let gpu = if let (Some(h100_av), Some(a100_av)) =
+            (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
+        {
+            if h100_av > 0 {
+                "h100"
+            } else if a100_av > 0 {
+                "a100"
+            } else {
+                "h100" // waiting for free h100
+            }
         } else {
+            panic!("Could not query h100/a100 GPU availability from Slurm; are a100 and h100 nodes configured?");
+        };
+        SlurmParams {
+            job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
+            cpus_per_task: Some(10),
+            mem: Some("60G".into()),
+            partition: Some("gpgpuq".into()),
+            gres: Some(format!("gpu:{gpu}:1")),
+        }
+    }
+
+    fn sbatch_extra_args(&self) -> Vec<String> {
+        // extra sbatch flags (e.g. --time=...) can be added here if needed
+        Vec::new()
+    }
+}
+
+impl DeepVariant {
+    /// Run DeepVariant directly on the local machine.
+    pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
+        if !self.should_run() {
             debug!(
-                "DeepVariant output already exists for {} {}, skipping execution.",
+                "DeepVariant output already up-to-date for {} {}, skipping.",
                 self.id, self.time_point
             );
+            return Ok(CapturedOutput::default());
         }
 
-        let vcf_passed = self
-            .config
-            .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
-        if !Path::new(&vcf_passed).exists() {
-            info!(
-                "Filtering PASS variants for {} {}",
+        info!(
+            "Running DeepVariant locally for {} {}",
+            self.id, self.time_point
+        );
+        <Self as LocalRunner>::run(self)
+    }
+
+    /// Run DeepVariant via sbatch + tailing slurm-<jobid>.out.
+    pub fn run_sbatch(&mut self) -> anyhow::Result<CapturedOutput> {
+        if !self.should_run() {
+            debug!(
+                "DeepVariant output already up-to-date for {} {}, skipping.",
                 self.id, self.time_point
             );
-            let report =
-                bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
-            report
-                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
-                .unwrap();
+            return Ok(CapturedOutput::default());
+        }
+
+        info!(
+            "Submitting DeepVariant via sbatch for {} {}",
+            self.id, self.time_point
+        );
+        <Self as SbatchRunner>::run(self)
+    }
+
+    /// Per-part output VCF path.
+    fn output_vcf_path(&self) -> String {
+        let output_dir = self
+            .config
+            .deepvariant_output_dir(&self.id, &self.time_point);
+
+        match self.part_index {
+            Some(idx) => format!(
+                "{output_dir}/{}_{}_DeepVariant.part{idx}.vcf.gz",
+                self.id, self.time_point
+            ),
+            None => format!(
+                "{output_dir}/{}_{}_DeepVariant.vcf.gz",
+                self.id, self.time_point
+            ),
+        }
+    }
+
+    /// Per-part PASS VCF path (for part runs) or final PASS VCF (single run).
+    fn passed_vcf_path(&self) -> String {
+        match self.part_index {
+            Some(_) => {
+                let v = self.output_vcf_path();
+                // output_vcf_path() already carries the ".part{idx}" suffix.
+                v.replace(".vcf.gz", ".pass.vcf.gz")
+            }
+            None => self
+                .config
+                .deepvariant_solo_passed_vcf(&self.id, &self.time_point),
         }
+    }
+}
 
+impl Run for DeepVariant {
+    fn run(&mut self) -> anyhow::Result<()> {
+        self.run_local()?;
         Ok(())
     }
 }
@@ -259,35 +378,27 @@ impl Label for DeepVariant {
 }
 
 impl Version for DeepVariant {
-    /// Retrieves the DeepVariant version by running `savana --version` in its conda environment.
-    ///
-    /// # Errors
-    /// Returns an error if command execution fails or "Version " not found in output.
     fn version(config: &Config) -> anyhow::Result<String> {
-        let out = Command::new("docker")
-            .args([
-                "run",
-                "--rm",
-                "--entrypoint",
-                "/opt/deepvariant/bin/run_deepvariant",
-                &format!("google/deepvariant:{}", config.deepvariant_bin_version),
-                "--version",
-            ])
+        let out = ProcessCommand::new("bash")
+            .arg("-lc")
+            .arg(format!(
+                "module load singularity-ce && singularity exec {} /opt/deepvariant/bin/run_deepvariant --version",
+                config.deepvariant_image
+            ))
             .stdout(Stdio::piped())
             .stderr(Stdio::piped())
             .output()
-            .context("failed to spawn docker")?;
+            .context("failed to spawn singularity")?;
 
         if !out.status.success() {
             let mut log = String::from_utf8_lossy(&out.stdout).to_string();
             log.push_str(&String::from_utf8_lossy(&out.stderr));
-            anyhow::bail!("docker run failed: {}\n{}", out.status, log);
+            anyhow::bail!("singularity exec failed: {}\n{}", out.status, log);
         }
 
         let mut log = String::from_utf8_lossy(&out.stdout).to_string();
         log.push_str(&String::from_utf8_lossy(&out.stderr));
 
-        // e.g. “DeepVariant version 1.9.0”
         let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)")?;
         let caps = re
             .captures(&log)
@@ -295,3 +406,116 @@ impl Version for DeepVariant {
         Ok(caps.get(1).unwrap().as_str().to_string())
     }
 }
+
+fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
+    let len = all_regions.len();
+    if n == 0 || len == 0 {
+        return Vec::new();
+    }
+    let chunk_size = (len + n - 1) / n; // ceil
+    all_regions
+        .chunks(chunk_size)
+        .map(|chunk| chunk.join(","))
+        .collect()
+}
+
+/// Run DeepVariant in 4 sbatch jobs (by regions), then merge their PASS VCFs
+/// into the final deepvariant_solo_passed_vcf().
+pub fn run_deepvariant_quartered_sbatch_with_merge(
+    id: &str,
+    time_point: &str,
+    config: Config,
+) -> anyhow::Result<CapturedOutput> {
+    let base = DeepVariant::initialize(id, time_point, config)?;
+
+    // if already up-to-date, nothing to do
+    if !base.should_run() {
+        debug!(
+            "DeepVariant PASS VCF already up-to-date for {} {}, skipping.",
+            id, time_point
+        );
+        return Ok(CapturedOutput::default());
+    }
+
+    // 1) split regions into 4 chunks
+    let all_regions: Vec<&str> = base.regions.split(',').collect();
+    let region_chunks = split_regions_into_n(&all_regions, 4);
+
+    // 2) build jobs
+    let mut jobs = Vec::with_capacity(region_chunks.len());
+    for (i, regions) in region_chunks.into_iter().enumerate() {
+        let mut job = base.clone();
+        job.regions = regions;
+        job.part_index = Some(i + 1);
+        job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
+        jobs.push(job);
+    }
+
+    // 3) run them concurrently on Slurm
+    let _outputs = run_many_sbatch(jobs)?;
+
+    // 4) merge PASS VCFs
+    merge_deepvariant_parts(&base, 4)?;
+
+    // The four part jobs do not produce a single merged CapturedOutput;
+    // return an empty one (per-job outputs are in `_outputs` if ever needed).
+    Ok(CapturedOutput {
+        stdout: String::new(),
+        stderr: String::new(),
+        slurm_epilog: None,
+    })
+}
+
+fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
+    let mut part_pass_paths = Vec::with_capacity(n_parts);
+
+    for i in 1..=n_parts {
+        let mut dv = base.clone();
+        dv.part_index = Some(i);
+        let part_pass = dv.passed_vcf_path();
+        if !Path::new(&part_pass).exists() {
+            anyhow::bail!("Missing part PASS VCF: {part_pass}");
+        }
+        part_pass_paths.push(part_pass);
+    }
+
+    let final_passed_vcf = base
+        .config
+        .deepvariant_solo_passed_vcf(&base.id, &base.time_point);
+    let final_tmp = format!("{final_passed_vcf}.tmp");
+
+    fs::create_dir_all(
+        Path::new(&final_passed_vcf)
+            .parent()
+            .unwrap_or_else(|| Path::new(".")),
+    )?;
+
+    // bcftools concat parts -> tmp file, then index and rename
+    let concat_cmd = format!(
+        "bcftools concat -O z -o {out_tmp} {parts} && \
+         bcftools index -t {out_tmp}",
+        out_tmp = final_tmp,
+        parts = part_pass_paths.join(" ")
+    );
+
+    let out = ProcessCommand::new("bash")
+        .arg("-lc")
+        .arg(concat_cmd)
+        .stdout(Stdio::piped())
+        .stderr(Stdio::piped())
+        .output()
+        .context("failed to run bcftools concat for DeepVariant parts")?;
+
+    if !out.status.success() {
+        let mut log = String::from_utf8_lossy(&out.stdout).to_string();
+        log.push_str(&String::from_utf8_lossy(&out.stderr));
+        anyhow::bail!("bcftools concat failed: {}\n{}", out.status, log);
+    }
+
+    // move tmp → final, keeping the tabix index name in sync
+    fs::rename(&final_tmp, &final_passed_vcf)
+        .context("failed to rename merged DeepVariant PASS VCF")?;
+    fs::rename(format!("{final_tmp}.tbi"), format!("{final_passed_vcf}.tbi"))
+        .context("failed to rename index of merged DeepVariant PASS VCF")?;
+
+    Ok(())
+}
+

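A note on how the region split in `run_deepvariant_quartered_sbatch_with_merge` behaves: with the 25 default contigs (chr1-chr22, chrX, chrY, chrMT) and n = 4, the ceiling-division chunk size is 7, so the four sbatch jobs cover 7, 7, 7 and 4 contigs respectively. A self-contained sketch of this (the helper body is the one added above; the `main` harness is only for illustration):

fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
    let len = all_regions.len();
    if n == 0 || len == 0 {
        return Vec::new();
    }
    let chunk_size = (len + n - 1) / n; // ceiling division
    all_regions
        .chunks(chunk_size)
        .map(|chunk| chunk.join(","))
        .collect()
}

fn main() {
    // Same 25 contigs built in DeepVariant::initialize.
    let regions: Vec<String> = (1..=22)
        .map(|i| format!("chr{i}"))
        .chain(["chrX", "chrY", "chrMT"].into_iter().map(String::from))
        .collect();
    let refs: Vec<&str> = regions.iter().map(String::as_str).collect();

    let chunks = split_regions_into_n(&refs, 4);
    assert_eq!(chunks.len(), 4);
    for (i, chunk) in chunks.iter().enumerate() {
        // part1..part3 carry 7 contigs, part4 carries 4.
        println!("part{}: {}", i + 1, chunk);
    }
}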
+ 6 - 2
src/commands/dorado.rs

@@ -6,7 +6,11 @@ use std::{
 use anyhow::Context;
 
 use crate::{
-    collection::pod5::Pod5, commands::{Command, SlurmParams}, config::Config, io::pod5_infos::Pod5Info, slurm_helpers::max_gpu_per_node
+    collection::pod5::Pod5,
+    commands::{Command, SlurmParams},
+    config::Config,
+    io::pod5_infos::Pod5Info,
+    slurm_helpers::max_gpu_per_node,
 };
 
 /// Run Dorado basecalling on a directory of POD5 files.
@@ -206,7 +210,7 @@ impl DoradoAlign {
                 job_name: Some("dorado_align".into()),
                 cpus_per_task: Some(threads.into()),
                 mem: Some("60G".into()),
-                partition: Some("gpgpuq".into()),
+                partition: Some("shortq".into()),
                 gres: None,
             },
         }

+ 3 - 0
src/config.rs

@@ -142,6 +142,9 @@ pub struct Config {
     /// Number of threads to use for DeepVariant.
     pub deepvariant_threads: u8,
 
+    /// DeepVariant singularity image path
+    pub deepvariant_image: String,
+
     /// DeepVariant docker / binary version.
     pub deepvariant_bin_version: String,
 

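To show how the new `deepvariant_image` key in pandora-config.example.toml lines up with the new `Config` field, here is a minimal deserialization sketch. It assumes the real `Config` is serde-deserialized from the TOML file (not confirmed by this diff); the struct below is a cut-down stand-in and the image path is a placeholder:

use serde::Deserialize;

// Cut-down stand-in for the real Config; only the DeepVariant keys are shown.
#[derive(Debug, Deserialize)]
struct DeepVariantSettings {
    deepvariant_threads: u8,
    deepvariant_image: String, // new in this commit
    deepvariant_bin_version: String,
}

fn main() -> anyhow::Result<()> {
    // Placeholder path; the real config uses a cluster-specific location.
    let toml_src = r#"
deepvariant_threads = 40
deepvariant_image = "/path/to/deepvariant.sif"
deepvariant_bin_version = "1.9.0"
"#;

    let settings: DeepVariantSettings = toml::from_str(toml_src)?;
    assert_eq!(settings.deepvariant_threads, 40);
    println!("{settings:?}");
    Ok(())
}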
+ 5 - 5
src/lib.rs

@@ -390,11 +390,11 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn run_deepvariant() -> anyhow::Result<()> {
-        init();
-        DeepVariant::initialize("HAMROUNE", "diag", Config::default())?.run()
-    }
+    // #[test]
+    // fn run_deepvariant() -> anyhow::Result<()> {
+    //     init();
+    //     DeepVariant::initialize("HAMROUNE", "diag", Config::default())?.run()
+    // }
 
     #[test]
     fn run_clairs() -> anyhow::Result<()> {

+ 109 - 149
src/pipes/somatic_slurm.rs

@@ -1,5 +1,5 @@
 use anyhow::{bail, Context};
-use log::{info, warn};
+use log::{error, info, warn};
 use std::{collections::HashMap, fs, path::PathBuf};
 use uuid::Uuid;
 
@@ -80,137 +80,108 @@ fn collect_bams_recursively(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
     Ok(bams)
 }
 
-/// Run the full POD5 → unaligned BAM → aligned, merged, sorted and indexed BAM
-/// pipeline for a single sequencing run on a flowcell.
+/// RAII (Resource Acquisition Is Initialization) guard that cleans up a temporary directory when dropped.
+pub struct TempDirGuard {
+    path: PathBuf,
+}
+
+impl TempDirGuard {
+    pub fn new(path: PathBuf) -> Self {
+        Self { path }
+    }
+}
+
+// The core cleanup logic: executed automatically when the TempDirGuard is dropped.
+impl Drop for TempDirGuard {
+    fn drop(&mut self) {
+        if self.path.exists() {
+            // Log the failure but do not panic: cleanup problems should
+            // never abort the pipeline.
+            if let Err(e) = fs::remove_dir_all(&self.path) {
+                error!(
+                    "Failed to remove temporary directory '{}': {}",
+                    self.path.display(),
+                    e
+                );
+            }
+        }
+    }
+}
+
+/// Orchestrates the full Nanopore sequencing pipeline (POD5 → unaligned BAM → aligned, merged, sorted, and indexed BAM) per case.
+///
+/// This function manages the entire process for a single sequencing run in six steps,
+/// using Dorado for basecalling/alignment and samtools for data manipulation, with the
+/// heavy jobs executed via Slurm.
+///
+/// The process reuses existing data where possible and runs the alignment jobs in parallel.
+///
+/// ---
+///
+/// ## Pipeline Steps
 ///
-/// This function orchestrates a five–step process:
+/// 1.  **Case Preparation & Barcode Mapping**
+///     * Verifies the run has associated cases and logs all cases.
+///     * Creates a `normalized_barcode` → `case` map.
 ///
-/// 1. **Inspect cases and define barcode mappings**
-///    - Verifies that `run` has at least one associated case.
-///    - Logs all cases and builds a `barcode → case` map using
-///      [`normalize_barcode`] on each case’s `barcode`.
+/// 2.  **Input Collection & Basecalling Decision**
+///     * If `run.bams_pass` is set, collects existing BAMs and adds them to the pool of unaligned BAMs (`split_bams`).
+///     * Compares the file stem of every POD5 against existing BAM stems. Any POD5 with a matching BAM stem is considered **already basecalled** and is skipped.
+///     * Queues the remaining POD5 files for basecalling.
 ///
-/// 2. **Reuse existing unaligned BAMs and decide which POD5 files to basecall**
-///    - If `run.bams_pass` is set, recursively collects all BAM files under
-///      that directory via [`collect_bams_recursively`] and treats them as
-///      already split per barcode. The BAM file stem (filename without
-///      extension) is used as the POD5 stem key.
-///    - For every POD5 in `run.pod5s`, compares its file stem against the set
-///      of existing BAM stems:
-///        * if a matching BAM exists, the POD5 is considered already
-///          basecalled and is **not** reprocessed;
-///        * otherwise, the POD5 is queued for basecalling.
+/// 3.  **Basecalling and Splitting (if needed)**
+///     * If any POD5 files were queued:
+///         * Creates a temporary directory with symlinks to the required POD5 files (Unix only).
+///         * Runs `DoradoBasecall` on the temporary POD5 directory to produce a single combined BAM.
+///         * Splits the combined BAM by read group (typically per-barcode) using `SamtoolsSplit` into a temporary split directory.
+///         * Deletes the combined BAM and adds the newly split BAMs to `split_bams`.
 ///
-/// 3. **Basecall missing POD5 files and split by read group (if needed)**
-///    - If there are POD5 files without corresponding BAMs:
-///        * Creates a temporary directory under `tmp_dir` containing only
-///          symlinks to the required POD5s (Linux/Unix only).
-///        * Basecalls this mini–run using [`DoradoBasecall::from_config`],
-///          producing a temporary combined BAM.
-///        * Splits the combined BAM by read group into per–read–group BAMs
-///          (typically per–barcode) via [`SamtoolsSplit::from_config`].
-///        * Deletes the combined basecalled BAM and adds all split BAMs to the
-///          pool of unaligned BAMs (`split_bams`).
-///    - If all POD5 files already had corresponding BAMs, no basecalling is
-///      performed.
+/// 4.  **Alignment Scheduling (Parallel)**
+///     * Iterates over all unaligned BAMs in `split_bams`.
+///     * Skips files smaller than or equal to `min_bam_size` bytes.
+///     * **Matches** each BAM's extracted barcode to a case in the mapping created in Step 1.
+///     * For each match, sets up the output directory, allocates a temporary aligned BAM path, and constructs a `DoradoAlign` job.
+///     * All `DoradoAlign` jobs are submitted and executed **in parallel** via `run_many_sbatch`.
 ///
-/// 4. **Match unaligned BAMs to cases and schedule alignment jobs**
-///    - Iterates over all unaligned split BAMs:
-///        * Skips files smaller than or equal to `min_bam_size` bytes to avoid
-///          trivial or corrupted inputs.
-///        * Extracts and normalizes a barcode from the BAM filename using
-///          [`extract_and_normalize_barcode`].
-///        * If the normalized barcode matches a case from the earlier
-///          `barcode → case` map:
-///            - Computes a sample–type–specific subdirectory with
-///              [`map_sample_type_to_dir`].
-///            - Ensures that
-///              `config.result_dir/<case_id>/<sample_type_dir>` exists.
-///            - Allocates a temporary output BAM path in `tmp_dir` for the
-///              aligned BAM.
-///            - Constructs a [`DoradoAlign`] job from `config`, the unaligned
-///              BAM, and the temporary aligned BAM path.
-///            - Groups the aligned BAM paths in an internal map keyed by
-///              `<case_id>_<sample_type_dir>` to later merge per
-///              case/sample–type.
-///        * BAMs whose barcode cannot be extracted or does not match any case
-///          are recorded as unmatched and logged.
-///    - If there are both cases without barcodes and unmatched BAMs, a warning
-///      is emitted indicating that manual association may be required.
-///    - If no alignment jobs are scheduled, the function bails with an error
-///      indicating that there are no BAM files to align for this run.
-///    - Otherwise, all alignment jobs are submitted and executed via
-///      [`run_many_sbatch`], which is expected to block until completion.
+/// 5.  **Final BAM Processing: Sort, Merge, and Index**
+///     * **Sorts** each individual aligned BAM chunk (from the temporary directory) using `SamtoolsSort` to prepare for merging.
+///     * For each case/sample-type group:
+///         * The first BAM chunk is **renamed** to the final target path.
+///         * Subsequent chunks are **merged** into the target BAM using `SamtoolsMerge`.
+///     * The resulting fully merged BAM is then **coordinate-sorted** again with `SamtoolsSort` to guarantee proper order.
+///     * The final sorted BAM is **indexed** using `SamtoolsIndex`, making it ready for downstream analysis.
 ///
-/// 5. **Merge, sort, and index final case–level BAMs**
-///    - For each case with a non-empty barcode:
-///        * Derives a key `<case_id>_<sample_type_dir>` and looks up the
-///          corresponding aligned BAMs from the internal map.
-///        * If no aligned BAMs are found, a warning is logged and the case is
-///          skipped.
-///        * Otherwise, constructs a final BAM path:
-///          `config.result_dir/<case_id>/<sample_type_dir>/\
-///           <case_id>_<sample_type_dir>_<reference_name>.bam`.
-///        * For the first aligned BAM:
-///            - Moves it into place as the initial `final_bam_path`
-///              (`fs::rename`).
-///            - Indexes it using [`SamtoolsIndex`].
-///        * For subsequent aligned BAMs:
-///            - Merges them into `final_bam_path` using [`SamtoolsMerge`].
-///        * After all aligned BAMs for a case/sample–type are merged:
-///            - Sorts the final BAM into a temporary
-///              `*.sorted.bam` file via [`SamtoolsSort`].
-///            - Renames the sorted BAM back to `final_bam_path`.
-///            - Indexes the sorted BAM using [`SamtoolsIndex`] again.
-///    - The resulting, per–case, per–sample–type BAMs are aligned, merged,
-///      coordinate–sorted, and indexed, ready for downstream analysis.
+/// 6.  **Cleanup & Summary**
+///     * Removes all temporary directories created during the process.
+///     * Logs a final summary of processed cases and output files.
 ///
-/// 6. **Cleanup**
-///    - Removes the temporary split BAM directory and temporary POD5 directory
-///      created in step 3, if they exist.
-///    - Logs a final summary including the number of cases and the number of
-///      case/sample–type groups that produced final BAMs.
+/// ---
 ///
 /// # Arguments
 ///
-/// * `run` – Description of a sequencing run, including POD5 input files,
-///   optional existing unaligned BAMs (`bams_pass`), and associated cases with
-///   barcodes and sample types.
-/// * `config` – Global configuration object with paths, reference name,
-///   and command–line options for Dorado, samtools and Slurm submission.
-/// * `tmp_dir` – Scratch directory used to create temporary POD5 directories,
-///   intermediate unaligned/ aligned BAMs, and split BAMs. It should be on a
-///   filesystem accessible to the Slurm jobs and support `rename`.
-/// * `min_bam_size` – Minimum BAM file size (in bytes) required for an
-///   unaligned BAM to be considered for alignment. Smaller files are skipped.
+/// * `run`: Description of the sequencing run, including POD5 input, optional existing unaligned BAMs (`bams_pass`), and associated cases with barcodes and sample types.
+/// * `config`: Global configuration object containing paths, the reference name, and command-line options for Dorado, samtools, and Slurm submission.
+/// * `tmp_dir`: Scratch directory used for all intermediate files (temporary POD5 symlinks, unaligned/aligned BAMs, split BAMs). Must be accessible by Slurm jobs and support atomic `fs::rename`.
+/// * `min_bam_size`: Minimum file size (in bytes) required for an unaligned BAM to be considered for alignment. Smaller files are skipped.
 ///
 /// # Returns
 ///
-/// Returns `Ok(())` if all stages of the pipeline complete successfully and
-/// final BAMs are produced (where possible) for the run’s cases. On any error,
-/// returns an [`anyhow::Error`] with contextual information.
+/// Returns `Ok(())` upon successful completion of all pipeline stages, producing final BAMs for the run’s cases. Returns an [`anyhow::Error`] with contextual information on any failure.
 ///
 /// # Errors
 ///
 /// This function will return an error if:
 ///
 /// * The run has no associated cases.
-/// * Collecting existing BAMs, creating temporary directories, or symlinking
-///   POD5 files fails.
-/// * Any external command submission or execution (Dorado basecalling,
-///   samtools split/merge/sort/index, alignment jobs) fails.
-/// * Merging, sorting, indexing, or moving BAM files fails.
-/// * Cleanup of temporary directories fails.
+/// * File system operations fail (e.g., creating temporary directories, `fs::rename`).
+/// * Any external command submission or execution fails (e.g., Dorado basecalling/alignment, samtools operations, Slurm batch submission).
+/// * No unaligned BAM files are available to align after processing and filtering.
 ///
-/// # Notes
+/// # Notes on Implementation
 ///
-/// * This function assumes a Unix-like environment for the use of filesystem
-///   symlinks (`std::os::unix::fs::symlink`).
-/// * Duplicate normalized barcodes in `run.cases` will cause later entries to
-///   overwrite earlier ones in the `barcode → case` mapping.
-/// * BAM indexing is currently performed both before and after sorting; this is
-///   redundant but harmless and can be optimized by indexing only the final
-///   sorted BAM.
+/// * Assumes a Unix-like environment for the use of filesystem symlinks (`std::os::unix::fs::symlink`).
+/// * Duplicate normalized barcodes in `run.cases` will result in the later entry overwriting earlier ones in the `barcode → case` mapping.
 pub fn basecall_align_split(
     run: &Pod5sRun,
     config: &Config,
@@ -324,9 +295,7 @@ pub fn basecall_align_split(
         }
     }
 
-    // 3) If there are POD5s to basecall, create a tmp POD5 dir and basecall + split
     let mut tmp_split_dir: Option<PathBuf> = None;
-    let mut tmp_pod5_dir: Option<PathBuf> = None;
 
     if !pod5s_to_basecall.is_empty() {
         info!(
@@ -336,6 +305,7 @@ pub fn basecall_align_split(
 
         // Create temporary directory containing only the POD5s that need basecalling
         let local_pod5_dir = tmp_dir.join(format!("pod5s_{}", Uuid::new_v4()));
+        let _pod5_guard = TempDirGuard::new(local_pod5_dir.clone());
         fs::create_dir_all(&local_pod5_dir).context(format!(
             "Failed to create temporary POD5 directory: {}",
             local_pod5_dir.display()
@@ -356,8 +326,6 @@ pub fn basecall_align_split(
             })?;
         }
 
-        tmp_pod5_dir = Some(local_pod5_dir.clone());
-
         let tmp_basecalled_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
         info!(
             "Step 1/5: Basecalling {} POD5 file(s) into: {}",
@@ -602,17 +570,9 @@ pub fn basecall_align_split(
         info!("  Output: {}", final_bam_path.display());
     }
 
-    // Cleanup temporary dirs we created
-    info!("Cleaning up temporary files");
     if let Some(d) = tmp_split_dir {
-        fs::remove_dir_all(&d)
-            .context(format!("Failed to remove split directory: {}", d.display()))?;
-    }
-    if let Some(d) = tmp_pod5_dir {
-        fs::remove_dir_all(&d).context(format!(
-            "Failed to remove temporary POD5 directory: {}",
-            d.display()
-        ))?;
+        info!("Removing temporary split BAM directory: {}", d.display());
+        fs::remove_dir_all(&d)?;
     }
 
     info!(
@@ -642,30 +602,30 @@ mod tests {
 
         let config = Config::default();
 
-        // let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
-        // for run in runs.data {
-        //     basecall_align_split(
-        //         &run,
-        //         &config,
-        //         format!("{}/outputs", TEST_DIR.as_str()).into(),
-        //         10 * 1024 * 1024,
-        //     )?;
-        // }
-
-        let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";
-
-        let mut runs = Pod5sRuns::new();
-        runs.add_run_dir(dir)?;
-
-        runs.data[0].add_id_input(IdInput { case_id: "TEST_02".to_string(), sample_type: "NOR".to_string(), barcode: "NB02".to_string(), flow_cell_id: "".to_string() });
-        runs.data[0].add_id_input(IdInput { case_id: "TEST_04".to_string(), sample_type: "NOR".to_string(), barcode: "NB04".to_string(), flow_cell_id: "".to_string() });
-
-        basecall_align_split(
-            &runs.data[0],
-            &config,
-            format!("{}/outputs", TEST_DIR.as_str()).into(),
-            10 * 1024 * 1024,
-        )?;
+        let runs = Pod5sRuns::load_json("/home/t_steimle/data/pod5_runs.json")?;
+        for run in runs.data {
+            basecall_align_split(
+                &run,
+                &config,
+                format!("{}/outputs", TEST_DIR.as_str()).into(),
+                10 * 1024 * 1024,
+            )?;
+        }
+        //
+        // let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";
+        //
+        // let mut runs = Pod5sRuns::new();
+        // runs.add_run_dir(dir)?;
+        //
+        // runs.data[0].add_id_input(IdInput { case_id: "TEST_02".to_string(), sample_type: "NOR".to_string(), barcode: "NB02".to_string(), flow_cell_id: "".to_string() });
+        // runs.data[0].add_id_input(IdInput { case_id: "TEST_04".to_string(), sample_type: "NOR".to_string(), barcode: "NB04".to_string(), flow_cell_id: "".to_string() });
+        //
+        // basecall_align_split(
+        //     &runs.data[0],
+        //     &config,
+        //     format!("{}/outputs", TEST_DIR.as_str()).into(),
+        //     10 * 1024 * 1024,
+        // )?;
 
         Ok(())
     }

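One detail worth calling out about the new `TempDirGuard`: the guard only cleans up when it is dropped at the end of its scope, so it must be bound to a named variable (as with `_pod5_guard` above); `let _ = TempDirGuard::new(...)` would drop it, and delete the directory, immediately. A dependency-free sketch of the same pattern (`log::error!` swapped for `eprintln!` so it runs standalone):

use std::{env, fs, path::PathBuf};

// Same shape as the TempDirGuard added in this commit, minus the log crate.
struct TempDirGuard {
    path: PathBuf,
}

impl Drop for TempDirGuard {
    fn drop(&mut self) {
        if self.path.exists() {
            // Report the failure but never panic from a destructor.
            if let Err(e) = fs::remove_dir_all(&self.path) {
                eprintln!("Failed to remove '{}': {e}", self.path.display());
            }
        }
    }
}

fn main() -> std::io::Result<()> {
    let dir = env::temp_dir().join("tempdir_guard_demo");
    fs::create_dir_all(&dir)?;

    // Named binding keeps the guard (and thus the directory) alive until scope end.
    let _guard = TempDirGuard { path: dir.clone() };
    assert!(dir.exists());

    Ok(())
    // `_guard` is dropped here and the directory is removed.
}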
+ 54 - 1
src/slurm_helpers.rs

@@ -204,9 +204,60 @@ pub fn max_gpu_per_node(gpu_name: &str) -> Option<i32> {
         .max()
 }
 
+/// Return the **maximum idle CPUs on a single non-drained node** in `shortq`.
+pub fn max_cpus_per_task_shortq() -> anyhow::Result<u32> {
+    let out = Command::new("sinfo")
+        .args(["-p", "shortq", "-N", "-h", "-o", "%C %t"])
+        .output()?;
+
+    let stdout = String::from_utf8_lossy(&out.stdout);
+    let mut max_idle = 0;
+
+    for line in stdout.lines() {
+        let mut parts = line.split_whitespace();
+        let cpus = parts.next().unwrap_or("");
+        let state = parts.next().unwrap_or("").to_lowercase();
+        if state.contains("drain") {
+            continue;
+        }
+
+        // CPUS(A/I/O/T) → take idle `I` at index 1
+        if let Some(i) = cpus.split('/').nth(1).and_then(|v| v.parse::<u32>().ok()) {
+            max_idle = max_idle.max(i);
+        }
+    }
+
+    Ok(max_idle)
+}
+
+// pub fn available_cpus_shortq() -> Result<u32> {
+//     use std::process::Command;
+//
+//     let out = Command::new("sinfo")
+//         .args(["-p", "shortq", "-N", "-h", "-o", "%C %t"])
+//         .output()?;
+//
+//     let stdout = String::from_utf8_lossy(&out.stdout);
+//     let mut idle = 0;
+//
+//     for line in stdout.lines() {
+//         let mut parts = line.split_whitespace();
+//         let cpus = parts.next().unwrap_or("");
+//         let state = parts.next().unwrap_or("").to_lowercase();
+//         if state.contains("drain") {
+//             continue;
+//         }
+//
+//         // CPUS(A/I/O/T) → idle is index 1
+//         if let Some(i) = cpus.split('/').nth(1).and_then(|v| v.parse::<u32>().ok()) {
+//             idle = idle.saturating_add(i);
+//         }
+//     }
+//
+//     Ok(idle)
+// }
+
 #[cfg(test)]
 mod tests {
-    use crate::slurm_helpers::{max_gpu_per_node, slurm_availables};
+    use crate::slurm_helpers::{max_cpus_per_task_shortq, max_gpu_per_node, slurm_availables};
 
     #[test]
     fn slurm_info() -> anyhow::Result<()> {
@@ -238,6 +289,8 @@ mod tests {
         println!("H100 max available: {}", u.unwrap_or(0));
         let u = max_gpu_per_node("a100");
         println!("A100 max available: {}", u.unwrap_or(0));
+        let u = max_cpus_per_task_shortq();
+        println!("CPU max available: {}", u.unwrap_or(0));
 
         Ok(())
     }

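For reference, the `%C` column parsed by `max_cpus_per_task_shortq` is Slurm's CPUS(A/I/O/T) quadruple (allocated/idle/other/total). A small sketch of the per-line parsing with made-up `sinfo -p shortq -N -h -o "%C %t"` output; the node states and numbers are illustrative only:

// Parse the idle-CPU count from one `sinfo -N -h -o "%C %t"` line,
// skipping drained nodes exactly as max_cpus_per_task_shortq does.
fn idle_cpus(line: &str) -> Option<u32> {
    let mut parts = line.split_whitespace();
    let cpus = parts.next()?; // e.g. "12/52/0/64"
    let state = parts.next()?.to_lowercase(); // e.g. "mix", "idle", "drain"
    if state.contains("drain") {
        return None;
    }
    cpus.split('/').nth(1)?.parse().ok() // idle count is field 1
}

fn main() {
    // Hypothetical output lines; real values depend on the cluster.
    assert_eq!(idle_cpus("12/52/0/64 mix"), Some(52));
    assert_eq!(idle_cpus("64/0/0/64 alloc"), Some(0));
    assert_eq!(idle_cpus("0/64/0/64 drain"), None);
    println!("ok");
}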