use std::{ fs::{self}, io::{BufRead, BufReader, Write}, path::Path, process::{Child, Command, Stdio}, sync::mpsc::{self, TryRecvError}, }; use anyhow::Context; use chrono::{DateTime, Utc}; use log::info; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::io::writers::{finalize_bgzf_file, get_gz_writer}; /// Trait for running a command. pub trait Run { /// Runs the command. /// /// This method spawns the child process and sets up threads to capture /// its stdout and stderr output. /// /// # Returns /// `Ok(())` if the command was successfully started, otherwise an error fn run(&mut self) -> anyhow::Result<()>; } #[derive(Debug, thiserror::Error)] pub enum RunErr { #[error("{0} is up-to-date")] UpToDate(&'static str), } /// Trait for waiting on a command to complete. pub trait Wait { /// Waits for the command to complete. /// /// This method monitors the command's output and waits for the process to exit. /// /// # Returns /// `Ok(())` if the command completed successfully, otherwise an error fn wait(&mut self) -> anyhow::Result<()>; } /// A convenience trait alias that bundles `Run`, `Wait`, and `Log` traits /// for types that support full command execution lifecycle behavior. pub trait RunWait: Run + Wait + Log {} /// Blanket implementation of `RunWait` for any type that implements `Run + Wait + Log`. impl RunWait for T {} /// Executes a `RunWait` task by calling `.run()` then `.wait()`, and generates a `RunReport`. /// /// This utility function captures the wall-clock start and end times of the task /// and stores its output logs for later inspection or saving. /// /// # Arguments /// /// * `item` - A mutable reference to any struct implementing the `RunWait` trait /// /// # Returns /// /// A `RunReport` struct that includes: /// - start and end timestamps (in UTC) /// - accumulated stdout/stderr log /// /// # Errors /// /// Returns an error if either the `.run()` or `.wait()` step fails. pub fn run_wait(item: &mut T) -> anyhow::Result { let mut run_report = RunReport { start: Utc::now(), ..Default::default() }; item.run()?; item.wait()?; run_report.log = item.log(); run_report.end = Utc::now(); Ok(run_report) } /// Captures the lifecycle information and log output of a `RunWait` task. /// /// This is useful for tracing, debugging, audit logging, or benchmarking pipeline steps. #[derive(Debug, Default, Serialize, Deserialize)] pub struct RunReport { /// Timestamp when the task started pub start: DateTime, /// Timestamp when the task ended pub end: DateTime, /// Aggregated log output from the task (stdout and stderr) pub log: String, } impl RunReport { /// Serialize the RunReport to a JSON string and save it to `file_prefix.log.gz`. /// /// If the directory part of `file_prefix` does not exist, it will be created automatically. /// /// # Arguments /// /// * `file_prefix` - Path prefix for the output file (can include directory). /// /// # Returns /// /// `Ok(())` on success, or a `std::io::Error` if writing fails. pub fn save_to_file(&self, file_prefix: &str) -> anyhow::Result<()> { // Serialize to JSON let json_data = serde_json::to_string_pretty(self)?; // Generate short UUID suffix let uuid_suffix = &Uuid::new_v4().to_string()[..5]; let file_path = format!("{}{}.log.gz", file_prefix, uuid_suffix); // Ensure the parent directory exists if let Some(parent) = Path::new(&file_path).parent() { fs::create_dir_all(parent)?; } // Create and write BGZF-compressed file let mut writer = get_gz_writer(&file_path, true) .with_context(|| format!("Failed to get gz writer for: {file_path}"))?; writer.write_all(json_data.as_bytes())?; finalize_bgzf_file(writer, &file_path)?; Ok(()) } } pub trait Log { /// Returns the accumulated log of the command's execution. /// /// # Returns /// A string containing the command's execution log fn log(&self) -> String; } /// Represents a command to be executed, with streaming log capture, /// real-time monitoring, and access to its output. pub struct CommandRun { /// The binary or command to execute pub bin: String, /// The list of arguments to pass to the command pub args: Vec, /// The spawned child process, if started pub child: Option, /// Sender side of a channel used to stream stdout/stderr lines pub tx: mpsc::Sender<(String, String)>, /// Receiver side of the same channel, used during wait to collect output pub rx: mpsc::Receiver<(String, String)>, /// Accumulated log from stdout and stderr, organized by stream pub log: String, } impl CommandRun { /// Creates a new `CommandRun` instance with the given binary and arguments. /// /// # Arguments /// /// * `bin` - The binary or shell command to execute /// * `args` - A slice of arguments to pass to the command /// /// # Returns /// /// A new `CommandRun` instance ready to be started via `.run()` pub fn new(bin: &str, args: &[&str]) -> Self { let (tx, rx) = mpsc::channel(); CommandRun { bin: bin.to_string(), args: args.iter().map(|e| e.to_string()).collect(), child: None, log: String::default(), tx, rx, } } } impl Run for CommandRun { /// Starts the command and begins capturing stdout and stderr in separate threads. /// /// This method spawns the process and streams its output line-by-line. /// It does not block until the process exits — that’s handled by `.wait()`. fn run(&mut self) -> anyhow::Result<()> { let info = format!("Running command: {} {}", &self.bin, &self.args.join(" ")); info!("{info}"); let mut child = Command::new(&self.bin) .args(&self.args) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .map_err(|e| anyhow::anyhow!("Failed to spawn {}: {e}", self.bin))?; self.log.push_str(&format!("{info}\n")); let stdout = child.stdout.take().expect("Failed to capture stdout"); let stderr = child.stderr.take().expect("Failed to capture stderr"); let stdout_reader = BufReader::new(stdout); let stderr_reader = BufReader::new(stderr); let tx = self.tx.clone(); let bin = self.bin.clone(); // Use scoped threads to avoid leaking threads or missing log lines std::thread::scope(|s| { s.spawn(|| { for line in stdout_reader.lines().map_while(Result::ok) { info!("[{}:stdout] {}", &bin, line); tx.send(("stdout".to_string(), line.to_string())) .expect("Channel send failed"); } }); s.spawn(|| { for line in stderr_reader.lines().map_while(Result::ok) { info!("[{}:stderr] {}", &bin, line); tx.send(("stderr".to_string(), line.to_string())) .expect("Channel send failed"); } }); }); self.child = Some(child); Ok(()) } } impl Wait for CommandRun { /// Waits for the command to exit, polling and collecting output in real time. /// /// This method consumes the receiver channel to stream logs and checks the process /// status regularly to know when to finish. Captured logs are accumulated in `self.log`. fn wait(&mut self) -> anyhow::Result<()> { if let Some(child) = &mut self.child { loop { match self.rx.try_recv() { Ok((stream, line)) => { self.log .push_str(&format!("[{}] {}: {}\n", self.bin, stream, line)); } Err(TryRecvError::Empty) => { if let Some(status) = child.try_wait()? { info!("{} exited with status: {}", self.bin, status); break; } std::thread::sleep(std::time::Duration::from_millis(100)); } Err(TryRecvError::Disconnected) => { break; } } } // Ensure child process has exited let status = child.wait()?; info!("{} exited with status: {}", self.bin, status); } Ok(()) } } impl Log for CommandRun { /// Returns the accumulated log of the command's stdout and stderr streams. fn log(&self) -> String { self.log.clone() } }