| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040 |
- pub mod bcftools;
- pub mod dorado;
- // pub mod longphase;
- pub mod modkit;
- pub mod samtools;
- pub mod longphase;
- // pub mod wakhan;
- use std::{
- fmt::{self, Display, Formatter},
- fs::{self, File},
- io::{self, BufReader, Read, Seek, SeekFrom, Write},
- path::{Path, PathBuf},
- process::{Command as ProcessCommand, Stdio},
- sync::{mpsc, Arc, Mutex},
- thread,
- time::Duration,
- };
- /// A helper trait for commands that should be run through Slurm.
- ///
- /// Types implementing `SlurmRunner` must also implement [`Command`].
- /// The provided [`run`](SlurmRunner::run) method:
- ///
- /// 1. Calls [`Command::init`].
- /// 2. Runs `srun <slurm_args...> bash -lc "<cmd()>”`.
- /// 3. On success, calls [`Command::clean_up`].
- pub trait Command {
- fn init(&mut self) -> anyhow::Result<()> {
- Ok(())
- }
- fn cmd(&self) -> String;
- fn clean_up(&self) -> anyhow::Result<()> {
- Ok(())
- }
- }
- /// Local runner: execute the command directly on the machine.
- ///
- /// Usage:
- /// ```ignore
- /// my_command.run()?;
- /// ```
- pub trait LocalRunner: Command {
- /// The shell binary used to run commands.
- /// Override if you want to use `sh` or something else.
- fn shell(&self) -> &str {
- "bash"
- }
- /// Run locally with:
- /// <shell> -c "<cmd()>"
- fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- let mut cmd = ProcessCommand::new(self.shell());
- cmd.arg("-c")
- .arg(self.cmd())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped());
- info!(
- "Running: {} {}",
- cmd.get_program().to_string_lossy(),
- cmd.get_args()
- .map(|arg| arg.to_string_lossy())
- .collect::<Vec<_>>()
- .join(" ")
- );
- let mut child = cmd.spawn().context("failed to spawn local command")?;
- let stdout = child.stdout.take().context("failed to capture stdout")?;
- let stderr = child.stderr.take().context("failed to capture stderr")?;
- let stderr_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stderr);
- let mut buf = Vec::new();
- let mut line = Vec::new();
- let mut byte = [0u8; 1];
- loop {
- match reader.read(&mut byte) {
- Ok(0) => {
- if !line.is_empty() {
- let text = String::from_utf8_lossy(&line);
- eprint!("{}", text);
- buf.extend_from_slice(&line);
- }
- break;
- }
- Ok(_) => {
- line.push(byte[0]);
- if byte[0] == b'\n' {
- let text = String::from_utf8_lossy(&line);
- eprint!("{}", text);
- buf.extend_from_slice(&line);
- line.clear();
- }
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // --- stdout in a separate thread ---
- let stdout_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stdout);
- let mut buf = Vec::new();
- let mut line = Vec::new();
- let mut byte = [0u8; 1];
- loop {
- match reader.read(&mut byte) {
- Ok(0) => {
- if !line.is_empty() {
- let text = String::from_utf8_lossy(&line);
- print!("{}", text);
- buf.extend_from_slice(&line);
- }
- break;
- }
- Ok(_) => {
- line.push(byte[0]);
- if byte[0] == b'\n' {
- let text = String::from_utf8_lossy(&line);
- print!("{}", text);
- buf.extend_from_slice(&line);
- line.clear();
- }
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // wait for process
- let status = child.wait().context("failed to wait on local command")?;
- // wait for stderr thread and collect stderr
- let captured_stdout = stdout_handle.join().unwrap_or_default();
- let captured_stderr = stderr_handle.join().unwrap_or_default();
- if !status.success() {
- anyhow::bail!("local command failed with status: {status}");
- }
- self.clean_up()?;
- Ok(CapturedOutput {
- stdout: captured_stdout,
- stderr: captured_stderr,
- slurm_epilog: None,
- })
- }
- }
- use anyhow::Context;
- use log::info;
- use rayon::iter::{IntoParallelIterator, ParallelIterator};
- use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- /// Captured process output while it was being streamed live.
- #[derive(Debug, Default, Serialize, Deserialize)]
- pub struct CapturedOutput {
- pub stdout: String,
- pub stderr: String,
- pub slurm_epilog: Option<SlurmEpilog>,
- }
- impl CapturedOutput {
- /// Constructs a unique filename based on the base `path`, appending
- /// a truncated UUID v4 segment and the ".json" suffix.
- /// Saves the struct as JSON to the constructed path (creates parent dirs if needed).
- pub fn save_to_file(&self, base_path: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
- let base_path = base_path.as_ref();
- let binding = Uuid::new_v4().to_string();
- let unique_id = binding.split('-').next().unwrap_or("default");
- let filename = format!(
- "{}_{}.json",
- base_path
- .file_name()
- .unwrap_or(base_path.as_os_str())
- .to_string_lossy(),
- unique_id
- );
- let mut full_path = PathBuf::from(base_path.parent().unwrap_or_else(|| Path::new("")));
- full_path.push(filename);
- if let Some(p) = full_path.parent() {
- fs::create_dir_all(p)
- .with_context(|| format!("Failed to create directory {}", p.display()))?;
- }
- let json = serde_json::to_string_pretty(self)?; // Using pretty for readability, remove _pretty if not needed
- fs::write(&full_path, json)
- .with_context(|| format!("Failed to write file {}", full_path.display()))?;
- Ok(full_path)
- }
- }
- impl Display for CapturedOutput {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- // 1. Write STDOUT
- writeln!(f, "--- STANDARD OUTPUT ---")?;
- if self.stdout.is_empty() {
- writeln!(f, "[No standard output was captured.]")?;
- } else {
- writeln!(f, "{}", self.stdout)?;
- }
- // 2. Write STDERR
- writeln!(f, "\n--- STANDARD ERROR ---")?;
- if self.stderr.is_empty() {
- writeln!(f, "[No standard error was captured.]")?;
- } else {
- writeln!(f, "{}", self.stderr)?;
- }
- // 3. Write SlurmEpilog if present
- if let Some(epilog) = &self.slurm_epilog {
- // Using a separate line break before the epilog for clarity
- writeln!(f, "\n{}", epilog)?;
- }
- Ok(())
- }
- }
- /// Super-trait adding a Slurm `run()` method on top of [`Command`].
- pub trait SlurmRunner: Command {
- /// Slurm launcher binary. Defaults to `srun`.
- ///
- /// Override if you want to use `sbatch` or another wrapper.
- fn slurm_bin(&self) -> &str {
- "srun"
- }
- /// Additional Slurm arguments (e.g. `--time=1:00:00`, `--cpus-per-task=4`).
- ///
- /// Default is no extra arguments.
- fn slurm_args(&self) -> Vec<String> {
- Vec::new()
- }
- /// Run the command through Slurm.
- ///
- /// The effective shell command is:
- ///
- /// ```text
- /// <slurm_bin> <slurm_args...> bash -lc "<cmd()>"
- /// ```
- fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- let mut cmd = ProcessCommand::new(self.slurm_bin());
- cmd.args(self.slurm_args())
- .arg("bash")
- .arg("-c")
- .arg(self.cmd())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped());
- info!(
- "Running with Slurm: {} {}",
- cmd.get_program().to_string_lossy(),
- cmd.get_args()
- .map(|arg| arg.to_string_lossy())
- .collect::<Vec<_>>()
- .join(" ")
- );
- let mut child = cmd.spawn().context("failed to spawn slurm job")?;
- let stdout = child.stdout.take().context("failed to capture stdout")?;
- let stderr = child.stderr.take().context("failed to capture stderr")?;
- let stderr_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stderr);
- let mut buf = Vec::new();
- let mut chunk = [0u8; 8192];
- let mut out = io::stderr();
- loop {
- match reader.read(&mut chunk) {
- Ok(0) => break, // EOF
- Ok(n) => {
- // keep a copy
- buf.extend_from_slice(&chunk[..n]);
- // forward to our stderr (preserves colors, \r, etc.)
- let _ = out.write_all(&chunk[..n]);
- let _ = out.flush();
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // --- stdout in a separate thread ---
- let stdout_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stdout);
- let mut buf = Vec::new();
- let mut chunk = [0u8; 8192];
- let mut out = io::stdout();
- loop {
- match reader.read(&mut chunk) {
- Ok(0) => break, // EOF
- Ok(n) => {
- // capture
- buf.extend_from_slice(&chunk[..n]);
- // forward
- let _ = out.write_all(&chunk[..n]);
- let _ = out.flush();
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // wait for process
- let status = child.wait().context("failed to wait on local command")?;
- // wait for both threads and collect output
- let stdout = stdout_handle.join().unwrap_or_default();
- let stderr = stderr_handle.join().unwrap_or_default();
- if !status.success() {
- anyhow::bail!("slurm job failed with status: {status}");
- }
- self.clean_up()?;
- Ok(CapturedOutput {
- stdout,
- stderr,
- slurm_epilog: None,
- })
- }
- }
/// Holds optional Slurm parameters that can be converted to CLI args.
///
/// Example:
/// ```ignore
/// let params = SlurmParams {
///     job_name: Some("dorado_basecall".into()),
///     cpus_per_task: Some(47),
///     mem: Some("60G".into()),
///     partition: Some("gpgpuq".into()),
///     gres: Some("gpu:h100:4".into()),
/// };
///
/// let args = params.to_args();
/// ```
///
/// Produces:
/// ```text
/// --job-name=dorado_basecall
/// --cpus-per-task=47
/// --mem=60G
/// --partition=gpgpuq
/// --gres=gpu:h100:4
/// ```
#[derive(Debug, Clone, Default)]
pub struct SlurmParams {
    /// Value for `--job-name`.
    pub job_name: Option<String>,
    /// Value for `--cpus-per-task`.
    pub cpus_per_task: Option<u32>,
    /// Value for `--mem` (e.g. `"60G"`).
    pub mem: Option<String>,
    /// Value for `--partition`.
    pub partition: Option<String>,
    /// Value for `--gres` (e.g. `"gpu:h100:4"`).
    pub gres: Option<String>,
}

impl SlurmParams {
    /// Convert every field that is `Some` into a `--key=value` Slurm CLI
    /// argument.
    ///
    /// Arguments are emitted in field order: job name, CPUs per task, memory,
    /// partition, gres. Fields that are `None` are skipped.
    pub fn to_args(&self) -> Vec<String> {
        let candidates = [
            self.job_name.as_ref().map(|v| format!("--job-name={v}")),
            self.cpus_per_task.map(|v| format!("--cpus-per-task={v}")),
            self.mem.as_ref().map(|v| format!("--mem={v}")),
            self.partition.as_ref().map(|v| format!("--partition={v}")),
            self.gres.as_ref().map(|v| format!("--gres={v}")),
        ];

        // `IntoIterator::into_iter` iterates the array by value on every edition.
        IntoIterator::into_iter(candidates).flatten().collect()
    }
}
- pub trait SbatchRunner: Command {
- fn sbatch_bin(&self) -> &str {
- "sbatch"
- }
- fn squeue_bin(&self) -> &str {
- "squeue"
- }
- fn slurm_params(&self) -> SlurmParams {
- SlurmParams::default()
- }
- fn sbatch_extra_args(&self) -> Vec<String> {
- Vec::new()
- }
- /// Submit via sbatch, tail slurm-<jobid>.out "live", wait for job to finish,
- /// then return captured stdout.
- fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- // 1. sbatch --parsable --output=slurm-%j.out --wrap "<cmd>"
- let mut args = self.slurm_params().to_args();
- args.extend(self.sbatch_extra_args());
- args.push("--parsable".to_string());
- args.push("--output=slurm-%j.out".to_string());
- args.push("--wrap".to_string());
- args.push(self.cmd());
- let output = ProcessCommand::new(self.sbatch_bin())
- .args(&args)
- .output()
- .context("failed to spawn sbatch")?;
- let sbatch_stdout = String::from_utf8_lossy(&output.stdout).to_string();
- let sbatch_stderr = String::from_utf8_lossy(&output.stderr).to_string();
- if !output.status.success() {
- anyhow::bail!(
- "sbatch failed with status: {}\nstdout:\n{}\nstderr:\n{}",
- output.status,
- sbatch_stdout,
- sbatch_stderr
- );
- }
- // --parsable usually returns "<jobid>" or "<jobid>;..."
- let job_id = sbatch_stdout
- .trim()
- .split(';')
- .next()
- .unwrap_or("")
- .to_string();
- info!("Running jobid: {job_id}: {}", args.join(" "));
- if job_id.is_empty() {
- anyhow::bail!("failed to parse job id from sbatch output: {sbatch_stdout}");
- }
- let out_file = format!("slurm-{job_id}.out");
- // 2. Tail thread: emulate `tail -f slurm-<jobid>.out`
- let (stop_tx, stop_rx) = mpsc::channel::<()>();
- let out_path = out_file.clone();
- let captured_stdout = Arc::new(Mutex::new(String::new()));
- let captured_stdout_clone = Arc::clone(&captured_stdout);
- let job_id_clone = job_id.clone();
- let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
- let mut pos: u64 = 0;
- let mut file_opened = false;
- loop {
- // stop signal?
- if stop_rx.try_recv().is_ok() {
- // Do one final read before stopping
- if let Ok(mut f) = File::open(&out_path) {
- if f.seek(SeekFrom::Start(pos)).is_ok() {
- let mut remaining = Vec::new();
- if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
- let s = String::from_utf8_lossy(&remaining);
- let cleaned = clean_output(&s);
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- }
- }
- break;
- }
- match File::open(&out_path) {
- Ok(mut f) => {
- if !file_opened {
- file_opened = true;
- }
- // seek to last position and read new bytes
- if let Err(e) = f.seek(SeekFrom::Start(pos)) {
- anyhow::bail!("failed to seek in output file: {}", e);
- }
- let mut buf = [0u8; 8192];
- loop {
- match f.read(&mut buf) {
- Ok(0) => break, // EOF for now
- Ok(n) => {
- pos += n as u64;
- let bytes = &buf[..n];
- // forward to terminal
- let s = String::from_utf8_lossy(bytes);
- let cleaned = clean_output(&s);
- // Log non-progress lines
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- io::stdout().flush().ok();
- // capture
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
- Err(e) => {
- anyhow::bail!("error reading output file: {}", e);
- }
- }
- }
- }
- Err(e) if e.kind() == io::ErrorKind::NotFound => {
- // file not yet created; this is expected initially
- }
- Err(e) => {
- // Only bail if we've already opened the file once
- // (unexpected disappearance or permission issues)
- if file_opened {
- anyhow::bail!("error opening output file: {}", e);
- }
- }
- }
- thread::sleep(Duration::from_millis(500));
- }
- Ok(())
- });
- // 3. Poll squeue until job is no longer in the queue
- loop {
- let sq_out = ProcessCommand::new(self.squeue_bin())
- .args(["-j", &job_id])
- .output()
- .context("failed to run squeue")?;
- let out_str = String::from_utf8_lossy(&sq_out.stdout);
- // squeue prints a header line + job line if job exists
- // If job is gone, usually only the header OR nothing
- let has_job = out_str.lines().skip(1).any(|l| !l.trim().is_empty());
- if !has_job {
- break;
- }
- thread::sleep(Duration::from_secs(1));
- }
- // Wait for output file to stop growing (SLURM epilog may still be writing)
- let mut stable_count = 0;
- let mut last_size = 0u64;
- while stable_count < 2 {
- thread::sleep(Duration::from_millis(300));
- if let Ok(metadata) = std::fs::metadata(&out_file) {
- let current_size = metadata.len();
- if current_size == last_size {
- stable_count += 1;
- } else {
- stable_count = 0;
- last_size = current_size;
- }
- } else {
- // File doesn't exist yet, keep waiting
- stable_count = 0;
- }
- }
- // 4. Stop tail thread and join
- let _ = stop_tx.send(());
- match tail_handle.join() {
- Ok(Ok(())) => {}
- Ok(Err(e)) => return Err(e.context("tail thread failed")),
- Err(_) => anyhow::bail!("tail thread panicked"),
- }
- self.clean_up()?;
- let stdout = match Arc::try_unwrap(captured_stdout) {
- Ok(mutex) => mutex.into_inner().unwrap_or_default(),
- Err(arc) => arc.lock().unwrap().clone(),
- };
- // Remove the SLURM output file
- let _ = std::fs::remove_file(&out_file);
- let (stdout, slurm_epilog) = split_output(&stdout);
- Ok(CapturedOutput {
- stdout,
- stderr: String::new(), // all job output is in the .out file
- slurm_epilog,
- })
- }
- }
/// Run a single command with the runner selected by the configuration.
///
/// Expands to `SbatchRunner::exec($cmd)` when `$cfg.slurm_runner` is true and
/// to `LocalRunner::exec($cmd)` otherwise, so `$cmd` must be a mutable
/// reference to a type implementing both runner traits.
#[macro_export]
macro_rules! run {
    ($cfg:expr, $cmd:expr) => {{
        let use_slurm: bool = $cfg.slurm_runner;
        if use_slurm {
            $crate::commands::SbatchRunner::exec($cmd)
        } else {
            $crate::commands::LocalRunner::exec($cmd)
        }
    }};
}
/// Clean output by handling carriage returns properly.
///
/// When a line contains `\r` (carriage return), only the text after the last
/// `\r` is kept, simulating terminal behavior where `\r` moves the cursor to
/// the start of the line.
///
/// A single `\r` directly in front of a `\n` is treated as part of a CRLF line
/// ending and removed first, so `"text\r\n"` yields `"text\n"` instead of an
/// empty line. The final, not yet newline-terminated segment is left as is: a
/// trailing `\r` there still hides the text in front of it.
fn clean_output(s: &str) -> String {
    let mut segments = s.split('\n').peekable();
    let mut visible = Vec::new();

    while let Some(segment) = segments.next() {
        // Every segment except the last one was terminated by '\n'.
        let newline_terminated = segments.peek().is_some();
        let segment = if newline_terminated {
            segment.strip_suffix('\r').unwrap_or(segment)
        } else {
            segment
        };

        // Keep only what is visible after all `\r` overwrites.
        visible.push(segment.rsplit('\r').next().unwrap_or(""));
    }

    visible.join("\n")
}
/// Decide whether `line` is a progress indicator that should be kept out of
/// the logs.
///
/// A line counts as progress when it starts with one of the known progress
/// prefixes, or when it starts with `>` and mentions "records".
fn is_progress_line(line: &str) -> bool {
    const PROGRESS_PREFIXES: [&str; 2] = ["> Output records written:", "> Processing"];

    if PROGRESS_PREFIXES
        .iter()
        .any(|prefix| line.starts_with(prefix))
    {
        return true;
    }

    line.starts_with('>') && line.contains("records")
}
- use std::collections::HashMap;
- #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
- pub struct SlurmEpilog {
- pub job_id: String,
- pub cluster: String,
- pub user: String,
- pub group: String,
- pub nodelist: String,
- pub cores: u32,
- pub job_started_at: String,
- pub job_ended_at: String,
- pub wall_clock_time: String,
- pub cpu_utilized: String,
- pub cpu_efficiency: String,
- pub memory_utilized: String,
- pub memory_efficiency: String,
- }
- impl SlurmEpilog {
- /// Parse SLURM epilog from output string
- pub fn parse(output: &str) -> Option<Self> {
- // Find the epilog section
- let epilog_start = output.find("SLURM EPILOG")?;
- let epilog_section = &output[epilog_start..];
- // Parse key-value pairs
- let mut fields: HashMap<String, String> = HashMap::new();
- for line in epilog_section.lines() {
- if let Some((key, value)) = Self::parse_line(line) {
- fields.insert(key, value);
- }
- }
- // Extract user/group
- let (user, group) = fields.get("User/Group").and_then(|s| {
- let parts: Vec<&str> = s.split('/').collect();
- if parts.len() == 2 {
- Some((parts[0].to_string(), parts[1].to_string()))
- } else {
- None
- }
- })?;
- Some(SlurmEpilog {
- job_id: fields.get("Job ID")?.clone(),
- cluster: fields.get("Cluster")?.clone(),
- user,
- group,
- nodelist: fields.get("Nodelist")?.clone(),
- cores: fields.get("Cores")?.parse().ok()?,
- job_started_at: fields.get("Job started at")?.clone(),
- job_ended_at: fields.get("Job ended at")?.clone(),
- wall_clock_time: fields.get("Job Wall-clock time")?.clone(),
- cpu_utilized: fields.get("CPU Utilized")?.clone(),
- cpu_efficiency: fields.get("CPU Efficiency")?.clone(),
- memory_utilized: fields.get("Memory Utilized")?.clone(),
- memory_efficiency: fields.get("Memory Efficiency")?.clone(),
- })
- }
- fn parse_line(line: &str) -> Option<(String, String)> {
- // Skip lines that are just decorations or headers
- if line.contains("---") || line.contains("SLURM EPILOG") || line.trim().is_empty() {
- return None;
- }
- // Look for "Key: Value" pattern
- let parts: Vec<&str> = line.splitn(2, ':').collect();
- if parts.len() == 2 {
- let key = parts[0].trim().to_string();
- let value = parts[1].trim().to_string();
- if !key.is_empty() && !value.is_empty() {
- return Some((key, value));
- }
- }
- None
- }
- }
- impl Display for SlurmEpilog {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- writeln!(f, "--- SLURM JOB EPILOG (ID: {}) ---", self.job_id)?;
- writeln!(f, " Cluster: {}", self.cluster)?;
- writeln!(f, " User: {} (Group: {})", self.user, self.group)?;
- writeln!(f, " Nodelist: {}", self.nodelist)?;
- writeln!(f, " Cores Used: {}", self.cores)?;
- writeln!(f)?;
- writeln!(f, " --- Timestamps ---")?;
- writeln!(f, " Started At: {}", self.job_started_at)?;
- writeln!(f, " Ended At: {}", self.job_ended_at)?;
- writeln!(f, " Wall Time: {}", self.wall_clock_time)?;
- writeln!(f)?;
- writeln!(f, " --- Resource Utilization ---")?;
- writeln!(f, " CPU Utilized: {}", self.cpu_utilized)?;
- writeln!(f, " CPU Efficiency: {}", self.cpu_efficiency)?;
- writeln!(f, " Memory Utilized: {}", self.memory_utilized)?;
- writeln!(f, " Memory Efficiency:{}", self.memory_efficiency)?;
- writeln!(f, "------------------------------------")
- }
- }
- /// Split output into job output and epilog
- pub fn split_output(full_output: &str) -> (String, Option<SlurmEpilog>) {
- if let Some(epilog_start) = full_output.find("----------------------------------------------") {
- // Check if this is actually the epilog header
- if full_output[epilog_start..].contains("SLURM EPILOG") {
- let job_output = full_output[..epilog_start].trim_end().to_string();
- let epilog = SlurmEpilog::parse(full_output);
- return (job_output, epilog);
- }
- }
- // No epilog found, return full output
- (full_output.to_string(), None)
- }
- /// Run multiple SbatchRunner jobs in parallel.
- ///
- /// Each job:
- /// - calls `init()`
- /// - runs `sbatch --wait ... --wrap "<cmd>"`
- /// - streams its output to this process' stdout/stderr
- /// - returns `CapturedOutput`
- ///
- /// NOTE:
- /// - Outputs from different jobs may interleave on the terminal.
- /// - Slurm still decides scheduling order (queue vs run).
- pub fn run_many_sbatch<T>(jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
- where
- T: SbatchRunner + Send + 'static,
- {
- // let mut handles = Vec::with_capacity(jobs.len());
- //
- // for mut job in jobs.drain(..) {
- // let handle = thread::spawn(move || -> anyhow::Result<CapturedOutput> {
- // SbatchRunner::exec(&mut job)
- // });
- // handles.push(handle);
- // }
- //
- // let mut results = Vec::with_capacity(handles.len());
- // for h in handles {
- // // propagate the first error you hit
- // let res = h.join().expect("thread panicked")?;
- // results.push(res);
- // }
- // Rule for the IGR cluster because the admins dont want to implement propoper rules so we are
- let max_parallel = 22;
- let pool = rayon::ThreadPoolBuilder::new()
- .num_threads(max_parallel)
- .build()
- .context("failed to build rayon thread pool")?;
- pool.install(|| {
- jobs.into_par_iter()
- .map(|mut job| SbatchRunner::exec(&mut job))
- .collect::<anyhow::Result<Vec<_>>>()
- })
- }
- /// Local batch runner: execute the command directly on the machine,
- /// mimicking SbatchRunner behavior (output to file, async monitoring).
- pub trait LocalBatchRunner: Command {
- fn shell(&self) -> &str {
- "bash"
- }
- /// Output file pattern. Use `{}` as placeholder for job ID (UUID).
- fn output_pattern(&self) -> &str {
- "local-{}.out"
- }
- fn run(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- let job_id = Uuid::new_v4()
- .to_string()
- .split('-')
- .next()
- .unwrap_or("local")
- .to_string();
- let out_file = self.output_pattern().replace("{}", &job_id);
- info!("Running local batch job: {job_id}");
- let out_file_handle =
- File::create(&out_file).with_context(|| format!("failed to create output file: {out_file}"))?;
- let out_file_write = out_file_handle
- .try_clone()
- .context("failed to clone file handle for stderr")?;
- let mut child = ProcessCommand::new(self.shell())
- .arg("-c")
- .arg(self.cmd())
- .stdout(Stdio::from(out_file_handle))
- .stderr(Stdio::from(out_file_write))
- .spawn()
- .context("failed to spawn local batch command")?;
- let (stop_tx, stop_rx) = mpsc::channel::<()>();
- let out_path = out_file.clone();
- let captured_stdout = Arc::new(Mutex::new(String::new()));
- let captured_stdout_clone = Arc::clone(&captured_stdout);
- let job_id_clone = job_id.clone();
- let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
- let mut pos: u64 = 0;
- loop {
- if stop_rx.try_recv().is_ok() {
- if let Ok(mut f) = File::open(&out_path) {
- if f.seek(SeekFrom::Start(pos)).is_ok() {
- let mut remaining = Vec::new();
- if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
- let s = String::from_utf8_lossy(&remaining);
- let cleaned = clean_output(&s);
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- }
- }
- break;
- }
- if let Ok(mut f) = File::open(&out_path) {
- if f.seek(SeekFrom::Start(pos)).is_ok() {
- let mut buf = [0u8; 8192];
- loop {
- match f.read(&mut buf) {
- Ok(0) => break,
- Ok(n) => {
- pos += n as u64;
- let s = String::from_utf8_lossy(&buf[..n]);
- let cleaned = clean_output(&s);
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
- Err(_) => break,
- }
- }
- }
- }
- thread::sleep(Duration::from_millis(100));
- }
- Ok(())
- });
- let status = child.wait().context("failed to wait on local batch command")?;
- let _ = stop_tx.send(());
- match tail_handle.join() {
- Ok(Ok(())) => {}
- Ok(Err(e)) => return Err(e.context("tail thread failed")),
- Err(_) => anyhow::bail!("tail thread panicked"),
- }
- if !status.success() {
- anyhow::bail!("local batch command failed with status: {status}");
- }
- self.clean_up()?;
- let stdout = match Arc::try_unwrap(captured_stdout) {
- Ok(mutex) => mutex.into_inner().unwrap_or_default(),
- Err(arc) => arc.lock().unwrap_or_else(|e| e.into_inner()).clone(),
- };
- let _ = std::fs::remove_file(&out_file);
- Ok(CapturedOutput {
- stdout,
- stderr: String::new(),
- slurm_epilog: None,
- })
- }
- }
- pub fn run_many_local_batch<T>(jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
- where
- T: LocalBatchRunner + Send,
- {
- // Set thread pool size based on max_concurrent
- // let pool = rayon::ThreadPoolBuilder::new()
- // .num_threads(threads)
- // .build()
- // .context("failed to build thread pool")?;
- // pool.install(|| {
- jobs.into_iter()
- .map(|mut job| LocalBatchRunner::run(&mut job))
- .collect()
- // })
- }
/// Macro to run multiple batch commands, either via SLURM or locally.
///
/// Expands to `run_many_sbatch($jobs)` when `$cfg.slurm_runner` is true and to
/// `run_many_local_batch($jobs)` otherwise.
///
/// Usage:
/// ```ignore
/// let results = run_many!(cfg, jobs)?;
/// ```
#[macro_export]
macro_rules! run_many {
    ($cfg:expr, $jobs:expr) => {{
        let use_slurm: bool = $cfg.slurm_runner;
        if use_slurm {
            $crate::commands::run_many_sbatch($jobs)
        } else {
            $crate::commands::run_many_local_batch($jobs)
        }
    }};
}
#[cfg(test)]
mod tests {
    use super::{Command, SbatchRunner, SlurmParams};

    /// Minimal command used to exercise the sbatch runner end to end.
    struct TestRun;

    impl Command for TestRun {
        fn cmd(&self) -> String {
            "echo 'hello from sbatch'".to_string()
        }
    }

    impl SbatchRunner for TestRun {
        fn slurm_params(&self) -> SlurmParams {
            SlurmParams {
                job_name: Some("test-sbatch".into()),
                partition: Some("gpgpuq".into()),
                cpus_per_task: Some(1),
                mem: Some("1G".into()),
                gres: None,
            }
        }
    }

    /// Submits a real job, so it only runs on request:
    /// `cargo test -- --ignored sbatch_test_run`.
    #[test]
    #[ignore = "requires a Slurm cluster with sbatch/squeue and the gpgpuq partition"]
    fn sbatch_test_run() -> anyhow::Result<()> {
        let mut t = TestRun;
        let out = SbatchRunner::exec(&mut t)?;
        println!("{out:#?}");
        assert!(out.stdout.contains("hello from sbatch"));
        Ok(())
    }
}
|