// runners.rs
  1. use std::{
  2. fs::{self},
  3. io::{BufRead, BufReader, Write},
  4. path::Path,
  5. process::{Child, Command, Stdio},
  6. sync::mpsc::{self, TryRecvError},
  7. };
  8. use anyhow::Context;
  9. use chrono::{DateTime, Utc};
  10. use log::info;
  11. use serde::{Deserialize, Serialize};
  12. use uuid::Uuid;
  13. use crate::io::writers::{finalize_bgzf_file, get_gz_writer};
  14. /// Trait for running a command.
  15. pub trait Run {
  16. /// Runs the command.
  17. ///
  18. /// This method spawns the child process and sets up threads to capture
  19. /// its stdout and stderr output.
  20. ///
  21. /// # Returns
  22. /// `Ok(())` if the command was successfully started, otherwise an error
  23. fn run(&mut self) -> anyhow::Result<()>;
  24. }
  25. #[derive(Debug, thiserror::Error)]
  26. pub enum RunErr {
  27. #[error("{0} is up-to-date")]
  28. UpToDate(&'static str),
  29. }
  30. /// Trait for waiting on a command to complete.
  31. pub trait Wait {
  32. /// Waits for the command to complete.
  33. ///
  34. /// This method monitors the command's output and waits for the process to exit.
  35. ///
  36. /// # Returns
  37. /// `Ok(())` if the command completed successfully, otherwise an error
  38. fn wait(&mut self) -> anyhow::Result<()>;
  39. }
  40. /// A convenience trait alias that bundles `Run`, `Wait`, and `Log` traits
  41. /// for types that support full command execution lifecycle behavior.
  42. pub trait RunWait: Run + Wait + Log {}
  43. /// Blanket implementation of `RunWait` for any type that implements `Run + Wait + Log`.
  44. impl<T: Run + Wait + Log> RunWait for T {}
  45. /// Executes a `RunWait` task by calling `.run()` then `.wait()`, and generates a `RunReport`.
  46. ///
  47. /// This utility function captures the wall-clock start and end times of the task
  48. /// and stores its output logs for later inspection or saving.
  49. ///
  50. /// # Arguments
  51. ///
  52. /// * `item` - A mutable reference to any struct implementing the `RunWait` trait
  53. ///
  54. /// # Returns
  55. ///
  56. /// A `RunReport` struct that includes:
  57. /// - start and end timestamps (in UTC)
  58. /// - accumulated stdout/stderr log
  59. ///
  60. /// # Errors
  61. ///
  62. /// Returns an error if either the `.run()` or `.wait()` step fails.
  63. pub fn run_wait<T: RunWait>(item: &mut T) -> anyhow::Result<RunReport> {
  64. let mut run_report = RunReport {
  65. start: Utc::now(),
  66. ..Default::default()
  67. };
  68. item.run()?;
  69. item.wait()?;
  70. run_report.log = item.log();
  71. run_report.end = Utc::now();
  72. Ok(run_report)
  73. }
  74. /// Captures the lifecycle information and log output of a `RunWait` task.
  75. ///
  76. /// This is useful for tracing, debugging, audit logging, or benchmarking pipeline steps.
  77. #[derive(Debug, Default, Serialize, Deserialize)]
  78. pub struct RunReport {
  79. /// Timestamp when the task started
  80. pub start: DateTime<Utc>,
  81. /// Timestamp when the task ended
  82. pub end: DateTime<Utc>,
  83. /// Aggregated log output from the task (stdout and stderr)
  84. pub log: String,
  85. }
  86. impl RunReport {
  87. /// Serialize the RunReport to a JSON string and save it to `file_prefix<uuid>.log.gz`.
  88. ///
  89. /// If the directory part of `file_prefix` does not exist, it will be created automatically.
  90. ///
  91. /// # Arguments
  92. ///
  93. /// * `file_prefix` - Path prefix for the output file (can include directory).
  94. ///
  95. /// # Returns
  96. ///
  97. /// `Ok(())` on success, or a `std::io::Error` if writing fails.
  98. pub fn save_to_file(&self, file_prefix: &str) -> anyhow::Result<()> {
  99. // Serialize to JSON
  100. let json_data = serde_json::to_string_pretty(self)?;
  101. // Generate short UUID suffix
  102. let uuid_suffix = &Uuid::new_v4().to_string()[..5];
  103. let file_path = format!("{}{}.log.gz", file_prefix, uuid_suffix);
  104. // Ensure the parent directory exists
  105. if let Some(parent) = Path::new(&file_path).parent() {
  106. fs::create_dir_all(parent)?;
  107. }
  108. // Create and write BGZF-compressed file
  109. let mut writer = get_gz_writer(&file_path, true)
  110. .with_context(|| format!("Failed to get gz writer for: {file_path}"))?;
  111. writer.write_all(json_data.as_bytes())?;
  112. finalize_bgzf_file(writer, &file_path)?;
  113. Ok(())
  114. }
  115. }
  /// Access to the log a command has accumulated while executing.
  pub trait Log {
      /// Returns the execution log collected so far.
      ///
      /// # Returns
      /// An owned `String` holding the command's execution log
      fn log(&self) -> String;
  }
  /// An external command together with the plumbing needed to capture its
  /// output: the process handle, a channel carrying output lines, and the
  /// accumulated log text.
  pub struct CommandRun {
      /// Program to execute
      pub bin: String,
      /// Arguments handed to the program
      pub args: Vec<String>,
      /// Handle of the spawned process; `None` until the command is started
      pub child: Option<Child>,
      /// Sending half of the channel carrying `(stream, line)` pairs
      pub tx: mpsc::Sender<(String, String)>,
      /// Receiving half of the same channel, read while waiting
      pub rx: mpsc::Receiver<(String, String)>,
      /// Log text accumulated for this command
      pub log: String,
  }

  impl CommandRun {
      /// Builds a `CommandRun` for `bin` with `args`; nothing is spawned yet.
      ///
      /// # Arguments
      ///
      /// * `bin` - Program to execute
      /// * `args` - Arguments to pass to the program
      ///
      /// # Returns
      ///
      /// A `CommandRun` with an empty log, no child process and a fresh
      /// channel, ready to be started via `.run()`
      pub fn new(bin: &str, args: &[&str]) -> Self {
          let owned_args: Vec<String> = args.iter().copied().map(String::from).collect();
          let (tx, rx) = mpsc::channel();

          Self {
              bin: bin.to_owned(),
              args: owned_args,
              child: None,
              tx,
              rx,
              log: String::new(),
          }
      }
  }
  162. impl Run for CommandRun {
  163. /// Starts the command and begins capturing stdout and stderr in separate threads.
  164. ///
  165. /// This method spawns the process and streams its output line-by-line.
  166. /// It does not block until the process exits — that’s handled by `.wait()`.
  167. fn run(&mut self) -> anyhow::Result<()> {
  168. let info = format!("Running command: {} {}", &self.bin, &self.args.join(" "));
  169. info!("{info}");
  170. let mut child = Command::new(&self.bin)
  171. .args(&self.args)
  172. .stdout(Stdio::piped())
  173. .stderr(Stdio::piped())
  174. .spawn()
  175. .map_err(|e| anyhow::anyhow!("Failed to spawn {}: {e}", self.bin))?;
  176. self.log.push_str(&format!("{info}\n"));
  177. let stdout = child.stdout.take().expect("Failed to capture stdout");
  178. let stderr = child.stderr.take().expect("Failed to capture stderr");
  179. let stdout_reader = BufReader::new(stdout);
  180. let stderr_reader = BufReader::new(stderr);
  181. let tx = self.tx.clone();
  182. let bin = self.bin.clone();
  183. // Use scoped threads to avoid leaking threads or missing log lines
  184. std::thread::scope(|s| {
  185. s.spawn(|| {
  186. for line in stdout_reader.lines().map_while(Result::ok) {
  187. info!("[{}:stdout] {}", &bin, line);
  188. tx.send(("stdout".to_string(), line.to_string()))
  189. .expect("Channel send failed");
  190. }
  191. });
  192. s.spawn(|| {
  193. for line in stderr_reader.lines().map_while(Result::ok) {
  194. info!("[{}:stderr] {}", &bin, line);
  195. tx.send(("stderr".to_string(), line.to_string()))
  196. .expect("Channel send failed");
  197. }
  198. });
  199. });
  200. self.child = Some(child);
  201. Ok(())
  202. }
  203. }
  204. impl Wait for CommandRun {
  205. /// Waits for the command to exit, polling and collecting output in real time.
  206. ///
  207. /// This method consumes the receiver channel to stream logs and checks the process
  208. /// status regularly to know when to finish. Captured logs are accumulated in `self.log`.
  209. fn wait(&mut self) -> anyhow::Result<()> {
  210. if let Some(child) = &mut self.child {
  211. loop {
  212. match self.rx.try_recv() {
  213. Ok((stream, line)) => {
  214. self.log
  215. .push_str(&format!("[{}] {}: {}\n", self.bin, stream, line));
  216. }
  217. Err(TryRecvError::Empty) => {
  218. if let Some(status) = child.try_wait()? {
  219. info!("{} exited with status: {}", self.bin, status);
  220. break;
  221. }
  222. std::thread::sleep(std::time::Duration::from_millis(100));
  223. }
  224. Err(TryRecvError::Disconnected) => {
  225. break;
  226. }
  227. }
  228. }
  229. // Ensure child process has exited
  230. let status = child.wait()?;
  231. info!("{} exited with status: {}", self.bin, status);
  232. }
  233. Ok(())
  234. }
  235. }
  236. impl Log for CommandRun {
  237. /// Returns the accumulated log of the command's stdout and stderr streams.
  238. fn log(&self) -> String {
  239. self.log.clone()
  240. }
  241. }