Thomas 1 жил өмнө
parent
commit
23edb94149
8 өөрчлөгдсөн 619 нэмэгдсэн , 102 устгасан
  1. 11 0
      Cargo.lock
  2. 1 0
      Cargo.toml
  3. 167 0
      src/assembler/canu.rs
  4. 72 2
      src/assembler/mod.rs
  5. 100 64
      src/assembler/spades.rs
  6. 131 0
      src/assembler/spoa.rs
  7. 57 19
      src/io/bam.rs
  8. 80 17
      src/lib.rs

+ 11 - 0
Cargo.lock

@@ -719,6 +719,7 @@ dependencies = [
  "rayon",
  "regex",
  "rust-htslib",
+ "rust-spoa",
  "seq_io",
  "serde",
  "thiserror",
@@ -881,6 +882,16 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "rust-spoa"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3e2b549b9af17763a1e6f040e8aa2e5c1cbe4a762390b5a1798fdc24971cf35"
+dependencies = [
+ "cc",
+ "cmake",
+]
+
 [[package]]
 name = "rustc-hash"
 version = "1.1.0"

+ 1 - 0
Cargo.toml

@@ -23,4 +23,5 @@ csv = "1.3.0"
 serde = { version = "1.0.209", features = ["derive"] }
 thiserror = "1.0.63"
 seq_io = "0.3.2"
+rust-spoa = "0.2.4"
 

+ 167 - 0
src/assembler/canu.rs

@@ -0,0 +1,167 @@
+use std::{
+    fs,
+    io::{BufRead, BufReader},
+    path::{Path, PathBuf},
+    process::{Command, Stdio},
+};
+
+use log::{info, warn};
+use seq_io::fasta::Record;
+
+use crate::{
+    assembler::AssembleError,
+    io::{bam::read_bam, fastq::records_to_fastq},
+};
+
+use super::{calculate_shannon_entropy, default_save, Assemble, AssembleConfig};
+
+/// Settings for a Canu assembly run.
+#[derive(Debug, Clone)]
+pub struct CanuConfig {
+    /// Directory intended for the final results; `assemble_whole` appends the
+    /// name of the BAM's parent directory to it.
+    pub output_dir: PathBuf,
+
+    // NOTE(review): `min_cov` and `min_reads` are not read anywhere in this file —
+    // presumably coverage / read-count thresholds; confirm against the other assemblers.
+    pub min_cov: u16,
+    pub min_reads: u32,
+    /// Entropy threshold; only referenced by the currently disabled contig
+    /// filter in `Canu::assemble`.
+    pub min_entropy: f64,
+    /// Thread budget. NOTE(review): `run_canu` does not forward it to the Canu
+    /// command line.
+    pub threads: u16,
+    /// Program spawned by `run_canu`.
+    pub canu_bin: String,
+}
+
+impl Default for CanuConfig {
+    /// Baseline settings: an empty output directory, 12 threads and the Canu
+    /// binary at its hard-coded install path.
+    fn default() -> Self {
+        let canu_bin = String::from("/data/tools/canu/build/bin/canu");
+        Self {
+            canu_bin,
+            threads: 12,
+            min_entropy: 0.2,
+            min_reads: 3,
+            min_cov: 2,
+            output_dir: PathBuf::new(),
+        }
+    }
+}
+
+impl CanuConfig {
+    /// Builds a configuration that targets `output_dir`; every other field
+    /// comes from `Default`.
+    pub fn new(output_dir: &str) -> Self {
+        Self {
+            output_dir: output_dir.into(),
+            ..Self::default()
+        }
+    }
+}
+
+/// State of one Canu assembly, created by `Assemble::init`.
+#[derive(Debug)]
+pub struct Canu {
+    /// Settings cloned from the `AssembleConfig::Canu` variant.
+    pub config: CanuConfig,
+    /// File stem of the input BAM.
+    pub input_id: String,
+    /// Per-run working directory (`/tmp/ass_<uuid>`).
+    pub tmp_dir: String,
+    /// FASTQ written from `input_records` inside `tmp_dir`.
+    pub input_fq: String,
+
+    /// Records read from the input BAM.
+    pub input_records: Vec<bam::Record>,
+    // Initialised to an empty string in `init` and not written afterwards in this file.
+    pub on_contig_bam: String,
+    /// Assembled contigs; stays `None` while contig collection is disabled.
+    pub contigs: Option<Vec<String>>,
+}
+
+impl Assemble for Canu {
+    /// Loads the reads of `input_bam` and stages them as FASTQ in a fresh
+    /// `/tmp/ass_<uuid>` working directory.
+    ///
+    /// # Errors
+    /// Fails when `config` is not the `Canu` variant, when no UTF-8 file stem
+    /// can be derived from `input_bam`, or when reading the BAM, creating the
+    /// directory or writing the FASTQ fails.
+    fn init(input_bam: &std::path::Path, config: &AssembleConfig) -> anyhow::Result<Self> {
+        match config {
+            super::AssembleConfig::Canu(config) => {
+                // The file stem doubles as the assembly id; an unusable path is
+                // reported as an error instead of a panic.
+                let input_id = input_bam
+                    .file_stem()
+                    .and_then(|stem| stem.to_str())
+                    .ok_or_else(|| {
+                        anyhow::anyhow!(
+                            "Cannot derive an input id from {}",
+                            input_bam.display()
+                        )
+                    })?
+                    .to_string();
+                let input_records = read_bam(input_bam)?;
+
+                let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
+                info!("Creating tmp directory {tmp_dir}");
+                fs::create_dir(&tmp_dir)?;
+
+                let input_fq = format!("{}/{}.fastq", tmp_dir, input_id);
+                if !Path::new(&input_fq).exists() {
+                    records_to_fastq(&input_records, &input_fq)?;
+                }
+
+                Ok(Self {
+                    config: config.clone(),
+                    input_id,
+                    input_records,
+                    on_contig_bam: String::new(),
+                    contigs: None,
+                    tmp_dir,
+                    input_fq,
+                })
+            }
+            _ => Err(anyhow::anyhow!("Wrong config format for Canu.")),
+        }
+    }
+
+    /// Runs Canu on the staged FASTQ inside `<tmp_dir>/canu`.
+    ///
+    /// # Errors
+    /// Fails when the Canu working directory cannot be created. A failing Canu
+    /// run is only logged by `run_canu`, it is not turned into an error here.
+    fn assemble(self) -> anyhow::Result<Self> {
+        let canu_tmp_dir = format!("{}/canu", self.tmp_dir);
+        // Propagate the failure rather than panicking inside a fallible function.
+        fs::create_dir(&canu_tmp_dir).map_err(|e| {
+            anyhow::Error::new(e)
+                .context(format!("Failed to create Canu working directory {canu_tmp_dir}"))
+        })?;
+
+        run_canu(&self.input_fq, &canu_tmp_dir, &self.config);
+
+        // TODO: contig collection is disabled, so `contigs` stays `None`.
+        // Re-enabling the code below requires taking `mut self` again.
+        //
+        // let contigs_path = format!("{tmp_dir}/canu.cns.fa");
+        // warn!("reading {}", contigs_path);
+        //
+        // if Path::new(&contigs_path).exists() {
+        //     let mut reader = seq_io::fasta::Reader::from_path(&contigs_path)?;
+        //
+        //     let mut contigs = Vec::new();
+        //     for result in reader.records() {
+        //         let record = result?;
+        //         let seq = record.seq();
+        //         let seq = String::from_utf8(seq.to_vec())?;
+        //         if calculate_shannon_entropy(&seq) >= self.config.min_entropy {
+        //             contigs.push(seq);
+        //         }
+        //     }
+        //     self.contigs = Some(contigs);
+        // } else {
+        //     anyhow::bail!(AssembleError::NoContig(self.input_id));
+        // }
+
+        Ok(self)
+    }
+
+    /// Currently a no-op: nothing is written to `config.output_dir` and the
+    /// working directory under `/tmp` is left in place.
+    fn save(self) -> anyhow::Result<()> {
+        Ok(())
+        // TODO: switch to the shared saver once `assemble` fills `contigs`.
+        // default_save(
+        //     "canu",
+        //     self.contigs,
+        //     self.input_id,
+        //     self.config.output_dir,
+        //     self.input_fq,
+        //     self.input_records,
+        //     self.tmp_dir,
+        // )
+    }
+}
+
+/// Runs Canu on `input_fq` with `tmp_dir` as its assembly directory (`-d`).
+///
+/// The file stem of `input_fq` is used as the assembly prefix (`-p`). Canu's
+/// stderr is forwarded line by line to the `info` log, and lines containing
+/// `ERROR` are additionally logged as warnings.
+///
+/// The function never panics: a missing file stem, a spawn failure, a wait
+/// failure or a non-zero exit status is reported through the `error` log.
+///
+/// NOTE(review): `config.threads` is not forwarded to Canu — confirm whether
+/// that is intended.
+pub fn run_canu(input_fq: &str, tmp_dir: &str, config: &CanuConfig) {
+    info!("Running Canu for {input_fq}");
+    let prefix = match Path::new(input_fq).file_stem() {
+        Some(stem) => stem.to_owned(),
+        None => {
+            log::error!("Cannot derive a Canu prefix from {input_fq}");
+            return;
+        }
+    };
+
+    let spawned = Command::new(&config.canu_bin)
+        .arg("-p")
+        .arg(prefix)
+        .arg("-d")
+        .arg(tmp_dir)
+        .arg("genomeSize=25k")
+        .arg("minInputCoverage=2")
+        .arg("stopOnLowCoverage=2")
+        .arg("-nanopore")
+        .arg(input_fq)
+        .stderr(Stdio::piped())
+        .spawn();
+    let mut cmd = match spawned {
+        Ok(child) => child,
+        Err(e) => {
+            log::error!("Canu failed to start ({}): {e}", config.canu_bin);
+            return;
+        }
+    };
+
+    // Drain stderr to the end before waiting, so the child can never block on
+    // a full pipe.
+    if let Some(stderr) = cmd.stderr.take() {
+        BufReader::new(stderr)
+            .lines()
+            .map_while(Result::ok)
+            .inspect(|o| info!("{o}"))
+            .filter(|line| line.contains("ERROR"))
+            .for_each(|line| warn!("[Canu] {line}"));
+    }
+
+    // `wait` already reaps the child; a subsequent `kill` is redundant, so it
+    // is dropped and the exit status is checked instead.
+    match cmd.wait() {
+        Ok(status) if status.success() => {}
+        Ok(status) => log::error!("Canu failed with exit code: {status}"),
+        Err(e) => log::error!("Failed to wait on Canu: {e}"),
+    }
+}

+ 72 - 2
src/assembler/mod.rs

@@ -1,14 +1,21 @@
 use std::{
-    collections::HashMap, fs, path::{Path, PathBuf}
+    collections::HashMap,
+    fs,
+    path::{Path, PathBuf},
 };
 
 use anyhow::{Context, Ok};
 use log::{info, warn};
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use thiserror::Error;
 
-use crate::io::{bam::{all_bam_paths, cp_mod_tags, create_bam_leave_two_out, remap_bam, run_modkit}, fasta::{fai, write_fasta}};
+use crate::io::{
+    bam::{all_bam_paths, cp_mod_tags, create_bam_leave_two_out, remap_bam, run_modkit},
+    fasta::{fai, write_fasta},
+};
 
 use self::{
+    canu::{Canu, CanuConfig},
     flye::{Flye, FlyeConfig},
     spades::{Spades, SpadesConfig},
     wtdbg2::{Wtdbg2, Wtdbg2Config},
@@ -16,6 +23,8 @@ use self::{
 
 pub mod flye;
 pub mod spades;
+// pub mod spoa;
+pub mod canu;
 pub mod wtdbg2;
 
 pub trait Assemble: Sized {
@@ -24,10 +33,13 @@ pub trait Assemble: Sized {
     fn save(self) -> anyhow::Result<()>;
 }
 
+/// Configuration of one assembler; the variant decides which assembler
+/// `assemble_records` runs for a BAM.
+#[derive(Debug, Clone)]
 pub enum AssembleConfig {
     Flye(FlyeConfig),
     Spades(SpadesConfig),
     Wtdbg2(Wtdbg2Config),
+    // Disabled together with the `spoa` module.
+    // Spoa(SpoaConfig),
+    Canu(CanuConfig),
 }
 
 pub fn assemble<A: Assemble>(
@@ -100,6 +112,14 @@ pub fn assemble_records(
                 "wtdbg2".to_string(),
                 assemble::<Wtdbg2>(bam_input, config, max_depth, 0),
             ),
+            // AssembleConfig::Spoa(_) => (
+            //     "spoa".to_string(),
+            //     assemble::<Spoa>(bam_input, config, max_depth, 0),
+            // ),
+            AssembleConfig::Canu(_) => (
+                "canu".to_string(),
+                assemble::<Canu>(bam_input, config, max_depth, 0),
+            ),
         };
 
         results.push((bam_input.to_owned(), r.0, r.1));
@@ -118,6 +138,56 @@ pub fn assemble_dir(
         .collect())
 }
 
+/// Assembles, in parallel, the selected BAMs found under `input_dir`.
+///
+/// Only BAMs whose file name contains exactly one `_` are kept. For each of
+/// them every config is cloned, limited to one thread (presumably because the
+/// BAMs themselves are processed concurrently by rayon) and its `output_dir` is
+/// extended with the name of the BAM's parent directory before
+/// `assemble_records` is called. A BAM whose parent directory name cannot be
+/// determined is skipped with a warning instead of panicking the whole batch.
+///
+/// # Errors
+/// Returns an error only when `all_bam_paths` fails; the outcome of each
+/// individual assembly is discarded.
+pub fn assemble_whole(
+    input_dir: &str,
+    configs: &[AssembleConfig],
+    max_depth: u8,
+) -> anyhow::Result<()> {
+    let paths = all_bam_paths(input_dir)?;
+    let bams: Vec<_> = paths
+        .iter()
+        .filter(|path| {
+            path.file_name()
+                .and_then(|name| name.to_str())
+                .map(|name| name.matches('_').count() == 1)
+                .unwrap_or(false)
+        })
+        .collect();
+
+    bams.par_iter().for_each(|bam_path| {
+        // The parent directory name becomes the per-BAM output sub-directory.
+        let contig = match bam_path
+            .parent()
+            .and_then(|p| p.file_name())
+            .and_then(|p| p.to_str())
+        {
+            Some(name) => name.to_string(),
+            None => {
+                warn!(
+                    "skipping {}: cannot determine its parent directory name",
+                    bam_path.display()
+                );
+                return;
+            }
+        };
+        let mut configs = configs.to_owned();
+        configs.iter_mut().for_each(|c| match c {
+            AssembleConfig::Flye(c) => {
+                c.threads = 1;
+                c.output_dir.push(&contig);
+            }
+            AssembleConfig::Spades(c) => {
+                c.threads = 1;
+                c.output_dir.push(&contig);
+            }
+            AssembleConfig::Wtdbg2(c) => {
+                c.threads = 1;
+                c.output_dir.push(&contig);
+            }
+            AssembleConfig::Canu(c) => {
+                c.threads = 1;
+                c.output_dir.push(&contig);
+            }
+        });
+
+        info!("trying to assemble {}", bam_path.display());
+        // Best effort: the result of a single BAM is deliberately ignored.
+        let _ = assemble_records(bam_path, &configs, max_depth);
+    });
+
+    Ok(())
+}
+
 #[derive(Error, Debug)]
 pub enum AssembleError {
     #[error("No contig assembled: {0}")]

+ 100 - 64
src/assembler/spades.rs

@@ -5,19 +5,15 @@ use std::{
     process::{Command, Stdio},
 };
 
-use log::{info, warn};
+use log::{error, info, warn};
 use seq_io::fasta::Record;
 
 use crate::{
-    assembler::{remove_bwa_indices, AssembleError},
-    io::{
-        bam::{cp_mod_tags, read_bam, run_modkit},
-        fasta::{fai, write_fasta},
-        fastq::records_to_fastq,
-    },
+    assembler::AssembleError,
+    io::{bam::read_bam, fastq::records_to_fastq},
 };
 
-use super::{calculate_shannon_entropy, Assemble, AssembleConfig};
+use super::{calculate_shannon_entropy, default_save, Assemble, AssembleConfig};
 
 #[derive(Debug, Clone)]
 pub struct SpadesConfig {
@@ -126,52 +122,62 @@ impl Assemble for Spades {
     }
 
     fn save(self) -> anyhow::Result<()> {
-        if self.contigs.is_none() {
-            anyhow::bail!(AssembleError::NoContig(self.input_id));
-        }
-
-        for (i, contig) in self.contigs.unwrap().iter().enumerate() {
-            let suffixe = if i == 0 {
-                "".to_string()
-            } else {
-                format!("_{i}")
-            };
-
-            if !self.config.output_dir.exists() {
-                fs::create_dir_all(&self.config.output_dir)?;
-            }
-
-            let contig_id = format!("{}{suffixe}_spades", self.input_id);
-            let contig_fa = format!("{}/{contig_id}.fa", self.config.output_dir.display());
-
-            info!("Saving contig {contig_id} in {contig_fa}");
-            write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())])?;
-            fai(&contig_fa)?;
-
-            // Remaping input bam to contig
-            info!("Mapping  input reads to {contig_id}");
-            let new_bam = format!("{}/{contig_id}.bam", self.config.output_dir.display());
-            duct::cmd!("bwa", "index", contig_fa.clone()).run()?;
-            let bwa = format!("bwa mem {contig_fa} {}", self.input_fq);
-            let samtools = "samtools sort /dev/stdin";
-            let pipe = format!("{bwa} | {samtools} > {new_bam}");
-            duct::cmd!("bash", "-c", pipe).run()?;
-
-            // clean bwa indices
-            remove_bwa_indices(&contig_fa)?;
-
-            // Copy modified base tags to new bam
-            cp_mod_tags(&self.input_records, &new_bam)?;
-
-            // Run modkit
-            let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.config.output_dir.display());
-            run_modkit(&new_bam, &contig_fa, &modkit_pileup)?;
-        }
-
-        // Cleaning
-        fs::remove_dir_all(self.tmp_dir)?;
-
-        Ok(())
+        default_save(
+            "spades",
+            self.contigs,
+            self.input_id,
+            self.config.output_dir,
+            self.input_fq,
+            self.input_records,
+            self.tmp_dir,
+        )
+
+        //     if self.contigs.is_none() {
+        //         anyhow::bail!(AssembleError::NoContig(self.input_id));
+        //     }
+        //
+        //     for (i, contig) in self.contigs.unwrap().iter().enumerate() {
+        //         let suffixe = if i == 0 {
+        //             "".to_string()
+        //         } else {
+        //             format!("_{i}")
+        //         };
+        //
+        //         if !self.config.output_dir.exists() {
+        //             fs::create_dir_all(&self.config.output_dir)?;
+        //         }
+        //
+        //         let contig_id = format!("{}{suffixe}_spades", self.input_id);
+        //         let contig_fa = format!("{}/{contig_id}.fa", self.config.output_dir.display());
+        //
+        //         info!("Saving contig {contig_id} in {contig_fa}");
+        //         write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())])?;
+        //         fai(&contig_fa)?;
+        //
+        //         // Remaping input bam to contig
+        //         info!("Mapping  input reads to {contig_id}");
+        //         let new_bam = format!("{}/{contig_id}.bam", self.config.output_dir.display());
+        //         duct::cmd!("bwa", "index", contig_fa.clone()).run()?;
+        //         let bwa = format!("bwa mem {contig_fa} {}", self.input_fq);
+        //         let samtools = "samtools sort /dev/stdin";
+        //         let pipe = format!("{bwa} | {samtools} > {new_bam}");
+        //         duct::cmd!("bash", "-c", pipe).run()?;
+        //
+        //         // clean bwa indices
+        //         remove_bwa_indices(&contig_fa)?;
+        //
+        //         // Copy modified base tags to new bam
+        //         cp_mod_tags(&self.input_records, &new_bam)?;
+        //
+        //         // Run modkit
+        //         let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.config.output_dir.display());
+        //         run_modkit(&new_bam, &contig_fa, &modkit_pileup)?;
+        //     }
+        //
+        //     // Cleaning
+        //     fs::remove_dir_all(self.tmp_dir)?;
+        //
+        //     Ok(())
     }
 }
 
