pub mod bcftools; pub mod dorado; // pub mod longphase; pub mod modkit; pub mod samtools; pub mod longphase; // pub mod wakhan; use std::{ fmt::{self, Display, Formatter}, fs::{self, File}, io::{self, BufReader, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, process::{Command as ProcessCommand, Stdio}, sync::{mpsc, Arc, Mutex}, thread, time::Duration, }; /// A helper trait for commands that should be run through Slurm. /// /// Types implementing `SlurmRunner` must also implement [`Command`]. /// The provided [`run`](SlurmRunner::run) method: /// /// 1. Calls [`Command::init`]. /// 2. Runs `srun bash -lc "”`. /// 3. On success, calls [`Command::clean_up`]. pub trait Command { fn init(&mut self) -> anyhow::Result<()> { Ok(()) } fn cmd(&self) -> String; fn clean_up(&self) -> anyhow::Result<()> { Ok(()) } } /// Local runner: execute the command directly on the machine. /// /// Usage: /// ```ignore /// my_command.run()?; /// ``` pub trait LocalRunner: Command { /// The shell binary used to run commands. /// Override if you want to use `sh` or something else. fn shell(&self) -> &str { "bash" } /// Run locally with: /// -c "" fn exec(&mut self) -> anyhow::Result { self.init()?; let mut cmd = ProcessCommand::new(self.shell()); cmd.arg("-c") .arg(self.cmd()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); info!( "Running: {} {}", cmd.get_program().to_string_lossy(), cmd.get_args() .map(|arg| arg.to_string_lossy()) .collect::>() .join(" ") ); let mut child = cmd.spawn().context("failed to spawn local command")?; let stdout = child.stdout.take().context("failed to capture stdout")?; let stderr = child.stderr.take().context("failed to capture stderr")?; let stderr_handle = thread::spawn(move || { let mut reader = BufReader::new(stderr); let mut buf = Vec::new(); let mut line = Vec::new(); let mut byte = [0u8; 1]; loop { match reader.read(&mut byte) { Ok(0) => { if !line.is_empty() { let text = String::from_utf8_lossy(&line); eprint!("{}", text); buf.extend_from_slice(&line); } break; } Ok(_) => { line.push(byte[0]); if byte[0] == b'\n' { let text = String::from_utf8_lossy(&line); eprint!("{}", text); buf.extend_from_slice(&line); line.clear(); } } Err(_) => break, } } String::from_utf8_lossy(&buf).to_string() }); // --- stdout in a separate thread --- let stdout_handle = thread::spawn(move || { let mut reader = BufReader::new(stdout); let mut buf = Vec::new(); let mut line = Vec::new(); let mut byte = [0u8; 1]; loop { match reader.read(&mut byte) { Ok(0) => { if !line.is_empty() { let text = String::from_utf8_lossy(&line); print!("{}", text); buf.extend_from_slice(&line); } break; } Ok(_) => { line.push(byte[0]); if byte[0] == b'\n' { let text = String::from_utf8_lossy(&line); print!("{}", text); buf.extend_from_slice(&line); line.clear(); } } Err(_) => break, } } String::from_utf8_lossy(&buf).to_string() }); // wait for process let status = child.wait().context("failed to wait on local command")?; // wait for stderr thread and collect stderr let captured_stdout = stdout_handle.join().unwrap_or_default(); let captured_stderr = stderr_handle.join().unwrap_or_default(); if !status.success() { anyhow::bail!("local command failed with status: {status}"); } self.clean_up()?; Ok(CapturedOutput { stdout: captured_stdout, stderr: captured_stderr, slurm_epilog: None, }) } } use anyhow::Context; use log::info; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde::{Deserialize, Serialize}; use uuid::Uuid; /// Captured process output while it was being streamed live. #[derive(Debug, Default, Serialize, Deserialize)] pub struct CapturedOutput { pub stdout: String, pub stderr: String, pub slurm_epilog: Option, } impl CapturedOutput { /// Constructs a unique filename based on the base `path`, appending /// a truncated UUID v4 segment and the ".json" suffix. /// Saves the struct as JSON to the constructed path (creates parent dirs if needed). pub fn save_to_file(&self, base_path: impl AsRef) -> anyhow::Result { let base_path = base_path.as_ref(); let binding = Uuid::new_v4().to_string(); let unique_id = binding.split('-').next().unwrap_or("default"); let filename = format!( "{}_{}.json", base_path .file_name() .unwrap_or(base_path.as_os_str()) .to_string_lossy(), unique_id ); let mut full_path = PathBuf::from(base_path.parent().unwrap_or_else(|| Path::new(""))); full_path.push(filename); if let Some(p) = full_path.parent() { fs::create_dir_all(p) .with_context(|| format!("Failed to create directory {}", p.display()))?; } let json = serde_json::to_string_pretty(self)?; // Using pretty for readability, remove _pretty if not needed fs::write(&full_path, json) .with_context(|| format!("Failed to write file {}", full_path.display()))?; Ok(full_path) } } impl Display for CapturedOutput { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { // 1. Write STDOUT writeln!(f, "--- STANDARD OUTPUT ---")?; if self.stdout.is_empty() { writeln!(f, "[No standard output was captured.]")?; } else { writeln!(f, "{}", self.stdout)?; } // 2. Write STDERR writeln!(f, "\n--- STANDARD ERROR ---")?; if self.stderr.is_empty() { writeln!(f, "[No standard error was captured.]")?; } else { writeln!(f, "{}", self.stderr)?; } // 3. Write SlurmEpilog if present if let Some(epilog) = &self.slurm_epilog { // Using a separate line break before the epilog for clarity writeln!(f, "\n{}", epilog)?; } Ok(()) } } /// Super-trait adding a Slurm `run()` method on top of [`Command`]. pub trait SlurmRunner: Command { /// Slurm launcher binary. Defaults to `srun`. /// /// Override if you want to use `sbatch` or another wrapper. fn slurm_bin(&self) -> &str { "srun" } /// Additional Slurm arguments (e.g. `--time=1:00:00`, `--cpus-per-task=4`). /// /// Default is no extra arguments. fn slurm_args(&self) -> Vec { Vec::new() } /// Run the command through Slurm. /// /// The effective shell command is: /// /// ```text /// bash -lc "" /// ``` fn exec(&mut self) -> anyhow::Result { self.init()?; let mut cmd = ProcessCommand::new(self.slurm_bin()); cmd.args(self.slurm_args()) .arg("bash") .arg("-c") .arg(self.cmd()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); info!( "Running with Slurm: {} {}", cmd.get_program().to_string_lossy(), cmd.get_args() .map(|arg| arg.to_string_lossy()) .collect::>() .join(" ") ); let mut child = cmd.spawn().context("failed to spawn slurm job")?; let stdout = child.stdout.take().context("failed to capture stdout")?; let stderr = child.stderr.take().context("failed to capture stderr")?; let stderr_handle = thread::spawn(move || { let mut reader = BufReader::new(stderr); let mut buf = Vec::new(); let mut chunk = [0u8; 8192]; let mut out = io::stderr(); loop { match reader.read(&mut chunk) { Ok(0) => break, // EOF Ok(n) => { // keep a copy buf.extend_from_slice(&chunk[..n]); // forward to our stderr (preserves colors, \r, etc.) let _ = out.write_all(&chunk[..n]); let _ = out.flush(); } Err(_) => break, } } String::from_utf8_lossy(&buf).to_string() }); // --- stdout in a separate thread --- let stdout_handle = thread::spawn(move || { let mut reader = BufReader::new(stdout); let mut buf = Vec::new(); let mut chunk = [0u8; 8192]; let mut out = io::stdout(); loop { match reader.read(&mut chunk) { Ok(0) => break, // EOF Ok(n) => { // capture buf.extend_from_slice(&chunk[..n]); // forward let _ = out.write_all(&chunk[..n]); let _ = out.flush(); } Err(_) => break, } } String::from_utf8_lossy(&buf).to_string() }); // wait for process let status = child.wait().context("failed to wait on local command")?; // wait for both threads and collect output let stdout = stdout_handle.join().unwrap_or_default(); let stderr = stderr_handle.join().unwrap_or_default(); if !status.success() { anyhow::bail!("slurm job failed with status: {status}"); } self.clean_up()?; Ok(CapturedOutput { stdout, stderr, slurm_epilog: None, }) } } /// Holds optional Slurm parameters that can be converted to CLI args. /// /// Example: /// ```rust /// let params = SlurmParams { /// job_name: Some("dorado_basecall".into()), /// cpus_per_task: Some(47), /// mem: Some("60G".into()), /// partition: Some("gpgpuq".into()), /// gres: Some("gpu:h100:4".into()), /// }; /// /// let args = params.to_args(); /// ``` /// /// Produces: /// ```text /// --job-name=dorado_basecall /// --cpus-per-task=47 /// --mem=60G /// --partition=gpgpuq /// --gres=gpu:h100:4 /// ``` #[derive(Debug, Clone, Default)] pub struct SlurmParams { pub job_name: Option, pub cpus_per_task: Option, pub mem: Option, pub partition: Option, pub gres: Option, } impl SlurmParams { /// Convert all non-empty fields into Slurm CLI arguments. pub fn to_args(&self) -> Vec { let mut args = Vec::new(); if let Some(v) = &self.job_name { args.push(format!("--job-name={v}")); } if let Some(v) = self.cpus_per_task { args.push(format!("--cpus-per-task={v}")); } if let Some(v) = &self.mem { args.push(format!("--mem={v}")); } if let Some(v) = &self.partition { args.push(format!("--partition={v}")); } if let Some(v) = &self.gres { args.push(format!("--gres={v}")); } args } } pub trait SbatchRunner: Command { fn sbatch_bin(&self) -> &str { "sbatch" } fn squeue_bin(&self) -> &str { "squeue" } fn slurm_params(&self) -> SlurmParams { SlurmParams::default() } fn sbatch_extra_args(&self) -> Vec { Vec::new() } /// Submit via sbatch, tail slurm-.out "live", wait for job to finish, /// then return captured stdout. fn exec(&mut self) -> anyhow::Result { self.init()?; // 1. sbatch --parsable --output=slurm-%j.out --wrap "" let mut args = self.slurm_params().to_args(); args.extend(self.sbatch_extra_args()); args.push("--parsable".to_string()); args.push("--output=slurm-%j.out".to_string()); args.push("--wrap".to_string()); args.push(self.cmd()); let output = ProcessCommand::new(self.sbatch_bin()) .args(&args) .output() .context("failed to spawn sbatch")?; let sbatch_stdout = String::from_utf8_lossy(&output.stdout).to_string(); let sbatch_stderr = String::from_utf8_lossy(&output.stderr).to_string(); if !output.status.success() { anyhow::bail!( "sbatch failed with status: {}\nstdout:\n{}\nstderr:\n{}", output.status, sbatch_stdout, sbatch_stderr ); } // --parsable usually returns "" or ";..." let job_id = sbatch_stdout .trim() .split(';') .next() .unwrap_or("") .to_string(); info!("Running jobid: {job_id}: {}", args.join(" ")); if job_id.is_empty() { anyhow::bail!("failed to parse job id from sbatch output: {sbatch_stdout}"); } let out_file = format!("slurm-{job_id}.out"); // 2. Tail thread: emulate `tail -f slurm-.out` let (stop_tx, stop_rx) = mpsc::channel::<()>(); let out_path = out_file.clone(); let captured_stdout = Arc::new(Mutex::new(String::new())); let captured_stdout_clone = Arc::clone(&captured_stdout); let job_id_clone = job_id.clone(); let tail_handle = thread::spawn(move || -> anyhow::Result<()> { let mut pos: u64 = 0; let mut file_opened = false; loop { // stop signal? if stop_rx.try_recv().is_ok() { // Do one final read before stopping if let Ok(mut f) = File::open(&out_path) { if f.seek(SeekFrom::Start(pos)).is_ok() { let mut remaining = Vec::new(); if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() { let s = String::from_utf8_lossy(&remaining); let cleaned = clean_output(&s); for line in cleaned.lines() { let line = line.trim(); if !line.is_empty() && !is_progress_line(line) { info!("{}: {}", job_id_clone, line); } } if let Ok(mut c) = captured_stdout_clone.lock() { c.push_str(&s); } } } } break; } match File::open(&out_path) { Ok(mut f) => { if !file_opened { file_opened = true; } // seek to last position and read new bytes if let Err(e) = f.seek(SeekFrom::Start(pos)) { anyhow::bail!("failed to seek in output file: {}", e); } let mut buf = [0u8; 8192]; loop { match f.read(&mut buf) { Ok(0) => break, // EOF for now Ok(n) => { pos += n as u64; let bytes = &buf[..n]; // forward to terminal let s = String::from_utf8_lossy(bytes); let cleaned = clean_output(&s); // Log non-progress lines for line in cleaned.lines() { let line = line.trim(); if !line.is_empty() && !is_progress_line(line) { info!("{}: {}", job_id_clone, line); } } io::stdout().flush().ok(); // capture if let Ok(mut c) = captured_stdout_clone.lock() { c.push_str(&s); } } Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, Err(e) => { anyhow::bail!("error reading output file: {}", e); } } } } Err(e) if e.kind() == io::ErrorKind::NotFound => { // file not yet created; this is expected initially } Err(e) => { // Only bail if we've already opened the file once // (unexpected disappearance or permission issues) if file_opened { anyhow::bail!("error opening output file: {}", e); } } } thread::sleep(Duration::from_millis(500)); } Ok(()) }); // 3. Poll squeue until job is no longer in the queue loop { let sq_out = ProcessCommand::new(self.squeue_bin()) .args(["-j", &job_id]) .output() .context("failed to run squeue")?; let out_str = String::from_utf8_lossy(&sq_out.stdout); // squeue prints a header line + job line if job exists // If job is gone, usually only the header OR nothing let has_job = out_str.lines().skip(1).any(|l| !l.trim().is_empty()); if !has_job { break; } thread::sleep(Duration::from_secs(1)); } // Wait for output file to stop growing (SLURM epilog may still be writing) let mut stable_count = 0; let mut last_size = 0u64; while stable_count < 2 { thread::sleep(Duration::from_millis(300)); if let Ok(metadata) = std::fs::metadata(&out_file) { let current_size = metadata.len(); if current_size == last_size { stable_count += 1; } else { stable_count = 0; last_size = current_size; } } else { // File doesn't exist yet, keep waiting stable_count = 0; } } // 4. Stop tail thread and join let _ = stop_tx.send(()); match tail_handle.join() { Ok(Ok(())) => {} Ok(Err(e)) => return Err(e.context("tail thread failed")), Err(_) => anyhow::bail!("tail thread panicked"), } self.clean_up()?; let stdout = match Arc::try_unwrap(captured_stdout) { Ok(mutex) => mutex.into_inner().unwrap_or_default(), Err(arc) => arc.lock().unwrap().clone(), }; // Remove the SLURM output file let _ = std::fs::remove_file(&out_file); let (stdout, slurm_epilog) = split_output(&stdout); Ok(CapturedOutput { stdout, stderr: String::new(), // all job output is in the .out file slurm_epilog, }) } } #[macro_export] macro_rules! run { ($cfg:expr, $cmd:expr) => {{ if $cfg.slurm_runner { $crate::commands::SbatchRunner::exec($cmd) } else { $crate::commands::LocalRunner::exec($cmd) } }}; } /// Clean output by handling carriage returns properly. /// /// When a line contains \r (carriage return), only the text after the last \r /// is kept, simulating terminal behavior where \r moves cursor to start of line. fn clean_output(s: &str) -> String { s.split('\n') .map(|line| { // For lines with \r, keep only the last segment (what's visible after all \r overwrites) line.split('\r').next_back().unwrap_or("") }) .collect::>() .join("\n") } /// Check if a line is a progress indicator that should be filtered from logs. fn is_progress_line(line: &str) -> bool { line.starts_with("> Output records written:") || line.starts_with("> Processing") || (line.starts_with('>') && line.contains("records")) } use std::collections::HashMap; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct SlurmEpilog { pub job_id: String, pub cluster: String, pub user: String, pub group: String, pub nodelist: String, pub cores: u32, pub job_started_at: String, pub job_ended_at: String, pub wall_clock_time: String, pub cpu_utilized: String, pub cpu_efficiency: String, pub memory_utilized: String, pub memory_efficiency: String, } impl SlurmEpilog { /// Parse SLURM epilog from output string pub fn parse(output: &str) -> Option { // Find the epilog section let epilog_start = output.find("SLURM EPILOG")?; let epilog_section = &output[epilog_start..]; // Parse key-value pairs let mut fields: HashMap = HashMap::new(); for line in epilog_section.lines() { if let Some((key, value)) = Self::parse_line(line) { fields.insert(key, value); } } // Extract user/group let (user, group) = fields.get("User/Group").and_then(|s| { let parts: Vec<&str> = s.split('/').collect(); if parts.len() == 2 { Some((parts[0].to_string(), parts[1].to_string())) } else { None } })?; Some(SlurmEpilog { job_id: fields.get("Job ID")?.clone(), cluster: fields.get("Cluster")?.clone(), user, group, nodelist: fields.get("Nodelist")?.clone(), cores: fields.get("Cores")?.parse().ok()?, job_started_at: fields.get("Job started at")?.clone(), job_ended_at: fields.get("Job ended at")?.clone(), wall_clock_time: fields.get("Job Wall-clock time")?.clone(), cpu_utilized: fields.get("CPU Utilized")?.clone(), cpu_efficiency: fields.get("CPU Efficiency")?.clone(), memory_utilized: fields.get("Memory Utilized")?.clone(), memory_efficiency: fields.get("Memory Efficiency")?.clone(), }) } fn parse_line(line: &str) -> Option<(String, String)> { // Skip lines that are just decorations or headers if line.contains("---") || line.contains("SLURM EPILOG") || line.trim().is_empty() { return None; } // Look for "Key: Value" pattern let parts: Vec<&str> = line.splitn(2, ':').collect(); if parts.len() == 2 { let key = parts[0].trim().to_string(); let value = parts[1].trim().to_string(); if !key.is_empty() && !value.is_empty() { return Some((key, value)); } } None } } impl Display for SlurmEpilog { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { writeln!(f, "--- SLURM JOB EPILOG (ID: {}) ---", self.job_id)?; writeln!(f, " Cluster: {}", self.cluster)?; writeln!(f, " User: {} (Group: {})", self.user, self.group)?; writeln!(f, " Nodelist: {}", self.nodelist)?; writeln!(f, " Cores Used: {}", self.cores)?; writeln!(f)?; writeln!(f, " --- Timestamps ---")?; writeln!(f, " Started At: {}", self.job_started_at)?; writeln!(f, " Ended At: {}", self.job_ended_at)?; writeln!(f, " Wall Time: {}", self.wall_clock_time)?; writeln!(f)?; writeln!(f, " --- Resource Utilization ---")?; writeln!(f, " CPU Utilized: {}", self.cpu_utilized)?; writeln!(f, " CPU Efficiency: {}", self.cpu_efficiency)?; writeln!(f, " Memory Utilized: {}", self.memory_utilized)?; writeln!(f, " Memory Efficiency:{}", self.memory_efficiency)?; writeln!(f, "------------------------------------") } } /// Split output into job output and epilog pub fn split_output(full_output: &str) -> (String, Option) { if let Some(epilog_start) = full_output.find("----------------------------------------------") { // Check if this is actually the epilog header if full_output[epilog_start..].contains("SLURM EPILOG") { let job_output = full_output[..epilog_start].trim_end().to_string(); let epilog = SlurmEpilog::parse(full_output); return (job_output, epilog); } } // No epilog found, return full output (full_output.to_string(), None) } /// Run multiple SbatchRunner jobs in parallel. /// /// Each job: /// - calls `init()` /// - runs `sbatch --wait ... --wrap ""` /// - streams its output to this process' stdout/stderr /// - returns `CapturedOutput` /// /// NOTE: /// - Outputs from different jobs may interleave on the terminal. /// - Slurm still decides scheduling order (queue vs run). pub fn run_many_sbatch(jobs: Vec) -> anyhow::Result> where T: SbatchRunner + Send + 'static, { // let mut handles = Vec::with_capacity(jobs.len()); // // for mut job in jobs.drain(..) { // let handle = thread::spawn(move || -> anyhow::Result { // SbatchRunner::exec(&mut job) // }); // handles.push(handle); // } // // let mut results = Vec::with_capacity(handles.len()); // for h in handles { // // propagate the first error you hit // let res = h.join().expect("thread panicked")?; // results.push(res); // } // Rule for the IGR cluster because the admins dont want to implement propoper rules so we are let max_parallel = 22; let pool = rayon::ThreadPoolBuilder::new() .num_threads(max_parallel) .build() .context("failed to build rayon thread pool")?; pool.install(|| { jobs.into_par_iter() .map(|mut job| SbatchRunner::exec(&mut job)) .collect::>>() }) } /// Local batch runner: execute the command directly on the machine, /// mimicking SbatchRunner behavior (output to file, async monitoring). pub trait LocalBatchRunner: Command { fn shell(&self) -> &str { "bash" } /// Output file pattern. Use `{}` as placeholder for job ID (UUID). fn output_pattern(&self) -> &str { "local-{}.out" } fn run(&mut self) -> anyhow::Result { self.init()?; let job_id = Uuid::new_v4() .to_string() .split('-') .next() .unwrap_or("local") .to_string(); let out_file = self.output_pattern().replace("{}", &job_id); info!("Running local batch job: {job_id}"); let out_file_handle = File::create(&out_file).with_context(|| format!("failed to create output file: {out_file}"))?; let out_file_write = out_file_handle .try_clone() .context("failed to clone file handle for stderr")?; let mut child = ProcessCommand::new(self.shell()) .arg("-c") .arg(self.cmd()) .stdout(Stdio::from(out_file_handle)) .stderr(Stdio::from(out_file_write)) .spawn() .context("failed to spawn local batch command")?; let (stop_tx, stop_rx) = mpsc::channel::<()>(); let out_path = out_file.clone(); let captured_stdout = Arc::new(Mutex::new(String::new())); let captured_stdout_clone = Arc::clone(&captured_stdout); let job_id_clone = job_id.clone(); let tail_handle = thread::spawn(move || -> anyhow::Result<()> { let mut pos: u64 = 0; loop { if stop_rx.try_recv().is_ok() { if let Ok(mut f) = File::open(&out_path) { if f.seek(SeekFrom::Start(pos)).is_ok() { let mut remaining = Vec::new(); if f.read_to_end(&mut remaining).is_ok() && !remaining.is_empty() { let s = String::from_utf8_lossy(&remaining); let cleaned = clean_output(&s); for line in cleaned.lines() { let line = line.trim(); if !line.is_empty() && !is_progress_line(line) { info!("{}: {}", job_id_clone, line); } } if let Ok(mut c) = captured_stdout_clone.lock() { c.push_str(&s); } } } } break; } if let Ok(mut f) = File::open(&out_path) { if f.seek(SeekFrom::Start(pos)).is_ok() { let mut buf = [0u8; 8192]; loop { match f.read(&mut buf) { Ok(0) => break, Ok(n) => { pos += n as u64; let s = String::from_utf8_lossy(&buf[..n]); let cleaned = clean_output(&s); for line in cleaned.lines() { let line = line.trim(); if !line.is_empty() && !is_progress_line(line) { info!("{}: {}", job_id_clone, line); } } if let Ok(mut c) = captured_stdout_clone.lock() { c.push_str(&s); } } Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, Err(_) => break, } } } } thread::sleep(Duration::from_millis(100)); } Ok(()) }); let status = child.wait().context("failed to wait on local batch command")?; let _ = stop_tx.send(()); match tail_handle.join() { Ok(Ok(())) => {} Ok(Err(e)) => return Err(e.context("tail thread failed")), Err(_) => anyhow::bail!("tail thread panicked"), } if !status.success() { anyhow::bail!("local batch command failed with status: {status}"); } self.clean_up()?; let stdout = match Arc::try_unwrap(captured_stdout) { Ok(mutex) => mutex.into_inner().unwrap_or_default(), Err(arc) => arc.lock().unwrap_or_else(|e| e.into_inner()).clone(), }; let _ = std::fs::remove_file(&out_file); Ok(CapturedOutput { stdout, stderr: String::new(), slurm_epilog: None, }) } } pub fn run_many_local_batch(jobs: Vec) -> anyhow::Result> where T: LocalBatchRunner + Send, { // Set thread pool size based on max_concurrent // let pool = rayon::ThreadPoolBuilder::new() // .num_threads(threads) // .build() // .context("failed to build thread pool")?; // pool.install(|| { jobs.into_iter() .map(|mut job| LocalBatchRunner::run(&mut job)) .collect() // }) } /// Macro to run multiple batch commands in parallel, either via SLURM or locally. /// /// Usage: /// ```ignore /// let results = run_many!(cfg, jobs)?; /// /// ``` #[macro_export] macro_rules! run_many { ($cfg:expr, $jobs:expr) => {{ if $cfg.slurm_runner { $crate::commands::run_many_sbatch($jobs) } else { $crate::commands::run_many_local_batch($jobs) } }} } #[cfg(test)] mod tests { use super::{Command, SbatchRunner, SlurmParams}; struct TestRun; impl Command for TestRun { fn cmd(&self) -> String { "echo 'hello from sbatch'".to_string() } } impl SbatchRunner for TestRun { fn slurm_params(&self) -> SlurmParams { SlurmParams { job_name: Some("test-sbatch".into()), partition: Some("gpgpuq".into()), cpus_per_task: Some(1), mem: Some("1G".into()), gres: None, } } } #[test] fn sbatch_test_run() -> anyhow::Result<()> { let mut t = TestRun; let out = SbatchRunner::exec(&mut t)?; println!("{out:#?}"); assert!(out.stdout.contains("hello from sbatch")); Ok(()) } }