Browse Source

vcf tasks ok, refactor CommandRun

Thomas 1 year ago
parent
commit
5f479db5bc
10 changed files with 722 additions and 249 deletions
  1. 280 0
      :
  2. 16 6
      src/callers/clairs.rs
  3. 3 4
      src/callers/deep_variant.rs
  4. 187 113
      src/callers/nanomonsv.rs
  5. 2 2
      src/collection/bam.rs
  6. 90 66
      src/collection/mod.rs
  7. 9 2
      src/collection/vcf.rs
  8. 22 39
      src/commands/bcftools.rs
  9. 1 1
      src/lib.rs
  10. 112 16
      src/runners.rs

+ 280 - 0
:

@@ -0,0 +1,280 @@
+// reference="/data/ref/hs1/chm13v2.0.fa"
+// bcftools="/data/tools/bcftools-1.18/bcftools"
+//
+// DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag"
+// MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd"
+//
+// DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv"
+// MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv"
+//
+// DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam"
+// MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam"
+//
+// DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag"
+// MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd"
+//
+// if [ ! -d "$DIAG_OUTPUT_DIR" ]; then
+//   mkdir "$DIAG_OUTPUT_DIR"
+// fi
+//
+// if [ ! -d "$MRD_OUTPUT_DIR" ]; then
+//   mkdir "$MRD_OUTPUT_DIR"
+// fi
+//
+// # PARSE
+// if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
+//   nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX"
+// fi
+//
+// if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
+//   echo "[pipe] NanomonSV parse ${MRD_BAM}"
+//   nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX"
+// fi
+//
+// # GET
+// if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
+//   nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \
+//     --control_bam "$MRD_BAM" \
+//     --processes 155 \
+//     "$DIAG_OUT_PREFIX" \
+//     "$DIAG_BAM" "$reference"
+// fi
+//
+// # GET MRD
+// if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
+//   nanomonsv get \
+//     --processes 155 \
+//     "$MRD_OUT_PREFIX" \
+//     "$MRD_BAM" "$reference"
+// fi
+//
+// vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz"
+// if [ ! -f "$vcf_passed" ]; then
+//   $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \
+//     $bcftools view -s "TUMOR" --write-index --threads 20 \
+//     -i "FILTER='PASS'" \
+//     -o "$vcf_passed"
+// fi
+
+use std::{
+    fs,
+    io::{BufRead, BufReader},
+    path::Path,
+    process::{Command, Stdio},
+    sync::{Arc, Mutex},
+    thread,
+};
+
+use log::info;
+
+use crate::commands::bcftools::{bcftools_keep_pass, BcftoolsConfig};
+
+#[derive(Debug)]
+pub struct NanomonSVConfig { // Settings used by `NanomonSV` to build output paths and nanomonsv command lines.
+    pub bin: String, // Program handed to `Command::new` for every nanomonsv step.
+    pub result_dir: String, // Root under which `{id}/diag/nanomonsv` and `{id}/mrd/nanomonsv` are created.
+    pub reference: String, // Reference FASTA: `--reference_fasta` for `parse`, last positional argument for `get`.
+    pub thread: u8, // Value passed after `--process` to `nanomonsv get`.
+}
+
+impl Default for NanomonSVConfig {
+    /// Default settings: the bare `nanomonsv` command name, the
+    /// `/data/longreads_basic_pipe` result root, the chm13v2.0 reference
+    /// FASTA and 155 processes.
+    fn default() -> Self {
+        let bin = String::from("nanomonsv");
+        let result_dir = String::from("/data/longreads_basic_pipe");
+        let reference = String::from("/data/ref/hs1/chm13v2.0.fa");
+
+        Self {
+            bin,
+            result_dir,
+            reference,
+            thread: 155,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct NanomonSV { // One diag/mrd nanomonsv run for a single case.
+    pub id: String, // Case identifier; used in output directory names and file prefixes.
+    pub diag_bam: String, // Path of the diag BAM.
+    pub mrd_bam: String, // Path of the mrd BAM; passed as `--control_bam` when calling on diag.
+    pub diag_out_dir: String, // `{result_dir}/{id}/diag/nanomonsv`, computed in `new`.
+    pub mrd_out_dir: String, // `{result_dir}/{id}/mrd/nanomonsv`, computed in `new`.
+    pub log: String, // Log text for the run; starts empty in `new`.
+    pub config: NanomonSVConfig, // Binary, reference and process-count settings.
+}
+
+impl NanomonSV {
+    /// Builds a runner for case `id`.
+    ///
+    /// The output directories are derived from `config.result_dir` as
+    /// `{result_dir}/{id}/diag/nanomonsv` and `{result_dir}/{id}/mrd/nanomonsv`.
+    /// Nothing is created on disk until [`NanomonSV::run`] is called.
+    pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
+        let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
+        let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
+
+        NanomonSV {
+            id: id.to_string(),
+            diag_bam: diag_bam.to_string(),
+            mrd_bam: mrd_bam.to_string(),
+            diag_out_dir,
+            mrd_out_dir,
+            log: String::default(),
+            config,
+        }
+    }
+
+    /// Runs the nanomonsv steps for this case, skipping any step whose
+    /// output file already exists:
+    ///
+    /// 1. `parse` on the diag and mrd BAMs, each in its own thread;
+    /// 2. `get` on mrd alone, then `get` on diag with mrd as control;
+    /// 3. `bcftools_keep_pass` on the diag result VCF.
+    ///
+    /// Every line captured from the child processes is logged with `info!`
+    /// and stored in `self.log`. A child that exits with a non-zero status
+    /// is reported with a warning and the run continues.
+    ///
+    /// # Panics
+    ///
+    /// Panics if an output directory cannot be created, a process cannot be
+    /// spawned or waited on, a worker thread panics, the log mutex is
+    /// poisoned, or `bcftools_keep_pass` returns an error.
+    pub fn run(&mut self) {
+        // Create directories
+        if !Path::new(&self.diag_out_dir).exists() {
+            fs::create_dir_all(&self.diag_out_dir).unwrap();
+        }
+        if !Path::new(&self.mrd_out_dir).exists() {
+            fs::create_dir_all(&self.mrd_out_dir).unwrap();
+        }
+
+        // Parse
+        let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
+        let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
+
+        let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
+        let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
+
+        // Shared between the two parse threads and the sequential get steps.
+        let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
+
+        let mut threads_handles = Vec::new();
+        if !Path::new(&diag_info_vcf).exists() {
+            let bin = self.config.bin.clone();
+            let reference = self.config.reference.clone();
+            let diag_bam = self.diag_bam.clone();
+            let log = log.clone();
+            let diag_out_prefix = diag_out_prefix.clone();
+
+            let handle = thread::spawn(move || {
+                let mut child = Command::new(bin)
+                    .args([
+                        "parse",
+                        "--reference_fasta",
+                        &reference,
+                        &diag_bam,
+                        &diag_out_prefix,
+                    ])
+                    .stdout(Stdio::piped())
+                    .stderr(Stdio::inherit())
+                    .spawn()
+                    .expect("Failed to spawn NanomonSV parse");
+
+                if let Some(stdout) = child.stdout.take() {
+                    NanomonSV::capture_lines(stdout, &log);
+                }
+                // Reap the child so the thread only ends once parse has exited.
+                NanomonSV::wait_logged(child, "nanomonsv parse (diag)");
+            });
+            threads_handles.push(handle);
+        }
+
+        if !Path::new(&mrd_info_vcf).exists() {
+            let bin = self.config.bin.clone();
+            let reference = self.config.reference.clone();
+            // The mrd prefix must be parsed from the mrd BAM, not the diag one.
+            let mrd_bam = self.mrd_bam.clone();
+            let log = log.clone();
+            let mrd_out_prefix = mrd_out_prefix.clone();
+
+            let handle = thread::spawn(move || {
+                let mut child = Command::new(bin)
+                    .args([
+                        "parse",
+                        "--reference_fasta",
+                        &reference,
+                        &mrd_bam,
+                        &mrd_out_prefix,
+                    ])
+                    .stdout(Stdio::inherit())
+                    .stderr(Stdio::piped())
+                    .spawn()
+                    .expect("Failed to spawn NanomonSV parse");
+
+                if let Some(stderr) = child.stderr.take() {
+                    NanomonSV::capture_lines(stderr, &log);
+                }
+                NanomonSV::wait_logged(child, "nanomonsv parse (mrd)");
+            });
+            threads_handles.push(handle);
+        }
+
+        // Wait for all threads to finish
+        for handle in threads_handles {
+            handle.join().expect("Thread panicked");
+        }
+
+        // Get
+        let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
+        let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
+        let threads = self.config.thread.to_string();
+
+        if !Path::new(&mrd_result_vcf).exists() {
+            let mut child = Command::new(&self.config.bin)
+                .args([
+                    "get",
+                    "--process",
+                    &threads,
+                    &mrd_out_prefix,
+                    &self.mrd_bam,
+                    &self.config.reference,
+                ])
+                .stdout(Stdio::inherit())
+                .stderr(Stdio::piped())
+                .spawn()
+                .expect("Failed to spawn nanomonsv");
+
+            if let Some(stderr) = child.stderr.take() {
+                NanomonSV::capture_lines(stderr, &log);
+            }
+            NanomonSV::wait_logged(child, "nanomonsv get (mrd)");
+        }
+
+        if !Path::new(&diag_result_vcf).exists() {
+            let mut child = Command::new(&self.config.bin)
+                .args([
+                    "get",
+                    "--control_prefix",
+                    &mrd_out_prefix,
+                    "--control_bam",
+                    &self.mrd_bam,
+                    "--process",
+                    &threads,
+                    &diag_out_prefix,
+                    &self.diag_bam,
+                    &self.config.reference,
+                ])
+                .stdout(Stdio::inherit())
+                .stderr(Stdio::piped())
+                .spawn()
+                .expect("Failed to spawn nanomonsv");
+
+            if let Some(stderr) = child.stderr.take() {
+                NanomonSV::capture_lines(stderr, &log);
+            }
+            NanomonSV::wait_logged(child, "nanomonsv get (diag)");
+        }
+
+        // Keep what was captured instead of dropping it with the local buffer.
+        self.log = std::mem::take(&mut *log.lock().unwrap());
+
+        let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
+        if !Path::new(&vcf_passed).exists() {
+            bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
+        }
+    }
+
+    /// Reads `output` line by line until EOF or the first read error,
+    /// logging each line with `info!` and appending it to `log`.
+    fn capture_lines(output: impl std::io::Read, log: &Mutex<String>) {
+        for line in BufReader::new(output).lines().map_while(Result::ok) {
+            info!("{line}");
+            let mut log_lock = log.lock().unwrap();
+            log_lock.push_str(&line);
+            log_lock.push('\n');
+        }
+    }
+
+    /// Waits for `child` to exit and logs a warning when it did not succeed.
+    fn wait_logged(mut child: std::process::Child, step: &str) {
+        let status = child.wait().unwrap();
+        if !status.success() {
+            log::warn!("{step} exited with {status}");
+        }
+    }
+}

+ 16 - 6
src/callers/clairs.rs

@@ -1,6 +1,9 @@
+use crate::{
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    runners::{run_wait, DockerRun},
+};
 use std::{fs, path::Path};
 use tracing::info;
-use crate::{commands::bcftools::{Bcftools, BcftoolsConfig}, runners::DockerRun};
 
 #[derive(Debug)]
 pub struct ClairSConfig {
@@ -67,7 +70,7 @@ impl ClairS {
 
         // Run Docker command if output VCF doesn't exist
         if !Path::new(&self.output_vcf).exists() {
-            let docker_run = DockerRun::run(&[
+            let mut docker_run = DockerRun::new(&[
                 "run",
                 "-d",
                 "-v",
@@ -93,18 +96,25 @@ impl ClairS {
                 "-s",
                 &format!("{}_diag", self.id),
             ]);
-            docker_run.wait();
-            self.log = docker_run.log();
+            self.log = run_wait(&mut docker_run).unwrap().log;
         } else {
             info!("ClairS output vcf already exists");
         }
 
         // Keep PASS
         if !Path::new(&self.vcf_passed).exists() {
-            Bcftools::new(&self.output_vcf, &self.vcf_passed, BcftoolsConfig::default()).keep_pass();
+            bcftools_keep_pass(
+                &self.output_vcf,
+                &self.vcf_passed,
+                BcftoolsConfig::default(),
+            ).unwrap();
         }
         if !Path::new(&self.indel_vcf_passed).exists() {
-            Bcftools::new(&self.output_indel, &self.indel_vcf_passed, BcftoolsConfig::default()).keep_pass();
+            bcftools_keep_pass(
+                &self.output_indel,
+                &self.indel_vcf_passed,
+                BcftoolsConfig::default(),
+            ).unwrap();
         }
     }
 }

+ 3 - 4
src/callers/deep_variant.rs

@@ -5,7 +5,7 @@ use std::{
 };
 use log::{info, warn};
 
-use crate::runners::DockerRun;
+use crate::runners::{run_wait, DockerRun};
 
 #[derive(Debug)]
 pub struct DeepVariantConfig {
@@ -77,7 +77,7 @@ impl DeepVariant {
         
         // Run Docker command if output VCF doesn't exist
         if !Path::new(&self.output_vcf).exists() {
-            let docker_run = DockerRun::run(&[
+            let mut docker_run = DockerRun::new(&[
                 "run",
                 "-d",
                 "-v",
@@ -105,8 +105,7 @@ impl DeepVariant {
                 "--sample_name",
                 &format!("{}_{}", self.id, self.time_point),
             ]);
-            docker_run.wait();
-            self.log = docker_run.log();
+            self.log = run_wait(&mut docker_run).unwrap().log;
         }
 
         // Keep PASS

+ 187 - 113
src/callers/nanomonsv.rs

@@ -58,18 +58,21 @@
 
 use std::{
     fs,
-    io::{BufRead, BufReader},
+    // io::{BufRead, BufReader},
     path::Path,
-    process::{Command, Stdio},
-    sync::{Arc, Mutex},
+    // process::{Command, Stdio},
+    // sync::{Arc, Mutex},
     thread,
 };
 
 use log::info;
 
-use crate::commands::bcftools::{Bcftools, BcftoolsConfig};
+use crate::{
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    runners::{run_wait, CommandRun, RunReport},
+};
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct NanomonSVConfig {
     pub bin: String,
     pub result_dir: String,
@@ -125,80 +128,86 @@ impl NanomonSV {
         }
 
         // Parse
+        info!("Nanomonsv Parse");
         let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
         let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
 
         let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
         let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
 
-        let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
+        // let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
 
         let mut threads_handles = Vec::new();
         if !Path::new(&diag_info_vcf).exists() {
-            let bin = self.config.bin.clone();
-            let reference = self.config.reference.clone();
+            // let bin = self.config.bin.clone();
+            // let reference = self.config.reference.clone();
             let diag_bam = self.diag_bam.clone();
-            let log = log.clone();
+            // let log = log.clone();
             let diag_out_prefix = diag_out_prefix.clone();
+            let config = self.config.clone();
 
             let handle = thread::spawn(move || {
-                let mut child = Command::new(bin)
-                    .args([
-                        "parse",
-                        "--reference_fasta",
-                        &reference,
-                        &diag_bam,
-                        &diag_out_prefix,
-                    ])
-                    .stdout(Stdio::piped())
-                    .stderr(Stdio::inherit())
-                    .spawn()
-                    .expect("Failed to spawn NanomonSV parse");
-
-                if let Some(stdout) = child.stdout.take() {
-                    let reader = BufReader::new(stdout);
-                    for line in reader.lines().map_while(Result::ok) {
-                        info!("{}", line);
-                        let mut log_lock = log.lock().unwrap();
-                        log_lock.push_str(&line);
-                        log_lock.push('\n');
-                    }
-                }
+                nanomonsv_parse(&diag_bam, &diag_out_prefix, config).unwrap();
+                // let mut child = Command::new(bin)
+                //     .args([
+                //         "parse",
+                //         "--reference_fasta",
+                //         &reference,
+                //         &diag_bam,
+                //         &diag_out_prefix,
+                //     ])
+                //     .stdout(Stdio::piped())
+                //     .stderr(Stdio::inherit())
+                //     .spawn()
+                //     .expect("Failed to spawn NanomonSV parse");
+                //
+                // if let Some(stdout) = child.stdout.take() {
+                //     let reader = BufReader::new(stdout);
+                //     for line in reader.lines().map_while(Result::ok) {
+                //         info!("{}", line);
+                //         let mut log_lock = log.lock().unwrap();
+                //         log_lock.push_str(&line);
+                //         log_lock.push('\n');
+                //     }
+                // }
+                // child.wait().unwrap();
             });
             threads_handles.push(handle);
         }
 
         if !Path::new(&mrd_info_vcf).exists() {
-            let bin = self.config.bin.clone();
-            let reference = self.config.reference.clone();
-            let mrd_bam = self.diag_bam.clone();
-            let log = log.clone();
+            // let bin = self.config.bin.clone();
+            // let reference = self.config.reference.clone();
+            // let log = log.clone();
             let mrd_out_prefix = mrd_out_prefix.clone();
+            let mrd_bam = self.mrd_bam.clone();
+            let config = self.config.clone();
 
             let handle = thread::spawn(move || {
-                let mut child = Command::new(bin)
-                    .args([
-                        "parse",
-                        "--reference_fasta",
-                        &reference,
-                        &mrd_bam,
-                        &mrd_out_prefix,
-                    ])
-                    .stdout(Stdio::inherit())
-                    .stderr(Stdio::piped())
-                    .spawn()
-                    .expect("Failed to spawn NanomonSV parse");
-
-                if let Some(stdout) = child.stderr.take() {
-                    let reader = BufReader::new(stdout);
-                    for line in reader.lines().map_while(Result::ok) {
-                        info!("{}", line);
-                        let mut log_lock = log.lock().unwrap();
-                        log_lock.push_str(&line);
-                        log_lock.push('\n');
-                    }
-                }
-                child.wait().unwrap();
+                nanomonsv_parse(&mrd_bam, &mrd_out_prefix, config).unwrap();
+                // let mut child = Command::new(bin)
+                //     .args([
+                //         "parse",
+                //         "--reference_fasta",
+                //         &reference,
+                //         &mrd_bam,
+                //         &mrd_out_prefix,
+                //     ])
+                //     .stdout(Stdio::inherit())
+                //     .stderr(Stdio::piped())
+                //     .spawn()
+                //     .expect("Failed to spawn NanomonSV parse");
+                //
+                // if let Some(stdout) = child.stderr.take() {
+                //     let reader = BufReader::new(stdout);
+                //     for line in reader.lines().map_while(Result::ok) {
+                //         info!("{}", line);
+                //         let mut log_lock = log.lock().unwrap();
+                //         log_lock.push_str(&line);
+                //         log_lock.push('\n');
+                //     }
+                // }
+                // child.wait().unwrap();
             });
             threads_handles.push(handle);
         }
@@ -209,72 +218,137 @@ impl NanomonSV {
         }
 
         // Get
+        info!("Nanomonsv Get");
         let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
         let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
 
         if !Path::new(&mrd_result_vcf).exists() {
-            let mut child = Command::new(&self.config.bin)
-                .args([
-                    "get",
-                    "--process",
-                    &self.config.thread.to_string(),
-                    &mrd_out_prefix,
-                    &self.mrd_bam,
-                    &self.config.reference,
-                ])
-                .stdout(Stdio::inherit())
-                .stderr(Stdio::piped())
-                .spawn()
-                .expect("Failed to spawn nanomonsv");
-
-            if let Some(stdout) = child.stderr.take() {
-                let reader = BufReader::new(stdout);
-                for line in reader.lines().map_while(Result::ok) {
-                    info!("{line}");
-                    let mut log_lock = log.lock().unwrap();
-                    log_lock.push_str(&line);
-                    log_lock.push('\n');
-                }
-            }
-
-            child.wait().unwrap();
+            nanomonsv_get(
+                &self.mrd_bam,
+                &mrd_out_prefix,
+                None,
+                None,
+                self.config.clone(),
+            )
+            .unwrap();
+            // let mut child = Command::new(&self.config.bin)
+            //     .args([
+            //         "get",
+            //         "--process",
+            //         &self.config.thread.to_string(),
+            //         &mrd_out_prefix,
+            //         &self.mrd_bam,
+            //         &self.config.reference,
+            //     ])
+            //     .stdout(Stdio::inherit())
+            //     .stderr(Stdio::piped())
+            //     .spawn()
+            //     .expect("Failed to spawn nanomonsv");
+            //
+            // if let Some(stdout) = child.stderr.take() {
+            //     let reader = BufReader::new(stdout);
+            //     for line in reader.lines().map_while(Result::ok) {
+            //         info!("{line}");
+            //         let mut log_lock = log.lock().unwrap();
+            //         log_lock.push_str(&line);
+            //         log_lock.push('\n');
+            //     }
+            // }
+            //
+            // child.wait().unwrap();
         }
 
         if !Path::new(&diag_result_vcf).exists() {
-            let mut child = Command::new(&self.config.bin)
-                .args([
-                    "get",
-                    "--control_prefix",
-                    &mrd_out_prefix,
-                    "--control_bam",
-                    &self.mrd_bam,
-                    "--process",
-                    &self.config.thread.to_string(),
-                    &diag_out_prefix,
-                    &self.diag_bam,
-                    &self.config.reference,
-                ])
-                .stdout(Stdio::inherit())
-                .stderr(Stdio::piped())
-                .spawn()
-                .expect("Failed to spawn nanomonsv");
-
-            if let Some(stdout) = child.stderr.take() {
-                let reader = BufReader::new(stdout);
-                for line in reader.lines().map_while(Result::ok) {
-                    info!("{line}");
-                    let mut log_lock = log.lock().unwrap();
-                    log_lock.push_str(&line);
-                    log_lock.push('\n');
-                }
-            }
-
-            child.wait().unwrap();
+            // Call variants on the diag BAM; the mrd BAM is passed only as the control.
+            nanomonsv_get(
+                &self.diag_bam,
+                &diag_out_prefix,
+                Some(&self.mrd_bam),
+                Some(&mrd_out_prefix),
+                self.config.clone(),
+            )
+            .unwrap();
+            // let mut child = Command::new(&self.config.bin)
+            //     .args([
+            //         "get",
+            //         "--control_prefix",
+            //         &mrd_out_prefix,
+            //         "--control_bam",
+            //         &self.mrd_bam,
+            //         "--process",
+            //         &self.config.thread.to_string(),
+            //         &diag_out_prefix,
+            //         &self.diag_bam,
+            //         &self.config.reference,
+            //     ])
+            //     .stdout(Stdio::inherit())
+            //     .stderr(Stdio::piped())
+            //     .spawn()
+            //     .expect("Failed to spawn nanomonsv");
+            //
+            // if let Some(stdout) = child.stderr.take() {
+            //     let reader = BufReader::new(stdout);
+            //     for line in reader.lines().map_while(Result::ok) {
+            //         info!("{line}");
+            //         let mut log_lock = log.lock().unwrap();
+            //         log_lock.push_str(&line);
+            //         log_lock.push('\n');
+            //     }
+            // }
+            //
+            // child.wait().unwrap();
         }
 
         let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
         if !Path::new(&vcf_passed).exists() {
-            Bcftools::new(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).keep_pass();
+            bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
         }
     }
 }
+
+/// Runs `nanomonsv parse` for one BAM.
+///
+/// The command line is
+/// `<config.bin> parse --reference_fasta <config.reference> <bam> <out_prefix>`
+/// and it is executed through `run_wait`.
+///
+/// # Errors
+///
+/// Returns whatever error `run_wait` reports.
+pub fn nanomonsv_parse(
+    bam: &str,
+    out_prefix: &str,
+    config: NanomonSVConfig,
+) -> anyhow::Result<RunReport> {
+    let reference = config.reference.as_str();
+    let argv: Vec<&str> = vec!["parse", "--reference_fasta", reference, bam, out_prefix];
+
+    let mut runner = CommandRun::new(&config.bin, &argv);
+    Ok(run_wait(&mut runner)?)
+}
+
+/// Runs `nanomonsv get` for one BAM, optionally against a control.
+///
+/// The command line is
+/// `<config.bin> get [--control_prefix <ctrl_prefix> --control_bam <ctrl_bam>]
+/// --process <config.thread> <out_prefix> <bam> <config.reference>`.
+/// The control options are emitted only when both `ctrl_bam` and
+/// `ctrl_prefix` are `Some`; if only one is given, neither is passed.
+///
+/// # Errors
+///
+/// Returns whatever error `run_wait` reports.
+pub fn nanomonsv_get(
+    bam: &str,
+    out_prefix: &str,
+    ctrl_bam: Option<&str>,
+    ctrl_prefix: Option<&str>,
+    config: NanomonSVConfig,
+) -> anyhow::Result<RunReport> {
+    let n_processes = config.thread.to_string();
+
+    let control_args: Vec<&str> = match (ctrl_bam, ctrl_prefix) {
+        (Some(ctrl_bam), Some(ctrl_prefix)) => vec![
+            "--control_prefix",
+            ctrl_prefix,
+            "--control_bam",
+            ctrl_bam,
+        ],
+        _ => Vec::new(),
+    };
+
+    let mut argv: Vec<&str> = vec!["get"];
+    argv.extend(control_args);
+    argv.extend([
+        "--process",
+        n_processes.as_str(),
+        out_prefix,
+        bam,
+        config.reference.as_str(),
+    ]);
+
+    let mut runner = CommandRun::new(&config.bin, &argv);
+    Ok(run_wait(&mut runner)?)
+}

+ 2 - 2
src/collection/bam.rs

@@ -44,7 +44,7 @@ impl Bam {
         let stem: Vec<&str> = stem.split('_').collect();
 
         if stem.len() > 4 || stem.len() < 3 {
-            return Err(anyhow!("Error in bam name formating {}", path.display()));
+            return Err(anyhow!("Error in bam name: {}", path.display()));
         }
 
         let id = stem[0].to_string();
@@ -59,7 +59,7 @@ impl Bam {
                 "oncoT" => BamType::Panel("oncoT".to_string()),
                 "H3K27ac" => BamType::ChIP("H3K27ac".to_string()),
                 "H3K4me3" => BamType::ChIP("H3K4me3".to_string()),
-                _ => return Err(anyhow!("Error in bam name formating {}", path.display())),
+                _ => return Err(anyhow!("Error in bam name: {}", path.display())),
             }
         } else {
             BamType::WGS

+ 90 - 66
src/collection/mod.rs

@@ -3,21 +3,17 @@ use std::path::PathBuf;
 use hashbrown::HashMap;
 use log::{info, warn};
 
+use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
     callers::{
         clairs::{ClairS, ClairSConfig},
         deep_variant::{DeepVariant, DeepVariantConfig},
         nanomonsv::{NanomonSV, NanomonSVConfig},
     },
-    collection::{
-        pod5::{FlowCellCase, Pod5Type},
-        vcf::Vcf,
-    },
+    collection::pod5::{FlowCellCase, Pod5Type},
     commands::dorado::Dorado,
     config::Config,
-    runners::Run,
 };
-use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 
 pub mod bam;
 pub mod pod5;
@@ -49,7 +45,6 @@ impl Collections {
         info!("Looking for base calling tasks...");
         let mut to_demux = Vec::new();
 
-        // let bams_acquisitions_ids = self.bam.by_acquisition_id();
         for run in self.pod5.runs.iter() {
             for fc in run.flowcells.iter() {
                 let acq_id = fc.pod5_info.acquisition_id.clone();
@@ -88,65 +83,94 @@ impl Collections {
             .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
 
         // Variant calling
-        self.vcf.sort_by_id();
-        let mut vcf_by_ids: HashMap<String, Vec<Vcf>> = HashMap::new();
-        self.vcf.vcfs.iter().for_each(|v| {
-            vcf_by_ids.entry(v.id.clone()).or_default().push(v.clone());
-        });
-
-        vcf_by_ids.iter().for_each(|(k, v)| {
-            if v.len() != 5 {
+        info!("Looking for variant calling tasks...");
+        let mut looked_ids = Vec::new();
+        for (id, vcfs) in self.vcf.group_by_id() {
+            if let (Some(diag), Some(mrd)) = (
+                self.bam.get(&id, "diag").first(),
+                self.bam.get(&id, "mrd").first(),
+            ) {
+                let caller_time: Vec<(&str, &str)> = vcfs
+                    .iter()
+                    .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
+                    .collect();
+
+                if !caller_time.contains(&("clairs", "diag"))
+                    || !caller_time.contains(&("clairs_indel", "diag"))
+                {
+                    self.tasks.push(CollectionsTasks::ClairS {
+                        id: id.to_string(),
+                        diag_bam: diag.path.to_str().unwrap().to_string(),
+                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
+                        config: ClairSConfig::default(),
+                    });
+                }
+                if !caller_time.contains(&("DeepVariant", "diag")) {
+                    self.tasks.push(CollectionsTasks::DeepVariant {
+                        id: id.to_string(),
+                        time_point: "diag".to_string(),
+                        bam: diag.path.to_str().unwrap().to_string(),
+                        config: DeepVariantConfig::default(),
+                    });
+                }
+                if !caller_time.contains(&("DeepVariant", "mrd")) {
+                    self.tasks.push(CollectionsTasks::DeepVariant {
+                        id: id.to_string(),
+                        time_point: "mrd".to_string(),
+                        bam: mrd.path.to_str().unwrap().to_string(),
+                        config: DeepVariantConfig::default(),
+                    });
+                }
+                if !caller_time.contains(&("nanomonsv", "diag")) {
+                    self.tasks.push(CollectionsTasks::NanomonSV {
+                        id: id.to_string(),
+                        diag_bam: diag.path.to_str().unwrap().to_string(),
+                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
+                        config: NanomonSVConfig::default(),
+                    });
+                }
+                looked_ids.push(id.clone());
+            }
+        }
+
+        // ids without any vcf
+        self.bam
+            .bams
+            .iter()
+            .map(|b| b.id.clone())
+            .filter(|id| !looked_ids.contains(id))
+            .for_each(|id| {
                 if let (Some(diag), Some(mrd)) = (
-                    self.bam.get(k, "diag").first(),
-                    self.bam.get(k, "mrd").first(),
+                    self.bam.get(&id, "diag").first(),
+                    self.bam.get(&id, "mrd").first(),
                 ) {
-                    println!("{v:#?}");
-                    let diag_bam = diag.path.to_str().unwrap().to_string();
-                    let mrd_bam = mrd.path.to_str().unwrap().to_string();
-
-                    let has_clairs = v.iter().any(|v| v.caller == "clairs");
-                    let has_clairs_indel = v.iter().any(|v| v.caller == "clairs_indel");
-                    if !has_clairs || !has_clairs_indel {
-                        self.tasks.push(CollectionsTasks::ClairS {
-                            id: k.to_string(),
-                            diag_bam: diag_bam.clone(),
-                            mrd_bam: mrd_bam.clone(),
-                            config: ClairSConfig::default(),
-                        });
-                    }
-                    if !v
-                        .iter()
-                        .any(|v| v.caller == "DeepVariant" && v.time_point == "diag")
-                    {
-                        self.tasks.push(CollectionsTasks::DeepVariant {
-                            id: k.to_string(),
-                            time_point: "diag".to_string(),
-                            bam: diag_bam.clone(),
-                            config: DeepVariantConfig::default(),
-                        })
-                    }
-                    if !v
-                        .iter()
-                        .any(|v| v.caller == "DeepVariant" && v.time_point == "mrd")
-                    {
-                        self.tasks.push(CollectionsTasks::DeepVariant {
-                            id: k.to_string(),
-                            time_point: "mrd".to_string(),
-                            bam: mrd_bam.clone(),
-                            config: DeepVariantConfig::default(),
-                        })
-                    }
-                    if !v.iter().any(|v| v.caller == "nanomonsv") {
-                        self.tasks.push(CollectionsTasks::NanomonSV {
-                            id: k.to_string(),
-                            diag_bam: diag_bam.clone(),
-                            mrd_bam: mrd_bam.clone(),
-                            config: NanomonSVConfig::default(),
-                        })
-                    }
-                };
-            }
-        });
+                    self.tasks.push(CollectionsTasks::ClairS {
+                        id: id.to_string(),
+                        diag_bam: diag.path.to_str().unwrap().to_string(),
+                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
+                        config: ClairSConfig::default(),
+                    });
+
+                    self.tasks.push(CollectionsTasks::DeepVariant {
+                        id: id.to_string(),
+                        time_point: "diag".to_string(),
+                        bam: diag.path.to_str().unwrap().to_string(),
+                        config: DeepVariantConfig::default(),
+                    });
+                    self.tasks.push(CollectionsTasks::DeepVariant {
+                        id: id.to_string(),
+                        time_point: "mrd".to_string(),
+                        bam: mrd.path.to_str().unwrap().to_string(),
+                        config: DeepVariantConfig::default(),
+                    });
+                    self.tasks.push(CollectionsTasks::NanomonSV {
+                        id: id.to_string(),
+                        diag_bam: diag.path.to_str().unwrap().to_string(),
+                        mrd_bam: mrd.path.to_str().unwrap().to_string(),
+                        config: NanomonSVConfig::default(),
+                    });
+                }
+            });
     }
 
     pub fn run(&mut self) -> anyhow::Result<()> {
@@ -197,8 +221,8 @@ pub enum CollectionsTasks {
     },
 }
 
-impl Run for CollectionsTasks {
-    fn run(self) -> anyhow::Result<()> {
+impl CollectionsTasks {
+    pub fn run(self) -> anyhow::Result<()> {
         match self {
             CollectionsTasks::Align(case) => {
                 Dorado::init(case.clone(), Config::default())?.run_pipe()?;

+ 9 - 2
src/collection/vcf.rs

@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
 use csi::binning_index::ReferenceSequence;
 use glob::glob;
 use log::warn;
-use std::{fs::Metadata, os::unix::fs::MetadataExt, path::PathBuf};
+use std::{collections::HashMap, fs::Metadata, os::unix::fs::MetadataExt, path::PathBuf};
 
 use noodles_csi as csi;
 use num_format::{Locale, ToFormattedString};
@@ -114,6 +114,14 @@ impl VcfCollection {
+    /// Sorts the collected VCFs in place by their `id`; every comparison key is a
+    /// clone of that `id`.
     pub fn sort_by_id(&mut self) {
         self.vcfs.sort_by_key(|v| v.id.clone());
     }
+
+    /// Groups the collected VCFs by their `id` field.
+    ///
+    /// Returns one `(id, vcfs)` pair per distinct `id`, each `Vcf` being cloned out of
+    /// the collection. Inside a group the VCFs keep the order they have in `self.vcfs`;
+    /// the pairs themselves come back in the arbitrary iteration order of a `HashMap`.
+    pub fn group_by_id(&self) -> Vec<(String, Vec<Vcf>)> {
+        let mut grouped: HashMap<String, Vec<Vcf>> = HashMap::new();
+        for vcf in &self.vcfs {
+            grouped.entry(vcf.id.clone()).or_default().push(vcf.clone());
+        }
+        grouped.into_iter().collect()
+    }
 }
 
 pub fn n_variants(path: &str) -> anyhow::Result<u64> {
@@ -121,7 +129,6 @@ pub fn n_variants(path: &str) -> anyhow::Result<u64> {
     let index = csi::read(csi_src)?;
 
     let mut n = 0;
-
     for reference_sequence in index.reference_sequences() {
         if let Some(metadata) = reference_sequence.metadata() {
             n += metadata.mapped_record_count()

+ 22 - 39
src/commands/bcftools.rs

@@ -1,5 +1,4 @@
-use std::process::{Command, Stdio};
-use log::warn;
+use crate::runners::{run_wait, CommandRun, RunReport};
 
 #[derive(Debug)]
 pub struct BcftoolsConfig {
@@ -16,41 +15,25 @@ impl Default for BcftoolsConfig {
     }
 }
 
-#[derive(Debug)]
-pub struct Bcftools {
-    pub input: String,
-    pub output: String,
-    pub config: BcftoolsConfig,
-}
-
-impl Bcftools {
-    pub fn new(input: &str, output: &str, config: BcftoolsConfig) -> Self {
-        Bcftools {
-            input: input.to_string(),
-            output: output.to_string(),
-            config,
-        }
-    }
-    pub fn keep_pass(&self) {
-        let status = Command::new(&self.config.bin)
-            .args([
-                "view",
-                "--write-index",
-                "--threads",
-                &self.config.threads.to_string(),
-                "-i",
-                "FILTER='PASS'",
-                &self.input,
-                "-o",
-                &self.output,
-            ])
-            .stdout(Stdio::inherit())
-            .stderr(Stdio::inherit())
-            .status()
-            .expect("Failed to filter VCF with bcftools");
-
-        if !status.success() {
-            warn!("bcftools command failed with status: {}", status);
-        }
-    }
+/// Runs `bcftools view` on `input` with the include expression `FILTER='PASS'` and
+/// writes the result to `output`, asking bcftools to index it (`--write-index`).
+///
+/// The binary path and the thread count are taken from `config`. The command is
+/// started and waited on through `run_wait`, whose `RunReport` is returned.
+pub fn bcftools_keep_pass(
+    input: &str,
+    output: &str,
+    config: BcftoolsConfig,
+) -> anyhow::Result<RunReport> {
+    let threads = config.threads.to_string();
+    let args = [
+        "view",
+        "--write-index",
+        "--threads",
+        threads.as_str(),
+        "-i",
+        "FILTER='PASS'",
+        input,
+        "-o",
+        output,
+    ];
+    let mut cmd_run = CommandRun::new(&config.bin, &args);
+    run_wait(&mut cmd_run)
 }

+ 1 - 1
src/lib.rs

@@ -107,7 +107,7 @@ mod tests {
 
     // cargo test run -- --nocapture; ~/run_scripts/notify_finish.sh &
     #[test_log::test]
-    fn task() -> anyhow::Result<()> {
+    fn todo() -> anyhow::Result<()> {
         let mut collections = Collections::new(
             "/data/run_data",
             "/data/flow_cells.tsv",

+ 112 - 16
src/runners.rs

@@ -1,33 +1,70 @@
 use std::{
     io::{BufRead, BufReader},
-    process::{Command, Stdio},
+    process::{Child, Command, Stdio},
     sync::{Arc, Mutex},
     thread,
 };
 
+use chrono::{DateTime, Utc};
 use log::{info, warn};
 
+/// A task that can be started.
 pub trait Run {
-    fn run(self) -> anyhow::Result<()>;
+    /// Starts the task, returning an error if it could not be launched.
+    fn run(&mut self) -> anyhow::Result<()>;
+}
+
+/// A started task that can be waited on.
+pub trait Wait {
+    /// Waits for the task to finish (the implementors in this file wait on a
+    /// container or on a child process).
+    fn wait(&mut self) -> anyhow::Result<()>;
+}
+
+/// Shorthand bound for anything that can be run, waited on and asked for its log.
+pub trait RunWait: Run + Wait + Log {}
+// Blanket impl: every `Run + Wait + Log` type is automatically `RunWait`.
+impl<T: Run + Wait + Log> RunWait for T {}
+
+/// Runs `item`, waits for it and reports on the execution.
+///
+/// Calls `run`, then `wait`, then collects the item's log. `start` is taken before
+/// `run` and `end` once the log has been collected. An error from `run` or `wait`
+/// is returned unchanged and no report is produced.
+pub fn run_wait<T: RunWait>(item: &mut T) -> anyhow::Result<RunReport> {
+    let start = Utc::now();
+    item.run()?;
+    item.wait()?;
+    let log = item.log();
+    let end = Utc::now();
+    Ok(RunReport { start, end, log })
+}
+
+/// Timing and captured log of one `run_wait` call.
+#[derive(Debug, Default)]
+pub struct RunReport {
+    /// UTC time taken just before the task is started.
+    pub start: DateTime<Utc>,
+    /// UTC time taken after the task has been waited on and its log collected.
+    pub end: DateTime<Utc>,
+    /// Text returned by the task's `Log::log`.
+    pub log: String,
+}
+
+/// Access to the text a task has logged.
+pub trait Log {
+    /// Returns the log text as an owned `String`.
+    fn log(&self) -> String;
 }
 
+/// A container driven through the `docker` command-line client.
 #[derive(Debug, Default)]
 pub struct DockerRun {
+    /// Arguments handed to the `docker` binary when the run is started.
     pub args: Vec<String>,
+    /// Container id, shared with the Ctrl-C handler; set from the stdout of the
+    /// `docker` command once it has returned.
     pub container_id: Arc<Mutex<Option<String>>>,
+    /// UTC time at which the value was built by `DockerRun::new`.
+    pub start: DateTime<Utc>,
+    /// Log buffer shared with the thread that follows `docker logs`; `log()` returns
+    /// a copy of it.
     pub logs: Arc<Mutex<String>>,
 }
 
 impl DockerRun {
-    pub fn run(args: &[&str]) -> Self {
-        let docker_run = DockerRun {
+    /// Builds a `DockerRun` holding `args`, stamping `start` with the current UTC time.
+    ///
+    /// Nothing is launched here; the remaining fields take their `Default` values.
+    pub fn new(args: &[&str]) -> Self {
+        DockerRun {
             args: args.iter().map(|e| e.to_string()).collect(),
+            start: Utc::now(),
             ..Default::default()
-        };
+        }
+    }
+}
 
-        // Setup Ctrl-C handler
+impl Run for DockerRun {
+    fn run(&mut self) -> anyhow::Result<()> {
+        // Set up the Ctrl-C handler; it cannot be registered more than once.
         {
-            let container_id = Arc::clone(&docker_run.container_id);
+            let container_id = Arc::clone(&self.container_id);
             ctrlc::set_handler(move || {
                 if let Ok(container_id) = container_id.lock() {
                     if let Some(ref id) = *container_id {
@@ -42,7 +79,7 @@ impl DockerRun {
 
         // Spawn the main command
         let output = Command::new("docker")
-            .args(&docker_run.args)
+            .args(&self.args)
             .stdout(Stdio::piped())
             .stderr(Stdio::inherit())
             .output()
@@ -51,13 +88,13 @@ impl DockerRun {
         // add id to Arc
         let id = String::from_utf8_lossy(&output.stdout).trim().to_string();
         {
-            let mut container_id_lock = docker_run.container_id.lock().unwrap();
+            let mut container_id_lock = self.container_id.lock().unwrap();
             *container_id_lock = Some(id.clone());
         }
 
         // Spawn a thread to follow the logs
         let log_id = id.clone();
-        let logs_clone = Arc::clone(&docker_run.logs);
+        let logs_clone = Arc::clone(&self.logs);
         let _loger_thread = thread::spawn(move || {
             let output = Command::new("docker")
                 .args(["inspect", "--format='{{.Config.Image}}'", &log_id])
@@ -69,7 +106,7 @@ impl DockerRun {
                     .trim() // Trim to remove any trailing newlines
                     .to_string()
             } else {
-                "".to_string()
+                "?".to_string()
             };
             let mut child = Command::new("docker")
                 .args(["logs", "--follow", &log_id])
@@ -88,12 +125,12 @@ impl DockerRun {
                 }
             }
         });
-
-        docker_run
+        Ok(())
     }
+}
 
-    // Wait for the container to finish
-    pub fn wait(&self) {
+impl Wait for DockerRun {
+    fn wait(&mut self) -> anyhow::Result<()> {
         if let Ok(container_id) = self.container_id.lock() {
             if let Some(ref id) = *container_id {
                 let status = Command::new("docker")
@@ -105,10 +142,69 @@ impl DockerRun {
                 }
             }
         }
+        Ok(())
     }
+}
 
-    pub fn log(&self) -> String {
+impl Log for DockerRun {
+    /// Returns a copy of the shared log buffer.
+    ///
+    /// Panics if the mutex guarding the buffer is poisoned.
+    fn log(&self) -> String {
         let logs_lock = self.logs.lock().unwrap();
         logs_lock.to_string()
     }
 }
+
+/// A local command executed as a child process.
+pub struct CommandRun {
+    /// Program to execute.
+    pub bin: String,
+    /// Arguments passed to `bin`.
+    pub args: Vec<String>,
+    /// Handle on the spawned process; `None` until `run` has spawned it.
+    pub child: Option<Child>,
+    /// Lines read from the child's stderr, each terminated by `\n`.
+    pub log: String,
+}
+
+impl CommandRun {
+    /// Prepares, without starting it, a command that will execute `bin` with `args`.
+    ///
+    /// No process exists yet (`child` is `None`) and the log starts out empty.
+    pub fn new(bin: &str, args: &[&str]) -> Self {
+        let args = args.iter().map(|&arg| arg.to_owned()).collect();
+        Self {
+            bin: bin.to_owned(),
+            args,
+            child: None,
+            log: String::new(),
+        }
+    }
+}
+
+impl Run for CommandRun {
+    /// Spawns `bin` with `args` and captures its stderr into `self.log`.
+    ///
+    /// Stdout is inherited from the parent process. Stderr is piped, echoed line by
+    /// line through `info!` and appended to `self.log`. Because stderr is drained
+    /// here, this call only returns once the child has closed it; `wait` then merely
+    /// reaps the process.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the process cannot be spawned.
+    fn run(&mut self) -> anyhow::Result<()> {
+        info!("Running command: {} {}", &self.bin, &self.args.join(" "));
+        let mut child = Command::new(&self.bin)
+            .args(&self.args)
+            .stdout(Stdio::inherit())
+            .stderr(Stdio::piped())
+            .spawn()
+            .map_err(|e| anyhow::anyhow!("Failed to spawn {}: {}", self.bin, e))?;
+
+        if let Some(stderr) = child.stderr.take() {
+            let mut reader = BufReader::new(stderr);
+            let mut raw = Vec::new();
+            loop {
+                raw.clear();
+                match reader.read_until(b'\n', &mut raw) {
+                    // End of stream: the child closed its stderr.
+                    Ok(0) => break,
+                    Ok(_) => {}
+                    Err(e) => {
+                        warn!("[{}] failed to read stderr: {}", self.bin, e);
+                        break;
+                    }
+                }
+                // Decode lossily so that a line that is not valid UTF-8 does not end
+                // the capture while the child is still writing.
+                let decoded = String::from_utf8_lossy(&raw);
+                // Drop the line terminator (`\n` or `\r\n`), as `BufRead::lines` does.
+                let line = match decoded.strip_suffix('\n') {
+                    Some(rest) => rest.strip_suffix('\r').unwrap_or(rest),
+                    None => &*decoded,
+                };
+                info!("[{}] {line}", self.bin);
+                self.log.push_str(line);
+                self.log.push('\n');
+            }
+        }
+        self.child = Some(child);
+        Ok(())
+    }
+}
+
+impl Wait for CommandRun {
+    /// Waits for the spawned process, if any, to exit.
+    ///
+    /// A non-zero exit status is reported with `warn!` but is not turned into an
+    /// error. Does nothing when no process has been spawned.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if waiting on the child process fails.
+    fn wait(&mut self) -> anyhow::Result<()> {
+        if let Some(child) = self.child.as_mut() {
+            let status = child.wait()?;
+            if !status.success() {
+                warn!("{} command failed with status: {}", self.bin, status);
+            }
+        }
+        Ok(())
+    }
+}
+
+impl Log for CommandRun {
+    /// Returns an owned copy of the stderr text captured so far.
+    fn log(&self) -> String {
+        self.log.as_str().to_owned()
+    }
+}