@@ -190,19 +196,49 @@ pub fn run_spades(input_fq: &str, tmp_dir: &str, config: &SpadesConfig) {
         .arg(input_fq)
         .arg("-o")
         .arg(tmp_dir)
+        .stdout(Stdio::piped())
         .stderr(Stdio::piped())
         .spawn()
         .expect("Spades failed to start");
 
+    let stdout = cmd.stdout.take().unwrap();
     let stderr = cmd.stderr.take().unwrap();
-    let reader = BufReader::new(stderr);
-    reader
-        .lines()
-        .map_while(Result::ok)
-        .inspect(|o| info!("{o}"))
-        .filter(|line| line.contains("ERROR"))
-        .for_each(|line| warn!("[Spades] {line}"));
-
-    cmd.wait().unwrap();
-    cmd.kill().unwrap();
+
+    let stdout_reader = BufReader::new(stdout);
+    let stderr_reader = BufReader::new(stderr);
+
+    // Read stdout
+    std::thread::spawn(move || {
+        let _ = stdout_reader
+            .lines()
+            .map_while(Result::ok)
+            /* .for_each(|line| info!("[Spades stdout] {line}")) */;
+    });
+
+    // Read stderr
+    std::thread::spawn(move || {
+        stderr_reader
+            .lines()
+            .map_while(Result::ok)
+            .filter(|line| line.contains("ERROR"))
+            .for_each(|line| warn!("[Spades stderr] {line}"));
+    });
+
+    let status = cmd.wait().expect("Failed to wait on Spades");
+
+    if !status.success() {
+        error!("Spades failed with exit code: {}", status);
+    }
+
+    // let stderr = cmd.stderr.take().unwrap();
+    // let reader = BufReader::new(stderr);
+    // reader
+    //     .lines()
+    //     .map_while(Result::ok)
+    //     // .inspect(|o| info!("{o}"))
+    //     .filter(|line| line.contains("ERROR"));
+    //     .for_each(|line| warn!("[Spades] {line}"));
+    //
+    // cmd.wait().unwrap();
+    // cmd.kill().unwrap();
 }

