Jelajahi Sumber

clairs phasing

Thomas 1 tahun lalu
induk
melakukan
b692aefa9a

+ 38 - 2
Cargo.lock

@@ -1756,6 +1756,15 @@ dependencies = [
  "simd-adler32",
  "simd-adler32",
 ]
 ]
 
 
+[[package]]
+name = "features"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "83072b3c84e55f9d0c0ff36a4575d0fd2e543ae4a56e04e7f5a9222188d574e3"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
 [[package]]
 [[package]]
 name = "filetime"
 name = "filetime"
 version = "0.2.25"
 version = "0.2.25"
@@ -1916,6 +1925,19 @@ dependencies = [
  "winapi",
  "winapi",
 ]
 ]
 
 
+[[package]]
+name = "full"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3eb4fedc7ed4eef621cda06d7b667b24bed97fee56fe7642d676ac409140163a"
+dependencies = [
+ "num-complex",
+ "num-traits",
+ "opimps",
+ "rand",
+ "rand_distr",
+]
+
 [[package]]
 [[package]]
 name = "funty"
 name = "funty"
 version = "2.0.0"
 version = "2.0.0"
@@ -3339,6 +3361,17 @@ dependencies = [
  "vcpkg",
  "vcpkg",
 ]
 ]
 
 
+[[package]]
+name = "opimps"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68e9faf4b2c7b2387c6d0ff631ebef992a0e7bd0c5e847d2faedf1f49c268d3e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 [[package]]
 name = "option-ext"
 name = "option-ext"
 version = "0.2.0"
 version = "0.2.0"
@@ -3513,6 +3546,8 @@ dependencies = [
  "duct",
  "duct",
  "env_logger 0.11.5",
  "env_logger 0.11.5",
  "expectrl",
  "expectrl",
+ "features",
+ "full",
  "glob",
  "glob",
  "hashbrown 0.15.0",
  "hashbrown 0.15.0",
  "indicatif",
  "indicatif",
@@ -3537,6 +3572,7 @@ dependencies = [
  "serde_json",
  "serde_json",
  "tempfile",
  "tempfile",
  "test-log",
  "test-log",
+ "tokio",
  "tracing",
  "tracing",
  "tracing-test",
  "tracing-test",
  "uuid",
  "uuid",
@@ -5282,9 +5318,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 
 [[package]]
 [[package]]
 name = "tokio"
 name = "tokio"
-version = "1.40.0"
+version = "1.41.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
+checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33"
 dependencies = [
 dependencies = [
  "backtrace",
  "backtrace",
  "bytes",
  "bytes",

+ 3 - 0
Cargo.toml

@@ -39,3 +39,6 @@ ctrlc = "3.4.4"
 lazy_static = "1.5.0"
 lazy_static = "1.5.0"
 indicatif = "0.17.8"
 indicatif = "0.17.8"
 tempfile = "3.12.0"
 tempfile = "3.12.0"
+tokio = "1.41.1"
+features = "0.10.0"
+full = "0.3.0"

+ 63 - 3
src/callers/clairs.rs

@@ -1,5 +1,8 @@
 use crate::{
 use crate::{
-    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    commands::{
+        bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
+        longphase::{LongphaseConfig, LongphaseHap, LongphasePhase},
+    },
     runners::{run_wait, DockerRun},
     runners::{run_wait, DockerRun},
 };
 };
 use anyhow::Context;
 use anyhow::Context;
@@ -12,6 +15,7 @@ pub struct ClairSConfig {
     pub reference: String,
     pub reference: String,
     pub threads: u8,
     pub threads: u8,
     pub platform: String,
     pub platform: String,
+    pub force: bool,
 }
 }
 
 
 impl Default for ClairSConfig {
 impl Default for ClairSConfig {
@@ -21,6 +25,7 @@ impl Default for ClairSConfig {
             reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             threads: 155,
             threads: 155,
             platform: "ont_r10_dorado_sup_5khz_ssrs".to_string(),
             platform: "ont_r10_dorado_sup_5khz_ssrs".to_string(),
+            force: true,
         }
         }
     }
     }
 }
 }
@@ -65,6 +70,10 @@ impl ClairS {
     }
     }
 
 
     pub fn run(&mut self) -> anyhow::Result<()> {
     pub fn run(&mut self) -> anyhow::Result<()> {
+        if self.config.force && Path::new(&self.output_vcf).exists() {
+            fs::remove_dir_all(&self.output_dir)?;
+        }
+
         // Create out dir
         // Create out dir
         if !Path::new(&self.output_dir).exists() {
         if !Path::new(&self.output_dir).exists() {
             fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
             fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
@@ -98,6 +107,8 @@ impl ClairS {
                 &self.config.platform,
                 &self.config.platform,
                 "--enable_indel_calling",
                 "--enable_indel_calling",
                 "--include_all_ctgs",
                 "--include_all_ctgs",
+                "--print_germline_calls",
+                "--enable_clair3_germline_output",
                 "--output_dir",
                 "--output_dir",
                 &self.output_dir,
                 &self.output_dir,
                 "-s",
                 "-s",
@@ -116,6 +127,26 @@ impl ClairS {
             info!("ClairS output vcf already exists");
             info!("ClairS output vcf already exists");
         }
         }
 
 
+        let germline_normal = format!("{}/clair3_normal_germline_output.vcf.gz", self.output_dir);
+        let germline_tumor = format!("{}/clair3_tumor_germline_output.vcf.gz", self.output_dir);
+        let germline_normal_tumor = format!(
+            "{}/clair3_normal_tumoral_germline_output.vcf.gz",
+            self.output_dir
+        );
+        let report = bcftools_concat(
+            vec![germline_normal, germline_tumor],
+            &germline_normal_tumor,
+            BcftoolsConfig::default(),
+        )
+        .context(format!(
+            "Error while running bcftools concat germlines for {}",
+            &self.output_vcf
+        ))?;
+        let log_file = format!("{}/bcftools_concat_germline", self.log_dir);
+        report
+            .save_to_file(&log_file)
+            .context(format!("Error while writing logs into {log_file}"))?;
+
         // Keep PASS
         // Keep PASS
         if !Path::new(&self.vcf_passed).exists() {
         if !Path::new(&self.vcf_passed).exists() {
             let report = bcftools_keep_pass(
             let report = bcftools_keep_pass(
@@ -124,13 +155,15 @@ impl ClairS {
                 BcftoolsConfig::default(),
                 BcftoolsConfig::default(),
             )
             )
             .context(format!(
             .context(format!(
-                "Error while running bcftools keep PASS for {}", &self.output_vcf
+                "Error while running bcftools keep PASS for {}",
+                &self.output_vcf
             ))?;
             ))?;
             let log_file = format!("{}/bcftools_pass", self.log_dir);
             let log_file = format!("{}/bcftools_pass", self.log_dir);
             report
             report
                 .save_to_file(&log_file)
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
                 .context(format!("Error while writing logs into {log_file}"))?;
         }
         }
+
         if !Path::new(&self.indel_vcf_passed).exists() {
         if !Path::new(&self.indel_vcf_passed).exists() {
             let report = bcftools_keep_pass(
             let report = bcftools_keep_pass(
                 &self.output_indel,
                 &self.output_indel,
@@ -138,7 +171,8 @@ impl ClairS {
                 BcftoolsConfig::default(),
                 BcftoolsConfig::default(),
             )
             )
             .context(format!(
             .context(format!(
-                "Error while running bcftools keep PASS for {}", &self.output_indel
+                "Error while running bcftools keep PASS for {}",
+                &self.output_indel
             ))?;
             ))?;
 
 
             let log_file = format!("{}/bcftools_pass", self.log_dir);
             let log_file = format!("{}/bcftools_pass", self.log_dir);
@@ -146,6 +180,32 @@ impl ClairS {
                 .save_to_file(&log_file)
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
                 .context(format!("Error while writing logs into {log_file}"))?;
         }
         }
+
+        LongphaseHap::new(
+            &self.id,
+            &self.diag_bam,
+            &germline_normal_tumor,
+            LongphaseConfig::default(),
+        )
+        .run()?;
+        LongphaseHap::new(
+            &self.id,
+            &self.mrd_bam,
+            &germline_normal_tumor,
+            LongphaseConfig::default(),
+        )
+        .run()?;
+
+        let bam = Path::new(&self.diag_bam);
+        let new_fn = format!("{}_hp", bam.file_stem().unwrap().to_str().unwrap());
+        let bam_hp = bam.with_file_name(new_fn);
+        LongphasePhase::new(
+            &self.id,
+            bam_hp.to_str().unwrap(),
+            &germline_normal_tumor,
+            LongphaseConfig::default(),
+        )
+        .run()?;
         Ok(())
         Ok(())
     }
     }
 }
 }

+ 6 - 1
src/callers/deep_variant.rs

@@ -15,6 +15,7 @@ pub struct DeepVariantConfig {
     pub result_dir: String,
     pub result_dir: String,
     pub reference: String,
     pub reference: String,
     pub bcftools: String,
     pub bcftools: String,
+    pub force: bool,
 }
 }
 
 
 impl Default for DeepVariantConfig {
 impl Default for DeepVariantConfig {
@@ -26,6 +27,7 @@ impl Default for DeepVariantConfig {
             result_dir: "/data/longreads_basic_pipe".to_string(),
             result_dir: "/data/longreads_basic_pipe".to_string(),
             reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             bcftools: "/data/tools/bcftools-1.21/bcftools".to_string(),
             bcftools: "/data/tools/bcftools-1.21/bcftools".to_string(),
+            force: true,
         }
         }
     }
     }
 }
 }
