use std::{ collections::{HashMap, HashSet}, fs::{self, File}, io::Write, path::Path, }; use anyhow::Context; use bgzip::{BGZFReader, BGZFWriter}; use csv::ReaderBuilder; use log::{debug, error, info, warn}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::variant::{AlterationCategory, ReferenceAlternative, VcfVariant}; use crate::{ annotation::{ cosmic::Cosmic, echtvar::{parse_echtvar_val, run_echtvar}, gnomad::GnomAD, vep::{run_vep, VepLine, VEP}, Annotation, Annotations, }, collection::{ bam::{counts_at, counts_ins_at}, vcf::Vcf, }, helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path, Hash128}, io::{fasta::sequence_at, readers::get_reader, vcf::vcf_header}, positions::GenomePosition, }; /// A collection of VCF variants along with associated metadata. /// /// This struct represents a set of variants from a VCF file, including /// the original VCF metadata and the caller annotation. /// /// # Fields /// * `variants` - A vector of VcfVariant instances representing individual variants /// * `vcf` - The Vcf struct containing metadata from the original VCF file /// * `caller` - An Annotation indicating the variant caller used /// /// # Derives /// This struct derives Debug and Clone traits, allowing for easy debugging /// and cloning of VariantCollection instances. /// /// # Usage /// This struct is typically used to group related variants from a single VCF file, /// preserving the context in which they were called and allowing for collective /// operations on the set of variants. #[derive(Debug, Clone)] pub struct VariantCollection { pub variants: Vec, pub vcf: Vcf, pub caller: Annotation, } impl VariantCollection { /// Returns a vector of hash keys for all variants in the collection. /// /// This method generates a unique hash for each variant in the collection /// and returns these hashes as a vector. /// /// # Returns /// A `Vec` containing the hash of each variant in the collection. /// /// # Performance /// This method iterates over all variants in the collection, so its time complexity /// is O(n) where n is the number of variants. /// /// # Usage /// This method is useful for obtaining unique identifiers for each variant, /// which can be used for indexing, comparison, or lookup operations. /// /// # Example /// ``` /// let variant_keys = variant_collection.keys(); /// for key in variant_keys { /// println!("Variant hash: {:?}", key); /// } /// ``` pub fn keys(&self) -> Vec { self.variants.iter().map(|v| v.hash()).collect() } /// Retains only the variants whose hash keys are present in the provided set. /// /// This method filters the variants in the collection, keeping only those /// whose hash keys are found in the `keys_to_keep` set. /// /// # Arguments /// * `keys_to_keep` - A `HashSet` containing the hash keys of variants to retain /// /// # Effects /// This method modifies the `VariantCollection` in place, potentially reducing /// the number of variants it contains. /// /// # Performance /// The time complexity is O(n), where n is the number of variants in the collection. /// The space complexity is O(1) as it operates in place. /// /// # Usage /// This method is useful for filtering variants based on a pre-computed set of keys, /// which can be helpful in various scenarios such as: /// - Removing duplicates /// - Keeping only variants that meet certain criteria /// - Synchronizing variants across different collections /// /// # Example /// ``` /// let mut variant_collection = VariantCollection::new(); /// // ... populate variant_collection ... /// /// let keys_to_keep: HashSet = some_filtering_function(); /// variant_collection.retain_keys(&keys_to_keep); /// ``` pub fn retain_keys(&mut self, keys_to_keep: &HashSet) { self.variants.retain(|v| keys_to_keep.contains(&v.hash())); } /// Removes variants whose hash keys are present in the provided set. /// /// This method filters the variants in the collection, removing those /// whose hash keys are found in the `keys_to_remove` set. /// /// # Arguments /// * `keys_to_remove` - A `HashSet` containing the hash keys of variants to remove /// /// # Effects /// This method modifies the `VariantCollection` in place, potentially reducing /// the number of variants it contains. /// /// # Performance /// The time complexity is O(n), where n is the number of variants in the collection. /// The space complexity is O(1) as it operates in place. /// /// # Usage /// This method is useful for filtering out unwanted variants based on a pre-computed set of keys, /// which can be helpful in various scenarios such as: /// - Removing variants that fail certain criteria /// - Eliminating duplicates identified by their hash keys /// - Excluding variants present in another dataset /// /// # Example /// ``` /// let mut variant_collection = VariantCollection::new(); /// // ... populate variant_collection ... /// /// let keys_to_remove: HashSet = some_filtering_function(); /// variant_collection.remove_keys(&keys_to_remove); /// ``` pub fn remove_keys(&mut self, keys_to_remove: &HashSet) { self.variants .retain(|v| !keys_to_remove.contains(&v.hash())); } /// Partitions the VcfVariants into two sets based on a given predicate. /// /// This function splits the current VcfVariants instance into two new instances, /// where the variants are divided based on whether they satisfy the given predicate. /// /// # Arguments /// * `predicate` - A closure that takes a reference to a VcfVariant and returns a boolean /// /// # Returns /// A tuple of two VcfVariants instances: /// * The first contains all variants for which the predicate returned true /// * The second contains all variants for which the predicate returned false /// /// # Type Parameters /// * `F` - The type of the predicate function, which must implement Fn(&VcfVariant) -> bool /// /// # Behavior /// - Consumes the original VcfVariants instance /// - Creates two new VcfVariants instances with the partitioned data /// - The VCF and caller information are cloned for the first returned instance /// and moved into the second returned instance /// /// # Examples /// ``` /// let (passing, failing) = vcf_variants.partition(|variant| variant.quality.map_or(false, |q| q > 30)); /// ``` pub fn partition(self, predicate: F) -> (Self, Self) where F: Fn(&VcfVariant) -> bool, { let Self { variants, vcf, caller, } = self; let (left_data, right_data) = variants.into_iter().partition(predicate); ( Self { variants: left_data, vcf: vcf.clone(), caller: caller.clone(), }, Self { variants: right_data, vcf, caller, }, ) } fn chunk_size(&self, max_threads: u8) -> usize { let total_items = self.variants.len(); let min_chunk_size = 1000; let max_chunks = max_threads; let optimal_chunk_size = total_items.div_ceil(max_chunks as usize); optimal_chunk_size.max(min_chunk_size) } /// Annotates variants with sequence entropy information. /// /// This function calculates and adds Shannon entropy annotations to the variants /// based on the surrounding sequence context. It processes variants in parallel /// chunks for improved performance. /// /// # Arguments /// * `annotations` - A reference to the Annotations structure to store the results /// * `reference` - Path to the reference FASTA file /// * `seq_len` - Length of the sequence context to consider for entropy calculation /// * `max_threads` - Maximum number of threads to use for parallel processing /// /// # Behavior /// - Processes variants in parallel chunks /// - For each variant, retrieves the surrounding sequence from the reference /// - Calculates Shannon entropy for the sequence /// - Adds the entropy value as an annotation to the variant /// - Skips variants that already have a Shannon entropy annotation /// /// # Panics /// This function will panic if it fails to build the FASTA reader from the provided reference path. pub fn annotate_with_sequence_entropy( &self, annotations: &Annotations, reference: &str, seq_len: usize, max_threads: u8, ) { self.variants .par_chunks(self.chunk_size(max_threads)) .for_each(|chunk| { let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default() .build_from_path(reference) .unwrap(); for c in chunk { let key = c.hash(); let mut anns = annotations.store.entry(key).or_default(); if !anns .iter() .any(|e| matches!(e, Annotation::ShannonEntropy(_))) { if let Ok(r) = sequence_at( &mut fasta_reader, &c.position.contig(), c.position.position as usize, seq_len, ) { let ent = estimate_shannon_entropy(r.as_str()); anns.push(Annotation::ShannonEntropy(ent)); } } } }); } /// Annotates variants with information from a constitutional BAM file. /// /// This function processes variants in parallel chunks and adds annotations /// based on the read counts from a provided BAM file. It calculates and adds /// annotations for the number of reads supporting the alternative allele and /// the total read depth at each variant position. /// /// # Arguments /// * `annotations` - A reference to the Annotations structure to store the results /// * `constit_bam_path` - Path to the constituent BAM file /// * `max_threads` - Maximum number of threads to use for parallel processing /// /// # Returns /// * `Ok(())` if the annotation process completes successfully /// * `Err` if there's an error opening the BAM file or processing the variants /// /// # Behavior /// - Processes variants in parallel chunks /// - For each variant, retrieves read counts from the BAM file /// - Calculates the number of reads supporting the alternative allele and total depth /// - Adds ConstitAlt and ConstitDepth annotations to each variant /// - Skips variants that already have both ConstitAlt and ConstitDepth annotations /// - Handles insertions, deletions, and other alteration types differently /// /// # Errors /// This function will return an error if: /// - It fails to open the BAM file /// - There's an error while processing the variants or reading from the BAM file pub fn annotate_with_constit_bam( &self, annotations: &Annotations, constit_bam_path: &str, max_threads: u8, ) -> anyhow::Result<()> { self.variants .par_chunks(self.chunk_size(max_threads)) .try_for_each(|chunk| { let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path) .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?; for var in chunk { let key = var.hash(); let mut anns = annotations.store.entry(key).or_default(); if anns .iter() .filter(|e| { matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_)) }) .count() != 2 { let mut n_alt = 0; let mut depth = 0; let mut alt_seq = var.alternative.to_string(); let c = if var.alteration_category() == AlterationCategory::INS { // TODO: Add stretch comparison alt_seq = alt_seq.split_off(1); counts_ins_at(&mut bam, &var.position.contig(), var.position.position)? } else if var.alteration_category() == AlterationCategory::DEL { alt_seq = "D".to_string(); counts_at(&mut bam, &var.position.contig(), var.position.position + 1)? } else { counts_at(&mut bam, &var.position.contig(), var.position.position)? }; for (seq, n) in c { if seq == alt_seq { n_alt = n; } if seq == *"D" && alt_seq != *"D" { continue; } depth += n; } anns.push(Annotation::ConstitAlt(n_alt as u16)); anns.push(Annotation::ConstitDepth(depth as u16)); } } anyhow::Ok(()) })?; Ok(()) } pub fn stats(&self) { println!( "{} {}\t{}", self.vcf.caller, self.vcf.time, self.variants.len() ); } } /// Represents a consolidated genomic variant with associated information and annotations. /// /// This struct encapsulates comprehensive data for a single genomic variant, /// including its unique identifier, genomic position, alleles, related VCF entries, /// and various annotations. /// /// # Fields /// * `hash` - A unique `Hash128` identifier for the variant /// * `position` - The `GenomePosition` of the variant (0-based coordinates) /// * `reference` - The reference allele as a `ReferenceAlternative` /// * `alternative` - The alternative allele as a `ReferenceAlternative` /// * `vcf_variants` - A vector of `VcfVariant` instances associated with this variant /// * `annotations` - A vector of `Annotation` instances providing additional variant information /// /// # Derives /// * `Debug` - For easier debugging and logging /// * `Serialize`, `Deserialize` - For serialization and deserialization (e.g., JSON) /// * `Clone` - For creating deep copies of the variant /// /// # Usage /// This struct is central to variant analysis, providing a unified representation /// of a variant that may combine data from multiple VCF entries and include /// various annotations. It's useful for advanced variant processing, filtering, /// and reporting in genomic analysis pipelines. /// /// # Example /// ``` /// let variant = Variant { /// hash: Hash128::new(/* ... */), /// position: GenomePosition { contig: 1, position: 1000 }, /// reference: ReferenceAlternative::new("A"), /// alternative: ReferenceAlternative::new("T"), /// vcf_variants: vec![/* VcfVariant instances */], /// annotations: vec![/* Annotation instances */], /// }; /// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Variant { pub hash: Hash128, pub position: GenomePosition, pub reference: ReferenceAlternative, pub alternative: ReferenceAlternative, pub vcf_variants: Vec, pub annotations: Vec, } impl PartialEq for Variant { fn eq(&self, other: &Self) -> bool { self.position == other.position && self.reference == other.reference && self.alternative == other.alternative } } impl Variant { pub fn vep(&self) -> Vec { self.annotations .iter() .flat_map(|a| { if let Annotation::VEP(v) = a { v.to_vec() } else { vec![] } }) .collect() } pub fn alteration_category(&self) -> Vec { self.vcf_variants .iter() .map(|v| v.alteration_category()) .collect::>() .into_iter() .collect() } } /// A collection of genomic variants. /// /// This struct represents a set of `Variant` instances, providing a container /// for multiple genomic variants. /// /// # Fields /// * `data` - A vector of `Variant` instances /// /// # Derives /// * `Debug` - For easier debugging and logging /// * `Default` - Allows creation of an empty `Variants` instance /// * `Serialize`, `Deserialize` - For serialization and deserialization (e.g., JSON) /// * `Clone` - For creating deep copies of the collection /// /// # Usage /// This struct is typically used to group related variants, such as those from /// a single sample or analysis run. It provides a convenient way to handle and /// process multiple variants as a single unit. /// /// # Example /// ``` /// let mut variants = Variants::default(); /// variants.data.push(Variant { /// // ... variant details ... /// }); /// // Process or analyze the collection of variants /// ``` /// /// # Note /// The `Default` implementation creates an empty vector of variants. #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct Variants { pub data: Vec, } impl Variants { /// Sorts the variants in-place based on their genomic positions. /// /// This method uses an unstable sort to order the variants in the collection /// according to their genomic positions. The sorting is done in ascending order, /// first by contig and then by position within the contig. /// /// # Effects /// Modifies the order of variants in the `data` vector in-place. /// /// # Performance /// Uses `sort_unstable_by` for better performance, with a time complexity of O(n log n), /// where n is the number of variants. /// /// # Example /// ``` /// let mut variants = Variants { /// data: vec![ /// Variant { position: GenomePosition { contig: 0, position: 100 }, .. }, /// Variant { position: GenomePosition { contig: 0, position: 50 }, .. }, /// Variant { position: GenomePosition { contig: 1, position: 75 }, .. }, /// ] /// }; /// variants.sort(); /// // Variants are now sorted by position /// ``` pub fn sort(&mut self) { self.data .sort_unstable_by(|a, b| a.position.cmp(&b.position)); } /// Merges this Variants collection with another VariantCollection, combining overlapping variants. /// /// This method performs a merge operation, combining the current Variants with a VariantCollection. /// It uses a two-pointer technique to efficiently merge the sorted collections, handling overlapping /// variants and creating new variants as necessary. /// /// # Arguments /// * `others` - A VariantCollection to merge with the current Variants /// * `annotations` - A reference to Annotations used when creating new variants /// /// # Effects /// * Modifies the current Variants in-place, replacing its data with the merged result /// * Drains the data from the current Variants during the merge process /// /// # Behavior /// * Variants are compared based on their genomic positions /// * When positions are equal, variants with matching reference and alternative alleles are merged /// * Non-matching variants at the same position are kept separate /// * The method logs the number of merged variants /// /// # Performance /// * Time complexity: O(n + m), where n and m are the sizes of the two collections /// * Space complexity: O(n + m) for the merged result /// /// # Note /// This method assumes that both the current Variants and the input VariantCollection are sorted /// by genomic position. Use the `sort` method before merging if necessary. /// /// # Example /// ``` /// let mut variants1 = Variants { /* ... */ }; /// let variants2 = VariantCollection { /* ... */ }; /// let annotations = Annotations::new(); /// variants1.merge(variants2, &annotations); /// // variants1 now contains the merged data /// ``` pub fn merge(&mut self, others: VariantCollection, annotations: &Annotations) { let mut result = Vec::new(); let mut n_merged = 0; let mut self_iter = self.data.drain(..).peekable(); // Iterator for self.data let mut others_iter = others.variants.into_iter().peekable(); // Iterator for others.variants // Merge using two-pointer technique while let (Some(self_variant), Some(other_variant)) = (self_iter.peek(), others_iter.peek()) { match self_variant.position.cmp(&other_variant.position) { std::cmp::Ordering::Less => { result.push(self_iter.next().unwrap()); } std::cmp::Ordering::Greater => { result.push(create_variant( vec![others_iter.next().unwrap()], annotations, )); } std::cmp::Ordering::Equal => { let self_variant = self_iter.next().unwrap(); let other_variant = others_iter.next().unwrap(); if self_variant.reference == other_variant.reference && self_variant.alternative == other_variant.alternative { let mut merged_variant = self_variant; merged_variant.vcf_variants.push(other_variant); n_merged += 1; result.push(merged_variant); } else { result.push(self_variant); result.push(create_variant(vec![other_variant], annotations)); } } } } // Drain remaining elements from iterators result.extend(self_iter); result.extend(others_iter.map(|v| create_variant(vec![v], annotations))); info!("n merged: {}", n_merged); self.data = result; } /// Filters and returns variants of a specific alteration category. /// /// This method creates a new vector containing clones of all variants /// in the collection that match the specified alteration category. /// /// # Arguments /// * `cat` - An `AlterationCategory` enum value specifying the category to filter by /// /// # Returns /// A `Vec` containing clones of all matching variants /// /// # Performance /// * Time complexity: O(n), where n is the number of variants in the collection /// * Space complexity: O(m), where m is the number of matching variants /// /// # Example /// ``` /// let variants = Variants { /* ... */ }; /// let snvs = variants.get_alteration_cat(AlterationCategory::SNV); /// println!("Found {} SNVs", snvs.len()); /// ``` /// /// # Note /// This method clones matching variants. If you need to process a large number of variants /// and don't require ownership, consider implementing an iterator-based approach instead. pub fn get_alteration_cat(&self, cat: AlterationCategory) -> Vec { self.data .iter() .filter(|v| v.alteration_category().contains(&cat)) .cloned() .collect() } /// Saves the Variants collection to a BGZF-compressed JSON file. /// /// This method serializes the Variants struct to JSON format and writes it to a file /// using BGZF (Blocked GNU Zip Format) compression. /// /// # Arguments /// * `filename` - A string slice that holds the name of the file to write to /// /// # Returns /// * `Ok(())` if the operation is successful /// * `Err(anyhow::Error)` if any error occurs during file creation, serialization, or writing /// /// # Errors /// This function will return an error if: /// * The file cannot be created /// * The Variants struct cannot be serialized to JSON /// * The BGZF writer cannot be closed properly /// /// # Performance /// The time and space complexity depends on the size of the Variants collection /// and the efficiency of the JSON serialization and BGZF compression. /// /// # Example /// ``` /// let variants = Variants { /* ... */ }; /// match variants.save_to_json("output.json.gz") { /// Ok(()) => println!("Successfully saved variants"), /// Err(e) => eprintln!("Failed to save variants: {}", e), /// } /// ``` /// /// # Note /// This method uses BGZF compression, which is compatible with standard gzip decompression. /// The resulting file can be read using standard gzip-aware tools. pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> { let file = File::create(filename) .with_context(|| format!("Failed to create file: {}", filename))?; let mut writer = BGZFWriter::new(file, bgzip::Compression::default()); serde_json::to_writer(&mut writer, self) .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?; writer .close() .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?; debug!("Successfully saved variants to {}", filename); Ok(()) } /// Loads a Variants collection from a BGZF-compressed JSON file. /// /// This method reads a BGZF-compressed file, deserializes its JSON content, /// and constructs a new Variants instance from it. /// /// # Arguments /// * `filename` - A string slice that holds the name of the file to read from /// /// # Returns /// * `Ok(Self)` containing the deserialized Variants if successful /// * `Err(anyhow::Error)` if any error occurs during file opening, reading, or deserialization /// /// # Errors /// This function will return an error if: /// * The file cannot be opened /// * A BGZF reader cannot be created for the file /// * The file content cannot be deserialized into a Variants struct /// /// # Performance /// The time and space complexity depends on the size of the input file /// and the efficiency of the JSON deserialization and BGZF decompression. /// /// # Example /// ``` /// match Variants::load_from_json("input.json.gz") { /// Ok(variants) => println!("Successfully loaded {} variants", variants.data.len()), /// Err(e) => eprintln!("Failed to load variants: {}", e), /// } /// ``` /// /// # Note /// This method expects the input file to be in BGZF-compressed JSON format, /// typically created by the `save_to_json` method of this struct. pub fn load_from_json(filename: &str) -> anyhow::Result { let file = File::open(filename).with_context(|| format!("Failed to open file: {}", filename))?; let mut reader = BGZFReader::new(file) .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?; let variants: Self = serde_json::from_reader(&mut reader) .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?; debug!("Successfully loaded variants from {}", filename); Ok(variants) } } /// Creates a new Variant instance from a collection of VcfVariants and annotations. /// /// This function consolidates information from one or more VcfVariants into a single Variant, /// using the first VcfVariant as the primary source for most fields. It also retrieves /// and includes relevant annotations. /// /// # Arguments /// * `vcf_variants` - A vector of VcfVariant instances to be consolidated /// * `annotations` - A reference to an Annotations structure containing annotation data /// /// # Returns /// A new Variant instance /// /// # Behavior /// - Uses the first VcfVariant in the vector as the source for hash, position, reference, and alternative /// - Includes all provided VcfVariants in the new Variant /// - Retrieves annotations for the variant based on its hash /// - If no annotations are found, an empty vector is used /// /// # Panics /// This function will panic if `vcf_variants` is empty. /// /// # Example /// ``` /// let vcf_variants = vec![VcfVariant { /* ... */ }]; /// let annotations = Annotations::new(); /// let variant = create_variant(vcf_variants, &annotations); /// ``` /// /// # Note /// This function assumes that all VcfVariants in the input vector represent the same genomic variant /// and should be consolidated. It's the caller's responsibility to ensure this is the case. fn create_variant(vcf_variants: Vec, annotations: &Annotations) -> Variant { let first = &vcf_variants[0]; let annotations = annotations .store .get(&first.hash) .map(|v| v.value().to_vec()) .unwrap_or_default(); Variant { hash: first.hash, position: first.position.clone(), reference: first.reference.clone(), alternative: first.alternative.clone(), vcf_variants, annotations, } } pub enum ExtAnnotationSource { Cosmic(Cosmic), GnomAD(GnomAD), } use rusqlite::{params, Connection, Result as SqliteResult}; pub struct ExternalAnnotation { pub conn: Connection, } impl ExternalAnnotation { pub fn init() -> anyhow::Result { let mut db_path = app_storage_dir()?; db_path.push("annotations_db.sqlite"); info!("Opening DB: {}", db_path.display()); let conn = Connection::open(db_path)?; conn.execute( "CREATE TABLE IF NOT EXISTS annotations ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash BLOB(16) UNIQUE NOT NULL, source TEXT NOT NULL, data BLOB, modified TEXT NOT NULL )", [], )?; Ok(Self { conn }) } pub fn annotate( &self, variants: &[VcfVariant], annotations: &Annotations, ) -> anyhow::Result<()> { let mut unfound = Vec::new(); for variant in variants { let hash = variant.hash(); let mut has_pushed = false; // Check COSMIC match self.get_annotation(hash, "COSMIC")? { Some(cosmic) => { annotations .store .entry(hash) .or_default() .push(Annotation::Cosmic(cosmic)); } None => { unfound.push(variant.clone()); has_pushed = true; } } // Check GnomAD match self.get_annotation(hash, "GnomAD")? { Some(gnomad) => { annotations .store .entry(hash) .or_default() .push(Annotation::GnomAD(gnomad)); } None => { if !has_pushed { unfound.push(variant.clone()) } } } } if !unfound.is_empty() { self.process_unfound_echtvar(unfound, annotations)?; } Ok(()) } fn get_annotation( &self, hash: Hash128, source: &str, ) -> anyhow::Result> { let result: SqliteResult> = self.conn.query_row( "SELECT data FROM annotations WHERE hash = ? AND source = ?", params![hash.to_bytes(), source], |row| row.get(0), ); match result { Ok(data) => Ok(Some(serde_json::from_slice(&data)?)), Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), Err(e) => Err(e.into()), } } pub fn process_unfound_echtvar( &self, mut unfound: Vec, annotations: &Annotations, ) -> anyhow::Result<()> { let temp_dir = std::env::temp_dir(); unfound.par_sort(); unfound.dedup(); let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n"); let min_chunk_size = 1000; let max_chunks = 150; let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize); let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size); let results = unfound .par_chunks(optimal_chunk_size) .flat_map(|chunk| -> anyhow::Result> { let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4())); let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4())); // Write input VCF let mut vcf = File::create(&in_tmp)?; writeln!(vcf, "{}", header)?; for (i, row) in chunk.iter().enumerate() { writeln!( vcf, "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.", row.position.contig(), row.position.position + 1, // vcf i + 1, row.reference, row.alternative )?; } // Run echtvar run_echtvar(in_tmp.to_str().unwrap(), out_tmp.to_str().unwrap())?; fs::remove_file(in_tmp)?; // Parse echtvar output let mut reader = ReaderBuilder::new() .delimiter(b'\t') .has_headers(false) .comment(Some(b'#')) .flexible(true) .from_reader(get_reader(out_tmp.to_str().unwrap())?); let mut chunk_results = Vec::new(); for (i, result) in reader.deserialize::().enumerate() { let row = result?; // Verify that the ID corresponds to the input let id: usize = row.id.parse().context("Failed to parse ID")?; if id != i + 1 { return Err(anyhow::anyhow!( "Echtvar output ID {} does not match expected ID {}", id, i + 1 )); } let (cosmic, gnomad) = parse_echtvar_val(&row.info)?; let hash = chunk[i].hash(); chunk_results.push((hash, cosmic, gnomad)); } fs::remove_file(out_tmp)?; Ok(chunk_results) }) .flatten() .collect::>(); for (hash, cosmic, gnomad) in results { // let modified = chrono::Utc::now().to_rfc3339(); // Update COSMIC // self.conn.execute( // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)", // params![ // hash.to_le_bytes().to_vec(), // cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()), // modified // ], // )?; // // // Update GnomAD // self.conn.execute( // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)", // params![ // hash.to_le_bytes().to_vec(), // gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()), // modified // ], // )?; // Update in-memory annotations if let Some(c) = cosmic { annotations .store .entry(hash) .or_default() .push(Annotation::Cosmic(c)); } if let Some(g) = gnomad { annotations .store .entry(hash) .or_default() .push(Annotation::GnomAD(g)); } } Ok(()) } pub fn annotate_vep( &self, variants: &[VcfVariant], annotations: &Annotations, ) -> anyhow::Result<()> { let mut unfound = Vec::new(); for variant in variants { let hash = variant.hash(); // Check VEP match self.get_annotation(hash, "VEP")? { Some(veps) => { annotations .store .entry(hash) .or_default() .push(Annotation::VEP(veps)); } None => { unfound.push(variant.clone()); } } } if !unfound.is_empty() { self.process_unfound_vep(unfound, annotations)?; } Ok(()) } pub fn process_unfound_vep( &self, mut unfound: Vec, annotations: &Annotations, ) -> anyhow::Result<()> { unfound.par_sort(); unfound.dedup(); info!("{} variants to annotate with VEP.", unfound.len()); let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n"); let (sv, unfound): (Vec, Vec) = unfound.into_iter().partition(|v| v.has_svtype()); warn!("SV {}", sv.len()); let min_chunk_size = 1000; let max_chunks = 150; let mut results: Vec<(Hash128, Vec)> = if !unfound.is_empty() { let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize); let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size); debug!("{} chunks to process.", unfound.len() / optimal_chunk_size); unfound .par_chunks(optimal_chunk_size) .enumerate() .map(|(chunk_i, chunk)| { debug!("Processing chunk {chunk_i}"); process_vep_chunk(chunk, &header).map_err(|e| { error!("Error processing chunk {chunk_i}: {e}"); e }) }) .collect::, _>>()? // Collect results into a Result> .into_iter() .flatten() .collect::>() } else { Vec::new() }; if !sv.is_empty() { let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize); let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size); let results_sv = sv .par_chunks(optimal_chunk_size) .flat_map(|chunk| -> anyhow::Result> { let in_tmp = temp_file_path(".vcf") .context("Can't create tmp path for in tmp")? .to_str() .unwrap() .to_string(); let out_vep = temp_file_path("_vep.txt") .context("Can't create tmp path for in tmp")? .to_str() .unwrap() .to_string(); let out_summary = format!("{out_vep}_summary.html"); let out_warnings = format!("{out_vep}_warnings.txt"); // Write input VCF let mut vcf = File::create(&in_tmp).context("Can't create input vcf file for VEP.")?; writeln!(vcf, "{}", header)?; for (i, mut row) in chunk.iter().cloned().enumerate() { row.id = (i + 1).to_string(); let s = row.into_vcf_row(); writeln!(vcf, "{s}",)?; } run_vep(&in_tmp, &out_vep).context("Error while running VEP.")?; let mut reader_vep = ReaderBuilder::new() .delimiter(b'\t') .has_headers(false) .comment(Some(b'#')) .flexible(true) .from_reader( fs::File::open(&out_vep).context("Can't open VEP result file.")?, ); let mut lines: HashMap> = HashMap::new(); for line in reader_vep.deserialize() { let line: VepLine = line.context("Failed to deserialize VepLine")?; let key = line .uploaded_variation .parse::() .context("Failed to parse uploaded_variation as u64")?; lines.entry(key).or_default().push(line); } fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?; let mut n_not_vep = 0; let mut chunk_results: Vec<(Hash128, Vec)> = Vec::new(); chunk.iter().enumerate().for_each(|(i, entry)| { let k = (i + 1) as u64; if let Some(vep_lines) = lines.get(&k) { if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() { chunk_results.push((entry.hash(), veps)); } } else { warn!( "No VEP entry for {}\t{}\t{}", entry.position.to_string(), entry.reference.to_string(), entry.alternative.to_string() ); n_not_vep += 1; } }); fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?; if n_not_vep > 0 { debug!("{n_not_vep} variants not annotated by VEP."); let warnings = fs::read_to_string(&out_warnings) .context(format!("Can't read VEP warnings: {out_warnings}"))?; warn!("VEP warnings:\n{warnings}"); } if Path::new(&out_warnings).exists() { fs::remove_file(&out_warnings) .context(format!("Can't remove file {out_warnings}"))?; } if Path::new(&out_summary).exists() { fs::remove_file(&out_summary) .context(format!("Can't remove file {out_summary}"))?; } Ok(chunk_results) }) .flatten() .collect::>(); results.extend(results_sv); } info!("{} total variants annotaded by VEP.", results.len()); for (hash, veps) in results { // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?; annotations .store .entry(hash) .or_default() .push(Annotation::VEP(veps)); } Ok(()) } pub fn update_database(&self, hash: Hash128, source: &str, data: &[u8]) -> anyhow::Result<()> { let modified = chrono::Utc::now().to_rfc3339(); self.conn.execute( "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)", params![hash.to_bytes(), source, data, modified], )?; Ok(()) } } fn process_vep_chunk( chunk: &[VcfVariant], header: &str, ) -> anyhow::Result)>> { let in_tmp = temp_file_path("vcf")? .to_str() .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))? .to_string(); let out_vep = temp_file_path("_vep.txt")? .to_str() .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))? .to_string(); let out_summary = format!("{out_vep}_summary.html"); let out_warnings = format!("{out_vep}_warnings.txt"); let mut vcf = File::create(&in_tmp)?; // If this fails, the error is propagated. writeln!(vcf, "{}", header)?; // If this fails, the error is propagated. for (i, row) in chunk.iter().enumerate() { writeln!( vcf, "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.", row.position.contig(), row.position.position + 1, i + 1, row.reference, row.alternative )?; } if let Err(e) = run_vep(&in_tmp, &out_vep) { error!("VEP error: {e}"); return Err(anyhow::anyhow!("VEP execution failed: {}", e)); // Propagate the error. } let mut reader_vep = ReaderBuilder::new() .delimiter(b'\t') .has_headers(false) .comment(Some(b'#')) .flexible(true) .from_reader(fs::File::open(&out_vep)?); // If this fails, the error is propagated. let mut lines: HashMap> = HashMap::new(); for line in reader_vep.deserialize() { let line: VepLine = line.context("Failed to deserialize VepLine")?; // Propagate the error. let key = line .uploaded_variation .parse::() .context("Failed to parse uploaded_variation as u64")?; // Propagate the error. lines.entry(key).or_default().push(line); } fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?; let mut n_not_vep = 0; let mut chunk_results: Vec<(Hash128, Vec)> = Vec::new(); chunk.iter().enumerate().for_each(|(i, entry)| { let k = (i + 1) as u64; if let Some(vep_lines) = lines.get(&k) { if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() { chunk_results.push((entry.hash(), veps)); } } else { warn!( "No VEP entry for {}\t{}\t{}", entry.position.to_string(), entry.reference.to_string(), entry.alternative.to_string() ); n_not_vep += 1; } }); fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?; if n_not_vep > 0 { debug!("{n_not_vep} variants not annotated by VEP."); let warnings = fs::read_to_string(&out_warnings) .context(format!("Can't read VEP warnings: {out_warnings}"))?; warn!("VEP warnings:\n{warnings}"); } if Path::new(&out_warnings).exists() { fs::remove_file(&out_warnings).context(format!("Can't remove file {out_warnings}"))?; } if Path::new(&out_summary).exists() { fs::remove_file(&out_summary).context(format!("Can't remove file {out_summary}"))?; } Ok(chunk_results) // Return the successful result. }