| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105 |
- pub mod bcftools;
- pub mod dorado;
- // pub mod longphase;
- pub mod longphase;
- pub mod modkit;
- pub mod samtools;
- // pub mod wakhan;
- use std::{
- fmt::{self, Display, Formatter},
- fs::{self, File},
- io::{self, BufReader, Read, Seek, SeekFrom, Write},
- path::{Path, PathBuf},
- process::{Command as ProcessCommand, Stdio},
- sync::{mpsc, Arc, Mutex},
- thread,
- time::Duration,
- };
- /// A helper trait for commands that should be run through Slurm.
- ///
- /// Types implementing `SlurmRunner` must also implement [`Command`].
- /// The provided [`run`](SlurmRunner::run) method:
- ///
- /// 1. Calls [`Command::init`].
- /// 2. Runs `srun <slurm_args...> bash -lc "<cmd()>”`.
- /// 3. On success, calls [`Command::clean_up`].
- pub trait Command {
- fn init(&mut self) -> anyhow::Result<()> {
- Ok(())
- }
- fn cmd(&self) -> String;
- fn clean_up(&self) -> anyhow::Result<()> {
- Ok(())
- }
- }
- /// Local runner: execute the command directly on the machine.
- ///
- /// Usage:
- /// ```ignore
- /// my_command.run()?;
- /// ```
- pub trait LocalRunner: Command {
- /// The shell binary used to run commands.
- /// Override if you want to use `sh` or something else.
- fn shell(&self) -> &str {
- "bash"
- }
- /// Run locally with:
- /// <shell> -c "<cmd()>"
- fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- let mut cmd = ProcessCommand::new(self.shell());
- cmd.arg("-c")
- .arg(self.cmd())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped());
- info!(
- "Running: {} {}",
- cmd.get_program().to_string_lossy(),
- cmd.get_args()
- .map(|arg| arg.to_string_lossy())
- .collect::<Vec<_>>()
- .join(" ")
- );
- let mut child = cmd.spawn().context("failed to spawn local command")?;
- let stdout = child.stdout.take().context("failed to capture stdout")?;
- let stderr = child.stderr.take().context("failed to capture stderr")?;
- let stderr_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stderr);
- let mut buf = Vec::new();
- let mut line = Vec::new();
- let mut byte = [0u8; 1];
- loop {
- match reader.read(&mut byte) {
- Ok(0) => {
- if !line.is_empty() {
- let text = String::from_utf8_lossy(&line);
- eprint!("{}", text);
- buf.extend_from_slice(&line);
- }
- break;
- }
- Ok(_) => {
- line.push(byte[0]);
- if byte[0] == b'\n' {
- let text = String::from_utf8_lossy(&line);
- eprint!("{}", text);
- buf.extend_from_slice(&line);
- line.clear();
- }
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // --- stdout in a separate thread ---
- let stdout_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stdout);
- let mut buf = Vec::new();
- let mut line = Vec::new();
- let mut byte = [0u8; 1];
- loop {
- match reader.read(&mut byte) {
- Ok(0) => {
- if !line.is_empty() {
- let text = String::from_utf8_lossy(&line);
- print!("{}", text);
- buf.extend_from_slice(&line);
- }
- break;
- }
- Ok(_) => {
- line.push(byte[0]);
- if byte[0] == b'\n' {
- let text = String::from_utf8_lossy(&line);
- print!("{}", text);
- buf.extend_from_slice(&line);
- line.clear();
- }
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // wait for process
- let status = child.wait().context("failed to wait on local command")?;
- // wait for stderr thread and collect stderr
- let captured_stdout = stdout_handle.join().unwrap_or_default();
- let captured_stderr = stderr_handle.join().unwrap_or_default();
- if !status.success() {
- anyhow::bail!("local command failed with status: {status}");
- }
- self.clean_up()?;
- Ok(CapturedOutput {
- stdout: captured_stdout,
- stderr: captured_stderr,
- slurm_epilog: None,
- })
- }
- }
- use anyhow::Context;
- use log::info;
- use rayon::iter::{IntoParallelIterator, ParallelIterator};
- use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- /// Captured process output while it was being streamed live.
- #[derive(Debug, Default, Serialize, Deserialize)]
- pub struct CapturedOutput {
- pub stdout: String,
- pub stderr: String,
- pub slurm_epilog: Option<SlurmEpilog>,
- }
- impl CapturedOutput {
- /// Constructs a unique filename based on the base `path`, appending
- /// a truncated UUID v4 segment and the ".json" suffix.
- /// Saves the struct as JSON to the constructed path (creates parent dirs if needed).
- pub fn save_to_file(&self, base_path: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
- let base_path = base_path.as_ref();
- let binding = Uuid::new_v4().to_string();
- let unique_id = binding.split('-').next().unwrap_or("default");
- let filename = format!(
- "{}_{}.json",
- base_path
- .file_name()
- .unwrap_or(base_path.as_os_str())
- .to_string_lossy(),
- unique_id
- );
- let mut full_path = PathBuf::from(base_path.parent().unwrap_or_else(|| Path::new("")));
- full_path.push(filename);
- if let Some(p) = full_path.parent() {
- fs::create_dir_all(p)
- .with_context(|| format!("Failed to create directory {}", p.display()))?;
- }
- let json = serde_json::to_string_pretty(self)?; // Using pretty for readability, remove _pretty if not needed
- fs::write(&full_path, json)
- .with_context(|| format!("Failed to write file {}", full_path.display()))?;
- Ok(full_path)
- }
- }
- impl Display for CapturedOutput {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- // 1. Write STDOUT
- writeln!(f, "--- STANDARD OUTPUT ---")?;
- if self.stdout.is_empty() {
- writeln!(f, "[No standard output was captured.]")?;
- } else {
- writeln!(f, "{}", self.stdout)?;
- }
- // 2. Write STDERR
- writeln!(f, "\n--- STANDARD ERROR ---")?;
- if self.stderr.is_empty() {
- writeln!(f, "[No standard error was captured.]")?;
- } else {
- writeln!(f, "{}", self.stderr)?;
- }
- // 3. Write SlurmEpilog if present
- if let Some(epilog) = &self.slurm_epilog {
- // Using a separate line break before the epilog for clarity
- writeln!(f, "\n{}", epilog)?;
- }
- Ok(())
- }
- }
- /// Super-trait adding a Slurm `run()` method on top of [`Command`].
- pub trait SlurmRunner: Command {
- /// Slurm launcher binary. Defaults to `srun`.
- ///
- /// Override if you want to use `sbatch` or another wrapper.
- fn slurm_bin(&self) -> &str {
- "srun"
- }
- /// Additional Slurm arguments (e.g. `--time=1:00:00`, `--cpus-per-task=4`).
- ///
- /// Default is no extra arguments.
- fn slurm_args(&self) -> Vec<String> {
- Vec::new()
- }
- /// Run the command through Slurm.
- ///
- /// The effective shell command is:
- ///
- /// ```text
- /// <slurm_bin> <slurm_args...> bash -lc "<cmd()>"
- /// ```
- fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- let mut cmd = ProcessCommand::new(self.slurm_bin());
- cmd.args(self.slurm_args())
- .arg("bash")
- .arg("-c")
- .arg(self.cmd())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped());
- info!(
- "Running with Slurm: {} {}",
- cmd.get_program().to_string_lossy(),
- cmd.get_args()
- .map(|arg| arg.to_string_lossy())
- .collect::<Vec<_>>()
- .join(" ")
- );
- let mut child = cmd.spawn().context("failed to spawn slurm job")?;
- let stdout = child.stdout.take().context("failed to capture stdout")?;
- let stderr = child.stderr.take().context("failed to capture stderr")?;
- let stderr_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stderr);
- let mut buf = Vec::new();
- let mut chunk = [0u8; 8192];
- let mut out = io::stderr();
- loop {
- match reader.read(&mut chunk) {
- Ok(0) => break, // EOF
- Ok(n) => {
- // keep a copy
- buf.extend_from_slice(&chunk[..n]);
- // forward to our stderr (preserves colors, \r, etc.)
- let _ = out.write_all(&chunk[..n]);
- let _ = out.flush();
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // --- stdout in a separate thread ---
- let stdout_handle = thread::spawn(move || {
- let mut reader = BufReader::new(stdout);
- let mut buf = Vec::new();
- let mut chunk = [0u8; 8192];
- let mut out = io::stdout();
- loop {
- match reader.read(&mut chunk) {
- Ok(0) => break, // EOF
- Ok(n) => {
- // capture
- buf.extend_from_slice(&chunk[..n]);
- // forward
- let _ = out.write_all(&chunk[..n]);
- let _ = out.flush();
- }
- Err(_) => break,
- }
- }
- String::from_utf8_lossy(&buf).to_string()
- });
- // wait for process
- let status = child.wait().context("failed to wait on local command")?;
- // wait for both threads and collect output
- let stdout = stdout_handle.join().unwrap_or_default();
- let stderr = stderr_handle.join().unwrap_or_default();
- if !status.success() {
- anyhow::bail!("slurm job failed with status: {status}");
- }
- self.clean_up()?;
- Ok(CapturedOutput {
- stdout,
- stderr,
- slurm_epilog: None,
- })
- }
- }
/// Holds optional Slurm resource requests that can be converted to CLI args.
///
/// Example:
/// ```ignore
/// let params = SlurmParams {
///     job_name: Some("dorado_basecall".into()),
///     cpus_per_task: Some(47),
///     mem: Some("60G".into()),
///     partition: Some("gpgpuq".into()),
///     gres: Some("gpu:h100:4".into()),
/// };
///
/// let args = params.to_args();
/// ```
///
/// Produces:
/// ```text
/// --job-name=dorado_basecall
/// --cpus-per-task=47
/// --mem=60G
/// --partition=gpgpuq
/// --gres=gpu:h100:4
/// ```
#[derive(Debug, Clone, Default)]
pub struct SlurmParams {
    /// Value for `--job-name`.
    pub job_name: Option<String>,
    /// Value for `--cpus-per-task`.
    pub cpus_per_task: Option<u32>,
    /// Value for `--mem` (e.g. `"60G"`).
    pub mem: Option<String>,
    /// Value for `--partition`.
    pub partition: Option<String>,
    /// Value for `--gres` (e.g. `"gpu:h100:4"`).
    pub gres: Option<String>,
}

impl SlurmParams {
    /// Convert every field that is `Some` into a `--flag=value` argument.
    ///
    /// Arguments come out in a fixed order (job name, CPUs, memory, partition,
    /// GRES); `None` fields are skipped. Values are not validated.
    pub fn to_args(&self) -> Vec<String> {
        let cpus = self.cpus_per_task.map(|n| n.to_string());
        [
            ("job-name", self.job_name.as_deref()),
            ("cpus-per-task", cpus.as_deref()),
            ("mem", self.mem.as_deref()),
            ("partition", self.partition.as_deref()),
            ("gres", self.gres.as_deref()),
        ]
        .iter()
        .filter_map(|(flag, value)| value.map(|v| format!("--{flag}={v}")))
        .collect()
    }
}
- pub trait SbatchRunner: Command {
- fn sbatch_bin(&self) -> &str {
- "sbatch"
- }
- fn squeue_bin(&self) -> &str {
- "squeue"
- }
- fn slurm_params(&self) -> SlurmParams {
- SlurmParams::default()
- }
- fn sbatch_extra_args(&self) -> Vec<String> {
- Vec::new()
- }
- /// Submit via sbatch, tail slurm-<jobid>.out "live", wait for job to finish,
- /// then return captured stdout.
- fn exec(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- // 1. sbatch --parsable --output=slurm-%j.out --wrap "<cmd>"
- let mut args = self.slurm_params().to_args();
- args.extend(self.sbatch_extra_args());
- args.push("--parsable".to_string());
- args.push("--output=slurm-%j.out".to_string());
- args.push("--wrap".to_string());
- args.push(self.cmd());
- let output = ProcessCommand::new(self.sbatch_bin())
- .args(&args)
- .output()
- .context("failed to spawn sbatch")?;
- let sbatch_stdout = String::from_utf8_lossy(&output.stdout).to_string();
- let sbatch_stderr = String::from_utf8_lossy(&output.stderr).to_string();
- if !output.status.success() {
- anyhow::bail!(
- "sbatch failed with status: {}\nstdout:\n{}\nstderr:\n{}",
- output.status,
- sbatch_stdout,
- sbatch_stderr
- );
- }
- // --parsable usually returns "<jobid>" or "<jobid>;..."
- let job_id = sbatch_stdout
- .trim()
- .split(';')
- .next()
- .unwrap_or("")
- .to_string();
- info!("Running jobid: {job_id}: {}", args.join(" "));
- if job_id.is_empty() {
- anyhow::bail!("failed to parse job id from sbatch output: {sbatch_stdout}");
- }
- let out_file = format!("slurm-{job_id}.out");
- // 2. Tail thread: emulate `tail -f slurm-<jobid>.out`
- let (stop_tx, stop_rx) = mpsc::channel::<()>();
- let out_path = out_file.clone();
- let captured_stdout = Arc::new(Mutex::new(String::new()));
- let captured_stdout_clone = Arc::clone(&captured_stdout);
- let job_id_clone = job_id.clone();
- let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
- let mut pos: u64 = 0;
- let mut file_opened = false;
- loop {
- // stop signal?
- if stop_rx.try_recv().is_ok() {
- // Do one final read before stopping
- if let Ok(mut f) = File::open(&out_path) {
- if f.seek(SeekFrom::Start(pos)).is_ok() {
- let mut remaining = Vec::new();
- if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
- let s = String::from_utf8_lossy(&remaining);
- let cleaned = clean_output(&s);
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- }
- }
- break;
- }
- match File::open(&out_path) {
- Ok(mut f) => {
- if !file_opened {
- file_opened = true;
- }
- // seek to last position and read new bytes
- if let Err(e) = f.seek(SeekFrom::Start(pos)) {
- anyhow::bail!("failed to seek in output file: {}", e);
- }
- let mut buf = [0u8; 8192];
- loop {
- match f.read(&mut buf) {
- Ok(0) => break, // EOF for now
- Ok(n) => {
- pos += n as u64;
- let bytes = &buf[..n];
- // forward to terminal
- let s = String::from_utf8_lossy(bytes);
- let cleaned = clean_output(&s);
- // Log non-progress lines
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- io::stdout().flush().ok();
- // capture
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
- Err(e) => {
- anyhow::bail!("error reading output file: {}", e);
- }
- }
- }
- }
- Err(e) if e.kind() == io::ErrorKind::NotFound => {
- // file not yet created; this is expected initially
- }
- Err(e) => {
- // Only bail if we've already opened the file once
- // (unexpected disappearance or permission issues)
- if file_opened {
- anyhow::bail!("error opening output file: {}", e);
- }
- }
- }
- thread::sleep(Duration::from_millis(500));
- }
- Ok(())
- });
- // 3. Poll squeue until job is no longer in the queue
- loop {
- let sq_out = ProcessCommand::new(self.squeue_bin())
- .args(["-j", &job_id])
- .output()
- .context("failed to run squeue")?;
- let out_str = String::from_utf8_lossy(&sq_out.stdout);
- // squeue prints a header line + job line if job exists
- // If job is gone, usually only the header OR nothing
- let has_job = out_str.lines().skip(1).any(|l| !l.trim().is_empty());
- if !has_job {
- break;
- }
- thread::sleep(Duration::from_secs(1));
- }
- // Wait for output file to stop growing (SLURM epilog may still be writing)
- let mut stable_count = 0;
- let mut last_size = 0u64;
- while stable_count < 2 {
- thread::sleep(Duration::from_millis(300));
- if let Ok(metadata) = std::fs::metadata(&out_file) {
- let current_size = metadata.len();
- if current_size == last_size {
- stable_count += 1;
- } else {
- stable_count = 0;
- last_size = current_size;
- }
- } else {
- // File doesn't exist yet, keep waiting
- stable_count = 0;
- }
- }
- // 4. Stop tail thread and join
- let _ = stop_tx.send(());
- match tail_handle.join() {
- Ok(Ok(())) => {}
- Ok(Err(e)) => return Err(e.context("tail thread failed")),
- Err(_) => anyhow::bail!("tail thread panicked"),
- }
- self.clean_up()?;
- let stdout = match Arc::try_unwrap(captured_stdout) {
- Ok(mutex) => mutex.into_inner().unwrap_or_default(),
- Err(arc) => arc.lock().unwrap().clone(),
- };
- // Remove the SLURM output file
- let _ = std::fs::remove_file(&out_file);
- let (stdout, slurm_epilog) = split_output(&stdout);
- Ok(CapturedOutput {
- stdout,
- stderr: String::new(), // all job output is in the .out file
- slurm_epilog,
- })
- }
- }
/// Runs a single command through `SbatchRunner::exec` when `$cfg.slurm_runner`
/// is `true`, and through `LocalRunner::exec` otherwise.
///
/// `$cmd` is handed to the selected `exec` unchanged (typically `&mut job`).
/// Both arms are type-checked, so its type must implement both runner traits.
#[macro_export]
macro_rules! run {
    ($cfg:expr, $cmd:expr) => {{
        match $cfg.slurm_runner {
            true => $crate::commands::SbatchRunner::exec($cmd),
            false => $crate::commands::LocalRunner::exec($cmd),
        }
    }};
}
/// Clean output by emulating carriage returns the way a terminal shows them.
///
/// Within each `\n`-separated line, only the last non-empty `\r`-separated
/// segment is kept, so `"10%\r20%\r30%"` becomes `"30%"`. A trailing `\r`, as in
/// CRLF line endings (`"done\r\n"`), therefore no longer erases the line.
/// Newlines are preserved.
fn clean_output(s: &str) -> String {
    s.split('\n')
        .map(|line| {
            line.rsplit('\r')
                .find(|segment| !segment.is_empty())
                .unwrap_or("")
        })
        .collect::<Vec<_>>()
        .join("\n")
}
/// Returns `true` for progress-indicator lines (starting with `>`, such as
/// `> Processing ...` or anything mentioning `records`) that should not be
/// echoed to the log.
fn is_progress_line(line: &str) -> bool {
    match line.strip_prefix('>') {
        None => false,
        Some(rest) => {
            rest.starts_with(" Output records written:")
                || rest.starts_with(" Processing")
                || rest.contains("records")
        }
    }
}
- use std::collections::HashMap;
- use crate::config::Config;
- #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
- pub struct SlurmEpilog {
- pub job_id: String,
- pub cluster: String,
- pub user: String,
- pub group: String,
- pub nodelist: String,
- pub cores: u32,
- pub job_started_at: String,
- pub job_ended_at: String,
- pub wall_clock_time: String,
- pub cpu_utilized: String,
- pub cpu_efficiency: String,
- pub memory_utilized: String,
- pub memory_efficiency: String,
- }
- impl SlurmEpilog {
- pub fn parse(output: &str) -> Option<Self> {
- let epilog_start = output.find("SLURM EPILOG")?;
- let epilog_section = &output[epilog_start..];
- let mut fields: HashMap<String, String> = HashMap::new();
- for line in epilog_section.lines() {
- if let Some((key, value)) = Self::parse_line(line) {
- fields.insert(key.to_lowercase(), value);
- }
- }
- let get =
- |keys: &[&str]| -> Option<String> { keys.iter().find_map(|k| fields.get(*k).cloned()) };
- let (user, group) = get(&["user/group"])
- .and_then(|s| {
- let mut parts = s.splitn(2, '/');
- Some((parts.next()?.to_string(), parts.next()?.to_string()))
- })
- .unwrap_or_default();
- Some(SlurmEpilog {
- job_id: get(&["job id", "jobid"]).unwrap_or_default(),
- cluster: get(&["cluster"]).unwrap_or_default(),
- user,
- group,
- nodelist: get(&["nodelist", "node list"]).unwrap_or_default(),
- cores: get(&["cores per node", "cores per task", "cores"])
- .and_then(|v| v.parse().ok())
- .unwrap_or(0),
- job_started_at: get(&["job started at", "start time"]).unwrap_or_default(),
- job_ended_at: get(&["job ended at", "end time"]).unwrap_or_default(),
- wall_clock_time: get(&["job wall-clock time", "wall-clock time", "elapsed"])
- .unwrap_or_default(),
- cpu_utilized: get(&["cpu utilized", "cpu time"]).unwrap_or_default(),
- cpu_efficiency: get(&["cpu efficiency"]).unwrap_or_default(),
- memory_utilized: get(&["memory utilized", "mem utilized"]).unwrap_or_default(),
- memory_efficiency: get(&["memory efficiency", "mem efficiency"]).unwrap_or_default(),
- })
- }
- fn parse_line(line: &str) -> Option<(String, String)> {
- // Strip box-drawing chars and whitespace
- let line = line.trim_matches(|c: char| c == '|' || c == '-' || c.is_whitespace());
- if line.is_empty() || line.contains("SLURM EPILOG") {
- return None;
- }
- let (key, value) = line.split_once(':')?;
- let key = key.trim().to_lowercase();
- let value = value.trim().to_string();
- if key.is_empty() || value.is_empty() {
- return None;
- }
- Some((key, value))
- }
- }
- // impl SlurmEpilog {
- // /// Parse SLURM epilog from output string
- // pub fn parse(output: &str) -> Option<Self> {
- // // Find the epilog section
- // let epilog_start = output.find("SLURM EPILOG")?;
- // let epilog_section = &output[epilog_start..];
- //
- // // Parse key-value pairs
- // let mut fields: HashMap<String, String> = HashMap::new();
- //
- // for line in epilog_section.lines() {
- // if let Some((key, value)) = Self::parse_line(line) {
- // fields.insert(key, value);
- // }
- // }
- //
- // // Extract user/group
- // let (user, group) = fields.get("User/Group").and_then(|s| {
- // let parts: Vec<&str> = s.split('/').collect();
- // if parts.len() == 2 {
- // Some((parts[0].to_string(), parts[1].to_string()))
- // } else {
- // None
- // }
- // })?;
- //
- // Some(SlurmEpilog {
- // job_id: fields.get("Job ID")?.clone(),
- // cluster: fields.get("Cluster")?.clone(),
- // user,
- // group,
- // nodelist: fields.get("Nodelist")?.clone(),
- // cores: fields.get("Cores")?.parse().ok()?,
- // job_started_at: fields.get("Job started at")?.clone(),
- // job_ended_at: fields.get("Job ended at")?.clone(),
- // wall_clock_time: fields.get("Job Wall-clock time")?.clone(),
- // cpu_utilized: fields.get("CPU Utilized")?.clone(),
- // cpu_efficiency: fields.get("CPU Efficiency")?.clone(),
- // memory_utilized: fields.get("Memory Utilized")?.clone(),
- // memory_efficiency: fields.get("Memory Efficiency")?.clone(),
- // })
- // }
- //
- // fn parse_line(line: &str) -> Option<(String, String)> {
- // // Skip lines that are just decorations or headers
- // if line.contains("---") || line.contains("SLURM EPILOG") || line.trim().is_empty() {
- // return None;
- // }
- //
- // // Look for "Key: Value" pattern
- // let parts: Vec<&str> = line.splitn(2, ':').collect();
- // if parts.len() == 2 {
- // let key = parts[0].trim().to_string();
- // let value = parts[1].trim().to_string();
- // if !key.is_empty() && !value.is_empty() {
- // return Some((key, value));
- // }
- // }
- //
- // None
- // }
- // }
- impl Display for SlurmEpilog {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- writeln!(f, "--- SLURM JOB EPILOG (ID: {}) ---", self.job_id)?;
- writeln!(f, " Cluster: {}", self.cluster)?;
- writeln!(f, " User: {} (Group: {})", self.user, self.group)?;
- writeln!(f, " Nodelist: {}", self.nodelist)?;
- writeln!(f, " Cores Used: {}", self.cores)?;
- writeln!(f)?;
- writeln!(f, " --- Timestamps ---")?;
- writeln!(f, " Started At: {}", self.job_started_at)?;
- writeln!(f, " Ended At: {}", self.job_ended_at)?;
- writeln!(f, " Wall Time: {}", self.wall_clock_time)?;
- writeln!(f)?;
- writeln!(f, " --- Resource Utilization ---")?;
- writeln!(f, " CPU Utilized: {}", self.cpu_utilized)?;
- writeln!(f, " CPU Efficiency: {}", self.cpu_efficiency)?;
- writeln!(f, " Memory Utilized: {}", self.memory_utilized)?;
- writeln!(f, " Memory Efficiency:{}", self.memory_efficiency)?;
- writeln!(f, "------------------------------------")
- }
- }
- /// Split output into job output and epilog
- pub fn split_output(full_output: &str) -> (String, Option<SlurmEpilog>) {
- if let Some(epilog_start) = full_output.find("----------------------------------------------") {
- // Check if this is actually the epilog header
- if full_output[epilog_start..].contains("SLURM EPILOG") {
- let job_output = full_output[..epilog_start].trim_end().to_string();
- let epilog = SlurmEpilog::parse(full_output);
- return (job_output, epilog);
- }
- }
- // No epilog found, return full output
- (full_output.to_string(), None)
- }
- /// Run multiple SbatchRunner jobs in parallel.
- ///
- /// Each job:
- /// - calls `init()`
- /// - runs `sbatch --wait ... --wrap "<cmd>"`
- /// - streams its output to this process' stdout/stderr
- /// - returns `CapturedOutput`
- ///
- /// NOTE:
- /// - Outputs from different jobs may interleave on the terminal.
- /// - Slurm still decides scheduling order (queue vs run).
- pub fn run_many_sbatch<T>(jobs: Vec<T>, config: &Config) -> anyhow::Result<Vec<CapturedOutput>>
- where
- T: SbatchRunner + Send + 'static,
- {
- // let mut handles = Vec::with_capacity(jobs.len());
- //
- // for mut job in jobs.drain(..) {
- // let handle = thread::spawn(move || -> anyhow::Result<CapturedOutput> {
- // SbatchRunner::exec(&mut job)
- // });
- // handles.push(handle);
- // }
- //
- // let mut results = Vec::with_capacity(handles.len());
- // for h in handles {
- // // propagate the first error you hit
- // let res = h.join().expect("thread panicked")?;
- // results.push(res);
- // }
- // Rule for the IGR cluster because the admins dont want to implement proper rules so we are
- let max_parallel = config.slurm_max_par;
- let pool = rayon::ThreadPoolBuilder::new()
- .num_threads(max_parallel.into())
- .build()
- .context("failed to build rayon thread pool")?;
- pool.install(|| {
- jobs.into_par_iter()
- .map(|mut job| SbatchRunner::exec(&mut job))
- .collect::<anyhow::Result<Vec<_>>>()
- })
- }
- /// Local batch runner: execute the command directly on the machine,
- /// mimicking SbatchRunner behavior (output to file, async monitoring).
- pub trait LocalBatchRunner: Command {
- fn shell(&self) -> &str {
- "bash"
- }
- /// Output file pattern. Use `{}` as placeholder for job ID (UUID).
- fn output_pattern(&self) -> &str {
- "local-{}.out"
- }
- fn run(&mut self) -> anyhow::Result<CapturedOutput> {
- self.init()?;
- let job_id = Uuid::new_v4()
- .to_string()
- .split('-')
- .next()
- .unwrap_or("local")
- .to_string();
- let out_file = self.output_pattern().replace("{}", &job_id);
- info!("Running local batch job: {job_id}");
- let out_file_handle = File::create(&out_file)
- .with_context(|| format!("failed to create output file: {out_file}"))?;
- let out_file_write = out_file_handle
- .try_clone()
- .context("failed to clone file handle for stderr")?;
- let mut child = ProcessCommand::new(self.shell())
- .arg("-c")
- .arg(self.cmd())
- .stdout(Stdio::from(out_file_handle))
- .stderr(Stdio::from(out_file_write))
- .spawn()
- .context("failed to spawn local batch command")?;
- let (stop_tx, stop_rx) = mpsc::channel::<()>();
- let out_path = out_file.clone();
- let captured_stdout = Arc::new(Mutex::new(String::new()));
- let captured_stdout_clone = Arc::clone(&captured_stdout);
- let job_id_clone = job_id.clone();
- let tail_handle = thread::spawn(move || -> anyhow::Result<()> {
- let mut pos: u64 = 0;
- loop {
- if stop_rx.try_recv().is_ok() {
- if let Ok(mut f) = File::open(&out_path) {
- if f.seek(SeekFrom::Start(pos)).is_ok() {
- let mut remaining = Vec::new();
- if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() {
- let s = String::from_utf8_lossy(&remaining);
- let cleaned = clean_output(&s);
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- }
- }
- break;
- }
- if let Ok(mut f) = File::open(&out_path) {
- if f.seek(SeekFrom::Start(pos)).is_ok() {
- let mut buf = [0u8; 8192];
- loop {
- match f.read(&mut buf) {
- Ok(0) => break,
- Ok(n) => {
- pos += n as u64;
- let s = String::from_utf8_lossy(&buf[..n]);
- let cleaned = clean_output(&s);
- for line in cleaned.lines() {
- let line = line.trim();
- if !line.is_empty() && !is_progress_line(line) {
- info!("{}: {}", job_id_clone, line);
- }
- }
- if let Ok(mut c) = captured_stdout_clone.lock() {
- c.push_str(&s);
- }
- }
- Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
- Err(_) => break,
- }
- }
- }
- }
- thread::sleep(Duration::from_millis(100));
- }
- Ok(())
- });
- let status = child
- .wait()
- .context("failed to wait on local batch command")?;
- let _ = stop_tx.send(());
- match tail_handle.join() {
- Ok(Ok(())) => {}
- Ok(Err(e)) => return Err(e.context("tail thread failed")),
- Err(_) => anyhow::bail!("tail thread panicked"),
- }
- if !status.success() {
- anyhow::bail!("local batch command failed with status: {status}");
- }
- self.clean_up()?;
- let stdout = match Arc::try_unwrap(captured_stdout) {
- Ok(mutex) => mutex.into_inner().unwrap_or_default(),
- Err(arc) => arc.lock().unwrap_or_else(|e| e.into_inner()).clone(),
- };
- let _ = std::fs::remove_file(&out_file);
- Ok(CapturedOutput {
- stdout,
- stderr: String::new(),
- slurm_epilog: None,
- })
- }
- }
- pub fn run_many_local_batch<T>(jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
- where
- T: LocalBatchRunner + Send,
- {
- // Set thread pool size based on max_concurrent
- // let pool = rayon::ThreadPoolBuilder::new()
- // .num_threads(threads)
- // .build()
- // .context("failed to build thread pool")?;
- // pool.install(|| {
- jobs.into_iter()
- .map(|mut job| LocalBatchRunner::run(&mut job))
- .collect()
- // })
- }
/// Run multiple batch commands, either via Slurm or locally.
///
/// When `$cfg.slurm_runner` is `true`, the jobs go through `run_many_sbatch`,
/// which runs them in parallel, bounded by `$cfg.slurm_max_par`. Otherwise
/// they go through `run_many_local_batch`, which runs them sequentially.
///
/// `$cfg` is expected to be a `&Config`. Both arms are type-checked, so the job
/// type must implement both `SbatchRunner` and `LocalBatchRunner`.
///
/// Usage:
/// ```ignore
/// let results = run_many!(cfg, jobs)?;
/// ```
#[macro_export]
macro_rules! run_many {
    ($cfg:expr, $jobs:expr) => {{
        if $cfg.slurm_runner {
            $crate::commands::run_many_sbatch($jobs, $cfg)
        } else {
            $crate::commands::run_many_local_batch($jobs)
        }
    }};
}
#[cfg(test)]
mod tests {
    use super::{Command, SbatchRunner, SlurmParams};

    /// Minimal command used to exercise `SbatchRunner` end to end.
    struct TestRun;

    impl Command for TestRun {
        fn cmd(&self) -> String {
            "echo 'hello from sbatch'".to_string()
        }
    }

    impl SbatchRunner for TestRun {
        fn slurm_params(&self) -> SlurmParams {
            SlurmParams {
                job_name: Some("test-sbatch".into()),
                partition: Some("gpgpuq".into()),
                cpus_per_task: Some(1),
                mem: Some("1G".into()),
                gres: None,
            }
        }
    }

    /// Submits a real job, so it needs `sbatch`/`squeue` on `PATH` and access to
    /// the `gpgpuq` partition. Ignored by default; run it on the cluster with
    /// `cargo test -- --ignored`.
    #[test]
    #[ignore = "requires a Slurm cluster (sbatch/squeue) with the gpgpuq partition"]
    fn sbatch_test_run() -> anyhow::Result<()> {
        let mut t = TestRun;
        let out = SbatchRunner::exec(&mut t)?;
        println!("{out:#?}");
        assert!(out.stdout.contains("hello from sbatch"));
        Ok(())
    }
}
|