flye.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. use log::{info, warn};
  2. use std::{
  3. fs::{self, File}, io::{BufRead, BufReader}, path::Path, process::{Command, Stdio}
  4. };
  5. pub fn run_flye(fasta_path: &str, tmp_dir: &str, _size: &str) {
  6. info!("Running Flye for {fasta_path}");
  7. let flye_bin = "/data/tools/Flye/bin/flye";
  8. let mut cmd = Command::new(flye_bin)
  9. .arg("--threads")
  10. .arg("12")
  11. // .arg("--keep-haplotypes")
  12. // .arg("--meta")
  13. .arg("--min-overlap")
  14. .arg("1000")
  15. .arg("--out-dir")
  16. .arg(tmp_dir)
  17. .arg("--nano-hq")
  18. .arg(fasta_path)
  19. .stderr(Stdio::piped())
  20. .spawn()
  21. .expect("Flye failed to start");
  22. let stderr = cmd.stderr.take().unwrap();
  23. let reader = BufReader::new(stderr);
  24. reader
  25. .lines()
  26. .map_while(Result::ok)
  27. .filter(|line| line.contains("ERROR"))
  28. .for_each(|line| warn!("[FLYE] {line}"));
  29. cmd.wait().unwrap();
  30. cmd.kill().unwrap();
  31. }
  32. pub fn read_flye_coverage(path: &str, min_cov: i32, contig_name: &str) -> (usize, usize) {
  33. use std::io::Read;
  34. let mut reader = File::open(path).map(flate2::read::GzDecoder::new).unwrap();
  35. let mut buf = Vec::new();
  36. reader.read_to_end(&mut buf).unwrap();
  37. let mut line_acc = Vec::new();
  38. let mut start = None;
  39. let mut end = None;
  40. let mut last_end = 0;
  41. for b in buf.iter() {
  42. match b {
  43. b'\n' => {
  44. let s = String::from_utf8(line_acc.clone()).unwrap();
  45. line_acc.clear();
  46. if !s.starts_with(&format!("{contig_name}\t")) {
  47. continue;
  48. }
  49. let s = s.split('\t').collect::<Vec<&str>>();
  50. let cov: i32 = s.get(3).unwrap().parse().unwrap();
  51. if start.is_none() && cov >= min_cov {
  52. let st: i32 = s.get(1).unwrap().parse().unwrap();
  53. start = Some(st);
  54. } else if end.is_none() && start.is_some() && cov < min_cov {
  55. let en: i32 = s.get(1).unwrap().parse().unwrap();
  56. end = Some(en);
  57. break;
  58. }
  59. last_end = s.get(2).unwrap().parse().unwrap();
  60. }
  61. _ => line_acc.push(*b),
  62. }
  63. }
  64. (start.unwrap() as usize, end.unwrap_or(last_end) as usize)
  65. }
  66. pub fn assemble_flye(reads_path: &str) -> anyhow::Result<Option<Vec<String>>> {
  67. let min_cov = 2;
  68. let tmp_dir = format!("/tmp/ass_{}", uuid::Uuid::new_v4());
  69. fs::create_dir(tmp_dir.clone()).unwrap();
  70. run_flye(reads_path, &tmp_dir, "10M");
  71. let contigs_path = format!("{tmp_dir}/assembly.fasta");
  72. let mut res = None;
  73. if Path::new(&contigs_path).exists() {
  74. let assembly_path = format!("{tmp_dir}/assembly.fasta");
  75. let flye_cov_path = format!("{tmp_dir}/40-polishing/base_coverage.bed.gz");
  76. let mut reader = File::open(assembly_path)
  77. .map(BufReader::new)
  78. .map(noodles_fasta::Reader::new)?;
  79. let mut contigs = Vec::new();
  80. for result in reader.records() {
  81. let record = result?;
  82. let contig_name = String::from_utf8(record.name().to_vec()).unwrap();
  83. let (s, e) = read_flye_coverage(&flye_cov_path, min_cov, &contig_name);
  84. let seq = record.sequence().as_ref();
  85. let seq = String::from_utf8(seq.to_vec()).unwrap();
  86. let seq: String = seq[s..e].into();
  87. contigs.push(seq.clone());
  88. }
  89. res = Some(contigs);
  90. }
  91. fs::remove_dir_all(tmp_dir)?;
  92. Ok(res)
  93. }