//! Sample-level locking for pipeline execution. //! //! Uses atomic directory creation for reliable locking on distributed filesystems (BeegFS, NFS, etc.). use anyhow::Context; use log::{debug, info, warn}; use std::{ fs, io, path::{Path, PathBuf}, process::Command, time::Duration, }; /// Atomic filesystem lock for preventing concurrent pipeline execution on the same sample. /// /// Uses directory creation (atomic on POSIX) rather than file locks, which are unreliable /// on distributed filesystems like BeegFS. /// /// # Example /// /// ```ignore /// let _lock = SampleLock::acquire("/data/locks", "34528", "clairs")?; /// // Lock held until _lock is dropped /// ``` #[derive(Debug)] pub struct SampleLock { path: PathBuf, } impl SampleLock { /// Maximum age before a lock is considered stale (24 hours). const STALE_TIMEOUT: Duration = Duration::from_secs(24 * 3600); /// Attempts to acquire an exclusive lock for a sample/pipeline combination. /// /// # Arguments /// /// * `lock_dir` - Base directory for lock files /// * `id` - Sample identifier /// * `pipeline` - Pipeline name (e.g., "clairs", "deepvariant") /// /// # Errors /// /// Returns an error if: /// - Lock directory cannot be created /// - Lock is already held by another process (and not stale) /// - Metadata cannot be written pub fn acquire(lock_dir: &str, id: &str, pipeline: &str) -> anyhow::Result { fs::create_dir_all(lock_dir) .with_context(|| format!("Failed to create lock directory: {lock_dir}"))?; let path = PathBuf::from(format!("{}/{}.{}.lock", lock_dir, id, pipeline)); match fs::create_dir(&path) { Ok(_) => { let lock = Self { path }; lock.write_metadata()?; info!("Acquired lock: {}", lock.path.display()); Ok(lock) } Err(e) if e.kind() == io::ErrorKind::AlreadyExists => { // Check if stale if Self::is_stale(&path) { warn!("Removing stale lock: {}", path.display()); fs::remove_dir_all(&path)?; // Retry once fs::create_dir(&path)?; let lock = Self { path }; lock.write_metadata()?; info!( "Acquired lock (after stale removal): {}", lock.path.display() ); Ok(lock) } else { let holder = Self::read_holder_info(&path); anyhow::bail!( "Sample '{}' is locked for '{}' pipeline.\nLock: {}\n{}", id, pipeline, path.display(), holder ) } } Err(e) => Err(e).with_context(|| format!("Failed to create lock: {}", path.display())), } } /// Writes metadata to the lock directory for debugging and stale detection. fn write_metadata(&self) -> anyhow::Result<()> { let meta_path = self.path.join("meta.txt"); let hostname = hostname::get() .map(|h| h.to_string_lossy().into_owned()) .unwrap_or_else(|_| "unknown".into()); let slurm_job = std::env::var("SLURM_JOB_ID").unwrap_or_default(); let slurm_node = std::env::var("SLURMD_NODENAME").unwrap_or_default(); let content = format!( "pid={}\nhost={}\nslurm_job_id={}\nslurm_node={}\ntime={}\n", std::process::id(), hostname, slurm_job, slurm_node, chrono::Utc::now().to_rfc3339() ); fs::write(&meta_path, content) .with_context(|| format!("Failed to write lock metadata: {}", meta_path.display()))?; Ok(()) } /// Reads holder information for error messages. fn read_holder_info(path: &Path) -> String { let meta = path.join("meta.txt"); fs::read_to_string(&meta).unwrap_or_else(|_| "Holder info unavailable".into()) } /// Checks if a lock is stale (process/job no longer running). fn is_stale(path: &Path) -> bool { let meta = path.join("meta.txt"); if let Ok(content) = fs::read_to_string(&meta) { // Check SLURM job first (most reliable for cluster jobs) if let Some(job_id) = Self::parse_field(&content, "slurm_job_id") { if !job_id.is_empty() { return !Self::slurm_job_exists(job_id); } } // Check local PID (only valid on same host) if let Some(lock_host) = Self::parse_field(&content, "host") { let current_host = hostname::get() .map(|h| h.to_string_lossy().into_owned()) .unwrap_or_default(); if lock_host == current_host { if let Some(pid) = Self::parse_field(&content, "pid").and_then(|p| p.parse::().ok()) { return !Path::new(&format!("/proc/{}", pid)).exists(); } } } } // Fall back to timeout-based staleness if let Ok(meta) = fs::metadata(path) { if let Ok(modified) = meta.modified() { if let Ok(age) = modified.elapsed() { let is_stale = age > Self::STALE_TIMEOUT; if is_stale { debug!( "Lock {} is stale (age: {:?} > {:?})", path.display(), age, Self::STALE_TIMEOUT ); } return is_stale; } } } false } /// Parses a field from the metadata content. fn parse_field<'a>(content: &'a str, field: &str) -> Option<&'a str> { content .lines() .find(|l| l.starts_with(&format!("{}=", field))) .and_then(|l| l.split('=').nth(1)) } /// Checks if a SLURM job is still running. fn slurm_job_exists(job_id: &str) -> bool { Command::new("squeue") .args(["-j", job_id, "-h", "-o", "%i"]) .output() .map(|o| o.status.success() && !o.stdout.is_empty()) .unwrap_or(false) } } impl Drop for SampleLock { fn drop(&mut self) { debug!("Releasing lock: {}", self.path.display()); if let Err(e) = fs::remove_dir_all(&self.path) { warn!("Failed to release lock {}: {}", self.path.display(), e); } } }