
cleared syntax

Thomas 11 months ago
parent
commit
62bb510dba

+ 11 - 1
Cargo.lock

@@ -704,7 +704,7 @@ dependencies = [
  "bitflags 2.6.0",
  "cexpr",
  "clang-sys",
- "itertools",
+ "itertools 0.12.1",
  "lazy_static",
  "lazycell",
  "log",
@@ -2603,6 +2603,15 @@ dependencies = [
  "either",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.14"
@@ -3616,6 +3625,7 @@ dependencies = [
  "glob",
  "hashbrown 0.15.2",
  "indicatif",
+ "itertools 0.14.0",
  "lazy_static",
  "locale_config",
  "log",

+ 1 - 0
Cargo.toml

@@ -52,3 +52,4 @@ charming = { version = "0.4.0", features = ["ssr"] }
 rusqlite = { version = "0.32.1", features = ["chrono", "serde_json"] }
 dirs = "5.0.1"
 noodles-gff = "0.41.0"
+itertools = "0.14.0"

+ 75 - 5
src/annotation/mod.rs

@@ -1,8 +1,8 @@
 pub mod cosmic;
 pub mod echtvar;
 pub mod gnomad;
-pub mod vep;
 pub mod ncbi;
+pub mod vep;
 
 use std::{
     collections::{HashMap, HashSet},
@@ -11,11 +11,13 @@ use std::{
     sync::Arc,
 };
 
-use crate::helpers::mean;
+use crate::{helpers::mean, variant::variant_collection::VariantCollection};
 use cosmic::Cosmic;
 use dashmap::DashMap;
 use gnomad::GnomAD;
+use log::info;
 use rayon::prelude::*;
+use vep::VEP;
 
 #[derive(Debug, Clone, PartialEq)]
 pub enum Annotation {
@@ -32,6 +34,7 @@ pub enum Annotation {
     Cosmic(Cosmic),
     GnomAD(GnomAD),
     LowEntropy,
+    VEP(Vec<VEP>),
 }
 
 impl fmt::Display for Annotation {
@@ -50,6 +53,7 @@ impl fmt::Display for Annotation {
             Annotation::Cosmic(_) => "Cosmic",
             Annotation::GnomAD(_) => "GnomAD",
             Annotation::LowEntropy => "LowEntropy",
+            Annotation::VEP(_) => "VEP",
         };
         write!(f, "{}", str)
     }
@@ -80,6 +84,10 @@ impl FromStr for Annotation {
 pub enum Caller {
     DeepVariant,
     ClairS,
+    NanomonSV,
+    NanomonSVSolo,
+    Savana,
+    Severus,
 }
 
 impl fmt::Display for Caller {
@@ -87,6 +95,10 @@ impl fmt::Display for Caller {
         match self {
             Caller::DeepVariant => write!(f, "DeepVariant"),
             Caller::ClairS => write!(f, "ClairS"),
+            Caller::NanomonSV => write!(f, "NanomonSV"),
+            Caller::NanomonSVSolo => write!(f, "NanomonSV-solo"),
+            Caller::Savana => write!(f, "Savana"),
+            Caller::Severus => write!(f, "Severus"),
         }
     }
 }
@@ -128,6 +140,7 @@ impl Annotations {
                     | Annotation::LowConstitDepth
                     | Annotation::LowEntropy
                     | Annotation::GnomAD(_)
+                    | Annotation::VEP(_)
                     | Annotation::HighConstitAlt => categorical.push(ann.to_string()),
                     Annotation::Callers(caller) => categorical.push(caller.to_string()),
                     Annotation::ShannonEntropy(v) => numerical.push((ann.to_string(), *v)),
@@ -190,6 +203,57 @@ impl Annotations {
             .collect()
     }
 
+    pub fn retain_variants(
+        &mut self,
+        variants: &mut Vec<VariantCollection>,
+        filter: impl Fn(&Vec<Annotation>) -> bool + Send + Sync,
+    ) -> usize {
+        info!("Variant Keys lookup");
+        let mut keys = HashSet::new();
+        self.store.retain(|key, value| {
+            if filter(value) {
+                keys.insert(*key);
+                true
+            } else {
+                false
+            }
+        });
+
+        // let keys: Vec<u128> = self
+        //     .store
+        //     .par_iter()
+        //     .filter(|entry| filter(entry.value()))
+        //     .map(|entry| *entry.key())
+        //     .collect();
+        info!("{} unique Variants to keep", keys.len());
+
+        // info!("Removing annotations");
+        // self.store.retain(|key, _| keys.contains(key));
+
+        info!("Removing variants from collections");
+        let n_removed: usize = variants
+            .par_iter_mut()
+            .map(|c| {
+                let before = c.variants.len();
+                c.variants = c
+                    .variants
+                    .par_iter()
+                    .filter(|a| keys.contains(&a.hash_variant()))
+                    // .filter(|a| keys.par_iter().any(|k| k == &a.hash_variant()))
+                    .cloned()
+                    .collect();
+                // c.variants
+                //     .retain(|a| keys.par_iter().any(|k| k == &a.hash_variant()));
+                let after = c.variants.len();
+                info!("{} {}\t{}/{}", c.caller, c.category, before - after, before);
+                before - after
+            })
+            .sum();
+        variants.retain(|e| !e.variants.is_empty());
+        info!("{n_removed} variants removed from collections.");
+        n_removed
+    }
+
     pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u128>) {
         self.store.retain(|key, _| keys_to_keep.contains(key));
     }
@@ -260,9 +324,11 @@ impl Annotations {
         self.store.iter_mut().for_each(|mut e| {
             let anns = e.value_mut();
             let mut is_low = false;
-            anns.iter().for_each(|ann| if let Annotation::ShannonEntropy(ent) = ann {
-                if *ent < min_shannon_entropy && !anns.contains(&Annotation::Somatic) {
-                    is_low = true
+            anns.iter().for_each(|ann| {
+                if let Annotation::ShannonEntropy(ent) = ann {
+                    if *ent < min_shannon_entropy && !anns.contains(&Annotation::Somatic) {
+                        is_low = true
+                    }
                 }
             });
             if is_low {
@@ -271,3 +337,7 @@ impl Annotations {
         });
     }
 }
+
+pub trait CallerCat {
+    fn caller_cat(&self) -> (Caller, Annotation);
+}

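Note: a minimal usage sketch of the new retain_variants API (hypothetical call site; annotations and collections are assumed to be produced by the callers' Variants::variants impls further down). The closure sees the full annotation list for each variant key; anything it rejects is pruned from both the store and every VariantCollection, and the number of removed variants is returned.

// Keep only variants tagged Somatic or carrying at least one VEP record.
let n_removed = annotations.retain_variants(&mut collections, |anns| {
    anns.iter()
        .any(|a| matches!(a, Annotation::Somatic | Annotation::VEP(_)))
});
log::info!("{n_removed} variants removed by the Somatic/VEP filter");
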
+ 243 - 241
src/annotation/vep.rs

@@ -1,23 +1,19 @@
-use anyhow::{anyhow, Context};
-use csv::ReaderBuilder;
+use anyhow::anyhow;
 use hashbrown::HashMap;
+use itertools::Itertools;
 use log::warn;
 use serde::{Deserialize, Serialize};
-use std::io::Write;
 use std::{
-    env::temp_dir,
-    fs::{self, File},
+    cmp::{Ordering, Reverse},
     io::{BufRead, BufReader},
     process::{Command, Stdio},
     str::FromStr,
 };
 
-use crate::io::vcf::vcf_header;
-
 use super::ncbi::NCBIAcc;
 
 #[derive(Debug, PartialEq, Serialize, Deserialize)]
-pub struct VEPLine {
+pub struct VepLine {
     pub uploaded_variation: String,
     pub location: String,
     pub allele: String,
@@ -34,6 +30,34 @@ pub struct VEPLine {
     pub extra: String,
 }
 
+impl FromStr for VepLine {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let parts: Vec<&str> = s.split('\t').collect();
+        if parts.len() != 14 {
+            return Err(anyhow!("Invalid number of fields in VEP line"));
+        }
+
+        Ok(VepLine {
+            uploaded_variation: parts[0].to_string(),
+            location: parts[1].to_string(),
+            allele: parts[2].to_string(),
+            gene: parts[3].to_string(),
+            feature: parts[4].to_string(),
+            feature_type: parts[5].to_string(),
+            consequence: parts[6].to_string(),
+            cdna_position: parts[7].to_string(),
+            cds_position: parts[8].to_string(),
+            protein_position: parts[9].to_string(),
+            amino_acids: parts[10].to_string(),
+            codons: parts[11].to_string(),
+            existing_variation: parts[12].to_string(),
+            extra: parts[13].to_string(),
+        })
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct VEP {
     pub gene: Option<String>,
@@ -94,8 +118,7 @@ pub enum VepConsequence {
     SequenceVariant,
 }
 
-
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum VepImpact {
     HIGH,
     MODERATE,
@@ -103,56 +126,143 @@ pub enum VepImpact {
     MODIFIER,
 }
 
-impl VepImpact {
-    pub fn from_conseque(consequence: &VepConsequence) -> VepImpact {
+impl PartialOrd for VepImpact {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for VepImpact {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            (VepImpact::HIGH, VepImpact::HIGH) => Ordering::Equal,
+            (VepImpact::HIGH, _) => Ordering::Less,
+            (VepImpact::MODERATE, VepImpact::HIGH) => Ordering::Greater,
+            (VepImpact::MODERATE, VepImpact::MODERATE) => Ordering::Equal,
+            (VepImpact::MODERATE, _) => Ordering::Less,
+            (VepImpact::LOW, VepImpact::HIGH | VepImpact::MODERATE) => Ordering::Greater,
+            (VepImpact::LOW, VepImpact::LOW) => Ordering::Equal,
+            (VepImpact::LOW, VepImpact::MODIFIER) => Ordering::Less,
+            (VepImpact::MODIFIER, VepImpact::MODIFIER) => Ordering::Equal,
+            (VepImpact::MODIFIER, _) => Ordering::Greater,
+        }
+    }
+}
+
+impl FromStr for VepImpact {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> anyhow::Result<Self> {
+        match s {
+            "LOW" => Ok(VepImpact::LOW),
+            "MODERATE" => Ok(VepImpact::MODERATE),
+            "HIGH" => Ok(VepImpact::HIGH),
+            "MODIFIER" => Ok(VepImpact::MODIFIER),
+            _ => Err(anyhow!("Unexpected VEP Impact value")),
+        }
+    }
+}
+
+impl From<&VepConsequence> for VepImpact {
+    fn from(consequence: &VepConsequence) -> Self {
         match consequence {
-            VepConsequence::TranscriptAblation |
-            VepConsequence::SpliceAcceptorVariant |
-            VepConsequence::SpliceDonorVariant |
-            VepConsequence::StopGained |
-            VepConsequence::FrameshiftVariant |
-            VepConsequence::StopLost |
-            VepConsequence::StartLost |
-            VepConsequence::TranscriptAmplification |
-            VepConsequence::FeatureElongation |
-            VepConsequence::FeatureTruncation => VepImpact::HIGH,
-            
-            VepConsequence::InframeInsertion |
-            VepConsequence::InframeDeletion |
-            VepConsequence::MissenseVariant |
-            VepConsequence::ProteinAlteringVariant => VepImpact::MODERATE,
-
-            VepConsequence::SpliceDonor5thBaseVariant |
-            VepConsequence::SpliceRegionVariant |
-            VepConsequence::SpliceDonorRegionVariant |
-            VepConsequence::SplicePolyrimidineTractVariant |
-            VepConsequence::IncompleteTerminalCodonVariant |
-            VepConsequence::StartRetainedVariant |
-            VepConsequence::StopRetainedVariant |
-            VepConsequence::SynonymousVariant => VepImpact::LOW,
-
-            VepConsequence::CodingSequenceVariant |
-            VepConsequence::MatureMiRnaVariant |
-            VepConsequence::FivePrimeUtrVariant |
-            VepConsequence::ThreePrimeUtrVariant |
-            VepConsequence::NonCodingTranscriptExonVariant |
-            VepConsequence::IntronVariant |
-            VepConsequence::NmdTranscriptVariant |
-            VepConsequence::NonCodingTranscriptVariant |
-            VepConsequence::UpstreamGeneVariant |
-            VepConsequence::DownstreamGeneVariant |
-            VepConsequence::TfbsAblation |
-            VepConsequence::TfbsAmplification |
-            VepConsequence::TfBindingSiteVariant |
-            VepConsequence::RegulatoryRegionAblation |
-            VepConsequence::RegulatoryRegionAmplification |
-            VepConsequence::RegulatoryRegionVariant |
-            VepConsequence::SequenceVariant |
-            VepConsequence::IntergenicVariant => VepImpact::MODIFIER,
+            VepConsequence::TranscriptAblation
+            | VepConsequence::SpliceAcceptorVariant
+            | VepConsequence::SpliceDonorVariant
+            | VepConsequence::StopGained
+            | VepConsequence::FrameshiftVariant
+            | VepConsequence::StopLost
+            | VepConsequence::StartLost
+            | VepConsequence::TranscriptAmplification
+            | VepConsequence::FeatureElongation
+            | VepConsequence::FeatureTruncation => VepImpact::HIGH,
+
+            VepConsequence::InframeInsertion
+            | VepConsequence::InframeDeletion
+            | VepConsequence::MissenseVariant
+            | VepConsequence::ProteinAlteringVariant => VepImpact::MODERATE,
+
+            VepConsequence::SpliceDonor5thBaseVariant
+            | VepConsequence::SpliceRegionVariant
+            | VepConsequence::SpliceDonorRegionVariant
+            | VepConsequence::SplicePolyrimidineTractVariant
+            | VepConsequence::IncompleteTerminalCodonVariant
+            | VepConsequence::StartRetainedVariant
+            | VepConsequence::StopRetainedVariant
+            | VepConsequence::SynonymousVariant => VepImpact::LOW,
+
+            VepConsequence::CodingSequenceVariant
+            | VepConsequence::MatureMiRnaVariant
+            | VepConsequence::FivePrimeUtrVariant
+            | VepConsequence::ThreePrimeUtrVariant
+            | VepConsequence::NonCodingTranscriptExonVariant
+            | VepConsequence::IntronVariant
+            | VepConsequence::NmdTranscriptVariant
+            | VepConsequence::NonCodingTranscriptVariant
+            | VepConsequence::UpstreamGeneVariant
+            | VepConsequence::DownstreamGeneVariant
+            | VepConsequence::TfbsAblation
+            | VepConsequence::TfbsAmplification
+            | VepConsequence::TfBindingSiteVariant
+            | VepConsequence::RegulatoryRegionAblation
+            | VepConsequence::RegulatoryRegionAmplification
+            | VepConsequence::RegulatoryRegionVariant
+            | VepConsequence::SequenceVariant
+            | VepConsequence::IntergenicVariant => VepImpact::MODIFIER,
         }
     }
 }
 
+impl VepImpact {
+    // pub fn from_consequence(consequence: &VepConsequence) -> VepImpact {
+    //     match consequence {
+    //         VepConsequence::TranscriptAblation
+    //         | VepConsequence::SpliceAcceptorVariant
+    //         | VepConsequence::SpliceDonorVariant
+    //         | VepConsequence::StopGained
+    //         | VepConsequence::FrameshiftVariant
+    //         | VepConsequence::StopLost
+    //         | VepConsequence::StartLost
+    //         | VepConsequence::TranscriptAmplification
+    //         | VepConsequence::FeatureElongation
+    //         | VepConsequence::FeatureTruncation => VepImpact::HIGH,
+    //
+    //         VepConsequence::InframeInsertion
+    //         | VepConsequence::InframeDeletion
+    //         | VepConsequence::MissenseVariant
+    //         | VepConsequence::ProteinAlteringVariant => VepImpact::MODERATE,
+    //
+    //         VepConsequence::SpliceDonor5thBaseVariant
+    //         | VepConsequence::SpliceRegionVariant
+    //         | VepConsequence::SpliceDonorRegionVariant
+    //         | VepConsequence::SplicePolyrimidineTractVariant
+    //         | VepConsequence::IncompleteTerminalCodonVariant
+    //         | VepConsequence::StartRetainedVariant
+    //         | VepConsequence::StopRetainedVariant
+    //         | VepConsequence::SynonymousVariant => VepImpact::LOW,
+    //
+    //         VepConsequence::CodingSequenceVariant
+    //         | VepConsequence::MatureMiRnaVariant
+    //         | VepConsequence::FivePrimeUtrVariant
+    //         | VepConsequence::ThreePrimeUtrVariant
+    //         | VepConsequence::NonCodingTranscriptExonVariant
+    //         | VepConsequence::IntronVariant
+    //         | VepConsequence::NmdTranscriptVariant
+    //         | VepConsequence::NonCodingTranscriptVariant
+    //         | VepConsequence::UpstreamGeneVariant
+    //         | VepConsequence::DownstreamGeneVariant
+    //         | VepConsequence::TfbsAblation
+    //         | VepConsequence::TfbsAmplification
+    //         | VepConsequence::TfBindingSiteVariant
+    //         | VepConsequence::RegulatoryRegionAblation
+    //         | VepConsequence::RegulatoryRegionAmplification
+    //         | VepConsequence::RegulatoryRegionVariant
+    //         | VepConsequence::SequenceVariant
+    //         | VepConsequence::IntergenicVariant => VepImpact::MODIFIER,
+    //     }
+    // }
+}
+
 impl From<VepConsequence> for String {
     fn from(consequence: VepConsequence) -> Self {
         match consequence {
@@ -169,7 +279,9 @@ impl From<VepConsequence> for String {
             VepConsequence::MissenseVariant => "missense_variant".to_string(),
             VepConsequence::ProteinAlteringVariant => "protein_altering_variant".to_string(),
             VepConsequence::SpliceRegionVariant => "splice_region_variant".to_string(),
-            VepConsequence::IncompleteTerminalCodonVariant => "incomplete_terminal_codon_variant".to_string(),
+            VepConsequence::IncompleteTerminalCodonVariant => {
+                "incomplete_terminal_codon_variant".to_string()
+            }
             VepConsequence::StartRetainedVariant => "start_retained_variant".to_string(),
             VepConsequence::StopRetainedVariant => "stop_retained_variant".to_string(),
             VepConsequence::SynonymousVariant => "synonymous_variant".to_string(),
@@ -177,23 +289,33 @@ impl From<VepConsequence> for String {
             VepConsequence::MatureMiRnaVariant => "mature_miRNA_variant".to_string(),
             VepConsequence::FivePrimeUtrVariant => "5_prime_UTR_variant".to_string(),
             VepConsequence::ThreePrimeUtrVariant => "3_prime_UTR_variant".to_string(),
-            VepConsequence::NonCodingTranscriptExonVariant => "non_coding_transcript_exon_variant".to_string(),
+            VepConsequence::NonCodingTranscriptExonVariant => {
+                "non_coding_transcript_exon_variant".to_string()
+            }
             VepConsequence::IntronVariant => "intron_variant".to_string(),
             VepConsequence::NmdTranscriptVariant => "NMD_transcript_variant".to_string(),
-            VepConsequence::NonCodingTranscriptVariant => "non_coding_transcript_variant".to_string(),
+            VepConsequence::NonCodingTranscriptVariant => {
+                "non_coding_transcript_variant".to_string()
+            }
             VepConsequence::UpstreamGeneVariant => "upstream_gene_variant".to_string(),
             VepConsequence::DownstreamGeneVariant => "downstream_gene_variant".to_string(),
             VepConsequence::TfbsAblation => "TFBS_ablation".to_string(),
             VepConsequence::TfbsAmplification => "TFBS_amplification".to_string(),
             VepConsequence::TfBindingSiteVariant => "TF_binding_site_variant".to_string(),
             VepConsequence::RegulatoryRegionAblation => "regulatory_region_ablation".to_string(),
-            VepConsequence::RegulatoryRegionAmplification => "regulatory_region_amplification".to_string(),
+            VepConsequence::RegulatoryRegionAmplification => {
+                "regulatory_region_amplification".to_string()
+            }
             VepConsequence::FeatureElongation => "feature_elongation".to_string(),
             VepConsequence::RegulatoryRegionVariant => "regulatory_region_variant".to_string(),
             VepConsequence::FeatureTruncation => "feature_truncation".to_string(),
-            VepConsequence::SpliceDonor5thBaseVariant => "splice_donor_5th_base_variant".to_string(),
+            VepConsequence::SpliceDonor5thBaseVariant => {
+                "splice_donor_5th_base_variant".to_string()
+            }
             VepConsequence::SpliceDonorRegionVariant => "splice_donor_region_variant".to_string(),
-            VepConsequence::SplicePolyrimidineTractVariant => "splice_polyrimidine_tract_variant".to_string(),
+            VepConsequence::SplicePolyrimidineTractVariant => {
+                "splice_polyrimidine_tract_variant".to_string()
+            }
             VepConsequence::SequenceVariant => "sequence_variant".to_string(),
             VepConsequence::IntergenicVariant => "intergenic_variant".to_string(),
         }
@@ -223,9 +345,13 @@ impl FromStr for VepConsequence {
             "splice_donor_5th_base_variant" => Ok(VepConsequence::SpliceDonor5thBaseVariant),
             "splice_region_variant" => Ok(VepConsequence::SpliceRegionVariant),
             "splice_donor_region_variant" => Ok(VepConsequence::SpliceDonorRegionVariant),
-            "splice_polypyrimidine_tract_variant" => Ok(VepConsequence::SplicePolyrimidineTractVariant),
+            "splice_polypyrimidine_tract_variant" => {
+                Ok(VepConsequence::SplicePolyrimidineTractVariant)
+            }
 
-            "incomplete_terminal_codon_variant" => Ok(VepConsequence::IncompleteTerminalCodonVariant),
+            "incomplete_terminal_codon_variant" => {
+                Ok(VepConsequence::IncompleteTerminalCodonVariant)
+            }
             "start_retained_variant" => Ok(VepConsequence::StartRetainedVariant),
             "stop_retained_variant" => Ok(VepConsequence::StopRetainedVariant),
             "synonymous_variant" => Ok(VepConsequence::SynonymousVariant),
@@ -233,7 +359,9 @@ impl FromStr for VepConsequence {
             "mature_miRNA_variant" => Ok(VepConsequence::MatureMiRnaVariant),
             "5_prime_UTR_variant" => Ok(VepConsequence::FivePrimeUtrVariant),
             "3_prime_UTR_variant" => Ok(VepConsequence::ThreePrimeUtrVariant),
-            "non_coding_transcript_exon_variant" => Ok(VepConsequence::NonCodingTranscriptExonVariant),
+            "non_coding_transcript_exon_variant" => {
+                Ok(VepConsequence::NonCodingTranscriptExonVariant)
+            }
             "intron_variant" => Ok(VepConsequence::IntronVariant),
 
             "NMD_transcript_variant" => Ok(VepConsequence::NmdTranscriptVariant),
@@ -254,15 +382,20 @@ impl FromStr for VepConsequence {
     }
 }
 
-impl VEP {
-    fn from_vep_line(d: &VEPLine) -> anyhow::Result<VEP> {
+impl TryFrom<&VepLine> for VEP {
+    type Error = anyhow::Error;
+
+    fn try_from(d: &VepLine) -> anyhow::Result<Self> {
         let or_opt = |s: &str| match s {
             "-" => None,
             _ => Some(s.to_string()),
         };
 
-        let consequence = or_opt(&d.consequence)
-            .map(|c| c.split(",").map(|e| e.parse()).collect::<Vec<VepConsequence>>());
+        let consequence = or_opt(&d.consequence).map(|c| {
+            c.split(',')
+                .filter_map(|e| e.parse::<VepConsequence>().ok())
+                .collect::<Vec<VepConsequence>>()
+        });
 
         Ok(VEP {
             gene: or_opt(&d.gene),
@@ -282,42 +415,24 @@ impl VEP {
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct VEPExtra {
-    pub impact: Option<VEPImpact>,
+    pub impact: Option<VepImpact>,
     pub symbol: Option<String>,
     pub distance: Option<u32>,
     pub hgvs_c: Option<String>,
     pub hgvs_p: Option<String>,
 }
+
 impl FromStr for VEPExtra {
     type Err = anyhow::Error;
 
     fn from_str(s: &str) -> anyhow::Result<Self> {
-        let err = |c| anyhow!("Error {} parsing VEP Extra field {}", c, s);
-
-        let elements = s.split(";").collect::<Vec<&str>>();
-
-        let mut kv = HashMap::new();
+        let kv: HashMap<_, _> = s.split(';').filter_map(|e| e.split_once('=')).collect();
 
-        for e in elements.iter() {
-            let (k, v) = e.split_once("=").ok_or(err("in split '='"))?;
-            if kv.insert(k, v).is_some() {
-                return Err(err("kv insert"));
-            };
-        }
-
-        let impact: Option<VEPImpact> = if let Some(v) = kv.get("IMPACT") {
-            Some(v.parse()?)
-        } else {
-            None
-        };
-        let symbol: Option<String> = kv.get("SYMBOL").map(|v| v.to_string());
-        let distance: Option<u32> = if let Some(v) = kv.get("DISTANCE") {
-            Some(v.parse()?)
-        } else {
-            None
-        };
-        let hgvs_c: Option<String> = kv.get("HGVSc").map(|v| v.to_string());
-        let hgvs_p: Option<String> = kv.get("HGVSp").map(|v| v.to_string());
+        let impact = kv.get("IMPACT").map(|&v| v.parse()).transpose()?;
+        let symbol = kv.get("SYMBOL").map(ToString::to_string);
+        let distance = kv.get("DISTANCE").map(|&v| v.parse()).transpose()?;
+        let hgvs_c = kv.get("HGVSc").map(ToString::to_string);
+        let hgvs_p = kv.get("HGVSp").map(ToString::to_string);
 
         Ok(VEPExtra {
             impact,
@@ -329,120 +444,8 @@ impl FromStr for VEPExtra {
     }
 }
 
-// #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-// pub enum VEPImpact {
-//     Low,
-//     Moderate,
-//     High,
-//     Modifier,
-// }
-//
-// impl FromStr for VEPImpact {
-//     type Err = anyhow::Error;
-//
-//     fn from_str(s: &str) -> Result<Self> {
-//         match s {
-//             "LOW" => Ok(VEPImpact::Low),
-//             "MODERATE" => Ok(VEPImpact::Moderate),
-//             "HIGH" => Ok(VEPImpact::High),
-//             "MODIFIER" => Ok(VEPImpact::Modifier),
-//             _ => Err(anyhow!("Unexpected VEP Impact value")),
-//         }
-//     }
-// }
-// pub fn vep_chunk(data: &mut [Variant]) -> Result<()> {
-//     let in_vcf = format!(
-//         "{}/vep_{}.vcf",
-//         temp_dir().to_str().unwrap(),
-//         uuid::Uuid::new_v4()
-//     );
-//     let out_vep = format!(
-//         "{}/vep_{}.txt",
-//         temp_dir().to_str().unwrap(),
-//         uuid::Uuid::new_v4()
-//     );
-//
-//     let mut vcf = File::create(&in_vcf).unwrap();
-//     let vcf_header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?;
-//
-//     writeln!(vcf, "{}", vcf_header.join("\n")).unwrap();
-//
-//     for (i, row) in data.iter().enumerate() {
-//         writeln!(
-//             vcf,
-//             "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
-//             row.contig,
-//             row.position,
-//             i + 1,
-//             row.reference,
-//             row.alternative
-//         )?;
-//     }
-//
-//     if let Err(err) = run_vep(&in_vcf, &out_vep) {
-//         panic!("{err}");
-//     };
-//
-//     // read the results in txt file, parse and add to HashMap
-//     let mut reader_vep = ReaderBuilder::new()
-//         .delimiter(b'\t')
-//         .has_headers(false)
-//         .comment(Some(b'#'))
-//         .flexible(true)
-//         .from_reader(fs::File::open(out_vep.clone())?);
-//
-//     let mut lines: HashMap<u64, Vec<VEPLine>> = HashMap::new();
-//     for line in reader_vep.deserialize::<VEPLine>() {
-//         if let std::result::Result::Ok(line) = line {
-//             if let std::result::Result::Ok(k) = line.uploaded_variation.parse::<u64>() {
-//                 lines
-//                     .raw_entry_mut()
-//                     .from_key(&k)
-//                     .or_insert_with(|| (k, vec![]))
-//                     .1
-//                     .push(line);
-//             } else {
-//                 return Err(anyhow!("Error while parsing: {:?}", line));
-//             }
-//         } else {
-//             return Err(anyhow!("Error while parsing: {:?}", line));
-//         }
-//     }
-//
-//     // remove input and result file
-//     fs::remove_file(in_vcf)?;
-//     fs::remove_file(out_vep)?;
-//
-//     let mut n_not_vep = 0;
-//     data.iter_mut().enumerate().for_each(|(i, entry)| {
-//         let k = (i + 1) as u64;
-//
-//         match lines.get(&k) {
-//             Some(vep_lines) => {
-//                 let vep: Vec<VEP> = vep_lines
-//                     .iter()
-//                     .map(|e| match VEP::from_vep_line(e) {
-//                         std::result::Result::Ok(r) => r,
-//                         Err(err) => panic!("Error while parsing: {} line: {:?}", err, e),
-//                     })
-//                     .collect();
-//                 entry.annotations.push(AnnotationType::VEP(vep.to_vec()));
-//             }
-//             None => {
-//                 n_not_vep += 1;
-//             }
-//         };
-//     });
-//
-//     if n_not_vep > 0 {
-//         warn!("{} variants not annotated by VEP", n_not_vep);
-//     }
-//
-//     Ok(())
-// }
-//
 // VEP need plugin Downstream and SpliceRegion /home/prom/.vep/Plugins
-fn run_vep(in_path: &str, out_path: &str) -> Result<()> {
+pub fn run_vep(in_path: &str, out_path: &str) -> anyhow::Result<()> {
     let bin_dir = "/data/tools/ensembl-vep";
     let dir_cache = "/data/ref/hs1/vepcache/";
     let fasta = "/data/ref/hs1/chm13v2.0.fa";
@@ -490,42 +493,41 @@ fn run_vep(in_path: &str, out_path: &str) -> Result<()> {
 }
 
 pub fn get_best_vep(d: &[VEP]) -> anyhow::Result<VEP> {
-    d.into_iter().filter(|v| v.)
-
-    if d.is_empty() {
-        return Err(anyhow!("No element in VEP vector"));
-    }
-    if d.len() == 1 {
-        return Ok(d.first().unwrap().clone());
-    }
+    let best_impact_veps = d
+        .iter()
+        .chunk_by(|vep| {
+            vep.consequence.as_ref().map_or(VepImpact::MODIFIER, |c| {
+                c.iter()
+                    .map(VepImpact::from)
+                    .min()
+                    .unwrap_or(VepImpact::MODIFIER)
+            })
+        })
+        .into_iter()
+        .min_by_key(|(impact, _)| impact.clone())
+        .map(|(_, group)| group.cloned().collect::<Vec<_>>())
+        .ok_or_else(|| anyhow!("No element in VEP vector"))?;
 
-    let mut parsed: Vec<(usize, NCBIAcc)> = Vec::new();
-    for (i, vep) in d.iter().enumerate() {
-        if let Some(feat) = &vep.feature {
-            if let std::result::Result::Ok(f) = feat
-                .parse::<NCBIAcc>()
-                .context("Error parsing NCBI accession")
-            {
-                parsed.push((i, f));
-            } else {
-                warn!("Can't parse {}", feat);
-            }
-        }
+    if best_impact_veps.len() == 1 {
+        return Ok(best_impact_veps[0].clone());
     }
 
-    parsed.sort_by(|(_, a), (_, b)| a.number.cmp(&b.number));
+    let parsed_veps = best_impact_veps
+        .iter()
+        .enumerate()
+        .filter_map(|(i, vep)| {
+            vep.feature.as_ref().and_then(|feat| {
+                feat.parse::<NCBIAcc>()
+                    .map(|acc| (i, acc))
+                    .map_err(|e| warn!("Can't parse {}: {}", feat, e))
+                    .ok()
+            })
+        })
+        .sorted_by_key(|(_, acc)| (Reverse(acc.prefix == "NM"), acc.number))
+        .collect::<Vec<_>>();
 
-    let nm: Vec<(usize, NCBIAcc)> = parsed
-        .clone()
-        .into_iter()
-        .filter(|(_, e)| e.prefix == *"NM")
-        .collect();
-
-    if !nm.is_empty() {
-        let (k, _) = nm.first().unwrap();
-        return Ok(d.get(*k).unwrap().clone());
-    } else {
-        let (k, _) = parsed.first().unwrap();
-        return Ok(d.get(*k).unwrap().clone());
-    }
+    parsed_veps
+        .first()
+        .map(|(k, _)| best_impact_veps[*k].clone())
+        .ok_or_else(|| anyhow!("No valid NCBI accession found"))
 }

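Note: the new Ord for VepImpact is deliberately inverted, so HIGH compares as the smallest value and min()/min_by_key in get_best_vep select the most severe impact. A short sanity-check sketch; the VEP line below is synthetic, with the 14 tab-separated columns VepLine::from_str requires.

// HIGH sorts first: "smaller" means more severe.
assert!(VepImpact::HIGH < VepImpact::MODERATE);
assert!(VepImpact::LOW < VepImpact::MODIFIER);
assert_eq!(
    [VepImpact::MODIFIER, VepImpact::LOW, VepImpact::HIGH].into_iter().min(),
    Some(VepImpact::HIGH)
);

// VepLine::from_str rejects anything that is not exactly 14 tab-separated fields.
let line = "1_100_A/G\tchr1:100\tG\tGENE\tNM_000000.1\tTranscript\tmissense_variant\t\
            100\t90\t30\tK/E\taaG/gaG\t-\tIMPACT=MODERATE;SYMBOL=GENE";
assert!(line.parse::<VepLine>().is_ok());
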
+ 20 - 26
src/callers/clairs.rs

@@ -1,9 +1,9 @@
 use crate::{
-    annotation::{Annotation, Annotations, Caller},
+    annotation::{Annotation, Annotations, Caller, CallerCat},
     collection::{vcf::Vcf, Initialize},
     commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
     config::Config,
-    helpers::{force_or_not, get_temp_file_path},
+    helpers::{force_or_not, temp_file_path},
     io::vcf::read_vcf,
     runners::{run_wait, DockerRun, Run},
     variant::{
@@ -12,9 +12,9 @@ use crate::{
     },
 };
 use anyhow::{Context, Ok};
+use log::info;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{fs, path::Path};
-use log::info;
 
 #[derive(Debug, Clone)]
 pub struct ClairS {
@@ -122,25 +122,6 @@ impl Run for ClairS {
 
         // Germline
         if !Path::new(&self.clair3_germline_passed).exists() {
-            // let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
-            // let report = bcftools_concat(
-            //     vec![
-            //         self.clair3_germline_tumor.to_string(),
-            //         self.clair3_germline_normal.to_string(),
-            //     ],
-            //     &tmp_file,
-            //     BcftoolsConfig::default(),
-            // )
-            // .context(format!(
-            //     "Error while running bcftools concat for {} and {}",
-            //     &self.output_vcf, &self.output_indels_vcf
-            // ))?;
-            //
-            // let log_file = format!("{}/bcftools_concat_", self.log_dir);
-            // report
-            //     .save_to_file(&log_file)
-            //     .context(format!("Error while writing logs into {log_file}"))?;
-            //
             let report = bcftools_keep_pass(
                 &self.clair3_germline_normal,
                 &self.clair3_germline_passed,
@@ -160,10 +141,9 @@ impl Run for ClairS {
             info!("ClairS germline VCF exists.");
         }
 
-
         if !Path::new(&self.vcf_passed).exists() {
             // Concat output and indels
-            let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
+            let tmp_file = temp_file_path("vcf.gz")?.to_str().unwrap().to_string();
             let report = bcftools_concat(
                 vec![
                     self.output_vcf.to_string(),
@@ -201,10 +181,20 @@ impl Run for ClairS {
     }
 }
 
+impl CallerCat for ClairS {
+    fn caller_cat(&self) -> (Caller, Annotation) {
+        (Caller::ClairS, Annotation::Somatic)
+    }
+}
+
 impl Variants for ClairS {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
-        let add = vec![Annotation::Callers(Caller::ClairS), Annotation::Somatic];
-        info!("Loading variant from ClairS {} with annotations: {:?}", self.id, add);
+        let (caller, category) = self.caller_cat();
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
+        info!(
+            "Loading variant from ClairS {} with annotations: {:?}",
+            self.id, add
+        );
         let variants = read_vcf(&self.vcf_passed)?;
 
         variants.par_iter().for_each(|v| {
@@ -213,6 +203,8 @@ impl Variants for ClairS {
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
+            caller,
+            category,
         })
     }
 }
@@ -228,6 +220,8 @@ impl ClairS {
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.clair3_germline_passed.clone().into())?,
+            caller: Caller::ClairS,
+            category: Annotation::Germline,
         })
     }
 }

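Note: a sketch of how the new CallerCat impl is consumed (hypothetical call site; clairs is assumed to come from ClairS::initialize). The (Caller, Annotation) pair feeds both the per-variant annotations and the caller/category fields now stored on VariantCollection.

let (caller, category) = clairs.caller_cat();
assert_eq!(caller, Caller::ClairS);
assert_eq!(category, Annotation::Somatic);
// variants() turns the same pair into the per-variant tags...
let add = vec![Annotation::Callers(caller.clone()), category.clone()];
// ...and stores it on the collection: VariantCollection { caller, category, .. }
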
+ 25 - 7
src/callers/deep_variant.rs

@@ -4,7 +4,7 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{fs, path::Path};
 
 use crate::{
-    annotation::{Annotation, Annotations, Caller},
+    annotation::{Annotation, Annotations, Caller, CallerCat},
     collection::{vcf::Vcf, InitializeSolo},
     commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
     config::Config,
@@ -124,14 +124,30 @@ impl Run for DeepVariant {
     }
 }
 
+impl CallerCat for DeepVariant {
+    fn caller_cat(&self) -> (Caller, Annotation) {
+        let Config {
+            normal_name,
+            tumoral_name,
+            ..
+        } = &self.config;
+        let cat = if *normal_name == self.time {
+            Annotation::SoloConstit
+        } else if *tumoral_name == self.time {
+            Annotation::SoloTumor
+        } else {
+            panic!("Error in time_point name: {}", self.time);
+        };
+        (Caller::DeepVariant, cat)
+    }
+}
+
 impl Variants for DeepVariant {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
-        let solo = match self.time.as_str() {
-            "diag" => Annotation::SoloTumor,
-            "mrd" => Annotation::SoloConstit,
-            _ => return Err(anyhow::anyhow!("Invalid time point.")),
-        };
-        let add = vec![Annotation::Callers(Caller::DeepVariant), solo];
+        let (caller, category) = self.caller_cat();
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
         info!("Loading variant from DeepVariant {} {} with annotations: {:?}", self.id, self.time, add);
         let variants = read_vcf(&self.vcf_passed)?;
         variants.par_iter().for_each(|v| {
@@ -141,6 +157,8 @@ impl Variants for DeepVariant {
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
+            caller,
+            category,
         })
     }
 }

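Note: the time-point category is now derived from the Config sample names instead of the hard-coded "diag"/"mrd" strings. A hedged sketch of the resulting behaviour, assuming tumoral_name = "diag" and normal_name = "mrd" (the values the removed match arms expected) and dv built by DeepVariant::initialize with time = "diag".

let (caller, category) = dv.caller_cat();
assert_eq!(caller, Caller::DeepVariant);
assert_eq!(category, Annotation::SoloTumor); // self.time matches tumoral_name
// A time point matching neither configured name now panics instead of returning an Err.
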
+ 242 - 153
src/callers/nanomonsv.rs

@@ -1,3 +1,4 @@
+use rayon::prelude::*;
 use std::{
     fs::{self},
     path::{Path, PathBuf},
@@ -8,29 +9,18 @@ use anyhow::Context;
 use log::{error, info, warn};
 
 use crate::{
+    annotation::{Annotation, Annotations, Caller, CallerCat},
+    collection::{vcf::Vcf, Initialize, InitializeSolo},
     commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
-    runners::{run_wait, CommandRun, RunReport},
+    config::Config,
+    io::vcf::read_vcf,
+    runners::{run_wait, CommandRun, Run, RunReport},
+    variant::{
+        variant::{RunnerVariants, Variants},
+        variant_collection::VariantCollection,
+    },
 };
 
-#[derive(Debug, Clone)]
-pub struct NanomonSVConfig {
-    pub bin: String,
-    pub result_dir: String,
-    pub reference: String,
-    pub thread: u8,
-}
-
-impl Default for NanomonSVConfig {
-    fn default() -> Self {
-        Self {
-            bin: "nanomonsv".to_string(),
-            result_dir: "/data/longreads_basic_pipe".to_string(),
-            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-            thread: 155,
-        }
-    }
-}
-
 #[derive(Debug)]
 pub struct NanomonSV {
     pub id: String,
@@ -38,104 +28,76 @@ pub struct NanomonSV {
     pub mrd_bam: String,
     pub diag_out_dir: String,
     pub mrd_out_dir: String,
-    pub log: String,
     pub log_dir: String,
-    pub config: NanomonSVConfig,
+    pub vcf_passed: String,
+    pub config: Config,
 }
 
-impl NanomonSV {
-    pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
-        let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
-        let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
-        let log_dir = format!("{}/{id}/log/nanomonsv", &config.result_dir);
+impl Initialize for NanomonSV {
+    fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
+        let id = id.to_string();
+        info!("Initialize Nanomonsv for {id}.");
+        let log_dir = format!("{}/{}/log/nanomonsv", config.result_dir, &id);
 
-        NanomonSV {
-            id: id.to_string(),
-            diag_bam: diag_bam.to_string(),
-            mrd_bam: mrd_bam.to_string(),
+        if !Path::new(&log_dir).exists() {
+            fs::create_dir_all(&log_dir)
+                .context(format!("Failed  to create {log_dir} directory"))?;
+        }
+
+        let diag_bam = config.tumoral_bam(&id);
+        let mrd_bam = config.normal_bam(&id);
+        let diag_out_dir = config.nanomonsv_output_dir(&id, "diag");
+        fs::create_dir_all(&diag_out_dir)?;
+        let mrd_out_dir = config.nanomonsv_output_dir(&id, "mrd");
+        fs::create_dir_all(&mrd_out_dir)?;
+
+        let vcf_passed = config.nanomonsv_passed_vcf(&id);
+        Ok(Self {
+            id,
+            diag_bam,
+            mrd_bam,
             diag_out_dir,
             mrd_out_dir,
-            log: String::default(),
-            config,
             log_dir,
-        }
+            vcf_passed,
+            config,
+        })
     }
+}
 
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        // Create direcories
-        if !Path::new(&self.diag_out_dir).exists() {
-            fs::create_dir_all(&self.diag_out_dir)
-                .context(format!("Can't create {}", &self.diag_out_dir))?;
-        }
-        if !Path::new(&self.mrd_out_dir).exists() {
-            fs::create_dir_all(&self.mrd_out_dir)
-                .context(format!("Can't create {}", &self.mrd_out_dir))?;
-        }
-        if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.log_dir).context(format!("Can't create {}", &self.log_dir))?;
-        }
+impl Run for NanomonSV {
+    fn run(&mut self) -> anyhow::Result<()> {
+        somatic_parse(&self)?;
 
-        // Parse
-        info!("Nanomonsv Parse");
+        info!("Nanomonsv Get");
         let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
         let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
 
-        let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
-        let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
-
-        let mut threads_handles = Vec::new();
-        if !Path::new(&diag_info_vcf).exists() {
-            let diag_bam = self.diag_bam.clone();
-            let diag_out_prefix = diag_out_prefix.clone();
-            let config = self.config.clone();
-            let log_dir = self.log_dir.clone();
-            let handle = thread::spawn(move || {
-                let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, config).unwrap();
-                report
-                    .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_"))
-                    .unwrap();
-            });
-            threads_handles.push(handle);
-        }
-
-        if !Path::new(&mrd_info_vcf).exists() {
-            let mrd_out_prefix = mrd_out_prefix.clone();
-            let mrd_bam = self.mrd_bam.clone();
-            let config = self.config.clone();
-            let log_dir = self.log_dir.clone();
-            let handle = thread::spawn(move || {
-                let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, config).unwrap();
-                report
-                    .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_"))
-                    .unwrap();
-            });
-            threads_handles.push(handle);
-        }
-
-        // Wait for all threads to finish
-        for handle in threads_handles {
-            handle.join().expect("Thread panicked");
-        }
-
-        // Get
-        info!("Nanomonsv Get");
         let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
         let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
 
         if !Path::new(&mrd_result_vcf).exists() {
+            let report = nanomonsv_get(&self.mrd_bam, &mrd_out_prefix, None, None, &self.config)
+                .context(format!(
+                    "Error while running NanomonSV get for {mrd_result_vcf}"
+                ))?;
+            report
+                .save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))
+                .unwrap();
+        }
+
+        if !Path::new(&diag_result_vcf).exists() {
             let report = nanomonsv_get(
-                &self.mrd_bam,
-                &mrd_out_prefix,
-                None,
-                None,
-                self.config.clone(),
+                &self.diag_bam,
+                &diag_out_prefix,
+                Some(&self.mrd_bam),
+                Some(&mrd_out_prefix),
+                &self.config,
             )
             .context(format!(
-                "Error while running NanomonSV get for {mrd_result_vcf}"
+                "Error while running NanomonSV get for {diag_result_vcf}"
             ))?;
-            report
-                .save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))
-                .unwrap();
+            report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))?;
         }
 
         if !Path::new(&diag_result_vcf).exists() {
@@ -144,75 +106,114 @@ impl NanomonSV {
                 &diag_out_prefix,
                 Some(&self.mrd_bam),
                 Some(&mrd_out_prefix),
-                self.config.clone(),
+                &self.config,
             )
             .context(format!(
                 "Error while running NanomonSV get for {diag_result_vcf}"
             ))?;
-            report
-                .save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))
-                .unwrap();
+            report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))?;
         }
 
-        let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
-        if !Path::new(&vcf_passed).exists() {
-            let report =
-                bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default())
-                    .context(format!("Can't index {vcf_passed}"))?;
+        if !Path::new(&self.vcf_passed).exists() {
+            let report = bcftools_keep_pass(
+                &diag_result_vcf,
+                &self.vcf_passed,
+                BcftoolsConfig::default(),
+            )
+            .context(format!("Can't index {}", self.vcf_passed))?;
             report
                 .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,))
                 .context("Can't save report")?;
         }
+
         Ok(())
     }
 }
 
+impl CallerCat for NanomonSV {
+    fn caller_cat(&self) -> (Caller, Annotation) {
+        (Caller::NanomonSV, Annotation::Somatic)
+    }
+}
+
+impl Variants for NanomonSV {
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let (caller, category) = self.caller_cat();
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
+        info!(
+            "Loading variant from NanomonSV {} with annotations: {:?}",
+            self.id, add
+        );
+        let variants = read_vcf(&self.vcf_passed)?;
+
+        variants.par_iter().for_each(|v| {
+            annotations.insert_update(v.hash_variant(), &add);
+        });
+        Ok(VariantCollection {
+            variants,
+            vcf: Vcf::new(self.vcf_passed.clone().into())?,
+            caller,
+            category,
+        })
+    }
+}
+
+impl RunnerVariants for NanomonSV {}
+
 #[derive(Debug)]
 pub struct NanomonSVSolo {
     pub id: String,
     pub bam: String,
     pub time_point: String,
     pub out_dir: String,
-    pub log: String,
     pub log_dir: String,
-    pub config: NanomonSVConfig,
+    pub vcf_passed: String,
+    pub config: Config,
 }
 
-impl NanomonSVSolo {
-    pub fn new(id: &str, bam: &str, time_point: &str, config: NanomonSVConfig) -> Self {
-        let out_dir = format!("{}/{id}/{time_point}/nanomonsv-solo", &config.result_dir);
-        let log_dir = format!("{}/{id}/log/nanomonsv", &config.result_dir);
-        NanomonSVSolo {
-            id: id.to_string(),
-            bam: bam.to_string(),
+impl InitializeSolo for NanomonSVSolo {
+    fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
+        let id = id.to_string();
+        info!("Initialize Nanomonsv solo for {id} {time}.");
+        let log_dir = format!("{}/{}/log/nanomonsv_solo", config.result_dir, &id);
+
+        if !Path::new(&log_dir).exists() {
+            fs::create_dir_all(&log_dir)
+                .context(format!("Failed  to create {log_dir} directory"))?;
+        }
+
+        let out_dir = config.nanomonsv_solo_output_dir(&id, time);
+        fs::create_dir_all(&out_dir)?;
+
+        let bam = config.solo_bam(&id, time);
+
+        let vcf_passed = config.nanomonsv_solo_passed_vcf(&id, time);
+
+        Ok(Self {
+            id,
+            bam,
+            time_point: time.to_string(),
             out_dir,
-            time_point: time_point.to_string(),
-            log: String::default(),
             log_dir,
+            vcf_passed,
             config,
-        }
+        })
     }
+}
 
-    pub fn run(&mut self) -> anyhow::Result<()> {
-        // Create direcories
-        if !Path::new(&self.out_dir).exists() {
-            fs::create_dir_all(&self.out_dir).context(format!("Can't create {}", &self.out_dir))?;
-        }
-
-        if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.log_dir).context(format!("Can't create {}", &self.log_dir))?;
-        }
-
+impl Run for NanomonSVSolo {
+    fn run(&mut self) -> anyhow::Result<()> {
         // Parse
         info!("Nanomonsv Parse");
         let out_prefix = format!("{}/{}_{}", self.out_dir, self.id, self.time_point);
 
         let info_vcf = format!("{out_prefix}.bp_info.sorted.bed.gz");
         if !Path::new(&info_vcf).exists() {
-            let report = nanomonsv_parse(&self.bam, &out_prefix, self.config.clone()).context(
-                format!("Error while running NanomonSV parse for {}", self.bam),
-            )?;
-            let log_file = format!("{}/nanomonsv_parse_{}_", self.log_dir, self.time_point);
+            let report = nanomonsv_parse(&self.bam, &out_prefix, &self.config).context(format!(
+                "Error while running NanomonSV parse for {}",
+                self.bam
+            ))?;
+            let log_file = format!("{}/nanomonsv_solo_parse_{}_", self.log_dir, self.time_point);
             report
                 .save_to_file(&log_file)
                 .context(format!("Can't write logs into {log_file}"))?;
@@ -223,37 +224,76 @@ impl NanomonSVSolo {
         let result_vcf = format!("{out_prefix}.nanomonsv.result.vcf");
 
         if !Path::new(&result_vcf).exists() {
-            let report = nanomonsv_get(&self.bam, &out_prefix, None, None, self.config.clone())
-                .context(format!(
-                    "Error while running NanomonSV get for {result_vcf}"
-                ))?;
-            let log_file = format!("{}/nanomonsv_get_{}_", self.log_dir, self.time_point);
+            let report = nanomonsv_get(&self.bam, &out_prefix, None, None, &self.config).context(
+                format!("Error while running NanomonSV get for {result_vcf}"),
+            )?;
+            let log_file = format!("{}/nanomonsv_solo_get_{}_", self.log_dir, self.time_point);
             report
                 .save_to_file(&log_file)
                 .context(format!("Error while writing log into {log_file}"))?;
         }
 
-        let vcf_passed = format!("{out_prefix}_nanomonsv-solo_PASSED.vcf.gz");
-        if !Path::new(&vcf_passed).exists() {
-            let report = bcftools_keep_pass(&result_vcf, &vcf_passed, BcftoolsConfig::default())
-                .context(format!(
-                    "Error while running bcftools keep PASS for {result_vcf}"
-                ))?;
+        if !Path::new(&self.vcf_passed).exists() {
+            let report =
+                bcftools_keep_pass(&result_vcf, &self.vcf_passed, BcftoolsConfig::default())
+                    .context(format!(
+                        "Error while running bcftools keep PASS for {result_vcf}"
+                    ))?;
 
             let log_file = format!("{}/bcftools_pass_", self.log_dir);
             report
                 .save_to_file(&log_file)
                 .context(format!("Error while writing log into {log_file}"))?;
         }
+
         Ok(())
     }
 }
 
-pub fn nanomonsv_parse(
-    bam: &str,
-    out_prefix: &str,
-    config: NanomonSVConfig,
-) -> anyhow::Result<RunReport> {
+impl CallerCat for NanomonSVSolo {
+    fn caller_cat(&self) -> (Caller, Annotation) {
+        let Config {
+            normal_name,
+            tumoral_name,
+            ..
+        } = &self.config;
+        let cat = if *normal_name == self.time_point {
+            Annotation::SoloConstit
+        } else if *tumoral_name == self.time_point {
+            Annotation::SoloTumor
+        } else {
+            panic!("Error in time_point name: {}", self.time_point);
+        };
+        (Caller::NanomonSVSolo, cat)
+    }
+}
+
+impl RunnerVariants for NanomonSVSolo {}
+
+impl Variants for NanomonSVSolo {
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let (caller, category) = self.caller_cat();
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
+        info!(
+            "Loading variant from ClairS {} with annotations: {:?}",
+            self.id, add
+        );
+        let variants = read_vcf(&self.vcf_passed)?;
+
+        variants.par_iter().for_each(|v| {
+            annotations.insert_update(v.hash_variant(), &add);
+        });
+        Ok(VariantCollection {
+            variants,
+            vcf: Vcf::new(self.vcf_passed.clone().into())?,
+            caller,
+            category,
+        })
+    }
+}
+
+// Helper functions
+pub fn nanomonsv_parse(bam: &str, out_prefix: &str, config: &Config) -> anyhow::Result<RunReport> {
     let args = vec![
         "parse",
         "--reference_fasta",
@@ -261,19 +301,68 @@ pub fn nanomonsv_parse(
         bam,
         out_prefix,
     ];
-    let mut cmd_run = CommandRun::new(&config.bin, &args);
+    let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
     let res = run_wait(&mut cmd_run)?;
     Ok(res)
 }
 
+pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
+    info!("Nanomonsv Parse");
+    let diag_out_prefix = format!("{}/{}_diag", s.diag_out_dir, s.id);
+    let mrd_out_prefix = format!("{}/{}_mrd", s.mrd_out_dir, s.id);
+
+    let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
+    let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
+
+    let mut threads_handles = Vec::new();
+    if !Path::new(&diag_info_vcf).exists() {
+        let diag_bam = s.diag_bam.clone();
+        let diag_out_prefix = diag_out_prefix.clone();
+        let config = s.config.clone();
+        let log_dir = s.log_dir.clone();
+        let handle = thread::spawn(move || {
+            let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, &config).unwrap();
+            report
+                .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_"))
+                .unwrap();
+        });
+        threads_handles.push(handle);
+    } else {
+        info!("Nanonmonsv parse already exists: {diag_info_vcf}");
+    }
+
+    if !Path::new(&mrd_info_vcf).exists() {
+        let mrd_out_prefix = mrd_out_prefix.clone();
+        let mrd_bam = s.mrd_bam.clone();
+        let config = s.config.clone();
+        let log_dir = s.log_dir.clone();
+        let handle = thread::spawn(move || {
+            let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, &config).unwrap();
+            report
+                .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_"))
+                .unwrap();
+        });
+        threads_handles.push(handle);
+    } else {
+        info!("Nanonmonsv parse already exists: {diag_info_vcf}");
+    }
+
+    // Wait for all threads to finish
+    for handle in threads_handles {
+        handle.join().expect("Thread panicked for nanomonsv parse");
+    }
+
+    Ok(())
+}
+
 pub fn nanomonsv_get(
     bam: &str,
     out_prefix: &str,
     ctrl_bam: Option<&str>,
     ctrl_prefix: Option<&str>,
-    config: NanomonSVConfig,
+    config: &Config,
 ) -> anyhow::Result<RunReport> {
-    let threads = config.thread.to_string();
+    let threads = config.nanomonsv_threads.to_string();
     let mut args = vec!["get"];
 
     if let (Some(ctrl_bam), Some(ctrl_prefix)) = (ctrl_bam, ctrl_prefix) {
@@ -292,12 +381,12 @@ pub fn nanomonsv_get(
         bam,
         &config.reference,
     ]);
-    let mut cmd_run = CommandRun::new(&config.bin, &args);
+    let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
     let res = run_wait(&mut cmd_run)?;
     Ok(res)
 }
 
-pub fn nanomonsv_create_pon(config: NanomonSVConfig, pon_path: &str) -> anyhow::Result<()> {
+pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> {
     let mut passed_mrd = Vec::new();
     for mrd_dir in find_nanomonsv_dirs(&PathBuf::from(&config.result_dir), "mrd", 0, 3) {
         let mut passed = None;

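Note: because the control BAM and prefix are Options, the same nanomonsv_get helper covers both the paired and the solo runs. A hedged call sketch; the path variables and log_dir are placeholders standing in for the Config accessors used above.

// Paired somatic call: "diag" BAM against the matched "mrd" control.
let report = nanomonsv_get(&diag_bam, &diag_out_prefix, Some(&mrd_bam), Some(&mrd_out_prefix), &config)?;
report.save_to_file(&format!("{log_dir}/nanomonsv_get_diag_"))?;

// Solo call, as in NanomonSVSolo::run: no control sample.
let report = nanomonsv_get(&bam, &out_prefix, None, None, &config)?;
report.save_to_file(&format!("{log_dir}/nanomonsv_solo_get_"))?;
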
+ 53 - 14
src/callers/savana.rs

@@ -1,12 +1,22 @@
 use crate::{
-    collection::{HasOutputs, Initialize, Version}, commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::force_or_not, runners::{run_wait, CommandRun, Run}
+    annotation::{Annotation, Annotations, Caller, CallerCat},
+    collection::{vcf::Vcf, HasOutputs, Initialize, Version},
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    config::Config,
+    helpers::force_or_not,
+    io::vcf::read_vcf,
+    runners::{run_wait, CommandRun, Run},
+    variant::{variant::Variants, variant_collection::VariantCollection},
 };
 use anyhow::Context;
+use rayon::prelude::*;
+use log::info;
 use std::{fs, path::Path};
 
 #[derive(Debug)]
 pub struct Savana {
     pub id: String,
+    pub passed_vcf: String,
     pub config: Config,
     pub log_dir: String,
 }
@@ -18,9 +28,13 @@ impl Initialize for Savana {
             fs::create_dir_all(&log_dir)
                 .context(format!("Failed  to create {log_dir} directory"))?;
         }
+        // Check for haplotagged bam
+
+        let passed_vcf = config.savana_passed_vcf(id);
 
         Ok(Self {
             id: id.to_string(),
+            passed_vcf,
             config,
             log_dir,
         })
@@ -29,7 +43,11 @@ impl Initialize for Savana {
 
 impl Run for Savana {
     fn run(&mut self) -> anyhow::Result<()> {
-        force_or_not(&self.config.savana_output_vcf(&self.id), self.config.savana_force)?;
+        force_or_not(
+            &self.config.savana_output_vcf(&self.id),
+            self.config.savana_force,
+        )?;
+        info!("Running Savana v{}", Savana::version(&self.config)?);
 
         let id = &self.id;
         let savana_args = [
@@ -69,13 +87,13 @@ impl Run for Savana {
             .context(format!("Error while writing logs into {log_file}"))?;
 
         // Keep PASS
-        let passed_vcf = &self.config.savana_passed_vcf(id);
         let output_vcf = &self.config.savana_output_vcf(id);
-        if !Path::new(passed_vcf).exists() && Path::new(output_vcf).exists() {
-            let report = bcftools_keep_pass(output_vcf, passed_vcf, BcftoolsConfig::default())
-                .context(format!(
-                    "Error while running bcftools keep PASS for {output_vcf}"
-                ))?;
+        if !Path::new(&self.passed_vcf).exists() && Path::new(output_vcf).exists() {
+            let report =
+                bcftools_keep_pass(output_vcf, &self.passed_vcf, BcftoolsConfig::default())
+                    .context(format!(
+                        "Error while running bcftools keep PASS for {output_vcf}"
+                    ))?;
             let log_file = format!("{}/bcftools_pass", self.log_dir);
             report
                 .save_to_file(&log_file)
@@ -123,9 +141,30 @@ impl Version for Savana {
     }
 }
 
-// impl LoadVariants for Savana {
-//     fn load_variants(&self) -> anyhow::Result<Variants> {
-//         let variants = read_vcf(self.savana_output_vcf, source, variant_type)?;
-//         Ok()
-//     }
-// }
+impl CallerCat for Savana {
+    fn caller_cat(&self) -> (Caller, Annotation) {
+        (Caller::Savana, Annotation::Somatic)
+    }
+}
+
+impl Variants for Savana {
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let (caller, category) = self.caller_cat();
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
+        info!(
+            "Loading variants from Savana {} with annotations: {:?}",
+            self.id, add
+        );
+        let variants = read_vcf(&self.passed_vcf)?;
+
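+        // Tag each variant hash with the Savana caller and the Somatic category in the shared annotation store.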
+        variants.par_iter().for_each(|v| {
+            annotations.insert_update(v.hash_variant(), &add);
+        });
+        Ok(VariantCollection {
+            variants,
+            vcf: Vcf::new(self.passed_vcf.clone().into())?,
+            caller,
+            category,
+        })
+    }
+}

+ 43 - 3
src/callers/severus.rs

@@ -1,10 +1,18 @@
 use crate::{
-    collection::{HasOutputs, Initialize, InitializeSolo, Version},
+    annotation::{Annotation, Annotations, Caller, CallerCat},
+    collection::{vcf::Vcf, HasOutputs, Initialize, InitializeSolo, Version},
     commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
     config::Config,
+    io::vcf::read_vcf,
     runners::{run_wait, CommandRun, Run},
+    variant::{
+        variant::{RunnerVariants, Variants},
+        variant_collection::VariantCollection,
+    },
 };
 use anyhow::Context;
+use rayon::prelude::*;
+use log::info;
 use std::{fs, path::Path};
 
 #[derive(Debug)]
@@ -32,7 +40,7 @@ impl Initialize for Severus {
                 .context(format!("Failed  to create {log_dir} directory"))?;
         }
 
-        let phased_germline_vcf = &config.germline_phased_vcf(id);
+        let phased_germline_vcf = &config.constit_phased_vcf(id);
         if !Path::new(phased_germline_vcf).exists() {
             anyhow::bail!("Could not find required phased VCF: {phased_germline_vcf}")
         }
@@ -47,6 +55,8 @@ impl Initialize for Severus {
 
 impl Run for Severus {
     fn run(&mut self) -> anyhow::Result<()> {
+        info!("Running Severus v{}", Severus::version(&self.config)?);
+
         let id = &self.id;
         let output_vcf = &self.config.severus_output_vcf(id);
         let passed_vcf = &self.config.severus_passed_vcf(id);
@@ -133,6 +143,37 @@ impl Version for Severus {
     }
 }
 
+impl CallerCat for Severus {
+    fn caller_cat(&self) -> (Caller, Annotation) {
+        (Caller::Severus, Annotation::Somatic)
+    }
+}
+
+impl Variants for Severus {
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let vcf_passed = self.config.severus_passed_vcf(&self.id);
+        let (caller, category) = self.caller_cat();
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
+        info!(
+            "Loading variants from Severus {}: {vcf_passed} with annotations: {:?}",
+            self.id, add
+        );
+        let variants = read_vcf(&vcf_passed)?;
+
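+        // Tag each variant hash with the Severus caller and the Somatic category in the shared annotation store.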
+        variants.par_iter().for_each(|v| {
+            annotations.insert_update(v.hash_variant(), &add);
+        });
+        Ok(VariantCollection {
+            variants,
+            vcf: Vcf::new(vcf_passed.into())?,
+            caller,
+            category,
+        })
+    }
+}
+
+impl RunnerVariants for Severus {}
+
 #[derive(Debug)]
 pub struct SeverusSolo {
     pub id: String,
@@ -156,7 +197,6 @@ impl InitializeSolo for SeverusSolo {
             log_dir,
         })
     }
-    // add code here
 }
 
 impl Run for SeverusSolo {

+ 10 - 30
src/collection/mod.rs

@@ -17,11 +17,7 @@ use pandora_lib_variants::variants::Variants;
 
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
-    callers::{
-        clairs::ClairS,
-        deep_variant::DeepVariant,
-        nanomonsv::{NanomonSV, NanomonSVConfig},
-    },
+    callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::NanomonSV},
     collection::pod5::FlowCellCase,
     commands::{
         dorado::Dorado as BasecallAlign,
@@ -423,7 +419,6 @@ impl Collections {
     }
 
     pub fn todo_nanomonsv(&self) -> Vec<CollectionsTasks> {
-        let config = NanomonSVConfig::default();
         self.bam_pairs()
             .iter()
             .filter_map(|(diag, mrd)| {
@@ -437,10 +432,7 @@ impl Collections {
                     None
                 } else {
                     Some(CollectionsTasks::NanomonSV {
-                        config: config.clone(),
                         id: diag.id.clone(),
-                        diag_bam: diag.path.to_string_lossy().to_string(),
-                        mrd_bam: mrd.path.to_string_lossy().to_string(),
                     })
                 }
             })
@@ -750,9 +742,6 @@ pub enum CollectionsTasks {
     },
     NanomonSV {
         id: String,
-        diag_bam: String,
-        mrd_bam: String,
-        config: NanomonSVConfig,
     },
     SomaticVariants {
         id: String,
@@ -776,12 +765,9 @@ impl CollectionsTasks {
             CollectionsTasks::ClairS { id, .. } => {
                 ClairS::initialize(&id, Config::default())?.run()
             }
-            CollectionsTasks::NanomonSV {
-                id,
-                diag_bam,
-                mrd_bam,
-                config,
-            } => NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run(),
+            CollectionsTasks::NanomonSV { id, .. } => {
+                NanomonSV::initialize(&id, Config::default())?.run()
+            }
             CollectionsTasks::WholeScan {
                 id,
                 time_point,
@@ -825,7 +811,10 @@ impl fmt::Display for CollectionsTasks {
             Align(case) => write!(
                 f,
                 "Alignment task for: {} {} {} {}",
-                case.id, case.time_point, case.barcode, case.pod_dir.display()
+                case.id,
+                case.time_point,
+                case.barcode,
+                case.pod_dir.display()
             ),
             DemuxAlign(cases) => write!(
                 f,
@@ -860,17 +849,8 @@ impl fmt::Display for CollectionsTasks {
                     id, diag_bam, mrd_bam
                 )
             }
-            NanomonSV {
-                id,
-                diag_bam,
-                mrd_bam,
-                ..
-            } => {
-                write!(
-                    f,
-                    "NanomonSV calling task for {}, with diag_bam: {}, mrd_bam: {}",
-                    id, diag_bam, mrd_bam
-                )
+            NanomonSV { id } => {
+                write!(f, "NanomonSV calling task for {id}")
             }
             WholeScan {
                 id,

+ 3 - 0
src/commands/longphase.rs

@@ -152,6 +152,9 @@ impl Initialize for LongphasePhase {
 
 impl Run for LongphasePhase {
     fn run(&mut self) -> anyhow::Result<()> {
+        info!("Running longphase phase for: {}", self.vcf);
+        info!("Savong longphase phase in: {}", self.out_prefix);
+
         let args = [
             "phase",
             "-s",

+ 56 - 4
src/config.rs

@@ -42,6 +42,13 @@ pub struct Config {
     pub solo_min_constit_depth: u16,
     pub solo_max_alt_constit: u16,
     pub min_shannon_entropy: f64,
+    pub nanomonsv_bin: String,
+    pub nanomonsv_output_dir: String,
+    pub nanomonsv_force: bool,
+    pub nanomonsv_threads: u8,
+    pub nanomonsv_passed_vcf: String,
+    pub nanomonsv_solo_output_dir: String,
+    pub nanomonsv_solo_passed_vcf: String,
 }
 
 // Here comes names that can't be changed from output of tools
@@ -65,15 +72,16 @@ impl Default for Config {
 
             // File structure
             result_dir: "/data/longreads_basic_pipe".to_string(),
+
             tumoral_name: "diag".to_string(),
             normal_name: "mrd".to_string(),
             haplotagged_bam_tag_name: "hp".to_string(),
 
-            // 
+            //
             mask_bed: "{result_dir}/{id}/diag/mask.bed".to_string(),
 
             germline_phased_vcf:
-                "{result_dir}/{id}/diag/ClairS/clair3_normal_tumoral_germline_output_PS.vcf"
+                "{result_dir}/{id}/diag/ClairS/clair3_normal_germline_output_PS.vcf"
                     .to_string(),
             conda_sh: "/data/miniconda3/etc/profile.d/conda.sh".to_string(),
 
@@ -119,6 +127,16 @@ impl Default for Config {
             modkit_summary_file: "{result_dir}/{id}/{time}/{id}_{time}_5mC_5hmC_summary.txt"
                 .to_string(),
 
+            // Nanomonsv
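+            // The {result_dir}, {id}, {time} and {output_dir} placeholders are expanded by the nanomonsv_* helpers in impl Config below.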
+            nanomonsv_bin: "nanomonsv".to_string(),
+            nanomonsv_output_dir: "{result_dir}/{id}/{time}/nanomonsv".to_string(),
+            nanomonsv_threads: 150,
+            nanomonsv_force: false,
+            nanomonsv_passed_vcf: "{output_dir}/{id}_diag_nanomonsv_PASSED.vcf.gz".to_string(),
+
+            nanomonsv_solo_output_dir: "{result_dir}/{id}/{time}/nanomonsv-solo".to_string(),
+            nanomonsv_solo_passed_vcf: "{output_dir}/{id}_{time}_nanomonsv-solo_PASSED.vcf.gz".to_string(),
+
             // Pipe
             solo_min_constit_depth: 5,
             solo_max_alt_constit: 1,
@@ -255,7 +273,10 @@ impl Config {
 
     pub fn clairs_output_vcfs(&self, id: &str) -> (String, String) {
         let dir = self.clairs_output_dir(id);
-        (format!("{dir}/{}", *CLAIRS_OUTPUT_NAME), format!("{dir}/{}", *CLAIRS_OUTPUT_INDELS_NAME))
+        (
+            format!("{dir}/{}", *CLAIRS_OUTPUT_NAME),
+            format!("{dir}/{}", *CLAIRS_OUTPUT_INDELS_NAME),
+        )
     }
 
     pub fn clairs_germline_normal_vcf(&self, id: &str) -> String {
@@ -273,6 +294,36 @@ impl Config {
         format!("{dir}/{id}_diag_clair3-germline_PASSED.vcf.gz")
     }
 
+    // Nanomonsv
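+    // With the default templates, nanomonsv_output_dir("CASE1", "diag") expands to
+    // "/data/longreads_basic_pipe/CASE1/diag/nanomonsv" (the id "CASE1" is only illustrative).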
+    pub fn nanomonsv_output_dir(&self, id: &str, time: &str) -> String {
+        self.nanomonsv_output_dir
+            .replace("{result_dir}", &self.result_dir)
+            .replace("{id}", id)
+            .replace("{time}", time)
+    }
+
+    pub fn nanomonsv_passed_vcf(&self, id: &str) -> String {
+        self.nanomonsv_passed_vcf
+            .replace("{output_dir}", &self.nanomonsv_output_dir(id, "diag"))
+            .replace("{id}", id)
+    }
+
+    // Nanomonsv solo
+    pub fn nanomonsv_solo_output_dir(&self, id: &str, time: &str) -> String {
+        self.nanomonsv_solo_output_dir
+            .replace("{result_dir}", &self.result_dir)
+            .replace("{id}", id)
+            .replace("{time}", time)
+    }
+
+    pub fn nanomonsv_solo_passed_vcf(&self, id: &str, time: &str) -> String {
+        self.nanomonsv_solo_passed_vcf
+            .replace("{output_dir}", &self.nanomonsv_solo_output_dir(id, time))
+            .replace("{id}", id)
+            .replace("{time}", time)
+    }
+
+
     // Savana
     pub fn savana_output_dir(&self, id: &str) -> String {
         self.savana_output_dir
@@ -335,7 +386,8 @@ impl Config {
     }
 
     pub fn constit_vcf(&self, id: &str) -> String {
-        format!("{}/{}_variants_constit.vcf.gz", self.tumoral_dir(id), id)
+        self.clairs_germline_passed_vcf(id)
+        // format!("{}/{}_variants_constit.vcf.gz", self.tumoral_dir(id), id)
     }
 
     pub fn constit_phased_vcf(&self, id: &str) -> String {

+ 2 - 2
src/helpers.rs

@@ -231,10 +231,10 @@ pub fn par_intersection<T: Ord + Send + Sync + Clone>(
         })
 }
 
-pub fn get_temp_file_path() -> std::io::Result<PathBuf> {
+pub fn temp_file_path(suffix: &str) -> std::io::Result<PathBuf> {
     let temp_path = tempfile::Builder::new()
         .prefix("pandora-temp-")
-        .suffix(".tmp")
+        .suffix(suffix)
         .rand_bytes(5)
         .tempfile()?
         .into_temp_path();

+ 19 - 20
src/lib.rs

@@ -27,7 +27,7 @@ lazy_static! {
 mod tests {
     use std::{collections::HashMap, fs, path::Path};
 
-    use annotation::Annotations;
+    use annotation::{vep::{VepLine, VEP}, Annotations};
     use callers::{nanomonsv::nanomonsv_create_pon, savana::Savana, severus::{Severus, SeverusSolo}};
     use collection::{bam::{counts_at, counts_ins_at, ins_pileup, nt_pileup}, Initialize, InitializeSolo, Version};
     use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}};
@@ -44,7 +44,7 @@ mod tests {
 
     use self::{collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig, NanomonSVSolo}}, collection::{bam::{self, BamType}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
+    use crate::{callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}}, collection::{bam::{self, BamType}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
 
     // export RUST_LOG="debug"
     fn init() {
@@ -139,29 +139,14 @@ mod tests {
 
     #[test_log::test]
     fn nanomonsv() -> anyhow::Result<()> {
-        // let config = NanomonSVConfig {
-        //     result_dir: "/data/test".to_string(),
-        //     ..NanomonSVConfig::default()
-        // };
-        // NanomonSV::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run()
-
-        let bam = |id:&str, time_point: &str| format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam");
         let id = "HAMROUNE";
-        NanomonSV::new(id, &bam(id, "diag"), &bam(id, "mrd"), NanomonSVConfig::default()).run()
+        NanomonSV::initialize(id, Config::default())?.run()
     }
 
     #[test]
     fn nanomonsv_solo() -> anyhow::Result<()> {
         init();
-        let id = "BRETON";
-        let time_point = "diag";
-        NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
-        // let time_point = "mrd";
-        // for id in ["MERY", "CAMARA", "FRANIATTE", "FERATI", "IQBAL", "COLLE", "JOLIVET", "BAFFREAU", "MANCUS", "BELARBI", "BENGUIRAT", "HENAUX", "MEDDAH"] {
-        //
-        // NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
-        // }
-        Ok(())
+        NanomonSVSolo::initialize("BRETON", "diag", Config::default())?.run()
     }
 
     // cargo test run -- --nocapture; ~/run_scripts/notify_finish.sh &
@@ -256,7 +241,7 @@ mod tests {
     #[test]
     fn sv_pon() -> anyhow::Result<()> {
         init();
-        nanomonsv_create_pon(NanomonSVConfig::default(), "/data/ref/hs1/nanomonsv_pon.vcf.gz")
+        nanomonsv_create_pon(&Config::default(), "/data/ref/hs1/nanomonsv_pon.vcf.gz")
     }
 
     #[test]
@@ -570,4 +555,18 @@ mod tests {
         
         Ok(())
     }
+
+    #[test]
+    fn vep_line() -> anyhow::Result<()> {
+        init();
+
+        let line = "chr2_1922358_-/T\tchr2:1922357-1922358\tT\tMYT1L\tNM_001303052.2\tTranscript\tintron_variant\t-\t-\t-\t-\t-\t-\tIMPACT=MODIFIER;STRAND=-1;SYMBOL=MYT1L;SOURCE=chm13v2.0_RefSeq_Liftoff_v5.1_sorted.gff3.gz;HGVSc=NM_001303052.2:c.1619-613dup";
+        let vep_line: VepLine = line.parse()?;
+        println!("{vep_line:#?}");
+
+        let vep: VEP = VEP::try_from(&vep_line)?;
+        println!("{vep:#?}");
+
+        Ok(())
+    }
 }

+ 335 - 233
src/pipes/somatic.rs

@@ -1,16 +1,16 @@
 use log::info;
 use rayon::prelude::*;
-use std::{collections::HashSet, fs::File, io::Write, sync::Arc};
+use std::{collections::HashSet, fs::File, sync::Arc};
 
 use crate::{
     annotation::{Annotation, Annotations, Caller},
-    callers::{clairs::ClairS, deep_variant::DeepVariant},
+    callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::NanomonSV},
     collection::{Initialize, InitializeSolo},
     config::Config,
-    io::vcf::write_vcf,
+    init_solo_callers, init_somatic_callers,
     runners::Run,
     variant::{
-        variant::{load_variants, parallel_intersection, RunnerVariants, VcfVariant},
+        variant::{load_variants, parallel_intersection, CallerBox},
         variant_collection::{ExternalAnnotation, VariantCollection},
     },
 };
@@ -44,25 +44,29 @@ pub struct SomaticStats {
 
 #[derive(Debug, Default, Clone)]
 pub struct InputStats {
-    pub solo_tumor: Vec<(Annotation, usize)>,
-    pub solo_constit: Vec<(Annotation, usize)>,
-    pub germline: Vec<(Annotation, usize)>,
-    pub somatic: Vec<(Annotation, usize)>,
+    pub solo_tumor: Vec<(Caller, usize)>,
+    pub solo_constit: Vec<(Caller, usize)>,
+    pub germline: Vec<(Caller, usize)>,
+    pub somatic: Vec<(Caller, usize)>,
 }
 
 impl InputStats {
     pub fn from_collections(collections: &[VariantCollection]) -> Self {
         let mut stats = Self::default();
         for collection in collections.iter() {
-            let name = format!("{}_{}", collection.vcf.caller, collection.vcf.time);
-            let (caller, cat) = to_callers_cat(&name);
-            match cat {
-                Annotation::SoloTumor => stats.solo_tumor.push((caller, collection.variants.len())),
-                Annotation::SoloConstit => {
-                    stats.solo_constit.push((caller, collection.variants.len()))
-                }
-                Annotation::Germline => stats.germline.push((caller, collection.variants.len())),
-                Annotation::Somatic => stats.somatic.push((caller, collection.variants.len())),
+            match collection.category {
+                Annotation::SoloTumor => stats
+                    .solo_tumor
+                    .push((collection.caller.clone(), collection.variants.len())),
+                Annotation::SoloConstit => stats
+                    .solo_constit
+                    .push((collection.caller.clone(), collection.variants.len())),
+                Annotation::Germline => stats
+                    .germline
+                    .push((collection.caller.clone(), collection.variants.len())),
+                Annotation::Somatic => stats
+                    .somatic
+                    .push((collection.caller.clone(), collection.variants.len())),
                 _ => (),
             };
         }
@@ -79,195 +83,247 @@ impl SomaticStats {
     }
 }
 
-pub fn to_callers_cat(s: &str) -> (Annotation, Annotation) {
-    let splits: Vec<&str> = s.split("_").collect();
-
-    if splits.contains(&"clairs") {
-        (Annotation::Callers(Caller::ClairS), Annotation::Somatic)
-    } else if splits.contains(&"clair3-germline") {
-        (Annotation::Callers(Caller::ClairS), Annotation::Germline)
-    } else if splits.contains(&"DeepVariant") && splits.contains(&"mrd") {
-        (
-            Annotation::Callers(Caller::DeepVariant),
-            Annotation::SoloConstit,
-        )
-    } else if splits.contains(&"DeepVariant") && splits.contains(&"diag") {
-        (
-            Annotation::Callers(Caller::DeepVariant),
-            Annotation::SoloTumor,
-        )
-    } else {
-        panic!("unknown caller: {s}");
-    }
-}
+// pub fn to_callers_cat(s: &str) -> (Annotation, Annotation) {
+//     let splits: Vec<&str> = s.split("_").collect();
+//
+//     if splits.contains(&"clairs") {
+//         (Annotation::Callers(Caller::ClairS), Annotation::Somatic)
+//     } else if splits.contains(&"clair3-germline") {
+//         (Annotation::Callers(Caller::ClairS), Annotation::Germline)
+//     } else if splits.contains(&"DeepVariant") && splits.contains(&"mrd") {
+//         (
+//             Annotation::Callers(Caller::DeepVariant),
+//             Annotation::SoloConstit,
+//         )
+//     } else if splits.contains(&"DeepVariant") && splits.contains(&"diag") {
+//         (
+//             Annotation::Callers(Caller::DeepVariant),
+//             Annotation::SoloTumor,
+//         )
+//     } else if splits.contains(&"nanomonsv") {
+//         (Annotation::Callers(Caller::NanomonSV), Annotation::Somatic)
+//     } else {
+//         panic!("unknown caller: {s}");
+//     }
+// }
 
 impl Run for Somatic {
     fn run(&mut self) -> anyhow::Result<()> {
         let id = self.id.clone();
+
+        // TODO: GZ !!!
+        // LongphasePhase::initialize(&id, self.config.clone())?.run()?;
+
         info!("Running somatic pipe for {id}.");
 
         info!("Initialization...");
-        let mut v: Vec<Box<dyn RunnerVariants + Send + Sync>> = vec![
-            Box::new(ClairS::initialize(&id, self.config.clone())?),
-            Box::new(DeepVariant::initialize(&id, "diag", self.config.clone())?),
-            Box::new(DeepVariant::initialize(&id, "mrd", self.config.clone())?),
-        ];
-
+        let config = self.config.clone();
         let annotations = Arc::new(self.annotations.clone());
 
-        let mut variants_collection = load_variants(&mut v, &annotations)?;
+        // Loading variants collections
+        let mut callers = init_somatic_callers!(&id, &config, ClairS, NanomonSV);
+        callers.extend(init_solo_callers!(
+            &id,
+            &config,
+            DeepVariant,
+            "diag",
+            DeepVariant,
+            "mrd"
+        ));
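+        // load_variants first runs every boxed caller in parallel, then collects each caller's variants into a VariantCollection.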
+        let mut variants_collections = load_variants(&mut callers, &annotations)?;
+
         let clairs_germline =
             ClairS::initialize(&id, self.config.clone())?.germline(&annotations)?;
-        variants_collection.push(clairs_germline);
-        info!("Variants sources loaded: {}", variants_collection.len());
+        variants_collections.push(clairs_germline);
 
-        let mut somatic_stats = SomaticStats::init(&variants_collection);
+        let mut somatic_stats = SomaticStats::init(&variants_collections);
+        info!(
+            "Variants collections from {} vcf ({} variants)",
+            variants_collections.len(),
+            variants_collections.iter().map(|v| v.variants.len()).sum::<usize>()
+        );
 
         // Annotations Stats
         // let mut annotations = Arc::unwrap_or_clone(annotations);
-        annotations.callers_stat();
         // somatic_stats.push_annotations_stats();
 
         // TODO: look at variants: ClairS + DeepVariant + SoloConstit + SoloDiag + Somatic (error
         // in ClairS somatic)
 
         // Filtering Somatic variants
-        info!("Filtering somatic variants (variants not in salo callers on constit sample or germline).");
+        // let mut variants_collections: Vec<VariantCollection> = variants_collections
+        //     .into_iter()
+        //     .map(|var| {
+        //         let total = var.variants.len();
+        //         let (left, _) = annotations.filter_variants(var, |anns| {
+        //             !anns.contains(&Annotation::Germline)
+        //                 && !anns.contains(&Annotation::SoloConstit)
+        //         });
+        //         info!("{} {}\t{}/{}", left.caller, left.category, left.variants.len(), total);
+        //         left
+        //     })
+        //     .collect();
 
-        let tumor_or_somatic_keys = annotations.get_keys_filter(|anns| {
-            !anns.contains(&Annotation::Germline) && !anns.contains(&Annotation::SoloConstit)
-        });
-
-        let (somatic_keys, germline_keys, remains) = parallel_intersection(
-            &variants_collection
-                .iter()
-                .flat_map(|e| e.keys())
-                .collect::<Vec<u128>>(),
-            &tumor_or_somatic_keys,
-        );
-        assert_eq!(0, remains.len());
-
-        info!(
-            "Germline variants positions removed {}.",
-            germline_keys.len()
-        );
-        germline_keys.len();
-
-        let somatic_keys: HashSet<u128> = somatic_keys.into_iter().collect();
         let mut annotations = Arc::try_unwrap(annotations)
             .map_err(|e| anyhow::anyhow!("Failed to unwrap Arc: {:?}", e))?;
-        annotations.retain_keys(&somatic_keys);
         annotations.callers_stat();
 
-        somatic_stats.n_constit_germline = variants_collection
-            .par_iter_mut()
-            .map(|c| {
-                let before = c.variants.len();
-                c.retain_keys(&somatic_keys);
-                let after = c.variants.len();
-                info!(
-                    "Variants removed from {}: {}",
-                    c.vcf.path.display(),
-                    before - after
-                );
-                before - after
-            })
-            .sum();
-
-        variants_collection.retain(|e| !e.variants.is_empty());
-
-        variants_collection.iter().for_each(|c| c.stats());
-
-        info!("Constit Bam annotation...");
-        variants_collection.iter().try_for_each(|c| {
+        // Filter: Variants neither Germline nor SoloConstit
+        info!("Keeping somatic variants (variants neither in solo nor in germline called).");
+        somatic_stats.n_constit_germline =
+            annotations.retain_variants(&mut variants_collections, |anns| {
+                !anns.contains(&Annotation::Germline) && !anns.contains(&Annotation::SoloConstit)
+            });
+        annotations.callers_stat();
+
+        // return Ok(());
+        // let tumor_or_somatic_keys = annotations.get_keys_filter(|anns| {
+        //     !anns.contains(&Annotation::Germline) && !anns.contains(&Annotation::SoloConstit)
+        // });
+        //
+        // let (somatic_keys, germline_keys, remains) = parallel_intersection(
+        //     &variants_collections
+        //         .iter()
+        //         .flat_map(|e| e.keys())
+        //         .collect::<Vec<u128>>(),
+        //     &tumor_or_somatic_keys,
+        // );
+        // assert_eq!(0, remains.len());
+        //
+        // info!(
+        //     "Germline variants positions removed {}.",
+        //     germline_keys.len()
+        // );
+        // germline_keys.len();
+        //
+        // let somatic_keys: HashSet<u128> = somatic_keys.into_iter().collect();
+        // // let mut annotations = Arc::try_unwrap(annotations)
+        // //     .map_err(|e| anyhow::anyhow!("Failed to unwrap Arc: {:?}", e))?;
+        // annotations.retain_keys(&somatic_keys);
+        // annotations.callers_stat();
+        //
+        // somatic_stats.n_constit_germline = variants_collections
+        //     .par_iter_mut()
+        //     .map(|c| {
+        //         let before = c.variants.len();
+        //         c.retain_keys(&somatic_keys);
+        //         let after = c.variants.len();
+        //         info!(
+        //             "Variants removed from {}: {}",
+        //             c.vcf.path.display(),
+        //             before - after
+        //         );
+        //         before - after
+        //     })
+        //     .sum();
+        //
+        // variants_collections.retain(|e| !e.variants.is_empty());
+        //
+        // variants_collections.iter().for_each(|c| c.stats());
+
+        // Annotation: BAM depth, n_alt
+        info!("Reading Constit Bam file for depth and pileup annotation.");
+        variants_collections.iter().try_for_each(|c| {
             c.annotate_with_constit_bam(&annotations, &self.config.normal_bam(&id), 150)
         })?;
         annotations.solo_constit_boundaries(
             self.config.solo_max_alt_constit,
             self.config.solo_min_constit_depth,
         );
-
         annotations.callers_stat();
 
-        //
-        // Remove LowConstitDepth
+        // Filter: Remove LowConstitDepth from annotations and variants collections
         info!(
             "Removing low constit depth (depth < {})",
             self.config.solo_min_constit_depth
         );
+        somatic_stats.n_low_constit = annotations
+            .retain_variants(&mut variants_collections, |anns| {
+                !anns.contains(&Annotation::LowConstitDepth)
+            });
+        annotations.callers_stat();
 
-        let low_constit_keys: HashSet<u128> = annotations
-            .get_keys_filter(|anns| anns.contains(&Annotation::LowConstitDepth))
-            .into_iter()
-            .collect();
-
-        annotations.remove_keys(&low_constit_keys);
-
-        somatic_stats.n_low_constit = variants_collection
-            .par_iter_mut()
-            .map(|c| {
-                let before = c.variants.len();
-                c.remove_keys(&low_constit_keys);
-                let after = c.variants.len();
-                let rm = before - after;
-                info!("Variants removed from {}: {rm}", c.vcf.path.display(),);
-                rm
-            })
-            .sum();
-
-        variants_collection.retain(|e| !e.variants.is_empty());
-
-        variants_collection.iter().for_each(|c| c.stats());
+        // let low_constit_keys: HashSet<u128> = annotations
+        //     .get_keys_filter(|anns| anns.contains(&Annotation::LowConstitDepth))
+        //     .into_iter()
+        //     .collect();
+        //
+        // annotations.remove_keys(&low_constit_keys);
+        //
+        // somatic_stats.n_low_constit = variants_collections
+        //     .par_iter_mut()
+        //     .map(|c| {
+        //         let before = c.variants.len();
+        //         c.remove_keys(&low_constit_keys);
+        //         let after = c.variants.len();
+        //         let rm = before - after;
+        //         info!("Variants removed from {}: {rm}", c.vcf.path.display(),);
+        //         rm
+        //     })
+        //     .sum();
+        //
+        // variants_collections.retain(|e| !e.variants.is_empty());
+        //
+        // variants_collections.iter().for_each(|c| c.stats());
 
-        // Remove High Constit Alt
+        // Filter: remove HighConstitAlt
         info!(
             "Removing high constit alternative allele (alt constit > {})",
             self.config.solo_max_alt_constit
         );
 
-        let high_alt_keys: HashSet<u128> = annotations
-            .get_keys_filter(|anns| anns.contains(&Annotation::HighConstitAlt))
-            .into_iter()
-            .collect();
-
-        annotations.remove_keys(&high_alt_keys);
-        somatic_stats.n_high_alt_constit = variants_collection
-            .par_iter_mut()
-            .map(|c| {
-                let before = c.variants.len();
-                c.remove_keys(&high_alt_keys);
-                let after = c.variants.len();
-                let rm = before - after;
-                info!("Variants removed from {}: {rm}", c.vcf.path.display(),);
-                rm
-            })
-            .sum();
-        variants_collection.retain(|e| !e.variants.is_empty());
-
-        variants_collection.iter().for_each(|c| c.stats());
-
+        somatic_stats.n_high_alt_constit = annotations
+            .retain_variants(&mut variants_collections, |anns| {
+                !anns.contains(&Annotation::HighConstitAlt)
+            });
         annotations.callers_stat();
 
-        // Entropy
-        info!("Entropy annotation...");
-        variants_collection.iter().for_each(|c| {
+        // let high_alt_keys: HashSet<u128> = annotations
+        //     .get_keys_filter(|anns| anns.contains(&Annotation::HighConstitAlt))
+        //     .into_iter()
+        //     .collect();
+        //
+        // annotations.remove_keys(&high_alt_keys);
+        // somatic_stats.n_high_alt_constit = variants_collections
+        //     .par_iter_mut()
+        //     .map(|c| {
+        //         let before = c.variants.len();
+        //         c.remove_keys(&high_alt_keys);
+        //         let after = c.variants.len();
+        //         let rm = before - after;
+        //         info!("Variants removed from {}: {rm}", c.vcf.path.display(),);
+        //         rm
+        //     })
+        //     .sum();
+        // variants_collections.retain(|e| !e.variants.is_empty());
+        //
+        // variants_collections.iter().for_each(|c| c.stats());
+
+
+        // Annotation: Entropy
+        info!("Entropy annotation from {} sequences.", self.config.reference);
+        variants_collections.iter().for_each(|c| {
             c.annotate_with_sequence_entropy(&annotations, &self.config.reference, 10, 150);
         });
         annotations.callers_stat();
 
-        variants_collection
+        // Annotation: Cosmic and GnomAD
+        info!("Annotation with Cosmic and GnomAD.");
+        variants_collections
             .iter()
             .try_for_each(|c| -> anyhow::Result<()> {
                 let ext_annot = ExternalAnnotation::init()?;
                 ext_annot.annotate(&c.variants, &annotations)?;
                 Ok(())
             })?;
-
         annotations.callers_stat();
 
-        let high_alt_gnomad_keys: HashSet<u128> = annotations
-            .get_keys_filter(|anns| {
-                anns.iter()
+        // Filter: Remove variants in Gnomad and in constit bam
+        info!("Filtering out variants in GnomAD and in constit bam at low AF.");
+        somatic_stats.n_high_alt_constit_gnomad =
+            annotations.retain_variants(&mut variants_collections, |anns| {
+                !anns
+                    .iter()
                     .find_map(|a| {
                         if let Annotation::GnomAD(gnomad) = a {
                             Some(gnomad.gnomad_af > 0.0)
@@ -287,103 +343,149 @@ impl Run for Somatic {
                             .map(|constit_alt_condition| gnomad_condition && constit_alt_condition)
                     })
                     .unwrap_or(false)
-            })
-            .into_iter()
-            .collect();
-
-        info!(
-            "Filtering out {} positions, with constit alt <= max contig alt ({}) and in GnomAD.",
-            high_alt_gnomad_keys.len(),
-            self.config.solo_max_alt_constit
-        );
-
-        annotations.remove_keys(&high_alt_gnomad_keys);
-        somatic_stats.n_high_alt_constit_gnomad = variants_collection
-            .par_iter_mut()
-            .map(|c| {
-                let before = c.variants.len();
-                c.remove_keys(&high_alt_gnomad_keys);
-                let after = c.variants.len();
-                let rm = before - after;
-                info!("Variants removed from {}: {rm}", c.vcf.path.display(),);
-                rm
-            })
-            .sum();
-        variants_collection.retain(|e| !e.variants.is_empty());
+            });
         info!(
             "{} variants filtered, with constit alt <= max contig alt ({}) and in GnomAD.",
             somatic_stats.n_high_alt_constit_gnomad, self.config.solo_max_alt_constit
         );
+        annotations.callers_stat();
 
-        let prob_keys: HashSet<u128> = annotations
-            .get_keys_filter(|anns| {
-                anns.contains(&Annotation::SoloTumor)
-                    && !anns.contains(&Annotation::Callers(Caller::ClairS))
-            })
-            .into_iter()
-            .collect();
-
-        info!("Problematic variants {}", prob_keys.len());
-
-        let mut problematic_variants = variants_collection.clone();
-
-        let problematic_variants: Vec<VcfVariant> = problematic_variants
-            .iter_mut()
-            .flat_map(|e| {
-                info!("{} {}:\t{}", e.vcf.caller, e.vcf.time, e.variants.len());
-                e.retain_keys(&prob_keys);
-                e.variants.clone()
-            })
-            .collect();
-        write_vcf(&problematic_variants, "prob.vcf.gz")?;
-
-        // Entropies
-        let entropies: Vec<f64> = problematic_variants
-            .iter()
-            .flat_map(|v| {
-                annotations
-                    .store
-                    .get(&v.hash_variant())
-                    .unwrap()
-                    .iter()
-                    .flat_map(|a| {
-                        if let Annotation::ShannonEntropy(ent) = a {
-                            vec![*ent]
-                        } else {
-                            Vec::new()
-                        }
-                    })
-                    .collect::<Vec<_>>()
-            })
-            .collect();
-        let json_string = serde_json::to_string(&entropies)?;
-        let mut file = File::create("/data/entropies.json")?;
-        file.write_all(json_string.as_bytes())?;
-
-
+        // let high_alt_gnomad_keys: HashSet<u128> = annotations
+        //     .get_keys_filter(|anns| {
+        //         anns.iter()
+        //             .find_map(|a| {
+        //                 if let Annotation::GnomAD(gnomad) = a {
+        //                     Some(gnomad.gnomad_af > 0.0)
+        //                 } else {
+        //                     None
+        //                 }
+        //             })
+        //             .and_then(|gnomad_condition| {
+        //                 anns.iter()
+        //                     .find_map(|a| {
+        //                         if let Annotation::ConstitAlt(n_alt) = a {
+        //                             Some(*n_alt > 0)
+        //                         } else {
+        //                             None
+        //                         }
+        //                     })
+        //                     .map(|constit_alt_condition| gnomad_condition && constit_alt_condition)
+        //             })
+        //             .unwrap_or(false)
+        //     })
+        //     .into_iter()
+        //     .collect();
+        //
+        // info!(
+        //     "Filtering out {} positions, with constit alt <= max contig alt ({}) and in GnomAD.",
+        //     high_alt_gnomad_keys.len(),
+        //     self.config.solo_max_alt_constit
+        // );
+        //
+        // annotations.remove_keys(&high_alt_gnomad_keys);
+        // somatic_stats.n_high_alt_constit_gnomad = variants_collections
+        //     .par_iter_mut()
+        //     .map(|c| {
+        //         let before = c.variants.len();
+        //         c.remove_keys(&high_alt_gnomad_keys);
+        //         let after = c.variants.len();
+        //         let rm = before - after;
+        //         info!("Variants removed from {}: {rm}", c.vcf.path.display(),);
+        //         rm
+        //     })
+        //     .sum();
+        // variants_collections.retain(|e| !e.variants.is_empty());
+        
+
+        // let prob_keys: HashSet<u128> = annotations
+        //     .get_keys_filter(|anns| {
+        //         anns.contains(&Annotation::SoloTumor)
+        //             && !anns.contains(&Annotation::Callers(Caller::ClairS))
+        //     })
+        //     .into_iter()
+        //     .collect();
+        //
+        // info!("Problematic variants {}", prob_keys.len());
+        //
+        // let mut problematic_variants = variants_collection.clone();
+        //
+        // let problematic_variants: Vec<VcfVariant> = problematic_variants
+        //     .iter_mut()
+        //     .flat_map(|e| {
+        //         info!("{} {}:\t{}", e.vcf.caller, e.vcf.time, e.variants.len());
+        //         e.retain_keys(&prob_keys);
+        //         e.variants.clone()
+        //     })
+        //     .collect();
+        // write_vcf(&problematic_variants, "prob.vcf.gz")?;
+        //
+        // // Entropies
+        // let entropies: Vec<f64> = problematic_variants
+        //     .iter()
+        //     .flat_map(|v| {
+        //         annotations
+        //             .store
+        //             .get(&v.hash_variant())
+        //             .unwrap()
+        //             .iter()
+        //             .flat_map(|a| {
+        //                 if let Annotation::ShannonEntropy(ent) = a {
+        //                     vec![*ent]
+        //                 } else {
+        //                     Vec::new()
+        //                 }
+        //             })
+        //             .collect::<Vec<_>>()
+        //     })
+        //     .collect();
+        // let json_string = serde_json::to_string(&entropies)?;
+        // let mut file = File::create("/data/entropies.json")?;
+        // file.write_all(json_string.as_bytes())?;
+
+        // Annotation low entropy
         annotations.low_shannon_entropy(self.config.min_shannon_entropy);
-        let low_ent_keys: HashSet<u128> = annotations
-            .get_keys_filter(|anns| anns.contains(&Annotation::LowEntropy))
-            .into_iter()
-            .collect();
+        annotations.callers_stat();
 
-        annotations.remove_keys(&low_ent_keys);
+        // Filtering low entropy for solo variants.
+        info!("Filtering low entropies");
+        somatic_stats.n_low_entropies = annotations
+            .retain_variants(&mut variants_collections, |anns| {
+                !anns.contains(&Annotation::LowEntropy)
+            });
         annotations.callers_stat();
-        somatic_stats.n_low_entropies = variants_collection
-            .par_iter_mut()
-            .map(|c| {
-                let before = c.variants.len();
-                c.remove_keys(&low_ent_keys);
-                let after = c.variants.len();
-                let rm = before - after;
-                info!("Variants removed from {}: {rm}", c.vcf.path.display());
-                rm
-            })
-            .sum();
-        variants_collection.retain(|e| !e.variants.is_empty());
+
+        // let low_ent_keys: HashSet<u128> = annotations
+        //     .get_keys_filter(|anns| anns.contains(&Annotation::LowEntropy))
+        //     .into_iter()
+        //     .collect();
+        //
+        // annotations.remove_keys(&low_ent_keys);
+        // annotations.callers_stat();
+        // somatic_stats.n_low_entropies = variants_collections
+        //     .par_iter_mut()
+        //     .map(|c| {
+        //         let before = c.variants.len();
+        //         c.remove_keys(&low_ent_keys);
+        //         let after = c.variants.len();
+        //         let rm = before - after;
+        //         info!("Variants removed from {}: {rm}", c.vcf.path.display());
+        //         rm
+        //     })
+        //     .sum();
+        // variants_collections.retain(|e| !e.variants.is_empty());
         // let tw: Vec<VcfVariant> = variants_collection.iter().flat_map(|c| c.variants.clone()).collect();
         // write_vcf(&tw, "low_ent.vcf.gz")?;
 
+        // VEP
+        info!("VEP annotation.");
+        variants_collections
+            .iter()
+            .try_for_each(|c| -> anyhow::Result<()> {
+                let ext_annot = ExternalAnnotation::init()?;
+                ext_annot.annotate_vep(&c.variants, &annotations)?;
+                Ok(())
+            })?;
+        annotations.callers_stat();
 
         Ok(())
     }

+ 25 - 5
src/variant/variant.rs

@@ -1,8 +1,5 @@
 use crate::{
-    annotation::Annotations,
-    positions::{GenomePosition, GetGenomePosition, VcfPosition},
-    runners::Run,
-    variant::variant_collection::VariantCollection,
+    annotation::Annotations, positions::{GenomePosition, GetGenomePosition, VcfPosition}, runners::Run, variant::variant_collection::VariantCollection
 };
 use anyhow::{anyhow, Context, Ok};
 use rayon::prelude::*;
@@ -649,8 +646,31 @@ pub trait VariantId {
 
 pub trait RunnerVariants: Run + Variants + Send + Sync {}
 
+pub type CallerBox = Box<dyn RunnerVariants + Send + Sync>;
+
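+// Expands to a Vec<CallerBox>: each listed type is built via <Runner>::initialize(id, config.clone()) and boxed as a trait object.
+// e.g. init_somatic_callers!(&id, &config, ClairS, NanomonSV)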
+#[macro_export]
+macro_rules! init_somatic_callers {
+    ($id:expr, $config:expr, $($runner:ty),+ $(,)?) => {
+        vec![
+            $(
+                Box::new(<$runner>::initialize($id, $config.clone())?) as CallerBox
+            ),+
+        ]
+    };
+}
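+// Solo-caller counterpart: types are listed as (Type, arg) pairs, and arg (e.g. "diag" or "mrd") is passed as the extra initialize argument.
+// e.g. init_solo_callers!(&id, &config, DeepVariant, "diag", DeepVariant, "mrd")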
+#[macro_export]
+macro_rules! init_solo_callers {
+    ($id:expr, $config:expr, $($runner:ty, $arg:expr),+ $(,)?) => {
+        vec![
+            $(
+                Box::new(<$runner>::initialize($id, $arg, $config.clone())?) as CallerBox
+            ),+
+        ]
+    };
+}
+
 pub fn load_variants(
-    iterable: &mut [Box<dyn RunnerVariants + Send + Sync>],
+    iterable: &mut [CallerBox],
     annotations: &Annotations,
 ) -> anyhow::Result<Vec<VariantCollection>> {
     // First, run all items in parallel

+ 170 - 22
src/variant/variant_collection.rs

@@ -1,9 +1,12 @@
-use std::{collections::HashSet, fs::{self, File}, io::{Read, Write}};
+use std::{
+    collections::{HashMap, HashSet},
+    fs::{self, File},
+    io::Write,
+};
 
 use anyhow::Context;
-use chrono::Utc;
 use csv::ReaderBuilder;
-use log::info;
+use log::{info, warn};
 use rayon::prelude::*;
 use uuid::Uuid;
 
@@ -13,17 +16,15 @@ use crate::{
         cosmic::Cosmic,
         echtvar::{parse_echtvar_val, run_echtvar},
         gnomad::GnomAD,
-        Annotation, Annotations,
+        vep::{run_vep, VepLine, VEP},
+        Annotation, Annotations, Caller,
     },
     collection::{
         bam::{counts_at, counts_ins_at},
         vcf::Vcf,
     },
-    helpers::{app_storage_dir, estimate_shannon_entropy, get_temp_file_path},
-    io::{
-        readers::get_reader,
-        vcf::{vcf_header, write_vcf},
-    },
+    helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path},
+    io::{readers::get_reader, vcf::vcf_header},
     pipes::somatic::sequence_at,
 };
 
@@ -31,6 +32,8 @@ use crate::{
 pub struct VariantCollection {
     pub variants: Vec<VcfVariant>,
     pub vcf: Vcf,
+    pub caller: Caller,
+    pub category: Annotation,
 }
 
 impl VariantCollection {
@@ -48,6 +51,33 @@ impl VariantCollection {
             .retain(|v| !keys_to_remove.contains(&v.hash_variant()));
     }
 
+    pub fn partition<F>(self, predicate: F) -> (Self, Self)
+    where
+        F: Fn(&VcfVariant) -> bool,
+    {
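+        // Returns (variants matching the predicate, the rest); vcf, caller and category are cloned onto both halves.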
+        let Self {
+            variants,
+            vcf,
+            caller,
+            category,
+        } = self;
+        let (left_data, right_data) = variants.into_iter().partition(predicate);
+        (
+            Self {
+                variants: left_data,
+                vcf: vcf.clone(),
+                caller: caller.clone(),
+                category: category.clone(),
+            },
+            Self {
+                variants: right_data,
+                vcf,
+                caller,
+                category,
+            },
+        )
+    }
+
     pub fn chunk_size(&self, max_threads: u8) -> usize {
         let total_items = self.variants.len();
         let min_chunk_size = 1000;
@@ -187,7 +217,7 @@ impl ExternalAnnotation {
             hash BLOB(16) UNIQUE NOT NULL,
             source TEXT NOT NULL,
             data BLOB,
-            created_at TEXT NOT NULL
+            modified TEXT NOT NULL
         )",
             [],
         )?;
@@ -336,25 +366,25 @@ impl ExternalAnnotation {
             .collect::<Vec<_>>();
 
         for (hash, cosmic, gnomad) in results {
-            // let created_at = chrono::Utc::now().to_rfc3339();
+            // let modified = chrono::Utc::now().to_rfc3339();
 
             // Update COSMIC
             // self.conn.execute(
-            //     "INSERT OR REPLACE INTO cosmic_annotations (hash, data, created_at) VALUES (?, ?, ?)",
+            //     "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
             //     params![
             //         hash.to_le_bytes().to_vec(),
             //         cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()),
-            //         created_at
+            //         modified
             //     ],
             // )?;
             //
             // // Update GnomAD
             // self.conn.execute(
-            //     "INSERT OR REPLACE INTO gnomad_annotations (hash, data, created_at) VALUES (?, ?, ?)",
+            //     "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
             //     params![
             //         hash.to_le_bytes().to_vec(),
             //         gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()),
-            //         created_at
+            //         modified
             //     ],
             // )?;
 
@@ -378,16 +408,134 @@ impl ExternalAnnotation {
         Ok(())
     }
 
-    pub fn update_database(
+    pub fn annotate_vep(
         &self,
-        hash: u128,
-        source: &str,
-        data: &[u8],
-        created_at: &str,
+        variants: &[VcfVariant],
+        annotations: &Annotations,
+    ) -> anyhow::Result<()> {
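+        // Reuse previously stored VEP records where available; variants without one are batched through process_unfound_vep below.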
+        let mut unfound = Vec::new();
+
+        for variant in variants {
+            let hash: u128 = variant.hash_variant();
+
+            // Check VEP
+            match self.get_annotation(hash, "VEP")? {
+                Some(veps) => {
+                    annotations
+                        .store
+                        .entry(hash)
+                        .or_default()
+                        .push(Annotation::VEP(veps));
+                }
+                None => {
+                    unfound.push(variant.clone());
+                }
+            }
+        }
+
+        if !unfound.is_empty() {
+            self.process_unfound_vep(unfound, annotations)?;
+        }
+
+        Ok(())
+    }
+
+    pub fn process_unfound_vep(
+        &self,
+        mut unfound: Vec<VcfVariant>,
+        annotations: &Annotations,
     ) -> anyhow::Result<()> {
+        unfound.par_sort();
+        unfound.dedup();
+
+        let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
+
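+        // For each chunk: write a temporary VCF whose ID column is the 1-based index within the chunk, run VEP on it, then map the resulting lines back to variant hashes through that index.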
+        let results = unfound
+            .par_chunks((unfound.len() / 33).max(1)) // chunk size must be non-zero
+            .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
+                let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string();
+                let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
+
+                // Write input VCF
+                let mut vcf = File::create(&in_tmp)?;
+                writeln!(vcf, "{}", header)?;
+                for (i, row) in chunk.iter().enumerate() {
+                    writeln!(
+                        vcf,
+                        "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
+                        row.position.contig(),
+                        row.position.position + 1, // vcf
+                        i + 1,
+                        row.reference,
+                        row.alternative
+                    )?;
+                }
+
+                run_vep(&in_tmp, &out_vep)?;
+
+                let mut reader_vep = ReaderBuilder::new()
+                    .delimiter(b'\t')
+                    .has_headers(false)
+                    .comment(Some(b'#'))
+                    .flexible(true)
+                    .from_reader(fs::File::open(out_vep.clone())?);
+
+                let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
+                for line in reader_vep.deserialize() {
+                    let line: VepLine = line.context("Failed to deserialize VepLine")?;
+                    let key = line
+                        .uploaded_variation
+                        .parse::<u64>()
+                        .context("Failed to parse uploaded_variation as u64")?;
+
+                    lines.entry(key).or_default().push(line);
+                }
+
+                fs::remove_file(in_tmp)?;
+                fs::remove_file(out_vep)?;
+
+                let mut n_not_vep = 0;
+                let mut chunk_results: Vec<(u128, Vec<VEP>)> = Vec::new();
+
+                chunk.iter().enumerate().for_each(|(i, entry)| {
+                    let k = (i + 1) as u64;
+
+                    if let Some(vep_lines) = lines.get(&k) {
+                        if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
+                            chunk_results.push((entry.hash_variant(), veps));
+                        }
+                    } else {
+                        n_not_vep += 1;
+                    }
+                });
+
+                if n_not_vep > 0 {
+                    warn!("{n_not_vep} variants not annotated by VEP");
+                }
+                Ok(chunk_results)
+            })
+            .flatten()
+            .collect::<Vec<_>>();
+
+        for (hash, veps) in results {
+            // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
+
+            annotations
+                .store
+                .entry(hash)
+                .or_default()
+                .push(Annotation::VEP(veps));
+        }
+
+        Ok(())
+    }
+
+    pub fn update_database(&self, hash: u128, source: &str, data: &[u8]) -> anyhow::Result<()> {
+        let modified = chrono::Utc::now().to_rfc3339();
+
         self.conn.execute(
-            "INSERT OR REPLACE INTO annotations (hash, source, data, created_at) VALUES (?, ?, ?, ?)",
-            params![hash.to_le_bytes().to_vec(), source, data, created_at],
+            "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)",
+            params![hash.to_le_bytes().to_vec(), source, data, modified],
         )?;
         Ok(())
     }