bam.rs 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120
  1. use core::fmt;
  2. use std::{
  3. collections::{BTreeMap, HashMap},
  4. fs::{self, File},
  5. path::PathBuf,
  6. };
  7. use anyhow::{anyhow, Context};
  8. use chrono::{DateTime, Utc};
  9. use dashmap::DashMap;
  10. use glob::glob;
  11. use log::{debug, info, warn};
  12. use rand::{rng, Rng};
  13. use rayon::prelude::*;
  14. use rust_htslib::bam::{ext::BamRecordExtensions, record::Cigar, Read};
  15. use serde::{Deserialize, Serialize};
  16. use crate::config::Config;
  17. use super::flowcells::FlowCell;
  18. #[derive(Debug, Clone, Deserialize, Serialize)]
  19. pub struct WGSBam {
  20. pub id: String,
  21. pub time_point: String,
  22. pub reference_genome: String,
  23. // pub bam_type: BamType,
  24. pub path: PathBuf,
  25. pub modified: DateTime<Utc>,
  26. pub bam_stats: WGSBamStats,
  27. // pub cramino: Option<CraminoRes>,
  28. pub composition: Vec<(String, String, f64)>, // acquisition id, fn
  29. }
  30. // #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
  31. // pub enum BamType {
  32. // WGS,
  33. // Panel(String),
  34. // ChIP(String),
  35. // }
  36. //
  37. impl WGSBam {
  38. pub fn new(path: PathBuf) -> anyhow::Result<Self> {
  39. let stem = path
  40. .clone()
  41. .file_stem()
  42. .context(format!("Can't parse stem from {}", path.display()))?
  43. .to_string_lossy()
  44. .to_string();
  45. let stem: Vec<&str> = stem.split('_').collect();
  46. if stem.len() != 3 {
  47. return Err(anyhow!("Error in bam name: {}", path.display()));
  48. }
  49. let id = stem[0].to_string();
  50. let time_point = stem[1].to_string();
  51. let reference_genome = stem
  52. .last()
  53. .context("Can't get last from stem {stem}")?
  54. .to_string();
  55. // let bam_type = if stem.len() == 4 {
  56. // match stem[2] {
  57. // "oncoT" => BamType::Panel("oncoT".to_string()),
  58. // "H3K27ac" => BamType::ChIP("H3K27ac".to_string()),
  59. // "H3K4me3" => BamType::ChIP("H3K4me3".to_string()),
  60. // _ => return Err(anyhow!("Error in bam name: {}", path.display())),
  61. // }
  62. // } else {
  63. // BamType::WGS
  64. // };
  65. let file_metadata = fs::metadata(&path)?;
  66. let modified = file_metadata.modified()?;
  67. let tp_dir = path
  68. .parent()
  69. .context("Can't parse parent from: {bam_path}")?;
  70. let json_path = format!(
  71. "{}/{id}_{time_point}_{reference_genome}_info.json",
  72. tp_dir.to_string_lossy()
  73. );
  74. let json_file = PathBuf::from(&json_path);
  75. if json_file.exists() && json_file.metadata()?.modified()? > modified {
  76. match Self::load_json(&json_path) {
  77. Ok(s) => return Ok(s),
  78. Err(e) => {
  79. warn!("Error while loading {}.\n{e}", json_path);
  80. }
  81. }
  82. }
  83. // let cramino_path = format!(
  84. // "{}/{id}_{time_point}_{reference_genome}_cramino.txt",
  85. // tp_dir.to_string_lossy()
  86. // );
  87. // let cramino = if bam_type == BamType::WGS {
  88. // if !PathBuf::from_str(&cramino_path)?.exists() {
  89. // // return Err(anyhow!("Cramino file missing {cramino_path}"));
  90. // Cramino::default().with_bam(path.to_str().unwrap())?;
  91. // }
  92. // let mut cramino = Cramino::default().with_result_path(&cramino_path);
  93. // cramino
  94. // .parse_results()
  95. // .context(format!("Error while parsing cramino for {cramino_path}"))?;
  96. //
  97. // if let Some(cramino) = cramino.results {
  98. // Some(cramino)
  99. // } else {
  100. // return Err(anyhow!("Cramino results parsing failed"));
  101. // }
  102. // } else {
  103. // None
  104. // };
  105. let composition = bam_composition(path.to_string_lossy().as_ref(), 20_000).context(format!(
  106. "Error while reading BAM composition for {}",
  107. path.display()
  108. ))?;
  109. let s = Self {
  110. path,
  111. bam_stats: WGSBamStats::new(&id, &time_point, &Config::default())?,
  112. // cramino,
  113. id: id.to_string(),
  114. time_point: time_point.to_string(),
  115. // bam_type,
  116. reference_genome,
  117. composition,
  118. modified: modified.into(),
  119. };
  120. s.save_json(&json_path)?;
  121. Ok(s)
  122. }
  123. pub fn load_json(path: &str) -> anyhow::Result<Self> {
  124. let f = File::open(path)?;
  125. let s: Self = serde_json::from_reader(f)?;
  126. Ok(s)
  127. }
  128. pub fn save_json(&self, path: &str) -> anyhow::Result<()> {
  129. let f = File::create(path)?;
  130. serde_json::to_writer(f, self)?;
  131. Ok(())
  132. }
  133. }
  134. #[derive(Debug, Default, Clone)]
  135. pub struct BamCollection {
  136. pub bams: Vec<WGSBam>,
  137. }
  138. impl BamCollection {
  139. pub fn new(result_dir: &str) -> Self {
  140. load_bam_collection(result_dir)
  141. }
  142. pub fn by_acquisition_id(&self) -> HashMap<String, Vec<&WGSBam>> {
  143. let mut acq: HashMap<String, Vec<&WGSBam>> = HashMap::new();
  144. for bam in self.bams.iter() {
  145. for (acq_id, _, _) in bam.composition.iter() {
  146. if let Some(entry) = acq.get_mut(acq_id) {
  147. entry.push(bam);
  148. } else {
  149. acq.insert(acq_id.to_string(), vec![bam]);
  150. }
  151. }
  152. }
  153. acq
  154. }
  155. pub fn get(&self, id: &str, time_point: &str) -> Vec<&WGSBam> {
  156. self.bams
  157. .iter()
  158. .filter(|b| b.id == id && b.time_point == time_point)
  159. .collect()
  160. }
  161. pub fn by_id_completed(&self, min_diag_cov: f32, min_mrd_cov: f32) -> Vec<WGSBam> {
  162. self.bams
  163. .iter()
  164. // .filter(|b| matches!(b.bam_type, BamType::WGS))
  165. .filter(|b| match b.time_point.as_str() {
  166. "diag" => b.bam_stats.global_coverage >= min_diag_cov as f64,
  167. "mrd" => b.bam_stats.global_coverage >= min_mrd_cov as f64,
  168. _ => false,
  169. })
  170. // .filter(|b| match &b.cramino {
  171. // Some(cramino) => match b.time_point.as_str() {
  172. // "diag" => cramino.mean_length >= min_diag_cov as f64,
  173. // "mrd" => cramino.mean_length >= min_mrd_cov as f64,
  174. // _ => false,
  175. // },
  176. // _ => false,
  177. // })
  178. .cloned()
  179. .collect()
  180. }
  181. pub fn ids(&self) -> Vec<String> {
  182. let mut ids: Vec<String> = self.bams.iter().map(|b| b.id.clone()).collect();
  183. ids.sort();
  184. ids.dedup();
  185. ids
  186. }
  187. }
  188. pub fn load_bam_collection(result_dir: &str) -> BamCollection {
  189. let pattern = format!("{}/*/*/*.bam", result_dir);
  190. let bams = glob(&pattern)
  191. .expect("Failed to read glob pattern.")
  192. // .par_bridge()
  193. .filter_map(|entry| {
  194. match entry {
  195. Ok(path) => match WGSBam::new(path) {
  196. Ok(bam) => return Some(bam),
  197. Err(e) => warn!("{e}"),
  198. },
  199. Err(e) => warn!("Error: {:?}", e),
  200. }
  201. None
  202. })
  203. .collect();
  204. BamCollection { bams }
  205. }
  206. /// Computes the composition of `(RG, fn)` tag prefixes in a BAM file sample.
  207. ///
  208. /// This function parses a BAM file and, for a subset of records (`sample_size`),
  209. /// extracts the `RG` (read group) and `fn` (typically a file or flowcell ID) string tags from each record.
  210. /// It groups records by the prefix (before `_`, if present) of each tag,
  211. /// counts occurrences for each pair, and reports their proportion in the sample.
  212. ///
  213. /// # Arguments
  214. ///
  215. /// * `file_path` - Path to the BAM file.
  216. /// * `sample_size` - Maximum number of records to sample and process.
  217. ///
  218. /// # Returns
  219. ///
  220. /// Returns a vector of tuples, each containing:
  221. /// - `String`: the <runid> (up to the first underscore of the `RG` tag, or full tag if no underscore)
  222. /// - `String`: the <flowcell-id> (up to the first underscore of the `fn` tag, or full tag if no underscore)
  223. /// - `f64`: percentage of records with this `(runid, flowcell-id)` prefix pair, relative to total successfully processed records
  224. ///
  225. /// # Errors
  226. ///
  227. /// Returns an error if the BAM file cannot be opened or read.
  228. /// Any record in the sampled set does not have both required tags, or the tags are not strings.
  229. ///
  230. /// # Examples
  231. ///
  232. /// ```no_run
  233. /// # use anyhow::Result;
  234. /// let stats = bam_composition("reads.bam", 10_000)?;
  235. /// for (rg, flowcell, pct) in stats {
  236. /// println!("{} / {}: {:.2}%", rg, flowcell, pct);
  237. /// }
  238. /// # Ok::<(), anyhow::Error>(())
  239. /// ```
  240. ///
  241. /// # Notes
  242. ///
  243. /// - Only the first `sample_size` records in the BAM are processed.
  244. /// - Percentages are normalized by the number of records that had both required tags.
  245. /// - This function uses the `rust-htslib` crate for BAM parsing.
  246. ///
  247. /// # Tag requirements
  248. ///
  249. /// - Both `RG` and `fn` tags must be present and of string type.
  250. /// - Tag values are split on the first underscore; only the prefix is used for grouping.
  251. ///
  252. /// # See also
  253. ///
  254. /// - [SAM/BAM tag conventions](https://samtools.github.io/hts-specs/SAMv1.pdf)
  255. /// - [Dorado SAM specifications](https://github.com/nanoporetech/dorado/blob/release-v1.0/documentation/SAM.md)
  256. ///
  257. pub fn bam_composition(
  258. file_path: &str,
  259. sample_size: usize,
  260. ) -> anyhow::Result<Vec<(String, String, f64)>> {
  261. use std::collections::HashMap;
  262. use rust_htslib::bam::{Reader, record::Aux};
  263. let mut bam = Reader::from_path(file_path)?;
  264. let mut rgs: HashMap<(String, String), u64> = HashMap::new();
  265. let mut processed = 0u64;
  266. for (i, rec) in bam.records().filter_map(Result::ok).take(sample_size).enumerate() {
  267. let rg = match rec.aux(b"RG") {
  268. Ok(Aux::String(s)) => s,
  269. Ok(_) => return Err(anyhow::anyhow!("Record {}: RG tag is not a string", i + 1)),
  270. Err(_) => return Err(anyhow::anyhow!("Record {}: RG tag missing", i + 1)),
  271. };
  272. let fn_tag = match rec.aux(b"fn") {
  273. Ok(Aux::String(b)) => b,
  274. Ok(_) => return Err(anyhow::anyhow!("Record {}: fn tag is not a string", i + 1)),
  275. Err(_) => return Err(anyhow::anyhow!("Record {}: fn tag missing", i + 1)),
  276. };
  277. let rg_prefix = rg.split_once('_').map(|(a, _)| a).unwrap_or(rg);
  278. let fn_prefix = fn_tag.split_once('_').map(|(b, _)| b).unwrap_or(fn_tag);
  279. *rgs.entry((rg_prefix.to_string(), fn_prefix.to_string())).or_default() += 1;
  280. processed += 1;
  281. }
  282. let results: Vec<_> = rgs
  283. .into_iter()
  284. .map(|((rg, fn_tag), n)| (
  285. rg,
  286. fn_tag,
  287. n as f64 * 100.0 / processed as f64
  288. ))
  289. .collect();
  290. Ok(results)
  291. }
  292. pub fn bam_has_fc(bam: &WGSBam, flow_cell: &FlowCell) -> anyhow::Result<bool> {
  293. let mut bam = rust_htslib::bam::Reader::from_path(&bam.path)?;
  294. let mut rgs: HashMap<String, u64> = HashMap::new();
  295. for result in bam.records().filter_map(Result::ok).take(2_000) {
  296. if let rust_htslib::bam::record::Aux::String(s) = result.aux(b"fn")? {
  297. *rgs.entry(s.to_string()).or_default() += 1;
  298. }
  299. }
  300. for k in rgs.keys() {
  301. if k.contains(&flow_cell.sample_sheet.flow_cell_id) {
  302. return Ok(true);
  303. }
  304. }
  305. Ok(false)
  306. }
  307. pub struct BamReadsSampler {
  308. pub positions: Vec<(String, u64)>,
  309. pub reader: rust_htslib::bam::IndexedReader,
  310. current_index: usize,
  311. }
  312. impl BamReadsSampler {
  313. pub fn new(path: &str, n: usize) -> anyhow::Result<Self> {
  314. debug!("Reading BAM file: {path}");
  315. let reader = rust_htslib::bam::IndexedReader::from_path(path)?;
  316. let header = reader.header();
  317. let contig_len: Vec<(String, u64)> = header
  318. .target_names()
  319. .into_iter()
  320. .map(|target_name| {
  321. let tid = header.tid(target_name).unwrap();
  322. (
  323. String::from_utf8(target_name.to_vec()).unwrap(),
  324. header.target_len(tid).unwrap(),
  325. )
  326. })
  327. .collect();
  328. let positions = sample_random_positions(&contig_len, n);
  329. Ok(Self {
  330. positions,
  331. reader,
  332. current_index: 0,
  333. })
  334. }
  335. }
  336. impl Iterator for BamReadsSampler {
  337. type Item = Result<rust_htslib::bam::Record, rust_htslib::errors::Error>;
  338. fn next(&mut self) -> Option<Self::Item> {
  339. loop {
  340. if self.current_index < self.positions.len() {
  341. let (contig, pos) = &self.positions[self.current_index];
  342. match self.reader.fetch((contig, *pos, pos + 1)) {
  343. Ok(_) => (),
  344. Err(e) => warn!("Error while reading BAM {e}"),
  345. }
  346. let result = self.reader.records().next();
  347. self.current_index += 1;
  348. if result.is_some() {
  349. return result;
  350. }
  351. } else {
  352. return None;
  353. }
  354. }
  355. }
  356. }
  357. pub fn n_mapped_reads(bam_path: &str) -> anyhow::Result<u64> {
  358. let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path)?;
  359. let mapped = bam
  360. .index_stats()?
  361. .into_iter()
  362. .filter(|(tid, _, _, _)| *tid < 24)
  363. .map(|(_, _, m, _)| m)
  364. .sum();
  365. Ok(mapped)
  366. }
  367. // 0-based inclusif
  368. pub fn nt_pileup(
  369. bam: &mut rust_htslib::bam::IndexedReader,
  370. chr: &str,
  371. position: u32,
  372. with_next_ins: bool,
  373. ) -> anyhow::Result<Vec<u8>> {
  374. use rust_htslib::{bam, bam::Read};
  375. let mut bases = Vec::new();
  376. bam.fetch((chr, position, position + 1))?;
  377. let mut bam_pileup = Vec::new();
  378. for p in bam.pileup() {
  379. let pileup = p.context(format!(
  380. "Can't pileup bam at position {}:{} (0-based)",
  381. chr, position
  382. ))?;
  383. let cur_position = pileup.pos();
  384. if cur_position == position {
  385. for alignment in pileup.alignments() {
  386. match alignment.indel() {
  387. bam::pileup::Indel::Ins(_len) => bam_pileup.push(b'I'),
  388. bam::pileup::Indel::Del(_len) => bam_pileup.push(b'D'),
  389. _ => {
  390. let record = alignment.record();
  391. if record.seq_len() > 0 {
  392. if let Some(b) = base_at(&record, position, with_next_ins)? {
  393. bases.push(b);
  394. }
  395. } else if alignment.is_del() {
  396. bases.push(b'D');
  397. }
  398. }
  399. }
  400. }
  401. }
  402. }
  403. Ok(bases)
  404. }
  405. pub fn base_at(
  406. record: &rust_htslib::bam::record::Record,
  407. at_pos: u32,
  408. with_next_ins: bool,
  409. ) -> anyhow::Result<Option<u8>> {
  410. let cigar = record.cigar();
  411. let seq = record.seq();
  412. let pos = cigar.pos() as u32;
  413. let mut read_i = 0u32;
  414. // let at_pos = at_pos - 1;
  415. let mut ref_pos = pos;
  416. if ref_pos > at_pos {
  417. return Ok(None);
  418. }
  419. for (id, op) in cigar.iter().enumerate() {
  420. let (add_read, add_ref) = match *op {
  421. Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
  422. Cigar::Ins(len) => (len, 0),
  423. Cigar::Del(len) => (0, len),
  424. Cigar::RefSkip(len) => (0, len),
  425. Cigar::SoftClip(len) => (len, 0),
  426. Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
  427. };
  428. // If at the end of the op len and next op is Ins return I
  429. if with_next_ins && ref_pos + add_read == at_pos + 1 {
  430. if let Some(Cigar::Ins(_)) = cigar.get(id + 1) {
  431. return Ok(Some(b'I'));
  432. }
  433. }
  434. if ref_pos + add_ref > at_pos {
  435. // Handle deletions directly
  436. if let Cigar::Del(_) = *op {
  437. return Ok(Some(b'D'));
  438. } else if let Cigar::RefSkip(_) = op {
  439. return Ok(None);
  440. } else {
  441. let diff = at_pos - ref_pos;
  442. let p = read_i + diff;
  443. return Ok(Some(seq[p as usize]));
  444. }
  445. }
  446. read_i += add_read;
  447. ref_pos += add_ref;
  448. }
  449. Ok(None)
  450. }
  451. pub fn counts_at(
  452. bam: &mut rust_htslib::bam::IndexedReader,
  453. chr: &str,
  454. position: u32,
  455. ) -> anyhow::Result<HashMap<String, i32>> {
  456. let p = nt_pileup(bam, chr, position, false)?
  457. .iter()
  458. .map(|e| String::from_utf8(vec![*e]).unwrap())
  459. .collect::<Vec<_>>();
  460. let mut counts = HashMap::new();
  461. for item in p.iter() {
  462. *counts.entry(item.to_string()).or_insert(0) += 1;
  463. }
  464. Ok(counts)
  465. }
  466. pub fn ins_pileup(
  467. bam: &mut rust_htslib::bam::IndexedReader,
  468. chr: &str,
  469. position: u32,
  470. ) -> anyhow::Result<Vec<String>> {
  471. let mut bases = Vec::new();
  472. bam.fetch((chr, position, position + 10))?;
  473. for p in bam.pileup() {
  474. let pileup = p.context(format!(
  475. "Can't pileup bam at position {}:{} (0-based)",
  476. chr, position
  477. ))?;
  478. let cur_position = pileup.pos();
  479. // Ins in the next position
  480. if cur_position == position + 1 {
  481. // debug!("{cur_position}");
  482. for alignment in pileup.alignments() {
  483. let record = alignment.record();
  484. if record.seq_len() > 0 {
  485. if let Some(b) = ins_at(&record, position)? {
  486. bases.push(b);
  487. }
  488. }
  489. }
  490. }
  491. }
  492. Ok(bases)
  493. }
  494. pub fn ins_at(
  495. record: &rust_htslib::bam::record::Record,
  496. at_pos: u32,
  497. ) -> anyhow::Result<Option<String>> {
  498. use rust_htslib::bam::record::Cigar;
  499. let cigar = record.cigar();
  500. let seq = record.seq();
  501. let pos = cigar.pos() as u32;
  502. let mut read_i = 0u32;
  503. let mut ref_pos = pos;
  504. if ref_pos > at_pos {
  505. return Ok(None);
  506. }
  507. // debug!(
  508. // "read: {}",
  509. // String::from_utf8(record.qname().to_vec()).unwrap()
  510. // );
  511. for op in cigar.iter() {
  512. let (add_read, add_ref) = match *op {
  513. Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
  514. Cigar::Ins(len) => (len, 0),
  515. Cigar::Del(len) => (0, len),
  516. Cigar::RefSkip(len) => (0, len),
  517. Cigar::SoftClip(len) => (len, 0),
  518. Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
  519. };
  520. if ref_pos + add_read > at_pos && ref_pos + add_read < at_pos + 10 {
  521. if let Cigar::Ins(v) = *op {
  522. // debug!(
  523. // "ins size {v} @ {} (corrected {})",
  524. // ref_pos + add_read,
  525. // (ref_pos + add_read) - v - 1
  526. // );
  527. if (ref_pos + add_read) - v - 1 == at_pos {
  528. let inserted_seq =
  529. seq.as_bytes()[read_i as usize..(read_i + v) as usize].to_vec();
  530. return Ok(Some(String::from_utf8(inserted_seq)?));
  531. }
  532. }
  533. }
  534. read_i += add_read;
  535. ref_pos += add_ref;
  536. }
  537. Ok(None)
  538. }
  539. pub fn counts_ins_at(
  540. bam: &mut rust_htslib::bam::IndexedReader,
  541. chr: &str,
  542. position: u32,
  543. ) -> anyhow::Result<HashMap<String, i32>> {
  544. let p = ins_pileup(bam, chr, position)?;
  545. let mut counts = HashMap::new();
  546. for item in p.iter() {
  547. *counts.entry(item.to_string()).or_insert(0) += 1;
  548. }
  549. Ok(counts)
  550. }
  551. pub fn sample_random_positions(chromosomes: &Vec<(String, u64)>, n: usize) -> Vec<(String, u64)> {
  552. let mut rng = rng();
  553. let total_length: u64 = chromosomes.iter().map(|(_, length)| length).sum();
  554. (0..n)
  555. .map(|_| {
  556. let random_position = rng.random_range(0..total_length);
  557. let mut cumulative_length = 0;
  558. for (chr, length) in chromosomes {
  559. cumulative_length += length;
  560. if random_position < cumulative_length {
  561. let position_within_chr = random_position - (cumulative_length - length);
  562. return (chr.clone(), position_within_chr);
  563. }
  564. }
  565. unreachable!("Should always find a chromosome")
  566. })
  567. .collect()
  568. }
  569. /// High-level mapping statistics for a WGS BAM file.
  570. ///
  571. /// This struct aggregates essential quality control and mapping metrics, suitable for reporting or downstream QC.
  572. /// Unless otherwise specified, units are in base pairs (bp) or counts of reads.
  573. ///
  574. /// # Fields
  575. /// - `all_records`: Total number of records in the BAM, before any filtering.
  576. /// - `n_reads`: Number of primary, mapped, QC-passed, non-duplicate reads counted for statistics.
  577. /// - `mapped_fraction`: Fraction of mapped (counted) reads relative to all records.
  578. /// - `n_unmapped`: Number of unmapped reads (`BAM_FUNMAP`).
  579. /// - `n_duplicates`: Number of reads marked as duplicate (`BAM_FDUP`).
  580. /// - `mapped_yield`: Total sum of mapped read lengths (bp).
  581. /// - `mean_read_length`: Mean length of mapped reads (bp).
  582. /// - `median_read_length`: Median mapped read length (bp).
  583. /// - `coverage_stddev`: Standard deviation of per-read global coverage contribution.
  584. /// - `global_coverage`: Mean mapped yield divided by total genome size.
  585. /// - `karyotype`: Per-contig summary: (TID, contig length, contig name, mapped yield, coverage stddev).
  586. /// - `n50`: N50 statistic for mapped read lengths (bp).
  587. /// - `by_lengths`: Histogram of mapped read lengths as (length, count), sorted by length.
  588. ///
  589. /// # Filtering Rule
  590. /// Only records that are:
  591. /// - primary alignments (not secondary/supplementary),
  592. /// - mapped (`!BAM_FUNMAP`),
  593. /// - pass vendor quality checks (`!BAM_FQCFAIL`),
  594. /// - not marked as duplicate (`!BAM_FDUP`),
  595. /// - and with mapping quality ≥ `bam_min_mapq`,
  596. /// are counted as mapped reads.
  597. ///
  598. #[derive(Debug, Clone, Deserialize, Serialize)]
  599. pub struct WGSBamStats {
  600. /// Total number of BAM records (including unmapped, filtered, duplicates).
  601. pub all_records: u64,
  602. /// Number of mapped, primary, QC-passed, non-duplicate reads (final filtered count).
  603. pub n_reads: u64,
  604. /// Fraction of counted mapped reads over all BAM records.
  605. pub mapped_fraction: f64,
  606. /// Number of unmapped records (flag 0x4).
  607. pub n_unmapped: u64,
  608. /// Number of duplicate records (flag 0x400).
  609. pub n_duplicates: u64,
  610. /// Number of duplicate records (flag 0x400).
  611. pub n_lowq: u64,
  612. /// Total sum of mapped read lengths (bp).
  613. pub mapped_yield: u64,
  614. /// Mean mapped read length (bp).
  615. pub mean_read_length: f64,
  616. /// Median mapped read length (bp).
  617. pub median_read_length: u64,
  618. /// Standard deviation of per-read global coverage contributions.
  619. pub coverage_stddev: f64,
  620. /// Mean global coverage: `mapped_yield / genome_size`.
  621. pub global_coverage: f64,
  622. /// (tid, contig_length, contig_name, mapped_yield, mean_coverage, coverage_stddev, coverage_ratio)
  623. pub karyotype: Vec<(i32, u64, String, u64, f64, f64, f64)>,
  624. /// N50 value of mapped read lengths (bp).
  625. pub n50: u64,
  626. /// Histogram of mapped read lengths: (length, count), sorted by length.
  627. pub by_lengths: Vec<(u64, u32)>,
  628. }
  629. impl WGSBamStats {
  630. /// Computes `WGSBamStats` by scanning a BAM file and summarizing essential statistics.
  631. ///
  632. /// # Parameters
  633. /// - `case_id`: Sample/case identifier used to locate the BAM file via config.
  634. /// - `time`: Timestamp string for progress/logging.
  635. /// - `config`: Configuration struct. Must provide:
  636. /// - `solo_bam(case_id, time)`: path to BAM file.
  637. /// - `bam_min_mapq`: minimum mapping quality to include read.
  638. /// - `bam_n_threads`: number of BAM decompression threads.
  639. ///
  640. /// # Filtering Rule
  641. /// Only primary, mapped, non-duplicate, QC-passed reads with sufficient mapping quality are included (see struct doc).
  642. ///
  643. /// # Returns
  644. /// On success, returns a fully populated `WGSBamStats` with all core and extended fields.
  645. /// Returns an error if the BAM cannot be parsed or genome sizes cannot be retrieved.
  646. pub fn new(case_id: &str, time: &str, config: &Config) -> anyhow::Result<Self> {
  647. let bam_path = config.solo_bam(case_id, time);
  648. let bam_min_mapq = config.bam_min_mapq;
  649. let bam_n_threads = config.bam_n_threads;
  650. let mut bam = rust_htslib::bam::Reader::from_path(&bam_path)?;
  651. let h = bam.header().clone();
  652. let header = rust_htslib::bam::Header::from_template(&h);
  653. bam.set_threads(bam_n_threads as usize)?;
  654. info!("Parsing BAM file: {bam_path}");
  655. let mut all_records = 0u64;
  656. let mut n_reads = 0u64;
  657. let mut n_unmapped = 0u64;
  658. let mut n_duplicates = 0u64;
  659. let mut n_lowq = 0u64;
  660. let mut mapped_lengths = Vec::new();
  661. let mut lengths_by_tid: HashMap<i32, Vec<u64>> = HashMap::new();
  662. // Main scan loop: apply flag and MAPQ filters.
  663. for rec in bam.rc_records() {
  664. all_records += 1;
  665. let r = rec.with_context(|| "failed to parse BAM record")?;
  666. let flags = r.flags();
  667. // Count and exclude unmapped.
  668. if flags & (rust_htslib::htslib::BAM_FUNMAP as u16) != 0 {
  669. n_unmapped += 1;
  670. continue;
  671. }
  672. // Count and exclude duplicates.
  673. if flags & (rust_htslib::htslib::BAM_FDUP as u16) != 0 {
  674. n_duplicates += 1;
  675. continue;
  676. }
  677. // Filter by mapping quality.
  678. if r.mapq() < bam_min_mapq {
  679. n_lowq += 1;
  680. continue;
  681. }
  682. // Exclude secondary and QC-failed.
  683. let bad_flags = rust_htslib::htslib::BAM_FSECONDARY | rust_htslib::htslib::BAM_FQCFAIL;
  684. if (flags & (bad_flags as u16)) != 0 {
  685. continue;
  686. }
  687. n_reads += 1;
  688. let start = r.pos();
  689. let end = r.reference_end();
  690. let len = if end > start { (end - start) as u64 } else { 0 };
  691. mapped_lengths.push(len);
  692. lengths_by_tid.entry(r.tid()).or_default().push(len);
  693. if n_reads % 500_000 == 0 {
  694. info!("{case_id} {time}: processed {n_reads} mapped reads");
  695. }
  696. }
  697. let mapped_fraction = if all_records > 0 {
  698. n_reads as f64 / all_records as f64
  699. } else {
  700. 0.0
  701. };
  702. // Compute mean, median, N50 (single sort).
  703. let mut lens = mapped_lengths.clone();
  704. lens.sort_unstable();
  705. let mean_read_length = if n_reads > 0 {
  706. mapped_lengths.iter().sum::<u64>() as f64 / n_reads as f64
  707. } else {
  708. 0.0
  709. };
  710. let median_read_length = if lens.is_empty() {
  711. 0
  712. } else {
  713. lens[lens.len() / 2]
  714. };
  715. let mapped_yield: u64 = mapped_lengths.iter().sum();
  716. // N50: minimal length L such that sum(lengths >= L) >= 50% yield.
  717. let mut cum = 0u64;
  718. let half = mapped_yield / 2;
  719. let n50 = lens
  720. .iter()
  721. .rev()
  722. .find_map(|&l| {
  723. cum += l;
  724. if cum >= half {
  725. Some(l)
  726. } else {
  727. None
  728. }
  729. })
  730. .unwrap_or(0);
  731. // Histogram (sorted by length).
  732. let mut histo = BTreeMap::new();
  733. for &len in &mapped_lengths {
  734. *histo.entry(len).or_insert(0) += 1;
  735. }
  736. let by_lengths: Vec<_> = histo.into_iter().collect();
  737. // Genome/coverage.
  738. let genome = get_genome_sizes(&header)?;
  739. let genome_size: u64 = genome.values().sum();
  740. let global_coverage = if genome_size > 0 {
  741. mapped_yield as f64 / genome_size as f64
  742. } else {
  743. 0.0
  744. };
  745. // Global coverage stdev: stdev of per-read contributions to global coverage.
  746. let mut total_sq_cov = 0.0;
  747. for &len in &mapped_lengths {
  748. let cov = if genome_size > 0 {
  749. len as f64 / genome_size as f64
  750. } else {
  751. 0.0
  752. };
  753. total_sq_cov += cov * cov;
  754. }
  755. let mean_cov = global_coverage;
  756. let variance = if n_reads > 0 {
  757. let var = total_sq_cov / n_reads as f64 - mean_cov.powi(2);
  758. if var < 0.0 {
  759. 0.0
  760. } else {
  761. var
  762. }
  763. } else {
  764. 0.0
  765. };
  766. let coverage_stddev = variance.sqrt();
  767. // Per-contig: (TID, contig_length, contig_name, mapped_yield, mean_coverage, coverage_stddev , coverage_ratio).
  768. let mut karyotype: Vec<(i32, u64, String, u64, f64, f64, f64)> = lengths_by_tid
  769. .iter()
  770. .map(|(tid, lengths)| {
  771. let contig = String::from_utf8(h.tid2name(*tid as u32).to_vec()).unwrap();
  772. let contig_len = genome.get(&contig).copied().unwrap_or(0);
  773. let mapped_sum: u64 = lengths.iter().sum();
  774. // Per-contig mean coverage
  775. let mean_cov = if contig_len > 0 {
  776. mapped_sum as f64 / contig_len as f64
  777. } else {
  778. 0.0
  779. };
  780. let coverage_ratio = if global_coverage > 0.0 {
  781. mean_cov / global_coverage
  782. } else {
  783. 0.0
  784. };
  785. // Per-contig coverage stdev (over all reads mapped to this contig)
  786. let var = if contig_len > 0 && !lengths.is_empty() {
  787. lengths
  788. .iter()
  789. .map(|&l| {
  790. let cov = l as f64 / contig_len as f64;
  791. (cov - mean_cov).powi(2)
  792. })
  793. .sum::<f64>()
  794. / lengths.len() as f64
  795. } else {
  796. 0.0
  797. };
  798. let stddev = var.sqrt();
  799. (
  800. *tid,
  801. contig_len,
  802. contig,
  803. mapped_sum,
  804. mean_cov,
  805. stddev,
  806. coverage_ratio,
  807. )
  808. })
  809. .collect();
  810. karyotype.sort_unstable_by_key(|&(tid, _, _, _, _, _, _)| tid);
  811. Ok(Self {
  812. all_records,
  813. n_reads,
  814. mapped_fraction,
  815. n_unmapped,
  816. n_duplicates,
  817. mapped_yield,
  818. mean_read_length,
  819. median_read_length,
  820. coverage_stddev,
  821. global_coverage,
  822. karyotype,
  823. n50,
  824. by_lengths,
  825. n_lowq,
  826. })
  827. }
  828. }
  829. impl fmt::Display for WGSBamStats {
  830. /// Formats the BAM statistics as a readable summary table.
  831. ///
  832. /// Shows all global stats and a per-contig breakdown with coverage stdev.
  833. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  834. writeln!(f, "BAM statistics summary:")?;
  835. writeln!(f, " All BAM records: {}", self.all_records)?;
  836. writeln!(f, " Unmapped reads: {}", self.n_unmapped)?;
  837. writeln!(f, " Duplicate reads: {}", self.n_duplicates)?;
  838. writeln!(f, " Low Q reads: {}", self.n_lowq)?;
  839. writeln!(f, " Counted (mapped) reads: {}", self.n_reads)?;
  840. writeln!(f, " Mapped fraction: {:.4}", self.mapped_fraction)?;
  841. writeln!(
  842. f,
  843. " Mapped yield [Gb]: {:.2}",
  844. self.mapped_yield as f64 / 1e9
  845. )?;
  846. writeln!(f, " Mean read length [bp]: {:.2}", self.mean_read_length)?;
  847. writeln!(f, " Median read length [bp]: {}", self.median_read_length)?;
  848. writeln!(f, " N50 [bp]: {}", self.n50)?;
  849. writeln!(f, " Global mean coverage: {:.2}", self.global_coverage)?;
  850. writeln!(f, " Global coverage stdev: {:.2}", self.coverage_stddev)?;
  851. writeln!(f, " Per-contig stats:")?;
  852. writeln!(
  853. f,
  854. " {:<8} {:<10} {:<12} {:<14} {:<12} {:<10} {:<10}",
  855. "TID", "Length", "Name", "MappedYield", "CovMean", "CovStdev", "CovRatio"
  856. )?;
  857. for (tid, contig_len, contig, mapped_sum, mean_cov, stddev, coverage_ratio) in
  858. &self.karyotype
  859. {
  860. writeln!(
  861. f,
  862. " {:<8} {:<10} {:<12} {:<14} {:<12.4} {:.4} {:.2}",
  863. tid, contig_len, contig, mapped_sum, mean_cov, stddev, coverage_ratio
  864. )?;
  865. }
  866. Ok(())
  867. }
  868. }
  869. pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<HashMap<String, u64>> {
  870. let mut genome = HashMap::new();
  871. for (key, records) in header.to_hashmap() {
  872. for record in records {
  873. if key == "SQ" {
  874. genome.insert(
  875. record["SN"].to_string(),
  876. record["LN"]
  877. .parse::<u64>()
  878. .expect("Failed parsing length of chromosomes"),
  879. );
  880. }
  881. }
  882. }
  883. Ok(genome)
  884. }
  885. /// Result of querying a read at a reference position.
  886. #[derive(Clone, Debug, Eq, PartialEq)]
  887. pub enum PileBase {
  888. A,
  889. C,
  890. G,
  891. T,
  892. N,
  893. Ins, // insertion immediately after the queried base
  894. Del((Vec<u8>, u32)), // deletion covering the queried base
  895. Skip, // reference skip (N)
  896. }
  897. /// Decode one HTSlib 4bit nucleotide (0=A,1=C,2=G,3=T,4=N, …) to `Base`
  898. #[inline]
  899. fn decode(b: u8) -> PileBase {
  900. match b & 0b1111 {
  901. 0 => PileBase::A,
  902. 1 => PileBase::C,
  903. 2 => PileBase::G,
  904. 3 => PileBase::T,
  905. _ => PileBase::N,
  906. }
  907. }
  908. pub fn base_at_new(
  909. record: &rust_htslib::bam::Record,
  910. at_pos: i64,
  911. with_next_ins: bool,
  912. ) -> Option<PileBase> {
  913. if record.pos() > at_pos {
  914. return None;
  915. }
  916. let mut ref_pos = record.pos();
  917. let mut read_pos = 0_i64;
  918. let cigar = record.cigar();
  919. let seq = record.seq().as_bytes(); // already expanded to ASCII (A,C,G,T,N)
  920. for (i, op) in cigar.iter().enumerate() {
  921. let l = op.len() as i64;
  922. let (consume_read, consume_ref) = match op.char() {
  923. 'M' | '=' | 'X' => (l, l),
  924. 'I' | 'S' => (l, 0),
  925. 'D' | 'N' => (0, l),
  926. 'H' | 'P' => (0, 0),
  927. _ => unreachable!(),
  928. };
  929. /* look ahead for an insertion immediately AFTER the queried base */
  930. if with_next_ins
  931. && ref_pos + consume_ref == at_pos + 1
  932. && matches!(cigar.get(i + 1), Some(Cigar::Ins(_)))
  933. {
  934. return Some(PileBase::Ins);
  935. }
  936. // Instead of PileBase::Ins, consider
  937. // extracting the inserted sequence and returning its length or actual bases:
  938. // if let Some(Cigar::Ins(len)) = cigar.get(i + 1) {
  939. // let ins_seq = &seq[read_pos as usize + (consume_ref as usize)
  940. // ..read_pos as usize + (consume_ref as usize) + (*len as usize)];
  941. // return Some(PileBase::Ins(ins_seq.to_vec()));
  942. // }
  943. /* Does the queried reference coordinate fall inside this CIGAR op? */
  944. if ref_pos + consume_ref > at_pos {
  945. return Some(match op {
  946. Cigar::Del(e) => PileBase::Del((record.qname().to_vec(), *e)),
  947. Cigar::RefSkip(_) => PileBase::Skip,
  948. _ => {
  949. let offset = (at_pos - ref_pos) as usize;
  950. if read_pos as usize + offset >= seq.len() {
  951. return None; // out of range, malformed BAM
  952. }
  953. decode(seq[read_pos as usize + offset])
  954. }
  955. });
  956. }
  957. read_pos += consume_read;
  958. ref_pos += consume_ref;
  959. }
  960. None // beyond alignment
  961. }
  962. /// Return the pile‑up of **all reads** at `chr:pos` (0‑based, inclusive).
  963. ///
  964. /// * `bam` – an open [`rust_htslib::bam::IndexedReader`].
  965. /// * `chr` / `pos` – reference contig name and coordinate.
  966. /// * `with_next_ins` – if `true`, report [`Base::Ins`] when an insertion starts
  967. /// **right after** the queried base.
  968. ///
  969. /// The function bounds the internal pile‑up depth to 10 000 reads to protect
  970. /// against malformed BAM files that could otherwise allocate unbounded memory.
  971. ///
  972. /// # Errors
  973. ///
  974. /// Propagates any I/O or parsing error from *rust‑htslib*, wrapped in
  975. /// [`anyhow::Error`].
  976. pub fn nt_pileup_new(
  977. bam: &mut rust_htslib::bam::IndexedReader,
  978. chr: &str,
  979. pos: u32,
  980. with_next_ins: bool,
  981. ) -> anyhow::Result<Vec<PileBase>> {
  982. // Storage for the per‑read observations.
  983. let mut pile = Vec::new();
  984. // Restrict BAM iterator to the one‑base span of interest.
  985. bam.fetch((chr, pos, pos + 1))?;
  986. // Hard cap on depth (adjust if your use‑case needs deeper coverage).
  987. bam.pileup().set_max_depth(10_000);
  988. // Stream the pileup; HTSlib walks the reads for us.
  989. for pileup in bam.pileup() {
  990. // Attach context to any low‑level error.
  991. let pileup = pileup.context(format!("Failed to pileup BAM at {chr}:{pos}"))?;
  992. // Skip other positions quickly.
  993. if pileup.pos() != pos {
  994. continue;
  995. }
  996. for aln in pileup.alignments() {
  997. // Handle the three indel states reported by HTSlib.
  998. match aln.indel() {
  999. rust_htslib::bam::pileup::Indel::Ins(_) => {
  1000. pile.push(PileBase::Ins); // insertion *starts at* this base
  1001. }
  1002. rust_htslib::bam::pileup::Indel::Del(e) => {
  1003. let qname = aln.record().qname().to_vec();
  1004. pile.push(PileBase::Del((qname, e))); // deletion length > 1, but still covers here
  1005. }
  1006. rust_htslib::bam::pileup::Indel::None => {
  1007. // Regular match/mismatch: delegate to `base_at`.
  1008. if let Some(base) = base_at_new(&aln.record(), pos as i64, with_next_ins) {
  1009. pile.push(base);
  1010. }
  1011. }
  1012. }
  1013. }
  1014. }
  1015. Ok(pile)
  1016. }