Przeglądaj źródła

refactor + spades

Thomas 1 rok temu
rodzic
commit
debf8b0a87
13 zmienionych plików z 1820 dodań i 1052 usunięć
  1. 109 16
      Cargo.lock
  2. 6 5
      Cargo.toml
  3. 241 0
      src/assembler/flye.rs
  4. 118 0
      src/assembler/lastz.rs
  5. 93 0
      src/assembler/mod.rs
  6. 243 0
      src/assembler/spades.rs
  7. 151 0
      src/io/bam.rs
  8. 0 0
      src/io/bed.rs
  9. 0 0
      src/io/dict.rs
  10. 49 0
      src/io/fasta.rs
  11. 24 0
      src/io/fastq.rs
  12. 5 0
      src/io/mod.rs
  13. 781 1031
      src/lib.rs

+ 109 - 16
Cargo.lock

@@ -95,6 +95,26 @@ version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
 
+[[package]]
+name = "bindgen"
+version = "0.69.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0"
+dependencies = [
+ "bitflags",
+ "cexpr",
+ "clang-sys",
+ "itertools",
+ "lazy_static",
+ "lazycell",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "rustc-hash",
+ "shlex",
+ "syn 2.0.75",
+]
+
 [[package]]
 name = "bio-types"
 version = "1.0.4"
@@ -124,6 +144,15 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "buffer-redux"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e8acf87c5b9f5897cd3ebb9a327f420e0cae9dd4e5c1d2e36f2c84c571a58f1"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "byteorder"
 version = "1.5.0"
@@ -158,12 +187,32 @@ dependencies = [
  "shlex",
 ]
 
+[[package]]
+name = "cexpr"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
+dependencies = [
+ "nom",
+]
+
 [[package]]
 name = "cfg-if"
 version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "clang-sys"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
+dependencies = [
+ "glob",
+ "libc",
+ "libloading",
+]
+
 [[package]]
 name = "cmake"
 version = "0.1.51"
@@ -374,9 +423,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 
 [[package]]
 name = "flate2"
