Przeglądaj źródła

Introducing new executor that takes a Vec<Vec<AnyJob>> and treat the inner vec as a sequence.jobs_seq![..] helps to define this sequence from any job that implement LocalRunner and SbatchRunner. ClairS implementation and new resume chunked (TODO: add clairs_allow_resume in config.

Thomas 1 miesiąc temu
rodzic
commit
4bbbc66981

+ 9 - 1
pandora-config.example.toml

@@ -44,9 +44,11 @@ pseudoautosomal_regions_bed = "/home/t_steimle/ref/hs1/chm13v2.0_PAR.bed"
 # Sequence dictionary (.dict) for the reference.
 dict_file = "/home/t_steimle/ref/hs1/chm13v2.0.dict"
 
-# RefSeq GFF3 annotation (sorted/indexed).
+# RefSeq GFF3 annotation (sorted/bgzipped/indexed).
 refseq_gff = "/home/t_steimle/ref/hs1/chm13v2.0_RefSeq_Liftoff_v5.1_sorted.gff3.gz"
 
+refseq_gtf = "/home/t_steimle/ref/hs1/chm13v2.0_RefSeq_Liftoff_v5.1_sorted.gtf"
+
 # dbSNP vcf.gz file (should be indexed)
 db_snp = "/home/t_steimle/ref/hs1/chm13v2.0_dbSNPv155.vcf.gz"
 
@@ -623,6 +625,12 @@ nanomonsv_solo_passed_vcf = "{output_dir}/{id}_{time}_nanomonsv-solo_PASSED.vcf.
 # Warning TBI index should exists
 nanomonsv_simple_repeat_bed = "/home/t_steimle/ref/hs1/human_chm13v2.0_simpleRepeat.bed.gz"
 
+# Path to LINE1.chm13v2.0.bed.gz file for nanomonsv.
+# https://github.com/friend1ws/nanomonsv
+# Warning TBI index should exists
+nanomonsv_line1_bed = "/home/t_steimle/ref/hs1/LINE1.chm13v2.0.bed.gz"
+
+
 #######################################
 # PromethION metadata
 #######################################

+ 426 - 242
src/callers/clairs.rs

@@ -30,11 +30,12 @@
 //!
 //! ## Execution Modes
 //!
-//! The module supports three execution strategies:
+//! The module supports three execution strategies, all dispatched through [`Run::run`]:
 //!
-//! - **Local** ([`ClairS::run_local`]) — Single-node execution, useful for debugging
-//! - **Slurm** ([`ClairS::run_sbatch`]) — Single HPC job submission
-//! - **Chunked** ([`run_clairs_chunked_sbatch_with_merge`]) — Parallel execution across genome regions
+//! - **Local** — Single-node execution via [`LocalRunner`](crate::commands::LocalRunner)
+//! - **Slurm** — Single HPC job submission via [`SbatchRunner`](crate::commands::SbatchRunner)
+//! - **Chunked** — Parallel execution across genome regions (enabled automatically when
+//!   `config.slurm_runner = true`, calls the internal `run_chunked` function)
 //!
 //! ### Chunked Parallel Execution
 //!
@@ -92,35 +93,27 @@
 //!
 //! ## Usage
 //!
-//! ### Basic Slurm Execution
+//! ### Basic Execution
 //!
 //! ```ignore
-//! use crate::config::Config;
-//! use crate::pipes::clairs::ClairS;
-//! use crate::pipes::Initialize;
+//! use pandora_lib_promethion::callers::clairs::ClairS;
+//! use pandora_lib_promethion::config::Config;
+//! use pandora_lib_promethion::pipes::Initialize;
+//! use pandora_lib_promethion::runners::Run;
 //!
 //! let config = Config::default();
+//! // config.slurm_runner = true  → chunked Slurm execution (60 parts)
+//! // config.slurm_runner = false → local single-node execution
 //! let mut clairs = ClairS::initialize("sample_001", &config)?;
-//! let output = clairs.run_sbatch()?;
-//! # Ok::<(), anyhow::Error>(())
-//! ```
-//!
-//! ### Chunked Parallel Execution (Recommended for WGS)
-//!
-//! ```ignore
-//! use crate::config::Config;
-//! use crate::pipes::clairs::run_clairs_chunked_sbatch_with_merge;
-//!
-//! let config = Config::default();
-//! let outputs = run_clairs_chunked_sbatch_with_merge("sample_001", &config, 20)?;
+//! clairs.run()?;
 //! # Ok::<(), anyhow::Error>(())
 //! ```
 //!
 //! ### Loading Variants for Downstream Analysis
 //!
 //! ```ignore
-//! use crate::annotation::Annotations;
-//! use crate::variant::vcf_variant::Variants;
+//! use pandora_lib_promethion::annotation::Annotations;
+//! use pandora_lib_promethion::variant::vcf_variant::Variants;
 //!
 //! let annotations = Annotations::new();
 //! let somatic = clairs.variants(&annotations)?;
@@ -171,8 +164,9 @@ use crate::{
     collection::vcf::Vcf,
     commands::{
         bcftools::{BcftoolsConcat, BcftoolsKeepPass},
+        exec_jobs,
         samtools::SamtoolsMergeMany,
-        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
+        AnyJob, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
         SlurmRunner,
     },
     config::Config,
@@ -181,9 +175,9 @@ use crate::{
         remove_dir_if_exists, singularity_bind_flags, split_ordered_genome_into_n_regions_exact,
     },
     io::vcf::read_vcf,
+    jobs_seq,
     locker::SampleLock,
     pipes::{Initialize, ShouldRun, Version},
-    run, run_many,
     runners::Run,
     variant::{
         variant_collection::VariantCollection,
@@ -211,43 +205,102 @@ use uuid::Uuid;
 /// # Fields
 ///
 /// - `id` - Sample identifier (e.g., "34528")
-/// - `config` - Global pipeline configuration
-/// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/clairs")
+/// - `config` - All resolved paths, tool settings, and Slurm parameters (see [`ClairsConfig`])
 /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`)
-/// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
+/// - `part_index` - Chunk index when running in parallel (e.g., `Some(3)` for part 3 of N);
+///   `None` for full-genome runs
 ///
-/// # Execution Modes
+/// # Execution
 ///
-/// - **Local** - Direct execution via `run_local()`
-/// - **Slurm** - Single job submission via `run_sbatch()`
-/// - **Chunked** - Parallel genome-wide execution via [`run_clairs_chunked_sbatch_with_merge`]
+/// Call [`Run::run`] — it dispatches automatically:
+/// - `config.slurm_runner = false` → local single-node execution
+/// - `config.slurm_runner = true`  → chunked parallel Slurm execution (60 genome parts)
 ///
 /// # Output Files
 ///
 /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz`
 /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz`
-///
-/// # Chunked Execution
-///
-/// For whole-genome sequencing, use [`run_clairs_chunked_sbatch_with_merge`] which:
-/// 1. Splits genome into N equal-sized regions
-/// 2. Runs ClairS in parallel Slurm jobs (one per region)
-/// 3. Post-processes each part (concat SNV+indel, filter PASS)
-/// 4. Merges all part PASS VCFs into final output
 #[derive(Debug, Clone)]
 pub struct ClairS {
-    /// Sample identifier
     pub id: String,
-    /// Global pipeline configuration
-    pub config: Config,
-    /// Directory for log file storage
-    pub log_dir: String,
-    /// Optional genomic region restriction (format: "chr:start-end")
+    pub config: ClairsConfig,
     pub region: Option<String>,
-    /// Optional part index for chunked parallel runs (1-indexed)
     pub part_index: Option<usize>,
 }
 
+/// Resolved configuration for a single ClairS run.
+///
+/// Built once at initialisation via [`ClairsConfig::from_config`] and carried
+/// inside [`ClairS`]. All path templates from [`Config`] are expanded here so
+/// that `ClairS` methods never need to re-expand them.
+#[derive(Debug, Clone)]
+pub struct ClairsConfig {
+    // Execution
+    pub singularity_bin: String,
+    pub clairs_image: String,
+    pub clairs_threads: u8,
+    pub clairs_platform: String,
+    pub clairs_force: bool,
+    pub clairs_keep_parts: bool,
+
+    // Paths
+    pub reference: String,
+    pub tmp_dir: String,
+    pub result_dir: String,
+
+    // Slurm
+    pub slurm_runner: bool,
+    pub slurm_max_par: u8,
+
+    // Sample-scoped paths, computed once at init
+    pub normal_bam: String,
+    pub tumoral_bam: String,
+    pub passed_vcf: String,
+    pub germline_passed_vcf: String,
+    pub germline_normal_vcf: String,
+    pub output_dir: String,
+    pub snv_vcf: String,
+    pub indel_vcf: String,
+    pub log_dir: String,
+    pub lock_dir: String,
+
+    // BcftoolsConcat
+    pub bcftools_bin: String,
+    pub bcftools_threads: u8,
+}
+
+impl ClairsConfig {
+    pub fn from_config(config: &Config, id: &str) -> Self {
+        let output_dir = config.clairs_output_dir(id);
+        let (snv_vcf, indel_vcf) = config.clairs_output_vcfs(id);
+        Self {
+            singularity_bin: config.singularity_bin.clone(),
+            clairs_image: config.clairs_image.clone(),
+            clairs_threads: config.clairs_threads,
+            clairs_platform: config.clairs_platform.clone(),
+            clairs_force: config.clairs_force,
+            clairs_keep_parts: config.clairs_keep_parts,
+            reference: config.reference.clone(),
+            tmp_dir: config.tmp_dir.clone(),
+            result_dir: config.result_dir.clone(),
+            slurm_runner: config.slurm_runner,
+            slurm_max_par: config.slurm_max_par,
+            normal_bam: config.normal_bam(id),
+            tumoral_bam: config.tumoral_bam(id),
+            passed_vcf: config.clairs_passed_vcf(id),
+            germline_passed_vcf: config.clairs_germline_passed_vcf(id),
+            germline_normal_vcf: config.clairs_germline_normal_vcf(id),
+            output_dir,
+            snv_vcf,
+            indel_vcf,
+            log_dir: format!("{}/{}/log/clairs", config.result_dir, id),
+            lock_dir: format!("{}/locks", config.result_dir),
+            bcftools_bin: config.bcftools_bin.clone(),
+            bcftools_threads: config.bcftools_threads,
+        }
+    }
+}
+
 impl fmt::Display for ClairS {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         writeln!(f, "🧬 ClairS {}", self.id)?;
@@ -262,7 +315,7 @@ impl fmt::Display for ClairS {
             self.part_index
                 .map_or("full".into(), |n| format!("part{n}"))
         )?;
-        writeln!(f, "   Log dir   : {}", self.log_dir)
+        writeln!(f, "   Log dir   : {}", self.config.log_dir)
     }
 }
 
@@ -271,18 +324,16 @@ impl Initialize for ClairS {
         let id = id.to_string();
 
         info!("Initializing ClairS for {id}");
-        let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
 
         let clairs = Self {
+            config: ClairsConfig::from_config(config, &id),
             id,
-            log_dir,
-            config: config.clone(),
             region: None,
             part_index: None,
         };
 
-        if clairs.config.clairs_force {
-            remove_dir_if_exists(&clairs.config.clairs_output_dir(&clairs.id))?;
+        if config.clairs_force {
+            remove_dir_if_exists(&config.clairs_output_dir(&clairs.id))?;
         }
 
         Ok(clairs)
@@ -292,11 +343,9 @@ impl Initialize for ClairS {
 impl ShouldRun for ClairS {
     /// Determines whether ClairS should be re-run based on BAM modification timestamps.
     fn should_run(&self) -> bool {
-        let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
-        let normal_older =
-            is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
-        let tumor_older =
-            is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
+        let passed_vcf = &self.config.passed_vcf;
+        let normal_older = is_file_older(passed_vcf, &self.config.normal_bam, true).unwrap_or(true);
+        let tumor_older = is_file_older(passed_vcf, &self.config.tumoral_bam, true).unwrap_or(true);
 
         let result = normal_older || tumor_older;
         if result {
@@ -308,19 +357,19 @@ impl ShouldRun for ClairS {
 
 impl JobCommand for ClairS {
     fn init(&mut self) -> anyhow::Result<()> {
-        let output_dir = self.part_output_dir();
+        let output_dir = &self.config.output_dir;
 
-        fs::create_dir_all(&output_dir)
+        fs::create_dir_all(output_dir)
             .with_context(|| format!("Failed to create dir: {output_dir}"))?;
 
-        fs::create_dir_all(&self.log_dir)
-            .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
+        fs::create_dir_all(&self.config.log_dir)
+            .with_context(|| format!("Failed to create dir: {}", self.config.log_dir))?;
 
         Ok(())
     }
 
     fn cmd(&self) -> String {
-        let output_dir = self.part_output_dir();
+        let output_dir = &self.config.output_dir;
 
         let region_arg = self
             .region
@@ -331,16 +380,16 @@ impl JobCommand for ClairS {
         let sample_name = format!("{}_diag", self.id);
 
         let bind_flags = singularity_bind_flags([
-            &self.config.tumoral_bam(&self.id),
-            &self.config.normal_bam(&self.id),
+            &self.config.tumoral_bam,
+            &self.config.normal_bam,
             &self.config.reference,
-            &output_dir,
-            &self.log_dir,
+            output_dir,
+            &self.config.log_dir,
             &self.config.tmp_dir,
         ]);
 
         format!(
-            "{singularity_bin} exec \
+            "{singularity_bin} exec --cleanenv \
              {binds} \
              --bind {output_dir}:{output_dir} \
              {image} \
@@ -361,8 +410,8 @@ impl JobCommand for ClairS {
             singularity_bin = self.config.singularity_bin,
             binds = bind_flags,
             image = self.config.clairs_image,
-            tumor_bam = self.config.tumoral_bam(&self.id),
-            normal_bam = self.config.normal_bam(&self.id),
+            tumor_bam = self.config.tumoral_bam,
+            normal_bam = self.config.normal_bam,
             reference = self.config.reference,
             threads = self.config.clairs_threads,
             platform = self.config.clairs_platform,
@@ -374,21 +423,12 @@ impl JobCommand for ClairS {
 }
 
 impl LocalRunner for ClairS {}
-impl LocalBatchRunner for ClairS {}
 
-impl SlurmRunner for ClairS {
-    fn slurm_args(&self) -> Vec<String> {
-        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
-        SlurmParams {
-            job_name: Some(format!("clairs_{}{}", self.id, batch_id)),
-            cpus_per_task: Some(self.config.clairs_threads as u32),
-            mem: Some("50G".into()),
-            partition: Some("shortq".into()),
-            gres: None,
-        }
-        .to_args()
-    }
-}
+// impl SlurmRunner for ClairS {
+//     fn slurm_args(&self) -> Vec<String> {
+//         self.slurm_params().to_args()
+//     }
+// }
 
 impl SbatchRunner for ClairS {
     fn slurm_params(&self) -> SlurmParams {
@@ -416,39 +456,35 @@ impl Run for ClairS {
             .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
 
         if self.config.slurm_runner {
-            run_clairs_chunked(&self.id, &self.config, 50)?;
-            // merge_haplotaged_tmp_bam(&self.config, &self.id)?;
+            run_chunked(&self.id, &self.config, 60)?;
         } else {
-            run!(&self.config, self)?;
-            let mut germline = self.process_germline()?;
-            let report = run!(&self.config, &mut germline)
-                .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
-
-            report
-                .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir))
-                .context("Failed to save germline PASS logs")?;
-            let (mut concat, tmp_path) = self.process_somatic_concat()?;
-            let (snv_vcf, indel_vcf) = self.config.clairs_output_vcfs(&self.id);
-
-            let report = run!(&self.config, &mut concat)
-                .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
-
-            report
-                .save_to_file(format!("{}/bcftools_concat_", self.log_dir))
-                .context("Failed to save concat logs")?;
-
-            let mut keep_pass = self.process_somatic_pass(&tmp_path)?;
-            let report = run!(&self.config, &mut keep_pass)
-                .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
-
-            report
-                .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
-                .context("Failed to save PASS filter logs")?;
+            let (somatic_concat, somatic_tmp_path) = self.process_somatic_concat()?;
+            let somatic_pass = self.process_somatic_pass(&somatic_tmp_path)?;
+            let germline = self.process_germline()?;
+
+            let outputs = exec_jobs(
+                vec![jobs_seq![
+                    self.clone(),
+                    germline,
+                    somatic_concat,
+                    somatic_pass,
+                ]],
+                false,
+                1,
+            )
+            .context("Failed to run ClairS local pipeline")?;
+
+            for output in &outputs {
+                output.save_to_file(format!("{}/clairs_", self.config.log_dir))?;
+            }
 
             // Clean up temporary concatenated VCF
-            debug!("Removing temporary file: {}", tmp_path);
-            if let Err(e) = fs::remove_file(&tmp_path) {
-                warn!("Failed to remove temporary file {}: {}", tmp_path, e);
+            debug!("Removing temporary file: {}", somatic_tmp_path);
+            if let Err(e) = fs::remove_file(&somatic_tmp_path) {
+                warn!(
+                    "Failed to remove temporary file {}: {}",
+                    somatic_tmp_path, e
+                );
             }
         }
         Ok(())
@@ -463,10 +499,12 @@ impl ClairS {
     fn germline_passed_vcf_path(&self) -> String {
         match self.part_index {
             Some(idx) => {
-                let outdir = self.part_output_dir();
-                format!("{outdir}/clair3_germline.part{idx}.pass.vcf.gz")
+                format!(
+                    "{}/clair3_germline.part{idx}.pass.vcf.gz",
+                    self.config.output_dir
+                )
             }
-            None => self.config.clairs_germline_passed_vcf(&self.id),
+            None => self.config.germline_passed_vcf.clone(),
         }
     }
 
@@ -475,15 +513,17 @@ impl ClairS {
 
         let germline_input = match self.part_index {
             Some(_) => {
-                let output_dir = self.part_output_dir();
-                let base = self.config.clairs_germline_normal_vcf(&self.id);
+                let output_dir = self.config.output_dir.clone();
+                let base = self.config.germline_normal_vcf.clone();
                 let base_name = Path::new(&base)
                     .file_name()
-                    .expect("Germline VCF should have filename")
+                    .with_context(|| {
+                        format!("germline VCF path has no filename component: {base}")
+                    })?
                     .to_string_lossy();
                 format!("{output_dir}/{base_name}")
             }
-            None => self.config.clairs_germline_normal_vcf(&self.id),
+            None => self.config.germline_normal_vcf.clone(),
         };
 
         info!(
@@ -491,22 +531,28 @@ impl ClairS {
             self.id, self.part_index
         );
 
-        let cmd =
-            BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
-
-        Ok(cmd)
+        Ok(BcftoolsKeepPass {
+            bin: self.config.bcftools_bin.clone(),
+            threads: self.config.bcftools_threads,
+            input: germline_input.into(),
+            output: germline_passed.clone().into(),
+            tmp_dir: self.config.tmp_dir.clone().into(),
+            tmp_sort: PathBuf::new(),
+            tmp_norm: PathBuf::new(),
+        })
     }
 
     fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> {
-        let output_dir = self.part_output_dir();
-        let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id);
+        let output_dir = self.config.output_dir.clone();
+        let snv_vcf_base = self.config.snv_vcf.clone();
+        let indel_vcf_base = self.config.indel_vcf.clone();
 
         let snv_vcf = format!(
             "{}/{}",
             output_dir,
             Path::new(&snv_vcf_base)
                 .file_name()
-                .expect("SNV VCF should have filename")
+                .with_context(|| format!("SNV VCF path has no filename component: {snv_vcf_base}"))?
                 .to_string_lossy()
         );
         let indel_vcf = format!(
@@ -514,7 +560,9 @@ impl ClairS {
             output_dir,
             Path::new(&indel_vcf_base)
                 .file_name()
-                .expect("Indel VCF should have filename")
+                .with_context(|| {
+                    format!("indel VCF path has no filename component: {indel_vcf_base}")
+                })?
                 .to_string_lossy()
         );
 
@@ -531,32 +579,40 @@ impl ClairS {
             .to_string();
 
         // Concat SNV + indel
-        Ok((
-            BcftoolsConcat::from_config(
-                &self.config,
-                vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
-                &tmp_path,
-            ),
-            tmp_path,
-        ))
+        let concat = BcftoolsConcat {
+            bin: self.config.bcftools_bin.clone(),
+            threads: self.config.bcftools_threads,
+            inputs: vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
+            output: tmp_path.clone().into(),
+            tmp_dir: self.config.tmp_dir.clone().into(),
+            tmp_list: PathBuf::new(),
+        };
+        Ok((concat, tmp_path))
     }
 
     pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result<BcftoolsKeepPass> {
         let passed_vcf = self.somatic_passed_vcf_path();
 
         // Filter PASS
-        let keep_pass = BcftoolsKeepPass::from_config(&self.config, tmp_path, passed_vcf.clone());
+        let keep_pass = BcftoolsKeepPass {
+            bin: self.config.bcftools_bin.clone(),
+            threads: self.config.bcftools_threads,
+            input: tmp_path.into(),
+            output: passed_vcf.clone().into(),
+            tmp_dir: self.config.tmp_dir.clone().into(),
+            tmp_sort: PathBuf::new(),
+            tmp_norm: PathBuf::new(),
+        };
 
         Ok(keep_pass)
     }
 
-    /// Returns the per-part output directory.
-    fn part_output_dir(&self) -> String {
-        let base_dir = self.config.clairs_output_dir(&self.id);
-        match self.part_index {
-            Some(idx) => format!("{base_dir}/part{idx}"),
-            None => base_dir,
-        }
+    fn for_part(&self, idx: usize) -> Self {
+        let mut part = self.clone();
+        part.part_index = Some(idx);
+        part.config.output_dir = format!("{}/part{idx}", self.config.output_dir);
+        part.config.log_dir = format!("{}/part{idx}", self.config.log_dir);
+        Self { ..part }
     }
 
     /// Returns the somatic PASS VCF path.
@@ -566,12 +622,15 @@ impl ClairS {
     fn somatic_passed_vcf_path(&self) -> String {
         match self.part_index {
             Some(idx) => {
-                let outdir = self.part_output_dir();
-                format!("{outdir}/clairs.part{idx}.pass.vcf.gz")
+                format!("{}/clairs.part{idx}.pass.vcf.gz", self.config.output_dir)
             }
-            None => self.config.clairs_passed_vcf(&self.id),
+            None => self.config.passed_vcf.clone(),
         }
     }
+
+    pub fn run_chunked_resume(&self, n_parts: usize) -> anyhow::Result<()> {
+        run_chunked_resume(&self.id, &self.config, n_parts)
+    }
 }
 
 impl CallerCat for ClairS {
@@ -583,7 +642,7 @@ impl CallerCat for ClairS {
 impl Variants for ClairS {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let caller = self.caller_cat();
-        let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
+        let passed_vcf = &self.config.passed_vcf.clone();
 
         info!("Loading variants from {caller}: {passed_vcf}");
 
@@ -608,7 +667,7 @@ impl ClairS {
     /// Loads germline variants from the Clair3 germline output.
     pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
-        let germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
+        let germline_passed = &self.config.germline_passed_vcf;
 
         info!("Loading germline variants from {caller}: {germline_passed}");
 
@@ -730,8 +789,7 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
     let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
 
     for i in 1..=n_parts {
-        let mut part = base.clone();
-        part.part_index = Some(i);
+        let part = base.for_part(i);
         let part_pass = part.somatic_passed_vcf_path();
 
         anyhow::ensure!(
@@ -744,7 +802,7 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
         part_pass_paths.push(PathBuf::from(part_pass));
     }
 
-    let final_passed_vcf = base.config.clairs_passed_vcf(&base.id);
+    let final_passed_vcf = base.config.passed_vcf.clone();
     let rand = Uuid::new_v4();
     let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
     let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
@@ -758,8 +816,31 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
         n_parts, final_passed_vcf
     );
 
-    let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
-    run!(&base.config, &mut concat).context("Failed to run bcftools concat for ClairS parts")?;
+    let concat = BcftoolsConcat {
+        bin: base.config.bcftools_bin.clone(),
+        threads: base.config.bcftools_threads,
+        inputs: part_pass_paths,
+        output: final_tmp.clone().into(),
+        tmp_dir: base.config.tmp_dir.clone().into(),
+        tmp_list: PathBuf::new(),
+    };
+
+    let outputs = exec_jobs(
+        vec![jobs_seq![concat]],
+        base.config.slurm_runner,
+        base.config.slurm_max_par.into(),
+    )
+    .context("Failed to run bcftools concat for ClairS germline parts")?;
+
+    for output in &outputs {
+        output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?;
+    }
+
+    anyhow::ensure!(
+        Path::new(&final_tmp_csi).exists(),
+        "CSI index not found after concat: {}",
+        final_tmp_csi
+    );
 
     fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?;
     fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
@@ -777,8 +858,7 @@ fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<
     let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
 
     for i in 1..=n_parts {
-        let mut part = base.clone();
-        part.part_index = Some(i);
+        let part = base.for_part(i);
         let part_pass = part.germline_passed_vcf_path();
 
         anyhow::ensure!(
@@ -791,7 +871,7 @@ fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<
         part_pass_paths.push(PathBuf::from(part_pass));
     }
 
-    let final_passed_vcf = base.config.clairs_germline_passed_vcf(&base.id);
+    let final_passed_vcf = base.config.germline_passed_vcf.clone();
     let rand = Uuid::new_v4();
     let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
     let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
@@ -805,9 +885,31 @@ fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<
         n_parts, final_passed_vcf
     );
 
-    let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
-    run!(&base.config, &mut concat)
-        .context("Failed to run bcftools concat for ClairS germline parts")?;
+    let concat = BcftoolsConcat {
+        bin: base.config.bcftools_bin.clone(),
+        threads: base.config.bcftools_threads,
+        inputs: part_pass_paths,
+        output: final_tmp.clone().into(),
+        tmp_dir: base.config.tmp_dir.clone().into(),
+        tmp_list: PathBuf::new(),
+    };
+
+    let outputs = exec_jobs(
+        vec![jobs_seq![concat]],
+        base.config.slurm_runner,
+        base.config.slurm_max_par.into(),
+    )
+    .context("Failed to run bcftools concat for ClairS germline parts")?;
+
+    for output in &outputs {
+        output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?;
+    }
+
+    anyhow::ensure!(
+        Path::new(&final_tmp_csi).exists(),
+        "CSI index not found after concat: {}",
+        final_tmp_csi
+    );
 
     fs::rename(&final_tmp, &final_passed_vcf)
         .context("Failed to rename merged ClairS germline PASS VCF")?;
@@ -832,54 +934,40 @@ pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()>
 
     let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf();
     info!("Merging {} into: {}", bams.len(), into.display());
-    let mut merge = SamtoolsMergeMany::from_config(into, bams, config);
-    run!(config, &mut merge)?;
+    let merge = SamtoolsMergeMany::from_config(into, bams, config);
+    exec_jobs(
+        vec![jobs_seq![merge]],
+        config.slurm_runner,
+        config.slurm_max_par.into(),
+    )?;
     Ok(())
 }
 
-/// Runs ClairS in parallel chunks, then merges results.
-///
-/// Splits the genome into N equal-sized regions, runs ClairS on each region
-/// in parallel (local or Slurm based on `config.slurm_runner`), post-processes
-/// each part (concatenates SNV+indel, filters PASS), and merges both somatic
-/// and germline VCFs.
-///
-/// # Arguments
+/// Runs ClairS in parallel chunks with sequential per-part post-processing, then merges results.
 ///
-/// * `id` - Sample identifier
-/// * `config` - Global pipeline configuration
-/// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
+/// # Pipeline per part (sequential)
+/// `ClairS main → germline PASS filter → somatic SNV+indel concat → somatic PASS filter`
 ///
-/// # Returns
-///
-/// `Ok(())` on success, or an error if any step fails.
+/// # Parallelism
+/// Parts run in parallel (bounded by `config.slurm_max_par`), dispatched locally or via
+/// SLURM depending on `config.slurm_runner`.
 ///
 /// # Errors
-///
-/// Returns an error if:
-/// - `n_parts` is 0
-/// - Normal BAM file cannot be opened or is corrupted
-/// - BAM header is malformed
-/// - ClairS execution fails on any part
-/// - SNV+indel concatenation fails
-/// - PASS filtering fails
-/// - Somatic or germline VCF merging fails
-/// - Output directory cannot be created
-///
-/// # Example
-///
-/// ```ignore
-/// let config = Config::default();
-/// run_clairs_chunked("sample_001", &config, 30)?;
-/// ```
-pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
+/// Fails fast within a part on any step error. Already-running jobs are not cancelled.
+fn run_chunked(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> {
     anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0");
 
-    let base = ClairS::initialize(id, config)?;
+    let base = ClairS {
+        id: id.to_string(),
+        config: config.clone(),
+        region: None,
+        part_index: None,
+    };
 
-    let normal_bam = config.normal_bam(id);
+    let normal_bam = base.config.normal_bam.clone();
     let reader = bam::Reader::from_path(&normal_bam)
         .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
+
     let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
     let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
         .into_iter()
@@ -887,85 +975,179 @@ pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::
         .collect::<Vec<String>>();
 
     let actual_n_parts = regions.len();
-
     info!(
         "Running ClairS in {} parallel parts for {}",
         actual_n_parts, id
     );
 
-    let mut jobs = Vec::with_capacity(actual_n_parts);
-    for (i, region) in regions.into_iter().enumerate() {
-        let mut job = base.clone();
-        job.part_index = Some(i + 1);
-        job.region = Some(region);
-        job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
-        info!("Planned ClairS job:\n{job}");
-        jobs.push(job);
-    }
+    let groups: anyhow::Result<Vec<Vec<Box<dyn AnyJob>>>> = regions
+        .into_iter()
+        .enumerate()
+        .map(|(i, region)| {
+            let mut job = base.for_part(i + 1);
 
-    let outputs = run_many!(config, jobs.clone())?;
+            job.region = Some(region);
 
-    for output in outputs.iter() {
-        output.save_to_file(format!("{}/clairs_", base.log_dir))?;
-    }
+            info!("Planned ClairS job:\n{job}");
 
-    // 1) Germline PASS filters (one per part)
-    let mut germlines = Vec::new();
+            let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?;
+            let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?;
+            let germline = job.process_germline()?;
 
-    // 2a) Somatic SNV+indel concats (one per part)
-    let mut somatic_concats = Vec::new();
-    let mut somatic_tmp_paths = Vec::new();
+            Ok(jobs_seq![job, germline, somatic_concat, somatic_pass])
+        })
+        .collect();
 
-    for job in &jobs {
-        // germline PASS
-        germlines.push(job.process_germline()?);
+    let outputs = exec_jobs(
+        groups?,
+        base.config.slurm_runner,
+        base.config.slurm_max_par.into(),
+    )?;
 
-        // somatic concat (SNV+indel -> tmp VCF)
-        let (concat, tmp_path) = job.process_somatic_concat()?;
-        somatic_concats.push(concat);
-        somatic_tmp_paths.push(tmp_path);
+    for output in &outputs {
+        output.save_to_file(format!("{}/clairs_", base.config.log_dir))?;
     }
 
-    // Run all germline PASS filters in parallel
-    run_many!(config, germlines)?;
+    merge_clairs_parts(&base, actual_n_parts)?;
+    merge_clairs_germline_parts(&base, actual_n_parts)?;
 
-    // Run all somatic concats in parallel
-    run_many!(config, somatic_concats)?;
+    if !config.clairs_keep_parts {
+        let somatic_final = config.passed_vcf.clone();
+        let germline_final = config.germline_passed_vcf.clone();
 
-    // 2b) Somatic PASS filters (one per part, based on tmp VCFs)
-    let mut somatic_pass_filters = Vec::new();
-    for (job, tmp_path) in jobs.iter().zip(somatic_tmp_paths.iter()) {
-        somatic_pass_filters.push(job.process_somatic_pass(tmp_path)?);
+        anyhow::ensure!(
+            Path::new(&somatic_final).exists(),
+            "Refusing to clean up parts: merged somatic VCF not found at {}",
+            somatic_final
+        );
+        anyhow::ensure!(
+            Path::new(&germline_final).exists(),
+            "Refusing to clean up parts: merged germline VCF not found at {}",
+            germline_final
+        );
+
+        let base_dir = config.output_dir.clone();
+        for i in 1..=actual_n_parts {
+            let part_dir = format!("{base_dir}/part{i}");
+            if Path::new(&part_dir).exists() {
+                fs::remove_dir_all(&part_dir)
+                    .with_context(|| format!("Failed to remove part directory: {part_dir}"))?;
+                debug!("Removed part directory: {part_dir}");
+            }
+        }
+        info!("Cleaned up {} part directories for {}", actual_n_parts, id);
     }
 
-    // Run all somatic PASS filters in parallel
-    run_many!(config, somatic_pass_filters)?;
+    info!(
+        "ClairS completed for {}: {} parts merged (somatic + germline)",
+        id, actual_n_parts
+    );
+    Ok(())
+}
+
+fn part_is_complete(part: &ClairS) -> bool {
+    Path::new(&part.somatic_passed_vcf_path()).exists()
+        && Path::new(&format!("{}.csi", part.somatic_passed_vcf_path())).exists()
+        && Path::new(&part.germline_passed_vcf_path()).exists()
+        && Path::new(&format!("{}.csi", part.germline_passed_vcf_path())).exists()
+}
+
+fn run_chunked_resume(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> {
+    anyhow::ensure!(n_parts > 0, "run_chunked_resume: n_parts must be > 0");
+
+    let base = ClairS {
+        id: id.to_string(),
+        config: config.clone(),
+        region: None,
+        part_index: None,
+    };
+
+    let normal_bam = base.config.normal_bam.clone();
+    let reader = bam::Reader::from_path(&normal_bam)
+        .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
+
+    let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
+    let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
+        .into_iter()
+        .flatten()
+        .collect::<Vec<String>>();
+
+    let actual_n_parts = regions.len();
+
+    info!(
+        "Resuming ClairS chunked run for {} with {} parts",
+        id, actual_n_parts
+    );
+
+    let groups: anyhow::Result<Vec<Vec<Box<dyn AnyJob>>>> = regions
+        .into_iter()
+        .enumerate()
+        .filter_map(|(i, region)| {
+            let mut job = base.for_part(i + 1);
+            job.region = Some(region);
+
+            if part_is_complete(&job) {
+                info!("Skipping completed ClairS part {} for {}", i + 1, id);
+                return None;
+            }
+
+            info!(
+                "Scheduling missing/incomplete ClairS part {}:\n{job}",
+                i + 1
+            );
+
+            Some((|| {
+                let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?;
+                let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?;
+                let germline = job.process_germline()?;
+
+                Ok(jobs_seq![job, germline, somatic_concat, somatic_pass])
+            })())
+        })
+        .collect();
+
+    let groups = groups?;
+
+    if groups.is_empty() {
+        info!("All ClairS parts already complete for {}", id);
+    } else {
+        let outputs = exec_jobs(
+            groups,
+            base.config.slurm_runner,
+            base.config.slurm_max_par.into(),
+        )?;
+
+        for output in &outputs {
+            output.save_to_file(format!("{}/clairs_", base.config.log_dir))?;
+        }
+    }
 
     merge_clairs_parts(&base, actual_n_parts)?;
     merge_clairs_germline_parts(&base, actual_n_parts)?;
 
     if !config.clairs_keep_parts {
-        let somatic_final = config.clairs_passed_vcf(id);
-        let germline_final = config.clairs_germline_passed_vcf(id);
+        let somatic_final = config.passed_vcf.clone();
+        let germline_final = config.germline_passed_vcf.clone();
 
         anyhow::ensure!(
             Path::new(&somatic_final).exists(),
             "Refusing to clean up parts: merged somatic VCF not found at {}",
             somatic_final
         );
+
         anyhow::ensure!(
             Path::new(&germline_final).exists(),
             "Refusing to clean up parts: merged germline VCF not found at {}",
             germline_final
         );
 
-        let base_dir = config.clairs_output_dir(id);
+        let base_dir = config.output_dir.clone();
+
         for i in 1..=actual_n_parts {
             let part_dir = format!("{base_dir}/part{i}");
             if Path::new(&part_dir).exists() {
                 fs::remove_dir_all(&part_dir)
                     .with_context(|| format!("Failed to remove part directory: {part_dir}"))?;
-                debug!("Removed part directory: {part_dir}");
             }
         }
 
@@ -973,7 +1155,7 @@ pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::
     }
 
     info!(
-        "ClairS completed for {}: {} parts merged (somatic + germline)",
+        "ClairS resumed/completed for {}: {} parts merged",
         id, actual_n_parts
     );
 
@@ -999,20 +1181,22 @@ mod tests {
     #[test]
     fn clairs_run() -> anyhow::Result<()> {
         test_init();
+        let id = "CHAHA";
         let config = Config::default();
-        // let clairs = ClairS::initialize("34528", &config)?;
-        // info!("{clairs}");
-
-        run_clairs_chunked("CHAHA", &config, 50)?;
+        let mut clairs = ClairS::initialize(id, &config)?;
+        crate::runners::Run::run(&mut clairs)?;
 
         Ok(())
     }
 
     #[test]
-    fn clairs_haplo() -> anyhow::Result<()> {
+    fn clairs_resume() -> anyhow::Result<()> {
         test_init();
+        let id = "CHAHA";
         let config = Config::default();
+        let mut clairs = ClairS::initialize(id, &config)?;
 
+        clairs.run_chunked_resume(60);
         Ok(())
     }
 }

+ 1 - 1
src/callers/deep_somatic.rs

@@ -248,7 +248,7 @@ impl JobCommand for DeepSomatic {
         );
 
         format!(
-            "{singularity_bin} exec \
+            "{singularity_bin} exec --cleanenv \
             {binds} \
             --bind {output_dir}:/output \
             {image} \

+ 1 - 1
src/callers/deep_variant.rs

@@ -347,7 +347,7 @@ impl JobCommand for DeepVariant {
         ]);
 
         format!(
-            "{singularity_bin} exec \
+            "{singularity_bin} exec --cleanenv \
             {binds} \
             --bind {output_dir}:/output \
             {image} \

+ 1 - 1
src/callers/gatk.rs

@@ -349,7 +349,7 @@ impl JobCommand for Mutect2 {
         );
 
         format!(
-            "{singularity_bin} exec \
+            "{singularity_bin} exec --cleanenv \
             {binds} \
             --bind {output_dir}:/output \
             {image} \

+ 36 - 0
src/callers/nanomonsv.rs

@@ -891,6 +891,42 @@ pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<(
     Ok(())
 }
 
+/// Runs NanomonSV `insert_classify` step to annotate insertion variants
+/// with mobile element / repeat content classification.
+///
+/// # Arguments
+/// * `input_vcf`  - Path to input VCF (gzipped or plain)
+/// * `output_vcf` - Path to output VCF
+/// * `config`     - Pipeline configuration (provides reference, GTF, LINE1 BED, binary, conda env)
+///
+/// # Errors
+/// Returns an error if command execution fails.
+pub fn nanomonsv_insert_classify(
+    input_vcf: &str,
+    output_vcf: &str,
+    config: &Config,
+) -> anyhow::Result<CapturedOutput> {
+    let args = vec![
+        "insert_classify".to_string(),
+        input_vcf.to_string(),
+        output_vcf.to_string(),
+        config.reference.clone(),
+        config.refseq_gtf.clone(),
+        config.nanomonsv_line1_bed.clone(),
+    ];
+
+    let mut runner = NanomonSV {
+        id: "nanomonsv_insert_classify".to_string(),
+        log_dir: config.tmp_dir.clone(),
+        config: config.clone(),
+        job_args: args,
+        threads: 1,
+        memory: Some(8),
+    };
+
+    run!(config, &mut runner)
+}
+
 /// Recursively finds NanomonSV output directories for a specific time point.
 ///
 /// # Arguments

+ 7 - 7
src/commands/bcftools.rs

@@ -25,7 +25,7 @@ use crate::config::Config;
 /// 3. `bcftools view -i "FILTER='PASS'" -Oz --write-index`
 ///
 /// The final output is a BGZF-compressed VCF (`.vcf.gz`) plus its index.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BcftoolsKeepPass {
     /// Path to the `bcftools` binary.
     pub bin: String,
@@ -38,9 +38,9 @@ pub struct BcftoolsKeepPass {
     /// Temporary dir from a global [`Config`].
     pub tmp_dir: PathBuf,
     /// Temporary sorted VCF/BCF path.
-    tmp_sort: PathBuf,
+    pub tmp_sort: PathBuf,
     /// Temporary normalized VCF/BCF path.
-    tmp_norm: PathBuf,
+    pub tmp_norm: PathBuf,
 }
 
 impl BcftoolsKeepPass {
@@ -136,7 +136,7 @@ impl super::SbatchRunner for BcftoolsKeepPass {
 /// `-e "INFO/IMPRECISE==1 || FILTER!='PASS'"`.
 ///
 /// The final output is a BGZF-compressed VCF (`.vcf.gz`) plus its index.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BcftoolsKeepPassPrecise {
     /// Path to the `bcftools` binary.
     pub bin: String,
@@ -244,7 +244,7 @@ impl super::SbatchRunner for BcftoolsKeepPassPrecise {
 /// `bcftools concat` piped through `bcftools sort` to ensure proper ordering.
 ///
 /// The output is automatically indexed after concatenation and sorting.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BcftoolsConcat {
     /// Path to the `bcftools` binary.
     pub bin: String,
@@ -257,7 +257,7 @@ pub struct BcftoolsConcat {
     /// Temporary dir from a global [`Config`].
     pub tmp_dir: PathBuf,
     /// Temporary list file path for `bcftools concat -f`.
-    tmp_list: PathBuf,
+    pub tmp_list: PathBuf,
 }
 
 impl BcftoolsConcat {
@@ -352,7 +352,7 @@ impl super::SbatchRunner for BcftoolsConcat {
 }
 
 /// Indexes a VCF/BCF file with `bcftools index`.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BcftoolsIndex {
     /// Path to the `bcftools` binary.
     pub bin: String,

+ 133 - 0
src/commands/mod.rs

@@ -1071,6 +1071,139 @@ macro_rules! run_many {
     }};
 }
 
+/// Execute a list of job groups locally with bounded parallelism.
+///
+/// # Structure
+/// - Outer `Vec`: groups executed in **parallel** (bounded by `max_par`)
+/// - Inner `Vec`: steps executed **sequentially** within each group, aborting on first error
+///
+/// # Panics
+/// Panics if `max_par` is 0.
+pub fn exec_jobs_locally(
+    jobs: Vec<Vec<Box<dyn AnyJob>>>,
+    max_par: usize,
+) -> anyhow::Result<Vec<CapturedOutput>>
+{
+    let pool = rayon::ThreadPoolBuilder::new()
+        .num_threads(max_par)
+        .build()
+        .context("failed to build rayon thread pool")?;
+
+    pool.install(|| {
+        jobs.into_par_iter()
+            .map(|group| {
+                group
+                    .into_iter()
+                    .map(|mut job| LocalRunner::exec(&mut job))
+                    .collect::<anyhow::Result<Vec<_>>>()
+            })
+            .collect::<anyhow::Result<Vec<Vec<_>>>>()
+    })
+    .map(|groups| groups.into_iter().flatten().collect())
+}
+
+/// Execute a list of job groups on the SLURM cluster with bounded parallelism.
+///
+/// # Structure
+/// - Outer `Vec`: groups submitted in **parallel** (bounded by `max_par`)
+/// - Inner `Vec`: steps submitted **sequentially** within each group, aborting on first error
+///
+/// # Note
+/// Submitted jobs are **not cancelled** on error — caller is responsible for cleanup.
+///
+/// # Panics
+/// Panics if `max_par` is 0.
+pub fn exec_jobs_slurm(jobs: Vec<Vec<Box<dyn AnyJob>>>, max_par: usize) -> anyhow::Result<Vec<CapturedOutput>>
+{
+    let pool = rayon::ThreadPoolBuilder::new()
+        .num_threads(max_par)
+        .build()
+        .context("failed to build rayon thread pool")?;
+
+    pool.install(|| {
+        jobs.into_par_iter()
+            .map(|group| {
+                group
+                    .into_iter()
+                    .map(|mut job| SbatchRunner::exec(&mut job))
+                    .collect::<anyhow::Result<Vec<_>>>()
+            })
+            .collect::<anyhow::Result<Vec<Vec<_>>>>()
+    })
+    .map(|groups| groups.into_iter().flatten().collect())
+}
+
+pub trait AnyJob: LocalRunner + SbatchRunner + Send {}
+impl<T: LocalRunner + SbatchRunner + Send> AnyJob for T {}
+
+pub type JobGroup = Vec<Box<dyn AnyJob>>;
+
+impl LocalRunner for Box<dyn AnyJob> {
+    fn shell(&self) -> &str {
+        (**self).shell()
+    }
+    fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
+        LocalRunner::exec(self.as_mut())
+    }
+}
+
+impl SbatchRunner for Box<dyn AnyJob> {
+    fn sbatch_bin(&self) -> &str {
+        (**self).sbatch_bin()
+    }
+    fn squeue_bin(&self) -> &str {
+        (**self).squeue_bin()
+    }
+    fn slurm_params(&self) -> SlurmParams {
+        (**self).slurm_params()
+    }
+    fn sbatch_extra_args(&self) -> Vec<String> {
+        (**self).sbatch_extra_args()
+    }
+    fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
+        SbatchRunner::exec(self.as_mut())
+    }
+}
+
+impl Command for Box<dyn AnyJob> {
+    fn init(&mut self) -> anyhow::Result<()> {
+        (**self).init()
+    }
+    fn cmd(&self) -> String {
+        (**self).cmd()
+    }
+    fn clean_up(&self) -> anyhow::Result<()> {
+        (**self).clean_up()
+    }
+}
+
+/// Dispatch jobs either locally or via SLURM.
+///
+/// - `with_slurm`: submit via SLURM if `true`, run locally if `false`
+/// - `max_par`: maximum parallel groups. In SLURM mode, bounds the thread pool size.
+///   In local mode, caps the Rayon parallel iterator concurrency.
+///
+/// See [`exec_jobs_locally`] and [`exec_jobs_slurm`] for the `Vec<Vec<T>>` contract.
+pub fn exec_jobs(
+    jobs: Vec<Vec<Box<dyn AnyJob>>>,
+    with_slurm: bool,
+    max_par: usize,
+) -> anyhow::Result<Vec<CapturedOutput>>
+{
+    if with_slurm {
+        exec_jobs_slurm(jobs, max_par)
+    } else {
+        exec_jobs_locally(jobs, max_par)
+    }
+}
+
+#[macro_export]
+macro_rules! jobs_seq {
+    ($($job:expr),+ $(,)?) => {
+        vec![$( Box::new($job) as Box<dyn AnyJob> ),+]
+    };
+}
+
 #[cfg(test)]
 mod tests {
     use super::{Command, SbatchRunner, SlurmParams};

+ 3 - 0
src/config.rs

@@ -65,6 +65,7 @@ pub struct Config {
 
     /// Path to the RefSeq GFF3 annotation, sorted and indexed.
     pub refseq_gff: String,
+    pub refseq_gtf: String,
 
     ///  dbSNP vcf.gz file (should be indexed)
     pub db_snp: String,
@@ -470,6 +471,8 @@ pub struct Config {
 
     pub nanomonsv_simple_repeat_bed: String,
 
+    pub nanomonsv_line1_bed: String,
+
     // === Straglr configuration ===
     /// Path to Straglr executable.
     pub straglr_bin: String,

+ 21 - 2
src/de_novo/de_novo_pipe.rs

@@ -314,6 +314,12 @@ pub struct FlyeMedakaBuilder {
     pub config: Config,
 }
 
+impl FlyeMedakaBuilder {
+    pub fn new(config: Config) -> Self {
+        Self { config }
+    }
+}
+
 pub type BoxedAssembler = Box<dyn Assembler>;
 pub type BoxedPolisher = Box<dyn Polisher>;
 
@@ -452,12 +458,13 @@ mod tests {
 
         let id = "CML2518";
         let abl_locus = ("chr9", 142_958_315, 142_958_913);
+        let bcr_locus = ("chr22", 23_713_734, 23_714_329);
 
         let config = Config::default();
 
         let bam_path = PathBuf::from(config.tumoral_bam(id));
         let work_dir = PathBuf::from(format!(
-            "{}/{id}/{}/asm_wtdbg2",
+            "{}/{id}/{}/asm_wtdbg2_22",
             config.result_dir, config.tumoral_name
         ));
         if work_dir.exists() {
@@ -472,7 +479,8 @@ mod tests {
         };
 
         // Fetch initial primary records from locus
-        let records = fetch_primary_records(&bam_path, Some(abl_locus), None)?;
+        // let records = fetch_primary_records(&bam_path, Some(abl_locus), None)?;
+        let records = fetch_primary_records(&bam_path, Some(bcr_locus), None)?;
 
         info!("Fetched {} primary records from locus", records.len());
 
@@ -483,6 +491,8 @@ mod tests {
             assembly_config.min_records
         );
 
+        // let flye = FlyeMedakaBuilder::new(config.clone());
+
         // Run iterative assembly — extends with suspicious reads each round
         let results = run_local_assembly_iterative(
             &bam_path,
@@ -495,6 +505,15 @@ mod tests {
             3,
         )?;
 
+        // let results = run_local_assembly_iterative(
+        //     &bam_path,
+        //     records,
+        //     &flye,
+        //     work_dir.clone(),
+        //     assembly_config,
+        //     3,
+        // )?;
+
         // Assertions
         assert!(!results.is_empty(), "No contigs assembled");
 

+ 136 - 17
src/helpers.rs

@@ -1327,10 +1327,8 @@ pub fn revcomp(s: &str) -> String {
 
 #[cfg(test)]
 mod tests {
-    use log::info;
-    use rust_htslib::bam::{self, Read};
-
-    use crate::helpers::test_init;
+    use hashbrown::HashSet;
+    use itertools::Itertools;
 
     use super::*;
 
@@ -1378,21 +1376,142 @@ mod tests {
         );
     }
 
-    #[test]
-    fn split_g() -> anyhow::Result<()> {
-        test_init();
-        let n_parts = 4;
-        let reader = bam::Reader::from_path(
-            "/mnt/beegfs02/scratch/t_steimle/data/wgs/34528/norm/34528_norm_hs1.bam",
+    /// Parse `"ctg:start-end"` → `(contig, start, end)` (1-based inclusive).
+    fn parse_region(s: &str) -> (&str, u64, u64) {
+        let (ctg, range) = s.split_once(':').expect("region missing ':'");
+        let (start, end) = range.split_once('-').expect("region missing '-'");
+        (
+            ctg,
+            start.parse().expect("bad start"),
+            end.parse().expect("bad end"),
         )
-        .with_context(|| format!("Failed to open BAM"))?;
-        let header = bam::Header::from_template(reader.header());
-        let genome_sizes = get_genome_sizes(&header)?;
+    }
+
+    /// Assert that `parts` cover every base of `genome` exactly once:
+    /// - All contigs are present with no extra ones.
+    /// - Regions within each contig are sorted, contiguous, start at 1, end at
+    ///   the declared length.
+    /// - No region exceeds its contig's declared length.
+    fn assert_full_coverage(parts: &[Vec<String>], genome: &[(String, u64)], label: &str) {
+        use std::collections::HashMap;
 
-        // Split genome into regions
-        let regions = split_genome_into_n_regions_exact(&genome_sizes, n_parts);
+        let mut by_contig: HashMap<&str, Vec<(u64, u64)>> = HashMap::new();
+        for part in parts {
+            for region in part {
+                let (ctg, start, end) = parse_region(region);
+                by_contig.entry(ctg).or_default().push((start, end));
+            }
+        }
 
-        info!("{:#?}", regions);
-        Ok(())
+        assert_eq!(
+            by_contig.len(),
+            genome.len(),
+            "[{label}] got {} contigs, expected {}",
+            by_contig.len(),
+            genome.len()
+        );
+
+        for (ctg, expected_len) in genome {
+            let ranges = by_contig
+                .get(ctg.as_str())
+                .unwrap_or_else(|| panic!("[{label}] contig {ctg} missing from output"));
+
+            let mut sorted = ranges.clone();
+            sorted.sort_unstable();
+
+            assert_eq!(
+                sorted[0].0, 1,
+                "[{label}] {ctg}: first region starts at {} (expected 1)",
+                sorted[0].0
+            );
+            assert_eq!(
+                sorted.last().unwrap().1,
+                *expected_len,
+                "[{label}] {ctg}: last region ends at {} (expected {expected_len})",
+                sorted.last().unwrap().1
+            );
+            for i in 1..sorted.len() {
+                assert_eq!(
+                    sorted[i].0,
+                    sorted[i - 1].1 + 1,
+                    "[{label}] {ctg}: gap between {:?} and {:?}",
+                    sorted[i - 1],
+                    sorted[i]
+                );
+            }
+            for &(start, end) in &sorted {
+                assert!(
+                    end <= *expected_len,
+                    "[{label}] {ctg}: region {start}-{end} exceeds contig length {expected_len}"
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn split_g_covers_full_genome() {
+        let max_rec = 3;
+        // Synthetic genome with varied contig sizes; no file I/O needed.
+        let genome: Vec<(String, u64)> = vec![
+            ("chr1".to_string(), 248_956_422),
+            ("chr2".to_string(), 242_193_529),
+            ("chr3".to_string(), 198_295_559),
+            ("chr4".to_string(), 190_214_555),
+            ("chrX".to_string(), 156_040_895),
+            ("chrY".to_string(), 57_227_415),
+            ("chrM".to_string(), 16_569),
+        ];
+        let total_bases: u64 = genome.iter().map(|(_, l)| l).sum();
+
+        let mut rec = 0;
+        let mut regions_hash = HashSet::new();
+        while rec < max_rec {
+            for n_parts in [1, 2, 3, 4, 7, 10, 60, 100] {
+                let parts = split_ordered_genome_into_n_regions_exact(&genome, n_parts);
+                
+                let r = regions_hash.insert(parts.iter().flatten().join(", "));
+                if rec > 0 && r {
+                    panic!("Not deterministic !");
+                }
+                let label = format!("n_parts={n_parts}");
+
+                // Correct number of parts (capped at total_bases when n_parts is huge)
+                let expected_parts = n_parts.min(total_bases as usize);
+                assert_eq!(
+                    parts.len(),
+                    expected_parts,
+                    "[{label}] wrong number of parts"
+                );
+
+                // All parts non-empty
+                assert!(
+                    parts.iter().all(|p| !p.is_empty()),
+                    "[{label}] at least one empty part"
+                );
+
+                // Full coverage with no gaps or overlaps
+                assert_full_coverage(&parts, &genome, &label);
+
+                // Even distribution: sizes differ by at most 1 base
+                let sizes: Vec<u64> = parts
+                    .iter()
+                    .map(|p| {
+                        p.iter()
+                            .map(|r| {
+                                let (_, s, e) = parse_region(r);
+                                e - s + 1
+                            })
+                            .sum()
+                    })
+                    .collect();
+                let max = *sizes.iter().max().unwrap();
+                let min = *sizes.iter().min().unwrap();
+                assert!(
+                    max - min <= 1,
+                    "[{label}] uneven distribution: max={max}, min={min}"
+                );
+            }
+            rec += 1;
+        }
     }
 }

+ 14 - 0
src/variant/vcf_variant.rs

@@ -1569,6 +1569,10 @@ pub enum Info {
     FREQ(DbSnpFreq),
     COMMON,
     RS(u32),
+    // nanomonsv insert
+    INSERT_TYPE(String),
+    RMSK_INFO(String),
+    TSD(String),
 }
 
 impl FromStr for Info {
@@ -1655,6 +1659,9 @@ impl FromStr for Info {
                         .with_context(|| format!("Failed to parse FREQ: `{value}`"))?,
                 ),
                 "RS" => Info::RS(parse_value(value, key)?),
+                "INSERT_TYPE" => Info::INSERT_TYPE(value.to_string()),
+                "RMSK_INFO" => Info::RMSK_INFO(value.to_string()),
+                "TSD" => Info::TSD(value.to_string()),
 
                 _ => Info::Empty,
             })
@@ -1755,6 +1762,13 @@ impl fmt::Display for Info {
             Info::FREQ(v) => write!(f, "FREQ={v}"),
             Info::RS(v) => write!(f, "RS={v}"),
             Info::COMMON => write!(f, "COMMON"),
+
+            // nanomonsv insert
+            Info::INSERT_TYPE(v) => write!(f, "INSERT_TYPE={v}"),
+            // query_start,query_end,class,class...
+            Info::RMSK_INFO(v) => write!(f, "RMSK_INFO={v}"),
+            // Target site duplication sequence
+            Info::TSD(v) => write!(f, "TSD={v}"),
         }
     }
 }