+ 131 - 0
src/assembler/spoa.rs

@@ -0,0 +1,131 @@
+// https://github.com/pjedge/rust-spoa
+
+// use poa_consensus;
+
+use std::{fs, path::{Path, PathBuf}};
+
+use log::info;
+
+use crate::io::{bam::read_bam, fastq::records_to_fastq};
+
+use super::{default_save, Assemble, AssembleConfig};
+
+/// Parameters forwarded to `rust_spoa::poa_consensus`, plus the output location.
+#[derive(Debug, Clone)]
+pub struct SpoaConfig {
+    /// Directory handed to `default_save` for the results.
+    pub output_dir: PathBuf,
+    // The six fields below are passed, in this order, to
+    // `rust_spoa::poa_consensus`; see the rust-spoa documentation for their
+    // exact meaning.
+    pub consensus_max_length: usize,
+    // NOTE(review): the default of 1 presumably selects global alignment — confirm.
+    pub alignment_type: i32,
+    pub match_score: i32,
+    pub mismatch_score: i32,
+    pub gap_open: i32,
+    pub gap_extend: i32,
+}
+impl Default for SpoaConfig {
+    /// Baseline parameters: empty output directory, consensus capped at
+    /// 100 000 bases, alignment type 1 and a 5 / -4 / -3 / -1 scoring scheme
+    /// (match / mismatch / gap open / gap extend).
+    fn default() -> Self {
+        Self {
+            output_dir: PathBuf::new(),
+            consensus_max_length: 100_000,
+            alignment_type: 1,
+            match_score: 5,
+            mismatch_score: -4,
+            gap_open: -3,
+            gap_extend: -1,
+        }
+    }
+}
+
+impl SpoaConfig {
+    /// Builds a configuration that targets `output_dir`; every other field
+    /// comes from `Default`.
+    pub fn new(output_dir: &str) -> Self {
+        Self {
+            output_dir: output_dir.into(),
+            ..Self::default()
+        }
+    }
+}
+
+/// State of one SPOA consensus run, created by `Assemble::init`.
+#[derive(Debug)]
+pub struct Spoa {
+    /// Settings cloned from the `AssembleConfig::Spoa` variant.
+    pub config: SpoaConfig,
+    /// File stem of the input BAM.
+    pub input_id: String,
+    /// Per-run working directory (`/tmp/ass_<uuid>`).
+    pub tmp_dir: String,
+    /// FASTQ written from `input_records` inside `tmp_dir`.
+    pub input_fq: String,
+
+    /// Records read from the input BAM.
+    pub input_records: Vec<bam::Record>,
+    /// Consensus produced by `assemble`; `None` until then.
+    pub contigs: Option<Vec<String>>,
+}
+
+impl Assemble for Spoa {
+    /// Loads the reads of `input_bam` and stages them as FASTQ in a fresh
+    /// `/tmp/ass_<uuid>` working directory.
+    ///
+    /// # Errors
+    /// Fails when `config` is not the `Spoa` variant, when no UTF-8 file stem
+    /// can be derived from `input_bam`, or when reading the BAM, creating the
+    /// directory or writing the FASTQ fails.
+    fn init(input_bam: &std::path::Path, config: &AssembleConfig) -> anyhow::Result<Self> {
+        match config {
+            AssembleConfig::Spoa(config) => {
+                // The file stem doubles as the assembly id; an unusable path is
+                // reported as an error instead of a panic.
+                let input_id = input_bam
+                    .file_stem()
+                    .and_then(|stem| stem.to_str())
+                    .ok_or_else(|| {
+                        anyhow::anyhow!(
+                            "Cannot derive an input id from {}",
+                            input_bam.display()
+                        )
+                    })?
+                    .to_string();
+                let input_records = read_bam(input_bam)?;
+
+                let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
+                info!("Creating tmp directory {tmp_dir}");
+                fs::create_dir(&tmp_dir)?;
+
+                let input_fq = format!("{}/{}.fastq", tmp_dir, input_id);
+                if !Path::new(&input_fq).exists() {
+                    records_to_fastq(&input_records, &input_fq)?;
+                }
+
+                Ok(Self {
+                    config: config.clone(),
+                    input_id,
+                    input_records,
+                    contigs: None,
+                    tmp_dir,
+                    input_fq,
+                })
+            }
+            _ => Err(anyhow::anyhow!("Wrong config format for Spoa.")),
+        }
+    }
+
+    /// Computes one consensus sequence from all input reads and stores it as
+    /// the single contig.
+    ///
+    /// # Errors
+    /// Fails when the consensus returned by `poa_consensus` is not valid UTF-8.
+    fn assemble(mut self) -> anyhow::Result<Self> {
+        // Each read gets a trailing 0 byte — presumably rust-spoa expects
+        // NUL-terminated sequences (TODO confirm against its documentation).
+        let sequences: Vec<Vec<u8>> = self
+            .input_records
+            .iter()
+            .map(|r| {
+                let mut seq = r.sequence().to_vec();
+                seq.push(0);
+                seq
+            })
+            .collect();
+
+        let consensus = String::from_utf8(poa_consensus(&sequences, &self.config))?;
+        self.contigs = Some(vec![consensus]);
+
+        Ok(self)
+    }
+
+    /// Hands the consensus, the input reads and the working directory to the
+    /// shared `default_save`, tagged with the assembler name `"spoa"`.
+    fn save(self) -> anyhow::Result<()> {
+        default_save(
+            "spoa",
+            self.contigs,
+            self.input_id,
+            self.config.output_dir,
+            self.input_fq,
+            self.input_records,
+            self.tmp_dir,
+        )
+    }
+}
+
+/// Thin wrapper around `rust_spoa::poa_consensus` that takes its numeric
+/// parameters from `config`.
+///
+/// `seq` stays a `&Vec<Vec<u8>>` because it is handed unchanged to
+/// `rust_spoa::poa_consensus`.
+pub fn poa_consensus(seq: &Vec<Vec<u8>>, config: &SpoaConfig) -> Vec<u8> {
+    let SpoaConfig {
+        consensus_max_length,
+        alignment_type,
+        match_score,
+        mismatch_score,
+        gap_open,
+        gap_extend,
+        ..
+    } = *config;
+
+    rust_spoa::poa_consensus(
+        seq,
+        consensus_max_length,
+        alignment_type,
+        match_score,
+        mismatch_score,
+        gap_open,
+        gap_extend,
+    )
+}

