use std::{ collections::{HashMap, HashSet}, fs::{self, File}, io::Write, }; use anyhow::Context; use bgzip::{BGZFReader, BGZFWriter}; use csv::ReaderBuilder; use log::{debug, info, warn}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::variant::{AlterationCategory, ReferenceAlternative, VcfVariant}; use crate::{ annotation::{ cosmic::Cosmic, echtvar::{parse_echtvar_val, run_echtvar}, gnomad::GnomAD, vep::{run_vep, VepLine, VEP}, Annotation, Annotations, }, collection::{ bam::{counts_at, counts_ins_at}, vcf::Vcf, }, helpers::{app_storage_dir, estimate_shannon_entropy, temp_file_path, Hash128}, io::{readers::get_reader, vcf::vcf_header}, pipes::somatic::sequence_at, positions::GenomePosition, }; #[derive(Debug, Clone)] pub struct VariantCollection { pub variants: Vec, pub vcf: Vcf, pub caller: Annotation, } impl VariantCollection { pub fn keys(&self) -> Vec { self.variants.iter().map(|v| v.hash()).collect() } pub fn retain_keys(&mut self, keys_to_keep: &HashSet) { self.variants.retain(|v| keys_to_keep.contains(&v.hash())); } pub fn remove_keys(&mut self, keys_to_remove: &HashSet) { self.variants .retain(|v| !keys_to_remove.contains(&v.hash())); } pub fn partition(self, predicate: F) -> (Self, Self) where F: Fn(&VcfVariant) -> bool, { let Self { variants, vcf, caller, } = self; let (left_data, right_data) = variants.into_iter().partition(predicate); ( Self { variants: left_data, vcf: vcf.clone(), caller: caller.clone(), }, Self { variants: right_data, vcf, caller, }, ) } pub fn chunk_size(&self, max_threads: u8) -> usize { let total_items = self.variants.len(); let min_chunk_size = 1000; let max_chunks = max_threads; let optimal_chunk_size = total_items.div_ceil(max_chunks as usize); optimal_chunk_size.max(min_chunk_size) } pub fn annotate_with_sequence_entropy( &self, annotations: &Annotations, reference: &str, seq_len: usize, max_threads: u8, ) { self.variants .par_chunks(self.chunk_size(max_threads)) .for_each(|chunk| { let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default() .build_from_path(reference) .unwrap(); for c in chunk { let key = c.hash(); let mut anns = annotations.store.entry(key).or_default(); if !anns .iter() .any(|e| matches!(e, Annotation::ShannonEntropy(_))) { if let Ok(r) = sequence_at( &mut fasta_reader, &c.position.contig(), c.position.position as usize, seq_len, ) { let ent = estimate_shannon_entropy(r.as_str()); anns.push(Annotation::ShannonEntropy(ent)); } } } }); } pub fn annotate_with_constit_bam( &self, annotations: &Annotations, constit_bam_path: &str, max_threads: u8, ) -> anyhow::Result<()> { self.variants .par_chunks(self.chunk_size(max_threads)) .try_for_each(|chunk| { let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path) .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?; for var in chunk { let key = var.hash(); let mut anns = annotations.store.entry(key).or_default(); if anns .iter() .filter(|e| { matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_)) }) .count() != 2 { let mut n_alt = 0; let mut depth = 0; let mut alt_seq = var.alternative.to_string(); let c = if var.alteration_category() == AlterationCategory::INS { // TODO: Add stretch comparison alt_seq = alt_seq.split_off(1); counts_ins_at(&mut bam, &var.position.contig(), var.position.position)? } else if var.alteration_category() == AlterationCategory::DEL { alt_seq = "D".to_string(); counts_at(&mut bam, &var.position.contig(), var.position.position + 1)? } else { counts_at(&mut bam, &var.position.contig(), var.position.position)? }; for (seq, n) in c { if seq == alt_seq { n_alt = n; } if seq == *"D" && alt_seq != *"D" { continue; } depth += n; } anns.push(Annotation::ConstitAlt(n_alt as u16)); anns.push(Annotation::ConstitDepth(depth as u16)); } } anyhow::Ok(()) })?; Ok(()) } pub fn stats(&self) { println!( "{} {}\t{}", self.vcf.caller, self.vcf.time, self.variants.len() ); } } #[derive(Debug, Serialize, Deserialize)] pub struct Variant { pub hash: Hash128, pub position: GenomePosition, pub reference: ReferenceAlternative, pub alternative: ReferenceAlternative, pub vcf_variants: Vec, pub annotations: Vec, } impl PartialEq for Variant { fn eq(&self, other: &Self) -> bool { self.position == other.position && self.reference == other.reference && self.alternative == other.alternative } } #[derive(Debug, Default, Serialize, Deserialize)] pub struct Variants { pub data: Vec, } impl Variants { pub fn sort(&mut self) { self.data .sort_unstable_by(|a, b| a.position.cmp(&b.position)); } pub fn merge(&mut self, others: VariantCollection, annotations: &Annotations) { let mut result = Vec::new(); let mut n_merged = 0; let mut self_iter = self.data.drain(..).peekable(); // Iterator for self.data let mut others_iter = others.variants.into_iter().peekable(); // Iterator for others.variants // Merge using two-pointer technique while let (Some(self_variant), Some(other_variant)) = (self_iter.peek(), others_iter.peek()) { match self_variant.position.cmp(&other_variant.position) { std::cmp::Ordering::Less => { result.push(self_iter.next().unwrap()); } std::cmp::Ordering::Greater => { result.push(create_variant( vec![others_iter.next().unwrap()], annotations, )); } std::cmp::Ordering::Equal => { let self_variant = self_iter.next().unwrap(); let other_variant = others_iter.next().unwrap(); if self_variant.reference == other_variant.reference && self_variant.alternative == other_variant.alternative { let mut merged_variant = self_variant; merged_variant.vcf_variants.push(other_variant); n_merged += 1; result.push(merged_variant); } else { result.push(self_variant); result.push(create_variant(vec![other_variant], annotations)); } } } } // Drain remaining elements from iterators result.extend(self_iter); result.extend(others_iter.map(|v| create_variant(vec![v], annotations))); info!("n merged: {}", n_merged); self.data = result; } pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> { let file = File::create(filename) .with_context(|| format!("Failed to create file: {}", filename))?; let mut writer = BGZFWriter::new(file, bgzip::Compression::default()); serde_json::to_writer(&mut writer, self) .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?; writer .close() .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?; debug!("Successfully saved variants to {}", filename); Ok(()) } pub fn load_from_json(filename: &str) -> anyhow::Result { let file = File::open(filename).with_context(|| format!("Failed to open file: {}", filename))?; let mut reader = BGZFReader::new(file) .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?; let variants: Self = serde_json::from_reader(&mut reader) .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?; debug!("Successfully loaded variants from {}", filename); Ok(variants) } } fn create_variant(vcf_variants: Vec, annotations: &Annotations) -> Variant { let first = &vcf_variants[0]; let annotations = annotations .store .get(&first.hash) .map(|v| v.value().to_vec()) .unwrap_or_default(); Variant { hash: first.hash, position: first.position.clone(), reference: first.reference.clone(), alternative: first.alternative.clone(), vcf_variants, annotations, } } pub enum ExtAnnotationSource { Cosmic(Cosmic), GnomAD(GnomAD), } use rusqlite::{params, Connection, Result as SqliteResult}; pub struct ExternalAnnotation { pub conn: Connection, } impl ExternalAnnotation { pub fn init() -> anyhow::Result { let mut db_path = app_storage_dir()?; db_path.push("annotations_db.sqlite"); info!("Opening DB: {}", db_path.display()); let conn = Connection::open(db_path)?; conn.execute( "CREATE TABLE IF NOT EXISTS annotations ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash BLOB(16) UNIQUE NOT NULL, source TEXT NOT NULL, data BLOB, modified TEXT NOT NULL )", [], )?; Ok(Self { conn }) } pub fn annotate( &self, variants: &[VcfVariant], annotations: &Annotations, ) -> anyhow::Result<()> { let mut unfound = Vec::new(); for variant in variants { let hash = variant.hash(); let mut has_pushed = false; // Check COSMIC match self.get_annotation(hash, "COSMIC")? { Some(cosmic) => { annotations .store .entry(hash) .or_default() .push(Annotation::Cosmic(cosmic)); } None => { unfound.push(variant.clone()); has_pushed = true; } } // Check GnomAD match self.get_annotation(hash, "GnomAD")? { Some(gnomad) => { annotations .store .entry(hash) .or_default() .push(Annotation::GnomAD(gnomad)); } None => { if !has_pushed { unfound.push(variant.clone()) } } } } if !unfound.is_empty() { self.process_unfound_echtvar(unfound, annotations)?; } Ok(()) } fn get_annotation( &self, hash: Hash128, source: &str, ) -> anyhow::Result> { let result: SqliteResult> = self.conn.query_row( "SELECT data FROM annotations WHERE hash = ? AND source = ?", params![hash.to_bytes(), source], |row| row.get(0), ); match result { Ok(data) => Ok(Some(serde_json::from_slice(&data)?)), Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), Err(e) => Err(e.into()), } } pub fn process_unfound_echtvar( &self, mut unfound: Vec, annotations: &Annotations, ) -> anyhow::Result<()> { let temp_dir = std::env::temp_dir(); unfound.par_sort(); unfound.dedup(); let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n"); let min_chunk_size = 1000; let max_chunks = 150; let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize); let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size); let results = unfound .par_chunks(optimal_chunk_size) .flat_map(|chunk| -> anyhow::Result> { let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4())); let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4())); // Write input VCF let mut vcf = File::create(&in_tmp)?; writeln!(vcf, "{}", header)?; for (i, row) in chunk.iter().enumerate() { writeln!( vcf, "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.", row.position.contig(), row.position.position + 1, // vcf i + 1, row.reference, row.alternative )?; } // Run echtvar run_echtvar(in_tmp.to_str().unwrap(), out_tmp.to_str().unwrap())?; fs::remove_file(in_tmp)?; // Parse echtvar output let mut reader = ReaderBuilder::new() .delimiter(b'\t') .has_headers(false) .comment(Some(b'#')) .flexible(true) .from_reader(get_reader(out_tmp.to_str().unwrap())?); let mut chunk_results = Vec::new(); for (i, result) in reader.deserialize::().enumerate() { let row = result?; // Verify that the ID corresponds to the input let id: usize = row.id.parse().context("Failed to parse ID")?; if id != i + 1 { return Err(anyhow::anyhow!( "Echtvar output ID {} does not match expected ID {}", id, i + 1 )); } let (cosmic, gnomad) = parse_echtvar_val(&row.info)?; let hash = chunk[i].hash(); chunk_results.push((hash, cosmic, gnomad)); } fs::remove_file(out_tmp)?; Ok(chunk_results) }) .flatten() .collect::>(); for (hash, cosmic, gnomad) in results { // let modified = chrono::Utc::now().to_rfc3339(); // Update COSMIC // self.conn.execute( // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)", // params![ // hash.to_le_bytes().to_vec(), // cosmic.as_ref().map(|c| serde_json::to_vec(c).unwrap()), // modified // ], // )?; // // // Update GnomAD // self.conn.execute( // "INSERT OR REPLACE INTO annotations (hash, data, modified) VALUES (?, ?, ?)", // params![ // hash.to_le_bytes().to_vec(), // gnomad.as_ref().map(|g| serde_json::to_vec(g).unwrap()), // modified // ], // )?; // Update in-memory annotations if let Some(c) = cosmic { annotations .store .entry(hash) .or_default() .push(Annotation::Cosmic(c)); } if let Some(g) = gnomad { annotations .store .entry(hash) .or_default() .push(Annotation::GnomAD(g)); } } Ok(()) } pub fn annotate_vep( &self, variants: &[VcfVariant], annotations: &Annotations, ) -> anyhow::Result<()> { let mut unfound = Vec::new(); for variant in variants { let hash = variant.hash(); // Check VEP match self.get_annotation(hash, "VEP")? { Some(veps) => { annotations .store .entry(hash) .or_default() .push(Annotation::VEP(veps)); } None => { unfound.push(variant.clone()); } } } if !unfound.is_empty() { self.process_unfound_vep(unfound, annotations)?; } Ok(()) } pub fn process_unfound_vep( &self, mut unfound: Vec, annotations: &Annotations, ) -> anyhow::Result<()> { unfound.par_sort(); unfound.dedup(); let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n"); let (sv, unfound): (Vec, Vec) = unfound.into_iter().partition(|v| v.has_svtype()); warn!("SV {}", sv.len()); let min_chunk_size = 1000; let max_chunks = 150; let mut results = if !unfound.is_empty() { let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize); let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size); unfound .par_chunks(optimal_chunk_size) .flat_map(|chunk| -> anyhow::Result> { let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string(); let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string(); let out_summary = format!("{out_vep}_summary.html"); let out_warnings = format!("{out_vep}_warnings.txt"); // Write input VCF let mut vcf = File::create(&in_tmp)?; writeln!(vcf, "{}", header)?; for (i, row) in chunk.iter().enumerate() { writeln!( vcf, "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.", row.position.contig(), row.position.position + 1, // vcf i + 1, row.reference, row.alternative )?; } run_vep(&in_tmp, &out_vep)?; let mut reader_vep = ReaderBuilder::new() .delimiter(b'\t') .has_headers(false) .comment(Some(b'#')) .flexible(true) .from_reader(fs::File::open(out_vep.clone())?); let mut lines: HashMap> = HashMap::new(); for line in reader_vep.deserialize() { let line: VepLine = line.context("Failed to deserialize VepLine")?; let key = line .uploaded_variation .parse::() .context("Failed to parse uploaded_variation as u64")?; lines.entry(key).or_default().push(line); } fs::remove_file(in_tmp)?; fs::remove_file(out_vep)?; let mut n_not_vep = 0; let mut chunk_results: Vec<(Hash128, Vec)> = Vec::new(); chunk.iter().enumerate().for_each(|(i, entry)| { let k = (i + 1) as u64; if let Some(vep_lines) = lines.get(&k) { if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() { chunk_results.push((entry.hash(), veps)); } } else { warn!( "No VEP entry for {}:{}>{}", entry.position.to_string(), entry.reference.to_string(), entry.alternative.to_string() ); n_not_vep += 1; } }); if n_not_vep > 0 { debug!("{n_not_vep} variants not annotated by VEP."); let warnings = fs::read_to_string(&out_warnings) .context(format!("Can't read VEP warnings: {out_warnings}"))?; warn!("VEP warnings:\n{warnings}"); } fs::remove_file(out_warnings)?; fs::remove_file(out_summary)?; Ok(chunk_results) }) .flatten() .collect::>() } else { Vec::new() }; if !sv.is_empty() { let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize); let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size); let results_sv = sv .par_chunks(optimal_chunk_size) .flat_map(|chunk| -> anyhow::Result> { let in_tmp = temp_file_path(".vcf")?.to_str().unwrap().to_string(); let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string(); let out_summary = format!("{out_vep}_summary.html"); let out_warnings = format!("{out_vep}_warnings.txt"); // Write input VCF let mut vcf = File::create(&in_tmp)?; writeln!(vcf, "{}", header)?; for (i, mut row) in chunk.iter().cloned().enumerate() { row.id = (i + 1).to_string(); let s = row.into_vcf_row(); writeln!(vcf, "{s}",)?; } run_vep(&in_tmp, &out_vep)?; let mut reader_vep = ReaderBuilder::new() .delimiter(b'\t') .has_headers(false) .comment(Some(b'#')) .flexible(true) .from_reader(fs::File::open(out_vep.clone())?); let mut lines: HashMap> = HashMap::new(); for line in reader_vep.deserialize() { let line: VepLine = line.context("Failed to deserialize VepLine")?; let key = line .uploaded_variation .parse::() .context("Failed to parse uploaded_variation as u64")?; lines.entry(key).or_default().push(line); } fs::remove_file(in_tmp)?; fs::remove_file(out_vep)?; let mut n_not_vep = 0; let mut chunk_results: Vec<(Hash128, Vec)> = Vec::new(); chunk.iter().enumerate().for_each(|(i, entry)| { let k = (i + 1) as u64; if let Some(vep_lines) = lines.get(&k) { if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() { chunk_results.push((entry.hash(), veps)); } } else { warn!( "No VEP entry for {}\t{}\t{}", entry.position.to_string(), entry.reference.to_string(), entry.alternative.to_string() ); n_not_vep += 1; } }); if n_not_vep > 0 { debug!("{n_not_vep} variants not annotated by VEP."); let warnings = fs::read_to_string(&out_warnings) .context(format!("Can't read VEP warnings: {out_warnings}"))?; warn!("VEP warnings:\n{warnings}"); } fs::remove_file(out_warnings)?; fs::remove_file(out_summary)?; Ok(chunk_results) }) .flatten() .collect::>(); results.extend(results_sv); } for (hash, veps) in results { // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?; annotations .store .entry(hash) .or_default() .push(Annotation::VEP(veps)); } Ok(()) } pub fn update_database(&self, hash: Hash128, source: &str, data: &[u8]) -> anyhow::Result<()> { let modified = chrono::Utc::now().to_rfc3339(); self.conn.execute( "INSERT OR REPLACE INTO annotations (hash, source, data, modified) VALUES (?, ?, ?, ?)", params![hash.to_bytes(), source, data, modified], )?; Ok(()) } }