use std::{io::{BufReader, BufRead}, fs::File, collections::HashMap, cmp, sync::{Mutex}}; use rust_htslib::{bam, bam::{Read, IndexedReader}}; use clap::Parser; use rand::{thread_rng, seq::SliceRandom}; use indicatif::{ProgressBar, ParallelProgressIterator}; use std::io::Write; use merge_sequences::*; use desc_sequence::*; use rayon::{prelude::*, ThreadPoolBuilder}; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { /// bam file path #[arg(short, long)] bam: String, /// positions tsv path (seqname, start, end) #[arg(short, long)] positions: String, /// output tsv path #[arg(short, long)] output: String, /// output tsv path #[arg(short, long)] genome: String, /// output tsv path #[arg(short, long)] threads: usize, } fn main() { let args = Args::parse(); let input_file = File::open(args.positions).unwrap(); let pb = ProgressBar::new(input_file.metadata().unwrap().len()); let ii = &mut pb.wrap_read(input_file); let reader = BufReader::new(ii); let mut ranges: HashMap = HashMap::new(); for (n, line) in reader.lines().enumerate() { if let Ok(line) = line { let mut split = line.split("\t"); ranges.insert( n, ( split.next().unwrap().to_string(), split.next().unwrap().parse().unwrap(), split.next().unwrap().parse().unwrap() ) ); } } let chunk_size = 100; let ranges: Vec<(String, i64, i64)> = ranges.into_iter().map(|(_, e)| e).collect(); let mut file = File::create(args.output).unwrap(); writeln!(file, "ALT\tn_reads\tdepth").unwrap(); let mutex = Mutex::new(file); let n_chunks = ranges.len() / chunk_size; ThreadPoolBuilder::new().num_threads(args.threads).build_global().unwrap(); ranges.par_chunks(chunk_size) .progress_count(n_chunks as u64) .for_each(|ranges_chunk| { let aligner = BwaAlign::init(&args.genome); let mut bam = bam::IndexedReader::from_path(&args.bam).unwrap(); let mut res: Vec<(String, (i32, i32))> = Vec::new(); for (seqname, start, end) in ranges_chunk { let r = select_reads( &mut bam, (seqname, *start, *end), &aligner ); res.extend(r.iter().map(|(k, v)| (k.to_string(), *v)).collect::>()); } let mut file = mutex.lock().unwrap(); res.iter().for_each(|(alt, (n, d))| { writeln!(file, "{}\t{}\t{}", alt, n, d).unwrap(); }); }); } fn select_reads(bam: &mut IndexedReader, range: (&str, i64, i64), aligner: &BwaAlign) -> HashMap { let merge_dist = 5; let _ = bam.fetch(range); let mut depths: HashMap = HashMap::new(); for p in bam.pileup() { let pileup = p.unwrap(); depths.insert(pileup.pos(), pileup.depth()); } let _ = bam.fetch(range); let mut seqs = Vec::new(); for read in bam.records() { if let Ok(read) = read { let cigar_str: desc_sequence::Cigar = Cigar::new_from_str(&read.cigar().to_string()); let qname = String::from_utf8(read.qname().to_vec()).unwrap(); for cigar_item in read.cigar().iter() { match cigar_item { bam::record::Cigar::Match(_) => (), bam::record::Cigar::Del(_) | bam::record::Cigar::Ins(_) => { let dnas = read.seq().as_bytes(); if !dnas.contains(&b'N') { // conversion doesnt work with N TODO let (del, ins) = find_deletion_insertion_ranges(read.pos() as u32, cigar_str.clone(), read.is_reverse()); del.into_iter().for_each(|(start, end)| { seqs.push((qname.to_string(), "del".to_string(), start, end, DNAString::new(read.seq().as_bytes()))); }); ins.into_iter().for_each(|(start, end)| { seqs.push((qname.to_string(), "ins".to_string(), start, end, DNAString::new(read.seq().as_bytes()))); }); } }, bam::record::Cigar::RefSkip(_) => (), bam::record::Cigar::SoftClip(_) => (), bam::record::Cigar::HardClip(_) => (), bam::record::Cigar::Pad(_) => (), bam::record::Cigar::Equal(_) => (), bam::record::Cigar::Diff(_) => (), } } } } seqs.sort_by(|a, b| { a.2.cmp(&b.2) }); let mut grouped: HashMap, Vec)> = HashMap::new(); let mut last_key = None; for seq in seqs { if let Some(lk) = last_key { let last = grouped.get_mut(&lk).unwrap(); if last.1 + merge_dist >= seq.2 { last.1 = cmp::max(last.1, seq.2); last.2.push(seq.0); last.3.push(seq.4) } else { let nk = lk + 1; last_key = Some(nk); grouped.insert(nk, (seq.2, seq.3, vec![seq.0], vec![seq.4])); } } else { last_key = Some(0); grouped.insert(0, (seq.2, seq.3, vec![seq.0], vec![seq.4])); } } let mut results = Vec::new(); for (_k, group) in grouped { if group.3.len() > 1 { let res = rec_merger(group.3, 15, 2, 1 as i8); res.iter().for_each(|s| { let r = aligner.get_ref_positions_indel(&s.as_string()); results.push((group.2.clone(), s.as_string(), r)); }); } else if group.3.len() > 0 { let u = group.3.first().unwrap(); let r = aligner.get_ref_positions_indel(&u.as_string()); results.push((group.2.clone(), u.as_string(), r)); } } let mut indels: HashMap = HashMap::new(); results .into_iter() .filter(|p| p.0.len() > 1 && p.2.len() >= 1) .for_each(|e| { e.2.iter().for_each(|em| { let k = format!("{}_{}:{}-{}", em.0, em.1, em.2, em.3); if let Some(val) = indels.get_mut(&k) { val.0 = val.0 + e.0.len() as i32; } else { let depth = if let Some(v) = depths.get(&(em.2 as u32)) { *v } else { 0 }; indels.insert(k, (e.0.len() as i32, depth as i32)); } }); }); indels } pub fn rec_merger(mut seqs: Vec, kmer_len: usize, max_mismatches: i8, max_consecutive_mismatches: i8) -> Vec { let mut n_retry = 0; let mut merged = Vec::new(); let mut last_len = seqs.len(); loop { if last_len <= 1 { break; } let tmp = seqs.get(1..).unwrap().to_vec(); let re = merger(&tmp, seqs.get(0).unwrap(), kmer_len, max_mismatches, max_consecutive_mismatches); match re { Some(res) => { merged.push(res.0.clone()); seqs = res.1; }, None => { match n_retry { 0 => seqs.reverse(), _ => seqs.shuffle(&mut thread_rng()) } }, } n_retry += 1; if last_len == seqs.len() { break; } else { last_len = seqs.len(); } } merged } pub fn merger(sequences: &Vec, to_merge: &DNAString, kmer_len: usize, max_mismatches: i8, max_consecutive_mismatches: i8) -> Option<(DNAString, Vec)> { let mut to_merge = to_merge.clone(); let mut merged = Vec::new(); // try to merge to others seeds for (i, seed) in sequences.iter().enumerate() { if seed.0.len() >= kmer_len { let r = to_merge.merge(seed, kmer_len, max_mismatches, max_consecutive_mismatches); match r { Ok(_) => { merged.push(i); }, Err(_err) => (), } } } if merged.len() > 0 { Some(( to_merge, sequences.into_iter().enumerate().filter(|(i, _)| !merged.contains(i)).map(|(_, e)| e.clone()).collect::>() )) } else { None } }