瀏覽代碼

flye ok + dir

Thomas 1 年之前
父節點
當前提交
5efdae5fe2
共有 4 個文件被更改,包括 269 次插入69 次删除
  1. 5 0
      Cargo.lock
  2. 4 1
      Cargo.toml
  3. 1 1
      src/flye.rs
  4. 259 67
      src/lib.rs

+ 5 - 0
Cargo.lock

@@ -742,6 +742,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab"
 dependencies = [
  "log",
+ "regex",
 ]
 
 [[package]]
@@ -766,6 +767,7 @@ dependencies = [
  "anstream",
  "anstyle",
  "env_filter",
+ "humantime",
  "log",
 ]
 
@@ -2003,13 +2005,16 @@ dependencies = [
  "anyhow",
  "bam",
  "duct",
+ "env_logger 0.11.5",
  "flate2",
+ "glob",
  "log",
  "nom",
  "noodles-fasta 0.41.0",
  "noodles-fastq",
  "pandora_lib_blastn",
  "petgraph",
+ "regex",
  "rust-htslib 0.46.0",
  "uuid",
 ]

+ 4 - 1
Cargo.toml

@@ -4,6 +4,8 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
+log = "^0.4.22"
+env_logger = "^0.11.5"
 anyhow = "1.0.86"
 duct = "0.13.7"
 noodles-fasta = "0.41.0"
