|
|
@@ -1,9 +1,634 @@
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
use utoipa::ToSchema;
|
|
|
|
|
|
+use anyhow::{anyhow, Ok, Result};
|
|
|
+use crossbeam_channel::{unbounded, Receiver, Sender};
|
|
|
+use flate2::{read::GzDecoder, write::GzEncoder, Compression};
|
|
|
+use indicatif::MultiProgress;
|
|
|
+use log::{info, warn};
|
|
|
+use num_format::{CustomFormat, Grouping, ToFormattedString};
|
|
|
+use rayon::prelude::*;
|
|
|
+use rust_htslib::bam::IndexedReader;
|
|
|
+use std::{
|
|
|
+ cmp::Ordering,
|
|
|
+ collections::{HashSet, VecDeque},
|
|
|
+ fs::{self, File},
|
|
|
+ io::Read,
|
|
|
+ thread::spawn,
|
|
|
+};
|
|
|
+use std::{fs::OpenOptions, io::Write};
|
|
|
+
|
|
|
+use crate::{in_out::dict_reader::read_dict, utils::new_pg_speed, variants::{AlterationCategory, Variant, Variants}};
|
|
|
+
|
|
|
/// Lightweight, serializable reference to a phase by its identifier string.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, ToSchema)]
pub struct PhaseAnnotation {
    /// Phase identifier — presumably the string produced by `Phase::id`
    /// (e.g. "chr1:100-200[ACGT]"); confirm against call sites.
    pub id: String,
}
|
|
|
+
|
|
|
/// One allele of a heterozygous SNP site, together with the names of the
/// reads that support it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeteroVar {
    /// Contig / chromosome name.
    pub chr: String,
    /// Position on `chr` — assumes the coordinate convention of
    /// `pandora_lib_pileup` (TODO confirm 0- vs 1-based).
    pub position: i32,
    /// The allele base observed at this site (ASCII byte, e.g. b'A').
    pub base: u8,
    /// Fraction of reads at the site carrying `base` (supporting reads / total depth).
    pub vaf: f64,
    /// QNAMEs of the reads supporting this allele.
    pub reads: HashSet<String>,
}
|
|
|
+
|
|
|
+impl HeteroVar {
|
|
|
+ pub fn new(
|
|
|
+ bam_a: &mut IndexedReader,
|
|
|
+ bam_b: &mut IndexedReader,
|
|
|
+ chr: &str,
|
|
|
+ position: i32,
|
|
|
+ reference: u8,
|
|
|
+ alternative: u8,
|
|
|
+ ) -> Result<(HeteroVar, HeteroVar)> {
|
|
|
+ let mut rec_base = if let std::result::Result::Ok(rb) =
|
|
|
+ pandora_lib_pileup::qnames_at_base(bam_a, chr, position, false, 50)
|
|
|
+ {
|
|
|
+ rb
|
|
|
+ } else {
|
|
|
+ return Err(anyhow!("Error while reading BAM file."));
|
|
|
+ };
|
|
|
+
|
|
|
+ if let std::result::Result::Ok(rb) =
|
|
|
+ pandora_lib_pileup::qnames_at_base(bam_b, chr, position, false, 50)
|
|
|
+ {
|
|
|
+ rec_base.extend(rb);
|
|
|
+ } else {
|
|
|
+ return Err(anyhow!("Error while reading BAM file."));
|
|
|
+ };
|
|
|
+
|
|
|
+ let depth = rec_base.len() as i32;
|
|
|
+ if depth == 0 {
|
|
|
+ return Err(anyhow!("No records"));
|
|
|
+ }
|
|
|
+ let mut a = (reference, Vec::new());
|
|
|
+ let mut b = (alternative, Vec::new());
|
|
|
+ for (record, base) in rec_base.into_iter() {
|
|
|
+ if base == reference {
|
|
|
+ a.1.push(record.clone());
|
|
|
+ } else if base == alternative {
|
|
|
+ b.1.push(record.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let res: Vec<HeteroVar> = vec![a, b]
|
|
|
+ .into_iter()
|
|
|
+ .enumerate()
|
|
|
+ .filter(|(_, (_, rec))| !rec.is_empty())
|
|
|
+ .map(|(_, (base, records))| {
|
|
|
+ let mut records_ids: HashSet<String> = HashSet::new();
|
|
|
+ records_ids.extend(records);
|
|
|
+ HeteroVar {
|
|
|
+ chr: chr.to_string(),
|
|
|
+ position,
|
|
|
+ base,
|
|
|
+ vaf: records_ids.len() as f64 / depth as f64,
|
|
|
+ reads: records_ids,
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .collect();
|
|
|
+
|
|
|
+ if res.len() == 2 {
|
|
|
+ let a = res.first().unwrap().to_owned();
|
|
|
+ let b = res.get(1).unwrap().to_owned();
|
|
|
+ Ok((a, b))
|
|
|
+ } else {
|
|
|
+ Err(anyhow!("Not enough reads"))
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+impl PartialEq for HeteroVar {
|
|
|
+ fn eq(&self, other: &Self) -> bool {
|
|
|
+ self.reads == other.reads
|
|
|
+ && self.chr == other.chr
|
|
|
+ && self.position == other.position
|
|
|
+ && self.base == other.base
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// A set of heterozygous alleles (one `HeteroVar` per site) inferred to lie
/// on the same haplotype because their supporting reads overlap.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Phase {
    /// NOTE(review): never written in this file — `id()` recomputes the
    /// identifier on demand; confirm whether this field is used elsewhere.
    pub id: Option<String>,
    /// Member sites, kept sorted by `position` by the merge methods.
    pub data: Vec<HeteroVar>,
    /// Union of the supporting read names of all member sites.
    pub reads: HashSet<String>,
    /// Alignment extent filled by `make_range`: per `make_range`'s code this
    /// is (min read start, max read start, min read end, max read end),
    /// i.e. outer/inner start then inner/outer end.
    pub range: Option<(u64, u64, u64, u64)>,
}
|
|
|
+
|
|
|
impl Phase {
    /// Creates a single-site phase seeded with `hetero_var` and its reads.
    pub fn new(hetero_var: &HeteroVar) -> Self {
        Self {
            id: None,
            data: vec![hetero_var.clone()],
            reads: hetero_var.reads.clone(),
            range: None,
        }
    }
    /// Merges `hetero_var` into this phase if it shares at least
    /// `min_records` supporting reads with it. Returns whether the merge
    /// happened; on success `data` is re-sorted by position.
    pub fn try_merge(&mut self, hetero_var: &HeteroVar, min_records: usize) -> bool {
        if hetero_var
            .reads
            .par_iter()
            .filter(|r| self.reads.contains(*r))
            .count()
            >= min_records
        {
            self.reads.extend(hetero_var.reads.clone());
            self.data.push(hetero_var.clone());
            self.data.sort_by(|a, b| a.position.cmp(&b.position));
            true
        } else {
            false
        }
    }

    /// True when any of this phase's reads appears in `records_ids`.
    pub fn contains(&self, records_ids: &[String]) -> bool {
        self.reads
            .par_iter()
            .filter(|v| records_ids.contains(*v))
            .count()
            > 0
    }

    /// Merges another phase into this one if they share at least
    /// `min_records` reads; on success absorbs its reads and sites and
    /// re-sorts `data`. Note: does not dedup — `sort_dedup` does that later.
    pub fn try_merge_phase(&mut self, phase: &Phase, min_records: usize) -> bool {
        if phase
            .reads
            .par_iter()
            .filter(|r| self.reads.contains(*r))
            .count()
            >= min_records
        {
            self.reads.extend(phase.reads.clone());
            self.data.extend(phase.data.clone());
            self.data.sort_by(|a, b| a.position.cmp(&b.position));
            true
        } else {
            false
        }
    }

    /// Computes `self.range` from the alignments supporting the leftmost and
    /// rightmost sites, pooling records from both BAM files.
    ///
    /// # Errors
    /// Propagates pileup failures, and fails when no supporting read start
    /// or end can be found (range left as `None`).
    pub fn make_range(
        &mut self,
        bam_a: &mut IndexedReader,
        bam_b: &mut IndexedReader,
    ) -> Result<()> {
        self.sort_dedup();

        // `data` is non-empty by construction (`new` seeds one site).
        let left = self.data.first().unwrap();
        let right = self.data.last().unwrap();

        let mut left_records =
            pandora_lib_pileup::records_at_base(bam_a, &left.chr, left.position, false)?;
        let mut right_records =
            pandora_lib_pileup::records_at_base(bam_a, &right.chr, right.position, false)?;

        left_records.extend(pandora_lib_pileup::records_at_base(
            bam_b,
            &left.chr,
            left.position,
            false,
        )?);
        right_records.extend(pandora_lib_pileup::records_at_base(
            bam_b,
            &right.chr,
            right.position,
            false,
        )?);

        // Alignment starts of reads supporting the leftmost allele…
        let left_starts: Vec<_> = left_records
            .iter()
            .filter(|(_, b)| *b == left.base)
            .map(|(r, _)| r.pos() as u64)
            .collect();
        // …and alignment ends (start + sequence length) of reads supporting
        // the rightmost allele.
        let right_ends: Vec<_> = right_records
            .iter()
            .filter(|(_, b)| *b == right.base)
            .map(|(r, _)| (r.pos() as usize + r.seq_len()) as u64)
            .collect();

        // Range tuple layout: (outermost start, innermost start,
        // innermost end, outermost end).
        let min = left_starts.iter().min();
        let min_cov = left_starts.iter().max();
        let max_cov = right_ends.iter().min();
        let max = right_ends.iter().max();
        // Debug dump for the no-supporting-read case (stdout, not log).
        if max.is_none() {
            println!("{right:?}");
            println!("{}", String::from_utf8(vec![right.base]).unwrap());
            println!("{right_records:?}");
            println!("{right_ends:?}");
            println!("{min:?} {min_cov:?} {max:?} {max_cov:?}");
        }

        if let (Some(min), Some(min_cov), Some(max_cov), Some(max)) = (min, min_cov, max_cov, max) {
            self.range = Some((*min, *min_cov, *max_cov, *max));
            Ok(())
        } else {
            Err(anyhow!(format!("can't find range {:#?}", self)))
        }
    }

    /// Unweighted mean VAF over the member sites (NaN if `data` is empty,
    /// which `new` prevents).
    pub fn mean_vaf(&self) -> f64 {
        let vafs: Vec<_> = self.data.iter().map(|s| s.vaf).collect();
        vafs.iter().sum::<f64>() / vafs.len() as f64
    }

    /// Sorts member sites by position and drops exact (position, chr, base)
    /// duplicates introduced by `try_merge_phase`.
    pub fn sort_dedup(&mut self) {
        self.data.sort_by(|a, b| a.position.cmp(&b.position));
        self.data
            .dedup_by(|a, b| a.position == b.position && a.chr == b.chr && a.base == b.base);
    }

    /// Renders one BED line "chr\tstart\tend\tid vaf", or `None` when the
    /// phase has no id (i.e. no range yet).
    pub fn bed_string(&self) -> Option<String> {
        // if let (Some((_min, min_cov, max_cov, _max)), Some(id)) = (self.range, self.id()) {
        if let Some(id) = self.id() {
            // Coordinates come from the member SNP positions, not from
            // `range` (see the retired condition above).
            let f = self.data.first().unwrap();
            let chr = f.chr.clone();
            let start = f.position;
            let end = self.data.last().unwrap().position;
            Some(
                [
                    chr,
                    start.to_string(),
                    end.to_string(),
                    format!("{} {:.2}", id, self.mean_vaf()),
                ]
                .join("\t"),
            )
        } else {
            None
        }
    }

    /// Human-readable identifier "chr:min-[chr2:]max[BASES]" where BASES is
    /// the allele sequence along the phase; `None` until `make_range` ran.
    pub fn id(&self) -> Option<String> {
        let synteny = String::from_utf8(self.data.iter().map(|var| var.base).collect()).unwrap();
        if let Some((min, _, _, max)) = self.range {
            let f = self.data.first().unwrap().chr.clone();
            let l = self.data.last().unwrap().chr.clone();
            // Second contig is spelled out only when it differs from the first.
            let b = if l != f {
                format!("{l}:")
            } else {
                "".to_string()
            };
            Some(format!("{}:{min}-{}{max}[{synteny}]", f, b))
        } else {
            None
        }
    }
}
|
|
|
+
|
|
|
impl Ord for Phase {
    /// Orders phases by the position of their first (leftmost) site.
    ///
    /// NOTE(review): panics if `data` is empty (constructor prevents this),
    /// and is not consistent with `PartialEq` (which compares `reads`) — two
    /// non-equal phases can compare `Equal`. Fine for plain sorting; confirm
    /// no caller relies on Ord/Eq agreement.
    fn cmp(&self, other: &Self) -> Ordering {
        let s = self.data.first().unwrap();
        let other = other.data.first().unwrap();
        s.position.cmp(&other.position)
    }
}
|
|
|
+
|
|
|
impl PartialOrd for Phase {
    /// Delegates to `Ord::cmp` (total order by first-site position).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
|
|
|
+
|
|
|
impl PartialEq for Phase {
    /// Two phases are equal when they are supported by exactly the same read
    /// set; member sites are deliberately not compared (see retired variant
    /// below).
    fn eq(&self, other: &Self) -> bool {
        self.reads == other.reads
        // self.data.iter().filter(|s| other.data.contains(s)).count() == other.data.len()
    }
}
|
|
|
+
|
|
|
// `eq` is HashSet equality, a genuine equivalence relation, so `Eq` is sound.
impl Eq for Phase {}
|
|
|
+
|
|
|
/// Greedy merging of phases that share at least `min_records` supporting reads.
///
/// Pops one phase at a time, hands it to a worker thread (`spawn_phase`) and
/// streams it up to `takes_n` of the following queued phases as merge
/// candidates. Candidates that merged are removed from the queue. When the
/// queue drains while any merge happened this round, all merged results are
/// re-queued for another round, so the process runs to a fixed point.
pub fn merge_phases(
    phases: Vec<Phase>,
    min_records: usize,
    takes_n: usize,
    multi: &MultiProgress,
) -> Result<Vec<Phase>> {
    let pg = multi.add(new_pg_speed(phases.len() as u64));
    pg.set_message(format!("Phasing by {takes_n}"));

    // Phases whose worker has finished this round.
    let mut merged_phases = Vec::new();
    // Number of successful merges in the current round.
    let mut round_merges = 0;

    let mut phases = VecDeque::from_iter(phases);
    while let Some(phase) = phases.pop_front() {
        // Worker owns a clone of `phase`; we feed it candidates over `s`
        // and read merge outcomes / the final phase from `r`.
        let (s, r) = spawn_phase(&phase, min_records);
        phases
            .par_iter()
            .take(takes_n)
            .enumerate()
            .for_each(|(i, p)| s.send(MessagesIn::Phase((i, p.clone()))).unwrap());

        // Ask the worker to reply with its accumulated phase and stop.
        s.send(MessagesIn::Return)?;

        // Indices (into the queue's front `takes_n` items) that merged.
        let mut merged_ids = Vec::new();
        loop {
            match r.recv() {
                // Fully qualified because `anyhow::Ok` is imported at file level.
                std::result::Result::Ok(msg) => match msg {
                    MessagesOut::Phase(p) => {
                        // Final answer for this worker — round continues.
                        merged_phases.push(p);
                        break;
                    }
                    MessagesOut::Merged((i, b)) => {
                        if b {
                            round_merges += 1;
                            merged_ids.push(i);
                        }
                    }
                },
                // NOTE(review): on channel disconnect this warns and retries
                // forever; in practice the worker always sends `Phase` before
                // closing — confirm if workers can die early.
                Err(e) => warn!("{e}"),
            }
        }

        // let n_merged = merged_ids.len();
        // if n_merged > 0 {
        //     info!("Merged {}", merged_ids.len());
        // }

        // Remove merged from phases.
        pg.set_length(pg.length().unwrap() as u64 - merged_ids.len() as u64);
        phases = phases
            .iter()
            .enumerate()
            .filter(|(i, _)| !merged_ids.contains(i))
            .map(|(_, p)| p.to_owned())
            .collect();
        merged_ids.clear();

        // Queue exhausted but this round merged something: re-queue the
        // results and run another round.
        if phases.is_empty() && round_merges > 0 {
            phases.extend(merged_phases.clone().into_iter());
            merged_phases.clear();
            warn!("Round merges {round_merges}");
            round_merges = 0;
            pg.reset();
            pg.set_length(phases.len() as u64);
        }
        pg.inc(1);
    }
    pg.finish();
    multi.remove(&pg);

    let phases = merged_phases.clone();

    Ok(phases)
}
|
|
|
+
|
|
|
+pub fn variants_phasing(
|
|
|
+ variants: Vec<Variant>,
|
|
|
+ bam_path_a: &str,
|
|
|
+ bam_path_b: &str,
|
|
|
+ min_records: usize,
|
|
|
+ multi: &MultiProgress,
|
|
|
+) -> Vec<Phase> {
|
|
|
+ info!("{} variants to analyse.", variants.len());
|
|
|
+ let pg = multi.add(new_pg_speed(variants.len() as u64));
|
|
|
+ pg.set_message("Parsing reads at SNPs positions");
|
|
|
+ let mut phases = variants
|
|
|
+ .par_chunks(2_000)
|
|
|
+ .flat_map(|chunks| {
|
|
|
+ let mut bam_a = rust_htslib::bam::IndexedReader::from_path(bam_path_a).unwrap();
|
|
|
+ let mut bam_b = rust_htslib::bam::IndexedReader::from_path(bam_path_b).unwrap();
|
|
|
+ let mut errors = Vec::new();
|
|
|
+ let mut phases: Vec<Phase> = Vec::new();
|
|
|
+ for v in chunks {
|
|
|
+ pg.inc(1);
|
|
|
+ let mut reference = v.reference.to_string().as_bytes().to_vec();
|
|
|
+ let mut altenative = v.alternative.to_string().as_bytes().to_vec();
|
|
|
+ if reference.len() != 1 && altenative.len() != 1 {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ match HeteroVar::new(
|
|
|
+ &mut bam_a,
|
|
|
+ &mut bam_b,
|
|
|
+ &v.contig.to_string(),
|
|
|
+ v.position as i32,
|
|
|
+ reference.pop().unwrap(),
|
|
|
+ altenative.pop().unwrap(),
|
|
|
+ ) {
|
|
|
+ std::result::Result::Ok((a, b)) => {
|
|
|
+ phases.push(Phase::new(&a));
|
|
|
+ phases.push(Phase::new(&b));
|
|
|
+ }
|
|
|
+ Err(e) => errors.push(e),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ phases
|
|
|
+ })
|
|
|
+ .collect::<Vec<Phase>>();
|
|
|
+
|
|
|
+ pg.finish();
|
|
|
+ multi.remove(&pg);
|
|
|
+ phases.sort();
|
|
|
+ phases.dedup();
|
|
|
+
|
|
|
+ let mut phases: Vec<Phase> = phases
|
|
|
+ .par_chunks(phases.len() / 75)
|
|
|
+ .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 5, multi).unwrap())
|
|
|
+ .collect();
|
|
|
+ phases.par_sort();
|
|
|
+ info!("{} phases", phases.len());
|
|
|
+
|
|
|
+ let mut phases: Vec<Phase> = phases
|
|
|
+ .par_chunks(phases.len() / 50)
|
|
|
+ .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 100, multi).unwrap())
|
|
|
+ .collect();
|
|
|
+ phases.par_sort();
|
|
|
+ info!("{} phases", phases.len());
|
|
|
+
|
|
|
+ let mut phases: Vec<Phase> = phases
|
|
|
+ .par_chunks(phases.len() / 25)
|
|
|
+ .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 1000, multi).unwrap())
|
|
|
+ .collect();
|
|
|
+ info!("{} phases", phases.len());
|
|
|
+
|
|
|
+ let chunk_size = phases.len() / 25;
|
|
|
+ phases.par_chunks_mut(chunk_size).for_each(|chunk| {
|
|
|
+ let mut bam_a = rust_htslib::bam::IndexedReader::from_path(bam_path_a).unwrap();
|
|
|
+ let mut bam_b = rust_htslib::bam::IndexedReader::from_path(bam_path_b).unwrap();
|
|
|
+ chunk.iter_mut().for_each(|p| {
|
|
|
+ if let Err(e) = p.make_range(&mut bam_a, &mut bam_b) {
|
|
|
+ warn!("{e}");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ });
|
|
|
+
|
|
|
+ phases.par_sort_by(|a, b| a.range.unwrap().0.cmp(&b.range.unwrap().0));
|
|
|
+
|
|
|
+ phases
|
|
|
+}
|
|
|
+
|
|
|
+pub fn write_phases_bed(
|
|
|
+ phases: &[Phase], // should be already sorted and with ranges
|
|
|
+ file: &str,
|
|
|
+) -> Result<()> {
|
|
|
+ let mut w = OpenOptions::new()
|
|
|
+ .read(true)
|
|
|
+ // .write(true)
|
|
|
+ .create(true)
|
|
|
+ .append(true)
|
|
|
+ .open(file)?;
|
|
|
+
|
|
|
+ for phase in phases.iter() {
|
|
|
+ if let Some(s) = phase.bed_string() {
|
|
|
+ writeln!(&mut w, "{s}")?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
/// Requests sent to a `spawn_phase` worker thread.
pub enum MessagesIn {
    /// Try to merge the candidate (queue index, phase) into the worker's phase.
    Phase((usize, Phase)),
    /// Ask the worker to send back its accumulated phase and terminate.
    Return,
}
|
|
|
/// Replies sent back from a `spawn_phase` worker thread.
pub enum MessagesOut {
    /// Final accumulated phase, sent once in response to `MessagesIn::Return`.
    Phase(Phase),
    /// Outcome of one merge attempt: (candidate queue index, merged?).
    Merged((usize, bool)),
}
|
|
|
+
|
|
|
/// Spawns a worker thread owning a clone of `phase` and returns the channel
/// endpoints to drive it.
///
/// Protocol: send `MessagesIn::Phase((i, p))` to attempt a merge — answered
/// by `MessagesOut::Merged((i, bool))` — then `MessagesIn::Return` to receive
/// the accumulated `MessagesOut::Phase` and stop the thread.
pub fn spawn_phase(
    phase: &Phase,
    min_records: usize,
) -> (Sender<MessagesIn>, Receiver<MessagesOut>) {
    // s/r: caller -> worker requests; ss/rr: worker -> caller replies.
    let (s, r) = unbounded::<MessagesIn>();
    let (ss, rr) = unbounded::<MessagesOut>();
    let mut phase = phase.clone();
    spawn(move || loop {
        match r.recv() {
            // Fully qualified because `anyhow::Ok` is imported at file level.
            std::result::Result::Ok(msg) => match msg {
                MessagesIn::Phase((i, p)) => {
                    let res = phase.try_merge_phase(&p, min_records);
                    ss.send(MessagesOut::Merged((i, res))).unwrap();
                }
                MessagesIn::Return => {
                    // Moves `phase` out of the loop; the immediate `break`
                    // makes the move sound.
                    ss.send(MessagesOut::Phase(phase)).unwrap();
                    break;
                }
            },
            // Sender dropped without `Return` — treated as a caller bug.
            Err(e) => panic!("Err {e}"),
        };
    });
    (s, rr)
}
|
|
|
+
|
|
|
+pub fn save_phases(phases: &Vec<Phase>, filename: &str) -> anyhow::Result<()> {
|
|
|
+ let bytes = postcard::to_allocvec(phases).expect("Serialization failed");
|
|
|
+ let file = File::create(filename)?;
|
|
|
+ let mut encoder = GzEncoder::new(file, Compression::default());
|
|
|
+ encoder.write_all(&bytes)?;
|
|
|
+ encoder.finish()?;
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+pub fn load_phases(filename: &str) -> anyhow::Result<Vec<Phase>> {
|
|
|
+ let file = File::open(filename)?;
|
|
|
+ let mut decoder = GzDecoder::new(file);
|
|
|
+ let mut bytes = Vec::new();
|
|
|
+ decoder.read_to_end(&mut bytes)?;
|
|
|
+ let phases: Vec<Phase> = postcard::from_bytes(&bytes).expect("Deserialization failed");
|
|
|
+ Ok(phases)
|
|
|
+}
|
|
|
+
|
|
|
/// Filesystem layout and tuning knobs for one phasing run.
#[derive(Debug)]
pub struct PhaserConfig {
    /// Sample / patient identifier used in all derived paths.
    pub id: String,
    /// Diagnosis BAM aligned to hs1 (see `PhaserConfig::new` for the layout).
    pub bam_path_a: String,
    /// MRD (follow-up) BAM aligned to hs1.
    pub bam_path_b: String,
    /// Gzipped constitutional-variants bytes file.
    pub const_variants: String,
    /// Output directory for phase (.postcard.gz) and BED files.
    pub phases_dir: String,
    /// Minimum shared reads required to merge two phases.
    pub min_records: usize,
    /// Half-width of the VAF window around 0.5 used to call heterozygosity.
    pub around_hetero: f32,
}
|
|
|
+
|
|
|
+impl PhaserConfig {
|
|
|
+ pub fn new(id: &str, data_dir: &str, min_records: usize, around_hetero: f32) -> Self {
|
|
|
+ let bam_path_a = format!("{data_dir}/{id}/diag/{id}_diag_hs1.bam");
|
|
|
+ let bam_path_b = format!("{data_dir}/{id}/mrd/{id}_mrd_hs1.bam");
|
|
|
+ let const_variants = format!("{data_dir}/{id}/diag/{id}_constit.bytes.gz");
|
|
|
+ let phases_dir = format!("{data_dir}/{id}/diag/phases");
|
|
|
+ Self {
|
|
|
+ id: id.to_string(),
|
|
|
+ bam_path_a,
|
|
|
+ bam_path_b,
|
|
|
+ const_variants,
|
|
|
+ phases_dir,
|
|
|
+ min_records,
|
|
|
+ around_hetero,
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+pub fn phase(config: PhaserConfig, progress: MultiProgress) -> anyhow::Result<()> {
|
|
|
+ let format = CustomFormat::builder()
|
|
|
+ .grouping(Grouping::Standard)
|
|
|
+ .minus_sign("-")
|
|
|
+ .separator("_")
|
|
|
+ .build()
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ let PhaserConfig {
|
|
|
+ id,
|
|
|
+ bam_path_a,
|
|
|
+ bam_path_b,
|
|
|
+ const_variants,
|
|
|
+ phases_dir,
|
|
|
+ min_records,
|
|
|
+ around_hetero,
|
|
|
+ } = config;
|
|
|
+
|
|
|
+ fs::create_dir_all(&phases_dir)?;
|
|
|
|
|
|
+ // Loading variants
|
|
|
+ let variants = Variants::new_from_bytes(
|
|
|
+ &id,
|
|
|
+ &const_variants,
|
|
|
+ progress.clone(),
|
|
|
+ )?;
|
|
|
+ info!(
|
|
|
+ "{} variants loaded",
|
|
|
+ variants.len().to_formatted_string(&format)
|
|
|
+ );
|
|
|
+
|
|
|
+ // Filtering variants for heterozigous SNPs
|
|
|
+ let mut variants: Vec<Variant> = variants
|
|
|
+ .data
|
|
|
+ .into_par_iter()
|
|
|
+ .filter(|v| {
|
|
|
+ let mut v = v.clone();
|
|
|
+ v.vaf() > (0.5 - around_hetero) && v.vaf() < (0.5 + around_hetero)
|
|
|
+ })
|
|
|
+ .filter(|v| matches!(v.alteration_category(), AlterationCategory::Snv))
|
|
|
+ .collect();
|
|
|
+ let mut contigs = HashSet::new();
|
|
|
+ variants.iter().for_each(|v| {
|
|
|
+ contigs.insert(v.contig.to_string());
|
|
|
+ });
|
|
|
+ variants.par_sort_by(|a, b| a.position.cmp(&b.position));
|
|
|
+ info!("{} heterozigous SNPs considered", variants.len());
|
|
|
+
|
|
|
+ let dict = read_dict("/data/ref/hs1/chm13v2.0.dict")?;
|
|
|
+ for (contig, _) in dict {
|
|
|
+ if !contigs.contains(&contig) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ let v: Vec<_> = variants
|
|
|
+ .clone()
|
|
|
+ .into_par_iter()
|
|
|
+ .filter(|v| v.contig == contig)
|
|
|
+ .collect();
|
|
|
+ if variants.len() > 1 {
|
|
|
+ info!("{contig}: {} SNPs to phase", v.len());
|
|
|
+ let phases = variants_phasing(v, &bam_path_a, &bam_path_b, min_records, &progress);
|
|
|
+ if !phases.is_empty() {
|
|
|
+ save_phases(
|
|
|
+ &phases,
|
|
|
+ &format!("{phases_dir}/{id}_{contig}_phases.postcard.gz"),
|
|
|
+ )?;
|
|
|
+ let bed_path = format!("{phases_dir}/{id}_{contig}.bed");
|
|
|
+ info!("Writting bed file {bed_path}");
|
|
|
+ write_phases_bed(&phases, &bed_path)?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|