- use std::{
- collections::{HashMap, HashSet},
- fs::{self, File},
- io::Write,
- };
- use anyhow::Context;
- use bgzip::{BGZFReader, BGZFWriter};
- use csv::ReaderBuilder;
- use log::{debug, info, warn};
- use rayon::prelude::*;
- use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- use super::variant::{AlterationCategory, ReferenceAlternative, VcfVariant};
- use crate::{
- annotation::{
- cosmic::Cosmic,
- echtvar::{parse_echtvar_val, run_echtvar},
- gnomad::GnomAD,
- vep::{run_vep, VepLine, VEP},
- Annotation, Annotations,
- },
- collection::{
- bam::{counts_at, counts_ins_at},
- vcf::Vcf,
- },
- helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path, Hash128},
- io::{readers::get_reader, vcf::vcf_header},
- pipes::somatic::sequence_at,
- positions::GenomePosition,
- };
/// Variants parsed from a single VCF, together with their origin metadata.
#[derive(Debug, Clone)]
pub struct VariantCollection {
    /// Parsed variant records from the VCF.
    pub variants: Vec<VcfVariant>,
    /// The source VCF this collection was read from.
    pub vcf: Vcf,
    /// Caller identity for these variants, stored as an `Annotation`.
    pub caller: Annotation,
}
- impl VariantCollection {
- pub fn keys(&self) -> Vec<Hash128> {
- self.variants.iter().map(|v| v.hash()).collect()
- }
- pub fn retain_keys(&mut self, keys_to_keep: &HashSet<Hash128>) {
- self.variants.retain(|v| keys_to_keep.contains(&v.hash()));
- }
- pub fn remove_keys(&mut self, keys_to_remove: &HashSet<Hash128>) {
- self.variants
- .retain(|v| !keys_to_remove.contains(&v.hash()));
- }
- pub fn partition<F>(self, predicate: F) -> (Self, Self)
- where
- F: Fn(&VcfVariant) -> bool,
- {
- let Self {
- variants,
- vcf,
- caller,
- } = self;
- let (left_data, right_data) = variants.into_iter().partition(predicate);
- (
- Self {
- variants: left_data,
- vcf: vcf.clone(),
- caller: caller.clone(),
- },
- Self {
- variants: right_data,
- vcf,
- caller,
- },
- )
- }
- pub fn chunk_size(&self, max_threads: u8) -> usize {
- let total_items = self.variants.len();
- let min_chunk_size = 1000;
- let max_chunks = max_threads;
- let optimal_chunk_size = total_items.div_ceil(max_chunks as usize);
- optimal_chunk_size.max(min_chunk_size)
- }
- pub fn annotate_with_sequence_entropy(
- &self,
- annotations: &Annotations,
- reference: &str,
- seq_len: usize,
- max_threads: u8,
- ) {
- self.variants
- .par_chunks(self.chunk_size(max_threads))
- .for_each(|chunk| {
- let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default()
- .build_from_path(reference)
- .unwrap();
- for c in chunk {
- let key = c.hash();
- let mut anns = annotations.store.entry(key).or_default();
- if !anns
- .iter()
- .any(|e| matches!(e, Annotation::ShannonEntropy(_)))
- {
- if let Ok(r) = sequence_at(
- &mut fasta_reader,
- &c.position.contig(),
- c.position.position as usize,
- seq_len,
- ) {
- let ent = estimate_shannon_entropy(r.as_str());
- anns.push(Annotation::ShannonEntropy(ent));
- }
- }
- }
- });
- }
- pub fn annotate_with_constit_bam(
- &self,
- annotations: &Annotations,
- constit_bam_path: &str,
- max_threads: u8,
- ) -> anyhow::Result<()> {
- self.variants
- .par_chunks(self.chunk_size(max_threads))
- .try_for_each(|chunk| {
- let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path)
- .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?;
- for var in chunk {
- let key = var.hash();
- let mut anns = annotations.store.entry(key).or_default();
- if anns
- .iter()
- .filter(|e| {
- matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
- })
- .count()
- != 2
- {
- let mut n_alt = 0;
- let mut depth = 0;
- let mut alt_seq = var.alternative.to_string();
- let c = if var.alteration_category() == AlterationCategory::INS {
- // TODO: Add stretch comparison
- alt_seq = alt_seq.split_off(1);
- counts_ins_at(&mut bam, &var.position.contig(), var.position.position)?
- } else if var.alteration_category() == AlterationCategory::DEL {
- alt_seq = "D".to_string();
- counts_at(&mut bam, &var.position.contig(), var.position.position + 1)?
- } else {
- counts_at(&mut bam, &var.position.contig(), var.position.position)?
- };
- for (seq, n) in c {
- if seq == alt_seq {
- n_alt = n;
- }
- if seq == *"D" && alt_seq != *"D" {
- continue;
- }
- depth += n;
- }
- anns.push(Annotation::ConstitAlt(n_alt as u16));
- anns.push(Annotation::ConstitDepth(depth as u16));
- }
- }
- anyhow::Ok(())
- })?;
- Ok(())
- }
- pub fn stats(&self) {
- println!(
- "{} {}\t{}",
- self.vcf.caller,
- self.vcf.time,
- self.variants.len()
- );
- }
- }
/// A merged variant: one genomic change, possibly reported by several callers.
#[derive(Debug, Serialize, Deserialize)]
pub struct Variant {
    /// Stable 128-bit hash identifying the (position, ref, alt) triple.
    pub hash: Hash128,
    /// Genomic coordinate of the variant.
    pub position: GenomePosition,
    /// Reference allele.
    pub reference: ReferenceAlternative,
    /// Alternative allele.
    pub alternative: ReferenceAlternative,
    /// The per-caller VCF records merged into this variant.
    pub vcf_variants: Vec<VcfVariant>,
    /// Annotations gathered for this variant.
    pub annotations: Vec<Annotation>,
}
- impl PartialEq for Variant {
- fn eq(&self, other: &Self) -> bool {
- self.position == other.position
- && self.reference == other.reference
- && self.alternative == other.alternative
- }
- }
/// The merged, position-sorted set of variants across all callers.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Variants {
    /// Merged variants; kept sorted by position (see `sort` / `merge`).
    pub data: Vec<Variant>,
}
impl Variants {
    /// Sort variants in place by genomic position (unstable sort).
    pub fn sort(&mut self) {
        self.data
            .sort_unstable_by(|a, b| a.position.cmp(&b.position));
    }
    /// Merge a caller's `VariantCollection` into this set.
    ///
    /// Both `self.data` and `others.variants` are expected to already be
    /// sorted by position. Entries with identical position, reference and
    /// alternative are merged (the incoming record is appended to
    /// `vcf_variants`); everything else is inserted as a new `Variant`,
    /// pulling its annotations from `annotations`.
    pub fn merge(&mut self, others: VariantCollection, annotations: &Annotations) {
        let mut result = Vec::new();
        let mut n_merged = 0;
        let mut self_iter = self.data.drain(..).peekable(); // Iterator for self.data
        let mut others_iter = others.variants.into_iter().peekable(); // Iterator for others.variants
        // Merge using two-pointer technique
        while let (Some(self_variant), Some(other_variant)) = (self_iter.peek(), others_iter.peek())
        {
            match self_variant.position.cmp(&other_variant.position) {
                std::cmp::Ordering::Less => {
                    result.push(self_iter.next().unwrap());
                }
                std::cmp::Ordering::Greater => {
                    // New variant only seen by the other caller.
                    result.push(create_variant(
                        vec![others_iter.next().unwrap()],
                        annotations,
                    ));
                }
                std::cmp::Ordering::Equal => {
                    let self_variant = self_iter.next().unwrap();
                    let other_variant = others_iter.next().unwrap();
                    if self_variant.reference == other_variant.reference
                        && self_variant.alternative == other_variant.alternative
                    {
                        let mut merged_variant = self_variant;
                        merged_variant.vcf_variants.push(other_variant);
                        n_merged += 1;
                        result.push(merged_variant);
                    } else {
                        // Same position, different alleles: keep both.
                        // NOTE(review): only the two run heads are compared, so
                        // with several same-position entries a matching allele
                        // pair could be missed if the two inputs order alleles
                        // differently — confirm inputs share allele ordering.
                        result.push(self_variant);
                        result.push(create_variant(vec![other_variant], annotations));
                    }
                }
            }
        }
        // Drain remaining elements from iterators
        result.extend(self_iter);
        result.extend(others_iter.map(|v| create_variant(vec![v], annotations)));
        info!("n merged: {}", n_merged);
        self.data = result;
    }
    /// Serialize this set as BGZF-compressed JSON at `filename`.
    pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> {
        let file = File::create(filename)
            .with_context(|| format!("Failed to create file: {}", filename))?;
        let mut writer = BGZFWriter::new(file, bgzip::Compression::default());
        serde_json::to_writer(&mut writer, self)
            .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?;
        // Close explicitly so flush/close errors are surfaced rather than
        // swallowed on drop.
        writer
            .close()
            .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?;
        debug!("Successfully saved variants to {}", filename);
        Ok(())
    }
    /// Load a set previously written by `save_to_json`.
    pub fn load_from_json(filename: &str) -> anyhow::Result<Self> {
        let file =
            File::open(filename).with_context(|| format!("Failed to open file: {}", filename))?;
        let mut reader = BGZFReader::new(file)
            .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?;
        let variants: Self = serde_json::from_reader(&mut reader)
            .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?;
        debug!("Successfully loaded variants from {}", filename);
        Ok(variants)
    }
}
- fn create_variant(vcf_variants: Vec<VcfVariant>, annotations: &Annotations) -> Variant {
- let first = &vcf_variants[0];
- let annotations = annotations
- .store
- .get(&first.hash)
- .map(|v| v.value().to_vec())
- .unwrap_or_default();
- Variant {
- hash: first.hash,
- position: first.position.clone(),
- reference: first.reference.clone(),
- alternative: first.alternative.clone(),
- vcf_variants,
- annotations,
- }
- }
/// External annotation payload, tagged by its source database.
pub enum ExtAnnotationSource {
    /// COSMIC somatic mutation annotation.
    Cosmic(Cosmic),
    /// gnomAD population frequency annotation.
    GnomAD(GnomAD),
}
- use rusqlite::{params, Connection, Result as SqliteResult};
/// SQLite-backed cache for external annotations (COSMIC, gnomAD, VEP).
pub struct ExternalAnnotation {
    /// Open connection to the on-disk annotations database.
    pub conn: Connection,
}
- impl ExternalAnnotation {
- pub fn init() -> anyhow::Result<Self> {
- let mut db_path = app_storage_dir()?;
- db_path.push("annotations_db.sqlite");
- info!("Opening DB: {}", db_path.display());
- let conn = Connection::open(db_path)?;
- conn.execute(
- "CREATE TABLE IF NOT EXISTS annotations (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- hash BLOB(16) UNIQUE NOT NULL,
- source TEXT NOT NULL,
- data BLOB,
- modified TEXT NOT NULL
- )",
- [],
- )?;
- Ok(Self { conn })
- }
- pub fn annotate(
- &self,
- variants: &[VcfVariant],
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- let mut unfound = Vec::new();
- for variant in variants {
- let hash = variant.hash();
- let mut has_pushed = false;
- // Check COSMIC
- match self.get_annotation(hash, "COSMIC")? {
- Some(cosmic) => {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::Cosmic(cosmic));
- }
- None => {
- unfound.push(variant.clone());
- has_pushed = true;
- }
- }
- // Check GnomAD
- match self.get_annotation(hash, "GnomAD")? {
- Some(gnomad) => {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::GnomAD(gnomad));
- }
- None => {
- if !has_pushed {
- unfound.push(variant.clone())
- }
- }
- }
- }
- if !unfound.is_empty() {
- self.process_unfound_echtvar(unfound, annotations)?;
- }
- Ok(())
- }
- fn get_annotation<T: serde::de::DeserializeOwned>(
- &self,
- hash: Hash128,
- source: &str,
- ) -> anyhow::Result<Option<T>> {
- let result: SqliteResult<Vec<u8>> = self.conn.query_row(
- "SELECT data FROM annotations WHERE hash = ? AND source = ?",
- params![hash.to_bytes(), source],
- |row| row.get(0),
- );
- match result {
- Ok(data) => Ok(Some(serde_json::from_slice(&data)?)),
- Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
- Err(e) => Err(e.into()),
- }
- }
- pub fn process_unfound_echtvar(
- &self,
- mut unfound: Vec<VcfVariant>,
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- let temp_dir = std::env::temp_dir();
- unfound.par_sort();
- unfound.dedup();
- let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
- let min_chunk_size = 1000;
- let max_chunks = 150;
- let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
- let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
- let results = unfound
- .par_chunks(optimal_chunk_size)
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
- let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
- let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
- // Write input VCF
- let mut vcf = File::create(&in_tmp)?;
- writeln!(vcf, "{}", header)?;
- for (i, row) in chunk.iter().enumerate() {
- writeln!(
- vcf,
- "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
- row.position.contig(),
- row.position.position + 1, // vcf
- i + 1,
- row.reference,
- row.alternative
- )?;
- }
- // Run echtvar
- run_echtvar(in_tmp.to_str().unwrap(), out_tmp.to_str().unwrap())?;
- fs::remove_file(in_tmp)?;
- // Parse echtvar output
- let mut reader = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(false)
- .comment(Some(b'#'))
- .flexible(true)
- .from_reader(get_reader(out_tmp.to_str().unwrap())?);
- let mut chunk_results = Vec::new();
- for (i, result) in reader.deserialize::<crate::io::vcf::VCFRow>().enumerate() {
- let row = result?;
- // Verify that the ID corresponds to the input
- let id: usize = row.id.parse().context("Failed to parse ID")?;
- if id != i + 1 {
- return Err(anyhow::anyhow!(
- "Echtvar output ID {} does not match expected ID {}",
- id,
- i + 1
- ));
- }
- let (cosmic, gnomad) = parse_echtvar_val(&row.info)?;
- let hash = chunk[i].hash();
- chunk_results.push((hash, cosmic, gnomad));
- }
- fs::remove_file(out_tmp)?;
- Ok(chunk_results)
- })
- .flatten()
- .collect::<Vec<_>>();
- for (hash, cosmic, gnomad) in results {
- // let modified = chrono::Utc::now().to_rfc3339();
- // Update COSMIC
- // self.conn.execute(
- // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
- // params![
- // hash.to_le_bytes().to_vec(),
- // cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()),
- // modified
- // ],
- // )?;
- //
- // // Update GnomAD
- // self.conn.execute(
- // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)",
- // params![
- // hash.to_le_bytes().to_vec(),
- // gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()),
- // modified
- // ],
- // )?;
- // Update in-memory annotations
- if let Some(c) = cosmic {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::Cosmic(c));
- }
- if let Some(g) = gnomad {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::GnomAD(g));
- }
- }
- Ok(())
- }
- pub fn annotate_vep(
- &self,
- variants: &[VcfVariant],
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- let mut unfound = Vec::new();
- for variant in variants {
- let hash = variant.hash();
- // Check VEP
- match self.get_annotation(hash, "VEP")? {
- Some(veps) => {
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::VEP(veps));
- }
- None => {
- unfound.push(variant.clone());
- }
- }
- }
- if !unfound.is_empty() {
- self.process_unfound_vep(unfound, annotations)?;
- }
- Ok(())
- }
- pub fn process_unfound_vep(
- &self,
- mut unfound: Vec<VcfVariant>,
- annotations: &Annotations,
- ) -> anyhow::Result<()> {
- unfound.par_sort();
- unfound.dedup();
- let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
- let (sv, unfound): (Vec<VcfVariant>, Vec<VcfVariant>) =
- unfound.into_iter().partition(|v| v.has_svtype());
- warn!("SV {}", sv.len());
- let min_chunk_size = 1000;
- let max_chunks = 150;
- let mut results = if !unfound.is_empty() {
- let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
- let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
- unfound
- .par_chunks(optimal_chunk_size)
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
- let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string();
- let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
- let out_summary = format!("{out_vep}_summary.html");
- let out_warnings = format!("{out_vep}_warnings.txt");
- // Write input VCF
- let mut vcf = File::create(&in_tmp)?;
- writeln!(vcf, "{}", header)?;
- for (i, row) in chunk.iter().enumerate() {
- writeln!(
- vcf,
- "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
- row.position.contig(),
- row.position.position + 1, // vcf
- i + 1,
- row.reference,
- row.alternative
- )?;
- }
- run_vep(&in_tmp, &out_vep)?;
- let mut reader_vep = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(false)
- .comment(Some(b'#'))
- .flexible(true)
- .from_reader(fs::File::open(out_vep.clone())?);
- let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
- for line in reader_vep.deserialize() {
- let line: VepLine = line.context("Failed to deserialize VepLine")?;
- let key = line
- .uploaded_variation
- .parse::<u64>()
- .context("Failed to parse uploaded_variation as u64")?;
- lines.entry(key).or_default().push(line);
- }
- fs::remove_file(in_tmp)?;
- fs::remove_file(out_vep)?;
- let mut n_not_vep = 0;
- let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
- chunk.iter().enumerate().for_each(|(i, entry)| {
- let k = (i + 1) as u64;
- if let Some(vep_lines) = lines.get(&k) {
- if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
- chunk_results.push((entry.hash(), veps));
- }
- } else {
- warn!(
- "No VEP entry for {}:{}>{}",
- entry.position.to_string(),
- entry.reference.to_string(),
- entry.alternative.to_string()
- );
- n_not_vep += 1;
- }
- });
- if n_not_vep > 0 {
- debug!("{n_not_vep} variants not annotated by VEP.");
- let warnings = fs::read_to_string(&out_warnings)
- .context(format!("Can't read VEP warnings: {out_warnings}"))?;
- warn!("VEP warnings:\n{warnings}");
- }
- fs::remove_file(out_warnings)?;
- fs::remove_file(out_summary)?;
- Ok(chunk_results)
- })
- .flatten()
- .collect::<Vec<_>>()
- } else {
- Vec::new()
- };
- if !sv.is_empty() {
- let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize);
- let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
- let results_sv = sv
- .par_chunks(optimal_chunk_size)
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
- let in_tmp = temp_file_path(".vcf")?.to_str().unwrap().to_string();
- let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
- let out_summary = format!("{out_vep}_summary.html");
- let out_warnings = format!("{out_vep}_warnings.txt");
- // Write input VCF
- let mut vcf = File::create(&in_tmp)?;
- writeln!(vcf, "{}", header)?;
- for (i, mut row) in chunk.iter().cloned().enumerate() {
- row.id = (i + 1).to_string();
- let s = row.into_vcf_row();
- writeln!(vcf, "{s}",)?;
- }
- run_vep(&in_tmp, &out_vep)?;
- let mut reader_vep = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(false)
- .comment(Some(b'#'))
- .flexible(true)
- .from_reader(fs::File::open(out_vep.clone())?);
- let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
- for line in reader_vep.deserialize() {
- let line: VepLine = line.context("Failed to deserialize VepLine")?;
- let key = line
- .uploaded_variation
- .parse::<u64>()
- .context("Failed to parse uploaded_variation as u64")?;
- lines.entry(key).or_default().push(line);
- }
- fs::remove_file(in_tmp)?;
- fs::remove_file(out_vep)?;
- let mut n_not_vep = 0;
- let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
- chunk.iter().enumerate().for_each(|(i, entry)| {
- let k = (i + 1) as u64;
- if let Some(vep_lines) = lines.get(&k) {
- if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
- chunk_results.push((entry.hash(), veps));
- }
- } else {
- warn!(
- "No VEP entry for {}\t{}\t{}",
- entry.position.to_string(),
- entry.reference.to_string(),
- entry.alternative.to_string()
- );
- n_not_vep += 1;
- }
- });
- if n_not_vep > 0 {
- debug!("{n_not_vep} variants not annotated by VEP.");
- let warnings = fs::read_to_string(&out_warnings)
- .context(format!("Can't read VEP warnings: {out_warnings}"))?;
- warn!("VEP warnings:\n{warnings}");
- }
- fs::remove_file(out_warnings)?;
- fs::remove_file(out_summary)?;
- Ok(chunk_results)
- })
- .flatten()
- .collect::<Vec<_>>();
- results.extend(results_sv);
- }
- for (hash, veps) in results {
- // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
- annotations
- .store
- .entry(hash)
- .or_default()
- .push(Annotation::VEP(veps));
- }
- Ok(())
- }
- pub fn update_database(&self, hash: Hash128, source: &str, data: &[u8]) -> anyhow::Result<()> {
- let modified = chrono::Utc::now().to_rfc3339();
- self.conn.execute(
- "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)",
- params![hash.to_bytes(), source, data, modified],
- )?;
- Ok(())
- }
- }