-version = "1.0.32"
+version = "1.0.33"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666"
+checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -435,6 +484,7 @@ version = "2.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e9f348d14cb4e50444e39fcd6b00302fe2ed2bc88094142f6278391d349a386d"
 dependencies = [
+ "bindgen",
  "bzip2-sys",
  "cc",
  "curl-sys",
@@ -483,6 +533,15 @@ version = "1.70.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
 
+[[package]]
+name = "itertools"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.11"
@@ -504,12 +563,28 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
 
+[[package]]
+name = "lazycell"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
+
 [[package]]
 name = "libc"
 version = "0.2.158"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
 
+[[package]]
+name = "libloading"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
+dependencies = [
+ "cfg-if",
+ "windows-targets 0.52.6",
+]
+
 [[package]]
 name = "libredox"
 version = "0.1.3"
@@ -619,9 +694,9 @@ dependencies = [
 
 [[package]]
 name = "noodles-fasta"
-version = "0.41.0"
+version = "0.42.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d7a1662ac3ace299515c982a322e378bbeb4c1bd90fb098d823ef0f3a6abcc00"
+checksum = "d60db7c4c514211598f2d7eb38e499e3b42d3eb690779fd0b36f224650c75c82"
 dependencies = [
  "bstr",
  "bytes",
@@ -630,15 +705,6 @@ dependencies = [
  "noodles-core",
 ]
 
-[[package]]
-name = "noodles-fastq"
-version = "0.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1edf1f924acddeee36304c444e242b9bda52ef9383dc2d7f008fca190753207"
-dependencies = [
- "memchr",
-]
-
 [[package]]
 name = "num-format"
 version = "0.4.4"
@@ -707,14 +773,15 @@ dependencies = [
  "log",
  "nom",
  "noodles-fasta",
- "noodles-fastq",
  "pandora_lib_blastn",
  "pandora_lib_igv",
  "petgraph",
  "rayon",
  "regex",
  "rust-htslib",
+ "seq_io",
  "serde",
+ "thiserror",
  "uuid",
 ]
 
@@ -854,9 +921,9 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
 
 [[package]]
 name = "rust-htslib"
-version = "0.46.0"
+version = "0.47.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aec6f9ca4601beb4ae75ff8c99144dd15de5a873f6adf058da299962c760968e"
+checksum = "41f1796800e73ebb282c6fc5c03f1fe160e867e01114a58a7e115ee3c1d02482"
 dependencies = [
  "bio-types",
  "byteorder",
@@ -874,6 +941,12 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+
 [[package]]
 name = "rustc_version"
 version = "0.1.7"
@@ -895,12 +968,32 @@ version = "1.0.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
 
+[[package]]
+name = "scoped_threadpool"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d51f5df5af43ab3f1360b429fa5e0152ac5ce8c0bd6485cae490332e96846a8"
+
 [[package]]
 name = "semver"
 version = "0.1.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
 
+[[package]]
+name = "seq_io"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "86b213fabdebb1c56d330d1dbf92cef0257f3873b718eb2202bb73c707197e9c"
+dependencies = [
+ "buffer-redux",
+ "crossbeam-utils",
+ "memchr",
+ "scoped_threadpool",
+ "serde",
+ "serde_derive",
+]
+
 [[package]]
 name = "serde"
 version = "1.0.209"

+ 6 - 5
Cargo.toml

@@ -8,19 +8,20 @@ log = "^0.4.22"
 env_logger = "^0.11.5"
 anyhow = "1.0.86"
 duct = "0.13.7"
-noodles-fasta = "0.41.0"
-noodles-fastq = "0.13.0"
-rust-htslib = "0.46.0"
+noodles-fasta = "0.42.0"
+rust-htslib = "0.47.0"
 uuid = { version = "1.10.0", features = ["v4"] }
 pandora_lib_blastn = { git = "https://git.t0m4.fr/Thomas/pandora_lib_blastn.git" }
 pandora_lib_igv = { git = "https://git.t0m4.fr/Thomas/pandora_lib_igv.git" }
 nom = "7.1.3"
 petgraph = "0.6.5"
 bam = "0.1.4"
-flate2 = "1.0.30"
+flate2 = "1.0.33"
 glob = "0.3.1"
-regex = "1.10.5"
+regex = "1.10.6"
 rayon = "1.10.0"
 csv = "1.3.0"
 serde = { version = "1.0.209", features = ["derive"] }
+thiserror = "1.0.63"
+seq_io = "0.3.2"
 

+ 241 - 0
src/assembler/flye.rs

@@ -0,0 +1,241 @@
+use log::{info, warn};
+use std::{
+    fs::{self, File},
+    io::{BufRead, BufReader, Read},
+    path::{Path, PathBuf},
+    process::{Command, Stdio},
+};
+use super::Assemble;
+use crate::{assembler::AssembleError, io::{
+    bam::{cp_mod_tags, read_bam},
+    fasta::{fai, records_to_fasta, write_fasta},
+}};
+
+#[derive(Debug, Clone)]
+pub struct FlyeConfig {
+    pub output_dir: PathBuf,
+
+    pub min_cov: u16,
+    pub min_reads: u32,
+    pub threads: u16,
+    pub min_overlap: u32,
+    pub flye_bin: String,
+}
+
+impl Default for FlyeConfig {
+    fn default() -> Self {
+        Self {
+            output_dir: PathBuf::new(),
+            min_cov: 2,
+            min_reads: 3,
+            threads: 12,
+            min_overlap: 1000,
+            flye_bin: "/data/tools/Flye/bin/flye".to_string(),
+        }
+    }
+}
+
+impl FlyeConfig {
+    pub fn new(output_dir: &str) -> Self {
+        FlyeConfig { output_dir: PathBuf::from(output_dir), ..Default::default() }
+    }
+}
+
+#[derive(Debug)]
+pub struct Flye {
+    pub config: FlyeConfig,
+    pub input_id: String,
+
+    pub input_records: Vec<bam::Record>,
+    pub on_contig_bam: String,
+    pub contigs: Option<Vec<String>>,
+}
+
+impl Assemble for Flye {
+    fn init(
+        input_bam: &Path,
+        config: &super::AssembleConfig,
+    ) -> anyhow::Result<Self> {
+        match config {
+            super::AssembleConfig::Flye(config) => {
+                let input_id = input_bam.file_stem().unwrap().to_str().unwrap().to_string();
+                let input_records = read_bam(input_bam)?;
+
+                Ok(Self {
+                    config: config.clone(),
+                    input_id,
+                    input_records,
+                    on_contig_bam: String::new(),
+                    contigs: None,
+                })
+            }
+            _ => Err(anyhow::anyhow!("Wrong config format for Flye.")),
+        }
+    }
+
+    fn assemble(mut self) -> anyhow::Result<Self> {
+        let min_cov = self.config.min_cov;
+
+        let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
+        info!("Creating tmp directory {tmp_dir}");
+        fs::create_dir(&tmp_dir).unwrap();
+
+        let input_fa = format!("{}/{}.fasta", tmp_dir, self.input_id);
+        if !Path::new(&input_fa).exists() {
+            records_to_fasta(&self.input_records, &input_fa)?;
+        }
+
+        let flye_tmp_dir = format!("{tmp_dir}/flye");
+        fs::create_dir(&flye_tmp_dir).unwrap();
+
+        run_flye(&input_fa, &flye_tmp_dir, &self.config);
+
+        let contigs_path = format!("{flye_tmp_dir}/assembly.fasta");
+
+        if Path::new(&contigs_path).exists() {
+            let assembly_path = format!("{flye_tmp_dir}/assembly.fasta");
+            let flye_cov_path = format!("{flye_tmp_dir}/40-polishing/base_coverage.bed.gz");
+
+            // Read the assembly fasta
+            let mut reader = File::open(assembly_path)
+                .map(BufReader::new)
+                .map(noodles_fasta::Reader::new)?;
+
+            let mut contigs = Vec::new();
+            for result in reader.records() {
+                let record = result?;
+                let contig_name = String::from_utf8(record.name().to_vec()).unwrap();
+                let (s, e) = read_flye_coverage(&flye_cov_path, min_cov.into(), &contig_name);
+                let seq = record.sequence().as_ref();
+                let seq = String::from_utf8(seq.to_vec()).unwrap();
+                let seq: String = seq[s..e].into();
+                contigs.push(seq);
+            }
+            self.contigs = Some(contigs);
+        } else {
+            // warn!("No reads assembled for {}", self.input_id);
+            anyhow::bail!(AssembleError::NoContig(self.input_id));
+        }
+
+        // Cleaning
+        fs::remove_dir_all(tmp_dir)?;
+
+        Ok(self)
+    }
+
+    fn save(self) -> anyhow::Result<()> {
+        if self.contigs.is_none() {
+            anyhow::bail!(AssembleError::NoContig(self.input_id));
+        }
+
+        for (i, contig) in self.contigs.unwrap().iter().enumerate() {
+            let suffixe = if i == 0 {
+                "".to_string()
+            } else {
+                format!("_{i}")
+            };
+
+            if !self.config.output_dir.exists() {
+                fs::create_dir_all(&self.config.output_dir)?;
+            }
+
+            let contig_id = format!("{}{suffixe}_flye", self.input_id);
+            let contig_fa = format!("{}/{contig_id}.fa", self.config.output_dir.display());
+
+            info!("Saving contig {contig_id} in {contig_fa}");
+            write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())]);
+            fai(&contig_fa)?;
+
+            // Writing bed file from best blastn results
+            // let bed_path = format!("{}/{contig_id}.bed", self.output_dir.display());
+            // let bed = pandora_lib_blastn::Blast::init(&contig_fa)?
+            //     .run()?
+            //     .keep_best()
+            //     .to_bed()?;
+            // let mut f = File::create(bed_path)?;
+            // f.write_all(bed.as_bytes())?;
+            //
+            // Remaping input bam to contig
+            info!("Mapping  input reads to {contig_id}");
+            let new_bam = format!("{}/{contig_id}.bam", self.config.output_dir.display());
+            duct::cmd!("bwa", "index", contig_fa.clone()).run()?;
+            let input_fa = format!("{}/{}.fasta", self.config.output_dir.display(), self.input_id);
+            let bwa = format!("bwa mem {contig_fa} {input_fa}");
+            let samtools = "samtools sort /dev/stdin";
+            let pipe = format!("{bwa} | {samtools} > {new_bam}");
+            duct::cmd!("bash", "-c", pipe).run()?;
+
+            // Copy modified base tags to new bam
+            cp_mod_tags(&self.input_records, &new_bam)?;
+
+            // Run modkit
+            let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.config.output_dir.display());
+            duct::cmd!("modkit", "pileup", new_bam, modkit_pileup).run()?;
+        }
+        Ok(())
+    }
+}
+
+pub fn run_flye(fasta_path: &str, tmp_dir: &str, config: &FlyeConfig) {
+    info!("Running Flye for {fasta_path}");
+    let mut cmd = Command::new(&config.flye_bin)
+        .arg("--threads")
+        .arg(config.threads.to_string())
+        // .arg("--keep-haplotypes")
+        // .arg("--meta")
+        .arg("--min-overlap")
+        .arg(config.min_overlap.to_string())
+        .arg("--out-dir")
+        .arg(tmp_dir)
+        .arg("--nano-hq")
+        .arg(fasta_path)
+        .stderr(Stdio::piped())
+        .spawn()
+        .expect("Flye failed to start");
+
+    let stderr = cmd.stderr.take().unwrap();
+    let reader = BufReader::new(stderr);
+    reader
+        .lines()
+        .map_while(Result::ok)
+        .filter(|line| line.contains("ERROR"))
+        .for_each(|line| warn!("[FLYE] {line}"));
+
+    cmd.wait().unwrap();
+    cmd.kill().unwrap();
+}
+
+pub fn read_flye_coverage(path: &str, min_cov: i32, contig_name: &str) -> (usize, usize) {
+    let mut reader = File::open(path).map(flate2::read::GzDecoder::new).unwrap();
+    let mut buf = Vec::new();
+    reader.read_to_end(&mut buf).unwrap();
+
+    let mut line_acc = Vec::new();
+    let mut start = None;
+    let mut end = None;
+    let mut last_end = 0;
+    for b in buf.iter() {
+        match b {
+            b'\n' => {
+                let s = String::from_utf8(line_acc.clone()).unwrap();
+                line_acc.clear();
+                if !s.starts_with(&format!("{contig_name}\t")) {
+                    continue;
+                }
+                let s = s.split('\t').collect::<Vec<&str>>();
+                let cov: i32 = s.get(3).unwrap().parse().unwrap();
+                if start.is_none() && cov >= min_cov {
+                    let st: i32 = s.get(1).unwrap().parse().unwrap();
+                    start = Some(st);
+                } else if end.is_none() && start.is_some() && cov < min_cov {
+                    let en: i32 = s.get(1).unwrap().parse().unwrap();
+                    end = Some(en);
+                    break;
+                }
+                last_end = s.get(2).unwrap().parse().unwrap();
+            }
+            _ => line_acc.push(*b),
+        }
+    }
+    (start.unwrap() as usize, end.unwrap_or(last_end) as usize)
+}