@@ -67,6 +69,9 @@ impl DeepVariant {
     }
     }
 
 
     pub fn run(&self) -> anyhow::Result<()> {
     pub fn run(&self) -> anyhow::Result<()> {
+        if self.config.force && Path::new(&self.output_vcf).exists() {
+            fs::remove_dir_all(&self.output_dir)?;
+        }
         // Create out dir
         // Create out dir
         if !Path::new(&self.output_dir).exists() {
         if !Path::new(&self.output_dir).exists() {
             fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
             fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
@@ -99,7 +104,7 @@ impl DeepVariant {
                     "/output/{}_{}_DeepVariant.g.vcf.gz",
                     "/output/{}_{}_DeepVariant.g.vcf.gz",
                     self.id, self.time_point
                     self.id, self.time_point
                 ),
                 ),
-                "--num_shards=155",
+                &format!("--num_shards={}", self.config.threads),
                 "--logging_dir",
                 "--logging_dir",
                 &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
                 &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
                 "--dry_run=false",
                 "--dry_run=false",

+ 2 - 0
src/callers/mod.rs

@@ -1,4 +1,6 @@
 pub mod deep_variant;
 pub mod deep_variant;
 pub mod clairs;
 pub mod clairs;
 pub mod nanomonsv;
 pub mod nanomonsv;
+pub mod severus;
+pub mod savana;
 
 

+ 2 - 2
src/callers/nanomonsv.rs

@@ -158,10 +158,10 @@ impl NanomonSV {
         if !Path::new(&vcf_passed).exists() {
         if !Path::new(&vcf_passed).exists() {
             let report =
             let report =
                 bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default())
                 bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default())
-                    .unwrap();
+                    .context(format!("Can't index {vcf_passed}"))?;
             report
             report
                 .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,))
                 .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,))
-                .unwrap();
+                .context("Can't save report")?;
         }
         }
         Ok(())
         Ok(())
     }
     }

+ 139 - 0
src/callers/savana.rs

