Browse Source

init progress

Thomas 1 year ago
parent
commit
a1396b1aaf
2 changed files with 75 additions and 12 deletions
  1. 15 12
      src/lib.rs
  2. 60 0
      src/runn.rs

+ 15 - 12
src/lib.rs

@@ -7,7 +7,7 @@ pub mod utils;
 mod tests {
     use std::time;
 
-    use crate::utils::Run;
+    use crate::{runn::{BasicProgress, Inc}, utils::Run};
     use anyhow::anyhow;
     use env_logger::Env;
     use log::info;
@@ -42,24 +42,26 @@ mod tests {
             value: i32,
         }
 
-        let task_manager: TaskManager<Cramino> = TaskManager::new();
+        let task_manager = TaskManager::<Cramino>::new();
+
+        let mut cramino = Cramino::default()
+            .with_threads(150)
+            .with_result_path(
+                "/data/longreads_basic_pipe/CAMARA/diag/CAMARA_diag_hs1_cramino.txt",
+                )
+            .with_bam("/data/longreads_basic_pipe/CAMARA/diag/CAMARA_diag_hs1.bam").unwrap();
+
+        let mut prog = BasicProgress { length: 1, step: 0 };
 
         let task_id = task_manager
-            .spawn(|sender| async {
-                let mut cramino = Cramino::default()
-                    .with_threads(150)
-                    .with_result_path(
-                        "/data/longreads_basic_pipe/CAMARA/diag/CAMARA_diag_hs1_cramino.txt",
-                    )
-                    .with_bam("/data/longreads_basic_pipe/CAMARA/diag/CAMARA_diag_hs1.bam")?;
+            .spawn_progress(|sender, mut progress: BasicProgress| async move {
                 cramino.run()?;
-
+                progress.inc(1);
                 if let Err(_) = sender.send(cramino) {
                     return Err(anyhow!("the receiver dropped"));
                 }
-
                 Ok(())
-            })
+            }, prog.clone())
             .await;
 
         loop {
@@ -75,6 +77,7 @@ mod tests {
 
         if let Some(r) = task_manager.try_recv(task_id).await {
             println!("{r:?}");
+            println!("{prog:?}");
         }
 
     }

+ 60 - 0
src/runn.rs

@@ -2,6 +2,44 @@ use std::{collections::HashMap, sync::Arc};
 use tokio::{sync::{oneshot::{Receiver, Sender}, Mutex}, task};
 use std::future::Future;
 
+
+#[derive(Clone, Debug)]
+pub struct BasicProgress {
+    pub length: usize,
+    pub step: usize,
+}
+
+pub trait Inc {
+    fn inc(&mut self, delta: usize);
+}
+
+impl Inc for BasicProgress {
+    fn inc(&mut self, delta: usize) {
+        if self.step < self.length {
+            self.step += delta;
+        }
+    }
+}
+
+pub struct IndProgress {
+    inner: indicatif::ProgressBar,
+}
+
+impl Inc for IndProgress {
+    fn inc(&mut self, delta: usize) {
+        self.inner.inc(delta as u64)
+    }
+}
+
+impl IndProgress {
+    pub fn new(len: u64) -> Self {
+        Self {
+            inner: indicatif::ProgressBar::new(len),
+        }
+    }
+}
+
+
 // type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
 
 pub struct TaskManager<R> {
@@ -39,6 +77,28 @@ impl<R> TaskManager<R> {
         task_id
     }
 
+
+    pub async fn spawn_progress<F, T, I>(&self, f: F, progress: I) -> usize
+    where
+        F: FnOnce(Sender<R>, I) -> T + Send + 'static,
+        I: Inc,
+        // T: Future<Output = BoxFuture<Result<(), ()>>> + Send + 'static,
+        T: Future<Output = anyhow::Result<()>> + Send + 'static,
+    {
+        let mut next_id = self.next_id.lock().await;
+        let task_id = *next_id;
+        *next_id += 1;
+
+        let (s, r) = tokio::sync::oneshot::channel::<R>();
+        self.results_channels.lock().await.insert(task_id, r);
+
+        let handle = task::spawn(Box::pin(f(s, progress)));
+
+        self.tasks.lock().await.insert(task_id, handle);
+
+        task_id
+    }
+
     pub async fn try_recv(&self, task_id: usize) -> Option<R> {
         if let Some(r) = self.results_channels.lock().await.get_mut(&task_id) {
             match r.try_recv() {