@@ -14,6 +16,7 @@ pandora_lib_blastn = { git = "https://git.t0m4.fr/Thomas/pandora_lib_blastn.git"
 nom = "7.1.3"
 petgraph = "0.6.5"
 bam = "0.1.4"
-log = "0.4.22"
 flate2 = "1.0.30"
+glob = "0.3.1"
+regex = "1.10.5"
 

+ 1 - 1
src/flye.rs

@@ -4,7 +4,7 @@ use std::{
 };
 
 pub fn run_flye(fasta_path: &str, tmp_dir: &str, _size: &str) {
-    info!("Running Flye in {tmp_dir}");
+    info!("Running Flye for {fasta_path}");
     let flye_bin = "/data/tools/Flye/bin/flye";
     let mut cmd = Command::new(flye_bin)
         .arg("--out-dir")

+ 259 - 67
src/lib.rs

@@ -1,6 +1,14 @@
 pub mod flye;
 
-use bam::{header::HeaderEntry, RecordWriter};
+use anyhow::anyhow;
+use bam::{
+    header::HeaderEntry,
+    record::tags::{StringType, TagValue},
+    RecordWriter,
+};
+use flye::{read_flye_coverage, run_flye};
+use log::{info, warn};
+use nom::AsBytes;
 use pandora_lib_blastn::BlastResult;
 use petgraph::{
     algo::dijkstra,
@@ -10,6 +18,7 @@ use petgraph::{
     stable_graph::StableUnGraph,
     visit::{EdgeRef, IntoNeighbors, NodeIndexable},
 };
+use regex::Regex;
 use rust_htslib::bam::{Read, Reader, Record};
 use std::{
     collections::{HashMap, HashSet, VecDeque},
@@ -83,24 +92,39 @@ pub fn lastz_two_by_two(records: &[String], best: bool) -> anyhow::Result<Vec<Bl
 
 pub fn lastz_bam(query_path: &str, subject_path: &str) -> anyhow::Result<Vec<bam::Record>> {
     let lastz_bin = "lastz";
-    let pipe = format!("{lastz_bin} --format=sam {query_path} {subject_path} | samtools view -b");
+    let pipe =
+        format!("{lastz_bin} --format=softsam {query_path} {subject_path} | samtools view -b");
     let out = duct::cmd!("bash", "-c", pipe).reader()?;
     let reader = bam::BamReader::from_stream(out, 4)?;
     Ok(reader.into_iter().filter_map(anyhow::Result::ok).collect())
 }
 
-pub fn lastz_remap(bam_path: &str, ref_fa: &str, ref_len: u32, new_bam: &str) -> anyhow::Result<()> {
+pub fn lastz_remap(
+    bam_path: &str,
+    ref_fa: &str,
+    ref_len: u32,
+    new_bam: &str,
+    modkit_pileup: &str,
+) -> anyhow::Result<()> {
     let tmp = "/tmp";
     let tmp_dir = format!("{tmp}/{}", Uuid::new_v4());
     fs::create_dir(&tmp_dir)?;
 
     let mut records = Vec::new();
+    let mut mm_hm = HashMap::new();
+    let mut ml_hm = HashMap::new();
     let reader = bam::BamReader::from_path(bam_path, 4).unwrap();
     for record in reader {
         let record = record?;
         let query_name = String::from_utf8(record.name().to_vec())?;
+        if let Some(TagValue::String(value, StringType::String)) = record.tags().get(b"MM") {
+            mm_hm.insert(query_name.clone(), value.to_vec());
+        }
+        if let Some(TagValue::IntArray(value)) = record.tags().get(b"ML") {
+            ml_hm.insert(query_name.clone(), value.raw().to_vec());
+        }
+
         let query_path = format!("{}/{}.fa", &tmp_dir, query_name);
-        println!("writing fasta {}", &query_path);
         let query_seq = String::from_utf8(record.sequence().to_vec())?;
 
         write_fasta(&query_path, &vec![(query_name.clone(), query_seq)]);
@@ -113,27 +137,46 @@ pub fn lastz_remap(bam_path: &str, ref_fa: &str, ref_len: u32, new_bam: &str) ->
     }
     records.sort_by_cached_key(|b| b.start());
 
-    let contig_name = PathBuf::from(&ref_fa).file_stem().unwrap().to_string_lossy().to_string();
+    let contig_name = PathBuf::from(&ref_fa)
+        .file_stem()
+        .unwrap()
+        .to_string_lossy()
+        .to_string();
+    // Header format
     let mut header = bam::header::Header::new();
     let mut header_line = HeaderEntry::header_line("1.6".to_string());
     header_line.push(b"SO", "Coordinate".to_string());
     header.push_entry(header_line).unwrap();
-    header.push_entry(HeaderEntry::ref_sequence(contig_name, ref_len +10_000)).unwrap();
+    header
+        .push_entry(HeaderEntry::ref_sequence(contig_name, ref_len))
+        .unwrap();
 
     let mut bam_writer = bam::bam_writer::BamWriterBuilder::new();
     let mut w = bam_writer.from_path(new_bam, header)?;
     for record in records.iter() {
-        w.write(record)?;
+        let mut record = record.clone();
+        let record_name = String::from_utf8(record.name().to_vec())?;
+        let tags = record.tags_mut();
+        if let Some(mm) = mm_hm.get(&record_name) {
+            tags.push_string(b"MM", mm)
+        }
+        let tags = record.tags_mut();
+        if let Some(ml) = ml_hm.get(&record_name) {
+            let v = ml.as_bytes();
+            tags.push_array(b"ML", v);
+        }
+        w.write(&record)?;
     }
     w.finish()?;
     fs::remove_dir_all(tmp_dir)?;
 
     duct::cmd!("samtools", "index", "-@", "10", new_bam).run()?;
+    duct::cmd!("modkit", "pileup", new_bam, modkit_pileup).run()?;
     Ok(())
 }
 
-pub fn write_fasta(fasta_path: &str, d: &Vec<(String, String)>) {
-    let file = File::create(fasta_path).unwrap();
+pub fn write_fasta(contig_fa: &str, d: &Vec<(String, String)>) {
+    let file = File::create(contig_fa).unwrap();
     let mut writer = noodles_fasta::writer::Builder::default().build_with_writer(file);
     let mut passed = Vec::new();
     for (name, sequence) in d {
@@ -229,38 +272,6 @@ pub fn get_seq_len(records: &[Record], id: &str) -> usize {
     lens[0]
 }
 
-// pub fn lastz_two_by_two_bam(records: &[String]) -> anyhow::Result<Vec<BlastResult>> {
-//     let lastz_bin = "lastz";
-//     let mut all: Vec<bam::Record> = Vec::new();
-//     for i in 0..records.len() {
-//         for j in (i + 1)..records.len() {
-//             let pipe = format!(
-//                 "{lastz_bin} --format=sam {} {} | samtools view -b",
-//                 &records[i], &records[j]
-//             );
-//             println!("{pipe}");
-//             let child = duct::cmd!("bash", "-c", pipe).reader()?;
-//             println!("mmm");
-//             let reader = bam::BamReader::from_stream(child, 3).unwrap();
-//             reader.header();
-//             println!("lll");
-//             let mut records = Vec::new();
-//             for r in reader {
-//                 let record = r?;
-//                 records.push(record);
-//             }
-//             if !records.is_empty() {
-//                     records.sort_by_key(|r| r.template_len());
-//                     all.push(records.last().unwrap().clone())
-//             }
-//
-//             // let bam_reader = rust_htslib::bam::Reader::from_reader(buf_reader);
-//         }
-//     }
-//
-//     Ok(all)
-// }
-
 pub fn order_sequences_with_positions(blast_results: &[BlastResult]) -> Vec<(String, usize)> {
     let mut graph: HashMap<String, HashSet<String>> = HashMap::new();
     let mut in_degree: HashMap<String, usize> = HashMap::new();
@@ -405,6 +416,192 @@ pub fn fai(path: &str) -> Result<std::process::Output, std::io::Error> {
     duct::cmd!("samtools", "faidx", path).run()
 }
 
+#[derive(Debug, Default)]
+pub struct FlyeAsm {
+    pub input_id: String,
+    pub asm_dir: String,
+    pub input_bam: String,
+    pub input_records: Vec<bam::Record>,
+    pub on_contig_bam: String,
+    pub contigs: Option<Vec<String>>,
+    pub flye_asm_params: FlyeAsmParams,
+}
+
+#[derive(Debug)]
+pub struct FlyeAsmParams {
+    pub min_cov: u16,
+}
+
+impl Default for FlyeAsmParams {
+    fn default() -> Self {
+        Self { min_cov: 2 }
+    }
+}
+
+impl FlyeAsm {
+    pub fn new(asm_dir: &str, input_id: &str) -> anyhow::Result<Self> {
+        let input_bam = format!("{asm_dir}/{input_id}.bam");
+        let on_contig_bam = format!("{asm_dir}/{input_id}_contig.bam");
+
+        let reader = bam::BamReader::from_path(&input_bam, 3)?;
+        let mut input_records = Vec::new();
+        for rec in reader {
+            input_records.push(rec?);
+        }
+        Ok(FlyeAsm {
+            asm_dir: asm_dir.to_string(),
+            input_id: input_id.to_string(),
+            on_contig_bam,
+            contigs: None,
+            flye_asm_params: FlyeAsmParams::default(),
+            input_records,
+            input_bam,
+        })
+    }
+
+    pub fn assemble(mut self) -> anyhow::Result<Self> {
+        let min_cov = self.flye_asm_params.min_cov;
+
+        let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
+        info!("Creating tmp directory {tmp_dir}");
+        fs::create_dir(&tmp_dir).unwrap();
+        let input_fa = format!("{tmp_dir}/input.fa");
+
+        records_to_fasta(&self.input_records, &input_fa)?;
+
+        let flye_tmp_dir = format!("{tmp_dir}/flye");
+        fs::create_dir(&flye_tmp_dir).unwrap();
+
+        run_flye(&input_fa, &flye_tmp_dir, "10M");
+
+        let contigs_path = format!("{flye_tmp_dir}/assembly.fasta");
+
+        if Path::new(&contigs_path).exists() {
+            let assembly_path = format!("{flye_tmp_dir}/assembly.fasta");
+            let flye_cov_path = format!("{flye_tmp_dir}/40-polishing/base_coverage.bed.gz");
+
+            // Read the assembly fasta
+            let mut reader = File::open(assembly_path)
+                .map(BufReader::new)
+                .map(noodles_fasta::Reader::new)?;
+
+            let mut contigs = Vec::new();
+            for result in reader.records() {
+                let record = result?;
+                let contig_name = String::from_utf8(record.name().to_vec()).unwrap();
+                let (s, e) = read_flye_coverage(&flye_cov_path, min_cov.into(), &contig_name);
+                let seq = record.sequence().as_ref();
+                let seq = String::from_utf8(seq.to_vec()).unwrap();
+                let seq: String = seq[s..e].into();
+                contigs.push(seq.clone());
+            }
+            self.contigs = Some(contigs);
+        } else {
+            warn!("No reads assembled")
+        }
+
+        // Cleaning
+        fs::remove_dir_all(tmp_dir)?;
+
+        Ok(self)
+    }
+
+    pub fn save(self) -> anyhow::Result<()> {
+        if self.contigs.is_none() {
+            return Err(anyhow!("No reads were assembled"));
+        }
+
+        for (i, contig) in self.contigs.unwrap().iter().enumerate() {
+            let suffixe = if i == 0 {
+                "".to_string()
+            } else {
+                format!("_{i}")
+            };
+
+            let contig_id = format!("{}{suffixe}_flye", self.input_id);
+
+            let contig_fa = format!("{}/{contig_id}.fa", self.asm_dir);
+
+            info!("Saving contig {contig_id} in {contig_fa}");
+            write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())]);
+            fai(&contig_fa)?;
+
+            // Writing bed file from best blastn results
+            let bed_path = format!("{}/{contig_id}.bed", self.asm_dir);
+            let bed = pandora_lib_blastn::Blast::init(&contig_fa)?
+                .run()?
+                .keep_best()
+                .to_bed()?;
+            let mut f = File::create(bed_path)?;
+            f.write_all(bed.as_bytes())?;
+
+            // Remaping input bam to contig
+            info!("Mapping  input reads to {contig_id}");
+            let new_bam = format!("{}/{contig_id}.bam", self.asm_dir);
+            let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.asm_dir);
+            lastz_remap(
+                &self.input_bam,
+                &contig_fa,
+                contig.len() as u32,
+                &new_bam,
+                &modkit_pileup,
+            )?;
+        }
+        Ok(())
+    }
+}
+
+pub fn records_to_fasta(r: &Vec<bam::Record>, fa_path: &str) -> anyhow::Result<()> {
+    let file = File::create(fa_path).unwrap();
+    let mut writer = noodles_fasta::writer::Builder::default().build_with_writer(file);
+    let mut names = Vec::new();
+    for bam_record in r {
+        let name = bam_record.name();
+        if !names.contains(&name) {
+            names.push(name);
+            let sequence = bam_record.sequence().to_vec();
+            let record = noodles_fasta::Record::new(
+                noodles_fasta::record::Definition::new(name, None),
+                noodles_fasta::record::Sequence::from(sequence),
+            );
+            writer.write_record(&record)?;
+        }
+    }
+    Ok(())
+}
+
+pub fn dir_flye(dir: &str) -> anyhow::Result<()> {
+    let pattern = format!("{}/*.bam", dir);
+    let re = Regex::new(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.bam$")
+        .unwrap();
+
+    for entry in glob::glob(&pattern).expect("Failed to read glob pattern") {
+        match entry {
+            Ok(path) => {
+                if let Some(file_name) = path.file_name().and_then(|name| name.to_str()) {
+                    if re.is_match(file_name) {
+                        if let Some(input_id ) = path.file_stem().and_then(|n| n.to_str()) {
+                            if PathBuf::from(format!("{dir}/{input_id}_flye.bam")).exists() {
+                                warn!("{input_id} already assembled");
+                            } else {
+                                let f = FlyeAsm::new(dir, input_id)?;
+                                if let Ok(f) = f.assemble() {
+                                    match f.save() {
+                                        Ok(_) => info!("✅"),
+                                        Err(e) => warn!("❌ {e}"),
+                                    }
+                                }
+                            }
+                        }
+                        
+                    }
+                }
+            }
+            Err(e) => println!("{:?}", e),
+        }
+    }
+    Ok(())
+}
 
 #[cfg(test)]
 mod tests {
@@ -412,9 +609,12 @@ mod tests {
 
     use uuid::Uuid;
 
-    use self::flye::assemble_flye;
-
     use super::*;
+    fn init() {
+        let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
+            .is_test(true)
+            .try_init();
+    }
 
     #[test]
     fn it_works() -> anyhow::Result<()> {
@@ -452,34 +652,26 @@ mod tests {
 
     #[test]
     fn flye() -> anyhow::Result<()> {
+        init();
         let id = "BECERRA";
-        let locus_id = "6d51d94e-b951-48ef-9cef-63c10f235ebe";
+        let input_id = "6d51d94e-b951-48ef-9cef-63c10f235ebe";
+
+        // let id = "SALICETTO";
+        // let input_id = "cd8e4a8a-27ed-4045-af88-9838201f164f";
+
         let res_dir = "/data/longreads_basic_pipe";
         let asm_dir = format!("{res_dir}/{id}/diag/asm_contigs");
-        let path = format!("{asm_dir}/{locus_id}.fasta");
-        let bam_path = format!("{asm_dir}/{locus_id}.bam");
-        let new_bam = format!("{asm_dir}/{locus_id}_contig.bam");
-
-        let contigs = assemble_flye(&path)?.unwrap();
-        if contigs.len() == 1 {
-            let contig = contigs[0].to_string();
-            let fasta_path = format!("{asm_dir}/{locus_id}_flye.fa");
-            let bed_path = format!("{asm_dir}/{locus_id}_flye.bed");
-            write_fasta(&fasta_path, &vec![(format!("{}_flye", locus_id), contig.clone())]);
-            let bed = pandora_lib_blastn::Blast::init(&fasta_path)?
-                .run()?
-                .keep_best()
-                .to_bed()?;
-            let mut f = File::create(bed_path)?;
-            f.write_all(bed.as_bytes())?;
-            fai(&fasta_path)?;
-            lastz_remap(&bam_path, &fasta_path, contig.len() as u32, &new_bam)?;
 
-        } else {
-            for (i, contig) in contigs.iter().enumerate() {
-                println!("{contig}");
-            }
-        }
+        FlyeAsm::new(&asm_dir, input_id)?.assemble()?.save()?;
         Ok(())
     }
+
+    #[test]
+    fn flye_d() -> anyhow::Result<()> {
+        init();
+        let id = "SALICETTO";
+        let res_dir = "/data/longreads_basic_pipe";
+        let asm_dir = format!("{res_dir}/{id}/diag/asm_contigs");
+        dir_flye(&asm_dir)
+    }
 }