// main.rs
  1. use anyhow::Context;
  2. use dashmap::DashMap;
  3. use noodles_bgzf as bgzf;
  4. use noodles_core::{Position, Region};
  5. use noodles_gff as gff;
  6. use noodles_vcf::{self as vcf};
  7. use rust_htslib::{bam, bam::Read};
  8. use clap::Parser;
  9. use log::info;
  10. use statrs::distribution::{ChiSquared, ContinuousCDF};
  11. use std::{
  12. collections::HashMap,
  13. fmt,
  14. fs::File,
  15. io::{BufReader, Write},
  16. time::Instant,
  17. };
  18. use rayon::prelude::*;
// Command-line arguments.
// NOTE(review): plain `//` comments are used here on purpose — clap turns
// `///` doc comments on the struct and its fields into `--help` text, which
// would change the CLI's observable behavior.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    // Path to the indexed BAM file with the RNA alignments (-b).
    #[arg(short = 'b', long)]
    bam_path: String,
    // Path to the indexed constitutional VCF (-v); DP/AD are read per record.
    #[arg(short = 'v', long)]
    constit_vcf: String,
    // Path to the GFF annotation; only `exon` features with "NM"-prefixed
    // Parent ids are kept (-g).
    #[arg(short, long)]
    gff_path: String,
    // Output prefix (-o); "<prefix>_exons.gz" and "<prefix>_genes.gz" are written.
    #[arg(short, long)]
    out_prefix: String,
}
/// One `exon` feature parsed from the GFF, annotated afterwards with depth
/// statistics from the BAM pileup and overlapping constitutional SNPs.
#[derive(Debug, Default, Clone)]
struct Exon {
    /// First `Parent` attribute of the GFF record ("NM_..." transcript id).
    id: String,
    /// Reference sequence (contig/chromosome) name from the GFF.
    contig: String,
    start: u32, // 1-based inclusive (GFF convention)
    /// 1-based inclusive end (GFF convention).
    end: u32,
    /// Annotated transcript strand; used to strand-filter the pileup.
    strand: noodles_gff::record::Strand,
    /// Strand-consistent depth at the first exon base.
    depth_start: u32,
    /// Depth one base before the exon start.
    depth_before_start: u32,
    /// Strand-consistent depth at the last exon base.
    depth_end: u32,
    /// Depth one base after the exon end.
    depth_after_end: u32,
    /// Rounded mean strand-consistent depth across the exon.
    mean_depth: u32,
    /// Constitutional SNPs whose positions fall inside the exon.
    snps: Vec<Snp>,
}
  45. impl fmt::Display for Exon {
  46. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  47. let n_p_sign = self
  48. .snps
  49. .iter()
  50. .filter_map(|s| s.get_p().ok())
  51. .filter(|p| p <= &0.01)
  52. .count();
  53. writeln!(
  54. f,
  55. "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
  56. self.contig,
  57. self.start,
  58. self.end,
  59. self.mean_depth,
  60. n_p_sign,
  61. self.depth_before_start,
  62. self.depth_start,
  63. self.depth_end,
  64. self.depth_after_end,
  65. self.id
  66. )
  67. }
  68. }
  69. // impl FromStr for Exon {
  70. // type Err = anyhow::Error;
  71. //
  72. // fn from_str(s: &str) -> anyhow::Result<Self> {
  73. // let parts: Vec<&str> = s.split('-').collect();
  74. // if parts.len() != 2 {
  75. // return Err("Invalid format".parse().unwrap());
  76. // }
  77. //
  78. // let start = parts[0].parse()?;
  79. // let end = parts[1].parse()?;
  80. //
  81. // Ok(Exon {})
  82. // }
  83. // }
