| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677 |
- pub mod flye;
- use anyhow::anyhow;
- use bam::{
- header::HeaderEntry,
- record::tags::{StringType, TagValue},
- RecordWriter,
- };
- use flye::{read_flye_coverage, run_flye};
- use log::{info, warn};
- use nom::AsBytes;
- use pandora_lib_blastn::BlastResult;
- use petgraph::{
- algo::dijkstra,
- data::{Build, DataMap},
- dot::Dot,
- graph::NodeIndex,
- stable_graph::StableUnGraph,
- visit::{EdgeRef, IntoNeighbors, NodeIndexable},
- };
- use regex::Regex;
- use rust_htslib::bam::{Read, Reader, Record};
- use std::{
- collections::{HashMap, HashSet, VecDeque},
- fs::{self, File},
- io::{BufReader, Cursor, Write},
- path::{Path, PathBuf},
- };
- use uuid::Uuid;
- pub fn read_bam(path: &str) -> anyhow::Result<Vec<Record>> {
- let mut res = Vec::new();
- let mut bam = Reader::from_path(Path::new(path))?;
- // Iterate over all records
- for result in bam.records() {
- let record = result?;
- res.push(record);
- }
- Ok(res)
- }
- fn write_fastqs(dir_path: &str, records: &Vec<Record>) -> anyhow::Result<Vec<String>> {
- let mut paths = Vec::new();
- for record in records {
- let qname = String::from_utf8(record.qname().to_vec())?;
- let qseq = record.seq().as_bytes();
- let qqual = String::from_utf8(record.qual().to_vec())?;
- assert_eq!(qseq.len(), qqual.len());
- let fastq_record = noodles_fasta::Record::new(
- noodles_fasta::record::Definition::new(qname.clone(), None),
- qseq.into(),
- );
- let mut writer = noodles_fasta::io::Writer::new(Vec::new());
- writer.write_record(&fastq_record)?;
- let file_path = format!("{dir_path}/{qname}.fasta");
- let mut file = File::create(&file_path)?;
- file.write_all(writer.get_ref())?;
- paths.push(file_path);
- }
- Ok(paths)
- }
- pub fn lastz_two_by_two(records: &[String], best: bool) -> anyhow::Result<Vec<BlastResult>> {
- let lastz_bin = "lastz";
- let mut all: Vec<BlastResult> = Vec::new();
- for i in 0..records.len() {
- for j in (i + 1)..records.len() {
- let record_a = &records[i];
- let record_b = &records[j];
- let out = duct::cmd!(lastz_bin, "--format=blastn", record_a, record_b).read()?;
- let mut results: Vec<pandora_lib_blastn::BlastResult> = out
- .lines()
- .filter(|s| !s.starts_with('#'))
- .map(|s| s.to_string())
- .filter_map(|s| BlastResult::try_from(s).ok())
- .collect();
- if !results.is_empty() {
- if best {
- results.sort_by_key(|r| r.alignment_length);
- all.push(results.last().unwrap().clone())
- } else {
- all.extend(results.into_iter());
- }
- }
- }
- }
- Ok(all)
- }
- pub fn lastz_bam(query_path: &str, subject_path: &str) -> anyhow::Result<Vec<bam::Record>> {
- let lastz_bin = "lastz";
- let pipe =
- format!("{lastz_bin} --format=softsam {query_path} {subject_path} | samtools view -b");
- let out = duct::cmd!("bash", "-c", pipe).reader()?;
- let reader = bam::BamReader::from_stream(out, 4)?;
- Ok(reader.into_iter().filter_map(anyhow::Result::ok).collect())
- }
- pub fn lastz_remap(
- bam_path: &str,
- ref_fa: &str,
- ref_len: u32,
- new_bam: &str,
- modkit_pileup: &str,
- ) -> anyhow::Result<()> {
- let tmp = "/tmp";
- let tmp_dir = format!("{tmp}/{}", Uuid::new_v4());
- fs::create_dir(&tmp_dir)?;
- let mut records = Vec::new();
- let mut mm_hm = HashMap::new();
- let mut ml_hm = HashMap::new();
- let reader = bam::BamReader::from_path(bam_path, 4).unwrap();
- for record in reader {
- let record = record?;
- let query_name = String::from_utf8(record.name().to_vec())?;
- if let Some(TagValue::String(value, StringType::String)) = record.tags().get(b"MM") {
- mm_hm.insert(query_name.clone(), value.to_vec());
- }
- if let Some(TagValue::IntArray(value)) = record.tags().get(b"ML") {
- ml_hm.insert(query_name.clone(), value.raw().to_vec());
- }
- let query_path = format!("{}/{}.fa", &tmp_dir, query_name);
- let query_seq = String::from_utf8(record.sequence().to_vec())?;
- write_fasta(&query_path, &vec![(query_name.clone(), query_seq)]);
- let mut res = lastz_bam(&query_path, ref_fa)?;
- // take best length
- res.sort_by_cached_key(|r| r.aligned_query_end() - r.aligned_query_start());
- let mut r = res.last().unwrap().clone();
- r.set_name(query_name.as_bytes().to_vec().clone());
- records.push(r);
- }
- records.sort_by_cached_key(|b| b.start());
- let contig_name = PathBuf::from(&ref_fa)
- .file_stem()
- .unwrap()
- .to_string_lossy()
- .to_string();
- // Header format
- let mut header = bam::header::Header::new();
- let mut header_line = HeaderEntry::header_line("1.6".to_string());
- header_line.push(b"SO", "Coordinate".to_string());
- header.push_entry(header_line).unwrap();
- header
- .push_entry(HeaderEntry::ref_sequence(contig_name, ref_len))
- .unwrap();
- let mut bam_writer = bam::bam_writer::BamWriterBuilder::new();
- let mut w = bam_writer.from_path(new_bam, header)?;
- for record in records.iter() {
- let mut record = record.clone();
- let record_name = String::from_utf8(record.name().to_vec())?;
- let tags = record.tags_mut();
- if let Some(mm) = mm_hm.get(&record_name) {
- tags.push_string(b"MM", mm)
- }
- let tags = record.tags_mut();
- if let Some(ml) = ml_hm.get(&record_name) {
- let v = ml.as_bytes();
- tags.push_array(b"ML", v);
- }
- w.write(&record)?;
- }
- w.finish()?;
- fs::remove_dir_all(tmp_dir)?;
- duct::cmd!("samtools", "index", "-@", "10", new_bam).run()?;
- duct::cmd!("modkit", "pileup", new_bam, modkit_pileup).run()?;
- Ok(())
- }
- pub fn write_fasta(contig_fa: &str, d: &Vec<(String, String)>) {
- let file = File::create(contig_fa).unwrap();
- let mut writer = noodles_fasta::writer::Builder::default().build_with_writer(file);
- let mut passed = Vec::new();
- for (name, sequence) in d {
- let name = name.to_string();
- if sequence.is_empty() {
- continue;
- }
- if passed.contains(&name) {
- continue;
- }
- passed.push(name.clone());
- let record = noodles_fasta::Record::new(
- noodles_fasta::record::Definition::new(name.as_str(), None),
- noodles_fasta::record::Sequence::from(sequence.as_bytes().to_vec()),
- );
- writer.write_record(&record).unwrap();
- }
- }
- pub fn make_graph(d: Vec<BlastResult>, records: &Vec<Record>) {
- let mut g: StableUnGraph<String, Option<BlastResult>> = StableUnGraph::default();
- let mut name_to_id: HashMap<String, NodeIndex> = HashMap::new();
- for blast_result in d {
- let nid_a = if let Some(nid_a) = name_to_id.get(&blast_result.query_id) {
- *nid_a
- } else {
- let k = blast_result.query_id.clone();
- let nid_a = g.add_node(k.clone());
- name_to_id.insert(k, nid_a);
- nid_a
- };
- let nid_b = if let Some(nid_b) = name_to_id.get(&blast_result.subject_id) {
- *nid_b
- } else {
- let k = blast_result.subject_id.clone();
- let nid_b = g.add_node(k.clone());
- name_to_id.insert(k, nid_b);
- nid_b
- };
- g.add_edge(nid_a, nid_b, Some(blast_result));
- }
- println!(
- "{:?}",
- Dot::with_config(&g, &[petgraph::dot::Config::EdgeNoLabel])
- );
- // let order = dijkstra(&g, start_node, None, |_| 0);
- // println!("{order:#?}");
- // begin with start node
- let mut n_neigh: Vec<(usize, NodeIndex)> = g
- .node_indices()
- .map(|nid| (g.neighbors(nid).count(), nid))
- .collect();
- n_neigh.sort_by_key(|v| v.0);
- let start_node = n_neigh.last().unwrap().1;
- let start_name = g.node_weight(start_node).unwrap().to_string();
- let neighbors = g.neighbors(start_node);
- let mut neighbors_blast = Vec::new();
- for neighbor in neighbors {
- let e: Vec<BlastResult> = g
- .edges_connecting(start_node, neighbor)
- .map(|e| e.weight().clone().unwrap())
- .collect();
- neighbors_blast.extend(e);
- }
- let mut neighbors_blast: Vec<(BlastResult, usize)> = neighbors_blast
- .iter()
- .map(|b| {
- if b.subject_id != start_name {
- b.swap_query_subject()
- } else {
- b.clone()
- }
- })
- .map(|b| (b.clone(), get_seq_len(records, &b.query_id)))
- .collect();
- neighbors_blast.sort_by_key(|b| b.0.s_end);
- let start_len = get_seq_len(records, &start_name);
- println!("{start_len}");
- println!("{neighbors_blast:#?}");
- }
- pub fn get_seq_len(records: &[Record], id: &str) -> usize {
- let lens: Vec<usize> = records
- .iter()
- .filter(|r| String::from_utf8(r.qname().to_vec()).unwrap() == id)
- .map(|r| r.seq_len())
- .collect();
- lens[0]
- }
- pub fn order_sequences_with_positions(blast_results: &[BlastResult]) -> Vec<(String, usize)> {
- let mut graph: HashMap<String, HashSet<String>> = HashMap::new();
- let mut in_degree: HashMap<String, usize> = HashMap::new();
- let mut all_sequences: HashSet<String> = HashSet::new();
- // Build the graph
- for result in blast_results {
- all_sequences.insert(result.query_id.clone());
- all_sequences.insert(result.subject_id.clone());
- if result.q_start < result.s_start {
- graph
- .entry(result.query_id.clone())
- .or_insert_with(HashSet::new)
- .insert(result.subject_id.clone());
- *in_degree.entry(result.subject_id.clone()).or_insert(0) += 1;
- } else {
- graph
- .entry(result.subject_id.clone())
- .or_insert_with(HashSet::new)
- .insert(result.query_id.clone());
- *in_degree.entry(result.query_id.clone()).or_insert(0) += 1;
- }
- }
- // Topological sort
- let mut queue: VecDeque<String> = all_sequences
- .iter()
- .filter(|&seq| !in_degree.contains_key(seq))
- .cloned()
- .collect();
- let mut ordered_sequences: Vec<String> = Vec::new();
- while let Some(seq) = queue.pop_front() {
- ordered_sequences.push(seq.clone());
- if let Some(neighbors) = graph.get(&seq) {
- for neighbor in neighbors {
- if let Some(degree) = in_degree.get_mut(neighbor) {
- *degree -= 1;
- if *degree == 0 {
- queue.push_back(neighbor.clone());
- }
- }
- }
- }
- }
- // Handle any remaining sequences (in case of cycles)
- for seq in all_sequences {
- if !ordered_sequences.contains(&seq) {
- ordered_sequences.push(seq);
- }
- }
- // Calculate absolute positions
- let mut positions: HashMap<String, usize> = HashMap::new();
- let mut current_position = 0;
- for seq in &ordered_sequences {
- positions.insert(seq.clone(), current_position);
- current_position += 1; // Increment position for each sequence
- }
- let mut res: Vec<_> = ordered_sequences
- .into_iter()
- .map(|seq| (seq.clone(), *positions.get(&seq).unwrap()))
- .collect();
- res.sort_by(|a, b| a.1.cmp(&b.1));
- res
- }
- // fn order_sequences_with_positions_bam(records: &Vec<bam::Record>, header: &Header) -> anyhow::Result<Vec<(String, usize)>> {
- // let mut graph: HashMap<String, HashSet<String>> = HashMap::new();
- // let mut in_degree: HashMap<String, usize> = HashMap::new();
- // let mut all_sequences: HashSet<String> = HashSet::new();
- //
- // for record in records {
- // let query_id = String::from_utf8(record.name().to_vec());
- // let subject_id = record.ref_id().clone();
- // let q_start = record.aligned_query_start();
- // let s_start = record.start();
- //
- // all_sequences.insert(query_id.clone());
- // all_sequences.insert(subject_id.clone());
- //
- // if q_start < s_start {
- // graph.entry(query_id.clone())
- // .or_insert_with(HashSet::new())
- // .insert(subject_id.clone());
- // *in_degree.entry(subject_id.clone()).or_insert(0) += 1;
- // } else {
- // graph.entry(subject_id.clone())
- // .or_insert_with(HashSet::new())
- // .insert(query_id.clone());
- // *in_degree.entry(query_id.clone()).or_insert(0) += 1;
- // }
- // }
- //
- // let mut queue: VecDeque<String> = all_sequences.iter()
- // .filter(|&seq| !in_degree.contains_key(seq))
- // .cloned()
- // .collect();
- //
- // let mut ordered_sequences: Vec<String> = Vec::new();
- //
- // while let Some(seq) = queue.pop_front() {
- // ordered_sequences.push(seq.clone());
- //
- // if let Some(neighbors) = graph.get(&seq) {
- // for neighbor in neighbors {
- // if let Some(degree) = in_degree.get_mut(neighbor) {
- // *degree -= 1;
- // if *degree == 0 {
- // queue.push_back(neighbor.clone());
- // }
- // }
- // }
- // }
- // }
- //
- // for seq in all_sequences {
- // if !ordered_sequences.contains(&seq) {
- // ordered_sequences.push(seq.clone());
- // }
- // }
- //
- // let mut positions: HashMap<String, usize> = HashMap::new();
- // let mut current_position = 0;
- //
- // for seq in &ordered_sequences {
- // positions.insert(seq.clone(), current_position);
- // current_position += 1;
- // }
- //
- // ordered_sequences.into_iter()
- // .map(|seq| (seq.clone(), *positions.get(&seq).unwrap()))
- // .collect()
- // }
- pub fn fai(path: &str) -> Result<std::process::Output, std::io::Error> {
- duct::cmd!("samtools", "faidx", path).run()
- }
- #[derive(Debug, Default)]
- pub struct FlyeAsm {
- pub input_id: String,
- pub asm_dir: String,
- pub input_bam: String,
- pub input_records: Vec<bam::Record>,
- pub on_contig_bam: String,
- pub contigs: Option<Vec<String>>,
- pub flye_asm_params: FlyeAsmParams,
- }
- #[derive(Debug)]
- pub struct FlyeAsmParams {
- pub min_cov: u16,
- }
- impl Default for FlyeAsmParams {
- fn default() -> Self {
- Self { min_cov: 2 }
- }
- }
- impl FlyeAsm {
- pub fn new(asm_dir: &str, input_id: &str) -> anyhow::Result<Self> {
- let input_bam = format!("{asm_dir}/{input_id}.bam");
- let on_contig_bam = format!("{asm_dir}/{input_id}_contig.bam");
- let reader = bam::BamReader::from_path(&input_bam, 3)?;
- let mut input_records = Vec::new();
- for rec in reader {
- input_records.push(rec?);
- }
- Ok(FlyeAsm {
- asm_dir: asm_dir.to_string(),
- input_id: input_id.to_string(),
- on_contig_bam,
- contigs: None,
- flye_asm_params: FlyeAsmParams::default(),
- input_records,
- input_bam,
- })
- }
- pub fn assemble(mut self) -> anyhow::Result<Self> {
- let min_cov = self.flye_asm_params.min_cov;
- let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
- info!("Creating tmp directory {tmp_dir}");
- fs::create_dir(&tmp_dir).unwrap();
- let input_fa = format!("{tmp_dir}/input.fa");
- records_to_fasta(&self.input_records, &input_fa)?;
- let flye_tmp_dir = format!("{tmp_dir}/flye");
- fs::create_dir(&flye_tmp_dir).unwrap();
- run_flye(&input_fa, &flye_tmp_dir, "10M");
- let contigs_path = format!("{flye_tmp_dir}/assembly.fasta");
- if Path::new(&contigs_path).exists() {
- let assembly_path = format!("{flye_tmp_dir}/assembly.fasta");
- let flye_cov_path = format!("{flye_tmp_dir}/40-polishing/base_coverage.bed.gz");
- // Read the assembly fasta
- let mut reader = File::open(assembly_path)
- .map(BufReader::new)
- .map(noodles_fasta::Reader::new)?;
- let mut contigs = Vec::new();
- for result in reader.records() {
- let record = result?;
- let contig_name = String::from_utf8(record.name().to_vec()).unwrap();
- let (s, e) = read_flye_coverage(&flye_cov_path, min_cov.into(), &contig_name);
- let seq = record.sequence().as_ref();
- let seq = String::from_utf8(seq.to_vec()).unwrap();
- let seq: String = seq[s..e].into();
- contigs.push(seq.clone());
- }
- self.contigs = Some(contigs);
- } else {
- warn!("No reads assembled")
- }
- // Cleaning
- fs::remove_dir_all(tmp_dir)?;
- Ok(self)
- }
- pub fn save(self) -> anyhow::Result<()> {
- if self.contigs.is_none() {
- return Err(anyhow!("No reads were assembled"));
- }
- for (i, contig) in self.contigs.unwrap().iter().enumerate() {
- let suffixe = if i == 0 {
- "".to_string()
- } else {
- format!("_{i}")
- };
- let contig_id = format!("{}{suffixe}_flye", self.input_id);
- let contig_fa = format!("{}/{contig_id}.fa", self.asm_dir);
- info!("Saving contig {contig_id} in {contig_fa}");
- write_fasta(&contig_fa, &vec![(contig_id.clone(), contig.clone())]);
- fai(&contig_fa)?;
- // Writing bed file from best blastn results
- let bed_path = format!("{}/{contig_id}.bed", self.asm_dir);
- let bed = pandora_lib_blastn::Blast::init(&contig_fa)?
- .run()?
- .keep_best()
- .to_bed()?;
- let mut f = File::create(bed_path)?;
- f.write_all(bed.as_bytes())?;
- // Remaping input bam to contig
- info!("Mapping input reads to {contig_id}");
- let new_bam = format!("{}/{contig_id}.bam", self.asm_dir);
- let modkit_pileup = format!("{}/{contig_id}_mod.bed", self.asm_dir);
- lastz_remap(
- &self.input_bam,
- &contig_fa,
- contig.len() as u32,
- &new_bam,
- &modkit_pileup,
- )?;
- }
- Ok(())
- }
- }
- pub fn records_to_fasta(r: &Vec<bam::Record>, fa_path: &str) -> anyhow::Result<()> {
- let file = File::create(fa_path).unwrap();
- let mut writer = noodles_fasta::writer::Builder::default().build_with_writer(file);
- let mut names = Vec::new();
- for bam_record in r {
- let name = bam_record.name();
- if !names.contains(&name) {
- names.push(name);
- let sequence = bam_record.sequence().to_vec();
- let record = noodles_fasta::Record::new(
- noodles_fasta::record::Definition::new(name, None),
- noodles_fasta::record::Sequence::from(sequence),
- );
- writer.write_record(&record)?;
- }
- }
- Ok(())
- }
- pub fn dir_flye(dir: &str) -> anyhow::Result<()> {
- let pattern = format!("{}/*.bam", dir);
- let re = Regex::new(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.bam$")
- .unwrap();
- for entry in glob::glob(&pattern).expect("Failed to read glob pattern") {
- match entry {
- Ok(path) => {
- if let Some(file_name) = path.file_name().and_then(|name| name.to_str()) {
- if re.is_match(file_name) {
- if let Some(input_id ) = path.file_stem().and_then(|n| n.to_str()) {
- if PathBuf::from(format!("{dir}/{input_id}_flye.bam")).exists() {
- warn!("{input_id} already assembled");
- } else {
- let f = FlyeAsm::new(dir, input_id)?;
- if let Ok(f) = f.assemble() {
- match f.save() {
- Ok(_) => info!("✅"),
- Err(e) => warn!("❌ {e}"),
- }
- }
- }
- }
-
- }
- }
- }
- Err(e) => println!("{:?}", e),
- }
- }
- Ok(())
- }
- #[cfg(test)]
- mod tests {
- use std::fs;
- use uuid::Uuid;
- use super::*;
- fn init() {
- let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
- .is_test(true)
- .try_init();
- }
- #[test]
- fn it_works() -> anyhow::Result<()> {
- let dir = "/tmp";
- let records = read_bam("/home/prom/Documents/Programmes/pandora_lib_pileup/44d3d2a9-ad53-45f7-9a1a-2e1e229377e4.bam")?;
- println!("{} rec", records.len());
- let tmp_dir = format!("{dir}/ass_{}", Uuid::new_v4());
- println!("{tmp_dir}");
- fs::create_dir(&tmp_dir)?;
- let paths = write_fastqs(&tmp_dir, &records)?;
- println!("kkkk");
- let blast_results = lastz_two_by_two(&paths, true)?;
- // println!("{blast_results:#?}");
- fs::remove_dir_all(tmp_dir)?;
- make_graph(blast_results, &records);
- // let res = order_sequences_with_positions(&blast_results);
- // println!("{res:#?}");
- // let leftmost = res.first().unwrap().0.clone();
- // let mut on_lm: Vec<BlastResult> = blast_results
- // .iter()
- // .filter(|b| b.subject_id == leftmost || b.query_id == leftmost)
- // .map(|b| {
- // if b.query_id == leftmost {
- // b.swap_query_subject()
- // } else {
- // b.clone()
- // }
- // })
- // .collect();
- // on_lm.sort_by(|a, b| a.s_start.cmp(&b.s_start));
- // println!("{on_lm:#?}");
- Ok(())
- }
- #[test]
- fn flye() -> anyhow::Result<()> {
- init();
- let id = "BECERRA";
- let input_id = "6d51d94e-b951-48ef-9cef-63c10f235ebe";
- // let id = "SALICETTO";
- // let input_id = "cd8e4a8a-27ed-4045-af88-9838201f164f";
- let res_dir = "/data/longreads_basic_pipe";
- let asm_dir = format!("{res_dir}/{id}/diag/asm_contigs");
- FlyeAsm::new(&asm_dir, input_id)?.assemble()?.save()?;
- Ok(())
- }
- #[test]
- fn flye_d() -> anyhow::Result<()> {
- init();
- let id = "SALICETTO";
- let res_dir = "/data/longreads_basic_pipe";
- let asm_dir = format!("{res_dir}/{id}/diag/asm_contigs");
- dir_flye(&asm_dir)
- }
- }
|