use crate::{ bam::{Bam, BamOrigin}, config::Config, }; use anyhow::{Context, Ok, Result}; use glob::glob; use indicatif::MultiProgress; use itertools::Itertools; use log::{info, warn}; use pandora_lib_pileup::get_hts_nt_pileup; use pandora_lib_stats::chi_square_test_for_proportions; use pandora_lib_variants::variants::*; use rand::seq::IteratorRandom; use rayon::prelude::*; use rust_htslib::bam; use serde::Serialize; use utoipa::ToSchema; use std::{ collections::HashMap, io::{BufRead, BufReader}, ops::{Deref, DerefMut}, path::Path, sync::{Arc, Mutex}, }; #[derive(Debug, Clone, Serialize)] pub struct Cases { pub cases: Vec, pub config: Config, } impl Cases { pub fn load( mp: MultiProgress, skip_ids: Option>, only_ids: Option>, check_snp: bool, load_variants: bool, ) -> Result { let ids_to_skip = skip_ids.unwrap_or(vec![]); let config = Config::get()?; let mut diag_bams = HashMap::new(); for entry in glob(&config.diag_bam_glob).context("Failed to read glob pattern")? { let path = entry?; let case_id = path .parent() .context("")? .parent() .context("")? .file_name() .context("")? .to_str() .context("")? .to_string(); let b = Bam::new(path.clone(), BamOrigin::DIAG); diag_bams.insert(case_id, b); } let mut mrd_bams = HashMap::new(); for entry in glob(&config.mrd_bam_glob).context("Failed to read glob pattern")? { let path = entry?; let case_id = path .parent() .context("")? .parent() .context("")? .file_name() .context("")? .to_str() .context("")? .to_string(); let b = Bam::new(path.clone(), BamOrigin::MRD); mrd_bams.insert(case_id, b); } // Check SNP AF differences between diag and mrd let mut retained_cases = Vec::new(); let diff_snp_opt = if check_snp { Some(DiffSnp::init(&config.commun_snp)?) } else { None }; for (id, diag_bam) in diag_bams { if ids_to_skip.contains(&id) { continue; } if let Some(only_ids) = &only_ids { if !only_ids.contains(&id) { continue; } } if let Some(mrd_bam) = mrd_bams.get(&id) { // verify if both samples match commun snps if let Some(diff_snp) = &diff_snp_opt { let diff = diff_snp.diff_prop( diag_bam.path.to_str().unwrap(), mrd_bam.path.to_str().unwrap(), )?; if diff > config.max_snp_diff_prop { warn!("{id} diag and mrd seems to have been sequenced from two patients."); continue; } } let case = Case::new(id.clone(), diag_bam.clone(), mrd_bam.clone())?; retained_cases.push(case); } } if load_variants { for case in retained_cases.iter_mut() { let dir = case .diag_bam .path .parent() .context("")? .to_str() .context("")?; case.load_variants(&format!("{dir}/{}_variants.bytes.gz", case.id), &mp)?; } } Ok(Self { cases: retained_cases, config }) } pub fn stats(&self) { self.iter().for_each(|case| { let n_var = match case.n_variants() { Some(n) => n.to_string(), None => "NA".to_string(), }; let cols = vec![case.id.to_string(), n_var]; info!("{}", cols.join("\t")); }) } } impl IntoIterator for Cases { type Item = Case; type IntoIter = as IntoIterator>::IntoIter; fn into_iter(self) -> Self::IntoIter { self.cases.into_iter() } } impl Deref for Cases { type Target = Vec; fn deref(&self) -> &Self::Target { &self.cases } } impl DerefMut for Cases { fn deref_mut(&mut self) -> &mut Vec { &mut self.cases } } #[derive(Debug, Clone, Serialize)] pub struct Case { pub id: String, pub diag_bam: Bam, pub mrd_bam: Bam, pub variants: Option, } impl Case { pub fn new(id: String, diag_bam: Bam, mrd_bam: Bam) -> Result { Ok(Self { id, diag_bam, mrd_bam, variants: None, }) } pub fn load_variants(&mut self, variants_path: &str, mp: &MultiProgress) -> Result<()> { info!("Loading variants for {}", self.id); if !Path::new(variants_path).exists() { match run_pipe(&self.id, &mp) { std::result::Result::Ok(_) => info!("{} variants analyzed.", self.id), Err(e) => warn!("Error while analyzing {}: {e}", self.id), } } if let std::result::Result::Ok(variants) = Variants::new_from_bytes(&self.id, variants_path, mp.clone()) { self.variants = Some(variants); } Ok(()) } pub fn n_variants(&self) -> Option { match &self.variants { Some(v) => Some(v.len()), None => None, } } pub fn variants_stat(&self) -> Option { match &self.variants { Some(v) => { if let std::result::Result::Ok(stats) = v.stats() { Some(CaseStats { id: self.id.to_string(), stats, }) } else { None } } None => None, } } } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct CaseStats { id: String, stats: Vec, } pub struct DiffSnp { lines: Vec, } impl DiffSnp { pub fn init(commun_snp: &str) -> Result { info!("Loading {commun_snp}"); let commun_snp_reader = BufReader::new(pandora_lib_variants::in_out::get_reader(commun_snp)?); let lines: Vec = commun_snp_reader .lines() .into_iter() .map(|e| e.unwrap()) .collect(); Ok(Self { lines }) } pub fn diff_prop(&self, diag_bam_path: &str, mrd_bam_path: &str) -> Result { let mut rng = rand::thread_rng(); let lines = self .lines .clone() .into_iter() .choose_multiple(&mut rng, 10_000); let max_p_val = 0.0001; let diff = Arc::new(Mutex::new(0u64)); let eq = Arc::new(Mutex::new(0u64)); lines.par_chunks(100).for_each(|snp_lines| { let mut diag_bam = bam::IndexedReader::from_path(diag_bam_path).unwrap(); let mut mrd_bam = bam::IndexedReader::from_path(mrd_bam_path).unwrap(); for snp_line in snp_lines.iter() { let snp_cols: Vec<&str> = snp_line.splitn(3, "\t").collect(); let diag_pileup = get_hts_nt_pileup( &mut diag_bam, snp_cols.get(0).unwrap(), snp_cols[1].parse().unwrap(), false, ) .unwrap(); let diag_pileup_counts: Vec<_> = diag_pileup .clone() .into_iter() .counts() .into_iter() .filter(|(b, _)| *b != b'D') .filter(|(_, n)| *n > 2) .collect(); let diag_pileup_len = diag_pileup.len(); if diag_pileup_len >= 10 && diag_pileup_counts.len() <= 2 { let mrd_pileup = get_hts_nt_pileup( &mut mrd_bam, snp_cols[0], snp_cols[1].parse().unwrap(), false, ) .unwrap(); let mrd_pileup_len = mrd_pileup.len(); let mrd_pileup_counts: Vec<_> = mrd_pileup .clone() .into_iter() .counts() .into_iter() .filter(|(b, _)| *b != b'D') .filter(|(_, n)| *n > 2) .collect(); if mrd_pileup_len >= 10 && mrd_pileup_counts.len() <= 2 { if mrd_pileup_counts.len() == 1 && diag_pileup_counts.len() == 1 { let (diag_a_b, _) = diag_pileup_counts.first().unwrap(); let (mrd_a_b, _) = diag_pileup_counts.first().unwrap(); if diag_a_b != mrd_a_b { // println!("diff {snp_cols:?}"); let mut d = diff.lock().unwrap(); *d += 1; } else { // println!("eq {snp_cols:?}"); let mut e = eq.lock().unwrap(); *e += 1; } } else if mrd_pileup_counts.len() != diag_pileup_counts.len() { // println!("diff {snp_cols:?} {mrd_pileup_counts:?} {diag_pileup_counts:?}"); let mut d = diff.lock().unwrap(); *d += 1; // let mut d = diff.lock().unwrap(); // *d += 1; } else if mrd_pileup_counts.len() == 2 && diag_pileup_counts.len() == 2 { let (diag_a_b, diag_a_n) = diag_pileup_counts.first().unwrap(); let (diag_b_b, diag_b_n) = diag_pileup_counts.last().unwrap(); let (mrd_a_b, mrd_a_n) = diag_pileup_counts.first().unwrap(); let (a, c) = if diag_a_b == mrd_a_b { (diag_a_n, mrd_a_n) } else if diag_b_b == mrd_a_b { (diag_b_n, mrd_a_n) } else { continue; }; let p = chi_square_test_for_proportions( *a as f64, diag_pileup_len as f64, *c as f64, mrd_pileup_len as f64, ) .unwrap(); if p <= max_p_val && p != 0.0 { let mut d = diff.lock().unwrap(); // println!( // "diff {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}" // ); *d += 1; } else { // println!( // "eq {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}" // ); let mut e = eq.lock().unwrap(); *e += 1; } } else { continue; } } } } }); let diff = Arc::try_unwrap(diff).unwrap().into_inner()?; let eq = Arc::try_unwrap(eq).unwrap().into_inner()?; Ok(diff as f64 * 100.0 / (diff + eq) as f64) } }