
somatic pipeline / blake3 128-bit keys for annotation hashes / seq from fasta in a len-bp window around a position, entropy

Thomas, 11 months ago
commit e01ab06376
11 changed files with 341 additions and 72 deletions:

  1. Cargo.lock (+36 -1)
  2. Cargo.toml (+3 -0)
  3. src/annotation/mod.rs (+44 -11)
  4. src/callers/clairs.rs (+21 -21)
  5. src/collection/bam.rs (+2 -2)
  6. src/collection/vcf.rs (+1 -1)
  7. src/helpers.rs (+72 -11)
  8. src/lib.rs (+30 -3)
  9. src/pipes/somatic.rs (+66 -13)
  10. src/variant/variant.rs (+18 -5)
  11. src/variant/variant_collection.rs (+48 -4)

Cargo.lock (+36 -1)

@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "addr2line"
@@ -801,6 +801,19 @@ dependencies = [
  "wyz",
 ]
 
+[[package]]
+name = "blake3"
+version = "1.5.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e"
+dependencies = [
+ "arrayref",
+ "arrayvec",
+ "cc",
+ "cfg-if",
+ "constant_time_eq",
+]
+
 [[package]]
 name = "block"
 version = "0.1.6"
@@ -1204,6 +1217,12 @@ dependencies = [
  "tiny-keccak",
 ]
 
+[[package]]
+name = "constant_time_eq"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
+
 [[package]]
 name = "convert_case"
 version = "0.4.0"
@@ -3229,6 +3248,19 @@ dependencies = [
  "noodles-core",
 ]
 
+[[package]]
+name = "noodles-fasta"
+version = "0.46.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "16862f9e1bf1ad825a1fab6fc29da9e950dd477cfcd0cb1a2b14fa8ee1a72575"
+dependencies = [
+ "bstr",
+ "bytes",
+ "memchr",
+ "noodles-bgzf 0.34.0",
+ "noodles-core",
+]
+
 [[package]]
 name = "noodles-gff"
 version = "0.38.0"
@@ -3635,6 +3667,7 @@ dependencies = [
  "anyhow",
  "arrow 53.3.0",
  "bgzip",
+ "blake3",
  "byte-unit",
  "chrono",
  "csv",
@@ -3653,7 +3686,9 @@ dependencies = [
  "log",
  "logtest",
  "nix 0.29.0",
+ "noodles-core",
  "noodles-csi 0.41.0",
+ "noodles-fasta 0.46.0",
  "num-format",
  "pandora_lib_assembler",
  "pandora_lib_bindings",

Cargo.toml (+3 -0)

@@ -45,3 +45,6 @@ arrow = "53.3.0"
 bgzip = "0.3.1"
 tempfile = "3.14.0"
 dashmap = { version = "6.1.0", features = ["rayon"] }
+noodles-fasta = "0.46.0"
+noodles-core = "0.15.0"
+blake3 = "1.5.5"

src/annotation/mod.rs (+44 -11)

@@ -3,13 +3,30 @@ use std::{collections::HashSet, fmt};
 use dashmap::DashMap;
 use rayon::prelude::*;
 
-#[derive(Debug, Clone, PartialEq, Eq)]
+use crate::helpers::mean;
+
+#[derive(Debug, Clone, PartialEq)]
 pub enum Annotation {
     SoloDiag,
     SoloConstit,
     Callers(Caller),
     Germline,
     Somatic,
+    ShannonEntropy(f64),
+}
+
+impl fmt::Display for Annotation {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let str = match self {
+            Annotation::SoloDiag => "SoloDiag",
+            Annotation::SoloConstit => "SoloConstit",
+            Annotation::Callers(caller) => &caller.to_string(),
+            Annotation::Germline => "Germline",
+            Annotation::Somatic => "Somatic",
+            Annotation::ShannonEntropy(_) => "ShannonEntropy",
+        };
+        write!(f, "{}", str)
+    }
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -29,11 +46,11 @@ impl fmt::Display for Caller {
 
 #[derive(Debug, Default, Clone)]
 pub struct Annotations {
-    pub store: DashMap<u64, Vec<Annotation>>,
+    pub store: DashMap<u128, Vec<Annotation>>,
 }
 
 impl Annotations {
-    pub fn insert_update(&self, key: u64, add: &[Annotation]) {
+    pub fn insert_update(&self, key: u128, add: &[Annotation]) {
         self.store
             .entry(key)
             .or_default()
@@ -42,32 +59,49 @@ impl Annotations {
 
     pub fn callers_stat(&self) {
         let map: DashMap<String, u64> = DashMap::new();
+        let map_shannon: DashMap<String, Vec<f64>> = DashMap::new();
 
         self.store.par_iter().for_each(|e| {
             let mut s: Vec<String> = e
                 .value()
                 .iter()
-                .flat_map(|v| match v {
-                    Annotation::Callers(caller) => vec![caller.to_string()],
-                    _ => Vec::new(),
-                })
+                .filter(|e| !matches!(e, Annotation::ShannonEntropy(_)))
+                .map(|v| v.to_string())
                 .collect();
             s.sort();
             s.dedup();
 
-            *map.entry(s.join(" + ")).or_default() += 1;
+            let k = s.join(" + ");
+
+            *map.entry(k.clone()).or_default() += 1;
+
+            if let Some(Annotation::ShannonEntropy(v)) = e
+                .value()
+                .iter()
+                .find(|&a| matches!(a, Annotation::ShannonEntropy(_)))
+            {
+                map_shannon.entry(k.clone()).or_default().push(*v);
+            }
         });
         println!("Callers stats:");
         println!("\ttotal: {}", map.len());
         map.iter().for_each(|e| {
             println!("\t{}:\t{}", e.key(), e.value());
         });
+
+        println!("Callers stats mean shannon ent:");
+        map_shannon.iter().for_each(|e| {
+            let v = e.value();
+            if !v.is_empty() {
+                println!("\t{}:\t{:.2}", e.key(), mean(v));
+            }
+        });
     }
 
     pub fn get_keys_filter(
         &self,
         filter: impl Fn(&Vec<Annotation>) -> bool + Send + Sync,
-    ) -> Vec<u64> {
+    ) -> Vec<u128> {
         self.store
             .par_iter()
             .filter(|entry| filter(entry.value()))
@@ -75,8 +109,7 @@ impl Annotations {
             .collect()
     }
 
-    pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u64>) {
+    pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u128>) {
         self.store.retain(|key, _| keys_to_keep.contains(key));
     }
-
 }
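
For context, a minimal usage sketch of the updated API (hypothetical call site; keys are the 128-bit BLAKE3-derived variant hashes introduced in src/variant/variant.rs below):

    use crate::annotation::{Annotation, Annotations};

    fn demo(annotations: &Annotations, key: u128) {
        // Merge new annotations into this variant's entry.
        annotations.insert_update(key, &[Annotation::Somatic, Annotation::ShannonEntropy(1.85)]);
        // Prints counts per caller combination and, now, the mean
        // Shannon entropy per combination.
        annotations.callers_stat();
    }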

src/callers/clairs.rs (+21 -21)

@@ -122,27 +122,27 @@ impl Run for ClairS {
 
         // Germline
         if !Path::new(&self.clair3_germline_passed).exists() {
-            let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
-            let report = bcftools_concat(
-                vec![
-                    self.clair3_germline_tumor.to_string(),
-                    self.clair3_germline_normal.to_string(),
-                ],
-                &tmp_file,
-                BcftoolsConfig::default(),
-            )
-            .context(format!(
-                "Error while running bcftools concat for {} and {}",
-                &self.output_vcf, &self.output_indels_vcf
-            ))?;
-
-            let log_file = format!("{}/bcftools_concat_", self.log_dir);
-            report
-                .save_to_file(&log_file)
-                .context(format!("Error while writing logs into {log_file}"))?;
-
+            // let tmp_file = format!("{}.vcf.gz", get_temp_file_path()?.display());
+            // let report = bcftools_concat(
+            //     vec![
+            //         self.clair3_germline_tumor.to_string(),
+            //         self.clair3_germline_normal.to_string(),
+            //     ],
+            //     &tmp_file,
+            //     BcftoolsConfig::default(),
+            // )
+            // .context(format!(
+            //     "Error while running bcftools concat for {} and {}",
+            //     &self.output_vcf, &self.output_indels_vcf
+            // ))?;
+            //
+            // let log_file = format!("{}/bcftools_concat_", self.log_dir);
+            // report
+            //     .save_to_file(&log_file)
+            //     .context(format!("Error while writing logs into {log_file}"))?;
+            //
             let report = bcftools_keep_pass(
-                &tmp_file,
+                &self.clair3_germline_normal,
                 &self.clair3_germline_passed,
                 BcftoolsConfig::default(),
             )
@@ -155,7 +155,7 @@ impl Run for ClairS {
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
 
-            fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
+            // fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
         } else {
             info!("ClairS germline VCF exists.");
         }

src/collection/bam.rs (+2 -2)

@@ -251,8 +251,8 @@ pub fn nt_pileup(
             "Can't pileup bam at position {}:{} (0-based)",
             chr, position
         ))?;
-        let position = pileup.pos() as i32;
-        if position == position {
+        let cur_position = pileup.pos() as i32;
+        if cur_position == position {
             for alignment in pileup.alignments() {
                 match alignment.indel() {
                     bam::pileup::Indel::Ins(_len) => bam_pileup.push(b'I'),
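
The removed lines above shadowed the function's `position` argument: the guard `position == position` was a self-comparison that always passed, and renaming to `cur_position` restores the intended check. A minimal reproduction of the pitfall:

    fn always_true(position: i32, pileup_pos: i32) -> bool {
        let position = pileup_pos; // shadows the parameter
        position == position       // self-comparison: always true
    }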

src/collection/vcf.rs (+1 -1)

@@ -1,4 +1,4 @@
-use anyhow::{anyhow, Context};
+use anyhow::Context;
 use chrono::{DateTime, Utc};
 use csi::binning_index::ReferenceSequence;
 use glob::glob;

src/helpers.rs (+72 -11)

@@ -1,7 +1,12 @@
 use anyhow::Context;
 use log::warn;
 use std::{
-    cmp::Ordering, fmt, fs, path::{Path, PathBuf}
+    cmp::Ordering,
+    collections::HashMap,
+    fmt, fs,
+    iter::Sum,
+    ops::{Add, Div},
+    path::{Path, PathBuf},
 };
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
@@ -87,17 +92,26 @@ pub struct VectorIntersection<T> {
 impl<T> fmt::Display for VectorIntersection<T> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let total_items = self.common.len() + self.only_in_first.len() + self.only_in_second.len();
-        
+
         writeln!(f, "Total items: {}", total_items)?;
-        writeln!(f, "Common: {} ({:.2}%)", 
-               self.common.len(), 
-               percentage(self.common.len(), total_items))?;
-        writeln!(f, "Only in first: {} ({:.2}%)", 
-               self.only_in_first.len(), 
-               percentage(self.only_in_first.len(), total_items))?;
-        writeln!(f, "Only in second: {} ({:.2}%)", 
-               self.only_in_second.len(), 
-               percentage(self.only_in_second.len(), total_items))
+        writeln!(
+            f,
+            "Common: {} ({:.2}%)",
+            self.common.len(),
+            percentage(self.common.len(), total_items)
+        )?;
+        writeln!(
+            f,
+            "Only in first: {} ({:.2}%)",
+            self.only_in_first.len(),
+            percentage(self.only_in_first.len(), total_items)
+        )?;
+        writeln!(
+            f,
+            "Only in second: {} ({:.2}%)",
+            self.only_in_second.len(),
+            percentage(self.only_in_second.len(), total_items)
+        )
     }
 }
 
@@ -227,3 +241,50 @@ pub fn get_temp_file_path() -> std::io::Result<PathBuf> {
 
     Ok(temp_path.to_path_buf())
 }
+
+pub fn estimate_shannon_entropy(dna_sequence: &str) -> f64 {
+    let m = dna_sequence.len() as f64;
+
+    // Early return for empty sequences
+    if m == 0.0 {
+        return 0.0;
+    }
+
+    // Count occurrences of each base
+    let mut bases = HashMap::<char, usize>::new();
+    for base in dna_sequence.chars() {
+        *bases.entry(base).or_insert(0) += 1;
+    }
+
+    // Calculate Shannon entropy
+    let shannon_entropy_value: f64 = bases
+        .values()
+        .map(|&n_i| {
+            let p_i = n_i as f64 / m;
+            if p_i > 0.0 {
+                -p_i * p_i.log2()
+            } else {
+                0.0 // Avoid log2(0)
+            }
+        })
+        .sum();
+
+    shannon_entropy_value
+}
+
+pub fn mean<T>(values: &[T]) -> f64
+where
+    T: Copy + Add<Output = T> + Div<Output = T> + Sum + Into<f64>,
+{
+    let count = values.len();
+
+    if count == 0 {
+        return 0.0;
+    }
+
+    // Calculate the sum of the values
+    let sum: T = values.iter().copied().sum();
+
+    // Calculate and return the average as f64
+    sum.into() / count as f64
+}
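
As a sanity check on `estimate_shannon_entropy` (a minimal sketch; values follow from H = -sum(p_i * log2(p_i)) over base frequencies):

    use crate::helpers::estimate_shannon_entropy;

    #[test]
    fn shannon_entropy_sanity() {
        // Four equiprobable bases: H = 4 * (-0.25 * log2(0.25)) = 2.0 bits.
        assert!((estimate_shannon_entropy("ACGT") - 2.0).abs() < 1e-12);
        // A homopolymer (low-complexity context) carries no information: H = 0.
        assert_eq!(estimate_shannon_entropy("AAAAAAAAAA"), 0.0);
        // Empty input takes the early return.
        assert_eq!(estimate_shannon_entropy(""), 0.0);
    }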

src/lib.rs (+30 -3)

@@ -25,13 +25,14 @@ lazy_static! {
 
 #[cfg(test)]
 mod tests {
-    use std::{fs, path::Path};
+    use std::{collections::HashMap, fs, path::Path};
 
     use annotation::Annotations;
     use callers::{nanomonsv::nanomonsv_create_pon, savana::Savana, severus::{Severus, SeverusSolo}};
     use collection::{bam::nt_pileup, Initialize, InitializeSolo, Version};
     use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}};
     use functions::assembler::{Assembler, AssemblerConfig};
+    use helpers::estimate_shannon_entropy;
     use io::bed::read_bed;
     use log::info;
     use pipes::somatic::Somatic;
@@ -506,8 +507,34 @@ mod tests {
         let chr = "chr3";
         let position = 62416039; // 1-based
         let mut bam = rust_htslib::bam::IndexedReader::from_path(c.solo_bam(id, "diag"))?;
-        let p = nt_pileup(&mut bam, chr, position - 1, false)?;
-        info!("{:?}", p.iter().map(|e| String::from_utf8(vec![*e]).unwrap()).collect::<Vec<_>>());
+        let p = nt_pileup(&mut bam, chr, position - 1, false)?.iter().map(|e| String::from_utf8(vec![*e]).unwrap()).collect::<Vec<_>>();
+        let mut counts = HashMap::new();
+
+        for item in p.iter() {
+            *counts.entry(item.as_str()).or_insert(0) += 1;
+        }
+
+        for (key, value) in &counts {
+            println!("{}: {}", key, value);
+        }
+
+        assert_eq!(8, *counts.get("C").unwrap());
+        assert_eq!(13, *counts.get("G").unwrap());
+        assert_eq!(6, *counts.get("D").unwrap());
+
+        Ok(())
+    }
+
+    #[test]
+    fn seq_at() -> anyhow::Result<()> {
+        init();
+        let c = Config::default();
+        let chr = "chr3";
+        let position = 716766;
+
+        let mut fasta_reader  = noodles_fasta::indexed_reader::Builder::default().build_from_path(c.reference)?;
+        let r = pipes::somatic::sequence_at(&mut fasta_reader, chr, position, 10)?;
+        println!("{r} ({} {:.2})", r.len(), estimate_shannon_entropy(r.as_str()));
         Ok(())
     }
 }

src/pipes/somatic.rs (+66 -13)

@@ -1,14 +1,15 @@
 use log::info;
 use rayon::prelude::*;
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, fs::File, sync::Arc};
 
 use crate::{
     annotation::{Annotation, Annotations},
     callers::{clairs::ClairS, deep_variant::DeepVariant},
     collection::{Initialize, InitializeSolo},
     config::Config,
+    io::vcf::write_vcf,
     runners::Run,
-    variant::variant::{load_variants, parallel_intersection, RunnerVariants},
+    variant::variant::{load_variants, parallel_intersection, RunnerVariants, VcfVariant},
 };
 
 pub struct Somatic {
@@ -51,22 +52,20 @@ impl Run for Somatic {
         // Annotations Stats
         let mut annotations = Arc::unwrap_or_clone(annotations);
         annotations.callers_stat();
+        // TODO: look at variants: ClairS + DeepVariant + SoloConstit + SoloDiag + Somatic (error
+        // in ClairS somatic)
 
         // Filtering Somatic variants
-        info!("Filtering somatic variants (variants from somatic callers or not found in germline or in a constit sample).");
+        info!("Filtering somatic variants (keeping variants not annotated as germline or found in the solo constit sample).");
         let germline_or_somatic_keys = annotations.get_keys_filter(|anns| {
-            anns.contains(&Annotation::Somatic)
-                | (anns.contains(&Annotation::SoloDiag)
-                    && !anns
-                        .iter()
-                        .any(|ann| matches!(ann, Annotation::Germline | Annotation::SoloConstit)))
+            !anns.contains(&Annotation::Germline) && !anns.contains(&Annotation::SoloConstit)
         });
 
         let (somatic_keys, germline_keys, remains) = parallel_intersection(
             &variants_collection
                 .iter()
                 .flat_map(|e| e.keys())
-                .collect::<Vec<u64>>(),
+                .collect::<Vec<u128>>(),
             &germline_or_somatic_keys,
         );
         assert_eq!(0, remains.len());
@@ -74,7 +73,7 @@ impl Run for Somatic {
         info!("Somatic variants positions {}.", somatic_keys.len());
         info!("Germline variants positions {}.", germline_keys.len());
 
-        let somatic_keys: HashSet<u64> = somatic_keys.into_iter().collect();
+        let somatic_keys: HashSet<u128> = somatic_keys.into_iter().collect();
         annotations.retain_keys(&somatic_keys);
         annotations.callers_stat();
 
@@ -82,13 +81,67 @@ impl Run for Somatic {
             let before = c.variants.len();
             c.retain_keys(&somatic_keys);
             let after = c.variants.len();
-            info!("Variants removed from {}: {}", c.vcf.path.display(), before - after);
+            info!(
+                "Variants removed from {}: {}",
+                c.vcf.path.display(),
+                before - after
+            );
         });
 
+        variants_collection.retain(|e| !e.variants.is_empty());
+
+        info!("Entropy annotation...");
+        variants_collection.iter().for_each(|c| {
+            c.annotate_with_sequence_entropy(&annotations, &self.config.reference);
+        });
+        annotations.callers_stat();
+
+        let prob_keys: HashSet<u128> = annotations
+            .get_keys_filter(|anns| {
+                anns.contains(&Annotation::Somatic) && anns.contains(&Annotation::Germline)
+            })
+            .into_iter()
+            .collect();
+
+        info!("Problematic variants {}", prob_keys.len());
+
+        // let mut problematic_variants = variants_collection.clone();
+        //
+        // let problematic_variants: Vec<VcfVariant> = problematic_variants
+        //     .iter_mut()
+        //     .flat_map(|e| {
+        //         e.retain_keys(&prob_keys);
+        //         e.variants.clone()
+        //     })
+        //     .collect();
+
+        // write_vcf(&problematic_variants, "prob.vcf.gz")?;
+
         Ok(())
     }
 }
 
-pub fn filter_entropy() {
-
+// 0-based position
+pub fn sequence_at(
+    fasta_reader: &mut noodles_fasta::IndexedReader<noodles_fasta::io::BufReader<File>>,
+    contig: &str,
+    position: usize,
+    len: usize,
+) -> anyhow::Result<String> {
+    // convert to 1-based
+    let position = position + 1;
+
+    let start = position.saturating_sub(len / 2).max(1);
+    let end = start + len - 1;
+    // debug!("Region {contig}:{start}-{end} (1-based inclusive)");
+
+    let start = noodles_core::Position::try_from(start)?;
+    let end = noodles_core::Position::try_from(end)?;
+    let interval = noodles_core::region::interval::Interval::from(start..=end);
+
+    let r = noodles_core::Region::new(contig.to_string(), interval);
+    let record = fasta_reader.query(&r)?;
+    let s = String::from_utf8(record.sequence().as_ref().to_vec())?.to_uppercase();
+
+    Ok(s)
 }
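
A usage sketch for `sequence_at` (hypothetical helper mirroring the new `seq_at` test in src/lib.rs; the indexed reader expects a .fai file next to the FASTA):

    fn demo(config: &Config) -> anyhow::Result<()> {
        let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default()
            .build_from_path(&config.reference)?;
        // 10 bp window roughly centred on the 0-based position.
        let seq = sequence_at(&mut fasta_reader, "chr3", 716766, 10)?;
        println!("{seq} ({:.2})", crate::helpers::estimate_shannon_entropy(&seq));
        Ok(())
    }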

src/variant/variant.rs (+18 -5)

@@ -141,12 +141,25 @@ impl VcfVariant {
         }
     }
 
-    pub fn hash_variant(&self) -> u64 {
-        let mut hasher = std::collections::hash_map::DefaultHasher::new();
-        self.hash(&mut hasher);
-        hasher.finish()
+    pub fn hash_variant(&self) -> u128 {
+        // Create a new BLAKE3 hasher
+        let mut hasher = blake3::Hasher::new();
+        // Update the hasher with the fields of the variant
+        hasher.update(&self.position.contig.to_ne_bytes()); // Contig id as bytes
+        hasher.update(&self.position.position.to_ne_bytes()); // Position as bytes
+        hasher.update(self.reference.to_string().as_bytes());   // Reference string as bytes
+        hasher.update(self.alternative.to_string().as_bytes()); // Alternative string as bytes
+
+        // Finalize the hash and get the output
+        let hash_output = hasher.finalize();
+
+        // Convert the first 16 bytes of the hash output to a u128
+        let mut array = [0u8; 16];
+        array.copy_from_slice(&hash_output.as_bytes()[..16]);
+
+        // Convert to u128
+        u128::from_ne_bytes(array)
     }
-
 }
 
 impl VariantId for VcfVariant {
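
Two properties worth noting about `hash_variant`: unlike `DefaultHasher`, whose algorithm is not guaranteed stable across Rust releases, a truncated BLAKE3 digest yields a reproducible key, and at 128 bits birthday collisions stay negligible, which 64-bit keys could not promise at scale. One caveat: `to_ne_bytes` ties keys to the machine's endianness, so `to_le_bytes` would be needed if keys ever crossed architectures. Rough collision arithmetic, P(collision) ~ n^2 / 2^(k+1) for k-bit keys:

    u64,  n = 10^9 variants: 1e18 / 2^65  ~ 2.7%    (uncomfortable)
    u128, n = 10^9 variants: 1e18 / 2^129 ~ 1.5e-21 (negligible)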

src/variant/variant_collection.rs (+48 -4)

@@ -1,21 +1,65 @@
 use std::collections::HashSet;
 
+use rayon::{iter::ParallelIterator, slice::ParallelSlice};
+
 use super::variant::VcfVariant;
-use crate::collection::vcf::Vcf;
+use crate::{
+    annotation::{Annotation, Annotations},
+    collection::vcf::Vcf,
+    helpers::estimate_shannon_entropy,
+    pipes::somatic::sequence_at,
+};
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct VariantCollection {
     pub variants: Vec<VcfVariant>,
     pub vcf: Vcf,
 }
 
 impl VariantCollection {
-    pub fn keys(&self) -> Vec<u64> {
+    pub fn keys(&self) -> Vec<u128> {
         self.variants.iter().map(|v| v.hash_variant()).collect()
     }
 
-    pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u64>) {
+    pub fn retain_keys(&mut self, keys_to_keep: &HashSet<u128>) {
         self.variants
             .retain(|v| keys_to_keep.contains(&v.hash_variant()));
     }
+
+    pub fn annotate_with_sequence_entropy(&self, annotations: &Annotations, reference: &str) {
+        let total_items = self.variants.len();
+        let min_chunk_size = 1000;
+        let max_chunks = 150;
+        let seq_len = 10;
+
+        // Calculate the optimal chunk size
+        let optimal_chunk_size = total_items.div_ceil(max_chunks); // Ceiling division
+        let chunk_size = optimal_chunk_size.max(min_chunk_size);
+
+        self.variants.par_chunks(chunk_size).for_each(|chunk| {
+            let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default()
+                .build_from_path(reference)
+                .unwrap();
+
+            for c in chunk {
+                let key = c.hash_variant();
+                let mut anns = annotations.store.entry(key).or_default();
+
+                if !anns
+                    .iter()
+                    .any(|e| matches!(e, Annotation::ShannonEntropy(_)))
+                {
+                    if let Ok(r) = sequence_at(
+                        &mut fasta_reader,
+                        &c.position.contig(),
+                        c.position.position as usize,
+                        seq_len,
+                    ) {
+                        let ent = estimate_shannon_entropy(r.as_str());
+                        anns.push(Annotation::ShannonEntropy(ent));
+                    }
+                }
+            }
+        });
+    }
 }