+ 118 - 0
src/assembler/lastz.rs

@@ -0,0 +1,118 @@
+
+
+// TODO
+
+pub fn lastz_remap(
+    // bam_path: &str,
+    input_records: &Vec<bam::Record>,
+    ref_fa: &str,
+    ref_len: u32,
+    new_bam: &str,
+    modkit_pileup: &str,
+) -> anyhow::Result<()> {
+    let tmp = "/tmp";
+    let tmp_dir = format!("{tmp}/{}", Uuid::new_v4());
+    fs::create_dir(&tmp_dir)?;
+
+    let mut records = Vec::new();
+    let mut mm_hm = HashMap::new();
+    let mut ml_hm = HashMap::new();
+    // let reader = bam::BamReader::from_path(bam_path, 4).unwrap();
+    for record in input_records {
+        // let record = record?;
+        let query_name = String::from_utf8(record.name().to_vec())?;
+        if let Some(TagValue::String(value, StringType::String)) = record.tags().get(b"MM") {
+            mm_hm.insert(query_name.clone(), value.to_vec());
+        }
+        if let Some(TagValue::IntArray(value)) = record.tags().get(b"ML") {
+            ml_hm.insert(query_name.clone(), value.raw().to_vec());
+        }
+
+        let query_path = format!("{}/{}.fa", &tmp_dir, query_name);
+
+        let query_seq = String::from_utf8(record.sequence().to_vec())?;
+
+        write_fasta(&query_path, &vec![(query_name.clone(), query_seq)]);
+        let mut res = lastz_bam(&query_path, ref_fa)?;
+        // take best length
+        res.sort_by_cached_key(|r| r.aligned_query_end() - r.aligned_query_start());
+        let mut r = res.last().unwrap().clone();
+        r.set_name(query_name.as_bytes().to_vec().clone());
+        records.push(r);
+    }
+    records.sort_by_cached_key(|b| b.start());
+
+    let contig_name = PathBuf::from(&ref_fa)
+        .file_stem()
+        .unwrap()
+        .to_string_lossy()
+        .to_string();
+    // Header format
+    let mut header = bam::header::Header::new();
+    let mut header_line = HeaderEntry::header_line("1.6".to_string());
+    header_line.push(b"SO", "Coordinate".to_string());
+    header.push_entry(header_line).unwrap();
+    header
+        .push_entry(HeaderEntry::ref_sequence(contig_name, ref_len))
+        .unwrap();
+
+    let mut bam_writer = bam::bam_writer::BamWriterBuilder::new();
+    let mut w = bam_writer.from_path(new_bam, header)?;
+    for record in records.iter() {
+        let mut record = record.clone();
+        let record_name = String::from_utf8(record.name().to_vec())?;
+        let tags = record.tags_mut();
+        if let Some(mm) = mm_hm.get(&record_name) {
+            tags.push_string(b"MM", mm)
+        }
+        let tags = record.tags_mut();
+        if let Some(ml) = ml_hm.get(&record_name) {
+            let v = ml.as_bytes();
+            tags.push_array(b"ML", v);
+        }
+        w.write(&record)?;
+    }
+    w.finish()?;
+    fs::remove_dir_all(tmp_dir)?;
+
+    duct::cmd!("samtools", "index", "-@", "10", new_bam).run()?;
+    duct::cmd!("modkit", "pileup", new_bam, modkit_pileup).run()?;
+    Ok(())
+}
+
+pub fn lastz_bam(query_path: &str, subject_path: &str) -> anyhow::Result<Vec<bam::Record>> {
+    let lastz_bin = "lastz";
+    let pipe = format!("{lastz_bin} --format=softsam {query_path} {subject_path} | samtools sort");
+    let out = duct::cmd!("bash", "-c", pipe).reader()?;
+    let reader = bam::BamReader::from_stream(out, 4)?;
+    Ok(reader.into_iter().filter_map(anyhow::Result::ok).collect())
+}
+
+pub fn lastz_two_by_two(records: &[String], best: bool) -> anyhow::Result<Vec<BlastResult>> {
+    let lastz_bin = "lastz";
+    let mut all: Vec<BlastResult> = Vec::new();
+    for i in 0..records.len() {
+        for j in (i + 1)..records.len() {
+            let record_a = &records[i];
+            let record_b = &records[j];
+            let out = duct::cmd!(lastz_bin, "--format=blastn", record_a, record_b).read()?;
+            let mut results: Vec<pandora_lib_blastn::BlastResult> = out
+                .lines()
+                .filter(|s| !s.starts_with('#'))
+                .map(|s| s.to_string())
+                .filter_map(|s| BlastResult::try_from(s).ok())
+                .collect();
+            if !results.is_empty() {
+                if best {
+                    results.sort_by_key(|r| r.alignment_length);
+                    all.push(results.last().unwrap().clone())
+                } else {
+                    all.extend(results.into_iter());
+                }
+            }
+        }
+    }
+
+    Ok(all)
+}
+

