Thomas 3 тижнів тому
батько
коміт
069f6f8b38

+ 49 - 2
pandora-config.example.toml

@@ -13,8 +13,6 @@ tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
 # Should use Slurm as runner
 slurm_runner = true
 
-# slurm_genome_chunks = 150
-
 # Run cache directory.
 run_cache_dir = "/home/t_steimle/data/prom_runs"
 
@@ -208,6 +206,55 @@ clairs_platform = "ont_r10_dorado_sup_5khz_ssrs"
 # {result_dir}, {id}
 clairs_output_dir = "{result_dir}/{id}/diag/ClairS"
 
+#######################################
+# GATK configuration
+#######################################
+# Path to the GATK container image (Singularity/Apptainer .sif, or a docker:// URI
+# if you pull at runtime).
+#
+# Examples:
+# - "/containers/gatk_4.6.0.0.sif"
+gatk_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/gatk_latest.sif"
+
+# Path to a BED file restricting analysis to target regions (0-based, half-open).
+# Must match contig naming of the reference/BAMs (e.g. "chr9" vs "9").
+#
+# Used for targeted calling (e.g. Mutect2 `-L` or region chunking).
+gatk_bed_path = "/home/t_steimle/ref/hs1/chm13v2.0_RefSeq_Liftoff_v5.1_Genes.bed"
+
+# Local single-run CPU threads (non-Slurm execution).
+# Used for full-run Mutect2 or other GATK tools.
+# Currently forwarded to:
+#   - `--native-pair-hmm-threads`
+# (`--reader-threads` is not passed by the pipeline at present.)
+# Should match available cores on the node.
+gatk_threads = 100
+
+# Local single-run memory limit in GB.
+# Used to size Java heap:
+#   `--java-options "-Xmx{mem}g"`
+# Should leave headroom for native memory (PairHMM, buffers).
+gatk_mem_gb = 120
+
+# Per-chunk CPU threads when running chunked under Slurm.
+# Applies to each parallel job independently.
+gatk_slurm_threads = 8
+
+# Per-chunk memory (GB) when running under Slurm.
+# Used both for scheduler request and Java heap sizing per chunk.
+# Must be sufficient for interval-restricted Mutect2.
+gatk_slurm_mem_gb = 32
+
+# If true, force re-run of GATK steps by removing or ignoring existing outputs.
+gatk_force = false
+
+# GATK output directory template.
+# Available placeholders: {result_dir}, {id}, {tumoral_name}
+gatk_output_dir = "{result_dir}/{id}/{tumoral_name}/GATK"
+
+# GATK PASS-filtered VCF path.
+# Available placeholders: {output_dir}, {id}, {tumoral_name}, {reference_name}
+gatk_passed_vcf = "{output_dir}/{id}_{tumoral_name}_{reference_name}_GATK_PASSED.vcf.gz"
+
 #######################################
 # Savana configuration
 #######################################

+ 2 - 0
src/annotation/mod.rs

@@ -269,6 +269,7 @@ pub enum Caller {
     Savana,
     Severus,
     DeepSomatic,
+    Mutect2,
 }
 
 impl fmt::Display for Caller {
@@ -282,6 +283,7 @@ impl fmt::Display for Caller {
             Savana => write!(f, "Savana"),
             Severus => write!(f, "Severus"),
             DeepSomatic => write!(f, "DeepSomatic"),
+            Mutect2 => write!(f, "Mutect2"),
         }
     }
 }

+ 847 - 0
src/callers/gatk.rs

