Thomas 1 month ago
parent
commit
211191d148

+ 23 - 5
Cargo.lock

@@ -753,7 +753,7 @@ dependencies = [
  "num-traits",
  "serde",
  "wasm-bindgen",
- "windows-link",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
@@ -1843,6 +1843,17 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "hostname"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "617aaa3557aef3810a6369d0a99fac8a080891b68bd9f9812a1eeda0c0730cbd"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "windows-link 0.2.1",
+]
+
 [[package]]
 name = "hts-sys"
 version = "2.2.0"
@@ -2992,6 +3003,7 @@ dependencies = [
  "glob",
  "hashbrown 0.15.5",
  "hex",
+ "hostname",
  "indicatif 0.17.11",
  "itertools 0.14.0",
  "lazy_static",
@@ -5247,7 +5259,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
 dependencies = [
  "windows-implement",
  "windows-interface",
- "windows-link",
+ "windows-link 0.1.3",
  "windows-result",
  "windows-strings",
 ]
@@ -5280,13 +5292,19 @@ version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
 
+[[package]]
+name = "windows-link"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
+
 [[package]]
 name = "windows-result"
 version = "0.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
 dependencies = [
- "windows-link",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
@@ -5295,7 +5313,7 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
 dependencies = [
- "windows-link",
+ "windows-link 0.1.3",
 ]
 
 [[package]]
@@ -5371,7 +5389,7 @@ version = "0.53.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91"
 dependencies = [
- "windows-link",
+ "windows-link 0.1.3",
  "windows_aarch64_gnullvm 0.53.0",
  "windows_aarch64_msvc 0.53.0",
  "windows_i686_gnu 0.53.0",

+ 1 - 0
Cargo.toml

@@ -53,6 +53,7 @@ rustc-hash = "2.1.1"
 hex = "0.4.3"
 walkdir = "2.5.0"
 thiserror = "2.0.17"
+hostname = "0.4.2"
 
 [profile.dev]
 opt-level = 0
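
Note: the new hostname dependency backs the lock metadata written by the new src/locker.rs module later in this commit. A minimal sketch of the call it provides (hostname::get() returns an io::Result<OsString>), mirroring its use in SampleLock::write_metadata:

    // Resolve the current host name; fall back to "unknown" if the lookup fails.
    let host = hostname::get()
        .map(|h| h.to_string_lossy().into_owned())
        .unwrap_or_else(|_| "unknown".into());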

+ 15 - 19
src/callers/clairs.rs

@@ -167,27 +167,15 @@
 //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
 //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
 use crate::{
-    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
-    collection::vcf::Vcf,
-    commands::{
-        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
-        samtools::SamtoolsMergeMany,
-        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
-        SlurmRunner,
-    },
-    config::Config,
-    helpers::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
+        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, samtools::SamtoolsMergeMany
+    }, config::Config, helpers::{
         get_genome_sizes, is_file_older, list_files_recursive, remove_dir_if_exists,
         singularity_bind_flags, split_genome_into_n_regions_exact,
-    },
-    io::vcf::read_vcf,
-    pipes::{Initialize, ShouldRun, Version},
-    run, run_many,
-    runners::Run,
-    variant::{
+    }, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, run, run_many, runners::Run, variant::{
         variant_collection::VariantCollection,
         vcf_variant::{Label, Variants},
-    },
+    }
 };
 
 use anyhow::Context;
@@ -402,10 +390,14 @@ impl SbatchRunner for ClairS {
 
 impl Run for ClairS {
     fn run(&mut self) -> anyhow::Result<()> {
+        // Acquire lock before any work
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
+            .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
+
         if !self.should_run() {
             info!("ClairS is up-to-date.");
             return Ok(());
-            // return Err(RunErr::UpToDate("ClairS").into());
         }
 
         if self.config.slurm_runner {
@@ -866,7 +858,11 @@ pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()>
 /// run_clairs_chunked("sample_001", &config, 30)?;
 /// ```
 pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
-    anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
+    anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0");
+
+    let lock_dir = format!("{}/locks", config.result_dir);
+    let _lock = SampleLock::acquire(&lock_dir, id, "clairs")
+        .with_context(|| format!("Cannot start ClairS chunked for {}", id))?;
 
     let base = ClairS::initialize(id, config)?;
 

+ 10 - 0
src/callers/deep_somatic.rs

@@ -108,6 +108,7 @@ use crate::{
         split_genome_into_n_regions_exact,
     },
     io::vcf::read_vcf,
+    locker::SampleLock,
     pipes::{Initialize, ShouldRun, Version},
     run, run_many,
     runners::Run,
@@ -364,6 +365,10 @@ impl DeepSomatic {
 
 impl Run for DeepSomatic {
     fn run(&mut self) -> anyhow::Result<()> {
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "deepsomatic")
+            .with_context(|| format!("Cannot start DeepSomatic chunked for {}", self.id))?;
+
         if !self.should_run() {
             info!("DeepSomatic is up-to-data.");
             return Ok(());
@@ -588,6 +593,11 @@ fn merge_deepsomatic_parts(base: &DeepSomatic, n_parts: usize) -> anyhow::Result
 pub fn run_deepsomatic_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
     anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
 
+    // Lock at the chunked level (caller may have already locked, but this is idempotent protection)
+    let lock_dir = format!("{}/locks", config.result_dir);
+    let _lock = SampleLock::acquire(&lock_dir, id, "deepsomatic")
+        .with_context(|| format!("Cannot start DeepSomatic chunked for {}", id))?;
+
     let base = DeepSomatic::initialize(id, config)?;
 
     if !base.should_run() {

+ 9 - 0
src/callers/deep_variant.rs

@@ -110,6 +110,7 @@ use crate::{
         split_genome_into_n_regions_exact,
     },
     io::vcf::read_vcf,
+    locker::SampleLock,
     pipes::{InitializeSolo, ShouldRun, Version},
     run, run_many,
     runners::Run,
@@ -482,6 +483,10 @@ impl DeepVariant {
 
 impl Run for DeepVariant {
     fn run(&mut self) -> anyhow::Result<()> {
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "deepvariant")
+            .with_context(|| format!("Cannot start DeepVariant chunked for {}", self.id))?;
+
         if !self.should_run() {
             info!("DeepVariant is up-to-data.");
             return Ok(());
@@ -788,6 +793,10 @@ pub fn run_deepvariant_chunked(
 ) -> anyhow::Result<()> {
     anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
 
+    let lock_dir = format!("{}/locks", config.result_dir);
+    let _lock = SampleLock::acquire(&lock_dir, id, "deepvariant")
+        .with_context(|| format!("Cannot start DeepVariant chunked for {}", id))?;
+
     let base = DeepVariant::initialize(id, time_point, config)?;
 
     if !base.should_run() {

+ 12 - 0
src/callers/mod.rs

@@ -69,6 +69,18 @@
 //!
 //! Execution mode is automatically selected based on `config.slurm_runner`.
 //!
+//! ## Concurrency Control
+//!
+//! All callers use [`SampleLock`] to prevent concurrent execution on the same sample.
+//! This is critical for:
+//! - Preventing data corruption from parallel writes
+//! - Avoiding redundant computation when multiple jobs target the same sample
+//! - Ensuring atomicity of multi-step pipelines (e.g., chunked execution + merge)
+//!
+//! The locking mechanism uses atomic directory creation, which is reliable on distributed
+//! filesystems (BeegFS, NFS, Lustre). Stale locks are automatically detected and cleaned
+//! via SLURM job ID validation or PID checks.
+//!
 //! ## Typical Workflow
 //!
 //! 1. **Initialize** - Create caller instance with `Initialize::initialize()` or `InitializeSolo::initialize()`
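
Note: the acquisition pattern referred to in the Concurrency Control section above is the same few lines added to each caller's run() in this commit; the ClairS variant below is representative (only the pipeline name string differs per caller):

    // Locks live under <result_dir>/locks; the lock is released when `_lock` is dropped.
    let lock_dir = format!("{}/locks", self.config.result_dir);
    let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
        .with_context(|| format!("Cannot start ClairS for {}", self.id))?;

    if !self.should_run() {
        info!("ClairS is up-to-date.");
        return Ok(());
    }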

+ 14 - 14
src/callers/nanomonsv.rs

@@ -81,22 +81,12 @@ use anyhow::Context;
 use log::{debug, error, info, warn};
 
 use crate::{
-    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
-    collection::vcf::Vcf,
-    commands::{
-        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
-        CapturedOutput, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner,
-    },
-    config::Config,
-    helpers::{is_file_older, remove_dir_if_exists},
-    io::vcf::read_vcf,
-    pipes::{Initialize, InitializeSolo, ShouldRun, Version},
-    run,
-    runners::Run,
-    variant::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
+        CapturedOutput, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}
+    }, config::Config, helpers::{is_file_older, remove_dir_if_exists}, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, InitializeSolo, ShouldRun, Version}, run, runners::Run, variant::{
         variant_collection::VariantCollection,
         vcf_variant::{Label, Variants},
-    },
+    }
 };
 
 /// NanomonSV paired (tumor-normal) structural variant caller.
@@ -268,6 +258,11 @@ impl Run for NanomonSV {
     /// - Log files cannot be written
     /// - Reference genome or control panel files are missing
     fn run(&mut self) -> anyhow::Result<()> {
+        // Acquire lock before any work
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "nanomonsv")
+            .with_context(|| format!("Cannot start NanomonSV for {}", self.id))?;
+
         if !self.should_run() {
             info!("NanomonSV is up-to-data.");
             return Ok(());
@@ -537,6 +532,11 @@ impl Run for NanomonSVSolo {
     /// # Errors
     /// Returns an error if any pipeline step fails or log files cannot be written.
     fn run(&mut self) -> anyhow::Result<()> {
+        // Acquire lock before any work
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "nanomonsv_solo")
+            .with_context(|| format!("Cannot start NanomonSV Solo for {}", self.id))?;
+
         // Parse
         info!("Nanomonsv Parse");
         let out_prefix = format!("{}/{}_{}", self.out_dir, self.id, self.time_point);

+ 16 - 28
src/callers/savana.rs

@@ -74,18 +74,22 @@ use crate::{
     annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
     collection::vcf::Vcf,
     commands::{
-        CapturedOutput, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::BcftoolsKeepPass, longphase::{LongphaseHap, LongphasePhase}, samtools::SamtoolsIndex
+        bcftools::BcftoolsKeepPass,
+        longphase::{LongphaseHap, LongphasePhase},
+        samtools::SamtoolsIndex,
+        CapturedOutput, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner,
     },
     config::Config,
     helpers::{is_file_older, remove_dir_if_exists},
     io::{readers::get_gz_reader, vcf::read_vcf},
-    pipes::{Initialize, ShouldRun, Version},
-    positions::{GenomeRange, num_to_contig},
+    locker::SampleLock,
+    pipes::{Initialize, InitializeSolo, ShouldRun, Version},
+    positions::{num_to_contig, GenomeRange},
     run,
     runners::Run,
     variant::{
-        vcf_variant::{Label, Variants},
         variant_collection::VariantCollection,
+        vcf_variant::{Label, Variants},
     },
 };
 use anyhow::Context;
@@ -234,6 +238,10 @@ impl Run for Savana {
     ///
     /// `Ok(())` if the run completes successfully, or an error otherwise.
     fn run(&mut self) -> anyhow::Result<()> {
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "savana")
+            .with_context(|| format!("Cannot start SAVANA chunked for {}", self.id))?;
+
         if !self.should_run() {
             info!("Savana is up-to-date.");
             return Ok(());
@@ -261,14 +269,9 @@ impl Run for Savana {
 
             // Check for haplotagged bam
             if !Path::new(&normal_hp_bam).exists() {
-                let mut normal_hap = LongphaseHap::new(
-                    &self.id,
-                    &self.config.normal_bam(&self.id),
-                    &phased_germline_vcf,
-                    self.config.clone(),
-                );
+                let mut normal_hap =
+                    LongphaseHap::initialize(&self.id, &self.config.normal_name, &self.config)?;
                 normal_hap.run()?;
-                // crate::runners::Run::run(&mut normal_hap)?;
             } else {
                 info!("Haplotagged normal BAM is already up-to-date");
             }
@@ -279,14 +282,9 @@ impl Run for Savana {
             }
 
             if !Path::new(&tumoral_hp_bam).exists() {
-                let mut tumoral_hap = LongphaseHap::new(
-                    &self.id,
-                    &self.config.tumoral_bam(&self.id),
-                    &phased_germline_vcf,
-                    self.config.clone(),
-                );
+                let mut tumoral_hap =
+                    LongphaseHap::initialize(&self.id, &self.config.tumoral_name, &self.config)?;
                 tumoral_hap.run()?;
-                // crate::runners::Run::run(&mut tumoral_hap)?;
             } else {
                 info!("Haplotagged tumoral BAM is already up-to-date");
             }
@@ -345,16 +343,6 @@ impl Run for Savana {
     }
 }
 
-impl Savana {
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        if self.should_run() {
-            <Self as crate::runners::Run>::run(self)
-        } else {
-            anyhow::bail!("Savana is up-to-date.")
-        }
-    }
-}
-
 impl Version for Savana {
     /// Retrieves the Savana version by running `savana --version` in its conda environment.
     ///

+ 15 - 16
src/callers/severus.rs

@@ -78,21 +78,11 @@
 //! - [Severus GitHub](https://github.com/KolmogorovLab/Severus)
 //! - [Severus Paper](https://doi.org/10.1038/s41587-024-02340-1)
 use crate::{
-    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
-    collection::vcf::Vcf,
-    commands::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
         Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::BcftoolsKeepPassPrecise, longphase::LongphasePhase
-    },
-    config::Config,
-    helpers::{is_file_older, remove_dir_if_exists},
-    io::vcf::read_vcf,
-    pipes::{Initialize, InitializeSolo, ShouldRun, Version},
-    run,
-    runners::Run,
-    variant::{
-        vcf_variant::{Label, Variants},
-        variant_collection::VariantCollection,
-    },
+    }, config::Config, helpers::{is_file_older, remove_dir_if_exists}, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, InitializeSolo, ShouldRun, Version}, run, runners::Run, variant::{
+        variant_collection::VariantCollection, vcf_variant::{Label, Variants}
+    }
 };
 use anyhow::Context;
 use log::{debug, info};
@@ -198,6 +188,11 @@ impl Run for Severus {
     /// - PASS filtering via bcftools fails
     /// - Log files cannot be written
     fn run(&mut self) -> anyhow::Result<()> {
+        // Acquire lock before any work
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "severus")
+            .with_context(|| format!("Cannot start Severus for {}", self.id))?;
+
         if !self.should_run() {
             info!("Severus is up-to-date");
             return Ok(());
@@ -213,8 +208,7 @@ impl Run for Severus {
             // Run Longphase if necessary
             if !Path::new(constit_phased_vcf).exists() {
                 let mut phase = LongphasePhase::initialize(&self.id, &self.config)?;
-                crate::runners::Run::run(&mut phase)?;
-                // run!(&self.config, &mut phase)?;
+                run!(&self.config, &mut phase)?;
             }
 
             fs::create_dir_all(self.config.severus_output_dir(id))
@@ -470,6 +464,11 @@ impl Run for SeverusSolo {
     /// # Errors
     /// Returns an error if Severus execution, filtering, or log writing fails.
     fn run(&mut self) -> anyhow::Result<()> {
+        // Acquire lock before any work
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "severus_solo")
+            .with_context(|| format!("Cannot start Severus Solo for {}", self.id))?;
+
         let id = &self.id;
         let time = &self.time;
 

+ 6 - 7
src/callers/straglr.rs

@@ -124,13 +124,7 @@ use crate::{
     commands::{
         Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
         SlurmRunner,
-    },
-    config::Config,
-    helpers::{is_file_older, remove_dir_if_exists},
-    io::straglr::{read_straglr_tsv, StraglrRow},
-    pipes::{Initialize, InitializeSolo, ShouldRun, Version},
-    run, run_many,
-    runners::Run,
+    }, config::Config, helpers::{is_file_older, remove_dir_if_exists}, io::straglr::{StraglrRow, read_straglr_tsv}, locker::SampleLock, pipes::{Initialize, InitializeSolo, ShouldRun, Version}, run, run_many, runners::Run
 };
 use anyhow::Context;
 use log::{debug, info};
@@ -246,6 +240,11 @@ impl Run for Straglr {
     /// - Straglr execution fails for either sample
     /// - Log files cannot be written
     fn run(&mut self) -> anyhow::Result<()> {
+        // Acquire lock before any work
+        let lock_dir = format!("{}/locks", self.config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.id, "straglr")
+            .with_context(|| format!("Cannot start Straglr for {}", self.id))?;
+
         if !self.should_run() {
             info!("Straglr is up-to-date");
             return Ok(());

+ 8 - 1
src/collection/prom_run.rs

@@ -82,6 +82,7 @@ use crate::{
     },
     config::Config,
     helpers::{get_genome_sizes, list_files_recursive, remove_bam_with_index, TempFileGuard},
+    locker::SampleLock,
     pipes::InitializeSolo,
     run, run_many,
 };
@@ -439,6 +440,10 @@ impl PromRun {
     /// - BAM files are already aligned
     /// - External commands fail (Dorado, samtools, Slurm)
     pub fn process_bams(&self, config: &Config) -> anyhow::Result<()> {
+        let lock_dir = format!("{}/locks", config.result_dir);
+        let _lock = SampleLock::acquire(&lock_dir, &self.protocol_run_id, "process_bams")
+            .with_context(|| format!("Cannot start Processing BAMs for {}", self.dir.display()))?;
+
         let pipeline_start = Timer::start();
         let mut metrics = PipelineMetrics::default();
 
@@ -1222,7 +1227,9 @@ fn filter_pass_bams<'a>(bams: &[&'a PromBam], kit_type: KitType) -> Vec<&'a Prom
     bams.iter()
         .filter(|bam| {
             let p = bam.path.to_string_lossy();
-            if p.contains("bam_fail") { return false; }
+            if p.contains("bam_fail") {
+                return false;
+            }
 
             match kit_type {
                 KitType::Multiplexed => p.contains("bam_pass"),

+ 175 - 29
src/commands/longphase.rs

@@ -1,19 +1,57 @@
 //! Longphase haplotagging, phasing, and modcall runners.
 //!
-//! All steps use the shared runner traits (local/Slurm) driven by the global `Config`.
+//! This module provides runners for [longphase](https://github.com/twolinin/longphase),
+//! a tool for SNP phasing and haplotagging using long-read sequencing data.
+//!
+//! # Overview
+//!
+//! The module implements three main operations for somatic cancer genomics workflows:
+//!
+//! - **Modcall** ([`LongphaseModcallSolo`]): Extracts methylation information from BAM files
+//!   to assist phasing. Uses adaptive thresholds derived from modkit summary.
+//! - **Phase** ([`LongphasePhase`]): Phases [`ClairS`] germline variants using read-backed phasing
+//!   with methylation support.
+//! - **Haplotag** ([`LongphaseHap`]): Tags BAM reads with haplotype assignments (HP tags)
+//!   based on phased variants.
+//!
+//! # Execution Modes
+//!
+//! All runners support both local and SLURM cluster execution via the shared runner traits.
+//! [`LongphaseHap`] additionally supports chunked per-chromosome execution on SLURM for
+//! improved parallelism on large BAMs.
+//!
+//! # Workflow
+//!
+//! The typical somatic workflow is orchestrated by [`run_phasing_somatic`]:
+//!
+//! ```text
+//! 1. Modcall (normal)  ─┐
+//! 2. Modcall (tumor)   ─┼─> 3. Phase (germline) ─┬─> 4. Haplotag (normal)
+//!                                                └─> 5. Haplotag (tumor)
+//! ```
+//!
+//! # Example
+//!
+//! ```no_run
+//! use crate::config::Config;
+//! use crate::commands::longphase::run_phasing_somatic;
+//!
+//! let config = Config::load("config.toml")?;
+//! run_phasing_somatic("SAMPLE_001", &config)?;
+//! ```
 use crate::{
     commands::{
         bcftools::{BcftoolsCompress, BcftoolsIndex, BcftoolsKeepPass},
         samtools::{SamtoolsIndex, SamtoolsMergeMany},
     },
     config::Config,
-    helpers::{get_genome_sizes, is_file_older, path_prefix},
+    helpers::{get_genome_sizes, is_file_older, path_prefix, TempDirGuard},
+    locker::SampleLock,
     pipes::{Initialize, InitializeSolo, ShouldRun},
     run, run_many,
     runners::Run,
 };
 use anyhow::Context;
-use log::warn;
 use rust_htslib::bam::{self, Read};
 use std::{
     fs,
@@ -24,7 +62,39 @@ use uuid::Uuid;
 
 use super::modkit::ModkitSummary;
 
+/// Runs the complete somatic phasing pipeline for a sample.
+///
+/// This function orchestrates the full phasing workflow for tumor/normal pairs:
+///
+/// 1. Runs modcall on normal BAM to extract methylation calls
+/// 2. Runs modcall on tumor BAM to extract methylation calls  
+/// 3. Phases germline variants from [`ClairS`] using normal BAM and methylation data
+/// 4. Haplotags normal BAM with phased haplotype assignments
+/// 5. Haplotags tumor BAM with phased haplotype assignments
+///
+/// # Arguments
+///
+/// * `id` - Sample identifier
+/// * `config` - Pipeline configuration containing paths and parameters
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - Sample lock cannot be acquired (another process is working on this sample)
+/// - Any pipeline step fails
+///
+/// # Example
+///
+/// ```no_run
+/// let config = Config::load("config.toml")?;
+/// run_phasing_somatic("SAMPLE_001", &config)?;
+/// ```
 pub fn run_phasing_somatic(id: &str, config: &Config) -> anyhow::Result<()> {
+    // Acquire lock before any work
+    let lock_dir = format!("{}/locks", config.result_dir);
+    let _lock = SampleLock::acquire(&lock_dir, id, "phasing_somatic")
+        .with_context(|| format!("Cannot start somatic phasing for {id}"))?;
+
     LongphaseModcallSolo::initialize(id, &config.normal_name, config)?.run()?;
     LongphaseModcallSolo::initialize(id, &config.tumoral_name, config)?.run()?;
 
@@ -36,14 +106,40 @@ pub fn run_phasing_somatic(id: &str, config: &Config) -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Haplotags BAM reads using phased variant calls.
+///
+/// Assigns HP (haplotype) tags to reads based on their overlap with phased
+/// heterozygous variants. This enables downstream haplotype-aware analyses
+/// such as allele-specific expression or somatic variant phasing.
+///
+/// # Execution Modes
+///
+/// - **Local**: Processes entire BAM in a single job
+/// - **SLURM**: Splits by chromosome, processes in parallel, then merges
+///
+/// The SLURM chunked mode significantly improves runtime for large BAMs
+/// by parallelizing across chromosomes.
+///
+/// # Output
+///
+/// Produces a haplotagged BAM file with HP:i:1 or HP:i:2 tags on reads
+/// that overlap phased variants. Also tags supplementary alignments
+/// via `--tagSupplementary`.
 #[derive(Debug, Clone)]
 pub struct LongphaseHap {
+    /// Sample identifier.
     pub id: String,
+    /// Path to phased VCF file.
     pub vcf: String,
+    /// Input BAM file path.
     pub bam: PathBuf,
+    /// Output haplotagged BAM prefix (longphase appends `.bam`).
     pub bam_hp: PathBuf,
+    /// Pipeline configuration.
     pub config: Config,
+    /// Directory for log files.
     pub log_dir: String,
+    /// Command arguments (built at runtime).
     job_args: Vec<String>,
 }
 
@@ -69,24 +165,6 @@ impl InitializeSolo for LongphaseHap {
 }
 
 impl LongphaseHap {
-    pub fn new(id: &str, bam: &str, phased_vcf: &str, config: Config) -> Self {
-        let log_dir = format!("{}/{}/log/longphase", config.result_dir, id);
-
-        let bam = Path::new(bam);
-        let new_fn = format!("{}_HP", bam.file_stem().unwrap().to_str().unwrap());
-        let bam_hp = bam.with_file_name(new_fn);
-
-        Self {
-            id: id.to_string(),
-            bam: bam.to_path_buf(),
-            config,
-            log_dir,
-            vcf: phased_vcf.to_string(),
-            bam_hp: bam_hp.to_path_buf(),
-            job_args: Vec::new(),
-        }
-    }
-
     /// Build haplotag arguments, optionally restricted to a given region.
     fn build_job_args(&self, region: Option<&str>, out_prefix: &Path) -> Vec<String> {
         let mut args = vec![
@@ -115,6 +193,23 @@ impl LongphaseHap {
 }
 
 impl LongphaseHap {
+    /// Executes haplotagging in chunked mode on SLURM.
+    ///
+    /// Splits the BAM by chromosome, submits parallel SLURM jobs for each,
+    /// then merges results into the final haplotagged BAM.
+    ///
+    /// # Process
+    ///
+    /// 1. Read chromosome list from BAM header
+    /// 2. Create temporary directory for per-chromosome outputs
+    /// 3. Submit parallel haplotag jobs via SLURM
+    /// 4. Merge all chromosome BAMs with samtools
+    /// 5. Index final merged BAM
+    /// 6. Clean up temporary files
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if any SLURM job fails, merge fails, or indexing fails.
     fn run_chunked_slurm(&mut self) -> anyhow::Result<()> {
         // Read contigs from BAM header
         let reader = bam::Reader::from_path(&self.bam)
@@ -134,6 +229,7 @@ impl LongphaseHap {
             Path::new(&self.config.tmp_dir).join(format!("longphase_hap_{}", Uuid::new_v4()));
         fs::create_dir_all(&tmp_dir)
             .with_context(|| format!("Failed to create tmp dir {}", tmp_dir.display()))?;
+        let _guard = TempDirGuard::new(tmp_dir.clone());
 
         let mut jobs = Vec::with_capacity(chroms.len());
         let mut tmp_bams = Vec::with_capacity(chroms.len());
@@ -179,11 +275,9 @@ impl LongphaseHap {
         run!(&self.config, &mut merge)
             .context("samtools merge failed for Longphase haplotag chunks")?;
 
-        // (Optional) cleanup temp BAMs
         for bam in tmp_bams {
-            if let Err(e) = fs::remove_file(&bam) {
-                warn!("Failed to remove temp BAM {}: {e}", bam.display());
-            }
+            let _ = fs::remove_file(&bam);
+            let _ = fs::remove_file(format!("{}.bai", bam.display()));
         }
 
         // Index final BAM
@@ -203,7 +297,7 @@ impl Run for LongphaseHap {
         }
 
         if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.log_dir).expect("Failed to create output directory.");
+            fs::create_dir_all(&self.log_dir).context("Failed to create output directory.")?;
         }
 
         if self.bam_hp.exists() {
@@ -282,16 +376,40 @@ impl crate::commands::SbatchRunner for LongphaseHap {
     }
 }
 
-// /data/tools/longphase_linux-x64 phase -s ClairS/clair3_normal_tumoral_germline_output.vcf.gz -b CUNY_diag_hs1_hp.bam -r /data/ref/hs1/chm13v2.0.fa -t 155 --ont -o ClairS/clair3_normal_tumoral_germline_output_PS
+/// Phases germline variants using read-backed phasing with methylation support.
+///
+/// Uses longphase's `phase` subcommand to assign phase sets (PS tags) to
+/// heterozygous germline variants. Incorporates methylation data from modcall
+/// to improve phasing accuracy and block lengths.
+///
+/// # Input Requirements
+///
+/// - Germline VCF (from ClairS constitutional calling)
+/// - Normal BAM file
+/// - Modcall VCF (methylation calls)
+///
+/// # Output
+///
+/// Produces a phased VCF with:
+/// - GT fields updated with phased genotypes (e.g., `0|1` instead of `0/1`)
+/// - PS (phase set) tags indicating contiguous phased blocks
 #[derive(Debug)]
 pub struct LongphasePhase {
+    /// Sample identifier.
     pub id: String,
+    /// Input germline VCF path.
     pub vcf: String,
+    /// Output file prefix.
     pub out_prefix: String,
+    /// Normal BAM file path.
     pub bam: String,
+    /// Pipeline configuration.
     pub config: Config,
+    /// Directory for log files.
     pub log_dir: String,
+    /// Modcall VCF with methylation data.
     pub modcall_vcf: String,
+    /// Command arguments (built at runtime).
     job_args: Vec<String>,
 }
 
@@ -363,7 +481,8 @@ impl crate::commands::SbatchRunner for LongphasePhase {
 impl Run for LongphasePhase {
     fn run(&mut self) -> anyhow::Result<()> {
         if !self.should_run() {
-            info!("Germline phased vcf is up-to-date for {}.", self.id)
+            info!("Germline phased vcf is up-to-date for {}.", self.id);
+            return Ok(());
         }
         info!("Running longphase phase for: {}", self.vcf);
         info!("Saving longphase phase results in: {}", self.out_prefix);
@@ -419,17 +538,44 @@ impl Run for LongphasePhase {
     }
 }
 
+/// Extracts methylation calls from a BAM file for phasing support.
+///
+/// Uses longphase's `modcall` subcommand to identify methylated CpG sites
+/// from Oxford Nanopore modified base calls (MM/ML tags). The resulting
+/// methylation information improves phasing accuracy by providing additional
+/// haplotype-informative signal.
+///
+/// # Threshold Selection
+///
+/// The methylation threshold is automatically determined from the modkit
+/// summary file, ensuring optimal sensitivity/specificity for the specific
+/// sample's modification calling quality.
+///
+/// # Output
+///
+/// Produces a VCF file with methylation calls at CpG sites, filtered to
+/// PASS-only variants for downstream phasing use.
 #[derive(Debug)]
 pub struct LongphaseModcallSolo {
+    /// Sample identifier.
     pub id: String,
+    /// Time point (normal/tumor identifier).
     pub time: String,
+    /// Input BAM file path.
     pub bam: String,
+    /// Output file prefix.
     pub prefix: String,
+    /// Reference genome path.
     pub reference: String,
+    /// Number of threads for modcall.
     pub threads: u8,
+    /// Directory for log files.
     pub log_dir: String,
+    /// Methylation probability threshold from modkit.
     pub mod_threshold: f64,
+    /// Pipeline configuration.
     pub config: Config,
+    /// Command arguments (built at runtime).
     job_args: Vec<String>,
 }
 
@@ -512,7 +658,7 @@ impl crate::commands::SlurmRunner for LongphaseModcallSolo {
 impl crate::commands::SbatchRunner for LongphaseModcallSolo {
     fn slurm_params(&self) -> super::SlurmParams {
         crate::commands::SlurmParams {
-            job_name: Some(format!("longphase_modcall_{}", self.id)),
+            job_name: Some(format!("longphase_modcall_{}_{}", self.id, self.time)),
             cpus_per_task: Some(self.threads as u32),
             mem: Some("60G".into()),
             partition: Some("shortq".into()),

+ 1 - 0
src/lib.rs

@@ -147,6 +147,7 @@ pub mod runners;
 pub mod scan;
 pub mod slurm_helpers;
 pub mod variant;
+pub mod locker;
 
 #[macro_use]
 extern crate lazy_static;

+ 227 - 0
src/locker.rs

@@ -0,0 +1,227 @@
+//! Sample-level locking for pipeline execution.
+//!
+//! Uses atomic directory creation for reliable locking on distributed filesystems (BeegFS, NFS, etc.).
+
+use anyhow::Context;
+use log::{debug, info, warn};
+use std::{
+    fs, io,
+    path::{Path, PathBuf},
+    process::Command,
+    time::Duration,
+};
+
+/// Atomic filesystem lock for preventing concurrent pipeline execution on the same sample.
+///
+/// Uses directory creation (atomic on POSIX) rather than file locks, which are unreliable
+/// on distributed filesystems like BeegFS.
+///
+/// # Example
+///
+/// ```ignore
+/// let _lock = SampleLock::acquire("/data/locks", "34528", "clairs")?;
+/// // Lock held until _lock is dropped
+/// ```
+#[derive(Debug)]
+pub struct SampleLock {
+    path: PathBuf,
+}
+
+impl SampleLock {
+    /// Maximum age before a lock is considered stale (24 hours).
+    const STALE_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
+
+    /// Attempts to acquire an exclusive lock for a sample/pipeline combination.
+    ///
+    /// # Arguments
+    ///
+    /// * `lock_dir` - Base directory for lock files
+    /// * `id` - Sample identifier
+    /// * `pipeline` - Pipeline name (e.g., "clairs", "deepvariant")
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - Lock directory cannot be created
+    /// - Lock is already held by another process (and not stale)
+    /// - Metadata cannot be written
+    pub fn acquire(lock_dir: &str, id: &str, pipeline: &str) -> anyhow::Result<Self> {
+        fs::create_dir_all(lock_dir)
+            .with_context(|| format!("Failed to create lock directory: {lock_dir}"))?;
+
+        let path = PathBuf::from(format!("{}/{}.{}.lock", lock_dir, id, pipeline));
+
+        match fs::create_dir(&path) {
+            Ok(_) => {
+                let lock = Self { path };
+                lock.write_metadata()?;
+                info!("Acquired lock: {}", lock.path.display());
+                Ok(lock)
+            }
+            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
+                // Check if stale
+                if Self::is_stale(&path) {
+                    warn!("Removing stale lock: {}", path.display());
+                    fs::remove_dir_all(&path)?;
+                    // Retry once
+                    fs::create_dir(&path)?;
+                    let lock = Self { path };
+                    lock.write_metadata()?;
+                    info!(
+                        "Acquired lock (after stale removal): {}",
+                        lock.path.display()
+                    );
+                    Ok(lock)
+                } else {
+                    let holder = Self::read_holder_info(&path);
+                    anyhow::bail!(
+                        "Sample '{}' is locked for '{}' pipeline.\nLock: {}\n{}",
+                        id,
+                        pipeline,
+                        path.display(),
+                        holder
+                    )
+                }
+            }
+            Err(e) => Err(e).with_context(|| format!("Failed to create lock: {}", path.display())),
+        }
+    }
+
+    /// Writes metadata to the lock directory for debugging and stale detection.
+    fn write_metadata(&self) -> anyhow::Result<()> {
+        let meta_path = self.path.join("meta.txt");
+        let hostname = hostname::get()
+            .map(|h| h.to_string_lossy().into_owned())
+            .unwrap_or_else(|_| "unknown".into());
+
+        let slurm_job = std::env::var("SLURM_JOB_ID").unwrap_or_default();
+        let slurm_node = std::env::var("SLURMD_NODENAME").unwrap_or_default();
+
+        let content = format!(
+            "pid={}\nhost={}\nslurm_job_id={}\nslurm_node={}\ntime={}\n",
+            std::process::id(),
+            hostname,
+            slurm_job,
+            slurm_node,
+            chrono::Utc::now().to_rfc3339()
+        );
+
+        fs::write(&meta_path, content)
+            .with_context(|| format!("Failed to write lock metadata: {}", meta_path.display()))?;
+
+        Ok(())
+    }
+
+    /// Reads holder information for error messages.
+    fn read_holder_info(path: &Path) -> String {
+        let meta = path.join("meta.txt");
+        fs::read_to_string(&meta).unwrap_or_else(|_| "Holder info unavailable".into())
+    }
+
+    /// Checks if a lock is stale (process/job no longer running).
+    fn is_stale(path: &Path) -> bool {
+        let meta = path.join("meta.txt");
+
+        if let Ok(content) = fs::read_to_string(&meta) {
+            // Check SLURM job first (most reliable for cluster jobs)
+            if let Some(job_id) = Self::parse_field(&content, "slurm_job_id") {
+                if !job_id.is_empty() {
+                    return !Self::slurm_job_exists(job_id);
+                }
+            }
+
+            // Check local PID (only valid on same host)
+            if let Some(lock_host) = Self::parse_field(&content, "host") {
+                let current_host = hostname::get()
+                    .map(|h| h.to_string_lossy().into_owned())
+                    .unwrap_or_default();
+
+                if lock_host == current_host {
+                    if let Some(pid) =
+                        Self::parse_field(&content, "pid").and_then(|p| p.parse::<u32>().ok())
+                    {
+                        return !Path::new(&format!("/proc/{}", pid)).exists();
+                    }
+                }
+            }
+        }
+
+        // Fall back to timeout-based staleness
+        if let Ok(meta) = fs::metadata(path) {
+            if let Ok(modified) = meta.modified() {
+                if let Ok(age) = modified.elapsed() {
+                    let is_stale = age > Self::STALE_TIMEOUT;
+                    if is_stale {
+                        debug!(
+                            "Lock {} is stale (age: {:?} > {:?})",
+                            path.display(),
+                            age,
+                            Self::STALE_TIMEOUT
+                        );
+                    }
+                    return is_stale;
+                }
+            }
+        }
+
+        false
+    }
+
+    /// Parses a field from the metadata content.
+    fn parse_field<'a>(content: &'a str, field: &str) -> Option<&'a str> {
+        content
+            .lines()
+            .find(|l| l.starts_with(&format!("{}=", field)))
+            .and_then(|l| l.split('=').nth(1))
+    }
+
+    /// Checks if a SLURM job is still running.
+    fn slurm_job_exists(job_id: &str) -> bool {
+        Command::new("squeue")
+            .args(["-j", job_id, "-h", "-o", "%i"])
+            .output()
+            .map(|o| o.status.success() && !o.stdout.is_empty())
+            .unwrap_or(false)
+    }
+}
+
+impl Drop for SampleLock {
+    fn drop(&mut self) {
+        debug!("Releasing lock: {}", self.path.display());
+        if let Err(e) = fs::remove_dir_all(&self.path) {
+            warn!("Failed to release lock {}: {}", self.path.display(), e);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    #[test]
+    fn test_lock_acquire_release() -> anyhow::Result<()> {
+        let tmp = TempDir::new()?;
+        let lock_dir = tmp.path().to_str().unwrap();
+
+        {
+            let lock = SampleLock::acquire(lock_dir, "sample1", "clairs")?;
+            assert!(lock.path.exists());
+
+            // Second acquire should fail
+            let result = SampleLock::acquire(lock_dir, "sample1", "clairs");
+            assert!(result.is_err());
+
+            // Different sample should succeed
+            let _lock2 = SampleLock::acquire(lock_dir, "sample2", "clairs")?;
+
+            // Different pipeline should succeed
+            let _lock3 = SampleLock::acquire(lock_dir, "sample1", "deepvariant")?;
+        }
+
+        // After drop, should be able to acquire again
+        let _lock = SampleLock::acquire(lock_dir, "sample1", "clairs")?;
+
+        Ok(())
+    }
+}
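
Note: the meta.txt that write_metadata() places inside each lock directory is a plain key=value file; is_stale() reads it back and checks the SLURM job ID via squeue, or the PID under /proc when the lock was taken on the same host. Illustrative contents (all values hypothetical):

    pid=48213
    host=node042
    slurm_job_id=1234567
    slurm_node=node042
    time=2025-11-03T10:15:00+00:00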

+ 2 - 2
src/variant/variant_collection.rs

@@ -2,7 +2,7 @@ use std::{
     collections::{HashMap, HashSet},
     fs::{self, File},
     io::{Read, Write},
-    path::Path, sync::Arc,
+    path::{Path, PathBuf}, sync::Arc,
 };
 
 use anyhow::Context;
@@ -1533,7 +1533,7 @@ impl ExternalAnnotation {
         mut unfound: Vec<VcfVariant>,
         annotations: &Annotations,
     ) -> anyhow::Result<()> {
-        let temp_dir = std::env::temp_dir();
+        let temp_dir = PathBuf::from(&self.config.tmp_dir);
 
         unfound.par_sort();
         unfound.dedup();