| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676 |
- use std::{
- collections::HashMap,
- fs::{self, File},
- path::PathBuf,
- };
- use anyhow::{anyhow, Context};
- use chrono::{DateTime, Utc};
- use dashmap::DashMap;
- use glob::glob;
- use log::{debug, info, warn};
- use rand::{rng, Rng};
- use rayon::prelude::*;
- use rust_htslib::bam::{ext::BamRecordExtensions, record::Cigar, Read};
- use serde::{Deserialize, Serialize};
- use crate::config::Config;
- #[derive(Debug, Clone, Deserialize, Serialize)]
- pub struct WGSBam {
- pub id: String,
- pub time_point: String,
- pub reference_genome: String,
- // pub bam_type: BamType,
- pub path: PathBuf,
- pub modified: DateTime<Utc>,
- pub bam_stats: WGSBamStats,
- // pub cramino: Option<CraminoRes>,
- pub composition: Vec<(String, f64)>, // acquisition id
- }
- // #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
- // pub enum BamType {
- // WGS,
- // Panel(String),
- // ChIP(String),
- // }
- //
- impl WGSBam {
- pub fn new(path: PathBuf) -> anyhow::Result<Self> {
- let stem = path
- .clone()
- .file_stem()
- .context(format!("Can't parse stem from {}", path.display()))?
- .to_string_lossy()
- .to_string();
- let stem: Vec<&str> = stem.split('_').collect();
- if stem.len() != 3 {
- return Err(anyhow!("Error in bam name: {}", path.display()));
- }
- let id = stem[0].to_string();
- let time_point = stem[1].to_string();
- let reference_genome = stem
- .last()
- .context("Can't get last from stem {stem}")?
- .to_string();
- // let bam_type = if stem.len() == 4 {
- // match stem[2] {
- // "oncoT" => BamType::Panel("oncoT".to_string()),
- // "H3K27ac" => BamType::ChIP("H3K27ac".to_string()),
- // "H3K4me3" => BamType::ChIP("H3K4me3".to_string()),
- // _ => return Err(anyhow!("Error in bam name: {}", path.display())),
- // }
- // } else {
- // BamType::WGS
- // };
- let file_metadata = fs::metadata(&path)?;
- let modified = file_metadata.modified()?;
- let tp_dir = path
- .parent()
- .context("Can't parse parent from: {bam_path}")?;
- let json_path = format!(
- "{}/{id}_{time_point}_{reference_genome}_info.json",
- tp_dir.to_string_lossy()
- );
- let json_file = PathBuf::from(&json_path);
- if json_file.exists() && json_file.metadata()?.modified()? > modified {
- match Self::load_json(&json_path) {
- Ok(s) => return Ok(s),
- Err(e) => {
- warn!("Error while loading {}.\n{e}", json_path);
- }
- }
- }
- // let cramino_path = format!(
- // "{}/{id}_{time_point}_{reference_genome}_cramino.txt",
- // tp_dir.to_string_lossy()
- // );
- // let cramino = if bam_type == BamType::WGS {
- // if !PathBuf::from_str(&cramino_path)?.exists() {
- // // return Err(anyhow!("Cramino file missing {cramino_path}"));
- // Cramino::default().with_bam(path.to_str().unwrap())?;
- // }
- // let mut cramino = Cramino::default().with_result_path(&cramino_path);
- // cramino
- // .parse_results()
- // .context(format!("Error while parsing cramino for {cramino_path}"))?;
- //
- // if let Some(cramino) = cramino.results {
- // Some(cramino)
- // } else {
- // return Err(anyhow!("Cramino results parsing failed"));
- // }
- // } else {
- // None
- // };
- let composition = bam_compo(path.to_string_lossy().as_ref(), 20_000).context(format!(
- "Error while reading BAM composition for {}",
- path.display()
- ))?;
- let s = Self {
- path,
- bam_stats: WGSBamStats::new(&id, &time_point, Config::default())?,
- // cramino,
- id: id.to_string(),
- time_point: time_point.to_string(),
- // bam_type,
- reference_genome,
- composition,
- modified: modified.into(),
- };
- s.save_json(&json_path)?;
- Ok(s)
- }
- pub fn load_json(path: &str) -> anyhow::Result<Self> {
- let f = File::open(path)?;
- let s: Self = serde_json::from_reader(f)?;
- Ok(s)
- }
- pub fn save_json(&self, path: &str) -> anyhow::Result<()> {
- let f = File::create(path)?;
- serde_json::to_writer(f, self)?;
- Ok(())
- }
- }
- #[derive(Debug, Default)]
- pub struct BamCollection {
- pub bams: Vec<WGSBam>,
- }
- impl BamCollection {
- pub fn new(result_dir: &str) -> Self {
- load_bam_collection(result_dir)
- }
- pub fn by_acquisition_id(&self) -> HashMap<String, Vec<&WGSBam>> {
- let mut acq: HashMap<String, Vec<&WGSBam>> = HashMap::new();
- for bam in self.bams.iter() {
- for (acq_id, _) in bam.composition.iter() {
- if let Some(entry) = acq.get_mut(acq_id) {
- entry.push(bam);
- } else {
- acq.insert(acq_id.to_string(), vec![bam]);
- }
- }
- }
- acq
- }
- pub fn get(&self, id: &str, time_point: &str) -> Vec<&WGSBam> {
- self.bams
- .iter()
- .filter(|b| b.id == id && b.time_point == time_point)
- .collect()
- }
- pub fn by_id_completed(&self, min_diag_cov: f32, min_mrd_cov: f32) -> Vec<WGSBam> {
- self.bams
- .iter()
- // .filter(|b| matches!(b.bam_type, BamType::WGS))
- .filter(|b| match b.time_point.as_str() {
- "diag" => b.bam_stats.global_coverage >= min_diag_cov as f64,
- "mrd" => b.bam_stats.global_coverage >= min_mrd_cov as f64,
- _ => false,
- })
- // .filter(|b| match &b.cramino {
- // Some(cramino) => match b.time_point.as_str() {
- // "diag" => cramino.mean_length >= min_diag_cov as f64,
- // "mrd" => cramino.mean_length >= min_mrd_cov as f64,
- // _ => false,
- // },
- // _ => false,
- // })
- .cloned()
- .collect()
- }
- pub fn ids(&self) -> Vec<String> {
- let mut ids: Vec<String> = self.bams.iter().map(|b| b.id.clone()).collect();
- ids.sort();
- ids.dedup();
- ids
- }
- }
- pub fn load_bam_collection(result_dir: &str) -> BamCollection {
- let pattern = format!("{}/*/*/*.bam", result_dir);
- let bams = glob(&pattern)
- .expect("Failed to read glob pattern")
- // .par_bridge()
- .filter_map(|entry| {
- match entry {
- Ok(path) => match WGSBam::new(path) {
- Ok(bam) => return Some(bam),
- Err(e) => warn!("{e}"),
- },
- Err(e) => warn!("Error: {:?}", e),
- }
- None
- })
- .collect();
- BamCollection { bams }
- }
- pub fn bam_compo(file_path: &str, sample_size: usize) -> anyhow::Result<Vec<(String, f64)>> {
- let mut bam = rust_htslib::bam::Reader::from_path(file_path)?;
- let mut rgs: HashMap<String, u64> = HashMap::new();
- for result in bam.records().filter_map(Result::ok).take(sample_size) {
- if let rust_htslib::bam::record::Aux::String(s) = result.aux(b"RG")? {
- *rgs.entry(s.to_string()).or_default() += 1;
- }
- }
- Ok(rgs
- .into_iter()
- .map(|(rg, n)| (rg.to_string(), n as f64 * 100.0 / sample_size as f64))
- .map(|(rg, p)| {
- let (a, _) = rg.split_once('_').unwrap();
- (a.to_string(), p)
- })
- .collect())
- }
- pub struct BamReadsSampler {
- pub positions: Vec<(String, u64)>,
- pub reader: rust_htslib::bam::IndexedReader,
- current_index: usize,
- }
- impl BamReadsSampler {
- pub fn new(path: &str, n: usize) -> anyhow::Result<Self> {
- debug!("Reading BAM file: {path}");
- let reader = rust_htslib::bam::IndexedReader::from_path(path)?;
- let header = reader.header();
- let contig_len: Vec<(String, u64)> = header
- .target_names()
- .into_iter()
- .map(|target_name| {
- let tid = header.tid(target_name).unwrap();
- (
- String::from_utf8(target_name.to_vec()).unwrap(),
- header.target_len(tid).unwrap(),
- )
- })
- .collect();
- let positions = sample_random_positions(&contig_len, n);
- Ok(Self {
- positions,
- reader,
- current_index: 0,
- })
- }
- }
- impl Iterator for BamReadsSampler {
- type Item = Result<rust_htslib::bam::Record, rust_htslib::errors::Error>;
- fn next(&mut self) -> Option<Self::Item> {
- loop {
- if self.current_index < self.positions.len() {
- let (contig, pos) = &self.positions[self.current_index];
- match self.reader.fetch((contig, *pos, pos + 1)) {
- Ok(_) => (),
- Err(e) => warn!("Error while reading BAM {e}"),
- }
- let result = self.reader.records().next();
- self.current_index += 1;
- if result.is_some() {
- return result;
- }
- } else {
- return None;
- }
- }
- }
- }
- pub fn n_mapped_reads(bam_path: &str) -> anyhow::Result<u64> {
- let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path)?;
- let mapped = bam
- .index_stats()?
- .into_iter()
- .filter(|(tid, _, _, _)| *tid < 24)
- .map(|(_, _, m, _)| m)
- .sum();
- Ok(mapped)
- }
- // 0-based inclusif
- pub fn nt_pileup(
- bam: &mut rust_htslib::bam::IndexedReader,
- chr: &str,
- position: u32,
- with_next_ins: bool,
- ) -> anyhow::Result<Vec<u8>> {
- use rust_htslib::{bam, bam::Read};
- let mut bases = Vec::new();
- bam.fetch((chr, position, position + 1))?;
- let mut bam_pileup = Vec::new();
- for p in bam.pileup() {
- let pileup = p.context(format!(
- "Can't pileup bam at position {}:{} (0-based)",
- chr, position
- ))?;
- let cur_position = pileup.pos();
- if cur_position == position {
- for alignment in pileup.alignments() {
- match alignment.indel() {
- bam::pileup::Indel::Ins(_len) => bam_pileup.push(b'I'),
- bam::pileup::Indel::Del(_len) => bam_pileup.push(b'D'),
- _ => {
- let record = alignment.record();
- if record.seq_len() > 0 {
- if let Some(b) = base_at(&record, position, with_next_ins)? {
- bases.push(b);
- }
- } else if alignment.is_del() {
- bases.push(b'D');
- }
- }
- }
- }
- }
- }
- Ok(bases)
- }
- pub fn base_at(
- record: &rust_htslib::bam::record::Record,
- at_pos: u32,
- with_next_ins: bool,
- ) -> anyhow::Result<Option<u8>> {
- let cigar = record.cigar();
- let seq = record.seq();
- let pos = cigar.pos() as u32;
- let mut read_i = 0u32;
- // let at_pos = at_pos - 1;
- let mut ref_pos = pos;
- if ref_pos > at_pos {
- return Ok(None);
- }
- for (id, op) in cigar.iter().enumerate() {
- let (add_read, add_ref) = match *op {
- Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
- Cigar::Ins(len) => (len, 0),
- Cigar::Del(len) => (0, len),
- Cigar::RefSkip(len) => (0, len),
- Cigar::SoftClip(len) => (len, 0),
- Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
- };
- // If at the end of the op len and next op is Ins return I
- if with_next_ins && ref_pos + add_read == at_pos + 1 {
- if let Some(Cigar::Ins(_)) = cigar.get(id + 1) {
- return Ok(Some(b'I'));
- }
- }
- if ref_pos + add_ref > at_pos {
- // Handle deletions directly
- if let Cigar::Del(_) = *op {
- return Ok(Some(b'D'));
- } else if let Cigar::RefSkip(_) = op {
- return Ok(None);
- } else {
- let diff = at_pos - ref_pos;
- let p = read_i + diff;
- return Ok(Some(seq[p as usize]));
- }
- }
- read_i += add_read;
- ref_pos += add_ref;
- }
- Ok(None)
- }
- pub fn counts_at(
- bam: &mut rust_htslib::bam::IndexedReader,
- chr: &str,
- position: u32,
- ) -> anyhow::Result<HashMap<String, i32>> {
- let p = nt_pileup(bam, chr, position, false)?
- .iter()
- .map(|e| String::from_utf8(vec![*e]).unwrap())
- .collect::<Vec<_>>();
- let mut counts = HashMap::new();
- for item in p.iter() {
- *counts.entry(item.to_string()).or_insert(0) += 1;
- }
- Ok(counts)
- }
- pub fn ins_pileup(
- bam: &mut rust_htslib::bam::IndexedReader,
- chr: &str,
- position: u32,
- ) -> anyhow::Result<Vec<String>> {
- let mut bases = Vec::new();
- bam.fetch((chr, position, position + 10))?;
- for p in bam.pileup() {
- let pileup = p.context(format!(
- "Can't pileup bam at position {}:{} (0-based)",
- chr, position
- ))?;
- let cur_position = pileup.pos();
- // Ins in the next position
- if cur_position == position + 1 {
- // debug!("{cur_position}");
- for alignment in pileup.alignments() {
- let record = alignment.record();
- if record.seq_len() > 0 {
- if let Some(b) = ins_at(&record, position)? {
- bases.push(b);
- }
- }
- }
- }
- }
- Ok(bases)
- }
- pub fn ins_at(
- record: &rust_htslib::bam::record::Record,
- at_pos: u32,
- ) -> anyhow::Result<Option<String>> {
- use rust_htslib::bam::record::Cigar;
- let cigar = record.cigar();
- let seq = record.seq();
- let pos = cigar.pos() as u32;
- let mut read_i = 0u32;
- let mut ref_pos = pos;
- if ref_pos > at_pos {
- return Ok(None);
- }
- // debug!(
- // "read: {}",
- // String::from_utf8(record.qname().to_vec()).unwrap()
- // );
- for op in cigar.iter() {
- let (add_read, add_ref) = match *op {
- Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
- Cigar::Ins(len) => (len, 0),
- Cigar::Del(len) => (0, len),
- Cigar::RefSkip(len) => (0, len),
- Cigar::SoftClip(len) => (len, 0),
- Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
- };
- if ref_pos + add_read > at_pos && ref_pos + add_read < at_pos + 10 {
- if let Cigar::Ins(v) = *op {
- // debug!(
- // "ins size {v} @ {} (corrected {})",
- // ref_pos + add_read,
- // (ref_pos + add_read) - v - 1
- // );
- if (ref_pos + add_read) - v - 1 == at_pos {
- let inserted_seq =
- seq.as_bytes()[read_i as usize..(read_i + v) as usize].to_vec();
- return Ok(Some(String::from_utf8(inserted_seq)?));
- }
- }
- }
- read_i += add_read;
- ref_pos += add_ref;
- }
- Ok(None)
- }
- pub fn counts_ins_at(
- bam: &mut rust_htslib::bam::IndexedReader,
- chr: &str,
- position: u32,
- ) -> anyhow::Result<HashMap<String, i32>> {
- let p = ins_pileup(bam, chr, position)?;
- let mut counts = HashMap::new();
- for item in p.iter() {
- *counts.entry(item.to_string()).or_insert(0) += 1;
- }
- Ok(counts)
- }
- pub fn sample_random_positions(chromosomes: &Vec<(String, u64)>, n: usize) -> Vec<(String, u64)> {
- let mut rng = rng();
- let total_length: u64 = chromosomes.iter().map(|(_, length)| length).sum();
- (0..n)
- .map(|_| {
- let random_position = rng.random_range(0..total_length);
- let mut cumulative_length = 0;
- for (chr, length) in chromosomes {
- cumulative_length += length;
- if random_position < cumulative_length {
- let position_within_chr = random_position - (cumulative_length - length);
- return (chr.clone(), position_within_chr);
- }
- }
- unreachable!("Should always find a chromosome")
- })
- .collect()
- }
- #[derive(Debug, Clone, Deserialize, Serialize)]
- pub struct WGSBamStats {
- pub all_records: u128,
- pub n_reads: u128,
- pub mapped_yield: u128,
- pub global_coverage: f64,
- pub karyotype: Vec<(i32, u64, String, u128)>,
- pub n50: u128,
- pub by_lengths: Vec<(u128, u32)>,
- }
- impl WGSBamStats {
- pub fn new(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
- let bam_path = config.solo_bam(id, time);
- let Config {
- bam_min_mapq,
- bam_n_threads,
- ..
- } = config;
- let mut bam = rust_htslib::bam::Reader::from_path(bam_path)?;
- let h = bam.header().clone();
- let header = rust_htslib::bam::Header::from_template(&h);
- bam.set_threads(bam_n_threads as usize)?;
- let mut mapped_lengths = Vec::new();
- let mut all_records = 0;
- let mut n_reads = 0;
- let mut lengths_by_tid: HashMap<i32, Vec<u128>> = HashMap::new();
- for read in bam
- .rc_records()
- .map(|x| x.expect("Failure parsing Bam file"))
- .inspect(|_| all_records += 1)
- .filter(|read| {
- read.flags()
- & (rust_htslib::htslib::BAM_FUNMAP
- | rust_htslib::htslib::BAM_FSECONDARY
- | rust_htslib::htslib::BAM_FQCFAIL
- | rust_htslib::htslib::BAM_FDUP) as u16
- == 0
- })
- .filter(|r| r.mapq() >= bam_min_mapq)
- {
- n_reads += 1;
- let mapped_len = (read.reference_end() - read.pos()) as u128;
- mapped_lengths.push(mapped_len);
- lengths_by_tid
- .entry(read.tid())
- .or_default()
- .push(mapped_len);
- if n_reads % 500_000 == 0 {
- info!("{n_reads} reads parsed");
- }
- }
- let mapped_yield = mapped_lengths.par_iter().sum::<u128>();
- let genome = get_genome_sizes(&header)?;
- let genome_size = genome.values().map(|v| *v as u128).sum::<u128>();
- let global_coverage = mapped_yield as f64 / genome_size as f64;
- let by_lengths: DashMap<u128, u32> = DashMap::new();
- mapped_lengths
- .par_iter()
- .for_each(|l| *by_lengths.entry(*l).or_default() += 1);
- let mut by_lengths: Vec<(u128, u32)> =
- by_lengths.iter().map(|e| (*e.key(), *e.value())).collect();
- by_lengths.par_sort_by(|a, b| a.0.cmp(&b.0));
- // This is not n50
- let n50 = by_lengths[by_lengths.len() / 2].0;
- let mut karyotype: Vec<(i32, u64, String, u128)> = lengths_by_tid
- .iter()
- .map(|(tid, lengths)| {
- let contig = String::from_utf8(h.tid2name(*tid as u32).to_vec()).unwrap();
- (
- *tid,
- *genome.get(&contig).unwrap(),
- contig,
- lengths.par_iter().sum::<u128>(),
- )
- })
- .collect();
- karyotype.sort_by(|(ac, _, _, _), (bc, _, _, _)| ac.cmp(bc));
- Ok(Self {
- all_records,
- n_reads,
- mapped_yield,
- global_coverage,
- karyotype,
- n50,
- by_lengths,
- })
- }
- pub fn print(&self) {
- println!("N records\t{}", self.all_records);
- println!("N counted reads\t{}", self.n_reads);
- println!("Mapped yield [Gb]\t{:.2}", self.mapped_yield as f64 / 1e9);
- println!("Mean coverage\t{:.2}", self.global_coverage);
- self.karyotype
- .iter()
- .for_each(|(_, contig_len, contig, contig_yield)| {
- println!(
- "{contig}\t{:.2}",
- (*contig_yield as f64 / *contig_len as f64) / self.global_coverage
- );
- });
- }
- }
- pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<HashMap<String, u64>> {
- let mut genome = HashMap::new();
- for (key, records) in header.to_hashmap() {
- for record in records {
- if key == "SQ" {
- genome.insert(
- record["SN"].to_string(),
- record["LN"]
- .parse::<u64>()
- .expect("Failed parsing length of chromosomes"),
- );
- }
- }
- }
- Ok(genome)
- }
- // fn softclipped_bases(read: &rust_htslib::bam::Record) -> u128 {
- // (read.cigar().leading_softclips() + read.cigar().trailing_softclips()) as u128
- // }
|