Parcourir la source

Limit the maximum number of sbatch jobs run in parallel by run_many_sbatch

Thomas il y a 1 semaine
Parent
commit
a6e037983f
1 fichier modifié avec 29 ajouts et 18 suppressions
  1. 29 18
      src/commands/mod.rs

+ 29 - 18
src/commands/mod.rs

@@ -158,6 +158,7 @@ pub trait LocalRunner: Command {
 
 use anyhow::Context;
 use log::info;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
@@ -796,27 +797,37 @@ pub fn split_output(full_output: &str) -> (String, Option<SlurmEpilog>) {
 /// NOTE:
 ///   - Outputs from different jobs may interleave on the terminal.
 ///   - Slurm still decides scheduling order (queue vs run).
-pub fn run_many_sbatch<T>(mut jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
+pub fn run_many_sbatch<T>(jobs: Vec<T>) -> anyhow::Result<Vec<CapturedOutput>>
 where
     T: SbatchRunner + Send + 'static,
 {
-    let mut handles = Vec::with_capacity(jobs.len());
-
-    for mut job in jobs.drain(..) {
-        let handle = thread::spawn(move || -> anyhow::Result<CapturedOutput> {
-            SbatchRunner::exec(&mut job)
-        });
-        handles.push(handle);
-    }
-
-    let mut results = Vec::with_capacity(handles.len());
-    for h in handles {
-        // propagate the first error you hit
-        let res = h.join().expect("thread panicked")?;
-        results.push(res);
-    }
-
-    Ok(results)
+    // let mut handles = Vec::with_capacity(jobs.len());
+    //
+    // for mut job in jobs.drain(..) {
+    //     let handle = thread::spawn(move || -> anyhow::Result<CapturedOutput> {
+    //         SbatchRunner::exec(&mut job)
+    //     });
+    //     handles.push(handle);
+    // }
+    //
+    // let mut results = Vec::with_capacity(handles.len());
+    // for h in handles {
+    //     // propagate the first error you hit
+    //     let res = h.join().expect("thread panicked")?;
+    //     results.push(res);
+    // }
+
+    let max_parallel = 20;
+    let pool = rayon::ThreadPoolBuilder::new()
+        .num_threads(max_parallel)
+        .build()
+        .context("failed to build rayon thread pool")?;
+
+    pool.install(|| {
+        jobs.into_par_iter()
+            .map(|mut job| SbatchRunner::exec(&mut job))
+            .collect::<anyhow::Result<Vec<_>>>()
+    })
 }
 
 /// Local batch runner: execute the command directly on the machine,