variant_collection.rs 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287
  1. use std::{
  2. collections::{HashMap, HashSet},
  3. fs::{self, File},
  4. io::Write,
  5. path::Path,
  6. };
  7. use anyhow::Context;
  8. use bgzip::{BGZFReader, BGZFWriter};
  9. use csv::ReaderBuilder;
  10. use log::{debug, error, info, warn};
  11. use rayon::prelude::*;
  12. use serde::{Deserialize, Serialize};
  13. use uuid::Uuid;
  14. use super::variant::{AlterationCategory, ReferenceAlternative, VcfVariant};
  15. use crate::{
  16. annotation::{
  17. cosmic::Cosmic,
  18. echtvar::{parse_echtvar_val, run_echtvar},
  19. gnomad::GnomAD,
  20. vep::{run_vep, VepLine, VEP},
  21. Annotation, Annotations,
  22. },
  23. collection::{
  24. bam::{counts_at, counts_ins_at},
  25. vcf::Vcf,
  26. },
  27. helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path, Hash128},
  28. io::{fasta::sequence_at, readers::get_reader, vcf::vcf_header},
  29. positions::GenomePosition,
  30. };
  31. /// A collection of VCF variants along with associated metadata.
  32. ///
  33. /// This struct represents a set of variants from a VCF file, including
  34. /// the original VCF metadata and the caller annotation.
  35. ///
  36. /// # Fields
  37. /// * `variants` - A vector of VcfVariant instances representing individual variants
  38. /// * `vcf` - The Vcf struct containing metadata from the original VCF file
  39. /// * `caller` - An Annotation indicating the variant caller used
  40. ///
  41. /// # Derives
  42. /// This struct derives Debug and Clone traits, allowing for easy debugging
  43. /// and cloning of VariantCollection instances.
  44. ///
  45. /// # Usage
  46. /// This struct is typically used to group related variants from a single VCF file,
  47. /// preserving the context in which they were called and allowing for collective
  48. /// operations on the set of variants.
  49. #[derive(Debug, Clone)]
  50. pub struct VariantCollection {
  51. pub variants: Vec<VcfVariant>,
  52. pub vcf: Vcf,
  53. pub caller: Annotation,
  54. }
  55. impl VariantCollection {
  56. /// Returns a vector of hash keys for all variants in the collection.
  57. ///
  58. /// This method generates a unique hash for each variant in the collection
  59. /// and returns these hashes as a vector.
  60. ///
  61. /// # Returns
  62. /// A `Vec<Hash128>` containing the hash of each variant in the collection.
  63. ///
  64. /// # Performance
  65. /// This method iterates over all variants in the collection, so its time complexity
  66. /// is O(n) where n is the number of variants.
  67. ///
  68. /// # Usage
  69. /// This method is useful for obtaining unique identifiers for each variant,
  70. /// which can be used for indexing, comparison, or lookup operations.
  71. ///
  72. /// # Example
  73. /// ```
  74. /// let variant_keys = variant_collection.keys();
  75. /// for key in variant_keys {
  76. /// println!("Variant hash: {:?}", key);
  77. /// }
  78. /// ```
  79. pub fn keys(&self) -> Vec<Hash128> {
  80. self.variants.iter().map(|v| v.hash()).collect()
  81. }
  82. /// Retains only the variants whose hash keys are present in the provided set.
  83. ///
  84. /// This method filters the variants in the collection, keeping only those
  85. /// whose hash keys are found in the `keys_to_keep` set.
  86. ///
  87. /// # Arguments
  88. /// * `keys_to_keep` - A `HashSet<Hash128>` containing the hash keys of variants to retain
  89. ///
  90. /// # Effects
  91. /// This method modifies the `VariantCollection` in place, potentially reducing
  92. /// the number of variants it contains.
  93. ///
  94. /// # Performance
  95. /// The time complexity is O(n), where n is the number of variants in the collection.
  96. /// The space complexity is O(1) as it operates in place.
  97. ///
  98. /// # Usage
  99. /// This method is useful for filtering variants based on a pre-computed set of keys,
  100. /// which can be helpful in various scenarios such as:
  101. /// - Removing duplicates
  102. /// - Keeping only variants that meet certain criteria
  103. /// - Synchronizing variants across different collections
  104. ///
  105. /// # Example
  106. /// ```
  107. /// let mut variant_collection = VariantCollection::new();
  108. /// // ... populate variant_collection ...
  109. ///
  110. /// let keys_to_keep: HashSet<Hash128> = some_filtering_function();
  111. /// variant_collection.retain_keys(&keys_to_keep);
  112. /// ```
  113. pub fn retain_keys(&mut self, keys_to_keep: &HashSet<Hash128>) {
  114. self.variants.retain(|v| keys_to_keep.contains(&v.hash()));
  115. }
  116. /// Removes variants whose hash keys are present in the provided set.
  117. ///
  118. /// This method filters the variants in the collection, removing those
  119. /// whose hash keys are found in the `keys_to_remove` set.
  120. ///
  121. /// # Arguments
  122. /// * `keys_to_remove` - A `HashSet<Hash128>` containing the hash keys of variants to remove
  123. ///
  124. /// # Effects
  125. /// This method modifies the `VariantCollection` in place, potentially reducing
  126. /// the number of variants it contains.
  127. ///
  128. /// # Performance
  129. /// The time complexity is O(n), where n is the number of variants in the collection.
  130. /// The space complexity is O(1) as it operates in place.
  131. ///
  132. /// # Usage
  133. /// This method is useful for filtering out unwanted variants based on a pre-computed set of keys,
  134. /// which can be helpful in various scenarios such as:
  135. /// - Removing variants that fail certain criteria
  136. /// - Eliminating duplicates identified by their hash keys
  137. /// - Excluding variants present in another dataset
  138. ///
  139. /// # Example
  140. /// ```
  141. /// let mut variant_collection = VariantCollection::new();
  142. /// // ... populate variant_collection ...
  143. ///
  144. /// let keys_to_remove: HashSet<Hash128> = some_filtering_function();
  145. /// variant_collection.remove_keys(&keys_to_remove);
  146. /// ```
  147. pub fn remove_keys(&mut self, keys_to_remove: &HashSet<Hash128>) {
  148. self.variants
  149. .retain(|v| !keys_to_remove.contains(&v.hash()));
  150. }
  151. /// Partitions the VcfVariants into two sets based on a given predicate.
  152. ///
  153. /// This function splits the current VcfVariants instance into two new instances,
  154. /// where the variants are divided based on whether they satisfy the given predicate.
  155. ///
  156. /// # Arguments
  157. /// * `predicate` - A closure that takes a reference to a VcfVariant and returns a boolean
  158. ///
  159. /// # Returns
  160. /// A tuple of two VcfVariants instances:
  161. /// * The first contains all variants for which the predicate returned true
  162. /// * The second contains all variants for which the predicate returned false
  163. ///
  164. /// # Type Parameters
  165. /// * `F` - The type of the predicate function, which must implement Fn(&VcfVariant) -> bool
  166. ///
  167. /// # Behavior
  168. /// - Consumes the original VcfVariants instance
  169. /// - Creates two new VcfVariants instances with the partitioned data
  170. /// - The VCF and caller information are cloned for the first returned instance
  171. /// and moved into the second returned instance
  172. ///
  173. /// # Examples
  174. /// ```
  175. /// let (passing, failing) = vcf_variants.partition(|variant| variant.quality.map_or(false, |q| q > 30));
  176. /// ```
  177. pub fn partition<F>(self, predicate: F) -> (Self, Self)
  178. where
  179. F: Fn(&VcfVariant) -> bool,
  180. {
  181. let Self {
  182. variants,
  183. vcf,
  184. caller,
  185. } = self;
  186. let (left_data, right_data) = variants.into_iter().partition(predicate);
  187. (
  188. Self {
  189. variants: left_data,
  190. vcf: vcf.clone(),
  191. caller: caller.clone(),
  192. },
  193. Self {
  194. variants: right_data,
  195. vcf,
  196. caller,
  197. },
  198. )
  199. }
  200. fn chunk_size(&self, max_threads: u8) -> usize {
  201. let total_items = self.variants.len();
  202. let min_chunk_size = 1000;
  203. let max_chunks = max_threads;
  204. let optimal_chunk_size = total_items.div_ceil(max_chunks as usize);
  205. optimal_chunk_size.max(min_chunk_size)
  206. }
  207. /// Annotates variants with sequence entropy information.
  208. ///
  209. /// This function calculates and adds Shannon entropy annotations to the variants
  210. /// based on the surrounding sequence context. It processes variants in parallel
  211. /// chunks for improved performance.
  212. ///
  213. /// # Arguments
  214. /// * `annotations` - A reference to the Annotations structure to store the results
  215. /// * `reference` - Path to the reference FASTA file
  216. /// * `seq_len` - Length of the sequence context to consider for entropy calculation
  217. /// * `max_threads` - Maximum number of threads to use for parallel processing
  218. ///
  219. /// # Behavior
  220. /// - Processes variants in parallel chunks
  221. /// - For each variant, retrieves the surrounding sequence from the reference
  222. /// - Calculates Shannon entropy for the sequence
  223. /// - Adds the entropy value as an annotation to the variant
  224. /// - Skips variants that already have a Shannon entropy annotation
  225. ///
  226. /// # Panics
  227. /// This function will panic if it fails to build the FASTA reader from the provided reference path.
  228. pub fn annotate_with_sequence_entropy(
  229. &self,
  230. annotations: &Annotations,
  231. reference: &str,
  232. seq_len: usize,
  233. max_threads: u8,
  234. ) {
  235. self.variants
  236. .par_chunks(self.chunk_size(max_threads))
  237. .for_each(|chunk| {
  238. let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default()
  239. .build_from_path(reference)
  240. .unwrap();
  241. for c in chunk {
  242. let key = c.hash();
  243. let mut anns = annotations.store.entry(key).or_default();
  244. if !anns
  245. .iter()
  246. .any(|e| matches!(e, Annotation::ShannonEntropy(_)))
  247. {
  248. if let Ok(r) = sequence_at(
  249. &mut fasta_reader,
  250. &c.position.contig(),
  251. c.position.position as usize,
  252. seq_len,
  253. ) {
  254. let ent = estimate_shannon_entropy(r.as_str());
  255. anns.push(Annotation::ShannonEntropy(ent));
  256. }
  257. }
  258. }
  259. });
  260. }
  261. /// Annotates variants with information from a constitutional BAM file.
  262. ///
  263. /// This function processes variants in parallel chunks and adds annotations
  264. /// based on the read counts from a provided BAM file. It calculates and adds
  265. /// annotations for the number of reads supporting the alternative allele and
  266. /// the total read depth at each variant position.
  267. ///
  268. /// # Arguments
  269. /// * `annotations` - A reference to the Annotations structure to store the results
  270. /// * `constit_bam_path` - Path to the constituent BAM file
  271. /// * `max_threads` - Maximum number of threads to use for parallel processing
  272. ///
  273. /// # Returns
  274. /// * `Ok(())` if the annotation process completes successfully
  275. /// * `Err` if there's an error opening the BAM file or processing the variants
  276. ///
  277. /// # Behavior
  278. /// - Processes variants in parallel chunks
  279. /// - For each variant, retrieves read counts from the BAM file
  280. /// - Calculates the number of reads supporting the alternative allele and total depth
  281. /// - Adds ConstitAlt and ConstitDepth annotations to each variant
  282. /// - Skips variants that already have both ConstitAlt and ConstitDepth annotations
  283. /// - Handles insertions, deletions, and other alteration types differently
  284. ///
  285. /// # Errors
  286. /// This function will return an error if:
  287. /// - It fails to open the BAM file
  288. /// - There's an error while processing the variants or reading from the BAM file
  289. pub fn annotate_with_constit_bam(
  290. &self,
  291. annotations: &Annotations,
  292. constit_bam_path: &str,
  293. max_threads: u8,
  294. ) -> anyhow::Result<()> {
  295. self.variants
  296. .par_chunks(self.chunk_size(max_threads))
  297. .try_for_each(|chunk| {
  298. let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path)
  299. .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?;
  300. for var in chunk {
  301. let key = var.hash();
  302. let mut anns = annotations.store.entry(key).or_default();
  303. if anns
  304. .iter()
  305. .filter(|e| {
  306. matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
  307. })
  308. .count()
  309. != 2
  310. {
  311. let mut n_alt = 0;
  312. let mut depth = 0;
  313. let mut alt_seq = var.alternative.to_string();
  314. let c = if var.alteration_category() == AlterationCategory::INS {
  315. // TODO: Add stretch comparison
  316. alt_seq = alt_seq.split_off(1);
  317. counts_ins_at(&mut bam, &var.position.contig(), var.position.position)?
  318. } else if var.alteration_category() == AlterationCategory::DEL {
  319. alt_seq = "D".to_string();
  320. counts_at(&mut bam, &var.position.contig(), var.position.position + 1)?
  321. } else {
  322. counts_at(&mut bam, &var.position.contig(), var.position.position)?
  323. };
  324. for (seq, n) in c {
  325. if seq == alt_seq {
  326. n_alt = n;
  327. }
  328. if seq == *"D" && alt_seq != *"D" {
  329. continue;
  330. }
  331. depth += n;
  332. }
  333. anns.push(Annotation::ConstitAlt(n_alt as u16));
  334. anns.push(Annotation::ConstitDepth(depth as u16));
  335. }
  336. }
  337. anyhow::Ok(())
  338. })?;
  339. Ok(())
  340. }
  341. pub fn stats(&self) {
  342. println!(
  343. "{} {}\t{}",
  344. self.vcf.caller,
  345. self.vcf.time,
  346. self.variants.len()
  347. );
  348. }
  349. }
  350. /// Represents a consolidated genomic variant with associated information and annotations.
  351. ///
  352. /// This struct encapsulates comprehensive data for a single genomic variant,
  353. /// including its unique identifier, genomic position, alleles, related VCF entries,
  354. /// and various annotations.
  355. ///
  356. /// # Fields
  357. /// * `hash` - A unique `Hash128` identifier for the variant
  358. /// * `position` - The `GenomePosition` of the variant (0-based coordinates)
  359. /// * `reference` - The reference allele as a `ReferenceAlternative`
  360. /// * `alternative` - The alternative allele as a `ReferenceAlternative`
  361. /// * `vcf_variants` - A vector of `VcfVariant` instances associated with this variant
  362. /// * `annotations` - A vector of `Annotation` instances providing additional variant information
  363. ///
  364. /// # Derives
  365. /// * `Debug` - For easier debugging and logging
  366. /// * `Serialize`, `Deserialize` - For serialization and deserialization (e.g., JSON)
  367. /// * `Clone` - For creating deep copies of the variant
  368. ///
  369. /// # Usage
  370. /// This struct is central to variant analysis, providing a unified representation
  371. /// of a variant that may combine data from multiple VCF entries and include
  372. /// various annotations. It's useful for advanced variant processing, filtering,
  373. /// and reporting in genomic analysis pipelines.
  374. ///
  375. /// # Example
  376. /// ```
  377. /// let variant = Variant {
  378. /// hash: Hash128::new(/* ... */),
  379. /// position: GenomePosition { contig: 1, position: 1000 },
  380. /// reference: ReferenceAlternative::new("A"),
  381. /// alternative: ReferenceAlternative::new("T"),
  382. /// vcf_variants: vec![/* VcfVariant instances */],
  383. /// annotations: vec![/* Annotation instances */],
  384. /// };
  385. /// ```
  386. #[derive(Debug, Serialize, Deserialize, Clone)]
  387. pub struct Variant {
  388. pub hash: Hash128,
  389. pub position: GenomePosition,
  390. pub reference: ReferenceAlternative,
  391. pub alternative: ReferenceAlternative,
  392. pub vcf_variants: Vec<VcfVariant>,
  393. pub annotations: Vec<Annotation>,
  394. }
  395. impl PartialEq for Variant {
  396. fn eq(&self, other: &Self) -> bool {
  397. self.position == other.position
  398. && self.reference == other.reference
  399. && self.alternative == other.alternative
  400. }
  401. }
  402. impl Variant {
  403. pub fn vep(&self) -> Vec<VEP> {
  404. self.annotations
  405. .iter()
  406. .flat_map(|a| {
  407. if let Annotation::VEP(v) = a {
  408. v.to_vec()
  409. } else {
  410. vec![]
  411. }
  412. })
  413. .collect()
  414. }
  415. pub fn alteration_category(&self) -> Vec<AlterationCategory> {
  416. self.vcf_variants
  417. .iter()
  418. .map(|v| v.alteration_category())
  419. .collect::<HashSet<_>>()
  420. .into_iter()
  421. .collect()
  422. }
  423. }
  424. /// A collection of genomic variants.
  425. ///
  426. /// This struct represents a set of `Variant` instances, providing a container
  427. /// for multiple genomic variants.
  428. ///
  429. /// # Fields
  430. /// * `data` - A vector of `Variant` instances
  431. ///
  432. /// # Derives
  433. /// * `Debug` - For easier debugging and logging
  434. /// * `Default` - Allows creation of an empty `Variants` instance
  435. /// * `Serialize`, `Deserialize` - For serialization and deserialization (e.g., JSON)
  436. /// * `Clone` - For creating deep copies of the collection
  437. ///
  438. /// # Usage
  439. /// This struct is typically used to group related variants, such as those from
  440. /// a single sample or analysis run. It provides a convenient way to handle and
  441. /// process multiple variants as a single unit.
  442. ///
  443. /// # Example
  444. /// ```
  445. /// let mut variants = Variants::default();
  446. /// variants.data.push(Variant {
  447. /// // ... variant details ...
  448. /// });
  449. /// // Process or analyze the collection of variants
  450. /// ```
  451. ///
  452. /// # Note
  453. /// The `Default` implementation creates an empty vector of variants.
  454. #[derive(Debug, Default, Serialize, Deserialize, Clone)]
  455. pub struct Variants {
  456. pub data: Vec<Variant>,
  457. }
  458. impl Variants {
  459. /// Sorts the variants in-place based on their genomic positions.
  460. ///
  461. /// This method uses an unstable sort to order the variants in the collection
  462. /// according to their genomic positions. The sorting is done in ascending order,
  463. /// first by contig and then by position within the contig.
  464. ///
  465. /// # Effects
  466. /// Modifies the order of variants in the `data` vector in-place.
  467. ///
  468. /// # Performance
  469. /// Uses `sort_unstable_by` for better performance, with a time complexity of O(n log n),
  470. /// where n is the number of variants.
  471. ///
  472. /// # Example
  473. /// ```
  474. /// let mut variants = Variants {
  475. /// data: vec![
  476. /// Variant { position: GenomePosition { contig: 0, position: 100 }, .. },
  477. /// Variant { position: GenomePosition { contig: 0, position: 50 }, .. },
  478. /// Variant { position: GenomePosition { contig: 1, position: 75 }, .. },
  479. /// ]
  480. /// };
  481. /// variants.sort();
  482. /// // Variants are now sorted by position
  483. /// ```
  484. pub fn sort(&mut self) {
  485. self.data
  486. .sort_unstable_by(|a, b| a.position.cmp(&b.position));
  487. }
  488. /// Merges this Variants collection with another VariantCollection, combining overlapping variants.
  489. ///
  490. /// This method performs a merge operation, combining the current Variants with a VariantCollection.
  491. /// It uses a two-pointer technique to efficiently merge the sorted collections, handling overlapping
  492. /// variants and creating new variants as necessary.
  493. ///
  494. /// # Arguments
  495. /// * `others` - A VariantCollection to merge with the current Variants
  496. /// * `annotations` - A reference to Annotations used when creating new variants
  497. ///
  498. /// # Effects
  499. /// * Modifies the current Variants in-place, replacing its data with the merged result
  500. /// * Drains the data from the current Variants during the merge process
  501. ///
  502. /// # Behavior
  503. /// * Variants are compared based on their genomic positions
  504. /// * When positions are equal, variants with matching reference and alternative alleles are merged
  505. /// * Non-matching variants at the same position are kept separate
  506. /// * The method logs the number of merged variants
  507. ///
  508. /// # Performance
  509. /// * Time complexity: O(n + m), where n and m are the sizes of the two collections
  510. /// * Space complexity: O(n + m) for the merged result
  511. ///
  512. /// # Note
  513. /// This method assumes that both the current Variants and the input VariantCollection are sorted
  514. /// by genomic position. Use the `sort` method before merging if necessary.
  515. ///
  516. /// # Example
  517. /// ```
  518. /// let mut variants1 = Variants { /* ... */ };
  519. /// let variants2 = VariantCollection { /* ... */ };
  520. /// let annotations = Annotations::new();
  521. /// variants1.merge(variants2, &annotations);
  522. /// // variants1 now contains the merged data
  523. /// ```
  524. pub fn merge(&mut self, others: VariantCollection, annotations: &Annotations) {
  525. let mut result = Vec::new();
  526. let mut n_merged = 0;
  527. let mut self_iter = self.data.drain(..).peekable(); // Iterator for self.data
  528. let mut others_iter = others.variants.into_iter().peekable(); // Iterator for others.variants
  529. // Merge using two-pointer technique
  530. while let (Some(self_variant), Some(other_variant)) = (self_iter.peek(), others_iter.peek())
  531. {
  532. match self_variant.position.cmp(&other_variant.position) {
  533. std::cmp::Ordering::Less => {
  534. result.push(self_iter.next().unwrap());
  535. }
  536. std::cmp::Ordering::Greater => {
  537. result.push(create_variant(
  538. vec![others_iter.next().unwrap()],
  539. annotations,
  540. ));
  541. }
  542. std::cmp::Ordering::Equal => {
  543. let self_variant = self_iter.next().unwrap();
  544. let other_variant = others_iter.next().unwrap();
  545. if self_variant.reference == other_variant.reference
  546. && self_variant.alternative == other_variant.alternative
  547. {
  548. let mut merged_variant = self_variant;
  549. merged_variant.vcf_variants.push(other_variant);
  550. n_merged += 1;
  551. result.push(merged_variant);
  552. } else {
  553. result.push(self_variant);
  554. result.push(create_variant(vec![other_variant], annotations));
  555. }
  556. }
  557. }
  558. }
  559. // Drain remaining elements from iterators
  560. result.extend(self_iter);
  561. result.extend(others_iter.map(|v| create_variant(vec![v], annotations)));
  562. info!("n merged: {}", n_merged);
  563. self.data = result;
  564. }
  565. /// Filters and returns variants of a specific alteration category.
  566. ///
  567. /// This method creates a new vector containing clones of all variants
  568. /// in the collection that match the specified alteration category.
  569. ///
  570. /// # Arguments
  571. /// * `cat` - An `AlterationCategory` enum value specifying the category to filter by
  572. ///
  573. /// # Returns
  574. /// A `Vec<Variant>` containing clones of all matching variants
  575. ///
  576. /// # Performance
  577. /// * Time complexity: O(n), where n is the number of variants in the collection
  578. /// * Space complexity: O(m), where m is the number of matching variants
  579. ///
  580. /// # Example
  581. /// ```
  582. /// let variants = Variants { /* ... */ };
  583. /// let snvs = variants.get_alteration_cat(AlterationCategory::SNV);
  584. /// println!("Found {} SNVs", snvs.len());
  585. /// ```
  586. ///
  587. /// # Note
  588. /// This method clones matching variants. If you need to process a large number of variants
  589. /// and don't require ownership, consider implementing an iterator-based approach instead.
  590. pub fn get_alteration_cat(&self, cat: AlterationCategory) -> Vec<Variant> {
  591. self.data
  592. .iter()
  593. .filter(|v| v.alteration_category().contains(&cat))
  594. .cloned()
  595. .collect()
  596. }
  597. /// Saves the Variants collection to a BGZF-compressed JSON file.
  598. ///
  599. /// This method serializes the Variants struct to JSON format and writes it to a file
  600. /// using BGZF (Blocked GNU Zip Format) compression.
  601. ///
  602. /// # Arguments
  603. /// * `filename` - A string slice that holds the name of the file to write to
  604. ///
  605. /// # Returns
  606. /// * `Ok(())` if the operation is successful
  607. /// * `Err(anyhow::Error)` if any error occurs during file creation, serialization, or writing
  608. ///
  609. /// # Errors
  610. /// This function will return an error if:
  611. /// * The file cannot be created
  612. /// * The Variants struct cannot be serialized to JSON
  613. /// * The BGZF writer cannot be closed properly
  614. ///
  615. /// # Performance
  616. /// The time and space complexity depends on the size of the Variants collection
  617. /// and the efficiency of the JSON serialization and BGZF compression.
  618. ///
  619. /// # Example
  620. /// ```
  621. /// let variants = Variants { /* ... */ };
  622. /// match variants.save_to_json("output.json.gz") {
  623. /// Ok(()) => println!("Successfully saved variants"),
  624. /// Err(e) => eprintln!("Failed to save variants: {}", e),
  625. /// }
  626. /// ```
  627. ///
  628. /// # Note
  629. /// This method uses BGZF compression, which is compatible with standard gzip decompression.
  630. /// The resulting file can be read using standard gzip-aware tools.
  631. pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> {
  632. let file = File::create(filename)
  633. .with_context(|| format!("Failed to create file: {}", filename))?;
  634. let mut writer = BGZFWriter::new(file, bgzip::Compression::default());
  635. serde_json::to_writer(&mut writer, self)
  636. .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?;
  637. writer
  638. .close()
  639. .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?;
  640. debug!("Successfully saved variants to {}", filename);
  641. Ok(())
  642. }
  643. /// Loads a Variants collection from a BGZF-compressed JSON file.
  644. ///
  645. /// This method reads a BGZF-compressed file, deserializes its JSON content,
  646. /// and constructs a new Variants instance from it.
  647. ///
  648. /// # Arguments
  649. /// * `filename` - A string slice that holds the name of the file to read from
  650. ///
  651. /// # Returns
  652. /// * `Ok(Self)` containing the deserialized Variants if successful
  653. /// * `Err(anyhow::Error)` if any error occurs during file opening, reading, or deserialization
  654. ///
  655. /// # Errors
  656. /// This function will return an error if:
  657. /// * The file cannot be opened
  658. /// * A BGZF reader cannot be created for the file
  659. /// * The file content cannot be deserialized into a Variants struct
  660. ///
  661. /// # Performance
  662. /// The time and space complexity depends on the size of the input file
  663. /// and the efficiency of the JSON deserialization and BGZF decompression.
  664. ///
  665. /// # Example
  666. /// ```
  667. /// match Variants::load_from_json("input.json.gz") {
  668. /// Ok(variants) => println!("Successfully loaded {} variants", variants.data.len()),
  669. /// Err(e) => eprintln!("Failed to load variants: {}", e),
  670. /// }
  671. /// ```
  672. ///
  673. /// # Note
  674. /// This method expects the input file to be in BGZF-compressed JSON format,
  675. /// typically created by the `save_to_json` method of this struct.
  676. pub fn load_from_json(filename: &str) -> anyhow::Result<Self> {
  677. let file =
  678. File::open(filename).with_context(|| format!("Failed to open file: {}", filename))?;
  679. let mut reader = BGZFReader::new(file)
  680. .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?;
  681. let variants: Self = serde_json::from_reader(&mut reader)
  682. .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?;
  683. debug!("Successfully loaded variants from {}", filename);
  684. Ok(variants)
  685. }
  686. }
  687. /// Creates a new Variant instance from a collection of VcfVariants and annotations.
  688. ///
  689. /// This function consolidates information from one or more VcfVariants into a single Variant,
  690. /// using the first VcfVariant as the primary source for most fields. It also retrieves
  691. /// and includes relevant annotations.
  692. ///
  693. /// # Arguments
  694. /// * `vcf_variants` - A vector of VcfVariant instances to be consolidated
  695. /// * `annotations` - A reference to an Annotations structure containing annotation data
  696. ///
  697. /// # Returns
  698. /// A new Variant instance
  699. ///
  700. /// # Behavior
  701. /// - Uses the first VcfVariant in the vector as the source for hash, position, reference, and alternative
  702. /// - Includes all provided VcfVariants in the new Variant
  703. /// - Retrieves annotations for the variant based on its hash
  704. /// - If no annotations are found, an empty vector is used
  705. ///
  706. /// # Panics
  707. /// This function will panic if `vcf_variants` is empty.
  708. ///
  709. /// # Example
  710. /// ```
  711. /// let vcf_variants = vec![VcfVariant { /* ... */ }];
  712. /// let annotations = Annotations::new();
  713. /// let variant = create_variant(vcf_variants, &annotations);
  714. /// ```
  715. ///
  716. /// # Note
  717. /// This function assumes that all VcfVariants in the input vector represent the same genomic variant
  718. /// and should be consolidated. It's the caller's responsibility to ensure this is the case.
  719. fn create_variant(vcf_variants: Vec<VcfVariant>, annotations: &Annotations) -> Variant {
  720. let first = &vcf_variants[0];
  721. let annotations = annotations
  722. .store
  723. .get(&first.hash)
  724. .map(|v| v.value().to_vec())
  725. .unwrap_or_default();
  726. Variant {
  727. hash: first.hash,
  728. position: first.position.clone(),
  729. reference: first.reference.clone(),
  730. alternative: first.alternative.clone(),
  731. vcf_variants,
  732. annotations,
  733. }
  734. }
  735. pub enum ExtAnnotationSource {
  736. Cosmic(Cosmic),
  737. GnomAD(GnomAD),
  738. }
  739. use rusqlite::{params, Connection, Result as SqliteResult};
  740. pub struct ExternalAnnotation {
  741. pub conn: Connection,
  742. }
  743. impl ExternalAnnotation {
  744. pub fn init() -> anyhow::Result<Self> {
  745. let mut db_path = app_storage_dir()?;
  746. db_path.push("annotations_db.sqlite");
  747. info!("Opening DB: {}", db_path.display());
  748. let conn = Connection::open(db_path)?;
  749. conn.execute(
  750. "CREATE TABLE IF NOT EXISTS annotations (
  751. id INTEGER PRIMARY KEY AUTOINCREMENT,
  752. hash BLOB(16) UNIQUE NOT NULL,
  753. source TEXT NOT NULL,
  754. data BLOB,
  755. modified TEXT NOT NULL
  756. )",
  757. [],
  758. )?;
  759. Ok(Self { conn })
  760. }
  761. pub fn annotate(
  762. &self,
  763. variants: &[VcfVariant],
  764. annotations: &Annotations,
  765. ) -> anyhow::Result<()> {
  766. let mut unfound = Vec::new();
  767. for variant in variants {
  768. let hash = variant.hash();
  769. let mut has_pushed = false;
  770. // Check COSMIC
  771. match self.get_annotation(hash, "COSMIC")? {
  772. Some(cosmic) => {
  773. annotations
  774. .store
  775. .entry(hash)
  776. .or_default()
  777. .push(Annotation::Cosmic(cosmic));
  778. }
  779. None => {
  780. unfound.push(variant.clone());
  781. has_pushed = true;
  782. }
  783. }
  784. // Check GnomAD
  785. match self.get_annotation(hash, "GnomAD")? {
  786. Some(gnomad) => {
  787. annotations
  788. .store
  789. .entry(hash)
  790. .or_default()
  791. .push(Annotation::GnomAD(gnomad));
  792. }
  793. None => {
  794. if !has_pushed {
  795. unfound.push(variant.clone())
  796. }
  797. }
  798. }
  799. }
  800. if !unfound.is_empty() {
  801. self.process_unfound_echtvar(unfound, annotations)?;
  802. }
  803. Ok(())
  804. }
  805. fn get_annotation<T: serde::de::DeserializeOwned>(
  806. &self,
  807. hash: Hash128,
  808. source: &str,
  809. ) -> anyhow::Result<Option<T>> {
  810. let result: SqliteResult<Vec<u8>> = self.conn.query_row(
  811. "SELECT data FROM annotations WHERE hash = ? AND source = ?",
  812. params![hash.to_bytes(), source],
  813. |row| row.get(0),
  814. );
  815. match result {
  816. Ok(data) => Ok(Some(serde_json::from_slice(&data)?)),
  817. Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
  818. Err(e) => Err(e.into()),
  819. }
  820. }
  821. pub fn process_unfound_echtvar(
  822. &self,
  823. mut unfound: Vec<VcfVariant>,
  824. annotations: &Annotations,
  825. ) -> anyhow::Result<()> {
  826. let temp_dir = std::env::temp_dir();
  827. unfound.par_sort();
  828. unfound.dedup();
  829. let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
  830. let min_chunk_size = 1000;
  831. let max_chunks = 150;
  832. let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
  833. let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
  834. let results = unfound
  835. .par_chunks(optimal_chunk_size)
  836. .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
  837. let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
  838. let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
  839. // Write input VCF
  840. let mut vcf = File::create(&in_tmp)?;
  841. writeln!(vcf, "{}", header)?;
  842. for (i, row) in chunk.iter().enumerate() {
  843. writeln!(
  844. vcf,
  845. "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
  846. row.position.contig(),
  847. row.position.position + 1, // vcf
  848. i + 1,
  849. row.reference,
  850. row.alternative
  851. )?;
  852. }
  853. // Run echtvar
  854. run_echtvar(in_tmp.to_str().unwrap(), out_tmp.to_str().unwrap())?;
  855. fs::remove_file(in_tmp)?;
  856. // Parse echtvar output
  857. let mut reader = ReaderBuilder::new()
  858. .delimiter(b'\t')
  859. .has_headers(false)
  860. .comment(Some(b'#'))
  861. .flexible(true)
  862. .from_reader(get_reader(out_tmp.to_str().unwrap())?);
  863. let mut chunk_results = Vec::new();
  864. for (i, result) in reader.deserialize::<crate::io::vcf::VCFRow>().enumerate() {
  865. let row = result?;
  866. // Verify that the ID corresponds to the input
  867. let id: usize = row.id.parse().context("Failed to parse ID")?;
  868. if id != i + 1 {
  869. return Err(anyhow::anyhow!(
  870. "Echtvar output ID {} does not match expected ID {}",
  871. id,
  872. i + 1
  873. ));
  874. }
  875. let (cosmic, gnomad) = parse_echtvar_val(&row.info)?;
  876. let hash = chunk[i].hash();
  877. chunk_results.push((hash, cosmic, gnomad));
  878. }
  879. fs::remove_file(out_tmp)?;
  880. Ok(chunk_results)
  881. })
  882. .flatten()
  883. .collect::<Vec<_>>();
  884. for (hash, cosmic, gnomad) in results {
  885. // let modified = chrono::Utc::now().to_rfc3339();
  886. // Update COSMIC
  887. // self.conn.execute(
  888. // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
  889. // params![
  890. // hash.to_le_bytes().to_vec(),
  891. // cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()),
  892. // modified
  893. // ],
  894. // )?;
  895. //
  896. // // Update GnomAD
  897. // self.conn.execute(
  898. // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
  899. // params![
  900. // hash.to_le_bytes().to_vec(),
  901. // gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()),
  902. // modified
  903. // ],
  904. // )?;
  905. // Update in-memory annotations
  906. if let Some(c) = cosmic {
  907. annotations
  908. .store
  909. .entry(hash)
  910. .or_default()
  911. .push(Annotation::Cosmic(c));
  912. }
  913. if let Some(g) = gnomad {
  914. annotations
  915. .store
  916. .entry(hash)
  917. .or_default()
  918. .push(Annotation::GnomAD(g));
  919. }
  920. }
  921. Ok(())
  922. }
  923. pub fn annotate_vep(
  924. &self,
  925. variants: &[VcfVariant],
  926. annotations: &Annotations,
  927. ) -> anyhow::Result<()> {
  928. let mut unfound = Vec::new();
  929. for variant in variants {
  930. let hash = variant.hash();
  931. // Check VEP
  932. match self.get_annotation(hash, "VEP")? {
  933. Some(veps) => {
  934. annotations
  935. .store
  936. .entry(hash)
  937. .or_default()
  938. .push(Annotation::VEP(veps));
  939. }
  940. None => {
  941. unfound.push(variant.clone());
  942. }
  943. }
  944. }
  945. if !unfound.is_empty() {
  946. self.process_unfound_vep(unfound, annotations)?;
  947. }
  948. Ok(())
  949. }
  950. pub fn process_unfound_vep(
  951. &self,
  952. mut unfound: Vec<VcfVariant>,
  953. annotations: &Annotations,
  954. ) -> anyhow::Result<()> {
  955. unfound.par_sort();
  956. unfound.dedup();
  957. info!("{} variants to annotate with VEP.", unfound.len());
  958. let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
  959. let (sv, unfound): (Vec<VcfVariant>, Vec<VcfVariant>) =
  960. unfound.into_iter().partition(|v| v.has_svtype());
  961. warn!("SV {}", sv.len());
  962. let min_chunk_size = 1000;
  963. let max_chunks = 150;
  964. let mut results: Vec<(Hash128, Vec<VEP>)> = if !unfound.is_empty() {
  965. let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
  966. let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
  967. debug!("{} chunks to process.", unfound.len() / optimal_chunk_size);
  968. unfound
  969. .par_chunks(optimal_chunk_size)
  970. .enumerate()
  971. .map(|(chunk_i, chunk)| {
  972. debug!("Processing chunk {chunk_i}");
  973. process_vep_chunk(chunk, &header).map_err(|e| {
  974. error!("Error processing chunk {chunk_i}: {e}");
  975. e
  976. })
  977. })
  978. .collect::<Result<Vec<_>, _>>()? // Collect results into a Result<Vec<_>>
  979. .into_iter()
  980. .flatten()
  981. .collect::<Vec<_>>()
  982. } else {
  983. Vec::new()
  984. };
  985. if !sv.is_empty() {
  986. let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize);
  987. let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
  988. let results_sv = sv
  989. .par_chunks(optimal_chunk_size)
  990. .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
  991. let in_tmp = temp_file_path(".vcf")
  992. .context("Can't create tmp path for in tmp")?
  993. .to_str()
  994. .unwrap()
  995. .to_string();
  996. let out_vep = temp_file_path("_vep.txt")
  997. .context("Can't create tmp path for in tmp")?
  998. .to_str()
  999. .unwrap()
  1000. .to_string();
  1001. let out_summary = format!("{out_vep}_summary.html");
  1002. let out_warnings = format!("{out_vep}_warnings.txt");
  1003. // Write input VCF
  1004. let mut vcf =
  1005. File::create(&in_tmp).context("Can't create input vcf file for VEP.")?;
  1006. writeln!(vcf, "{}", header)?;
  1007. for (i, mut row) in chunk.iter().cloned().enumerate() {
  1008. row.id = (i + 1).to_string();
  1009. let s = row.into_vcf_row();
  1010. writeln!(vcf, "{s}",)?;
  1011. }
  1012. run_vep(&in_tmp, &out_vep).context("Error while running VEP.")?;
  1013. let mut reader_vep = ReaderBuilder::new()
  1014. .delimiter(b'\t')
  1015. .has_headers(false)
  1016. .comment(Some(b'#'))
  1017. .flexible(true)
  1018. .from_reader(
  1019. fs::File::open(&out_vep).context("Can't open VEP result file.")?,
  1020. );
  1021. let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
  1022. for line in reader_vep.deserialize() {
  1023. let line: VepLine = line.context("Failed to deserialize VepLine")?;
  1024. let key = line
  1025. .uploaded_variation
  1026. .parse::<u64>()
  1027. .context("Failed to parse uploaded_variation as u64")?;
  1028. lines.entry(key).or_default().push(line);
  1029. }
  1030. fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
  1031. let mut n_not_vep = 0;
  1032. let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
  1033. chunk.iter().enumerate().for_each(|(i, entry)| {
  1034. let k = (i + 1) as u64;
  1035. if let Some(vep_lines) = lines.get(&k) {
  1036. if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
  1037. chunk_results.push((entry.hash(), veps));
  1038. }
  1039. } else {
  1040. warn!(
  1041. "No VEP entry for {}\t{}\t{}",
  1042. entry.position.to_string(),
  1043. entry.reference.to_string(),
  1044. entry.alternative.to_string()
  1045. );
  1046. n_not_vep += 1;
  1047. }
  1048. });
  1049. fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
  1050. if n_not_vep > 0 {
  1051. debug!("{n_not_vep} variants not annotated by VEP.");
  1052. let warnings = fs::read_to_string(&out_warnings)
  1053. .context(format!("Can't read VEP warnings: {out_warnings}"))?;
  1054. warn!("VEP warnings:\n{warnings}");
  1055. }
  1056. if Path::new(&out_warnings).exists() {
  1057. fs::remove_file(&out_warnings)
  1058. .context(format!("Can't remove file {out_warnings}"))?;
  1059. }
  1060. if Path::new(&out_summary).exists() {
  1061. fs::remove_file(&out_summary)
  1062. .context(format!("Can't remove file {out_summary}"))?;
  1063. }
  1064. Ok(chunk_results)
  1065. })
  1066. .flatten()
  1067. .collect::<Vec<_>>();
  1068. results.extend(results_sv);
  1069. }
  1070. info!("{} total variants annotaded by VEP.", results.len());
  1071. for (hash, veps) in results {
  1072. // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
  1073. annotations
  1074. .store
  1075. .entry(hash)
  1076. .or_default()
  1077. .push(Annotation::VEP(veps));
  1078. }
  1079. Ok(())
  1080. }
  1081. pub fn update_database(&self, hash: Hash128, source: &str, data: &[u8]) -> anyhow::Result<()> {
  1082. let modified = chrono::Utc::now().to_rfc3339();
  1083. self.conn.execute(
  1084. "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)",
  1085. params![hash.to_bytes(), source, data, modified],
  1086. )?;
  1087. Ok(())
  1088. }
  1089. }
  1090. fn process_vep_chunk(
  1091. chunk: &[VcfVariant],
  1092. header: &str,
  1093. ) -> anyhow::Result<Vec<(Hash128, Vec<VEP>)>> {
  1094. let in_tmp = temp_file_path("vcf")?
  1095. .to_str()
  1096. .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
  1097. .to_string();
  1098. let out_vep = temp_file_path("_vep.txt")?
  1099. .to_str()
  1100. .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
  1101. .to_string();
  1102. let out_summary = format!("{out_vep}_summary.html");
  1103. let out_warnings = format!("{out_vep}_warnings.txt");
  1104. let mut vcf = File::create(&in_tmp)?; // If this fails, the error is propagated.
  1105. writeln!(vcf, "{}", header)?; // If this fails, the error is propagated.
  1106. for (i, row) in chunk.iter().enumerate() {
  1107. writeln!(
  1108. vcf,
  1109. "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
  1110. row.position.contig(),
  1111. row.position.position + 1,
  1112. i + 1,
  1113. row.reference,
  1114. row.alternative
  1115. )?;
  1116. }
  1117. if let Err(e) = run_vep(&in_tmp, &out_vep) {
  1118. error!("VEP error: {e}");
  1119. return Err(anyhow::anyhow!("VEP execution failed: {}", e)); // Propagate the error.
  1120. }
  1121. let mut reader_vep = ReaderBuilder::new()
  1122. .delimiter(b'\t')
  1123. .has_headers(false)
  1124. .comment(Some(b'#'))
  1125. .flexible(true)
  1126. .from_reader(fs::File::open(&out_vep)?); // If this fails, the error is propagated.
  1127. let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
  1128. for line in reader_vep.deserialize() {
  1129. let line: VepLine = line.context("Failed to deserialize VepLine")?; // Propagate the error.
  1130. let key = line
  1131. .uploaded_variation
  1132. .parse::<u64>()
  1133. .context("Failed to parse uploaded_variation as u64")?; // Propagate the error.
  1134. lines.entry(key).or_default().push(line);
  1135. }
  1136. fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
  1137. let mut n_not_vep = 0;
  1138. let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
  1139. chunk.iter().enumerate().for_each(|(i, entry)| {
  1140. let k = (i + 1) as u64;
  1141. if let Some(vep_lines) = lines.get(&k) {
  1142. if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
  1143. chunk_results.push((entry.hash(), veps));
  1144. }
  1145. } else {
  1146. warn!(
  1147. "No VEP entry for {}\t{}\t{}",
  1148. entry.position.to_string(),
  1149. entry.reference.to_string(),
  1150. entry.alternative.to_string()
  1151. );
  1152. n_not_vep += 1;
  1153. }
  1154. });
  1155. fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
  1156. if n_not_vep > 0 {
  1157. debug!("{n_not_vep} variants not annotated by VEP.");
  1158. let warnings = fs::read_to_string(&out_warnings)
  1159. .context(format!("Can't read VEP warnings: {out_warnings}"))?;
  1160. warn!("VEP warnings:\n{warnings}");
  1161. }
  1162. if Path::new(&out_warnings).exists() {
  1163. fs::remove_file(&out_warnings).context(format!("Can't remove file {out_warnings}"))?;
  1164. }
  1165. if Path::new(&out_summary).exists() {
  1166. fs::remove_file(&out_summary).context(format!("Can't remove file {out_summary}"))?;
  1167. }
  1168. Ok(chunk_results) // Return the successful result.
  1169. }