@@ -0,0 +1,847 @@
+//! # GATK Mutect2 Somatic Variant Calling Pipeline
+//!
+//! This module provides a pipeline runner for [GATK Mutect2](https://gatk.broadinstitute.org/),
+//! a somatic variant caller for paired tumor-normal samples using BED-based interval targeting.
+//!
+//! ## Overview
+//!
+//! Mutect2 performs somatic variant calling using:
+//!
+//! - Local assembly-based haplotype caller tuned for somatic mutations
+//! - Paired tumor-normal analysis
+//! - Read orientation artifact filtering (LearnReadOrientationModel + FilterMutectCalls)
+//! - Containerized execution via Singularity
+//!
+//! ## Pipeline Steps (chained per chunk)
+//!
+//! 1. `gatk Mutect2` — raw somatic calls + F1R2 orientation counts
+//! 2. `gatk LearnReadOrientationModel` — artifact prior estimation
+//! 3. `gatk FilterMutectCalls` — apply filters (orientation, contamination, etc.)
+//! 4. `bcftools view -f PASS` — extract PASS-only variants
+//!
+//! Steps 1–3 run as a single chained command inside Singularity.
+//! Step 4 runs as a separate bcftools command.
+//!
+//! ## Requirements
+//!
+//! - Tumor and normal BAMs indexed (`.bai` files present)
+//! - BED interval list accessible (`config.gatk_bed_path`)
+//! - Reference genome with `.fai` and `.dict`
+//! - GATK Singularity image available (`config.gatk_image`)
+//! - `--normal-sample` must match the `@RG SM` tag in the normal BAM header
+//!
+//! ## Execution Modes
+//!
+//! Execution mode is selected via `config.slurm_runner`:
+//!
+//! - **Local** — Single-node execution
+//! - **Slurm** — HPC job submission
+//!
+//! Both modes support chunked parallel execution via [`run_mutect2_chunked`].
+//!
+//! ## Thread and Memory Consistency
+//!
+//! Two separate resource profiles exist:
+//!
+//! - **Local (non-chunked)**: `config.gatk_threads` / `config.gatk_mem_gb` — full machine resources
+//! - **Slurm (chunked)**: `config.gatk_slurm_threads` / `config.gatk_slurm_mem_gb` — per-job allocation
+//!
+//! The struct resolves the correct values via `effective_threads()` / `effective_mem_gb()` based on
+//! whether `part_index` is set. JVM heap (`-Xmx`) is always ~85% of the effective memory to leave
+//! headroom for off-heap and OS overhead.
+//!
+//! ## Output Files
+//!
+//! PASS-filtered somatic variants:
+//! ```text
+//! {result_dir}/{id}/{tumoral_name}/GATK/{id}_{tumoral_name}_{reference_name}_GATK_PASSED.vcf.gz
+//! ```
+//!
+//! ## Usage
+//!
+//! ### Chunked Parallel Execution (Recommended for WGS/WES)
+//!
+//! ```ignore
+//! use pandora_lib_promethion::callers::gatk::run_mutect2_chunked;
+//! use pandora_lib_promethion::config::Config;
+//!
+//! let config = Config::default();
+//! // Run Mutect2 in 30 parallel chunks over the BED intervals
+//! run_mutect2_chunked("sample_001", &config, 30)?;
+//! # Ok::<(), anyhow::Error>(())
+//! ```
+//!
+//! ### Single-Run Execution
+//!
+//! ```ignore
+//! use pandora_lib_promethion::callers::gatk::Mutect2;
+//! use pandora_lib_promethion::config::Config;
+//! use pandora_lib_promethion::pipes::Initialize;
+//! use pandora_lib_promethion::runners::Run;
+//!
+//! let config = Config::default();
+//! let mut caller = Mutect2::initialize("sample_001", &config)?;
+//!
+//! if caller.should_run() {
+//!     caller.run()?;
+//! }
+//!
+//! // Load somatic variants
+//! let variants = caller.variants(&annotations)?;
+//! println!("Found {} somatic variants", variants.variants.len());
+//! # Ok::<(), anyhow::Error>(())
+//! ```
+//!
+//! ## Config Fields Required
+//!
+//! The following must be present in [`Config`]:
+//!
+//! | Field                 | Example                             | Description                              |
+//! |-----------------------|-------------------------------------|------------------------------------------|
+//! | `gatk_image`          | `"/images/gatk_4.6.1.0.sif"`       | Singularity image path                   |
+//! | `gatk_bed_path`       | `"/refs/targets.bed"`               | BED interval list                        |
+//! | `gatk_threads`        | `32`                                | Threads for local (non-chunked) runs     |
+//! | `gatk_mem_gb`         | `120`                               | Memory (GB) for local (non-chunked) runs |
+//! | `gatk_slurm_threads`  | `8`                                 | Threads per Slurm chunk                  |
+//! | `gatk_slurm_mem_gb`   | `32`                                | Memory (GB) per Slurm chunk              |
+//! | `gatk_force`          | `false`                             | Force rerun even if outputs exist        |
+//!
+//! And the following methods:
+//!
+//! - `gatk_output_dir(&self, id: &str) -> String`
+//! - `gatk_passed_vcf(&self, id: &str) -> String`
+//!
+//! ## References
+//!
+//! - [GATK Mutect2 documentation](https://gatk.broadinstitute.org/hc/en-us/articles/360037593851-Mutect2)
+//! - [GATK Best Practices for somatic calling](https://gatk.broadinstitute.org/hc/en-us/articles/360035894731)
+
+use std::{
+    fmt, fs,
+    io::{BufRead, BufReader, Write},
+    path::{Path, PathBuf},
+    process::{Command as ProcessCommand, Stdio},
+};
+
+use anyhow::Context;
+use log::{debug, info};
+use rayon::prelude::*;
+use regex::Regex;
+use uuid::Uuid;
+
+use crate::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
+    collection::vcf::Vcf,
+    commands::{
+        bcftools::{BcftoolsConcat, BcftoolsIndex, BcftoolsKeepPass},
+        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
+        SlurmRunner,
+    },
+    config::Config,
+    helpers::{is_file_older, remove_dir_if_exists, singularity_bind_flags},
+    io::{bed::BedRow, vcf::read_vcf},
+    locker::SampleLock,
+    pipes::{Initialize, ShouldRun, Version},
+    run, run_many,
+    runners::Run,
+    variant::{
+        variant_collection::VariantCollection,
+        vcf_variant::{Label, Variants},
+    },
+};
+
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+/// Fraction of `gatk_mem_gb` allocated to JVM heap (`-Xmx`).
+///
+/// The remaining ~15% covers JVM off-heap, native allocations (htslib, PairHMM),
+/// and OS overhead. This is the standard GATK recommendation.
+const JVM_HEAP_FRACTION: f64 = 0.85;
+
+/// Number of parallel BED chunks for Mutect2 (default when called from `Run`).
+/// Only used by the Slurm branch of `Run::run`; direct callers of
+/// [`run_mutect2_chunked`] choose their own chunk count.
+const DEFAULT_N_PARTS: usize = 30;
+
+// ---------------------------------------------------------------------------
+// Struct
+// ---------------------------------------------------------------------------
+
+/// GATK Mutect2 paired tumor-normal somatic variant caller.
+///
+/// Executes the full Mutect2 pipeline (calling → orientation model → filtering)
+/// with BED-based interval targeting and automatic PASS extraction.
+///
+/// # Fields
+///
+/// - `id` — Sample identifier (e.g., `"34528"`)
+/// - `bed_path` — Path to the BED interval file for this run/chunk
+/// - `log_dir` — Directory for execution logs
+/// - `config` — Global pipeline configuration
+/// - `part_index` — Optional chunk index for parallel execution (1-indexed)
+#[derive(Debug, Clone)]
+pub struct Mutect2 {
+    /// Sample identifier.
+    pub id: String,
+    /// BED interval file for this run (full BED or chunk sub-BED).
+    pub bed_path: PathBuf,
+    /// Directory for log file storage.
+    pub log_dir: String,
+    /// Global pipeline configuration (owned clone; each chunked job carries its own copy).
+    pub config: Config,
+    /// Optional part index for chunked parallel runs (1-indexed).
+    /// When `Some`, the per-chunk Slurm resource profile is selected by
+    /// `effective_threads()` / `effective_mem_gb()`.
+    pub part_index: Option<usize>,
+}
+
+// Human-readable multi-line job summary; logged when chunked jobs are planned
+// (see `run_mutect2_chunked`).
+impl fmt::Display for Mutect2 {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "🧬 Mutect2")?;
+        writeln!(f, "   Case ID  : {}", self.id)?;
+        writeln!(f, "   BED      : {}", self.bed_path.display())?;
+        writeln!(f, "   Log dir  : {}", self.log_dir)?;
+        writeln!(
+            f,
+            "   Part     : {}",
+            self.part_index
+                .map_or("full".into(), |n| format!("part{n}"))
+        )
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Trait implementations
+// ---------------------------------------------------------------------------
+
+impl Initialize for Mutect2 {
+    /// Builds a full-run (non-chunked) `Mutect2` for `id`, validating the BED
+    /// interval file up front and honoring `gatk_force`.
+    fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
+        let id = id.to_string();
+        info!("Initializing Mutect2 for {id}.");
+
+        let log_dir = format!("{}/{}/log/gatk", config.result_dir, &id);
+
+        // Fail fast: the BED must exist before any (possibly expensive) job is planned.
+        let bed_path = PathBuf::from(&config.gatk_bed_path);
+        anyhow::ensure!(
+            bed_path.exists(),
+            "GATK BED interval file not found: {}",
+            bed_path.display()
+        );
+
+        let mutect2 = Self {
+            id,
+            bed_path,
+            log_dir,
+            config: config.clone(),
+            part_index: None,
+        };
+
+        // `gatk_force` wipes the whole GATK output dir at init time; the missing
+        // outputs then make `should_run()` report stale, forcing a full rerun.
+        if mutect2.config.gatk_force {
+            remove_dir_if_exists(&mutect2.config.gatk_output_dir(&mutect2.id))?;
+        }
+
+        Ok(mutect2)
+    }
+}
+
+impl ShouldRun for Mutect2 {
+    /// Returns `true` when the final PASS VCF is missing or older than any of
+    /// its inputs (normal BAM, tumor BAM, BED intervals).
+    ///
+    /// Each timestamp comparison uses `unwrap_or(true)`, so I/O errors and
+    /// missing files are conservatively treated as "stale — rerun".
+    fn should_run(&self) -> bool {
+        let passed_vcf = &self.config.gatk_passed_vcf(&self.id);
+        let normal_older =
+            is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
+        let tumor_older =
+            is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
+        let bed_older = is_file_older(passed_vcf, &self.config.gatk_bed_path, true).unwrap_or(true);
+
+        let result = normal_older || tumor_older || bed_older;
+        if result {
+            info!("Mutect2 should run for id: {}.", self.id);
+        }
+        result
+    }
+}
+
+impl JobCommand for Mutect2 {
+    /// Creates the (part-specific) output directory and the log directory
+    /// before the command is executed.
+    fn init(&mut self) -> anyhow::Result<()> {
+        let output_dir = self.part_output_dir();
+        fs::create_dir_all(&output_dir)
+            .with_context(|| format!("Failed to create dir: {output_dir}"))?;
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
+        Ok(())
+    }
+
+    /// Builds the full Singularity command string for this run/chunk.
+    fn cmd(&self) -> String {
+        let output_dir = self.part_output_dir();
+        let output_vcf_name = self.output_vcf_filename();
+        let jvm_heap = self.jvm_heap_gb();
+        let bed = self.bed_path.display();
+
+        // Normal sample name must match @RG SM tag in normal BAM header.
+        let sample_name_normal = format!("{}_{}", self.id, self.config.normal_name);
+
+        // Bind every host path the container must see (BAMs, reference, BED,
+        // tmp, logs, output dir).
+        let bind_flags = singularity_bind_flags([
+            &self.config.normal_bam(&self.id),
+            &self.config.tumoral_bam(&self.id),
+            &self.config.reference,
+            &output_dir,
+            &self.log_dir,
+            &self.config.tmp_dir,
+            &self.bed_path.display().to_string(),
+        ]);
+
+        // Three-step pipeline chained inside a single Singularity exec.
+        //
+        // 1. Mutect2          → raw VCF + f1r2 counts
+        // 2. LearnReadOrientationModel → artifact priors
+        // 3. FilterMutectCalls → filtered VCF with PASS/non-PASS annotations
+        let gatk_cmd = format!(
+            "gatk --java-options '-Xmx{jvm_heap}g' Mutect2 \
+                --reference {reference} \
+                --input {tumor_bam} \
+                --input {normal_bam} \
+                --normal-sample {normal_sample} \
+                --intervals {bed} \
+                --native-pair-hmm-threads {threads} \
+                --f1r2-tar-gz /output/f1r2.tar.gz \
+                --output /output/raw.vcf.gz \
+            && gatk --java-options '-Xmx{jvm_heap}g' LearnReadOrientationModel \
+                -I /output/f1r2.tar.gz \
+                -O /output/artifact_priors.tar.gz \
+            && gatk --java-options '-Xmx{jvm_heap}g' FilterMutectCalls \
+                -V /output/raw.vcf.gz \
+                --reference {reference} \
+                --ob-priors /output/artifact_priors.tar.gz \
+                -O /output/{output_vcf_name}",
+            reference = self.config.reference,
+            tumor_bam = self.config.tumoral_bam(&self.id),
+            normal_bam = self.config.normal_bam(&self.id),
+            normal_sample = sample_name_normal,
+            threads = self.effective_threads(),
+        );
+
+        // NOTE(review): `gatk_cmd` is interpolated inside double quotes for
+        // `bash -c "..."`. It only contains single quotes today, but any `"`,
+        // `$`, or whitespace in interpolated paths would break the quoting —
+        // confirm all configured paths are shell-safe.
+        format!(
+            "{singularity_bin} exec \
+            {binds} \
+            --bind {output_dir}:/output \
+            {image} \
+            bash -c \"{gatk_cmd}\"",
+            singularity_bin = self.config.singularity_bin,
+            binds = bind_flags,
+            image = self.config.gatk_image,
+        )
+    }
+}
+
+// Marker impls: local (single + batch) execution uses the default trait behavior.
+impl LocalRunner for Mutect2 {}
+impl LocalBatchRunner for Mutect2 {}
+
+impl SlurmRunner for Mutect2 {
+    fn slurm_args(&self) -> Vec<String> {
+        self.slurm_params().to_args()
+    }
+}
+
+impl SbatchRunner for Mutect2 {
+    /// Slurm resource request for one job. Always uses the per-chunk profile
+    /// (`gatk_slurm_threads` / `gatk_slurm_mem_gb`) — in practice Slurm
+    /// execution only happens via the chunked path, so this is consistent
+    /// with `effective_threads()` / `effective_mem_gb()`.
+    ///
+    /// NOTE(review): partition "shortq" is hard-coded rather than configurable.
+    fn slurm_params(&self) -> SlurmParams {
+        // Suffix the job name with the part index so chunked jobs are distinguishable.
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
+        SlurmParams {
+            job_name: Some(format!("mutect2_{}{}", self.id, batch_id)),
+            cpus_per_task: Some(self.config.gatk_slurm_threads as u32),
+            mem: Some(format!("{}G", self.config.gatk_slurm_mem_gb)),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Private helpers
+// ---------------------------------------------------------------------------
+
+impl Mutect2 {
+    /// Returns effective thread count based on execution context.
+    ///
+    /// - Chunked (Slurm): `config.gatk_slurm_threads` — per-job allocation
+    /// - Non-chunked (local): `config.gatk_threads` — full machine
+    fn effective_threads(&self) -> usize {
+        if self.part_index.is_some() {
+            self.config.gatk_slurm_threads
+        } else {
+            self.config.gatk_threads
+        }
+    }
+
+    /// Returns effective memory in GB based on execution context.
+    ///
+    /// - Chunked (Slurm): `config.gatk_slurm_mem_gb`
+    /// - Non-chunked (local): `config.gatk_mem_gb`
+    fn effective_mem_gb(&self) -> u32 {
+        if self.part_index.is_some() {
+            self.config.gatk_slurm_mem_gb
+        } else {
+            self.config.gatk_mem_gb
+        }
+    }
+
+    /// JVM heap derived from effective memory (~85% to leave room for off-heap + OS).
+    ///
+    /// NOTE(review): the cast truncates toward zero, so an effective memory of
+    /// 1 GB yields `-Xmx0g`. Config values in practice are >= 32 GB, but a
+    /// lower bound (e.g. `.max(1)`) would make this robust to misconfiguration.
+    fn jvm_heap_gb(&self) -> u32 {
+        (self.effective_mem_gb() as f64 * JVM_HEAP_FRACTION) as u32
+    }
+
+    /// Part-specific output directory (or base output dir for non-chunked runs).
+    fn part_output_dir(&self) -> String {
+        let base_dir = self.config.gatk_output_dir(&self.id);
+        match self.part_index {
+            Some(idx) => format!("{base_dir}/part{idx}"),
+            None => base_dir,
+        }
+    }
+
+    /// Filename of the FilterMutectCalls output VCF (inside the part dir).
+    fn output_vcf_filename(&self) -> String {
+        format!("{}_{}_Mutect2.vcf.gz", self.id, self.config.tumoral_name)
+    }
+
+    /// Full host path to the FilterMutectCalls output VCF.
+    fn output_vcf_path(&self) -> String {
+        let output_dir = self.part_output_dir();
+        format!("{output_dir}/{}", self.output_vcf_filename())
+    }
+
+    /// Path to the PASS-only VCF (per-part or final).
+    ///
+    /// Chunked parts derive their PASS path from the part VCF name; the
+    /// non-chunked path is the configured final `gatk_passed_vcf` location.
+    fn passed_vcf_path(&self) -> String {
+        match self.part_index {
+            Some(_) => self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz"),
+            None => self.config.gatk_passed_vcf(&self.id),
+        }
+    }
+
+    /// Filter the Mutect2-filtered VCF to keep only PASS variants.
+    ///
+    /// Only used by the local (non-chunked) path; the chunked path builds its
+    /// own `BcftoolsKeepPass` jobs in `run_mutect2_chunked`.
+    fn filter_pass(&self) -> anyhow::Result<()> {
+        let output_vcf = self.output_vcf_path();
+        let vcf_passed = self.passed_vcf_path();
+
+        // Early return also skips re-indexing — assumes an existing PASS VCF
+        // already has its index from a previous successful run.
+        if Path::new(&vcf_passed).exists() {
+            debug!("PASS VCF already exists: {vcf_passed}");
+            return Ok(());
+        }
+
+        info!(
+            "Extracting PASS variants for Mutect2 {} (part: {:?})",
+            self.id, self.part_index
+        );
+
+        let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, &vcf_passed);
+        let report = run!(&self.config, &mut cmd)
+            .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
+        report
+            .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
+            .context("Failed to save bcftools PASS logs")?;
+
+        let mut cmd = BcftoolsIndex::from_config(&self.config, vcf_passed);
+        let report = run!(&self.config, &mut cmd)
+            .with_context(|| format!("Failed to index PASS VCF for {}", self.id))?;
+        report
+            .save_to_file(format!("{}/bcftools_index_", self.log_dir))
+            .context("Failed to save bcftools index logs")?;
+
+        Ok(())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Run
+// ---------------------------------------------------------------------------
+
+impl Run for Mutect2 {
+    /// Runs the full Mutect2 pipeline for this sample, guarded by a per-sample lock.
+    ///
+    /// - Slurm mode: delegates to [`run_mutect2_chunked`] with [`DEFAULT_N_PARTS`];
+    ///   the lock stays held for the whole chunked run.
+    /// - Local mode: one full-genome run followed by PASS extraction.
+    fn run(&mut self) -> anyhow::Result<()> {
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "mutect2")
+            .with_context(|| format!("Cannot start Mutect2 for {}", self.id))?;
+
+        // Skip entirely when outputs are newer than all inputs.
+        if !self.should_run() {
+            info!("Mutect2 is up-to-date for {}.", self.id);
+            return Ok(());
+        }
+
+        if self.config.slurm_runner {
+            run_mutect2_chunked(&self.id, &self.config, DEFAULT_N_PARTS)
+        } else {
+            run!(&self.config, self)?;
+            self.filter_pass()
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Annotation / Variants
+// ---------------------------------------------------------------------------
+
+impl CallerCat for Mutect2 {
+    /// Mutect2 is categorized as a somatic caller.
+    fn caller_cat(&self) -> Annotation {
+        Annotation::Callers(Caller::Mutect2, Sample::Somatic)
+    }
+}
+
+impl Variants for Mutect2 {
+    /// Loads the final merged PASS VCF and registers each variant hash in
+    /// `annotations` under this caller.
+    ///
+    /// Always reads the configured final `gatk_passed_vcf` path, regardless of
+    /// `part_index` — per-part VCFs are never loaded directly.
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let caller = self.caller_cat();
+        let vcf_passed = self.config.gatk_passed_vcf(&self.id);
+
+        info!("Loading variants from {caller}: {vcf_passed}");
+
+        let variants = read_vcf(&vcf_passed)
+            .with_context(|| format!("Failed to read Mutect2 VCF {vcf_passed}"))?;
+
+        // Annotation store is updated in parallel; insert_update is assumed
+        // to be safe for concurrent use (it is called from par_iter elsewhere too).
+        variants.par_iter().for_each(|v| {
+            annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
+        });
+
+        info!("{caller}: {} variants loaded", variants.len());
+
+        Ok(VariantCollection {
+            variants,
+            vcf: Vcf::new(vcf_passed.into())?,
+            caller,
+        })
+    }
+}
+
+impl Label for Mutect2 {
+    /// Display label, derived from the caller category (e.g. for reports).
+    fn label(&self) -> String {
+        self.caller_cat().to_string()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Version
+// ---------------------------------------------------------------------------
+
+impl Version for Mutect2 {
+    /// Queries `gatk --version` locally through Singularity and parses the
+    /// version string from the combined output.
+    fn version(config: &Config) -> anyhow::Result<String> {
+        let out = ProcessCommand::new("bash")
+            .arg("-c")
+            .arg(format!(
+                "{} exec {} gatk --version",
+                config.singularity_bin, config.gatk_image
+            ))
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .output()
+            .context("Failed to spawn Singularity process for GATK version")?;
+
+        if !out.status.success() {
+            let combined = format!(
+                "{}{}",
+                String::from_utf8_lossy(&out.stdout),
+                String::from_utf8_lossy(&out.stderr)
+            );
+            anyhow::bail!(
+                "Singularity exec failed with status {}: {}",
+                out.status,
+                combined
+            );
+        }
+
+        // stdout and stderr are combined — the version banner may land on
+        // either stream depending on the GATK/container build.
+        let combined = format!(
+            "{}{}",
+            String::from_utf8_lossy(&out.stdout),
+            String::from_utf8_lossy(&out.stderr)
+        );
+
+        parse_gatk_version(&combined)
+    }
+
+    /// Same query, but submitted as a tiny Slurm job (useful when compute
+    /// nodes, not the login node, have Singularity access).
+    fn version_slurm(config: &Config) -> anyhow::Result<String> {
+        // Ad-hoc single-purpose job type: only needs `cmd()` + Slurm params.
+        struct GatkVersionJob<'a> {
+            config: &'a Config,
+        }
+
+        impl JobCommand for GatkVersionJob<'_> {
+            fn cmd(&self) -> String {
+                format!(
+                    "{} exec {} gatk --version",
+                    self.config.singularity_bin, self.config.gatk_image
+                )
+            }
+        }
+
+        impl SlurmRunner for GatkVersionJob<'_> {
+            fn slurm_args(&self) -> Vec<String> {
+                SlurmParams {
+                    job_name: Some("gatk_version".into()),
+                    partition: Some("shortq".into()),
+                    cpus_per_task: Some(1),
+                    mem: Some("10G".into()),
+                    gres: None,
+                }
+                .to_args()
+            }
+        }
+
+        let mut job = GatkVersionJob { config };
+        let out = crate::commands::SlurmRunner::exec(&mut job)
+            .context("Failed to run GATK --version via Slurm")?;
+
+        // Concatenate stdout + Slurm epilog + stderr before parsing, since the
+        // banner location varies with how Slurm captures output.
+        let mut combined = out.stdout.clone();
+        if let Some(epilog) = &out.slurm_epilog {
+            combined.push_str(&epilog.to_string());
+        }
+        combined.push_str(&out.stderr);
+
+        parse_gatk_version(&combined)
+    }
+}
+
+/// Parses GATK version from `gatk --version` output.
+///
+/// Expected format: `The Genome Analysis Toolkit (GATK) v4.6.1.0`
+fn parse_gatk_version(output: &str) -> anyhow::Result<String> {
+    let re = Regex::new(r"(?m)GATK\)\s*v([^\s]+)").expect("GATK version regex is valid");
+
+    let caps = re
+        .captures(output)
+        .context("Could not parse GATK version from output")?;
+
+    Ok(caps
+        .get(1)
+        .expect("Regex has capture group 1")
+        .as_str()
+        .to_string())
+}
+
+// ---------------------------------------------------------------------------
+// BED I/O helpers
+// ---------------------------------------------------------------------------
+
+/// Reads a BED file into a vector of [`BedRow`], skipping comments and empty lines.
+fn read_bed_file(path: &Path) -> anyhow::Result<Vec<BedRow>> {
+    let file = fs::File::open(path)
+        .with_context(|| format!("Failed to open BED file: {}", path.display()))?;
+    let reader = BufReader::new(file);
+
+    reader
+        .lines()
+        .filter_map(|line| {
+            let line = line.ok()?;
+            let trimmed = line.trim();
+            if trimmed.is_empty() || trimmed.starts_with('#') || trimmed.starts_with("track") {
+                None
+            } else {
+                Some(trimmed.parse::<BedRow>())
+            }
+        })
+        .collect::<Result<Vec<_>, _>>()
+        .context("Failed to parse BED file")
+}
+
+/// Writes a slice of [`BedRow`] as BED3 (contig, start, end) to `path`.
+fn write_bed_chunk(rows: &[BedRow], path: &Path) -> anyhow::Result<()> {
+    let mut f = fs::File::create(path)
+        .with_context(|| format!("Failed to create BED chunk: {}", path.display()))?;
+    for row in rows {
+        let r = &row.range;
+        // Adjust field access to match your GenomeRange struct.
+        writeln!(f, "{}\t{}\t{}", r.contig(), r.range.start, r.range.end)?;
+    }
+    Ok(())
+}
+
+/// Splits BED rows into `n` roughly equal chunks.
+///
+/// The last chunk absorbs any remainder rows. Returns at most `n` chunks
+/// (fewer if `rows.len() < n`).
+fn split_bed_rows(rows: &[BedRow], n: usize) -> Vec<&[BedRow]> {
+    if n == 0 || rows.is_empty() {
+        return vec![];
+    }
+    let n = n.min(rows.len());
+    let chunk_size = rows.len() / n;
+    let remainder = rows.len() % n;
+
+    let mut chunks = Vec::with_capacity(n);
+    let mut offset = 0;
+    for i in 0..n {
+        let extra = if i < remainder { 1 } else { 0 };
+        let end = offset + chunk_size + extra;
+        chunks.push(&rows[offset..end]);
+        offset = end;
+    }
+    chunks
+}
+
+// ---------------------------------------------------------------------------
+// Chunked execution
+// ---------------------------------------------------------------------------
+
+/// Merges per-part PASS VCFs into the final PASS VCF.
+fn merge_mutect2_parts(base: &Mutect2, n_parts: usize) -> anyhow::Result<()> {
+    let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
+
+    for i in 1..=n_parts {
+        let mut m = base.clone();
+        m.part_index = Some(i);
+        let part_pass = m.passed_vcf_path();
+
+        anyhow::ensure!(
+            Path::new(&part_pass).exists(),
+            "Missing Mutect2 part {i} PASS VCF: {part_pass}"
+        );
+
+        part_pass_paths.push(PathBuf::from(part_pass));
+    }
+
+    let final_passed_vcf = base.config.gatk_passed_vcf(&base.id);
+    let rand = Uuid::new_v4();
+    let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
+    let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
+
+    if let Some(parent) = Path::new(&final_passed_vcf).parent() {
+        fs::create_dir_all(parent)?;
+    }
+
+    info!(
+        "Concatenating {} Mutect2 part VCFs into {}",
+        n_parts, final_passed_vcf
+    );
+
+    let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
+    run!(&base.config, &mut concat).context("Failed to run bcftools concat for Mutect2 parts")?;
+
+    fs::rename(&final_tmp, &final_passed_vcf)
+        .context("Failed to rename merged Mutect2 PASS VCF")?;
+
+    fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
+        .context("Failed to rename merged Mutect2 PASS VCF CSI index")?;
+
+    info!(
+        "Successfully merged {} Mutect2 parts into {}",
+        n_parts, final_passed_vcf
+    );
+
+    Ok(())
+}
+
+/// Runs GATK Mutect2 in parallel chunks over BED intervals, then merges results.
+///
+/// 1. Reads the BED interval file from `config.gatk_bed_path`
+/// 2. Splits intervals into `n_parts` sub-BED files
+/// 3. Runs the full Mutect2 pipeline on each chunk (local or Slurm)
+/// 4. Filters PASS variants per chunk
+/// 5. Concatenates all PASS VCFs into the final output
+///
+/// # Arguments
+///
+/// * `id` — Sample identifier
+/// * `config` — Global pipeline configuration
+/// * `n_parts` — Number of parallel chunks (typically 20–30 for WGS)
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - `n_parts` is 0
+/// - BED file cannot be read or is empty
+/// - Mutect2 execution fails on any chunk
+/// - PASS filtering or VCF merging fails
+pub fn run_mutect2_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
+    anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
+
+    let base = Mutect2::initialize(id, config)?;
+
+    // Read and split BED intervals
+    let bed_rows = read_bed_file(&base.bed_path)
+        .with_context(|| format!("Failed to read BED: {}", base.bed_path.display()))?;
+
+    anyhow::ensure!(
+        !bed_rows.is_empty(),
+        "BED file is empty: {}",
+        base.bed_path.display()
+    );
+
+    // May return fewer than n_parts chunks when the BED has fewer rows.
+    let chunks = split_bed_rows(&bed_rows, n_parts);
+    let actual_n_parts = chunks.len();
+
+    info!(
+        "Running Mutect2 in {} parallel parts for {} ({} BED intervals)",
+        actual_n_parts,
+        id,
+        bed_rows.len()
+    );
+
+    // Build jobs: write sub-BEDs and configure each chunk.
+    // Setting `part_index` switches each job to the per-chunk (Slurm)
+    // resource profile and to part-specific output/PASS paths.
+    let mut jobs = Vec::with_capacity(actual_n_parts);
+    for (i, chunk) in chunks.into_iter().enumerate() {
+        let mut job = base.clone();
+        job.part_index = Some(i + 1);
+        job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
+
+        // Write sub-BED into the part output directory
+        let part_dir = job.part_output_dir();
+        fs::create_dir_all(&part_dir)
+            .with_context(|| format!("Failed to create part dir: {part_dir}"))?;
+
+        let sub_bed = PathBuf::from(format!("{part_dir}/intervals.bed"));
+        write_bed_chunk(chunk, &sub_bed)
+            .with_context(|| format!("Failed to write sub-BED for part {}", i + 1))?;
+
+        job.bed_path = sub_bed;
+        info!("Planned Mutect2 job:\n{job}");
+        jobs.push(job);
+    }
+
+    // Run all Mutect2 jobs (local batch or Slurm, depending on config).
+    let outputs = run_many!(config, jobs.clone())?;
+    for output in outputs.iter() {
+        output.save_to_file(format!("{}/mutect2_", base.log_dir))?;
+    }
+
+    // Filter PASS variants for each part.
+    // NOTE(review): unlike `Mutect2::filter_pass`, this path does NOT index
+    // the per-part PASS VCFs before `merge_mutect2_parts` concatenates them —
+    // confirm `BcftoolsConcat` does not require per-input indexes.
+    info!(
+        "Filtering PASS variants for all {} Mutect2 parts",
+        actual_n_parts
+    );
+    let filter_jobs: Vec<_> = jobs
+        .iter()
+        .map(|job| {
+            BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
+        })
+        .collect();
+    run_many!(config, filter_jobs)?;
+
+    // Merge all PASS VCFs
+    merge_mutect2_parts(&base, actual_n_parts)?;
+
+    info!(
+        "Mutect2 completed for {}: {} parts merged",
+        id, actual_n_parts
+    );
+
+    Ok(())
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::helpers::test_init;
+
+    // NOTE(review): these are integration tests, not hermetic unit tests —
+    // they require the Singularity image, Slurm access, and the sample data
+    // referenced by `Config::default()`.
+
+    /// Checks that the local and Slurm paths report the same GATK version.
+    #[test]
+    fn gatk_version() -> anyhow::Result<()> {
+        test_init();
+        let vl = Mutect2::version(&Config::default())?;
+        info!("GATK local version: {vl}");
+        let vs = Mutect2::version_slurm(&Config::default())?;
+        info!("GATK slurm version: {vs}");
+        assert_eq!(vl, vs);
+        Ok(())
+    }
+
+    /// End-to-end chunked run (50 parts) against a real sample — long-running.
+    #[test]
+    fn mutect2_run() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+        run_mutect2_chunked("DUMCO", &config, 50)
+    }
+}

+ 6 - 2
src/callers/mod.rs

@@ -138,8 +138,8 @@ use std::{sync::Arc, thread};
 
 use crate::{
     callers::{
-        clairs::ClairS, deep_somatic::DeepSomatic, deep_variant::DeepVariant, nanomonsv::NanomonSV,
-        savana::Savana, severus::Severus, straglr::Straglr,
+        clairs::ClairS, deep_somatic::DeepSomatic, deep_variant::DeepVariant, gatk::Mutect2,
+        nanomonsv::NanomonSV, savana::Savana, severus::Severus, straglr::Straglr,
     },
     commands::longphase::run_phasing_somatic,
     config::Config,
@@ -151,6 +151,7 @@ use crate::{
 pub mod clairs;
 pub mod deep_somatic;
 pub mod deep_variant;
+pub mod gatk;
 pub mod nanomonsv;
 pub mod savana;
 pub mod severus;
@@ -285,6 +286,9 @@ pub fn run_chunkeds(id: &str, config: &Config) -> anyhow::Result<()> {
     // DeepSomatic - somatic SNV/indels
     DeepSomatic::initialize(id, config)?.run()?;
 
+    // Mutect2 - somatic SNV/indels caller
+    Mutect2::initialize(id, config)?.run()?;
+
     // DeepVariant - germline variants for normal sample
     DeepVariant::initialize(id, &config.normal_name, config)?.run()?;
 

+ 3 - 3
src/collection/bam_stats.rs

@@ -1359,10 +1359,10 @@ mod tests {
 
         let config = Config::default();
 
-        let stats = WGSBamStats::open("36167", "norm", &config)?;
-        println!("{stats}");
-        let stats = WGSBamStats::open("36434", "norm", &config)?;
+        let stats = WGSBamStats::open("CHALO", "diag", &config)?;
         println!("{stats}");
+        // let stats = WGSBamStats::open("36434", "norm", &config)?;
+        // println!("{stats}");
         Ok(())
     }
 }

+ 20 - 8
src/commands/dorado.rs

@@ -4,7 +4,7 @@ use std::{
 };
 
 use anyhow::Context;
-use log::info;
+use log::{debug, info};
 
 use crate::{
     collection::pod5::Pod5,
@@ -95,9 +95,19 @@ impl Command for DoradoBasecall {
         let dorado_arg = &self.dorado_basecall_arg;
         let sequencing_kit = &self.sequencing_kit;
 
+        // Workaround: trimming fails when the run used a non-barcoded kit,
+        // so the sequencing kit is hard-coded below for now.
+        // TODO: detect the kit properly instead of overriding it here.
+        let sequencing_kit = "SQK-NBD114-24";
+        let trim = if sequencing_kit == "SQK-LSK114" {
+            "adapters"
+        } else {
+            "all"
+        };
+
         format!(
-            "{} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves > {}",
+            "{} basecaller --kit-name {sequencing_kit} --trim {} --emit-moves {dorado_arg} --models-directory /home/t_steimle/ref/dorado_models {} > {}",
             dorado_bin.display(),
+            trim,
             pod_dir.display(),
             bam.display()
         )
@@ -147,13 +157,14 @@ impl super::SlurmRunner for DoradoBasecall {
         let (gpu, n) = if let (Some(h100_av), Some(a100_av)) =
             (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
         {
+            debug!("Available H100: {h100_av} and A100: {a100_av}");
             let (gpu, n) = if h100_av >= a100_av {
                 ("h100", h100_av)
             } else {
                 ("a100", a100_av)
             };
 
-            let n = n.max(2);
+            let n = n.clamp(1, 4);
             (gpu, n)
         } else {
             panic!("Are you running slurm with a100 and h100 GPU ?");
@@ -213,7 +224,7 @@ impl DoradoAlign {
                 job_name: Some("dorado_align".into()),
                 cpus_per_task: Some(threads.into()),
                 mem: Some("30G".into()),
-                partition: Some("shortq".into()),
+                partition: Some("mediumq".into()),
                 gres: None,
             },
         }
@@ -283,11 +294,12 @@ mod tests {
         test_init();
 
         let config = Config::default();
+        let tmp_dir = config.tmp_dir.clone();
 
         let mut dca = DoradoBasecall::from_config(
             &config,
-            format!("{}/inputs/pod5/A", TEST_DIR.as_str()),
-            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()),
+            format!("/mnt/beegfs02/scratch/t_steimle/data/runs/20260206_1540_P2I-00461-A_PBI80774_67adb0bc/pod5"),
+            format!("/mnt/beegfs02/scratch/t_steimle/data/36122_unaligned.bam"),
         );
 
         info!("Basecalling");
@@ -306,8 +318,8 @@ mod tests {
 
         let mut dca = DoradoAlign::from_config(
             &config,
-            format!("{}/outputs/unaligned_10.bam", TEST_DIR.as_str()),
-            format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()),
+            format!("/mnt/beegfs02/scratch/t_steimle/data/36122_unaligned.bam"),
+            format!("/mnt/beegfs02/scratch/t_steimle/data/36122_aligned.bam"),
         );
 
         info!("Basecalling");

+ 2 - 2
src/commands/samtools.rs

@@ -713,8 +713,8 @@ mod tests {
 
         let sort_1 = SamtoolsSort::from_config(
             &config,
-            "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/DUMCO_diag_hs1_HP.bam",
-            "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/DUMCO_diag_hs1_HP_sort.bam",
+            "/mnt/beegfs02/scratch/t_steimle/data/wgs/CHALO/diag/CHALO_diag_hs1.bam",
+            "/mnt/beegfs02/scratch/t_steimle/data/wgs/CHALO/diag/CHALO_diag_hs1ss.bam",
         );
 
         // let sort_2 = SamtoolsSort::from_config(

+ 64 - 0
src/config.rs

@@ -197,6 +197,53 @@ pub struct Config {
     /// Template for ClairS output directory (`{result_dir}`, `{id}`).
     pub clairs_output_dir: String,
 
+    // === GATK configuration ===
+    /// Path to the GATK container image (Singularity/Apptainer .sif, or a docker:// URI
+    /// if you pull at runtime).
+    ///
+    /// Examples:
+    /// - "/containers/gatk_4.6.0.0.sif"
+    /// - "docker://broadinstitute/gatk:latest"
+    pub gatk_image: String,
+
+    /// Path to a BED file restricting analysis to target regions (0-based, half-open).
+    /// Must match contig naming of the reference/BAMs (e.g. "chr9" vs "9").
+    ///
+    /// Used for targeted calling (e.g. Mutect2 `-L` or region chunking).
+    pub gatk_bed_path: String,
+
+    /// Local single-run CPU threads (non-Slurm execution).
+    /// Used for full-run Mutect2 or other GATK tools.
+    /// Typically forwarded to:
+    ///   - `--native-pair-hmm-threads`
+    ///   - `--reader-threads`
+    /// Should match available cores on the node.
+    pub gatk_threads: usize, // e.g. 32
+
+    /// Local single-run memory limit in GB.
+    /// Used to size Java heap:
+    ///   `--java-options "-Xmx{mem}g"`
+    /// Should leave headroom for native memory (PairHMM, buffers).
+    pub gatk_mem_gb: u32, // e.g. 120
+
+    /// Per-chunk CPU threads when running chunked under Slurm.
+    /// Applies to each parallel job independently.
+    pub gatk_slurm_threads: usize, // e.g. 8
+
+    /// Per-chunk memory (GB) when running under Slurm.
+    /// Used both for scheduler request and Java heap sizing per chunk.
+    /// Must be sufficient for interval-restricted Mutect2.
+    pub gatk_slurm_mem_gb: u32, // e.g. 32
+
+    /// If true, force re-run of GATK steps by removing or ignoring existing outputs.
+    pub gatk_force: bool,
+
+    /// Template for GATK output directory
+    /// (placeholders: `{result_dir}`, `{id}`, `{tumoral_name}`).
+    pub gatk_output_dir: String,
+
+    /// Template for the PASS-filtered GATK VCF path
+    /// (placeholders: `{output_dir}`, `{id}`, `{tumoral_name}`, `{reference_name}`).
+    pub gatk_passed_vcf: String,
+
     // === Savana configuration ===
     /// Savana binary name or full path.
     pub savana_bin: String,
@@ -771,6 +818,23 @@ impl Config {
         format!("{dir}/{id}_diag_clair3-germline_PASSED.vcf.gz")
     }
 
+    /// gatk_output_dir = "{result_dir}/{id}/{tumoral_name}/GATK"
+    pub fn gatk_output_dir(&self, id: &str) -> String {
+        self.gatk_output_dir
+            .replace("{result_dir}", &self.result_dir)
+            .replace("{id}", id)
+            .replace("{tumoral_name}", &self.tumoral_name)
+    }
+
+    /// gatk_passed_vcf = "{output_dir}/{id}_{tumoral_name}_{reference_name}_GATK_PASSED.vcf.gz"
+    pub fn gatk_passed_vcf(&self, id: &str) -> String {
+        self.gatk_passed_vcf
+            .replace("{output_dir}", &self.gatk_output_dir(id))
+            .replace("{id}", id)
+            .replace("{tumoral_name}", &self.tumoral_name)
+            .replace("{reference_name}", &self.reference_name)
+    }
+
     /// Paired nanomonsv output directory.
     pub fn nanomonsv_output_dir(&self, id: &str, time: &str) -> String {
         self.nanomonsv_output_dir

+ 4 - 0
src/positions.rs

@@ -487,6 +487,10 @@ impl GenomeRange {
             range: start..end,
         }
     }
+
+    pub fn contig(&self) -> String {
+        num_to_contig(self.contig)
+    }
     /// Creates a `GenomeRange` from 1-based inclusive start and end positions.
     ///
     /// This method is useful when working with data sources that use 1-based coordinates