瀏覽代碼

callers: savana, severus; traits: Run (stderr and stdout log and print), Initialize (global config), Version, HasOutput

Thomas 1 年之前
父節點
當前提交
12f24b0405
共有 8 個文件被更改,包括 504 次插入 和 268 次删除
  1. 2 2
      src/callers/clairs.rs
  2. 102 110
      src/callers/savana.rs
  3. 102 106
      src/callers/severus.rs
  4. 20 4
      src/collection/mod.rs
  5. 13 4
      src/commands/longphase.rs
  6. 144 1
      src/config.rs
  7. 41 26
      src/lib.rs
  8. 80 15
      src/runners.rs

+ 2 - 2
src/callers/clairs.rs

@@ -197,14 +197,14 @@ impl ClairS {
         .run()?;
 
         let bam = Path::new(&self.diag_bam);
-        let new_fn = format!("{}_hp", bam.file_stem().unwrap().to_str().unwrap());
+        let new_fn = format!("{}_hp.bam", bam.file_stem().unwrap().to_str().unwrap());
         let bam_hp = bam.with_file_name(new_fn);
         LongphasePhase::new(
             &self.id,
             bam_hp.to_str().unwrap(),
             &germline_normal_tumor,
             LongphaseConfig::default(),
-        )
+        )?
         .run()?;
         Ok(())
     }

+ 102 - 110
src/callers/savana.rs

@@ -1,134 +1,89 @@
 use crate::{
+    collection::{HasOutputs, InitSomatic, Version},
     commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
-    runners::{run_wait, CommandRun},
+    config::Config,
+    runners::{run_wait, CommandRun, Run},
 };
 use anyhow::Context;
 use std::{fs, path::Path};
-use tracing::info;
-
-#[derive(Debug, Clone)]
-pub struct SavanaConfig {
-    pub bin: String,
-    pub result_dir: String,
-    pub threads: u8,
-    pub vntr_bed: String,
-    pub pon: String,
-    pub conda_sh: String,
-    pub force: bool,
-}
-
-impl Default for SavanaConfig {
-    fn default() -> Self {
-        Self {
-            bin: "/data/tools/Savana/severus.py".to_string(),
-            result_dir: "/data/longreads_basic_pipe".to_string(),
-            threads: 32,
-            vntr_bed: "/data/ref/hs1/vntrs_chm13.bed".to_string(),
-            pon: "/data/ref/hs1/PoN_1000G_chm13.tsv.gz".to_string(),
-            conda_sh: "/data/miniconda3/etc/profile.d/conda.sh".to_string(),
-            force: true,
-        }
-    }
-}
 
+/// Runner state for the Savana caller on one `id`.
 #[derive(Debug)]
 pub struct Savana {
+    /// Identifier substituted into the per-sample paths built from `config`.
     pub id: String,
-    pub output_dir: String,
-    pub diag_bam: String,
-    pub mrd_bam: String,
-    pub phased_vcf: String,
-    pub output_vcf: String,
-    pub vcf_passed: String,
-    pub config: SavanaConfig,
+    /// Global pipeline configuration; input and output paths are derived from it.
+    pub config: Config,
+    /// Directory in which the run and bcftools logs are saved.
     pub log_dir: String,
 }
 
