Browse Source

up dorado 0.9.0: added sequencing kit name for trim

Thomas 1 year ago
parent
commit
14c829fadf

+ 16 - 4
Cargo.lock

@@ -3142,6 +3142,18 @@ dependencies = [
  "flate2",
 ]
 
+[[package]]
+name = "noodles-bgzf"
+version = "0.34.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e624384981e5847bfd6a026f157c45d687187c30ee21b8c435310267c7aa7ab"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "crossbeam-channel",
+ "flate2",
+]
+
 [[package]]
 name = "noodles-core"
 version = "0.15.0"
@@ -3179,15 +3191,15 @@ dependencies = [
 
 [[package]]
 name = "noodles-csi"
-version = "0.40.0"
+version = "0.41.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "99fdf34a300b37bc15160afcbec76487750bf6e3d9a6ff69c8e4fd635bc66745"
+checksum = "199113fe53fef2d79b0a9f670d1cad524b4ddcefdc1629dc69f0eb2707212c9e"
 dependencies = [
  "bit-vec 0.8.0",
  "bstr",
  "byteorder",
  "indexmap",
- "noodles-bgzf 0.33.0",
+ "noodles-bgzf 0.34.0",
  "noodles-core",
 ]
 
@@ -3640,7 +3652,7 @@ dependencies = [
  "log",
  "logtest",
  "nix 0.29.0",
- "noodles-csi 0.40.0",
+ "noodles-csi 0.41.0",
  "num-format",
  "pandora_lib_assembler",
  "pandora_lib_bindings",

+ 2 - 2
Cargo.toml

@@ -21,7 +21,7 @@ tracing-test = "0.2.5"
 tracing = "0.1.40"
 logtest = "2.0.0"
 test-log = "0.2.16"
-noodles-csi = "0.40.0"
+noodles-csi = "0.41.0"
 num-format = "0.4.4"
 locale_config = "0.3.0"
 byte-unit = "5.1.4"
@@ -36,7 +36,6 @@ hashbrown = { version = "0.15.0", features = ["rayon"] }
 ctrlc = "3.4.4"
 lazy_static = "1.5.0"
 indicatif = "0.17.8"
-tempfile = "3.12.0"
 tokio = "1.41.1"
 features = "0.10.0"
 full = "0.3.0"
@@ -44,3 +43,4 @@ rust-htslib = "0.49.0"
 podders = "0.1.4"
 arrow = "53.3.0"
 bgzip = "0.3.1"
+tempfile = "3.14.0"

+ 120 - 125
src/callers/clairs.rs

@@ -1,92 +1,72 @@
 use crate::{
-    commands::{
-        bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
-        longphase::{LongphaseConfig, LongphaseHap, LongphasePhase},
-    },
-    runners::{run_wait, DockerRun},
+    collection::{vcf::Vcf, Initialize}, commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{force_or_not, get_temp_file_path}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{variant::Variants, variant_collection::VariantCollection}
 };
-use anyhow::Context;
-use pandora_lib_scan::Config;
+use anyhow::{Context, Ok};
 use std::{fs, path::Path};
 use tracing::info;
 
 #[derive(Debug, Clone)]
-pub struct ClairSConfig {
-    pub result_dir: String,
-    pub reference: String,
-    pub threads: u8,
-    pub platform: String,
-    pub force: bool,
-}
-
-impl Default for ClairSConfig {
-    fn default() -> Self {
-        Self {
-            result_dir: "/data/longreads_basic_pipe".to_string(),
-            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-            threads: 155,
-            platform: "ont_r10_dorado_sup_5khz_ssrs".to_string(),
-            force: true,
-        }
-    }
-}
-
-#[derive(Debug)]
 pub struct ClairS {
     pub id: String,
     pub output_dir: String,
     pub output_vcf: String,
-    pub output_indel: String,
+    pub output_indels_vcf: String,
     pub vcf_passed: String,
-    pub indel_vcf_passed: String,
     pub diag_bam: String,
     pub mrd_bam: String,
-    pub config: ClairSConfig,
-    pub log: String,
     pub log_dir: String,
+    pub config: Config,
+    pub clair3_germline_normal: String,
+    pub clair3_germline_tumor: String,
+    pub clair3_germline_passed: String,
 }
 
-impl ClairS {
-    pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: ClairSConfig) -> Self {
-        let output_dir = format!("{}/{}/diag/ClairS", config.result_dir, id);
-        let output_vcf = format!("{output_dir}/output.vcf.gz");
-        let output_indel = format!("{output_dir}/indel.vcf.gz");
-        let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz",);
-        let indel_vcf_passed = format!("{output_dir}/{id}_diag_clairs_indel_PASSED.vcf.gz");
-
-        let log_dir = format!("{}/{}/log/ClairS", config.result_dir, id);
-        Self {
-            id: id.to_string(),
+impl Initialize for ClairS {
+    fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
+        let id = id.to_string();
+        let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
+
+        if !Path::new(&log_dir).exists() {
+            fs::create_dir_all(&log_dir)
+                .context(format!("Failed  to create {log_dir} directory"))?;
+        }
+
+        let output_dir = config.clairs_output_dir(&id);
+        fs::create_dir_all(&output_dir).context(format!("Can't create dir: {output_dir}"))?;
+
+        let (output_vcf, output_indels_vcf) = config.clairs_output_vcfs(&id);
+        let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz");
+
+        let diag_bam = config.tumoral_bam(&id);
+        let mrd_bam = config.normal_bam(&id);
+
+        let clair3_germline_normal = config.clairs_germline_normal_vcf(&id);
+        let clair3_germline_tumor = config.clairs_germline_tumor_vcf(&id);
+        let clair3_germline_passed = config.clairs_germline_passed_vcf(&id);
+
+        Ok(Self {
+            id,
             output_dir,
             output_vcf,
+            output_indels_vcf,
             vcf_passed,
-            diag_bam: diag_bam.to_string(),
-            mrd_bam: mrd_bam.to_string(),
-            config,
-            output_indel,
-            log: String::default(),
-            indel_vcf_passed,
+            diag_bam,
+            mrd_bam,
             log_dir,
-        }
+            config,
+            clair3_germline_normal,
+            clair3_germline_tumor,
+            clair3_germline_passed,
+        })
     }
+}
 
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        if self.config.force && Path::new(&self.output_vcf).exists() {
-            fs::remove_dir_all(&self.output_dir)?;
-        }
-
-        // Create out dir
-        if !Path::new(&self.output_dir).exists() {
-            fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
-        } else {
-            info!("ClairS output directory exists");
-        }
-        if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.log_dir).expect("Failed to create output directory");
-        }
+impl Run for ClairS {
+    fn run(&mut self) -> anyhow::Result<()> {
+        force_or_not(&self.vcf_passed, self.config.clairs_force)?;
 
         // Run Docker command if output VCF doesn't exist
-        if !Path::new(&self.output_vcf).exists() {
+        if !Path::new(&self.output_vcf).exists() || !Path::new(&self.output_indels_vcf).exists() {
             let mut docker_run = DockerRun::new(&[
                 "run",
                 "-d",
@@ -103,9 +83,9 @@ impl ClairS {
                 "-R",
                 &self.config.reference,
                 "-t",
-                &self.config.threads.to_string(),
+                &self.config.clairs_threads.to_string(),
                 "-p",
-                &self.config.platform,
+                &self.config.clairs_platform,
                 "--enable_indel_calling",
                 "--include_all_ctgs",
                 "--print_germline_calls",
@@ -125,87 +105,102 @@ impl ClairS {
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
         } else {
-            info!("ClairS output vcf already exists");
+            info!("ClairS vcfs already exist");
         }
 
-        let germline_normal = format!("{}/clair3_normal_germline_output.vcf.gz", self.output_dir);
-        let germline_tumor = format!("{}/clair3_tumor_germline_output.vcf.gz", self.output_dir);
-        let germline_normal_tumor = format!(
-            "{}/clair3_normal_tumoral_germline_output.vcf.gz",
-            self.output_dir
-        );
-        let report = bcftools_concat(
-            vec![germline_normal, germline_tumor],
-            &germline_normal_tumor,
-            BcftoolsConfig::default(),
-        )
-        .context(format!(
-            "Error while running bcftools concat germlines for {}",
-            &self.output_vcf
-        ))?;
-        let log_file = format!("{}/bcftools_concat_germline", self.log_dir);
-        report
-            .save_to_file(&log_file)
-            .context(format!("Error while writing logs into {log_file}"))?;
-
-        // Keep PASS
-        if !Path::new(&self.vcf_passed).exists() {
+        // Germline
+        if !Path::new(&self.clair3_germline_passed).exists() {
+            let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
+            let report = bcftools_concat(
+                vec![
+                    self.clair3_germline_tumor.to_string(),
+                    self.clair3_germline_normal.to_string(),
+                ],
+                &tmp_file,
+                BcftoolsConfig::default(),
+            )
+            .context(format!(
+                "Error while running bcftools concat for {} and {}",
+                &self.output_vcf, &self.output_indels_vcf
+            ))?;
+
+            let log_file = format!("{}/bcftools_concat_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .context(format!("Error while writing logs into {log_file}"))?;
+
             let report = bcftools_keep_pass(
-                &self.output_vcf,
-                &self.vcf_passed,
+                &tmp_file,
+                &self.clair3_germline_passed,
                 BcftoolsConfig::default(),
             )
             .context(format!(
                 "Error while running bcftools keep PASS for {}",
-                &self.output_vcf
+                &self.clair3_germline_passed
             ))?;
-            let log_file = format!("{}/bcftools_pass", self.log_dir);
+            let log_file = format!("{}/bcftools_pass_", self.log_dir);
             report
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
+
+            fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
         }
 
-        if !Path::new(&self.indel_vcf_passed).exists() {
-            let report = bcftools_keep_pass(
-                &self.output_indel,
-                &self.indel_vcf_passed,
+        if !Path::new(&self.vcf_passed).exists() {
+            // Concat output and indels
+            let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
+            let report = bcftools_concat(
+                vec![
+                    self.output_vcf.to_string(),
+                    self.output_indels_vcf.to_string(),
+                ],
+                &tmp_file,
                 BcftoolsConfig::default(),
             )
             .context(format!(
-                "Error while running bcftools keep PASS for {}",
-                &self.output_indel
+                "Error while running bcftools concat for {} and {}",
+                &self.output_vcf, &self.output_indels_vcf
             ))?;
 
-            let log_file = format!("{}/bcftools_pass", self.log_dir);
+            let log_file = format!("{}/bcftools_concat_", self.log_dir);
             report
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
+
+            let report = bcftools_keep_pass(&tmp_file, &self.vcf_passed, BcftoolsConfig::default())
+                .context(format!(
+                    "Error while running bcftools keep PASS for {}",
+                    &self.output_vcf
+                ))?;
+            let log_file = format!("{}/bcftools_pass_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .context(format!("Error while writing logs into {log_file}"))?;
+
+            fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
+        } else {
+            info!("ClairS PASSED vcf already exist");
         }
 
-        // let bam = Path::new(&self.diag_bam);
-        // let new_fn = format!("{}_hp.bam", bam.file_stem().unwrap().to_str().unwrap());
-        // let bam_hp = bam.with_file_name(new_fn);
-        // LongphasePhase::new(
-        //     &self.id,
-        //     bam.to_str().unwrap(),
-        //     &germline_normal_tumor,
-        //     LongphaseConfig::default(),
-        // )?
-        // .run()?;
-        // LongphaseHap::new(
-        //     &self.id,
-        //     &self.diag_bam,
-        //     &format!("{}/clair3_normal_tumoral_germline_output_PS.vcf.gz", self.output_dir),
-        //     LongphaseConfig::default(),
-        // )
-        //     .run()?;
-        // LongphaseHap::new(
-        //     &self.id,
-        //     &self.mrd_bam,
-        //     &format!("{}/clair3_normal_tumoral_germline_output_PS.vcf.gz", self.output_dir),
-        //     LongphaseConfig::default(),
-        // )
-        //     .run()?;
         Ok(())
     }
 }
+
+impl Variants for ClairS {
+    fn variants(&self) -> anyhow::Result<VariantCollection> {
+        Ok(VariantCollection {
+            variants: read_vcf(&self.vcf_passed)?,
+            vcf: Vcf::new(self.vcf_passed.clone().into())?,
+        })
+    }
+}
+
+impl ClairS {
+    pub fn germline(&self) -> anyhow::Result<VariantCollection> {
+        Ok(VariantCollection {
+            variants: read_vcf(&self.clair3_germline_passed)?,
+            vcf: Vcf::new(self.clair3_germline_passed.clone().into())?,
+        })
+    }
+
+}

+ 12 - 143
src/callers/deep_variant.rs

@@ -3,41 +3,16 @@ use log::info;
 use std::{fs, path::Path};
 
 use crate::{
-    collection::InitializeSolo,
+    collection::{vcf::Vcf, InitializeSolo},
     commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
     config::Config,
     helpers::{force_or_not, path_prefix},
     io::vcf::read_vcf,
     runners::{run_wait, DockerRun, Run},
-    variant::variant::{Annotation, Variants},
+    variant::{variant::Variants, variant_collection::VariantCollection},
 };
 
 #[derive(Debug, Clone)]
-pub struct DeepVariantConfig {
-    pub bin_version: String,
-    pub threads: u8,
-    pub model_type: String,
-    pub result_dir: String,
-    pub reference: String,
-    pub bcftools: String,
-    pub force: bool,
-}
-
-impl Default for DeepVariantConfig {
-    fn default() -> Self {
-        Self {
-            bin_version: "1.8.0".to_string(),
-            threads: 155,
-            model_type: "ONT_R104".to_string(),
-            result_dir: "/data/longreads_basic_pipe".to_string(),
-            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-            bcftools: "/data/tools/bcftools-1.21/bcftools".to_string(),
-            force: true,
-        }
-    }
-}
-
-#[derive(Debug)]
 pub struct DeepVariant {
     pub id: String,
     pub time: String,
@@ -65,11 +40,12 @@ impl InitializeSolo for DeepVariant {
             anyhow::bail!("Bam files doesn't exists: {bam}")
         }
 
-        let output_vcf = config.deepvariant_output_vcf(&id, &time);
         let output_dir = config.deepvariant_output_dir(&id, &time);
-        let vcf_passed = format!("{}_PASSED.vcf.gz", path_prefix(&output_vcf)?);
         fs::create_dir_all(&output_dir).context(format!("Can't create dir: {output_dir}"))?;
 
+        let output_vcf = config.deepvariant_output_vcf(&id, &time);
+        let vcf_passed = format!("{}_PASSED.vcf.gz", path_prefix(&output_vcf)?);
+
         Ok(Self {
             id,
             time,
@@ -86,13 +62,7 @@ impl InitializeSolo for DeepVariant {
 impl Run for DeepVariant {
     fn run(&mut self) -> anyhow::Result<()> {
         force_or_not(&self.vcf_passed, self.config.deepvariant_force)?;
-        if !Path::new(&self.output_dir).exists() {
-            fs::create_dir_all(&self.output_dir).context(format!(
-                "Failed to create output directory: {}",
-                self.output_dir
-            ))?;
-        }
-
+        
         // Run Docker command if output VCF doesn't exist
         if !Path::new(&self.output_vcf).exists() {
             let mut docker_run = DockerRun::new(&[
@@ -112,10 +82,7 @@ impl Run for DeepVariant {
                 "--output_vcf",
                 &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time),
                 "--output_gvcf",
-                &format!(
-                    "/output/{}_{}_DeepVariant.g.vcf.gz",
-                    self.id, self.time
-                ),
+                &format!("/output/{}_{}_DeepVariant.g.vcf.gz", self.id, self.time),
                 &format!("--num_shards={}", self.config.deepvariant_threads),
                 "--logging_dir",
                 "--vcf_stats_report=true",
@@ -151,109 +118,11 @@ impl Run for DeepVariant {
     }
 }
 
-// impl DeepVariant {
-//     pub fn new(id: &str, time_point: &str, bam: &str, config: DeepVariantConfig) -> Self {
-//         let output_dir = format!("{}/{}/{}/DeepVariant", config.result_dir, id, time_point);
-//         let output_vcf = format!("{output_dir}/{}_{}_DeepVariant.vcf.gz", id, time_point);
-//         let vcf_passed = format!(
-//             "{}/{}_{}_DeepVariant_PASSED.vcf.gz",
-//             output_dir, id, time_point
-//         );
-//         let log_dir = format!("{}/{}/log/DeepVariant", config.result_dir, id);
-//
-//         Self {
-//             id: id.to_string(),
-//             time_point: time_point.to_string(),
-//             bam: bam.to_string(),
-//             config,
-//             log: String::default(),
-//             output_dir,
-//             output_vcf,
-//             vcf_passed,
-//             log_dir,
-//         }
-//     }
-//
-//     pub fn run(&self) -> anyhow::Result<()> {
-//         if self.config.force && Path::new(&self.output_vcf).exists() {
-//             fs::remove_dir_all(&self.output_dir)?;
-//         }
-//         // Create out dir
-//         if !Path::new(&self.output_dir).exists() {
-//             fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
-//         }
-//
-//         if !Path::new(&self.log_dir).exists() {
-//             fs::create_dir_all(&self.log_dir).expect("Failed to create log directory");
-//         }
-//
-//         // Run Docker command if output VCF doesn't exist
-//         if !Path::new(&self.output_vcf).exists() {
-//             let mut docker_run = DockerRun::new(&[
-//                 "run",
-//                 "-d",
-//                 "-v",
-//                 "/data:/data",
-//                 "-v",
-//                 &format!("{}:/output", self.output_dir),
-//                 &format!("google/deepvariant:{}", self.config.bin_version),
-//                 "/opt/deepvariant/bin/run_deepvariant",
-//                 &format!("--model_type={}", self.config.model_type),
-//                 "--ref",
-//                 &self.config.reference,
-//                 "--reads",
-//                 &self.bam,
-//                 "--output_vcf",
-//                 &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point),
-//                 "--output_gvcf",
-//                 &format!(
-//                     "/output/{}_{}_DeepVariant.g.vcf.gz",
-//                     self.id, self.time_point
-//                 ),
-//                 &format!("--num_shards={}", self.config.threads),
-//                 "--logging_dir",
-//                 "--vcf_stats_report=true",
-//                 &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
-//                 "--dry_run=false",
-//                 "--sample_name",
-//                 &format!("{}_{}", self.id, self.time_point),
-//             ]);
-//             let report = run_wait(&mut docker_run).context(format!(
-//                 "Erreur while running DeepVariant for {} {}",
-//                 self.id, self.time_point
-//             ))?;
-//             report
-//                 .save_to_file(&format!("{}/deepvariant_", self.log_dir))
-//                 .context("Can't save DeepVariant logs")?;
-//         }
-//
-//         // Keep PASS
-//         if !Path::new(&self.vcf_passed).exists() {
-//             info!("Filtering PASS variants");
-//             let report = bcftools_keep_pass(
-//                 &self.output_vcf,
-//                 &self.vcf_passed,
-//                 BcftoolsConfig::default(),
-//             )
-//             .unwrap();
-//             report
-//                 .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
-//                 .unwrap();
-//         }
-//         Ok(())
-//     }
-// }
-
-// VCF
 impl Variants for DeepVariant {
-    fn variants(&self) -> anyhow::Result<Vec<crate::variant::variant::Variant>> {
-        let mut annotations = vec![Annotation::Source("DeepVariant".to_string())];
-        match self.time.as_str() {
-            "diag" => annotations.push(Annotation::Diag),
-            "mrd" => annotations.push(Annotation::Constit),
-            _ => anyhow::bail!("Unrecognised time point {}", self.time),
-        }
-
-        read_vcf(&self.vcf_passed, &annotations)
+    fn variants(&self) -> anyhow::Result<VariantCollection> {
+        Ok(VariantCollection {
+            variants: read_vcf(&self.vcf_passed)?,
+            vcf: Vcf::new(self.vcf_passed.clone().into())?,
+        })
     }
 }

+ 19 - 34
src/collection/mod.rs

@@ -18,8 +18,8 @@ use pandora_lib_variants::variants::Variants;
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
     callers::{
-        clairs::{ClairS, ClairSConfig},
-        deep_variant::{DeepVariant, DeepVariantConfig},
+        clairs::ClairS,
+        deep_variant::DeepVariant,
         nanomonsv::{NanomonSV, NanomonSVConfig},
     },
     collection::pod5::FlowCellCase,
@@ -32,7 +32,8 @@ use crate::{
         assembler::{Assembler, AssemblerConfig},
         variants::{RunVariantsAgg, VariantsConfig},
         whole_scan::{WholeScan, WholeScanConfig},
-    }, runners::Run,
+    },
+    runners::Run,
 };
 
 pub mod bam;
@@ -295,7 +296,7 @@ impl Collections {
         // self.tasks.extend(self.todo_nanomonsv());
 
         // Variants aggregation
-        self.tasks.extend(self.todo_variants_agg()?);
+        // self.tasks.extend(self.todo_variants_agg()?);
 
         // ModPileup
         // self.tasks.extend(self.todo_mod_pileup());
@@ -367,7 +368,6 @@ impl Collections {
     }
 
     pub fn todo_deepvariants(&self) -> Vec<CollectionsTasks> {
-        let config = DeepVariantConfig::default();
         self.bam
             .bams
             .iter()
@@ -375,13 +375,12 @@ impl Collections {
                 if self.vcf.vcfs.iter().any(|v| {
                     v.caller == "DeepVariant"
                         && v.id == b.id
-                        && v.time_point == b.time_point
+                        && v.time == b.time_point
                         && v.modified().unwrap_or_default() > b.modified
                 }) {
                     None
                 } else {
                     Some(CollectionsTasks::DeepVariant {
-                        config: config.clone(),
                         id: b.id.clone(),
                         time_point: b.time_point.clone(),
                         bam: b.path.to_string_lossy().to_string(),
@@ -392,21 +391,19 @@ impl Collections {
     }
 
     pub fn todo_clairs(&self) -> Vec<CollectionsTasks> {
-        let config = ClairSConfig::default();
         self.bam_pairs()
             .iter()
             .filter_map(|(diag, mrd)| {
                 if self.vcf.vcfs.iter().any(|v| {
                     v.caller == "clairs"
                         && v.id == diag.id
-                        && v.time_point == diag.time_point
+                        && v.time == diag.time_point
                         && (v.modified().unwrap_or_default() > diag.modified
                             || v.modified().unwrap_or_default() > mrd.modified)
                 }) {
                     None
                 } else {
                     Some(CollectionsTasks::ClairS {
-                        config: config.clone(),
                         id: diag.id.clone(),
                         diag_bam: diag.path.to_string_lossy().to_string(),
                         mrd_bam: mrd.path.to_string_lossy().to_string(),
@@ -433,7 +430,7 @@ impl Collections {
                 if self.vcf.vcfs.iter().any(|v| {
                     v.caller == "nanomonsv"
                         && v.id == diag.id
-                        && v.time_point == diag.time_point
+                        && v.time == diag.time_point
                         && (v.modified().unwrap_or_default() > diag.modified
                             || v.modified().unwrap_or_default() > mrd.modified)
                 }) {
@@ -663,11 +660,6 @@ impl Collections {
         let n_tasks = tasks.len();
         warn!("{n_tasks} tasks to run");
 
-        let config = DeepVariantConfig {
-            threads: 85,
-            ..Default::default()
-        };
-
         for (i, tasks_chunk) in tasks.chunks_exact(2).enumerate() {
             match tasks_chunk {
                 [a, b] => {
@@ -685,7 +677,6 @@ impl Collections {
                             id: id.to_string(),
                             time_point: time_point.to_string(),
                             bam: bam.to_string(),
-                            config: config.clone(),
                         }
                     } else {
                         anyhow::bail!("Err")
@@ -702,7 +693,6 @@ impl Collections {
                             id: id.to_string(),
                             time_point: time_point.to_string(),
                             bam: bam.to_string(),
-                            config: config.clone(),
                         }
                     } else {
                         anyhow::bail!("Err");
@@ -752,13 +742,11 @@ pub enum CollectionsTasks {
         id: String,
         time_point: String,
         bam: String,
-        config: DeepVariantConfig,
     },
     ClairS {
         id: String,
         diag_bam: String,
         mrd_bam: String,
-        config: ClairSConfig,
     },
     NanomonSV {
         id: String,
@@ -782,17 +770,12 @@ impl CollectionsTasks {
                 BasecallAlign::from_mux(cases, Config::default())
             }
             CollectionsTasks::ModPileup { bam, config } => bed_methyl(bam, &config),
-            CollectionsTasks::DeepVariant {
-                id,
-                time_point,
-                ..
-            } => DeepVariant::initialize(&id, &time_point, Config::default())?.run(),
-            CollectionsTasks::ClairS {
-                id,
-                diag_bam,
-                mrd_bam,
-                config,
-            } => ClairS::new(&id, &diag_bam, &mrd_bam, config).run(),
+            CollectionsTasks::DeepVariant { id, time_point, .. } => {
+                DeepVariant::initialize(&id, &time_point, Config::default())?.run()
+            }
+            CollectionsTasks::ClairS { id, .. } => {
+                ClairS::initialize(&id, Config::default())?.run()
+            }
             CollectionsTasks::NanomonSV {
                 id,
                 diag_bam,
@@ -805,7 +788,9 @@ impl CollectionsTasks {
                 bam,
                 config,
             } => WholeScan::new(id, time_point, bam, config)?.run(),
-            CollectionsTasks::SomaticVariants { id, config } => RunVariantsAgg::new(id, config).run(),
+            CollectionsTasks::SomaticVariants { id, config } => {
+                RunVariantsAgg::new(id, config).run()
+            }
             CollectionsTasks::Assemble {
                 id,
                 time_point,
@@ -839,8 +824,8 @@ impl fmt::Display for CollectionsTasks {
         match self {
             Align(case) => write!(
                 f,
-                "Alignment task for: {} {} {}",
-                case.id, case.time_point, case.barcode
+                "Alignment task for: {} {} {} {}",
+                case.id, case.time_point, case.barcode, case.pod_dir.display()
             ),
             DemuxAlign(cases) => write!(
                 f,

+ 61 - 61
src/collection/variants.rs

@@ -1,64 +1,64 @@
-use std::{fs::Metadata, path::PathBuf};
+// use std::{fs::Metadata, path::PathBuf};
 
-use anyhow::Context;
-use glob::glob;
-use log::warn;
-use rayon::prelude::*;
+// use anyhow::Context;
+// use glob::glob;
+// use log::warn;
+// use rayon::prelude::*;
 
-pub struct VariantsCollection {
-    pub data: Vec<VariantsCase>,
-}
+// pub struct VariantsCollection {
+//     pub data: Vec<VariantsCase>,
+// }
+//
+// #[derive(Debug)]
+// pub struct VariantsCase {
+//     pub path: PathBuf,
+//     pub id: String,
+//     pub file_metadata: Metadata,
+// }
+//
+// impl VariantsCase {
+//     pub fn new(path: PathBuf) -> anyhow::Result<Self> {
+//         let id = path
+//             .ancestors()
+//             .nth(2)
+//             .and_then(|p| p.file_name())
+//             .and_then(|name| name.to_str())
+//             .map(String::from)
+//             .context(format!(
+//                 "Invalid path structure: unable to extract ID for {}",
+//                 path.display()
+//             ))?;
+//
+//         let file_metadata = path.metadata().context(format!(
+//             "Failed to read file metadata for {}",
+//             path.display()
+//         ))?;
+//
+//         Ok(VariantsCase {
+//             path,
+//             id,
+//             file_metadata,
+//         })
+//     }
+// }
 
-#[derive(Debug)]
-pub struct VariantsCase {
-    pub path: PathBuf,
-    pub id: String,
-    pub file_metadata: Metadata,
-}
-
-impl VariantsCase {
-    pub fn new(path: PathBuf) -> anyhow::Result<Self> {
-        let id = path
-            .ancestors()
-            .nth(2)
-            .and_then(|p| p.file_name())
-            .and_then(|name| name.to_str())
-            .map(String::from)
-            .context(format!(
-                "Invalid path structure: unable to extract ID for {}",
-                path.display()
-            ))?;
-
-        let file_metadata = path.metadata().context(format!(
-            "Failed to read file metadata for {}",
-            path.display()
-        ))?;
-
-        Ok(VariantsCase {
-            path,
-            id,
-            file_metadata,
-        })
-    }
-}
-
-impl VariantsCollection {
-    pub fn new(result_dir: &str) -> anyhow::Result<Self> {
-        let pattern = format!("{}/*/*/*_variants.bytes.gz", result_dir);
-        let data = glob(&pattern)
-            .expect("Failed to read glob pattern")
-            .par_bridge()
-            .filter_map(|entry| {
-                match entry {
-                    Ok(path) => match VariantsCase::new(path) {
-                        Ok(vc) => return Some(vc),
-                        Err(err) => warn!("{err}"),
-                    },
-                    Err(e) => warn!("Error: {:?}", e),
-                }
-                None
-            })
-            .collect();
-        Ok(VariantsCollection { data })
-    }
-}
+// impl VariantsCollection {
+//     pub fn new(result_dir: &str) -> anyhow::Result<Self> {
+//         let pattern = format!("{}/*/*/*_variants.bytes.gz", result_dir);
+//         let data = glob(&pattern)
+//             .expect("Failed to read glob pattern")
+//             .par_bridge()
+//             .filter_map(|entry| {
+//                 match entry {
+//                     Ok(path) => match VariantsCase::new(path) {
+//                         Ok(vc) => return Some(vc),
+//                         Err(err) => warn!("{err}"),
+//                     },
+//                     Err(e) => warn!("Error: {:?}", e),
+//                 }
+//                 None
+//             })
+//             .collect();
+//         Ok(VariantsCollection { data })
+//     }
+// }

+ 11 - 7
src/collection/vcf.rs

@@ -8,11 +8,13 @@ use std::{collections::HashMap, fs::Metadata, os::unix::fs::MetadataExt, path::P
 use noodles_csi as csi;
 use num_format::{Locale, ToFormattedString};
 
+use crate::commands::bcftools::{bcftools_index, BcftoolsConfig};
+
 #[derive(Debug, Clone)]
 pub struct Vcf {
     pub id: String,
     pub caller: String,
-    pub time_point: String,
+    pub time: String,
     pub path: PathBuf,
     pub file_metadata: Metadata,
     pub n_variants: u64,
@@ -27,20 +29,22 @@ impl Vcf {
             .to_string();
         let stem_splt: Vec<&str> = stem.split('_').collect();
         let id = stem_splt[0].to_string();
-        let time_point = stem_splt[1].to_string();
+        let time = stem_splt[1].to_string();
         let caller = stem_splt[2..stem_splt.len() - 1].join("_");
 
         if !PathBuf::from(format!("{}.csi", path.display())).exists() {
-            return Err(anyhow!("No csi for {}", path.display()));
+            bcftools_index(&path.display().to_string(), &BcftoolsConfig::default())?;
         }
 
         let n_variants = n_variants(path.to_str().context("Can't convert path to str")?)?;
-        let file_metadata = path.metadata()?;
+        let file_metadata = path
+            .metadata()
+            .context(format!("Can't access metadata for {}", path.display()))?;
 
         Ok(Self {
             id,
             caller,
-            time_point,
+            time,
             path,
             file_metadata,
             n_variants,
@@ -58,7 +62,7 @@ impl Vcf {
     pub fn tsv(&self) -> anyhow::Result<String> {
         Ok([
             self.id.clone(),
-            self.time_point.clone(),
+            self.time.clone(),
             self.caller.clone(),
             self.n_variants.to_string(),
             self.modified()?.to_string(),
@@ -76,7 +80,7 @@ impl Vcf {
             "{}",
             [
                 self.id.to_string(),
-                self.time_point.to_string(),
+                self.time.to_string(),
                 self.caller.to_string(),
                 formated_n_variants,
                 formated_modified,

+ 10 - 0
src/commands/bcftools.rs

@@ -1,4 +1,5 @@
 use anyhow::Context;
+use log::info;
 use std::{fs, path::Path};
 use uuid::Uuid;
 
@@ -76,6 +77,7 @@ pub fn bcftools_concat(
     output: &str,
     config: BcftoolsConfig,
 ) -> anyhow::Result<RunReport> {
+    info!("Concatening vcf with bcftools: {}", inputs.join(", "));
     let tmp_file = format!("/tmp/{}", Uuid::new_v4());
     fs::write(&tmp_file, inputs.join("\n"))?;
 
@@ -111,3 +113,11 @@ pub fn bcftools_keep_only_in_a(
         .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
     Ok(())
 }
+
+pub fn bcftools_index(vcf: &str, config: &BcftoolsConfig) -> anyhow::Result<()> {
+    let args = ["index", "--threads", &config.threads.to_string(), vcf];
+    let mut cmd_run = CommandRun::new(&config.bin, &args);
+    let _ = run_wait(&mut cmd_run)
+        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
+    Ok(())
+}

+ 2 - 2
src/commands/dorado.rs

@@ -328,8 +328,8 @@ impl Dorado {
                 barcode
             );
             let pipe = format!(
-                "{} trim {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
-                config.align.dorado_bin, &config.align.samtools_view_threads
+                "{} trim --sequencing-kit {} {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
+                config.align.dorado_bin, config.align.dorado_sequencing_kit, &config.align.samtools_view_threads
             );
 
             info!("Running: {pipe}");

+ 57 - 2
src/config.rs

@@ -34,10 +34,20 @@ pub struct Config {
     pub deepvariant_bin_version: String,
     pub deepvariant_model_type: String,
     pub deepvariant_force: bool,
+    pub clairs_threads: u8,
+    pub clairs_force: bool,
+    pub clairs_platform: String,
+    pub clairs_output_dir: String,
+    pub mask_bed: String,
 }
 
+// Here comes names that can't be changed from output of tools
 lazy_static! {
     static ref DEEPVARIANT_OUTPUT_NAME: &'static str = "{id}_{time}_DeepVariant.vcf.gz";
+    static ref CLAIRS_OUTPUT_NAME: &'static str = "output.vcf.gz";
+    static ref CLAIRS_OUTPUT_INDELS_NAME: &'static str = "indel.vcf.gz";
+    static ref CLAIRS_GERMLINE_NORMAL: &'static str = "clair3_normal_germline_output.vcf.gz";
+    static ref CLAIRS_GERMLINE_TUMOR: &'static str = "clair3_tumor_germline_output.vcf.gz";
 }
 
 impl Default for Config {
@@ -50,12 +60,16 @@ impl Default for Config {
             reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             reference_name: "hs1".to_string(),
 
+
             // File structure
             result_dir: "/data/longreads_basic_pipe".to_string(),
             tumoral_name: "diag".to_string(),
             normal_name: "mrd".to_string(),
             haplotagged_bam_tag_name: "hp".to_string(),
 
+            // 
+            mask_bed: "{result_dir}/{id}/diag/mask.bed".to_string(),
+
             germline_phased_vcf:
                 "{result_dir}/{id}/diag/ClairS/clair3_normal_tumoral_germline_output_PS.vcf"
                     .to_string(),
@@ -68,12 +82,18 @@ impl Default for Config {
             deepvariant_model_type: "ONT_R104".to_string(),
             deepvariant_force: false,
 
+            // ClairS
+            clairs_output_dir: "{result_dir}/{id}/diag/ClairS".to_string(),
+            clairs_threads: 155,
+            clairs_platform: "ont_r10_dorado_sup_5khz_ssrs".to_string(),
+            clairs_force: false,
+
             // Savana
             savana_bin: "savana".to_string(),
             savana_threads: 150,
             savana_output_dir: "{result_dir}/{id}/diag/savana".to_string(),
             savana_passed_vcf: "{output_dir}/{id}_diag_savana_PASSED.vcf".to_string(),
-            savana_force: true,
+            savana_force: false,
 
             // Severus
             severus_bin: "/data/tools/Severus/severus.py".to_string(),
@@ -82,7 +102,7 @@ impl Default for Config {
             severus_pon: "/data/ref/hs1/PoN_1000G_chm13.tsv.gz".to_string(),
             severus_output_dir: "{result_dir}/{id}/diag/severus".to_string(),
             severus_solo_output_dir: "{result_dir}/{id}/{time}/severus".to_string(),
-            severus_force: true,
+            severus_force: false,
 
             // Longphase
             longphase_bin: "/data/tools/longphase_linux-x64".to_string(),
@@ -104,6 +124,7 @@ impl Default for Config {
 pub struct AlignConfig {
     pub dorado_bin: String,
     pub dorado_basecall_arg: String,
+    pub dorado_sequencing_kit: String,
     pub ref_fa: String,
     pub ref_mmi: String,
     pub samtools_view_threads: u16,
@@ -115,6 +136,7 @@ impl Default for AlignConfig {
         Self {
             dorado_bin: "/data/tools/dorado-0.9.0-linux-x64/bin/dorado".to_string(),
             dorado_basecall_arg: "-x 'cuda:0,1,2,3' sup,5mC_5hmC".to_string(), // since v0.8.0 need
+            dorado_sequencing_kit: "SQK-LSK114".to_string(),
             // to specify cuda devices (exclude the T1000)
             ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
@@ -189,6 +211,12 @@ impl Config {
         )
     }
 
+    pub fn mask_bed(&self, id: &str) -> String {
+        self.mask_bed
+            .replace("{result_dir}", &self.result_dir)
+            .replace("{id}", id)
+    }
+
     pub fn germline_phased_vcf(&self, id: &str) -> String {
         self.germline_phased_vcf
             .replace("{result_dir}", &self.result_dir)
@@ -213,6 +241,33 @@ impl Config {
         .replace("{time}", time)
     }
 
+    // ClairS
+    pub fn clairs_output_dir(&self, id: &str) -> String {
+        self.clairs_output_dir
+            .replace("{result_dir}", &self.result_dir)
+            .replace("{id}", id)
+    }
+
+    pub fn clairs_output_vcfs(&self, id: &str) -> (String, String) {
+        let dir = self.clairs_output_dir(id);
+        (format!("{dir}/{}", *CLAIRS_OUTPUT_NAME), format!("{dir}/{}", *CLAIRS_OUTPUT_INDELS_NAME))
+    }
+
+    pub fn clairs_germline_normal_vcf(&self, id: &str) -> String {
+        let dir = self.clairs_output_dir(id);
+        format!("{dir}/{}", *CLAIRS_GERMLINE_NORMAL)
+    }
+
+    pub fn clairs_germline_tumor_vcf(&self, id: &str) -> String {
+        let dir = self.clairs_output_dir(id);
+        format!("{dir}/{}", *CLAIRS_GERMLINE_TUMOR)
+    }
+
+    pub fn clairs_germline_passed_vcf(&self, id: &str) -> String {
+        let dir = self.clairs_output_dir(id);
+        format!("{dir}/{id}_diag_clair3-germline_PASSED.vcf.gz")
+    }
+
     // Savana
     pub fn savana_output_dir(&self, id: &str) -> String {
         self.savana_output_dir

+ 100 - 6
src/helpers.rs

@@ -1,5 +1,6 @@
 use anyhow::Context;
-use std::{fs, path::Path};
+use log::warn;
+use std::{fs, path::{Path, PathBuf}};
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
     let mut matching_files = Vec::new();
@@ -56,17 +57,110 @@ pub fn path_prefix(out: &str) -> anyhow::Result<String> {
 pub fn force_or_not(path: &str, force: bool) -> anyhow::Result<()> {
     let path = Path::new(path);
     let mut output_exists = path.exists();
+    let dir = path
+        .parent()
+        .context(format!("Can't parse the parent dir of {}", path.display()))?;
 
     if force && output_exists {
-        fs::remove_dir_all(
-            path.parent()
-                .context(format!("Can't parse the parent dir of {}", path.display()))?,
-        )?;
+        fs::remove_dir_all(dir)?;
+        fs::create_dir_all(dir)?;
         output_exists = false;
     }
 
     if output_exists {
-        anyhow::bail!("{} already exists.", path.display())
+        warn!("{} already exists.", path.display())
     }
     Ok(())
 }
+
+use rayon::prelude::*;
+use std::cmp::Ord;
+
+pub struct VectorIntersction<T> {
+    pub common: Vec<T>,
+    pub only_in_first: Vec<T>,
+    pub only_in_second: Vec<T>,
+}
+
+impl<T> Default for VectorIntersction<T> {
+    fn default() -> Self {
+        Self {
+            common: Vec::new(),
+            only_in_first: Vec::new(),
+            only_in_second: Vec::new(),
+        }
+    }
+}
+
+impl<T: Ord + Clone> VectorIntersction<T> {
+    fn merge(&mut self, other: &mut Self) {
+        self.common.append(&mut other.common);
+        self.only_in_first.append(&mut other.only_in_first);
+        self.only_in_second.append(&mut other.only_in_second);
+    }
+}
+
+fn intersection<T: Ord + Send + Sync + Clone>(vec1: &[T], vec2: &[T]) -> VectorIntersction<T> {
+    let mut result = VectorIntersction::default();
+    let mut i = 0;
+    let mut j = 0;
+
+    while i < vec1.len() && j < vec2.len() {
+        match vec1[i].cmp(&vec2[j]) {
+            std::cmp::Ordering::Less => {
+                result.only_in_first.push(vec1[i].clone());
+                i += 1;
+            }
+            std::cmp::Ordering::Greater => {
+                result.only_in_second.push(vec2[j].clone());
+                j += 1;
+            }
+            std::cmp::Ordering::Equal => {
+                result.common.push(vec1[i].clone());
+                i += 1;
+                j += 1;
+            }
+        }
+    }
+
+    result.only_in_first.extend(vec1[i..].iter().cloned());
+    result.only_in_second.extend(vec2[j..].iter().cloned());
+
+    result
+}
+
+pub fn par_intersection<T: Ord + Send + Sync + Clone>(
+    vec1: &[T],
+    vec2: &[T],
+) -> VectorIntersction<T> {
+    let chunk_size = (vec1.len() / rayon::current_num_threads()).max(1);
+
+    vec1.par_chunks(chunk_size)
+        .map(|chunk| {
+            let start = vec2.partition_point(|x| x < &chunk[0]);
+            let end = vec2.partition_point(|x| x <= &chunk[chunk.len() - 1]);
+            
+            // Ensure start is not greater than end
+            if start <= end {
+                intersection(chunk, &vec2[start..end])
+            } else {
+                // If start > end, there's no intersection for this chunk
+                VectorIntersction::default()
+            }
+        })
+        .reduce(VectorIntersction::default, |mut acc, mut x| {
+            acc.merge(&mut x);
+            acc
+        })
+}
+
+pub fn get_temp_file_path() -> std::io::Result<PathBuf> {
+    let temp_path = tempfile::Builder::new()
+        .prefix("my-temp-")
+        .suffix(".tmp")
+        .rand_bytes(5)
+        .tempfile()?
+        .into_temp_path();
+    
+    Ok(temp_path.to_path_buf())
+}

+ 25 - 11
src/io/vcf.rs

@@ -1,15 +1,14 @@
-use std::io::{BufRead, BufReader};
+use std::{fs::File, io::{BufRead, BufReader, Write}};
 
-use log::warn;
+use anyhow::Context;
+use bgzip::{write::BGZFMultiThreadWriter, Compression};
+use log::{info, warn};
 
-use crate::variant::variant::{Annotation, Variant};
+use crate::variant::variant::VcfVariant;
 
 use super::readers::get_reader;
 
-pub fn read_vcf(
-    path: &str,
-    annotations: &[Annotation],
-) -> anyhow::Result<Vec<Variant>> {
+pub fn read_vcf(path: &str) -> anyhow::Result<Vec<VcfVariant>> {
     let reader = BufReader::new(get_reader(path)?);
 
     let mut res = Vec::new();
@@ -19,13 +18,28 @@ pub fn read_vcf(
                 if line.starts_with("#") {
                     continue;
                 }
-                let mut v: Variant = line.parse()?;
-                v.annotations = annotations.to_vec();
-                res.push(v);
-            },
+                res.push(line.parse().context(format!("Can't parse {line}"))?);
+            }
             Err(e) => warn!("Can't read line {i}: {e}"),
         }
     }
 
     Ok(res)
 }
+
+pub fn write_vcf(variants: &[VcfVariant], path: &str) -> anyhow::Result<()> {
+    info!("Writing: {path}");
+    let file = File::create(path)?;
+    let mut writer = BGZFMultiThreadWriter::new(file, Compression::default());
+    writer.write_all(b"##fileformat=VCFv4.2\n")?;
+    writer.write_all(b"#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tSAMPLE\n")?;
+
+    for variant in variants {
+        writer.write_fmt(format_args!("{}\n", variant.commun_deepvariant_clairs().into_vcf_row()))?;
+    }
+
+    writer.close()?;
+    Ok(())
+}
+
+

+ 54 - 32
src/lib.rs

@@ -10,6 +10,7 @@ pub mod functions;
 pub mod helpers;
 pub mod variant;
 pub mod io;
+pub mod pipes;
 // pub mod vcf_reader;
 
 #[macro_use]
@@ -29,14 +30,15 @@ mod tests {
     use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}};
     use functions::assembler::{Assembler, AssemblerConfig};
     use log::info;
+    use pipes::somatic::Somatic;
     // use pandora_lib_assembler::assembler::AssembleConfig;
     use rayon::prelude::*;
     use runners::Run;
-    use variant::variant::{Variant, Variants};
+    use variant::variant::{VcfVariant, Variants};
 
-    use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
+    use self::{collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig, NanomonSVSolo}}, collection::{bam::{self, BamType}, run_tasks, variants::VariantsCollection, vcf::VcfCollection, Collections, CollectionsConfig}, commands::{bcftools::{bcftools_keep_pass, BcftoolsConfig}, dorado::Dorado}};
+    use crate::{callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig, NanomonSVSolo}}, collection::{bam::{self, BamType}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
 
     // export RUST_LOG="debug"
     fn init() {
@@ -120,14 +122,14 @@ mod tests {
         Dorado::from_mux(cases, config)
     }
 
-    #[test_log::test]
-    fn clairs() -> anyhow::Result<()> {
-        let config = ClairSConfig {
-            result_dir: "/data/test".to_string(),
-            ..ClairSConfig::default()
-        };
-        ClairS::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run()
-    }
+    // #[test_log::test]
+    // fn clairs() -> anyhow::Result<()> {
+    //     let config = ClairSConfig {
+    //         result_dir: "/data/test".to_string(),
+    //         ..ClairSConfig::default()
+    //     };
+    //     ClairS::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run()
+    // }
 
     #[test_log::test]
     fn nanomonsv() -> anyhow::Result<()> {
@@ -206,13 +208,6 @@ mod tests {
         run_tasks(config)
     }
 
-    #[test_log::test]
-    fn somatic() -> anyhow::Result<()> {
-        let variants_collection = VariantsCollection::new("/data/longreads_basic_pipe")?;
-        variants_collection.data.iter().for_each(|v| println!("{}\t{}", v.id, v.path.display()));
-        Ok(())
-    }
-
     // #[test_log::test]
     // fn bcftools_pass() {
     //     let config = BcftoolsConfig::default();
@@ -371,21 +366,17 @@ mod tests {
     }
 
     #[test]
-    fn run_clairs() -> anyhow::Result<()> {
+    fn run_deepvariant() -> anyhow::Result<()> {
         init();
-        let id = "HAMROUNE";
-        let diag_bam = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1.bam");
-        let mrd_bam = format!("/data/longreads_basic_pipe/{id}/mrd/{id}_mrd_hs1.bam");
-        ClairS::new(id, &diag_bam, &mrd_bam, ClairSConfig::default()).run()
+        DeepVariant::initialize("HAMROUNE", "diag", Config::default())?.run()
     }
 
     #[test]
-    fn run_deepvariant() -> anyhow::Result<()> {
+    fn run_clairs() -> anyhow::Result<()> {
         init();
-        DeepVariant::initialize("HAMROUNE", "diag", Config::default())?.run()
+        ClairS::initialize("ADJAGBA", Config::default())?.run()
     }
 
-
     #[test]
     fn run_longphase() -> anyhow::Result<()> {
         init();
@@ -416,15 +407,19 @@ mod tests {
     #[test]
     fn variant_parse() -> anyhow::Result<()> {
         let row = "chr1\t1366\t.\tC\tCCCT\t8.2\tPASS\t.\tGT:GQ:DP:AD:VAF:PL\t1/1:4:6:1,4:0.666667:6,4,0";
-        let variant: Variant = row.parse()?;
+        let variant: VcfVariant = row.parse()?;
         let var_string = variant.into_vcf_row();
         assert_eq!(row, &var_string);
 
         let row = "chr1\t1366\t.\tC\tCCCT\t8.2\tPASS\t.";
-        let variant: Variant = row.parse()?;
+        let variant: VcfVariant = row.parse()?;
         let var_string = variant.into_vcf_row();
         assert_eq!(row, &var_string);
 
+        let row = "chr1\t2628434\t.\tC\tT\t17.973\tPASS\tH;FAU=0;FCU=7;FGU=0;FTU=7;RAU=0;RCU=2;RGU=0;RTU=2\tGT:GQ:DP:AF:AD:NAF:NDP:NAD:AU:CU:GU:TU:NAU:NCU:NGU:NTU\t0/1:17:18:0.5:0,9:0:11:0,0:0:9:0:9:0:11:0:0";
+        let variant: VcfVariant = row.parse()?;
+        let var_string = variant.into_vcf_row();
+        assert_eq!(row, &var_string);
 
         Ok(())
     }
@@ -434,10 +429,37 @@ mod tests {
         init();
         let id = "ADJAGBA";
         let time = "diag";
-        let mut dv = DeepVariant::initialize(id, time, Config::default())?;
-        dv.run()?;
-        let variants = dv.variants()?;
-        println!("Deepvariant for {id} {time}: variants {}", variants.len());
+        let dv = DeepVariant::initialize(id, time, Config::default())?;
+        let variant_collection = dv.variants()?;
+        println!("Deepvariant for {id} {time}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants);
+        Ok(())
+    }
+
+    #[test]
+    fn variant_load_clairs() -> anyhow::Result<()> {   
+        init();
+        let id = "ADJAGBA";
+        let clairs = ClairS::initialize(id, Config::default())?;
+        let variant_collection = clairs.variants()?;
+        println!("ClairS for {id}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants);
         Ok(())
     }
+
+    #[test]
+    fn variant_load_clairs_germline() -> anyhow::Result<()> {   
+        init();
+        let id = "ADJAGBA";
+        let clairs = ClairS::initialize(id, Config::default())?;
+        let germline_variant_collection = clairs.germline()?;
+        println!("ClairS for {id}: variants {} {}", germline_variant_collection.variants.len(), germline_variant_collection.vcf.n_variants);
+        Ok(())
+    }
+
+    #[test]
+    fn pipe_somatic() -> anyhow::Result<()> {   
+        init();
+        let id = "ADJAGBA";
+        Somatic::initialize(id, Config::default())?.run()
+    }
+
 }

+ 2 - 0
src/pipes/mod.rs

@@ -0,0 +1,2 @@
+pub mod somatic;
+

+ 123 - 0
src/pipes/somatic.rs

@@ -0,0 +1,123 @@
+use anyhow::Context;
+use log::info;
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+use std::{
+    collections::HashMap,
+    fs::File,
+    io::{BufRead, BufReader},
+    ops::Range,
+    thread,
+};
+
+use crate::{
+    callers::{clairs::ClairS, deep_variant::DeepVariant},
+    collection::{Initialize, InitializeSolo},
+    config::Config,
+    helpers::par_intersection,
+    io::vcf::write_vcf,
+    runners::Run,
+    variant::variant::Variants,
+};
+
+#[derive(Debug)]
+pub struct Somatic {
+    pub id: String,
+    pub config: Config,
+}
+
+impl Initialize for Somatic {
+    fn initialize(id: &str, config: crate::config::Config) -> anyhow::Result<Self> {
+        let id = id.to_string();
+        Ok(Self { id, config })
+    }
+    // add code here
+}
+
+impl Run for Somatic {
+    fn run(&mut self) -> anyhow::Result<()> {
+        info!("Running Somatic pipe for {}", self.id);
+
+        info!("Initialization...");
+        let masks = {
+            let mut hm = HashMap::new();
+
+            let reader = BufReader::new(File::open(self.config.mask_bed(&self.id))?);
+            let mut last_chr = "chr1".to_string();
+            let mut acc = Vec::new();
+            for line in reader.lines() {
+                let line = line?;
+                let (contig, range) = line.split_once("\t").context("")?;
+                let (start, end) = range.split_once("\t").context("")?;
+                let r = Range {
+                    start: start.parse::<u32>()?,
+                    end: end.parse::<u32>()?,
+                };
+                if last_chr != contig {
+                    hm.insert(last_chr.clone(), acc.clone());
+                    last_chr = contig.to_string();
+                    acc.clear();
+                }
+                acc.push(r);
+            }
+            hm.insert(last_chr, acc);
+            hm
+        };
+
+        let mut clairs = ClairS::initialize(&self.id, self.config.clone())?;
+        let mut deep_variant_mrd = DeepVariant::initialize(&self.id, "mrd", self.config.clone())?;
+
+        info!("Running callers if necessary...");
+        clairs.run()?;
+        deep_variant_mrd.run()?;
+
+        info!("Loading Germlines VCF from DeepVariant mrd and ClairS germline, in parallel...");
+
+        let clairs_handle = {
+            let clairs = clairs.clone();
+            thread::spawn(move || clairs.germline())
+        };
+
+        let deep_variant_mrd_handle = {
+            let deep_variant_mrd = deep_variant_mrd.clone();
+            thread::spawn(move || deep_variant_mrd.variants())
+        };
+
+        let clairs_germline = clairs_handle
+            .join()
+            .map_err(|e| anyhow::anyhow!("Thread panic in clairs germline: {:?}", e))
+            .context("Failed to join clairs_handle thread")?
+            .context(format!("Error in clairs germline loading for {}", self.id))?;
+
+        let deep_variant_germline = deep_variant_mrd_handle
+            .join()
+            .map_err(|e| anyhow::anyhow!("Thread panic in clairs germline: {:?}", e))
+            .context("Failed to join deep_variant_mrd_handle thread")?
+            .context(format!(
+                "Error in deepvariant germline loading for {}",
+                self.id
+            ))?;
+
+        info!("Merging variants");
+
+        let u = par_intersection(&deep_variant_germline.variants, &clairs_germline.variants);
+
+        info!(
+            "common: {}, only DeepVariant: {}, only ClairS: {}",
+            u.common.len(),
+            u.only_in_first.len(),
+            u.only_in_second.len()
+        );
+
+        [
+            (&u.common, "commun.vcf.gz"),
+            (&u.only_in_first, "dv.vcf.gz"),
+            (&u.only_in_second, "clairs.vcf.gz"),
+        ]
+        .par_iter()
+        .for_each(|(v, p)| {
+            write_vcf(v, p).unwrap();
+        });
+
+        Ok(())
+    }
+}

+ 1 - 0
src/variant/mod.rs

@@ -1,2 +1,3 @@
 pub mod variant;
+pub mod variant_collection;
 

+ 245 - 30
src/variant/variant.rs

@@ -1,9 +1,10 @@
+use crate::variant::variant_collection::VariantCollection;
 use anyhow::{anyhow, Context, Ok};
 use serde::{Deserialize, Serialize};
-use std::{fmt, str::FromStr};
+use std::{cmp::Ordering, fmt, str::FromStr};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Variant {
+pub struct VcfVariant {
     pub contig: String,
     pub position: u32,
     pub id: String,
@@ -11,12 +12,11 @@ pub struct Variant {
     pub alternative: ReferenceAlternative,
     pub quality: Option<f32>,
     pub filter: Filter,
-    pub info: String,
+    pub infos: Infos,
     pub formats: Formats,
-    pub annotations: Vec<Annotation>,
 }
 
-impl PartialEq for Variant {
+impl PartialEq for VcfVariant {
     fn eq(&self, other: &Self) -> bool {
         // Nota bene: id, filter, info, format and quality is intentionally not compared
         self.contig == other.contig
@@ -26,8 +26,8 @@ impl PartialEq for Variant {
     }
 }
 
-impl Eq for Variant {}
-impl FromStr for Variant {
+impl Eq for VcfVariant {}
+impl FromStr for VcfVariant {
     type Err = anyhow::Error;
 
     fn from_str(s: &str) -> anyhow::Result<Self> {
@@ -77,18 +77,18 @@ impl FromStr for Variant {
                 .ok_or(anyhow!("Can't parse filter from: {s}"))?
                 .parse()
                 .context(format!("Can't parse filter from: {s}"))?,
-            info: v
+            infos: v
                 .get(7)
-                .ok_or(anyhow!("Can't parse id from: {s}"))?
-                .to_string(),
+                .ok_or(anyhow!("Can't parse infos from: {s}"))?
+                .parse()
+                .context(format!("Can't parse infos from: {s}"))?,
             formats,
-            annotations: Vec::new(),
         })
     }
 }
 
 // #CHROM  POS     ID      REF     ALT     QUAL    FILTER  INFO    FORMAT  ADJAGBA_diag
-impl Variant {
+impl VcfVariant {
     pub fn into_vcf_row(&self) -> String {
         let mut columns = vec![
             self.contig.to_string(),
@@ -100,7 +100,7 @@ impl Variant {
                 .map(|v| v.to_string())
                 .unwrap_or(".".to_string()),
             self.filter.to_string(),
-            self.info.to_string(),
+            self.infos.to_string(),
         ];
 
         if !self.formats.0.is_empty() {
@@ -111,6 +111,45 @@ impl Variant {
 
         columns.join("\t")
     }
+
+    pub fn chr_num(&self) -> u32 {
+        self.contig
+            .trim_start_matches("chr")
+            .parse()
+            .unwrap_or(u32::MAX)
+    }
+
+    pub fn commun_deepvariant_clairs(&self) -> VcfVariant {
+        VcfVariant {
+            contig: self.contig.clone(),
+            position: self.position,
+            id: self.id.clone(),
+            reference: self.reference.clone(),
+            alternative: self.alternative.clone(),
+            quality: self.quality,
+            filter: Filter::Other(".".to_string()),
+            infos: Infos(vec![Info::Empty]),
+            formats: self.formats.commun_deepvariant_clairs(),
+        }
+    }
+}
+
+impl PartialOrd for VcfVariant {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for VcfVariant {
+    fn cmp(&self, other: &Self) -> Ordering {
+        let self_num = self.chr_num();
+        let other_num = other.chr_num();
+
+        match self_num.cmp(&other_num) {
+            Ordering::Equal => self.position.cmp(&other.position),
+            other => other,
+        }
+    }
 }
 
 // Tag
@@ -122,15 +161,161 @@ pub enum Annotation {
     Other((String, String)), // (key, value)
 }
 
+/// Info
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+pub struct Infos(Vec<Info>);
+
+impl FromStr for Infos {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> anyhow::Result<Self> {
+        Ok(Self(
+            s.split(";")
+                .map(Info::from_str)
+                .collect::<Result<Vec<Info>, _>>()
+                .map_err(|e| anyhow::anyhow!("Failed to parse info: {e}"))?,
+        ))
+    }
+}
+
+impl fmt::Display for Infos {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{}",
+            self.0
+                .iter()
+                .map(|e| e.to_string())
+                .collect::<Vec<String>>()
+                .join(";")
+        )
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum Info {
+    Empty,
+    H,
+    F,
+    P,
+    FAU(u32),
+    FCU(u32),
+    FGU(u32),
+    FTU(u32),
+    RAU(u32),
+    RCU(u32),
+    RGU(u32),
+    RTU(u32),
+}
+
+impl FromStr for Info {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> anyhow::Result<Self> {
+        if s.contains("=") {
+            let (key, value) = s
+                .split_once('=')
+                .context(format!("Can't split with `=` {s}"))?;
+            Ok(match key {
+                "FAU" => Info::FAU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "FCU" => Info::FCU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "FGU" => Info::FGU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "FTU" => Info::FTU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "RAU" => Info::RAU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "RCU" => Info::RCU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "RGU" => Info::RGU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "RTU" => Info::RTU(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+
+                _ => Info::Empty,
+            })
+        } else {
+            Ok(match s {
+                "H" => Info::H,
+                "F" => Info::F,
+                "P" => Info::P,
+
+                _ => Info::Empty,
+            })
+        }
+    }
+}
+
+impl fmt::Display for Info {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Info::Empty => write!(f, "."),
+            Info::H => write!(f, "H"),
+            Info::F => write!(f, "F"),
+            Info::P => write!(f, "P"),
+            Info::FAU(v) => write!(f, "FAU={v}"),
+            Info::FCU(v) => write!(f, "FCU={v}"),
+            Info::FGU(v) => write!(f, "FGU={v}"),
+            Info::FTU(v) => write!(f, "FTU={v}"),
+            Info::RAU(v) => write!(f, "RAU={v}"),
+            Info::RCU(v) => write!(f, "RCU={v}"),
+            Info::RGU(v) => write!(f, "RGU={v}"),
+            Info::RTU(v) => write!(f, "RTU={v}"),
+        }
+    }
+}
+
 /// Format
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub enum Format {
+    // DeepVariant
     GT(String),
     GQ(u32),
     DP(u32),
     AD(Vec<u32>),
     VAF(f32),
     PL(Vec<u32>),
+
+    // Clairs
+    AF(f32),
+    NAF(u32),
+    NDP(u32),
+    NAD(Vec<u32>),
+    AU(u32),
+    CU(u32),
+    GU(u32),
+    TU(u32),
+    NAU(u32),
+    NCU(u32),
+    NGU(u32),
+    NTU(u32),
+
     Other((String, String)), // (key, value)
 }
 
@@ -201,32 +386,62 @@ impl TryFrom<(&str, &str)> for Format {
 
 impl From<Format> for (String, String) {
     fn from(format: Format) -> Self {
+        let concat = |values: Vec<u32>| -> String {
+            values
+                .iter()
+                .map(|v| v.to_string())
+                .collect::<Vec<_>>()
+                .join(",")
+        };
         match format {
             Format::GT(value) => ("GT".to_string(), value),
             Format::GQ(value) => ("GQ".to_string(), value.to_string()),
             Format::DP(value) => ("DP".to_string(), value.to_string()),
-            Format::AD(values) => {
-                let value_str = values
-                    .iter()
-                    .map(|v| v.to_string())
-                    .collect::<Vec<_>>()
-                    .join(",");
-                ("AD".to_string(), value_str)
-            }
+            Format::AD(values) => ("AD".to_string(), concat(values)),
             Format::VAF(value) => ("VAF".to_string(), value.to_string()),
-            Format::PL(values) => {
-                let value_str = values
-                    .iter()
-                    .map(|v| v.to_string())
-                    .collect::<Vec<_>>()
-                    .join(",");
-                ("PL".to_string(), value_str)
-            }
+            Format::PL(values) => ("PL".to_string(), concat(values)),
             Format::Other((key, value)) => (key, value),
+            Format::AF(value) => ("AF".to_string(), value.to_string()),
+            Format::NAF(value) => ("NAF".to_string(), value.to_string()),
+            Format::NDP(value) => ("NDP".to_string(), value.to_string()),
+            Format::NAD(values) => ("NAD".to_string(), concat(values)),
+            Format::AU(value) => ("AU".to_string(), value.to_string()),
+            Format::CU(value) => ("CU".to_string(), value.to_string()),
+            Format::GU(value) => ("GU".to_string(), value.to_string()),
+            Format::TU(value) => ("TU".to_string(), value.to_string()),
+            Format::NAU(value) => ("NAU".to_string(), value.to_string()),
+            Format::NCU(value) => ("NCU".to_string(), value.to_string()),
+            Format::NGU(value) => ("NGU".to_string(), value.to_string()),
+            Format::NTU(value) => ("NTU".to_string(), value.to_string()),
         }
     }
 }
 
+impl Formats {
+    pub fn commun_deepvariant_clairs(&self) -> Self {
+        let filtered_vec: Vec<Format> = self
+            .0
+            .clone()
+            .into_iter()
+            .map(|e| {
+                if let Format::VAF(v) = e {
+                    Format::AF(v)
+                } else {
+                    e
+                }
+            })
+            .filter(|format| {
+                matches!(
+                    format,
+                    Format::GT(_) | Format::GQ(_) | Format::DP(_) | Format::AD(_) | Format::AF(_)
+                )
+            })
+            .collect();
+
+        Formats(filtered_vec)
+    }
+}
+
 /// Filter
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub enum Filter {
@@ -350,5 +565,5 @@ impl fmt::Display for Base {
 }
 
 pub trait Variants {
-    fn variants(&self) -> anyhow::Result<Vec<Variant>>;
+    fn variants(&self) -> anyhow::Result<VariantCollection>;
 }

+ 9 - 0
src/variant/variant_collection.rs

@@ -0,0 +1,9 @@
+use crate::collection::vcf::Vcf;
+use super::variant::VcfVariant;
+
+#[derive(Debug)]
+pub struct VariantCollection {
+    pub variants: Vec<VcfVariant>,
+    pub vcf: Vcf
+}
+