Thomas 1 year ago
parent
commit
447eb48a7e
4 changed files with 297 additions and 19 deletions
  1. 1 0
      .gitignore
  2. 161 0
      Cargo.lock
  3. 4 0
      Cargo.toml
  4. 131 19
      src/lib.rs

+ 1 - 0
.gitignore

@@ -1 +1,2 @@
 /target
+/data_test

+ 161 - 0
Cargo.lock

@@ -118,6 +118,19 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-channel"
 version = "0.5.12"
@@ -127,6 +140,34 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-queue"
+version = "0.3.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.19"
@@ -170,14 +211,50 @@ name = "desc_seq_lib"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "crossbeam",
+ "crossbeam-channel",
+ "env_logger 0.10.2",
  "log",
  "minimap2",
  "noodles-fasta",
  "rust-htslib",
  "seq_io",
+ "test-log",
  "uuid",
 ]
 
+[[package]]
+name = "env_filter"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea"
+dependencies = [
+ "log",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
+dependencies = [
+ "humantime",
+ "is-terminal",
+ "log",
+ "regex",
+ "termcolor",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9"
+dependencies = [
+ "env_filter",
+ "log",
+]
+
 [[package]]
 name = "flate2"
 version = "1.0.28"
@@ -229,6 +306,12 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
+[[package]]
+name = "hermit-abi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
+
 [[package]]
 name = "hts-sys"
 version = "2.1.1"
@@ -245,6 +328,12 @@ dependencies = [
  "openssl-sys",
 ]
 
+[[package]]
+name = "humantime"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+
 [[package]]
 name = "idna"
 version = "0.5.0"
@@ -261,6 +350,17 @@ version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9007da9cacbd3e6343da136e98b0d2df013f553d35bdec8b518f07bea768e19c"
 
+[[package]]
+name = "is-terminal"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "windows-sys",
+]
+
 [[package]]
 name = "jobserver"
 version = "0.1.28"
@@ -620,6 +720,36 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "termcolor"
+version = "1.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
+dependencies = [
+ "winapi-util",
+]
+
+[[package]]
+name = "test-log"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b319995299c65d522680decf80f2c108d85b861d81dfe340a10d16cee29d9e6"
+dependencies = [
+ "env_logger 0.11.3",
+ "test-log-macros",
+]
+
+[[package]]
+name = "test-log-macros"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8f546451eaa38373f549093fe9fd05e7d2bade739e2ddf834b9968621d60107"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.52",
+]
+
 [[package]]
 name = "thiserror"
 version = "1.0.58"
@@ -709,6 +839,37 @@ version = "0.11.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-util"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
+dependencies = [
+ "winapi",
+]
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
 [[package]]
 name = "windows-sys"
 version = "0.52.0"

+ 4 - 0
Cargo.toml

@@ -6,6 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+env_logger = "^0.10.1"
 minimap2 = { git = "https://github.com/jguhlin/minimap2-rs", features = ["htslib", "simde"]}
 rust-htslib = "0.46.0"
 anyhow = "1.0.75"
@@ -13,3 +14,6 @@ log = "0.4.19"
 uuid = { version = "1.6.1", features = ["serde", "v4"] }
 seq_io = "0.3.2"
 noodles-fasta = "0.34.0"
+crossbeam = "0.8.4"
+crossbeam-channel = "0.5.12"
+test-log = "0.2.15"

+ 131 - 19
src/lib.rs

@@ -1,4 +1,4 @@
-use anyhow::{Ok, Result};
+use anyhow::{anyhow, Ok, Result};
 use log::info;
 use minimap2::{Aligner, Mapping};
 use noodles_fasta as fasta;
@@ -7,8 +7,9 @@ use std::{
     collections::{HashMap, VecDeque},
     fmt,
     fs::{self, File},
-    io::{BufWriter, Write},
+    io::{BufReader, BufWriter, Write},
     process::{Command, Stdio},
+    thread
 };
 use uuid::Uuid;
 