-impl Savana {
-    pub fn new(
-        id: &str,
-        diag_bam: &str,
-        mrd_bam: &str,
-        phased_vcf: &str,
-        config: SavanaConfig,
-    ) -> Self {
-        let output_dir = format!("{}/{}/diag/severus", config.result_dir, id);
-        let output_vcf = format!("{output_dir}/somatic_SVs/severus_somatic.vcf");
-        let vcf_passed = format!("{output_dir}/{id}_diag_severus_PASSED.vcf.gz",);
-
-        let log_dir = format!("{}/{}/log/severus", config.result_dir, id);
-        Self {
-            id: id.to_string(),
-            output_dir,
-            diag_bam: diag_bam.to_string(),
-            mrd_bam: mrd_bam.to_string(),
-            config,
-            phased_vcf: phased_vcf.to_string(),
-            log_dir,
-            output_vcf,
-            vcf_passed,
+impl InitSomatic for Savana {
+    fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
+        let mut output_vcf_exists = Path::new(&config.savana_output_vcf(id)).exists();
+        if config.savana_force && output_vcf_exists {
+            fs::remove_dir_all(config.savana_output_dir(id))?;
+            output_vcf_exists = false;
         }
-    }
 
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        if self.config.force && Path::new(&self.output_vcf).exists() {
-            fs::remove_dir_all(&self.output_dir)?;
+        if output_vcf_exists {
+            anyhow::bail!("{} already exists.", config.savana_output_vcf(id))
         }
 
-        if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.log_dir).expect("Failed to create output directory");
+        let log_dir = format!("{}/{}/log/savana", config.result_dir, id);
+        if !Path::new(&log_dir).exists() {
+            fs::create_dir_all(&log_dir)
+                .context(format!("Failed  to create {log_dir} directory"))?;
         }
 
-        // Run command if output VCF doesn't exist
-        // savana --tumour /data/longreads_basic_pipe/CUNY/diag/CUNY_diag_hs1_hp.bam --normal /data/longreads_basic_pipe/CUNY/mrd/CUNY_mrd_hs1_hp.bam --outdir /data/longreads_basic_pipe/CUNY/diag/savana --ref /data/ref/hs1/chm13v2.0.fa --phased_vcf /data/longreads_basic_pipe/CUNY/diag/ClairS/clair3_normal_germline_output_PS.vcf --no_blacklist
-        if !Path::new(&self.output_vcf).exists() {
-            let severus_args = [
-                "--target-bam",
-                &self.diag_bam,
-                "--control-bam",
-                &self.mrd_bam,
-                "--phasing-vcf",
-                &self.phased_vcf,
-                "--out-dir",
-                &self.output_dir,
-                "-t",
-                &self.config.threads.to_string(),
-                "--write-alignments",
-                "--use-supplementary-tag",
-                "--resolve-overlaps",
-                "--between-junction-ins",
-                "--vntr-bed",
-                &self.config.vntr_bed,
-            ];
-            let args = [
-                "-c",
-                &format!("source {} && conda activate severus_env && {} {}", self.config.conda_sh, self.config.bin, severus_args.join(" ")),
-            ];
-            let mut cmd_run = CommandRun::new("bash", &args);
-            let report = run_wait(&mut cmd_run).context(format!(
-                "Error while running `severus.py {}`",
-                args.join(" ")
-            ))?;
+        Ok(Self {
+            id: id.to_string(),
+            config,
+            log_dir,
+        })
+    }
+}
 
-            let log_file = format!("{}/severus_", self.log_dir);
-            report
-                .save_to_file(&log_file)
-                .context(format!("Error while writing logs into {log_file}"))?;
-        } else {
-            info!("Savana output vcf already exists");
-        }
+impl Run for Savana {
+    fn run(&mut self) -> anyhow::Result<()> {
+        let id = &self.id;
+        let savana_args = [
+            // "run",
+            "--tumour",
+            &self.config.tumoral_haplotagged_bam(id),
+            "--normal",
+            &self.config.normal_haplotagged_bam(id),
+            "--outdir",
+            &self.config.savana_output_dir(id),
+            "--ref",
+            &self.config.reference,
+            "--phased_vcf",
+            &self.config.germline_phased_vcf(id),
+        ];
+        let args = [
+            "-c",
+            &format!(
+                "source {} && conda activate savana && {} {}",
+                self.config.conda_sh,
+                self.config.savana_bin,
+                savana_args.join(" ")
+            ),
+        ];
+        let mut cmd_run = CommandRun::new("bash", &args);
+        let report = run_wait(&mut cmd_run).context(format!(
+            "Error while running `severus.py {}`",
+            args.join(" ")
+        ))?;
+
+        let log_file = format!("{}/savana_", self.log_dir);
+        report
+            .save_to_file(&log_file)
+            .context(format!("Error while writing logs into {log_file}"))?;
 
         // Keep PASS
-        if !Path::new(&self.vcf_passed).exists() {
-            let report = bcftools_keep_pass(
-                &self.output_vcf,
-                &self.vcf_passed,
-                BcftoolsConfig::default(),
-            )
-            .context(format!(
-                "Error while running bcftools keep PASS for {}",
-                &self.output_vcf
-            ))?;
+        let passed_vcf = &self.config.savana_passed_vcf(id);
+        let output_vcf = &self.config.savana_output_vcf(id);
+        if !Path::new(passed_vcf).exists() && Path::new(output_vcf).exists() {
+            let report = bcftools_keep_pass(output_vcf, passed_vcf, BcftoolsConfig::default())
+                .context(format!(
+                    "Error while running bcftools keep PASS for {output_vcf}"
+                ))?;
             let log_file = format!("{}/bcftools_pass", self.log_dir);
             report
                 .save_to_file(&log_file)
@@ -137,3 +92,40 @@ impl Savana {
         Ok(())
     }
 }
+
+impl HasOutputs for Savana {
+    /// Reports which Savana result files already exist for `id`.
+    ///
+    /// Returns `(raw, passed)`: whether the VCF at `savana_output_vcf(id)` exists and
+    /// whether the PASS-filtered VCF at `savana_passed_vcf(id)` exists.
+    fn has_output(&self, id: &str) -> (bool, bool) {
+        // The first flag must probe the raw caller output, not the filtered file twice.
+        let output = self.config.savana_output_vcf(id);
+        let passed = self.config.savana_passed_vcf(id);
+        (Path::new(&output).exists(), Path::new(&passed).exists())
+    }
+}
+
+impl Version for Savana {
+    /// Returns the version string reported by `savana --version`.
+    ///
+    /// The command is run through `bash -c` after sourcing `conda_sh` and activating the
+    /// `savana` conda environment. The version is the text that follows the first
+    /// `"Version "` marker in the captured log, up to the end of that line.
+    ///
+    /// # Errors
+    /// Fails if the command cannot be run, if the log holds no `"Version "` marker, or
+    /// if nothing follows the marker.
+    fn version(config: &Config) -> anyhow::Result<String> {
+        let args = [
+            "-c",
+            &format!(
+                "source {} && conda activate savana && {} --version",
+                config.conda_sh, config.savana_bin
+            ),
+        ];
+        let mut cmd_run = CommandRun::new("bash", &args);
+        // Build the context message lazily: it is only needed on failure.
+        let report = run_wait(&mut cmd_run)
+            .with_context(|| format!("Error while running `savana {}`", args.join(" ")))?;
+
+        let (_, after_marker) = report
+            .log
+            .split_once("Version ")
+            .context("Failed to find 'Version ' in the log")?;
+        // `lines()` also yields a last line without trailing newline, so a version
+        // printed at the very end of the log is accepted.
+        let version = after_marker
+            .lines()
+            .next()
+            .context("Failed to find a version after 'Version '")?;
+        Ok(version.trim().to_string())
+    }
+}

+ 102 - 106
src/callers/severus.rs

@@ -1,133 +1,101 @@
 use crate::{
+    collection::{HasOutputs, InitSomatic, Version},
     commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
-    runners::{run_wait, CommandRun},
+    config::Config,
+    runners::{run_wait, CommandRun, Run},
 };
 use anyhow::Context;
 use std::{fs, path::Path};
-use tracing::info;
-
-#[derive(Debug, Clone)]
-pub struct SeverusConfig {
-    pub bin: String,
-    pub result_dir: String,
-    pub threads: u8,
-    pub vntr_bed: String,
-    pub pon: String,
-    pub conda_sh: String,
-    pub force: bool,
-}
-
-impl Default for SeverusConfig {
-    fn default() -> Self {
-        Self {
-            bin: "/data/tools/Severus/severus.py".to_string(),
-            result_dir: "/data/longreads_basic_pipe".to_string(),
-            threads: 32,
-            vntr_bed: "/data/ref/hs1/vntrs_chm13.bed".to_string(),
-            pon: "/data/ref/hs1/PoN_1000G_chm13.tsv.gz".to_string(),
-            conda_sh: "/data/miniconda3/etc/profile.d/conda.sh".to_string(),
-            force: true,
-        }
-    }
-}
 
+/// Runner state for the Severus caller on one `id`.
 #[derive(Debug)]
 pub struct Severus {
+    /// Identifier substituted into the per-sample paths built from `config`.
     pub id: String,
-    pub output_dir: String,
-    pub diag_bam: String,
-    pub mrd_bam: String,
-    pub phased_vcf: String,
-    pub output_vcf: String,
-    pub vcf_passed: String,
-    pub config: SeverusConfig,
+    /// Global pipeline configuration; input and output paths are derived from it.
+    pub config: Config,
+    /// Directory in which the run and bcftools logs are saved.
     pub log_dir: String,
 }
 
-impl Severus {
-    pub fn new(
-        id: &str,
-        diag_bam: &str,
-        mrd_bam: &str,
-        phased_vcf: &str,
-        config: SeverusConfig,
-    ) -> Self {
-        let output_dir = format!("{}/{}/diag/severus", config.result_dir, id);
-        let output_vcf = format!("{output_dir}/somatic_SVs/severus_somatic.vcf");
-        let vcf_passed = format!("{output_dir}/{id}_diag_severus_PASSED.vcf.gz",);
+impl InitSomatic for Severus {
+    fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
+        let mut output_vcf_exists = Path::new(&config.severus_output_vcf(id)).exists();
+        if config.severus_force && output_vcf_exists {
+            fs::remove_dir_all(config.severus_output_dir(id))?;
+            output_vcf_exists = false;
+        }
+
+        if output_vcf_exists {
+            anyhow::bail!("{} already exists.", config.severus_output_vcf(id))
+        }
 
         let log_dir = format!("{}/{}/log/severus", config.result_dir, id);
-        Self {
+        if !Path::new(&log_dir).exists() {
+            fs::create_dir_all(&log_dir)
+                .context(format!("Failed  to create {log_dir} directory"))?;
+        }
+
+        let phased_germline_vcf = &config.germline_phased_vcf(id);
+        if !Path::new(phased_germline_vcf).exists() {
+            anyhow::bail!("Could not find required phased VCF: {phased_germline_vcf}")
+        }
+
+        Ok(Self {
             id: id.to_string(),
-            output_dir,
-            diag_bam: diag_bam.to_string(),
-            mrd_bam: mrd_bam.to_string(),
             config,
-            phased_vcf: phased_vcf.to_string(),
             log_dir,
-            output_vcf,
-            vcf_passed,
-        }
+        })
     }
+}
 
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        if self.config.force && Path::new(&self.output_vcf).exists() {
-            fs::remove_dir_all(&self.output_dir)?;
-        }
-
-        if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.log_dir).expect("Failed to create output directory");
-        }
+impl Run for Severus {
+    fn run(&mut self) -> anyhow::Result<()> {
+        let id = &self.id;
+        let output_vcf = &self.config.severus_output_vcf(id);
+        let passed_vcf = &self.config.severus_passed_vcf(id);
 
         // Run command if output VCF doesn't exist
-        if !Path::new(&self.output_vcf).exists() {
-            let severus_args = [
-                "--target-bam",
-                &self.diag_bam,
-                "--control-bam",
-                &self.mrd_bam,
-                "--phasing-vcf",
-                &self.phased_vcf,
-                "--out-dir",
-                &self.output_dir,
-                "-t",
-                &self.config.threads.to_string(),
-                "--write-alignments",
-                "--use-supplementary-tag",
-                "--resolve-overlaps",
-                "--between-junction-ins",
-                "--vntr-bed",
-                &self.config.vntr_bed,
-            ];
-            let args = [
-                "-c",
-                &format!("source {} && conda activate severus_env && {} {}", self.config.conda_sh, self.config.bin, severus_args.join(" ")),
-            ];
-            let mut cmd_run = CommandRun::new("bash", &args);
-            let report = run_wait(&mut cmd_run).context(format!(
-                "Error while running `severus.py {}`",
-                args.join(" ")
-            ))?;
+        let severus_args = [
+            "--target-bam",
+            &self.config.tumoral_bam(id),
+            "--control-bam",
+            &self.config.normal_bam(id),
+            "--phasing-vcf",
+            &self.config.germline_phased_vcf(id),
+            "--out-dir",
+            &self.config.severus_output_dir(id),
+            "-t",
+            &self.config.severus_threads.to_string(),
+            "--write-alignments",
+            "--use-supplementary-tag",
+            "--resolve-overlaps",
+            "--between-junction-ins",
+            "--vntr-bed",
+            &self.config.vntrs_bed,
+        ];
+        let args = [
+            "-c",
+            &format!(
+                "source {} && conda activate severus_env && {} {}",
+                self.config.conda_sh,
+                self.config.severus_bin,
+                severus_args.join(" ")
+            ),
+        ];
+        let mut cmd_run = CommandRun::new("bash", &args);
+        let report = run_wait(&mut cmd_run).context(format!(
+            "Error while running `severus.py {}`",
+            args.join(" ")
+        ))?;
 
-            let log_file = format!("{}/severus_", self.log_dir);
-            report
-                .save_to_file(&log_file)
-                .context(format!("Error while writing logs into {log_file}"))?;
-        } else {
-            info!("Severus output vcf already exists");
-        }
+        let log_file = format!("{}/severus_", self.log_dir);
+        report
+            .save_to_file(&log_file)
+            .context(format!("Error while writing logs into {log_file}"))?;
 
         // Keep PASS
-        if !Path::new(&self.vcf_passed).exists() {
-            let report = bcftools_keep_pass(
-                &self.output_vcf,
-                &self.vcf_passed,
-                BcftoolsConfig::default(),
-            )
-            .context(format!(
-                "Error while running bcftools keep PASS for {}",
-                &self.output_vcf
-            ))?;
+        if !Path::new(passed_vcf).exists() && Path::new(output_vcf).exists() {
+            let report = bcftools_keep_pass(output_vcf, passed_vcf, BcftoolsConfig::default())
+                .context(format!(
+                    "Error while running bcftools keep PASS for {output_vcf}"
+                ))?;
             let log_file = format!("{}/bcftools_pass", self.log_dir);
             report
                 .save_to_file(&log_file)
@@ -136,3 +104,31 @@ impl Severus {
         Ok(())
     }
 }
+
+impl HasOutputs for Severus {
+    /// Reports which Severus result files already exist for `id`.
+    ///
+    /// Returns `(raw, passed)`: whether the VCF at `severus_output_vcf(id)` exists and
+    /// whether the PASS-filtered VCF at `severus_passed_vcf(id)` exists.
+    fn has_output(&self, id: &str) -> (bool, bool) {
+        // The first flag must probe the raw caller output, not the filtered file twice.
+        let output = self.config.severus_output_vcf(id);
+        let passed = self.config.severus_passed_vcf(id);
+        (Path::new(&output).exists(), Path::new(&passed).exists())
+    }
+}
+
+impl Version for Severus {
+    /// Queries `severus --version` inside the `severus_env` conda environment and returns
+    /// the trimmed text that follows the first `:` of the captured log.
+    fn version(config: &Config) -> anyhow::Result<String> {
+        let shell_cmd = format!(
+            "source {} && conda activate severus_env && {} {}",
+            config.conda_sh, config.severus_bin, "--version"
+        );
+        let mut cmd_run = CommandRun::new("bash", &["-c", shell_cmd.as_str()]);
+        let report = run_wait(&mut cmd_run).context("Error while running `severus --version`")?;
+
+        // Everything up to and including the first ':' is discarded.
+        if let Some((_, value)) = report.log.split_once(':') {
+            return Ok(value.trim().to_string());
+        }
+        anyhow::bail!("Error while parsing `severus --version`")
+    }
+}

+ 20 - 4
src/collection/mod.rs

@@ -1,5 +1,11 @@
 use std::{
-    collections::HashMap, fmt, fs::{self, metadata}, os::unix::fs::MetadataExt, path::{Path, PathBuf}, thread, time::SystemTime
+    collections::HashMap,
+    fmt,
+    fs::{self, metadata},
+    os::unix::fs::MetadataExt,
+    path::{Path, PathBuf},
+    thread,
+    time::SystemTime,
 };
 
 use anyhow::Context;
@@ -705,15 +711,13 @@ impl Collections {
                     let handle2 = thread::spawn(|| b.run());
                     let _ = handle1.join().unwrap();
                     let _ = handle2.join().unwrap();
-
                 }
                 [a] => {
                     info!("Single task: ({})", a);
                     let _ = a.clone().run();
-                },
+                }
                 _ => (),
             }
-
         }
 
         Ok(())
@@ -939,3 +943,15 @@ pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
 
     Ok(())
 }
+
+/// Fallible constructor for a runner built from an identifier and the global `Config`.
+pub trait InitSomatic: Sized {
+    /// Builds the runner for `id`, taking ownership of `config`.
+    ///
+    /// # Errors
+    /// Implementations return an error when the runner cannot be prepared.
+    fn initialize(id: &str, config: Config) -> anyhow::Result<Self>;
+}
+
+/// Reports whether the results of a runner are already present.
+pub trait HasOutputs {
+    /// Returns a pair of existence flags for `id`.
+    ///
+    /// NOTE(review): the implementors in `callers::savana` and `callers::severus` name
+    /// the two flags `output` and `passed` (caller VCF, PASS-filtered VCF) — confirm this
+    /// ordering is the intended contract.
+    fn has_output(&self, id: &str) -> (bool, bool);
+}
+
+/// Exposes the version of the tool wrapped by the implementor.
+pub trait Version {
+    /// Returns the tool version as a string, using the settings found in `config`.
+    fn version(config: &Config) -> anyhow::Result<String>;
+}

+ 13 - 4
src/commands/longphase.rs

@@ -121,20 +121,29 @@ pub struct LongphasePhase {
 }
 
 impl LongphasePhase {
-    pub fn new(id: &str, bam_hp: &str, vcf: &str, config: LongphaseConfig) -> Self {
+    /// Prepares a phasing run for `id`.
+    ///
+    /// The output prefix is placed next to `vcf` and named `{stem}_PS`, where `stem` is
+    /// the file stem of `vcf` with one trailing `.vcf` removed (so `x.vcf.gz` and `x.vcf`
+    /// both give `x_PS`). Logs go to `{result_dir}/{id}/log/longphase`.
+    ///
+    /// # Errors
+    /// Fails when `vcf` has no file stem or the stem is not valid UTF-8.
+    pub fn new(id: &str, bam_hp: &str, vcf: &str, config: LongphaseConfig) -> anyhow::Result<Self> {
         let log_dir = format!("{}/{}/log/longphase", config.result_dir, id);
 
         let vcf = Path::new(vcf);
-        let new_file_name = format!("{}_PS", vcf.file_stem().unwrap().to_str().unwrap());
+        let stem = vcf
+            .file_stem()
+            .and_then(|s| s.to_str())
+            .with_context(|| format!("Can't parse stem for {}", vcf.display()))?;
+        // Strip only a trailing `.vcf`; `replace` would also delete inner occurrences
+        // (e.g. `a.vcf_b.vcf` must become `a.vcf_b`, not `a_b`).
+        let stem = stem.strip_suffix(".vcf").unwrap_or(stem);
+
+        let new_file_name = format!("{stem}_PS");
         let out_prefix = vcf.with_file_name(new_file_name);
 
-        Self {
+        Ok(Self {
             bam_hp: bam_hp.to_string(),
             config,
             log_dir,
             vcf: vcf.to_path_buf(),
             out_prefix,
-        }
+        })
     }
 
     pub fn run(&mut self) -> anyhow::Result<()> {

+ 144 - 1
src/config.rs

@@ -3,14 +3,61 @@ pub struct Config {
     pub pod_dir: String,
     pub result_dir: String,
     pub align: AlignConfig,
+
+    pub reference: String,
+    pub reference_name: String,
+    pub savana_bin: String,
+    pub tumoral_name: String,
+    pub normal_name: String,
+    pub haplotagged_bam_tag_name: String,
+    pub savana_output_dir: String,
+    pub germline_phased_vcf: String,
+    pub savana_passed_vcf: String,
+    pub conda_sh: String,
+    pub savana_force: bool,
+
+    pub severus_bin: String,
+    pub severus_force: bool,
+    pub severus_threads: u8,
+    pub vntrs_bed: String,
+    pub severus_pon: String,
+    pub severus_output_dir: String,
 }
 
 impl Default for Config {
+    /// Built-in defaults.
+    ///
+    /// Values containing `{result_dir}`, `{id}` or `{output_dir}` are path templates;
+    /// the placeholders are substituted by the `Config` accessor methods of the same
+    /// name (e.g. `germline_phased_vcf(id)`, `savana_passed_vcf(id)`).
     fn default() -> Self {
         Self {
             pod_dir: "/data/run_data".to_string(),
-            result_dir: "/data/longreads_basic_pipe".to_string(),
             align: Default::default(),
+            // Reference genome
+            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+            reference_name: "hs1".to_string(),
+
+            // File structure
+            result_dir: "/data/longreads_basic_pipe".to_string(),
+            tumoral_name: "diag".to_string(),
+            normal_name: "mrd".to_string(),
+            haplotagged_bam_tag_name: "hp".to_string(),
+
+            germline_phased_vcf:
+                "{result_dir}/{id}/diag/ClairS/clair3_normal_tumoral_germline_output_PS.vcf"
+                    .to_string(),
+            conda_sh: "/data/miniconda3/etc/profile.d/conda.sh".to_string(),
+
+            // Savana
+            savana_bin: "savana".to_string(),
+            savana_output_dir: "{result_dir}/{id}/diag/savana".to_string(),
+            savana_passed_vcf: "{output_dir}/{id}_diag_savana_PASSED.vcf.gz".to_string(),
+            savana_force: true,
+
+            // Severus
+            severus_bin: "/data/tools/Severus/severus.py".to_string(),
+            severus_threads: 32,
+            vntrs_bed: "/data/ref/hs1/vntrs_chm13.bed".to_string(),
+            severus_pon: "/data/ref/hs1/PoN_1000G_chm13.tsv.gz".to_string(),
+            severus_output_dir: "{result_dir}/{id}/diag/severus".to_string(),
+
+            severus_force: true,
         }
     }
 }
@@ -38,3 +85,99 @@ impl Default for AlignConfig {
         }
     }
 }
+
+/// Path helpers deriving per-`id` locations from the configured layout.
+impl Config {
+    /// Expands the `{result_dir}` and `{id}` placeholders of a path template.
+    fn expand_template(&self, template: &str, id: &str) -> String {
+        template
+            .replace("{result_dir}", &self.result_dir)
+            .replace("{id}", id)
+    }
+
+    /// BAM file stem shared by one time point: `{id}_{time_point}_{reference_name}`.
+    fn bam_stem(&self, id: &str, time_point: &str) -> String {
+        format!("{}_{}_{}", id, time_point, self.reference_name)
+    }
+
+    /// Directory of the tumoral time point: `{result_dir}/{id}/{tumoral_name}`.
+    pub fn tumoral_dir(&self, id: &str) -> String {
+        format!("{}/{}/{}", self.result_dir, id, self.tumoral_name)
+    }
+
+    /// Directory of the normal time point: `{result_dir}/{id}/{normal_name}`.
+    pub fn normal_dir(&self, id: &str) -> String {
+        format!("{}/{}/{}", self.result_dir, id, self.normal_name)
+    }
+
+    /// Tumoral BAM: `{tumoral_dir}/{id}_{tumoral_name}_{reference_name}.bam`.
+    pub fn tumoral_bam(&self, id: &str) -> String {
+        format!(
+            "{}/{}.bam",
+            self.tumoral_dir(id),
+            self.bam_stem(id, &self.tumoral_name)
+        )
+    }
+
+    /// Normal BAM: `{normal_dir}/{id}_{normal_name}_{reference_name}.bam`.
+    pub fn normal_bam(&self, id: &str) -> String {
+        format!(
+            "{}/{}.bam",
+            self.normal_dir(id),
+            self.bam_stem(id, &self.normal_name)
+        )
+    }
+
+    /// Tumoral BAM carrying the `haplotagged_bam_tag_name` suffix before `.bam`.
+    pub fn tumoral_haplotagged_bam(&self, id: &str) -> String {
+        format!(
+            "{}/{}_{}.bam",
+            self.tumoral_dir(id),
+            self.bam_stem(id, &self.tumoral_name),
+            self.haplotagged_bam_tag_name
+        )
+    }
+
+    /// Normal BAM carrying the `haplotagged_bam_tag_name` suffix before `.bam`.
+    pub fn normal_haplotagged_bam(&self, id: &str) -> String {
+        format!(
+            "{}/{}_{}.bam",
+            self.normal_dir(id),
+            self.bam_stem(id, &self.normal_name),
+            self.haplotagged_bam_tag_name
+        )
+    }
+
+    /// Phased germline VCF, expanded from the `germline_phased_vcf` template.
+    pub fn germline_phased_vcf(&self, id: &str) -> String {
+        self.expand_template(&self.germline_phased_vcf, id)
+    }
+
+    /// Savana output directory, expanded from the `savana_output_dir` template.
+    pub fn savana_output_dir(&self, id: &str) -> String {
+        self.expand_template(&self.savana_output_dir, id)
+    }
+
+    /// Somatic VCF written by Savana inside `savana_output_dir(id)`.
+    ///
+    /// The file name reuses the stem of `tumoral_haplotagged_bam(id)` instead of a
+    /// hard-coded `diag_hs1_hp`, so it follows `tumoral_name`, `reference_name` and
+    /// `haplotagged_bam_tag_name`; with the default configuration the path is unchanged.
+    pub fn savana_output_vcf(&self, id: &str) -> String {
+        format!(
+            "{}/{}_{}.classified.somatic.vcf",
+            self.savana_output_dir(id),
+            self.bam_stem(id, &self.tumoral_name),
+            self.haplotagged_bam_tag_name
+        )
+    }
+
+    /// PASS-filtered Savana VCF, expanded from the `savana_passed_vcf` template
+    /// (`{output_dir}` is replaced by `savana_output_dir(id)`, then `{id}` by `id`).
+    pub fn savana_passed_vcf(&self, id: &str) -> String {
+        self.savana_passed_vcf
+            .replace("{output_dir}", &self.savana_output_dir(id))
+            .replace("{id}", id)
+    }
+
+    /// Severus output directory, expanded from the `severus_output_dir` template.
+    pub fn severus_output_dir(&self, id: &str) -> String {
+        self.expand_template(&self.severus_output_dir, id)
+    }
+
+    /// Somatic VCF written by Severus: `{severus_output_dir}/somatic_SVs/severus_somatic.vcf`.
+    pub fn severus_output_vcf(&self, id: &str) -> String {
+        format!(
+            "{}/somatic_SVs/severus_somatic.vcf",
+            self.severus_output_dir(id)
+        )
+    }
+
+    /// PASS-filtered Severus VCF: `{severus_output_dir}/{id}_diag_severus_PASSED.vcf.gz`.
+    pub fn severus_passed_vcf(&self, id: &str) -> String {
+        format!(
+            "{}/{}_diag_severus_PASSED.vcf.gz",
+            self.severus_output_dir(id),
+            id
+        )
+    }
+}

+ 41 - 26
src/lib.rs

@@ -21,12 +21,14 @@ lazy_static! {
 mod tests {
     use std::{fs, path::Path};
 
-    use callers::{nanomonsv::nanomonsv_create_pon, severus::{Severus, SeverusConfig}};
+    use callers::{nanomonsv::nanomonsv_create_pon, savana::Savana, severus::Severus};
+    use collection::{InitSomatic, Version};
     use commands::{longphase::{LongphaseHap, LongphaseConfig}, modkit::{bed_methyl, ModkitConfig}};
     use functions::assembler::{Assembler, AssemblerConfig};
     use log::info;
     // use pandora_lib_assembler::assembler::AssembleConfig;
     use rayon::prelude::*;
+    use runners::Run;
 
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
@@ -152,7 +154,7 @@ mod tests {
     #[test]
     fn nanomonsv_solo() -> anyhow::Result<()> {
         init();
-        let id = "MONVILLE";
+        let id = "BRETON";
         let time_point = "diag";
         NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
         // let time_point = "mrd";
@@ -296,25 +298,25 @@ mod tests {
         Assembler::new("CAMEL".to_string(), "diag".to_string(), AssemblerConfig::default()).run()
     }
 
-    #[test]
-    fn run_dmr_par() -> anyhow::Result<()> {
-        init();
-        let collections = Collections::new(
-            CollectionsConfig::default()
-        )?;
-        let tasks = collections.todo_dmr_c_diag_mrd();
-        tasks.iter().for_each(|t| info!("{t}"));
-        let len = tasks.len();
-        // let pool = ThreadPoolBuilder::new().num_threads(10).build().unwrap();
-        // pool.install(|| {
-        //     tasks.par_iter().enumerate().for_each(|(i, t)| {
-        //         let config = ModkitConfig {threads: 2,  ..Default::default() };
-        //         if let collection::CollectionsTasks::DMRCDiagMrd { id, .. } = t { let _ = dmr_c_mrd_diag(id, &config); }
-        //         println!("⚡ {i}/{len}");
-        //     });
-        // });
-        Ok(())
-    }
+    // #[test]
+    // fn run_dmr_par() -> anyhow::Result<()> {
+    //     init();
+    //     let collections = Collections::new(
+    //         CollectionsConfig::default()
+    //     )?;
+    //     let tasks = collections.todo_dmr_c_diag_mrd();
+    //     tasks.iter().for_each(|t| info!("{t}"));
+    //     let len = tasks.len();
+    //     // let pool = ThreadPoolBuilder::new().num_threads(10).build().unwrap();
+    //     // pool.install(|| {
+    //     //     tasks.par_iter().enumerate().for_each(|(i, t)| {
+    //     //         let config = ModkitConfig {threads: 2,  ..Default::default() };
+    //     //         if let collection::CollectionsTasks::DMRCDiagMrd { id, .. } = t { let _ = dmr_c_mrd_diag(id, &config); }
+    //     //         println!("⚡ {i}/{len}");
+    //     //     });
+    //     // });
+    //     Ok(())
+    // }
 
     #[test]
     fn run_mod_par() -> anyhow::Result<()> {
@@ -336,11 +338,24 @@ mod tests {
     #[test]
     fn run_severus() -> anyhow::Result<()> {
         init();
-        let id = "CUNY";
-        let diag_bam = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1_hp.bam");
-        let mrd_bam = format!("/data/longreads_basic_pipe/{id}/mrd/{id}_mrd_hs1_hp.bam");
-        let vcf = format!("/data/longreads_basic_pipe/{id}/diag/ClairS/CUNY_diag_clairs_PASSED.vcf.gz");
-        Severus::new(id, &diag_bam, &mrd_bam, &vcf, SeverusConfig::default()).run()
+        // Build the runner from the default configuration, then execute it.
+        let mut severus = Severus::initialize("ACHITE", Config::default())?;
+        severus.run()
+    }
+
+    #[test]
+    fn run_savana() -> anyhow::Result<()> {
+        init();
+        // Build the runner from the default configuration, then execute it.
+        let mut savana = Savana::initialize("BECERRA", Config::default())?;
+        savana.run()
+    }
+
+    /// Logs the versions reported by the Savana and Severus executables configured in
+    /// the default `Config`.
+    #[test]
+    fn check_versions() -> anyhow::Result<()> {
+        init();
+        let config = Config::default();
+        let v = Savana::version(&config)?;
+        // Tool name is spelled "Savana" everywhere else in the crate.
+        info!("Savana version {v}");
+        let v = Severus::version(&config)?;
+        info!("Severus version {v}");
+        Ok(())
     }
 
     #[test]

+ 80 - 15
src/runners.rs

@@ -2,7 +2,7 @@ use std::{
     fs::File,
     io::{BufRead, BufReader, Write},
     process::{Child, Command, Stdio},
-    sync::{Arc, Mutex},
+    sync::{mpsc::{self, TryRecvError}, Arc, Mutex},
     thread,
 };
 
@@ -13,7 +13,6 @@ use uuid::Uuid;
 
 use crate::DOCKER_ID;
 
-
 pub trait Run {
     fn run(&mut self) -> anyhow::Result<()>;
 }
@@ -171,47 +170,113 @@ pub struct CommandRun {
     pub bin: String,
     pub args: Vec<String>,
     pub child: Option<Child>,
+    pub tx: mpsc::Sender<(String, String)>,
+    pub rx: mpsc::Receiver<(String, String)>,
     pub log: String,
 }
 
 impl CommandRun {
+    /// Creates a command description for `bin` with `args`; nothing is spawned here.
+    ///
+    /// `child` starts as `None` and `log` starts empty.
     pub fn new(bin: &str, args: &[&str]) -> Self {
+        // Channel carrying `(stream name, line)` pairs from the output readers in `run`.
+        let (tx, rx) = mpsc::channel();
+
         CommandRun {
             bin: bin.to_string(),
             args: args.iter().map(|e| e.to_string()).collect(),
             child: None,
             log: String::default(),
+            tx,
+            rx,
         }
     }
 }
 
 impl Run for CommandRun {
+    /// Spawns `bin` with `args`, piping both stdout and stderr.
+    ///
+    /// Each captured line is logged with `info!` and queued on `tx` as a
+    /// `("stdout" | "stderr", line)` pair; `wait` later drains the queue into `log`.
+    /// The two reader threads are scoped, so this call returns only after both streams
+    /// have ended. The child handle is then stored in `self.child`.
+    ///
+    /// # Errors
+    /// Returns the I/O error if the process cannot be spawned.
+    ///
+    /// # Panics
+    /// Panics if a piped stream is missing after spawn or if sending on the channel
+    /// fails; neither is expected since both pipes are requested above and `self` owns
+    /// the receiver.
     fn run(&mut self) -> anyhow::Result<()> {
         info!("Running command: {} {}", &self.bin, &self.args.join(" "));
         let mut child = Command::new(&self.bin)
             .args(&self.args)
-            .stdout(Stdio::inherit())
+            .stdout(Stdio::piped())
             .stderr(Stdio::piped())
-            .spawn()
-            .expect("Failed to spawn");
-
-        if let Some(stderr) = child.stderr.take() {
-            let reader = BufReader::new(stderr);
-            for line in reader.lines().map_while(Result::ok) {
-                info!("[{}] {line}", self.bin);
-                self.log.push_str(&line);
-                self.log.push('\n');
-            }
-        }
+            .spawn()?;
+
+        let stdout = child.stdout.take().expect("Failed to capture stdout");
+        let stderr = child.stderr.take().expect("Failed to capture stderr");
+
+        let stdout_reader = BufReader::new(stdout);
+        let stderr_reader = BufReader::new(stderr);
+
+        let tx = self.tx.clone();
+        // Read both pipes concurrently so that a full stderr pipe cannot stall the
+        // child while stdout is being read (and vice versa).
+        std::thread::scope(|s| {
+            s.spawn(|| {
+                for line in stdout_reader.lines().map_while(Result::ok) {
+                    info!("[{}:stdout] {}", self.bin, line);
+                    // `line` is already owned: move it instead of cloning.
+                    tx.send(("stdout".to_string(), line))
+                        .expect("Channel send failed");
+                }
+            });
+
+            s.spawn(|| {
+                for line in stderr_reader.lines().map_while(Result::ok) {
+                    info!("[{}:stderr] {}", self.bin, line);
+                    tx.send(("stderr".to_string(), line))
+                        .expect("Channel send failed");
+                }
+            });
+        });
+
         self.child = Some(child);
         Ok(())
     }
 }
 
+
 impl Wait for CommandRun {
     fn wait(&mut self) -> anyhow::Result<()> {
         if let Some(child) = &mut self.child {
-            child.wait().unwrap();
+            loop {
+                match self.rx.try_recv() {
+                    Ok((stream, line)) => {
+                        self.log.push_str(&format!("[{}] {}: {}\n", self.bin, stream, line));
+                    }
+                    Err(TryRecvError::Empty) => {
+                        match child.try_wait()? {
+                            Some(_) => {
+                                break;
+                            }
+                            None => {
+                                std::thread::sleep(std::time::Duration::from_millis(100));
+                            }
+                        }
+                    }
+                    Err(TryRecvError::Disconnected) => {
+                        break;
+                    }
+                }
+            }
+
+            // Ensure child process has exited
+            let status = child.wait()?;
+            info!("{} {}", self.bin, status);
         }
         Ok(())
     }