//! BAM handling: per-sample WGS BAM metadata, collections, pileup helpers
//! and whole-genome statistics.

use std::{
    collections::HashMap,
    fs::{self, File},
    path::PathBuf,
};

use anyhow::{anyhow, Context};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use glob::glob;
use log::{debug, info, warn};
use rand::{rng, Rng};
use rayon::prelude::*;
use rust_htslib::bam::{ext::BamRecordExtensions, record::Cigar, Read};
use serde::{Deserialize, Serialize};

use crate::config::Config;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WGSBam {
    pub id: String,
    pub time_point: String,
    pub reference_genome: String,
    // pub bam_type: BamType,
    pub path: PathBuf,
    pub modified: DateTime<Utc>,
    pub bam_stats: WGSBamStats,
    // pub cramino: Option,
    /// (acquisition id, percent of sampled reads) pairs, see `bam_compo`.
    pub composition: Vec<(String, f64)>,
}

// #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
// pub enum BamType {
//     WGS,
//     Panel(String),
//     ChIP(String),
// }

impl WGSBam {
    pub fn new(path: PathBuf) -> anyhow::Result<Self> {
        let stem = path
            .file_stem()
            .context(format!("Can't parse stem from {}", path.display()))?
            .to_string_lossy()
            .to_string();
        let stem: Vec<&str> = stem.split('_').collect();
        if stem.len() != 3 {
            return Err(anyhow!("Error in bam name: {}", path.display()));
        }
        let id = stem[0].to_string();
        let time_point = stem[1].to_string();
        let reference_genome = stem
            .last()
            .context(format!("Can't get last element from stem {stem:?}"))?
            .to_string();

        // let bam_type = if stem.len() == 4 {
        //     match stem[2] {
        //         "oncoT" => BamType::Panel("oncoT".to_string()),
        //         "H3K27ac" => BamType::ChIP("H3K27ac".to_string()),
        //         "H3K4me3" => BamType::ChIP("H3K4me3".to_string()),
        //         _ => return Err(anyhow!("Error in bam name: {}", path.display())),
        //     }
        // } else {
        //     BamType::WGS
        // };

        let file_metadata = fs::metadata(&path)?;
        let modified = file_metadata.modified()?;
        let tp_dir = path
            .parent()
            .context(format!("Can't parse parent from {}", path.display()))?;
        let json_path = format!(
            "{}/{id}_{time_point}_{reference_genome}_info.json",
            tp_dir.to_string_lossy()
        );
        let json_file = PathBuf::from(&json_path);
        // Reuse the cached JSON if it is newer than the BAM itself.
        if json_file.exists() && json_file.metadata()?.modified()? > modified {
            match Self::load_json(&json_path) {
                Ok(s) => return Ok(s),
                Err(e) => {
                    warn!("Error while loading {}.\n{e}", json_path);
                }
            }
        }

        // let cramino_path = format!(
        //     "{}/{id}_{time_point}_{reference_genome}_cramino.txt",
        //     tp_dir.to_string_lossy()
        // );
        // let cramino = if bam_type == BamType::WGS {
        //     if !PathBuf::from_str(&cramino_path)?.exists() {
        //         // return Err(anyhow!("Cramino file missing {cramino_path}"));
        //         Cramino::default().with_bam(path.to_str().unwrap())?;
        //     }
        //     let mut cramino = Cramino::default().with_result_path(&cramino_path);
        //     cramino
        //         .parse_results()
        //         .context(format!("Error while parsing cramino for {cramino_path}"))?;
        //
        //     if let Some(cramino) = cramino.results {
        //         Some(cramino)
        //     } else {
        //         return Err(anyhow!("Cramino results parsing failed"));
        //     }
        // } else {
        //     None
        // };

        let composition = bam_compo(path.to_string_lossy().as_ref(), 20_000).context(format!(
            "Error while reading BAM composition for {}",
            path.display()
        ))?;

        let s = Self {
            path,
            bam_stats: WGSBamStats::new(&id, &time_point, Config::default())?,
            // cramino,
            id,
            time_point,
            // bam_type,
            reference_genome,
            composition,
            modified: modified.into(),
        };
        s.save_json(&json_path)?;
        Ok(s)
    }

    pub fn load_json(path: &str) -> anyhow::Result<Self> {
        let f = File::open(path)?;
        let s: Self = serde_json::from_reader(f)?;
        Ok(s)
    }

    pub fn save_json(&self, path: &str) -> anyhow::Result<()> {
        let f = File::create(path)?;
        serde_json::to_writer(f, self)?;
        Ok(())
    }
}
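// Illustrative usage of `WGSBam::new` (hypothetical path; a real call needs a
// BAM on disk following the `<id>_<time_point>_<reference>.bam` naming
// convention enforced above):
//
//     let bam = WGSBam::new(PathBuf::from("results/P001/diag/P001_diag_hg38.bam"))?;
//     assert_eq!((bam.id.as_str(), bam.time_point.as_str()), ("P001", "diag"));
//
// On a second call, the cached `P001_diag_hg38_info.json` sitting next to the
// BAM is loaded instead, as long as it is newer than the BAM itself.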
#[derive(Debug, Default)]
pub struct BamCollection {
    pub bams: Vec<WGSBam>,
}

impl BamCollection {
    pub fn new(result_dir: &str) -> Self {
        load_bam_collection(result_dir)
    }

    pub fn by_acquisition_id(&self) -> HashMap<String, Vec<&WGSBam>> {
        let mut acq: HashMap<String, Vec<&WGSBam>> = HashMap::new();
        for bam in self.bams.iter() {
            for (acq_id, _) in bam.composition.iter() {
                acq.entry(acq_id.to_string()).or_default().push(bam);
            }
        }
        acq
    }

    pub fn get(&self, id: &str, time_point: &str) -> Vec<&WGSBam> {
        self.bams
            .iter()
            .filter(|b| b.id == id && b.time_point == time_point)
            .collect()
    }

    pub fn by_id_completed(&self, min_diag_cov: f32, min_mrd_cov: f32) -> Vec<WGSBam> {
        self.bams
            .iter()
            // .filter(|b| matches!(b.bam_type, BamType::WGS))
            .filter(|b| match b.time_point.as_str() {
                "diag" => b.bam_stats.global_coverage >= min_diag_cov as f64,
                "mrd" => b.bam_stats.global_coverage >= min_mrd_cov as f64,
                _ => false,
            })
            // .filter(|b| match &b.cramino {
            //     Some(cramino) => match b.time_point.as_str() {
            //         "diag" => cramino.mean_length >= min_diag_cov as f64,
            //         "mrd" => cramino.mean_length >= min_mrd_cov as f64,
            //         _ => false,
            //     },
            //     _ => false,
            // })
            .cloned()
            .collect()
    }

    pub fn ids(&self) -> Vec<String> {
        let mut ids: Vec<String> = self.bams.iter().map(|b| b.id.clone()).collect();
        ids.sort();
        ids.dedup();
        ids
    }
}

pub fn load_bam_collection(result_dir: &str) -> BamCollection {
    let pattern = format!("{}/*/*/*.bam", result_dir);
    let bams = glob(&pattern)
        .expect("Failed to read glob pattern")
        // .par_bridge()
        .filter_map(|entry| {
            match entry {
                Ok(path) => match WGSBam::new(path) {
                    Ok(bam) => return Some(bam),
                    Err(e) => warn!("{e}"),
                },
                Err(e) => warn!("Error: {:?}", e),
            }
            None
        })
        .collect();
    BamCollection { bams }
}

/// Read-group composition of the first `sample_size` records, as
/// (acquisition id, percent) pairs. Percentages assume the BAM holds at
/// least `sample_size` records.
pub fn bam_compo(file_path: &str, sample_size: usize) -> anyhow::Result<Vec<(String, f64)>> {
    let mut bam = rust_htslib::bam::Reader::from_path(file_path)?;
    let mut rgs: HashMap<String, usize> = HashMap::new();
    for record in bam.records().filter_map(Result::ok).take(sample_size) {
        if let rust_htslib::bam::record::Aux::String(s) = record.aux(b"RG")? {
            *rgs.entry(s.to_string()).or_default() += 1;
        }
    }
    Ok(rgs
        .into_iter()
        .map(|(rg, n)| (rg, n as f64 * 100.0 / sample_size as f64))
        .map(|(rg, p)| {
            // Keep only the acquisition id, i.e. the part before the first '_'.
            let (a, _) = rg.split_once('_').unwrap();
            (a.to_string(), p)
        })
        .collect())
}
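// A minimal sanity test for `BamCollection` bookkeeping. It builds `WGSBam`
// values by hand (dummy stats, hypothetical paths), so it runs without any
// BAM file on disk.
#[cfg(test)]
mod collection_tests {
    use super::*;

    fn dummy_bam(id: &str, time_point: &str) -> WGSBam {
        WGSBam {
            id: id.to_string(),
            time_point: time_point.to_string(),
            reference_genome: "hg38".to_string(),
            path: PathBuf::from(format!("{id}_{time_point}_hg38.bam")),
            modified: Utc::now(),
            bam_stats: WGSBamStats {
                all_records: 0,
                n_reads: 0,
                mapped_yield: 0,
                global_coverage: 0.0,
                karyotype: Vec::new(),
                n50: 0,
                by_lengths: Vec::new(),
            },
            composition: vec![("ACQ1".to_string(), 100.0)],
        }
    }

    #[test]
    fn ids_are_sorted_and_deduplicated() {
        let collection = BamCollection {
            bams: vec![
                dummy_bam("P2", "diag"),
                dummy_bam("P1", "diag"),
                dummy_bam("P1", "mrd"),
            ],
        };
        assert_eq!(collection.ids(), vec!["P1".to_string(), "P2".to_string()]);
    }

    #[test]
    fn bams_are_grouped_by_acquisition_id() {
        let collection = BamCollection {
            bams: vec![dummy_bam("P1", "diag"), dummy_bam("P2", "diag")],
        };
        let by_acq = collection.by_acquisition_id();
        assert_eq!(by_acq.get("ACQ1").map(|v| v.len()), Some(2));
    }
}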
pub struct BamReadsSampler {
    pub positions: Vec<(String, u64)>,
    pub reader: rust_htslib::bam::IndexedReader,
    current_index: usize,
}

impl BamReadsSampler {
    pub fn new(path: &str, n: usize) -> anyhow::Result<Self> {
        debug!("Reading BAM file: {path}");
        let reader = rust_htslib::bam::IndexedReader::from_path(path)?;
        let header = reader.header();
        let contig_len: Vec<(String, u64)> = header
            .target_names()
            .into_iter()
            .map(|target_name| {
                let tid = header.tid(target_name).unwrap();
                (
                    String::from_utf8(target_name.to_vec()).unwrap(),
                    header.target_len(tid).unwrap(),
                )
            })
            .collect();
        let positions = sample_random_positions(&contig_len, n);
        Ok(Self {
            positions,
            reader,
            current_index: 0,
        })
    }
}

impl Iterator for BamReadsSampler {
    type Item = rust_htslib::errors::Result<rust_htslib::bam::Record>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.current_index >= self.positions.len() {
                return None;
            }
            let (contig, pos) = &self.positions[self.current_index];
            if let Err(e) = self.reader.fetch((contig.as_str(), *pos, pos + 1)) {
                warn!("Error while reading BAM {e}");
            }
            // Take the first record overlapping the position; if the region
            // is empty, move on to the next sampled position.
            let result = self.reader.records().next();
            self.current_index += 1;
            if result.is_some() {
                return result;
            }
        }
    }
}

/// Number of mapped reads on the first 24 contigs (chr1-22, X and Y on a
/// standard human reference).
pub fn n_mapped_reads(bam_path: &str) -> anyhow::Result<u64> {
    let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path)?;
    let mapped = bam
        .index_stats()?
        .into_iter()
        .filter(|(tid, _, _, _)| *tid < 24)
        .map(|(_, _, mapped, _)| mapped)
        .sum();
    Ok(mapped)
}

// Positions are 0-based, inclusive.
pub fn nt_pileup(
    bam: &mut rust_htslib::bam::IndexedReader,
    chr: &str,
    position: u32,
    with_next_ins: bool,
) -> anyhow::Result<Vec<u8>> {
    use rust_htslib::{bam, bam::Read};

    let mut bases = Vec::new();
    bam.fetch((chr, position, position + 1))?;
    let mut bam_pileup = Vec::new();
    for p in bam.pileup() {
        let pileup = p.context(format!(
            "Can't pileup bam at position {}:{} (0-based)",
            chr, position
        ))?;
        let cur_position = pileup.pos();
        if cur_position == position {
            for alignment in pileup.alignments() {
                match alignment.indel() {
                    // Note: these indel marks are collected but not returned;
                    // only the per-base calls in `bases` are.
                    bam::pileup::Indel::Ins(_len) => bam_pileup.push(b'I'),
                    bam::pileup::Indel::Del(_len) => bam_pileup.push(b'D'),
                    _ => {
                        let record = alignment.record();
                        if record.seq_len() > 0 {
                            if let Some(b) = base_at(&record, position, with_next_ins)? {
                                bases.push(b);
                            }
                        } else if alignment.is_del() {
                            bases.push(b'D');
                        }
                    }
                }
            }
        }
    }
    Ok(bases)
}
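// Illustrative use of the pileup helpers (hypothetical path and coordinate;
// requires an indexed BAM, i.e. a .bai next to it):
//
//     let mut bam = rust_htslib::bam::IndexedReader::from_path("sample.bam")?;
//     // Base calls covering chr17:7675088 (0-based); with `with_next_ins`
//     // set, reads carrying an insertion right after it are reported as 'I'.
//     let bases = nt_pileup(&mut bam, "chr17", 7_675_088, true)?;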
{ bases.push(b); } } else if alignment.is_del() { bases.push(b'D'); } } } } } } Ok(bases) } pub fn base_at( record: &rust_htslib::bam::record::Record, at_pos: u32, with_next_ins: bool, ) -> anyhow::Result> { let cigar = record.cigar(); let seq = record.seq(); let pos = cigar.pos() as u32; let mut read_i = 0u32; // let at_pos = at_pos - 1; let mut ref_pos = pos; if ref_pos > at_pos { return Ok(None); } for (id, op) in cigar.iter().enumerate() { let (add_read, add_ref) = match *op { Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len), Cigar::Ins(len) => (len, 0), Cigar::Del(len) => (0, len), Cigar::RefSkip(len) => (0, len), Cigar::SoftClip(len) => (len, 0), Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0), }; // If at the end of the op len and next op is Ins return I if with_next_ins && ref_pos + add_read == at_pos + 1 { if let Some(Cigar::Ins(_)) = cigar.get(id + 1) { return Ok(Some(b'I')); } } if ref_pos + add_ref > at_pos { // Handle deletions directly if let Cigar::Del(_) = *op { return Ok(Some(b'D')); } else if let Cigar::RefSkip(_) = op { return Ok(None); } else { let diff = at_pos - ref_pos; let p = read_i + diff; return Ok(Some(seq[p as usize])); } } read_i += add_read; ref_pos += add_ref; } Ok(None) } pub fn counts_at( bam: &mut rust_htslib::bam::IndexedReader, chr: &str, position: u32, ) -> anyhow::Result> { let p = nt_pileup(bam, chr, position, false)? .iter() .map(|e| String::from_utf8(vec![*e]).unwrap()) .collect::>(); let mut counts = HashMap::new(); for item in p.iter() { *counts.entry(item.to_string()).or_insert(0) += 1; } Ok(counts) } pub fn ins_pileup( bam: &mut rust_htslib::bam::IndexedReader, chr: &str, position: u32, ) -> anyhow::Result> { let mut bases = Vec::new(); bam.fetch((chr, position, position + 10))?; for p in bam.pileup() { let pileup = p.context(format!( "Can't pileup bam at position {}:{} (0-based)", chr, position ))?; let cur_position = pileup.pos(); // Ins in the next position if cur_position == position + 1 { // debug!("{cur_position}"); for alignment in pileup.alignments() { let record = alignment.record(); if record.seq_len() > 0 { if let Some(b) = ins_at(&record, position)? 
/// Sequence inserted by `record` right after reference position `at_pos`
/// (0-based), or `None` if the read carries no insertion there.
pub fn ins_at(
    record: &rust_htslib::bam::record::Record,
    at_pos: u32,
) -> anyhow::Result<Option<String>> {
    use rust_htslib::bam::record::Cigar;

    let cigar = record.cigar();
    let seq = record.seq();
    let pos = cigar.pos() as u32;
    let mut read_i = 0u32;
    let mut ref_pos = pos;
    if ref_pos > at_pos {
        return Ok(None);
    }
    // debug!(
    //     "read: {}",
    //     String::from_utf8(record.qname().to_vec()).unwrap()
    // );
    for op in cigar.iter() {
        let (add_read, add_ref) = match *op {
            Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
            Cigar::Ins(len) => (len, 0),
            Cigar::Del(len) => (0, len),
            Cigar::RefSkip(len) => (0, len),
            Cigar::SoftClip(len) => (len, 0),
            Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
        };
        if ref_pos + add_read > at_pos && ref_pos + add_read < at_pos + 10 {
            if let Cigar::Ins(v) = *op {
                // debug!(
                //     "ins size {v} @ {} (corrected {})",
                //     ref_pos + add_read,
                //     (ref_pos + add_read) - v - 1
                // );
                if (ref_pos + add_read) - v - 1 == at_pos {
                    let inserted_seq =
                        seq.as_bytes()[read_i as usize..(read_i + v) as usize].to_vec();
                    return Ok(Some(String::from_utf8(inserted_seq)?));
                }
            }
        }
        read_i += add_read;
        ref_pos += add_ref;
    }
    Ok(None)
}

pub fn counts_ins_at(
    bam: &mut rust_htslib::bam::IndexedReader,
    chr: &str,
    position: u32,
) -> anyhow::Result<HashMap<String, u32>> {
    let p = ins_pileup(bam, chr, position)?;
    let mut counts = HashMap::new();
    for item in p.iter() {
        *counts.entry(item.to_string()).or_insert(0) += 1;
    }
    Ok(counts)
}

pub fn sample_random_positions(chromosomes: &Vec<(String, u64)>, n: usize) -> Vec<(String, u64)> {
    let mut rng = rng();
    let total_length: u64 = chromosomes.iter().map(|(_, length)| length).sum();
    (0..n)
        .map(|_| {
            // Draw a position over the concatenated genome, then map it back
            // to a contig and an offset within that contig.
            let random_position = rng.random_range(0..total_length);
            let mut cumulative_length = 0;
            for (chr, length) in chromosomes {
                cumulative_length += length;
                if random_position < cumulative_length {
                    let position_within_chr = random_position - (cumulative_length - length);
                    return (chr.clone(), position_within_chr);
                }
            }
            unreachable!("Should always find a chromosome")
        })
        .collect()
}
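// Property test for `sample_random_positions`: exactly `n` positions come
// back, and every one falls inside the declared length of its contig.
#[cfg(test)]
mod sampling_tests {
    use super::*;

    #[test]
    fn sampled_positions_are_in_bounds() {
        let chromosomes = vec![("chr1".to_string(), 1_000u64), ("chr2".to_string(), 500)];
        let positions = sample_random_positions(&chromosomes, 100);
        assert_eq!(positions.len(), 100);
        for (chr, pos) in positions {
            let (_, len) = chromosomes.iter().find(|(c, _)| c == &chr).unwrap();
            assert!(pos < *len);
        }
    }
}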
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WGSBamStats {
    pub all_records: u128,
    pub n_reads: u128,
    pub mapped_yield: u128,
    pub global_coverage: f64,
    /// Per-contig stats: (tid, contig length, contig name, mapped yield).
    pub karyotype: Vec<(i32, u64, String, u128)>,
    pub n50: u128,
    /// Read-length histogram: (mapped length, count), sorted by length.
    pub by_lengths: Vec<(u128, u32)>,
}

impl WGSBamStats {
    pub fn new(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
        let bam_path = config.solo_bam(id, time);
        let Config {
            bam_min_mapq,
            bam_n_threads,
            ..
        } = config;
        let mut bam = rust_htslib::bam::Reader::from_path(bam_path)?;
        let h = bam.header().clone();
        let header = rust_htslib::bam::Header::from_template(&h);
        bam.set_threads(bam_n_threads as usize)?;

        let mut mapped_lengths = Vec::new();
        let mut all_records = 0;
        let mut n_reads = 0;
        let mut lengths_by_tid: HashMap<i32, Vec<u128>> = HashMap::new();
        for read in bam
            .rc_records()
            .map(|x| x.expect("Failure parsing Bam file"))
            .inspect(|_| all_records += 1)
            // Skip unmapped, secondary, QC-fail and duplicate records.
            .filter(|read| {
                read.flags()
                    & (rust_htslib::htslib::BAM_FUNMAP
                        | rust_htslib::htslib::BAM_FSECONDARY
                        | rust_htslib::htslib::BAM_FQCFAIL
                        | rust_htslib::htslib::BAM_FDUP) as u16
                    == 0
            })
            .filter(|r| r.mapq() >= bam_min_mapq)
        {
            n_reads += 1;
            let mapped_len = (read.reference_end() - read.pos()) as u128;
            mapped_lengths.push(mapped_len);
            lengths_by_tid
                .entry(read.tid())
                .or_default()
                .push(mapped_len);
            if n_reads % 500_000 == 0 {
                info!("{n_reads} reads parsed");
            }
        }

        let mapped_yield = mapped_lengths.par_iter().sum::<u128>();
        let genome = get_genome_sizes(&header)?;
        let genome_size = genome.values().map(|v| *v as u128).sum::<u128>();
        let global_coverage = mapped_yield as f64 / genome_size as f64;

        let by_lengths: DashMap<u128, u32> = DashMap::new();
        mapped_lengths
            .par_iter()
            .for_each(|l| *by_lengths.entry(*l).or_default() += 1);
        let mut by_lengths: Vec<(u128, u32)> =
            by_lengths.iter().map(|e| (*e.key(), *e.value())).collect();
        by_lengths.par_sort_by(|a, b| a.0.cmp(&b.0));

        // N50: the largest length L such that reads of length >= L account
        // for at least half of the mapped yield.
        let mut n50 = 0;
        let mut acc: u128 = 0;
        for (len, count) in by_lengths.iter().rev() {
            acc += len * *count as u128;
            if acc * 2 >= mapped_yield {
                n50 = *len;
                break;
            }
        }

        let mut karyotype: Vec<(i32, u64, String, u128)> = lengths_by_tid
            .iter()
            .map(|(tid, lengths)| {
                let contig = String::from_utf8(h.tid2name(*tid as u32).to_vec()).unwrap();
                (
                    *tid,
                    *genome.get(&contig).unwrap(),
                    contig,
                    lengths.par_iter().sum::<u128>(),
                )
            })
            .collect();
        karyotype.sort_by(|(ac, _, _, _), (bc, _, _, _)| ac.cmp(bc));

        Ok(Self {
            all_records,
            n_reads,
            mapped_yield,
            global_coverage,
            karyotype,
            n50,
            by_lengths,
        })
    }

    pub fn print(&self) {
        println!("N records\t{}", self.all_records);
        println!("N counted reads\t{}", self.n_reads);
        println!("Mapped yield [Gb]\t{:.2}", self.mapped_yield as f64 / 1e9);
        println!("Mean coverage\t{:.2}", self.global_coverage);
        self.karyotype
            .iter()
            .for_each(|(_, contig_len, contig, contig_yield)| {
                // Per-contig coverage, normalized by the global coverage.
                println!(
                    "{contig}\t{:.2}",
                    (*contig_yield as f64 / *contig_len as f64) / self.global_coverage
                );
            });
    }
}

pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<HashMap<String, u64>> {
    let mut genome = HashMap::new();
    for (key, records) in header.to_hashmap() {
        if key != "SQ" {
            continue;
        }
        for record in records {
            genome.insert(
                record["SN"].to_string(),
                record["LN"]
                    .parse::<u64>()
                    .expect("Failed parsing length of chromosomes"),
            );
        }
    }
    Ok(genome)
}

// fn softclipped_bases(read: &rust_htslib::bam::Record) -> u128 {
//     (read.cigar().leading_softclips() + read.cigar().trailing_softclips()) as u128
// }
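// A self-contained check of `get_genome_sizes` against a header built in
// memory (assumes rust_htslib's builder-style `HeaderRecord` API).
#[cfg(test)]
mod genome_size_tests {
    use super::*;
    use rust_htslib::bam::header::HeaderRecord;

    #[test]
    fn genome_sizes_from_sq_records() {
        let mut header = rust_htslib::bam::Header::new();
        header.push_record(
            HeaderRecord::new(b"SQ")
                .push_tag(b"SN", &"chr1")
                .push_tag(b"LN", &248_956_422u64),
        );
        let genome = get_genome_sizes(&header).unwrap();
        assert_eq!(genome.get("chr1"), Some(&248_956_422u64));
    }
}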