| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- use std::{io::{BufReader, BufRead}, fs::File, collections::HashMap, cmp, sync::{Mutex}};
- use rust_htslib::{bam, bam::{Read, IndexedReader}};
- use clap::Parser;
- use rand::{thread_rng, seq::SliceRandom};
- use indicatif::{ProgressBar, ParallelProgressIterator};
- use std::io::Write;
- use merge_sequences::*;
- use desc_sequence::*;
- use rayon::{prelude::*, ThreadPoolBuilder};
- #[derive(Parser, Debug)]
- #[command(author, version, about, long_about = None)]
- struct Args {
- /// bam file path
- #[arg(short, long)]
- bam: String,
- /// positions tsv path (seqname, start, end)
- #[arg(short, long)]
- positions: String,
- /// output tsv path
- #[arg(short, long)]
- output: String,
- /// output tsv path
- #[arg(short, long)]
- genome: String,
- /// output tsv path
- #[arg(short, long)]
- threads: usize,
- }
- fn main() {
- let args = Args::parse();
- let input_file = File::open(args.positions).unwrap();
- let pb = ProgressBar::new(input_file.metadata().unwrap().len());
-
- let ii = &mut pb.wrap_read(input_file);
- let reader = BufReader::new(ii);
- let mut ranges: HashMap<usize, (String, i64, i64)> = HashMap::new();
- for (n, line) in reader.lines().enumerate() {
- if let Ok(line) = line {
- let mut split = line.split("\t");
-
- ranges.insert(
- n, (
- split.next().unwrap().to_string(),
- split.next().unwrap().parse().unwrap(),
- split.next().unwrap().parse().unwrap()
- )
- );
- }
- }
- let chunk_size = 100;
- let ranges: Vec<(String, i64, i64)> = ranges.into_iter().map(|(_, e)| e).collect();
-
- let mut file = File::create(args.output).unwrap();
- writeln!(file, "ALT\tn_reads\tdepth").unwrap();
- let mutex = Mutex::new(file);
- let n_chunks = ranges.len() / chunk_size;
- ThreadPoolBuilder::new().num_threads(args.threads).build_global().unwrap();
- ranges.par_chunks(chunk_size)
- .progress_count(n_chunks as u64)
- .for_each(|ranges_chunk| {
- let aligner = BwaAlign::init(&args.genome);
- let mut bam = bam::IndexedReader::from_path(&args.bam).unwrap();
- let mut res: Vec<(String, (i32, i32))> = Vec::new();
- for (seqname, start, end) in ranges_chunk {
- let r = select_reads(
- &mut bam,
- (seqname, *start, *end),
- &aligner
- );
- res.extend(r.iter().map(|(k, v)| (k.to_string(), *v)).collect::<Vec<(String, (i32, i32))>>());
- }
- let mut file = mutex.lock().unwrap();
- res.iter().for_each(|(alt, (n, d))| {
- writeln!(file, "{}\t{}\t{}", alt, n, d).unwrap();
- });
- });
- }
- fn select_reads(bam: &mut IndexedReader, range: (&str, i64, i64), aligner: &BwaAlign) -> HashMap<String, (i32, i32)> {
- let merge_dist = 5;
- let _ = bam.fetch(range);
- let mut depths: HashMap<u32, u32> = HashMap::new();
- for p in bam.pileup() {
- let pileup = p.unwrap();
- depths.insert(pileup.pos(), pileup.depth());
- }
- let _ = bam.fetch(range);
- let mut seqs = Vec::new();
- for read in bam.records() {
- if let Ok(read) = read {
- let cigar_str: desc_sequence::Cigar = Cigar::new_from_str(&read.cigar().to_string());
- let qname = String::from_utf8(read.qname().to_vec()).unwrap();
-
- for cigar_item in read.cigar().iter() {
- match cigar_item {
- bam::record::Cigar::Match(_) => (),
- bam::record::Cigar::Del(_) | bam::record::Cigar::Ins(_) => {
- let dnas = read.seq().as_bytes();
- if !dnas.contains(&b'N') { // conversion doesnt work with N TODO
- let (del, ins) = find_deletion_insertion_ranges(read.pos() as u32, cigar_str.clone(), read.is_reverse());
-
- del.into_iter().for_each(|(start, end)| {
- seqs.push((qname.to_string(), "del".to_string(), start, end, DNAString::new(read.seq().as_bytes())));
- });
- ins.into_iter().for_each(|(start, end)| {
- seqs.push((qname.to_string(), "ins".to_string(), start, end, DNAString::new(read.seq().as_bytes())));
- });
- }
- },
- bam::record::Cigar::RefSkip(_) => (),
- bam::record::Cigar::SoftClip(_) => (),
- bam::record::Cigar::HardClip(_) => (),
- bam::record::Cigar::Pad(_) => (),
- bam::record::Cigar::Equal(_) => (),
- bam::record::Cigar::Diff(_) => (),
- }
- }
- }
- }
- seqs.sort_by(|a, b| {
- a.2.cmp(&b.2)
- });
- let mut grouped: HashMap<i32, (u32, u32, Vec<String>, Vec<DNAString>)> = HashMap::new();
- let mut last_key = None;
- for seq in seqs {
- if let Some(lk) = last_key {
- let last = grouped.get_mut(&lk).unwrap();
- if last.1 + merge_dist >= seq.2 {
- last.1 = cmp::max(last.1, seq.2);
- last.2.push(seq.0);
- last.3.push(seq.4)
- } else {
- let nk = lk + 1;
- last_key = Some(nk);
- grouped.insert(nk, (seq.2, seq.3, vec![seq.0], vec![seq.4]));
-
- }
- } else {
- last_key = Some(0);
- grouped.insert(0, (seq.2, seq.3, vec![seq.0], vec![seq.4]));
- }
- }
-
- let mut results = Vec::new();
- for (_k, group) in grouped {
- if group.3.len() > 1 {
- let res = rec_merger(group.3, 15, 2, 1 as i8);
- res.iter().for_each(|s| {
- let r = aligner.get_ref_positions_indel(&s.as_string());
- results.push((group.2.clone(), s.as_string(), r));
- });
- } else if group.3.len() > 0 {
- let u = group.3.first().unwrap();
-
- let r = aligner.get_ref_positions_indel(&u.as_string());
- results.push((group.2.clone(), u.as_string(), r));
- }
- }
-
- let mut indels: HashMap<String, (i32, i32)> = HashMap::new();
- results
- .into_iter()
- .filter(|p| p.0.len() > 1 && p.2.len() >= 1)
- .for_each(|e| {
- e.2.iter().for_each(|em| {
- let k = format!("{}_{}:{}-{}", em.0, em.1, em.2, em.3);
- if let Some(val) = indels.get_mut(&k) {
- val.0 = val.0 + e.0.len() as i32;
- } else {
- let depth = if let Some(v) = depths.get(&(em.2 as u32)) {
- *v
- } else {
- 0
- };
-
- indels.insert(k, (e.0.len() as i32, depth as i32));
- }
- });
- });
- indels
- }
- pub fn rec_merger(mut seqs: Vec<DNAString>, kmer_len: usize, max_mismatches: i8, max_consecutive_mismatches: i8) -> Vec<DNAString> {
- let mut n_retry = 0;
- let mut merged = Vec::new();
- let mut last_len = seqs.len();
- loop {
- if last_len <= 1 { break; }
- let tmp = seqs.get(1..).unwrap().to_vec();
- let re = merger(&tmp, seqs.get(0).unwrap(), kmer_len, max_mismatches, max_consecutive_mismatches);
- match re {
- Some(res) => {
- merged.push(res.0.clone());
- seqs = res.1;
- },
- None => {
- match n_retry {
- 0 => seqs.reverse(),
- _ => seqs.shuffle(&mut thread_rng())
- }
- },
- }
- n_retry += 1;
- if last_len == seqs.len() {
- break;
- } else {
- last_len = seqs.len();
- }
- }
- merged
- }
- pub fn merger(sequences: &Vec<DNAString>, to_merge: &DNAString, kmer_len: usize, max_mismatches: i8, max_consecutive_mismatches: i8) -> Option<(DNAString, Vec<DNAString>)> {
- let mut to_merge = to_merge.clone();
- let mut merged = Vec::new();
- // try to merge to others seeds
- for (i, seed) in sequences.iter().enumerate() {
- if seed.0.len() >= kmer_len {
- let r = to_merge.merge(seed, kmer_len, max_mismatches, max_consecutive_mismatches);
- match r {
- Ok(_) => {
- merged.push(i);
- },
- Err(_err) => (),
- }
- }
- }
-
- if merged.len() > 0 {
- Some((
- to_merge,
- sequences.into_iter().enumerate().filter(|(i, _)| !merged.contains(i)).map(|(_, e)| e.clone()).collect::<Vec<DNAString>>()
- ))
- } else {
- None
- }
- }
|