|
|
@@ -9,7 +9,7 @@ use std::{
|
|
|
use crate::{
|
|
|
callers::nanomonsv::nanomonsv_insert_classify,
|
|
|
io::{somaticpipe_container::PandoraReader, tsv::TsvLine, vcf::read_vcf},
|
|
|
- runners::Run,
|
|
|
+ runners::Run, variant::vcf_variant::VariantId,
|
|
|
};
|
|
|
use anyhow::Context;
|
|
|
use bitcode::{Decode, Encode};
|
|
|
@@ -309,13 +309,9 @@ impl VariantCollection {
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- fn chunk_size(&self, max_threads: u8) -> usize {
|
|
|
- let total_items = self.variants.len();
|
|
|
- let min_chunk_size = 1000;
|
|
|
- let max_chunks = max_threads;
|
|
|
-
|
|
|
- let optimal_chunk_size = total_items.div_ceil(max_chunks as usize);
|
|
|
- optimal_chunk_size.max(min_chunk_size)
|
|
|
+ fn chunk_size(&self) -> usize { // chunk length passed to `par_chunks`: splits the variants into at most one chunk per rayon worker
|
|
|
+ let n_threads = rayon::current_num_threads().max(1); // thread count of the current rayon pool, clamped to 1 so the division below cannot divide by zero
|
|
|
+ self.variants.len().div_ceil(n_threads).max(1) // ceiling division; the final clamp keeps the result non-zero when there are no variants (`par_chunks` rejects a chunk size of 0)
|
|
|
}
|
|
|
|
|
|
/// Annotates variants with local sequence context–based features:
|
|
|
@@ -372,7 +368,6 @@ impl VariantCollection {
|
|
|
annotations: &Annotations,
|
|
|
reference: &str,
|
|
|
seq_len: usize,
|
|
|
- max_threads: u8,
|
|
|
) -> anyhow::Result<()> {
|
|
|
// Preflight: fail early rather than panicking in rayon workers.
|
|
|
noodles_fasta::io::indexed_reader::Builder::default()
|
|
|
@@ -382,7 +377,7 @@ impl VariantCollection {
|
|
|
// Need at least 3 to compute a trinucleotide.
|
|
|
let seq_len = seq_len.max(3);
|
|
|
|
|
|
- let chunk_size = self.chunk_size(max_threads);
|
|
|
+ let chunk_size = self.chunk_size();
|
|
|
|
|
|
self.variants.par_chunks(chunk_size).for_each_init(
|
|
|
|| noodles_fasta::io::indexed_reader::Builder::default().build_from_path(reference),
|
|
|
@@ -530,7 +525,7 @@ impl VariantCollection {
|
|
|
&self,
|
|
|
annotations: &Annotations,
|
|
|
constit_bam_path: &str,
|
|
|
- max_threads: u8,
|
|
|
+ config: &Config,
|
|
|
) -> anyhow::Result<()> {
|
|
|
fn folder<'a>(alt_seq: &'a str) -> impl Fn((u32, u32), (String, i32)) -> (u32, u32) + 'a {
|
|
|
move |(depth_acc, alt_acc), (seq, n): (String, i32)| {
|
|
|
@@ -543,142 +538,158 @@ impl VariantCollection {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- self.variants
|
|
|
- .par_chunks(self.chunk_size(max_threads))
|
|
|
- .try_for_each(|chunk| {
|
|
|
- let mut bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path)
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to open BAM file: {e}"))?;
|
|
|
+ // Preflight: fail early here; inside the rayon workers an unopenable reader is only logged and its chunk is skipped.
|
|
|
+ rust_htslib::bam::IndexedReader::from_path(constit_bam_path)
|
|
|
+ .map_err(|e| anyhow::anyhow!("Failed to open BAM {constit_bam_path}: {e}"))?;
|
|
|
+ noodles_fasta::io::indexed_reader::Builder::default()
|
|
|
+ .build_from_path(&config.reference)
|
|
|
+ .map_err(|e| anyhow::anyhow!("Failed to open FASTA {}: {e}", config.reference))?;
|
|
|
|
|
|
- let c = crate::config::Config::default();
|
|
|
+ let chunk_size = self.chunk_size();
|
|
|
+ let reference = config.reference.clone();
|
|
|
|
|
|
- let mut fasta_reader = noodles_fasta::io::indexed_reader::Builder::default()
|
|
|
- .build_from_path(c.reference)?;
|
|
|
+ self.variants
|
|
|
+ .par_chunks(chunk_size)
|
|
|
+ .for_each_init(
|
|
|
+ || {
|
|
|
+ let bam = rust_htslib::bam::IndexedReader::from_path(constit_bam_path);
|
|
|
+ let fasta = noodles_fasta::io::indexed_reader::Builder::default()
|
|
|
+ .build_from_path(&reference);
|
|
|
+ (bam, fasta)
|
|
|
+ },
|
|
|
+ |(bam_res, fasta_res), chunk| {
|
|
|
+ let Ok(ref mut bam) = bam_res else {
|
|
|
+ error!("BAM reader unavailable for chunk");
|
|
|
+ return;
|
|
|
+ };
|
|
|
+ let Ok(ref mut fasta_reader) = fasta_res else {
|
|
|
+ error!("FASTA reader unavailable for chunk");
|
|
|
+ return;
|
|
|
+ };
|
|
|
|
|
|
- for var in chunk {
|
|
|
- let key = var.hash;
|
|
|
- let mut anns = annotations.store.entry(key).or_default();
|
|
|
+ for var in chunk {
|
|
|
+ let key = var.hash;
|
|
|
+ let mut anns = annotations.store.entry(key).or_default();
|
|
|
|
|
|
- if anns
|
|
|
- .iter()
|
|
|
- .filter(|e| {
|
|
|
- matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
|
|
|
- })
|
|
|
- .count()
|
|
|
- != 2
|
|
|
- {
|
|
|
- match var.alteration_category() {
|
|
|
- AlterationCategory::SNV => {
|
|
|
- let pileup = counts_at(
|
|
|
- &mut bam,
|
|
|
- &var.position.contig(),
|
|
|
- var.position.position,
|
|
|
- )?;
|
|
|
- let alt_seq = var.alternative.to_string();
|
|
|
-
|
|
|
- let (depth, alt) =
|
|
|
- pileup.into_iter().fold((0, 0), folder(&alt_seq));
|
|
|
- // debug!("{} {alt} / {depth}", var.variant_id());
|
|
|
- anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
- anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
- }
|
|
|
- AlterationCategory::DEL => {
|
|
|
- if let Some(del_repr) = var.deletion_desc() {
|
|
|
- let len = var.deletion_len().unwrap_or_default();
|
|
|
+ if anns
|
|
|
+ .iter()
|
|
|
+ .filter(|e| {
|
|
|
+ matches!(e, Annotation::ConstitAlt(_) | Annotation::ConstitDepth(_))
|
|
|
+ })
|
|
|
+ .count()
|
|
|
+ == 2
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- let pileup_start = crate::collection::bam::nt_pileup_new(
|
|
|
- &mut bam,
|
|
|
+ let result = (|| -> anyhow::Result<()> {
|
|
|
+ match var.alteration_category() {
|
|
|
+ AlterationCategory::SNV => {
|
|
|
+ let pileup = counts_at(
|
|
|
+ bam,
|
|
|
&var.position.contig(),
|
|
|
- del_repr.start.saturating_sub(1),
|
|
|
- false,
|
|
|
+ var.position.position,
|
|
|
)?;
|
|
|
+ let alt_seq = var.alternative.to_string();
|
|
|
+ let (depth, alt) =
|
|
|
+ pileup.into_iter().fold((0, 0), folder(&alt_seq));
|
|
|
+ anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
+ anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
+ }
|
|
|
+ AlterationCategory::DEL => {
|
|
|
+ if let Some(del_repr) = var.deletion_desc() {
|
|
|
+ let len = var.deletion_len().unwrap_or_default();
|
|
|
|
|
|
- let pileup_end = crate::collection::bam::nt_pileup_new(
|
|
|
- &mut bam,
|
|
|
- &var.position.contig(),
|
|
|
- del_repr.end.saturating_sub(1),
|
|
|
- false,
|
|
|
- )?;
|
|
|
+ let pileup_start = crate::collection::bam::nt_pileup_new(
|
|
|
+ bam,
|
|
|
+ &var.position.contig(),
|
|
|
+ del_repr.start.saturating_sub(1),
|
|
|
+ false,
|
|
|
+ )?;
|
|
|
|
|
|
- let tol = if len > 1 {
|
|
|
- let seq = crate::io::fasta::sequence_range(
|
|
|
- &mut fasta_reader,
|
|
|
+ let pileup_end = crate::collection::bam::nt_pileup_new(
|
|
|
+ bam,
|
|
|
&var.position.contig(),
|
|
|
- del_repr.start.saturating_sub(1) as usize,
|
|
|
- del_repr.end.saturating_sub(1) as usize,
|
|
|
+ del_repr.end.saturating_sub(1),
|
|
|
+ false,
|
|
|
)?;
|
|
|
|
|
|
- match detect_repetition(&seq) {
|
|
|
- Repeat::None => 0,
|
|
|
- Repeat::RepOne(_, _) => 3,
|
|
|
- Repeat::RepTwo(_, _) => 3,
|
|
|
- }
|
|
|
- } else {
|
|
|
- 0
|
|
|
- };
|
|
|
-
|
|
|
- let alt: u32 = pileup_start
|
|
|
- .iter()
|
|
|
- .map(|pb| match pb {
|
|
|
- crate::collection::bam::PileBase::Del((_qn, l))
|
|
|
- if /* end_qnames.contains(qn) */
|
|
|
- *l >= len.saturating_sub(tol).max(1)
|
|
|
- && *l <= len + tol =>
|
|
|
- {
|
|
|
- 1
|
|
|
+ let tol = if len > 1 {
|
|
|
+ let seq = crate::io::fasta::sequence_range(
|
|
|
+ fasta_reader,
|
|
|
+ &var.position.contig(),
|
|
|
+ del_repr.start.saturating_sub(1) as usize,
|
|
|
+ del_repr.end.saturating_sub(1) as usize,
|
|
|
+ )?;
|
|
|
+ match detect_repetition(&seq) {
|
|
|
+ Repeat::None => 0,
|
|
|
+ Repeat::RepOne(_, _) => 3,
|
|
|
+ Repeat::RepTwo(_, _) => 3,
|
|
|
}
|
|
|
- _ => 0,
|
|
|
- })
|
|
|
- .sum();
|
|
|
-
|
|
|
- let depth = pileup_start.len().max(pileup_end.len());
|
|
|
-
|
|
|
- // debug!("{} {alt} / {depth} {len}", var.variant_id());
|
|
|
-
|
|
|
- anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
- anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
- } else {
|
|
|
- anns.push(Annotation::ConstitAlt(0_u16));
|
|
|
- anns.push(Annotation::ConstitDepth(111_u16));
|
|
|
+ } else {
|
|
|
+ 0
|
|
|
+ };
|
|
|
+
|
|
|
+ let alt: u32 = pileup_start
|
|
|
+ .iter()
|
|
|
+ .map(|pb| match pb {
|
|
|
+ crate::collection::bam::PileBase::Del((_qn, l))
|
|
|
+ if *l >= len.saturating_sub(tol).max(1)
|
|
|
+ && *l <= len + tol =>
|
|
|
+ {
|
|
|
+ 1
|
|
|
+ }
|
|
|
+ _ => 0,
|
|
|
+ })
|
|
|
+ .sum();
|
|
|
+
|
|
|
+ let depth = pileup_start.len().max(pileup_end.len());
|
|
|
+ anns.push(Annotation::ConstitAlt(alt as u16));
|
|
|
+ anns.push(Annotation::ConstitDepth(depth as u16));
|
|
|
+ } else {
|
|
|
+ anns.push(Annotation::ConstitAlt(0_u16));
|
|
|
+ anns.push(Annotation::ConstitDepth(111_u16));
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- AlterationCategory::INS => {
|
|
|
- let normal_pileup = crate::collection::bam::nt_pileup_new(
|
|
|
- &mut bam,
|
|
|
- &var.position.contig(),
|
|
|
- var.position.position,
|
|
|
- false,
|
|
|
- )?;
|
|
|
- let normal_depth = normal_pileup.len();
|
|
|
-
|
|
|
- let alt_seq = var.inserted_seq().unwrap_or_default();
|
|
|
- let alt_seq = alt_seq.as_bytes().to_vec();
|
|
|
-
|
|
|
- let mut normal_n_alt = 0;
|
|
|
- for pile in normal_pileup {
|
|
|
- if let PileBase::Ins { len, seq } = pile {
|
|
|
- if let Some(seq) = seq {
|
|
|
- let dist = levenshtein_exp(&alt_seq, &seq);
|
|
|
- let dist_frac = dist as f64 / len as f64;
|
|
|
- // dbg!(dist_frac);
|
|
|
- if dist_frac < 0.1 {
|
|
|
+ AlterationCategory::INS => {
|
|
|
+ let normal_pileup = crate::collection::bam::nt_pileup_new(
|
|
|
+ bam,
|
|
|
+ &var.position.contig(),
|
|
|
+ var.position.position,
|
|
|
+ false,
|
|
|
+ )?;
|
|
|
+ let normal_depth = normal_pileup.len();
|
|
|
+ let alt_seq = var.inserted_seq().unwrap_or_default();
|
|
|
+ let alt_seq = alt_seq.as_bytes().to_vec();
|
|
|
+
|
|
|
+ let mut normal_n_alt = 0;
|
|
|
+ for pile in normal_pileup {
|
|
|
+ if let PileBase::Ins { len, seq } = pile {
|
|
|
+ if let Some(seq) = seq {
|
|
|
+ let dist = levenshtein_exp(&alt_seq, &seq);
|
|
|
+ let dist_frac = dist as f64 / len as f64;
|
|
|
+ if dist_frac < 0.1 {
|
|
|
+ normal_n_alt += 1;
|
|
|
+ }
|
|
|
+ } else if alt_seq.len() as u32 == len {
|
|
|
normal_n_alt += 1;
|
|
|
}
|
|
|
- } else if alt_seq.len() as u32 == len {
|
|
|
- normal_n_alt += 1;
|
|
|
}
|
|
|
}
|
|
|
+ anns.push(Annotation::ConstitAlt(normal_n_alt as u16));
|
|
|
+ anns.push(Annotation::ConstitDepth(normal_depth as u16));
|
|
|
}
|
|
|
-
|
|
|
- // debug!("{} {alt} / {depth} ", var.variant_id());
|
|
|
- anns.push(Annotation::ConstitAlt(normal_n_alt as u16));
|
|
|
- anns.push(Annotation::ConstitDepth(normal_depth as u16));
|
|
|
+ _ => (),
|
|
|
}
|
|
|
- _ => (),
|
|
|
+ Ok(())
|
|
|
+ })();
|
|
|
+
|
|
|
+ if let Err(e) = result {
|
|
|
+ warn!("BAM annotation failed for {}: {e}", var.variant_id());
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- anyhow::Ok(())
|
|
|
- })?;
|
|
|
+ },
|
|
|
+ );
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
@@ -2388,7 +2399,7 @@ mod tests {
|
|
|
|
|
|
let constit_bam_path = &config.normal_bam("CHAHA");
|
|
|
let annotations = Annotations::default();
|
|
|
- coll.annotate_with_constit_bam(&annotations, constit_bam_path, 1)?;
|
|
|
+ coll.annotate_with_constit_bam(&annotations, constit_bam_path, &config)?;
|
|
|
|
|
|
println!("{annotations:#?}");
|
|
|
Ok(())
|