cases.rs 12 KB


  1. use crate::{
  2. bam::{Bam, BamOrigin},
  3. config::Config,
  4. };
  5. use anyhow::{Context, Ok, Result};
  6. use glob::glob;
  7. use indicatif::MultiProgress;
  8. use itertools::Itertools;
  9. use log::{info, warn};
  10. use pandora_lib_pileup::get_hts_nt_pileup;
  11. use pandora_lib_stats::chi_square_test_for_proportions;
  12. use pandora_lib_variants::variants::*;
  13. use rand::seq::IteratorRandom;
  14. use rayon::prelude::*;
  15. use rust_htslib::bam;
  16. use serde::Serialize;
  17. use utoipa::ToSchema;
  18. use std::{
  19. collections::HashMap,
  20. io::{BufRead, BufReader},
  21. ops::{Deref, DerefMut},
  22. path::Path,
  23. sync::{Arc, Mutex},
  24. };
  25. #[derive(Debug, Clone, Serialize)]
  26. pub struct Cases {
  27. pub cases: Vec<Case>,
  28. pub config: Config,
  29. }
  30. impl Cases {
  31. pub fn load(
  32. mp: MultiProgress,
  33. skip_ids: Option<Vec<String>>,
  34. only_ids: Option<Vec<String>>,
  35. check_snp: bool,
  36. load_variants: bool,
  37. ) -> Result<Self> {
  38. let ids_to_skip = skip_ids.unwrap_or(vec![]);
  39. let config = Config::get()?;
  40. let mut diag_bams = HashMap::new();
  41. for entry in glob(&config.diag_bam_glob).context("Failed to read glob pattern")? {
  42. let path = entry?;
  43. let case_id = path
  44. .parent()
  45. .context("")?
  46. .parent()
  47. .context("")?
  48. .file_name()
  49. .context("")?
  50. .to_str()
  51. .context("")?
  52. .to_string();
  53. let b = Bam::new(path.clone(), BamOrigin::DIAG);
  54. diag_bams.insert(case_id, b);
  55. }
  56. let mut mrd_bams = HashMap::new();
  57. for entry in glob(&config.mrd_bam_glob).context("Failed to read glob pattern")? {
  58. let path = entry?;
  59. let case_id = path
  60. .parent()
  61. .context("")?
  62. .parent()
  63. .context("")?
  64. .file_name()
  65. .context("")?
  66. .to_str()
  67. .context("")?
  68. .to_string();
  69. let b = Bam::new(path.clone(), BamOrigin::MRD);
  70. mrd_bams.insert(case_id, b);
  71. }
  72. // Check SNP AF differences between diag and mrd
  73. let mut retained_cases = Vec::new();
  74. let diff_snp_opt = if check_snp {
  75. Some(DiffSnp::init(&config.commun_snp)?)
  76. } else {
  77. None
  78. };
  79. for (id, diag_bam) in diag_bams {
  80. if ids_to_skip.contains(&id) {
  81. continue;
  82. }
  83. if let Some(only_ids) = &only_ids {
  84. if !only_ids.contains(&id) {
  85. continue;
  86. }
  87. }
  88. if let Some(mrd_bam) = mrd_bams.get(&id) {
  89. // verify if both samples match commun snps
  90. if let Some(diff_snp) = &diff_snp_opt {
  91. let diff = diff_snp.diff_prop(
  92. diag_bam.path.to_str().unwrap(),
  93. mrd_bam.path.to_str().unwrap(),
  94. )?;
  95. if diff > config.max_snp_diff_prop {
  96. warn!("{id} diag and mrd seems to have been sequenced from two patients.");
  97. continue;
  98. }
  99. }
  100. let case = Case::new(id.clone(), diag_bam.clone(), mrd_bam.clone())?;
  101. retained_cases.push(case);
  102. }
  103. }
  104. if load_variants {
  105. for case in retained_cases.iter_mut() {
  106. let dir = case
  107. .diag_bam
  108. .path
  109. .parent()
  110. .context("")?
  111. .to_str()
  112. .context("")?;
  113. case.load_variants(&format!("{dir}/{}_variants.bytes.gz", case.id), &mp)?;
  114. }
  115. }
  116. Ok(Self { cases: retained_cases, config })
  117. }
  118. pub fn stats(&self) {
  119. self.iter().for_each(|case| {
  120. let n_var = match case.n_variants() {
  121. Some(n) => n.to_string(),
  122. None => "NA".to_string(),
  123. };
  124. let cols = vec![case.id.to_string(), n_var];
  125. info!("{}", cols.join("\t"));
  126. })
  127. }
  128. }
  129. impl IntoIterator for Cases {
  130. type Item = Case;
  131. type IntoIter = <Vec<Case> as IntoIterator>::IntoIter;
  132. fn into_iter(self) -> Self::IntoIter {
  133. self.cases.into_iter()
  134. }
  135. }
  136. impl Deref for Cases {
  137. type Target = Vec<Case>;
  138. fn deref(&self) -> &Self::Target {
  139. &self.cases
  140. }
  141. }
  142. impl DerefMut for Cases {
  143. fn deref_mut(&mut self) -> &mut Vec<Case> {
  144. &mut self.cases
  145. }
  146. }
  147. #[derive(Debug, Clone, Serialize)]
  148. pub struct Case {
  149. pub id: String,
  150. pub diag_bam: Bam,
  151. pub mrd_bam: Bam,
  152. pub variants: Option<Variants>,
  153. }
  154. impl Case {
  155. pub fn new(id: String, diag_bam: Bam, mrd_bam: Bam) -> Result<Self> {
  156. Ok(Self {
  157. id,
  158. diag_bam,
  159. mrd_bam,
  160. variants: None,
  161. })
  162. }
  163. pub fn load_variants(&mut self, variants_path: &str, mp: &MultiProgress) -> Result<()> {
  164. info!("Loading variants for {}", self.id);
  165. if !Path::new(variants_path).exists() {
  166. match run_pipe(&self.id, &mp) {
  167. std::result::Result::Ok(_) => info!("{} variants analyzed.", self.id),
  168. Err(e) => warn!("Error while analyzing {}: {e}", self.id),
  169. }
  170. }
  171. if let std::result::Result::Ok(variants) =
  172. Variants::new_from_bytes(&self.id, variants_path, mp.clone())
  173. {
  174. self.variants = Some(variants);
  175. }
  176. Ok(())
  177. }
  178. pub fn n_variants(&self) -> Option<usize> {
  179. match &self.variants {
  180. Some(v) => Some(v.len()),
  181. None => None,
  182. }
  183. }
  184. pub fn variants_stat(&self) -> Option<CaseStats> {
  185. match &self.variants {
  186. Some(v) => {
  187. if let std::result::Result::Ok(stats) = v.stats() {
  188. Some(CaseStats {
  189. id: self.id.to_string(),
  190. stats,
  191. })
  192. } else {
  193. None
  194. }
  195. }
  196. None => None,
  197. }
  198. }
  199. }
  200. #[derive(Debug, Clone, Serialize, ToSchema)]
  201. pub struct CaseStats {
  202. id: String,
  203. stats: Vec<Stat>,
  204. }
  205. pub struct DiffSnp {
  206. lines: Vec<String>,
  207. }
  208. impl DiffSnp {
  209. pub fn init(commun_snp: &str) -> Result<Self> {
  210. info!("Loading {commun_snp}");
  211. let commun_snp_reader =
  212. BufReader::new(pandora_lib_variants::in_out::get_reader(commun_snp)?);
  213. let lines: Vec<String> = commun_snp_reader
  214. .lines()
  215. .into_iter()
  216. .map(|e| e.unwrap())
  217. .collect();
  218. Ok(Self { lines })
  219. }
  220. pub fn diff_prop(&self, diag_bam_path: &str, mrd_bam_path: &str) -> Result<f64> {
  221. let mut rng = rand::thread_rng();
  222. let lines = self
  223. .lines
  224. .clone()
  225. .into_iter()
  226. .choose_multiple(&mut rng, 10_000);
  227. let max_p_val = 0.0001;
  228. let diff = Arc::new(Mutex::new(0u64));
  229. let eq = Arc::new(Mutex::new(0u64));
  230. lines.par_chunks(100).for_each(|snp_lines| {
  231. let mut diag_bam = bam::IndexedReader::from_path(diag_bam_path).unwrap();
  232. let mut mrd_bam = bam::IndexedReader::from_path(mrd_bam_path).unwrap();
  233. for snp_line in snp_lines.iter() {
  234. let snp_cols: Vec<&str> = snp_line.splitn(3, "\t").collect();
  235. let diag_pileup = get_hts_nt_pileup(
  236. &mut diag_bam,
  237. snp_cols.get(0).unwrap(),
  238. snp_cols[1].parse().unwrap(),
  239. false,
  240. )
  241. .unwrap();
  242. let diag_pileup_counts: Vec<_> = diag_pileup
  243. .clone()
  244. .into_iter()
  245. .counts()
  246. .into_iter()
  247. .filter(|(b, _)| *b != b'D')
  248. .filter(|(_, n)| *n > 2)
  249. .collect();
  250. let diag_pileup_len = diag_pileup.len();
  251. if diag_pileup_len >= 10 && diag_pileup_counts.len() <= 2 {
  252. let mrd_pileup = get_hts_nt_pileup(
  253. &mut mrd_bam,
  254. snp_cols[0],
  255. snp_cols[1].parse().unwrap(),
  256. false,
  257. )
  258. .unwrap();
  259. let mrd_pileup_len = mrd_pileup.len();
  260. let mrd_pileup_counts: Vec<_> = mrd_pileup
  261. .clone()
  262. .into_iter()
  263. .counts()
  264. .into_iter()
  265. .filter(|(b, _)| *b != b'D')
  266. .filter(|(_, n)| *n > 2)
  267. .collect();
  268. if mrd_pileup_len >= 10 && mrd_pileup_counts.len() <= 2 {
  269. if mrd_pileup_counts.len() == 1 && diag_pileup_counts.len() == 1 {
  270. let (diag_a_b, _) = diag_pileup_counts.first().unwrap();
  271. let (mrd_a_b, _) = diag_pileup_counts.first().unwrap();
  272. if diag_a_b != mrd_a_b {
  273. // println!("diff {snp_cols:?}");
  274. let mut d = diff.lock().unwrap();
  275. *d += 1;
  276. } else {
  277. // println!("eq {snp_cols:?}");
  278. let mut e = eq.lock().unwrap();
  279. *e += 1;
  280. }
  281. } else if mrd_pileup_counts.len() != diag_pileup_counts.len() {
  282. // println!("diff {snp_cols:?} {mrd_pileup_counts:?} {diag_pileup_counts:?}");
  283. let mut d = diff.lock().unwrap();
  284. *d += 1;
  285. // let mut d = diff.lock().unwrap();
  286. // *d += 1;
  287. } else if mrd_pileup_counts.len() == 2 && diag_pileup_counts.len() == 2 {
  288. let (diag_a_b, diag_a_n) = diag_pileup_counts.first().unwrap();
  289. let (diag_b_b, diag_b_n) = diag_pileup_counts.last().unwrap();
  290. let (mrd_a_b, mrd_a_n) = diag_pileup_counts.first().unwrap();
  291. let (a, c) = if diag_a_b == mrd_a_b {
  292. (diag_a_n, mrd_a_n)
  293. } else if diag_b_b == mrd_a_b {
  294. (diag_b_n, mrd_a_n)
  295. } else {
  296. continue;
  297. };
  298. let p = chi_square_test_for_proportions(
  299. *a as f64,
  300. diag_pileup_len as f64,
  301. *c as f64,
  302. mrd_pileup_len as f64,
  303. )
  304. .unwrap();
  305. if p <= max_p_val && p != 0.0 {
  306. let mut d = diff.lock().unwrap();
  307. // println!(
  308. // "diff {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}"
  309. // );
  310. *d += 1;
  311. } else {
  312. // println!(
  313. // "eq {snp_cols:?} {p} {a} {diag_pileup_len} {c} {mrd_pileup_len}"
  314. // );
  315. let mut e = eq.lock().unwrap();
  316. *e += 1;
  317. }
  318. } else {
  319. continue;
  320. }
  321. }
  322. }
  323. }
  324. });
  325. let diff = Arc::try_unwrap(diff).unwrap().into_inner()?;
  326. let eq = Arc::try_unwrap(eq).unwrap().into_inner()?;
  327. Ok(diff as f64 * 100.0 / (diff + eq) as f64)
  328. }
  329. }