
tidy all ok

Thomas committed 1 year ago · commit cb40a1a719
3 changed files with 136 additions and 353 deletions
  1. src/bam.rs (+1, -18)
  2. src/bin.rs (+3, -4)
  3. src/lib.rs (+132, -331)

src/bam.rs (+1, -18)

@@ -1,11 +1,6 @@
-use std::{collections::HashMap, u32};
-
-use anyhow::Context;
 use log::warn;
 use rust_htslib::bam::{
-    ext::BamRecordExtensions,
-    record::{Aux, Cigar, CigarString},
-    Header, HeaderView, IndexedReader, Read, Record,
+    ext::BamRecordExtensions, record::Aux, HeaderView, IndexedReader, Read, Record,
 };
 
 pub fn primary_record(bam: &mut IndexedReader, record: Record) -> Record {
@@ -82,15 +77,3 @@ pub fn get_all_positions(
     positions.dedup();
     Ok(positions)
 }
-
-pub fn create_tid_2_contig(bam_path: &str) -> anyhow::Result<HashMap<i32, String>> {
-    let bam_reader = rust_htslib::bam::IndexedReader::from_path(bam_path)
-        .context(format!("Can't open {}", bam_path))?;
-    let h = bam_reader.header();
-    let mut r = HashMap::new();
-    h.target_names().iter().enumerate().for_each(|(i, name)| {
-        let tid = (i + 1) as i32;
-        r.insert(tid, String::from_utf8(name.to_vec()).unwrap());
-    });
-    Ok(r)
-}
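
Note on the removed helper: create_tid_2_contig numbered targets from 1 (tid = i + 1), while the tid stored in BAM records by rust-htslib is the 0-based index into target_names(), and a prebuilt map is not needed because HeaderView::tid2name already resolves a tid to its contig name. A minimal sketch of the direct lookup (not part of this commit; assumes only the rust-htslib HeaderView API):

    use rust_htslib::bam::HeaderView;

    /// Contig name for a tid, read straight from the header (tids are 0-based).
    fn contig_name(header: &HeaderView, tid: i32) -> Option<String> {
        if tid < 0 || tid as u32 >= header.target_count() {
            return None;
        }
        Some(String::from_utf8_lossy(header.tid2name(tid as u32)).into_owned())
    }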

src/bin.rs (+3, -4)

@@ -1,8 +1,7 @@
-use rayon::prelude::*;
-use std::{collections::HashMap, usize};
-
 use anyhow::Context;
+use rayon::prelude::*;
 use rust_htslib::bam::{ext::BamRecordExtensions, record::Aux, IndexedReader, Read, Record};
+use std::{collections::HashMap, usize};
 
 use crate::bam::primary_record;
 
@@ -10,7 +9,7 @@ use crate::bam::primary_record;
 #[derive(Debug)]
 pub struct Bin {
     pub contig: String,
-    pub start: u32, // 0-based inclusif
+    pub start: u32, // 0-based inclusive
     pub end: u32,
     pub reads_store: HashMap<Vec<u8>, Record>,
     pub bam_reader: IndexedReader,
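
Side note on the corrected comment: start is 0-based inclusive, while rust-htslib's fetch() takes 0-based, end-exclusive coordinates, so an inclusive bin end needs a +1 when fetching. A minimal sketch of that conversion, assuming an already-opened IndexedReader (reads_in_bin is illustrative and not part of Bin):

    use rust_htslib::bam::{IndexedReader, Read};

    /// Count reads overlapping a 0-based inclusive [start, end] bin.
    fn reads_in_bin(bam: &mut IndexedReader, contig: &str, start: u32, end: u32) -> anyhow::Result<usize> {
        // fetch() is end-exclusive, so the inclusive bin end becomes end + 1.
        bam.fetch((contig, start as i64, end as i64 + 1))?;
        Ok(bam.records().filter_map(Result::ok).count())
    }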

src/lib.rs (+132, -331)

@@ -1,160 +1,131 @@
+use crate::{
+    bam::get_all_positions,
+    bin::{scan_outliers, Bin},
+};
+use log::info;
+use rust_htslib::bam::{Format, Header, Read, Record, Writer};
+use std::{
+    collections::{HashMap, HashSet},
+    fs,
+};
+
 pub mod bam;
 pub mod bin;
 
-// fn get_se_diag_mrd(
-//     diag_bam_path: &str,
-//     mrd_bam_path: &str,
-//     chr: &str,
-//     start: i32,
-//     stop: i32,
-//     mapq: u8,
-//     name: String,
-// ) {
-//     let min_reads = 3;
-//     let bin_size = 1_000;
-//     let n_bins = stop - start;
-//
-//     let mut se_raw = vec![(0, 0)];
-//     se_raw.resize(n_bins as usize, (0, 0));
-//
-//     let get_pos = |i: usize| -> i32 { start + (i as i32 * bin_size as i32) };
-//     let se_raw: Vec<(i32, i32)> = se_raw
-//         .par_chunks(bin_size)
-//         .enumerate()
-//         .flat_map(|(i, _)| {
-//             let mut diag_bam = rust_htslib::bam::IndexedReader::from_path(diag_bam_path)
-//                 .context(anyhow!("Reading {}", diag_bam_path))
-//                 .unwrap();
-//             let mut mrd_bam = rust_htslib::bam::IndexedReader::from_path(mrd_bam_path)
-//                 .context(anyhow!("Reading {}", mrd_bam_path))
-//                 .unwrap();
-//
-//             let s = get_pos(i);
-//             let e = s + bin_size as i32;
-//             let d = get_start_end_qual(&mut diag_bam, chr, s, e, mapq).unwrap();
-//             let m = get_start_end_qual(&mut mrd_bam, chr, s, e, mapq).unwrap();
-//
-//             d.iter()
-//                 .zip(m.iter())
-//                 .map(|(d, m)| (*d, *m))
-//                 .collect::<Vec<(i32, i32)>>()
-//         })
-//         .collect();
-//
-//     let (diag_sum, mrd_sum) = se_raw
-//         .iter()
-//         .fold((0i32, 0i32), |acc, (d, m)| (acc.0 + d, acc.1 + m));
-//
-//     let diag = se_raw.iter().map(|(d, _)| *d as f64).collect::<Vec<f64>>();
-//     let diag_mean = mean(&diag).unwrap();
-//     let diag_std = std_deviation(&diag).unwrap();
-//
-//     let mrd = se_raw.iter().map(|(_, m)| *m as f64).collect::<Vec<f64>>();
-//     let mrd_mean = mean(&mrd).unwrap();
-//     let mrd_std = std_deviation(&mrd).unwrap();
-//
-//     info!(
-//         "N/nt diag {} mrd {}",
-//         diag_sum as f64 / (stop - start) as f64,
-//         mrd_sum as f64 / (stop - start) as f64
-//     );
-//     info!("Mean diag {diag_mean} mrd {mrd_mean}");
-//     info!("std dev diag {diag_std} mrd {mrd_std}");
-//
-//     let ratio_se: Vec<f64> = diag
-//         .iter()
-//         .zip(mrd.iter())
-//         .map(|(d, m)| (((d / diag_mean) + 1.0) / ((m / mrd_mean) + 1.0)) - 1.0)
-//         .collect();
-//
-//     let r_stddev = std_deviation(&ratio_se).unwrap();
-//     info!("ratio mean {} std dev {r_stddev}", mean(&ratio_se).unwrap());
-//
-//     let all: Vec<_> = se_raw
-//         .into_iter()
-//         .enumerate()
-//         .zip(ratio_se.iter())
-//         .filter(|((_i, (d, m)), _r)| *d > 3 && *m == 0)
-//         .collect();
-//
-//     info!("{} locations to assemble", all.len());
-//
-//     // getting primary reads from bam at given positions
-//     let se_data: Vec<(Vec<Record>, Vec<Record>)> = all
-//         .par_chunks(100)
-//         .flat_map(|chunks| {
-//             // Loading bam reader.
-//             let mut diag_bam = rust_htslib::bam::IndexedReader::from_path(diag_bam_path)
-//                 .context(anyhow!("Reading {}", diag_bam_path))
-//                 .unwrap();
-//             chunks
-//                 .to_vec()
-//                 .iter()
-//                 .map(|((i, (d, m)), r)| {
-//                     let pos = *i as i32 + start;
-//                     let (mut s, mut e) =
-//                         pileup::get_start_end_qual_rec(&mut diag_bam, chr, pos, pos + 1, mapq)
-//                             .unwrap();
-//                     let s = s.pop().unwrap();
-//                     let s = pileup::swap_by_primary(diag_bam_path, s);
-//                     let e = e.pop().unwrap();
-//                     let e = pileup::swap_by_primary(diag_bam_path, e);
-//                     // info!("{chr}:{pos}\t{r}\t{d} (s:{} e:{:?})\t{m}", s.len(), e.len());
-//                     // println!("e {e:?}");
-//                     // println!("s {s:?}");
-//
-//                     (s, e)
-//                 })
-//                 .collect::<Vec<(Vec<Record>, Vec<Record>)>>()
-//         })
-//         .collect();
-// }
-//
-fn mean(data: &[f64]) -> Option<f64> {
-    let sum = data.iter().sum::<f64>();
-    let count = data.len() as f64;
-
-    match count {
-        positive if positive > 0.0 => Some(sum / count),
-        _ => None,
+pub fn scan_save(
+    diag_bam_path: &str,
+    contig: &str,
+    start: u32,
+    end: u32,
+    bin_length: u32,
+    min_reads: usize,
+    out_dir: &str,
+) -> anyhow::Result<()> {
+    let outliers_records: Vec<Vec<Record>> =
+        scan_outliers(diag_bam_path, contig, start, end, bin_length)
+            .iter()
+            .filter(|(_, _, _, sa_outlier, _, se_outlier)| *sa_outlier || *se_outlier)
+            .map(|(start, _n, _sa, sa_outlier, _se, se_outlier)| {
+                let mut bin = Bin::new(diag_bam_path, contig, *start, bin_length).unwrap();
+                let mut records = Vec::new();
+                if *sa_outlier {
+                    records.extend(bin.sa_primary());
+                } else if *se_outlier {
+                    let (pos, _) = bin.max_start_or_end();
+                    records.extend(bin.se_primary(pos));
+                };
+                // info!(
+                //     "{}:{start}-{}\t{n}\t{sa}\t{se}\t{}",
+                //     contig,
+                //     start + bin_length - 1,
+                //     records.len()
+                // );
+                records
+            })
+            .collect();
+    info!("{} groups of outlier records", outliers_records.len());
+
+    let mut bam = rust_htslib::bam::IndexedReader::from_path(diag_bam_path).unwrap();
+    let header = bam.header().to_owned();
+    let mut grouped_records = Vec::new();
+    for outlier_record in outliers_records {
+        let mut hm_positions: HashMap<String, Vec<String>> = HashMap::new();
+        outlier_record.iter().for_each(|r| {
+            for pos in get_all_positions(r, &header, &mut bam).unwrap() {
+                let qname = String::from_utf8(r.qname().to_vec()).unwrap();
+                hm_positions
+                    .entry(format!("{}-{}", pos.0, pos.1))
+                    .or_default()
+                    .push(qname.clone());
+                hm_positions
+                    .entry(format!("{}-{}", pos.0, pos.2))
+                    .or_default()
+                    .push(qname);
+            }
+        });
+        for v in hm_positions.values_mut() {
+            v.sort(); // Ensure dedup works correctly
+            v.dedup();
+
+            if v.len() >= min_reads {
+                let rec: Vec<_> = outlier_record
+                    .clone()
+                    .into_iter()
+                    .filter(|r| {
+                        let qname = String::from_utf8(r.qname().to_vec()).unwrap();
+                        v.contains(&qname)
+                    })
+                    .collect();
+                grouped_records.push(rec);
+            }
+        }
     }
-}
-fn std_deviation(data: &[f64]) -> Option<f64> {
-    match (mean(data), data.len() as f64) {
-        (Some(data_mean), count) if count > 0.0 => {
-            let variance = data
+
+    let mut dedup = HashSet::new();
+
+    let mut n_records = 0;
+    let grouped_records: Vec<Vec<Record>> = grouped_records
+        .iter()
+        .filter(|r| {
+            let mut s: Vec<String> = r
                 .iter()
-                .map(|value| {
-                    let diff = data_mean - *value;
-                    diff * diff
-                })
-                .sum::<f64>()
-                / count;
+                .map(|rr| String::from_utf8(rr.qname().to_vec()).unwrap())
+                .collect();
 
-            Some(variance.sqrt())
+            s.sort();
+            let s = s.join("//");
+            if dedup.contains(&s) {
+                false
+            } else {
+                n_records += r.len(); // count reads in the group, not the joined-key length
+                dedup.insert(s);
+                true
+            }
+        })
+        .map(|r| r.to_vec())
+        .collect();
+
+    fs::create_dir_all(out_dir).unwrap();
+
+    info!(
+        "{} reads to assemble in {} groups",
+        n_records,
+        grouped_records.len()
+    );
+
+    let header = Header::from_template(&header);
+    for records in grouped_records {
+        let output_bam_path = format!("{out_dir}/{}.bam", uuid::Uuid::new_v4());
+        let mut output_bam = Writer::from_path(output_bam_path, &header, Format::Bam).unwrap();
+        for record in records {
+            output_bam.write(&record)?;
         }
-        _ => None,
     }
+    Ok(())
 }
+
 #[cfg(test)]
 mod tests {
-    use anyhow::Ok;
-    use log::{info, warn};
-    use rust_htslib::bam::{record, Format, Header, Read, Record, Writer};
-    use std::{
-        collections::{HashMap, HashSet},
-        fs,
-        iter::FromIterator,
-        path::Path,
-    };
-
-    use crate::bam::{create_tid_2_contig, get_all_positions};
-
-    use self::bin::{
-        filter_outliers_modified_z_score, filter_outliers_modified_z_score_with_indices, scan,
-        scan_outliers, Bin,
-    };
-
     use super::*;
 
     fn init() {
@@ -163,200 +134,30 @@ mod tests {
             .try_init();
     }
     #[test]
-    fn bin() -> anyhow::Result<()> {
-        init();
-        let id = "SALICETTO";
-        let contig = "chr10";
-        let start = 91_000_000;
-        let start = 91902888 - 10;
-        // let stop = 104_000_000;
-
-        let bin_length = 1000;
-
-        let result_dir = "/data/longreads_basic_pipe";
-        let diag_bam_path = format!("{result_dir}/{id}/diag/{id}_diag_hs1.bam");
-
-        let bin = Bin::new(&diag_bam_path, contig, start, bin_length)?;
-        info!(
-            "{}:{}-{}\tn_reads: {}\tn_sa: {}\tmax_start_end: {}",
-            bin.contig,
-            bin.start,
-            bin.end,
-            bin.n_reads(),
-            bin.n_sa(),
-            bin.max_start_or_end().1
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn par() {
-        init();
-        let id = "SALICETTO";
-        let contig = "chr10";
-        let start = 91_000_000;
-        // let start = 91902888 - 10;
-        let end = start + 1_000_000;
-        // let stop = 104_000_000;
-
-        let bin_length = 1000;
-        let result_dir = "/data/longreads_basic_pipe";
-        let diag_bam_path = format!("{result_dir}/{id}/diag/{id}_diag_hs1.bam");
-
-        let res = scan(&diag_bam_path, contig, start, end, bin_length);
-        let indexed_values: Vec<(u32, f64, f64)> = res
-            .iter()
-            .map(|(index, n, sa, se)| (*index, *sa as f64 / *n as f64, se.1 as f64 / *n as f64))
-            .collect();
-
-        let (indices, sa_ratios, se_ratios): (Vec<u32>, Vec<f64>, Vec<f64>) = indexed_values
-            .into_iter()
-            // .map(|(index, sa_ratio, se_ratio)| (index, sa_ratio, se_ratio))
-            .fold(
-                (Vec::new(), Vec::new(), Vec::new()),
-                |(mut indices, mut sa_ratios, mut se_ratios), (index, sa_ratio, se_ratio)| {
-                    indices.push(index);
-                    sa_ratios.push(sa_ratio);
-                    se_ratios.push(se_ratio);
-                    (indices, sa_ratios, se_ratios)
-                },
-            );
-
-        let filtered_sa_indices =
-            filter_outliers_modified_z_score_with_indices(sa_ratios, indices.clone());
-        let filtered_se_indices = filter_outliers_modified_z_score_with_indices(se_ratios, indices);
-
-        const RESET: &str = "\x1b[0m"; // Reset to default color
-        const RED: &str = "\x1b[31m"; // Red text
-
-        res.iter().for_each(|(p, n, sa, se)| {
-            let se = se.1;
-            let sa = if filtered_sa_indices.contains(p) {
-                format!("{RED}{sa}{RESET}")
-            } else {
-                sa.to_string()
-            };
-            let se = if filtered_se_indices.contains(p) {
-                format!("{RED}{se}{RESET}")
-            } else {
-                se.to_string()
-            };
-            info!("{}:{}-{}\t{n}\t{sa}\t{se}", contig, p, p + bin_length - 1,);
-        });
-    }
-
-    #[test]
-    fn outliers() -> anyhow::Result<()> {
+    fn it_works() -> anyhow::Result<()> {
         init();
         let min_reads = 3;
         let id = "SALICETTO";
         let contig = "chr10";
         let start = 91_000_000;
-        // let start = 91902888 - 10;
         let end = start + 1_000_000;
-        // let stop = 104_000_000;
-
         let bin_length = 1000;
+
         let result_dir = "/data/longreads_basic_pipe";
         let diag_bam_path = format!("{result_dir}/{id}/diag/{id}_diag_hs1.bam");
-
-        let outliers_records: Vec<Vec<Record>> =
-            scan_outliers(&diag_bam_path, contig, start, end, bin_length)
-                .iter()
-                .filter(|(_, _, _, sa_outlier, _, se_outlier)| *sa_outlier || *se_outlier)
-                .map(|(start, n, sa, sa_outlier, se, se_outlier)| {
-                    let mut bin = Bin::new(&diag_bam_path, contig, *start, bin_length).unwrap();
-                    let mut records = Vec::new();
-                    if *sa_outlier {
-                        records.extend(bin.sa_primary());
-                    } else if *se_outlier {
-                        let (pos, _) = bin.max_start_or_end();
-                        records.extend(bin.se_primary(pos));
-                    };
-                    info!(
-                        "{}:{start}-{}\t{n}\t{sa}\t{se}\t{}",
-                        contig,
-                        start + bin_length - 1,
-                        records.len()
-                    );
-                    records
-                })
-                .collect();
-        info!("n group of records {}", outliers_records.len());
-
-        let mut bam = rust_htslib::bam::IndexedReader::from_path(&diag_bam_path).unwrap();
-        // let u = bam.header().clone();
-        let header = bam.header().to_owned();
-        let mut n_recors = 0;
-        let mut grouped_records = Vec::new();
-        for outlier_record in outliers_records {
-            n_recors += outlier_record.len();
-            let mut hm_positions: HashMap<String, Vec<String>> = HashMap::new();
-            outlier_record.iter().for_each(|r| {
-                for pos in get_all_positions(r, &header, &mut bam).unwrap() {
-                    let qname = String::from_utf8(r.qname().to_vec()).unwrap();
-                    hm_positions
-                        .entry(format!("{}-{}", pos.0, pos.1))
-                        .or_default()
-                        .push(qname.clone());
-                    hm_positions
-                        .entry(format!("{}-{}", pos.0, pos.2))
-                        .or_default()
-                        .push(qname);
-                }
-            });
-            for v in hm_positions.values_mut() {
-                v.sort(); // Ensure dedup works correctly
-                v.dedup();
-
-                if v.len() >= min_reads {
-                    let rec: Vec<_> = outlier_record
-                        .clone()
-                        .into_iter()
-                        .filter(|r| {
-                            let qname = String::from_utf8(r.qname().to_vec()).unwrap();
-                            v.contains(&qname)
-                        })
-                        .collect();
-                    grouped_records.push(rec);
-                }
-            }
-        }
-        println!("{grouped_records:#?}");
-        println!("n records: {n_recors}");
-
-        let header = Header::from_template(&header);
-
-        let tmp_dir = format!("/data/tmp/scan_{}", uuid::Uuid::new_v4());
-        fs::create_dir_all(&tmp_dir).unwrap();
-        let mut dedup = HashSet::new();
-
-        let grouped_records: Vec<Vec<Record>> = grouped_records
-            .iter()
-            .filter(|r| {
-                let mut s: Vec<String> = r
-                    .iter()
-                    .map(|rr| String::from_utf8(rr.qname().to_vec()).unwrap())
-                    .collect();
-                s.sort();
-                let s = s.join("//");
-                if dedup.contains(&s) {
-                    false
-                } else {
-                    dedup.insert(s);
-                    true
-                }
-            })
-            .map(|r| r.to_vec())
-            .collect();
-        for records in grouped_records {
-            // Create a new BAM writer with the new header
-            let output_bam_path = format!("{tmp_dir}/{}.bam", uuid::Uuid::new_v4());
-            let mut output_bam = Writer::from_path(output_bam_path, &header, Format::Bam).unwrap();
-            for record in records {
-                output_bam.write(&record)?;
-            }
-        }
+        let out_dir = format!("/data/tmp/scan_{}", uuid::Uuid::new_v4());
+
+        scan_save(
+            &diag_bam_path,
+            contig,
+            start,
+            end,
+            bin_length,
+            min_reads,
+            &out_dir,
+        )?;
+        let n_files_out = fs::read_dir(out_dir)?.count(); // number of BAM files written
+        println!("{n_files_out}");
         Ok(())
     }
 }
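
For orientation, the grouping step in the new scan_save boils down to: collect read names per candidate breakpoint position, keep positions supported by at least min_reads distinct reads, and skip any group whose sorted read-name set has already been written. A toy, self-contained sketch of that idea on plain strings instead of BAM records (group_supported and its inputs are illustrative):

    use std::collections::{HashMap, HashSet};

    /// Group read names by position key, keep well-supported positions,
    /// and drop groups whose read set was already emitted.
    fn group_supported(pairs: &[(String, String)], min_reads: usize) -> Vec<Vec<String>> {
        let mut by_pos: HashMap<&str, Vec<&str>> = HashMap::new();
        for (pos, qname) in pairs {
            by_pos.entry(pos).or_default().push(qname);
        }
        let mut seen = HashSet::new();
        let mut groups = Vec::new();
        for names in by_pos.values_mut() {
            names.sort();
            names.dedup();
            if names.len() < min_reads {
                continue;
            }
            // Identical read sets at different positions collapse into one group.
            let key = names.join("//");
            if seen.insert(key) {
                groups.push(names.iter().map(|s| s.to_string()).collect());
            }
        }
        groups
    }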