locker.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. //! Sample-level locking for pipeline execution.
  2. //!
  //! Uses atomic directory creation for reliable locking on distributed filesystems (BeeGFS, NFS, etc.).
  4. use anyhow::Context;
  5. use log::{debug, info, warn};
  6. use std::{
  7. fs, io,
  8. path::{Path, PathBuf},
  9. process::Command,
  10. time::Duration,
  11. };
  /// Atomic filesystem lock for preventing concurrent pipeline execution on the same sample.
  ///
  /// Uses directory creation (atomic on POSIX) rather than file locks, which are unreliable
  /// on distributed filesystems like BeeGFS.
  ///
  /// The lock is released when the value is dropped, so the guard must be bound to a
  /// named variable for as long as the protected work runs. Note that `let _ = ...`
  /// drops the guard immediately, whereas `let _lock = ...` keeps it alive until the
  /// end of the scope.
  ///
  /// # Example
  ///
  /// ```ignore
  /// let _lock = SampleLock::acquire("/data/locks", "34528", "clairs")?;
  /// // Lock held until _lock is dropped
  /// ```
  #[derive(Debug)]
  #[must_use = "the lock is released as soon as the guard is dropped"]
  pub struct SampleLock {
      /// Path of the lock directory owned by this guard.
      path: PathBuf,
  }
  27. impl SampleLock {
  28. /// Maximum age before a lock is considered stale (24 hours).
  29. const STALE_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
  30. /// Attempts to acquire an exclusive lock for a sample/pipeline combination.
  31. ///
  32. /// # Arguments
  33. ///
  34. /// * `lock_dir` - Base directory for lock files
  35. /// * `id` - Sample identifier
  36. /// * `pipeline` - Pipeline name (e.g., "clairs", "deepvariant")
  37. ///
  38. /// # Errors
  39. ///
  40. /// Returns an error if:
  41. /// - Lock directory cannot be created
  42. /// - Lock is already held by another process (and not stale)
  43. /// - Metadata cannot be written
  44. pub fn acquire(lock_dir: &str, id: &str, pipeline: &str) -> anyhow::Result<Self> {
  45. fs::create_dir_all(lock_dir)
  46. .with_context(|| format!("Failed to create lock directory: {lock_dir}"))?;
  47. let path = PathBuf::from(format!("{}/{}.{}.lock", lock_dir, id, pipeline));
  48. match fs::create_dir(&path) {
  49. Ok(_) => {
  50. let lock = Self { path };
  51. lock.write_metadata()?;
  52. info!("Acquired lock: {}", lock.path.display());
  53. Ok(lock)
  54. }
  55. Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
  56. // Check if stale
  57. if Self::is_stale(&path) {
  58. warn!("Removing stale lock: {}", path.display());
  59. fs::remove_dir_all(&path)?;
  60. // Retry once
  61. fs::create_dir(&path)?;
  62. let lock = Self { path };
  63. lock.write_metadata()?;
  64. info!(
  65. "Acquired lock (after stale removal): {}",
  66. lock.path.display()
  67. );
  68. Ok(lock)
  69. } else {
  70. let holder = Self::read_holder_info(&path);
  71. anyhow::bail!(
  72. "Sample '{}' is locked for '{}' pipeline.\nLock: {}\n{}",
  73. id,
  74. pipeline,
  75. path.display(),
  76. holder
  77. )
  78. }
  79. }
  80. Err(e) => Err(e).with_context(|| format!("Failed to create lock: {}", path.display())),
  81. }
  82. }
  83. /// Writes metadata to the lock directory for debugging and stale detection.
  84. fn write_metadata(&self) -> anyhow::Result<()> {
  85. let meta_path = self.path.join("meta.txt");
  86. let hostname = hostname::get()
  87. .map(|h| h.to_string_lossy().into_owned())
  88. .unwrap_or_else(|_| "unknown".into());
  89. let slurm_job = std::env::var("SLURM_JOB_ID").unwrap_or_default();
  90. let slurm_node = std::env::var("SLURMD_NODENAME").unwrap_or_default();
  91. let content = format!(
  92. "pid={}\nhost={}\nslurm_job_id={}\nslurm_node={}\ntime={}\n",
  93. std::process::id(),
  94. hostname,
  95. slurm_job,
  96. slurm_node,
  97. chrono::Utc::now().to_rfc3339()
  98. );
  99. fs::write(&meta_path, content)
  100. .with_context(|| format!("Failed to write lock metadata: {}", meta_path.display()))?;
  101. Ok(())
  102. }
  103. /// Reads holder information for error messages.
  104. fn read_holder_info(path: &Path) -> String {
  105. let meta = path.join("meta.txt");
  106. fs::read_to_string(&meta).unwrap_or_else(|_| "Holder info unavailable".into())
  107. }
  108. /// Checks if a lock is stale (process/job no longer running).
  109. fn is_stale(path: &Path) -> bool {
  110. let meta = path.join("meta.txt");
  111. if let Ok(content) = fs::read_to_string(&meta) {
  112. // Check SLURM job first (most reliable for cluster jobs)
  113. if let Some(job_id) = Self::parse_field(&content, "slurm_job_id") {
  114. if !job_id.is_empty() {
  115. return !Self::slurm_job_exists(job_id);
  116. }
  117. }
  118. // Check local PID (only valid on same host)
  119. if let Some(lock_host) = Self::parse_field(&content, "host") {
  120. let current_host = hostname::get()
  121. .map(|h| h.to_string_lossy().into_owned())
  122. .unwrap_or_default();
  123. if lock_host == current_host {
  124. if let Some(pid) =
  125. Self::parse_field(&content, "pid").and_then(|p| p.parse::<u32>().ok())
  126. {
  127. return !Path::new(&format!("/proc/{}", pid)).exists();
  128. }
  129. }
  130. }
  131. }
  132. // Fall back to timeout-based staleness
  133. if let Ok(meta) = fs::metadata(path) {
  134. if let Ok(modified) = meta.modified() {
  135. if let Ok(age) = modified.elapsed() {
  136. let is_stale = age > Self::STALE_TIMEOUT;
  137. if is_stale {
  138. debug!(
  139. "Lock {} is stale (age: {:?} > {:?})",
  140. path.display(),
  141. age,
  142. Self::STALE_TIMEOUT
  143. );
  144. }
  145. return is_stale;
  146. }
  147. }
  148. }
  149. false
  150. }
  151. /// Parses a field from the metadata content.
  152. fn parse_field<'a>(content: &'a str, field: &str) -> Option<&'a str> {
  153. content
  154. .lines()
  155. .find(|l| l.starts_with(&format!("{}=", field)))
  156. .and_then(|l| l.split('=').nth(1))
  157. }
  158. /// Checks if a SLURM job is still running.
  159. fn slurm_job_exists(job_id: &str) -> bool {
  160. Command::new("squeue")
  161. .args(["-j", job_id, "-h", "-o", "%i"])
  162. .output()
  163. .map(|o| o.status.success() && !o.stdout.is_empty())
  164. .unwrap_or(false)
  165. }
  166. }
  167. impl Drop for SampleLock {
  168. fn drop(&mut self) {
  169. debug!("Releasing lock: {}", self.path.display());
  170. if let Err(e) = fs::remove_dir_all(&self.path) {
  171. warn!("Failed to release lock {}: {}", self.path.display(), e);
  172. }
  173. }
  174. }