@@ -0,0 +1,139 @@
+use crate::{
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    runners::{run_wait, CommandRun},
+};
+use anyhow::Context;
+use std::{fs, path::Path};
+use tracing::info;
+
+/// Settings for the Savana structural-variant caller wrapper.
+///
+/// NOTE(review): `bin` defaults to `.../Savana/severus.py`, which looks like a
+/// leftover from the Severus wrapper — confirm the intended executable.
+#[derive(Clone, Debug)]
+pub struct SavanaConfig {
+    /// Path of the script executed after the conda environment is activated.
+    pub bin: String,
+    /// Root directory under which per-sample output and log directories are built.
+    pub result_dir: String,
+    /// Value passed to the caller's `-t` option.
+    pub threads: u8,
+    /// BED file passed to `--vntr-bed`.
+    pub vntr_bed: String,
+    /// Panel-of-normals path; not read by `Savana` itself.
+    pub pon: String,
+    /// `conda.sh` script sourced before `conda activate`.
+    pub conda_sh: String,
+    /// When true and the output VCF already exists, the output directory is
+    /// removed before running.
+    pub force: bool,
+}
+
+impl Default for SavanaConfig {
+    /// Returns the site-specific defaults (tool and reference paths under `/data`).
+    fn default() -> Self {
+        let bin = String::from("/data/tools/Savana/severus.py");
+        let result_dir = String::from("/data/longreads_basic_pipe");
+        let vntr_bed = String::from("/data/ref/hs1/vntrs_chm13.bed");
+        let pon = String::from("/data/ref/hs1/PoN_1000G_chm13.tsv.gz");
+        let conda_sh = String::from("/data/miniconda3/etc/profile.d/conda.sh");
+
+        Self {
+            bin,
+            result_dir,
+            threads: 32,
+            vntr_bed,
+            pon,
+            conda_sh,
+            force: true,
+        }
+    }
+}
+
+/// One caller run for a tumour/normal (diag/mrd) BAM pair.
+///
+/// NOTE(review): the output/log directories (`.../diag/severus`,
+/// `.../log/severus`), the conda environment (`severus_env`) and every
+/// command-line flag are Severus ones; the `savana` invocation quoted in `run`
+/// is not what gets executed — confirm before relying on this wrapper.
+#[derive(Debug)]
+pub struct Savana {
+    /// Sample identifier used to build the output and log paths.
+    pub id: String,
+    /// Directory handed to `--out-dir`.
+    pub output_dir: String,
+    /// BAM passed as `--target-bam`.
+    pub diag_bam: String,
+    /// BAM passed as `--control-bam`.
+    pub mrd_bam: String,
+    /// VCF passed as `--phasing-vcf`.
+    pub phased_vcf: String,
+    /// VCF expected from the caller; its presence decides whether the caller runs.
+    pub output_vcf: String,
+    /// Destination of the PASS-filtered VCF.
+    pub vcf_passed: String,
+    /// Tool settings.
+    pub config: SavanaConfig,
+    /// Directory receiving the command reports.
+    pub log_dir: String,
+}
+
+impl Savana {
+    /// Derives every output and log path from `config.result_dir` and `id`.
+    pub fn new(
+        id: &str,
+        diag_bam: &str,
+        mrd_bam: &str,
+        phased_vcf: &str,
+        config: SavanaConfig,
+    ) -> Self {
+        let output_dir = format!("{}/{}/diag/severus", config.result_dir, id);
+        let output_vcf = format!("{output_dir}/somatic_SVs/severus_somatic.vcf");
+        let vcf_passed = format!("{output_dir}/{id}_diag_severus_PASSED.vcf.gz");
+        let log_dir = format!("{}/{}/log/severus", config.result_dir, id);
+
+        Self {
+            id: id.to_string(),
+            output_dir,
+            diag_bam: diag_bam.to_string(),
+            mrd_bam: mrd_bam.to_string(),
+            phased_vcf: phased_vcf.to_string(),
+            output_vcf,
+            vcf_passed,
+            config,
+            log_dir,
+        }
+    }
+
+    /// Runs the caller (unless its VCF already exists) and then keeps PASS records.
+    ///
+    /// # Errors
+    /// Returns an error when a directory cannot be removed or created, when the
+    /// caller or bcftools command fails, or when a report cannot be saved.
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        if self.config.force && Path::new(&self.output_vcf).exists() {
+            fs::remove_dir_all(&self.output_dir)
+                .with_context(|| format!("Failed to remove {}", self.output_dir))?;
+        }
+
+        // `create_dir_all` is a no-op when the directory already exists; report
+        // failures to the caller instead of panicking.
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed to create log directory {}", self.log_dir))?;
+
+        // Run command if output VCF doesn't exist
+        // savana --tumour /data/longreads_basic_pipe/CUNY/diag/CUNY_diag_hs1_hp.bam --normal /data/longreads_basic_pipe/CUNY/mrd/CUNY_mrd_hs1_hp.bam --outdir /data/longreads_basic_pipe/CUNY/diag/savana --ref /data/ref/hs1/chm13v2.0.fa --phased_vcf /data/longreads_basic_pipe/CUNY/diag/ClairS/clair3_normal_germline_output_PS.vcf --no_blacklist
+        if !Path::new(&self.output_vcf).exists() {
+            let threads = self.config.threads.to_string();
+            let severus_args = [
+                "--target-bam",
+                &self.diag_bam,
+                "--control-bam",
+                &self.mrd_bam,
+                "--phasing-vcf",
+                &self.phased_vcf,
+                "--out-dir",
+                &self.output_dir,
+                "-t",
+                &threads,
+                "--write-alignments",
+                "--use-supplementary-tag",
+                "--resolve-overlaps",
+                "--between-junction-ins",
+                "--vntr-bed",
+                &self.config.vntr_bed,
+            ];
+            // The tool lives in a conda environment, so it is launched through bash.
+            let command = format!(
+                "source {} && conda activate severus_env && {} {}",
+                self.config.conda_sh,
+                self.config.bin,
+                severus_args.join(" ")
+            );
+            let args = ["-c", command.as_str()];
+            let mut cmd_run = CommandRun::new("bash", &args);
+            let report = run_wait(&mut cmd_run)
+                .with_context(|| format!("Error while running `bash -c {command}`"))?;
+
+            let log_file = format!("{}/severus_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+        } else {
+            info!("Savana output vcf already exists");
+        }
+
+        // Keep PASS
+        if !Path::new(&self.vcf_passed).exists() {
+            let report = bcftools_keep_pass(
+                &self.output_vcf,
+                &self.vcf_passed,
+                BcftoolsConfig::default(),
+            )
+            .with_context(|| {
+                format!(
+                    "Error while running bcftools keep PASS for {}",
+                    &self.output_vcf
+                )
+            })?;
+            let log_file = format!("{}/bcftools_pass", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+        }
+        Ok(())
+    }
+}

+ 138 - 0
src/callers/severus.rs

@@ -0,0 +1,138 @@
+use crate::{
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    runners::{run_wait, CommandRun},
+};
+use anyhow::Context;
+use std::{fs, path::Path};
+use tracing::info;
+
+/// Settings for the Severus structural-variant caller wrapper.
+#[derive(Clone, Debug)]
+pub struct SeverusConfig {
+    /// Path of the `severus.py` script executed after the conda environment is activated.
+    pub bin: String,
+    /// Root directory under which per-sample output and log directories are built.
+    pub result_dir: String,
+    /// Value passed to the `-t` option.
+    pub threads: u8,
+    /// BED file passed to `--vntr-bed`.
+    pub vntr_bed: String,
+    /// Panel-of-normals path; not read by `Severus` itself.
+    pub pon: String,
+    /// `conda.sh` script sourced before `conda activate`.
+    pub conda_sh: String,
+    /// When true and the output VCF already exists, the output directory is
+    /// removed before running.
+    pub force: bool,
+}
+
+impl Default for SeverusConfig {
+    /// Returns the site-specific defaults (tool and reference paths under `/data`).
+    fn default() -> Self {
+        let bin = String::from("/data/tools/Severus/severus.py");
+        let result_dir = String::from("/data/longreads_basic_pipe");
+        let vntr_bed = String::from("/data/ref/hs1/vntrs_chm13.bed");
+        let pon = String::from("/data/ref/hs1/PoN_1000G_chm13.tsv.gz");
+        let conda_sh = String::from("/data/miniconda3/etc/profile.d/conda.sh");
+
+        Self {
+            bin,
+            result_dir,
+            threads: 32,
+            vntr_bed,
+            pon,
+            conda_sh,
+            force: true,
+        }
+    }
+}
+
+/// One Severus run for a tumour/normal (diag/mrd) BAM pair.
+#[derive(Debug)]
+pub struct Severus {
+    /// Sample identifier used to build the output and log paths.
+    pub id: String,
+    /// Directory handed to `--out-dir`.
+    pub output_dir: String,
+    /// BAM passed as `--target-bam`.
+    pub diag_bam: String,
+    /// BAM passed as `--control-bam`.
+    pub mrd_bam: String,
+    /// VCF passed as `--phasing-vcf`.
+    pub phased_vcf: String,
+    /// VCF expected from Severus; its presence decides whether Severus runs.
+    pub output_vcf: String,
+    /// Destination of the PASS-filtered VCF.
+    pub vcf_passed: String,
+    /// Tool settings.
+    pub config: SeverusConfig,
+    /// Directory receiving the command reports.
+    pub log_dir: String,
+}
+
+impl Severus {
+    /// Derives every output and log path from `config.result_dir` and `id`.
+    pub fn new(
+        id: &str,
+        diag_bam: &str,
+        mrd_bam: &str,
+        phased_vcf: &str,
+        config: SeverusConfig,
+    ) -> Self {
+        let output_dir = format!("{}/{}/diag/severus", config.result_dir, id);
+        let output_vcf = format!("{output_dir}/somatic_SVs/severus_somatic.vcf");
+        let vcf_passed = format!("{output_dir}/{id}_diag_severus_PASSED.vcf.gz");
+        let log_dir = format!("{}/{}/log/severus", config.result_dir, id);
+
+        Self {
+            id: id.to_string(),
+            output_dir,
+            diag_bam: diag_bam.to_string(),
+            mrd_bam: mrd_bam.to_string(),
+            phased_vcf: phased_vcf.to_string(),
+            output_vcf,
+            vcf_passed,
+            config,
+            log_dir,
+        }
+    }
+
+    /// Runs Severus (unless its VCF already exists) and then keeps PASS records.
+    ///
+    /// # Errors
+    /// Returns an error when a directory cannot be removed or created, when the
+    /// Severus or bcftools command fails, or when a report cannot be saved.
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        if self.config.force && Path::new(&self.output_vcf).exists() {
+            fs::remove_dir_all(&self.output_dir)
+                .with_context(|| format!("Failed to remove {}", self.output_dir))?;
+        }
+
+        // `create_dir_all` is a no-op when the directory already exists; report
+        // failures to the caller instead of panicking.
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed to create log directory {}", self.log_dir))?;
+
+        // Run command if output VCF doesn't exist
+        if !Path::new(&self.output_vcf).exists() {
+            let threads = self.config.threads.to_string();
+            let severus_args = [
+                "--target-bam",
+                &self.diag_bam,
+                "--control-bam",
+                &self.mrd_bam,
+                "--phasing-vcf",
+                &self.phased_vcf,
+                "--out-dir",
+                &self.output_dir,
+                "-t",
+                &threads,
+                "--write-alignments",
+                "--use-supplementary-tag",
+                "--resolve-overlaps",
+                "--between-junction-ins",
+                "--vntr-bed",
+                &self.config.vntr_bed,
+            ];
+            // Severus lives in a conda environment, so it is launched through bash.
+            let command = format!(
+                "source {} && conda activate severus_env && {} {}",
+                self.config.conda_sh,
+                self.config.bin,
+                severus_args.join(" ")
+            );
+            let args = ["-c", command.as_str()];
+            let mut cmd_run = CommandRun::new("bash", &args);
+            let report = run_wait(&mut cmd_run)
+                .with_context(|| format!("Error while running `bash -c {command}`"))?;
+
+            let log_file = format!("{}/severus_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+        } else {
+            info!("Severus output vcf already exists");
+        }
+
+        // Keep PASS
+        if !Path::new(&self.vcf_passed).exists() {
+            let report = bcftools_keep_pass(
+                &self.output_vcf,
+                &self.vcf_passed,
+                BcftoolsConfig::default(),
+            )
+            .with_context(|| {
+                format!(
+                    "Error while running bcftools keep PASS for {}",
+                    &self.output_vcf
+                )
+            })?;
+            let log_file = format!("{}/bcftools_pass", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+        }
+        Ok(())
+    }
+}

+ 125 - 46
src/collection/mod.rs

@@ -1,10 +1,5 @@
 use std::{
 use std::{
-    collections::HashMap,
-    fmt,
-    fs::{self, metadata},
-    os::unix::fs::MetadataExt,
-    path::{Path, PathBuf},
-    time::SystemTime,
+    collections::HashMap, fmt, fs::{self, metadata}, os::unix::fs::MetadataExt, path::{Path, PathBuf}, thread, time::SystemTime
 };
 };
 
 
 use anyhow::Context;
 use anyhow::Context;
@@ -82,8 +77,8 @@ impl Collections {
             result_dir,
             result_dir,
             ..
             ..
         } = &config;
         } = &config;
-        // let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
-        let pod5 = Pod5Collection::default();
+        let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
+        // let pod5 = Pod5Collection::default();
         let bam = BamCollection::new(result_dir);
         let bam = BamCollection::new(result_dir);
         let vcf = VcfCollection::new(result_dir);
         let vcf = VcfCollection::new(result_dir);
         let modbases = ModBasesCollection::new(result_dir);
         let modbases = ModBasesCollection::new(result_dir);
@@ -172,7 +167,7 @@ impl Collections {
         }
         }
 
 
         // de novo
         // de novo
