Explorar el Código

reduced ClairS threads; added slurm_max_par default: 10; use of run! and run_many! in BAM import

Thomas hace 1 semana
padre
commit
f216c34dce
Se han modificado 6 ficheros con 103 adiciones y 49 borrados
  1. +7 −4
      pandora-config.example.toml
  2. +33 −24
      src/commands/dorado.rs
  3. +7 −5
      src/commands/mod.rs
  4. +25 −1
      src/commands/samtools.rs
  5. +3 −0
      src/config.rs
  6. +28 −15
      src/pipes/somatic_slurm.rs

+ 7 - 4
pandora-config.example.toml

@@ -13,6 +13,9 @@ tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
 # Should use Slurm as runner
 slurm_runner = true
 
+# Slurm max parallel jobs
+slurm_max_par = 20
+
 # Run cache directory.
 run_cache_dir = "/home/t_steimle/data/prom_runs"
 
@@ -191,7 +194,7 @@ deepsomatic_force = false
 #######################################
 
 # Threads for ClairS.
-clairs_threads = 20
+clairs_threads = 10
 
 # ClairS docker tag.
 clairs_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/clairs_latest.sif"
@@ -380,11 +383,11 @@ echtvar_sources = [
 # Bcftools configuration
 #######################################
 
-# Path to longphase binary.
+# Path to bcftools binary.
 bcftools_bin = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/bcftools"
 
-# Threads for longphase.
-bcftools_threads = 30
+# Threads for bcftools.
+bcftools_threads = 10
 
 #######################################
 # Longphase configuration

+ 33 - 24
src/commands/dorado.rs

@@ -152,37 +152,46 @@ impl Command for DoradoBasecall {
 ///   --gres=gpu:h100:4 \
 ///   bash -c "<dorado command…>"
 /// ```
-impl super::SlurmRunner for DoradoBasecall {
-    fn slurm_args(&self) -> Vec<String> {
-        let (gpu, n) = if let (Some(h100_av), Some(a100_av)) =
-            (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
-        {
-            debug!("Available H100: {h100_av} and A100: {a100_av}");
-            let (gpu, n) = if h100_av >= a100_av {
-                ("h100", h100_av)
-            } else {
-                ("a100", a100_av)
-            };
-
-            let n = n.clamp(1, 4);
-            (gpu, n)
+fn dorado_slurm_params() -> super::SlurmParams {
+    let (gpu, n) = if let (Some(h100_av), Some(a100_av)) =
+        (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
+    {
+        debug!("Available H100: {h100_av} and A100: {a100_av}");
+        let (gpu, n) = if h100_av >= a100_av {
+            ("h100", h100_av)
         } else {
-            panic!("Are you running slurm with a100 and h100 GPU ?");
+            ("a100", a100_av)
         };
-        info!("Running Dorado Basecaller with: {gpu} x {n}");
-        super::SlurmParams {
-            job_name: Some("dorado_basecall".into()),
-            cpus_per_task: Some(10),
-            mem: Some("60G".into()),
-            partition: Some("gpgpuq".into()),
-            gres: Some(format!("gpu:{gpu}:{n}")),
-        }
-        .to_args()
+
+        let n = n.clamp(1, 4);
+        (gpu, n)
+    } else {
+        panic!("Are you running slurm with a100 and h100 GPU ?");
+    };
+    info!("Running Dorado Basecaller with: {gpu} x {n}");
+    super::SlurmParams {
+        job_name: Some("dorado_basecall".into()),
+        cpus_per_task: Some(10),
+        mem: Some("60G".into()),
+        partition: Some("gpgpuq".into()),
+        gres: Some(format!("gpu:{gpu}:{n}")),
+    }
+}
+
+impl super::SlurmRunner for DoradoBasecall {
+    fn slurm_args(&self) -> Vec<String> {
+        dorado_slurm_params().to_args()
     }
 }
 
 impl super::LocalRunner for DoradoBasecall {}
 
+impl super::SbatchRunner for DoradoBasecall {
+    fn slurm_params(&self) -> SlurmParams {
+        dorado_slurm_params()
+    }
+}
+
 /// Run Dorado alignment using a reference FASTA and an input BAM.
 ///
 /// This command:

+ 7 - 5
src/commands/mod.rs

@@ -669,6 +669,8 @@ fn is_progress_line(line: &str) -> bool {
 
 use std::collections::HashMap;
 
+use crate::config::Config;
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct SlurmEpilog {
     pub job_id: String,
@@ -797,7 +799,7 @@ pub fn split_output(full_output: &str) -> (String, Option<SlurmEpilog>) {
 /// NOTE:
 ///   - Outputs from different jobs may interleave on the terminal.
 ///   - Slurm still decides scheduling order (queue vs run).
-pub fn run_many_sbatch<T>(jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
+pub fn run_many_sbatch<T>(jobs: Vec<T>, config: &Config) -> anyhow::Result<Vec<CapturedOutput>>
 where
     T: SbatchRunner + Send + 'static,
 {
@@ -817,10 +819,10 @@ where
     //     results.push(res);
     // }
 
-    // Rule for the IGR cluster because the admins dont want to implement propoper rules so we are 
-    let max_parallel = 22;
+    // Rule for the IGR cluster because the admins dont want to implement proper rules so we are 
+    let max_parallel = config.slurm_max_par;
     let pool = rayon::ThreadPoolBuilder::new()
-        .num_threads(max_parallel)
+        .num_threads(max_parallel.into())
         .build()
         .context("failed to build rayon thread pool")?;
 
@@ -998,7 +1000,7 @@ where
 macro_rules! run_many {
     ($cfg:expr, $jobs:expr) => {{
         if $cfg.slurm_runner {
-            $crate::commands::run_many_sbatch($jobs)
+            $crate::commands::run_many_sbatch($jobs, $cfg)
         } else {
             $crate::commands::run_many_local_batch($jobs)
         }

+ 25 - 1
src/commands/samtools.rs

@@ -392,6 +392,18 @@ impl super::SlurmRunner for SamtoolsMerge {
     }
 }
 
+impl super::SbatchRunner for SamtoolsMerge {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("samtools_merge".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("60G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
 use std::io::Write;
 
 /// Merges multiple BAM files into a single output BAM.
@@ -642,6 +654,18 @@ impl super::SlurmRunner for SamtoolsSplit {
     }
 }
 
+impl super::SbatchRunner for SamtoolsSplit {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("samtools_split".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("60G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct SamtoolsSort {
     /// Path to the `samtools` executable.
@@ -818,7 +842,7 @@ mod tests {
         //     format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()),
         // );
 
-        let r = run_many_sbatch(vec![sort_1])?;
+        let r = run_many_sbatch(vec![sort_1], &config)?;
 
         println!("{r:#?}");
         Ok(())

+ 3 - 0
src/config.rs

@@ -25,6 +25,9 @@ pub struct Config {
     /// Runner can slurm
     pub slurm_runner: bool,
 
+    /// Slurm max parallel jobs
+    pub slurm_max_par: u8,
+
     /// Software threads
     pub threads: u8,
 

+ 28 - 15
src/pipes/somatic_slurm.rs

@@ -7,9 +7,7 @@ use crate::{
     collection::{flowcells::IdInput, pod5::Pod5sRun},
     commands::{
         dorado::{DoradoAlign, DoradoBasecall},
-        run_many_sbatch,
         samtools::{SamtoolsIndex, SamtoolsMerge, SamtoolsMergeMany, SamtoolsSort, SamtoolsSplit},
-        SlurmRunner,
     },
     config::Config,
     helpers::{extract_barcode, list_files_with_ext, remove_bam_with_index, TempDirGuard},
@@ -305,7 +303,8 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
         // DoradoBasecall must accept a directory of POD5s here
         let mut cmd =
             DoradoBasecall::from_config(config, local_pod5_dir.clone(), tmp_basecalled_bam.clone());
-        let _out = SlurmRunner::exec(&mut cmd)?;
+        let _out = crate::run!(config, &mut cmd)?;
+        // let _out = SlurmRunner::exec(&mut cmd)?;
         info!("Basecalled ✅ (run: {})", run.run_id);
 
         // Split by read group
@@ -317,7 +316,7 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
 
         let mut cmd =
             SamtoolsSplit::from_config(config, &tmp_basecalled_bam, local_split_dir.clone());
-        let _out = SlurmRunner::exec(&mut cmd)?;
+        let _out = crate::run!(config, &mut cmd)?;
         fs::remove_file(&tmp_basecalled_bam).context(format!(
             "Failed to remove temporary basecalled BAM: {}",
             tmp_basecalled_bam.display()
@@ -437,10 +436,14 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
         );
     }
 
-    run_many_sbatch(align_jobs).context(format!(
+    crate::run_many!(config, align_jobs).context(format!(
         "Failed to run alignment batch for {} jobs (run: {})",
         n_jobs, run.run_id
     ))?;
+    // run_many_sbatch(align_jobs, config).context(format!(
+    //     "Failed to run alignment batch for {} jobs (run: {})",
+    //     n_jobs, run.run_id
+    // ))?;
 
     info!("Alignments done ✅ (run: {})", run.run_id);
 
@@ -455,14 +458,18 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
             );
 
             let mut sort_cmd = SamtoolsSort::from_config(config, &bam, &sorted_bam);
-            SlurmRunner::exec(&mut sort_cmd)?;
+            crate::run!(config, &mut sort_cmd)?;
+            // SlurmRunner::exec(&mut sort_cmd)?;
 
             // remove unsorted chunk
             fs::remove_file(&bam)?;
 
-            let mut index_cmd =
-                SamtoolsIndex::from_config(config, sorted_bam.to_string_lossy().to_string().as_ref());
-            SlurmRunner::exec(&mut index_cmd)?;
+            let mut index_cmd = SamtoolsIndex::from_config(
+                config,
+                sorted_bam.to_string_lossy().to_string().as_ref(),
+            );
+            crate::run!(config, &mut index_cmd)?;
+            // SlurmRunner::exec(&mut index_cmd)?;
 
             // replace path in case_bam_map with sorted version
             *bam = sorted_bam;
@@ -528,7 +535,8 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
 
             let mut merge_many_cmd =
                 SamtoolsMergeMany::from_config(tmp_merged.clone(), aligned_bams.clone(), config);
-            SlurmRunner::exec(&mut merge_many_cmd)?;
+            crate::run!(config, &mut merge_many_cmd)?;
+            // SlurmRunner::exec(&mut merge_many_cmd)?;
 
             // Remove chunk BAMs and their indices
             for bam in aligned_bams {
@@ -555,16 +563,19 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
             // Index both source (per-case) and destination BAMs
             let mut index_cmd =
                 SamtoolsIndex::from_config(config, case_merged_bam.to_string_lossy().as_ref());
-            SlurmRunner::exec(&mut index_cmd)?;
+            crate::run!(config, &mut index_cmd)?;
+            // SlurmRunner::exec(&mut index_cmd)?;
 
             let mut index_cmd =
                 SamtoolsIndex::from_config(config, final_bam_path.to_string_lossy().as_ref());
-            SlurmRunner::exec(&mut index_cmd)?;
+            crate::run!(config, &mut index_cmd)?;
+            // SlurmRunner::exec(&mut index_cmd)?;
 
             // Merge into existing final BAM.
             let mut merge_cmd =
                 SamtoolsMerge::from_config(config, &case_merged_bam, &final_bam_path);
-            SlurmRunner::exec(&mut merge_cmd)?;
+            crate::run!(config, &mut merge_cmd)?;
+            // SlurmRunner::exec(&mut merge_cmd)?;
         } else {
             info!(
                 "  Creating new final BAM for case {} → {}",
@@ -589,7 +600,8 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
         );
 
         let mut sort_cmd = SamtoolsSort::from_config(config, &final_bam_path, &sorted_tmp);
-        SlurmRunner::exec(&mut sort_cmd)?;
+        crate::run!(config, &mut sort_cmd)?;
+        // SlurmRunner::exec(&mut sort_cmd)?;
 
         // Replace unsorted BAM with sorted BAM
         fs::rename(&sorted_tmp, &final_bam_path)?;
@@ -597,7 +609,8 @@ pub fn import_run(run: &Pod5sRun, config: &Config) -> anyhow::Result<()> {
         // Index the **sorted** final BAM
         let mut index_cmd =
             SamtoolsIndex::from_config(config, final_bam_path.to_string_lossy().as_ref());
-        SlurmRunner::exec(&mut index_cmd)?;
+        crate::run!(config, &mut index_cmd)?;
+        // SlurmRunner::exec(&mut index_cmd)?;
 
         info!("  Output: {}", final_bam_path.display());