@@ -28,7 +29,7 @@ pub struct Contig {
     // contig seq on ref
     pub mappings: Vec<Mapping>,
     // reads on ref
-    pub supporting_records: Vec<Record>,
+    pub supporting_records: Option<Vec<Record>>,
     pub sequence: String,
     pub contig_ref: ContigRef,
 }
@@ -103,6 +104,20 @@ impl ContigRef {
             ContigRef::RightAmbiguity(_) => None,
             ContigRef::Ambigous(_) => None,
             ContigRef::ChimericTriple((a, b, c)) => {
+                let mut v = vec![a, b, c];
+                v.sort_by(|a, b| a.query_start.cmp(&b.query_start));
+
+                let (a, b, c) = (
+                    *v.get(0).clone().unwrap(),
+                    *v.get(1).clone().unwrap(),
+                    *v.get(2).clone().unwrap(),
+                );
+                let a_target_name = a.target_name.clone().unwrap_or(uk.clone());
+                let b_target_name = b.target_name.clone().unwrap_or(uk.clone());
+                let c_target_name = c.target_name.clone().unwrap_or(uk.clone());
+
+                if a_target_name != b_target_name {}
+
                 // Insertions
                 // prioritize first len
                 let (bp_a_1, bp_a_2) = if a.query_end <= b.query_end {
@@ -268,7 +283,7 @@ impl Genome {
         &mut self,
         id: String,
         mappings: Vec<Mapping>,
-        supporting_records: Vec<Record>,
+        supporting_records: Option<Vec<Record>>,
         sequence: String,
     ) -> Result<()> {
         let new_contig = Contig {
@@ -364,6 +379,19 @@ impl Genome {
         Ok(())
     }
 
+    pub fn add_contig_from_seq(
+        &mut self,
+        name: String,
+        sequence: &Vec<u8>,
+        aligner: &Aligner,
+    ) -> Result<()> {
+        let mappings = aligner
+            .map(sequence, false, false, None, None)
+            .expect("Unable to align");
+        self.add_contig(name, mappings, None, String::from_utf8(sequence.to_vec())?)?;
+        Ok(())
+    }
+
     pub fn stats(&self) {
         for (k, v) in self.chromosomes.iter() {
             info!("{}:{}", k, v.contigs.len());
@@ -386,13 +414,14 @@ impl Chromosome {
 }
 
 impl Contig {
-    pub fn sort(&mut self) {
-        // sorting by target order
-        self.mappings
-            .sort_by(|a, b| a.target_start.cmp(&b.target_start));
-    }
+    // pub fn sort(&mut self) {
+    //     // sorting by target order
+    //     self.mappings
+    //         .sort_by(|a, b| a.target_start.cmp(&b.target_start));
+    // }
 
     pub fn to_igv(&self, dir_path: &str) -> Result<()> {
+        let supporting_records = self.supporting_records.clone().ok_or(anyhow!("no reads"))?;
         let contig_name = if let Some(hgvs) = self.hgvs() {
             hgvs
         } else {
@@ -406,8 +435,7 @@ impl Contig {
         write_fai(&fasta_path);
 
         let reads_path = format!("{contig_dir}/reads.fa");
-        let n_reads = self
-            .supporting_records
+        let n_reads = supporting_records
             .clone()
             .into_iter()
             .map(|r| {
@@ -450,7 +478,7 @@ impl Contig {
                     ),
                 ];
                 write_bed(&bed_path, &d)?;
-            },
+            }
             ContigRef::ChimericTriple((a, b, c)) => {
                 let d: Vec<(String, i32, i32, String)> = vec![a, b, c]
                     .iter()
@@ -469,7 +497,7 @@ impl Contig {
                     })
                     .collect();
                 write_bed(&bed_path, &d)?;
-            },
+            }
             _ => (),
         }
 
@@ -486,7 +514,11 @@ impl Contig {
             .expect("Unable to build index");
 
         let mut mappings = Vec::new();
-        for record in self.supporting_records.iter() {
+        let supporting_records = self
+            .supporting_records
+            .clone()
+            .ok_or(anyhow!("no supporting records"))?;
+        for record in supporting_records.iter() {
             let seq = record.seq().as_bytes();
             let alignment = aligner
                 .map(&seq, false, false, None, None)
@@ -546,7 +578,6 @@ fn group_mappings(mappings: &mut Vec<Mapping>) -> Result<Vec<Vec<Mapping>>> {
             alignments.push(vec![aln.clone()]);
         }
     }
-    println!("{mappings:?}");
 
     Ok(alignments)
 }
@@ -665,13 +696,94 @@ pub fn write_bam(ref_path: &str, reads_path: &str, bam_path: &str) -> Result<()>
     Ok(())
 }
 
+pub fn read_fasta(path: &str) -> Result<Vec<(String, Vec<u8>)>> {
+    let mut reader = File::open(&path)
+        .map(BufReader::new)
+        .map(fasta::Reader::new)?;
+
+    let mut res = Vec::new();
+    for result in reader.records() {
+        let record = result?;
+        let u = String::from_utf8(record.name().to_vec())?;
+        let s = record.sequence().as_ref().to_vec();
+        res.push((u, s));
+    }
+
+    Ok(res)
+}
+use crossbeam_channel::{unbounded, Receiver, Sender};
+
+pub fn spawn_aligner(reference: String, mmi: Option<String>) -> (Sender<Option<Vec<u8>>>, Receiver<(Vec<u8>, Vec<Mapping>)>) {
+    println!("kkkkkkkkkk");
+    let reference = reference.clone();
+    let mmi = mmi.clone();
+    let (thread_s, out_r) = unbounded::<(Vec<u8>, Vec<Mapping>)>();
+    let (out_s, thread_r) = unbounded::<Option<Vec<u8>>>();
+
+    thread::spawn(move || {
+        println!("Thread spawned");
+        let aligner = Aligner::builder()
+            .map_ont()
+            .with_threads(8)
+            .with_cigar()
+            .with_index(reference, mmi.as_deref())
+            .expect("Unable to build index");
+
+         println!("aligner spawned");
+
+
+        loop {
+            if let std::result::Result::Ok(seq) = thread_r.recv() {
+                println!("rec");
+                match seq {
+                    Some(seq) => {
+                        let mappings = aligner
+                            .map(&seq, false, false, None, None)
+                            .expect("Unable to align");
+                        let _ = thread_s.send((seq, mappings));
+                    }
+                    None => break,
+                }
+            }
+        }
+    });
+
+    println!("Out");
+
+    return (out_s, out_r);
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use test_log::test;
+
+
+    #[test_log::test]
+    fn it_works() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+        let contig_fa = "./data_test/contig_1.fa";
 
-    #[test]
-    fn it_works() {
-        // let result = add(2, 2);
-        // assert_eq!(result, 4);
+        let reference = "/data/ref/hs1/chm13v2.0.fa";
+        let mmi_path = "/data/ref/hs1/hs1-ont.mmi".to_string();
+        let mmi = Some("/data/ref/hs1/hs1-ont.mmi");
+
+        let mut genome = Genome::new();
+        let (s, r) =  spawn_aligner(reference.to_string(), Some(mmi_path.to_string()));
+
+        let sequences = read_fasta(contig_fa)?;
+        for (name, seq) in sequences {
+            println!("Sending to thread");
+            s.send(Some(seq)).unwrap();
+            // genome.add_contig_from_seq(name, &seq, &aligner)?;
+        }
+
+        while let std::result::Result::Ok(res) = r.recv() {
+            println!("Recived {:?}", res);
+        }
+
+        s.send(None)?;
+
+        Ok(())
     }
 }