-        tasks.extend(self.todo_assembler()?);
+        // tasks.extend(self.todo_assembler()?);
 
 
         // Remove VCF anterior to BAM
         // Remove VCF anterior to BAM
         // let vcf_by_id = self.vcf.group_by_id();
         // let vcf_by_id = self.vcf.group_by_id();
@@ -291,12 +286,12 @@ impl Collections {
 
 
         // Variants Nanomonsv
         // Variants Nanomonsv
         self.tasks.extend(self.todo_nanomonsv());
         self.tasks.extend(self.todo_nanomonsv());
-        
+
         // Variants aggregation
         // Variants aggregation
         self.tasks.extend(self.todo_variants_agg()?);
         self.tasks.extend(self.todo_variants_agg()?);
 
 
         // ModPileup
         // ModPileup
-        // self.tasks.extend(self.todo_mod_pileup());
+        self.tasks.extend(self.todo_mod_pileup());
 
 
         // DMR C diag vs mrd
         // DMR C diag vs mrd
         self.tasks.extend(self.todo_dmr_c_diag_mrd());
         self.tasks.extend(self.todo_dmr_c_diag_mrd());
@@ -391,46 +386,61 @@ impl Collections {
 
 
     pub fn todo_clairs(&self) -> Vec<CollectionsTasks> {
     pub fn todo_clairs(&self) -> Vec<CollectionsTasks> {
         let config = ClairSConfig::default();
         let config = ClairSConfig::default();
-        self.bam_pairs().iter().filter_map(|(diag, mrd)| {
-            if self.vcf.vcfs.iter().any(|v| {
-                v.caller == "clairs"
-                    && v.id == diag.id
-                    && v.time_point == diag.time_point
-                    && (v.modified().unwrap_or_default() > diag.modified
-                        || v.modified().unwrap_or_default() > mrd.modified)
-            }) {
-                None
-            } else {
-                Some(CollectionsTasks::ClairS {
-                    config: config.clone(),
-                    id: diag.id.clone(),
-                    diag_bam: diag.path.to_string_lossy().to_string(),
-                    mrd_bam: mrd.path.to_string_lossy().to_string(),
-                })
+        self.bam_pairs()
+            .iter()
+            .filter_map(|(diag, mrd)| {
+                if self.vcf.vcfs.iter().any(|v| {
+                    v.caller == "clairs"
+                        && v.id == diag.id
+                        && v.time_point == diag.time_point
+                        && (v.modified().unwrap_or_default() > diag.modified
+                            || v.modified().unwrap_or_default() > mrd.modified)
+                }) {
+                    None
+                } else {
+                    Some(CollectionsTasks::ClairS {
+                        config: config.clone(),
+                        id: diag.id.clone(),
+                        diag_bam: diag.path.to_string_lossy().to_string(),
+                        mrd_bam: mrd.path.to_string_lossy().to_string(),
+                    })
+                }
+            })
+            .collect()
+    }
+    pub fn run_clairs(&self) -> anyhow::Result<()> {
+        for task in self.todo_clairs() {
+            match task.run() {
+                Ok(_) => info!("done"),
+                Err(e) => warn!("{e}"),
             }
             }
-        }).collect()
+        }
+        Ok(())
     }
     }
 
 
     pub fn todo_nanomonsv(&self) -> Vec<CollectionsTasks> {
     pub fn todo_nanomonsv(&self) -> Vec<CollectionsTasks> {
         let config = NanomonSVConfig::default();
         let config = NanomonSVConfig::default();
-        self.bam_pairs().iter().filter_map(|(diag, mrd)| {
-            if self.vcf.vcfs.iter().any(|v| {
-                v.caller == "nanomonsv"
-                    && v.id == diag.id
-                    && v.time_point == diag.time_point
-                    && (v.modified().unwrap_or_default() > diag.modified
-                        || v.modified().unwrap_or_default() > mrd.modified)
-            }) {
-                None
-            } else {
-                Some(CollectionsTasks::NanomonSV {
-                    config: config.clone(),
-                    id: diag.id.clone(),
-                    diag_bam: diag.path.to_string_lossy().to_string(),
-                    mrd_bam: mrd.path.to_string_lossy().to_string(),
-                })
-            }
-        }).collect()
+        self.bam_pairs()
+            .iter()
+            .filter_map(|(diag, mrd)| {
+                if self.vcf.vcfs.iter().any(|v| {
+                    v.caller == "nanomonsv"
+                        && v.id == diag.id
+                        && v.time_point == diag.time_point
+                        && (v.modified().unwrap_or_default() > diag.modified
+                            || v.modified().unwrap_or_default() > mrd.modified)
+                }) {
+                    None
+                } else {
+                    Some(CollectionsTasks::NanomonSV {
+                        config: config.clone(),
+                        id: diag.id.clone(),
+                        diag_bam: diag.path.to_string_lossy().to_string(),
+                        mrd_bam: mrd.path.to_string_lossy().to_string(),
+                    })
+                }
+            })
+            .collect()
     }
     }
     pub fn todo_mod_pileup(&self) -> Vec<CollectionsTasks> {
     pub fn todo_mod_pileup(&self) -> Vec<CollectionsTasks> {
         let config = ModkitConfig::default();
         let config = ModkitConfig::default();
@@ -639,6 +649,75 @@ impl Collections {
 
 
         Ok(())
         Ok(())
     }
     }
+
+    /// Runs the pending DeepVariant tasks two at a time.
+    ///
+    /// Tasks come from `todo_deepvariants`. Each pair is rebuilt with an
+    /// 85-thread configuration and executed on two threads that are joined
+    /// before the next pair starts. A trailing unpaired task runs on its own
+    /// with the configuration it already carries. Task failures are logged and
+    /// do not stop the remaining tasks.
+    ///
+    /// # Errors
+    /// Returns an error if a paired task is not a `CollectionsTasks::DeepVariant`.
+    ///
+    /// # Panics
+    /// Re-raises the panic of a worker thread.
+    pub fn run_deepvariant(&mut self) -> anyhow::Result<()> {
+        let tasks = self.todo_deepvariants();
+
+        let n_tasks = tasks.len();
+        warn!("{n_tasks} tasks to run");
+
+        let config = DeepVariantConfig {
+            threads: 85,
+            ..Default::default()
+        };
+
+        // Copies a DeepVariant task, swapping in the reduced-thread configuration.
+        let with_shared_config = |task: &CollectionsTasks| -> anyhow::Result<CollectionsTasks> {
+            match task {
+                CollectionsTasks::DeepVariant {
+                    id,
+                    time_point,
+                    bam,
+                    ..
+                } => Ok(CollectionsTasks::DeepVariant {
+                    id: id.to_string(),
+                    time_point: time_point.to_string(),
+                    bam: bam.to_string(),
+                    config: config.clone(),
+                }),
+                other => anyhow::bail!("Expected a DeepVariant task, got {other}"),
+            }
+        };
+
+        // `chunks` (unlike `chunks_exact`) also yields the final 1-element chunk,
+        // so the last task of an odd-sized list is not dropped.
+        for (i, tasks_chunk) in tasks.chunks(2).enumerate() {
+            let first = 2 * i + 1;
+            match tasks_chunk {
+                [a, b] => {
+                    warn!(
+                        "Running task {first}/{n_tasks} and {}/{n_tasks}",
+                        first + 1
+                    );
+                    info!("{a} and {b}");
+
+                    let a = with_shared_config(a)?;
+                    let b = with_shared_config(b)?;
+
+                    let handle1 = thread::spawn(move || a.run());
+                    let handle2 = thread::spawn(move || b.run());
+                    for handle in [handle1, handle2] {
+                        match handle.join() {
+                            Ok(Ok(_)) => info!("DeepVariant task finished"),
+                            Ok(Err(e)) => warn!("{e}"),
+                            // Keep the original behaviour: a worker panic is fatal.
+                            Err(payload) => std::panic::resume_unwind(payload),
+                        }
+                    }
+                }
+                [a] => {
+                    warn!("Running task {first}/{n_tasks}");
+                    info!("Single task: ({})", a);
+                    if let Err(e) = a.clone().run() {
+                        warn!("{e}");
+                    }
+                }
+                _ => (),
+            }
+        }
+
+        Ok(())
+    }
 }
 }
 
 
 #[derive(Clone, Debug)]
 #[derive(Clone, Debug)]

+ 196 - 0
src/commands/longphase.rs

@@ -0,0 +1,196 @@
+use crate::runners::{run_wait, CommandRun};
+use anyhow::Context;
+use duct::cmd;
+use std::{
+    fs,
+    path::{Path, PathBuf},
+};
+use tracing::info;
+
+use super::bcftools::{bcftools_keep_pass, BcftoolsConfig};
+
+#[derive(Debug, Clone)]
+pub struct LongphaseConfig { // Settings shared by the longphase `haplotag` and `phase` runners in this module.
+    pub bin: String, // Path of the longphase executable that is launched.
+    pub result_dir: String, // Root directory; logs are written under `<result_dir>/<id>/log/longphase`.
+    pub reference: String, // Reference FASTA handed to longphase with `-r`.
+    pub threads: u8, // Thread count handed to longphase with `-t` (and to `samtools index -@`).
+    pub force: bool, // When true, `run` deletes an already-existing output path before running.
+}
+
+impl Default for LongphaseConfig {
+    /// Site defaults: the longphase binary under `/data/tools`, the
+    /// `chm13v2.0.fa` reference, 150 threads and `force` enabled.
+    fn default() -> Self {
+        let bin = String::from("/data/tools/longphase_linux-x64");
+        let reference = String::from("/data/ref/hs1/chm13v2.0.fa");
+        let result_dir = String::from("/data/longreads_basic_pipe");
+
+        Self {
+            bin,
+            result_dir,
+            reference,
+            threads: 150,
+            force: true,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct LongphaseHap { // One `longphase haplotag` job.
+    pub id: String, // Sample identifier; used to build `log_dir`.
+    pub vcf: String, // VCF handed to longphase with `-s`.
+    pub bam: PathBuf, // Input BAM handed to longphase with `-b`.
+    pub bam_hp: PathBuf, // Output path handed to longphase with `-o`: `<bam file stem>_hp` next to `bam` (no extension).
+    pub config: LongphaseConfig, // Binary, reference, thread count and `force` flag.
+    pub log_dir: String, // `<result_dir>/<id>/log/longphase`; created by `run` when missing.
+}
+
+impl LongphaseHap {
+    /// Prepares a `longphase haplotag` job for `bam` using the variants in `vcf`.
+    ///
+    /// `bam_hp` is the output prefix handed to longphase with `-o`: the input
+    /// file name without its last extension, suffixed with `_hp`, in the same
+    /// directory as `bam`. `run` treats `<bam_hp>.bam` as the produced file.
+    ///
+    /// # Panics
+    /// Panics if `bam` has no file name or if that name is not valid UTF-8.
+    pub fn new(id: &str, bam: &str, vcf: &str, config: LongphaseConfig) -> Self {
+        let log_dir = format!("{}/{}/log/longphase", config.result_dir, id);
+
+        let bam = Path::new(bam);
+        let stem = bam
+            .file_stem()
+            .and_then(|s| s.to_str())
+            .expect("BAM path must end with a valid UTF-8 file name");
+        let bam_hp = bam.with_file_name(format!("{stem}_hp"));
+
+        Self {
+            id: id.to_string(),
+            bam: bam.to_path_buf(),
+            config,
+            log_dir,
+            vcf: vcf.to_string(),
+            bam_hp,
+        }
+    }
+
+    /// Haplotags the BAM, saves the run report under `log_dir` and indexes
+    /// the result with `samtools index`.
+    ///
+    /// With `config.force` set, an existing `<bam_hp>.bam` is removed first;
+    /// otherwise an existing `<bam_hp>.bam` makes the call a no-op.
+    ///
+    /// # Errors
+    /// Fails when a path is not valid UTF-8, when the log directory cannot be
+    /// created, when longphase or `samtools index` fails, or when the report
+    /// cannot be written.
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        let out_prefix = self
+            .bam_hp
+            .to_str()
+            .context("Haplotag output prefix is not valid UTF-8")?
+            .to_string();
+        let bam_in = self
+            .bam
+            .to_str()
+            .context("Input BAM path is not valid UTF-8")?
+            .to_string();
+        // `bam_hp` carries no extension; the file that ends up on disk (and
+        // that is indexed below) is `<bam_hp>.bam`, so every existence check
+        // has to target that path rather than the bare prefix.
+        let out_bam = format!("{out_prefix}.bam");
+        let threads = self.config.threads.to_string();
+
+        if self.config.force && Path::new(&out_bam).exists() {
+            fs::remove_file(&out_bam)
+                .with_context(|| format!("Failed to remove existing {out_bam}"))?;
+        }
+
+        // `create_dir_all` succeeds when the directory is already there.
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed to create log directory {}", self.log_dir))?;
+
+        // Nothing to do when the haplotagged BAM is already present.
+        if Path::new(&out_bam).exists() {
+            info!("Longphase haplotagged BAM already exists: {out_bam}");
+            return Ok(());
+        }
+
+        let args = [
+            "haplotag",
+            "-s",
+            &self.vcf,
+            "-b",
+            &bam_in,
+            "-r",
+            &self.config.reference,
+            "-t",
+            &threads,
+            "-o",
+            &out_prefix,
+        ];
+        let mut cmd_run = CommandRun::new(&self.config.bin, &args);
+        let report = run_wait(&mut cmd_run).with_context(|| {
+            format!(
+                "Error while running `{} {}`",
+                self.config.bin,
+                args.join(" ")
+            )
+        })?;
+
+        let log_file = format!("{}/longphase_", self.log_dir);
+        report
+            .save_to_file(&log_file)
+            .with_context(|| format!("Error while writing logs into {log_file}"))?;
+
+        let _ = cmd!("samtools", "index", "-@", &threads, &out_bam)
+            .run()
+            .with_context(|| format!("Error while indexing {out_bam}"))?;
+
+        Ok(())
+    }
+}
+
+// /data/tools/longphase_linux-x64 phase -s ClairS/clair3_normal_tumoral_germline_output.vcf.gz -b CUNY_diag_hs1_hp.bam -r /data/ref/hs1/chm13v2.0.fa -t 155 --ont -o ClairS/clair3_normal_tumoral_germline_output_PS
+#[derive(Debug)]
+pub struct LongphasePhase { // One `longphase phase` job followed by a bcftools PASS filter.
+    pub vcf: PathBuf, // Input VCF handed to longphase with `-s`.
+    pub out_prefix: PathBuf, // Output prefix handed to longphase with `-o`; derived from the VCF name plus `_PS`.
+    pub bam_hp: String, // BAM handed to longphase with `-b`.
+    pub config: LongphaseConfig, // Binary, reference, thread count and `force` flag.
+    pub log_dir: String, // `<result_dir>/<id>/log/longphase`; created by `run` when missing.
+}
+
+impl LongphasePhase {
+    /// Prepares a `longphase phase` job for `vcf` using the reads in `bam_hp`.
+    ///
+    /// `out_prefix` is the VCF file name with its `.vcf.gz` / `.vcf` suffix
+    /// removed and `_PS` appended, in the same directory as `vcf`
+    /// (`calls.vcf.gz` -> `calls_PS`). Names without such a suffix fall back
+    /// to dropping only the last extension.
+    ///
+    /// # Panics
+    /// Panics if `vcf` has no file name or if that name is not valid UTF-8.
+    pub fn new(id: &str, bam_hp: &str, vcf: &str, config: LongphaseConfig) -> Self {
+        let log_dir = format!("{}/{}/log/longphase", config.result_dir, id);
+
+        let vcf = Path::new(vcf);
+        let file_name = vcf
+            .file_name()
+            .and_then(|n| n.to_str())
+            .expect("VCF path must end with a valid UTF-8 file name");
+        // `Path::file_stem` only strips the last extension, which would turn
+        // `calls.vcf.gz` into the prefix `calls.vcf_PS`; strip the whole VCF
+        // suffix so the phased file is named `calls_PS.vcf(.gz)`.
+        let stem = file_name
+            .strip_suffix(".vcf.gz")
+            .or_else(|| file_name.strip_suffix(".vcf"))
+            .or_else(|| vcf.file_stem().and_then(|s| s.to_str()))
+            .unwrap_or(file_name);
+        let out_prefix = vcf.with_file_name(format!("{stem}_PS"));
+
+        Self {
+            bam_hp: bam_hp.to_string(),
+            config,
+            log_dir,
+            vcf: vcf.to_path_buf(),
+            out_prefix,
+        }
+    }
+
+    /// Phases the VCF, then filters `<out_prefix>.vcf` through
+    /// `bcftools_keep_pass` into `<out_prefix>.vcf.gz`; both run reports are
+    /// saved under `log_dir`.
+    ///
+    /// With `config.force` set, an existing `<out_prefix>.vcf.gz` is removed
+    /// first; otherwise an existing final VCF makes the call a no-op.
+    ///
+    /// # Errors
+    /// Fails when a path is not valid UTF-8, when the log directory cannot be
+    /// created, when longphase or bcftools fails, or when a report cannot be
+    /// written.
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        let out_prefix = self
+            .out_prefix
+            .to_str()
+            .context("Phasing output prefix is not valid UTF-8")?
+            .to_string();
+        let vcf_in = self
+            .vcf
+            .to_str()
+            .context("Input VCF path is not valid UTF-8")?
+            .to_string();
+        let uncompressed_vcf = format!("{out_prefix}.vcf");
+        let final_vcf = format!("{uncompressed_vcf}.gz");
+        let threads = self.config.threads.to_string();
+
+        if self.config.force && Path::new(&final_vcf).exists() {
+            fs::remove_file(&final_vcf)
+                .with_context(|| format!("Failed to remove existing {final_vcf}"))?;
+        }
+
+        // `create_dir_all` succeeds when the directory is already there.
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed to create log directory {}", self.log_dir))?;
+
+        // Nothing to do when the final VCF is already present.
+        if Path::new(&final_vcf).exists() {
+            info!("Longphase output vcf already exists");
+            return Ok(());
+        }
+
+        let args = [
+            "phase",
+            "-s",
+            &vcf_in,
+            "-b",
+            &self.bam_hp,
+            "-r",
+            &self.config.reference,
+            "-t",
+            &threads,
+            "--ont",
+            "-o",
+            &out_prefix,
+        ];
+        let mut cmd_run = CommandRun::new(&self.config.bin, &args);
+        let report = run_wait(&mut cmd_run).with_context(|| {
+            format!(
+                "Error while running `{} {}`",
+                self.config.bin,
+                args.join(" ")
+            )
+        })?;
+
+        let log_file = format!("{}/longphase_phase_", self.log_dir);
+        report
+            .save_to_file(&log_file)
+            .with_context(|| format!("Error while writing logs into {log_file}"))?;
+
+        let report = bcftools_keep_pass(&uncompressed_vcf, &final_vcf, BcftoolsConfig::default())
+            .with_context(|| format!("Error while running bcftools keep PASS for {final_vcf}"))?;
+        let log_file = format!("{}/bcftools_pass", self.log_dir);
+        report
+            .save_to_file(&log_file)
+            .with_context(|| format!("Error while writing logs into {log_file}"))?;
+
+        Ok(())
+    }
+}

+ 2 - 0
src/commands/mod.rs

@@ -1,3 +1,5 @@
 pub mod dorado;
 pub mod dorado;
 pub mod bcftools;
 pub mod bcftools;
 pub mod modkit;
 pub mod modkit;
+pub mod longphase;
+pub mod wakhan;

+ 141 - 0
src/commands/wakhan.rs

@@ -0,0 +1,141 @@
+use crate::{
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    runners::{run_wait, CommandRun},
+};
+use anyhow::Context;
+use std::{fs, path::Path};
+use tracing::info;
+
+#[derive(Debug, Clone)]
+pub struct WakhanConfig { // Settings for the `Wakhan` runner defined in this module.
+    pub bin: String, // Program path interpolated into the `bash -c` command line.
+    pub result_dir: String, // Root directory from which output and log paths are derived.
+    pub threads: u8, // Thread count passed with `-t`.
+    pub vntr_bed: String, // BED file passed with `--vntr-bed`.
+    pub pon: String, // NOTE(review): not read anywhere in this module — confirm whether it is still needed.
+    pub conda_sh: String, // `conda.sh` script that is `source`d before `conda activate`.
+    pub force: bool, // When true, `run` deletes `output_dir` if the output VCF already exists.
+}
+// savana --tumour /data/longreads_basic_pipe/CUNY/diag/CUNY_diag_hs1_hp.bam --normal /data/longreads_basic_pipe/CUNY/mrd/CUNY_mrd_hs1_hp.bam --outdir /data/longreads_basic_pipe/CUNY/diag/savana --ref /data/ref/hs1/chm13v2.0.fa --phased_vcf /data/longreads_basic_pipe/CUNY/diag/ClairS/clair3_normal_germline_output_PS.vcf --no_blacklist
+
+// python /data/tools/Wakhan/src/main.py --threads 150 --reference /data/ref/hs1/chm13v2.0.fa --target-bam /data/longreads_basic_pipe/CUNY/diag/CUNY_diag_hs1_hp.bam --normal-phased-vcf /data/longreads_basic_pipe/CUNY/diag/ClairS/clair3_normal_tumoral_germline_output.vcf.gz --genome-name hs1 --out-dir-plots /data/longreads_basic_pipe/CUNY/diag/wakhan/ --breakpoints /data/longreads_basic_pipe/CUNY/diag/severus/somatic_SVs/severus_somatic.vcf --loh-enable --pdf-enable --centromere /data/tools/Wakhan/src/annotations/chm13v2_cen_coord.bed
+
+impl Default for WakhanConfig {
+    /// Site defaults: the Wakhan `main.py` script, the pipeline result root,
+    /// 155 threads, the CHM13 VNTR/PoN files and `force` enabled.
+    fn default() -> Self {
+        let bin = String::from("/data/tools/Wakhan/src/main.py");
+        let result_dir = String::from("/data/longreads_basic_pipe");
+        let vntr_bed = String::from("/data/ref/hs1/vntrs_chm13.bed");
+        let pon = String::from("/data/ref/hs1/PoN_1000G_chm13.tsv.gz");
+        let conda_sh = String::from("/data/miniconda3/etc/profile.d/conda.sh");
+
+        Self {
+            bin,
+            result_dir,
+            threads: 155,
+            vntr_bed,
+            pon,
+            conda_sh,
+            force: true,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Wakhan { // One run description; NOTE(review): derived paths and CLI flags currently point at Severus — confirm intent.
+    pub id: String, // Sample identifier used to build output and log paths.
+    pub output_dir: String, // `<result_dir>/<id>/diag/severus`; passed with `--out-dir`.
+    pub diag_bam: String, // BAM passed with `--target-bam`.
+    pub mrd_bam: String, // BAM passed with `--control-bam`.
+    pub phased_vcf: String, // VCF passed with `--phasing-vcf`.
+    pub output_vcf: String, // `<output_dir>/somatic_SVs/severus_somatic.vcf`; its presence decides whether the tool runs.
+    pub vcf_passed: String, // Destination of the `bcftools_keep_pass` filter.
+    pub config: WakhanConfig, // Program path, conda script, thread count, VNTR BED and `force` flag.
+    pub log_dir: String, // `<result_dir>/<id>/log/severus`; created by `run` when missing.
+}
+
+impl Wakhan {
+    /// Builds a run description for sample `id`.
+    ///
+    /// NOTE(review): every derived path (`output_dir`, `output_vcf`,
+    /// `vcf_passed`, `log_dir`) lives under a `severus` directory and `run`
+    /// passes Severus-style arguments inside the `severus_env` conda
+    /// environment, while `WakhanConfig::default().bin` points at Wakhan's
+    /// `main.py`. This looks like a Severus template that has not been
+    /// adapted yet — confirm before relying on it.
+    pub fn new(
+        id: &str,
+        diag_bam: &str,
+        mrd_bam: &str,
+        phased_vcf: &str,
+        config: WakhanConfig,
+    ) -> Self {
+        let output_dir = format!("{}/{}/diag/severus", config.result_dir, id);
+        let output_vcf = format!("{output_dir}/somatic_SVs/severus_somatic.vcf");
+        let vcf_passed = format!("{output_dir}/{id}_diag_severus_PASSED.vcf.gz");
+
+        let log_dir = format!("{}/{}/log/severus", config.result_dir, id);
+        Self {
+            id: id.to_string(),
+            output_dir,
+            diag_bam: diag_bam.to_string(),
+            mrd_bam: mrd_bam.to_string(),
+            config,
+            phased_vcf: phased_vcf.to_string(),
+            log_dir,
+            output_vcf,
+            vcf_passed,
+        }
+    }
+
+    /// Runs the configured program through `bash -c` (after sourcing
+    /// `conda_sh` and activating the conda environment) unless `output_vcf`
+    /// already exists, then filters `output_vcf` into `vcf_passed` with
+    /// `bcftools_keep_pass` unless that file already exists.
+    ///
+    /// With `config.force` set and `output_vcf` present, the whole
+    /// `output_dir` is deleted first.
+    /// NOTE(review): `output_dir` is the Severus result directory, so a forced
+    /// run removes Severus results — confirm this is intended.
+    ///
+    /// # Errors
+    /// Fails when `output_dir` cannot be removed, when the log directory
+    /// cannot be created, when either command fails, or when a report cannot
+    /// be written.
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        if self.config.force && Path::new(&self.output_vcf).exists() {
+            fs::remove_dir_all(&self.output_dir)?;
+        }
+
+        // `create_dir_all` succeeds when the directory is already there, and
+        // an I/O failure is reported to the caller instead of panicking.
+        fs::create_dir_all(&self.log_dir)
+            .with_context(|| format!("Failed to create log directory {}", self.log_dir))?;
+
+        // Run command if output VCF doesn't exist
+        if !Path::new(&self.output_vcf).exists() {
+            let threads = self.config.threads.to_string();
+            let severus_args = [
+                "--target-bam",
+                &self.diag_bam,
+                "--control-bam",
+                &self.mrd_bam,
+                "--phasing-vcf",
+                &self.phased_vcf,
+                "--out-dir",
+                &self.output_dir,
+                "-t",
+                &threads,
+                "--write-alignments",
+                "--use-supplementary-tag",
+                "--resolve-overlaps",
+                "--between-junction-ins",
+                "--vntr-bed",
+                &self.config.vntr_bed,
+            ];
+            // The tool has to run inside its conda environment, hence the
+            // single shell command line handed to `bash -c`.
+            let shell_line = format!(
+                "source {} && conda activate severus_env && {} {}",
+                self.config.conda_sh,
+                self.config.bin,
+                severus_args.join(" ")
+            );
+            let args = ["-c", &shell_line];
+            let mut cmd_run = CommandRun::new("bash", &args);
+            let report = run_wait(&mut cmd_run).with_context(|| {
+                format!("Error while running `severus.py {}`", args.join(" "))
+            })?;
+
+            let log_file = format!("{}/severus_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+        } else {
+            info!("Wakhan output vcf already exists");
+        }
+
+        // Keep PASS
+        if !Path::new(&self.vcf_passed).exists() {
+            let report = bcftools_keep_pass(
+                &self.output_vcf,
+                &self.vcf_passed,
+                BcftoolsConfig::default(),
+            )
+            .with_context(|| {
+                format!(
+                    "Error while running bcftools keep PASS for {}",
+                    &self.output_vcf
+                )
+            })?;
+            let log_file = format!("{}/bcftools_pass", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .with_context(|| format!("Error while writing logs into {log_file}"))?;
+        }
+        Ok(())
+    }
+}

+ 1 - 1
src/config.rs

@@ -28,7 +28,7 @@ pub struct AlignConfig {
 impl Default for AlignConfig {
 impl Default for AlignConfig {
     fn default() -> Self {
     fn default() -> Self {
         Self {
         Self {
-            dorado_bin: "/data/tools/dorado-0.8.2-linux-x64/bin/dorado".to_string(),
+            dorado_bin: "/data/tools/dorado-0.8.3-linux-x64/bin/dorado".to_string(),
             dorado_basecall_arg: "-x 'cuda:0,1,2,3' sup,5mC_5hmC".to_string(), // since v0.8.0 need
             dorado_basecall_arg: "-x 'cuda:0,1,2,3' sup,5mC_5hmC".to_string(), // since v0.8.0 need
             // to specify cuda devices (exclude the T1000)
             // to specify cuda devices (exclude the T1000)
             ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),

+ 76 - 15
src/lib.rs

@@ -14,29 +14,31 @@ extern crate lazy_static;
 
 
 // Define DOCKER_ID lock for handling Docker kill when ctrlc is pressed
 // Define DOCKER_ID lock for handling Docker kill when ctrlc is pressed
 lazy_static! {
 lazy_static! {
-    static ref DOCKER_ID: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
+    static ref DOCKER_ID: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
 }
 }
 
 
 #[cfg(test)]
 #[cfg(test)]
 mod tests {
 mod tests {
     use std::{fs, path::Path};
     use std::{fs, path::Path};
 
 
-    use callers::nanomonsv::nanomonsv_create_pon;
-    use commands::modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig};
+    use callers::{nanomonsv::nanomonsv_create_pon, severus::{Severus, SeverusConfig}};
+    use commands::{longphase::{LongphaseHap, LongphaseConfig}, modkit::{bed_methyl, ModkitConfig}};
+    use functions::assembler::{Assembler, AssemblerConfig};
     use log::info;
     use log::info;
-    use rayon::{prelude::*, ThreadPoolBuilder};
+    // use pandora_lib_assembler::assembler::AssembleConfig;
+    use rayon::prelude::*;
 
 
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
     use super::*;
     use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig, NanomonSVSolo}}, collection::{bam::{self, BamType}, run_tasks, variants::VariantsCollection, vcf::VcfCollection, Collections, CollectionsConfig}, commands::{bcftools::{bcftools_keep_pass, BcftoolsConfig}, dorado::Dorado}};
     use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig, NanomonSVSolo}}, collection::{bam::{self, BamType}, run_tasks, variants::VariantsCollection, vcf::VcfCollection, Collections, CollectionsConfig}, commands::{bcftools::{bcftools_keep_pass, BcftoolsConfig}, dorado::Dorado}};
 
 
+    // export RUST_LOG="debug"
     fn init() {
     fn init() {
         let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
         let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
             .is_test(true)
             .is_test(true)
             .try_init();
             .try_init();
     }
     }
 
 
-    // export RUST_LOG="debug"
     #[test]
     #[test]
     fn it_works() {
     fn it_works() {
         let bam_path = "/data/longreads_basic_pipe/PARACHINI/diag/PARACHINI_diag_hs1.bam";
         let bam_path = "/data/longreads_basic_pipe/PARACHINI/diag/PARACHINI_diag_hs1.bam";
@@ -114,11 +116,16 @@ mod tests {
 
 
     #[test_log::test]
     #[test_log::test]
     fn deep_variant() -> anyhow::Result<()> {
     fn deep_variant() -> anyhow::Result<()> {
+        // let config = DeepVariantConfig {
+        //     result_dir: "/data/test".to_string(),
+        //     ..DeepVariantConfig::default()
+        // };
+        // DeepVariant::new("test_a", "diag", "/data/test_data/subset.bam", config).run()
         let config = DeepVariantConfig {
         let config = DeepVariantConfig {
             result_dir: "/data/test".to_string(),
             result_dir: "/data/test".to_string(),
             ..DeepVariantConfig::default()
             ..DeepVariantConfig::default()
         };
         };
-        DeepVariant::new("test_a", "diag", "/data/test_data/subset.bam", config).run()
+        DeepVariant::new("LEVASSEUR", "mrd", "/data/longreads_basic_pipe/LEVASSEUR/mrd/LEVASSEUR_mrd_hs1.bam", config).run()
     }
     }
 
 
     #[test_log::test]
     #[test_log::test]
@@ -132,17 +139,20 @@ mod tests {
 
 
     #[test_log::test]
     #[test_log::test]
     fn nanomonsv() -> anyhow::Result<()> {
     fn nanomonsv() -> anyhow::Result<()> {
-        let config = NanomonSVConfig {
-            result_dir: "/data/test".to_string(),
-            ..NanomonSVConfig::default()
-        };
-        NanomonSV::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run()
+        // let config = NanomonSVConfig {
+        //     result_dir: "/data/test".to_string(),
+        //     ..NanomonSVConfig::default()
+        // };
+        // NanomonSV::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run()
+
+        let bam = |id:&str, time_point: &str| format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam");
+        NanomonSV::new("CAZIER", &bam("CAZIER", "diag"), &bam("CAZIER", "mrd"), NanomonSVConfig::default()).run()
     }
     }
 
 
     #[test]
     #[test]
     fn nanomonsv_solo() -> anyhow::Result<()> {
     fn nanomonsv_solo() -> anyhow::Result<()> {
         init();
         init();
-        let id = "GILLOUX";
+        let id = "MONVILLE";
         let time_point = "diag";
         let time_point = "diag";
         NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
         NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
         // let time_point = "mrd";
         // let time_point = "mrd";
@@ -157,7 +167,8 @@ mod tests {
     #[test]
     #[test]
     fn todo_all() -> anyhow::Result<()> {
     fn todo_all() -> anyhow::Result<()> {
         init();
         init();
-        let config = CollectionsConfig::default();
+        // let config = CollectionsConfig::default();
+        let config = CollectionsConfig { pod_dir: "/data/store".to_string(), ..Default::default() };
         info!("Runing todo with config: {:#?}", config);
         info!("Runing todo with config: {:#?}", config);
         let mut collections = Collections::new(config)?;
         let mut collections = Collections::new(config)?;
         collections.todo()?;
         collections.todo()?;
@@ -196,7 +207,10 @@ mod tests {
     // export RUST_LOG="debug"
     // export RUST_LOG="debug"
     #[test_log::test]
     #[test_log::test]
     fn run_t() -> anyhow::Result<()> {
     fn run_t() -> anyhow::Result<()> {
-        run_tasks(CollectionsConfig::default())
+        // let config = CollectionsConfig::default();
+        let config = CollectionsConfig { pod_dir: "/data/store".to_string(), ..Default::default() };
+
+        run_tasks(config)
     }
     }
 
 
     #[test_log::test]
     #[test_log::test]
@@ -261,7 +275,9 @@ mod tests {
         let collections = Collections::new(
         let collections = Collections::new(
             CollectionsConfig::default()
             CollectionsConfig::default()
         )?;
         )?;
-        collections.todo_deepvariants().iter().for_each(|t| info!("{t}"));
+        let tasks = collections.todo_deepvariants();
+        tasks.iter().for_each(|t| info!("{t}"));
+        info!("n tasks {}", tasks.len());
         Ok(())
         Ok(())
     }
     }
 
 
@@ -275,6 +291,11 @@ mod tests {
         Ok(())
         Ok(())
     }
     }
 
 
+    /// Runs the assembler for the `CAMEL` diag sample with the default configuration.
+    #[test]
+    fn run_assemblers() -> anyhow::Result<()> {
+        let (id, time_point) = (String::from("CAMEL"), String::from("diag"));
+        Assembler::new(id, time_point, AssemblerConfig::default()).run()
+    }
+
     #[test]
     #[test]
     fn run_dmr_par() -> anyhow::Result<()> {
     fn run_dmr_par() -> anyhow::Result<()> {
         init();
         init();
@@ -311,4 +332,44 @@ mod tests {
         });
         });
         Ok(())
         Ok(())
     }
     }
+
+    /// Runs Severus on the haplotagged diag/mrd BAMs of one sample, using the
+    /// sample's PASS-filtered ClairS VCF for phasing.
+    #[test]
+    fn run_severus() -> anyhow::Result<()> {
+        init();
+        let id = "CUNY";
+        let diag_bam = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1_hp.bam");
+        let mrd_bam = format!("/data/longreads_basic_pipe/{id}/mrd/{id}_mrd_hs1_hp.bam");
+        // The VCF file name is derived from `id` like every other path, so
+        // switching the sample only requires changing `id`.
+        let vcf = format!("/data/longreads_basic_pipe/{id}/diag/ClairS/{id}_diag_clairs_PASSED.vcf.gz");
+        Severus::new(id, &diag_bam, &mrd_bam, &vcf, SeverusConfig::default()).run()
+    }
+
+    /// Builds the collections with the default configuration and runs the
+    /// DeepVariant tasks on them.
+    #[test]
+    fn run_multi_deepvariant() -> anyhow::Result<()> {
+        init();
+        let config = CollectionsConfig::default();
+        Collections::new(config)?.run_deepvariant()
+    }
+
+    /// Builds the collections with the default configuration and runs the
+    /// ClairS tasks on them.
+    #[test]
+    fn run_clairs() -> anyhow::Result<()> {
+        init();
+        let config = CollectionsConfig::default();
+        Collections::new(config)?.run_clairs()
+    }
+
+    /// Haplotags the diag BAM, then the mrd BAM, of one sample against the
+    /// same germline VCF; stops at the first failure.
+    #[test]
+    fn run_longphase() -> anyhow::Result<()> {
+        init();
+        let id = "CUNY";
+        let bam_of = |time_point: &str| {
+            format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam")
+        };
+        let vcf = format!("/data/longreads_basic_pipe/{id}/diag/ClairS/clair3_normal_tumoral_germline_output.vcf.gz");
+
+        for time_point in ["diag", "mrd"] {
+            LongphaseHap::new(id, &bam_of(time_point), &vcf, LongphaseConfig::default()).run()?;
+        }
+        Ok(())
+    }
 }
 }

+ 13 - 16
src/runners.rs

@@ -63,7 +63,7 @@ pub trait Log {
 #[derive(Debug, Default)]
 #[derive(Debug, Default)]
 pub struct DockerRun {
 pub struct DockerRun {
     pub args: Vec<String>,
     pub args: Vec<String>,
-    pub container_id: Arc<Mutex<Option<String>>>,
+    pub container_id: String,
     pub start: DateTime<Utc>,
     pub start: DateTime<Utc>,
     pub logs: Arc<Mutex<String>>,
     pub logs: Arc<Mutex<String>>,
 }
 }
@@ -83,8 +83,8 @@ impl Run for DockerRun {
         // Setup Ctrl-C handler cant be defined two times
         // Setup Ctrl-C handler cant be defined two times
         let _ = ctrlc::try_set_handler(move || {
         let _ = ctrlc::try_set_handler(move || {
             if let Ok(container_id) = DOCKER_ID.lock() {
             if let Ok(container_id) = DOCKER_ID.lock() {
-                if let Some(ref id) = *container_id {
-                    warn!("Stopping Docker container...");
+                for id in container_id.iter() {
+                    warn!("Stopping Docker container {id}...");
                     let _ = Command::new("docker").args(["stop", id]).status();
                     let _ = Command::new("docker").args(["stop", id]).status();
                 }
                 }
             }
             }
@@ -102,8 +102,8 @@ impl Run for DockerRun {
         // add id to Arc
         // add id to Arc
         let id = String::from_utf8_lossy(&output.stdout).trim().to_string();
         let id = String::from_utf8_lossy(&output.stdout).trim().to_string();
         {
         {
-            let mut container_id_lock = DOCKER_ID.lock().unwrap();
-            *container_id_lock = Some(id.clone());
+            DOCKER_ID.lock().unwrap().push(id.clone());
+            self.container_id = id.clone();
         }
         }
 
 
         // Spawn a thread to follow the logs
         // Spawn a thread to follow the logs
@@ -145,19 +145,16 @@ impl Run for DockerRun {
 
 
 impl Wait for DockerRun {
 impl Wait for DockerRun {
     fn wait(&mut self) -> anyhow::Result<()> {
     fn wait(&mut self) -> anyhow::Result<()> {
-        if let Ok(container_id) = DOCKER_ID.lock() {
-            if let Some(ref id) = *container_id {
-                let status = Command::new("docker")
-                    .args(["wait", id])
-                    .status()
-                    .expect("Failed to wait on Docker container");
-                if !status.success() {
-                    warn!("Docker command failed with status: {}", status);
-                }
-            }
+        let status = Command::new("docker")
+            .args(["wait", &self.container_id])
+            .status()
+            .expect("Failed to wait on Docker container");
+        if !status.success() {
+            warn!("Docker command failed with status: {}", status);
         }
         }
+
         let mut container_id_lock = DOCKER_ID.lock().unwrap();
         let mut container_id_lock = DOCKER_ID.lock().unwrap();
-        *container_id_lock = None;
+        container_id_lock.retain(|entry| *entry != self.container_id);
 
 
         Ok(())
         Ok(())
     }
     }