+ 93 - 0
src/assembler/mod.rs

@@ -0,0 +1,93 @@
+use std::path::{Path, PathBuf};
+
+use anyhow::{Context, Ok};
+use log::warn;
+use thiserror::Error;
+
+use crate::io::bam::{all_bam_paths, create_bam_leave_two_out};
+
+use self::{
+    flye::{Flye, FlyeConfig},
+    spades::{Spades, SpadesConfig},
+};
+
+pub mod flye;
+pub mod spades;
+
+pub trait Assemble: Sized {
+    fn init(bam_input: &Path, config: &AssembleConfig) -> anyhow::Result<Self>;
+    fn assemble(self) -> anyhow::Result<Self>;
+    fn save(self) -> anyhow::Result<()>;
+}
+
+pub enum AssembleConfig {
+    Flye(FlyeConfig),
+    Spades(SpadesConfig),
+}
+
+pub fn assemble<A: Assemble>(bam_input: &Path, config: &AssembleConfig) -> anyhow::Result<()> {
+    let result = A::init(bam_input, config)
+        .and_then(|b| b.assemble())
+        .and_then(|c| c.save());
+
+    match result {
+        std::result::Result::Ok(()) => Ok(()),
+        Err(err) => {
+            if let Some(AssembleError::NoContig(err_msg)) = err.downcast_ref::<AssembleError>() {
+                warn!("No contig assembled for {}, rescuing...", err_msg);
+                rescue_assembly::<A>(bam_input, config)
+            } else {
+                Err(err)
+            }
+        }
+    }
+}
+
+fn rescue_assembly<A: Assemble>(bam_input: &Path, config: &AssembleConfig) -> anyhow::Result<()> {
+    let bams = create_bam_leave_two_out(bam_input.to_str().context("Invalid path")?)?;
+
+    let has_contig = bams.iter().any(|bam| assemble::<A>(bam, config).is_ok());
+
+    if has_contig {
+        Ok(())
+    } else {
+        Err(AssembleError::NoContigRescue(bam_input.to_str().unwrap().to_string()).into())
+    }
+}
+
+pub fn assemble_records(
+    bam_input: &PathBuf,
+    configs: &Vec<AssembleConfig>,
+) -> Vec<(PathBuf, String, anyhow::Result<()>)> {
+    let mut results = Vec::new();
+    for config in configs {
+        let r = match config {
+            AssembleConfig::Flye(_) => ("flye".to_string(), assemble::<Flye>(bam_input, config)),
+            AssembleConfig::Spades(_) => {
+                ("spades".to_string(), assemble::<Spades>(bam_input, config))
+            }
+        };
+
+        results.push((bam_input.to_owned(), r.0, r.1));
+    }
+    results
+}
+
+pub fn assemble_dir(
+    input_dir: &str,
+    configs: Vec<AssembleConfig>,
+) -> anyhow::Result<Vec<(PathBuf, String, anyhow::Result<()>)>> {
+    Ok(all_bam_paths(input_dir)?
+        .iter()
+        .flat_map(|bam_input| assemble_records(bam_input, &configs))
+        .collect())
+}
+
+#[derive(Error, Debug)]
+pub enum AssembleError {
+    #[error("No contig assembled: {0}")]
+    NoContig(String),
+
+    #[error("No contig assembled after rescue: {0}")]
+    NoContigRescue(String),
+}

