| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287 |
- use std::{
- collections::{HashMap, HashSet},
- fs::{self, File},
- io::Write,
- path::Path,
- };
- use anyhow::Context;
- use bgzip::{BGZFReader, BGZFWriter};
- use csv::ReaderBuilder;
- use log::{debug, error, info, warn};
- use rayon::prelude::*;
- use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- use super::variant::{AlterationCategory, ReferenceAlternative, VcfVariant};
- use crate::{
- annotation::{
- cosmic::Cosmic,
- echtvar::{parse_echtvar_val, run_echtvar},
- gnomad::GnomAD,
- vep::{run_vep, VepLine, VEP},
- Annotation, Annotations,
- },
- collection::{
- bam::{counts_at, counts_ins_at},
- vcf::Vcf,
- },
- helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path, Hash128},
- io::{fasta::sequence_at, readers::get_reader, vcf::vcf_header},
- positions::GenomePosition,
- };
- /// A collection of VCF variants along with associated metadata.
- ///
- /// This struct represents a set of variants from a VCF file, including
- /// the original VCF metadata and the caller annotation.
- ///
- /// # Fields
- /// * `variants` - A vector of VcfVariant instances representing individual variants
- /// * `vcf` - The Vcf struct containing metadata from the original VCF file
- /// * `caller` - An Annotation indicating the variant caller used
- ///
- /// # Derives
- /// This struct derives Debug and Clone traits, allowing for easy debugging
- /// and cloning of VariantCollection instances.
- ///
- /// # Usage
- /// This struct is typically used to group related variants from a single VCF file,
- /// preserving the context in which they were called and allowing for collective
- /// operations on the set of variants.
- #[derive(Debug, Clone)]
- pub struct VariantCollection {
- pub variants: Vec<VcfVariant>,
- pub vcf: Vcf,
- pub caller: Annotation,
- }
- impl VariantCollection {
- /// Returns a vector of hash keys for all variants in the collection.
- ///
- /// This method generates a unique hash for each variant in the collection
- /// and returns these hashes as a vector.
- ///
- /// # Returns
- /// A `Vec<Hash128>` containing the hash of each variant in the collection.
- ///
- /// # Performance
- /// This method iterates over all variants in the collection, so its time complexity
- /// is O(n) where n is the number of variants.
- ///
- /// # Usage
- /// This method is useful for obtaining unique identifiers for each variant,
- /// which can be used for indexing, comparison, or lookup operations.
- ///
- /// # Example
- /// ```
- /// let variant_keys = variant_collection.keys();
- /// for key in variant_keys {
- /// println!("Variant hash: {:?}", key);
- /// }
- /// ```
- pub fn keys(&self) -> Vec<Hash128> {
- self.variants.iter().map(|v| v.hash()).collect()
- }
- /// Retains only the variants whose hash keys are present in the provided set.
- ///
- /// This method filters the variants in the collection, keeping only those
- /// whose hash keys are found in the `keys_to_keep` set.
- ///
- /// # Arguments
- /// * `keys_to_keep` - A `HashSet<Hash128>` containing the hash keys of variants to retain
- ///
- /// # Effects
- /// This method modifies the `VariantCollection` in place, potentially reducing
- /// the number of variants it contains.
- ///
- /// # Performance
- /// The time complexity is O(n), where n is the number of variants in the collection.
- /// The space complexity is O(1) as it operates in place.
- ///
- /// # Usage
- /// This method is useful for filtering variants based on a pre-computed set of keys,
- /// which can be helpful in various scenarios such as:
- /// - Removing duplicates
- /// - Keeping only variants that meet certain criteria
- /// - Synchronizing variants across different collections
- ///
- /// # Example
- /// ```
- /// let mut variant_collection = VariantCollection::new();
- /// // ... populate variant_collection ...
- ///
- /// let keys_to_keep: HashSet<Hash128> = some_filtering_function();
- /// variant_collection.retain_keys(&keys_to_keep);
- /// ```
- pub fn retain_keys(&mut self, keys_to_keep: &HashSet<Hash128>) {
- self.variants.retain(|v| keys_to_keep.contains(&v.hash()));
- }
- /// Removes variants whose hash keys are present in the provided set.
- ///
- /// This method filters the variants in the collection, removing those
- /// whose hash keys are found in the `keys_to_remove` set.
- ///
- /// # Arguments
- /// * `keys_to_remove` - A `HashSet<Hash128>` containing the hash keys of variants to remove
- ///
- /// # Effects
- /// This method modifies the `VariantCollection` in place, potentially reducing
- /// the number of variants it contains.
- ///
- /// # Performance
- /// The time complexity is O(n), where n is the number of variants in the collection.
- /// The space complexity is O(1) as it operates in place.
- ///
- /// # Usage
- /// This method is useful for filtering out unwanted variants based on a pre-computed set of keys,
- /// which can be helpful in various scenarios such as:
- /// - Removing variants that fail certain criteria
- /// - Eliminating duplicates identified by their hash keys
- /// - Excluding variants present in another dataset
- ///
- /// # Example
- /// ```
- /// let mut variant_collection = VariantCollection::new();
- /// // ... populate variant_collection ...
- ///
- /// let keys_to_remove: HashSet<Hash128> = some_filtering_function();
- /// variant_collection.remove_keys(&keys_to_remove);
- /// ```
- pub fn remove_keys(&mut self, keys_to_remove: &HashSet<Hash128>) {
- self.variants
- .retain(|v| !keys_to_remove.contains(&v.hash()));
- }
- /// Partitions the VcfVariants into two sets based on a given predicate.
- ///
- /// This function splits the current VcfVariants instance into two new instances,
- /// where the variants are divided based on whether they satisfy the given predicate.
- ///
- /// # Arguments
- /// * `predicate` - A closure that takes a reference to a VcfVariant and returns a boolean
- ///
- /// # Returns
- /// A tuple of two VcfVariants instances:
- /// * The first contains all variants for which the predicate returned true
- /// * The second contains all variants for which the predicate returned false
- ///
- /// # Type Parameters
- /// * `F` - The type of the predicate function, which must implement Fn(&VcfVariant) -> bool
- ///
- /// # Behavior
- /// - Consumes the original VcfVariants instance
- /// - Creates two new VcfVariants instances with the partitioned data
- /// - The VCF and caller information are cloned for the first returned instance
- /// and moved into the second returned instance
- ///
- /// # Examples
- /// ```
- /// let (passing, failing) = vcf_variants.partition(|variant| variant.quality.map_or(false, |q| q > 30));
- /// ```
- pub fn partition<F>(self, predicate: F) -> (Self, Self)
- where
- F: Fn(&VcfVariant) -> bool,
- {
- let Self {
- variants,
- vcf,
- caller,
- } = self;
- let (left_data, right_data) = variants.into_iter().partition(predicate);
- (
- Self {
- variants: left_data,
- vcf: vcf.clone(),
- caller: caller.clone(),
- },
- Self {
- variants: right_data,
- vcf,
- caller,
- },
- )
- }
- fn chunk_size(&self, max_threads: u8) -> usize {
- let total_items = self.variants.len();
- let min_chunk_size = 1000;
- let max_chunks = max_threads;
- let optimal_chunk_size = total_items.div_ceil(max_chunks as usize);
- optimal_chunk_size.max(min_chunk_size)
- }
- /// Annotates variants with sequence entropy information.
- ///
- /// This function calculates and adds Shannon entropy annotations to the variants
- /// based on the surrounding sequence context. It processes variants in parallel
- /// chunks for improved performance.
- ///
- /// # Arguments
- /// * `annotations` - A reference to the Annotations structure to store the results
- /// * `reference` - Path to the reference FASTA file
- /// * `seq_len` - Length of the sequence context to consider for entropy calculation
- /// * `max_threads` - Maximum number of threads to use for parallel processing
- ///
- /// # Behavior
- /// - Processes variants in parallel chunks
- /// - For each variant, retrieves the surrounding sequence from the reference
- /// - Calculates Shannon entropy for the sequence
- /// - Adds the entropy value as an annotation to the variant
- /// - Skips variants that already have a Shannon entropy annotation
- ///
- /// # Panics
- /// This function will panic if it fails to build the FASTA reader from the provided reference path.
- pub fn annotate_with_sequence_entropy(
- &self,
- annotations: &Annotations,
- reference: &str,
- seq_len: usize,
- max_threads: u8,
- ) {
- self.variants
- .par_chunks(self.chunk_size(max_threads))
- .for_each(|chunk| {
- let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default()
- .build_from_path(reference)
- .unwrap();
- for c in chunk {
- let key = c.hash();
- let mut anns = annotations.store.entry(key).or_default();
- if !anns
- .iter()
- .any(|e| matches!(e, Annotation::ShannonEntropy(_)))
- {
- if let Ok(r) = sequence_at(
- &mut fasta_reader,
- &c.position.contig(),
- c.position.position as usize,
- seq_len,
- ) {
- let ent = estimate_shannon_entropy(r.as_str());
- anns.push(Annotation::ShannonEntropy(ent));
- }
- }
- }
- });
- }
- /// Annotates variants with information from a constitutional BAM file.
- ///
- /// This function processes variants in parallel chunks and adds annotations
- /// based on the read counts from a provided BAM file. It calculates and adds
- /// annotations for the number of reads supporting the alternative allele and
- /// the total read depth at each variant position.
- ///
- /// # Arguments
- /// * `annotations` - A reference to the Annotations structure to store the results
- /// * `constit_bam_path` - Path to the constituent BAM file
- /// * `max_threads` - Maximum number of threads to use for parallel processing
- ///
- /// # Returns
- /// * `Ok(())` if the annotation process completes successfully
- /// * `Err` if there's an error opening the BAM file or processing the variants
- ///
- /// # Behavior
- /// - Processes variants in parallel chunks
- /// - For each variant, retrieves read counts from the BAM file
- /// - Calculates the number of reads supporting the alternative allele and total depth
- /// - Adds ConstitAlt and ConstitDepth annotations to each variant
- /// - Skips variants that already have both ConstitAlt and ConstitDepth annotations
- /// - Handles insertions, deletions, and other alteration types differently
- ///
- /// # Errors
- /// This function will return an error if:
- /// - It fails to open the BAM file
- /// - There's an error while processing the variants or reading from the BAM file
- pub fn annotate_with_constit_bam(
- &self,
- annotations: &Annotations,
- constit_bam_path: &str,
- max_threads: u8,
- ) -> anyhow::Result<()> {
- self.variants
- .par_chunks(self.chunk_size(max_threads))
- .try_for_each(|chunk| {
- let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path)
- .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?;
- for var in chunk {
- let key = var.hash();
- let mut anns = annotations.store.entry(key).or_default();
- if anns
- .iter()
- .filter(|e| {
- matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
- })
- .count()
- != 2
- {
- let mut n_alt = 0;
- let mut depth = 0;
- let mut alt_seq = var.alternative.to_string();
- let c = if var.alteration_category() == AlterationCategory::INS {
- // TODO: Add stretch comparison
- alt_seq = alt_seq.split_off(1);
- counts_ins_at(&mut bam, &var.position.contig(), var.position.position)?
- } else if var.alteration_category() == AlterationCategory::DEL {
- alt_seq = "D".to_string();
- counts_at(&mut bam, &var.position.contig(), var.position.position + 1)?
- } else {
- counts_at(&mut bam, &var.position.contig(), var.position.position)?
- };
- for (seq, n) in c {
- if seq == alt_seq {
- n_alt = n;
- }
- if seq == *"D" && alt_seq != *"D" {
- continue;
- }
- depth += n;
- }
- anns.push(Annotation::ConstitAlt(n_alt as u16));
- anns.push(Annotation::ConstitDepth(depth as u16));
- }
- }
- anyhow::Ok(())
- })?;
- Ok(())
- }
- pub fn stats(&self) {
- println!(
- "{} {}\t{}",
- self.vcf.caller,
- self.vcf.time,
- self.variants.len()
- );
- }
- }
- /// Represents a consolidated genomic variant with associated information and annotations.
- ///
- /// This struct encapsulates comprehensive data for a single genomic variant,
- /// including its unique identifier, genomic position, alleles, related VCF entries,
- /// and various annotations.
- ///
- /// # Fields
- /// * `hash` - A unique `Hash128` identifier for the variant
- /// * `position` - The `GenomePosition` of the variant (0-based coordinates)
- /// * `reference` - The reference allele as a `ReferenceAlternative`
- /// * `alternative` - The alternative allele as a `ReferenceAlternative`
- /// * `vcf_variants` - A vector of `VcfVariant` instances associated with this variant
- /// * `annotations` - A vector of `Annotation` instances providing additional variant information
- ///
- /// # Derives
- /// * `Debug` - For easier debugging and logging
- /// * `Serialize`, `Deserialize` - For serialization and deserialization (e.g., JSON)
- /// * `Clone` - For creating deep copies of the variant
- ///
- /// # Usage
- /// This struct is central to variant analysis, providing a unified representation
- /// of a variant that may combine data from multiple VCF entries and include
- /// various annotations. It's useful for advanced variant processing, filtering,
- /// and reporting in genomic analysis pipelines.
- ///
- /// # Example
- /// ```
- /// let variant = Variant {
- /// hash: Hash128::new(/* ... */),
- /// position: GenomePosition { contig: 1, position: 1000 },
- /// reference: ReferenceAlternative::new("A"),
- /// alternative: ReferenceAlternative::new("T"),
- /// vcf_variants: vec![/* VcfVariant instances */],
- /// annotations: vec![/* Annotation instances */],
- /// };
- /// ```
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct Variant {
- pub hash: Hash128,
- pub position: GenomePosition,
- pub reference: ReferenceAlternative,
- pub alternative: ReferenceAlternative,
- pub vcf_variants: Vec<VcfVariant>,
- pub annotations: Vec<Annotation>,
- }
- impl PartialEq for Variant {
- fn eq(&self, other: &Self) -> bool {
- self.position == other.position
- && self.reference == other.reference
- && self.alternative == other.alternative
- }
- }
- impl Variant {
- pub fn vep(&self) -> Vec<VEP> {
- self.annotations
- .iter()
- .flat_map(|a| {
- if let Annotation::VEP(v) = a {
- v.to_vec()
- } else {
- vec![]
- }
- })
- .collect()
- }
- pub fn alteration_category(&self) -> Vec<AlterationCategory> {
- self.vcf_variants
- .iter()
- .map(|v| v.alteration_category())
- .collect::<HashSet<_>>()
- .into_iter()
- .collect()
- }
- }
- /// A collection of genomic variants.
- ///
- /// This struct represents a set of `Variant` instances, providing a container
- /// for multiple genomic variants.
- ///
- /// # Fields
- /// * `data` - A vector of `Variant` instances
- ///
- /// # Derives
- /// * `Debug` - For easier debugging and logging
- /// * `Default` - Allows creation of an empty `Variants` instance
- /// * `Serialize`, `Deserialize` - For serialization and deserialization (e.g., JSON)
- /// * `Clone` - For creating deep copies of the collection
- ///
- /// # Usage
- /// This struct is typically used to group related variants, such as those from
- /// a single sample or analysis run. It provides a convenient way to handle and
- /// process multiple variants as a single unit.
- ///
- /// # Example
- /// ```
- /// let mut variants = Variants::default();
- /// variants.data.push(Variant {
- /// // ... variant details ...
- /// });
- /// // Process or analyze the collection of variants
- /// ```
- ///
- /// # Note
- /// The `Default` implementation creates an empty vector of variants.
- #[derive(Debug, Default, Serialize, Deserialize, Clone)]
- pub struct Variants {
- pub data: Vec<Variant>,
- }
- impl Variants {
- /// Sorts the variants in-place based on their genomic positions.
- ///
- /// This method uses an unstable sort to order the variants in the collection
- /// according to their genomic positions. The sorting is done in ascending order,
- /// first by contig and then by position within the contig.
- ///
- /// # Effects
- /// Modifies the order of variants in the `data` vector in-place.
- ///
- /// # Performance
- /// Uses `sort_unstable_by` for better performance, with a time complexity of O(n log n),
- /// where n is the number of variants.
- ///
- /// # Example
- /// ```
- /// let mut variants = Variants {
- /// data: vec![
- /// Variant { position: GenomePosition { contig: 0, position: 100 }, .. },
- /// Variant { position: GenomePosition { contig: 0, position: 50 }, .. },
- /// Variant { position: GenomePosition { contig: 1, position: 75 }, .. },
- /// ]
- /// };
- /// variants.sort();
- /// // Variants are now sorted by position
- /// ```
- pub fn sort(&mut self) {
- self.data
- .sort_unstable_by(|a, b| a.position.cmp(&b.position));
- }
- /// Merges this Variants collection with another VariantCollection, combining overlapping variants.
- ///
- /// This method performs a merge operation, combining the current Variants with a VariantCollection.
- /// It uses a two-pointer technique to efficiently merge the sorted collections, handling overlapping
- /// variants and creating new variants as necessary.
- ///
- /// # Arguments
- /// * `others` - A VariantCollection to merge with the current Variants
- /// * `annotations` - A reference to Annotations used when creating new variants
- ///
- /// # Effects
- /// * Modifies the current Variants in-place, replacing its data with the merged result
- /// * Drains the data from the current Variants during the merge process
- ///
- /// # Behavior
- /// * Variants are compared based on their genomic positions
- /// * When positions are equal, variants with matching reference and alternative alleles are merged
- /// * Non-matching variants at the same position are kept separate
- /// * The method logs the number of merged variants
- ///
- /// # Performance
- /// * Time complexity: O(n + m), where n and m are the sizes of the two collections
- /// * Space complexity: O(n + m) for the merged result
- ///
- /// # Note
- /// This method assumes that both the current Variants and the input VariantCollection are sorted
- /// by genomic position. Use the `sort` method before merging if necessary.
- ///
- /// # Example
- /// ```
- /// let mut variants1 = Variants { /* ... */ };
- /// let variants2 = VariantCollection { /* ... */ };
- /// let annotations = Annotations::new();
- /// variants1.merge(variants2, &annotations);
- /// // variants1 now contains the merged data
- /// ```
- pub fn merge(&mut self, others: VariantCollection, annotations: &Annotations) {
- let mut result = Vec::new();
- let mut n_merged = 0;
- let mut self_iter = self.data.drain(..).peekable(); // Iterator for self.data
- let mut others_iter = others.variants.into_iter().peekable(); // Iterator for others.variants
- // Merge using two-pointer technique
- while let (Some(self_variant), Some(other_variant)) = (self_iter.peek(), others_iter.peek())
- {
- match self_variant.position.cmp(&other_variant.position) {
- std::cmp::Ordering::Less => {
- result.push(self_iter.next().unwrap());
- }
- std::cmp::Ordering::Greater => {
- result.push(create_variant(
- vec![others_iter.next().unwrap()],
- annotations,
- ));
- }
- std::cmp::Ordering::Equal => {
- let self_variant = self_iter.next().unwrap();
- let other_variant = others_iter.next().unwrap();
- if self_variant.reference == other_variant.reference
- && self_variant.alternative == other_variant.alternative
- {
- let mut merged_variant = self_variant;
- merged_variant.vcf_variants.push(other_variant);
- n_merged += 1;
- result.push(merged_variant);
- } else {
- result.push(self_variant);
- result.push(create_variant(vec![other_variant], annotations));
- }
- }
- }
- }
- // Drain remaining elements from iterators
- result.extend(self_iter);
- result.extend(others_iter.map(|v| create_variant(vec![v], annotations)));
- info!("n merged: {}", n_merged);
- self.data = result;
- }
- /// Filters and returns variants of a specific alteration category.
- ///
- /// This method creates a new vector containing clones of all variants
- /// in the collection that match the specified alteration category.
- ///
- /// # Arguments
- /// * `cat` - An `AlterationCategory` enum value specifying the category to filter by
- ///
- /// # Returns
- /// A `Vec<Variant>` containing clones of all matching variants
- ///
- /// # Performance
- /// * Time complexity: O(n), where n is the number of variants in the collection
- /// * Space complexity: O(m), where m is the number of matching variants
- ///
- /// # Example
- /// ```
- /// let variants = Variants { /* ... */ };
- /// let snvs = variants.get_alteration_cat(AlterationCategory::SNV);
- /// println!("Found {} SNVs", snvs.len());
- /// ```
- ///
- /// # Note
- /// This method clones matching variants. If you need to process a large number of variants
- /// and don't require ownership, consider implementing an iterator-based approach instead.
- pub fn get_alteration_cat(&self, cat: AlterationCategory) -> Vec<Variant> {
- self.data
- .iter()
- .filter(|v| v.alteration_category().contains(&cat))
- .cloned()
- .collect()
- }
- /// Saves the Variants collection to a BGZF-compressed JSON file.
- ///
- /// This method serializes the Variants struct to JSON format and writes it to a file
- /// using BGZF (Blocked GNU Zip Format) compression.
- ///
- /// # Arguments
- /// * `filename` - A string slice that holds the name of the file to write to
- ///
- /// # Returns
- /// * `Ok(())` if the operation is successful
- /// * `Err(anyhow::Error)` if any error occurs during file creation, serialization, or writing
- ///
- /// # Errors
- /// This function will return an error if:
- /// * The file cannot be created
- /// * The Variants struct cannot be serialized to JSON
- /// * The BGZF writer cannot be closed properly
- ///
- /// # Performance
- /// The time and space complexity depends on the size of the Variants collection
- /// and the efficiency of the JSON serialization and BGZF compression.
- ///
- /// # Example
- /// ```
- /// let variants = Variants { /* ... */ };
- /// match variants.save_to_json("output.json.gz") {
- /// Ok(()) => println!("Successfully saved variants"),
- /// Err(e) => eprintln!("Failed to save variants: {}", e),
- /// }
- /// ```
- ///
- /// # Note
- /// This method uses BGZF compression, which is compatible with standard gzip decompression.
- /// The resulting file can be read using standard gzip-aware tools.
- pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> {
- let file = File::create(filename)
- .with_context(|| format!("Failed to create file: {}", filename))?;
- let mut writer = BGZFWriter::new(file, bgzip::Compression::default());
- serde_json::to_writer(&mut writer, self)
- .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?;
- writer
- .close()
- .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?;
- debug!("Successfully saved variants to {}", filename);
- Ok(())
- }
- /// Loads a Variants collection from a BGZF-compressed JSON file.
- ///
- /// This method reads a BGZF-compressed file, deserializes its JSON content,
- /// and constructs a new Variants instance from it.
- ///
- /// # Arguments
- /// * `filename` - A string slice that holds the name of the file to read from
- ///
- /// # Returns
- /// * `Ok(Self)` containing the deserialized Variants if successful
- /// * `Err(anyhow::Error)` if any error occurs during file opening, reading, or deserialization
- ///
- /// # Errors
- /// This function will return an error if:
- /// * The file cannot be opened
- /// * A BGZF reader cannot be created for the file
- /// * The file content cannot be deserialized into a Variants struct
- ///
- /// # Performance
- /// The time and space complexity depends on the size of the input file
- /// and the efficiency of the JSON deserialization and BGZF decompression.
- ///
- /// # Example
- /// ```
- /// match Variants::load_from_json("input.json.gz") {
- /// Ok(variants) => println!("Successfully loaded {} variants", variants.data.len()),
- /// Err(e) => eprintln!("Failed to load variants: {}", e),
- /// }
- /// ```
- ///
- /// # Note
- /// This method expects the input file to be in BGZF-compressed JSON format,
- /// typically created by the `save_to_json` method of this struct.
- pub fn load_from_json(filename: &str) -> anyhow::Result<Self> {
- let file =
- File::open(filename).with_context(|| format!("Failed to open file: {}", filename))?;
- let mut reader = BGZFReader::new(file)
- .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?;
- let variants: Self = serde_json::from_reader(&mut reader)
- .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?;
- debug!("Successfully loaded variants from {}", filename);
- Ok(variants)
- }
- }
- /// Creates a new Variant instance from a collection of VcfVariants and annotations.
- ///
- /// This function consolidates information from one or more VcfVariants into a single Variant,
- /// using the first VcfVariant as the primary source for most fields. It also retrieves
- /// and includes relevant annotations.
- ///
- /// # Arguments
- /// * `vcf_variants` - A vector of VcfVariant instances to be consolidated
- /// * `annotations` - A reference to an Annotations structure containing annotation data
- ///
- /// # Returns
- /// A new Variant instance
- ///
- /// # Behavior
- /// - Uses the first VcfVariant in the vector as the source for hash, position, reference, and alternative
- /// - Includes all provided VcfVariants in the new Variant
- /// - Retrieves annotations for the variant based on its hash
- /// - If no annotations are found, an empty vector is used
- ///
- /// # Panics
- /// This function will panic if `vcf_variants` is empty.
- ///
- /// # Example
- /// ```
- /// let vcf_variants = vec![VcfVariant { /* ... */ }];
- /// let annotations = Annotations::new();
- /// let variant = create_variant(vcf_variants, &annotations);
- /// ```
- ///
- /// # Note
- /// This function assumes that all VcfVariants in the input vector represent the same genomic variant
- /// and should be consolidated. It's the caller's responsibility to ensure this is the case.
- fn create_variant(vcf_variants: Vec<VcfVariant>, annotations: &Annotations) -> Variant {
- let first = &vcf_variants[0];
- let annotations = annotations
- .store
- .get(&first.hash)
- .map(|v| v.value().to_vec())
- .unwrap_or_default();
- Variant {
- hash: first.hash,
- position: first.position.clone(),
- reference: first.reference.clone(),
- alternative: first.alternative.clone(),
- vcf_variants,
- annotations,
- }
- }
- pub enum ExtAnnotationSource {
- Cosmic(Cosmic),
- GnomAD(GnomAD),
- }
- use rusqlite::{params, Connection, Result as SqliteResult};
- pub struct ExternalAnnotation {
- pub conn: Connection,
- }
- impl ExternalAnnotation {
- pub fn init() -> anyhow::Result<Self> {
- let mut db_path = app_storage_dir()?;
- db_path.push("annotations_db.sqlite");
- info!("Opening DB: {}", db_path.display());
- let conn = Connection::open(db_path)?;
- conn.execute(
- "CREATE TABLE IF NOT EXISTS annotations (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- hash BLOB(16) UNIQUE NOT NULL,
- source TEXT NOT NULL,
- data BLOB,
- modified TEXT NOT NULL
- )",
- [],
- )?;
- Ok(Self { conn })
- }
- pub fn annotate(
- &self,
- variants: &[VcfVariant],
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- let mut unfound = Vec::new();
- for variant in variants {
- let hash = variant.hash();
- let mut has_pushed = false;
- // Check COSMIC
- match self.get_annotation(hash, "COSMIC")? {
- Some(cosmic) => {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::Cosmic(cosmic));
- }
- None => {
- unfound.push(variant.clone());
- has_pushed = true;
- }
- }
- // Check GnomAD
- match self.get_annotation(hash, "GnomAD")? {
- Some(gnomad) => {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::GnomAD(gnomad));
- }
- None => {
- if !has_pushed {
- unfound.push(variant.clone())
- }
- }
- }
- }
- if !unfound.is_empty() {
- self.process_unfound_echtvar(unfound, annotations)?;
- }
- Ok(())
- }
- fn get_annotation<T: serde::de::DeserializeOwned>(
- &self,
- hash: Hash128,
- source: &str,
- ) -> anyhow::Result<Option<T>> {
- let result: SqliteResult<Vec<u8>> = self.conn.query_row(
- "SELECT data FROM annotations WHERE hash = ? AND source = ?",
- params![hash.to_bytes(), source],
- |row| row.get(0),
- );
- match result {
- Ok(data) => Ok(Some(serde_json::from_slice(&data)?)),
- Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
- Err(e) => Err(e.into()),
- }
- }
- pub fn process_unfound_echtvar(
- &self,
- mut unfound: Vec<VcfVariant>,
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- let temp_dir = std::env::temp_dir();
- unfound.par_sort();
- unfound.dedup();
- let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
- let min_chunk_size = 1000;
- let max_chunks = 150;
- let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
- let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
- let results = unfound
- .par_chunks(optimal_chunk_size)
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
- let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
- let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
- // Write input VCF
- let mut vcf = File::create(&in_tmp)?;
- writeln!(vcf, "{}", header)?;
- for (i, row) in chunk.iter().enumerate() {
- writeln!(
- vcf,
- "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
- row.position.contig(),
- row.position.position + 1, // vcf
- i + 1,
- row.reference,
- row.alternative
- )?;
- }
- // Run echtvar
- run_echtvar(in_tmp.to_str().unwrap(), out_tmp.to_str().unwrap())?;
- fs::remove_file(in_tmp)?;
- // Parse echtvar output
- let mut reader = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(false)
- .comment(Some(b'#'))
- .flexible(true)
- .from_reader(get_reader(out_tmp.to_str().unwrap())?);
- let mut chunk_results = Vec::new();
- for (i, result) in reader.deserialize::<crate::io::vcf::VCFRow>().enumerate() {
- let row = result?;
- // Verify that the ID corresponds to the input
- let id: usize = row.id.parse().context("Failed to parse ID")?;
- if id != i + 1 {
- return Err(anyhow::anyhow!(
- "Echtvar output ID {} does not match expected ID {}",
- id,
- i + 1
- ));
- }
- let (cosmic, gnomad) = parse_echtvar_val(&row.info)?;
- let hash = chunk[i].hash();
- chunk_results.push((hash, cosmic, gnomad));
- }
- fs::remove_file(out_tmp)?;
- Ok(chunk_results)
- })
- .flatten()
- .collect::<Vec<_>>();
- for (hash, cosmic, gnomad) in results {
- // let modified = chrono::Utc::now().to_rfc3339();
- // Update COSMIC
- // self.conn.execute(
- // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
- // params![
- // hash.to_le_bytes().to_vec(),
- // cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()),
- // modified
- // ],
- // )?;
- //
- // // Update GnomAD
- // self.conn.execute(
- // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
- // params![
- // hash.to_le_bytes().to_vec(),
- // gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()),
- // modified
- // ],
- // )?;
- // Update in-memory annotations
- if let Some(c) = cosmic {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::Cosmic(c));
- }
- if let Some(g) = gnomad {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::GnomAD(g));
- }
- }
- Ok(())
- }
- pub fn annotate_vep(
- &self,
- variants: &[VcfVariant],
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- let mut unfound = Vec::new();
- for variant in variants {
- let hash = variant.hash();
- // Check VEP
- match self.get_annotation(hash, "VEP")? {
- Some(veps) => {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::VEP(veps));
- }
- None => {
- unfound.push(variant.clone());
- }
- }
- }
- if !unfound.is_empty() {
- self.process_unfound_vep(unfound, annotations)?;
- }
- Ok(())
- }
- pub fn process_unfound_vep(
- &self,
- mut unfound: Vec<VcfVariant>,
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- unfound.par_sort();
- unfound.dedup();
- info!("{} variants to annotate with VEP.", unfound.len());
- let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
- let (sv, unfound): (Vec<VcfVariant>, Vec<VcfVariant>) =
- unfound.into_iter().partition(|v| v.has_svtype());
- warn!("SV {}", sv.len());
- let min_chunk_size = 1000;
- let max_chunks = 150;
- let mut results: Vec<(Hash128, Vec<VEP>)> = if !unfound.is_empty() {
- let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
- let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
- debug!("{} chunks to process.", unfound.len() / optimal_chunk_size);
- unfound
- .par_chunks(optimal_chunk_size)
- .enumerate()
- .map(|(chunk_i, chunk)| {
- debug!("Processing chunk {chunk_i}");
- process_vep_chunk(chunk, &header).map_err(|e| {
- error!("Error processing chunk {chunk_i}: {e}");
- e
- })
- })
- .collect::<Result<Vec<_>, _>>()? // Collect results into a Result<Vec<_>>
- .into_iter()
- .flatten()
- .collect::<Vec<_>>()
- } else {
- Vec::new()
- };
- if !sv.is_empty() {
- let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize);
- let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
- let results_sv = sv
- .par_chunks(optimal_chunk_size)
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
- let in_tmp = temp_file_path(".vcf")
- .context("Can't create tmp path for in tmp")?
- .to_str()
- .unwrap()
- .to_string();
- let out_vep = temp_file_path("_vep.txt")
- .context("Can't create tmp path for in tmp")?
- .to_str()
- .unwrap()
- .to_string();
- let out_summary = format!("{out_vep}_summary.html");
- let out_warnings = format!("{out_vep}_warnings.txt");
- // Write input VCF
- let mut vcf =
- File::create(&in_tmp).context("Can't create input vcf file for VEP.")?;
- writeln!(vcf, "{}", header)?;
- for (i, mut row) in chunk.iter().cloned().enumerate() {
- row.id = (i + 1).to_string();
- let s = row.into_vcf_row();
- writeln!(vcf, "{s}",)?;
- }
- run_vep(&in_tmp, &out_vep).context("Error while running VEP.")?;
- let mut reader_vep = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(false)
- .comment(Some(b'#'))
- .flexible(true)
- .from_reader(
- fs::File::open(&out_vep).context("Can't open VEP result file.")?,
- );
- let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
- for line in reader_vep.deserialize() {
- let line: VepLine = line.context("Failed to deserialize VepLine")?;
- let key = line
- .uploaded_variation
- .parse::<u64>()
- .context("Failed to parse uploaded_variation as u64")?;
- lines.entry(key).or_default().push(line);
- }
- fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
- let mut n_not_vep = 0;
- let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
- chunk.iter().enumerate().for_each(|(i, entry)| {
- let k = (i + 1) as u64;
- if let Some(vep_lines) = lines.get(&k) {
- if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
- chunk_results.push((entry.hash(), veps));
- }
- } else {
- warn!(
- "No VEP entry for {}\t{}\t{}",
- entry.position.to_string(),
- entry.reference.to_string(),
- entry.alternative.to_string()
- );
- n_not_vep += 1;
- }
- });
- fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
- if n_not_vep > 0 {
- debug!("{n_not_vep} variants not annotated by VEP.");
- let warnings = fs::read_to_string(&out_warnings)
- .context(format!("Can't read VEP warnings: {out_warnings}"))?;
- warn!("VEP warnings:\n{warnings}");
- }
- if Path::new(&out_warnings).exists() {
- fs::remove_file(&out_warnings)
- .context(format!("Can't remove file {out_warnings}"))?;
- }
- if Path::new(&out_summary).exists() {
- fs::remove_file(&out_summary)
- .context(format!("Can't remove file {out_summary}"))?;
- }
- Ok(chunk_results)
- })
- .flatten()
- .collect::<Vec<_>>();
- results.extend(results_sv);
- }
- info!("{} total variants annotaded by VEP.", results.len());
- for (hash, veps) in results {
- // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::VEP(veps));
- }
- Ok(())
- }
- pub fn update_database(&self, hash: Hash128, source: &str, data: &[u8]) -> anyhow::Result<()> {
- let modified = chrono::Utc::now().to_rfc3339();
- self.conn.execute(
- "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)",
- params![hash.to_bytes(), source, data, modified],
- )?;
- Ok(())
- }
- }
- fn process_vep_chunk(
- chunk: &[VcfVariant],
- header: &str,
- ) -> anyhow::Result<Vec<(Hash128, Vec<VEP>)>> {
- let in_tmp = temp_file_path("vcf")?
- .to_str()
- .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
- .to_string();
- let out_vep = temp_file_path("_vep.txt")?
- .to_str()
- .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
- .to_string();
- let out_summary = format!("{out_vep}_summary.html");
- let out_warnings = format!("{out_vep}_warnings.txt");
- let mut vcf = File::create(&in_tmp)?; // If this fails, the error is propagated.
- writeln!(vcf, "{}", header)?; // If this fails, the error is propagated.
- for (i, row) in chunk.iter().enumerate() {
- writeln!(
- vcf,
- "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
- row.position.contig(),
- row.position.position + 1,
- i + 1,
- row.reference,
- row.alternative
- )?;
- }
- if let Err(e) = run_vep(&in_tmp, &out_vep) {
- error!("VEP error: {e}");
- return Err(anyhow::anyhow!("VEP execution failed: {}", e)); // Propagate the error.
- }
- let mut reader_vep = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(false)
- .comment(Some(b'#'))
- .flexible(true)
- .from_reader(fs::File::open(&out_vep)?); // If this fails, the error is propagated.
- let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
- for line in reader_vep.deserialize() {
- let line: VepLine = line.context("Failed to deserialize VepLine")?; // Propagate the error.
- let key = line
- .uploaded_variation
- .parse::<u64>()
- .context("Failed to parse uploaded_variation as u64")?; // Propagate the error.
- lines.entry(key).or_default().push(line);
- }
- fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
- let mut n_not_vep = 0;
- let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
- chunk.iter().enumerate().for_each(|(i, entry)| {
- let k = (i + 1) as u64;
- if let Some(vep_lines) = lines.get(&k) {
- if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
- chunk_results.push((entry.hash(), veps));
- }
- } else {
- warn!(
- "No VEP entry for {}\t{}\t{}",
- entry.position.to_string(),
- entry.reference.to_string(),
- entry.alternative.to_string()
- );
- n_not_vep += 1;
- }
- });
- fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
- if n_not_vep > 0 {
- debug!("{n_not_vep} variants not annotated by VEP.");
- let warnings = fs::read_to_string(&out_warnings)
- .context(format!("Can't read VEP warnings: {out_warnings}"))?;
- warn!("VEP warnings:\n{warnings}");
- }
- if Path::new(&out_warnings).exists() {
- fs::remove_file(&out_warnings).context(format!("Can't remove file {out_warnings}"))?;
- }
- if Path::new(&out_summary).exists() {
- fs::remove_file(&out_summary).context(format!("Can't remove file {out_summary}"))?;
- }
- Ok(chunk_results) // Return the successful result.
- }
|