/// A constitutional SNP overlapping an exon, optionally annotated with the
/// allele support observed in the RNA pileup.
/// NOTE(review): "mrd" presumably refers to the constitutional/germline
/// sample the VCF was called from — confirm with the pipeline docs.
#[derive(Debug, Clone)]
struct Snp {
    /// 1-based variant position (from the VCF).
    pos: u32,
    /// First byte of the first alternate allele (e.g. b'A').
    alt: u8,
    /// Alternate-allele read count in the constitutional sample (AD[1]).
    n_alt_mrd: u32,
    /// Total depth in the constitutional sample (DP).
    depth_mrd: u32,
    /// Alternate-allele count observed in the RNA pileup, once populated.
    n_alt_rna: Option<u32>,
    /// Pileup depth observed in RNA at this position, once populated.
    depth_rna: Option<u32>,
}
/// Pipeline entry point:
/// 1. parse `exon` features from the GFF (keeping "NM"-prefixed transcripts);
/// 2. in parallel chunks, annotate each exon with constitutional SNPs (VCF)
///    and strand-consistent pileup depths (BAM);
/// 3. write per-exon and per-gene bgzip-compressed tables.
fn main() -> anyhow::Result<()> {
    let now = Instant::now();
    env_logger::init();
    let args = Args::parse();
    // ---- Pass 1: parse exons from the GFF ---------------------------------
    let mut gff_reader = File::open(&args.gff_path)
        .map(BufReader::new)
        .map(gff::io::Reader::new)
        .context(format!("Failed to open: {}", args.gff_path))?;
    let out_exons = format!("{}_exons.gz", args.out_prefix);
    let out_genes = format!("{}_genes.gz", args.out_prefix);
    let mut n_exons = 0;
    let mut exons: Vec<Exon> = Vec::new();
    for result in gff_reader.records() {
        let record = result?;
        if record.ty() == "exon" {
            let mut exon = Exon::default();
            // Collect every `Parent` attribute value; the first becomes the id.
            let ids: Vec<String> = record
                .attributes()
                .iter()
                .filter(|(k, _)| *k == "Parent")
                .filter_map(|(_, v)| {
                    if let noodles_gff::record::attributes::field::value::Value::String(s) = v {
                        Some(s.to_string())
                    } else {
                        None
                    }
                })
                .collect();
            exon.id = ids
                .first()
                .map(|s| s.as_str())
                .unwrap_or("No_id")
                .to_string();
            // Keep only RefSeq curated mRNA transcripts ("NM_...").
            if !exon.id.starts_with("NM") {
                continue;
            }
            let contig = record.reference_sequence_name();
            exon.contig = contig.to_string();
            let gff_start = record.start(); // GFF coordinates are 1-based inclusive
            exon.start = gff_start.get() as u32;
            let gff_end = record.end();
            exon.end = gff_end.get() as u32;
            let gff_strand = record.strand();
            exon.strand = gff_strand;
            exons.push(exon);
            n_exons += 1;
            if n_exons % 10_000 == 0 {
                info!("{n_exons} exons parsed.");
            }
        }
    }
    info!(
        "{} exons to look in bam file in {} chunks...",
        exons.len(),
        exons.len() / 1_000
    );
    // ---- Pass 2: annotate exons in parallel -------------------------------
    // Each rayon worker opens its own BAM/VCF readers (htslib handles are not
    // shared across threads) and pushes finished exons into a DashMap by contig.
    let contig_exons: DashMap<String, Vec<Exon>> = DashMap::new();
    exons.par_chunks(1_000).enumerate().for_each(|(nc, c)| {
        let mut bam_reader = bam::IndexedReader::from_path(&args.bam_path)
            .context(format!("Failed to open: {}", args.bam_path))
            .unwrap();
        let mut vcf_reader = vcf::io::indexed_reader::Builder::default()
            .build_from_path(&args.constit_vcf)
            .context(format!("Failed to open: {}", args.constit_vcf))
            .unwrap();
        let header = vcf_reader.read_header().unwrap();
        for exon in c {
            let mut exon = exon.clone();
            // Query constitutional SNPs over the exon (1-based, inclusive).
            let start = Position::try_from(exon.start as usize).unwrap();
            let end = Position::try_from(exon.end as usize).unwrap();
            let region = Region::new(exon.contig.clone(), start..=end);
            let query = vcf_reader.query(&header, &region).unwrap();
            for result in query {
                let record = Snp::from_vcf_record(&result.unwrap(), &header).unwrap();
                exon.snps.push(record);
            }
            // Convert the 1-based GFF start to htslib's 0-based coordinates.
            let bam_start = exon.start - 1; // [0] <- bug ??
            // NOTE(review): rust-htslib `fetch` ends are exclusive, so
            // `exon.end - 1` appears to drop the last exon base from the fetch
            // window (pileup may still cover it via overlapping reads) —
            // TODO confirm against rust-htslib fetch semantics.
            let bam_end = exon.end - 1;
            bam_reader
                .fetch((&exon.contig, bam_start, bam_end))
                .unwrap();
            let mut len = 0;
            let mut depths = 0;
            // NOTE(review): SNP positions are 1-based (VCF) while `pile.pos()`
            // is 0-based — this membership test looks off by one; verify with
            // a known heterozygous site.
            let snps_pos: Vec<u32> = exon.snps.iter().map(|s| s.pos).collect();
            for p in bam_reader.pileup() {
                let pile = p.unwrap();
                let pos = pile.pos();
                let mut depth = 0;
                let mut bases = Vec::new();
                // Only collect per-base evidence at SNP positions.
                let populate_bases = snps_pos.contains(&pos);
                for a in pile.alignments() {
                    if a.is_refskip() || a.is_del() {
                        continue;
                    }
                    let r = a.record();
                    // Count only reads whose strand is consistent with the
                    // transcript. The pairing below is inverted on purpose:
                    // the original author observed it matches this library
                    // prep ("dont ask me why it's inverted") — confirm for
                    // other protocols.
                    match (r.strand(), exon.strand) {
                        (
                            bio_types::strand::ReqStrand::Reverse,
                            noodles_gff::record::Strand::Forward,
                        ) => depth += 1,
                        (
                            bio_types::strand::ReqStrand::Forward,
                            noodles_gff::record::Strand::Reverse,
                        ) => depth += 1,
                        _ => (),
                    }
                    if populate_bases {
                        match a.indel() {
                            bam::pileup::Indel::Ins(_len) => bases.push(b'I'),
                            bam::pileup::Indel::Del(_len) => bases.push(b'D'),
                            _ => {
                                if r.seq_len() > 0 {
                                    if let Some(b) = hts_base_at(&r, pos, false).unwrap() {
                                        bases.push(b);
                                    }
                                } else if a.is_del() {
                                    // NOTE(review): looks unreachable — deleted
                                    // positions are skipped by the `is_del()`
                                    // guard above; confirm.
                                    bases.push(b'D');
                                }
                            }
                        }
                    }
                }
                // one before start and one after end
                // NOTE(review): `bam_start - 1` underflows when the exon starts
                // at reference position 1 (bam_start == 0) — guard needed.
                if pos == bam_start - 1 {
                    exon.depth_before_start = depth;
                }
                if pos == bam_start {
                    exon.depth_start = depth;
                }
                if pos == bam_end {
                    exon.depth_end = depth;
                }
                if pos == bam_end + 1 {
                    exon.depth_after_end = depth;
                }
                // for mean depth
                if pos >= bam_start && pos <= bam_end {
                    len += 1;
                    depths += depth;
                }
                if populate_bases {
                    // Record RNA allele support for every SNP at this column.
                    exon.snps.iter_mut().filter(|s| s.pos == pos).for_each(|s| {
                        s.n_alt_rna = Some(bases.iter().filter(|b| **b == s.alt).count() as u32);
                        s.depth_rna = Some(bases.len() as u32);
                    });
                }
            }
            // NOTE(review): with no covered column, this is 0/0 -> NaN, which
            // `round() as u32` maps to 0 — confirm that is the intended value.
            exon.mean_depth = (depths as f64 / len as f64).round() as u32;
            contig_exons
                .entry(exon.contig.clone())
                .or_default()
                .push(exon);
        }
        info!("chunk {nc} finished.");
    });
    // ---- Pass 3: write outputs --------------------------------------------
    // Contig order in the BAM header drives output order.
    let bam_reader = bam::IndexedReader::from_path(&args.bam_path)
        .context(format!("Failed to open: {}", args.bam_path))
        .unwrap();
    let contigs: Vec<String> = bam_reader
        .header()
        .target_names()
        .iter()
        .map(|s| String::from_utf8(s.to_vec()).unwrap())
        .collect();
    // gene (transcript) id -> (id, start of first seen exon, exon mean depths)
    let mut genes: HashMap<String, (String, u32, Vec<u32>)> = HashMap::new();
    let mut writer = File::create(out_exons).map(bgzf::MultithreadedWriter::new)?;
    contigs.iter().for_each(|c| {
        if let Some(exons) = contig_exons.get(c) {
            let mut exons = exons.clone();
            info!("Sorting {c}");
            exons.par_sort_by(|a, b| a.start.cmp(&b.start));
            info!("Writing {c}");
            exons.iter().for_each(|e| {
                // Exon's Display uses writeln!, so each record has a newline.
                writer.write_all(e.to_string().as_bytes()).unwrap();
                genes
                    .entry(e.id.clone())
                    .or_insert((e.id.clone(), e.start, Vec::new()))
                    .2
                    .push(e.mean_depth);
                // NOTE(review): flushing after every record defeats buffering;
                // once per contig would suffice — confirm.
                writer.flush().unwrap();
            });
        }
    });
    writer.finish()?;
    // Collapse per-exon depths into one mean per gene.
    let genes: Vec<_> = genes
        .values()
        .map(|(c, p, v)| {
            let sum: u32 = v.iter().copied().sum();
            (c.to_string(), *p, sum as f64 / v.len() as f64)
        })
        .collect();
    let mut writer = File::create(out_genes).map(bgzf::MultithreadedWriter::new)?;
    contigs.iter().for_each(|c| {
        // NOTE(review): the first tuple element is the transcript id (set from
        // `e.id` above), not a contig name, so `c == contig` can never match a
        // BAM contig and the genes file is likely empty — the entry should
        // probably store `e.contig`. TODO confirm and fix.
        let mut g: Vec<&(String, u32, f64)> =
            genes.iter().filter(|(contig, _, _)| c == contig).collect();
        info!("Sorting {c}");
        g.par_sort_by(|a, b| a.1.cmp(&b.1));
        info!("Writing {c}");
        g.iter().for_each(|e| {
            // NOTE(review): no newline is written between gene records, so
            // the genes file is one long line — a trailing "\n" looks missing.
            writer
                .write_all(
                    [e.0.to_string(), e.1.to_string(), e.2.to_string()]
                        .join("\t")
                        .as_bytes(),
                )
                .unwrap();
            writer.flush().unwrap();
        });
    });
    writer.finish()?;
    info!("Process done in {:#?}", now.elapsed());
    Ok(())
}
  353. impl Snp {
  354. pub fn from_vcf_record(
  355. value: &noodles_vcf::record::Record,
  356. header: &noodles_vcf::Header,
  357. ) -> anyhow::Result<Self> {
  358. let pos = value
  359. .variant_start()
  360. .context(format!("Can't parse variant start {:?}", value))?
  361. .context(format!("Can't parse variant start {:?}", value))?
  362. .get() as u32;
  363. let alt = *value
  364. .alternate_bases()
  365. .as_ref()
  366. .as_bytes()
  367. .to_vec()
  368. .first()
  369. .context(format!("Can't parse variant alt {:?}", value))?;
  370. let (dp, ad) = get_dp_ad(value, header)?;
  371. Ok(Snp {
  372. pos,
  373. alt,
  374. n_alt_mrd: ad[1],
  375. depth_mrd: dp,
  376. n_alt_rna: None,
  377. depth_rna: None,
  378. })
  379. }
  380. pub fn get_p(&self) -> anyhow::Result<f64> {
  381. let depth_rna = self.depth_rna.context("")?;
  382. let n_alt_rna = self.n_alt_rna.context("")?;
  383. // Homozygote SNP
  384. if self.n_alt_mrd == self.depth_mrd {
  385. if depth_rna > 6 && n_alt_rna == 0 {
  386. return Ok(0.0);
  387. } else if n_alt_rna == depth_rna {
  388. return Ok(1.0);
  389. }
  390. }
  391. chi_square_test_for_proportions(
  392. self.n_alt_mrd as f64,
  393. self.depth_mrd as f64,
  394. self.n_alt_rna.context("")? as f64,
  395. self.depth_rna.context("")? as f64,
  396. )
  397. }
  398. }
  399. pub fn get_dp_ad(
  400. record: &noodles_vcf::record::Record,
  401. header: &noodles_vcf::Header,
  402. ) -> anyhow::Result<(u32, Vec<u32>)> {
  403. Ok((record.samples().iter().next().and_then(|s| {
  404. s.iter(header).find_map(|g| {
  405. g.ok().and_then(|(k, v)| {
  406. if k == "DP" {
  407. v.and_then(|v| match v {
  408. noodles_vcf::variant::record::samples::series::Value::Integer(i) => Some(i as u32),
  409. _ => None
  410. })
  411. } else {
  412. None
  413. }
  414. })
  415. })
  416. }).context(format!("Error while parsing AD {:?}", record))?,
  417. record
  418. .samples()
  419. .iter()
  420. .next()
  421. .and_then(|s| {
  422. s.iter(header)
  423. .find_map(|g| {
  424. g.ok().and_then(|(k, v)| {
  425. if k == "AD" {
  426. v.and_then(|v| match v {
  427. noodles_vcf::variant::record::samples::series::Value::Array(noodles_vcf::variant::record::samples::series::value::Array::Integer(values)) => Some(values),
  428. _ => None,
  429. })
  430. } else {
  431. None
  432. }
  433. })
  434. })
  435. })
  436. .map(|values| {
  437. values
  438. .iter()
  439. .filter_map(|v| v.ok().and_then(|v| v.map(|i| i as u32)))
  440. .collect()
  441. })
  442. .context(format!("Error while parsing AD {:?}", record))?))
  443. }
