Thomas 1 year ago
parent
commit
313c336f26
4 changed files with 845 additions and 82 deletions
  1. 761 30
      Cargo.lock
  2. 5 0
      Cargo.toml
  3. 29 13
      src/lib.rs
  4. 50 39
      src/phase.rs

File diff suppressed because it is too large
+ 761 - 30
Cargo.lock


+ 5 - 0
Cargo.toml

@@ -14,3 +14,8 @@ rayon = "1.10.0"
 rust-htslib = "0.47.0"
 uuid = { version = "1.10.0", features = ["v4"] }
 pandora_lib_variants = { git = "https://git.t0m4.fr/Thomas/pandora_lib_variants.git" }
+pandora_lib_pileup = { git = "https://git.t0m4.fr/Thomas/pandora_lib_pileup.git" }
+indicatif-log-bridge = "0.2.2"
+serde = { version = "1.0.*", default-features = false }
+postcard = { version = "1.0.8", features = ["alloc"] }
+

+ 29 - 13
src/lib.rs

@@ -9,7 +9,7 @@ use rust_htslib::bam::{Format, Header, Read, Record, Writer};
 use std::{
     collections::{HashMap, HashSet},
     fs::{self, File},
-    io::{self, BufRead, BufReader, BufWriter, Write},
+    io::{self, BufReader, BufWriter, Write},
 };
 
 pub mod bam;
