pub mod flye; use anyhow::anyhow; use bam::{ header::HeaderEntry, record::tags::{StringType, TagValue}, RecordWriter, }; use flye::{read_flye_coverage, run_flye}; use log::{info, warn}; use nom::AsBytes; use pandora_lib_blastn::BlastResult; use petgraph::{ algo::dijkstra, data::{Build, DataMap}, dot::Dot, graph::NodeIndex, stable_graph::StableUnGraph, visit::{EdgeRef, IntoNeighbors, NodeIndexable}, }; use regex::Regex; use rust_htslib::bam::{Read, Reader, Record}; use std::{ collections::{HashMap, HashSet, VecDeque}, fs::{self, File}, io::{BufReader, Cursor, Write}, path::{Path, PathBuf}, }; use uuid::Uuid; pub fn read_bam(path: &str) -> anyhow::Result> { let mut res = Vec::new(); let mut bam = Reader::from_path(Path::new(path))?; // Iterate over all records for result in bam.records() { let record = result?; res.push(record); } Ok(res) } fn write_fastqs(dir_path: &str, records: &Vec) -> anyhow::Result> { let mut paths = Vec::new(); for record in records { let qname = String::from_utf8(record.qname().to_vec())?; let qseq = record.seq().as_bytes(); let qqual = String::from_utf8(record.qual().to_vec())?; assert_eq!(qseq.len(), qqual.len()); let fastq_record = noodles_fasta::Record::new( noodles_fasta::record::Definition::new(qname.clone(), None), qseq.into(), ); let mut writer = noodles_fasta::io::Writer::new(Vec::new()); writer.write_record(&fastq_record)?; let file_path = format!("{dir_path}/{qname}.fasta"); let mut file = File::create(&file_path)?; file.write_all(writer.get_ref())?; paths.push(file_path); } Ok(paths) } pub fn lastz_two_by_two(records: &[String], best: bool) -> anyhow::Result> { let lastz_bin = "lastz"; let mut all: Vec = Vec::new(); for i in 0..records.len() { for j in (i + 1)..records.len() { let record_a = &records[i]; let record_b = &records[j]; let out = duct::cmd!(lastz_bin, "--format=blastn", record_a, record_b).read()?; let mut results: Vec = out .lines() .filter(|s| !s.starts_with('#')) .map(|s| s.to_string()) .filter_map(|s| BlastResult::try_from(s).ok()) .collect(); if !results.is_empty() { if best { results.sort_by_key(|r| r.alignment_length); all.push(results.last().unwrap().clone()) } else { all.extend(results.into_iter()); } } } } Ok(all) } pub fn lastz_bam(query_path: &str, subject_path: &str) -> anyhow::Result> { let lastz_bin = "lastz"; let pipe = format!("{lastz_bin} --format=softsam {query_path} {subject_path} | samtools view -b"); let out = duct::cmd!("bash", "-c", pipe).reader()?; let reader = bam::BamReader::from_stream(out, 4)?; Ok(reader.into_iter().filter_map(anyhow::Result::ok).collect()) } pub fn lastz_remap( bam_path: &str, ref_fa: &str, ref_len: u32, new_bam: &str, modkit_pileup: &str, ) -> anyhow::Result<()> { let tmp = "/tmp"; let tmp_dir = format!("{tmp}/{}", Uuid::new_v4()); fs::create_dir(&tmp_dir)?; let mut records = Vec::new(); let mut mm_hm = HashMap::new(); let mut ml_hm = HashMap::new(); let reader = bam::BamReader::from_path(bam_path, 4).unwrap(); for record in reader { let record = record?; let query_name = String::from_utf8(record.name().to_vec())?; if let Some(TagValue::String(value, StringType::String)) = record.tags().get(b"MM") { mm_hm.insert(query_name.clone(), value.to_vec()); } if let Some(TagValue::IntArray(value)) = record.tags().get(b"ML") { ml_hm.insert(query_name.clone(), value.raw().to_vec()); } let query_path = format!("{}/{}.fa", &tmp_dir, query_name); let query_seq = String::from_utf8(record.sequence().to_vec())?; write_fasta(&query_path, &vec![(query_name.clone(), query_seq)]); let mut res = lastz_bam(&query_path, ref_fa)?; // take best length res.sort_by_cached_key(|r| r.aligned_query_end() - r.aligned_query_start()); let mut r = res.last().unwrap().clone(); r.set_name(query_name.as_bytes().to_vec().clone()); records.push(r); } records.sort_by_cached_key(|b| b.start()); let contig_name = PathBuf::from(&ref_fa) .file_stem() .unwrap() .to_string_lossy() .to_string(); // Header format let mut header = bam::header::Header::new(); let mut header_line = HeaderEntry::header_line("1.6".to_string()); header_line.push(b"SO", "Coordinate".to_string()); header.push_entry(header_line).unwrap(); header .push_entry(HeaderEntry::ref_sequence(contig_name, ref_len)) .unwrap(); let mut bam_writer = bam::bam_writer::BamWriterBuilder::new(); let mut w = bam_writer.from_path(new_bam, header)?; for record in records.iter() { let mut record = record.clone(); let record_name = String::from_utf8(record.name().to_vec())?; let tags = record.tags_mut(); if let Some(mm) = mm_hm.get(&record_name) { tags.push_string(b"MM", mm) } let tags = record.tags_mut(); if let Some(ml) = ml_hm.get(&record_name) { let v = ml.as_bytes(); tags.push_array(b"ML", v); } w.write(&record)?; } w.finish()?; fs::remove_dir_all(tmp_dir)?; duct::cmd!("samtools", "index", "-@", "10", new_bam).run()?; duct::cmd!("modkit", "pileup", new_bam, modkit_pileup).run()?; Ok(()) } pub fn write_fasta(contig_fa: &str, d: &Vec<(String, String)>) { let file = File::create(contig_fa).unwrap(); let mut writer = noodles_fasta::writer::Builder::default().build_with_writer(file); let mut passed = Vec::new(); for (name, sequence) in d { let name = name.to_string(); if sequence.is_empty() { continue; } if passed.contains(&name) { continue; } passed.push(name.clone()); let record = noodles_fasta::Record::new( noodles_fasta::record::Definition::new(name.as_str(), None), noodles_fasta::record::Sequence::from(sequence.as_bytes().to_vec()), ); writer.write_record(&record).unwrap(); } } pub fn make_graph(d: Vec, records: &Vec) { let mut g: StableUnGraph> = StableUnGraph::default(); let mut name_to_id: HashMap = HashMap::new(); for blast_result in d { let nid_a = if let Some(nid_a) = name_to_id.get(&blast_result.query_id) { *nid_a } else { let k = blast_result.query_id.clone(); let nid_a = g.add_node(k.clone()); name_to_id.insert(k, nid_a); nid_a }; let nid_b = if let Some(nid_b) = name_to_id.get(&blast_result.subject_id) { *nid_b } else { let k = blast_result.subject_id.clone(); let nid_b = g.add_node(k.clone()); name_to_id.insert(k, nid_b); nid_b }; g.add_edge(nid_a, nid_b, Some(blast_result)); } println!( "{:?}", Dot::with_config(&g, &[petgraph::dot::Config::EdgeNoLabel]) ); // let order = dijkstra(&g, start_node, None, |_| 0); // println!("{order:#?}"); // begin with start node let mut n_neigh: Vec<(usize, NodeIndex)> = g .node_indices() .map(|nid| (g.neighbors(nid).count(), nid)) .collect(); n_neigh.sort_by_key(|v| v.0); let start_node = n_neigh.last().unwrap().1; let start_name = g.node_weight(start_node).unwrap().to_string(); let neighbors = g.neighbors(start_node); let mut neighbors_blast = Vec::new(); for neighbor in neighbors { let e: Vec = g .edges_connecting(start_node, neighbor) .map(|e| e.weight().clone().unwrap()) .collect(); neighbors_blast.extend(e); } let mut neighbors_blast: Vec<(BlastResult, usize)> = neighbors_blast .iter() .map(|b| { if b.subject_id != start_name { b.swap_query_subject() } else { b.clone() } }) .map(|b| (b.clone(), get_seq_len(records, &b.query_id))) .collect(); neighbors_blast.sort_by_key(|b| b.0.s_end); let start_len = get_seq_len(records, &start_name); println!("{start_len}"); println!("{neighbors_blast:#?}"); } pub fn get_seq_len(records: &[Record], id: &str) -> usize { let lens: Vec = records .iter() .filter(|r| String::from_utf8(r.qname().to_vec()).unwrap() == id) .map(|r| r.seq_len()) .collect(); lens[0] } pub fn order_sequences_with_positions(blast_results: &[BlastResult]) -> Vec<(String, usize)> { let mut graph: HashMap> = HashMap::new(); let mut in_degree: HashMap = HashMap::new(); let mut all_sequences: HashSet = HashSet::new(); // Build the graph for result in blast_results { all_sequences.insert(result.query_id.clone()); all_sequences.insert(result.subject_id.clone()); if result.q_start < result.s_start { graph .entry(result.query_id.clone()) .or_insert_with(HashSet::new) .insert(result.subject_id.clone()); *in_degree.entry(result.subject_id.clone()).or_insert(0) += 1; } else { graph .entry(result.subject_id.clone()) .or_insert_with(HashSet::new) .insert(result.query_id.clone()); *in_degree.entry(result.query_id.clone()).or_insert(0) += 1; } } // Topological sort let mut queue: VecDeque = all_sequences .iter() .filter(|&seq| !in_degree.contains_key(seq)) .cloned() .collect(); let mut ordered_sequences: Vec = Vec::new(); while let Some(seq) = queue.pop_front() { ordered_sequences.push(seq.clone()); if let Some(neighbors) = graph.get(&seq) { for neighbor in neighbors { if let Some(degree) = in_degree.get_mut(neighbor) { *degree -= 1; if *degree == 0 { queue.push_back(neighbor.clone()); } } } } } // Handle any remaining sequences (in case of cycles) for seq in all_sequences { if !ordered_sequences.contains(&seq) { ordered_sequences.push(seq); } } // Calculate absolute positions let mut positions: HashMap = HashMap::new(); let mut current_position = 0; for seq in &ordered_sequences { positions.insert(seq.clone(), current_position); current_position += 1; // Increment position for each sequence } let mut res: Vec<_> = ordered_sequences .into_iter() .map(|seq| (seq.clone(), *positions.get(&seq).unwrap())) .collect(); res.sort_by(|a, b| a.1.cmp(&b.1)); res } // fn order_sequences_with_positions_bam(records: &Vec, header: &Header) -> anyhow::Result> { // let mut graph: HashMap> = HashMap::new(); // let mut in_degree: HashMap = HashMap::new(); // let mut all_sequences: HashSet = HashSet::new(); // // for record in records { // let query_id = String::from_utf8(record.name().to_vec()); // let subject_id = record.ref_id().clone(); // let q_start = record.aligned_query_start(); // let s_start = record.start(); // // all_sequences.insert(query_id.clone()); // all_sequences.insert(subject_id.clone()); // // if q_start < s_start { // graph.entry(query_id.clone()) // .or_insert_with(HashSet::new()) // .insert(subject_id.clone()); // *in_degree.entry(subject_id.clone()).or_insert(0) += 1; // } else { // graph.entry(subject_id.clone()) // .or_insert_with(HashSet::new()) // .insert(query_id.clone()); // *in_degree.entry(query_id.clone()).or_insert(0) += 1; // } // } // // let mut queue: VecDeque = all_sequences.iter() // .filter(|&seq| !in_degree.contains_key(seq)) // .cloned() // .collect(); // // let mut ordered_sequences: Vec = Vec::new(); // // while let Some(seq) = queue.pop_front() { // ordered_sequences.push(seq.clone()); // // if let Some(neighbors) = graph.get(&seq) { // for neighbor in neighbors { // if let Some(degree) = in_degree.get_mut(neighbor) { // *degree -= 1; // if *degree == 0 { // queue.push_back(neighbor.clone()); // } // } // } // } // } // // for seq in all_sequences { // if !ordered_sequences.contains(&seq) { // ordered_sequences.push(seq.clone()); // } // } // // let mut positions: HashMap = HashMap::new(); // let mut current_position = 0; // // for seq in &ordered_sequences { // positions.insert(seq.clone(), current_position); // current_position += 1; // } // // ordered_sequences.into_iter() // .map(|seq| (seq.clone(), *positions.get(&seq).unwrap())) // .collect() // } pub fn fai(path: &str) -> Result { duct::cmd!("samtools", "faidx", path).run() } #[derive(Debug, Default)] pub struct FlyeAsm { pub input_id: String, pub asm_dir: String, pub input_bam: String, pub input_records: Vec, pub on_contig_bam: String, pub contigs: Option>, pub flye_asm_params: FlyeAsmParams, } #[derive(Debug)] pub struct FlyeAsmParams { pub min_cov: u16, } impl Default for FlyeAsmParams { fn default() -> Self { Self { min_cov: 2 } } } impl FlyeAsm { pub fn new(asm_dir: &str, input_id: &str) -> anyhow::Result { let input_bam = format!("{asm_dir}/{input_id}.bam"); let on_contig_bam = format!("{asm_dir}/{input_id}_contig.bam"); let reader = bam::BamReader::from_path(&input_bam, 3)?; let mut input_records = Vec::new(); for rec in reader { input_records.push(rec?); } Ok(FlyeAsm { asm_dir: asm_dir.to_string(), input_id: input_id.to_string(), on_contig_bam, contigs: None, flye_asm_params: FlyeAsmParams::default(), input_records, input_bam, }) } pub fn assemble(mut self) -> anyhow::Result { let min_cov = self.flye_asm_params.min_cov; let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4()); info!("Creating tmp directory {tmp_dir}"); fs::create_dir(&tmp_dir).unwrap(); let input_fa = format!("{tmp_dir}/input.fa"); records_to_fasta(&self.input_records, &input_fa)?; let flye_tmp_dir = format!("{tmp_dir}/flye"); fs::create_dir(&flye_tmp_dir).unwrap(); run_flye(&input_fa, &flye_tmp_dir, "10M"); let contigs_path = format!("{flye_tmp_dir}/assembly.fasta"); if Path::new(&contigs_path).exists() { let assembly_path = format!("{flye_tmp_dir}/assembly.fasta"); let flye_cov_path = format!("{flye_tmp_dir}/40-polishing/base_coverage.bed.gz"); // Read the assembly fasta let mut reader = File::open(assembly_path) .map(BufReader::new) .map(noodles_fasta::Reader::new)?; let mut contigs = Vec::new(); for result in reader.records() { let record = result?; let contig_name = String::from_utf8(record.name().to_vec()).unwrap(); let (s, e) = read_flye_coverage(&flye_cov_path, min_cov.into(), &contig_name); let seq = record.sequence().as_ref(); let seq = String::from_utf8(seq.to_vec()).unwrap(); let seq: String = seq[s..e].into(); contigs.push(seq.clone()); } self.contigs = Some(contigs); } else { warn!("No reads assembled") } // Cleaning fs::remove_dir_all(tmp_dir)?; Ok(self) } pub fn save(self) -> anyhow::Result<()> { if self.contigs.is_none() { return Err(anyhow!("No reads were assembled")); } for (i, contig) in self.contigs.unwrap().iter().enumerate() { let suffixe = if i == 0 { "".to_string() } else { format!("_{i}") }; let contig_id = format!("{}{suffixe}_flye", self.input_id); let contig_fa = format!("{}/{contig_id}.fa", self.asm_dir); info!("Saving contig {contig_id} in {contig_fa}"); write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())]); fai(&contig_fa)?; // Writing bed file from best blastn results let bed_path = format!("{}/{contig_id}.bed", self.asm_dir); let bed = pandora_lib_blastn::Blast::init(&contig_fa)? .run()? .keep_best() .to_bed()?; let mut f = File::create(bed_path)?; f.write_all(bed.as_bytes())?; // Remaping input bam to contig info!("Mapping input reads to {contig_id}"); let new_bam = format!("{}/{contig_id}.bam", self.asm_dir); let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.asm_dir); lastz_remap( &self.input_bam, &contig_fa, contig.len() as u32, &new_bam, &modkit_pileup, )?; } Ok(()) } } pub fn records_to_fasta(r: &Vec, fa_path: &str) -> anyhow::Result<()> { let file = File::create(fa_path).unwrap(); let mut writer = noodles_fasta::writer::Builder::default().build_with_writer(file); let mut names = Vec::new(); for bam_record in r { let name = bam_record.name(); if !names.contains(&name) { names.push(name); let sequence = bam_record.sequence().to_vec(); let record = noodles_fasta::Record::new( noodles_fasta::record::Definition::new(name, None), noodles_fasta::record::Sequence::from(sequence), ); writer.write_record(&record)?; } } Ok(()) } pub fn dir_flye(dir: &str) -> anyhow::Result<()> { let pattern = format!("{}/*.bam", dir); let re = Regex::new(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.bam$") .unwrap(); for entry in glob::glob(&pattern).expect("Failed to read glob pattern") { match entry { Ok(path) => { if let Some(file_name) = path.file_name().and_then(|name| name.to_str()) { if re.is_match(file_name) { if let Some(input_id ) = path.file_stem().and_then(|n| n.to_str()) { if PathBuf::from(format!("{dir}/{input_id}_flye.bam")).exists() { warn!("{input_id} already assembled"); } else { let f = FlyeAsm::new(dir, input_id)?; if let Ok(f) = f.assemble() { match f.save() { Ok(_) => info!("✅"), Err(e) => warn!("❌ {e}"), } } } } } } } Err(e) => println!("{:?}", e), } } Ok(()) } #[cfg(test)] mod tests { use std::fs; use uuid::Uuid; use super::*; fn init() { let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) .is_test(true) .try_init(); } #[test] fn it_works() -> anyhow::Result<()> { let dir = "/tmp"; let records = read_bam("/home/prom/Documents/Programmes/pandora_lib_pileup/44d3d2a9-ad53-45f7-9a1a-2e1e229377e4.bam")?; println!("{} rec", records.len()); let tmp_dir = format!("{dir}/ass_{}", Uuid::new_v4()); println!("{tmp_dir}"); fs::create_dir(&tmp_dir)?; let paths = write_fastqs(&tmp_dir, &records)?; println!("kkkk"); let blast_results = lastz_two_by_two(&paths, true)?; // println!("{blast_results:#?}"); fs::remove_dir_all(tmp_dir)?; make_graph(blast_results, &records); // let res = order_sequences_with_positions(&blast_results); // println!("{res:#?}"); // let leftmost = res.first().unwrap().0.clone(); // let mut on_lm: Vec = blast_results // .iter() // .filter(|b| b.subject_id == leftmost || b.query_id == leftmost) // .map(|b| { // if b.query_id == leftmost { // b.swap_query_subject() // } else { // b.clone() // } // }) // .collect(); // on_lm.sort_by(|a, b| a.s_start.cmp(&b.s_start)); // println!("{on_lm:#?}"); Ok(()) } #[test] fn flye() -> anyhow::Result<()> { init(); let id = "BECERRA"; let input_id = "6d51d94e-b951-48ef-9cef-63c10f235ebe"; // let id = "SALICETTO"; // let input_id = "cd8e4a8a-27ed-4045-af88-9838201f164f"; let res_dir = "/data/longreads_basic_pipe"; let asm_dir = format!("{res_dir}/{id}/diag/asm_contigs"); FlyeAsm::new(&asm_dir, input_id)?.assemble()?.save()?; Ok(()) } #[test] fn flye_d() -> anyhow::Result<()> { init(); let id = "SALICETTO"; let res_dir = "/data/longreads_basic_pipe"; let asm_dir = format!("{res_dir}/{id}/diag/asm_contigs"); dir_flye(&asm_dir) } }