Thomas hai 11 meses
pai
achega
3085d82cfd

+ 63 - 3
src/annotation/mod.rs

@@ -1,6 +1,9 @@
+use std::{collections::HashSet, fmt};
+
 use dashmap::DashMap;
+use rayon::prelude::*;
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Annotation {
     SoloDiag,
     SoloConstit,
@@ -9,14 +12,71 @@ pub enum Annotation {
     Somatic,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Caller {
     DeepVariant,
     ClairS,
 }
 
+impl fmt::Display for Caller {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Caller::DeepVariant => write!(f, "DeepVariant"),
+            Caller::ClairS => write!(f, "ClairS"),
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct Annotations {
-    pub store: DashMap<u64, Vec<Annotation>>
+    pub store: DashMap<u64, Vec<Annotation>>,
 }
 
+impl Annotations {
+    pub fn insert_update(&self, key: u64, add: &[Annotation]) {
+        self.store
+            .entry(key)
+            .or_default()
+            .extend(add.iter().cloned())
+    }
+
+    pub fn callers_stat(&self) {
+        let map: DashMap<String, u64> = DashMap::new();
+
+        self.store.par_iter().for_each(|e| {
+            let mut s: Vec<String> = e
+                .value()
+                .iter()
+                .flat_map(|v| match v {
+                    Annotation::Callers(caller) => vec![caller.to_string()],
+                    _ => Vec::new(),
+                })
+                .collect();
+            s.sort();
+            s.dedup();
+
+            *map.entry(s.join(" + ")).or_default() += 1;
+        });
+        println!("Callers stats:");
+        println!("\ttotal: {}", map.len());
+        map.iter().for_each(|e| {
+            println!("\t{}:\t{}", e.key(), e.value());
+        });
+    }
+
+    pub fn get_keys_filter(
+        &self,
+        filter: impl Fn(&Vec<Annotation>) -> bool + Send + Sync,
+    ) -> Vec<u64> {
+        self.store
+            .par_iter()
+            .filter(|entry| filter(entry.value()))
+            .map(|entry| *entry.key())
+            .collect()
+    }
+
+    pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u64>) {
+        self.store.retain(|key, _| keys_to_keep.contains(key));
+    }
+
+}

+ 18 - 14
src/callers/clairs.rs

@@ -6,12 +6,15 @@ use crate::{
     helpers::{force_or_not, get_temp_file_path},
     io::vcf::read_vcf,
     runners::{run_wait, DockerRun, Run},
-    variant::{variant::Variants, variant_collection::VariantCollection},
+    variant::{
+        variant::{RunnerVariants, Variants},
+        variant_collection::VariantCollection,
+    },
 };
 use anyhow::{Context, Ok};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{fs, path::Path};
-use tracing::info;
+use log::info;
 
 #[derive(Debug, Clone)]
 pub struct ClairS {
@@ -32,6 +35,7 @@ pub struct ClairS {
 impl Initialize for ClairS {
     fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
         let id = id.to_string();
+        info!("Initialize ClairS for {id}.");
         let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
 
         if !Path::new(&log_dir).exists() {
@@ -113,7 +117,7 @@ impl Run for ClairS {
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
         } else {
-            info!("ClairS vcfs already exist");
+            info!("ClairS VCFs exist.");
         }
 
         // Germline
@@ -152,8 +156,11 @@ impl Run for ClairS {
                 .context(format!("Error while writing logs into {log_file}"))?;
 
             fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
+        } else {
+            info!("ClairS germline VCF exists.");
         }
 
+
         if !Path::new(&self.vcf_passed).exists() {
             // Concat output and indels
             let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
@@ -187,7 +194,7 @@ impl Run for ClairS {
 
             fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
         } else {
-            info!("ClairS PASSED vcf already exist");
+            info!("ClairS PASSED vcf exists.");
         }
 
         Ok(())
@@ -197,13 +204,11 @@ impl Run for ClairS {
 impl Variants for ClairS {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let add = vec![Annotation::Callers(Caller::ClairS), Annotation::Somatic];
+        info!("Loading variant from ClairS {} with annotations: {:?}", self.id, add);
         let variants = read_vcf(&self.vcf_passed)?;
+
         variants.par_iter().for_each(|v| {
-            annotations
-                .store
-                .entry(v.hash_variant())
-                .or_default()
-                .extend(add.clone())
+            annotations.insert_update(v.hash_variant(), &add);
         });
         Ok(VariantCollection {
             variants,
@@ -216,11 +221,7 @@ impl ClairS {
         let add = vec![Annotation::Callers(Caller::ClairS), Annotation::Germline];
         let variants = read_vcf(&self.clair3_germline_passed)?;
         variants.par_iter().for_each(|v| {
-            annotations
-                .store
-                .entry(v.hash_variant())
-                .or_default()
-                .extend(add.clone())
+            annotations.insert_update(v.hash_variant(), &add);
         });
 
         Ok(VariantCollection {
@@ -229,3 +230,6 @@ impl ClairS {
         })
     }
 }
+
+impl RunnerVariants for ClairS {}
+

+ 19 - 8
src/callers/deep_variant.rs

@@ -1,10 +1,20 @@
 use anyhow::Context;
 use log::info;
-use std::{fs, path::Path};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+use std::{fs, path::Path};
 
 use crate::{
-    annotation::{Annotation, Annotations, Caller}, collection::{vcf::Vcf, InitializeSolo}, commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{force_or_not, path_prefix}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{variant::Variants, variant_collection::VariantCollection}
+    annotation::{Annotation, Annotations, Caller},
+    collection::{vcf::Vcf, InitializeSolo},
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    config::Config,
+    helpers::{force_or_not, path_prefix},
+    io::vcf::read_vcf,
+    runners::{run_wait, DockerRun, Run},
+    variant::{
+        variant::{RunnerVariants, Variants},
+        variant_collection::VariantCollection,
+    },
 };
 
 #[derive(Debug, Clone)]
@@ -23,6 +33,7 @@ impl InitializeSolo for DeepVariant {
     fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
         let id = id.to_string();
         let time = time.to_string();
+        info!("Initializing DeepVariant for {id} {time}.");
 
         let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
         if !Path::new(&log_dir).exists() {
@@ -118,16 +129,13 @@ impl Variants for DeepVariant {
         let solo = match self.time.as_str() {
             "diag" => Annotation::SoloDiag,
             "mrd" => Annotation::SoloConstit,
-            _ => return Err(anyhow::anyhow!("Invalid time point."))
+            _ => return Err(anyhow::anyhow!("Invalid time point.")),
         };
         let add = vec![Annotation::Callers(Caller::DeepVariant), solo];
+        info!("Loading variant from DeepVariant {} {} with annotations: {:?}", self.id, self.time, add);
         let variants = read_vcf(&self.vcf_passed)?;
         variants.par_iter().for_each(|v| {
-            annotations
-                .store
-                .entry(v.hash_variant())
-                .or_default()
-                .extend(add.clone())
+            annotations.insert_update(v.hash_variant(), &add);
         });
 
         Ok(VariantCollection {
@@ -136,3 +144,6 @@ impl Variants for DeepVariant {
         })
     }
 }
+
+impl RunnerVariants for DeepVariant {}
+

+ 9 - 9
src/callers/mod.rs

@@ -1,15 +1,15 @@
-use deep_variant::DeepVariant;
-use nanomonsv::NanomonSV;
+// use deep_variant::DeepVariant;
+// use nanomonsv::NanomonSV;
 
-pub mod deep_variant;
 pub mod clairs;
+pub mod deep_variant;
 pub mod nanomonsv;
-pub mod severus;
 pub mod savana;
+pub mod severus;
 
+// #[derive(Debug)]
+// pub enum CallersSolo {
+//     DeepVariant(DeepVariant),
+//     NanomonSV(NanomonSV),
+// }
 
-#[derive(Debug)]
-pub enum Caller {
-    DeepVariant(DeepVariant),
-    NanomonSV(NanomonSV),
-}

+ 113 - 0
src/collection/bam.rs

@@ -234,3 +234,116 @@ pub fn bam_compo(file_path: &str, sample_size: usize) -> anyhow::Result<Vec<(Str
         })
         .collect())
 }
+
+// 0-based inclusif
+pub fn nt_pileup(
+    bam: &mut rust_htslib::bam::IndexedReader,
+    chr: &str,
+    position: i32,
+    with_next_ins: bool,
+) -> anyhow::Result<Vec<u8>> {
+    use rust_htslib::{bam, bam::Read};
+    let mut bases = Vec::new();
+    bam.fetch((chr, position,  position + 1))?;
+    let mut bam_pileup = Vec::new();
+    for p in bam.pileup() {
+        let pileup = p.context(format!(
+            "Can't pileup bam at position {}:{} (0-based)",
+            chr, position
+        ))?;
+        let position = pileup.pos() as i32;
+        if position == position {
+            for alignment in pileup.alignments() {
+                match alignment.indel() {
+                    bam::pileup::Indel::Ins(_len) => bam_pileup.push(b'I'),
+                    bam::pileup::Indel::Del(_len) => bam_pileup.push(b'D'),
+                    _ => {
+                        let record = alignment.record();
+                        if record.seq_len() > 0 {
+                            if let Some(b) = base_at(&record, position as u32, with_next_ins)? {
+                                bases.push(b);
+                            }
+                        } else if alignment.is_del() {
+                            bases.push(b'D');
+                        }
+                    }
+                }
+            }
+        }
+    }
+    Ok(bases)
+}
+
+pub fn base_at(
+    record: &rust_htslib::bam::record::Record,
+    at_pos: u32,
+    with_next_ins: bool,
+) -> anyhow::Result<Option<u8>> {
+    use rust_htslib::bam::record::Cigar;
+
+    let cigar = record.cigar();
+    let seq = record.seq();
+    let pos = cigar.pos() as u32;
+
+    let mut read_i = 0u32;
+    // let at_pos = at_pos - 1;
+    let mut ref_pos = pos;
+    if ref_pos > at_pos {
+        return Ok(None);
+    }
+
+    for (id, op) in cigar.iter().enumerate() {
+        let (add_read, add_ref) = match *op {
+            Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
+            Cigar::Ins(len) => (len, 0),
+            Cigar::Del(len) => (0, len),
+            Cigar::RefSkip(len) => (0, len),
+            Cigar::SoftClip(len) => (len, 0),
+            Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
+        };
+        // If at the end of the op len and next op is Ins return I
+        if with_next_ins && ref_pos + add_read == at_pos + 1 {
+            if let Some(Cigar::Ins(_)) = cigar.get(id + 1) {
+                return Ok(Some(b'I'));
+            }
+        }
+
+        if ref_pos + add_ref > at_pos {
+            // Handle deletions directly
+            if let Cigar::Del(_) = *op {
+                return Ok(Some(b'D'));
+            } else if let Cigar::RefSkip(_) = op {
+                return Ok(None);
+            } else {
+                let diff = at_pos - ref_pos;
+                let p = read_i + diff;
+                return Ok(Some(seq[p as usize]));
+            }
+        }
+
+        read_i += add_read;
+        ref_pos += add_ref;
+    }
+    Ok(None)
+}
+
+// thanks to chatGPT (the best)
+pub fn estimate_shannon_entropy(dna_sequence: &str) -> f64 {
+    let m = dna_sequence.len() as f64;
+
+    // Count occurrences of each base
+    let mut bases = HashMap::<char, usize>::new();
+    for base in dna_sequence.chars() {
+        *bases.entry(base).or_insert(0) += 1;
+    }
+
+    // Calculate Shannon entropy
+    let mut shannon_entropy_value = 0.0;
+    for &n_i in bases.values() {
+        let p_i = n_i as f64 / m;
+        shannon_entropy_value -= p_i * p_i.log2();
+    }
+
+    shannon_entropy_value
+}
+

+ 16 - 1
src/lib.rs

@@ -29,7 +29,7 @@ mod tests {
 
     use annotation::Annotations;
     use callers::{nanomonsv::nanomonsv_create_pon, savana::Savana, severus::{Severus, SeverusSolo}};
-    use collection::{Initialize, InitializeSolo, Version};
+    use collection::{bam::nt_pileup, Initialize, InitializeSolo, Version};
     use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}};
     use functions::assembler::{Assembler, AssemblerConfig};
     use io::bed::read_bed;
@@ -472,6 +472,7 @@ mod tests {
 
     #[test]
     fn overlaps() {
+        init();
         let positions = vec![
             &GenomePosition { contig: 1, position: 100 },
             &GenomePosition { contig: 1, position: 150 },
@@ -490,9 +491,23 @@ mod tests {
 
     #[test]
     fn bed_read() -> anyhow::Result<()> {
+        init();
         let path = &Config::default().mask_bed("ADJAGBA");
         let r = read_bed(path)?;
         println!("{}", r.len());
         Ok(())
     }
+
+    #[test]
+    fn bases_at() -> anyhow::Result<()> {
+        init();
+        let id = "ADJAGBA";
+        let c = Config::default();
+        let chr = "chr3";
+        let position = 62416039; // 1-based
+        let mut bam = rust_htslib::bam::IndexedReader::from_path(c.solo_bam(id, "diag"))?;
+        let p = nt_pileup(&mut bam, chr, position - 1, false)?;
+        info!("{:?}", p.iter().map(|e| String::from_utf8(vec![*e]).unwrap()).collect::<Vec<_>>());
+        Ok(())
+    }
 }

+ 67 - 110
src/pipes/somatic.rs

@@ -1,137 +1,94 @@
-use anyhow::Context;
 use log::info;
-use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
-use std::{
-    collections::HashMap, fs::File, io::{BufRead, BufReader}, ops::Range, sync::Arc, thread
-};
+use rayon::prelude::*;
+use std::{collections::HashSet, sync::Arc};
 
 use crate::{
-    annotation::Annotations, callers::{clairs::ClairS, deep_variant::DeepVariant}, collection::{Initialize, InitializeSolo}, config::Config, helpers::{par_intersection, VectorIntersection}, io::{bed::read_bed, vcf::write_vcf}, positions::{overlaps_par, par_overlaps}, runners::Run, variant::variant::Variants
+    annotation::{Annotation, Annotations},
+    callers::{clairs::ClairS, deep_variant::DeepVariant},
+    collection::{Initialize, InitializeSolo},
+    config::Config,
+    runners::Run,
+    variant::variant::{load_variants, parallel_intersection, RunnerVariants},
 };
 
-#[derive(Debug)]
 pub struct Somatic {
     pub id: String,
     pub config: Config,
+    pub annotations: Annotations,
 }
 
 impl Initialize for Somatic {
     fn initialize(id: &str, config: crate::config::Config) -> anyhow::Result<Self> {
         let id = id.to_string();
-        Ok(Self { id, config })
+        Ok(Self {
+            id,
+            config,
+            annotations: Annotations::default(),
+        })
     }
 }
 
 impl Run for Somatic {
     fn run(&mut self) -> anyhow::Result<()> {
-        info!("Running Somatic pipe for {}", self.id);
+        info!("Running somatic pipe for {}.", self.id);
+        let id = self.id.clone();
 
         info!("Initialization...");
+        let mut v: Vec<Box<dyn RunnerVariants + Send + Sync>> = vec![
+            Box::new(ClairS::initialize(&id, self.config.clone())?),
+            Box::new(DeepVariant::initialize(&id, "diag", self.config.clone())?),
+            Box::new(DeepVariant::initialize(&id, "mrd", self.config.clone())?),
+        ];
+
+        let annotations = Arc::new(self.annotations.clone());
+
+        let mut variants_collection = load_variants(&mut v, &annotations)?;
+        let clairs_germline =
+            ClairS::initialize(&id, self.config.clone())?.germline(&annotations)?;
+        variants_collection.push(clairs_germline);
+        info!("Variants sources loaded: {}", variants_collection.len());
+
+        // Annotations Stats
+        let mut annotations = Arc::unwrap_or_clone(annotations);
+        annotations.callers_stat();
+
+        // Filtering Somatic variants
+        info!("Filtering somatic variants (variants from somatic callers or not found in germline or in a constit sample).");
+        let germline_or_somatic_keys = annotations.get_keys_filter(|anns| {
+            anns.contains(&Annotation::Somatic)
+                | (anns.contains(&Annotation::SoloDiag)
+                    && !anns
+                        .iter()
+                        .any(|ann| matches!(ann, Annotation::Germline | Annotation::SoloConstit)))
+        });
+
+        let (somatic_keys, germline_keys, remains) = parallel_intersection(
+            &variants_collection
+                .iter()
+                .flat_map(|e| e.keys())
+                .collect::<Vec<u64>>(),
+            &germline_or_somatic_keys,
+        );
+        assert_eq!(0, remains.len());
 
-        let mut clairs = ClairS::initialize(&self.id, self.config.clone())?;
-        let mut deep_variant_mrd = DeepVariant::initialize(&self.id, "mrd", self.config.clone())?;
-
-        info!("Running callers if necessary...");
-        clairs.run()?;
-        deep_variant_mrd.run()?;
-
-        info!("Loading Germlines VCF from DeepVariant mrd and ClairS germline, in parallel...");
-        let annotations = Arc::new(Annotations::default());
-
-        let clairs_handle = {
-            let clairs = clairs.clone();
-            let annotations = Arc::clone(&annotations);
-
-            thread::spawn(move || clairs.germline(&annotations))
-        };
-
-        let deep_variant_mrd_handle = {
-            let deep_variant_mrd = deep_variant_mrd.clone();
-            let annotations = Arc::clone(&annotations);
-
-            thread::spawn(move || deep_variant_mrd.variants(&annotations))
-        };
-
-        let annotations = Arc::unwrap_or_clone(annotations);
-
-        let clairs_germline = clairs_handle
-            .join()
-            .map_err(|e| anyhow::anyhow!("Thread panic in clairs germline: {:?}", e))
-            .context("Failed to join clairs_handle thread")?
-            .context(format!("Error in clairs germline loading for {}", self.id))?;
-
-        let deep_variant_germline = deep_variant_mrd_handle
-            .join()
-            .map_err(|e| anyhow::anyhow!("Thread panic in clairs germline: {:?}", e))
-            .context("Failed to join deep_variant_mrd_handle thread")?
-            .context(format!(
-                "Error in deepvariant germline loading for {}",
-                self.id
-            ))?;
-
-        info!("Merging variants");
+        info!("Somatic variants positions {}.", somatic_keys.len());
+        info!("Germline variants positions {}.", germline_keys.len());
 
-        let VectorIntersection {
-            common: germline_common,
-            only_in_first: only_in_deep_variant_mrd,
-            only_in_second: only_in_clairs_germline,
-        } = par_intersection(&deep_variant_germline.variants, &clairs_germline.variants);
+        let somatic_keys: HashSet<u64> = somatic_keys.into_iter().collect();
+        annotations.retain_keys(&somatic_keys);
+        annotations.callers_stat();
 
-        info!(
-            "common: {}, only DeepVariant: {}, only ClairS: {}",
-            germline_common.len(),
-            only_in_deep_variant_mrd.len(),
-            only_in_clairs_germline.len()
-        );
+        variants_collection.par_iter_mut().for_each(|c| {
+            let before = c.variants.len();
+            c.retain_keys(&somatic_keys);
+            let after = c.variants.len();
+            info!("Variants removed from {}: {}", c.vcf.path.display(), before - after);
+        });
 
-        // Write vcf
-        // [
-        //     (germline_common, "common.vcf.gz"),
-        //     (only_in_deep_variant_mrd, "deep_variant_only.vcf.gz"),
-        //     (only_in_clairs_germline, "clairs_only.vcf.gz"),
-        // ]
-        // .par_iter()
-        // .for_each(|(v, p)| {
-        //     write_vcf(v, p).unwrap();
-        // });
-
-        let deep_variant_diag =
-            DeepVariant::initialize(&self.id, "diag", self.config.clone())?.variants(&annotations)?;
-
-        info!("Intersection...");
-        // filter common
-        let deep_diag_int =
-            par_intersection(&deep_variant_diag.variants, &germline_common);
-        println!("filtering out common germline\n{deep_diag_int}");
-        let somatic = deep_diag_int.only_in_first;
-        println!("N somatic: {}", somatic.len());
-
-        // filter only in clairs
-        let deep_diag_int =
-            par_intersection(&somatic, &only_in_clairs_germline);
-        println!("filtering out germline only in clairs\n{deep_diag_int}");
-        let somatic = deep_diag_int.only_in_first;
-        println!("N somatic: {}", somatic.len());
-
-        // filter only in deepvariant mrd
-        let deep_diag_int =
-            par_intersection(&somatic, &only_in_deep_variant_mrd);
-        println!("fiiltering out germline only in deep_variant_mrd\n{deep_diag_int}");
-        let deep_variant_diag_f = deep_diag_int.only_in_first;
-        println!("N somatic: {}", somatic.len());
-
-        // Load clairs somatic
-        let clairs_somatic = ClairS::initialize(&self.id, self.config.clone())?.variants(&annotations)?;
-        let masked = read_bed(&self.config.mask_bed(&self.id))?;
-
-
-        let somatic_int = par_intersection(&deep_variant_diag_f, &clairs_somatic.variants);
-
-        println!("deep_variant_diag_filtered & clairs\n{somatic_int}");
-        let clairs_masked = par_overlaps(&clairs_somatic.variants, &masked);
-        let deepvariant_masked = par_overlaps(&deep_variant_diag_f, &masked);
-        info!("Clairs masked: {}", clairs_masked.len());
-        info!("Deepvariant masked: {}", deepvariant_masked.len());
         Ok(())
     }
 }
+
+pub fn filter_entropy() {
+
+}

+ 36 - 11
src/positions.rs

@@ -1,4 +1,4 @@
-use std::{cmp::Ordering, ops::Range};
+use std::{cmp::Ordering, fmt::Display, ops::Range, str::FromStr};
 
 use anyhow::Context;
 use rayon::prelude::*;
@@ -11,6 +11,30 @@ pub struct GenomePosition {
     pub position: u32,
 }
 
+impl GenomePosition {
+    pub fn contig(&self) -> String {
+        num_to_string(self.contig)
+    }
+}
+
+impl Display for GenomePosition {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}:{}", self.contig(), self.position)
+    }
+}
+
+impl FromStr for GenomePosition {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> anyhow::Result<Self> {
+        let (c, p) = s.split_once(":").ok_or(anyhow::anyhow!("Can't split {s}"))?;
+        Ok(Self {
+            contig: contig_to_num(c),
+            position: p.parse()?,
+        })
+    }
+}
+
 impl TryFrom<(&str, &str)> for GenomePosition {
     type Error = anyhow::Error;
 
@@ -125,8 +149,9 @@ impl TryFrom<(&str, &str, &str)> for GenomeRange {
 
 pub fn overlaps_par(positions: &[&GenomePosition], ranges: &[&GenomeRange]) -> Vec<usize> {
     let chunk_size = (positions.len() / rayon::current_num_threads()).max(1);
-    
-    positions.par_chunks(chunk_size)
+
+    positions
+        .par_chunks(chunk_size)
         .enumerate()
         .flat_map(|(chunk_idx, chunk)| {
             let mut result = Vec::new();
@@ -138,15 +163,17 @@ pub fn overlaps_par(positions: &[&GenomePosition], ranges: &[&GenomeRange]) -> V
                 }
 
                 if range_idx < ranges.len() && ranges[range_idx].contig == pos.contig {
-                    while range_idx < ranges.len() && 
-                          ranges[range_idx].contig == pos.contig && 
-                          ranges[range_idx].range.end <= pos.position {
+                    while range_idx < ranges.len()
+                        && ranges[range_idx].contig == pos.contig
+                        && ranges[range_idx].range.end <= pos.position
+                    {
                         range_idx += 1;
                     }
 
-                    if range_idx < ranges.len() && 
-                       ranges[range_idx].contig == pos.contig && 
-                       ranges[range_idx].range.contains(&pos.position) {
+                    if range_idx < ranges.len()
+                        && ranges[range_idx].contig == pos.contig
+                        && ranges[range_idx].range.contains(&pos.position)
+                    {
                         result.push(chunk_idx * chunk_size + pos_idx);
                     }
                 }
@@ -169,5 +196,3 @@ pub trait GetGenomePosition {
 pub trait GetGenomeRange {
     fn range(&self) -> &GenomeRange;
 }
-
-

+ 61 - 4
src/variant/variant.rs

@@ -1,15 +1,14 @@
 use crate::{
     annotation::Annotations,
     positions::{GenomePosition, GetGenomePosition, VcfPosition},
+    runners::Run,
     variant::variant_collection::VariantCollection,
 };
 use anyhow::{anyhow, Context, Ok};
+use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use std::{
-    cmp::Ordering,
-    fmt,
-    hash::{Hash, Hasher},
-    str::FromStr,
+    cmp::Ordering, collections::HashSet, fmt, hash::{Hash, Hasher}, str::FromStr
 };
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -40,7 +39,9 @@ impl Hash for VcfVariant {
         self.alternative.hash(state);
     }
 }
+
 impl Eq for VcfVariant {}
+
 impl FromStr for VcfVariant {
     type Err = anyhow::Error;
 
@@ -145,6 +146,13 @@ impl VcfVariant {
         self.hash(&mut hasher);
         hasher.finish()
     }
+
+}
+
+impl VariantId for VcfVariant {
+    fn variant_id(&self) -> String {
+        format!("{}_{}>{}", self.position, self.reference, self.alternative)
+    }
 }
 
 impl GetGenomePosition for VcfVariant {
@@ -572,3 +580,52 @@ impl fmt::Display for Base {
 pub trait Variants {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection>;
 }
+
+pub trait VariantId {
+    fn variant_id(&self) -> String;
+}
+
+pub trait RunnerVariants: Run + Variants + Send + Sync {}
+
+pub fn load_variants(
+    iterable: &mut [Box<dyn RunnerVariants + Send + Sync>],
+    annotations: &Annotations,
+) -> anyhow::Result<Vec<VariantCollection>> {
+    // First, run all items in parallel
+    iterable
+        .par_iter_mut()
+        .try_for_each(|runner| runner.run())?;
+
+    // Then, collect variants from all items in parallel
+    let variants: Vec<VariantCollection> = iterable
+        .par_iter()
+        .map(|runner| runner.variants(annotations))
+        .collect::<anyhow::Result<Vec<_>>>()?;
+
+    Ok(variants)
+}
+
+pub fn parallel_intersection<T: Hash + Eq + Clone + Send + Sync>(
+    vec1: &[T],
+    vec2: &[T]
+) -> (Vec<T>, Vec<T>, Vec<T>) {
+    let set1: HashSet<_> = vec1.par_iter().cloned().collect();
+    let set2: HashSet<_> = vec2.par_iter().cloned().collect();
+
+    let common: Vec<T> = set1.par_iter()
+        .filter(|item| set2.contains(item))
+        .cloned()
+        .collect();
+
+    let only_in_first: Vec<T> = set1.par_iter()
+        .filter(|item| !set2.contains(item))
+        .cloned()
+        .collect();
+
+    let only_in_second: Vec<T> = set2.par_iter()
+        .filter(|item| !set1.contains(item))
+        .cloned()
+        .collect();
+
+    (common, only_in_first, only_in_second)
+}

+ 14 - 2
src/variant/variant_collection.rs

@@ -1,9 +1,21 @@
-use crate::collection::vcf::Vcf;
+use std::collections::HashSet;
+
 use super::variant::VcfVariant;
+use crate::collection::vcf::Vcf;
 
 #[derive(Debug)]
 pub struct VariantCollection {
     pub variants: Vec<VcfVariant>,
-    pub vcf: Vcf
+    pub vcf: Vcf,
 }
 
+impl VariantCollection {
+    pub fn keys(&self) -> Vec<u64> {
+        self.variants.iter().map(|v| v.hash_variant()).collect()
+    }
+
+    pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u64>) {
+        self.variants
+            .retain(|v| keys_to_keep.contains(&v.hash_variant()));
+    }
+}