Thomas 1 year ago
parent
commit
e768dbae54
2 changed files with 156 additions and 51 deletions
  1. 19 5
      src/bin.rs
  2. 137 46
      src/lib.rs

+ 19 - 5
src/bin.rs

@@ -1,4 +1,5 @@
 use anyhow::Context;
+use log::warn;
 use rayon::prelude::*;
 use rust_htslib::bam::{ext::BamRecordExtensions, record::Aux, IndexedReader, Read, Record};
 use std::{collections::HashMap, usize};
@@ -24,6 +25,7 @@ impl Bin {
         // Inclusive range
         let end = start + length - 1;
 
+        // println!("bin {contig}:{start}-{end}");
         // Fetch the required positions
         bam_reader.fetch((contig, start, end))?;
 
@@ -237,15 +239,24 @@ pub fn scan_outliers(
         starts.push(current);
         current += length;
     }
+    // println!("group {contig}:{}-{}", starts.first().unwrap(), starts.last().unwrap() + length - 1);
 
     let ratios: Vec<(u32, usize, f64, f64)> = starts
         .into_par_iter()
         .filter_map(|start| {
-            if let Ok(bin) = Bin::new(bam_path, contig, start, length) {
-                let n = bin.n_reads();
-                let n_f = n as f64;
-                let (_, se) = bin.max_start_or_end();
-                return Some((start, n, bin.n_sa() as f64 / n_f, se as f64 / n_f));
+            match  Bin::new(bam_path, contig, start, length) {
+                Ok(bin) => {
+                    let n = bin.n_reads();
+                    let (_, se) = bin.max_start_or_end();
+                    let (r_sa, r_se) = if n > 0 {
+                        let n_f = n as f64;
+                        (bin.n_sa() as f64 / n_f, se as f64 / n_f)
+                    } else {
+                        (0.0, 0.0)
+                    };
+                    return Some((start, n, r_sa, r_se));
+                },
+                Err(e) => warn!("{e}"),
             }
             None
         })
@@ -276,6 +287,9 @@ pub fn scan_outliers(
         filter_outliers_modified_z_score_with_indices(sa_ratios, indices.clone());
     let filtered_se_indices = filter_outliers_modified_z_score_with_indices(se_ratios, indices);
 
+    // if !filtered_sa_indices.is_empty() {
+    //     warn!("{filtered_sa_indices:?}");
+    // }
     ratios
         .iter()
         .map(|(p, n, sa, se)| {

+ 137 - 46
src/lib.rs

@@ -2,7 +2,9 @@ use crate::{
     bam::get_all_positions,
     bin::{scan_outliers, Bin},
 };
+use anyhow::Context;
 use log::info;
+use rayon::prelude::*;
 use rust_htslib::bam::{Format, Header, Read, Record, Writer};
 use std::{
     collections::{HashMap, HashSet},
@@ -13,26 +15,43 @@ use std::{
 pub mod bam;
 pub mod bin;
 
-#[allow(clippy::too_many_arguments)]
+#[derive(Debug)]
+pub struct Config {
+    bin_length: u32,
+    min_reads: usize,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            bin_length: 1_000,
+            min_reads: 3,
+        }
+    }
+}
+
 pub fn scan_save(
-    diag_bam_path: &str,
+    bam_path: &str,
     contig: &str,
     start: u32,
     end: u32,
-    bin_length: u32,
-    min_reads: usize,
-    out_scan_file: &str,
+    writer: &mut BufWriter<File>,
     out_dir_bam: &str,
+    config: &Config,
 ) -> anyhow::Result<()> {
-    let out_scan = File::create(out_scan_file).expect("Failed to create file");
-    let mut writer = BufWriter::new(out_scan);
+    let Config {
+        bin_length,
+        min_reads,
+    } = *config;
+
+    info!("scan {contig}:{start}-{end}");
 
     let outliers_records: Vec<Vec<Record>> =
-        scan_outliers(diag_bam_path, contig, start, end, bin_length)
+        scan_outliers(bam_path, contig, start, end, bin_length)
             .iter()
             .filter(|(_, _, _, sa_outlier, _, se_outlier)| *sa_outlier || *se_outlier)
             .map(|(start, n, sa, sa_outlier, se, se_outlier)| {
-                let mut bin = Bin::new(diag_bam_path, contig, *start, bin_length).unwrap();
+                let mut bin = Bin::new(bam_path, contig, *start, bin_length).unwrap();
                 let mut records = Vec::new();
                 if *sa_outlier {
                     records.extend(bin.sa_primary());
@@ -40,22 +59,29 @@ pub fn scan_save(
                     let (pos, _) = bin.max_start_or_end();
                     records.extend(bin.se_primary(pos));
                 };
-                writeln!(
-                    writer,
+                writer.write_all(format!(
                     "{}:{start}-{}\t{n}\t{sa}\t{}\t{se}\t{}\t{}",
                     contig,
                     start + bin_length - 1,
                     sa_outlier,
                     se_outlier,
                     records.len(),
-                )
-                .unwrap();
+                ).as_bytes()).unwrap();
+                // writeln!(
+                //     writer,
+                //     "{}:{start}-{}\t{n}\t{sa}\t{}\t{se}\t{}\t{}",
+                //     contig,
+                //     start + bin_length - 1,
+                //     sa_outlier,
+                //     se_outlier,
+                //     records.len(),
+                // )
+                // .unwrap();
                 records
             })
             .collect();
-    info!("n group of records {}", outliers_records.len());
 
-    let mut bam = rust_htslib::bam::IndexedReader::from_path(diag_bam_path).unwrap();
+    let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
     let header = bam.header().to_owned();
     let mut grouped_records = Vec::new();
     for outlier_record in outliers_records {
@@ -117,11 +143,13 @@ pub fn scan_save(
 
     fs::create_dir_all(out_dir_bam).unwrap();
 
-    info!(
-        "{} reads to assemble in {} groups",
-        n_records,
-        grouped_records.len()
-    );
+    if !grouped_records.is_empty() {
+        info!(
+            "{} reads to assemble in {} groups",
+            n_records,
+            grouped_records.len()
+        );
+    }
 
     let header = Header::from_template(&header);
     for records in grouped_records {
@@ -134,6 +162,60 @@ pub fn scan_save(
     Ok(())
 }
 
+pub fn par_whole_scan(contig: &str, bam_path: &str, out_dir: &str) -> anyhow::Result<()> {
+    let bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
+    let header_view = bam.header();
+    let contig_tid = header_view
+        .tid(contig.as_bytes())
+        .context("Contig not found")?;
+    let contig_len = header_view
+        .target_len(contig_tid)
+        .context("Contig length not found")? as u32;
+    let chunk_size = 1_000_u32;
+    let config = Config::default();
+    let n_chunks = contig_len / config.bin_length; // the last chunk wont be scanned
+    let i: Vec<Vec<u32>> = (0..n_chunks)
+        .collect::<Vec<_>>()
+        .chunks(100)
+        .map(|chunk| chunk.to_vec())
+        .collect();
+
+    assert!(
+        contig_len - i[i.len() - 1][i[i.len() - 1].len() - 1] * config.bin_length < 2 * chunk_size
+    );
+    info!("{} scan chunks to run", i.len());
+
+    let bam_out = format!("{out_dir}/reads/{contig}");
+    fs::create_dir_all(&bam_out)?;
+    let file_out = format!("{out_dir}/counts/{contig}");
+    fs::create_dir_all(&file_out)?;
+    // let u = i.get(1..2).unwrap();
+    i.par_iter().enumerate().for_each(|(i, chunk)| {
+        info!("Chunk scan {i} started");
+
+        let out_scan_file = format!("{file_out}/{i:03}_counts.tsv");
+        let out_scan = File::create(out_scan_file).expect("Failed to create file");
+        let mut writer = BufWriter::new(out_scan);
+        let start = *chunk.first().unwrap() * config.bin_length;
+        let end = *chunk.last().unwrap() * config.bin_length;
+        scan_save(
+            bam_path,
+            contig,
+            start,
+            end,
+            &mut writer,
+            &bam_out,
+            &config,
+        )
+            .unwrap();
+
+        writer.flush().unwrap();
+        info!("Chunk scan {i} finished");
+    });
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -143,33 +225,42 @@ mod tests {
             .is_test(true)
             .try_init();
     }
+    // #[test]
+    // fn it_works() -> anyhow::Result<()> {
+    //     init();
+    //     let id = "SALICETTO";
+    //     let contig = "chr10";
+    //     let start = 91_000_000;
+    //     let end = start + 1_000_000;
+    //
+    //     let result_dir = "/data/longreads_basic_pipe";
+    //     let diag_bam_path = format!("{result_dir}/{id}/diag/{id}_diag_hs1.bam");
+    //     let out_dir = format!("/data/tmp/scan_{}", uuid::Uuid::new_v4());
+    //     let out_scan_file = "/data/tmp/scan.tsv";
+    //
+    //     scan_save(
+    //         &diag_bam_path,
+    //         contig,
+    //         start,
+    //         end,
+    //         out_scan_file,
+    //         &out_dir,
+    //         &Config::default(),
+    //     )?;
+    //     let n_files_out = fs::read_dir(out_dir).into_iter().count();
+    //     println!("{n_files_out}");
+    //     Ok(())
+    // }
     #[test]
-    fn it_works() -> anyhow::Result<()> {
+    fn par() {
         init();
-        let min_reads = 3;
-        let id = "SALICETTO";
-        let contig = "chr10";
-        let start = 91_000_000;
-        let end = start + 1_000_000;
-        let bin_length = 1000;
-
-        let result_dir = "/data/longreads_basic_pipe";
-        let diag_bam_path = format!("{result_dir}/{id}/diag/{id}_diag_hs1.bam");
-        let out_dir = format!("/data/tmp/scan_{}", uuid::Uuid::new_v4());
-        let out_scan_file = "/data/tmp/scan.tsv";
-
-        scan_save(
-            &diag_bam_path,
-            contig,
-            start,
-            end,
-            bin_length,
-            min_reads,
-            out_scan_file,
-            &out_dir,
-        )?;
-        let n_files_out = fs::read_dir(out_dir).into_iter().count();
-        println!("{n_files_out}");
-        Ok(())
+        let tmp_dir = format!("/data/tmp/scan_{}", uuid::Uuid::new_v4());
+        fs::create_dir_all(&tmp_dir).unwrap();
+        info!("Creating {tmp_dir}");
+        par_whole_scan(
+            "chr9",
+            "/data/longreads_basic_pipe/SALICETTO/diag/SALICETTO_diag_hs1.bam",
+            &tmp_dir,
+        );
     }
 }