@@ -279,6 +279,13 @@ pub fn par_whole_scan(contig: &str, bam_path: &str, out_dir: &str) -> anyhow::Re
 
 #[cfg(test)]
 mod tests {
+    use indicatif::MultiProgress;
+    use indicatif_log_bridge::LogWrapper;
+    use num_format::{CustomFormat, Grouping, ToFormattedString};
+    use pandora_lib_variants::{in_out::dict_reader::read_dict, variants::Variant};
+
+    use crate::phase::{save_phases, variants_phasing};
+
     use super::*;
 
     fn init() {
@@ -302,10 +309,9 @@ mod tests {
     }
 
     #[test]
-    fn phasing() -> Result<()> {
-        let name = "CAMARA";
+    fn phasing() -> anyhow::Result<()> {
+        let id = "SALICETTO";
         let min_records = 2;
-        let min_var = 2;
 
         let format = CustomFormat::builder()
             .grouping(Grouping::Standard)
@@ -320,13 +326,14 @@ mod tests {
         let multi = MultiProgress::new();
         LogWrapper::new(multi.clone(), logger).try_init().unwrap();
 
-        let bam_path = &format!("/data/longreads_basic_pipe/{name}/diag/{name}_diag_hs1.bam");
+        let bam_path = &format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1.bam");
         let somatic_path =
-            &format!("/data/longreads_basic_pipe/{name}/diag/{name}_constit.bytes.gz");
-        let bed = &format!("/data/longreads_basic_pipe/{name}/diag/{name}_phases.bed");
+            &format!("/data/longreads_basic_pipe/{id}/diag/{id}_constit.bytes.gz");
+        let phases_dir = format!("/data/longreads_basic_pipe/{id}/diag/phases");
+        fs::create_dir_all(&phases_dir)?;
 
         let variants =
-            libVariant::variants::Variants::new_from_bytes(name, &somatic_path, multi.clone())?;
+            pandora_lib_variants::variants::Variants::new_from_bytes(id, somatic_path, multi.clone())?;
 
         info!(
             "Variants loaded {}.",
@@ -342,8 +349,8 @@ mod tests {
             })
             .collect();
 
-        let contigs = DashSet::new();
-        variants.par_iter().for_each(|v| {
+        let mut contigs = HashSet::new();
+        variants.iter().for_each(|v| {
             contigs.insert(v.contig.to_string());
         });
 
@@ -354,16 +361,25 @@ mod tests {
             if !contigs.contains(&contig) {
                 continue;
             }
+            // if contig != *"chr9" {
+            //     continue;
+            // }
 
             let v: Vec<_> = variants
                 .clone()
                 .into_par_iter()
                 .filter(|v| v.contig == contig)
                 .collect();
-            if variants.len() > 1 {
-                info!("{contig}: {} variants", v.len());
+            if !variants.is_empty() {
+                info!("{contig}: {} variants to phase", v.len());
                 let phases = variants_phasing(v, bam_path, min_records, &multi);
-                write_phases_bed(&phases, min_var, bam_path, &contig, bed)?;
+                save_phases(&phases, &format!("{phases_dir}/{id}_{contig}_phases.postcard"))?;
+                // info!("{} phases", phases.len());
+                // let f = phases.first().unwrap();
+                // println!("{f:#?}");
+                // let f = phases.last().unwrap();
+                // println!("{f:#?}");
+               // write_phases_bed(&phases, min_var, bam_path, &contig, bed)?;
             }
         }
 

+ 50 - 39
src/phase.rs

@@ -1,19 +1,18 @@
 use anyhow::{anyhow, Ok, Result};
 use crossbeam_channel::{unbounded, Receiver, Sender};
 use indicatif::MultiProgress;
-use pandora_lib_variants::{utils::new_pg_speed, variants::Variant};
 use log::{info, warn};
 use num_format::{CustomFormat, Grouping, ToFormattedString};
+use pandora_lib_variants::{utils::new_pg_speed, variants::Variant};
 use rayon::prelude::*;
 use rust_htslib::bam::IndexedReader;
+use serde::{Deserialize, Serialize};
 use std::{
-    cmp::Ordering,
-    collections::{HashSet, VecDeque},
-    thread::spawn,
+    cmp::Ordering, collections::{HashSet, VecDeque}, fs::File, io::Read, thread::spawn
 };
 use std::{fs::OpenOptions, io::Write};
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct HeteroVar {
     pub chr: String,
     pub position: i32,
@@ -31,7 +30,7 @@ impl HeteroVar {
         alternative: u8,
     ) -> Result<(HeteroVar, HeteroVar)> {
         let rec_base = if let std::result::Result::Ok(rb) =
-            pileup::qnames_at_base(bam, chr, position, false)
+            pandora_lib_pileup::qnames_at_base(bam, chr, position, false)
         {
             rb
         } else {
@@ -54,10 +53,10 @@ impl HeteroVar {
         let res: Vec<HeteroVar> = vec![a, b]
             .into_iter()
             .enumerate()
-            .filter(|(_, (_, rec))| rec.len() > 0)
+            .filter(|(_, (_, rec))| !rec.is_empty())
             .map(|(_, (base, records))| {
                 let mut records_ids: HashSet<String> = HashSet::new();
-                records_ids.extend(records.into_iter());
+                records_ids.extend(records);
                 HeteroVar {
                     chr: chr.to_string(),
                     position,
@@ -69,11 +68,11 @@ impl HeteroVar {
             .collect();
 
         if res.len() == 2 {
-            let a = res.get(0).unwrap().to_owned();
+            let a = res.first().unwrap().to_owned();
             let b = res.get(1).unwrap().to_owned();
-            return Ok((a, b));
+            Ok((a, b))
         } else {
-            return Err(anyhow!("Not enough reads"));
+            Err(anyhow!("Not enough reads"))
         }
     }
 }
@@ -86,7 +85,7 @@ impl PartialEq for HeteroVar {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Phase {
     pub data: Vec<HeteroVar>,
     pub reads: HashSet<String>,
@@ -107,7 +106,7 @@ impl Phase {
             .count()
             >= min_records
         {
-            self.reads.extend(hetero_var.reads.clone().into_iter());
+            self.reads.extend(hetero_var.reads.clone());
             self.data.push(hetero_var.clone());
             self.data.sort_by(|a, b| a.position.cmp(&b.position));
             true
@@ -124,8 +123,8 @@ impl Phase {
             .count()
             >= min_records
         {
-            self.reads.extend(phase.reads.clone().into_iter());
-            self.data.extend(phase.data.clone().into_iter());
+            self.reads.extend(phase.reads.clone());
+            self.data.extend(phase.data.clone());
             self.data.sort_by(|a, b| a.position.cmp(&b.position));
             true
         } else {
@@ -141,8 +140,10 @@ impl Phase {
         let left = data.first().unwrap();
         let right = data.last().unwrap();
 
-        let left_records = pileup::records_at_base(bam, &left.chr, left.position, true)?;
-        let right_records = pileup::records_at_base(bam, &right.chr, right.position, true)?;
+        let left_records =
+            pandora_lib_pileup::records_at_base(bam, &left.chr, left.position, true)?;
+        let right_records =
+            pandora_lib_pileup::records_at_base(bam, &right.chr, right.position, true)?;
 
         let left_starts: Vec<_> = left_records
             .iter()
@@ -160,13 +161,8 @@ impl Phase {
         let max_cov = right_ends.iter().min();
         let max = right_ends.iter().max();
 
-        if min.is_some() && min_cov.is_some() && max.is_some() && max_cov.is_some() {
-            Ok((
-                *min.unwrap(),
-                *min_cov.unwrap(),
-                *max_cov.unwrap(),
-                *max.unwrap(),
-            ))
+        if let (Some(min), Some(min_cov), Some(max_cov), Some(max)) = (min, min_cov, max_cov, max) {
+            Ok((*min, *min_cov, *max_cov, *max))
         } else {
             Err(anyhow!("problem"))
         }
@@ -185,8 +181,8 @@ impl Phase {
 
     pub fn bed_string(&self, bam: &mut IndexedReader) -> Result<String> {
         let first = self.data.first().unwrap();
-        let (min, min_cov, max_cov, max) = self.range(bam)?;
-        Ok(vec![
+        let (_min, min_cov, max_cov, _max) = self.range(bam)?;
+        Ok([
             first.chr.to_string(),
             min_cov.to_string(),
             max_cov.to_string(),
@@ -231,7 +227,7 @@ pub fn merge_phases(
     let mut merged_phases = Vec::new();
     let mut round_merges = 0;
 
-    let mut phases = VecDeque::from_iter(phases.into_iter());
+    let mut phases = VecDeque::from_iter(phases);
     while let Some(phase) = phases.pop_front() {
         let (s, r) = spawn_phase(&phase, min_records);
         phases
@@ -261,10 +257,10 @@ pub fn merge_phases(
             }
         }
 
-        let n_merged = merged_ids.len();
-        if n_merged > 0 {
-            info!("Merged {}", merged_ids.len());
-        }
+        // let n_merged = merged_ids.len();
+        // if n_merged > 0 {
+        //     info!("Merged {}", merged_ids.len());
+        // }
 
         // Remove merged from phases.
         pg.set_length(pg.length().unwrap() as u64 - merged_ids.len() as u64);
@@ -276,7 +272,7 @@ pub fn merge_phases(
             .collect();
         merged_ids.clear();
 
-        if phases.len() == 0 && round_merges > 0 {
+        if phases.is_empty() && round_merges > 0 {
             phases.extend(merged_phases.clone().into_iter());
             merged_phases.clear();
             warn!("Round merges {round_merges}");
@@ -341,16 +337,16 @@ pub fn variants_phasing(
 
     let mut phases: Vec<Phase> = phases
         .par_chunks(phases.len() / 75)
-        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 5, &multi).unwrap())
+        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 5, multi).unwrap())
         .collect();
     phases.sort();
     let phases: Vec<Phase> = phases
         .par_chunks(phases.len() / 50)
-        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 100, &multi).unwrap())
+        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 100, multi).unwrap())
         .collect();
     let phases: Vec<Phase> = phases
         .par_chunks(phases.len() / 25)
-        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 1000, &multi).unwrap())
+        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 1000, multi).unwrap())
         .collect();
 
     phases
@@ -385,19 +381,19 @@ pub fn write_phases_bed(
         .map(|((min, min_cov, max_cov, max), v, n)| (min..=max, min_cov..=max_cov, v, n))
         .collect();
 
-    ranges.sort_by(|(a, _, _, _), (b, _, _, _)| a.start().cmp(&b.start()));
+    ranges.sort_by(|(a, _, _, _), (b, _, _, _)| a.start().cmp(b.start()));
     let mut w = OpenOptions::new()
         .read(true)
-        .write(true)
+        // .write(true)
         .create(true)
         .append(true)
         .open(file)?;
 
-    for (_i, (range, _cov, vaf, n)) in ranges.iter().enumerate() {
+    for (range, _cov, vaf, n) in ranges.iter() {
         if *n < min_var {
             continue;
         }
-        let line = vec![
+        let line = [
             contig.to_string(),
             range.start().to_string(),
             range.end().to_string(),
@@ -447,3 +443,18 @@ pub fn spawn_phase(
     });
     (s, rr)
 }
+
+pub fn save_phases(phases: &Vec<Phase>, filename: &str) -> anyhow::Result<()> {
+    let bytes = postcard::to_allocvec(phases)?;
+    let mut file = File::create(filename)?;
+    file.write_all(&bytes)?;
+    Ok(())
+}
+
+pub fn load_phases(filename: &str) -> anyhow::Result<Vec<Phase>> {
+    let mut file = File::open(filename)?;
+    let mut bytes = Vec::new();
+    file.read_to_end(&mut bytes)?;
+    let phases: Vec<Phase> = postcard::from_bytes(&bytes)?;
+    Ok(phases)
+}

Some files were not shown because too many files changed in this diff