Thomas před 11 měsíci
rodič
revize
710bd6515c
15 změnil soubory, kde provedl 596 přidání a 167 odebrání
  1. 1 0
      Cargo.lock
  2. 1 0
      Cargo.toml
  3. 0 0
      gg.txt
  4. 22 0
      src/annotation/mod.rs
  5. 32 7
      src/callers/clairs.rs
  6. 20 10
      src/callers/deep_variant.rs
  7. 38 30
      src/commands/dorado.rs
  8. 78 15
      src/helpers.rs
  9. 73 0
      src/io/bed.rs
  10. 1 0
      src/io/mod.rs
  11. 2 2
      src/io/readers.rs
  12. 38 5
      src/lib.rs
  13. 67 53
      src/pipes/somatic.rs
  14. 173 0
      src/positions.rs
  15. 50 45
      src/variant/variant.rs

+ 1 - 0
Cargo.lock

@@ -3639,6 +3639,7 @@ dependencies = [
  "chrono",
  "csv",
  "ctrlc",
+ "dashmap",
  "duct",
  "env_logger 0.11.5",
  "expectrl",

+ 1 - 0
Cargo.toml

@@ -44,3 +44,4 @@ podders = "0.1.4"
 arrow = "53.3.0"
 bgzip = "0.3.1"
 tempfile = "3.14.0"
+dashmap = { version = "6.1.0", features = ["rayon"] }

+ 0 - 0
gg.txt


+ 22 - 0
src/annotation/mod.rs

@@ -0,0 +1,22 @@
+use dashmap::DashMap;
+
+#[derive(Debug, Clone)]
+pub enum Annotation {
+    SoloDiag,
+    SoloConstit,
+    Callers(Caller),
+    Germline,
+    Somatic,
+}
+
+#[derive(Debug, Clone)]
+pub enum Caller {
+    DeepVariant,
+    ClairS,
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct Annotations {
+    pub store: DashMap<u64, Vec<Annotation>>
+}
+

+ 32 - 7
src/callers/clairs.rs

@@ -1,7 +1,15 @@
 use crate::{
-    collection::{vcf::Vcf, Initialize}, commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{force_or_not, get_temp_file_path}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{variant::Variants, variant_collection::VariantCollection}
+    annotation::{Annotation, Annotations, Caller},
+    collection::{vcf::Vcf, Initialize},
+    commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
+    config::Config,
+    helpers::{force_or_not, get_temp_file_path},
+    io::vcf::read_vcf,
+    runners::{run_wait, DockerRun, Run},
+    variant::{variant::Variants, variant_collection::VariantCollection},
 };
 use anyhow::{Context, Ok};
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{fs, path::Path};
 use tracing::info;
 
@@ -187,20 +195,37 @@ impl Run for ClairS {
 }
 
 impl Variants for ClairS {
-    fn variants(&self) -> anyhow::Result<VariantCollection> {
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let add = vec![Annotation::Callers(Caller::ClairS), Annotation::Somatic];
+        let variants = read_vcf(&self.vcf_passed)?;
+        variants.par_iter().for_each(|v| {
+            annotations
+                .store
+                .entry(v.hash_variant())
+                .or_default()
+                .extend(add.clone())
+        });
         Ok(VariantCollection {
-            variants: read_vcf(&self.vcf_passed)?,
+            variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
         })
     }
 }
-
 impl ClairS {
-    pub fn germline(&self) -> anyhow::Result<VariantCollection> {
+    pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let add = vec![Annotation::Callers(Caller::ClairS), Annotation::Germline];
+        let variants = read_vcf(&self.clair3_germline_passed)?;
+        variants.par_iter().for_each(|v| {
+            annotations
+                .store
+                .entry(v.hash_variant())
+                .or_default()
+                .extend(add.clone())
+        });
+
         Ok(VariantCollection {
-            variants: read_vcf(&self.clair3_germline_passed)?,
+            variants,
             vcf: Vcf::new(self.clair3_germline_passed.clone().into())?,
         })
     }
-
 }

+ 20 - 10
src/callers/deep_variant.rs

@@ -1,15 +1,10 @@
 use anyhow::Context;
 use log::info;
 use std::{fs, path::Path};
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 
 use crate::{
-    collection::{vcf::Vcf, InitializeSolo},
-    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
-    config::Config,
-    helpers::{force_or_not, path_prefix},
-    io::vcf::read_vcf,
-    runners::{run_wait, DockerRun, Run},
-    variant::{variant::Variants, variant_collection::VariantCollection},
+    annotation::{Annotation, Annotations, Caller}, collection::{vcf::Vcf, InitializeSolo}, commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{force_or_not, path_prefix}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{variant::Variants, variant_collection::VariantCollection}
 };
 
 #[derive(Debug, Clone)]
@@ -62,7 +57,7 @@ impl InitializeSolo for DeepVariant {
 impl Run for DeepVariant {
     fn run(&mut self) -> anyhow::Result<()> {
         force_or_not(&self.vcf_passed, self.config.deepvariant_force)?;
-        
+
         // Run Docker command if output VCF doesn't exist
         if !Path::new(&self.output_vcf).exists() {
             let mut docker_run = DockerRun::new(&[
@@ -119,9 +114,24 @@ impl Run for DeepVariant {
 }
 
 impl Variants for DeepVariant {
-    fn variants(&self) -> anyhow::Result<VariantCollection> {
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
+        let solo = match self.time.as_str() {
+            "diag" => Annotation::SoloDiag,
+            "mrd" => Annotation::SoloConstit,
+            _ => return Err(anyhow::anyhow!("Invalid time point."))
+        };
+        let add = vec![Annotation::Callers(Caller::DeepVariant), solo];
+        let variants = read_vcf(&self.vcf_passed)?;
+        variants.par_iter().for_each(|v| {
+            annotations
+                .store
+                .entry(v.hash_variant())
+                .or_default()
+                .extend(add.clone())
+        });
+
         Ok(VariantCollection {
-            variants: read_vcf(&self.vcf_passed)?,
+            variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
         })
     }

+ 38 - 30
src/commands/dorado.rs

@@ -289,7 +289,6 @@ impl Dorado {
             .sequencing_kit
             .to_uppercase();
 
-
         let dorado = format!(
             "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --trim all --reference {ref_mmi}"
         );
@@ -303,52 +302,61 @@ impl Dorado {
         info!("Basecalling ✅");
 
         // Demux the temporary bam file
-                
+
         let tmp_demux_dir = format!("{tmp_dir}/demuxed");
         fs::create_dir(&tmp_demux_dir)?;
 
+        let pipe = format!(
+            "samtools split -@ {} -f '{tmp_demux_dir}/%*_%!.bam' {muxed_bam}",
+            config.align.samtools_view_threads
+        );
         info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
-
-        duct::cmd!(
-            &config.align.dorado_bin,
-            "demux",
-            "--output-dir",
-            &tmp_demux_dir,
-            "--kit-name",
-            &sequencing_kit,
-            &tmp_dir,
-        )
-        .run()?;
+        info!("Running: {pipe}");
+        let pipe_cmd = cmd!("bash", "-c", pipe);
+        pipe_cmd.run()?;
+        //
+        //
+        // duct::cmd!(
+        //     &config.align.dorado_bin,
+        //     "demux",
+        //     "--output-dir",
+        //     &tmp_demux_dir,
+        //     "--kit-name",
+        //     &sequencing_kit,
+        //     &tmp_dir,
+        // )
+        // .run()?;
         info!("Demux ✅");
-
+        //
         for case in cases.iter() {
             let barcode = case.barcode.replace("NB", "");
             let bam = find_unique_file(
                 &tmp_demux_dir,
                 &format!("{sequencing_kit}_barcode{}.bam", barcode),
             )?;
-
-            // Trim
-            let trimmed_bam = format!(
-                "{tmp_demux_dir}/{sequencing_kit}_barcode{}_trimmed.bam",
-                barcode
-            );
-            let pipe = format!(
-                "{} trim --sequencing-kit {sequencing_kit} {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
-                config.align.dorado_bin, &config.align.samtools_view_threads
-            );
-
-            info!("Running: {pipe}");
-            cmd!("bash", "-c", pipe).run()?;
-            info!("Trim ✅");
-
+            //
+            //     // Trim
+            //     let trimmed_bam = format!(
+            //         "{tmp_demux_dir}/{sequencing_kit}_barcode{}_trimmed.bam",
+            //         barcode
+            //     );
+            //     let pipe = format!(
+            //         "{} trim --sequencing-kit {sequencing_kit} {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
+            //         config.align.dorado_bin, &config.align.samtools_view_threads
+            //     );
+            //
+            //     info!("Running: {pipe}");
+            //     cmd!("bash", "-c", pipe).run()?;
+            //     info!("Trim ✅");
+            //
             // Align
             let aligned_bam = format!(
                 "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
                 barcode
             );
             let dorado = format!(
-                "{} aligner --threads 160 {} {trimmed_bam}",
+                //         "{} aligner --threads 160 {} {trimmed_bam}",
+                "{} aligner --threads 160 {} {bam}",
                 config.align.dorado_bin, config.align.ref_fa,
             );
             let samtools_view = format!(

+ 78 - 15
src/helpers.rs

@@ -1,6 +1,8 @@
 use anyhow::Context;
 use log::warn;
-use std::{fs, path::{Path, PathBuf}};
+use std::{
+    cmp::Ordering, fmt, fs, path::{Path, PathBuf}
+};
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
     let mut matching_files = Vec::new();
@@ -76,13 +78,38 @@ pub fn force_or_not(path: &str, force: bool) -> anyhow::Result<()> {
 use rayon::prelude::*;
 use std::cmp::Ord;
 
-pub struct VectorIntersction<T> {
+pub struct VectorIntersection<T> {
     pub common: Vec<T>,
     pub only_in_first: Vec<T>,
     pub only_in_second: Vec<T>,
 }
 
-impl<T> Default for VectorIntersction<T> {
+impl<T> fmt::Display for VectorIntersection<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let total_items = self.common.len() + self.only_in_first.len() + self.only_in_second.len();
+        
+        writeln!(f, "Total items: {}", total_items)?;
+        writeln!(f, "Common: {} ({:.2}%)", 
+               self.common.len(), 
+               percentage(self.common.len(), total_items))?;
+        writeln!(f, "Only in first: {} ({:.2}%)", 
+               self.only_in_first.len(), 
+               percentage(self.only_in_first.len(), total_items))?;
+        writeln!(f, "Only in second: {} ({:.2}%)", 
+               self.only_in_second.len(), 
+               percentage(self.only_in_second.len(), total_items))
+    }
+}
+
+fn percentage(part: usize, total: usize) -> f64 {
+    if total == 0 {
+        0.0
+    } else {
+        (part as f64 / total as f64) * 100.0
+    }
+}
+
+impl<T> Default for VectorIntersection<T> {
     fn default() -> Self {
         Self {
             common: Vec::new(),
@@ -92,7 +119,7 @@ impl<T> Default for VectorIntersction<T> {
     }
 }
 
-impl<T: Ord + Clone> VectorIntersction<T> {
+impl<T: Ord + Clone> VectorIntersection<T> {
     fn merge(&mut self, other: &mut Self) {
         self.common.append(&mut other.common);
         self.only_in_first.append(&mut other.only_in_first);
@@ -100,23 +127,59 @@ impl<T: Ord + Clone> VectorIntersction<T> {
     }
 }
 
-fn intersection<T: Ord + Send + Sync + Clone>(vec1: &[T], vec2: &[T]) -> VectorIntersction<T> {
-    let mut result = VectorIntersction::default();
+fn intersection<T: Ord + Clone>(vec1: &[T], vec2: &[T]) -> VectorIntersection<T> {
+    let mut result = VectorIntersection::default();
     let mut i = 0;
     let mut j = 0;
 
     while i < vec1.len() && j < vec2.len() {
         match vec1[i].cmp(&vec2[j]) {
-            std::cmp::Ordering::Less => {
+            Ordering::Less => {
                 result.only_in_first.push(vec1[i].clone());
                 i += 1;
             }
-            std::cmp::Ordering::Greater => {
+            Ordering::Greater => {
                 result.only_in_second.push(vec2[j].clone());
                 j += 1;
             }
-            std::cmp::Ordering::Equal => {
-                result.common.push(vec1[i].clone());
+            Ordering::Equal => {
+                let val = &vec1[i];
+                let mut count1 = 1;
+                let mut count2 = 1;
+
+                // Count occurrences in vec1
+                while i + 1 < vec1.len() && &vec1[i + 1] == val {
+                    i += 1;
+                    count1 += 1;
+                }
+
+                // Count occurrences in vec2
+                while j + 1 < vec2.len() && &vec2[j + 1] == val {
+                    j += 1;
+                    count2 += 1;
+                }
+
+                // Add to common
+                result
+                    .common
+                    .extend(std::iter::repeat(val.clone()).take(count1.min(count2)));
+
+                // Add excess to only_in_first or only_in_second
+                match count1.cmp(&count2) {
+                    Ordering::Greater => {
+                        result
+                            .only_in_first
+                            .extend(std::iter::repeat(val.clone()).take(count1 - count2));
+                    }
+                    Ordering::Less => {
+                        result
+                            .only_in_second
+                            .extend(std::iter::repeat(val.clone()).take(count2 - count1));
+                    }
+                    Ordering::Equal => {
+                        // No excess elements, do nothing
+                    }
+                }
                 i += 1;
                 j += 1;
             }
@@ -132,23 +195,23 @@ fn intersection<T: Ord + Send + Sync + Clone>(vec1: &[T], vec2: &[T]) -> VectorI
 pub fn par_intersection<T: Ord + Send + Sync + Clone>(
     vec1: &[T],
     vec2: &[T],
-) -> VectorIntersction<T> {
+) -> VectorIntersection<T> {
     let chunk_size = (vec1.len() / rayon::current_num_threads()).max(1);
 
     vec1.par_chunks(chunk_size)
         .map(|chunk| {
             let start = vec2.partition_point(|x| x < &chunk[0]);
             let end = vec2.partition_point(|x| x <= &chunk[chunk.len() - 1]);
-            
+
             // Ensure start is not greater than end
             if start <= end {
                 intersection(chunk, &vec2[start..end])
             } else {
                 // If start > end, there's no intersection for this chunk
-                VectorIntersction::default()
+                VectorIntersection::default()
             }
         })
-        .reduce(VectorIntersction::default, |mut acc, mut x| {
+        .reduce(VectorIntersection::default, |mut acc, mut x| {
             acc.merge(&mut x);
             acc
         })
@@ -161,6 +224,6 @@ pub fn get_temp_file_path() -> std::io::Result<PathBuf> {
         .rand_bytes(5)
         .tempfile()?
         .into_temp_path();
-    
+
     Ok(temp_path.to_path_buf())
 }

+ 73 - 0
src/io/bed.rs

@@ -0,0 +1,73 @@
+use std::{
+    io::{BufRead, BufReader},
+    str::FromStr,
+};
+
+use anyhow::Context;
+use log::warn;
+
+use crate::positions::{GenomeRange, GetGenomeRange};
+
+use super::readers::get_reader;
+
+#[derive(Debug)]
+pub struct BedRow {
+    pub range: GenomeRange,
+    pub name: Option<String>,
+    pub score: Option<u16>,
+    pub strand: Option<bool>,
+}
+
+impl FromStr for BedRow {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> anyhow::Result<Self> {
+        let v: Vec<&str> = s.split('\t').collect();
+        let range: GenomeRange = (
+            *v.first()
+                .ok_or(anyhow::anyhow!("Can't get contig from: {s}"))?,
+            *v.get(1)
+                .ok_or(anyhow::anyhow!("Can't get position from: {s}"))?,
+            *v.get(2)
+                .ok_or(anyhow::anyhow!("Can't get position from: {s}"))?,
+        )
+            .try_into()
+            .context(format!("Can't parse range from: {s}"))?;
+
+        Ok(Self {
+            range,
+            name: v.get(3).map(|v| Some(v.to_string())).unwrap_or(None),
+            score: v.get(4).and_then(|v| v.parse().ok()),
+            strand: v.get(5).and_then(|&v| match v {
+                "+" => Some(true),
+                "-" => Some(false),
+                _ => None,
+            }),
+        })
+    }
+}
+
+impl GetGenomeRange for BedRow {
+    fn range(&self) -> &GenomeRange {
+        &self.range
+    }
+}
+
+pub fn read_bed(path: &str) -> anyhow::Result<Vec<BedRow>> {
+    let reader = BufReader::new(get_reader(path)?);
+
+    let mut res = Vec::new();
+    for (i, line) in reader.lines().enumerate() {
+        match line {
+            Ok(line) => {
+                if line.starts_with("#") {
+                    continue;
+                }
+                res.push(line.parse().context(format!("Can't parse {line}"))?);
+            }
+            Err(e) => warn!("Can't read line {i}: {e}"),
+        }
+    }
+
+    Ok(res)
+}

+ 1 - 0
src/io/mod.rs

@@ -1,3 +1,4 @@
 pub mod pod5_infos;
 pub mod readers;
 pub mod vcf;
+pub mod bed;

+ 2 - 2
src/io/readers.rs

@@ -11,7 +11,7 @@ pub fn get_reader(path: &str) -> anyhow::Result<Box<dyn std::io::Read>> {
         .collect::<Vec<&str>>()
         .last()
         .context(format!("Can't parse {path}"))?;
-    assert!(file_type == "gz" || file_type == "vcf");
+    assert!(file_type == "gz" || file_type == "vcf" || file_type == "bed");
 
     let raw_reader: Box<dyn std::io::Read> = Box::new(File::open(path)?);
 
@@ -20,7 +20,7 @@ pub fn get_reader(path: &str) -> anyhow::Result<Box<dyn std::io::Read>> {
             let reader = Box::new(BGZFReader::new(raw_reader)?);
             Ok(Box::new(BufReader::new(reader)))
         }
-        "vcf" => Ok(Box::new(BufReader::new(raw_reader))),
+        "vcf" | "bed" => Ok(Box::new(BufReader::new(raw_reader))),
         t => {
             panic!("unknown file type: {}", t)
         }

+ 38 - 5
src/lib.rs

@@ -11,6 +11,8 @@ pub mod helpers;
 pub mod variant;
 pub mod io;
 pub mod pipes;
+pub mod positions;
+pub mod annotation;
 // pub mod vcf_reader;
 
 #[macro_use]
@@ -25,12 +27,15 @@ lazy_static! {
 mod tests {
     use std::{fs, path::Path};
 
+    use annotation::Annotations;
     use callers::{nanomonsv::nanomonsv_create_pon, savana::Savana, severus::{Severus, SeverusSolo}};
     use collection::{Initialize, InitializeSolo, Version};
     use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}};
     use functions::assembler::{Assembler, AssemblerConfig};
+    use io::bed::read_bed;
     use log::info;
     use pipes::somatic::Somatic;
+    use positions::{overlaps_par, GenomePosition, GenomeRange};
     // use pandora_lib_assembler::assembler::AssembleConfig;
     use rayon::prelude::*;
     use runners::Run;
@@ -163,7 +168,7 @@ mod tests {
     fn todo_all() -> anyhow::Result<()> {
         init();
         // let config = CollectionsConfig::default();
-        let config = CollectionsConfig { pod_dir: "/data/store".to_string(), ..Default::default() };
+        let config = CollectionsConfig { pod_dir: "/data/run_data".to_string(), ..Default::default() };
         info!("Runing todo with config: {:#?}", config);
         let mut collections = Collections::new(config)?;
         collections.todo()?;
@@ -203,7 +208,7 @@ mod tests {
     #[test_log::test]
     fn run_t() -> anyhow::Result<()> {
         // let config = CollectionsConfig::default();
-        let config = CollectionsConfig { pod_dir: "/data/store".to_string(), ..Default::default() };
+        let config = CollectionsConfig { pod_dir: "/data/run_data".to_string(), ..Default::default() };
 
         run_tasks(config)
     }
@@ -430,7 +435,8 @@ mod tests {
         let id = "ADJAGBA";
         let time = "diag";
         let dv = DeepVariant::initialize(id, time, Config::default())?;
-        let variant_collection = dv.variants()?;
+        let annotations = Annotations::default();
+        let variant_collection = dv.variants(&annotations)?;
         println!("Deepvariant for {id} {time}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants);
         Ok(())
     }
@@ -440,7 +446,8 @@ mod tests {
         init();
         let id = "ADJAGBA";
         let clairs = ClairS::initialize(id, Config::default())?;
-        let variant_collection = clairs.variants()?;
+        let annotations = Annotations::default();
+        let variant_collection = clairs.variants(&annotations)?;
         println!("ClairS for {id}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants);
         Ok(())
     }
@@ -450,7 +457,8 @@ mod tests {
         init();
         let id = "ADJAGBA";
         let clairs = ClairS::initialize(id, Config::default())?;
-        let germline_variant_collection = clairs.germline()?;
+        let annotations = Annotations::default();
+        let germline_variant_collection = clairs.germline(&annotations)?;
         println!("ClairS for {id}: variants {} {}", germline_variant_collection.variants.len(), germline_variant_collection.vcf.n_variants);
         Ok(())
     }
@@ -462,4 +470,29 @@ mod tests {
         Somatic::initialize(id, Config::default())?.run()
     }
 
+    #[test]
+    fn overlaps() {
+        let positions = vec![
+            &GenomePosition { contig: 1, position: 100 },
+            &GenomePosition { contig: 1, position: 150 },
+            &GenomePosition { contig: 1, position: 200 },
+            &GenomePosition { contig: 2, position: 150 },
+        ];
+
+        let ranges = vec![
+            &GenomeRange { contig: 1, range: 50..150 },
+            &GenomeRange { contig: 2, range: 100..200 },
+        ];
+
+        let parallel_overlapping_indices = overlaps_par(&positions, &ranges);
+        assert_eq!(parallel_overlapping_indices, vec![0, 3])
+    }
+
+    #[test]
+    fn bed_read() -> anyhow::Result<()> {
+        let path = &Config::default().mask_bed("ADJAGBA");
+        let r = read_bed(path)?;
+        println!("{}", r.len());
+        Ok(())
+    }
 }

+ 67 - 53
src/pipes/somatic.rs

@@ -2,21 +2,11 @@ use anyhow::Context;
 use log::info;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{
-    collections::HashMap,
-    fs::File,
-    io::{BufRead, BufReader},
-    ops::Range,
-    thread,
+    collections::HashMap, fs::File, io::{BufRead, BufReader}, ops::Range, sync::Arc, thread
 };
 
 use crate::{
-    callers::{clairs::ClairS, deep_variant::DeepVariant},
-    collection::{Initialize, InitializeSolo},
-    config::Config,
-    helpers::par_intersection,
-    io::vcf::write_vcf,
-    runners::Run,
-    variant::variant::Variants,
+    annotation::Annotations, callers::{clairs::ClairS, deep_variant::DeepVariant}, collection::{Initialize, InitializeSolo}, config::Config, helpers::{par_intersection, VectorIntersection}, io::{bed::read_bed, vcf::write_vcf}, positions::{overlaps_par, par_overlaps}, runners::Run, variant::variant::Variants
 };
 
 #[derive(Debug)]
@@ -30,7 +20,6 @@ impl Initialize for Somatic {
         let id = id.to_string();
         Ok(Self { id, config })
     }
-    // add code here
 }
 
 impl Run for Somatic {
@@ -38,30 +27,6 @@ impl Run for Somatic {
         info!("Running Somatic pipe for {}", self.id);
 
         info!("Initialization...");
-        let masks = {
-            let mut hm = HashMap::new();
-
-            let reader = BufReader::new(File::open(self.config.mask_bed(&self.id))?);
-            let mut last_chr = "chr1".to_string();
-            let mut acc = Vec::new();
-            for line in reader.lines() {
-                let line = line?;
-                let (contig, range) = line.split_once("\t").context("")?;
-                let (start, end) = range.split_once("\t").context("")?;
-                let r = Range {
-                    start: start.parse::<u32>()?,
-                    end: end.parse::<u32>()?,
-                };
-                if last_chr != contig {
-                    hm.insert(last_chr.clone(), acc.clone());
-                    last_chr = contig.to_string();
-                    acc.clear();
-                }
-                acc.push(r);
-            }
-            hm.insert(last_chr, acc);
-            hm
-        };
 
         let mut clairs = ClairS::initialize(&self.id, self.config.clone())?;
         let mut deep_variant_mrd = DeepVariant::initialize(&self.id, "mrd", self.config.clone())?;
@@ -71,17 +36,24 @@ impl Run for Somatic {
         deep_variant_mrd.run()?;
 
         info!("Loading Germlines VCF from DeepVariant mrd and ClairS germline, in parallel...");
+        let annotations = Arc::new(Annotations::default());
 
         let clairs_handle = {
             let clairs = clairs.clone();
-            thread::spawn(move || clairs.germline())
+            let annotations = Arc::clone(&annotations);
+
+            thread::spawn(move || clairs.germline(&annotations))
         };
 
         let deep_variant_mrd_handle = {
             let deep_variant_mrd = deep_variant_mrd.clone();
-            thread::spawn(move || deep_variant_mrd.variants())
+            let annotations = Arc::clone(&annotations);
+
+            thread::spawn(move || deep_variant_mrd.variants(&annotations))
         };
 
+        let annotations = Arc::unwrap_or_clone(annotations);
+
         let clairs_germline = clairs_handle
             .join()
             .map_err(|e| anyhow::anyhow!("Thread panic in clairs germline: {:?}", e))
@@ -99,25 +71,67 @@ impl Run for Somatic {
 
         info!("Merging variants");
 
-        let u = par_intersection(&deep_variant_germline.variants, &clairs_germline.variants);
+        let VectorIntersection {
+            common: germline_common,
+            only_in_first: only_in_deep_variant_mrd,
+            only_in_second: only_in_clairs_germline,
+        } = par_intersection(&deep_variant_germline.variants, &clairs_germline.variants);
 
         info!(
             "common: {}, only DeepVariant: {}, only ClairS: {}",
-            u.common.len(),
-            u.only_in_first.len(),
-            u.only_in_second.len()
+            germline_common.len(),
+            only_in_deep_variant_mrd.len(),
+            only_in_clairs_germline.len()
         );
 
-        [
-            (&u.common, "commun.vcf.gz"),
-            (&u.only_in_first, "dv.vcf.gz"),
-            (&u.only_in_second, "clairs.vcf.gz"),
-        ]
-        .par_iter()
-        .for_each(|(v, p)| {
-            write_vcf(v, p).unwrap();
-        });
-
+        // Write vcf
+        // [
+        //     (germline_common, "common.vcf.gz"),
+        //     (only_in_deep_variant_mrd, "deep_variant_only.vcf.gz"),
+        //     (only_in_clairs_germline, "clairs_only.vcf.gz"),
+        // ]
+        // .par_iter()
+        // .for_each(|(v, p)| {
+        //     write_vcf(v, p).unwrap();
+        // });
+
+        let deep_variant_diag =
+            DeepVariant::initialize(&self.id, "diag", self.config.clone())?.variants(&annotations)?;
+
+        info!("Intersection...");
+        // filter common
+        let deep_diag_int =
+            par_intersection(&deep_variant_diag.variants, &germline_common);
+        println!("filtering out common germline\n{deep_diag_int}");
+        let somatic = deep_diag_int.only_in_first;
+        println!("N somatic: {}", somatic.len());
+
+        // filter only in clairs
+        let deep_diag_int =
+            par_intersection(&somatic, &only_in_clairs_germline);
+        println!("filtering out germline only in clairs\n{deep_diag_int}");
+        let somatic = deep_diag_int.only_in_first;
+        println!("N somatic: {}", somatic.len());
+
+        // filter only in deepvariant mrd
+        let deep_diag_int =
+            par_intersection(&somatic, &only_in_deep_variant_mrd);
+        println!("fiiltering out germline only in deep_variant_mrd\n{deep_diag_int}");
+        let deep_variant_diag_f = deep_diag_int.only_in_first;
+        println!("N somatic: {}", somatic.len());
+
+        // Load clairs somatic
+        let clairs_somatic = ClairS::initialize(&self.id, self.config.clone())?.variants(&annotations)?;
+        let masked = read_bed(&self.config.mask_bed(&self.id))?;
+
+
+        let somatic_int = par_intersection(&deep_variant_diag_f, &clairs_somatic.variants);
+
+        println!("deep_variant_diag_filtered & clairs\n{somatic_int}");
+        let clairs_masked = par_overlaps(&clairs_somatic.variants, &masked);
+        let deepvariant_masked = par_overlaps(&deep_variant_diag_f, &masked);
+        info!("Clairs masked: {}", clairs_masked.len());
+        info!("Deepvariant masked: {}", deepvariant_masked.len());
         Ok(())
     }
 }

+ 173 - 0
src/positions.rs

@@ -0,0 +1,173 @@
+use std::{cmp::Ordering, ops::Range};
+
+use anyhow::Context;
+use rayon::prelude::*;
+use serde::{Deserialize, Serialize};
+
+// 0-based
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
+pub struct GenomePosition {
+    pub contig: u8,
+    pub position: u32,
+}
+
+impl TryFrom<(&str, &str)> for GenomePosition {
+    type Error = anyhow::Error;
+
+    fn try_from((c, p): (&str, &str)) -> anyhow::Result<Self> {
+        Ok(GenomePosition {
+            contig: contig_to_num(c),
+            position: p.parse().context(format!("Can't parse {p} into u32."))?,
+        })
+    }
+}
+
+impl From<VcfPosition> for GenomePosition {
+    fn from(value: VcfPosition) -> Self {
+        Self {
+            contig: value.contig,
+            position: value.position - 1,
+        }
+    }
+}
+
+impl PartialOrd for GenomePosition {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl From<GenomePosition> for (String, String) {
+    fn from(val: GenomePosition) -> Self {
+        (num_to_string(val.contig), val.position.to_string())
+    }
+}
+
+impl Ord for GenomePosition {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match self.contig.cmp(&other.contig) {
+            Ordering::Equal => self.position.cmp(&other.position),
+            other => other,
+        }
+    }
+}
+
+pub fn contig_to_num(contig: &str) -> u8 {
+    match contig {
+        "chrX" => 23,
+        "chrY" => 24,
+        "chrM" => 25,
+        c => c.trim_start_matches("chr").parse().unwrap_or(u8::MAX),
+    }
+}
+
+pub fn num_to_string(num: u8) -> String {
+    match num {
+        23 => "chrX".to_string(),
+        24 => "chrY".to_string(),
+        25 => "chrM".to_string(),
+        c => format!("chr{c}"),
+    }
+}
+
+// 1-based
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+pub struct VcfPosition {
+    pub contig: u8,
+    pub position: u32,
+}
+
+impl TryFrom<(&str, &str)> for VcfPosition {
+    type Error = anyhow::Error;
+
+    fn try_from((c, p): (&str, &str)) -> anyhow::Result<Self> {
+        Ok(VcfPosition {
+            contig: contig_to_num(c),
+            position: p.parse().context(format!("Can't parse {p} into u32."))?,
+        })
+    }
+}
+
+impl From<GenomePosition> for VcfPosition {
+    fn from(value: GenomePosition) -> Self {
+        Self {
+            contig: value.contig,
+            position: value.position + 1,
+        }
+    }
+}
+
+impl From<VcfPosition> for (String, String) {
+    fn from(val: VcfPosition) -> Self {
+        (num_to_string(val.contig), val.position.to_string())
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+pub struct GenomeRange {
+    pub contig: u8,
+    pub range: Range<u32>,
+}
+
+impl TryFrom<(&str, &str, &str)> for GenomeRange {
+    type Error = anyhow::Error;
+
+    fn try_from((c, s, e): (&str, &str, &str)) -> anyhow::Result<Self> {
+        Ok(Self {
+            contig: contig_to_num(c),
+            range: Range {
+                start: s.parse().context(format!("Can't parse {s} into u32."))?,
+                end: e.parse().context(format!("Can't parse {e} into u32."))?,
+            },
+        })
+    }
+}
+
+pub fn overlaps_par(positions: &[&GenomePosition], ranges: &[&GenomeRange]) -> Vec<usize> {
+    let chunk_size = (positions.len() / rayon::current_num_threads()).max(1);
+    
+    positions.par_chunks(chunk_size)
+        .enumerate()
+        .flat_map(|(chunk_idx, chunk)| {
+            let mut result = Vec::new();
+            let mut range_idx = ranges.partition_point(|r| r.contig < chunk[0].contig);
+
+            for (pos_idx, pos) in chunk.iter().enumerate() {
+                while range_idx < ranges.len() && ranges[range_idx].contig < pos.contig {
+                    range_idx += 1;
+                }
+
+                if range_idx < ranges.len() && ranges[range_idx].contig == pos.contig {
+                    while range_idx < ranges.len() && 
+                          ranges[range_idx].contig == pos.contig && 
+                          ranges[range_idx].range.end <= pos.position {
+                        range_idx += 1;
+                    }
+
+                    if range_idx < ranges.len() && 
+                       ranges[range_idx].contig == pos.contig && 
+                       ranges[range_idx].range.contains(&pos.position) {
+                        result.push(chunk_idx * chunk_size + pos_idx);
+                    }
+                }
+            }
+            result
+        })
+        .collect()
+}
+
+pub fn par_overlaps(a: &[impl GetGenomePosition], b: &[impl GetGenomeRange]) -> Vec<usize> {
+    let a: Vec<_> = a.iter().map(|e| e.position()).collect();
+    let b: Vec<_> = b.iter().map(|e| e.range()).collect();
+    overlaps_par(&a, &b)
+}
+
+pub trait GetGenomePosition {
+    fn position(&self) -> &GenomePosition;
+}
+
+pub trait GetGenomeRange {
+    fn range(&self) -> &GenomeRange;
+}
+
+

+ 50 - 45
src/variant/variant.rs

@@ -1,12 +1,20 @@
-use crate::variant::variant_collection::VariantCollection;
+use crate::{
+    annotation::Annotations,
+    positions::{GenomePosition, GetGenomePosition, VcfPosition},
+    variant::variant_collection::VariantCollection,
+};
 use anyhow::{anyhow, Context, Ok};
 use serde::{Deserialize, Serialize};
-use std::{cmp::Ordering, fmt, str::FromStr};
+use std::{
+    cmp::Ordering,
+    fmt,
+    hash::{Hash, Hasher},
+    str::FromStr,
+};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct VcfVariant {
-    pub contig: String,
-    pub position: u32,
+    pub position: GenomePosition,
     pub id: String,
     pub reference: ReferenceAlternative,
     pub alternative: ReferenceAlternative,
@@ -19,19 +27,31 @@ pub struct VcfVariant {
 impl PartialEq for VcfVariant {
     fn eq(&self, other: &Self) -> bool {
         // Nota bene: id, filter, info, format and quality is intentionally not compared
-        self.contig == other.contig
-            && self.position == other.position
+        self.position == other.position
             && self.reference == other.reference
             && self.alternative == other.alternative
     }
 }
 
+impl Hash for VcfVariant {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.position.hash(state);
+        self.reference.hash(state);
+        self.alternative.hash(state);
+    }
+}
 impl Eq for VcfVariant {}
 impl FromStr for VcfVariant {
     type Err = anyhow::Error;
 
     fn from_str(s: &str) -> anyhow::Result<Self> {
         let v: Vec<&str> = s.split('\t').collect();
+        let vcf_position: VcfPosition = (
+            *v.first().ok_or(anyhow!("Can't get contig from: {s}"))?,
+            *v.get(1).ok_or(anyhow!("Can't get position from: {s}"))?,
+        )
+            .try_into()
+            .context(format!("Can't parse position from: {s}"))?;
 
         let formats = if v.len() == 10 {
             (
@@ -45,15 +65,7 @@ impl FromStr for VcfVariant {
         };
 
         Ok(Self {
-            contig: v
-                .first()
-                .ok_or(anyhow!("Can't parse contig from: {s}"))?
-                .to_string(),
-            position: v
-                .get(1)
-                .ok_or(anyhow!("Can't parse contig from: {s}"))?
-                .parse()
-                .context(format!("Can't parse position from: {s}"))?,
+            position: vcf_position.into(),
             id: v
                 .get(2)
                 .ok_or(anyhow!("Can't parse id from: {s}"))?
@@ -90,9 +102,12 @@ impl FromStr for VcfVariant {
 // #CHROM  POS     ID      REF     ALT     QUAL    FILTER  INFO    FORMAT  ADJAGBA_diag
 impl VcfVariant {
     pub fn into_vcf_row(&self) -> String {
+        let vcf_position: VcfPosition = self.position.clone().into();
+        let (contig, position) = vcf_position.into();
+
         let mut columns = vec![
-            self.contig.to_string(),
-            self.position.to_string(),
+            contig,
+            position,
             self.id.to_string(),
             self.reference.to_string(),
             self.alternative.to_string(),
@@ -112,17 +127,9 @@ impl VcfVariant {
         columns.join("\t")
     }
 
-    pub fn chr_num(&self) -> u32 {
-        self.contig
-            .trim_start_matches("chr")
-            .parse()
-            .unwrap_or(u32::MAX)
-    }
-
     pub fn commun_deepvariant_clairs(&self) -> VcfVariant {
         VcfVariant {
-            contig: self.contig.clone(),
-            position: self.position,
+            position: self.position.clone(),
             id: self.id.clone(),
             reference: self.reference.clone(),
             alternative: self.alternative.clone(),
@@ -132,6 +139,18 @@ impl VcfVariant {
             formats: self.formats.commun_deepvariant_clairs(),
         }
     }
+
+    pub fn hash_variant(&self) -> u64 {
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        self.hash(&mut hasher);
+        hasher.finish()
+    }
+}
+
+impl GetGenomePosition for VcfVariant {
+    fn position(&self) -> &GenomePosition {
+        &self.position
+    }
 }
 
 impl PartialOrd for VcfVariant {
@@ -142,25 +161,10 @@ impl PartialOrd for VcfVariant {
 
 impl Ord for VcfVariant {
     fn cmp(&self, other: &Self) -> Ordering {
-        let self_num = self.chr_num();
-        let other_num = other.chr_num();
-
-        match self_num.cmp(&other_num) {
-            Ordering::Equal => self.position.cmp(&other.position),
-            other => other,
-        }
+        self.position.cmp(&other.position)
     }
 }
 
-// Tag
-#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
-pub enum Annotation {
-    Source(String),
-    Diag,
-    Constit,
-    Other((String, String)), // (key, value)
-}
-
 /// Info
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 pub struct Infos(Vec<Info>);
@@ -393,6 +397,7 @@ impl From<Format> for (String, String) {
                 .collect::<Vec<_>>()
                 .join(",")
         };
+
         match format {
             Format::GT(value) => ("GT".to_string(), value),
             Format::GQ(value) => ("GQ".to_string(), value.to_string()),
@@ -469,7 +474,7 @@ impl fmt::Display for Filter {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
 pub enum ReferenceAlternative {
     Nucleotide(Base),
     Nucleotides(Vec<Base>),
@@ -512,7 +517,7 @@ impl fmt::Display for ReferenceAlternative {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
 pub enum Base {
     A,
     T,
@@ -565,5 +570,5 @@ impl fmt::Display for Base {
 }
 
 pub trait Variants {
-    fn variants(&self) -> anyhow::Result<VariantCollection>;
+    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection>;
 }