variant_collection.rs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776
  1. use std::{
  2. collections::{HashMap, HashSet},
  3. fs::{self, File},
  4. io::Write,
  5. };
  6. use anyhow::Context;
  7. use bgzip::{BGZFReader, BGZFWriter};
  8. use csv::ReaderBuilder;
  9. use log::{debug, info, warn};
  10. use rayon::prelude::*;
  11. use serde::{Deserialize, Serialize};
  12. use uuid::Uuid;
  13. use super::variant::{AlterationCategory, ReferenceAlternative, VcfVariant};
  14. use crate::{
  15. annotation::{
  16. cosmic::Cosmic,
  17. echtvar::{parse_echtvar_val, run_echtvar},
  18. gnomad::GnomAD,
  19. vep::{run_vep, VepLine, VEP},
  20. Annotation, Annotations,
  21. },
  22. collection::{
  23. bam::{counts_at, counts_ins_at},
  24. vcf::Vcf,
  25. },
  26. helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path, Hash128},
  27. io::{readers::get_reader, vcf::vcf_header},
  28. pipes::somatic::sequence_at,
  29. positions::GenomePosition,
  30. };
  31. #[derive(Debug, Clone)]
  32. pub struct VariantCollection {
  33. pub variants: Vec<VcfVariant>,
  34. pub vcf: Vcf,
  35. pub caller: Annotation,
  36. }
  37. impl VariantCollection {
  38. pub fn keys(&self) -> Vec<Hash128> {
  39. self.variants.iter().map(|v| v.hash()).collect()
  40. }
  41. pub fn retain_keys(&mut self, keys_to_keep: &HashSet<Hash128>) {
  42. self.variants.retain(|v| keys_to_keep.contains(&v.hash()));
  43. }
  44. pub fn remove_keys(&mut self, keys_to_remove: &HashSet<Hash128>) {
  45. self.variants
  46. .retain(|v| !keys_to_remove.contains(&v.hash()));
  47. }
  48. pub fn partition<F>(self, predicate: F) -> (Self, Self)
  49. where
  50. F: Fn(&VcfVariant) -> bool,
  51. {
  52. let Self {
  53. variants,
  54. vcf,
  55. caller,
  56. } = self;
  57. let (left_data, right_data) = variants.into_iter().partition(predicate);
  58. (
  59. Self {
  60. variants: left_data,
  61. vcf: vcf.clone(),
  62. caller: caller.clone(),
  63. },
  64. Self {
  65. variants: right_data,
  66. vcf,
  67. caller,
  68. },
  69. )
  70. }
  71. pub fn chunk_size(&self, max_threads: u8) -> usize {
  72. let total_items = self.variants.len();
  73. let min_chunk_size = 1000;
  74. let max_chunks = max_threads;
  75. let optimal_chunk_size = total_items.div_ceil(max_chunks as usize);
  76. optimal_chunk_size.max(min_chunk_size)
  77. }
  78. pub fn annotate_with_sequence_entropy(
  79. &self,
  80. annotations: &Annotations,
  81. reference: &str,
  82. seq_len: usize,
  83. max_threads: u8,
  84. ) {
  85. self.variants
  86. .par_chunks(self.chunk_size(max_threads))
  87. .for_each(|chunk| {
  88. let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default()
  89. .build_from_path(reference)
  90. .unwrap();
  91. for c in chunk {
  92. let key = c.hash();
  93. let mut anns = annotations.store.entry(key).or_default();
  94. if !anns
  95. .iter()
  96. .any(|e| matches!(e, Annotation::ShannonEntropy(_)))
  97. {
  98. if let Ok(r) = sequence_at(
  99. &mut fasta_reader,
  100. &c.position.contig(),
  101. c.position.position as usize,
  102. seq_len,
  103. ) {
  104. let ent = estimate_shannon_entropy(r.as_str());
  105. anns.push(Annotation::ShannonEntropy(ent));
  106. }
  107. }
  108. }
  109. });
  110. }
  111. pub fn annotate_with_constit_bam(
  112. &self,
  113. annotations: &Annotations,
  114. constit_bam_path: &str,
  115. max_threads: u8,
  116. ) -> anyhow::Result<()> {
  117. self.variants
  118. .par_chunks(self.chunk_size(max_threads))
  119. .try_for_each(|chunk| {
  120. let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path)
  121. .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?;
  122. for var in chunk {
  123. let key = var.hash();
  124. let mut anns = annotations.store.entry(key).or_default();
  125. if anns
  126. .iter()
  127. .filter(|e| {
  128. matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
  129. })
  130. .count()
  131. != 2
  132. {
  133. let mut n_alt = 0;
  134. let mut depth = 0;
  135. let mut alt_seq = var.alternative.to_string();
  136. let c = if var.alteration_category() == AlterationCategory::INS {
  137. // TODO: Add stretch comparison
  138. alt_seq = alt_seq.split_off(1);
  139. counts_ins_at(&mut bam, &var.position.contig(), var.position.position)?
  140. } else if var.alteration_category() == AlterationCategory::DEL {
  141. alt_seq = "D".to_string();
  142. counts_at(&mut bam, &var.position.contig(), var.position.position + 1)?
  143. } else {
  144. counts_at(&mut bam, &var.position.contig(), var.position.position)?
  145. };
  146. for (seq, n) in c {
  147. if seq == alt_seq {
  148. n_alt = n;
  149. }
  150. if seq == *"D" && alt_seq != *"D" {
  151. continue;
  152. }
  153. depth += n;
  154. }
  155. anns.push(Annotation::ConstitAlt(n_alt as u16));
  156. anns.push(Annotation::ConstitDepth(depth as u16));
  157. }
  158. }
  159. anyhow::Ok(())
  160. })?;
  161. Ok(())
  162. }
  163. pub fn stats(&self) {
  164. println!(
  165. "{} {}\t{}",
  166. self.vcf.caller,
  167. self.vcf.time,
  168. self.variants.len()
  169. );
  170. }
  171. }
  172. #[derive(Debug, Serialize, Deserialize)]
  173. pub struct Variant {
  174. pub hash: Hash128,
  175. pub position: GenomePosition,
  176. pub reference: ReferenceAlternative,
  177. pub alternative: ReferenceAlternative,
  178. pub vcf_variants: Vec<VcfVariant>,
  179. pub annotations: Vec<Annotation>,
  180. }
  181. impl PartialEq for Variant {
  182. fn eq(&self, other: &Self) -> bool {
  183. self.position == other.position
  184. && self.reference == other.reference
  185. && self.alternative == other.alternative
  186. }
  187. }
  188. #[derive(Debug, Default, Serialize, Deserialize)]
  189. pub struct Variants {
  190. pub data: Vec<Variant>,
  191. }
  192. impl Variants {
  193. pub fn sort(&mut self) {
  194. self.data
  195. .sort_unstable_by(|a, b| a.position.cmp(&b.position));
  196. }
  197. pub fn merge(&mut self, others: VariantCollection, annotations: &Annotations) {
  198. let mut result = Vec::new();
  199. let mut n_merged = 0;
  200. let mut self_iter = self.data.drain(..).peekable(); // Iterator for self.data
  201. let mut others_iter = others.variants.into_iter().peekable(); // Iterator for others.variants
  202. // Merge using two-pointer technique
  203. while let (Some(self_variant), Some(other_variant)) = (self_iter.peek(), others_iter.peek())
  204. {
  205. match self_variant.position.cmp(&other_variant.position) {
  206. std::cmp::Ordering::Less => {
  207. result.push(self_iter.next().unwrap());
  208. }
  209. std::cmp::Ordering::Greater => {
  210. result.push(create_variant(
  211. vec![others_iter.next().unwrap()],
  212. annotations,
  213. ));
  214. }
  215. std::cmp::Ordering::Equal => {
  216. let self_variant = self_iter.next().unwrap();
  217. let other_variant = others_iter.next().unwrap();
  218. if self_variant.reference == other_variant.reference
  219. && self_variant.alternative == other_variant.alternative
  220. {
  221. let mut merged_variant = self_variant;
  222. merged_variant.vcf_variants.push(other_variant);
  223. n_merged += 1;
  224. result.push(merged_variant);
  225. } else {
  226. result.push(self_variant);
  227. result.push(create_variant(vec![other_variant], annotations));
  228. }
  229. }
  230. }
  231. }
  232. // Drain remaining elements from iterators
  233. result.extend(self_iter);
  234. result.extend(others_iter.map(|v| create_variant(vec![v], annotations)));
  235. info!("n merged: {}", n_merged);
  236. self.data = result;
  237. }
  238. pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> {
  239. let file = File::create(filename)
  240. .with_context(|| format!("Failed to create file: {}", filename))?;
  241. let mut writer = BGZFWriter::new(file, bgzip::Compression::default());
  242. serde_json::to_writer(&mut writer, self)
  243. .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?;
  244. writer
  245. .close()
  246. .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?;
  247. debug!("Successfully saved variants to {}", filename);
  248. Ok(())
  249. }
  250. pub fn load_from_json(filename: &str) -> anyhow::Result<Self> {
  251. let file =
  252. File::open(filename).with_context(|| format!("Failed to open file: {}", filename))?;
  253. let mut reader = BGZFReader::new(file)
  254. .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?;
  255. let variants: Self = serde_json::from_reader(&mut reader)
  256. .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?;
  257. debug!("Successfully loaded variants from {}", filename);
  258. Ok(variants)
  259. }
  260. }
  261. fn create_variant(vcf_variants: Vec<VcfVariant>, annotations: &Annotations) -> Variant {
  262. let first = &vcf_variants[0];
  263. let annotations = annotations
  264. .store
  265. .get(&first.hash)
  266. .map(|v| v.value().to_vec())
  267. .unwrap_or_default();
  268. Variant {
  269. hash: first.hash,
  270. position: first.position.clone(),
  271. reference: first.reference.clone(),
  272. alternative: first.alternative.clone(),
  273. vcf_variants,
  274. annotations,
  275. }
  276. }
  277. pub enum ExtAnnotationSource {
  278. Cosmic(Cosmic),
  279. GnomAD(GnomAD),
  280. }
  281. use rusqlite::{params, Connection, Result as SqliteResult};
  282. pub struct ExternalAnnotation {
  283. pub conn: Connection,
  284. }
  285. impl ExternalAnnotation {
  286. pub fn init() -> anyhow::Result<Self> {
  287. let mut db_path = app_storage_dir()?;
  288. db_path.push("annotations_db.sqlite");
  289. info!("Opening DB: {}", db_path.display());
  290. let conn = Connection::open(db_path)?;
  291. conn.execute(
  292. "CREATE TABLE IF NOT EXISTS annotations (
  293. id INTEGER PRIMARY KEY AUTOINCREMENT,
  294. hash BLOB(16) UNIQUE NOT NULL,
  295. source TEXT NOT NULL,
  296. data BLOB,
  297. modified TEXT NOT NULL
  298. )",
  299. [],
  300. )?;
  301. Ok(Self { conn })
  302. }
  303. pub fn annotate(
  304. &self,
  305. variants: &[VcfVariant],
  306. annotations: &Annotations,
  307. ) -> anyhow::Result<()> {
  308. let mut unfound = Vec::new();
  309. for variant in variants {
  310. let hash = variant.hash();
  311. let mut has_pushed = false;
  312. // Check COSMIC
  313. match self.get_annotation(hash, "COSMIC")? {
  314. Some(cosmic) => {
  315. annotations
  316. .store
  317. .entry(hash)
  318. .or_default()
  319. .push(Annotation::Cosmic(cosmic));
  320. }
  321. None => {
  322. unfound.push(variant.clone());
  323. has_pushed = true;
  324. }
  325. }
  326. // Check GnomAD
  327. match self.get_annotation(hash, "GnomAD")? {
  328. Some(gnomad) => {
  329. annotations
  330. .store
  331. .entry(hash)
  332. .or_default()
  333. .push(Annotation::GnomAD(gnomad));
  334. }
  335. None => {
  336. if !has_pushed {
  337. unfound.push(variant.clone())
  338. }
  339. }
  340. }
  341. }
  342. if !unfound.is_empty() {
  343. self.process_unfound_echtvar(unfound, annotations)?;
  344. }
  345. Ok(())
  346. }
  347. fn get_annotation<T: serde::de::DeserializeOwned>(
  348. &self,
  349. hash: Hash128,
  350. source: &str,
  351. ) -> anyhow::Result<Option<T>> {
  352. let result: SqliteResult<Vec<u8>> = self.conn.query_row(
  353. "SELECT data FROM annotations WHERE hash = ? AND source = ?",
  354. params![hash.to_bytes(), source],
  355. |row| row.get(0),
  356. );
  357. match result {
  358. Ok(data) => Ok(Some(serde_json::from_slice(&data)?)),
  359. Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
  360. Err(e) => Err(e.into()),
  361. }
  362. }
  363. pub fn process_unfound_echtvar(
  364. &self,
  365. mut unfound: Vec<VcfVariant>,
  366. annotations: &Annotations,
  367. ) -> anyhow::Result<()> {
  368. let temp_dir = std::env::temp_dir();
  369. unfound.par_sort();
  370. unfound.dedup();
  371. let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
  372. let min_chunk_size = 1000;
  373. let max_chunks = 150;
  374. let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
  375. let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
  376. let results = unfound
  377. .par_chunks(optimal_chunk_size)
  378. .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
  379. let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
  380. let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
  381. // Write input VCF
  382. let mut vcf = File::create(&in_tmp)?;
  383. writeln!(vcf, "{}", header)?;
  384. for (i, row) in chunk.iter().enumerate() {
  385. writeln!(
  386. vcf,
  387. "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
  388. row.position.contig(),
  389. row.position.position + 1, // vcf
  390. i + 1,
  391. row.reference,
  392. row.alternative
  393. )?;
  394. }
  395. // Run echtvar
  396. run_echtvar(in_tmp.to_str().unwrap(), out_tmp.to_str().unwrap())?;
  397. fs::remove_file(in_tmp)?;
  398. // Parse echtvar output
  399. let mut reader = ReaderBuilder::new()
  400. .delimiter(b'\t')
  401. .has_headers(false)
  402. .comment(Some(b'#'))
  403. .flexible(true)
  404. .from_reader(get_reader(out_tmp.to_str().unwrap())?);
  405. let mut chunk_results = Vec::new();
  406. for (i, result) in reader.deserialize::<crate::io::vcf::VCFRow>().enumerate() {
  407. let row = result?;
  408. // Verify that the ID corresponds to the input
  409. let id: usize = row.id.parse().context("Failed to parse ID")?;
  410. if id != i + 1 {
  411. return Err(anyhow::anyhow!(
  412. "Echtvar output ID {} does not match expected ID {}",
  413. id,
  414. i + 1
  415. ));
  416. }
  417. let (cosmic, gnomad) = parse_echtvar_val(&row.info)?;
  418. let hash = chunk[i].hash();
  419. chunk_results.push((hash, cosmic, gnomad));
  420. }
  421. fs::remove_file(out_tmp)?;
  422. Ok(chunk_results)
  423. })
  424. .flatten()
  425. .collect::<Vec<_>>();
  426. for (hash, cosmic, gnomad) in results {
  427. // let modified = chrono::Utc::now().to_rfc3339();
  428. // Update COSMIC
  429. // self.conn.execute(
  430. // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
  431. // params![
  432. // hash.to_le_bytes().to_vec(),
  433. // cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()),
  434. // modified
  435. // ],
  436. // )?;
  437. //
  438. // // Update GnomAD
  439. // self.conn.execute(
  440. // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
  441. // params![
  442. // hash.to_le_bytes().to_vec(),
  443. // gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()),
  444. // modified
  445. // ],
  446. // )?;
  447. // Update in-memory annotations
  448. if let Some(c) = cosmic {
  449. annotations
  450. .store
  451. .entry(hash)
  452. .or_default()
  453. .push(Annotation::Cosmic(c));
  454. }
  455. if let Some(g) = gnomad {
  456. annotations
  457. .store
  458. .entry(hash)
  459. .or_default()
  460. .push(Annotation::GnomAD(g));
  461. }
  462. }
  463. Ok(())
  464. }
  465. pub fn annotate_vep(
  466. &self,
  467. variants: &[VcfVariant],
  468. annotations: &Annotations,
  469. ) -> anyhow::Result<()> {
  470. let mut unfound = Vec::new();
  471. for variant in variants {
  472. let hash = variant.hash();
  473. // Check VEP
  474. match self.get_annotation(hash, "VEP")? {
  475. Some(veps) => {
  476. annotations
  477. .store
  478. .entry(hash)
  479. .or_default()
  480. .push(Annotation::VEP(veps));
  481. }
  482. None => {
  483. unfound.push(variant.clone());
  484. }
  485. }
  486. }
  487. if !unfound.is_empty() {
  488. self.process_unfound_vep(unfound, annotations)?;
  489. }
  490. Ok(())
  491. }
  492. pub fn process_unfound_vep(
  493. &self,
  494. mut unfound: Vec<VcfVariant>,
  495. annotations: &Annotations,
  496. ) -> anyhow::Result<()> {
  497. unfound.par_sort();
  498. unfound.dedup();
  499. let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
  500. let (sv, unfound): (Vec<VcfVariant>, Vec<VcfVariant>) =
  501. unfound.into_iter().partition(|v| v.has_svtype());
  502. warn!("SV {}", sv.len());
  503. let min_chunk_size = 1000;
  504. let max_chunks = 150;
  505. let mut results = if !unfound.is_empty() {
  506. let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
  507. let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
  508. unfound
  509. .par_chunks(optimal_chunk_size)
  510. .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
  511. let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string();
  512. let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
  513. let out_summary = format!("{out_vep}_summary.html");
  514. let out_warnings = format!("{out_vep}_warnings.txt");
  515. // Write input VCF
  516. let mut vcf = File::create(&in_tmp)?;
  517. writeln!(vcf, "{}", header)?;
  518. for (i, row) in chunk.iter().enumerate() {
  519. writeln!(
  520. vcf,
  521. "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
  522. row.position.contig(),
  523. row.position.position + 1, // vcf
  524. i + 1,
  525. row.reference,
  526. row.alternative
  527. )?;
  528. }
  529. run_vep(&in_tmp, &out_vep)?;
  530. let mut reader_vep = ReaderBuilder::new()
  531. .delimiter(b'\t')
  532. .has_headers(false)
  533. .comment(Some(b'#'))
  534. .flexible(true)
  535. .from_reader(fs::File::open(out_vep.clone())?);
  536. let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
  537. for line in reader_vep.deserialize() {
  538. let line: VepLine = line.context("Failed to deserialize VepLine")?;
  539. let key = line
  540. .uploaded_variation
  541. .parse::<u64>()
  542. .context("Failed to parse uploaded_variation as u64")?;
  543. lines.entry(key).or_default().push(line);
  544. }
  545. fs::remove_file(in_tmp)?;
  546. fs::remove_file(out_vep)?;
  547. let mut n_not_vep = 0;
  548. let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
  549. chunk.iter().enumerate().for_each(|(i, entry)| {
  550. let k = (i + 1) as u64;
  551. if let Some(vep_lines) = lines.get(&k) {
  552. if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
  553. chunk_results.push((entry.hash(), veps));
  554. }
  555. } else {
  556. warn!(
  557. "No VEP entry for {}:{}>{}",
  558. entry.position.to_string(),
  559. entry.reference.to_string(),
  560. entry.alternative.to_string()
  561. );
  562. n_not_vep += 1;
  563. }
  564. });
  565. if n_not_vep > 0 {
  566. debug!("{n_not_vep} variants not annotated by VEP.");
  567. let warnings = fs::read_to_string(&out_warnings)
  568. .context(format!("Can't read VEP warnings: {out_warnings}"))?;
  569. warn!("VEP warnings:\n{warnings}");
  570. }
  571. fs::remove_file(out_warnings)?;
  572. fs::remove_file(out_summary)?;
  573. Ok(chunk_results)
  574. })
  575. .flatten()
  576. .collect::<Vec<_>>()
  577. } else {
  578. Vec::new()
  579. };
  580. if !sv.is_empty() {
  581. let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize);
  582. let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
  583. let results_sv = sv
  584. .par_chunks(optimal_chunk_size)
  585. .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
  586. let in_tmp = temp_file_path(".vcf")?.to_str().unwrap().to_string();
  587. let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
  588. let out_summary = format!("{out_vep}_summary.html");
  589. let out_warnings = format!("{out_vep}_warnings.txt");
  590. // Write input VCF
  591. let mut vcf = File::create(&in_tmp)?;
  592. writeln!(vcf, "{}", header)?;
  593. for (i, mut row) in chunk.iter().cloned().enumerate() {
  594. row.id = (i + 1).to_string();
  595. let s = row.into_vcf_row();
  596. writeln!(vcf, "{s}",)?;
  597. }
  598. run_vep(&in_tmp, &out_vep)?;
  599. let mut reader_vep = ReaderBuilder::new()
  600. .delimiter(b'\t')
  601. .has_headers(false)
  602. .comment(Some(b'#'))
  603. .flexible(true)
  604. .from_reader(fs::File::open(out_vep.clone())?);
  605. let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
  606. for line in reader_vep.deserialize() {
  607. let line: VepLine = line.context("Failed to deserialize VepLine")?;
  608. let key = line
  609. .uploaded_variation
  610. .parse::<u64>()
  611. .context("Failed to parse uploaded_variation as u64")?;
  612. lines.entry(key).or_default().push(line);
  613. }
  614. fs::remove_file(in_tmp)?;
  615. fs::remove_file(out_vep)?;
  616. let mut n_not_vep = 0;
  617. let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
  618. chunk.iter().enumerate().for_each(|(i, entry)| {
  619. let k = (i + 1) as u64;
  620. if let Some(vep_lines) = lines.get(&k) {
  621. if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
  622. chunk_results.push((entry.hash(), veps));
  623. }
  624. } else {
  625. warn!(
  626. "No VEP entry for {}\t{}\t{}",
  627. entry.position.to_string(),
  628. entry.reference.to_string(),
  629. entry.alternative.to_string()
  630. );
  631. n_not_vep += 1;
  632. }
  633. });
  634. if n_not_vep > 0 {
  635. debug!("{n_not_vep} variants not annotated by VEP.");
  636. let warnings = fs::read_to_string(&out_warnings)
  637. .context(format!("Can't read VEP warnings: {out_warnings}"))?;
  638. warn!("VEP warnings:\n{warnings}");
  639. }
  640. fs::remove_file(out_warnings)?;
  641. fs::remove_file(out_summary)?;
  642. Ok(chunk_results)
  643. })
  644. .flatten()
  645. .collect::<Vec<_>>();
  646. results.extend(results_sv);
  647. }
  648. for (hash, veps) in results {
  649. // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
  650. annotations
  651. .store
  652. .entry(hash)
  653. .or_default()
  654. .push(Annotation::VEP(veps));
  655. }
  656. Ok(())
  657. }
  658. pub fn update_database(&self, hash: Hash128, source: &str, data: &[u8]) -> anyhow::Result<()> {
  659. let modified = chrono::Utc::now().to_rfc3339();
  660. self.conn.execute(
  661. "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)",
  662. params![hash.to_bytes(), source, data, modified],
  663. )?;
  664. Ok(())
  665. }
  666. }