cases.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. use crate::{
  2. bam::{Bam, BamOrigin},
  3. config::Config,
  4. };
  5. use anyhow::{Context, Ok, Result};
  6. use glob::glob;
  7. use indicatif::MultiProgress;
  8. use itertools::Itertools;
  9. use log::{info, warn};
  10. use pandora_lib_pileup::get_hts_nt_pileup;
  11. use pandora_lib_stats::chi_square_test_for_proportions;
  12. use pandora_lib_variants::variants::*;
  13. use rayon::prelude::*;
  14. use serde::Serialize;
  15. use std::{
  16. collections::HashMap,
  17. io::{BufRead, BufReader},
  18. ops::{Deref, DerefMut},
  19. path::Path,
  20. sync::{Arc, Mutex},
  21. };
  22. use rand::seq::IteratorRandom;
  23. use rust_htslib::bam;
  24. #[derive(Debug, Clone, Serialize)]
  25. pub struct Cases {
  26. pub cases: Vec<Case>,
  27. pub config: Config,
  28. }
  29. impl Cases {
  30. pub fn load(mp: MultiProgress, skip_ids: Option<Vec<String>>) -> Result<Self> {
  31. let ids_to_skip = skip_ids.unwrap_or(vec![]);
  32. let config = Config::get()?;
  33. let mut diag_bams = HashMap::new();
  34. for entry in glob(&config.diag_bam_glob).context("Failed to read glob pattern")? {
  35. let path = entry?;
  36. let case_id = path
  37. .parent()
  38. .context("")?
  39. .parent()
  40. .context("")?
  41. .file_name()
  42. .context("")?
  43. .to_str()
  44. .context("")?
  45. .to_string();
  46. let b = Bam::new(path.clone(), BamOrigin::DIAG);
  47. diag_bams.insert(case_id, b);
  48. }
  49. let mut mrd_bams = HashMap::new();
  50. for entry in glob(&config.mrd_bam_glob).context("Failed to read glob pattern")? {
  51. let path = entry?;
  52. let case_id = path
  53. .parent()
  54. .context("")?
  55. .parent()
  56. .context("")?
  57. .file_name()
  58. .context("")?
  59. .to_str()
  60. .context("")?
  61. .to_string();
  62. let b = Bam::new(path.clone(), BamOrigin::MRD);
  63. mrd_bams.insert(case_id, b);
  64. }
  65. // Check SNP AF differences between diag and mrd
  66. let mut cases = Vec::new();
  67. let diff_snp = DiffSnp::init(&config.commun_snp)?;
  68. for (id, diag_bam) in diag_bams {
  69. if ids_to_skip.contains(&id) {
  70. continue;
  71. }
  72. if let Some(mrd_bam) = mrd_bams.get(&id) {
  73. // verify if both samples match commun snps
  74. let diff = diff_snp.diff_prop(diag_bam.path.to_str().unwrap(), mrd_bam.path.to_str().unwrap())?;
  75. if diff < config.max_snp_diff_prop {
  76. let mut case = Case::new(id.clone(), diag_bam.clone(), mrd_bam.clone())?;
  77. let dir = diag_bam.path.parent().context("")?.to_str().context("")?;
  78. case.load_variants(&format!("{dir}/{id}_variants.bytes.gz"), &mp)?;
  79. cases.push(case);
  80. } else {
  81. warn!("{id} diag and mrd seems to have been sequenced from two patients.");
  82. }
  83. }
  84. }
  85. Ok(Self { cases, config })
  86. }
  87. pub fn stats(&self) {
  88. self.iter().for_each(|case| {
  89. let n_var = match case.n_variants() {
  90. Some(n) => n.to_string(),
  91. None => "NA".to_string(),
  92. };
  93. let cols = vec![case.id.to_string(), n_var];
  94. info!("{}", cols.join("\t"));
  95. })
  96. }
  97. }
  98. impl IntoIterator for Cases {
  99. type Item = Case;
  100. type IntoIter = <Vec<Case> as IntoIterator>::IntoIter;
  101. fn into_iter(self) -> Self::IntoIter {
  102. self.cases.into_iter()
  103. }
  104. }
  105. impl Deref for Cases {
  106. type Target = Vec<Case>;
  107. fn deref(&self) -> &Self::Target {
  108. &self.cases
  109. }
  110. }
  111. impl DerefMut for Cases {
  112. fn deref_mut(&mut self) -> &mut Vec<Case> {
  113. &mut self.cases
  114. }
  115. }
  116. #[derive(Debug, Clone, Serialize)]
  117. pub struct Case {
  118. pub id: String,
  119. pub diag_bam: Bam,
  120. pub mrd_bam: Bam,
  121. pub variants: Option<Variants>,
  122. }
  123. impl Case {
  124. pub fn new(id: String, diag_bam: Bam, mrd_bam: Bam) -> Result<Self> {
  125. Ok(Self {
  126. id,
  127. diag_bam,
  128. mrd_bam,
  129. variants: None,
  130. })
  131. }
  132. pub fn load_variants(&mut self, variants_path: &str, mp: &MultiProgress) -> Result<()> {
  133. info!("Loading variants for {}", self.id);
  134. if !Path::new(variants_path).exists() {
  135. match run_pipe(&self.id, &mp) {
  136. std::result::Result::Ok(_) => info!("{} variants analyzed.", self.id),
  137. Err(e) => warn!("Error while analyzing {}: {e}", self.id),
  138. }
  139. }
  140. if let std::result::Result::Ok(variants) =
  141. Variants::new_from_bytes(&self.id, variants_path, mp.clone())
  142. {
  143. self.variants = Some(variants);
  144. }
  145. Ok(())
  146. }
  147. pub fn n_variants(&self) -> Option<usize> {
  148. match &self.variants {
  149. Some(v) => Some(v.len()),
  150. None => None,
  151. }
  152. }
  153. pub fn variants_stat(&self) -> Option<String> {
  154. match &self.variants {
  155. Some(v) => {
  156. if let std::result::Result::Ok(res) = v.stats() {
  157. Some(res)
  158. } else {
  159. None
  160. }
  161. },
  162. None => None,
  163. }
  164. }
  165. }
  166. pub struct DiffSnp {
  167. lines: Vec<String>,
  168. }
  169. impl DiffSnp {
  170. pub fn init(commun_snp: &str) -> Result<Self> {
  171. info!("Loading {commun_snp}");
  172. let commun_snp_reader =
  173. BufReader::new(pandora_lib_variants::in_out::get_reader(commun_snp)?);
  174. let lines: Vec<String> = commun_snp_reader
  175. .lines()
  176. .into_iter()
  177. .map(|e| e.unwrap())
  178. .collect();
  179. Ok(Self { lines })
  180. }
  181. pub fn diff_prop(&self, diag_bam_path: &str, mrd_bam_path: &str) -> Result<f64> {
  182. let mut rng = rand::thread_rng();
  183. let lines = self.lines.clone().into_iter().choose_multiple(&mut rng, 10_000);
  184. let max_p_val = 0.0001;
  185. let diff = Arc::new(Mutex::new(0u64));
  186. let eq = Arc::new(Mutex::new(0u64));
  187. lines.par_chunks(100).for_each(|snp_lines| {
  188. let mut diag_bam = bam::IndexedReader::from_path(diag_bam_path).unwrap();
  189. let mut mrd_bam = bam::IndexedReader::from_path(mrd_bam_path).unwrap();
  190. for snp_line in snp_lines.iter() {
  191. let snp_cols: Vec<&str> = snp_line.splitn(3, "\t").collect();
  192. let diag_pileup = get_hts_nt_pileup(
  193. &mut diag_bam,
  194. snp_cols.get(0).unwrap(),
  195. snp_cols[1].parse().unwrap(),
  196. false,
  197. )
  198. .unwrap();
  199. let diag_pileup_counts: Vec<_> = diag_pileup
  200. .clone()
  201. .into_iter()
  202. .counts()
  203. .into_iter()
  204. .filter(|(b, _)| *b != b'D')
  205. .filter(|(_, n)| *n > 2)
  206. .collect();
  207. let diag_pileup_len = diag_pileup.len();
  208. if diag_pileup_len >= 10 && diag_pileup_counts.len() <= 2 {
  209. let mrd_pileup = get_hts_nt_pileup(
  210. &mut mrd_bam,
  211. snp_cols[0],
  212. snp_cols[1].parse().unwrap(),
  213. false,
  214. )
  215. .unwrap();
  216. let mrd_pileup_len = mrd_pileup.len();
  217. let mrd_pileup_counts: Vec<_> = mrd_pileup
  218. .clone()
  219. .into_iter()
  220. .counts()
  221. .into_iter()
  222. .filter(|(b, _)| *b != b'D')
  223. .filter(|(_, n)| *n > 2)
  224. .collect();
  225. if mrd_pileup_len >= 10 && mrd_pileup_counts.len() <= 2 {
  226. if mrd_pileup_counts.len() == 1 && diag_pileup_counts.len() == 1 {
  227. let (diag_a_b, _) = diag_pileup_counts.first().unwrap();
  228. let (mrd_a_b, _) = diag_pileup_counts.first().unwrap();
  229. if diag_a_b != mrd_a_b {
  230. // println!("diff {snp_cols:?}");
  231. let mut d = diff.lock().unwrap();
  232. *d += 1;
  233. } else {
  234. // println!("eq {snp_cols:?}");
  235. let mut e = eq.lock().unwrap();
  236. *e += 1;
  237. }
  238. } else if mrd_pileup_counts.len() != diag_pileup_counts.len() {
  239. // println!("diff {snp_cols:?} {mrd_pileup_counts:?} {diag_pileup_counts:?}");
  240. let mut d = diff.lock().unwrap();
  241. *d += 1;
  242. // let mut d = diff.lock().unwrap();
  243. // *d += 1;
  244. } else if mrd_pileup_counts.len() == 2 && diag_pileup_counts.len() == 2 {
  245. let (diag_a_b, diag_a_n) = diag_pileup_counts.first().unwrap();
  246. let (diag_b_b, diag_b_n) = diag_pileup_counts.last().unwrap();
  247. let (mrd_a_b, mrd_a_n) = diag_pileup_counts.first().unwrap();
  248. let (a, c) = if diag_a_b == mrd_a_b {
  249. (diag_a_n, mrd_a_n)
  250. } else if diag_b_b == mrd_a_b {
  251. (diag_b_n, mrd_a_n)
  252. } else {
  253. continue;
  254. };
  255. let p = chi_square_test_for_proportions(
  256. *a as f64,
  257. diag_pileup_len as f64,
  258. *c as f64,
  259. mrd_pileup_len as f64,
  260. )
  261. .unwrap();
  262. if p <= max_p_val && p != 0.0 {
  263. let mut d = diff.lock().unwrap();
  264. // println!(
  265. // "diff {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}"
  266. // );
  267. *d += 1;
  268. } else {
  269. // println!(
  270. // "eq {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}"
  271. // );
  272. let mut e = eq.lock().unwrap();
  273. *e += 1;
  274. }
  275. } else {
  276. continue;
  277. }
  278. }
  279. }
  280. }
  281. });
  282. let diff = Arc::try_unwrap(diff).unwrap().into_inner()?;
  283. let eq = Arc::try_unwrap(eq).unwrap().into_inner()?;
  284. Ok(diff as f64 * 100.0 / (diff + eq) as f64)
  285. }
  286. }