use crate::{
    annotations::{
        cosmic::Cosmic,
        echtvar::{parse_echtvar_val, run_echtvar},
        gnomad::GnomAD,
        ncbi_gff::NCBIGFF,
        pangolin::{pangolin_parse_results, pangolin_save_variants, run_pangolin, Pangolin},
        phase::{Phase, PhaseAnnotation},
        vep::{get_best_vep, vep_chunk, VEP},
    },
    callers::{
        clairs::{ClairSFormat, ClairSInfo},
        deepvariant::{DeepVariantFormat, DeepVariantInfo},
        nanomonsv::{NanomonsvFormat, NanomonsvInfo},
        sniffles::{SnifflesFormat, SnifflesInfo},
    },
    config::{self, Config},
    in_out::{
        self,
        dict_reader::read_dict,
        get_reader,
        vcf_reader::{read_vcf, VCFRow},
        vcf_writer::{vcf_header_from, VariantWritter},
    },
    rna_seq::find_monoallelics,
    sql::{
        stats_sql::insert_stats,
        variants_sql::{insert_variants, update_annotations, write_annotaded},
    },
    utils::{
        chi_square_test_for_proportions, count_repetitions, estimate_shannon_entropy,
        get_hts_nt_pileup, new_pg, new_pg_speed, print_stat_cat,
    },
};
use anyhow::{anyhow, bail, Context, Ok, Result};
use charming::{
    component::{Axis, Grid},
    element::{AxisLabel, AxisType, ItemStyle, Label, LabelPosition},
    series::Bar,
    Chart,
};
use csv::ReaderBuilder;
use dashmap::DashMap;
use hashbrown::HashMap;
use indicatif::{MultiProgress, ParallelProgressIterator};
use log::{info, warn};
use noodles_core::{region::Region, Position};
use noodles_fasta::indexed_reader::Builder as FastaBuilder;
use noodles_gff as gff;
use rayon::prelude::*;
use rust_htslib::bam::IndexedReader;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use std::{
    env::temp_dir,
    fmt,
    fs::File,
    io::Read,
    str::FromStr,
    sync::{
        atomic::{AtomicI32, Ordering},
        Arc,
    },
};
use std::{fs, io::Write};
use utoipa::ToSchema;

// Variant key convention, e.g. chr12:25116542|G>T (KRAS)
#[derive(Debug, Clone)]
pub struct Variants {
    pub name: String,
    pub data: Vec<Variant>,
    pub constit: DashMap<String, Variant>,
    pub stats_vcf: StatsVCF,
    pub stats_bam: StatsBAM,
    pub cfg: Config,
    pub mp: MultiProgress,
}
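// A minimal sketch (not part of the pipeline itself) of the key convention
// used throughout this module to index and deduplicate variants, matching the
// header comment above: `{contig}:{position}|{ref}>{alt}`.
#[cfg(test)]
mod variant_key_sketch {
    #[test]
    fn key_format_round_trip() {
        let (contig, position, reference, alternative) = ("chr12", 25116542u32, "G", "T");
        let key = format!("{}:{}|{}>{}", contig, position, reference, alternative);
        assert_eq!(key, "chr12:25116542|G>T");
    }
}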
impl Serialize for Variants {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // 5 fields are serialized; `cfg` and `mp` are deliberately skipped.
        let mut state = serializer.serialize_struct("Variants", 5)?;
        state.serialize_field("name", &self.name)?;
        state.serialize_field("data", &self.data)?;
        state.serialize_field("constit", &self.constit)?;
        state.serialize_field("stats_vcf", &self.stats_vcf)?;
        state.serialize_field("stats_bam", &self.stats_bam)?;
        state.end()
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StatsVCF {
    n_tumoral_init: usize,
    n_constit_init: usize,
    n_constit: i32,
    n_loh: i32,
    n_low_mrd_depth: i32,
}

impl fmt::Display for StatsVCF {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let k = 100.0 / self.n_tumoral_init as f64;
        let string = format!(
            "VCF filters found {} ({:.1}%) constit, {} ({:.1}%) LOH, {} ({:.1}%) low depth for constit variants",
            self.n_constit,
            self.n_constit as f64 * k,
            self.n_loh,
            self.n_loh as f64 * k,
            self.n_low_mrd_depth,
            self.n_low_mrd_depth as f64 * k
        );
        write!(f, "{}", string)
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StatsBAM {
    n_lasting: i32,
    n_constit: i32,
    n_low_mrd_depth: i32,
    n_low_diversity: i32,
    n_somatic: i32,
}

impl fmt::Display for StatsBAM {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let k = 100.0 / self.n_lasting as f64;
        let string = format!(
            "BAM filters found {} ({:.1}%) constit, {} ({:.1}%) low depth for constit variants, {} ({:.1}%) low sequence diversity at the variant position, {} ({:.1}%) somatic variants",
            self.n_constit,
            self.n_constit as f64 * k,
            self.n_low_mrd_depth,
            self.n_low_mrd_depth as f64 * k,
            self.n_low_diversity,
            self.n_low_diversity as f64 * k,
            self.n_somatic,
            self.n_somatic as f64 * k
        );
        write!(f, "{}", string)
    }
}

impl Variants {
    pub fn from_vec(name: String, mp: &MultiProgress, data: Vec<Variant>) -> Self {
        Self {
            name,
            data,
            constit: DashMap::new(),
            stats_vcf: StatsVCF::default(),
            stats_bam: StatsBAM::default(),
            cfg: Config::get().unwrap(),
            mp: mp.clone(),
        }
    }

    pub fn from_vcfs(
        name: String,
        v: Vec<(&str, &VCFSource, &VariantType)>,
        cfg: &Config,
        mp: MultiProgress,
    ) -> Result<Self> {
        let pg = mp.add(new_pg(v.len() as u64));
        pg.set_message("Reading VCF");
        let constit: Arc<DashMap<String, Variant>> = Arc::new(DashMap::new());
        let n_constit = AtomicI32::new(0);
        let data: Vec<Variant> = v
            .par_iter()
            .flat_map(|(path, source, variant_type)| {
                let r = match variant_type {
                    VariantType::Somatic => read_vcf(path, source, variant_type).unwrap(),
                    VariantType::Constitutionnal => {
                        read_vcf(path, source, variant_type)
                            .unwrap()
                            .par_iter()
                            .for_each(|e| {
                                n_constit.fetch_add(1, Ordering::SeqCst);
                                constit.insert(
                                    format!(
                                        "{}:{}|{}>{}",
                                        e.contig, e.position, e.reference, e.alternative
                                    ),
                                    e.clone(),
                                );
                            });
                        vec![]
                    }
                };
                pg.inc(1);
                r
            })
            .collect();
        let stats_vcf = StatsVCF::default();
        let stats_bam = StatsBAM::default();
        let constit = Arc::try_unwrap(constit).unwrap();
        let elapsed = pg.elapsed();
        pg.finish();
        info!(
            "{} variants parsed from somatic VCFs and {} variant positions parsed from constitutional VCFs. Executed in {}s",
            data.len(),
            constit.len(),
            elapsed.as_secs()
        );
        let cfg = cfg.clone();
        Ok(Self {
            name,
            data,
            constit,
            stats_vcf,
            stats_bam,
            cfg,
            mp: mp.clone(),
        })
    }
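    // Usage sketch (hypothetical paths; `cfg` and `mp` come from the caller):
    //
    //     let sources = vec![
    //         ("diag.vcf.gz", &VCFSource::DeepVariant, &VariantType::Somatic),
    //         ("mrd.vcf.gz", &VCFSource::DeepVariant, &VariantType::Constitutionnal),
    //     ];
    //     let mut variants = Variants::from_vcfs("sample".to_string(), sources, &cfg, mp)?;
    //
    // Somatic rows land in `data`; constitutional rows only populate the
    // `constit` lookup keyed by `contig:pos|ref>alt`.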
    pub fn vcf_filters(&mut self) {
        let cfg = &self.cfg;
        let pg = self.mp.add(new_pg_speed(self.data.len() as u64));
        pg.set_message("VCF filtering");
        let n_tumoral_init = self.len();
        let n_constit_init = self.constit_len();
        let min_loh_diff = cfg.deepvariant_loh_pval;
        let min_mrd_depth = cfg.min_mrd_depth;
        info!("Filtering Constit (variant reported in constit), LOH (VAF proportion test < {}), LowMRDDepth (< {} in constit) variants by VCF annotations of {} likely somatic variants", min_loh_diff, min_mrd_depth, n_tumoral_init);
        let n_constit = AtomicI32::new(0);
        let n_loh = AtomicI32::new(0);
        let n_low_mrd_depth = AtomicI32::new(0);
        self.data = self
            .data
            .par_iter()
            .map(|e| {
                let mut tumoral = e.clone();
                let k = format!(
                    "{}:{}|{}>{}",
                    tumoral.contig, tumoral.position, tumoral.reference, tumoral.alternative
                );
                if let Some(mut constit) = self.constit.get_mut(&k) {
                    if constit.get_depth() < min_mrd_depth {
                        n_low_mrd_depth.fetch_add(1, Ordering::SeqCst);
                        tumoral.annotations.push(AnnotationType::VariantCategory(
                            VariantCategory::LowMRDDepth,
                        ));
                    } else if constit.get_n_alt() == constit.get_depth()
                        && tumoral.get_n_alt() == tumoral.get_depth()
                    {
                        n_constit.fetch_add(1, Ordering::SeqCst);
                        tumoral
                            .annotations
                            .push(AnnotationType::VariantCategory(VariantCategory::Constit));
                    } else {
                        let pval = chi_square_test_for_proportions(
                            tumoral.get_n_alt() as f64,
                            tumoral.get_depth() as f64,
                            constit.get_n_alt() as f64,
                            constit.get_depth() as f64,
                        )
                        .unwrap();
                        if pval != 0.0 && pval <= min_loh_diff {
                            n_loh.fetch_add(1, Ordering::SeqCst);
                            tumoral
                                .annotations
                                .push(AnnotationType::VariantCategory(VariantCategory::LOH));
                        } else {
                            n_constit.fetch_add(1, Ordering::SeqCst);
                            tumoral
                                .annotations
                                .push(AnnotationType::VariantCategory(VariantCategory::Constit));
                        }
                    }
                // If the variant is not in the constit registry, ClairS calls carry
                // the constit depth and alt counts in their own VCF FORMAT fields.
                } else if let Format::ClairS(format) =
                    &tumoral.callers_data.first().unwrap().format
                {
                    if format.ndp < min_mrd_depth {
                        n_low_mrd_depth.fetch_add(1, Ordering::SeqCst);
                        tumoral.annotations.push(AnnotationType::VariantCategory(
                            VariantCategory::LowMRDDepth,
                        ));
                    } else if let ReferenceAlternative::Nucleotide(alt_base) =
                        &tumoral.alternative
                    {
                        let mrd_n_alt = match alt_base {
                            Base::A => format.nau,
                            Base::T => format.ntu,
                            Base::C => format.ncu,
                            Base::G => format.ngu,
                            _ => 0,
                        };
                        if mrd_n_alt != 0 {
                            n_constit.fetch_add(1, Ordering::SeqCst);
                            tumoral
                                .annotations
                                .push(AnnotationType::VariantCategory(VariantCategory::Constit));
                        }
                    }
                }
                pg.inc(1);
                tumoral
            })
            .collect();
        let n_constit = n_constit.load(Ordering::SeqCst);
        let n_loh = n_loh.load(Ordering::SeqCst);
        let n_low_mrd_depth = n_low_mrd_depth.load(Ordering::SeqCst);
        self.stats_vcf = StatsVCF {
            n_tumoral_init,
            n_constit_init,
            n_constit,
            n_loh,
            n_low_mrd_depth,
        };
        let elapsed = pg.elapsed();
        pg.finish();
        info!("{}. Executed in {}s", self.stats_vcf, elapsed.as_secs());
    }
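    // Worked example for the LOH proportion test above (illustrative numbers):
    // 80/100 alt reads in the tumoral sample vs 50/100 in the constit sample
    // gives a 2x2 chi-square statistic of ~19.8 with 1 df, i.e. p << 0.05, so
    // the variant is annotated LOH; balanced VAFs fall through to Constit.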
    /// Filter variants using information read from the constitutional (MRD) BAM.
    pub fn bam_filters(&mut self, mrd_bam: &str) {
        let cfg = &self.cfg;
        let pg = self.mp.add(new_pg_speed(self.data.len() as u64));
        pg.set_message("BAM filtering");
        let min_mrd_depth = cfg.min_mrd_depth;
        info!("Filtering Constit (alt base found in BAM pileup), LowDiversity (sequence +/- 20nt around variant with entropy < {}), LowMRDDepth (BAM pileup depth < {}) variants by BAM pileup fetching of {} likely somatic variants", cfg.min_diversity, min_mrd_depth, self.stats_vcf.n_tumoral_init - (self.stats_vcf.n_constit + self.stats_vcf.n_loh + self.stats_vcf.n_low_mrd_depth) as usize);
        let n_already = AtomicI32::new(0);
        let n_constit = AtomicI32::new(0);
        let n_low_mrd_depth = AtomicI32::new(0);
        let n_low_diversity = AtomicI32::new(0);
        let n_somatic = AtomicI32::new(0);
        self.data.par_chunks_mut(10_000).for_each(|chunk| {
            let mut bam = rust_htslib::bam::IndexedReader::from_path(mrd_bam)
                .context(anyhow!("Reading {}", mrd_bam))
                .unwrap();
            let mut genome_reader = FastaBuilder::default()
                .build_from_path(&cfg.reference_fa)
                .unwrap();
            for tumoral in chunk.iter_mut() {
                pg.inc(1);
                if !tumoral.annotations.is_empty() {
                    n_already.fetch_add(1, Ordering::SeqCst);
                    continue;
                }
                // Only insertions need the pileup's insertion-aware mode.
                let is_ins = matches!(tumoral.alteration_category(), AlterationCategory::Ins);
                match get_hts_nt_pileup(&mut bam, &tumoral.contig, tumoral.position as i32, is_ins)
                {
                    std::result::Result::Ok(bases) => {
                        let depth = bases.len() as u32;
                        if depth < min_mrd_depth {
                            n_low_mrd_depth.fetch_add(1, Ordering::SeqCst);
                            tumoral.annotations.push(AnnotationType::VariantCategory(
                                VariantCategory::LowMRDDepth,
                            ));
                        } else {
                            // Check local diversity
                            let start =
                                Position::try_from((tumoral.position - 20) as usize).unwrap();
                            let end =
                                Position::try_from((tumoral.position + 19) as usize).unwrap();
                            let r = Region::new(tumoral.contig.to_string(), start..=end);
                            if let std::result::Result::Ok(reg) = genome_reader.query(&r) {
                                let s = reg.sequence();
                                let u = s.as_ref();
                                let s = String::from_utf8(u.to_vec()).unwrap();
                                let ent = estimate_shannon_entropy(&s.to_lowercase());
                                if ent < cfg.min_diversity {
                                    n_low_diversity.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::LowDiversity,
                                    ));
                                    continue;
                                }
                                // DeepVariant-only calls: also reject doublet/triplet repeats
                                let callers = tumoral.callers();
                                if callers.len() == 1 && callers[0] == "DeepVariant" {
                                    let seq_left = &s[0..20];
                                    let seq_right = &s[21..s.len() - 1];
                                    // Triplet right
                                    if count_repetitions(seq_right, 3) >= 3 {
                                        n_low_diversity.fetch_add(1, Ordering::SeqCst);
                                        tumoral.annotations.push(AnnotationType::VariantCategory(
                                            VariantCategory::LowDiversity,
                                        ));
                                        continue;
                                    }
                                    // Doublet right
                                    if count_repetitions(seq_right, 2) >= 4 {
                                        n_low_diversity.fetch_add(1, Ordering::SeqCst);
                                        tumoral.annotations.push(AnnotationType::VariantCategory(
                                            VariantCategory::LowDiversity,
                                        ));
                                        continue;
                                    }
                                    // Triplet left
                                    if count_repetitions(seq_left, 3) >= 3 {
                                        n_low_diversity.fetch_add(1, Ordering::SeqCst);
                                        tumoral.annotations.push(AnnotationType::VariantCategory(
                                            VariantCategory::LowDiversity,
                                        ));
                                        continue;
                                    }
                                    // Doublet left
                                    if count_repetitions(seq_left, 2) >= 4 {
                                        n_low_diversity.fetch_add(1, Ordering::SeqCst);
                                        tumoral.annotations.push(AnnotationType::VariantCategory(
                                            VariantCategory::LowDiversity,
                                        ));
                                        continue;
                                    }
                                }
                            }
                            // Check whether the alt allele shows up in the constitutional pileup
                            if let ReferenceAlternative::Nucleotide(alt_b) = &tumoral.alternative {
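                                // The pileup yields one base per covering read at this
                                // position; seeing the tumoral alt base at all in the
                                // constitutional BAM flags the variant Constit, otherwise
                                // it is confirmed Somatic. The same decision is made below
                                // with the 'I'/'D' pileup markers for indels.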
                                let alt_b = alt_b.clone().into_u8();
                                let n_alt_mrd = bases.iter().filter(|&&b| b == alt_b).count();
                                if n_alt_mrd > 0 {
                                    n_constit.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::Constit,
                                    ));
                                } else {
                                    n_somatic.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::Somatic,
                                    ));
                                }
                            } else if tumoral.is_ins() {
                                let n_alt_mrd = bases.iter().filter(|&&b| b == b'I').count();
                                if n_alt_mrd > 0 {
                                    n_constit.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::Constit,
                                    ));
                                } else {
                                    n_somatic.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::Somatic,
                                    ));
                                }
                            } else if tumoral.alteration_category() == AlterationCategory::Del {
                                let n_alt_mrd = bases.iter().filter(|&&b| b == b'D').count();
                                if n_alt_mrd > 0 {
                                    n_constit.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::Constit,
                                    ));
                                } else {
                                    n_somatic.fetch_add(1, Ordering::SeqCst);
                                    tumoral.annotations.push(AnnotationType::VariantCategory(
                                        VariantCategory::Somatic,
                                    ));
                                }
                            }
                        }
                    }
                    Err(r) => panic!("{}", r),
                }
            }
        });
        let n_constit = n_constit.load(Ordering::SeqCst);
        let n_low_mrd_depth = n_low_mrd_depth.load(Ordering::SeqCst);
        let n_low_diversity = n_low_diversity.load(Ordering::SeqCst);
        let n_somatic = n_somatic.load(Ordering::SeqCst);
        let n_lasting = self.data.len() as i32 - n_already.load(Ordering::SeqCst);
        self.stats_bam = StatsBAM {
            n_lasting,
            n_constit,
            n_low_mrd_depth,
            n_low_diversity,
            n_somatic,
        };
        let elapsed = pg.elapsed();
        pg.finish();
        info!("{}. Executed in {}s", self.stats_bam, elapsed.as_secs());
    }

    pub fn get_cat(&mut self, cat: &VariantCategory) -> Vec<Variant> {
        let pg = self.mp.add(new_pg_speed(self.data.len() as u64));
        pg.set_message(format!("Get cat {:?}", cat));
        self.data
            .par_iter()
            .progress_with(pg)
            .flat_map(|e| {
                if e.annotations
                    .iter()
                    .filter(|e| match e {
                        AnnotationType::VariantCategory(vc) => vc == cat,
                        _ => false,
                    })
                    .count()
                    > 0
                {
                    vec![e.clone()]
                } else {
                    vec![]
                }
            })
            .collect::<Vec<Variant>>()
    }

    /// Return variants carrying any annotation of the same kind as `annotation`.
    pub fn with_annotation(&self, annotation: &AnnotationType) -> Vec<Variant> {
        self.data
            .clone()
            .into_iter()
            .filter(|v| {
                v.annotations.iter().any(|a| {
                    matches!(
                        (annotation, a),
                        (
                            AnnotationType::VariantCategory(_),
                            AnnotationType::VariantCategory(_)
                        ) | (AnnotationType::VEP(_), AnnotationType::VEP(_))
                            | (AnnotationType::Cluster(_), AnnotationType::Cluster(_))
                            | (AnnotationType::Cosmic(_), AnnotationType::Cosmic(_))
                            | (AnnotationType::GnomAD(_), AnnotationType::GnomAD(_))
                            | (AnnotationType::NCBIGFF(_), AnnotationType::NCBIGFF(_))
                            | (AnnotationType::Pangolin(_), AnnotationType::Pangolin(_))
                            | (AnnotationType::Phase(_), AnnotationType::Phase(_))
                    )
                })
            })
            .collect()
    }

    pub fn write_vcf_cat(&mut self, path: &str, cat: &VariantCategory) -> Result<()> {
        info!("Writing VCF {}", path);
        let mut to_write = sort_variants(self.get_cat(cat), &self.cfg.dict_file)?;
        if to_write.is_empty() {
            warn!("No variants to write");
            return Ok(());
        }
        let pg = self.mp.add(new_pg_speed(to_write.len() as u64));
        pg.set_message("Writing VCF");
        let mut w = VariantWritter::new(path, &self.cfg.dict_file)?;
        for row in to_write.iter_mut() {
            w.write_variant(row)
                .context(format!("Error writing VCF row {:#?}", row))?;
            pg.inc(1);
        }
        w.write_index_finish()
            .context(format!("Can't write index for {}", path))?;
        Ok(())
    }
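    // Diversity sketch for `bam_filters` above, assuming `estimate_shannon_entropy`
    // returns bits per symbol over the +/- 20 nt window: a homopolymer run
    // ("aaaa...") scores 0.0 and a perfectly even 4-base mix ("acgtacgt...")
    // scores 2.0, so `cfg.min_diversity` sits somewhere in between.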
    /// Keep unannotated variants and variants annotated `Somatic`.
    pub fn keep_somatics_un(&mut self) {
        let pg = self.mp.add(new_pg_speed(self.data.len() as u64));
        pg.set_message("Filtering Variants");
        self.data = self
            .data
            .par_iter_mut()
            .progress_with(pg)
            .flat_map(|e| {
                // Keep variants with no category annotation other than Somatic
                if e.annotations
                    .iter()
                    .filter(|a| match a {
                        AnnotationType::VariantCategory(vc) => {
                            !matches!(vc, VariantCategory::Somatic)
                        }
                        _ => false,
                    })
                    .count()
                    == 0
                {
                    vec![e]
                } else {
                    vec![]
                }
            })
            .map(|e| e.clone())
            .collect();
    }

    /// Annotate with VEP
    pub fn vep(&mut self) {
        let pg = self.mp.add(new_pg_speed(self.len() as u64));
        pg.set_message("VEP");
        self.data
            .par_chunks_mut(self.cfg.vep_chunk_size)
            .progress_with(pg)
            .for_each(|chunks| {
                if let Err(err) = vep_chunk(chunks) {
                    panic!("{err}");
                }
            });
    }

    /// Sort variants by contig (dict-file order) and position via `sort_variants`.
    pub fn sort(&mut self) -> Result<()> {
        let cfg = &self.cfg;
        self.data = sort_variants(self.data.clone(), &cfg.dict_file)?;
        Ok(())
    }

    pub fn merge(&mut self) {
        let pg = self.mp.add(new_pg_speed(self.len() as u64));
        pg.set_message("Merging Variants by contig, position, ref, alt");
        let hm: DashMap<String, Variant> = DashMap::new();
        self.data.par_iter().progress_with(pg).for_each(|e| {
            let k = format!(
                "{}:{}|{}>{}",
                e.contig, e.position, e.reference, e.alternative
            );
            if let Some(mut v) = hm.get_mut(&k) {
                let v = v.value_mut();
                e.callers_data.iter().for_each(|cd| {
                    v.callers_data.push(cd.clone());
                    v.callers_data.dedup();
                });
                v.source.extend(e.source.clone());
                v.source.dedup();
            } else {
                hm.insert(k, e.clone());
            }
        });
        self.data = hm.iter().map(|e| e.value().clone()).collect();
    }

    pub fn annotate_gff_feature(&mut self, gff_path: &str) -> Result<()> {
        let gff_path = gff_path.to_string();
        let len = self.data.len();
        let pg = self.mp.add(new_pg_speed(self.len() as u64));
        pg.set_message("GFF Annotate");
        self.data
            // `.max(1)` guards against a zero chunk size on small inputs.
            .par_chunks_mut((len / 33).max(1))
            .progress_with(pg)
            .for_each(|chunk| {
                let mut reader = File::open(&gff_path)
                    .map(noodles_bgzf::Reader::new)
                    .map(gff::Reader::new)
                    .unwrap();
                let index = noodles_csi::read(format!("{}.csi", gff_path))
                    .context("Can't load csi index")
                    .unwrap();
                for v in chunk.iter_mut() {
                    let start = Position::try_from(v.position as usize).unwrap();
                    let r = Region::new(v.contig.to_string(), start..=start);
                    if let std::result::Result::Ok(rows) = reader.query(&index, &r) {
                        for row in rows {
                            let ncbi = NCBIGFF::from(row.unwrap());
                            v.annotations.push(AnnotationType::NCBIGFF(ncbi));
                        }
                    }
                }
            });
        Ok(())
    }
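    // `echtvar_annotate` below round-trips each chunk through the external
    // `echtvar` tool: the chunk is written to a temporary VCF whose ID column
    // stores the 1-based index of the variant, and annotations in the output
    // are matched back purely by that index, which is why out-of-order output
    // is a hard error.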
    pub fn echtvar_annotate(&mut self, header_path: &str) -> Result<()> {
        let len = self.len();
        let header = vcf_header_from(header_path)?;
        let pg = self.mp.add(new_pg_speed(len as u64));
        pg.set_message("Echtvar Annotate");
        self.data
            .par_chunks_mut((len / 33).max(1))
            .progress_with(pg)
            .for_each(|chunk| {
                let in_tmp = format!(
                    "{}/echtvar_in_{}.vcf",
                    temp_dir().to_str().unwrap(),
                    uuid::Uuid::new_v4()
                );
                let out_tmp = format!(
                    "{}/echtvar_in_{}.vcf.gz",
                    temp_dir().to_str().unwrap(),
                    uuid::Uuid::new_v4()
                );
                let mut vcf = File::create(&in_tmp).unwrap();
                let _ = writeln!(vcf, "{}", header);
                for (i, row) in chunk.iter().enumerate() {
                    // The ID column (3rd) carries the 1-based index into `chunk`.
                    let _ = writeln!(
                        vcf,
                        "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
                        row.contig,
                        row.position,
                        i + 1,
                        row.reference,
                        row.alternative
                    );
                }
                run_echtvar(&in_tmp, &out_tmp).unwrap();
                let mut reader = ReaderBuilder::new()
                    .delimiter(b'\t')
                    .has_headers(false)
                    .comment(Some(b'#'))
                    .flexible(true)
                    .from_reader(get_reader(&out_tmp).unwrap());
                // Each annotated row is read back as a `VCFRow`; its ID column
                // carries the input index written above.
                let mut last: usize = 1;
                for row in reader.deserialize::<VCFRow>().flatten() {
                    let (cosmic, gnomad) = parse_echtvar_val(&row.info).unwrap();
                    let id: usize = row.id.parse().unwrap();
                    if id != last {
                        panic!("Echtvar output not in input order!");
                    }
                    if let Some(c) = cosmic {
                        chunk[id - 1].annotations.push(AnnotationType::Cosmic(c));
                    }
                    if let Some(g) = gnomad {
                        chunk[id - 1].annotations.push(AnnotationType::GnomAD(g));
                    }
                    last += 1;
                }
            });
        Ok(())
    }

    pub fn category_iter(&self, category: &VariantCategory) -> Vec<&Variant> {
        self.data
            .par_iter()
            .filter(|v| {
                for annotation in v.annotations.iter() {
                    if let AnnotationType::VariantCategory(cat) = annotation {
                        if cat == category {
                            return true;
                        }
                    }
                }
                false
            })
            .collect::<Vec<&Variant>>()
    }

    /// Drop variants whose GnomAD AF reaches `max_gnomad_af`; returns how many were removed.
    pub fn filter_snp(&mut self) -> Result<i32> {
        let n_snp = AtomicI32::new(0);
        self.data = self
            .data
            .clone()
            .into_par_iter()
            .filter(|e| {
                let mut res = true;
                e.annotations.iter().for_each(|a| {
                    if let AnnotationType::GnomAD(g) = a {
                        res = g.gnomad_af < self.cfg.max_gnomad_af;
                    };
                });
                if !res {
                    n_snp.fetch_add(1, Ordering::SeqCst);
                }
                res
            })
            .collect();
        let n = n_snp.load(Ordering::SeqCst);
        Ok(n)
    }

    pub fn pangolin(&mut self) -> Result<()> {
        let tmp_file = pangolin_save_variants(self)?;
        let res_file = run_pangolin(&tmp_file)?;
        fs::remove_file(tmp_file)?;
        let res = pangolin_parse_results(&res_file)?;
        let mut res = res.iter();
        fs::remove_file(res_file)?;
        info!("Adding pangolin results for {} variants.", res.len());
        let mut n_added = 0;
        if let Some(r) = res.next() {
            let mut curr = r.clone();
            for variant in self.data.iter_mut() {
                if variant.contig == curr.0
                    && variant.position == curr.1
                    && variant.reference == curr.2
                    && variant.alternative == curr.3
                {
                    variant.annotations.push(AnnotationType::Pangolin(curr.4));
                    n_added += 1;
                    if let Some(r) = res.next() {
                        curr = r.clone();
                    } else {
                        break;
                    }
                }
            }
        }
        info!("Pangolin annotations added: {}", n_added);
        // Every pangolin result should have been consumed by the walk above.
        assert_eq!(res.len(), 0);
        Ok(())
    }

    pub fn find_monoallelics(&self, rna_bam: &str, _out_tsv: &str) -> anyhow::Result<()> {
        let mut variants = self.clone();
        variants.data = variants
            .data
            .into_iter()
            .filter_map(|mut v| {
                if v.vaf() < 0.75 && v.vaf() > 0.25 && v.get_depth() > 6 {
                    Some(v)
                } else {
                    None
                }
            })
            .collect();
        // variants.save_bytes("/data/test_var.bytes_chr1.gz")?;
        let monoall = find_monoallelics(&variants, &rna_bam)?;
        monoall
            .iter()
            .filter(|(_k, v)| v.iter().any(|v| v.0))
            // chrX can be used as a control for women
            .filter(|(_, v)| v[0].2 != "chrX")
            .for_each(|(k, v)| {
                println!(
                    "{k}\t{}/{}\t{}",
                    v.iter().filter(|v| v.0).count(),
                    v.len(),
                    v.iter()
                        .map(|(_, _, c, p)| format!("{c}:{p}"))
                        .collect::<Vec<String>>()
                        .join("|")
                )
            });
        Ok(())
    }

    pub fn len(&self) -> usize {
        self.data.len()
    }

    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    pub fn constit_len(&self) -> usize {
        self.constit.len()
    }

    pub fn get_variant(&self, contig: &str, pos: u32) -> Vec<Variant> {
        self.data
            .par_iter()
            .filter(|v| v.contig == contig && v.position == pos)
            .map(|v| v.clone())
            .collect()
    }
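    // Interval-lookup sketch for `phase_contig` below (rust_lapper semantics,
    // illustrative values): overlap is half-open, so a point query passes the
    // position as both bounds, and an interval built from an inclusive range
    // needs `stop = end + 1`.
    //
    //     let iv = rust_lapper::Interval { start: 100u64, stop: 201, val: () };
    //     let lapper = rust_lapper::Lapper::new(vec![iv]);
    //     assert_eq!(lapper.find(150, 150).count(), 1);
    //     assert_eq!(lapper.find(300, 300).count(), 0);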
    pub fn phase_contig(&mut self, phases: &[Phase], bam_path: &str, contig: &str) {
        type Iv = rust_lapper::Interval<u64, Phase>;
        let data: Vec<Iv> = phases
            .iter()
            .map(|p| Iv {
                start: p.range.unwrap().0,
                stop: p.range.unwrap().3 + 1, // TODO verify inclusivity
                val: p.clone(),
            })
            .collect();
        let lapper = rust_lapper::Lapper::new(data);
        let mut bam = IndexedReader::from_path(bam_path).unwrap();
        let mut annotations = 0;
        for v in self.data.iter_mut().filter(|v| v.contig == contig) {
            let over_phases: Vec<&Phase> = lapper
                .find(v.position as u64, v.position as u64)
                .map(|iv| &iv.val)
                .collect();
            if !over_phases.is_empty() {
                let reads: Vec<String> = match v.alteration_category() {
                    AlterationCategory::Snv => {
                        let base = v.alternative.to_string().as_bytes()[0];
                        pandora_lib_pileup::qnames_at_base(
                            &mut bam,
                            &v.contig,
                            v.position as i32,
                            false,
                            50,
                        )
                        .unwrap()
                        .iter()
                        .filter(|(_, b)| *b == base)
                        .map(|(r, _)| r.to_string())
                        .collect()
                    }
                    AlterationCategory::Ins => pandora_lib_pileup::qnames_at_base(
                        &mut bam,
                        &v.contig,
                        v.position as i32,
                        true,
                        50,
                    )
                    .unwrap()
                    .iter()
                    .filter(|(_, b)| *b == b'I')
                    .map(|(r, _)| r.to_string())
                    .collect(),
                    AlterationCategory::Del => pandora_lib_pileup::qnames_at_base(
                        &mut bam,
                        &v.contig,
                        v.position as i32,
                        false,
                        50,
                    )
                    .unwrap()
                    .iter()
                    .filter(|(_, b)| *b == b'D')
                    .map(|(r, _)| r.to_string())
                    .collect(),
                    AlterationCategory::Rep => Vec::new(),
                    AlterationCategory::Other => Vec::new(),
                };
                over_phases
                    .iter()
                    .filter(|p| p.contains(&reads))
                    .for_each(|p| {
                        if let Some(id) = p.id() {
                            annotations += 1;
                            v.annotations
                                .push(AnnotationType::Phase(PhaseAnnotation { id }));
                        };
                    });
            }
        }
    }

    pub fn stats(&self) -> Result<Vec<Stat>> {
        let mut callers_cat = HashMap::new();
        let mut n_caller_data = 0;
        let mut variants_cat = HashMap::new();
        let mut n_variants_wcat = 0;
        let mut ncbi_feature = HashMap::new();
        let mut n_ncbi_feature = 0;
        let mut cosmic_sup_1 = HashMap::new();
        let mut n_cosmic_sup_1 = 0;
        let mut cons_cat = HashMap::new();
        let mut n_csq = 0;
        // Insert 0 so the first occurrence of a key counts exactly once.
        let add_hm = |hm: &mut HashMap<String, i32>, k: &str| {
            let (_, v) = hm.raw_entry_mut().from_key(k).or_insert(k.to_string(), 0);
            *v += 1;
        };
        for ele in self.data.iter() {
            // Callers
            let mut callers = Vec::new();
            for cd in &ele.callers_data {
                callers.push(
                    match cd.format {
                        Format::DeepVariant(_) => "DeepVariant",
                        Format::ClairS(_) => "ClairS",
                        Format::Sniffles(_) => "Sniffles",
                        Format::Nanomonsv(_) => "Nanomonsv",
                    }
                    .to_string(),
                );
            }
            if !callers.is_empty() {
                n_caller_data += 1;
                callers.sort();
                let k = callers.join(",");
                let (_, v) = callers_cat
                    .raw_entry_mut()
                    .from_key(&k)
                    .or_insert(k.clone(), 0);
                *v += 1;
            }
            // Annotations: variant categories, NCBI features, COSMIC counts
            for annot in ele.annotations.iter() {
                let mut features = Vec::new();
                let mut variant_cat = Vec::new();
                let mut cosmic_m1 = false;
                match annot {
                    AnnotationType::NCBIGFF(ncbi) => {
                        features.push(ncbi.feature.to_string());
                    }
                    AnnotationType::Cosmic(c) => {
                        if c.cosmic_cnt > 1 {
                            cosmic_m1 = true;
                        }
                    }
                    AnnotationType::VariantCategory(vc) => {
                        let s = serde_json::to_string(vc)?;
                        variant_cat.push(s);
                    }
                    _ => (),
                };
                if !features.is_empty() {
                    features.sort();
                    add_hm(&mut ncbi_feature, &features.join(","));
                    n_ncbi_feature += 1;
                }
                if !variant_cat.is_empty() {
                    add_hm(&mut variants_cat, &variant_cat.join(","));
                    n_variants_wcat += 1;
                }
                if cosmic_m1 {
                    add_hm(&mut cosmic_sup_1, "Cosmic > 1");
                    n_cosmic_sup_1 += 1;
                }
            }
            // VEP
            let d: Vec<VEP> = ele
                .annotations
                .iter()
                .flat_map(|e| {
                    if let AnnotationType::VEP(e) = e {
                        e.clone()
                    } else {
                        vec![]
                    }
                })
                .collect();
            if let std::result::Result::Ok(vep) = get_best_vep(&d) {
                if let Some(csq) = vep.consequence {
                    n_csq += 1;
                    let csq = csq.join(",");
                    let (_, v) = cons_cat
                        .raw_entry_mut()
                        .from_key(&csq)
                        .or_insert(csq.clone(), 0);
                    *v += 1;
                }
            }
        }
        print_stat_cat(&cons_cat, n_csq as u32);
        print_stat_cat(&ncbi_feature, n_ncbi_feature as u32);
        print_stat_cat(&cosmic_sup_1, n_cosmic_sup_1 as u32);
        print_stat_cat(&callers_cat, n_caller_data as u32);
        let results = vec![
            Stat::new("consequences".to_string(), cons_cat, n_csq as u32),
            Stat::new(
                "variants_cat".to_string(),
                variants_cat,
                n_variants_wcat as u32,
            ),
            Stat::new(
                "ncbi_feature".to_string(),
                ncbi_feature,
                n_ncbi_feature as u32,
            ),
            Stat::new("callers_cat".to_string(), callers_cat, n_caller_data as u32),
        ];
        Ok(results)
    }
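    // Counting sketch for the `raw_entry_mut` pattern used in `stats` above:
    // keying by `&str` means an owned `String` is only allocated on the first
    // occurrence of each category.
    //
    //     let mut hm: HashMap<String, i32> = HashMap::new();
    //     for k in ["snv", "snv", "ins"] {
    //         let (_, v) = hm.raw_entry_mut().from_key(k).or_insert(k.to_string(), 0);
    //         *v += 1;
    //     }
    //     assert_eq!(hm["snv"], 2);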
    pub fn save_sql(&self, path: &str) -> Result<()> {
        insert_variants(self, path)
    }

    pub fn stats_sql(&self, path: &str) -> Result<()> {
        insert_stats(
            "VCF".to_string(),
            serde_json::to_string(&self.stats_vcf)?,
            path,
        )?;
        insert_stats(
            "BAM".to_string(),
            serde_json::to_string(&self.stats_bam)?,
            path,
        )?;
        Ok(())
    }

    pub fn stats_json(&self, path: &str) -> Result<AllStats> {
        let variants_stats = self.stats()?;
        let all_stats = AllStats {
            variants_stats,
            vcf_stats: self.stats_vcf.clone(),
            bam_stats: self.stats_bam.clone(),
            n_variants: self.data.len(),
        };
        let s = serde_json::to_string(&all_stats)?;
        fs::write(path, s)?;
        Ok(all_stats)
    }

    pub fn save_bytes(&self, path: &str) -> Result<()> {
        let serialized = pot::to_vec(&self.data)?;
        let mut w =
            noodles_bgzf::writer::Builder::default().build_from_writer(File::create(path)?);
        w.write_all(&serialized)?;
        Ok(())
    }

    pub fn new_from_bytes(name: &str, path: &str, mp: MultiProgress) -> Result<Self> {
        info!("Loading variants from: {path}");
        let r = in_out::get_reader_progress(path, &mp)?;
        let data: Vec<Variant> = pot::from_reader(r)?;
        Ok(Self {
            name: name.to_string(),
            data,
            constit: DashMap::new(),
            stats_vcf: StatsVCF::default(),
            stats_bam: StatsBAM::default(),
            cfg: Config::get()?,
            mp,
        })
    }

    pub fn filter_category(&self, and_categories: &[Category]) -> Vec<&Variant> {
        self.data
            .par_iter()
            .flat_map(|v| {
                if v.is_from_category(and_categories) {
                    vec![v]
                } else {
                    vec![]
                }
            })
            .collect()
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Stat {
    name: String,
    counts: HashMap<String, i32>,
    n_with_annotation: u32,
}

impl Stat {
    pub fn new(name: String, counts: HashMap<String, i32>, n_with_annotation: u32) -> Self {
        Stat {
            counts,
            n_with_annotation,
            name,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllStats {
    pub variants_stats: Vec<Stat>,
    pub vcf_stats: StatsVCF,
    pub bam_stats: StatsBAM,
    pub n_variants: usize,
}

impl AllStats {
    pub fn load_json(path: &str) -> anyhow::Result<Self> {
        File::open(path)
            .context(format!("Failed to open file: {path}"))
            .and_then(|mut file| {
                let mut contents = String::new();
                file.read_to_string(&mut contents)
                    .context(format!("Failed to read file: {path}"))
                    .map(|_| contents)
            })
            .and_then(|json_str| {
                serde_json::from_str::<AllStats>(&json_str)
                    .with_context(|| "Failed to parse JSON")
            })
    }
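    // Layout sketch for the bar charts below: each category row gets
    // `bar_interval` (20 px) plus a 30 px margin, so e.g. 12 categories yield
    // a 12 * 20 + 30 = 270 px tall SVG.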
    pub fn generate_graph(&self, prefix: &str) -> anyhow::Result<()> {
        let bar_interval = 20_f64;

        // 1. Consequences
        let consequences_data: Vec<&Stat> = self
            .variants_stats
            .iter()
            .filter(|v| v.name == "consequences")
            .collect();
        if consequences_data.is_empty() {
            bail!("No consequences data");
        }
        let mut consequences_data: Vec<(String, i32)> = consequences_data
            .first()
            .unwrap()
            .counts
            .iter()
            .map(|(k, v)| {
                let k = k.replace("_variant", "");
                let k = k.replace('_', " ");
                let k = if k == "intron,non coding transcript" {
                    "non coding transcript intron".to_string()
                } else {
                    k
                };
                let k = if let Some((before_comma, _)) = k.split_once(',') {
                    before_comma.to_string()
                } else {
                    k
                };
                (k, *v)
            })
            .collect();
        consequences_data.sort_by(|a, b| a.1.cmp(&b.1));
        let n = consequences_data.len() as f64;
        let height = (n * bar_interval) + 30.0;
        let (y_data, x_data): (Vec<String>, Vec<i32>) = consequences_data.into_iter().unzip();
        let chart = Chart::new()
            .grid(Grid::new().left("18%").right("5%").top("0%").bottom(30))
            .x_axis(Axis::new().type_(AxisType::Log))
            .y_axis(
                Axis::new()
                    .type_(AxisType::Category)
                    .data(y_data)
                    .axis_label(AxisLabel::new().font_size(12)),
            )
            .series(
                Bar::new()
                    .show_background(true)
                    .data(x_data)
                    .bar_width(12)
                    .item_style(ItemStyle::new().border_radius(3))
                    .label(Label::new().show(true).position(LabelPosition::Right)),
            );
        let mut renderer = charming::ImageRenderer::new(1000, height as u32);
        renderer
            .save(&chart, format!("{prefix}_consequences.svg"))
            .unwrap();

        // 2. NCBI features
        let ncbi_data: Vec<&Stat> = self
            .variants_stats
            .iter()
            .filter(|v| v.name == "ncbi_feature")
            .collect();
        if ncbi_data.is_empty() {
            bail!("No NCBI features data");
        }
        let mut ncbi_data: Vec<(String, i32)> = ncbi_data
            .first()
            .unwrap()
            .counts
            .iter()
            .map(|(k, v)| {
                let k = if k == "non_allelic_homologous_recombination_region" {
                    "non_allelic_homologous\n_recombination_region".to_string()
                } else {
                    k.to_string()
                };
                (k.replace('_', " "), *v)
            })
            .collect();
        ncbi_data.sort_by(|a, b| a.1.cmp(&b.1));
        let n = ncbi_data.len() as f64;
        let height = (n * bar_interval) + 30.0;
        let (y_data, x_data): (Vec<String>, Vec<i32>) = ncbi_data.into_iter().unzip();
        let chart = Chart::new()
            .grid(Grid::new().left("18%").right("5%").top("0%").bottom(30))
            .x_axis(Axis::new().type_(AxisType::Log))
            .y_axis(
                Axis::new()
                    .type_(AxisType::Category)
                    .data(y_data)
                    .axis_label(AxisLabel::new().font_size(12)),
            )
            .series(
                Bar::new()
                    .show_background(true)
                    .data(x_data)
                    .bar_width(12)
                    .item_style(ItemStyle::new().border_radius(3))
                    .label(Label::new().show(true).position(LabelPosition::Right)),
            );
        let mut renderer = charming::ImageRenderer::new(1000, height as u32);
        renderer.save(&chart, format!("{prefix}_ncbi.svg")).unwrap();
        // 3. Callers
        let callers_data: Vec<&Stat> = self
            .variants_stats
            .iter()
            .filter(|v| v.name == "callers_cat")
            .collect();
        if callers_data.is_empty() {
            bail!("No callers data");
        }
        let mut callers_data: Vec<(String, i32)> = callers_data
            .first()
            .unwrap()
            .counts
            .iter()
            .map(|(k, v)| (k.replace(',', " & "), *v))
            .collect();
        let n = callers_data.len() as f64;
        let height = (n * bar_interval) + 30.0;
        callers_data.sort_by(|a, b| a.1.cmp(&b.1));
        let (y_data, x_data): (Vec<String>, Vec<i32>) = callers_data.into_iter().unzip();
        let chart = Chart::new()
            .grid(Grid::new().left("18%").right("5%").top("0%").bottom(30))
            .x_axis(Axis::new().type_(AxisType::Log))
            .y_axis(
                Axis::new()
                    .type_(AxisType::Category)
                    .data(y_data)
                    .axis_label(AxisLabel::new().font_size(12)),
            )
            .series(
                Bar::new()
                    .show_background(true)
                    .data(x_data)
                    .bar_width(12)
                    .item_style(ItemStyle::new().border_radius(3))
                    .label(Label::new().show(true).position(LabelPosition::Right)),
            );
        let mut renderer = charming::ImageRenderer::new(1000, height as u32);
        renderer
            .save(&chart, format!("{prefix}_callers.svg"))
            .unwrap();
        Ok(())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub struct Variant {
    pub contig: String,
    pub position: u32,
    pub reference: ReferenceAlternative,
    pub alternative: ReferenceAlternative,
    pub callers_data: Vec<CallerData>,
    pub n_alt: Option<u32>,
    pub n_ref: Option<u32>,
    pub vaf: Option<f32>,
    pub depth: Option<u32>,
    pub variant_type: VariantType,
    pub source: Vec<VCFSource>,
    pub annotations: Vec<AnnotationType>,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, ToSchema)]
pub struct CallerData {
    pub qual: Option<f32>,
    pub format: Format,
    pub info: Info,
}

impl CallerData {
    pub fn get_vaf(&self) -> f64 {
        match &self.format {
            Format::DeepVariant(v) => v.vaf as f64,
            Format::ClairS(v) => v.af,
            Format::Sniffles(v) => v.dv as f64 / (v.dv as f64 + v.dr as f64),
            Format::Nanomonsv(v) => v.vr as f64 / v.tr as f64,
        }
    }

    pub fn get_depth(&mut self) -> u32 {
        match &self.format {
            Format::DeepVariant(v) => v.dp,
            Format::ClairS(v) => v.dp,
            Format::Sniffles(v) => v.dv + v.dr,
            Format::Nanomonsv(v) => v.tr,
        }
    }

    pub fn get_n_alt(&mut self) -> u32 {
        match &self.format {
            Format::DeepVariant(v) => v.ad.get(1).unwrap().to_owned(),
            Format::ClairS(v) => v.ad.get(1).unwrap().to_owned(),
            Format::Sniffles(v) => v.dv,
            // `vr` is the variant-supporting read count; `tr` is the total.
            Format::Nanomonsv(v) => v.vr,
        }
    }

    /// Variant filter rules
    pub fn should_filter(&self) -> bool {
        if let Info::Sniffles(info) = &self.info {
            let imprecise = info
                .tags
                .iter()
                .filter(|s| s.to_string() == "IMPRECISE")
                .count();
            let mut n_alt = 0;
            if let Format::Sniffles(f) = &self.format {
                n_alt = f.dv;
            }
            // Keep only precise calls with at least 3 supporting reads.
            !(imprecise == 0 && n_alt >= 3)
        } else {
            false
        }
    }
}

#[derive(Debug, Clone, Serialize, Eq, PartialEq, Deserialize, ToSchema)]
pub enum VariantType {
    Somatic,
    Constitutionnal,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub enum VCFSource {
    DeepVariant,
    ClairS,
    Sniffles,
    Nanomonsv,
}

impl FromStr for VCFSource {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "DeepVariant" => Ok(VCFSource::DeepVariant),
            "ClairS" => Ok(VCFSource::ClairS),
            "Sniffles" => Ok(VCFSource::Sniffles),
            "Nanomonsv" => Ok(VCFSource::Nanomonsv),
            _ => Err(anyhow!("Error parsing VCFSource")),
        }
    }
}

impl fmt::Display for VCFSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            VCFSource::DeepVariant => "DeepVariant",
            VCFSource::ClairS => "ClairS",
            VCFSource::Sniffles => "Sniffles",
            VCFSource::Nanomonsv => "Nanomonsv",
        };
        write!(f, "{}", s)
    }
}
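// Round-trip sketch (not part of the original source) for the FromStr/Display
// pair on `VCFSource` above.
#[cfg(test)]
mod vcf_source_sketch {
    use super::VCFSource;
    use std::str::FromStr;

    #[test]
    fn parse_and_display_round_trip() {
        for name in ["DeepVariant", "ClairS", "Sniffles", "Nanomonsv"] {
            assert_eq!(VCFSource::from_str(name).unwrap().to_string(), name);
        }
        assert!(VCFSource::from_str("Unknown").is_err());
    }
}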
impl Variant {
    pub fn from_vcfrow(
        row: &VCFRow,
        source: VCFSource,
        variant_type: VariantType,
    ) -> Result<Self> {
        let callers_data = vec![CallerData {
            qual: row.qual.parse::<f32>().ok(),
            info: parse_info(&row.info, &source).context(anyhow!(
                "Can't parse {:?} info for {}",
                source,
                row.info
            ))?,
            format: parse_format(&source, &row.value).context(anyhow!(
                "Can't parse {:?} format for {}",
                source,
                row.value
            ))?,
        }];
        Ok(Variant {
            contig: row.chr.to_string(),
            position: row.pos,
            reference: row
                .reference
                .parse()
                .context(anyhow!("Error while parsing {}", row.reference))?,
            alternative: row
                .alt
                .parse()
                .context(anyhow!("Error while parsing {}", row.alt))?,
            n_ref: None,
            n_alt: None,
            vaf: None,
            depth: None,
            callers_data,
            source: vec![source],
            variant_type,
            annotations: Vec::new(),
        })
    }

    pub fn get_depth(&mut self) -> u32 {
        if let Some(depth) = self.depth {
            depth
        } else {
            let depth = self
                .callers_data
                .iter_mut()
                .map(|v| v.get_depth())
                .max()
                .unwrap();
            self.depth = Some(depth);
            depth
        }
    }

    pub fn get_n_alt(&mut self) -> u32 {
        if let Some(n_alt) = self.n_alt {
            n_alt
        } else {
            let n_alt = self
                .callers_data
                .iter_mut()
                .map(|v| v.get_n_alt())
                .max()
                .unwrap();
            self.n_alt = Some(n_alt);
            n_alt
        }
    }

    pub fn vaf(&mut self) -> f32 {
        let n_alt = self.get_n_alt() as f32;
        let depth = self.get_depth() as f32;
        self.vaf = Some(n_alt / depth);
        self.vaf.unwrap()
    }

    pub fn is_ins(&self) -> bool {
        matches!(
            (&self.reference, &self.alternative),
            (
                ReferenceAlternative::Nucleotide(_),
                ReferenceAlternative::Nucleotides(_)
            )
        )
    }

    pub fn alteration_category(&self) -> AlterationCategory {
        match (&self.reference, &self.alternative) {
            (ReferenceAlternative::Nucleotide(_), ReferenceAlternative::Nucleotide(_)) => {
                AlterationCategory::Snv
            }
            (ReferenceAlternative::Nucleotide(_), ReferenceAlternative::Nucleotides(_)) => {
                AlterationCategory::Ins
            }
            (ReferenceAlternative::Nucleotides(_), ReferenceAlternative::Nucleotide(_)) => {
                AlterationCategory::Del
            }
            (ReferenceAlternative::Nucleotides(a), ReferenceAlternative::Nucleotides(b))
                if a.len() < b.len() =>
            {
                AlterationCategory::Ins
            }
            (ReferenceAlternative::Nucleotides(a), ReferenceAlternative::Nucleotides(b))
                if a.len() > b.len() =>
            {
                AlterationCategory::Del
            }
            (ReferenceAlternative::Nucleotides(_), ReferenceAlternative::Nucleotides(_)) => {
                AlterationCategory::Rep
            }
            // Anything involving an unstructured allele cannot be classified.
            _ => AlterationCategory::Other,
        }
    }

    pub fn to_min_string(&mut self) -> String {
        let depth = self.get_depth();
        let n_alt = self.get_n_alt();
        format!(
            "DP:AD\t{}:{}",
            depth,
            [(depth - n_alt).to_string(), n_alt.to_string()].join(",")
        )
    }

    pub fn get_veps(&self) -> Vec<VEP> {
        self.annotations
            .iter()
            .flat_map(|e| {
                if let AnnotationType::VEP(e) = e {
                    e.clone()
                } else {
                    vec![]
                }
            })
            .collect()
    }

    pub fn get_best_vep(&self) -> Result<VEP> {
        get_best_vep(&self.get_veps())
    }
    pub fn is_from_category(&self, and_categories: &[Category]) -> bool {
        let mut vec_bools = Vec::new();
        for category in and_categories.iter() {
            match category {
                Category::VariantCategory(vc) => {
                    for annotations in self.annotations.iter() {
                        if let AnnotationType::VariantCategory(vvc) = annotations {
                            if vc == vvc {
                                vec_bools.push(true);
                                break;
                            }
                        }
                    }
                }
                Category::PositionRange { contig, from, to } => {
                    if self.contig == *contig {
                        match (from, to) {
                            (None, None) => vec_bools.push(true),
                            (None, Some(to)) => vec_bools.push(self.position <= *to),
                            (Some(from), None) => vec_bools.push(self.position >= *from),
                            (Some(from), Some(to)) => {
                                vec_bools.push(self.position >= *from && self.position <= *to)
                            }
                        }
                    } else {
                        vec_bools.push(false);
                    }
                }
                // VCFSource filtering is not implemented; the criterion is ignored.
                Category::VCFSource(_) => (),
                Category::NCosmic(n) => {
                    let mut bools = Vec::new();
                    for annotations in self.annotations.iter() {
                        if let AnnotationType::Cosmic(c) = annotations {
                            bools.push(c.cosmic_cnt >= *n);
                            break;
                        }
                    }
                    vec_bools.push(bools.iter().any(|&b| b));
                }
                Category::NCBIFeature(ncbi_feature) => {
                    let mut bools = Vec::new();
                    for annotations in self.annotations.iter() {
                        if let AnnotationType::NCBIGFF(v) = annotations {
                            bools.push(v.feature == *ncbi_feature);
                        }
                    }
                    vec_bools.push(bools.iter().any(|&b| b));
                }
                Category::VAF { min, max } => {
                    let v = if self.vaf.is_none() {
                        let mut s = self.clone();
                        s.vaf()
                    } else {
                        self.vaf.unwrap()
                    };
                    vec_bools.push(v >= *min && v <= *max);
                }
                Category::Pangolin => {
                    vec_bools.push(
                        self.annotations
                            .iter()
                            .filter(|a| matches!(a, AnnotationType::Pangolin(_)))
                            .count()
                            > 0,
                    );
                }
            }
        }
        vec_bools.iter().all(|&x| x)
    }

    pub fn callers(&self) -> Vec<String> {
        self.source
            .iter()
            .map(|source| source.to_string())
            .collect()
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum AlterationCategory {
    Snv,
    Ins,
    Del,
    Rep,
    Other,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub enum AnnotationType {
    VariantCategory(VariantCategory),
    VEP(Vec<VEP>),
    Cluster(i32),
    Cosmic(Cosmic),
    GnomAD(GnomAD),
    NCBIGFF(NCBIGFF),
    Pangolin(Pangolin),
    Phase(PhaseAnnotation),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub enum VariantCategory {
    Somatic,
    LowMRDDepth,
    LOH,
    Constit,
    LowDiversity,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, ToSchema)]
pub enum ReferenceAlternative {
    Nucleotide(Base),
    Nucleotides(Vec<Base>),
    Unstructured(String),
}

impl FromStr for ReferenceAlternative {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self> {
        let possible_bases = s.as_bytes().iter();
        let mut res: Vec<Base> = Vec::new();
        for &base in possible_bases {
            match base.try_into() {
                std::result::Result::Ok(b) => res.push(b),
                // Any non-ACGTN byte makes the whole allele unstructured (e.g. <DUP>).
                Err(_) => {
                    return Ok(Self::Unstructured(s.to_string()));
                }
            }
        }
        if res.len() == 1 {
            Ok(Self::Nucleotide(res.pop().unwrap()))
        } else {
            Ok(Self::Nucleotides(res))
        }
    }
}

impl fmt::Display for ReferenceAlternative {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let string = match self {
            ReferenceAlternative::Nucleotide(b) => b.to_string(),
            ReferenceAlternative::Nucleotides(bases) => bases
                .iter()
                .fold(String::new(), |acc, e| format!("{}{}", acc, e)),
            ReferenceAlternative::Unstructured(s) => s.to_string(),
        };
        write!(f, "{}", string)
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, ToSchema)]
pub enum Base {
    A,
    T,
    C,
    G,
    N,
}

impl TryFrom<u8> for Base {
    type Error = anyhow::Error;

    fn try_from(base: u8) -> Result<Self> {
        match base {
            b'A' => Ok(Base::A),
            b'T' => Ok(Base::T),
            b'C' => Ok(Base::C),
            b'G' => Ok(Base::G),
            b'N' => Ok(Base::N),
            _ => Err(anyhow!(
                "Unknown base: {}",
                String::from_utf8_lossy(&[base])
            )),
        }
    }
}

impl Base {
    pub fn into_u8(self) -> u8 {
        match self {
            Base::A => b'A',
            Base::T => b'T',
            Base::C => b'C',
            Base::G => b'G',
            Base::N => b'N',
        }
    }
}

impl fmt::Display for Base {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let str = match self {
            Base::A => "A",
            Base::T => "T",
            Base::C => "C",
            Base::G => "G",
            Base::N => "N",
        };
        write!(f, "{}", str)
    }
}
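// Parsing sketch (not part of the original source) for `ReferenceAlternative`:
// single bases, multi-base alleles, and symbolic ALTs such as <DUP>.
#[cfg(test)]
mod ref_alt_parse_sketch {
    use super::{Base, ReferenceAlternative};

    #[test]
    fn parses_all_three_shapes() {
        assert_eq!(
            "G".parse::<ReferenceAlternative>().unwrap(),
            ReferenceAlternative::Nucleotide(Base::G)
        );
        assert_eq!(
            "AT".parse::<ReferenceAlternative>().unwrap(),
            ReferenceAlternative::Nucleotides(vec![Base::A, Base::T])
        );
        assert!(matches!(
            "<DUP>".parse::<ReferenceAlternative>().unwrap(),
            ReferenceAlternative::Unstructured(_)
        ));
    }
}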
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, ToSchema)]
pub enum Format {
    DeepVariant(DeepVariantFormat),
    ClairS(ClairSFormat),
    Sniffles(SnifflesFormat),
    Nanomonsv(NanomonsvFormat),
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, ToSchema)]
pub enum Info {
    #[schema(value_type = String)]
    DeepVariant(DeepVariantInfo),
    #[schema(value_type = String)]
    ClairS(ClairSInfo),
    #[schema(value_type = String)]
    Sniffles(SnifflesInfo),
    #[schema(value_type = String)]
    Nanomonsv(NanomonsvInfo),
}

fn parse_info(s: &str, source: &VCFSource) -> Result<Info> {
    match source {
        VCFSource::DeepVariant => Ok(Info::DeepVariant(s.parse()?)),
        VCFSource::ClairS => Ok(Info::ClairS(s.parse()?)),
        VCFSource::Sniffles => Ok(Info::Sniffles(s.parse()?)),
        VCFSource::Nanomonsv => Ok(Info::Nanomonsv(s.parse()?)),
    }
}

fn parse_format(vcf_source: &VCFSource, data: &str) -> Result<Format> {
    let res = match vcf_source {
        VCFSource::DeepVariant => Format::DeepVariant(data.parse()?),
        VCFSource::ClairS => Format::ClairS(data.parse()?),
        VCFSource::Sniffles => Format::Sniffles(data.parse()?),
        VCFSource::Nanomonsv => Format::Nanomonsv(data.parse()?),
    };
    Ok(res)
}

pub fn sort_variants(d: Vec<Variant>, dict_path: &str) -> Result<Vec<Variant>> {
    info!("Sorting {} entries", d.len());
    let dict = read_dict(dict_path)?;
    let mut store: HashMap<String, Vec<Variant>> = HashMap::new();
    // Bucket variants by contig
    d.iter().for_each(|e| {
        if let Some(vec) = store.get_mut(&e.contig) {
            vec.push(e.clone());
        } else {
            store.insert(e.contig.to_string(), vec![e.clone()]);
        }
    });
    // Sort within each contig
    store
        .iter_mut()
        .for_each(|(_, vec)| vec.sort_by(|a, b| a.position.cmp(&b.position)));
    // Return contigs in the order of the dict file
    Ok(dict
        .iter()
        .flat_map(|(chr, _)| {
            if let Some((_, vec)) = store.remove_entry(chr) {
                vec
            } else {
                vec![]
            }
        })
        .collect())
}

#[derive(Debug)]
pub enum Category {
    VariantCategory(VariantCategory),
    PositionRange {
        contig: String,
        from: Option<u32>,
        to: Option<u32>,
    },
    VCFSource(VCFSource),
    NCosmic(u64),
    NCBIFeature(String),
    VAF {
        min: f32,
        max: f32,
    },
    Pangolin,
}
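// `run_pipe` below wires the whole flow together for one sample `id`:
// read caller VCFs -> VCF-level filters -> BAM-level filters -> merge/sort ->
// VEP/GFF/echtvar annotation -> SNP filtering -> stats, charts, and SQLite
// export. Every input path is resolved under `cfg.longreads_results_dir` and
// missing files fail fast.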
pub fn run_pipe(id: &str, force: bool, multi: &MultiProgress) -> Result<()> {
    let cfg = config::Config::get()?;
    let deepvariant_diag_vcf = format!(
        "{}/{id}/diag/DeepVariant/{id}_diag_DeepVariant_PASSED.vcf.gz",
        cfg.longreads_results_dir
    );
    if !std::path::Path::new(&deepvariant_diag_vcf).exists() {
        return Err(anyhow!("{deepvariant_diag_vcf} is required"));
    }
    let deepvariant_mrd_vcf = format!(
        "{}/{id}/mrd/DeepVariant/{id}_mrd_DeepVariant_PASSED.vcf.gz",
        cfg.longreads_results_dir
    );
    if !std::path::Path::new(&deepvariant_mrd_vcf).exists() {
        return Err(anyhow!("{deepvariant_mrd_vcf} is required"));
    }
    let mrd_bam = format!("{}/{id}/mrd/{id}_mrd_hs1.bam", cfg.longreads_results_dir);
    if !std::path::Path::new(&mrd_bam).exists() {
        return Err(anyhow!("{mrd_bam} is required"));
    }
    let clairs_vcf = format!(
        "{}/{id}/diag/ClairS/{id}_diag_clairs_PASSED.vcf.gz",
        cfg.longreads_results_dir
    );
    if !std::path::Path::new(&clairs_vcf).exists() {
        return Err(anyhow!("{clairs_vcf} is required"));
    }
    let clairs_indels_vcf = format!(
        "{}/{id}/diag/ClairS/{id}_diag_clairs_indel_PASSED.vcf.gz",
        cfg.longreads_results_dir
    );
    if !std::path::Path::new(&clairs_indels_vcf).exists() {
        return Err(anyhow!("{clairs_indels_vcf} is required"));
    }
    // let sniffles_vcf = format!(
    //     "{}/{name}/diag/Sniffles/{name}_diag_sniffles.vcf",
    //     cfg.longreads_results_dir
    // );
    // let sniffles_mrd_vcf = format!(
    //     "{}/{name}/mrd/Sniffles/{name}_mrd_sniffles.vcf",
    //     cfg.longreads_results_dir
    // );
    // if !std::path::Path::new(&sniffles_vcf).exists() {
    //     return Err(anyhow!("{sniffles_vcf} is required"));
    // }
    let nanomonsv_vcf = format!(
        "{}/{id}/diag/nanomonsv/{id}_diag_nanomonsv_PASSED.vcf.gz",
        cfg.longreads_results_dir
    );
    if !std::path::Path::new(&nanomonsv_vcf).exists() {
        return Err(anyhow!("{nanomonsv_vcf} is required"));
    }
    // let db_path = "/data/db_results.sqlite".to_string();
    // `${data_dir}/${name}/diag/${name}_variants.sqlite`
    let db_path = format!(
        "{}/{id}/diag/{id}_variants.sqlite",
        cfg.longreads_results_dir
    );
    let bytes_path = format!(
        "{}/{id}/diag/{id}_variants.bytes.gz",
        cfg.longreads_results_dir
    );
    let loh_path = format!("{}/{id}/diag/{id}_loh.vcf.gz", cfg.longreads_results_dir);
    // let db_constit_path = format!(
    //     "{}/{name}/diag/{name}_constit.sqlite",
    //     cfg.longreads_results_dir
    // );
    let bytes_constit_path = format!(
        "{}/{id}/diag/{id}_constit.bytes.gz",
        cfg.longreads_results_dir
    );
    let report_data_dir = format!("{}/{id}/diag/report/data", cfg.longreads_results_dir);
    if !std::path::Path::new(&report_data_dir).exists() {
        fs::create_dir_all(&report_data_dir)?;
    }
    let stats_path = format!("{}/{id}_variants_stats.json", report_data_dir);
    let af_init_path = format!("{}/{id}_variants_af_init.csv", report_data_dir);
    let af_final_path = format!("{}/{id}_variants_af_final.csv", report_data_dir);
    let graphs_prefix = format!("{}/{id}_barcharts", report_data_dir);
    let annot_json = format!("{report_data_dir}/{id}_annot_variants.json");
    let sources = vec![
        (
            deepvariant_diag_vcf.as_str(),
            &VCFSource::DeepVariant,
            &VariantType::Somatic,
        ),
        (
            deepvariant_mrd_vcf.as_str(),
            &VCFSource::DeepVariant,
            &VariantType::Constitutionnal,
        ),
        (
            clairs_vcf.as_str(),
            &VCFSource::ClairS,
            &VariantType::Somatic,
        ),
        // (
        //     sniffles_vcf.as_str(),
        //     &VCFSource::Sniffles,
        //     &VariantType::Somatic,
        // ),
        // (
        //     sniffles_mrd_vcf.as_str(),
        //     &VCFSource::Sniffles,
        //     &VariantType::Constitutionnal,
        // ),
        (
            nanomonsv_vcf.as_str(),
            &VCFSource::Nanomonsv,
            &VariantType::Somatic,
        ),
    ];
    let mut variants = Variants::from_vcfs(id.to_string(), sources, &cfg, multi.clone())?;
    write_af_data(&variants, &af_init_path).context("Can't write initial AF data")?;
    variants.vcf_filters();
    variants
        .write_vcf_cat(&loh_path, &VariantCategory::LOH)
        .context("Can't write LOH")?;
    variants.bam_filters(&mrd_bam);
    let constits = variants.get_cat(&VariantCategory::Constit);
    let constits = Variants::from_vec(id.to_string(), multi, constits);
    constits.save_bytes(&bytes_constit_path)?;
    variants.keep_somatics_un();
    info!("Variants retained: {}", variants.len());
    // TODO check that the SNPs match between samples
    if variants.len() > 100_000 {
        return Err(anyhow!(
            "Too many variants; verify that the tumoral and constitutional samples match."
        ));
    }
    variants.merge();
    variants.sort()?;
    info!("Variants retained: {}", variants.len());
    variants.vep();
    // variants.pangolin()?;
    variants.annotate_gff_feature(&cfg.gff_path)?;
    variants.echtvar_annotate(&deepvariant_mrd_vcf)?;
    variants.filter_snp()?;
    variants.save_bytes(&bytes_path)?;
    let all_stats = variants
        .stats_json(&stats_path)
        .context("Can't write stats")?;
    all_stats.generate_graph(&graphs_prefix)?;
    write_af_data(&variants, &af_final_path).context("Can't write final AF data")?;
    if std::path::Path::new(&db_path).exists() {
        if force {
            write_annotaded(&db_path, &annot_json)?;
            fs::remove_file(&db_path)?;
            variants.save_sql(&db_path)?;
        }
    } else {
        variants.save_sql(&db_path)?;
    }
    if std::path::Path::new(&annot_json).exists() {
        update_annotations(&db_path, &annot_json)?;
    }
    info!("Variants: {}", variants.len());
    Ok(())
}

// pub fn cluster_variants(d: &mut Vec<Variant>, max_dist: u32) -> i32 {
//     let mut cluster_id = 0;
//     let first = d.get(0).unwrap();
//     let mut last_pos = first.position;
//     let mut last_contig = first.contig.to_string();
//
//     d.iter_mut().for_each(|e| {
//         if e.contig != last_contig {
//             cluster_id += 1;
//             last_contig = e.contig.to_string();
//         } else if e.position - last_pos > max_dist {
//             cluster_id += 1;
//         }
//         e.annotations.push(AnnotationType::Cluster(cluster_id));
//         last_pos = e.position;
//     });
//
//     cluster_id
// }

fn write_af_data(variants: &Variants, path: &str) -> anyhow::Result<()> {
    info!("Writing AF data into {path}");
    let af_data: DashMap<u32, DashMap<u32, u32>> = DashMap::new();
    variants.data.par_iter().for_each(|v| {
        af_data
            .entry(v.depth.unwrap())
            .or_default()
            .entry(v.n_alt.unwrap())
            .and_modify(|count| *count += 1)
            .or_insert(1);
    });
    write_dashmap_to_tsv(&af_data, path)?;
    Ok(())
}

fn write_dashmap_to_tsv(
    af_data: &DashMap<u32, DashMap<u32, u32>>,
    filename: &str,
) -> anyhow::Result<()> {
    let mut writer = csv::Writer::from_path(filename)?;
    // Find the maximum inner key to determine the number of columns
    let max_second_key = af_data
        .iter()
        .flat_map(|entry| {
            entry
                .value()
                .iter()
                .map(|inner_entry| *inner_entry.key())
                .collect::<Vec<u32>>()
        })
        .max()
        .unwrap_or(0);
    // Write header
    let mut header = vec!["depth".to_string()];
    header.extend((0..=max_second_key).map(|i| format!("n_alt_{}", i)));
    writer.write_record(&header)?;
    // Write data rows
    for entry in af_data.iter() {
        let depth = *entry.key();
        let inner_map = entry.value();
        let mut row = vec![depth.to_string()];
        for i in 0..=max_second_key {
            let count = inner_map.get(&i).map(|v| *v).unwrap_or(0).to_string();
            row.push(count);
        }
        writer.write_record(&row)?;
    }
    writer.flush()?;
    Ok(())
}
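// Shape sketch (illustrative values) for the TSV written by
// `write_dashmap_to_tsv` above: one row per observed depth, one `n_alt_i`
// column for every alt count observed anywhere in the map.
//
//     depth   n_alt_0  n_alt_1  n_alt_2
//     30      12       5        1
//     31      9        4        0
#[cfg(test)]
mod af_matrix_sketch {
    use dashmap::DashMap;

    #[test]
    fn nested_counting_matches_write_af_data() {
        let af: DashMap<u32, DashMap<u32, u32>> = DashMap::new();
        for (depth, n_alt) in [(30u32, 1u32), (30, 1), (30, 2)] {
            af.entry(depth)
                .or_default()
                .entry(n_alt)
                .and_modify(|count| *count += 1)
                .or_insert(1);
        }
        assert_eq!(*af.get(&30).unwrap().get(&1).unwrap(), 2);
        assert_eq!(*af.get(&30).unwrap().get(&2).unwrap(), 1);
    }
}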