Browse Source

better tasks runner

Thomas 1 year ago
parent
commit
b06901c8a1
1 changed files with 105 additions and 50 deletions
  1. 105 50
      src/collection/mod.rs

+ 105 - 50
src/collection/mod.rs

@@ -10,7 +10,7 @@ use std::{
 use anyhow::Context;
 use chrono::{DateTime, Utc};
 use glob::glob;
-use log::{info, warn};
+use log::{error, info, warn};
 
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
@@ -161,6 +161,9 @@ impl Collections {
             }
         }
 
+        // de novo
+        tasks.extend(self.todo_assembler()?);
+
         // Remove VCF anterior to BAM
         let vcf_by_id = self.vcf.group_by_id();
         vcf_by_id.iter().for_each(|(id, vcfs)| {
@@ -213,7 +216,7 @@ impl Collections {
                 if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
                     if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
                         && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
-                    && !self.config.id_black_list.contains(&id)
+                        && !self.config.id_black_list.contains(&id)
                     {
                         let caller_time: Vec<(&str, &str)> = vcf_by_id
                             .iter()
@@ -263,18 +266,6 @@ impl Collections {
             }
         });
 
-        // Variants aggregation
-        // info!("Looking for variants aggregation tasks...");
-        // self.bam.bams.iter().filter(|b| b.time_point == "diag" ).for_each(|bam| {
-        //     let id = bam.id;
-        // });
-
-        // de novo
-        tasks.extend(self.todo_assembler()?);
-
-        // Tasks sorting
-        tasks.sort_by_key(|task| task.get_order());
-
         // Tasks dedup
         let mut hs = HashMap::new();
         tasks.into_iter().for_each(|t| {
@@ -282,6 +273,12 @@ impl Collections {
         });
         self.tasks = hs.into_values().collect();
 
+        // Variants aggregation
+        self.tasks.extend(self.todo_variants_agg()?);
+
+        // Tasks sorting
+        self.tasks.sort_by_cached_key(|task| task.get_order());
+
         Ok(())
     }
 
@@ -433,28 +430,60 @@ impl Collections {
         Ok(tasks)
     }
 
+    /// Runs all tasks in the collection.
+    ///
+    /// This method attempts to execute each task in the collection.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if the process completes without any critical errors, even if
+    /// individual tasks fail.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if:
+    /// - Fetching todo tasks fails when the initial task list is empty.
+    /// - Any critical error occurs during the execution process.
+    ///
+    /// Note that individual task failures do not cause this method to return an error.
     pub fn run(&mut self) -> anyhow::Result<()> {
-        // self.tasks.reverse();
         if self.tasks.is_empty() {
-            self.todo()?;
+            self.todo().context("Failed to fetch todo tasks")?;
             if self.tasks.is_empty() {
+                info!("No tasks to run");
                 return Ok(());
-            } else {
-                self.run()?;
             }
-        } else {
-            let n_tasks = self.tasks.len();
-            warn!("{n_tasks} tasks to run");
-            let mut i = 1;
-            while let Some(task) = self.tasks.pop() {
-                warn!("Running {i}/{n_tasks}");
-                info!("{task:#?}");
-                if let Err(err) = task.run() {
-                    log::error!("{err}");
+        }
+
+        let n_tasks = self.tasks.len();
+        warn!("{n_tasks} tasks to run");
+
+        let mut completed_tasks = Vec::new();
+
+        for (i, task) in self.tasks.iter().enumerate() {
+            warn!("Running task {}/{}", i + 1, n_tasks);
+            info!("{task}");
+
+            match task.clone().run() {
+                Ok(_) => {
+                    info!("Task completed successfully");
+                    completed_tasks.push(i);
                 }
-                i += 1;
+                Err(err) => error!("Task failed: {}", err),
             }
         }
+
+        // Remove completed tasks
+        for &index in completed_tasks.iter().rev() {
+            self.tasks.remove(index);
+        }
+
+        info!(
+            "{} tasks completed, {} tasks remaining",
+            completed_tasks.len(),
+            self.tasks.len()
+        );
+
         Ok(())
     }
 }
@@ -573,8 +602,16 @@ impl fmt::Display for CollectionsTasks {
         use CollectionsTasks::*;
 
         match self {
-            Align(case) => write!(f, "Align task  with: {:#?}", case),
-            DemuxAlign(cases) => write!(f, "DemuxAlign task with: {:#?}", cases),
+            Align(case) => write!(f, "Alignment task for: {} {}", case.id, case.time_point),
+            DemuxAlign(cases) => write!(
+                f,
+                "Demultiplex and alignment task for: {}",
+                cases
+                    .iter()
+                    .map(|c| format!("{} {}", c.id, c.time_point))
+                    .collect::<Vec<String>>()
+                    .join(", ")
+            ),
             DeepVariant {
                 id,
                 time_point,
@@ -583,7 +620,7 @@ impl fmt::Display for CollectionsTasks {
             } => {
                 write!(
                     f,
-                    "DeepVariant task with id: {}, time_point: {}, bam: {}",
+                    "DeepVariant calling task for {} {}, from bam: {}",
                     id, time_point, bam
                 )
             }
@@ -595,7 +632,7 @@ impl fmt::Display for CollectionsTasks {
             } => {
                 write!(
                     f,
-                    "ClairS task with id: {}, diag_bam: {}, mrd_bam: {}",
+                    "ClairS calling task for {}, with diag_bam: {}, mrd_bam: {}",
                     id, diag_bam, mrd_bam
                 )
             }
@@ -607,39 +644,57 @@ impl fmt::Display for CollectionsTasks {
             } => {
                 write!(
                     f,
-                    "NanomonSV task with id: {}, diag_bam: {}, mrd_bam: {}",
+                    "NanomonSV calling task for {}, with diag_bam: {}, mrd_bam: {}",
                     id, diag_bam, mrd_bam
                 )
             }
-            WholeScan { id, bam, .. } => write!(f, "Whole scan for id: {}, bam: {}", id, bam),
-            Variants { id, .. } => write!(f, "Variants aggregation for id: {}", id),
+            WholeScan {
+                id,
+                bam,
+                time_point,
+                ..
+            } => write!(f, "Whole scan for {} {}, bam: {}", id, time_point, bam),
+            Variants { id, .. } => write!(f, "Variants aggregation for {}", id),
             Assemble { id, time_point, .. } => {
-                write!(f, "Assembly for id: {}, time point: {}", id, time_point)
+                write!(f, "De novo assemblage for {} {}", id, time_point)
             }
         }
     }
 }
 
 pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
-    let mut last_n = Vec::new();
+    let mut last_n = Vec::with_capacity(3);
+    let mut consecutive_same_count = 0;
+
     loop {
-        let mut collection = Collections::new(config.clone())?;
-        collection.todo()?;
-        if collection.tasks.is_empty() {
-            warn!("All results are update");
-            break;
-        }
+        let mut collection =
+            Collections::new(config.clone()).context("Failed to create new Collections")?;
+        collection.todo().context("Failed to get todo tasks")?;
+
         let n_tasks = collection.tasks.len();
-        warn!("{n_tasks} tasks to run");
-        if last_n.len() > 2
-            && last_n[last_n.len() - 1] == n_tasks
-            && last_n[last_n.len() - 2] == n_tasks
-        {
-            warn!("Tasks don't progress");
+
+        if n_tasks == 0 {
+            info!("All results are up to date");
             break;
         }
+
+        if last_n.len() >= 2 && last_n.iter().rev().take(2).all(|&x| x == n_tasks) {
+            consecutive_same_count += 1;
+            if consecutive_same_count >= 2 {
+                error!("Tasks are not progressing");
+                break;
+            }
+        } else {
+            consecutive_same_count = 0;
+        }
+
         last_n.push(n_tasks);
-        collection.run()?;
+        if last_n.len() > 3 {
+            last_n.remove(0);
+        }
+
+        collection.run().context("Failed to run collection tasks")?;
     }
+
     Ok(())
 }