| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
//! Sample-level locking for pipeline execution.
//!
//! Uses atomic directory creation so that locking stays reliable on distributed filesystems
//! (BeeGFS, NFS, etc.).
- use anyhow::Context;
- use log::{debug, info, warn};
- use std::{
- fs, io,
- path::{Path, PathBuf},
- process::Command,
- time::Duration,
- };
/// Exclusive, filesystem-backed lock that keeps two runs of the same pipeline from working on
/// the same sample at once.
///
/// The lock is represented by a directory. Creating a directory either succeeds or fails because
/// it already exists, and that check-and-create is atomic on POSIX filesystems. Advisory file
/// locks are deliberately avoided because they are not dependable on distributed filesystems
/// such as BeeGFS.
///
/// The lock is released when the value is dropped.
///
/// # Example
///
/// ```ignore
/// let _lock = SampleLock::acquire("/data/locks", "34528", "clairs")?;
/// // Lock held until _lock is dropped
/// ```
#[derive(Debug)]
pub struct SampleLock {
    /// Directory whose existence represents the held lock.
    path: PathBuf,
}
- impl SampleLock {
- /// Maximum age before a lock is considered stale (24 hours).
- const STALE_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
- /// Attempts to acquire an exclusive lock for a sample/pipeline combination.
- ///
- /// # Arguments
- ///
- /// * `lock_dir` - Base directory for lock files
- /// * `id` - Sample identifier
- /// * `pipeline` - Pipeline name (e.g., "clairs", "deepvariant")
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - Lock directory cannot be created
- /// - Lock is already held by another process (and not stale)
- /// - Metadata cannot be written
- pub fn acquire(lock_dir: &str, id: &str, pipeline: &str) -> anyhow::Result<Self> {
- fs::create_dir_all(lock_dir)
- .with_context(|| format!("Failed to create lock directory: {lock_dir}"))?;
- let path = PathBuf::from(format!("{}/{}.{}.lock", lock_dir, id, pipeline));
- match fs::create_dir(&path) {
- Ok(_) => {
- let lock = Self { path };
- lock.write_metadata()?;
- info!("Acquired lock: {}", lock.path.display());
- Ok(lock)
- }
- Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
- // Check if stale
- if Self::is_stale(&path) {
- warn!("Removing stale lock: {}", path.display());
- fs::remove_dir_all(&path)?;
- // Retry once
- fs::create_dir(&path)?;
- let lock = Self { path };
- lock.write_metadata()?;
- info!(
- "Acquired lock (after stale removal): {}",
- lock.path.display()
- );
- Ok(lock)
- } else {
- let holder = Self::read_holder_info(&path);
- anyhow::bail!(
- "Sample '{}' is locked for '{}' pipeline.\nLock: {}\n{}",
- id,
- pipeline,
- path.display(),
- holder
- )
- }
- }
- Err(e) => Err(e).with_context(|| format!("Failed to create lock: {}", path.display())),
- }
- }
- /// Writes metadata to the lock directory for debugging and stale detection.
- fn write_metadata(&self) -> anyhow::Result<()> {
- let meta_path = self.path.join("meta.txt");
- let hostname = hostname::get()
- .map(|h| h.to_string_lossy().into_owned())
- .unwrap_or_else(|_| "unknown".into());
- let slurm_job = std::env::var("SLURM_JOB_ID").unwrap_or_default();
- let slurm_node = std::env::var("SLURMD_NODENAME").unwrap_or_default();
- let content = format!(
- "pid={}\nhost={}\nslurm_job_id={}\nslurm_node={}\ntime={}\n",
- std::process::id(),
- hostname,
- slurm_job,
- slurm_node,
- chrono::Utc::now().to_rfc3339()
- );
- fs::write(&meta_path, content)
- .with_context(|| format!("Failed to write lock metadata: {}", meta_path.display()))?;
- Ok(())
- }
- /// Reads holder information for error messages.
- fn read_holder_info(path: &Path) -> String {
- let meta = path.join("meta.txt");
- fs::read_to_string(&meta).unwrap_or_else(|_| "Holder info unavailable".into())
- }
- /// Checks if a lock is stale (process/job no longer running).
- fn is_stale(path: &Path) -> bool {
- let meta = path.join("meta.txt");
- if let Ok(content) = fs::read_to_string(&meta) {
- // Check SLURM job first (most reliable for cluster jobs)
- if let Some(job_id) = Self::parse_field(&content, "slurm_job_id") {
- if !job_id.is_empty() {
- return !Self::slurm_job_exists(job_id);
- }
- }
- // Check local PID (only valid on same host)
- if let Some(lock_host) = Self::parse_field(&content, "host") {
- let current_host = hostname::get()
- .map(|h| h.to_string_lossy().into_owned())
- .unwrap_or_default();
- if lock_host == current_host {
- if let Some(pid) =
- Self::parse_field(&content, "pid").and_then(|p| p.parse::<u32>().ok())
- {
- return !Path::new(&format!("/proc/{}", pid)).exists();
- }
- }
- }
- }
- // Fall back to timeout-based staleness
- if let Ok(meta) = fs::metadata(path) {
- if let Ok(modified) = meta.modified() {
- if let Ok(age) = modified.elapsed() {
- let is_stale = age > Self::STALE_TIMEOUT;
- if is_stale {
- debug!(
- "Lock {} is stale (age: {:?} > {:?})",
- path.display(),
- age,
- Self::STALE_TIMEOUT
- );
- }
- return is_stale;
- }
- }
- }
- false
- }
- /// Parses a field from the metadata content.
- fn parse_field<'a>(content: &'a str, field: &str) -> Option<&'a str> {
- content
- .lines()
- .find(|l| l.starts_with(&format!("{}=", field)))
- .and_then(|l| l.split('=').nth(1))
- }
- /// Checks if a SLURM job is still running.
- fn slurm_job_exists(job_id: &str) -> bool {
- Command::new("squeue")
- .args(["-j", job_id, "-h", "-o", "%i"])
- .output()
- .map(|o| o.status.success() && !o.stdout.is_empty())
- .unwrap_or(false)
- }
- }
- impl Drop for SampleLock {
- fn drop(&mut self) {
- debug!("Releasing lock: {}", self.path.display());
- if let Err(e) = fs::remove_dir_all(&self.path) {
- warn!("Failed to release lock {}: {}", self.path.display(), e);
- }
- }
- }
|