Explorar el Código

rm collections

Thomas hace 3 meses
padre
commit
ba99b771cc

+ 1 - 1
pandora-config.example.toml

@@ -353,7 +353,7 @@ dorado_basecall_arg = "-x 'cuda:all' sup,5mC_5hmC"
 dorado_should_realign = false
 
 # Dorado aligner threads number
-dorado_aligner_threads = 80
+dorado_aligner_threads = 20
 
 # Reference FASTA used for alignment.
 ref_fa = "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa"

+ 95 - 2
src/collection/bam.rs

@@ -995,7 +995,7 @@ impl From<u8> for PileBase {
             b'C' | b'c' => PileBase::C,
             b'G' | b'g' => PileBase::G,
             b'T' | b't' => PileBase::T,
-            _           => PileBase::N,
+            _ => PileBase::N,
         }
     }
 }
@@ -1013,7 +1013,11 @@ fn decode(n: u8) -> PileBase {
     }
 }
 
-pub fn base_at_new(record: &rust_htslib::bam::Record, at_pos: i64, with_next_ins: bool) -> Option<PileBase> {
+pub fn base_at_new(
+    record: &rust_htslib::bam::Record,
+    at_pos: i64,
+    with_next_ins: bool,
+) -> Option<PileBase> {
     if record.pos() > at_pos {
         return None;
     }
@@ -1114,3 +1118,92 @@ pub fn nt_pileup_new(
     }
     Ok(pile)
 }
+
+#[cfg(test)]
+mod tests {
+    use log::info;
+
+    use crate::{
+        collection::{
+            bam::{bam_composition, WGSBam, WGSBamStats},
+            run::PromRuns,
+        },
+        config::Config,
+        helpers::test_init,
+    };
+
+    #[test]
+    fn bam_stats() -> anyhow::Result<()> {
+        test_init();
+        let case_id = "HOULNE";
+        let time = "diag";
+
+        info!("WGSBamStats::new");
+
+        let r = WGSBamStats::new(case_id, time, &Config::default())?;
+        println!("{r}");
+        assert_eq!(r.all_records, 22417713);
+        let c = r
+            .karyotype
+            .iter()
+            .find_map(|e| if e.2 == "chr9" { Some(e.6) } else { None })
+            .unwrap();
+        assert_eq!(c, 0.8800890147031387);
+
+        Ok(())
+    }
+
+    #[test]
+    fn bam_new() -> anyhow::Result<()> {
+        test_init();
+        let case_id = "HOULNE";
+        let time = "diag";
+
+        info!("WGSBam::new");
+
+        let c = Config::default();
+
+        let b = WGSBam::new(c.solo_bam(case_id, time).into())?;
+
+        println!("{:?}", b.composition);
+
+        Ok(())
+    }
+
+    #[test]
+    fn bam_c() -> anyhow::Result<()> {
+        test_init();
+        let case_id = "HOULNE";
+        let time = "diag";
+
+        info!("bam_composition");
+
+        let c = Config::default();
+
+        let compo = bam_composition(&c.solo_bam(case_id, time), 20_000)?;
+        assert_eq!(compo.len(), 2);
+
+        for (rg, flowcell, pct) in compo {
+            println!("{flowcell} {rg}: {pct:.2}%");
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn samplerun() -> anyhow::Result<()> {
+        test_init();
+
+        // let r = PromRun::from_local_dir("/home/prom/mnt/store/20250428_AgC/FIANXA-DG-N06_MONCA-DG-N07/20250428_1600_2E_PBC92750_b7090451")?;
+        // println!("{}", r.id());
+
+        let mut rs = PromRuns::new("/data/promethion-runs-metadata".into());
+        // rs.scan_local("/home/prom/mnt/store/20250428_AgC")?;
+        rs.scan_archives_dir("/home/prom/mnt/lto")?;
+        rs.archive(true)?;
+
+        // rs.load_sample_sheets()?;
+        rs.stats();
+        Ok(())
+    }
+}

+ 1 - 1
src/collection/flowcells.rs

@@ -541,7 +541,7 @@ impl FlowCellExperiment {
     /// - `Some(FlowCellExperiment)` if a known subdirectory is found.
     /// - `None` if no match is detected.
     pub fn from_path(flowcell_path: &str) -> Option<Self> {
-        for dir in list_directories(flowcell_path).ok().unwrap_or_default() {
+        for dir in list_directories(flowcell_path.into()).ok().unwrap_or_default() {
             if dir == "pod5" {
                 return Some(FlowCellExperiment::WGSPod5Mux(dir.to_string()));
             }

+ 11 - 14
src/collection/mod.rs

@@ -14,10 +14,10 @@ use log::{error, info, warn};
 use modbases::{Dmr, ModBasesCollection, ModType};
 use pandora_lib_variants::variants::Variants;
 
-use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
+use self::{bam::BamCollection, /* pod5::Pod5Collection, */ vcf::VcfCollection};
 use crate::{
     callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::NanomonSV},
-    collection::pod5::FlowCellCase,
+    /* collection::pod5::FlowCellCase, */
     commands::{
         dorado::Dorado as BasecallAlign,
         modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig},
@@ -31,16 +31,13 @@ use crate::{
     scan::scan::par_whole_scan,
 };
 
-pub mod bam;
-pub mod modbases;
-pub mod pod5;
-pub mod vcf;
-pub mod flowcells;
-pub mod minknow;
-
-#[cfg(test)]
-mod tests;
-pub mod run;
+// pub mod bam;
+// pub mod modbases;
+// pub mod pod5;
+// pub mod vcf;
+// pub mod flowcells;
+// pub mod minknow;
+// pub mod run;
 
 #[derive(Debug, Clone)]
 pub struct CollectionsConfig {
@@ -70,7 +67,7 @@ impl Default for CollectionsConfig {
 #[derive(Debug)]
 pub struct Collections {
     pub config: CollectionsConfig,
-    pub pod5: Pod5Collection,
+    // pub pod5: Pod5Collection,
     pub bam: BamCollection,
     pub vcf: VcfCollection,
     pub modbases: ModBasesCollection,
@@ -91,7 +88,7 @@ impl Collections {
         let modbases = ModBasesCollection::new(result_dir);
 
         Ok(Self {
-            pod5: Pod5Collection::default(),
+            // pod5: Pod5Collection::default(),
             bam,
             vcf,
             modbases,

+ 1 - 0
src/collection/pod5.rs

@@ -679,3 +679,4 @@ pub struct FCLine {
     /// Corrected flowcell name used to resolve naming inconsistencies.
     pub ref_flow_cell: String,
 }
+

+ 0 - 92
src/collection/tests.rs

@@ -1,92 +0,0 @@
-use log::info;
-
-use crate::{
-    collection::{
-        bam::{bam_composition, WGSBam, WGSBamStats},
-        run::PromRuns,
-    },
-    config::Config,
-};
-
-use super::run::PromRun;
-
-fn init() {
-    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
-        .is_test(true)
-        .try_init();
-}
-
-#[test]
-fn bam_stats() -> anyhow::Result<()> {
-    init();
-    let case_id = "HOULNE";
-    let time = "diag";
-
-    info!("WGSBamStats::new");
-
-    let r = WGSBamStats::new(case_id, time, &Config::default())?;
-    println!("{r}");
-    assert_eq!(r.all_records, 22417713);
-    let c = r
-        .karyotype
-        .iter()
-        .find_map(|e| if e.2 == "chr9" { Some(e.6) } else { None })
-        .unwrap();
-    assert_eq!(c, 0.8800890147031387);
-
-    Ok(())
-}
-
-#[test]
-fn bam_new() -> anyhow::Result<()> {
-    init();
-    let case_id = "HOULNE";
-    let time = "diag";
-
-    info!("WGSBam::new");
-
-    let c = Config::default();
-
-    let b = WGSBam::new(c.solo_bam(case_id, time).into())?;
-
-    println!("{:?}", b.composition);
-
-    Ok(())
-}
-
-#[test]
-fn bam_c() -> anyhow::Result<()> {
-    init();
-    let case_id = "HOULNE";
-    let time = "diag";
-
-    info!("bam_composition");
-
-    let c = Config::default();
-
-    let compo = bam_composition(&c.solo_bam(case_id, time), 20_000)?;
-    assert_eq!(compo.len(), 2);
-
-    for (rg, flowcell, pct) in compo {
-        println!("{flowcell} {rg}: {pct:.2}%");
-    }
-
-    Ok(())
-}
-
-#[test]
-fn samplerun() -> anyhow::Result<()> {
-    init();
-
-    // let r = PromRun::from_local_dir("/home/prom/mnt/store/20250428_AgC/FIANXA-DG-N06_MONCA-DG-N07/20250428_1600_2E_PBC92750_b7090451")?;
-    // println!("{}", r.id());
-
-    let mut rs = PromRuns::new("/data/promethion-runs-metadata".into());
-    // rs.scan_local("/home/prom/mnt/store/20250428_AgC")?;
-    rs.scan_archives_dir("/home/prom/mnt/lto")?;
-    rs.archive(true)?;
-
-    // rs.load_sample_sheets()?;
-    rs.stats();
-    Ok(())
-}

+ 140 - 54
src/commands/dorado.rs

@@ -11,27 +11,52 @@ use uuid::Uuid;
 
 use crate::{
     collection::{bam::bam_composition, flowcells::FlowCell, pod5::FlowCellCase},
-    commands::Command,
+    commands::{Command, SlurmParams},
     config::Config,
     helpers::find_unique_file,
     io::pod5_infos::Pod5Info,
+    slurm_helpers::max_gpu_per_node,
 };
 
+/// Run Dorado basecalling on a directory of POD5 files.
+///
+/// This command:
+/// - Validates that the POD5 directory exists
+/// - Locates the first `.pod5` file to extract the sequencing kit (don't mix pod5 from different
+///   runs into the same dir)
+/// - Builds a Dorado `basecaller` command line using the configured arguments
+///
+/// The resulting BAM is written to `output_bam`.
 #[derive(Debug)]
 pub struct DoradoBasecall {
+    /// Path to the Dorado executable.
     dorado: PathBuf,
-    output_bam: PathBuf,
+    /// Directory containing `.pod5` reads.
     pod5_dir: PathBuf,
+    /// Output BAM file produced by Dorado.
+    output_bam: PathBuf,
+    /// Sequencing kit extracted from the POD5 file metadata.
     sequencing_kit: String,
+    /// Additional basecalling arguments from configuration.
     dorado_basecall_arg: String,
 }
 
 impl DoradoBasecall {
-    pub fn from_config(config: &Config, pod5_dir: PathBuf, output_bam: PathBuf) -> Self {
+    /// Build a `DoradoBasecall` command from configuration and input/output paths.
+    ///
+    /// # Parameters
+    /// - `config`: global configuration providing Dorado binary path and args
+    /// - `pod5_dir`: directory containing POD5 files
+    /// - `output_bam`: destination BAM path
+    pub fn from_config(
+        config: &Config,
+        pod5_dir: impl AsRef<Path>,
+        output_bam: impl AsRef<Path>,
+    ) -> Self {
         Self {
             dorado: (&config.align.dorado_bin).into(),
-            output_bam,
-            pod5_dir,
+            pod5_dir: pod5_dir.as_ref().into(),
+            output_bam: output_bam.as_ref().into(),
             sequencing_kit: String::new(),
             dorado_basecall_arg: config.align.dorado_basecall_arg.clone(),
         }
@@ -39,6 +64,7 @@ impl DoradoBasecall {
 }
 
 impl Command for DoradoBasecall {
+    /// Validate input directory, ensure no output overwrite, and extract sequencing kit.
     fn init(&mut self) -> anyhow::Result<()> {
         if !self.pod5_dir.exists() || !self.pod5_dir.is_dir() {
             anyhow::bail!(
@@ -68,6 +94,7 @@ impl Command for DoradoBasecall {
         Ok(())
     }
 
+    /// Build the Dorado basecaller command.
     fn cmd(&self) -> String {
         let dorado_bin = &self.dorado;
         let pod_dir = &self.pod5_dir;
@@ -81,44 +108,123 @@ impl Command for DoradoBasecall {
             pod_dir.display(),
             bam.display()
         )
+    }
+}
+
+/// Slurm execution parameters for `DoradoBasecall`.
+///
+/// This configuration launches Dorado on a GPU partition with:
+/// - 10 CPU threads (`--cpus-per-task=10`)
+/// - 60 GB memory
+/// - GPUs chosen dynamically: the type (H100 or A100) with the most available per node, at least 2
+///
+/// # Performance Notes
+///
+/// Dorado basecalling throughput increases with CPU count when using GPUs.
+/// Example measurements (Samples/s):
+///
+/// │ CPUs │ Throughput (Samples/s)     │
+/// │------│-----------------------------│
+/// │  4   │ 5.36×10⁷                    │
+/// │  5   │ 6.16×10⁷                    │
+/// │  6   │ 6.87×10⁷                    │
+/// │  7   │ 7.23×10⁷                    │
+/// │  8   │ 7.67×10⁷                    │
+/// │ 10   │ 8.40×10⁷                    │
+/// │ 15   │ 8.78×10⁷                    │
+///
+/// Throughput gains diminish beyond ~10 CPUs when the GPU becomes the bottleneck.
+/// The runner uses **10 CPUs** by default as a balanced configuration.
+///
+/// # Resulting Slurm Command
+///
+/// Equivalent to:
+///
+/// ```text
+/// srun \
+///   --job-name=dorado_basecall \
+///   --cpus-per-task=10 \
+///   --mem=60G \
+///   --partition=gpgpuq \
+///   --gres=gpu:<type>:<count> \
+///   bash -c "<dorado command…>"
+/// ```
+impl super::SlurmRunner for DoradoBasecall {
+    fn slurm_args(&self) -> Vec<String> {
+        let (gpu, n) = if let (Some(h100_av), Some(a100_av)) = (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
+        {
+            let (gpu, n) = if h100_av >= a100_av {
+                ("h100", h100_av)
+            } else {
+                ("a100", a100_av)
+            };
 
-        // let samtools_view = format!(
-        //     "{} view -h -@ {samtools_view_threads} -b /dev/stdin",
-        //     samtools.display()
-        // );
-        // let samtools_sort = format!(
-        //     "{} sort -@ {samtools_sort_threads} /dev/stdin -o {}",
-        //     samtools.display(),
-        //     bam.display()
-        // );
-        //
-        // // format!("{dorado} | {samtools_view} | {samtools_sort}")
-        // format!("{dorado} | {samtools_view} > {}", bam.display())
+            let n = n.max(2);
+            (gpu, n)
+        } else {
+            panic!("Are you running slurm with a100 and h100 GPU ?");
+        };
+        super::SlurmParams {
+            job_name: Some("dorado_basecall".into()),
+            cpus_per_task: Some(10),
+            mem: Some("60G".into()),
+            partition: Some("gpgpuq".into()),
+            gres: Some(format!("gpu:{gpu}:{n}")),
+        }
+        .to_args()
     }
 }
 
+/// Run Dorado alignment using a reference FASTA and an input BAM.
+///
+/// This command:
+/// - Validates input/output paths
+/// - Invokes Dorado `aligner`
+/// - Produces an aligned BAM at `output`
 #[derive(Debug)]
 pub struct DoradoAlign {
+    /// Path to the Dorado executable.
     pub dorado: PathBuf,
+    /// Reference FASTA used for alignment.
     pub reference: PathBuf,
+    /// Input BAM to align.
     pub input: PathBuf,
+    /// Output aligned BAM.
     pub output: PathBuf,
+    /// Number of threads for the Dorado aligner.
     pub threads: u8,
+    /// Slurm params
+    pub slurm_params: SlurmParams,
 }
 
 impl DoradoAlign {
-    pub fn from_config(config: &Config, input: PathBuf, output: PathBuf) -> Self {
+    /// Build a `DoradoAlign` command from configuration and input/output paths.
+    ///
+    /// # Parameters
+    /// - `config`: global configuration
+    /// - `input`: input BAM
+    /// - `output`: aligned BAM
+    pub fn from_config(config: &Config, input: impl AsRef<Path>, output: impl AsRef<Path>) -> Self {
+        let threads = config.align.dorado_aligner_threads;
         Self {
             dorado: (&config.align.dorado_bin).into(),
             reference: (&config.align.ref_fa).into(),
-            input,
-            output,
-            threads: config.align.dorado_aligner_threads,
+            input: input.as_ref().into(),
+            output: output.as_ref().into(),
+            threads,
+            slurm_params: SlurmParams {
+                job_name: Some("dorado_align".into()),
+                cpus_per_task: Some(threads.into()),
+                mem: Some("60G".into()),
+                partition: Some("gpgpuq".into()),
+                gres: None,
+            },
         }
     }
 }
 
 impl super::Command for DoradoAlign {
+    /// Validate input BAM and ensure the output does not already exist.
     fn init(&mut self) -> anyhow::Result<()> {
         if !self.input.exists() {
             anyhow::bail!(
@@ -136,6 +242,8 @@ impl super::Command for DoradoAlign {
 
         Ok(())
     }
+
+    /// Build the Dorado aligner command.
     fn cmd(&self) -> String {
         format!(
             "{} aligner --threads {} --allow-sec-supp --mm2-opts '--secondary yes' {} {} > {}",
@@ -149,48 +257,27 @@ impl super::Command for DoradoAlign {
 }
 
 impl super::SlurmRunner for DoradoAlign {
+    /// Default Slurm parameters for running the Dorado aligner.
     fn slurm_args(&self) -> Vec<String> {
-        super::SlurmParams {
-            job_name: Some("dorado_align".into()),
-            cpus_per_task: Some(self.threads.into()),
-            mem: Some("60G".into()),
-            partition: Some("gpgpuq".into()),
-            gres: None,
-        }
-        .to_args()
+        self.slurm_params.to_args()
     }
 }
 
-// Running with Slurm: srun --job-name=dorado_basecall --cpus-per-task=X --mem=60G --partition=gpgpuq --gres=gpu:h100:4 bash -c /mnt/beegfs02/scratch/t_steimle/tools/dorado-latest-linux-x64/bin/dorado basecaller --kit-name SQK-NBD114-24 -x 'cuda:all' sup,5mC_5hmC /mnt/beegfs02/scratch/t_steimle/test_data/inputs/pod5/A --trim all --emit-moves > /mnt/beegfs02/scratch/t_steimle/test_data/outputs/aligned_5.bam
-// 04 cpu Basecalled @ Samples/s: 5.359770e+07
-// 05 cpu Basecalled @ Samples/s: 6.155305e+07
-// 06 cpu Basecalled @ Samples/s: 6.870292e+07
-// 07 cpu Basecalled @ Samples/s: 7.230430e+07
-// 08 cpu Basecalled @ Samples/s: 7.669054e+07
-// 10 cpu Basecalled @ Samples/s: 8.398348e+07
-// 15 cpu Basecalled @ Samples/s: 8.776285e+07
-impl super::SlurmRunner for DoradoBasecall {
-    fn slurm_args(&self) -> Vec<String> {
-        super::SlurmParams {
-            job_name: Some("dorado_basecall".into()),
-            cpus_per_task: Some(10),
-            mem: Some("60G".into()),
-            partition: Some("gpgpuq".into()),
-            gres: Some("gpu:h100:4".into()),
-        }
-        .to_args()
+impl super::SbatchRunner for DoradoAlign {
+    /// Slurm parameters used when the Dorado aligner is submitted via sbatch.
+    fn slurm_params(&self) -> SlurmParams {
+        self.slurm_params.clone()
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use log::info;
     use crate::TEST_DIR;
+    use log::info;
 
     use crate::{commands::SlurmRunner, config::Config, helpers::test_init};
 
-
     #[test]
     fn dorado_basecall() -> anyhow::Result<()> {
         test_init();
@@ -199,8 +286,8 @@ mod tests {
 
         let mut dca = DoradoBasecall::from_config(
             &config,
-            format!("{}/inputs/pod5/A", TEST_DIR.as_str()).into(),
-            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()).into(),
+            format!("{}/inputs/pod5/A", TEST_DIR.as_str()),
+            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()),
         );
 
         info!("Basecalling");
@@ -219,8 +306,8 @@ mod tests {
 
         let mut dca = DoradoAlign::from_config(
             &config,
-            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()).into(),
-            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()),
         );
 
         info!("Basecalling");
@@ -228,7 +315,6 @@ mod tests {
 
         Ok(())
     }
-
 }
 
 #[derive(Debug, Clone)]

+ 118 - 5
src/commands/mod.rs

@@ -148,6 +148,7 @@ pub trait Runner: Command {
         Ok(CapturedOutput {
             stdout: captured_stdout,
             stderr: captured_stderr,
+            slurm_epilog: None,
         })
     }
 }
@@ -160,6 +161,7 @@ use log::info;
 pub struct CapturedOutput {
     pub stdout: String,
     pub stderr: String,
+    pub slurm_epilog: Option<SlurmEpilog>,
 }
 
 /// Super-trait adding a Slurm `run()` method on top of [`Command`].
@@ -270,7 +272,11 @@ pub trait SlurmRunner: Command {
 
         self.clean_up()?;
 
-        Ok(CapturedOutput { stdout, stderr })
+        Ok(CapturedOutput {
+            stdout,
+            stderr,
+            slurm_epilog: None,
+        })
     }
 }
 
@@ -386,6 +392,8 @@ pub trait SbatchRunner: Command {
             .unwrap_or("")
             .to_string();
 
+        info!("Running jobid: {job_id}");
+
         if job_id.is_empty() {
             anyhow::bail!("failed to parse job id from sbatch output: {sbatch_stdout}");
         }
@@ -398,6 +406,7 @@ pub trait SbatchRunner: Command {
         let captured_stdout = Arc::new(Mutex::new(String::new()));
         let captured_stdout_clone = Arc::clone(&captured_stdout);
 
+        let job_id_clone = job_id.clone();
         let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
             let mut pos: u64 = 0;
             let mut file_opened = false;
@@ -411,7 +420,7 @@ pub trait SbatchRunner: Command {
                             let mut remaining = Vec::new();
                             if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
                                 let s = String::from_utf8_lossy(&remaining);
-                                print!("{s}");
+                                s.split("\n").for_each(|v| info!("{job_id_clone}: {v}"));
                                 if let Ok(mut c) = captured_stdout_clone.lock() {
                                     c.push_str(&s);
                                 }
@@ -441,7 +450,8 @@ pub trait SbatchRunner: Command {
                                     let bytes = &buf[..n];
                                     // forward to terminal
                                     let s = String::from_utf8_lossy(bytes);
-                                    print!("{s}");
+                                    s.split("\n").for_each(|v| info!("{job_id_clone}: {v}"));
+                                    // print!("{s}");
                                     io::stdout().flush().ok();
                                     // capture
                                     if let Ok(mut c) = captured_stdout_clone.lock() {
@@ -490,7 +500,7 @@ pub trait SbatchRunner: Command {
                 break;
             }
 
-            thread::sleep(Duration::from_secs(2));
+            thread::sleep(Duration::from_secs(1));
         }
 
         // Wait for output file to stop growing (SLURM epilog may still be writing)
@@ -531,11 +541,111 @@ pub trait SbatchRunner: Command {
         // Remove the SLURM output file
         let _ = std::fs::remove_file(&out_file);
 
+        let (stdout, slurm_epilog) = split_output(&stdout);
+
         Ok(CapturedOutput {
             stdout,
             stderr: String::new(), // all job output is in the .out file
+            slurm_epilog,
+        })
+    }
+}
+
+use std::collections::HashMap;
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct SlurmEpilog {
+    pub job_id: String,
+    pub cluster: String,
+    pub user: String,
+    pub group: String,
+    pub nodelist: String,
+    pub cores: u32,
+    pub job_started_at: String,
+    pub job_ended_at: String,
+    pub wall_clock_time: String,
+    pub cpu_utilized: String,
+    pub cpu_efficiency: String,
+    pub memory_utilized: String,
+    pub memory_efficiency: String,
+}
+
+impl SlurmEpilog {
+    /// Parse SLURM epilog from output string
+    pub fn parse(output: &str) -> Option<Self> {
+        // Find the epilog section
+        let epilog_start = output.find("SLURM EPILOG")?;
+        let epilog_section = &output[epilog_start..];
+
+        // Parse key-value pairs
+        let mut fields: HashMap<String, String> = HashMap::new();
+
+        for line in epilog_section.lines() {
+            if let Some((key, value)) = Self::parse_line(line) {
+                fields.insert(key, value);
+            }
+        }
+
+        // Extract user/group
+        let (user, group) = fields.get("User/Group").and_then(|s| {
+            let parts: Vec<&str> = s.split('/').collect();
+            if parts.len() == 2 {
+                Some((parts[0].to_string(), parts[1].to_string()))
+            } else {
+                None
+            }
+        })?;
+
+        Some(SlurmEpilog {
+            job_id: fields.get("Job ID")?.clone(),
+            cluster: fields.get("Cluster")?.clone(),
+            user,
+            group,
+            nodelist: fields.get("Nodelist")?.clone(),
+            cores: fields.get("Cores")?.parse().ok()?,
+            job_started_at: fields.get("Job started at")?.clone(),
+            job_ended_at: fields.get("Job ended at")?.clone(),
+            wall_clock_time: fields.get("Job Wall-clock time")?.clone(),
+            cpu_utilized: fields.get("CPU Utilized")?.clone(),
+            cpu_efficiency: fields.get("CPU Efficiency")?.clone(),
+            memory_utilized: fields.get("Memory Utilized")?.clone(),
+            memory_efficiency: fields.get("Memory Efficiency")?.clone(),
         })
     }
+
+    fn parse_line(line: &str) -> Option<(String, String)> {
+        // Skip lines that are just decorations or headers
+        if line.contains("---") || line.contains("SLURM EPILOG") || line.trim().is_empty() {
+            return None;
+        }
+
+        // Look for "Key: Value" pattern
+        let parts: Vec<&str> = line.splitn(2, ':').collect();
+        if parts.len() == 2 {
+            let key = parts[0].trim().to_string();
+            let value = parts[1].trim().to_string();
+            if !key.is_empty() && !value.is_empty() {
+                return Some((key, value));
+            }
+        }
+
+        None
+    }
+}
+
+/// Split output into job output and epilog
+pub fn split_output(full_output: &str) -> (String, Option<SlurmEpilog>) {
+    if let Some(epilog_start) = full_output.find("----------------------------------------------") {
+        // Check if this is actually the epilog header
+        if full_output[epilog_start..].contains("SLURM EPILOG") {
+            let job_output = full_output[..epilog_start].trim_end().to_string();
+            let epilog = SlurmEpilog::parse(full_output);
+            return (job_output, epilog);
+        }
+    }
+
+    // No epilog found, return full output
+    (full_output.to_string(), None)
 }
 
 /// Run multiple SbatchRunner jobs in parallel.
@@ -573,4 +683,7 @@ where
 }
 
 #[cfg(test)]
-mod tests;
+mod tests {
+    use super::*;
+}
+

+ 26 - 30
src/commands/samtools.rs

@@ -1,4 +1,7 @@
-use std::{fs, path::PathBuf};
+use std::{
+    fs,
+    path::{Path, PathBuf},
+};
 
 use anyhow::Context;
 use log::info;
@@ -243,12 +246,16 @@ pub struct SamtoolsSplit {
 }
 
 impl SamtoolsSplit {
-    pub fn from_config(config: &Config, input_bam: PathBuf, output_dir: PathBuf) -> Self {
+    pub fn from_config(
+        config: &Config,
+        input_bam: impl AsRef<Path>,
+        output_dir: impl AsRef<Path>,
+    ) -> Self {
         Self {
             samtools: (&config.align.samtools_bin).into(),
             threads: config.align.samtools_split_threads,
-            input_bam,
-            output_dir,
+            input_bam: input_bam.as_ref().into(),
+            output_dir: output_dir.as_ref().into(),
         }
     }
 }
@@ -316,13 +323,17 @@ pub struct SamtoolsSort {
 }
 
 impl SamtoolsSort {
-    pub fn from_config(config: &Config, input_bam: PathBuf, output_bam: PathBuf) -> Self {
+    pub fn from_config(
+        config: &Config,
+        input_bam: impl AsRef<Path>,
+        output_bam: impl AsRef<Path>,
+    ) -> Self {
         let threads = config.align.samtools_sort_threads;
         Self {
             samtools: (&config.align.samtools_bin).into(),
             threads,
-            input_bam,
-            output_bam,
+            input_bam: input_bam.as_ref().into(),
+            output_bam: output_bam.as_ref().into(),
             slurm_params: SlurmParams {
                 job_name: Some("samtools_split".into()),
                 cpus_per_task: Some(threads as u32),
@@ -373,7 +384,7 @@ impl SbatchRunner for SamtoolsSort {
     }
 }
 
-struct TestRun;
+pub struct TestRun;
 
 impl super::Command for TestRun {
     fn cmd(&self) -> String {
@@ -393,23 +404,8 @@ impl SbatchRunner for TestRun {
             gres: None,
         }
     }
-
-    fn sbatch_extra_args(&self) -> Vec<String> {
-        // If your cluster requires account:
-        // vec!["--account=myaccount".into()]
-        Vec::new()
-    }
 }
 
-// pub struct TestRun {}
-// impl super::Command for TestRun {
-//     fn cmd(&self) -> String {
-//         "echo 'test'".to_string()
-//     }
-// }
-// impl super::Runner for TestRun {}
-// impl super::SlurmRunner for TestRun {}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -471,7 +467,7 @@ mod tests {
         info!("Mergin both BAM.");
         let mut idx = SamtoolsMerge {
             bin: config.align.samtools_bin,
-            threads: config.align.samtools_merge_threads as u8,
+            threads: config.align.samtools_merge_threads,
             bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()).into(),
             into: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()).into(),
             tmp_bam: "".into(),
@@ -492,8 +488,8 @@ mod tests {
 
         let mut cmd = SamtoolsSplit::from_config(
             &config,
-            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(),
-            format!("{}/outputs/by_rg_splitted", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/by_rg_splitted", TEST_DIR.as_str()),
         );
 
         let captured_output = SlurmRunner::run(&mut cmd)?;
@@ -509,14 +505,14 @@ mod tests {
 
         let sort_1 = SamtoolsSort::from_config(
             &config,
-            format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode02.bam", TEST_DIR.as_str()).into(),
-            format!("{}/outputs/01_sorted.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode02.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/01_sorted.bam", TEST_DIR.as_str()),
         );
 
         let sort_2 = SamtoolsSort::from_config(
             &config,
-            format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode04.bam", TEST_DIR.as_str()).into(),
-            format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode04.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()),
         );
 
         let r = run_many_sbatch(vec![sort_1, sort_2])?;

+ 50 - 1
src/helpers.rs

@@ -469,7 +469,37 @@ pub fn find_files(pattern: &str) -> anyhow::Result<Vec<PathBuf>> {
 //         .map(|duration| Utc.timestamp(duration.as_secs() as i64, duration.subsec_nanos()))
 // }
 
-pub fn list_directories(dir_path: &str) -> std::io::Result<Vec<String>> {
+/// List all files in a directory that have the given suffix (file extension).
+///
+/// # Arguments
+/// * `dir` – Directory to scan.
+/// * `ext` – File extension to match (without the dot).
+///
+/// # Returns
+/// A vector of `PathBuf` entries whose extension matches `ext`.
+///
+/// # Errors
+/// Returns any I/O error from `read_dir` or directory traversal.
+pub fn list_files_with_ext(dir: impl AsRef<Path>, ext: &str) -> std::io::Result<Vec<PathBuf>> {
+    let mut out = Vec::new();
+
+    for entry in fs::read_dir(dir)? {
+        let entry = entry?;
+        let path = entry.path();
+
+        if path.is_file() {
+            if let Some(file_ext) = path.extension().and_then(|e| e.to_str()) {
+                if file_ext == ext {
+                    out.push(path);
+                }
+            }
+        }
+    }
+
+    Ok(out)
+}
+
+pub fn list_directories(dir_path: PathBuf) -> std::io::Result<Vec<String>> {
     let mut directories = Vec::new();
 
     for entry in fs::read_dir(dir_path)? {
@@ -680,4 +710,23 @@ pub fn test_init() {
         .try_init();
 }
 
+use regex::Regex;
+
+/// Extracts the numeric barcode suffix from a filename.
+///
+/// # Examples
+///
+/// ```
+/// let f = "sample_SQK-NBD114-24_barcode07.bam";
+/// assert_eq!(extract_barcode(f), Some(7));
+/// ```
+///
+/// Returns `None` if no `barcodeNN` pattern is found
+/// or if the number cannot be parsed.
+pub fn extract_barcode(name: &str) -> Option<u32> {
+    let re = Regex::new(r"barcode(\d+)").unwrap();
+    re.captures(name)
+        .and_then(|c| c.get(1))
+        .and_then(|m| m.as_str().parse::<u32>().ok())
+}
 

+ 23 - 1
src/io/pod5_infos.rs

@@ -11,7 +11,7 @@ use chrono::{DateTime, Utc};
 use super::pod5_footer_generated::minknow::reads_format::{ContentType, Footer};
 // use podders::{root_as_footer, ContentType};
 
/// Deserializes a POD5 `Footer` flatbuffer from the raw `buf` bytes.
///
/// # Errors
/// Returns `InvalidFlatbuffer` when `buf` fails flatbuffers verification.
pub fn root_as_footer(buf: &[u8]) -> Result<Footer<'_>, flatbuffers::InvalidFlatbuffer> {
  flatbuffers::root::<Footer>(buf)
}
 
@@ -266,3 +266,25 @@ fn read_arrow_table(
 
     Ok(batches)
 }
+
+
#[cfg(test)]
mod tests {
    use crate::helpers::{list_files_with_ext, test_init};
    use crate::TEST_DIR;

    use super::*;

    /// Smoke test: parse the first recovered POD5 of a known run and
    /// dump its metadata.
    /// NOTE(review): depends on a hardcoded cluster path — this fails on
    /// any other machine; consider moving the fixture under `TEST_DIR`.
    #[test]
    fn load_pod5s() -> anyhow::Result<()> {
        test_init();

        let pods = list_files_with_ext("/mnt/beegfs02/scratch/t_steimle/prom_runs/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/pod5_recovered", "pod5")?;

        // Silently passes when the directory is empty or missing files.
        if let Some(fpod_path) = pods.first() {
            let i = Pod5Info::from_pod5(fpod_path.to_str().unwrap());
            println!("{i:#?}");
        }
        Ok(())
    }
}

+ 43 - 42
src/lib.rs

@@ -148,6 +148,7 @@ pub mod annotation;
 pub mod cases;
 pub mod scan;
 pub mod math;
+pub mod slurm_helpers;
 
 #[macro_use]
 extern crate lazy_static;
@@ -178,7 +179,7 @@ mod tests {
     use runners::Run;
     use variant::{variant::{Variants, VcfVariant}, variant_collection};
 
-    use self::{collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
+    use self::{/* collection::pod5::{FlowCellCase, Pod5Collection}, */ commands::dorado, config::Config};
     use super::*;
     use crate::{annotation::Annotation, callers::{clairs::ClairS, deep_somatic::DeepSomatic, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}, savana::SavanaCN}, collection::{bam::{self, nt_pileup_new}, flowcells::{scan_archive, FlowCells}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig, ShouldRun}, commands::dorado::Dorado, helpers::find_files, io::{bed::bedrow_overlaps_par, dict::read_dict, gff::features_ranges}, pipes::somatic::const_stats, positions::{merge_overlapping_genome_ranges, range_intersection_par, sort_ranges}, scan::scan::somatic_scan, variant::{variant::{AlterationCategory, BNDDesc, BNDGraph, GroupByThreshold, ToBNDGraph}, variant_collection::{group_variants_by_bnd_desc, group_variants_by_bnd_rc, Variant, VariantCollection}, variants_stats::{self, somatic_depth_quality_ranges, VariantsStats}}};
 
@@ -203,30 +204,30 @@ mod tests {
         // modkit::modkit(bam_path);
     }
 
-    #[test]
-    fn run_dorado() -> anyhow::Result<()> {
-        init();
-        let case = FlowCellCase { 
-            id: "CONSIGNY".to_string(), 
-            time_point: "mrd".to_string(), barcode: "07".to_string(), pod_dir: "/mnt/beegfs01/ data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".into() 
-        };
-        dorado::Dorado::init(case, Config::default())?.run_pipe()
-    }
-
-    #[test]
-    fn pod5() -> anyhow::Result<()> {
-        let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
-            .build();
+    // #[test]
+    // fn run_dorado() -> anyhow::Result<()> {
+    //     init();
+    //     let case = FlowCellCase { 
+    //         id: "CONSIGNY".to_string(), 
+    //         time_point: "mrd".to_string(), barcode: "07".to_string(), pod_dir: "/mnt/beegfs01/ data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".into() 
+    //     };
+    //     dorado::Dorado::init(case, Config::default())?.run_pipe()
+    // }
 
-        let coll = Pod5Collection::new(
-            "/data/run_data",
-            "/data/flow_cells.tsv",
-            "/data/longreads_basic_pipe",
-        )?;
-        println!("{coll:#?}");
-        // let runs = Runs::import_dir("/home/prom/store/banana-pool/run_data", "/data/flow_cells.tsv")?;
-        Ok(())
-    }
+    // #[test]
+    // fn pod5() -> anyhow::Result<()> {
+    //     let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
+    //         .build();
+    //
+    //     let coll = Pod5Collection::new(
+    //         "/data/run_data",
+    //         "/data/flow_cells.tsv",
+    //         "/data/longreads_basic_pipe",
+    //     )?;
+    //     println!("{coll:#?}");
+    //     // let runs = Runs::import_dir("/home/prom/store/banana-pool/run_data", "/data/flow_cells.tsv")?;
+    //     Ok(())
+    // }
 
     #[test]
     fn bam() -> anyhow::Result<()> {
@@ -260,24 +261,24 @@ mod tests {
 
     // pod5 view -I /data/run_data/20240903-CL/ARMEM-DG-N02_ASSJU-DG-N03/20240903_1428_1B_PAW47629_fc24c3cf/pod5/PAW47629_fc24c3cf_77b07847_0.pod5 | head -5000 | awk '{if(NR==1){print "target,"$0}else{print "subset_1.pod5,"$0}}' > /tmp/subset_ids.csv
     // pod5 subset /data/run_data/20240903-CL/ARMEM-DG-N02_ASSJU-DG-N03/20240903_1428_1B_PAW47629_fc24c3cf/pod5/PAW47629_fc24c3cf_77b07847_0.pod5 --csv /tmp/subset_ids.csv -o /data/test_suite/pod5/muxed/
-    #[test]
-    fn mux() -> anyhow::Result<()> {
-        init();
-        let result_dir = "/data/test_suite/results".to_string();
-        let cases = vec![
-            FlowCellCase { id: "test_02".to_string(), time_point: "diag".to_string(), barcode: "02".to_string(), pod_dir: "/data/test_suite/pod5/muxed".into() },
-            FlowCellCase { id: "test_03".to_string(), time_point: "diag".to_string(), barcode: "03".to_string(), pod_dir: "/data/test_suite/pod5/muxed".into() },
-        ];
-
-        cases.iter().for_each(|c| {
-            let dir = format!("{result_dir}/{}", c.id);
-            if Path::new(&dir).exists() {
-                fs::remove_dir_all(dir).unwrap();
-            }
-        });
-        let config = Config { result_dir, ..Default::default()  };
-        Dorado::from_mux(cases, config)
-    }
+    // #[test]
+    // fn mux() -> anyhow::Result<()> {
+    //     init();
+    //     let result_dir = "/data/test_suite/results".to_string();
+    //     let cases = vec![
+    //         FlowCellCase { id: "test_02".to_string(), time_point: "diag".to_string(), barcode: "02".to_string(), pod_dir: "/data/test_suite/pod5/muxed".into() },
+    //         FlowCellCase { id: "test_03".to_string(), time_point: "diag".to_string(), barcode: "03".to_string(), pod_dir: "/data/test_suite/pod5/muxed".into() },
+    //     ];
+    //
+    //     cases.iter().for_each(|c| {
+    //         let dir = format!("{result_dir}/{}", c.id);
+    //         if Path::new(&dir).exists() {
+    //             fs::remove_dir_all(dir).unwrap();
+    //         }
+    //     });
+    //     let config = Config { result_dir, ..Default::default()  };
+    //     Dorado::from_mux(cases, config)
+    // }
 
     // #[test_log::test]
     // fn clairs() -> anyhow::Result<()> {

+ 74 - 17
src/pipes/somatic_slurm.rs

@@ -1,45 +1,102 @@
-use std::{fs, path::PathBuf};
 use anyhow::Context;
+use std::{collections::HashMap, fs, path::PathBuf};
 use uuid::Uuid;
 
-use log::info;
+use log::{info, warn};
 
 use crate::{
     commands::{
-        SlurmRunner, dorado::{DoradoAlign, DoradoBasecall}, samtools::SamtoolsSplit
+        dorado::{DoradoAlign, DoradoBasecall},
+        run_many_sbatch,
+        samtools::SamtoolsSplit,
+        SlurmRunner,
     },
     config::Config,
+    helpers::{extract_barcode, list_files_with_ext},
 };
 
-pub fn basecall_align_split(pod5_dir: PathBuf, tmp_dir: PathBuf) -> anyhow::Result<()> {
+// const TEN_MB: u64 = 10 * 1024 * 1024;
+pub fn basecall_align_split(
+    pod5_dir: PathBuf,
+    tmp_dir: PathBuf,
+    min_bam_size: u64,
+) -> anyhow::Result<()> {
     let config = Config::default();
 
+    // Basecalling pod5 dir
     let tmp_basecalled_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
     info!("Basecalling into: {}", tmp_basecalled_bam.display());
     let mut cmd = DoradoBasecall::from_config(&config, pod5_dir, tmp_basecalled_bam.clone());
     let _out = SlurmRunner::run(&mut cmd)?;
     info!("Basecalled ✅");
 
+    // Splitting by RG
     let tmp_split_dir = tmp_dir.join(format!("{}", Uuid::new_v4()));
     info!("Split into: {}", tmp_split_dir.display());
-    let mut cmd = SamtoolsSplit::from_config(&config, tmp_basecalled_bam, tmp_split_dir);
+    let mut cmd = SamtoolsSplit::from_config(&config, &tmp_basecalled_bam, tmp_split_dir.clone());
     let _out = SlurmRunner::run(&mut cmd)?;
+    fs::remove_file(tmp_basecalled_bam)?;
     info!("Splitted ✅");
 
+    let bams = list_files_with_ext(&tmp_split_dir, "bam").context(format!(
+        "error while listing bam files in: {}",
+        tmp_split_dir.display()
+    ))?;
 
+    // Aligments
+    let mut align_jobs = Vec::new();
+    let mut names_to_barcode = HashMap::new();
+    let mut no_barcode_bams = Vec::new();
+    for bam in bams.iter() {
+        if bam.metadata()?.len() > min_bam_size {
+            if let Some(Some(bam_file_name)) = bam.file_name().map(|name| name.to_str()) {
+                if let Some(barcode) = extract_barcode(bam_file_name) {
+                    let bam_out = tmp_dir.join(format!("{}_barcode{barcode}.bam", Uuid::new_v4()));
+                    names_to_barcode.insert(barcode, bam_out.clone());
+                    let job = DoradoAlign::from_config(&config, bam, bam_out);
+                    align_jobs.push(job);
+                } else {
+                    no_barcode_bams.push(bam);
+                    warn!("No barcode and no alignement for: {}", bam.display());
+                }
+            }
+        } else {
+            warn!("Low BAM size, will be removed: {}", bam.display());
+        }
+    }
+    let n_jobs = align_jobs.len();
+    info!("Running aligments of {n_jobs} BAMs.");
+    run_many_sbatch(align_jobs)
+        .context(format!("Failed to run aligment batch for {n_jobs} jobs"))?;
+    info!("Aligments of {n_jobs} BAMs done ✅.");
 
-    // let tmp_aligned_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
-    // info!("Aligning into: {}", tmp_aligned_bam.display());
-    // let mut cmd =
-    //     DoradoAlign::from_config(&config, tmp_basecalled_bam.clone(), tmp_aligned_bam.clone());
-    // let _out = SlurmRunner::run(&mut cmd)?;
-    // info!("Aligned ✅");
-    //
-    // fs::remove_file(&tmp_basecalled_bam).context(format!(
-    //     "Failed to remove: {}",
-    //     tmp_basecalled_bam.display()
-    // ))?;
+    for (i, no_barcode_bam) in no_barcode_bams.iter().enumerate() {
+        info!("Keeping: {}", no_barcode_bam.display());
+        fs::copy(no_barcode_bam, tmp_dir.join(format!("no_barcode_{i}.bam")))?;
+    }
+
+    fs::remove_dir_all(tmp_split_dir)?;
 
-    
     Ok(())
 }
+
#[cfg(test)]
mod tests {
    use crate::helpers::test_init;
    use crate::TEST_DIR;

    use super::*;

    /// End-to-end smoke test of the basecall → split → align pipeline.
    /// NOTE(review): requires a live Slurm cluster plus POD5 fixtures
    /// under `TEST_DIR`; it cannot pass on a developer laptop or in CI.
    #[test]
    fn slurm_pipe() -> anyhow::Result<()> {
        test_init();

        basecall_align_split(
            format!("{}/inputs/pod5/A", TEST_DIR.as_str()).into(),
            format!("{}/outputs", TEST_DIR.as_str()).into(),
            // Minimum per-barcode BAM size: 10 MiB.
            10 * 1024 * 1024,
        )?;

        Ok(())
    }
}

+ 249 - 0
src/slurm_helpers.rs

@@ -0,0 +1,249 @@
+use std::collections::HashMap;
+use std::process::Command;
+
+use anyhow::Context;
+
/// Per-node snapshot of Slurm resources, parsed from
/// `scontrol show nodes -o` output.
///
/// Made `pub` (with `pub` fields): `slurm_availables` and
/// `max_gpu_per_node` are public functions in a public module, and
/// returning a private type from them is a compile error (E0446,
/// "private type in public interface").
#[derive(Debug)]
pub struct NodeInfo {
    /// Comma-separated partition list (`Partitions` field).
    pub partition: String,
    /// Node name (`NodeName` field).
    pub node: String,
    /// Total configured CPUs (`CPUTot`).
    pub cpu_total: i32,
    /// `CPUTot - CPUAlloc`.
    pub cpu_free: i32,
    /// Total configured GPUs (from `CfgTRES`, falling back to `Gres`).
    pub gpu_total: i32,
    /// Total minus allocated GPUs.
    pub gpu_free: i32,
    /// GPU models/types parsed from GRES (e.g. `V100`, `A100`).
    pub gpu_names: Vec<String>,
}
+
/// Splits one `scontrol show node -o` line into `KEY=VALUE` pairs.
///
/// Whitespace-separated tokens without an `=` are ignored; if a key
/// repeats, the last occurrence wins.
fn parse_kv_line(line: &str) -> HashMap<String, String> {
    line.split_whitespace()
        .filter_map(|token| {
            token
                .find('=')
                .map(|i| (token[..i].to_string(), token[i + 1..].to_string()))
        })
        .collect()
}
+
+// ---- GPU count helpers (as before) ----
+
/// Pulls a GPU count out of a TRES string.
///
/// Handles both the untyped form `gres/gpu=4` and the typed form
/// `gres/gpu:V100=4`; only the first `gres/gpu` occurrence is read.
/// Returns 0 when no GPU entry is present.
fn parse_gpu_from_tres(s: &str) -> u32 {
    let Some(idx) = s.find("gres/gpu") else {
        return 0;
    };
    let tail = &s[idx + "gres/gpu".len()..];
    // Optional ":TYPE" between "gres/gpu" and "=COUNT".
    let tail = tail.strip_prefix(':').unwrap_or(tail);
    let count = tail.split('=').nth(1).unwrap_or("");
    // Stop at the first non-digit (the string continues with ",mem=…").
    count
        .chars()
        .take_while(char::is_ascii_digit)
        .collect::<String>()
        .parse()
        .unwrap_or(0)
}
+
/// Pulls a GPU count out of a Slurm `Gres`/`GresUsed` string.
///
/// Accepts entries of the form `gpu:4`, `gpu:V100:4` or
/// `gpu:V100:4(S:0-3)`. Multiple comma-separated GRES entries
/// (e.g. `gpu:V100:4,gpu:A100:2`) are summed, so heterogeneous nodes
/// report their full GPU count — the previous version returned only
/// the last entry's count.
fn parse_gpu_from_gres(s: &str) -> u32 {
    s.split(',')
        .filter_map(|item| {
            let idx = item.find("gpu:")?;
            let rest = &item[idx + "gpu:".len()..];
            // The count is the last purely numeric field, e.g.
            // "V100:4(S:0-3)" -> 4, "4" -> 4.
            rest.split(|c| c == ':' || c == '(')
                .rev()
                .find_map(|p| p.parse::<u32>().ok())
        })
        .sum()
}
+
+fn extract_total_gpus(fields: &HashMap<String, String>) -> u32 {
+    if let Some(cfg) = fields.get("CfgTRES") {
+        let n = parse_gpu_from_tres(cfg);
+        if n > 0 {
+            return n;
+        }
+    }
+    if let Some(gres) = fields.get("Gres") {
+        return parse_gpu_from_gres(gres);
+    }
+    0
+}
+
+fn extract_alloc_gpus(fields: &HashMap<String, String>) -> u32 {
+    if let Some(alloc) = fields.get("AllocTRES") {
+        let n = parse_gpu_from_tres(alloc);
+        if n > 0 {
+            return n;
+        }
+    }
+    if let Some(gres_used) = fields.get("GresUsed") {
+        return parse_gpu_from_gres(gres_used);
+    }
+    0
+}
+
+// ---- GPU name / type extraction ----
+
/// Extracts the GPU model names from a `Gres` string.
///
/// Examples:
///   "gpu:V100:4(S:0-3)" -> ["V100"]
///   "gpu:A100:8(S:0-7)" -> ["A100"]
///   "gpu:4(S:0-3)"      -> []   (untyped entry, no model name)
///
/// The result is sorted and deduplicated.
fn extract_gpu_names_from_gres(s: &str) -> Vec<String> {
    let mut names: Vec<String> = s
        .split(',')
        .filter_map(|item| {
            let idx = item.find("gpu:")?;
            let rest = &item[idx + "gpu:".len()..];
            // Drop any trailing socket annotation like "(S:0-3)".
            let rest = rest.split('(').next().unwrap_or(rest);
            let mut parts = rest.split(':');
            let first = parts.next()?.trim();
            // A single field means "gpu:<count>" — no explicit type.
            let untyped = parts.next().is_none();
            if untyped || first.is_empty() || first.chars().all(|c| c.is_ascii_digit()) {
                None
            } else {
                Some(first.to_string())
            }
        })
        .collect();
    names.sort();
    names.dedup();
    names
}
+
/// Extracts the GPU model names from a `CfgTRES` string.
///
/// Example: "cpu=64,gres/gpu:V100=4,gres/gpu:A100=2" -> ["A100", "V100"].
/// Untyped entries ("gres/gpu=4") carry no model and are skipped.
/// The result is sorted and deduplicated.
fn extract_gpu_names_from_cfg_tres(s: &str) -> Vec<String> {
    let mut names: Vec<String> = s
        .split(',')
        .filter_map(|item| {
            let idx = item.find("gres/gpu")?;
            // Only typed entries have a ":TYPE" after "gres/gpu".
            let typed = item[idx + "gres/gpu".len()..].strip_prefix(':')?;
            let name = typed.split('=').next().unwrap_or("").trim();
            (!name.is_empty()).then(|| name.to_string())
        })
        .collect();
    names.sort();
    names.dedup();
    names
}
+
+fn extract_gpu_names(fields: &HashMap<String, String>) -> Vec<String> {
+    // Prefer CfgTRES if it has types, else Gres
+    if let Some(cfg) = fields.get("CfgTRES") {
+        let names = extract_gpu_names_from_cfg_tres(cfg);
+        if !names.is_empty() {
+            return names;
+        }
+    }
+    if let Some(gres) = fields.get("Gres") {
+        return extract_gpu_names_from_gres(gres);
+    }
+    Vec::new()
+}
+
+// ---- Node parsing ----
+
+fn parse_node(fields: HashMap<String, String>) -> NodeInfo {
+    let partition = fields.get("Partitions").cloned().unwrap_or_default();
+    let node = fields.get("NodeName").cloned().unwrap_or_default();
+
+    let cpu_total: i32 = fields
+        .get("CPUTot")
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(0);
+    let cpu_alloc: i32 = fields
+        .get("CPUAlloc")
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(0);
+    let cpu_free = cpu_total - cpu_alloc;
+
+    let gpu_total = extract_total_gpus(&fields) as i32;
+    let gpu_alloc = extract_alloc_gpus(&fields) as i32;
+    let gpu_free = gpu_total - gpu_alloc;
+
+    let gpu_names = extract_gpu_names(&fields);
+
+    NodeInfo {
+        partition,
+        node,
+        cpu_total,
+        cpu_free,
+        gpu_total,
+        gpu_free,
+        gpu_names,
+    }
+}
+
+pub fn slurm_availables() -> anyhow::Result<Vec<NodeInfo>> {
+    let output = Command::new("scontrol")
+        .args(["show", "nodes", "-o"])
+        .output()
+        .context("failed to run scontrol")?;
+
+    let stdout = String::from_utf8_lossy(&output.stdout);
+
+    let mut results = Vec::new();
+    for line in stdout.lines() {
+        let kv = parse_kv_line(line);
+        let info = parse_node(kv);
+        results.push(info);
+    }
+
+    Ok(results)
+}
+
+pub fn max_gpu_per_node(gpu_name: &str) -> Option<i32> {
+    let nodes = slurm_availables().ok()?;
+    nodes.iter()
+        .filter(|n| n.gpu_names.iter().any(|g| g.eq_ignore_ascii_case(gpu_name)))
+        .map(|n| n.gpu_free)
+        .max()
+}
+
#[cfg(test)]
mod tests {
    use crate::slurm_helpers::{max_gpu_per_node, slurm_availables};

    /// Smoke test: dump the parsed node table plus the per-model free-GPU
    /// maxima. Requires a live Slurm environment (`scontrol` on PATH).
    #[test]
    fn slurm_info() -> anyhow::Result<()> {
        println!(
            "{:15} {:15} {:>7} {:>8} {:>7} {:>8}  {}",
            "Partition", "Node", "CPU_TOT", "CPU_FREE", "GPU_TOT", "GPU_FREE", "GPU_NAMES"
        );

        for info in slurm_availables()? {
            let names = if info.gpu_names.is_empty() {
                "-".to_string()
            } else {
                info.gpu_names.join(",")
            };

            println!(
                "{:15} {:15} {:7} {:8} {:7} {:8}  {}",
                info.partition,
                info.node,
                info.cpu_total,
                info.cpu_free,
                info.gpu_total,
                info.gpu_free,
                names
            );
        }

        let h100 = max_gpu_per_node("h100");
        println!("{h100:#?}");
        let a100 = max_gpu_per_node("a100");
        println!("{a100:#?}");

        Ok(())
    }
}