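//! Scan exons for allele-specific expression: for every exon in a GFF
//! annotation, count stranded RNA-seq read depth from an indexed BAM, look up
//! the constitutional SNPs overlapping the exon in an indexed VCF, and compare
//! the constitutional allele fraction to the RNA allele fraction with a
//! chi-square test for proportions. Per-exon and per-gene summaries are
//! written as bgzip-compressed tab-separated files
//! (`<prefix>_exons.gz`, `<prefix>_genes.gz`).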
use anyhow::Context;
use clap::Parser;
use dashmap::DashMap;
use log::info;
use noodles_bgzf as bgzf;
use noodles_core::{Position, Region};
use noodles_gff as gff;
use noodles_vcf as vcf;
use rayon::prelude::*;
use rust_htslib::{bam, bam::Read};
use statrs::distribution::{ChiSquared, ContinuousCDF};
use std::{
    collections::HashMap,
    fmt,
    fs::File,
    io::{BufReader, Write},
    time::Instant,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Indexed RNA-seq BAM file
    #[arg(short = 'b', long)]
    bam_path: String,
    /// Indexed constitutional VCF (bgzip-compressed)
    #[arg(short = 'v', long)]
    constit_vcf: String,
    /// GFF annotation containing exon records
    #[arg(short, long)]
    gff_path: String,
    /// Prefix for the output files (`<prefix>_exons.gz`, `<prefix>_genes.gz`)
    #[arg(short, long)]
    out_prefix: String,
}
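// Example invocation (binary name and paths are hypothetical):
//
//   exon_ase -b sample.rna.bam -v sample.constit.vcf.gz \
//            -g annotation.gff3 -o sample_out
//
// which produces `sample_out_exons.gz` and `sample_out_genes.gz`.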
#[derive(Debug, Default, Clone)]
struct Exon {
    /// Parent transcript ID taken from the GFF `Parent` attribute
    id: String,
    contig: String,
    start: u32, // 1-based, inclusive (GFF convention)
    end: u32,   // 1-based, inclusive (GFF convention)
    strand: gff::record::Strand,
    /// Stranded depths at the first/last exon base and one base outside each end
    depth_start: u32,
    depth_before_start: u32,
    depth_end: u32,
    depth_after_end: u32,
    mean_depth: u32,
    snps: Vec<Snp>,
}
impl fmt::Display for Exon {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Number of SNPs whose constitutional-vs-RNA test is significant at 0.01.
        let n_p_sign = self
            .snps
            .iter()
            .filter_map(|s| s.get_p().ok())
            .filter(|p| *p <= 0.01)
            .count();
        // Columns: contig, start, end, mean depth, significant SNPs,
        // depth before start, at start, at end, after end, transcript ID.
        writeln!(
            f,
            "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
            self.contig,
            self.start,
            self.end,
            self.mean_depth,
            n_p_sign,
            self.depth_before_start,
            self.depth_start,
            self.depth_end,
            self.depth_after_end,
            self.id
        )
    }
}
#[derive(Debug, Clone)]
struct Snp {
    /// 1-based variant position from the VCF
    pos: u32,
    /// First base of the first alternate allele
    alt: u8,
    /// Alternate-allele count (AD[1]) in the constitutional sample
    n_alt_mrd: u32,
    /// Depth (DP) in the constitutional sample
    depth_mrd: u32,
    /// Alternate-allele count observed in the RNA pileup, once computed
    n_alt_rna: Option<u32>,
    /// Depth observed in the RNA pileup, once computed
    depth_rna: Option<u32>,
}
fn main() -> anyhow::Result<()> {
    let now = Instant::now();
    env_logger::init();
    let args = Args::parse();
    let mut gff_reader = File::open(&args.gff_path)
        .map(BufReader::new)
        .map(gff::io::Reader::new)
        .context(format!("Failed to open: {}", args.gff_path))?;
    let out_exons = format!("{}_exons.gz", args.out_prefix);
    let out_genes = format!("{}_genes.gz", args.out_prefix);
    let mut n_exons = 0;
    let mut exons: Vec<Exon> = Vec::new();
    for result in gff_reader.records() {
        let record = result?;
        if record.ty() != "exon" {
            continue;
        }
        let mut exon = Exon::default();
        // The `Parent` attribute carries the transcript ID(s).
        let ids: Vec<String> = record
            .attributes()
            .iter()
            .filter(|(k, _)| *k == "Parent")
            .filter_map(|(_, v)| {
                if let gff::record::attributes::field::value::Value::String(s) = v {
                    Some(s.to_string())
                } else {
                    None
                }
            })
            .collect();
        exon.id = ids
            .first()
            .map(|s| s.as_str())
            .unwrap_or("No_id")
            .to_string();
        // Keep only RefSeq mRNA transcripts (NM_* accessions).
        if !exon.id.starts_with("NM") {
            continue;
        }
        exon.contig = record.reference_sequence_name().to_string();
        // GFF coordinates are 1-based, inclusive.
        exon.start = record.start().get() as u32;
        exon.end = record.end().get() as u32;
        exon.strand = record.strand();
        exons.push(exon);
        n_exons += 1;
        if n_exons % 10_000 == 0 {
            info!("{n_exons} exons parsed.");
        }
    }
    info!(
        "{} exons to query in the BAM file, in {} chunks...",
        exons.len(),
        exons.len().div_ceil(1_000)
    );
    let contig_exons: DashMap<String, Vec<Exon>> = DashMap::new();
    exons.par_chunks(1_000).enumerate().for_each(|(nc, c)| {
        // The BAM and VCF readers cannot be shared across threads,
        // so each chunk opens its own pair.
        let mut bam_reader = bam::IndexedReader::from_path(&args.bam_path)
            .context(format!("Failed to open: {}", args.bam_path))
            .unwrap();
        let mut vcf_reader = vcf::io::indexed_reader::Builder::default()
            .build_from_path(&args.constit_vcf)
            .context(format!("Failed to open: {}", args.constit_vcf))
            .unwrap();
        let header = vcf_reader.read_header().unwrap();
        for exon in c {
            let mut exon = exon.clone();
            // noodles regions are 1-based, inclusive, like the GFF.
            let start = Position::try_from(exon.start as usize).unwrap();
            let end = Position::try_from(exon.end as usize).unwrap();
            let region = Region::new(exon.contig.clone(), start..=end);
            let query = vcf_reader.query(&header, &region).unwrap();
            for result in query {
                let record = Snp::from_vcf_record(&result.unwrap(), &header).unwrap();
                exon.snps.push(record);
            }
            // rust-htslib fetch expects 0-based, half-open coordinates:
            // shift the start by one and pass the exclusive end.
            let bam_start = exon.start - 1; // 0-based first base of the exon
            let bam_end = exon.end - 1; // 0-based last base of the exon
            bam_reader
                .fetch((exon.contig.as_str(), bam_start, bam_end + 1))
                .unwrap();
            let mut len = 0;
            let mut depths = 0;
            // Pileup positions are 0-based; shift the 1-based VCF positions once.
            let snps_pos: Vec<u32> = exon.snps.iter().map(|s| s.pos - 1).collect();
            for p in bam_reader.pileup() {
                let pile = p.unwrap();
                let pos = pile.pos(); // 0-based
                let mut depth = 0;
                let mut bases = Vec::new();
                let populate_bases = snps_pos.contains(&pos);
                for a in pile.alignments() {
                    if a.is_refskip() || a.is_del() {
                        continue;
                    }
                    let r = a.record();
                    // Presumably a reverse-stranded library (e.g. dUTP): a read
                    // counts towards an exon when its strand is the opposite of
                    // the annotated strand.
                    match (r.strand(), exon.strand) {
                        (bio_types::strand::ReqStrand::Reverse, gff::record::Strand::Forward)
                        | (bio_types::strand::ReqStrand::Forward, gff::record::Strand::Reverse) => {
                            depth += 1
                        }
                        _ => (),
                    }
                    if populate_bases {
                        match a.indel() {
                            bam::pileup::Indel::Ins(_) => bases.push(b'I'),
                            bam::pileup::Indel::Del(_) => bases.push(b'D'),
                            _ => {
                                // Deleted/refskip alignments were skipped above,
                                // so only a real base can be read here.
                                if r.seq_len() > 0 {
                                    // `hts_base_at` expects a 1-based position.
                                    if let Some(b) = hts_base_at(&r, pos + 1, false).unwrap() {
                                        bases.push(b);
                                    }
                                }
                            }
                        }
                    }
                }
                // Boundary depths: one base before the start and one after the end.
                if bam_start > 0 && pos == bam_start - 1 {
                    exon.depth_before_start = depth;
                }
                if pos == bam_start {
                    exon.depth_start = depth;
                }
                if pos == bam_end {
                    exon.depth_end = depth;
                }
                if pos == bam_end + 1 {
                    exon.depth_after_end = depth;
                }
                // Accumulate for the mean depth over the exon body.
                if pos >= bam_start && pos <= bam_end {
                    len += 1;
                    depths += depth;
                }
                if populate_bases {
                    exon.snps
                        .iter_mut()
                        .filter(|s| s.pos - 1 == pos)
                        .for_each(|s| {
                            s.n_alt_rna =
                                Some(bases.iter().filter(|b| **b == s.alt).count() as u32);
                            s.depth_rna = Some(bases.len() as u32);
                        });
                }
            }
            // Guard against exons with no pileup coverage at all.
            if len > 0 {
                exon.mean_depth = (depths as f64 / len as f64).round() as u32;
            }
            contig_exons
                .entry(exon.contig.clone())
                .or_default()
                .push(exon);
        }
        info!("chunk {nc} finished.");
    });
    let bam_reader = bam::IndexedReader::from_path(&args.bam_path)
        .context(format!("Failed to open: {}", args.bam_path))
        .unwrap();
    let contigs: Vec<String> = bam_reader
        .header()
        .target_names()
        .iter()
        .map(|s| String::from_utf8(s.to_vec()).unwrap())
        .collect();
    // Transcript ID -> (contig, gene start, per-exon mean depths).
    let mut genes: HashMap<String, (String, u32, Vec<u32>)> = HashMap::new();
    let mut writer = File::create(out_exons).map(bgzf::MultithreadedWriter::new)?;
    contigs.iter().for_each(|c| {
        if let Some(exons) = contig_exons.get(c) {
            let mut exons = exons.clone();
            info!("Sorting {c}");
            exons.par_sort_by(|a, b| a.start.cmp(&b.start));
            info!("Writing {c}");
            exons.iter().for_each(|e| {
                writer.write_all(e.to_string().as_bytes()).unwrap();
                // Store the contig (not the ID again) so genes can be grouped
                // per contig below; flushing per record is unnecessary, the
                // bgzf writer is flushed on `finish`.
                genes
                    .entry(e.id.clone())
                    .or_insert((e.contig.clone(), e.start, Vec::new()))
                    .2
                    .push(e.mean_depth);
            });
        }
    });
    writer.finish()?;
    // Collapse per-exon depths into one mean per transcript.
    let genes: Vec<(String, String, u32, f64)> = genes
        .iter()
        .map(|(id, (contig, start, v))| {
            let sum: u32 = v.iter().copied().sum();
            (
                id.clone(),
                contig.clone(),
                *start,
                sum as f64 / v.len() as f64,
            )
        })
        .collect();
    let mut writer = File::create(out_genes).map(bgzf::MultithreadedWriter::new)?;
    contigs.iter().for_each(|c| {
        let mut g: Vec<&(String, String, u32, f64)> = genes
            .iter()
            .filter(|(_, contig, _, _)| c == contig)
            .collect();
        info!("Sorting {c}");
        g.par_sort_by(|a, b| a.2.cmp(&b.2));
        info!("Writing {c}");
        g.iter().for_each(|e| {
            // Columns: transcript ID, gene start, mean exon depth.
            // A trailing newline separates the rows.
            writer
                .write_all(format!("{}\t{}\t{}\n", e.0, e.2, e.3).as_bytes())
                .unwrap();
        });
    });
- info!("Process done in {:#?}", now.elapsed());
- Ok(())
- }
impl Snp {
    pub fn from_vcf_record(
        value: &vcf::record::Record,
        header: &vcf::Header,
    ) -> anyhow::Result<Self> {
        // `variant_start` returns Option<Result<_>>, hence the two contexts.
        let pos = value
            .variant_start()
            .context(format!("Missing variant start {:?}", value))?
            .context(format!("Invalid variant start {:?}", value))?
            .get() as u32;
        // First base of the first alternate allele.
        let alt = *value
            .alternate_bases()
            .as_ref()
            .as_bytes()
            .first()
            .context(format!("Can't parse variant alt {:?}", value))?;
        let (dp, ad) = get_dp_ad(value, header)?;
        Ok(Snp {
            pos,
            alt,
            n_alt_mrd: *ad
                .get(1)
                .context(format!("AD has no alternate count {:?}", value))?,
            depth_mrd: dp,
            n_alt_rna: None,
            depth_rna: None,
        })
    }
    pub fn get_p(&self) -> anyhow::Result<f64> {
        let depth_rna = self.depth_rna.context("depth_rna not computed")?;
        let n_alt_rna = self.n_alt_rna.context("n_alt_rna not computed")?;
        // Homozygous SNP in the constitutional sample: call significance
        // directly when the RNA evidence is unambiguous.
        if self.n_alt_mrd == self.depth_mrd {
            if depth_rna > 6 && n_alt_rna == 0 {
                return Ok(0.0);
            } else if n_alt_rna == depth_rna {
                return Ok(1.0);
            }
        }
        chi_square_test_for_proportions(
            self.n_alt_mrd as f64,
            self.depth_mrd as f64,
            n_alt_rna as f64,
            depth_rna as f64,
        )
    }
}
/// Extract the DP and AD FORMAT values of the first sample of a VCF record.
pub fn get_dp_ad(
    record: &vcf::record::Record,
    header: &vcf::Header,
) -> anyhow::Result<(u32, Vec<u32>)> {
    use noodles_vcf::variant::record::samples::series::{value::Array, Value};
    let dp = record
        .samples()
        .iter()
        .next()
        .and_then(|s| {
            s.iter(header).find_map(|g| {
                g.ok().and_then(|(k, v)| {
                    if k == "DP" {
                        v.and_then(|v| match v {
                            Value::Integer(i) => Some(i as u32),
                            _ => None,
                        })
                    } else {
                        None
                    }
                })
            })
        })
        .context(format!("Error while parsing DP {:?}", record))?;
    let ad = record
        .samples()
        .iter()
        .next()
        .and_then(|s| {
            s.iter(header).find_map(|g| {
                g.ok().and_then(|(k, v)| {
                    if k == "AD" {
                        v.and_then(|v| match v {
                            Value::Array(Array::Integer(values)) => Some(values),
                            _ => None,
                        })
                    } else {
                        None
                    }
                })
            })
        })
        .map(|values| {
            values
                .iter()
                .filter_map(|v| v.ok().and_then(|v| v.map(|i| i as u32)))
                .collect()
        })
        .context(format!("Error while parsing AD {:?}", record))?;
    Ok((dp, ad))
}
/// Return the read base aligned at `at_pos` (1-based reference coordinate),
/// `b'D'` within a deletion, `b'I'` when `with_next_ins` is set and an
/// insertion follows, or `None` when the read does not cover the position.
pub fn hts_base_at(
    record: &rust_htslib::bam::record::Record,
    at_pos: u32,
    with_next_ins: bool,
) -> anyhow::Result<Option<u8>> {
    use rust_htslib::bam::record::Cigar;
    let cigar = record.cigar();
    let seq = record.seq();
    let at_pos = at_pos - 1; // 0-based, like the alignment coordinates below
    let mut read_i = 0u32;
    let mut ref_pos = cigar.pos() as u32;
    if ref_pos > at_pos {
        return Ok(None);
    }
    for (id, op) in cigar.iter().enumerate() {
        // How many bases each op consumes on the read and on the reference.
        let (add_read, add_ref) = match *op {
            Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
            Cigar::Ins(len) => (len, 0),
            Cigar::Del(len) => (0, len),
            Cigar::RefSkip(len) => (0, len),
            Cigar::SoftClip(len) => (len, 0),
            Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
        };
        // If the current op ends (reference-wise) right after `at_pos` and the
        // next op is an insertion, report it.
        if with_next_ins && ref_pos + add_ref == at_pos + 1 {
            if let Some(Cigar::Ins(_)) = cigar.get(id + 1) {
                return Ok(Some(b'I'));
            }
        }
        if ref_pos + add_ref > at_pos {
            // Handle deletions and reference skips directly.
            if let Cigar::Del(_) = *op {
                return Ok(Some(b'D'));
            } else if let Cigar::RefSkip(_) = op {
                return Ok(None);
            } else {
                let diff = at_pos - ref_pos;
                return Ok(Some(seq[(read_i + diff) as usize]));
            }
        }
        read_i += add_read;
        ref_pos += add_ref;
    }
    Ok(None)
}
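// A minimal sanity-check sketch for `hts_base_at`, assuming it is acceptable
// to build a record in memory via `Record::set`/`set_pos` rather than reading
// a real BAM file.
#[cfg(test)]
mod hts_base_at_tests {
    use super::*;
    use rust_htslib::bam::record::{Cigar, CigarString, Record};

    #[test]
    fn base_at_simple_match() {
        // Read "ACGT" aligned as 4M starting at 0-based reference pos 9,
        // i.e. covering 1-based positions 10..=13.
        let mut rec = Record::new();
        rec.set(
            b"read1",
            Some(&CigarString(vec![Cigar::Match(4)])),
            b"ACGT",
            &[30, 30, 30, 30],
        );
        rec.set_pos(9);
        assert_eq!(hts_base_at(&rec, 10, false).unwrap(), Some(b'A'));
        assert_eq!(hts_base_at(&rec, 13, false).unwrap(), Some(b'T'));
        // Positions outside the read are not covered.
        assert_eq!(hts_base_at(&rec, 9, false).unwrap(), None);
        assert_eq!(hts_base_at(&rec, 14, false).unwrap(), None);
    }
}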
pub fn chi_square_test_impl(observed: &[f64], expected: &[f64]) -> anyhow::Result<f64> {
    if observed.len() != expected.len() {
        return Err(anyhow::anyhow!("Input vectors must have the same length"));
    }
    if expected.iter().any(|e| *e <= 0.0) {
        return Err(anyhow::anyhow!("Expected counts must be positive"));
    }
    // Chi-squared statistic with Yates' continuity correction.
    let chi_squared_statistic: f64 = observed
        .iter()
        .zip(expected.iter())
        .map(|(obs, exp)| ((obs - exp).abs() - 0.5).powi(2) / exp)
        .sum();
    // One degree of freedom: the four cells form a 2x2 contingency table,
    // so df = (2 - 1) * (2 - 1) = 1.
    let degrees_of_freedom = 1.0;
    let chi_squared_distribution = ChiSquared::new(degrees_of_freedom)?;
    let p_value = 1.0 - chi_squared_distribution.cdf(chi_squared_statistic);
    Ok(p_value)
}
/// Two-sample test for equality of proportions with continuity correction.
/// Thanks to ChatGPT for the initial implementation.
pub fn chi_square_test_for_proportions(
    success_a: f64,
    total_a: f64,
    success_b: f64,
    total_b: f64,
) -> anyhow::Result<f64> {
    // 2x2 table: successes and failures for each sample.
    let observed_counts = vec![
        success_a,
        total_a - success_a,
        success_b,
        total_b - success_b,
    ];
    // Expected counts under the null hypothesis of equal proportions.
    let total = total_a + total_b;
    let successes = success_a + success_b;
    let failures = total - successes;
    let expected_counts = vec![
        total_a * successes / total,
        total_a * failures / total,
        total_b * successes / total,
        total_b * failures / total,
    ];
    chi_square_test_impl(&observed_counts, &expected_counts)
}
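// A small sanity-check sketch for the proportion test; the assertions only
// pin down orders of magnitude, not exact p-values.
#[cfg(test)]
mod chi_square_tests {
    use super::*;

    #[test]
    fn equal_proportions_are_not_significant() {
        // 50/100 vs 50/100: nothing to reject.
        let p = chi_square_test_for_proportions(50.0, 100.0, 50.0, 100.0).unwrap();
        assert!(p > 0.05);
    }

    #[test]
    fn very_different_proportions_are_significant() {
        // 90/100 vs 10/100: clearly different proportions.
        let p = chi_square_test_for_proportions(90.0, 100.0, 10.0, 100.0).unwrap();
        assert!(p < 0.001);
    }
}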