||
- use crate::{
- bam::{Bam, BamOrigin},
- config::Config,
- };
- use anyhow::{Context, Ok, Result};
- use glob::glob;
- use indicatif::MultiProgress;
- use itertools::Itertools;
- use log::{info, warn};
- use pandora_lib_pileup::get_hts_nt_pileup;
- use pandora_lib_stats::chi_square_test_for_proportions;
- use pandora_lib_variants::variants::*;
- use rayon::prelude::*;
- use serde::Serialize;
- use std::{
- collections::HashMap,
- io::{BufRead, BufReader},
- ops::{Deref, DerefMut},
- path::Path,
- sync::{Arc, Mutex},
- };
- use rand::seq::IteratorRandom;
- use rust_htslib::bam;
- #[derive(Debug, Clone, Serialize)]
- pub struct Cases {
- pub cases: Vec<Case>,
- pub config: Config,
- }
- impl Cases {
- pub fn load(mp: MultiProgress, skip_ids: Option<Vec<String>>) -> Result<Self> {
- let ids_to_skip = skip_ids.unwrap_or(vec![]);
- let config = Config::get()?;
- let mut diag_bams = HashMap::new();
- for entry in glob(&config.diag_bam_glob).context("Failed to read glob pattern")? {
- let path = entry?;
- let case_id = path
- .parent()
- .context("")?
- .parent()
- .context("")?
- .file_name()
- .context("")?
- .to_str()
- .context("")?
- .to_string();
- let b = Bam::new(path.clone(), BamOrigin::DIAG);
- diag_bams.insert(case_id, b);
- }
- let mut mrd_bams = HashMap::new();
- for entry in glob(&config.mrd_bam_glob).context("Failed to read glob pattern")? {
- let path = entry?;
- let case_id = path
- .parent()
- .context("")?
- .parent()
- .context("")?
- .file_name()
- .context("")?
- .to_str()
- .context("")?
- .to_string();
- let b = Bam::new(path.clone(), BamOrigin::MRD);
- mrd_bams.insert(case_id, b);
- }
- // Check SNP AF differences between diag and mrd
- let mut cases = Vec::new();
- let diff_snp = DiffSnp::init(&config.commun_snp)?;
- for (id, diag_bam) in diag_bams {
- if ids_to_skip.contains(&id) {
- continue;
- }
- if let Some(mrd_bam) = mrd_bams.get(&id) {
- // verify if both samples match commun snps
- let diff = diff_snp.diff_prop(diag_bam.path.to_str().unwrap(), mrd_bam.path.to_str().unwrap())?;
- if diff < config.max_snp_diff_prop {
- let mut case = Case::new(id.clone(), diag_bam.clone(), mrd_bam.clone())?;
- let dir = diag_bam.path.parent().context("")?.to_str().context("")?;
- case.load_variants(&format!("{dir}/{id}_variants.bytes.gz"), &mp)?;
- cases.push(case);
- } else {
- warn!("{id} diag and mrd seems to have been sequenced from two patients.");
- }
- }
- }
- Ok(Self { cases, config })
- }
- pub fn stats(&self) {
- self.iter().for_each(|case| {
- let n_var = match case.n_variants() {
- Some(n) => n.to_string(),
- None => "NA".to_string(),
- };
- let cols = vec![case.id.to_string(), n_var];
- info!("{}", cols.join("\t"));
- })
- }
- }
- impl IntoIterator for Cases {
- type Item = Case;
- type IntoIter = <Vec<Case> as IntoIterator>::IntoIter;
- fn into_iter(self) -> Self::IntoIter {
- self.cases.into_iter()
- }
- }
- impl Deref for Cases {
- type Target = Vec<Case>;
- fn deref(&self) -> &Self::Target {
- &self.cases
- }
- }
- impl DerefMut for Cases {
- fn deref_mut(&mut self) -> &mut Vec<Case> {
- &mut self.cases
- }
- }
- #[derive(Debug, Clone, Serialize)]
- pub struct Case {
- pub id: String,
- pub diag_bam: Bam,
- pub mrd_bam: Bam,
- pub variants: Option<Variants>,
- }
- impl Case {
- pub fn new(id: String, diag_bam: Bam, mrd_bam: Bam) -> Result<Self> {
- Ok(Self {
- id,
- diag_bam,
- mrd_bam,
- variants: None,
- })
- }
- pub fn load_variants(&mut self, variants_path: &str, mp: &MultiProgress) -> Result<()> {
- info!("Loading variants for {}", self.id);
- if !Path::new(variants_path).exists() {
- match run_pipe(&self.id, &mp) {
- std::result::Result::Ok(_) => info!("{} variants analyzed.", self.id),
- Err(e) => warn!("Error while analyzing {}: {e}", self.id),
- }
- }
- if let std::result::Result::Ok(variants) =
- Variants::new_from_bytes(&self.id, variants_path, mp.clone())
- {
- self.variants = Some(variants);
- }
- Ok(())
- }
- pub fn n_variants(&self) -> Option<usize> {
- match &self.variants {
- Some(v) => Some(v.len()),
- None => None,
- }
- }
- pub fn variants_stat(&self) -> Option<String> {
- match &self.variants {
- Some(v) => {
- if let std::result::Result::Ok(res) = v.stats() {
- Some(res)
- } else {
- None
- }
- },
- None => None,
- }
- }
- }
- pub struct DiffSnp {
- lines: Vec<String>,
- }
- impl DiffSnp {
- pub fn init(commun_snp: &str) -> Result<Self> {
- info!("Loading {commun_snp}");
- let commun_snp_reader =
- BufReader::new(pandora_lib_variants::in_out::get_reader(commun_snp)?);
- let lines: Vec<String> = commun_snp_reader
- .lines()
- .into_iter()
- .map(|e| e.unwrap())
- .collect();
- Ok(Self { lines })
- }
- pub fn diff_prop(&self, diag_bam_path: &str, mrd_bam_path: &str) -> Result<f64> {
- let mut rng = rand::thread_rng();
- let lines = self.lines.clone().into_iter().choose_multiple(&mut rng, 10_000);
- let max_p_val = 0.0001;
- let diff = Arc::new(Mutex::new(0u64));
- let eq = Arc::new(Mutex::new(0u64));
- lines.par_chunks(100).for_each(|snp_lines| {
- let mut diag_bam = bam::IndexedReader::from_path(diag_bam_path).unwrap();
- let mut mrd_bam = bam::IndexedReader::from_path(mrd_bam_path).unwrap();
- for snp_line in snp_lines.iter() {
- let snp_cols: Vec<&str> = snp_line.splitn(3, "\t").collect();
- let diag_pileup = get_hts_nt_pileup(
- &mut diag_bam,
- snp_cols.get(0).unwrap(),
- snp_cols[1].parse().unwrap(),
- false,
- )
- .unwrap();
- let diag_pileup_counts: Vec<_> = diag_pileup
- .clone()
- .into_iter()
- .counts()
- .into_iter()
- .filter(|(b, _)| *b != b'D')
- .filter(|(_, n)| *n > 2)
- .collect();
- let diag_pileup_len = diag_pileup.len();
- if diag_pileup_len >= 10 && diag_pileup_counts.len() <= 2 {
- let mrd_pileup = get_hts_nt_pileup(
- &mut mrd_bam,
- snp_cols[0],
- snp_cols[1].parse().unwrap(),
- false,
- )
- .unwrap();
- let mrd_pileup_len = mrd_pileup.len();
- let mrd_pileup_counts: Vec<_> = mrd_pileup
- .clone()
- .into_iter()
- .counts()
- .into_iter()
- .filter(|(b, _)| *b != b'D')
- .filter(|(_, n)| *n > 2)
- .collect();
- if mrd_pileup_len >= 10 && mrd_pileup_counts.len() <= 2 {
- if mrd_pileup_counts.len() == 1 && diag_pileup_counts.len() == 1 {
- let (diag_a_b, _) = diag_pileup_counts.first().unwrap();
- let (mrd_a_b, _) = diag_pileup_counts.first().unwrap();
- if diag_a_b != mrd_a_b {
- // println!("diff {snp_cols:?}");
- let mut d = diff.lock().unwrap();
- *d += 1;
- } else {
- // println!("eq {snp_cols:?}");
- let mut e = eq.lock().unwrap();
- *e += 1;
- }
- } else if mrd_pileup_counts.len() != diag_pileup_counts.len() {
- // println!("diff {snp_cols:?} {mrd_pileup_counts:?} {diag_pileup_counts:?}");
- let mut d = diff.lock().unwrap();
- *d += 1;
- // let mut d = diff.lock().unwrap();
- // *d += 1;
- } else if mrd_pileup_counts.len() == 2 && diag_pileup_counts.len() == 2 {
- let (diag_a_b, diag_a_n) = diag_pileup_counts.first().unwrap();
- let (diag_b_b, diag_b_n) = diag_pileup_counts.last().unwrap();
- let (mrd_a_b, mrd_a_n) = diag_pileup_counts.first().unwrap();
- let (a, c) = if diag_a_b == mrd_a_b {
- (diag_a_n, mrd_a_n)
- } else if diag_b_b == mrd_a_b {
- (diag_b_n, mrd_a_n)
- } else {
- continue;
- };
- let p = chi_square_test_for_proportions(
- *a as f64,
- diag_pileup_len as f64,
- *c as f64,
- mrd_pileup_len as f64,
- )
- .unwrap();
- if p <= max_p_val && p != 0.0 {
- let mut d = diff.lock().unwrap();
- // println!(
- // "diff {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}"
- // );
- *d += 1;
- } else {
- // println!(
- // "eq {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}"
- // );
- let mut e = eq.lock().unwrap();
- *e += 1;
- }
- } else {
- continue;
- }
- }
- }
- }
- });
- let diff = Arc::try_unwrap(diff).unwrap().into_inner()?;
- let eq = Arc::try_unwrap(eq).unwrap().into_inner()?;
- Ok(diff as f64 * 100.0 / (diff + eq) as f64)
- }
- }
|