/// Return the read base aligned at reference position `at_pos` for `record`,
/// by walking the alignment's CIGAR string.
///
/// Returns `Ok(Some(b'D'))` when the position falls in a deletion,
/// `Ok(Some(b'I'))` when `with_next_ins` is set and an insertion starts right
/// after `at_pos`, and `Ok(None)` when the position is not covered by the
/// alignment (before its start, past its end, or inside a reference skip).
///
/// NOTE(review): `at_pos - 1` suggests a 1-based position is expected, but the
/// call site in `main` passes the 0-based `pile.pos()` — off-by-one suspected.
/// It also underflows when `at_pos == 0`. TODO confirm the intended convention.
pub fn hts_base_at(
    record: &rust_htslib::bam::record::Record,
    at_pos: u32,
    with_next_ins: bool,
) -> anyhow::Result<Option<u8>> {
    use rust_htslib::bam::record::Cigar;
    let cigar = record.cigar();
    let seq = record.seq();
    // 0-based alignment start on the reference.
    let pos = cigar.pos() as u32;
    // Offset into the read sequence, advanced as CIGAR ops are consumed.
    let mut read_i = 0u32;
    let at_pos = at_pos - 1;
    let mut ref_pos = pos;
    // Position lies before the alignment start: not covered.
    if ref_pos > at_pos {
        return Ok(None);
    }
    for (id, op) in cigar.iter().enumerate() {
        // Bases each op consumes on the read vs. on the reference.
        let (add_read, add_ref) = match *op {
            Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => (len, len),
            Cigar::Ins(len) => (len, 0),
            Cigar::Del(len) => (0, len),
            Cigar::RefSkip(len) => (0, len),
            Cigar::SoftClip(len) => (len, 0),
            Cigar::HardClip(_) | Cigar::Pad(_) => (0, 0),
        };
        // If at the end of the op len and next op is Ins return I
        // NOTE(review): this check uses `add_read`; for match-type ops the read
        // and reference lengths agree, but for other ops the intent is unclear
        // — confirm whether `add_ref` was meant.
        if with_next_ins && ref_pos + add_read == at_pos + 1 {
            if let Some(Cigar::Ins(_)) = cigar.get(id + 1) {
                return Ok(Some(b'I'));
            }
        }
        // The target position falls inside the current op's reference span.
        if ref_pos + add_ref > at_pos {
            // Handle deletions directly
            if let Cigar::Del(_) = *op {
                return Ok(Some(b'D'));
            } else if let Cigar::RefSkip(_) = op {
                // Intron / reference skip: no base to report.
                return Ok(None);
            } else {
                // Read-consuming op: index into the read sequence.
                let diff = at_pos - ref_pos;
                let p = read_i + diff;
                return Ok(Some(seq[p as usize]));
            }
        }
        read_i += add_read;
        ref_pos += add_ref;
    }
    // Ran past the end of the alignment without reaching `at_pos`.
    Ok(None)
}
  491. pub fn chi_square_test_impl(observed: &[f64], expected: &[f64]) -> anyhow::Result<f64> {
  492. if observed.len() != expected.len() {
  493. return Err(anyhow::anyhow!("Input vectors must have the same length"));
  494. }
  495. // Calculate the chi-squared statistic
  496. let chi_squared_statistic: f64 = observed
  497. .iter()
  498. .zip(expected.iter())
  499. .map(|(obs, exp)| ((obs - exp).abs() - 0.5).powi(2) / exp)
  500. .sum();
  501. // Degrees of freedom is the number of categories minus 1
  502. // let degrees_of_freedom = (observed.len() - 1) as f64;
  503. let degrees_of_freedom = 1.0;
  504. // Calculate p-value using chi-squared distribution
  505. let chi_squared_distribution = ChiSquared::new(degrees_of_freedom).unwrap();
  506. let p_value = 1.0 - chi_squared_distribution.cdf(chi_squared_statistic);
  507. // You can use the p-value to make decisions based on your significance level
  508. // For example, with a significance level of 0.05, if p_value < 0.05, reject the null hypothesis
  509. Ok(p_value)
  510. }
  511. /// 2-sample test for equality of proportions with continuity correction
  512. /// remerciements to chatGPT
  513. pub fn chi_square_test_for_proportions(
  514. success_a: f64,
  515. total_a: f64,
  516. success_b: f64,
  517. total_b: f64,
  518. ) -> anyhow::Result<f64> {
  519. let observed_counts = vec![
  520. success_a,
  521. total_a - success_a,
  522. success_b,
  523. total_b - success_b,
  524. ];
  525. let expected_counts = vec![
  526. total_a * (success_a + success_b) / (total_a + total_b),
  527. total_a * (total_a - success_a + total_b - success_b) / (total_a + total_b),
  528. total_b * (success_a + success_b) / (total_a + total_b),
  529. total_b * (total_a - success_a + total_b - success_b) / (total_a + total_b),
  530. ];
  531. chi_square_test_impl(&observed_counts, &expected_counts)
  532. }