|
|
@@ -8,8 +8,10 @@ use std::{
|
|
|
|
|
|
use crate::{
|
|
|
callers::nanomonsv::nanomonsv_insert_classify,
|
|
|
+ commands::{exec_jobs, AnyJob},
|
|
|
io::{somaticpipe_container::PandoraReader, tsv::TsvLine, vcf::read_vcf},
|
|
|
- runners::Run, variant::vcf_variant::VariantId,
|
|
|
+ runners::Run,
|
|
|
+ variant::vcf_variant::VariantId,
|
|
|
};
|
|
|
use anyhow::Context;
|
|
|
use bitcode::{Decode, Encode};
|
|
|
@@ -26,7 +28,7 @@ use super::vcf_variant::{
|
|
|
use crate::{
|
|
|
annotation::{
|
|
|
cosmic::Cosmic,
|
|
|
- echtvar::{parse_echtvar_val, run_echtvar},
|
|
|
+ echtvar::{parse_echtvar_val, EchtvarJob},
|
|
|
gnomad::GnomAD,
|
|
|
parse_trinuc,
|
|
|
vep::{get_best_vep, VepJob, VepLine, VEP},
|
|
|
@@ -548,148 +550,143 @@ impl VariantCollection {
|
|
|
let chunk_size = self.chunk_size();
|
|
|
let reference = config.reference.clone();
|
|
|
|
|
|
- self.variants
|
|
|
- .par_chunks(chunk_size)
|
|
|
- .for_each_init(
|
|
|
- || {
|
|
|
- let bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path);
|
|
|
- let fasta = noodles_fasta::io::indexed_reader::Builder::default()
|
|
|
- .build_from_path(&reference);
|
|
|
- (bam, fasta)
|
|
|
- },
|
|
|
- |(bam_res, fasta_res), chunk| {
|
|
|
- let Ok(ref mut bam) = bam_res else {
|
|
|
- error!("BAM reader unavailable for chunk");
|
|
|
- return;
|
|
|
- };
|
|
|
- let Ok(ref mut fasta_reader) = fasta_res else {
|
|
|
- error!("FASTA reader unavailable for chunk");
|
|
|
- return;
|
|
|
- };
|
|
|
+ self.variants.par_chunks(chunk_size).for_each_init(
|
|
|
+ || {
|
|
|
+ let bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path);
|
|
|
+ let fasta = noodles_fasta::io::indexed_reader::Builder::default()
|
|
|
+ .build_from_path(&reference);
|
|
|
+ (bam, fasta)
|
|
|
+ },
|
|
|
+ |(bam_res, fasta_res), chunk| {
|
|
|
+ let Ok(ref mut bam) = bam_res else {
|
|
|
+ error!("BAM reader unavailable for chunk");
|
|
|
+ return;
|
|
|
+ };
|
|
|
+ let Ok(ref mut fasta_reader) = fasta_res else {
|
|
|
+ error!("FASTA reader unavailable for chunk");
|
|
|
+ return;
|
|
|
+ };
|
|
|
|
|
|
- for var in chunk {
|
|
|
- let key = var.hash;
|
|
|
- let mut anns = annotations.store.entry(key).or_default();
|
|
|
+ for var in chunk {
|
|
|
+ let key = var.hash;
|
|
|
+ let mut anns = annotations.store.entry(key).or_default();
|
|
|
|
|
|
- if anns
|
|
|
- .iter()
|
|
|
- .filter(|e| {
|
|
|
- matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
|
|
|
- })
|
|
|
- .count()
|
|
|
- == 2
|
|
|
- {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ if anns
|
|
|
+ .iter()
|
|
|
+ .filter(|e| {
|
|
|
+ matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
|
|
|
+ })
|
|
|
+ .count()
|
|
|
+ == 2
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- let result = (|| -> anyhow::Result<()> {
|
|
|
- match var.alteration_category() {
|
|
|
- AlterationCategory::SNV => {
|
|
|
- let pileup = counts_at(
|
|
|
+ let result = (|| -> anyhow::Result<()> {
|
|
|
+ match var.alteration_category() {
|
|
|
+ AlterationCategory::SNV => {
|
|
|
+ let pileup =
|
|
|
+ counts_at(bam, &var.position.contig(), var.position.position)?;
|
|
|
+ let alt_seq = var.alternative.to_string();
|
|
|
+ let (depth, alt) =
|
|
|
+ pileup.into_iter().fold((0, 0), folder(&alt_seq));
|
|
|
+ anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
+ anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
+ }
|
|
|
+ AlterationCategory::DEL => {
|
|
|
+ if let Some(del_repr) = var.deletion_desc() {
|
|
|
+ let len = var.deletion_len().unwrap_or_default();
|
|
|
+
|
|
|
+ let pileup_start = crate::collection::bam::nt_pileup_new(
|
|
|
bam,
|
|
|
&var.position.contig(),
|
|
|
- var.position.position,
|
|
|
+ del_repr.start.saturating_sub(1),
|
|
|
+ false,
|
|
|
)?;
|
|
|
- let alt_seq = var.alternative.to_string();
|
|
|
- let (depth, alt) =
|
|
|
- pileup.into_iter().fold((0, 0), folder(&alt_seq));
|
|
|
- anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
- anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
- }
|
|
|
- AlterationCategory::DEL => {
|
|
|
- if let Some(del_repr) = var.deletion_desc() {
|
|
|
- let len = var.deletion_len().unwrap_or_default();
|
|
|
|
|
|
- let pileup_start = crate::collection::bam::nt_pileup_new(
|
|
|
- bam,
|
|
|
- &var.position.contig(),
|
|
|
- del_repr.start.saturating_sub(1),
|
|
|
- false,
|
|
|
- )?;
|
|
|
+ let pileup_end = crate::collection::bam::nt_pileup_new(
|
|
|
+ bam,
|
|
|
+ &var.position.contig(),
|
|
|
+ del_repr.end.saturating_sub(1),
|
|
|
+ false,
|
|
|
+ )?;
|
|
|
|
|
|
- let pileup_end = crate::collection::bam::nt_pileup_new(
|
|
|
- bam,
|
|
|
+ let tol = if len > 1 {
|
|
|
+ let seq = crate::io::fasta::sequence_range(
|
|
|
+ fasta_reader,
|
|
|
&var.position.contig(),
|
|
|
- del_repr.end.saturating_sub(1),
|
|
|
- false,
|
|
|
+ del_repr.start.saturating_sub(1) as usize,
|
|
|
+ del_repr.end.saturating_sub(1) as usize,
|
|
|
)?;
|
|
|
-
|
|
|
- let tol = if len > 1 {
|
|
|
- let seq = crate::io::fasta::sequence_range(
|
|
|
- fasta_reader,
|
|
|
- &var.position.contig(),
|
|
|
- del_repr.start.saturating_sub(1) as usize,
|
|
|
- del_repr.end.saturating_sub(1) as usize,
|
|
|
- )?;
|
|
|
- match detect_repetition(&seq) {
|
|
|
- Repeat::None => 0,
|
|
|
- Repeat::RepOne(_, _) => 3,
|
|
|
- Repeat::RepTwo(_, _) => 3,
|
|
|
- }
|
|
|
- } else {
|
|
|
- 0
|
|
|
- };
|
|
|
-
|
|
|
- let alt: u32 = pileup_start
|
|
|
- .iter()
|
|
|
- .map(|pb| match pb {
|
|
|
- crate::collection::bam::PileBase::Del((_qn, l))
|
|
|
- if *l >= len.saturating_sub(tol).max(1)
|
|
|
- && *l <= len + tol =>
|
|
|
- {
|
|
|
- 1
|
|
|
- }
|
|
|
- _ => 0,
|
|
|
- })
|
|
|
- .sum();
|
|
|
-
|
|
|
- let depth = pileup_start.len().max(pileup_end.len());
|
|
|
- anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
- anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
+ match detect_repetition(&seq) {
|
|
|
+ Repeat::None => 0,
|
|
|
+ Repeat::RepOne(_, _) => 3,
|
|
|
+ Repeat::RepTwo(_, _) => 3,
|
|
|
+ }
|
|
|
} else {
|
|
|
- anns.push(Annotation::ConstitAlt(0_u16));
|
|
|
- anns.push(Annotation::ConstitDepth(111_u16));
|
|
|
- }
|
|
|
+ 0
|
|
|
+ };
|
|
|
+
|
|
|
+ let alt: u32 = pileup_start
|
|
|
+ .iter()
|
|
|
+ .map(|pb| match pb {
|
|
|
+ crate::collection::bam::PileBase::Del((_qn, l))
|
|
|
+ if *l >= len.saturating_sub(tol).max(1)
|
|
|
+ && *l <= len + tol =>
|
|
|
+ {
|
|
|
+ 1
|
|
|
+ }
|
|
|
+ _ => 0,
|
|
|
+ })
|
|
|
+ .sum();
|
|
|
+
|
|
|
+ let depth = pileup_start.len().max(pileup_end.len());
|
|
|
+ anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
+ anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
+ } else {
|
|
|
+ anns.push(Annotation::ConstitAlt(0_u16));
|
|
|
+ anns.push(Annotation::ConstitDepth(111_u16));
|
|
|
}
|
|
|
- AlterationCategory::INS => {
|
|
|
- let normal_pileup = crate::collection::bam::nt_pileup_new(
|
|
|
- bam,
|
|
|
- &var.position.contig(),
|
|
|
- var.position.position,
|
|
|
- false,
|
|
|
- )?;
|
|
|
- let normal_depth = normal_pileup.len();
|
|
|
- let alt_seq = var.inserted_seq().unwrap_or_default();
|
|
|
- let alt_seq = alt_seq.as_bytes().to_vec();
|
|
|
-
|
|
|
- let mut normal_n_alt = 0;
|
|
|
- for pile in normal_pileup {
|
|
|
- if let PileBase::Ins { len, seq } = pile {
|
|
|
- if let Some(seq) = seq {
|
|
|
- let dist = levenshtein_exp(&alt_seq, &seq);
|
|
|
- let dist_frac = dist as f64 / len as f64;
|
|
|
- if dist_frac < 0.1 {
|
|
|
- normal_n_alt += 1;
|
|
|
- }
|
|
|
- } else if alt_seq.len() as u32 == len {
|
|
|
+ }
|
|
|
+ AlterationCategory::INS => {
|
|
|
+ let normal_pileup = crate::collection::bam::nt_pileup_new(
|
|
|
+ bam,
|
|
|
+ &var.position.contig(),
|
|
|
+ var.position.position,
|
|
|
+ false,
|
|
|
+ )?;
|
|
|
+ let normal_depth = normal_pileup.len();
|
|
|
+ let alt_seq = var.inserted_seq().unwrap_or_default();
|
|
|
+ let alt_seq = alt_seq.as_bytes().to_vec();
|
|
|
+
|
|
|
+ let mut normal_n_alt = 0;
|
|
|
+ for pile in normal_pileup {
|
|
|
+ if let PileBase::Ins { len, seq } = pile {
|
|
|
+ if let Some(seq) = seq {
|
|
|
+ let dist = levenshtein_exp(&alt_seq, &seq);
|
|
|
+ let dist_frac = dist as f64 / len as f64;
|
|
|
+ if dist_frac < 0.1 {
|
|
|
normal_n_alt += 1;
|
|
|
}
|
|
|
+ } else if alt_seq.len() as u32 == len {
|
|
|
+ normal_n_alt += 1;
|
|
|
}
|
|
|
}
|
|
|
- anns.push(Annotation::ConstitAlt(normal_n_alt as u16));
|
|
|
- anns.push(Annotation::ConstitDepth(normal_depth as u16));
|
|
|
}
|
|
|
- _ => (),
|
|
|
+ anns.push(Annotation::ConstitAlt(normal_n_alt as u16));
|
|
|
+ anns.push(Annotation::ConstitDepth(normal_depth as u16));
|
|
|
}
|
|
|
- Ok(())
|
|
|
- })();
|
|
|
-
|
|
|
- if let Err(e) = result {
|
|
|
- warn!("BAM annotation failed for {}: {e}", var.variant_id());
|
|
|
+ _ => (),
|
|
|
}
|
|
|
+ Ok(())
|
|
|
+ })();
|
|
|
+
|
|
|
+ if let Err(e) = result {
|
|
|
+ warn!("BAM annotation failed for {}: {e}", var.variant_id());
|
|
|
}
|
|
|
- },
|
|
|
- );
|
|
|
+ }
|
|
|
+ },
|
|
|
+ );
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
@@ -1884,71 +1881,81 @@ impl ExternalAnnotation {
|
|
|
let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
|
|
|
let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
|
|
|
|
|
|
- let config = Arc::new(&self.config);
|
|
|
-
|
|
|
- let results = unfound
|
|
|
- .par_chunks(optimal_chunk_size)
|
|
|
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
|
|
|
- let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
|
|
|
- let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
|
|
|
-
|
|
|
- // Write input VCF
|
|
|
- let mut vcf = File::create(&in_tmp)?;
|
|
|
- writeln!(vcf, "{}", header)?;
|
|
|
- for (i, row) in chunk.iter().enumerate() {
|
|
|
- writeln!(
|
|
|
- vcf,
|
|
|
- "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
|
|
|
- row.position.contig(),
|
|
|
- row.position.position + 1, // vcf
|
|
|
- i + 1,
|
|
|
- row.reference,
|
|
|
- row.alternative
|
|
|
- )?;
|
|
|
- }
|
|
|
+ let mut chunk_metadata = Vec::new();
|
|
|
+ let mut jobs: Vec<Vec<Box<dyn AnyJob>>> = Vec::new();
|
|
|
+
|
|
|
+ for chunk in unfound.chunks(optimal_chunk_size) {
|
|
|
+ let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
|
|
|
+ let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
|
|
|
+
|
|
|
+ // Write input VCF
|
|
|
+ let mut vcf = File::create(&in_tmp)?;
|
|
|
+ writeln!(vcf, "{}", header)?;
|
|
|
+ for (i, row) in chunk.iter().enumerate() {
|
|
|
+ writeln!(
|
|
|
+ vcf,
|
|
|
+ "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
|
|
|
+ row.position.contig(),
|
|
|
+ row.position.position + 1, // vcf
|
|
|
+ i + 1,
|
|
|
+ row.reference,
|
|
|
+ row.alternative
|
|
|
+ )?;
|
|
|
+ }
|
|
|
|
|
|
- // Run echtvar
|
|
|
- run_echtvar(&in_tmp, &out_tmp, &config)?;
|
|
|
- // fs::remove_file(in_tmp)?;
|
|
|
+ let chunk_hashes = chunk.iter().map(|row| row.hash).collect::<Vec<_>>();
|
|
|
+ let job = EchtvarJob {
|
|
|
+ input_vcf: in_tmp.clone(),
|
|
|
+ output_vcf: out_tmp.clone(),
|
|
|
+ config: self.config.clone(),
|
|
|
+ };
|
|
|
|
|
|
- // Parse echtvar output
|
|
|
- let mut echtvar_rdr =
|
|
|
- std::io::BufReader::new(get_reader(out_tmp.to_str().unwrap())?);
|
|
|
- let mut echtvar_line = TsvLine::new();
|
|
|
- let mut chunk_results = Vec::new();
|
|
|
- let mut i = 0usize;
|
|
|
+ chunk_metadata.push((chunk_hashes, out_tmp));
|
|
|
+ jobs.push(crate::jobs_seq![job]);
|
|
|
+ }
|
|
|
|
|
|
- while echtvar_line.read(&mut echtvar_rdr)? {
|
|
|
- if echtvar_line.as_str().starts_with('#') || echtvar_line.as_str().is_empty() {
|
|
|
- continue;
|
|
|
- }
|
|
|
- let row: crate::io::vcf::VCFRow = echtvar_line
|
|
|
- .as_str()
|
|
|
- .parse()
|
|
|
- .context("Failed to parse echtvar VCF row")?;
|
|
|
-
|
|
|
- // Verify that the ID corresponds to the input
|
|
|
- let id: usize = row.id.parse().context("Failed to parse ID")?;
|
|
|
- if id != i + 1 {
|
|
|
- return Err(anyhow::anyhow!(
|
|
|
- "Echtvar output ID {} does not match expected ID {}",
|
|
|
- id,
|
|
|
- i + 1
|
|
|
- ));
|
|
|
- }
|
|
|
+ exec_jobs(
|
|
|
+ jobs,
|
|
|
+ self.config.slurm_runner,
|
|
|
+ self.config.slurm_max_par.into(),
|
|
|
+ )
|
|
|
+ .context("Failed to run EchtVar chunked annotation")?;
|
|
|
+
|
|
|
+ let mut results = Vec::new();
|
|
|
+ for (chunk_hashes, out_tmp) in chunk_metadata {
|
|
|
+ // Parse echtvar output
|
|
|
+ let mut echtvar_rdr = std::io::BufReader::new(get_reader(out_tmp.to_str().unwrap())?);
|
|
|
+ let mut echtvar_line = TsvLine::new();
|
|
|
+ let mut i = 0usize;
|
|
|
+
|
|
|
+ while echtvar_line.read(&mut echtvar_rdr)? {
|
|
|
+ if echtvar_line.as_str().starts_with('#') || echtvar_line.as_str().is_empty() {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ let row: crate::io::vcf::VCFRow = echtvar_line
|
|
|
+ .as_str()
|
|
|
+ .parse()
|
|
|
+ .context("Failed to parse echtvar VCF row")?;
|
|
|
+
|
|
|
+ // Verify that the ID corresponds to the input
|
|
|
+ let id: usize = row.id.parse().context("Failed to parse ID")?;
|
|
|
+ if id != i + 1 {
|
|
|
+ return Err(anyhow::anyhow!(
|
|
|
+ "Echtvar output ID {} does not match expected ID {}",
|
|
|
+ id,
|
|
|
+ i + 1
|
|
|
+ ));
|
|
|
+ }
|
|
|
|
|
|
- let (cosmic, gnomad) = parse_echtvar_val(&row.info)?;
|
|
|
- let hash = chunk[i].hash;
|
|
|
- i += 1;
|
|
|
+ let (cosmic, gnomad) = parse_echtvar_val(&row.info)?;
|
|
|
+ let hash = chunk_hashes[i];
|
|
|
+ i += 1;
|
|
|
|
|
|
- chunk_results.push((hash, cosmic, gnomad));
|
|
|
- }
|
|
|
+ results.push((hash, cosmic, gnomad));
|
|
|
+ }
|
|
|
|
|
|
- // fs::remove_file(out_tmp)?;
|
|
|
- Ok(chunk_results)
|
|
|
- })
|
|
|
- .flatten()
|
|
|
- .collect::<Vec<_>>();
|
|
|
+ // fs::remove_file(out_tmp)?;
|
|
|
+ }
|
|
|
|
|
|
for (hash, cosmic, gnomad) in results {
|
|
|
// let modified = chrono::Utc::now().to_rfc3339();
|