| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- use std::{
- fs::{self},
- io::{BufRead, BufReader, Write},
- path::Path,
- process::{Child, Command, Stdio},
- sync::mpsc::{self, TryRecvError},
- };
- use anyhow::Context;
- use chrono::{DateTime, Utc};
- use log::info;
- use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- use crate::io::writers::{finalize_bgzf_file, get_gz_writer};
- /// Trait for running a command.
- pub trait Run {
- /// Runs the command.
- ///
- /// This method spawns the child process and sets up threads to capture
- /// its stdout and stderr output.
- ///
- /// # Returns
- /// `Ok(())` if the command was successfully started, otherwise an error
- fn run(&mut self) -> anyhow::Result<()>;
- }
- #[derive(Debug, thiserror::Error)]
- pub enum RunErr {
- #[error("{0} is up-to-date")]
- UpToDate(&'static str),
- }
- /// Trait for waiting on a command to complete.
- pub trait Wait {
- /// Waits for the command to complete.
- ///
- /// This method monitors the command's output and waits for the process to exit.
- ///
- /// # Returns
- /// `Ok(())` if the command completed successfully, otherwise an error
- fn wait(&mut self) -> anyhow::Result<()>;
- }
- /// A convenience trait alias that bundles `Run`, `Wait`, and `Log` traits
- /// for types that support full command execution lifecycle behavior.
- pub trait RunWait: Run + Wait + Log {}
- /// Blanket implementation of `RunWait` for any type that implements `Run + Wait + Log`.
- impl<T: Run + Wait + Log> RunWait for T {}
- /// Executes a `RunWait` task by calling `.run()` then `.wait()`, and generates a `RunReport`.
- ///
- /// This utility function captures the wall-clock start and end times of the task
- /// and stores its output logs for later inspection or saving.
- ///
- /// # Arguments
- ///
- /// * `item` - A mutable reference to any struct implementing the `RunWait` trait
- ///
- /// # Returns
- ///
- /// A `RunReport` struct that includes:
- /// - start and end timestamps (in UTC)
- /// - accumulated stdout/stderr log
- ///
- /// # Errors
- ///
- /// Returns an error if either the `.run()` or `.wait()` step fails.
- pub fn run_wait<T: RunWait>(item: &mut T) -> anyhow::Result<RunReport> {
- let mut run_report = RunReport {
- start: Utc::now(),
- ..Default::default()
- };
- item.run()?;
- item.wait()?;
- run_report.log = item.log();
- run_report.end = Utc::now();
- Ok(run_report)
- }
- /// Captures the lifecycle information and log output of a `RunWait` task.
- ///
- /// This is useful for tracing, debugging, audit logging, or benchmarking pipeline steps.
- #[derive(Debug, Default, Serialize, Deserialize)]
- pub struct RunReport {
- /// Timestamp when the task started
- pub start: DateTime<Utc>,
- /// Timestamp when the task ended
- pub end: DateTime<Utc>,
- /// Aggregated log output from the task (stdout and stderr)
- pub log: String,
- }
- impl RunReport {
- /// Serialize the RunReport to a JSON string and save it to `file_prefix<uuid>.log.gz`.
- ///
- /// If the directory part of `file_prefix` does not exist, it will be created automatically.
- ///
- /// # Arguments
- ///
- /// * `file_prefix` - Path prefix for the output file (can include directory).
- ///
- /// # Returns
- ///
- /// `Ok(())` on success, or a `std::io::Error` if writing fails.
- pub fn save_to_file(&self, file_prefix: &str) -> anyhow::Result<()> {
- // Serialize to JSON
- let json_data = serde_json::to_string_pretty(self)?;
- // Generate short UUID suffix
- let uuid_suffix = &Uuid::new_v4().to_string()[..5];
- let file_path = format!("{}{}.log.gz", file_prefix, uuid_suffix);
- // Ensure the parent directory exists
- if let Some(parent) = Path::new(&file_path).parent() {
- fs::create_dir_all(parent)?;
- }
- // Create and write BGZF-compressed file
- let mut writer = get_gz_writer(&file_path, true)
- .with_context(|| format!("Failed to get gz writer for: {file_path}"))?;
- writer.write_all(json_data.as_bytes())?;
- finalize_bgzf_file(writer, &file_path)?;
- Ok(())
- }
- }
/// Read access to the textual log a task has accumulated.
pub trait Log {
    /// Returns an owned copy of the execution log collected so far.
    fn log(&self) -> String;
}
/// An external command plus the plumbing used to run it and collect its output.
///
/// Typical lifecycle:
/// 1. Create it with [`CommandRun::new`].
/// 2. Start it with `Run::run`.
/// 3. Finish it with `Wait::wait`.
/// 4. Read the collected text through `Log::log`.
pub struct CommandRun {
    /// Program to execute.
    pub bin: String,
    /// Arguments passed to `bin`, in order.
    pub args: Vec<String>,
    /// Handle to the spawned process; `None` until the command has been started.
    pub child: Option<Child>,
    /// Sending half of the output channel. Messages are `(stream, line)` pairs,
    /// where `stream` names the originating stream (`"stdout"` or `"stderr"`).
    pub tx: mpsc::Sender<(String, String)>,
    /// Receiving half of the same channel, drained while waiting.
    pub rx: mpsc::Receiver<(String, String)>,
    /// Collected log text. It holds the "Running command" line, then one
    /// `[bin] stream: line` entry per output line, interleaved in arrival order.
    pub log: String,
}

impl CommandRun {
    /// Builds a not-yet-started command from a program name and its arguments.
    ///
    /// The arguments are copied into owned `String`s and a fresh output channel
    /// is created. No process is spawned until the command is run.
    ///
    /// # Arguments
    ///
    /// * `bin` - The program to execute
    /// * `args` - Arguments to pass to the program
    pub fn new(bin: &str, args: &[&str]) -> Self {
        let owned_args: Vec<String> = args.iter().copied().map(String::from).collect();
        let (sender, receiver) = mpsc::channel();
        Self {
            bin: String::from(bin),
            args: owned_args,
            child: None,
            tx: sender,
            rx: receiver,
            log: String::new(),
        }
    }
}
- impl Run for CommandRun {
- /// Starts the command and begins capturing stdout and stderr in separate threads.
- ///
- /// This method spawns the process and streams its output line-by-line.
- /// It does not block until the process exits — that’s handled by `.wait()`.
- fn run(&mut self) -> anyhow::Result<()> {
- let info = format!("Running command: {} {}", &self.bin, &self.args.join(" "));
- info!("{info}");
- let mut child = Command::new(&self.bin)
- .args(&self.args)
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .spawn()
- .map_err(|e| anyhow::anyhow!("Failed to spawn {}: {e}", self.bin))?;
- self.log.push_str(&format!("{info}\n"));
- let stdout = child.stdout.take().expect("Failed to capture stdout");
- let stderr = child.stderr.take().expect("Failed to capture stderr");
- let stdout_reader = BufReader::new(stdout);
- let stderr_reader = BufReader::new(stderr);
- let tx = self.tx.clone();
- let bin = self.bin.clone();
- // Use scoped threads to avoid leaking threads or missing log lines
- std::thread::scope(|s| {
- s.spawn(|| {
- for line in stdout_reader.lines().map_while(Result::ok) {
- info!("[{}:stdout] {}", &bin, line);
- tx.send(("stdout".to_string(), line.to_string()))
- .expect("Channel send failed");
- }
- });
- s.spawn(|| {
- for line in stderr_reader.lines().map_while(Result::ok) {
- info!("[{}:stderr] {}", &bin, line);
- tx.send(("stderr".to_string(), line.to_string()))
- .expect("Channel send failed");
- }
- });
- });
- self.child = Some(child);
- Ok(())
- }
- }
- impl Wait for CommandRun {
- /// Waits for the command to exit, polling and collecting output in real time.
- ///
- /// This method consumes the receiver channel to stream logs and checks the process
- /// status regularly to know when to finish. Captured logs are accumulated in `self.log`.
- fn wait(&mut self) -> anyhow::Result<()> {
- if let Some(child) = &mut self.child {
- loop {
- match self.rx.try_recv() {
- Ok((stream, line)) => {
- self.log
- .push_str(&format!("[{}] {}: {}\n", self.bin, stream, line));
- }
- Err(TryRecvError::Empty) => {
- if let Some(status) = child.try_wait()? {
- info!("{} exited with status: {}", self.bin, status);
- break;
- }
- std::thread::sleep(std::time::Duration::from_millis(100));
- }
- Err(TryRecvError::Disconnected) => {
- break;
- }
- }
- }
- // Ensure child process has exited
- let status = child.wait()?;
- info!("{} exited with status: {}", self.bin, status);
- }
- Ok(())
- }
- }
- impl Log for CommandRun {
- /// Returns the accumulated log of the command's stdout and stderr streams.
- fn log(&self) -> String {
- self.log.clone()
- }
- }
|