main.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. use std::{io::{BufReader, BufRead}, fs::File, collections::HashMap, cmp, sync::{Mutex}};
  2. use rust_htslib::{bam, bam::{Read, IndexedReader}};
  3. use clap::Parser;
  4. use rand::{thread_rng, seq::SliceRandom};
  5. use indicatif::{ProgressBar, ParallelProgressIterator};
  6. use std::io::Write;
  7. use merge_sequences::*;
  8. use desc_sequence::*;
  9. use rayon::{prelude::*, ThreadPoolBuilder};
  10. #[derive(Parser, Debug)]
  11. #[command(author, version, about, long_about = None)]
  12. struct Args {
  13. /// bam file path
  14. #[arg(short, long)]
  15. bam: String,
  16. /// positions tsv path (seqname, start, end)
  17. #[arg(short, long)]
  18. positions: String,
  19. /// output tsv path
  20. #[arg(short, long)]
  21. output: String,
  22. /// output tsv path
  23. #[arg(short, long)]
  24. genome: String,
  25. /// output tsv path
  26. #[arg(short, long)]
  27. threads: usize,
  28. }
  29. fn main() {
  30. let args = Args::parse();
  31. let input_file = File::open(args.positions).unwrap();
  32. let pb = ProgressBar::new(input_file.metadata().unwrap().len());
  33. let ii = &mut pb.wrap_read(input_file);
  34. let reader = BufReader::new(ii);
  35. let mut ranges: HashMap<usize, (String, i64, i64)> = HashMap::new();
  36. for (n, line) in reader.lines().enumerate() {
  37. if let Ok(line) = line {
  38. let mut split = line.split("\t");
  39. ranges.insert(
  40. n, (
  41. split.next().unwrap().to_string(),
  42. split.next().unwrap().parse().unwrap(),
  43. split.next().unwrap().parse().unwrap()
  44. )
  45. );
  46. }
  47. }
  48. let chunk_size = 100;
  49. let ranges: Vec<(String, i64, i64)> = ranges.into_iter().map(|(_, e)| e).collect();
  50. let mut file = File::create(args.output).unwrap();
  51. writeln!(file, "ALT\tn_reads\tdepth").unwrap();
  52. let mutex = Mutex::new(file);
  53. let n_chunks = ranges.len() / chunk_size;
  54. ThreadPoolBuilder::new().num_threads(args.threads).build_global().unwrap();
  55. ranges.par_chunks(chunk_size)
  56. .progress_count(n_chunks as u64)
  57. .for_each(|ranges_chunk| {
  58. let aligner = BwaAlign::init(&args.genome);
  59. let mut bam = bam::IndexedReader::from_path(&args.bam).unwrap();
  60. let mut res: Vec<(String, (i32, i32))> = Vec::new();
  61. for (seqname, start, end) in ranges_chunk {
  62. let r = select_reads(
  63. &mut bam,
  64. (seqname, *start, *end),
  65. &aligner
  66. );
  67. res.extend(r.iter().map(|(k, v)| (k.to_string(), *v)).collect::<Vec<(String, (i32, i32))>>());
  68. }
  69. let mut file = mutex.lock().unwrap();
  70. res.iter().for_each(|(alt, (n, d))| {
  71. writeln!(file, "{}\t{}\t{}", alt, n, d).unwrap();
  72. });
  73. });
  74. }
  75. fn select_reads(bam: &mut IndexedReader, range: (&str, i64, i64), aligner: &BwaAlign) -> HashMap<String, (i32, i32)> {
  76. let merge_dist = 5;
  77. let _ = bam.fetch(range);
  78. let mut depths: HashMap<u32, u32> = HashMap::new();
  79. for p in bam.pileup() {
  80. let pileup = p.unwrap();
  81. depths.insert(pileup.pos(), pileup.depth());
  82. }
  83. let _ = bam.fetch(range);
  84. let mut seqs = Vec::new();
  85. for read in bam.records() {
  86. if let Ok(read) = read {
  87. let cigar_str: desc_sequence::Cigar = Cigar::new_from_str(&read.cigar().to_string());
  88. let qname = String::from_utf8(read.qname().to_vec()).unwrap();
  89. for cigar_item in read.cigar().iter() {
  90. match cigar_item {
  91. bam::record::Cigar::Match(_) => (),
  92. bam::record::Cigar::Del(_) | bam::record::Cigar::Ins(_) => {
  93. let dnas = read.seq().as_bytes();
  94. if !dnas.contains(&b'N') { // conversion doesnt work with N TODO
  95. let (del, ins) = find_deletion_insertion_ranges(read.pos() as u32, cigar_str.clone(), read.is_reverse());
  96. del.into_iter().for_each(|(start, end)| {
  97. seqs.push((qname.to_string(), "del".to_string(), start, end, DNAString::new(read.seq().as_bytes())));
  98. });
  99. ins.into_iter().for_each(|(start, end)| {
  100. seqs.push((qname.to_string(), "ins".to_string(), start, end, DNAString::new(read.seq().as_bytes())));
  101. });
  102. }
  103. },
  104. bam::record::Cigar::RefSkip(_) => (),
  105. bam::record::Cigar::SoftClip(_) => (),
  106. bam::record::Cigar::HardClip(_) => (),
  107. bam::record::Cigar::Pad(_) => (),
  108. bam::record::Cigar::Equal(_) => (),
  109. bam::record::Cigar::Diff(_) => (),
  110. }
  111. }
  112. }
  113. }
  114. seqs.sort_by(|a, b| {
  115. a.2.cmp(&b.2)
  116. });
  117. let mut grouped: HashMap<i32, (u32, u32, Vec<String>, Vec<DNAString>)> = HashMap::new();
  118. let mut last_key = None;
  119. for seq in seqs {
  120. if let Some(lk) = last_key {
  121. let last = grouped.get_mut(&lk).unwrap();
  122. if last.1 + merge_dist >= seq.2 {
  123. last.1 = cmp::max(last.1, seq.2);
  124. last.2.push(seq.0);
  125. last.3.push(seq.4)
  126. } else {
  127. let nk = lk + 1;
  128. last_key = Some(nk);
  129. grouped.insert(nk, (seq.2, seq.3, vec![seq.0], vec![seq.4]));
  130. }
  131. } else {
  132. last_key = Some(0);
  133. grouped.insert(0, (seq.2, seq.3, vec![seq.0], vec![seq.4]));
  134. }
  135. }
  136. let mut results = Vec::new();
  137. for (_k, group) in grouped {
  138. if group.3.len() > 1 {
  139. let res = rec_merger(group.3, 15, 2, 1 as i8);
  140. res.iter().for_each(|s| {
  141. let r = aligner.get_ref_positions_indel(&s.as_string());
  142. results.push((group.2.clone(), s.as_string(), r));
  143. });
  144. } else if group.3.len() > 0 {
  145. let u = group.3.first().unwrap();
  146. let r = aligner.get_ref_positions_indel(&u.as_string());
  147. results.push((group.2.clone(), u.as_string(), r));
  148. }
  149. }
  150. let mut indels: HashMap<String, (i32, i32)> = HashMap::new();
  151. results
  152. .into_iter()
  153. .filter(|p| p.0.len() > 1 && p.2.len() >= 1)
  154. .for_each(|e| {
  155. e.2.iter().for_each(|em| {
  156. let k = format!("{}_{}:{}-{}", em.0, em.1, em.2, em.3);
  157. if let Some(val) = indels.get_mut(&k) {
  158. val.0 = val.0 + e.0.len() as i32;
  159. } else {
  160. let depth = if let Some(v) = depths.get(&(em.2 as u32)) {
  161. *v
  162. } else {
  163. 0
  164. };
  165. indels.insert(k, (e.0.len() as i32, depth as i32));
  166. }
  167. });
  168. });
  169. indels
  170. }
  171. pub fn rec_merger(mut seqs: Vec<DNAString>, kmer_len: usize, max_mismatches: i8, max_consecutive_mismatches: i8) -> Vec<DNAString> {
  172. let mut n_retry = 0;
  173. let mut merged = Vec::new();
  174. let mut last_len = seqs.len();
  175. loop {
  176. if last_len <= 1 { break; }
  177. let tmp = seqs.get(1..).unwrap().to_vec();
  178. let re = merger(&tmp, seqs.get(0).unwrap(), kmer_len, max_mismatches, max_consecutive_mismatches);
  179. match re {
  180. Some(res) => {
  181. merged.push(res.0.clone());
  182. seqs = res.1;
  183. },
  184. None => {
  185. match n_retry {
  186. 0 => seqs.reverse(),
  187. _ => seqs.shuffle(&mut thread_rng())
  188. }
  189. },
  190. }
  191. n_retry += 1;
  192. if last_len == seqs.len() {
  193. break;
  194. } else {
  195. last_len = seqs.len();
  196. }
  197. }
  198. merged
  199. }
  200. pub fn merger(sequences: &Vec<DNAString>, to_merge: &DNAString, kmer_len: usize, max_mismatches: i8, max_consecutive_mismatches: i8) -> Option<(DNAString, Vec<DNAString>)> {
  201. let mut to_merge = to_merge.clone();
  202. let mut merged = Vec::new();
  203. // try to merge to others seeds
  204. for (i, seed) in sequences.iter().enumerate() {
  205. if seed.0.len() >= kmer_len {
  206. let r = to_merge.merge(seed, kmer_len, max_mismatches, max_consecutive_mismatches);
  207. match r {
  208. Ok(_) => {
  209. merged.push(i);
  210. },
  211. Err(_err) => (),
  212. }
  213. }
  214. }
  215. if merged.len() > 0 {
  216. Some((
  217. to_merge,
  218. sequences.into_iter().enumerate().filter(|(i, _)| !merged.contains(i)).map(|(_, e)| e.clone()).collect::<Vec<DNAString>>()
  219. ))
  220. } else {
  221. None
  222. }
  223. }