Ver código fonte

In the CLUSTER !!!

Thomas 3 meses atrás
pai
commit
4de4f971fe
9 arquivos alterados com 1612 adições e 174 exclusões
  1. 20 7
      pandora-config.example.toml
  2. 418 148
      src/commands/dorado.rs
  3. 570 2
      src/commands/mod.rs
  4. 527 0
      src/commands/samtools.rs
  5. 30 16
      src/config.rs
  6. 0 1
      src/helpers.rs
  7. 1 0
      src/lib.rs
  8. 1 0
      src/pipes/mod.rs
  9. 45 0
      src/pipes/somatic_slurm.rs

+ 20 - 7
pandora-config.example.toml

@@ -23,8 +23,6 @@ db_cases_path = "/data/cases.sqlite"
 conda_sh = "/data/miniconda3/etc/profile.d/conda.sh"
 
 
-
-
 #######################################
 # Reference genome & annotations
 #######################################
@@ -346,20 +344,35 @@ promethion_runs_input = "/data/pandora-flowcell-id.json"
 
 [align]
 # Path to Dorado binary.
-dorado_bin = "/data/tools/dorado-1.1.1-linux-x64/bin/dorado"
+dorado_bin = "/mnt/beegfs02/scratch/t_steimle/tools/dorado-latest-linux-x64/bin/dorado"
 
 # Dorado basecalling arguments (device, model, modifications…).
-dorado_basecall_arg = "-x 'cuda:0,1,2,3' sup,5mC_5hmC"
+dorado_basecall_arg = "-x 'cuda:all' sup,5mC_5hmC"
+
+# Should dorado re-align after demux ?
+dorado_should_realign = false
+
+# Dorado aligner threads number
+dorado_aligner_threads = 80
 
 # Reference FASTA used for alignment.
-ref_fa = "/data/ref/hs1/chm13v2.0.fa"
+ref_fa = "/mnt/beegfs02/scratch/t_steimle/ref/hs1/chm13v2.0.fa"
 
 # Minimap2 index used for alignment.
-ref_mmi = "/data/ref/chm13v2.0.mmi"
+ref_mmi = "/mnt/beegfs02/scratch/t_steimle/ref/chm13v2.0.mmi"
+
+# Samtools bin 
+samtools_bin = "/mnt/beegfs02/scratch/t_steimle/tools/samtools"
 
 # Threads for `samtools view`.
 samtools_view_threads = 20
 
 # Threads for `samtools sort`.
-samtools_sort_threads = 50
+samtools_sort_threads = 48
+
+# Threads for `samtools merge`.
+samtools_merge_threads = 48
+
+# Threads for `samtools split`.
+samtools_split_threads = 48
 

+ 418 - 148
src/commands/dorado.rs

@@ -4,17 +4,233 @@ use std::{
     time::SystemTime,
 };
 
+use anyhow::Context;
 use duct::cmd;
 use log::{debug, info, warn};
 use uuid::Uuid;
 
 use crate::{
     collection::{bam::bam_composition, flowcells::FlowCell, pod5::FlowCellCase},
+    commands::Command,
     config::Config,
     helpers::find_unique_file,
     io::pod5_infos::Pod5Info,
 };
 
