Thomas 1 year ago
parent
commit
8a24d9f845
5 changed files with 758 additions and 105 deletions
  1. Cargo.lock (+93 -0)
  2. Cargo.toml (+4 -0)
  3. src/bin.rs (+45 -45)
  4. src/lib.rs (+167 -60)
  5. src/phase.rs (+449 -0)

Cargo.lock (+93 -0)

@@ -66,6 +66,12 @@ version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
 
+[[package]]
+name = "arrayvec"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
+
 [[package]]
 name = "bindgen"
 version = "0.69.4"
@@ -173,6 +179,28 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
 
+[[package]]
+name = "console"
+version = "0.15.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb"
+dependencies = [
+ "encode_unicode",
+ "lazy_static",
+ "libc",
+ "unicode-width",
+ "windows-sys",
+]
+
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.5"
@@ -247,6 +275,12 @@ version = "1.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
 
+[[package]]
+name = "encode_unicode"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
+
 [[package]]
 name = "env_filter"
 version = "0.1.2"
@@ -350,6 +384,28 @@ version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9007da9cacbd3e6343da136e98b0d2df013f553d35bdec8b518f07bea768e19c"
 
+[[package]]
+name = "indicatif"
+version = "0.17.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3"
+dependencies = [
+ "console",
+ "instant",
+ "number_prefix",
+ "portable-atomic",
+ "unicode-width",
+]
+
+[[package]]
+name = "instant"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
+dependencies = [
+ "cfg-if",
+]
+
 [[package]]
 name = "is_terminal_polyfill"
 version = "1.70.1"
@@ -365,6 +421,12 @@ dependencies = [
  "either",
 ]
 
+[[package]]
+name = "itoa"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
+
 [[package]]
 name = "jobserver"
 version = "0.1.32"
@@ -469,6 +531,22 @@ dependencies = [
  "minimal-lexical",
 ]
 
+[[package]]
+name = "num-format"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
+dependencies = [
+ "arrayvec",
+ "itoa",
+]
+
+[[package]]
+name = "number_prefix"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
+
 [[package]]
 name = "openssl-src"
 version = "300.3.1+3.3.1"
@@ -496,8 +574,11 @@ name = "pandora_lib_scan"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "crossbeam-channel",
  "env_logger",
+ "indicatif",
  "log",
+ "num-format",
  "rayon",
  "rust-htslib",
  "uuid",
@@ -515,6 +596,12 @@ version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
 
+[[package]]
+name = "portable-atomic"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265"
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.86"
@@ -734,6 +821,12 @@ dependencies = [
  "tinyvec",
 ]
 
+[[package]]
+name = "unicode-width"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
+
 [[package]]
 name = "url"
 version = "2.5.2"

Cargo.toml (+4 -0)

@@ -5,8 +5,12 @@ edition = "2021"
 
 [dependencies]
 anyhow = "1.0.86"
+crossbeam-channel = "0.5.13"
 env_logger = "0.11.5"
+indicatif = "0.17.8"
 log = "0.4.22"
+num-format = "0.4.4"
 rayon = "1.10.0"
 rust-htslib = "0.47.0"
 uuid = { version = "1.10.0", features = ["v4"] }
+pandora_lib_variants = { git = "https://git.t0m4.fr/Thomas/pandora_lib_variants.git" }
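
Note: the four new dependencies map onto the new phasing code. crossbeam-channel drives the merge worker threads in src/phase.rs, indicatif draws the progress bars, num-format pretty-prints counts in log messages, and pandora_lib_variants supplies the Variant type. A minimal standalone sketch (not part of the commit) of how the three utility crates fit together:

    use crossbeam_channel::unbounded;
    use indicatif::ProgressBar;
    use num_format::{Locale, ToFormattedString};
    use std::thread;

    fn main() {
        let (tx, rx) = unbounded::<u64>();
        let reporter = thread::spawn(move || {
            let pg = ProgressBar::new(1_000);
            let mut total: u64 = 0;
            for n in rx {
                total += n;
                pg.inc(1);
            }
            pg.finish();
            println!("{} reads", total.to_formatted_string(&Locale::en));
        });
        for _ in 0..1_000 {
            tx.send(42).unwrap();
        }
        drop(tx); // close the channel so the reporter loop ends
        reporter.join().unwrap();
    }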

src/bin.rs (+45 -45)

@@ -2,7 +2,7 @@ use anyhow::Context;
 use log::warn;
 use rayon::prelude::*;
 use rust_htslib::bam::{ext::BamRecordExtensions, record::Aux, IndexedReader, Read, Record};
-use std::{collections::HashMap, usize};
+use std::collections::HashMap;
 
 use crate::bam::primary_record;
 
@@ -131,48 +131,48 @@ pub fn filter_outliers_z_score(data: &mut Vec<f64>) {
     });
 }
 
-pub fn filter_outliers_modified_z_score(data: Vec<f64>, indices: Vec<u32>) -> Vec<u32> {
-    if data.is_empty() {
-        return Vec::new();
-    }
-
-    // Compute median
-    let median = {
-        let mut sorted_data = data.clone();
-        sorted_data.sort_by(|a, b| a.partial_cmp(b).unwrap());
-        let len = sorted_data.len();
-        if len % 2 == 0 {
-            (sorted_data[len / 2 - 1] + sorted_data[len / 2]) / 2.0
-        } else {
-            sorted_data[len / 2]
-        }
-    };
-
-    // Compute Median Absolute Deviation (MAD)
-    let mad = {
-        let deviations: Vec<f64> = data.iter().map(|&x| (x - median).abs()).collect();
-
-        let mut sorted_deviations = deviations.clone();
-        sorted_deviations.sort_by(|a, b| a.partial_cmp(b).unwrap());
-        let len = sorted_deviations.len();
-        if len % 2 == 0 {
-            (sorted_deviations[len / 2 - 1] + sorted_deviations[len / 2]) / 2.0
-        } else {
-            sorted_deviations[len / 2]
-        }
-    };
-
-    // Filter based on Modified Z-Score and keep indices
-    indices
-        .into_iter()
-        .zip(data)
-        .filter(|(_, x)| {
-            let modified_z_score = 0.6745 * (x - median).abs() / mad;
-            modified_z_score <= 1.5 // Threshold for outliers
-        })
-        .map(|(index, _)| index)
-        .collect()
-}
+// pub fn filter_outliers_modified_z_score(data: Vec<f64>, indices: Vec<u32>) -> Vec<u32> {
+//     if data.is_empty() {
+//         return Vec::new();
+//     }
+//
+//     // Compute median
+//     let median = {
+//         let mut sorted_data = data.clone();
+//         sorted_data.sort_by(|a, b| a.partial_cmp(b).unwrap());
+//         let len = sorted_data.len();
+//         if len % 2 == 0 {
+//             (sorted_data[len / 2 - 1] + sorted_data[len / 2]) / 2.0
+//         } else {
+//             sorted_data[len / 2]
+//         }
+//     };
+//
+//     // Compute Median Absolute Deviation (MAD)
+//     let mad = {
+//         let deviations: Vec<f64> = data.iter().map(|&x| (x - median).abs()).collect();
+//
+//         let mut sorted_deviations = deviations.clone();
+//         sorted_deviations.sort_by(|a, b| a.partial_cmp(b).unwrap());
+//         let len = sorted_deviations.len();
+//         if len % 2 == 0 {
+//             (sorted_deviations[len / 2 - 1] + sorted_deviations[len / 2]) / 2.0
+//         } else {
+//             sorted_deviations[len / 2]
+//         }
+//     };
+//
+//     // Filter based on Modified Z-Score and keep indices
+//     indices
+//         .into_iter()
+//         .zip(data)
+//         .filter(|(_, x)| {
+//             let modified_z_score = 0.6745 * (x - median).abs() / mad;
+//             modified_z_score <= 1.5 // Threshold for outliers
+//         })
+//         .map(|(index, _)| index)
+//         .collect()
+// }
 
 // Function to filter outliers based on Modified Z-Score
 pub fn filter_outliers_modified_z_score_with_indices(
@@ -244,7 +244,7 @@ pub fn scan_outliers(
     let ratios: Vec<(u32, usize, f64, f64)> = starts
         .into_par_iter()
         .filter_map(|start| {
-            match  Bin::new(bam_path, contig, start, length) {
+            match Bin::new(bam_path, contig, start, length) {
                 Ok(bin) => {
                     let n = bin.n_reads();
                     let (_, se) = bin.max_start_or_end();
@@ -255,7 +255,7 @@ pub fn scan_outliers(
                         (0.0, 0.0)
                     };
                     return Some((start, n, r_sa, r_se));
-                },
+                }
                 Err(e) => warn!("{e}"),
             }
             None
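
Note: this commit retires filter_outliers_modified_z_score (commented out above) in favor of the filter_outliers_modified_z_score_with_indices variant kept below it. For reference, a condensed sketch of the statistic the removed code computed, with the same median/MAD definitions and the 0.6745 normal-consistency constant:

    // Illustration only; not the `_with_indices` variant this commit keeps.
    fn median(sorted: &[f64]) -> f64 {
        let len = sorted.len();
        if len % 2 == 0 {
            (sorted[len / 2 - 1] + sorted[len / 2]) / 2.0
        } else {
            sorted[len / 2]
        }
    }

    fn modified_z_scores(data: &[f64]) -> Vec<f64> {
        let mut sorted = data.to_vec();
        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let med = median(&sorted);
        let mut deviations: Vec<f64> = data.iter().map(|x| (x - med).abs()).collect();
        deviations.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let mad = median(&deviations);
        // 0.6745 rescales the MAD so scores are comparable to z-scores under
        // a normal distribution; the removed code treated scores above 1.5
        // as outliers.
        data.iter().map(|x| 0.6745 * (x - med).abs() / mad).collect()
    }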

src/lib.rs (+167 -60)

@@ -9,11 +9,12 @@ use rust_htslib::bam::{Format, Header, Read, Record, Writer};
 use std::{
     collections::{HashMap, HashSet},
     fs::{self, File},
-    io::{BufWriter, Write},
+    io::{self, BufRead, BufReader, BufWriter, Write},
 };
 
 pub mod bam;
 pub mod bin;
+pub mod phase;
 
 #[derive(Debug)]
 pub struct Config {
@@ -49,24 +50,31 @@ pub fn scan_save(
     let outliers_records: Vec<Vec<Record>> =
         scan_outliers(bam_path, contig, start, end, bin_length)
             .iter()
-            .filter(|(_, _, _, sa_outlier, _, se_outlier)| *sa_outlier || *se_outlier)
             .map(|(start, n, sa, sa_outlier, se, se_outlier)| {
+                writer
+                    .write_all(
+                        format!(
+                            "{}:{start}-{}\t{n}\t{sa}\t{}\t{se}\t{}\n",
+                            contig,
+                            start + bin_length - 1,
+                            sa_outlier,
+                            se_outlier,
+                        )
+                        .as_bytes(),
+                    )
+                    .unwrap();
+                (start, *n, *sa, *sa_outlier, *se, *se_outlier)
+            })
+            .filter(|(_, _, _, sa_outlier, _, se_outlier)| *sa_outlier || *se_outlier)
+            .map(|(start, _n, _sa, sa_outlier, _se, se_outlier)| {
                 let mut bin = Bin::new(bam_path, contig, *start, bin_length).unwrap();
                 let mut records = Vec::new();
-                if *sa_outlier {
+                if sa_outlier {
                     records.extend(bin.sa_primary());
-                } else if *se_outlier {
+                } else if se_outlier {
                     let (pos, _) = bin.max_start_or_end();
                     records.extend(bin.se_primary(pos));
                 };
-                writer.write_all(format!(
-                    "{}:{start}-{}\t{n}\t{sa}\t{}\t{se}\t{}\t{}",
-                    contig,
-                    start + bin_length - 1,
-                    sa_outlier,
-                    se_outlier,
-                    records.len(),
-                ).as_bytes()).unwrap();
                 // writeln!(
                 //     writer,
                 //     "{}:{start}-{}\t{n}\t{sa}\t{}\t{se}\t{}\t{}",
@@ -185,37 +193,90 @@ pub fn par_whole_scan(contig: &str, bam_path: &str, out_dir: &str) -> anyhow::Re
     );
     info!("{} scan chunks to run", i.len());
 
-    let bam_out = format!("{out_dir}/reads/{contig}");
-    fs::create_dir_all(&bam_out)?;
+    let out_dir_bam = format!("{out_dir}/reads/{contig}");
+    fs::create_dir_all(&out_dir_bam)?;
     let file_out = format!("{out_dir}/counts/{contig}");
     fs::create_dir_all(&file_out)?;
     // let u = i.get(1..2).unwrap();
-    i.par_iter().enumerate().for_each(|(i, chunk)| {
-        info!("Chunk scan {i} started");
-
-        let out_scan_file = format!("{file_out}/{i:03}_counts.tsv");
-        let out_scan = File::create(out_scan_file).expect("Failed to create file");
-        let mut writer = BufWriter::new(out_scan);
-        let start = *chunk.first().unwrap() * config.bin_length;
-        let end = *chunk.last().unwrap() * config.bin_length;
-        scan_save(
-            bam_path,
-            contig,
-            start,
-            end,
-            &mut writer,
-            &bam_out,
-            &config,
-        )
+    let counts_files: Vec<String> = i
+        .par_iter()
+        .enumerate()
+        .map(|(i, chunk)| {
+            info!("Chunk scan {i} started");
+
+            let out_scan_file = format!("{file_out}/{i:03}_counts.tsv");
+            let out_scan = File::create(&out_scan_file).expect("Failed to create file");
+            let mut writer = BufWriter::new(out_scan);
+            let start = *chunk.first().unwrap() * config.bin_length;
+            let end = *chunk.last().unwrap() * config.bin_length;
+            scan_save(
+                bam_path,
+                contig,
+                start,
+                end,
+                &mut writer,
+                &out_dir_bam,
+                &config,
+            )
             .unwrap();
 
-        writer.flush().unwrap();
-        info!("Chunk scan {i} finished");
-    });
+            writer.flush().unwrap();
+            info!("Chunk scan {i} finished");
+            out_scan_file
+        })
+        .collect();
+
+    // Concat tsv
+
+    let scan_file = format!("{out_dir}/{contig}_counts.tsv");
+    let mut out_scan = File::create(&scan_file).expect("Failed to create file");
+    for count_file in counts_files {
+        let input = File::open(count_file).context("Can't open count file")?;
+        let mut reader = BufReader::new(input);
+        io::copy(&mut reader, &mut out_scan)?;
+    }
+    fs::remove_dir_all(format!("{out_dir}/counts"))?;
 
     Ok(())
 }
 
+// pub fn mrd_count_ratio(mrd_bam: &str, count_file: &str) -> anyhow::Result<()> {
+//     let count_file = File::open(count_file)?;
+//     let reader = BufReader::new(count_file);
+//     let mut tumoral_counts = Vec::new();
+//     for line in reader.lines() {
+//         let line_split: Vec<&str> = line?.split("\t").collect();
+//         let pos = *line_split.get(0).context("Can't parse the tsv file")?;
+//         let (contig, pos) = pos.split_once(':').context("Can't parse the tsv file")?;
+//         let (start, end) = pos.split_once('-').context("Can't parse the tsv file")?;
+//         let start: u32 = start.parse()?;
+//         let end: u32 = end.parse()?;
+//         let n: u32 = line_split
+//             .get(1)
+//             .context("Can't parse the tsv file")?
+//             .parse::<u32>()?;
+//         tumoral_counts.push((contig.to_string(), start, end, n));
+//     }
+//
+//     let ratios: Vec<(usize, String, u32, u32, u32, u32, Option<f64>)> = tumoral_counts
+//         .par_iter()
+//         .enumerate()
+//         .map(|(i, (contig, start, end, n))| {
+//             let bin = Bin::new(mrd_bam, contig, *start, *end - *start + 1).unwrap();
+//             let n_mrd = bin.n_reads() as u32;
+//             let r = if n_mrd == 0 {
+//                 None
+//             } else {
+//                 Some((*n as f64 / n_mrd as f64).log10())
+//             };
+//             (i, contig.to_string(), *start, *end, *n, n_mrd, r)
+//         })
+//         .collect();
+//
+//     // filter_outliers_modified_z_score_with_indices(ratios.par_iter().map(|(_, _, _, _, _, _, r)))
+//     Ok(())
+// }
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -225,32 +286,7 @@ mod tests {
             .is_test(true)
             .try_init();
     }
-    // #[test]
-    // fn it_works() -> anyhow::Result<()> {
-    //     init();
-    //     let id = "SALICETTO";
-    //     let contig = "chr10";
-    //     let start = 91_000_000;
-    //     let end = start + 1_000_000;
-    //
-    //     let result_dir = "/data/longreads_basic_pipe";
-    //     let diag_bam_path = format!("{result_dir}/{id}/diag/{id}_diag_hs1.bam");
-    //     let out_dir = format!("/data/tmp/scan_{}", uuid::Uuid::new_v4());
-    //     let out_scan_file = "/data/tmp/scan.tsv";
-    //
-    //     scan_save(
-    //         &diag_bam_path,
-    //         contig,
-    //         start,
-    //         end,
-    //         out_scan_file,
-    //         &out_dir,
-    //         &Config::default(),
-    //     )?;
-    //     let n_files_out = fs::read_dir(out_dir).into_iter().count();
-    //     println!("{n_files_out}");
-    //     Ok(())
-    // }
+
     #[test]
     fn par() {
         init();
@@ -261,6 +297,77 @@ mod tests {
             "chr9",
             "/data/longreads_basic_pipe/SALICETTO/diag/SALICETTO_diag_hs1.bam",
             &tmp_dir,
+        )
+        .unwrap();
+    }
+
+    #[test]
+    fn phasing() -> Result<()> {
+        let name = "CAMARA";
+        let min_records = 2;
+        let min_var = 2;
+
+        let format = CustomFormat::builder()
+            .grouping(Grouping::Standard)
+            .minus_sign("-")
+            .separator("_")
+            .build()
+            .unwrap();
+
+        let logger =
+            env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
+                .build();
+        let multi = MultiProgress::new();
+        LogWrapper::new(multi.clone(), logger).try_init().unwrap();
+
+        let bam_path = &format!("/data/longreads_basic_pipe/{name}/diag/{name}_diag_hs1.bam");
+        let somatic_path =
+            &format!("/data/longreads_basic_pipe/{name}/diag/{name}_constit.bytes.gz");
+        let bed = &format!("/data/longreads_basic_pipe/{name}/diag/{name}_phases.bed");
+
+        let variants =
+            libVariant::variants::Variants::new_from_bytes(name, &somatic_path, multi.clone())?;
+
+        info!(
+            "Variants loaded {}.",
+            variants.len().to_formatted_string(&format)
         );
+
+        let mut variants: Vec<Variant> = variants
+            .data
+            .into_par_iter()
+            .filter(|v| {
+                let mut v = v.clone();
+                v.vaf() > 0.4 && v.vaf() < 0.6
+            })
+            .collect();
+
+        let contigs = DashSet::new();
+        variants.par_iter().for_each(|v| {
+            contigs.insert(v.contig.to_string());
+        });
+
+        variants.par_sort_by(|a, b| a.position.cmp(&b.position));
+
+        let dict = read_dict("/data/ref/hs1/chm13v2.0.dict")?;
+        for (contig, _) in dict {
+            if !contigs.contains(&contig) {
+                continue;
+            }
+
+            let v: Vec<_> = variants
+                .clone()
+                .into_par_iter()
+                .filter(|v| v.contig == contig)
+                .collect();
+            if v.len() > 1 {
+                info!("{contig}: {} variants", v.len());
+                let phases = variants_phasing(v, bam_path, min_records, &multi);
+                write_phases_bed(&phases, min_var, bam_path, &contig, bed)?;
+            }
+        }
+
+        // TODO: assign somatic to constit phase
+        Ok(())
     }
 }
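
Note: par_whole_scan now collects the per-chunk TSV paths from the parallel map, streams them in order into {out_dir}/{contig}_counts.tsv, and removes the intermediate counts directory. The concatenation pattern in isolation (paths hypothetical):

    use std::fs::File;
    use std::io::{self, BufReader};

    fn concat_tsvs(parts: &[String], out_path: &str) -> io::Result<()> {
        let mut out = File::create(out_path)?;
        for part in parts {
            let mut reader = BufReader::new(File::open(part)?);
            io::copy(&mut reader, &mut out)?; // appends this part's bytes to `out`
        }
        Ok(())
    }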

src/phase.rs (+449 -0)

@@ -0,0 +1,449 @@
+use anyhow::{anyhow, Ok, Result};
+use crossbeam_channel::{unbounded, Receiver, Sender};
+use indicatif::MultiProgress;
+use log::{info, warn};
+use num_format::{CustomFormat, Grouping, ToFormattedString};
+// NOTE: `pileup` is assumed to live in pandora_lib_variants alongside
+// utils and variants; it is used below but was not imported.
+use pandora_lib_variants::{pileup, utils::new_pg_speed, variants::Variant};
+use rayon::prelude::*;
+use rust_htslib::bam::IndexedReader;
+use std::{
+    cmp::Ordering,
+    collections::{HashSet, VecDeque},
+    fs::OpenOptions,
+    io::Write,
+    thread::spawn,
+};
+
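+// One allele of a heterozygous site: which reads (by name) carry `base` at
+// `position`, and what fraction of the local depth they represent.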
+#[derive(Debug, Clone)]
+pub struct HeteroVar {
+    pub chr: String,
+    pub position: i32,
+    pub base: u8,
+    pub vaf: f64,
+    pub reads: HashSet<String>,
+}
+
+impl HeteroVar {
+    pub fn new(
+        bam: &mut IndexedReader,
+        chr: &str,
+        position: i32,
+        reference: u8,
+        alternative: u8,
+    ) -> Result<(HeteroVar, HeteroVar)> {
+        let rec_base = if let std::result::Result::Ok(rb) =
+            pileup::qnames_at_base(bam, chr, position, false)
+        {
+            rb
+        } else {
+            return Err(anyhow!("Error while reading BAM file."));
+        };
+        let depth = rec_base.len() as i32;
+        if depth == 0 {
+            return Err(anyhow!("No records"));
+        }
+        let mut a = (reference, Vec::new());
+        let mut b = (alternative, Vec::new());
+        for (record, base) in rec_base.into_iter() {
+            if base == reference {
+                a.1.push(record.clone());
+            } else if base == alternative {
+                b.1.push(record.clone());
+            }
+        }
+
+        let res: Vec<HeteroVar> = vec![a, b]
+            .into_iter()
+            .filter(|(_, rec)| !rec.is_empty())
+            .map(|(base, records)| {
+                let mut records_ids: HashSet<String> = HashSet::new();
+                records_ids.extend(records.into_iter());
+                HeteroVar {
+                    chr: chr.to_string(),
+                    position,
+                    base,
+                    vaf: records_ids.len() as f64 / depth as f64,
+                    reads: records_ids,
+                }
+            })
+            .collect();
+
+        if res.len() == 2 {
+            Ok((res[0].clone(), res[1].clone()))
+        } else {
+            Err(anyhow!("Not enough reads"))
+        }
+    }
+}
+
+impl PartialEq for HeteroVar {
+    fn eq(&self, other: &Self) -> bool {
+        self.reads == other.reads
+            && self.chr == other.chr
+            && self.position == other.position
+            && self.base == other.base
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Phase {
+    pub data: Vec<HeteroVar>,
+    pub reads: HashSet<String>,
+}
+
+impl Phase {
+    pub fn new(hetero_var: &HeteroVar) -> Self {
+        Self {
+            data: vec![hetero_var.clone()],
+            reads: hetero_var.reads.clone(),
+        }
+    }
+    pub fn try_merge(&mut self, hetero_var: &HeteroVar, min_records: usize) -> bool {
+        if hetero_var
+            .reads
+            .par_iter()
+            .filter(|r| self.reads.contains(*r))
+            .count()
+            >= min_records
+        {
+            self.reads.extend(hetero_var.reads.clone().into_iter());
+            self.data.push(hetero_var.clone());
+            self.data.sort_by(|a, b| a.position.cmp(&b.position));
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn try_merge_phase(&mut self, phase: &Phase, min_records: usize) -> bool {
+        if phase
+            .reads
+            .par_iter()
+            .filter(|r| self.reads.contains(*r))
+            .count()
+            >= min_records
+        {
+            self.reads.extend(phase.reads.clone().into_iter());
+            self.data.extend(phase.data.clone().into_iter());
+            self.data.sort_by(|a, b| a.position.cmp(&b.position));
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn range(&self, bam: &mut IndexedReader) -> Result<(i64, i64, i64, i64)> {
+        let mut phase = self.clone();
+        phase.sort_dedup();
+        // Use the sorted, deduplicated copy; `self.data` is not guaranteed unique.
+        let data = &phase.data;
+
+        let left = data.first().unwrap();
+        let right = data.last().unwrap();
+
+        let left_records = pileup::records_at_base(bam, &left.chr, left.position, true)?;
+        let right_records = pileup::records_at_base(bam, &right.chr, right.position, true)?;
+
+        let left_starts: Vec<_> = left_records
+            .iter()
+            .filter(|(_, b)| *b == left.base)
+            .map(|(r, _)| r.pos())
+            .collect();
+        let right_ends: Vec<_> = right_records
+            .iter()
+            .filter(|(_, b)| *b == right.base)
+            .map(|(r, _)| r.pos() + r.seq_len() as i64)
+            .collect();
+
+        let min = left_starts.iter().min();
+        let min_cov = left_starts.iter().max();
+        let max_cov = right_ends.iter().min();
+        let max = right_ends.iter().max();
+
+        if min.is_some() && min_cov.is_some() && max.is_some() && max_cov.is_some() {
+            Ok((
+                *min.unwrap(),
+                *min_cov.unwrap(),
+                *max_cov.unwrap(),
+                *max.unwrap(),
+            ))
+        } else {
+            Err(anyhow!("problem"))
+        }
+    }
+
+    pub fn mean_vaf(&self) -> f64 {
+        let vafs: Vec<_> = self.data.iter().map(|s| s.vaf).collect();
+        vafs.iter().sum::<f64>() / vafs.len() as f64
+    }
+
+    pub fn sort_dedup(&mut self) {
+        self.data.sort_by(|a, b| a.position.cmp(&b.position));
+        self.data
+            .dedup_by(|a, b| a.position == b.position && a.chr == b.chr && a.base == b.base);
+    }
+
+    pub fn bed_string(&self, bam: &mut IndexedReader) -> Result<String> {
+        let first = self.data.first().unwrap();
+        let (_min, min_cov, max_cov, _max) = self.range(bam)?;
+        Ok(vec![
+            first.chr.to_string(),
+            min_cov.to_string(),
+            max_cov.to_string(),
+            self.mean_vaf().to_string(),
+        ]
+        .join("\t"))
+    }
+}
+
+impl Ord for Phase {
+    fn cmp(&self, other: &Self) -> Ordering {
+        let s = self.data.first().unwrap();
+        let other = other.data.first().unwrap();
+        s.position.cmp(&other.position)
+    }
+}
+
+impl PartialOrd for Phase {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for Phase {
+    fn eq(&self, other: &Self) -> bool {
+        self.reads == other.reads
+        // self.data.iter().filter(|s| other.data.contains(s)).count() == other.data.len()
+    }
+}
+
+impl Eq for Phase {}
+
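+// Greedy merge loop: pop one phase, hand it to a worker thread, and offer it
+// the next `takes_n` phases from the queue; any phase sharing at least
+// `min_records` read names is absorbed and dropped from the queue. When the
+// queue empties after a round that produced merges, the merged phases are
+// re-queued and the round repeats until a fixed point.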
+pub fn merge_phases(
+    phases: Vec<Phase>,
+    min_records: usize,
+    takes_n: usize,
+    multi: &MultiProgress,
+) -> Result<Vec<Phase>> {
+    let pg = multi.add(new_pg_speed(phases.len() as u64));
+    pg.set_message(format!("Phasing by {takes_n}"));
+
+    let mut merged_phases = Vec::new();
+    let mut round_merges = 0;
+
+    let mut phases = VecDeque::from_iter(phases.into_iter());
+    while let Some(phase) = phases.pop_front() {
+        let (s, r) = spawn_phase(&phase, min_records);
+        phases
+            .par_iter()
+            .take(takes_n)
+            .enumerate()
+            .for_each(|(i, p)| s.send(MessagesIn::Phase((i, p.clone()))).unwrap());
+
+        s.send(MessagesIn::Return)?;
+
+        let mut merged_ids = Vec::new();
+        loop {
+            match r.recv() {
+                std::result::Result::Ok(msg) => match msg {
+                    MessagesOut::Phase(p) => {
+                        merged_phases.push(p);
+                        break;
+                    }
+                    MessagesOut::Merged((i, b)) => {
+                        if b {
+                            round_merges += 1;
+                            merged_ids.push(i);
+                        }
+                    }
+                },
+                Err(e) => warn!("{e}"),
+            }
+        }
+
+        let n_merged = merged_ids.len();
+        if n_merged > 0 {
+            info!("Merged {n_merged}");
+        }
+
+        // Remove merged from phases.
+        pg.set_length(pg.length().unwrap() - merged_ids.len() as u64);
+        phases = phases
+            .iter()
+            .enumerate()
+            .filter(|(i, _)| !merged_ids.contains(i))
+            .map(|(_, p)| p.to_owned())
+            .collect();
+        merged_ids.clear();
+
+        if phases.is_empty() && round_merges > 0 {
+            phases.extend(merged_phases.clone().into_iter());
+            merged_phases.clear();
+            warn!("Round merges {round_merges}");
+            round_merges = 0;
+            pg.reset();
+            pg.set_length(phases.len() as u64);
+        }
+        pg.inc(1);
+    }
+    pg.finish();
+    multi.remove(&pg);
+
+    Ok(merged_phases)
+}
+
+pub fn variants_phasing(
+    variants: Vec<Variant>,
+    bam_path: &str,
+    min_records: usize,
+    multi: &MultiProgress,
+) -> Vec<Phase> {
+    info!("{} variants to analyse.", variants.len());
+    let pg = multi.add(new_pg_speed(variants.len() as u64));
+    pg.set_message("Parsing reads at SNPs positions");
+    let mut phases = variants
+        .par_chunks(2_000)
+        .flat_map(|chunks| {
+            let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
+            let mut errors = Vec::new();
+            let mut phases: Vec<Phase> = Vec::new();
+            for v in chunks {
+                pg.inc(1);
+                let mut reference = v.reference.to_string().as_bytes().to_vec();
+                let mut alternative = v.alternative.to_string().as_bytes().to_vec();
+                // Keep SNVs only: skip if either allele is not a single base.
+                if reference.len() != 1 || alternative.len() != 1 {
+                    continue;
+                }
+                match HeteroVar::new(
+                    &mut bam,
+                    &v.contig.to_string(),
+                    v.position as i32,
+                    reference.pop().unwrap(),
+                    alternative.pop().unwrap(),
+                ) {
+                    std::result::Result::Ok((a, b)) => {
+                        phases.push(Phase::new(&a));
+                        phases.push(Phase::new(&b));
+                    }
+                    Err(e) => errors.push(e),
+                }
+            }
+            phases
+        })
+        .collect::<Vec<Phase>>();
+
+    pg.finish();
+    multi.remove(&pg);
+    phases.sort();
+    phases.dedup();
+
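+    // Three merge rounds widen the comparison window (5, 100, 1000) while
+    // splitting into fewer parallel chunks (75, 50, 25).
+    // NOTE: par_chunks panics on a chunk size of 0, so this assumes at
+    // least 75 phases survive the pileup step.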
+    let mut phases: Vec<Phase> = phases
+        .par_chunks(phases.len() / 75)
+        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 5, &multi).unwrap())
+        .collect();
+    phases.sort();
+    let phases: Vec<Phase> = phases
+        .par_chunks(phases.len() / 50)
+        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 100, &multi).unwrap())
+        .collect();
+    let phases: Vec<Phase> = phases
+        .par_chunks(phases.len() / 25)
+        .flat_map(|chunks| merge_phases(chunks.to_vec(), min_records, 1000, &multi).unwrap())
+        .collect();
+
+    phases
+}
+
+pub fn write_phases_bed(
+    phases: &Vec<Phase>,
+    min_var: usize,
+    bam_path: &str,
+    contig: &str,
+    file: &str,
+) -> Result<()> {
+    let format = CustomFormat::builder()
+        .grouping(Grouping::Standard)
+        .minus_sign("-")
+        .separator(",")
+        .build()
+        .unwrap();
+
+    let mut ranges: Vec<_> = phases
+        .par_chunks(100)
+        .flat_map(|chunks| {
+            let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
+            chunks
+                .to_vec()
+                .iter()
+                .map(|p| (p.range(&mut bam), p.mean_vaf(), p.data.len()))
+                .collect::<Vec<_>>()
+        })
+        .filter(|(r, _, _)| r.is_ok())
+        .map(|(r, v, n)| (r.unwrap(), v, n))
+        .map(|((min, min_cov, max_cov, max), v, n)| (min..=max, min_cov..=max_cov, v, n))
+        .collect();
+
+    ranges.sort_by(|(a, _, _, _), (b, _, _, _)| a.start().cmp(&b.start()));
+    let mut w = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(file)?;
+
+    for (range, _cov, vaf, n) in &ranges {
+        if *n < min_var {
+            continue;
+        }
+        let line = vec![
+            contig.to_string(),
+            range.start().to_string(),
+            range.end().to_string(),
+            format!(
+                "{} {} {}",
+                (range.end() - range.start()).to_formatted_string(&format),
+                n,
+                vaf
+            ),
+        ]
+        .join("\t");
+        writeln!(&mut w, "{line}")?;
+    }
+    Ok(())
+}
+
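+// Protocol for the merge worker: the spawned thread owns a single Phase,
+// answers each candidate with Merged((index, was_absorbed)), and surrenders
+// the accumulated Phase on Return.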
+pub enum MessagesIn {
+    Phase((usize, Phase)),
+    Return,
+}
+pub enum MessagesOut {
+    Phase(Phase),
+    Merged((usize, bool)),
+}
+
+pub fn spawn_phase(
+    phase: &Phase,
+    min_records: usize,
+) -> (Sender<MessagesIn>, Receiver<MessagesOut>) {
+    let (s, r) = unbounded::<MessagesIn>();
+    let (ss, rr) = unbounded::<MessagesOut>();
+    let mut phase = phase.clone();
+    spawn(move || loop {
+        match r.recv() {
+            std::result::Result::Ok(msg) => match msg {
+                MessagesIn::Phase((i, p)) => {
+                    let res = phase.try_merge_phase(&p, min_records);
+                    ss.send(MessagesOut::Merged((i, res))).unwrap();
+                }
+                MessagesIn::Return => {
+                    ss.send(MessagesOut::Phase(phase)).unwrap();
+                    break;
+                }
+            },
+            Err(e) => panic!("Err {e}"),
+        };
+    });
+    (s, rr)
+}
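
Note: a hedged end-to-end sketch of the new phasing entry points, mirroring the phasing test in src/lib.rs; paths, contig, and thresholds are illustrative.

    use indicatif::MultiProgress;
    use pandora_lib_scan::phase::{variants_phasing, write_phases_bed};
    use pandora_lib_variants::variants::Variant;

    fn run(bam_path: &str, variants: Vec<Variant>) -> anyhow::Result<()> {
        let multi = MultiProgress::new();
        let min_records = 2; // shared reads required to merge two phases
        let min_var = 2; // variants a phase needs before it is written
        let phases = variants_phasing(variants, bam_path, min_records, &multi);
        write_phases_bed(&phases, min_var, bam_path, "chr1", "phases.bed")?;
        Ok(())
    }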