+ 243 - 0
src/assembler/spades.rs

@@ -0,0 +1,243 @@
+use std::{
+    collections::HashMap,
+    fs::{self, File},
+    io::{BufRead, BufReader},
+    path::{Path, PathBuf},
+    process::{Command, Stdio},
+};
+
+use log::{info, warn};
+
+use crate::{
+    assembler::AssembleError,
+    io::{
+        bam::{cp_mod_tags, read_bam},
+        fasta::{fai, write_fasta},
+        fastq::records_to_fastq,
+    },
+};
+
+use super::{Assemble, AssembleConfig};
+
+#[derive(Debug, Clone)]
+pub struct SpadesConfig {
+    pub output_dir: PathBuf,
+
+    pub min_cov: u16,
+    pub min_reads: u32,
+    pub min_entropy: f64,
+    pub threads: u16,
+    pub spades_bin: String,
+}
+
+impl Default for SpadesConfig {
+    fn default() -> Self {
+        Self {
+            output_dir: PathBuf::new(),
+            min_cov: 2,
+            min_reads: 3,
+            min_entropy: 0.2,
+            threads: 12,
+            spades_bin: "/data/tools/SPAdes-4.0.0-Linux/bin/spades.py".to_string(),
+        }
+    }
+}
+
+impl SpadesConfig {
+    pub fn new(output_dir: &str) -> Self {
+        SpadesConfig {
+            output_dir: PathBuf::from(output_dir),
+            ..Default::default()
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Spades {
+    pub config: SpadesConfig,
+    pub input_id: String,
+
+    pub input_records: Vec<bam::Record>,
+    pub on_contig_bam: String,
+    pub contigs: Option<Vec<String>>,
+}
+
+impl Assemble for Spades {
+    fn init(input_bam: &std::path::Path, config: &AssembleConfig) -> anyhow::Result<Self> {
+        match config {
+            super::AssembleConfig::Spades(config) => {
+                let input_id = input_bam.file_stem().unwrap().to_str().unwrap().to_string();
+                let input_records = read_bam(input_bam)?;
+
+                Ok(Self {
+                    config: config.clone(),
+                    input_id,
+                    input_records,
+                    on_contig_bam: String::new(),
+                    contigs: None,
+                })
+            }
+            _ => Err(anyhow::anyhow!("Wrong config format for Spades.")),
+        }
+    }
+
+    fn assemble(mut self) -> anyhow::Result<Self> {
+        let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
+        info!("Creating tmp directory {tmp_dir}");
+        fs::create_dir(&tmp_dir).unwrap();
+
+        let input_fq = format!("{}/{}.fastq", tmp_dir, self.input_id);
+        if !Path::new(&input_fq).exists() {
+            records_to_fastq(&self.input_records, &input_fq)?;
+        }
+
+        let spades_tmp_dir = format!("{tmp_dir}/spades");
+        fs::create_dir(&spades_tmp_dir).unwrap();
+
+        run_spades(&input_fq, &spades_tmp_dir, &self.config);
+
+        let contigs_path = format!("{spades_tmp_dir}/contigs.fasta");
+
+        if Path::new(&contigs_path).exists() {
+            // Read the assembly fasta
+            let mut reader = File::open(contigs_path)
+                .map(BufReader::new)
+                .map(noodles_fasta::Reader::new)?;
+
+            let mut contigs = Vec::new();
+            for result in reader.records() {
+                let record = result?;
+                let seq = record.sequence().as_ref();
+                let seq = String::from_utf8(seq.to_vec()).unwrap();
+                if calculate_shannon_entropy(&seq) >= self.config.min_entropy {
+                    contigs.push(seq);
+                }
+            }
+            self.contigs = Some(contigs);
+        } else {
+            anyhow::bail!(AssembleError::NoContig(self.input_id));
+        }
+
+        // Cleaning
+        fs::remove_dir_all(tmp_dir)?;
+
+        Ok(self)
+    }
+
+    fn save(self) -> anyhow::Result<()> {
+        if self.contigs.is_none() {
+            anyhow::bail!(AssembleError::NoContig(self.input_id));
+        }
+
+        for (i, contig) in self.contigs.unwrap().iter().enumerate() {
+            let suffixe = if i == 0 {
+                "".to_string()
+            } else {
+                format!("_{i}")
+            };
+
+            if !self.config.output_dir.exists() {
+                fs::create_dir_all(&self.config.output_dir)?;
+            }
+
+            let contig_id = format!("{}{suffixe}_spades", self.input_id);
+            let contig_fa = format!("{}/{contig_id}.fa", self.config.output_dir.display());
+
+            info!("Saving contig {contig_id} in {contig_fa}");
+            write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())]);
+            fai(&contig_fa)?;
+
+            // Writing bed file from best blastn results
+            // let bed_path = format!("{}/{contig_id}.bed", self.output_dir.display());
+            // let bed = pandora_lib_blastn::Blast::init(&contig_fa)?
+            //     .run()?
+            //     .keep_best()
+            //     .to_bed()?;
+            // let mut f = File::create(bed_path)?;
+            // f.write_all(bed.as_bytes())?;
+            //
+            // Remaping input bam to contig
+            info!("Mapping  input reads to {contig_id}");
+            let new_bam = format!("{}/{contig_id}.bam", self.config.output_dir.display());
+            duct::cmd!("bwa", "index", contig_fa.clone()).run()?;
+            let input_fa = format!(
+                "{}/{}.fasta",
+                self.config.output_dir.display(),
+                self.input_id
+            );
+            let bwa = format!("bwa mem {contig_fa} {input_fa}");
+            let samtools = "samtools sort /dev/stdin";
+            let pipe = format!("{bwa} | {samtools} > {new_bam}");
+            duct::cmd!("bash", "-c", pipe).run()?;
+
+            // Copy modified base tags to new bam
+            cp_mod_tags(&self.input_records, &new_bam)?;
+
+            // Run modkit
+            let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.config.output_dir.display());
+            duct::cmd!("modkit", "pileup", new_bam, modkit_pileup).run()?;
+        }
+        Ok(())
+    }
+    // add code here
+}
+
+pub fn run_spades(input_fq: &str, tmp_dir: &str, config: &SpadesConfig) {
+    info!("Running Spades for {input_fq}");
+    let mut cmd = Command::new(&config.spades_bin)
+        .arg("--disable-gzip-output")
+        .arg("--isolate")
+        .arg("-k")
+        .arg("21,33,55,77,99,127")
+        .arg("--cov-cutoff")
+        .arg(config.min_cov.to_string())
+        .arg("--threads")
+        .arg(config.threads.to_string())
+        .arg("-s")
+        .arg(input_fq)
+        .arg("-o")
+        .arg(tmp_dir)
+        .stderr(Stdio::piped())
+        .spawn()
+        .expect("Spades failed to start");
+
+    let stderr = cmd.stderr.take().unwrap();
+    let reader = BufReader::new(stderr);
+    reader
+        .lines()
+        .map_while(Result::ok)
+        .inspect(|o| info!("{o}"))
+        .filter(|line| line.contains("ERROR"))
+        .for_each(|line| warn!("[Spades] {line}"));
+
+    cmd.wait().unwrap();
+    cmd.kill().unwrap();
+}
+
+fn calculate_shannon_entropy(sequence: &str) -> f64 {
+    let total_length = sequence.len() as f64;
+
+    if total_length == 0.0 {
+        return 0.0;
+    }
+
+    // Count the frequency of each base
+    let mut base_counts = HashMap::new();
+    for base in sequence.chars() {
+        *base_counts.entry(base).or_insert(0) += 1;
+    }
+
+    // Calculate Shannon entropy
+    let mut entropy = 0.0;
+    for &count in base_counts.values() {
+        let probability = count as f64 / total_length;
+        entropy -= probability * probability.log2();
+    }
+
+    // Normalize entropy by dividing by log2(4) since we have 4 possible bases
+    let max_entropy = 4f64.log2();
+    let normalized_entropy = entropy / max_entropy;
+
+    // Round to 4 decimal places
+    (normalized_entropy * 10000.0).round() / 10000.0
+}