+ 57 - 19
src/io/bam.rs

@@ -34,6 +34,10 @@ pub fn read_bam(input_bam: &Path) -> anyhow::Result<Vec<Record>> {
 }
 
 pub fn cp_mod_tags(input_records: &Vec<Record>, output_path: &str) -> anyhow::Result<()> {
+    // omit tags representing alignment values
+    let omit_tags = [
+        b"NM", b"ms", b"AS", b"nn", b"de", b"tp", b"cm", b"s1", b"s2", b"MD", b"rl",
+    ];
     info!("Transferring SAM tags from records to {output_path}");
     // Read tags from input tags
     let mut mm_hm = HashMap::new();
@@ -42,8 +46,7 @@ pub fn cp_mod_tags(input_records: &Vec<Record>, output_path: &str) -> anyhow::Re
     for record in input_records {
         // let record = record?;
         let query_name = String::from_utf8(record.name().to_vec())?;
-        let u = record.tags();
-        tags_hm.insert(query_name.clone(), u.clone());
+        tags_hm.insert(query_name.clone(), record.tags().clone());
         if let Some(TagValue::String(value, StringType::String)) = record.tags().get(b"MM") {
             mm_hm.insert(query_name.clone(), value.to_vec());
         }
@@ -70,16 +73,36 @@ pub fn cp_mod_tags(input_records: &Vec<Record>, output_path: &str) -> anyhow::Re
         let mut record = record.clone();
         let record_name = String::from_utf8(record.name().to_vec())?;
         let tags = record.tags_mut();
-        if let Some(all) = tags_hm.get(&record_name) {
-            *tags = all.clone();
+
+        if let Some(from) = tags_hm.get(&record_name) {
+            from.iter().filter(|(name, _)| !omit_tags.contains(&name)).for_each(|(name, tag)| {
+                match tag {
+                    TagValue::Char(v) => tags.push_char(&name, v),
+                    TagValue::Int(v, iv) => match iv {
+                        bam::record::tags::IntegerType::I8 => tags.push_num(&name, v as i8),
+                        bam::record::tags::IntegerType::U8 => tags.push_num(&name, v as u8),
+                        bam::record::tags::IntegerType::I16 => tags.push_num(&name, v as i16),
+                        bam::record::tags::IntegerType::U16 => tags.push_num(&name, v as u16),
+                        bam::record::tags::IntegerType::I32 => tags.push_num(&name, v as i32),
+                        bam::record::tags::IntegerType::U32 => tags.push_num(&name, v as u32),
+                    },
+                    TagValue::Float(v) => tags.push_num(&name, v),
+                    TagValue::String(v, iv) => match iv {
+                        StringType::String => tags.push_string(&name, v),
+                        StringType::Hex => tags.push_hex(&name, v),
+                    },
+                    TagValue::IntArray(arr) => tags.push_array(&name, arr.raw()),
+                    TagValue::FloatArray(arr) => tags.push_array(&name, arr.raw()),
+                }
+            })
         }
         // if let Some(mm) = mm_hm.get(&record_name) {
         //     tags.push_string(b"MM", mm)
         // }
         // let tags = record.tags_mut();
         // if let Some(ml) = ml_hm.get(&record_name) {
-        //     let v = ml.as_bytes();
-        //     tags.push_array(b"ML", v);
+        //     // let v = ml.bytes();
+        //     tags.push_array(b"ML", ml);
         // }
         w.write(&record)?;
     }
@@ -146,7 +169,7 @@ pub fn create_bam_leave_two_out(input_path: &str) -> anyhow::Result<Vec<PathBuf>
                 writer.write(record)?;
             } else {
                 warn!(
-                    "{} removing {}",
+                    "{} removing qname {}",
                     output_path.display(),
                     String::from_utf8(record.qname().to_vec()).unwrap()
                 );
@@ -168,14 +191,17 @@ pub fn create_bam_leave_two_out(input_path: &str) -> anyhow::Result<Vec<PathBuf>
 // }
 pub fn remap_bam(reference: &str, input_seq: &str, output_bam: &str) -> anyhow::Result<()> {
     info!("Remaping {input_seq} to {reference} into {output_bam}");
-     // Index the reference
+    // Index the reference
     let index_output = Command::new("bwa")
         .args(["index", reference])
         .output()
         .context("Failed to run bwa index")?;
 
     if !index_output.status.success() {
-        anyhow::bail!("bwa index failed: {}", String::from_utf8_lossy(&index_output.stderr));
+        anyhow::bail!(
+            "bwa index failed: {}",
+            String::from_utf8_lossy(&index_output.stderr)
+        );
     }
 
     // Prepare the bwa mem command
@@ -184,7 +210,8 @@ pub fn remap_bam(reference: &str, input_seq: &str, output_bam: &str) -> anyhow::
     bwa_command.stdout(Stdio::piped());
     bwa_command.stderr(Stdio::piped());
 
-    let mut bwa_process = bwa_command.spawn()
+    let mut bwa_process = bwa_command
+        .spawn()
         .context("Failed to spawn bwa mem command")?;
 
     // Prepare the samtools sort command
@@ -192,23 +219,28 @@ pub fn remap_bam(reference: &str, input_seq: &str, output_bam: &str) -> anyhow::
     samtools_command.args(["sort", "/dev/stdin"]);
     samtools_command.stdin(Stdio::piped());
     samtools_command.stdout(Stdio::from(
-        std::fs::File::create(output_bam).context("Failed to create output BAM file")?
+        std::fs::File::create(output_bam).context("Failed to create output BAM file")?,
     ));
     samtools_command.stderr(Stdio::piped());
 
-    let mut samtools_process = samtools_command.spawn()
+    let mut samtools_process = samtools_command
+        .spawn()
         .context("Failed to spawn samtools command")?;
 
     // Connect bwa stdout to samtools stdin
-    if let (Some(mut bwa_stdout), Some(mut samtools_stdin)) = (bwa_process.stdout.take(), samtools_process.stdin.take()) {
-        std::thread::spawn(move || {
-            std::io::copy(&mut bwa_stdout, &mut samtools_stdin)
-        });
+    if let (Some(mut bwa_stdout), Some(mut samtools_stdin)) =
+        (bwa_process.stdout.take(), samtools_process.stdin.take())
+    {
+        std::thread::spawn(move || std::io::copy(&mut bwa_stdout, &mut samtools_stdin));
     }
 
     // Capture stderr from bwa
     let mut bwa_stderr = String::new();
-    bwa_process.stderr.take().unwrap().read_to_string(&mut bwa_stderr)?;
+    bwa_process
+        .stderr
+        .take()
+        .unwrap()
+        .read_to_string(&mut bwa_stderr)?;
 
     // Wait for bwa to finish
     let bwa_status = bwa_process.wait().context("Failed to wait for bwa mem")?;
@@ -218,10 +250,16 @@ pub fn remap_bam(reference: &str, input_seq: &str, output_bam: &str) -> anyhow::
 
     // Capture stderr from samtools
     let mut samtools_stderr = String::new();
-    samtools_process.stderr.take().unwrap().read_to_string(&mut samtools_stderr)?;
+    samtools_process
+        .stderr
+        .take()
+        .unwrap()
+        .read_to_string(&mut samtools_stderr)?;
 
     // Wait for samtools to finish
-    let samtools_status = samtools_process.wait().context("Failed to wait for samtools")?;
+    let samtools_status = samtools_process
+        .wait()
+        .context("Failed to wait for samtools")?;
     if !samtools_status.success() {
         anyhow::bail!("samtools sort failed: {}", samtools_stderr);
     }

+ 80 - 17
src/lib.rs

@@ -5,7 +5,9 @@ pub mod io;
 mod tests {
     use std::path::PathBuf;
 
-    use crate::assembler::{spades::SpadesConfig, wtdbg2::Wtdbg2Config};
+    use crate::assembler::{
+        assemble_whole, canu::CanuConfig, spades::SpadesConfig, wtdbg2::Wtdbg2Config,
+    };
 
     use self::assembler::{assemble_records, flye::FlyeConfig, AssembleConfig};
 
@@ -29,7 +31,7 @@ mod tests {
         let res = assemble_records(
             &PathBuf::from(bam),
             &vec![AssembleConfig::Flye(FlyeConfig::new("/tmp/test_ass_flye"))],
-            1
+            1,
         );
         println!("{res:#?}");
     }
@@ -48,7 +50,7 @@ mod tests {
         let res = assemble_records(
             &PathBuf::from(bam),
             &vec![AssembleConfig::Flye(FlyeConfig::new("/tmp/test_ass_flye"))],
-            1
+            1,
         );
         println!("{res:#?}");
     }
@@ -56,13 +58,13 @@ mod tests {
     #[test]
     fn spades_bam() {
         init();
-        // let id = "ROBIN";
-        // let input_id = "143063957-143063957_8a24";
-        // let chr = "chr10";
+        let id = "ROBIN";
+        let input_id = "143063957-143063957_8a24";
+        let chr = "chr9";
 
-        let id = "LEVASSEUR";
-        let input_id = "12677262-102018167_2c45";
-        let chr = "chr10";
+        // let id = "LEVASSEUR";
+        // let input_id = "12677262-102018167_2c45";
+        // let chr = "chr10";
 
         let res_dir = "/data/longreads_basic_pipe";
         let asm_dir = format!("{res_dir}/{id}/diag/scan/reads/{chr}");
@@ -70,10 +72,28 @@ mod tests {
 
         let res = assemble_records(
             &PathBuf::from(bam),
-            &vec![AssembleConfig::Spades(SpadesConfig::new(
-                "/data/spades",
-            ))],
-            1
+            &vec![AssembleConfig::Spades(SpadesConfig::new("/data/spades"))],
+            1,
+        );
+        println!("{res:#?}");
+    }
+
+    /// Runs Spades and Wtdbg2 through `assemble_whole` over the reads
+    /// directory of sample `ROBIN`.
+    ///
+    /// Depends on data under the hard-coded `/data/longreads_basic_pipe` tree;
+    /// it only prints the result and asserts nothing.
+    #[test]
+    fn spades_whole() {
+        init();
+        let id = "ROBIN";
+
+        let res_dir = "/data/longreads_basic_pipe";
+        let asm_dir = format!("{res_dir}/{id}/diag/scan/reads");
+        let output_dir = format!("{res_dir}/{id}/diag/assemblies");
+
+        let res = assemble_whole(
+            &asm_dir,
+            &[
+                AssembleConfig::Spades(SpadesConfig::new(&output_dir)),
+                AssembleConfig::Wtdbg2(Wtdbg2Config::new(&output_dir)),
+            ],
+            1,
         );
         println!("{res:#?}");
     }
@@ -95,10 +115,53 @@ mod tests {
 
         let res = assemble_records(
             &PathBuf::from(bam),
-            &vec![AssembleConfig::Wtdbg2(Wtdbg2Config::new(
-                "/data/wtdbg2",
-            ))],
-            1
+            &vec![AssembleConfig::Wtdbg2(Wtdbg2Config::new("/data/wtdbg2"))],
+            1,
+        );
+        println!("{res:#?}");
+    }
+
+    // #[test]
+    // fn spoa_bam() {
+    //     init();
+    //     // let id = "LEVASSEUR";
+    //     // let input_id = "12677262-102018167_2c45";
+    //     // let chr = "chr10";
+    //
+    //     let id = "ROBIN";
+    //     let input_id = "143063957-143063957_8a24";
+    //     let chr = "chr9";
+    //
+    //     let res_dir = "/data/longreads_basic_pipe";
+    //     let asm_dir = format!("{res_dir}/{id}/diag/scan/reads/{chr}");
+    //     let bam = format!("{asm_dir}/{input_id}.bam");
+    //
+    //     let res = assemble_records(
+    //         &PathBuf::from(bam),
+    //         &vec![AssembleConfig::Spoa(SpoaConfig::new("/data/spoa"))],
+    //         1,
+    //     );
+    //     println!("{res:#?}");
+    // }
+    /// Runs the Canu assembler through `assemble_records` on a single BAM of
+    /// sample `ROBIN` (chr9).
+    ///
+    /// Depends on data under the hard-coded `/data/longreads_basic_pipe` tree;
+    /// it only prints the result and asserts nothing.
+    #[test]
+    fn canu_bam() {
+        init();
+        // Alternative input kept for manual switching.
+        // let id = "LEVASSEUR";
+        // let input_id = "12677262-102018167_2c45";
+        // let chr = "chr10";
+
+        let id = "ROBIN";
+        let input_id = "143063957-143063957_8a24";
+        let chr = "chr9";
+
+        let res_dir = "/data/longreads_basic_pipe";
+        let asm_dir = format!("{res_dir}/{id}/diag/scan/reads/{chr}");
+        let bam = format!("{asm_dir}/{input_id}.bam");
+
+        let res = assemble_records(
+            &PathBuf::from(bam),
+            &vec![AssembleConfig::Canu(CanuConfig::new("/data/canu"))],
+            1,
         );
         println!("{res:#?}");
     }