Thomas 1 жил өмнө
parent
commit
9ec2f6f4e2
5 өөрчлөгдсөн 234 нэмэгдсэн , 39 устгасан
  1. 56 0
      Cargo.lock
  2. 1 0
      Cargo.toml
  3. 93 2
      src/bam.rs
  4. 17 15
      src/bin.rs
  5. 67 22
      src/lib.rs

+ 56 - 0
Cargo.lock

@@ -338,6 +338,21 @@ version = "0.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9"
 
+[[package]]
+name = "dashmap"
+version = "6.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "hashbrown",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+ "rayon",
+]
+
 [[package]]
 name = "derive-new"
 version = "0.5.9"
@@ -476,6 +491,12 @@ dependencies = [
  "byteorder",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.14.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+
 [[package]]
 name = "heapless"
 version = "0.7.17"
@@ -777,6 +798,12 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
 
+[[package]]
+name = "once_cell"
+version = "1.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
+
 [[package]]
 name = "openssl-src"
 version = "300.3.1+3.3.1"
@@ -821,6 +848,7 @@ dependencies = [
  "anyhow",
  "crossbeam-channel",
  "csv",
+ "dashmap",
  "env_logger",
  "flate2",
  "indicatif",
@@ -835,6 +863,19 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "parking_lot_core"
+version = "0.9.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "windows-targets",
+]
+
 [[package]]
 name = "percent-encoding"
 version = "2.3.1"
@@ -910,6 +951,15 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4"
+dependencies = [
+ "bitflags",
+]
+
 [[package]]
 name = "regex"
 version = "1.10.6"
@@ -1041,6 +1091,12 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
 
+[[package]]
+name = "smallvec"
+version = "1.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
+
 [[package]]
 name = "spin"
 version = "0.9.8"

+ 1 - 0
Cargo.toml

@@ -20,4 +20,5 @@ serde = { version = "1.0.*", default-features = false }
 postcard = { version = "1.0.8", features = ["alloc"] }
 flate2 = "1.0.31"
 csv = "1.3.0"
+dashmap = { version = "6.0.1", features = ["rayon"] }
 

+ 93 - 2
src/bam.rs

@@ -2,6 +2,7 @@ use log::warn;
 use rust_htslib::bam::{
     ext::BamRecordExtensions, record::Aux, HeaderView, IndexedReader, Read, Record,
 };
+use std::collections::HashMap;
 
 pub fn primary_record(bam: &mut IndexedReader, record: Record) -> Record {
     if record.is_supplementary() {
@@ -32,11 +33,101 @@ pub fn primary_record(bam: &mut IndexedReader, record: Record) -> Record {
     record
 }
 
+pub fn primary_records(bam: &mut IndexedReader, records: Vec<Record>) -> Vec<Record> {
+    let mut res = Vec::new();
+    let mut all_positions = Vec::new();
+    let mut all_qnames_to_fetch = Vec::new();
+    for record in records.iter() {
+        if record.is_supplementary() {
+            let qname = record.qname();
+
+            if let Ok(Aux::String(sa)) = record.aux(b"SA") {
+                let positions: Vec<(&str, i32)> = sa
+                    .split(';')
+                    .filter(|s| !s.is_empty())
+                    .map(|s| {
+                        let parts: Vec<&str> = s.split(',').take(2).collect();
+                        let chr = parts.first().unwrap();
+                        let position: i32 = parts.get(1).unwrap().parse().unwrap();
+                        (*chr, position)
+                    })
+                    .collect();
+                all_positions.extend(positions);
+                all_qnames_to_fetch.push(String::from_utf8(qname.to_vec()).unwrap());
+            }
+        } else {
+            res.push(record.clone());
+        }
+    }
+    all_positions.sort_by_key(|(_, pos)| *pos);
+    all_positions.sort_by_key(|(chr, _)| *chr);
+    all_positions.dedup();
+
+    // Group by &str and create bins of ±1000
+    let mut grouped: HashMap<&str, Vec<Vec<(&str, i32)>>> = HashMap::new();
+
+    for (key, value) in all_positions {
+        let entry = grouped.entry(key).or_default();
+
+        if let Some(last_bin) = entry.last_mut() {
+            if last_bin.is_empty() || (value - last_bin[0].1).abs() <= 1000 {
+                last_bin.push((key, value));
+            } else {
+                entry.push(vec![(key, value)]);
+            }
+        } else {
+            entry.push(vec![(key, value)]);
+        }
+    }
+
+    // Flatten and sort by bin size
+    let mut flattened: Vec<Vec<(&str, i32)>> = grouped.values().flatten().cloned().collect();
+    flattened.sort_by_key(|b| std::cmp::Reverse(b.len()));
+
+    let mut primary_records: Vec<String> = res
+        .iter()
+        .map(|r| String::from_utf8(r.qname().to_vec()).unwrap())
+        .collect();
+
+    for group in flattened {
+        let (chr, start) = group.first().unwrap();
+
+        let end = group.last().unwrap().1 + 1;
+        // info!("{} {}:{}-{}", group.len(), group.first().unwrap().0, group.first().unwrap().1, group.last().unwrap().1);
+        bam.fetch((*chr, *start, end)).unwrap();
+        for record in bam.records() {
+            let record = record.unwrap();
+            if record.is_secondary() {
+                continue;
+            }
+            let qname = String::from_utf8(record.qname().to_vec()).unwrap();
+            if primary_records.contains(&qname) {
+                continue;
+            }
+            if all_qnames_to_fetch.contains(&qname) {
+                res.push(record);
+                primary_records.push(qname);
+            }
+        }
+        let unfound = all_qnames_to_fetch
+            .iter()
+            .filter(|q| !primary_records.contains(q))
+            .count();
+        if unfound == 0 {
+            break;
+        }
+    }
+
+    res
+}
+
 pub fn get_all_positions(
     record: &Record,
-    header: &HeaderView,
-    bam: &mut IndexedReader,
+    bam_path: &str,
+    // bam: &mut IndexedReader,
 ) -> anyhow::Result<Vec<(String, u32, u32)>> {
+    let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
+    let header = bam.header().to_owned();
     let mut positions = Vec::new();
     let qname = record.qname();
     let contig = String::from_utf8(header.tid2name(record.tid().try_into()?).to_vec())?;

+ 17 - 15
src/bin.rs

@@ -1,10 +1,10 @@
 use anyhow::Context;
-use log::warn;
+use log::{info, warn};
 use rayon::prelude::*;
 use rust_htslib::bam::{ext::BamRecordExtensions, record::Aux, IndexedReader, Read, Record};
 use std::collections::HashMap;
 
-use crate::bam::primary_record;
+use crate::bam::{primary_record, primary_records};
 
 /// Enforce that reads should have unique qnames
 #[derive(Debug)]
@@ -96,7 +96,6 @@ impl Bin {
             if reference_start >= self.start && reference_start <= self.end {
                 *s.entry(reference_start).or_default() += 1;
             }
-            
         }
 
         let max_pos = s.into_iter().max_by_key(|(_, v)| *v);
@@ -111,7 +110,6 @@ impl Bin {
             if reference_end >= self.start && reference_end <= self.end {
                 *e.entry(reference_end).or_default() += 1;
             }
-            
         }
 
         let max_pos = e.into_iter().max_by_key(|(_, v)| *v);
@@ -119,21 +117,28 @@ impl Bin {
     }
 
     pub fn se_primary(&mut self, pos: u32) -> Vec<Record> {
-        self.reads_store
+        let records: Vec<Record> = self
+            .reads_store
             .values()
             .filter(|record| {
                 record.reference_start() as u32 == pos || record.reference_end() as u32 == pos
             })
-            .map(|record| primary_record(&mut self.bam_reader, record.clone()))
-            .collect()
+            .cloned()
+            .collect();
+        primary_records(&mut self.bam_reader, records)
+        // self.reads_store
+        //     .values()
+        //     .filter(|record| {
+        //         record.reference_start() as u32 == pos || record.reference_end() as u32 == pos
+        //     }).inspect(|r| info!("rec {}", r.reference_start()))
+        //     .map(|record| primary_record(&mut self.bam_reader, record.clone()))
+        //     .collect()
     }
 
     pub fn s_primary(&mut self, pos: u32) -> Vec<Record> {
         self.reads_store
             .values()
-            .filter(|record| {
-                record.reference_start() as u32 == pos
-            })
+            .filter(|record| record.reference_start() as u32 == pos)
             .map(|record| primary_record(&mut self.bam_reader, record.clone()))
             .collect()
     }
@@ -141,9 +146,7 @@ impl Bin {
     pub fn e_primary(&mut self, pos: u32) -> Vec<Record> {
         self.reads_store
             .values()
-            .filter(|record| {
-                record.reference_end() as u32 == pos
-            })
+            .filter(|record| record.reference_end() as u32 == pos)
             .map(|record| primary_record(&mut self.bam_reader, record.clone()))
             .collect()
     }
@@ -294,8 +297,7 @@ pub fn scan_outliers(
         starts.push(current);
         current += length;
     }
-    // println!("group {contig}:{}-{}", starts.first().unwrap(), starts.last().unwrap() + length - 1);
-
+    
     let ratios: Vec<(u32, usize, u32, f64, f64)> = starts
         .into_par_iter()
         .filter_map(|start| {

+ 67 - 22
src/lib.rs

@@ -3,6 +3,7 @@ use crate::{
     bin::{scan_outliers, Bin},
 };
 use anyhow::Context;
+use dashmap::DashMap;
 use dict_reader::read_dict;
 use log::info;
 use rayon::prelude::*;
@@ -10,7 +11,7 @@ use rust_htslib::bam::{Format, Header, Read, Record, Writer};
 use std::{
     collections::{HashMap, HashSet},
     fs::{self, File},
-    io::{self, BufReader, BufWriter, Write},
+    io::{self, BufReader, BufWriter, Write}, sync::Arc,
 };
 
 pub mod bam;
@@ -92,44 +93,88 @@ pub fn scan_save(
             })
             .collect();
 
-    let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
+    info!("🆗");
+    let bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
     let header = bam.header().to_owned();
     let mut grouped_records = Vec::new();
+    // for outlier_record in outliers_records {
+    //     let mut hm_positions: DashMap<String, Vec<String>> = DashMap::new();
+    //     outlier_record.par_iter().for_each(|r| {
+    //         let mut bam = rust_htslib::bam::IndexedReader::from_path(bam_path).unwrap();
+    //         for pos in get_all_positions(r, &header, &mut bam).unwrap() {
+    //             let qname = String::from_utf8(r.qname().to_vec()).unwrap();
+    //             hm_positions
+    //                 .entry(format!("{}-{}", pos.0, pos.1))
+    //                 .or_default()
+    //                 .push(qname.clone());
+    //             hm_positions
+    //                 .entry(format!("{}-{}", pos.0, pos.2))
+    //                 .or_default()
+    //                 .push(qname);
+    //         }
+    //     });
+    //     for v in hm_positions.iter_mut() {
+    //         let mut v = v.value_mut();
+    //         v.sort(); // Ensure dedup works correctly
+    //         v.dedup();
+    //
+    //         if v.len() >= min_reads {
+    //             let rec: Vec<_> = outlier_record
+    //                 .clone()
+    //                 .into_iter()
+    //                 .filter(|r| {
+    //                     let qname = String::from_utf8(r.qname().to_vec()).unwrap();
+    //                     v.contains(&qname)
+    //                 })
+    //                 .collect();
+    //             grouped_records.push(rec);
+    //         }
+    //     }
+    // }
+
     for outlier_record in outliers_records {
-        let mut hm_positions: HashMap<String, Vec<String>> = HashMap::new();
-        outlier_record.iter().for_each(|r| {
-            for pos in get_all_positions(r, &header, &mut bam).unwrap() {
-                let qname = String::from_utf8(r.qname().to_vec()).unwrap();
-                hm_positions
-                    .entry(format!("{}-{}", pos.0, pos.1))
-                    .or_default()
-                    .push(qname.clone());
-                hm_positions
-                    .entry(format!("{}-{}", pos.0, pos.2))
-                    .or_default()
-                    .push(qname);
+        let hm_positions: DashMap<String, Vec<String>> = DashMap::new();
+        // let bam_clone = Arc::clone(&mut bam);
+
+        outlier_record.par_iter().for_each(|r| {
+            let r = r.clone();
+            if let Ok(positions) = get_all_positions(&r, bam_path) {
+            // if let Ok(positions) = get_all_positions(&r, &header, &mut bam) {
+                let qname = String::from_utf8_lossy(r.qname()).to_string();
+                for pos in positions {
+                    hm_positions
+                        .entry(format!("{}-{}", pos.0, pos.1))
+                        .or_default()
+                        .push(qname.clone());
+                    hm_positions
+                        .entry(format!("{}-{}", pos.0, pos.2))
+                        .or_default()
+                        .push(qname.clone());
+                }
             }
         });
-        for v in hm_positions.values_mut() {
-            v.sort(); // Ensure dedup works correctly
+
+        for mut entry in hm_positions.iter_mut() {
+            let v = entry.value_mut();
+            v.sort();
             v.dedup();
 
             if v.len() >= min_reads {
                 let rec: Vec<_> = outlier_record
-                    .clone()
-                    .into_iter()
+                    .iter()
                     .filter(|r| {
-                        let qname = String::from_utf8(r.qname().to_vec()).unwrap();
+                        let qname = String::from_utf8_lossy(r.qname()).to_string();
                         v.contains(&qname)
                     })
+                    .cloned()
                     .collect();
                 grouped_records.push(rec);
             }
         }
     }
 
+    info!("n groups {}", grouped_records.len());
     let mut dedup = HashSet::new();
-
     let mut n_records = 0;
     let grouped_records: Vec<Vec<Record>> = grouped_records
         .iter()
@@ -340,7 +385,7 @@ mod tests {
     #[test]
     fn locus() {
         init();
-        let len = 1000;
+        // let len = 1000;
         let id = "LEVASSEUR";
         let bam_path = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1.bam");
         let out_scan = "/tmp/test.txt";
@@ -353,7 +398,7 @@ mod tests {
             &bam_path,
             "chr10",
             /* 102020492 - (len * 1000) */ 0,
-            120000000,
+            120_000,
             &mut writer,
             &out_dir_bam,
             &config,