+ 151 - 0
src/io/bam.rs

@@ -0,0 +1,151 @@
+use anyhow::Context;
+use bam::{
+    record::tags::{StringType, TagValue}, BamReader, Record, RecordWriter
+};
+use log::{info, warn};
+use nom::AsBytes;
+use std::{
+    collections::HashMap,
+    fs,
+    path::{Path, PathBuf},
+};
+use uuid::Uuid;
+
+pub fn all_bam_paths(dir: &str) -> anyhow::Result<Vec<PathBuf>> {
+    let pattern = format!("{}/**/*.bam", dir);
+    let bam_paths: Vec<PathBuf> = glob::glob(&pattern)
+        .expect("Failed to read glob pattern")
+        .filter_map(Result::ok)
+        .collect();
+
+    Ok(bam_paths)
+}
+
+pub fn read_bam(input_bam: &Path) -> anyhow::Result<Vec<Record>> {
+    let reader = BamReader::from_path(input_bam, 3)?;
+    let mut input_records = Vec::new();
+    for rec in reader {
+        input_records.push(rec?);
+    }
+    Ok(input_records)
+}
+
+pub fn cp_mod_tags(input_records: &Vec<Record>, output_path: &str) -> anyhow::Result<()> {
+    // Read tags from input tags
+    let mut mm_hm = HashMap::new();
+    let mut ml_hm = HashMap::new();
+    for record in input_records {
+        // let record = record?;
+        let query_name = String::from_utf8(record.name().to_vec())?;
+        if let Some(TagValue::String(value, StringType::String)) = record.tags().get(b"MM") {
+            mm_hm.insert(query_name.clone(), value.to_vec());
+        }
+        if let Some(TagValue::IntArray(value)) = record.tags().get(b"ML") {
+            ml_hm.insert(query_name.clone(), value.raw().to_vec());
+        }
+    }
+
+    // Read and keep records from destination bam
+    let reader =
+        bam::BamReader::from_path(output_path, 3).context(format!("Can't open {output_path}"))?;
+    let header = reader.header().to_owned();
+    let mut records = Vec::new();
+    for record in reader {
+        let record = record?;
+        records.push(record);
+    }
+
+    // Write records add tags into tmp file
+    let tmp_bam = format!("/tmp/{}.bam", Uuid::new_v4());
+    let mut bam_writer = bam::bam_writer::BamWriterBuilder::new();
+    let mut w = bam_writer.from_path(&tmp_bam, header)?;
+    for record in records.iter() {
+        let mut record = record.clone();
+        let record_name = String::from_utf8(record.name().to_vec())?;
+        let tags = record.tags_mut();
+        if let Some(mm) = mm_hm.get(&record_name) {
+            tags.push_string(b"MM", mm)
+        }
+        let tags = record.tags_mut();
+        if let Some(ml) = ml_hm.get(&record_name) {
+            let v = ml.as_bytes();
+            tags.push_array(b"ML", v);
+        }
+        w.write(&record)?;
+    }
+    w.finish()?;
+
+    // Remove destination bam, copy tmp bam to destination path and remove tmp bam
+    fs::remove_file(output_path).context(format!("Can't remove {output_path}"))?;
+    fs::copy(&tmp_bam, output_path).context(format!("Can't copy {tmp_bam} to {output_path}"))?;
+    fs::remove_file(tmp_bam)?;
+
+    // Index destination bam
+    duct::cmd!("samtools", "index", output_path).run()?;
+    Ok(())
+}
+
+// creates multiple BAM files, each containing all but two consecutive records from the input BAM file
+pub fn create_bam_leave_two_out(input_path: &str) -> anyhow::Result<Vec<PathBuf>> {
+    // Open the input BAM file
+    let mut reader = rust_htslib::bam::Reader::from_path(input_path)
+        .with_context(|| format!("Failed to open input BAM file: {}", input_path))?;
+    let header = rust_htslib::bam::Read::header(&reader).clone();
+    let header = rust_htslib::bam::Header::from_template(&header);
+
+    // Read all records into a vector
+    let mut records = Vec::new();
+    for record in rust_htslib::bam::Read::records(&mut reader) {
+        records.push(record?);
+    }
+
+    if records.len() < 3 {
+        warn!("Not enough reads");
+        return Ok(Vec::new());
+    }
+
+    let file_stem = PathBuf::from(input_path)
+        .file_stem()
+        .unwrap()
+        .to_str()
+        .unwrap()
+        .to_string();
+    let dir = PathBuf::from(input_path).parent().unwrap().to_owned();
+    let mut output_paths = Vec::new();
+
+    for i in 0..records.len() {
+        let output_filename = format!("{}/{file_stem}_{i}.bam", dir.display());
+        let output_path = PathBuf::from(&output_filename);
+        info!("Creating bam {}", output_path.display());
+
+        let mut writer = rust_htslib::bam::Writer::from_path(
+            &output_path,
+            &header,
+            rust_htslib::bam::Format::Bam,
+        )
+        .with_context(|| {
+            format!(
+                "Failed to create output BAM file: {}",
+                output_path.display()
+            )
+        })?;
+
+        // Write two records to the file, skipping the i-th record
+        for (j, record) in records.iter().enumerate() {
+            if j != i && j != (i + 1) % records.len() {
+                writer.write(record)?;
+            } else {
+                warn!(
+                    "{} removing {}",
+                    output_path.display(),
+                    String::from_utf8(record.qname().to_vec()).unwrap()
+                );
+            }
+        }
+
+        output_paths.push(output_path);
+    }
+    Ok(output_paths)
+}
+
+