+#[derive(Debug)]
+pub struct DoradoBasecall {
+    dorado: PathBuf,
+    output_bam: PathBuf,
+    pod5_dir: PathBuf,
+    sequencing_kit: String,
+    dorado_basecall_arg: String,
+}
+
+impl DoradoBasecall {
+    pub fn from_config(config: &Config, pod5_dir: PathBuf, output_bam: PathBuf) -> Self {
+        Self {
+            dorado: (&config.align.dorado_bin).into(),
+            output_bam,
+            pod5_dir,
+            sequencing_kit: String::new(),
+            dorado_basecall_arg: config.align.dorado_basecall_arg.clone(),
+        }
+    }
+}
+
+impl Command for DoradoBasecall {
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.pod5_dir.exists() || !self.pod5_dir.is_dir() {
+            anyhow::bail!(
+                "The pod5 dir is not accessible.\n{}",
+                self.pod5_dir.display()
+            );
+        }
+
+        if self.output_bam.exists() {
+            anyhow::bail!("The output BAM file already exists.");
+        }
+
+        let pod_path = fs::read_dir(&self.pod5_dir)
+            .context(format!(
+                "Failed to read pod5 dir: {}",
+                self.pod5_dir.display()
+            ))?
+            .filter_map(Result::ok) // keep only Ok entries
+            .map(|e| e.path())
+            .find(|p| p.extension().and_then(|e| e.to_str()) == Some("pod5"))
+            .context("No .pod5 file found")?;
+
+        self.sequencing_kit = Pod5Info::from_pod5(&pod_path.to_string_lossy())
+            .sequencing_kit
+            .to_uppercase();
+
+        Ok(())
+    }
+
+    fn cmd(&self) -> String {
+        let dorado_bin = &self.dorado;
+        let pod_dir = &self.pod5_dir;
+        let bam = &self.output_bam;
+        let dorado_arg = &self.dorado_basecall_arg;
+        let sequencing_kit = &self.sequencing_kit;
+
+        format!(
+            "{} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves > {}",
+            dorado_bin.display(),
+            pod_dir.display(),
+            bam.display()
+        )
+
+        // let samtools_view = format!(
+        //     "{} view -h -@ {samtools_view_threads} -b /dev/stdin",
+        //     samtools.display()
+        // );
+        // let samtools_sort = format!(
+        //     "{} sort -@ {samtools_sort_threads} /dev/stdin -o {}",
+        //     samtools.display(),
+        //     bam.display()
+        // );
+        //
+        // // format!("{dorado} | {samtools_view} | {samtools_sort}")
+        // format!("{dorado} | {samtools_view} > {}", bam.display())
+    }
+}
+
+#[derive(Debug)]
+pub struct DoradoAlign {
+    pub dorado: PathBuf,
+    pub reference: PathBuf,
+    pub input: PathBuf,
+    pub output: PathBuf,
+    pub threads: u8,
+}
+
+impl DoradoAlign {
+    pub fn from_config(config: &Config, input: PathBuf, output: PathBuf) -> Self {
+        Self {
+            dorado: (&config.align.dorado_bin).into(),
+            reference: (&config.align.ref_fa).into(),
+            input,
+            output,
+            threads: config.align.dorado_aligner_threads,
+        }
+    }
+}
+
+impl super::Command for DoradoAlign {
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.input.exists() {
+            anyhow::bail!(
+                "The input BAM file is not accessible.\n{}",
+                self.input.display()
+            );
+        }
+
+        if self.output.exists() {
+            anyhow::bail!(
+                "The output BAM file already exists.\n{}",
+                self.output.display()
+            );
+        }
+
+        Ok(())
+    }
+    fn cmd(&self) -> String {
+        format!(
+            "{} aligner --threads {} --allow-sec-supp --mm2-opts '--secondary yes' {} {} > {}",
+            self.dorado.display(),
+            self.threads,
+            self.reference.display(),
+            self.input.display(),
+            self.output.display()
+        )
+    }
+}
+
+impl super::SlurmRunner for DoradoAlign {
+    fn slurm_args(&self) -> Vec<String> {
+        super::SlurmParams {
+            job_name: Some("dorado_align".into()),
+            cpus_per_task: Some(self.threads.into()),
+            mem: Some("60G".into()),
+            partition: Some("gpgpuq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+// Running with Slurm: srun --job-name=dorado_basecall --cpus-per-task=X --mem=60G --partition=gpgpuq --gres=gpu:h100:4 bash -c /mnt/beegfs02/scratch/t_steimle/tools/dorado-latest-linux-x64/bin/dorado basecaller --kit-name SQK-NBD114-24 -x 'cuda:all' sup,5mC_5hmC /mnt/beegfs02/scratch/t_steimle/test_data/inputs/pod5/A --trim all --emit-moves > /mnt/beegfs02/scratch/t_steimle/test_data/outputs/aligned_5.bam
+// 04 cpu Basecalled @ Samples/s: 5.359770e+07
+// 05 cpu Basecalled @ Samples/s: 6.155305e+07
+// 06 cpu Basecalled @ Samples/s: 6.870292e+07
+// 07 cpu Basecalled @ Samples/s: 7.230430e+07
+// 08 cpu Basecalled @ Samples/s: 7.669054e+07
+// 10 cpu Basecalled @ Samples/s: 8.398348e+07
+// 15 cpu Basecalled @ Samples/s: 8.776285e+07
+impl super::SlurmRunner for DoradoBasecall {
+    fn slurm_args(&self) -> Vec<String> {
+        super::SlurmParams {
+            job_name: Some("dorado_basecall".into()),
+            cpus_per_task: Some(10),
+            mem: Some("60G".into()),
+            partition: Some("gpgpuq".into()),
+            gres: Some("gpu:h100:4".into()),
+        }
+        .to_args()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use log::info;
+    use crate::TEST_DIR;
+
+    use crate::{commands::SlurmRunner, config::Config, helpers::test_init};
+
+
+    #[test]
+    fn dorado_basecall() -> anyhow::Result<()> {
+        test_init();
+
+        let config = Config::default();
+
+        let mut dca = DoradoBasecall::from_config(
+            &config,
+            format!("{}/inputs/pod5/A", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()).into(),
+        );
+
+        info!("Basecalling");
+        let out = SlurmRunner::run(&mut dca)?;
+
+        println!("{out:#?}");
+
+        Ok(())
+    }
+
+    #[test]
+    fn dorado_align() -> anyhow::Result<()> {
+        test_init();
+
+        let config = Config::default();
+
+        let mut dca = DoradoAlign::from_config(
+            &config,
+            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(),
+        );
+
+        info!("Basecalling");
+        let _out = SlurmRunner::run(&mut dca)?;
+
+        Ok(())
+    }
+
+}
+
 #[derive(Debug, Clone)]
 pub struct DoradoParams {
     pub ref_fa: String,
@@ -58,37 +274,39 @@ impl Dorado {
         })
     }
 
-    fn create_reference_mmi(&self) -> anyhow::Result<()> {
-        if !std::path::Path::new(&self.config.align.ref_mmi).exists() {
-            cmd!(
-                "minimap2",
-                "-x",
-                "map-ont",
-                "-d",
-                &self.config.align.ref_mmi,
-                &self.config.align.ref_fa
-            )
-            .run()?;
-        }
+    // ------------------------------------------------------------------
+    // Small helper to actually execute a shell command
+    // ------------------------------------------------------------------
+    fn run_shell(cmdline: &str) -> anyhow::Result<()> {
+        info!("Running: {cmdline}");
+        cmd!("bash", "-c", cmdline)
+            .run()
+            .map_err(|e| anyhow::anyhow!("Failed to run: {cmdline}\n\t{}", e.to_string()))?;
         Ok(())
     }
 
-    fn create_directories(&self) -> anyhow::Result<()> {
-        if !std::path::Path::new(&self.case_dir).exists() {
-            info!("Creating directory {}", self.case_dir);
-            fs::create_dir(&self.case_dir)?;
-        }
-        if !std::path::Path::new(&self.time_dir).exists() {
-            info!("Creating directory {}", self.time_dir);
-            fs::create_dir(&self.time_dir)?;
+    // ------------------------------------------------------------------
+    // Command builders (return strings)
+    // ------------------------------------------------------------------
+
+    /// minimap2 index creation (returns None if index already exists)
+    fn create_reference_mmi_cmd(&self) -> Option<String> {
+        if std::path::Path::new(&self.config.align.ref_mmi).exists() {
+            None
+        } else {
+            Some(format!(
+                "minimap2 -x map-ont -d {} {}",
+                self.config.align.ref_mmi, self.config.align.ref_fa
+            ))
         }
-        Ok(())
     }
 
-    fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
+    /// Dorado + samtools pipeline for basecalling + alignment
+    fn basecall_align_cmd(&self, dorado_bin: &str) -> anyhow::Result<String> {
         let pod_dir = &self.case.pod_dir;
-        let ref_mmi = &self.config.align.ref_mmi;
+        let ref_fa = &self.config.align.ref_fa;
         let bam = &self.bam;
+        let samtools = &self.config.align.samtools_bin;
         let samtools_view_threads = self.config.align.samtools_view_threads;
         let samtools_sort_threads = self.config.align.samtools_sort_threads;
         let dorado_arg = self.config.align.dorado_basecall_arg.clone();
@@ -102,72 +320,164 @@ impl Dorado {
             .collect::<Vec<PathBuf>>()
             .pop()
             .unwrap();
+
         let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
             .sequencing_kit
             .to_uppercase();
 
         let dorado = format!(
-            "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_mmi} ",
+            "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_fa}",
             pod_dir.display()
         );
-        info!("running Dorado: {dorado}");
-        let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
-        let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
-        let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
-        info!("Running: {pipe}");
-
-        let pipe_cmd = cmd!("bash", "-c", &pipe);
-        pipe_cmd
-            .run()
-            .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?;
+        info!("Dorado command: {dorado}");
 
-        // let pipe_cmd = cmd!("bash", "-c", pipe);
-        // let mut reader = pipe_cmd.stderr_capture().reader()?;
-        //
-        // let mut buffer = [0; 1];
-        // let mut line = String::new();
-        //
-        // loop {
-        //     match reader.read(&mut buffer) {
-        //         Ok(0) => break, // End of output
-        //         Ok(_) => {
-        //             let char = buffer[0] as char;
-        //             eprint!("-{}", char);
-        //             std::io::stderr().flush()?;
-        //
-        //             if char == '\n' {
-        //                 // Send the complete line
-        //                 self.log.push(line.clone());
-        //                 line.clear();
-        //             } else {
-        //                 line.push(char);
-        //             }
-        //         }
-        //         Err(err) => {
-        //             warn!("Error reading from stderr: {}", err);
-        //             break;
-        //         }
-        //     }
-        // }
+        let samtools_view = format!("{samtools} view -h -@ {samtools_view_threads} -b /dev/stdin");
+        let samtools_sort =
+            format!("{samtools} sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
 
-        Ok(())
+        Ok(format!("{dorado} | {samtools_view} | {samtools_sort}"))
     }
 
-    pub fn index(&self) -> anyhow::Result<()> {
+    /// samtools index command
+    fn index_cmd(&self) -> String {
         let t = self.config.align.samtools_view_threads.to_string();
-        let cmd = format!("index -@ {t} {}", &self.bam);
-        info!("Running  samtools {cmd}");
-        cmd!("samtools", "index", "-@", &t, &self.bam).run()?;
+        format!(
+            "{} index -@ {t} {}",
+            self.config.align.samtools_bin, self.bam
+        )
+    }
+
+    /// cramino QC command
+    fn cramino_cmd(&self) -> String {
+        format!("cramino -t 150 --karyotype {}", self.bam)
+    }
+
+    /// modkit summary command
+    fn modkit_cmd(&self) -> String {
+        format!("modkit summary -t 50 {}", self.bam)
+    }
+
+    /// fastq export pipeline from BAM
+    fn create_fastq_cmd(&self) -> String {
+        let bam = &self.bam;
+        let fastq = format!(
+            "{}/{}/{}/{}_{}.fastq.gz",
+            self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
+        );
+        let samtools = format!("samtools fastq -@ 150 {bam}");
+        let crabz = format!("crabz -f bgzf - -o {fastq}");
+        format!("{samtools} | {crabz}")
+    }
+
+    /// samtools merge command used in `merge_bam`
+    fn merge_bam_cmd(&self, bam: &Path, into: &Path) -> String {
+        format!(
+            "{} merge -@ 160 -h {} {} {} {}",
+            self.config.align.samtools_bin,
+            bam.display(),
+            into.display(),
+            bam.display(),
+            into.display() // placeholder, real tmp path is managed outside
+        )
+    }
+
+    // mux basecall + samtools view into muxed.bam
+    fn from_mux_basecall_cmd(
+        config: &Config,
+        sequencing_kit: &str,
+        pod_dir: &str,
+        muxed_bam: &str,
+    ) -> String {
+        let dorado_bin = &config.align.dorado_bin;
+        let dorado_arg = &config.align.dorado_basecall_arg;
+        let ref_mmi = &config.align.ref_mmi;
+        let samtools_bin = &config.align.samtools_bin;
+        let samtools_view_threads = config.align.samtools_view_threads;
+
+        let dorado = format!(
+            "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
+        );
+        let samtools_view =
+            format!("{samtools_bin} view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
+        format!("{dorado} | {samtools_view}")
+    }
+
+    /// samtools split command for demux
+    fn demux_cmd(config: &Config, muxed_bam: &str, tmp_demux_dir: &str) -> String {
+        format!(
+            "{} split -@ {} -f '{}/%*_%!.bam' {}",
+            config.align.samtools_bin, config.align.samtools_view_threads, tmp_demux_dir, muxed_bam
+        )
+    }
+
+    /// dorado aligner + samtools for realignment in from_mux
+    fn realign_cmd(
+        config: &Config,
+        sequencing_kit: &str,
+        barcode: &str,
+        bam: &str,
+        aligned_bam: &str,
+    ) -> String {
+        let dorado = format!(
+            "{} aligner --threads {} {} {}",
+            config.align.dorado_bin, config.align.dorado_aligner_threads, config.align.ref_fa, bam,
+        );
+        let samtools_view = format!(
+            "{} view -h -@ {} -b /dev/stdin",
+            config.align.samtools_bin, config.align.samtools_view_threads
+        );
+        let samtools_sort = format!(
+            "{} sort -@ {} /dev/stdin -o {}",
+            config.align.samtools_bin, config.align.samtools_sort_threads, aligned_bam
+        );
+        let _ = sequencing_kit; // not used here but kept for symmetry
+        format!("{dorado} | {samtools_view} | {samtools_sort}")
+    }
+
+    // ------------------------------------------------------------------
+    // Workflow methods that now *run* the commands
+    // ------------------------------------------------------------------
+
+    fn create_reference_mmi(&self) -> anyhow::Result<()> {
+        if let Some(cmdline) = self.create_reference_mmi_cmd() {
+            Self::run_shell(&cmdline)?;
+        }
         Ok(())
     }
 
+    fn create_directories(&self) -> anyhow::Result<()> {
+        if !std::path::Path::new(&self.case_dir).exists() {
+            info!("Creating directory {}", self.case_dir);
+            fs::create_dir(&self.case_dir)?;
+        }
+        if !std::path::Path::new(&self.time_dir).exists() {
+            info!("Creating directory {}", self.time_dir);
+            fs::create_dir(&self.time_dir)?;
+        }
+        Ok(())
+    }
+
+    fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
+        let pipe = self.basecall_align_cmd(dorado_bin)?;
+        Self::run_shell(&pipe)
+            .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))
+    }
+
+    pub fn index(&self) -> anyhow::Result<()> {
+        let cmdline = self.index_cmd();
+        info!("Running samtools index for {}", self.bam);
+        Self::run_shell(&cmdline)
+    }
+
     pub fn run_cramino(&self) -> anyhow::Result<()> {
         let cramino_out = format!(
             "{}/{}_{}_hs1_cramino.txt",
             self.time_dir, self.case.id, self.case.time_point
         );
         info!("Quality control with cramino for BAM: {}", self.bam);
-        let output = duct::cmd!("cramino", "-t", "150", "--karyotype", &self.bam)
+        let cmdline = self.cramino_cmd();
+
+        let output = cmd!("bash", "-c", &cmdline)
             .stdout_capture()
             .unchecked()
             .run()?;
@@ -182,7 +492,9 @@ impl Dorado {
             self.time_dir, self.case.id, self.case.time_point
         );
         info!("Generating base modification summary for BAM: {}", self.bam);
-        let output = cmd!("modkit", "summary", "-t", "50", &self.bam)
+        let cmdline = self.modkit_cmd();
+
+        let output = cmd!("bash", "-c", &cmdline)
             .stdout_capture()
             .unchecked()
             .run()?;
@@ -192,18 +504,13 @@ impl Dorado {
     }
 
     pub fn create_fastq(&self) -> anyhow::Result<()> {
-        let bam = &self.bam;
         let fastq = format!(
             "{}/{}/{}/{}_{}.fastq.gz",
             self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
         );
         if !std::path::Path::new(&fastq).exists() {
-            let samtools = format!("samtools fastq -@ 150 {bam}");
-            let crabz = format!("crabz -f bgzf - -o {fastq}");
-            let pipe = format!("{samtools} | {crabz}");
-            info!("Running: {pipe}");
-            let pipe_cmd = duct::cmd!("bash", "-c", pipe);
-            pipe_cmd.run()?;
+            let pipe = self.create_fastq_cmd();
+            Self::run_shell(&pipe)?;
         }
         Ok(())
     }
@@ -251,26 +558,18 @@ impl Dorado {
         );
         fs::rename(original_i, tmp_original_i.clone())?;
 
-        let cmd = format!(
-            "samtools merge -@ 160 -h {} {} {} {}",
+        // real merge command with the correct tmp path
+        let merge_cmdline = format!(
+            "{} merge -@ 160 -h {} {} {} {}",
+            self.config.align.samtools_bin,
             bam.display(),
             into.display(),
             bam.display(),
             tmp_original.display()
         );
-        info!("Running {cmd}");
-        cmd!(
-            "samtools",
-            "merge",
-            "-@",
-            "160",
-            "-h",
-            bam,
-            into,
-            bam,
-            tmp_original.clone()
-        )
-        .run()?;
+        info!("Running {merge_cmdline}");
+        Self::run_shell(&merge_cmdline)?;
+
         fs::remove_file(tmp_original)?;
         fs::remove_file(tmp_original_i)?;
         fs::remove_file(bam)?;
@@ -280,20 +579,15 @@ impl Dorado {
     }
 
     pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
-        // Creating a temporary directory
+        // tmp dir
         let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
         info!("Creating tmp dir {tmp_dir}");
         fs::create_dir(&tmp_dir)?;
 
-        // Dorado base calling and align into a temporary bam file
+        // basecalling into muxed.bam
         let muxed_bam = format!("{tmp_dir}/muxed.bam");
-        let dorado_bin = &config.align.dorado_bin;
-        let dorado_arg = &config.align.dorado_basecall_arg;
-        let pod_dir = cases[0].pod_dir.display();
-        let ref_mmi = &config.align.ref_mmi;
-        let samtools_view_threads = config.align.samtools_view_threads;
+        let pod_dir = cases[0].pod_dir.display().to_string();
 
-        // Get the sequencing kit from the first pod5 file
         let muxed_pod_dir = &cases.first().unwrap().pod_dir;
         let pod_path = fs::read_dir(muxed_pod_dir)
             .map_err(|e| {
@@ -313,34 +607,21 @@ impl Dorado {
             .sequencing_kit
             .to_uppercase();
 
-        let dorado = format!(
-            "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
-        );
-        let samtools_view =
-            format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
-        let pipe = format!("{dorado} | {samtools_view}");
-        info!("Running: {pipe}");
-        let pipe_cmd = cmd!("bash", "-c", &pipe);
-        pipe_cmd
-            .run()
-            .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?;
-
+        let basecall_pipe =
+            Self::from_mux_basecall_cmd(&config, &sequencing_kit, &pod_dir, &muxed_bam);
+        info!("Running: {basecall_pipe}");
+        Self::run_shell(&basecall_pipe)?;
         info!("Basecalling ✅");
 
-        // Demux the temporary bam file
+        // demux
         let tmp_demux_dir = format!("{tmp_dir}/demuxed");
         fs::create_dir(&tmp_demux_dir)?;
-
-        let pipe = format!(
-            "samtools split -@ {} -f '{tmp_demux_dir}/%*_%!.bam' {muxed_bam}",
-            config.align.samtools_view_threads
-        );
-        info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
-        info!("Running: {pipe}");
-        let pipe_cmd = cmd!("bash", "-c", pipe);
-        pipe_cmd.run()?;
-
+        let demux_cmdline = Self::demux_cmd(&config, &muxed_bam, &tmp_demux_dir);
+        info!("Demux from {sequencing_kit} into {tmp_demux_dir}");
+        info!("Running: {demux_cmdline}");
+        Self::run_shell(&demux_cmdline)?;
         info!("Demux ✅");
+
         for case in cases.iter() {
             let barcode = case.barcode.replace("NB", "");
             let bam = find_unique_file(
@@ -348,29 +629,20 @@ impl Dorado {
                 &format!("{sequencing_kit}_barcode{}.bam", barcode),
             )?;
 
-            // Align
-            let aligned_bam = format!(
-                "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
-                barcode
-            );
-            let dorado = format!(
-                //         "{} aligner --threads 160 {} {trimmed_bam}",
-                "{} aligner --threads 160 {} {bam}",
-                config.align.dorado_bin, config.align.ref_fa,
-            );
-            let samtools_view = format!(
-                "samtools view -h -@ {} -b /dev/stdin",
-                &config.align.samtools_view_threads
-            );
-            let samtools_sort = format!(
-                "samtools sort -@ {} /dev/stdin -o {aligned_bam}",
-                &config.align.samtools_sort_threads
-            );
-            let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
-
-            info!("Running {pipe}");
-            cmd!("bash", "-c", pipe).run()?;
-            info!("Alignement ✅");
+            let aligned_bam = if !config.align.dorado_should_realign {
+                bam.clone()
+            } else {
+                let aligned_bam = format!(
+                    "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
+                    barcode
+                );
+                let pipe =
+                    Self::realign_cmd(&config, &sequencing_kit, &barcode, &bam, &aligned_bam);
+                info!("Running {pipe}");
+                Self::run_shell(&pipe)?;
+                info!("Alignement ✅");
+                aligned_bam.into()
+            };
 
             let d = Dorado::init(case.clone(), config.clone())?;
             d.create_directories()?;
@@ -397,7 +669,6 @@ impl Dorado {
         self.start_time = start_time;
 
         debug!("Running Dorado with config: {:#?}", self.config);
-
         let dorado_bin = self.config.align.dorado_bin.clone();
 
         self.create_reference_mmi()?;
@@ -414,7 +685,6 @@ impl Dorado {
             self.basecall_align(&dorado_bin)?;
             self.index()?;
         } else {
-            // check if merge before call
             let new_bam_path = bam_path
                 .parent()
                 .unwrap()
@@ -444,6 +714,7 @@ impl Dorado {
         Ok(())
     }
 
+    // from_flowcell stays mostly as-is; it just calls run_pipe/from_mux
     pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> {
         let tp_conv = |time_point: &str| -> String {
             match time_point {
@@ -516,7 +787,6 @@ impl Dorado {
             }
         }
 
-        
         Ok(())
     }
 }

+ 570 - 2
src/commands/mod.rs

@@ -1,8 +1,576 @@
-pub mod dorado;
 pub mod bcftools;
-pub mod modkit;
+pub mod dorado;
 pub mod longphase;
+pub mod modkit;
+pub mod samtools;
 pub mod wakhan;
 
+use std::{
+    fs::File,
+    io::{self, BufReader, Read, Seek, SeekFrom, Write},
+    process::{Command as ProcessCommand, Stdio},
+    sync::{mpsc, Arc, Mutex},
+    thread,
+    time::Duration,
+};
+
+/// A helper trait for commands that should be run through Slurm.
+///
+/// Types implementing `SlurmRunner` must also implement [`Command`].
+/// The provided [`run`](SlurmRunner::run) method:
+///
+/// 1. Calls [`Command::init`].
+/// 2. Runs `srun <slurm_args...> bash -lc "<cmd()>”`.
+/// 3. On success, calls [`Command::clean_up`].
+pub trait Command {
+    fn init(&mut self) -> anyhow::Result<()> {
+        Ok(())
+    }
+    fn cmd(&self) -> String;
+    fn clean_up(&self) -> anyhow::Result<()> {
+        Ok(())
+    }
+}
+
+/// Local runner: execute the command directly on the machine.
+///
+/// Usage:
+/// ```ignore
+/// my_command.run()?;
+/// ```
+pub trait Runner: Command {
+    /// The shell binary used to run commands.  
+    /// Override if you want to use `sh` or something else.
+    fn shell(&self) -> &str {
+        "bash"
+    }
+
+    /// Run locally with:
+    ///     <shell> -c "<cmd()>"
+    fn run(&mut self) -> anyhow::Result<CapturedOutput> {
+        self.init()?;
+        let mut cmd = ProcessCommand::new(self.shell());
+        cmd.arg("-c")
+            .arg(self.cmd())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped());
+
+        info!(
+            "Running: {} {}",
+            cmd.get_program().to_string_lossy(),
+            cmd.get_args()
+                .map(|arg| arg.to_string_lossy())
+                .collect::<Vec<_>>()
+                .join(" ")
+        );
+
+        let mut child = cmd.spawn().context("failed to spawn local command")?;
+        let stdout = child.stdout.take().context("failed to capture stdout")?;
+        let stderr = child.stderr.take().context("failed to capture stderr")?;
+
+        let stderr_handle = thread::spawn(move || {
+            let mut reader = BufReader::new(stderr);
+            let mut buf = Vec::new();
+            let mut line = Vec::new();
+            let mut byte = [0u8; 1];
+
+            loop {
+                match reader.read(&mut byte) {
+                    Ok(0) => {
+                        if !line.is_empty() {
+                            let text = String::from_utf8_lossy(&line);
+                            eprint!("{}", text);
+                            buf.extend_from_slice(&line);
+                        }
+                        break;
+                    }
+                    Ok(_) => {
+                        line.push(byte[0]);
+                        if byte[0] == b'\n' {
+                            let text = String::from_utf8_lossy(&line);
+                            eprint!("{}", text);
+                            buf.extend_from_slice(&line);
+                            line.clear();
+                        }
+                    }
+                    Err(_) => break,
+                }
+            }
+
+            String::from_utf8_lossy(&buf).to_string()
+        });
+
+        // --- stdout in a separate thread ---
+        let stdout_handle = thread::spawn(move || {
+            let mut reader = BufReader::new(stdout);
+            let mut buf = Vec::new();
+            let mut line = Vec::new();
+            let mut byte = [0u8; 1];
+
+            loop {
+                match reader.read(&mut byte) {
+                    Ok(0) => {
+                        if !line.is_empty() {
+                            let text = String::from_utf8_lossy(&line);
+                            print!("{}", text);
+                            buf.extend_from_slice(&line);
+                        }
+                        break;
+                    }
+                    Ok(_) => {
+                        line.push(byte[0]);
+                        if byte[0] == b'\n' {
+                            let text = String::from_utf8_lossy(&line);
+                            print!("{}", text);
+                            buf.extend_from_slice(&line);
+                            line.clear();
+                        }
+                    }
+                    Err(_) => break,
+                }
+            }
+
+            String::from_utf8_lossy(&buf).to_string()
+        });
+        // wait for process
+        let status = child.wait().context("failed to wait on local command")?;
+
+        // wait for stderr thread and collect stderr
+        let captured_stdout = stdout_handle.join().unwrap_or_default();
+        let captured_stderr = stderr_handle.join().unwrap_or_default();
+
+        if !status.success() {
+            anyhow::bail!("local command failed with status: {status}");
+        }
+
+        self.clean_up()?;
+
+        Ok(CapturedOutput {
+            stdout: captured_stdout,
+            stderr: captured_stderr,
+        })
+    }
+}
+
+use anyhow::Context;
+use log::info;
+
+/// Captured process output while it was being streamed live.
+#[derive(Debug, Default)]
+pub struct CapturedOutput {
+    pub stdout: String,
+    pub stderr: String,
+}
+
+/// Super-trait adding a Slurm `run()` method on top of [`Command`].
+pub trait SlurmRunner: Command {
+    /// Slurm launcher binary. Defaults to `srun`.
+    ///
+    /// Override if you want to use `sbatch` or another wrapper.
+    fn slurm_bin(&self) -> &str {
+        "srun"
+    }
+
+    /// Additional Slurm arguments (e.g. `--time=1:00:00`, `--cpus-per-task=4`).
+    ///
+    /// Default is no extra arguments.
+    fn slurm_args(&self) -> Vec<String> {
+        Vec::new()
+    }
+
+    /// Run the command through Slurm.
+    ///
+    /// The effective shell command is:
+    ///
+    /// ```text
+    /// <slurm_bin> <slurm_args...> bash -lc "<cmd()>"
+    /// ```
+    fn run(&mut self) -> anyhow::Result<CapturedOutput> {
+        self.init()?;
+
+        let mut cmd = ProcessCommand::new(self.slurm_bin());
+        cmd.args(self.slurm_args())
+            .arg("bash")
+            .arg("-c")
+            .arg(self.cmd())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped());
+
+        info!(
+            "Running with Slurm: {} {}",
+            cmd.get_program().to_string_lossy(),
+            cmd.get_args()
+                .map(|arg| arg.to_string_lossy())
+                .collect::<Vec<_>>()
+                .join(" ")
+        );
+
+        let mut child = cmd.spawn().context("failed to spawn slurm job")?;
+
+        let stdout = child.stdout.take().context("failed to capture stdout")?;
+        let stderr = child.stderr.take().context("failed to capture stderr")?;
+
+        let stderr_handle = thread::spawn(move || {
+            let mut reader = BufReader::new(stderr);
+            let mut buf = Vec::new();
+            let mut chunk = [0u8; 8192];
+            let mut out = io::stderr();
+
+            loop {
+                match reader.read(&mut chunk) {
+                    Ok(0) => break, // EOF
+                    Ok(n) => {
+                        // keep a copy
+                        buf.extend_from_slice(&chunk[..n]);
+                        // forward to our stderr (preserves colors, \r, etc.)
+                        let _ = out.write_all(&chunk[..n]);
+                        let _ = out.flush();
+                    }
+                    Err(_) => break,
+                }
+            }
+
+            String::from_utf8_lossy(&buf).to_string()
+        });
+
+        // --- stdout in a separate thread ---
+        let stdout_handle = thread::spawn(move || {
+            let mut reader = BufReader::new(stdout);
+            let mut buf = Vec::new();
+            let mut chunk = [0u8; 8192];
+            let mut out = io::stdout();
+
+            loop {
+                match reader.read(&mut chunk) {
+                    Ok(0) => break, // EOF
+                    Ok(n) => {
+                        // capture
+                        buf.extend_from_slice(&chunk[..n]);
+                        // forward
+                        let _ = out.write_all(&chunk[..n]);
+                        let _ = out.flush();
+                    }
+                    Err(_) => break,
+                }
+            }
+
+            String::from_utf8_lossy(&buf).to_string()
+        });
+
+        // wait for process
+        let status = child.wait().context("failed to wait on local command")?;
+
+        // wait for both threads and collect output
+        let stdout = stdout_handle.join().unwrap_or_default();
+        let stderr = stderr_handle.join().unwrap_or_default();
+
+        if !status.success() {
+            anyhow::bail!("slurm job failed with status: {status}");
+        }
+
+        self.clean_up()?;
+
+        Ok(CapturedOutput { stdout, stderr })
+    }
+}
+
+/// Holds optional Slurm parameters that can be converted to CLI args.
+///
+/// Example:
+/// ```rust
+/// let params = SlurmParams {
+///     job_name: Some("dorado_basecall".into()),
+///     cpus_per_task: Some(47),
+///     mem: Some("60G".into()),
+///     partition: Some("gpgpuq".into()),
+///     gres: Some("gpu:h100:4".into()),
+/// };
+///
+/// let args = params.to_args();
+/// ```
+///
+/// Produces:
+/// ```text
+/// --job-name=dorado_basecall
+/// --cpus-per-task=47
+/// --mem=60G
+/// --partition=gpgpuq
+/// --gres=gpu:h100:4
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct SlurmParams {
+    pub job_name: Option<String>,
+    pub cpus_per_task: Option<u32>,
+    pub mem: Option<String>,
+    pub partition: Option<String>,
+    pub gres: Option<String>,
+}
+
+impl SlurmParams {
+    /// Convert all non-empty fields into Slurm CLI arguments.
+    pub fn to_args(&self) -> Vec<String> {
+        let mut args = Vec::new();
+
+        if let Some(v) = &self.job_name {
+            args.push(format!("--job-name={v}"));
+        }
+        if let Some(v) = self.cpus_per_task {
+            args.push(format!("--cpus-per-task={v}"));
+        }
+        if let Some(v) = &self.mem {
+            args.push(format!("--mem={v}"));
+        }
+        if let Some(v) = &self.partition {
+            args.push(format!("--partition={v}"));
+        }
+        if let Some(v) = &self.gres {
+            args.push(format!("--gres={v}"));
+        }
+
+        args
+    }
+}
+
+pub trait SbatchRunner: Command {
+    fn sbatch_bin(&self) -> &str {
+        "sbatch"
+    }
+
+    fn squeue_bin(&self) -> &str {
+        "squeue"
+    }
+
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams::default()
+    }
+
+    fn sbatch_extra_args(&self) -> Vec<String> {
+        Vec::new()
+    }
+
+    /// Submit via sbatch, tail slurm-<jobid>.out "live", wait for job to finish,
+    /// then return captured stdout.
+    fn run(&mut self) -> anyhow::Result<CapturedOutput> {
+        self.init()?;
+
+        // 1. sbatch --parsable --output=slurm-%j.out --wrap "<cmd>"
+        let mut args = self.slurm_params().to_args();
+        args.extend(self.sbatch_extra_args());
+        args.push("--parsable".to_string());
+        args.push("--output=slurm-%j.out".to_string());
+        args.push("--wrap".to_string());
+        args.push(self.cmd());
+
+        let output = ProcessCommand::new(self.sbatch_bin())
+            .args(&args)
+            .output()
+            .context("failed to spawn sbatch")?;
+
+        let sbatch_stdout = String::from_utf8_lossy(&output.stdout).to_string();
+        let sbatch_stderr = String::from_utf8_lossy(&output.stderr).to_string();
+
+        if !output.status.success() {
+            anyhow::bail!(
+                "sbatch failed with status: {}\nstdout:\n{}\nstderr:\n{}",
+                output.status,
+                sbatch_stdout,
+                sbatch_stderr
+            );
+        }
+
+        // --parsable usually returns "<jobid>" or "<jobid>;..."
+        let job_id = sbatch_stdout
+            .trim()
+            .split(';')
+            .next()
+            .unwrap_or("")
+            .to_string();
+
+        if job_id.is_empty() {
+            anyhow::bail!("failed to parse job id from sbatch output: {sbatch_stdout}");
+        }
+
+        let out_file = format!("slurm-{job_id}.out");
+
+        // 2. Tail thread: emulate `tail -f slurm-<jobid>.out`
+        let (stop_tx, stop_rx) = mpsc::channel::<()>();
+        let out_path = out_file.clone();
+        let captured_stdout = Arc::new(Mutex::new(String::new()));
+        let captured_stdout_clone = Arc::clone(&captured_stdout);
+
+        let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
+            let mut pos: u64 = 0;
+            let mut file_opened = false;
+
+            loop {
+                // stop signal?
+                if stop_rx.try_recv().is_ok() {
+                    // Do one final read before stopping
+                    if let Ok(mut f) = File::open(&out_path) {
+                        if f.seek(SeekFrom::Start(pos)).is_ok() {
+                            let mut remaining = Vec::new();
+                            if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
+                                let s = String::from_utf8_lossy(&remaining);
+                                print!("{s}");
+                                if let Ok(mut c) = captured_stdout_clone.lock() {
+                                    c.push_str(&s);
+                                }
+                            }
+                        }
+                    }
+                    break;
+                }
+
+                match File::open(&out_path) {
+                    Ok(mut f) => {
+                        if !file_opened {
+                            file_opened = true;
+                        }
+
+                        // seek to last position and read new bytes
+                        if let Err(e) = f.seek(SeekFrom::Start(pos)) {
+                            anyhow::bail!("failed to seek in output file: {}", e);
+                        }
+
+                        let mut buf = [0u8; 8192];
+                        loop {
+                            match f.read(&mut buf) {
+                                Ok(0) => break, // EOF for now
+                                Ok(n) => {
+                                    pos += n as u64;
+                                    let bytes = &buf[..n];
+                                    // forward to terminal
+                                    let s = String::from_utf8_lossy(bytes);
+                                    print!("{s}");
+                                    io::stdout().flush().ok();
+                                    // capture
+                                    if let Ok(mut c) = captured_stdout_clone.lock() {
+                                        c.push_str(&s);
+                                    }
+                                }
+                                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
+                                Err(e) => {
+                                    anyhow::bail!("error reading output file: {}", e);
+                                }
+                            }
+                        }
+                    }
+                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
+                        // file not yet created; this is expected initially
+                    }
+                    Err(e) => {
+                        // Only bail if we've already opened the file once
+                        // (unexpected disappearance or permission issues)
+                        if file_opened {
+                            anyhow::bail!("error opening output file: {}", e);
+                        }
+                    }
+                }
+
+                thread::sleep(Duration::from_millis(500));
+            }
+
+            Ok(())
+        });
+
+        // 3. Poll squeue until job is no longer in the queue
+        loop {
+            let sq_out = ProcessCommand::new(self.squeue_bin())
+                .args(["-j", &job_id])
+                .output()
+                .context("failed to run squeue")?;
+
+            let out_str = String::from_utf8_lossy(&sq_out.stdout);
+
+            // squeue prints a header line + job line if job exists
+            // If job is gone, usually only the header OR nothing
+            let has_job = out_str.lines().skip(1).any(|l| !l.trim().is_empty());
+
+            if !has_job {
+                break;
+            }
+
+            thread::sleep(Duration::from_secs(2));
+        }
+
+        // Wait for output file to stop growing (SLURM epilog may still be writing)
+        let mut stable_count = 0;
+        let mut last_size = 0u64;
+        while stable_count < 2 {
+            thread::sleep(Duration::from_millis(300));
+
+            if let Ok(metadata) = std::fs::metadata(&out_file) {
+                let current_size = metadata.len();
+                if current_size == last_size {
+                    stable_count += 1;
+                } else {
+                    stable_count = 0;
+                    last_size = current_size;
+                }
+            } else {
+                // File doesn't exist yet, keep waiting
+                stable_count = 0;
+            }
+        }
+
+        // 4. Stop tail thread and join
+        let _ = stop_tx.send(());
+        match tail_handle.join() {
+            Ok(Ok(())) => {}
+            Ok(Err(e)) => return Err(e.context("tail thread failed")),
+            Err(_) => anyhow::bail!("tail thread panicked"),
+        }
+
+        self.clean_up()?;
+
+        let stdout = match Arc::try_unwrap(captured_stdout) {
+            Ok(mutex) => mutex.into_inner().unwrap_or_default(),
+            Err(arc) => arc.lock().unwrap().clone(),
+        };
+
+        // Remove the SLURM output file
+        let _ = std::fs::remove_file(&out_file);
+
+        Ok(CapturedOutput {
+            stdout,
+            stderr: String::new(), // all job output is in the .out file
+        })
+    }
+}
+
+/// Run multiple SbatchRunner jobs in parallel.
+///
+/// Each job:
+///   - calls `init()`
+///   - runs `sbatch --wait ... --wrap "<cmd>"`
+///   - streams its output to this process' stdout/stderr
+///   - returns `CapturedOutput`
+///
+/// NOTE:
+///   - Outputs from different jobs may interleave on the terminal.
+///   - Slurm still decides scheduling order (queue vs run).
+pub fn run_many_sbatch<T>(mut jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
+where
+    T: SbatchRunner + Send + 'static,
+{
+    let mut handles = Vec::with_capacity(jobs.len());
+
+    for mut job in jobs.drain(..) {
+        let handle = thread::spawn(move || -> anyhow::Result<CapturedOutput> {
+            SbatchRunner::run(&mut job)
+        });
+        handles.push(handle);
+    }
+
+    let mut results = Vec::with_capacity(handles.len());
+    for h in handles {
+        // propagate the first error you hit
+        let res = h.join().expect("thread panicked")?;
+        results.push(res);
+    }
+
+    Ok(results)
+}
+
 #[cfg(test)]
 mod tests;

+ 527 - 0
src/commands/samtools.rs

@@ -0,0 +1,527 @@
+use std::{fs, path::PathBuf};
+
+use anyhow::Context;
+use log::info;
+use uuid::Uuid;
+
+use crate::{
+    collection::bam::bam_composition,
+    commands::{SbatchRunner, SlurmParams},
+    config::Config,
+};
+
+/// Wrapper around a `samtools index` invocation.
+///
+/// This struct is meant to be used with the [`super::Command`] trait to
+/// build a command-line string of the form:
+///
+/// ```text
+/// <bin> index -@ <threads> <bam>
+/// ```
+#[derive(Debug)]
+pub struct SamtoolsIndex {
+    /// Path to the `samtools` binary.
+    pub bin: String,
+    /// Number of threads passed to `-@`.
+    pub threads: u8,
+    /// Path to the BAM file to index.
+    pub bam: String,
+}
+
+impl super::Command for SamtoolsIndex {
+    /// Build the `samtools index` command line as a single string.
+    fn cmd(&self) -> String {
+        format!("{} index -@ {} {}", self.bin, self.threads, self.bam)
+    }
+}
+
+impl super::Runner for SamtoolsIndex {}
+
+impl super::SlurmRunner for SamtoolsIndex {
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("samtools_index".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("60G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+/// Wrapper around a `samtools merge` invocation used to append one BAM into
+/// another while preserving read group (RG) uniqueness.
+///
+/// Typical lifecycle with the [`super::Command`] trait:
+///
+/// 1. [`init`](super::Command::init):  
+///    - Checks that `into` and `bam` exist.
+///    - Checks that there is no RG overlap between `into` and `bam`.
+///    - Renames `into` and its index to temporary files in the same directory.
+/// 2. [`cmd`](super::Command::cmd):  
+///    - Returns the `samtools merge` command line using:
+///      - `-h <bam>`
+///      - Output: `<into>`
+///      - Inputs: `<bam> <tmp_bam>`
+/// 3. [`clean_up`](super::Command::clean_up):  
+///    - Removes the temporary BAM, its index, and the source `bam`.
+///
+/// # Invariants
+///
+/// - `init()` must be called successfully before `cmd()`/`clean_up()`.
+/// - `tmp_bam` and `tmp_bam_index` are only valid after `init()` has run.
+pub struct SamtoolsMerge {
+    /// Path to the `samtools` binary.
+    pub bin: String,
+    /// Number of threads passed to `-@`.
+    pub threads: u8,
+    /// Destination BAM (final merged output path).
+    pub into: PathBuf,
+    /// Source BAM to merge into `into`.
+    pub bam: PathBuf,
+    /// Temporary location for the original `into` BAM (created in `init()`).
+    tmp_bam: PathBuf,
+    /// Temporary location for the original `into` BAI (created in `init()`).
+    tmp_bam_index: PathBuf,
+}
+
+impl super::Command for SamtoolsMerge {
+    /// Prepare the filesystem and validate preconditions for the merge.
+    ///
+    /// This method:
+    /// - Ensures both `into` and `bam` BAM files exist.
+    /// - Ensures there is no overlap of RG IDs between the two BAMs.
+    /// - Ensures a BAI index exists for `into`.
+    /// - Moves `into` and its BAI to temporary files in the same directory.
+    fn init(&mut self) -> anyhow::Result<()> {
+        // Collect RG IDs from destination BAM.
+        let composition_a: Vec<String> =
+            bam_composition(self.into.to_string_lossy().as_ref(), 20000)?
+                .iter()
+                .map(|(rg, _, _)| rg.clone())
+                .collect();
+
+        // Collect RG IDs from source BAM.
+        let composition_b: Vec<String> =
+            bam_composition(self.bam.to_string_lossy().as_ref(), 20000)?
+                .iter()
+                .map(|(rg, _, _)| rg.clone())
+                .collect();
+
+        // Check for overlapping RGs between the two BAM files.
+        let n_id = composition_a
+            .iter()
+            .filter(|id| composition_b.contains(id))
+            .count();
+
+        if n_id > 0 {
+            anyhow::bail!(
+                "{} {} are already merged, reads with the same RG in the destination BAM.",
+                self.into.display(),
+                self.bam.display()
+            );
+        }
+
+        if !self.into.exists() {
+            anyhow::bail!("BAM file doesn't exists for: {}", self.into.display());
+        }
+
+        if !self.bam.exists() {
+            anyhow::bail!("BAM file doesn't exists for: {}", self.bam.display());
+        }
+
+        let dir = self.into.parent().context(format!(
+            "Failed to find parent dir of: {}",
+            self.into.display()
+        ))?;
+
+        let into_file_name = self
+            .into
+            .file_name()
+            .context(format!(
+                "Failed to get file name of: {}",
+                self.into.display()
+            ))?
+            .to_string_lossy()
+            .to_string();
+
+        // For merging, the destination BAM must be indexed.
+        let from_index = dir.join(format!("{into_file_name}.bai"));
+        if !from_index.exists() {
+            anyhow::bail!("BAI file is required for: {}", self.into.display());
+        }
+
+        let tmp_original_file = format!("{}.bam", Uuid::new_v4());
+        self.tmp_bam = dir.join(&tmp_original_file);
+        self.tmp_bam_index = dir.join(format!("{tmp_original_file}.bai"));
+
+        // Move BAM and BAI to temporary names.
+        info!(
+            "Moving {} to {}",
+            self.into.display(),
+            self.tmp_bam.display()
+        );
+        fs::rename(&self.into, &self.tmp_bam).context(format!(
+            "Failed to move {} to {}",
+            self.into.display(),
+            self.tmp_bam.display()
+        ))?;
+
+        info!(
+            "Moving {} to {}",
+            from_index.display(),
+            self.tmp_bam_index.display()
+        );
+        fs::rename(&from_index, &self.tmp_bam_index).context(format!(
+            "Failed to move {} to {}",
+            from_index.display(),
+            self.tmp_bam_index.display()
+        ))?;
+
+        Ok(())
+    }
+
+    /// Build the `samtools merge` command line as a single string.
+    ///
+    /// Output: `self.into`  
+    /// Inputs (in order): `self.bam`, `self.tmp_bam`
+    fn cmd(&self) -> String {
+        format!(
+            "{} merge -@ {} -h {} {} {} {}",
+            self.bin,
+            self.threads,
+            self.bam.display(),
+            self.into.display(),
+            self.bam.display(),
+            self.tmp_bam.display()
+        )
+    }
+
+    /// Clean up temporary files after a successful merge.
+    ///
+    /// This removes:
+    /// - The temporary original BAM (`tmp_bam`)
+    /// - The temporary original index (`tmp_bam_index`)
+    /// - The source BAM (`bam`)
+    fn clean_up(&self) -> anyhow::Result<()> {
+        fs::remove_file(&self.tmp_bam)?;
+        fs::remove_file(&self.tmp_bam_index)?;
+        fs::remove_file(&self.bam)?;
+        Ok(())
+    }
+}
+
+impl super::Runner for SamtoolsMerge {}
+
+impl super::SlurmRunner for SamtoolsMerge {
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("samtools_merge".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("60G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+#[derive(Debug)]
+pub struct SamtoolsSplit {
+    /// Path to the `samtools` executable.
+    pub samtools: PathBuf,
+
+    /// Number of threads to use with `samtools split` (`-@` option).
+    pub threads: u8,
+
+    /// Input BAM file to demultiplex.
+    pub input_bam: PathBuf,
+
+    /// Output directory where split BAM files will be written.
+    pub output_dir: PathBuf,
+}
+
+impl SamtoolsSplit {
+    pub fn from_config(config: &Config, input_bam: PathBuf, output_dir: PathBuf) -> Self {
+        Self {
+            samtools: (&config.align.samtools_bin).into(),
+            threads: config.align.samtools_split_threads,
+            input_bam,
+            output_dir,
+        }
+    }
+}
+
+impl super::Command for SamtoolsSplit {
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.input_bam.exists() {
+            anyhow::bail!("No BAM file at: {}", self.input_bam.display());
+        }
+
+        if self.output_dir.exists() {
+            anyhow::bail!("Output dir already exists: {}", self.output_dir.display());
+        } else {
+            fs::create_dir(&self.output_dir).context(format!(
+                "Failed to create dir: {}",
+                self.output_dir.display()
+            ))?
+        }
+        Ok(())
+    }
+
+    /// Build the `samtools split` command line as a single shell string.
+    fn cmd(&self) -> String {
+        format!(
+            "{} split -@ {} -f '{}/%*_%!.bam' {}",
+            self.samtools.display(),
+            self.threads,
+            self.output_dir.display(),
+            self.input_bam.display()
+        )
+    }
+}
+
+impl super::Runner for SamtoolsSplit {}
+
+impl super::SlurmRunner for SamtoolsSplit {
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some("samtools_split".into()),
+            cpus_per_task: Some(self.threads as u32),
+            mem: Some("60G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+
+#[derive(Debug)]
+pub struct SamtoolsSort {
+    /// Path to the `samtools` executable.
+    pub samtools: PathBuf,
+
+    /// Number of threads to use with `samtools sort` (`-@` option).
+    pub threads: u8,
+
+    /// Input BAM file to sort.
+    pub input_bam: PathBuf,
+
+    /// Output sorted BAM.
+    pub output_bam: PathBuf,
+
+    /// Slurm params
+    pub slurm_params: SlurmParams,
+}
+
+impl SamtoolsSort {
+    pub fn from_config(config: &Config, input_bam: PathBuf, output_bam: PathBuf) -> Self {
+        let threads = config.align.samtools_sort_threads;
+        Self {
+            samtools: (&config.align.samtools_bin).into(),
+            threads,
+            input_bam,
+            output_bam,
+            slurm_params: SlurmParams {
+                job_name: Some("samtools_split".into()),
+                cpus_per_task: Some(threads as u32),
+                mem: Some("60G".into()),
+                partition: Some("shortq".into()),
+                gres: None,
+            },
+        }
+    }
+}
+
+impl super::Command for SamtoolsSort {
+    fn init(&mut self) -> anyhow::Result<()> {
+        if !self.input_bam.exists() {
+            anyhow::bail!("No BAM file at: {}", self.input_bam.display());
+        }
+
+        if self.output_bam.exists() {
+            anyhow::bail!("Output BAM already exists: {}", self.output_bam.display());
+        }
+
+        Ok(())
+    }
+
+    /// Build the `samtools sort` command line as a single shell string.
+    fn cmd(&self) -> String {
+        format!(
+            "{} sort -@ {} -o {} {}",
+            self.samtools.display(),
+            self.threads,
+            self.input_bam.display(),
+            self.output_bam.display(),
+        )
+    }
+}
+
+impl super::Runner for SamtoolsSort {}
+
+impl super::SlurmRunner for SamtoolsSort {
+    fn slurm_args(&self) -> Vec<String> {
+        self.slurm_params.to_args()
+    }
+}
+
+impl SbatchRunner for SamtoolsSort {
+    fn sbatch_extra_args(&self) -> Vec<String> {
+        self.slurm_params.to_args()
+    }
+}
+
+struct TestRun;
+
+impl super::Command for TestRun {
+    fn cmd(&self) -> String {
+        "echo 'hello from sbatch'".to_string()
+    }
+}
+
+impl SbatchRunner for TestRun {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("test-sbatch".into()),
+            // Make sure this partition exists on your cluster
+            partition: Some("gpgpuq".into()), // or your default compute partition
+            // Often required: cpus + mem, maybe account
+            cpus_per_task: Some(1),
+            mem: Some("1G".into()),
+            gres: None,
+        }
+    }
+
+    fn sbatch_extra_args(&self) -> Vec<String> {
+        // If your cluster requires account:
+        // vec!["--account=myaccount".into()]
+        Vec::new()
+    }
+}
+
+// pub struct TestRun {}
+// impl super::Command for TestRun {
+//     fn cmd(&self) -> String {
+//         "echo 'test'".to_string()
+//     }
+// }
+// impl super::Runner for TestRun {}
+// impl super::SlurmRunner for TestRun {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use log::info;
+
+    use crate::{
+        commands::{run_many_sbatch, SlurmRunner},
+        config::Config,
+        helpers::test_init,
+        TEST_DIR,
+    };
+
+    #[test]
+    fn sbatch_test_run() -> anyhow::Result<()> {
+        let mut t = TestRun;
+        let out = SbatchRunner::run(&mut t)?;
+        println!("{out:#?}");
+        // assert!(out.stdout.contains("hello from sbatch"));
+        Ok(())
+    }
+
+    #[test]
+    fn samtools_index_merge() -> anyhow::Result<()> {
+        test_init();
+
+        let config = Config::default();
+
+        info!("Copying the test BAM 1.");
+        fs::copy(
+            format!("{}/inputs/fcA_NB02.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
+        )?;
+
+        info!("Copying the test BAM 2.");
+        fs::copy(
+            format!("{}/inputs/fcB_NB02.bam", TEST_DIR.as_str()),
+            format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
+        )?;
+
+        info!("Indexing BAM 1.");
+        let mut idx = SamtoolsIndex {
+            bin: config.align.samtools_bin.clone(),
+            threads: config.align.samtools_view_threads,
+            bam: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
+        };
+        let captured_output = SlurmRunner::run(&mut idx)?;
+        assert_eq!(captured_output.stderr, String::new());
+
+        info!("Indexing BAM 2.");
+        let mut idx = SamtoolsIndex {
+            bin: config.align.samtools_bin.clone(),
+
+            threads: config.align.samtools_view_threads,
+            bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
+        };
+        let captured_output = SlurmRunner::run(&mut idx)?;
+        assert_eq!(captured_output.stderr, String::new());
+
+        info!("Mergin both BAM.");
+        let mut idx = SamtoolsMerge {
+            bin: config.align.samtools_bin,
+            threads: config.align.samtools_merge_threads as u8,
+            bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()).into(),
+            into: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()).into(),
+            tmp_bam: "".into(),
+            tmp_bam_index: "".into(),
+        };
+
+        let captured_output = SlurmRunner::run(&mut idx)?;
+        assert_eq!(captured_output.stderr, String::new());
+
+        // TODO: add number of reads verification use bam_compo
+        Ok(())
+    }
+
+    #[test]
+    fn samtools_split() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+
+        let mut cmd = SamtoolsSplit::from_config(
+            &config,
+            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/by_rg_splitted", TEST_DIR.as_str()).into(),
+        );
+
+        let captured_output = SlurmRunner::run(&mut cmd)?;
+        assert_eq!(captured_output.stderr, String::new());
+
+        Ok(())
+    }
+
+    #[test]
+    fn samtools_sort_sbatch() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+
+        let sort_1 = SamtoolsSort::from_config(
+            &config,
+            format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode02.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/01_sorted.bam", TEST_DIR.as_str()).into(),
+        );
+
+        let sort_2 = SamtoolsSort::from_config(
+            &config,
+            format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode04.bam", TEST_DIR.as_str()).into(),
+            format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()).into(),
+        );
+
+        let r = run_many_sbatch(vec![sort_1, sort_2])?;
+
+        println!("{r:#?}");
+        Ok(())
+    }
+}

+ 30 - 16
src/config.rs

@@ -308,17 +308,32 @@ pub struct AlignConfig {
     /// Arguments passed to `dorado basecaller` (e.g. devices and model name).
     pub dorado_basecall_arg: String,
 
+    /// Should dorado re-align after demux ?
+    pub dorado_should_realign: bool,
+
+    /// Dorado aligner threads number
+    pub dorado_aligner_threads: u8,
+
     /// Reference FASTA used for alignment.
     pub ref_fa: String,
 
     /// Minimap2 index (`.mmi`) used by Dorado or downstream tools.
     pub ref_mmi: String,
 
+    /// Path to Samtools binary.
+    pub samtools_bin: String,
+
     /// Number of threads given to `samtools view`.
-    pub samtools_view_threads: u16,
+    pub samtools_view_threads: u8,
 
     /// Number of threads given to `samtools sort`.
-    pub samtools_sort_threads: u16,
+    pub samtools_sort_threads: u8,
+
+    /// Number of threads given to `samtools merge`.
+    pub samtools_merge_threads: u8,
+
+    /// Number of threads given to `samtools split`.
+    pub samtools_split_threads: u8,
 }
 
 // Here comes names that can't be changed from output of tools
@@ -335,19 +350,19 @@ lazy_static! {
     static ref CLAIRS_GERMLINE_TUMOR: &'static str = "clair3_tumor_germline_output.vcf.gz";
 }
 
-impl Default for AlignConfig {
-    fn default() -> Self {
-        Self {
-            dorado_bin: "/data/tools/dorado-1.1.1-linux-x64/bin/dorado".to_string(),
-            dorado_basecall_arg: "-x 'cuda:0,1,2,3' sup,5mC_5hmC".to_string(),
-            ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-            ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
-            samtools_view_threads: 20,
-            samtools_sort_threads: 50,
-        }
-    }
-}
-
+// impl Default for AlignConfig {
+//     fn default() -> Self {
+//         Self {
+//             dorado_bin: "/data/tools/dorado-1.1.1-linux-x64/bin/dorado".to_string(),
+//             dorado_basecall_arg: "-x 'cuda:0,1,2,3' sup,5mC_5hmC".to_string(),
+//             ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+//             ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
+//             samtools_view_threads: 20,
+//             samtools_sort_threads: 50,
+//         }
+//     }
+// }
+//
 impl Config {
     /// Returns the config file path, e.g.:
     /// `~/.local/share/pandora/pandora-config.toml`.
@@ -852,4 +867,3 @@ impl Default for Config {
         }
     }
 }
-

+ 0 - 1
src/helpers.rs

@@ -677,7 +677,6 @@ pub fn detect_repetition(s: &str) -> Repeat {
 
 pub fn test_init() {
     let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
-        .is_test(true)
         .try_init();
 }
 

+ 1 - 0
src/lib.rs

@@ -155,6 +155,7 @@ extern crate lazy_static;
 // Define DOCKER_ID lock for handling Docker kill when ctrlc is pressed
 lazy_static! {
     static ref DOCKER_ID: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+    static ref TEST_DIR: String = "/mnt/beegfs02/scratch/t_steimle/test_data".to_string();
 }
 
 #[cfg(test)]

+ 1 - 0
src/pipes/mod.rs

@@ -1,2 +1,3 @@
 pub mod somatic;
+pub mod somatic_slurm;
 

+ 45 - 0
src/pipes/somatic_slurm.rs

@@ -0,0 +1,45 @@
+use std::{fs, path::PathBuf};
+use anyhow::Context;
+use uuid::Uuid;
+
+use log::info;
+
+use crate::{
+    commands::{
+        SlurmRunner, dorado::{DoradoAlign, DoradoBasecall}, samtools::SamtoolsSplit
+    },
+    config::Config,
+};
+
+pub fn basecall_align_split(pod5_dir: PathBuf, tmp_dir: PathBuf) -> anyhow::Result<()> {
+    let config = Config::default();
+
+    let tmp_basecalled_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
+    info!("Basecalling into: {}", tmp_basecalled_bam.display());
+    let mut cmd = DoradoBasecall::from_config(&config, pod5_dir, tmp_basecalled_bam.clone());
+    let _out = SlurmRunner::run(&mut cmd)?;
+    info!("Basecalled ✅");
+
+    let tmp_split_dir = tmp_dir.join(format!("{}", Uuid::new_v4()));
+    info!("Split into: {}", tmp_split_dir.display());
+    let mut cmd = SamtoolsSplit::from_config(&config, tmp_basecalled_bam, tmp_split_dir);
+    let _out = SlurmRunner::run(&mut cmd)?;
+    info!("Splitted ✅");
+
+
+
+    // let tmp_aligned_bam = tmp_dir.join(format!("{}.bam", Uuid::new_v4()));
+    // info!("Aligning into: {}", tmp_aligned_bam.display());
+    // let mut cmd =
+    //     DoradoAlign::from_config(&config, tmp_basecalled_bam.clone(), tmp_aligned_bam.clone());
+    // let _out = SlurmRunner::run(&mut cmd)?;
+    // info!("Aligned ✅");
+    //
+    // fs::remove_file(&tmp_basecalled_bam).context(format!(
+    //     "Failed to remove: {}",
+    //     tmp_basecalled_bam.display()
+    // ))?;
+
+    
+    Ok(())
+}