+ 0 - 0
src/bed_reader.rs → src/io/bed.rs


+ 0 - 0
src/dict_reader.rs → src/io/dict.rs


+ 49 - 0
src/io/fasta.rs

@@ -0,0 +1,49 @@
+use log::info;
+use noodles_fasta::{
+    record::{Definition, Sequence},
+    writer::Builder,
+    Record,
+};
+use std::fs::File;
+
+pub fn records_to_fasta(r: &Vec<bam::Record>, fa_path: &str) -> anyhow::Result<()> {
+    let file = File::create(fa_path).unwrap();
+    info!("Writing fasta to {fa_path}");
+    let mut writer = Builder::default().build_with_writer(file);
+    let mut names = Vec::new();
+    for bam_record in r {
+        let name = bam_record.name();
+        if !names.contains(&name) {
+            names.push(name);
+            let sequence = bam_record.sequence().to_vec();
+            let record = Record::new(Definition::new(name, None), Sequence::from(sequence));
+            writer.write_record(&record)?;
+        }
+    }
+    Ok(())
+}
+
+pub fn write_fasta(output_fa: &str, d: &Vec<(String, String)>) {
+    let file = File::create(output_fa).unwrap();
+    let mut writer = Builder::default().build_with_writer(file);
+    let mut passed = Vec::new();
+    for (name, sequence) in d {
+        let name = name.to_string();
+        if sequence.is_empty() {
+            continue;
+        }
+        if passed.contains(&name) {
+            continue;
+        }
+        passed.push(name.clone());
+        let record = Record::new(
+            Definition::new(name.as_str(), None),
+            Sequence::from(sequence.as_bytes().to_vec()),
+        );
+        writer.write_record(&record).unwrap();
+    }
+}
+
+pub fn fai(path: &str) -> Result<std::process::Output, std::io::Error> {
+    duct::cmd!("samtools", "faidx", path).run()
+}

+ 24 - 0
src/io/fastq.rs

@@ -0,0 +1,24 @@
+use std::{fs::File, io::BufWriter};
+
+use log::info;
+use seq_io::fastq::write_parts;
+
+pub fn records_to_fastq(r: &Vec<bam::Record>, fq_path: &str) -> anyhow::Result<()> {
+    let file = File::create(fq_path).unwrap();
+    let mut writer = BufWriter::new(file);
+
+    info!("Writing fastq to {fq_path}");
+
+    let mut names = Vec::new();
+    for bam_record in r {
+        let name = bam_record.name();
+        if !names.contains(&name) {
+            names.push(name);
+            let sequence = bam_record.sequence().to_vec();
+            let qualities = bam_record.qualities().to_readable();
+            write_parts(&mut writer, name, None, &sequence, &qualities)?;
+        }
+    }
+    Ok(())
+}
+

+ 5 - 0
src/io/mod.rs

@@ -0,0 +1,5 @@
+pub mod bam;
+pub mod bed;
+pub mod dict;
+pub mod fasta;
+pub mod fastq;

Plik diff jest za duży
+ 781 - 1031
src/lib.rs


Niektóre pliki nie zostały wyświetlone z powodu dużej ilości zmienionych plików