dorado.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. use std::{
  2. fs,
  3. io::{Read, Write},
  4. path::{Path, PathBuf},
  5. time::SystemTime,
  6. };
  7. use duct::cmd;
  8. use log::{info, warn};
  9. use uuid::Uuid;
  10. use crate::{config::Config, pod5::FlowCellCase};
  11. pub trait Run {
  12. fn run(self) -> anyhow::Result<()>;
  13. }
  14. #[derive(Debug, Clone)]
  15. pub struct DoradoParams {
  16. pub ref_fa: String,
  17. pub ref_mmi: String,
  18. pub name: String,
  19. pub time: String,
  20. pub pod_dir: String,
  21. pub samtools_view_threads: u16,
  22. pub samtools_sort_threads: u16,
  23. }
  24. pub struct Dorado {
  25. config: Config,
  26. case: FlowCellCase,
  27. case_dir: String,
  28. time_dir: String,
  29. bam: String,
  30. start_time: SystemTime,
  31. end_time: SystemTime,
  32. is_done: bool,
  33. log: Vec<String>,
  34. }
  35. impl Dorado {
  36. pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
  37. let data_dir = "/data/longreads_basic_pipe";
  38. let case_dir = format!("{}/{}", data_dir, case.id);
  39. let time_dir = format!("{}/{}", case_dir, case.time_point);
  40. let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
  41. Ok(Self {
  42. config,
  43. start_time: SystemTime::now(),
  44. end_time: SystemTime::now(),
  45. is_done: false,
  46. log: Vec::new(),
  47. case_dir,
  48. time_dir,
  49. bam,
  50. case,
  51. })
  52. }
  53. fn create_reference_mmi(&self) -> anyhow::Result<()> {
  54. if !std::path::Path::new(&self.config.align.ref_mmi).exists() {
  55. cmd!(
  56. "minimap2",
  57. "-x",
  58. "map-ont",
  59. "-d",
  60. &self.config.align.ref_mmi,
  61. &self.config.align.ref_fa
  62. )
  63. .run()?;
  64. }
  65. Ok(())
  66. }
  67. fn create_directories(&self) -> anyhow::Result<()> {
  68. if !std::path::Path::new(&self.case_dir).exists() {
  69. info!("Creating directory {}", self.case_dir);
  70. fs::create_dir(&self.case_dir)?;
  71. }
  72. if !std::path::Path::new(&self.time_dir).exists() {
  73. info!("Creating directory {}", self.time_dir);
  74. fs::create_dir(&self.time_dir)?;
  75. }
  76. Ok(())
  77. }
  78. fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
  79. let pod_dir = &self.case.pod_dir.display();
  80. let ref_mmi = &self.config.align.ref_mmi;
  81. let bam = &self.bam;
  82. let samtools_view_threads = self.config.align.samtools_view_threads;
  83. let samtools_sort_threads = self.config.align.samtools_sort_threads;
  84. let dorado = format!(
  85. "{dorado_bin} basecaller sup,5mC_5hmC {pod_dir} --trim all --reference {ref_mmi}"
  86. );
  87. let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
  88. let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
  89. let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
  90. info!("Running: {pipe}");
  91. let pipe_cmd = cmd!("bash", "-c", pipe);
  92. let mut reader = pipe_cmd.stdout_capture().reader()?;
  93. let mut buffer = [0; 1];
  94. let mut line = String::new();
  95. loop {
  96. match reader.read(&mut buffer) {
  97. Ok(0) => break, // End of output
  98. Ok(_) => {
  99. let char = buffer[0] as char;
  100. eprint!("{}", char);
  101. std::io::stderr().flush()?;
  102. if char == '\n' {
  103. // Send the complete line
  104. self.log.push(line.clone());
  105. line.clear();
  106. } else {
  107. line.push(char);
  108. }
  109. }
  110. Err(err) => {
  111. warn!("Error reading from stderr: {}", err);
  112. break;
  113. }
  114. }
  115. }
  116. Ok(())
  117. }
  118. pub fn index(&self) -> anyhow::Result<()> {
  119. let t = self.config.align.samtools_view_threads.to_string();
  120. let cmd = format!("index -@ {t} {}", &self.bam);
  121. info!("Running samtools {cmd}");
  122. cmd!("samtools", "index", "-@", &t, &self.bam).run()?;
  123. Ok(())
  124. }
  125. fn run_cramino(&self) -> anyhow::Result<()> {
  126. let cramino_out = format!(
  127. "{}/{}_{}_hs1_cramino.txt",
  128. self.time_dir, self.case.id, self.case.time_point
  129. );
  130. // if !Path::new(&cramino_out).exists() {
  131. info!("Quality control of BAM: {}", self.bam);
  132. let output = duct::cmd!(
  133. "cramino",
  134. "-t",
  135. "150",
  136. "--hist",
  137. "--checksum",
  138. "--karyotype",
  139. &self.bam
  140. )
  141. .stdout_capture()
  142. .unchecked()
  143. .run()?;
  144. fs::write(cramino_out, output.stdout)?;
  145. // }
  146. Ok(())
  147. }
  148. fn run_modkit(&self) -> anyhow::Result<()> {
  149. let mod_summary = format!(
  150. "{}/{}_{}_5mC_5hmC_summary.txt",
  151. self.time_dir, self.case.id, self.case.time_point
  152. );
  153. // if !Path::new(&mod_summary).exists() {
  154. info!("Generating base modification summary for BAM: {}", self.bam);
  155. let output = cmd!("modkit", "summary", "-t", "50", &self.bam)
  156. .stdout_capture()
  157. .unchecked()
  158. .run()?;
  159. fs::write(mod_summary, output.stdout)?;
  160. // }
  161. Ok(())
  162. }
  163. fn create_fastq(&self) -> anyhow::Result<()> {
  164. let bam = &self.bam;
  165. let fastq = format!(
  166. "{}/{}/{}/{}_{}.fastq.gz",
  167. self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
  168. );
  169. if !std::path::Path::new(&fastq).exists() {
  170. let samtools = format!("samtools fastq -@ 150 {bam}");
  171. let crabz = format!("crabz -f bgzf - -o {fastq}");
  172. let pipe = format!("{samtools} | {crabz}");
  173. info!("Running: {pipe}");
  174. let pipe_cmd = duct::cmd!("bash", "-c", pipe);
  175. pipe_cmd.run()?;
  176. }
  177. Ok(())
  178. }
  179. fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
  180. let composition_a: Vec<String> =
  181. pandora_lib_pileup::bam_compo(bam.to_string_lossy().as_ref(), 20000)?
  182. .iter()
  183. .map(|(i, _)| i.clone())
  184. .collect();
  185. let composition_b: Vec<String> = pandora_lib_pileup::bam_compo(&self.bam, 20000)?
  186. .iter()
  187. .map(|(i, _)| i.clone())
  188. .collect();
  189. let n_id = composition_a
  190. .iter()
  191. .filter(|id| composition_b.contains(id))
  192. .count();
  193. if n_id > 0 {
  194. return Err(anyhow::anyhow!("Already merged"));
  195. }
  196. let into = PathBuf::from(&self.bam);
  197. let dir = into.parent().unwrap();
  198. let original_file = into.file_name().unwrap().to_string_lossy().to_string();
  199. let original_i = dir.join(format!("{original_file}.bai"));
  200. if !original_i.exists() {
  201. self.index()?;
  202. }
  203. let tmp_original_file = format!("{}.bam", Uuid::new_v4());
  204. let tmp_original = dir.join(tmp_original_file.clone());
  205. let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
  206. info!("Moving {} to {}", &into.display(), &tmp_original.display());
  207. fs::rename(&into, &tmp_original)?;
  208. info!(
  209. "Moving {} to {}",
  210. &original_i.display(),
  211. &tmp_original_i.display()
  212. );
  213. fs::rename(original_i, tmp_original_i.clone())?;
  214. let cmd = format!(
  215. "samtools merge -@ 160 -h {} {} {} {}",
  216. bam.display(),
  217. into.display(),
  218. bam.display(),
  219. tmp_original.display()
  220. );
  221. info!("Running {cmd}");
  222. cmd!(
  223. "samtools",
  224. "merge",
  225. "-@",
  226. "160",
  227. "-h",
  228. bam,
  229. into,
  230. bam,
  231. tmp_original.clone()
  232. )
  233. .run()?;
  234. fs::remove_file(tmp_original)?;
  235. fs::remove_file(tmp_original_i)?;
  236. fs::remove_file(bam)?;
  237. self.index()?;
  238. Ok(())
  239. }
  240. pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
  241. // Creating a temporary directory
  242. let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
  243. info!("Creating tmp dir {tmp_dir}");
  244. fs::create_dir(&tmp_dir)?;
  245. // Dorado base calling and align into a temporary bam file
  246. let muxed_bam = format!("{tmp_dir}/muxed.bam");
  247. let dorado_bin = &config.align.dorado_bin;
  248. let dorado_arg = &config.align.dorado_basecall_arg;
  249. let pod_dir = cases[0].pod_dir.display();
  250. let ref_mmi = &config.align.ref_mmi;
  251. let samtools_view_threads = config.align.samtools_view_threads;
  252. let dorado = format!(
  253. "{dorado_bin} basecaller {dorado_arg} {pod_dir} --trim all --reference {ref_mmi}"
  254. );
  255. let samtools_view =
  256. format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
  257. let pipe = format!("{dorado} | {samtools_view}");
  258. info!("Running: {pipe}");
  259. let pipe_cmd = cmd!("bash", "-c", pipe);
  260. pipe_cmd.run()?;
  261. info!("Basecalling ✅");
  262. // Demux the temporary bam file
  263. // Get the sequencing kit from the first pod5 file
  264. let muxed_pod_dir = &cases.first().unwrap().pod_dir;
  265. let pod_path = fs::read_dir(muxed_pod_dir)?
  266. .filter_map(|p| p.ok())
  267. .map(|p| p.path())
  268. .filter(|p| p.extension().unwrap() == "pod5")
  269. .take(1)
  270. .collect::<Vec<PathBuf>>()
  271. .pop()
  272. .unwrap();
  273. let sequencing_kit = pandora_lib_pod5::Pod5Info::from_pod5(pod_path.to_str().unwrap())
  274. .sequencing_kit
  275. .to_uppercase();
  276. let tmp_demux_dir = format!("{tmp_dir}/demuxed");
  277. fs::create_dir(&tmp_demux_dir)?;
  278. info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
  279. duct::cmd!(
  280. &config.align.dorado_bin,
  281. "demux",
  282. "--output-dir",
  283. &tmp_demux_dir,
  284. "--kit-name",
  285. &sequencing_kit,
  286. &tmp_dir,
  287. )
  288. .run()?;
  289. info!("Demux ✅");
  290. for case in cases.iter() {
  291. let barcode = case.barcode.replace("NB", "");
  292. let bam = format!("{tmp_demux_dir}/{sequencing_kit}_barcode{}.bam", barcode);
  293. // Trim
  294. let trimmed_bam = format!(
  295. "{tmp_demux_dir}/{sequencing_kit}_barcode{}_trimmed.bam",
  296. barcode
  297. );
  298. let pipe = format!(
  299. "{} trim {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
  300. config.align.dorado_bin, &config.align.samtools_view_threads
  301. );
  302. cmd!("bash", "-c", pipe).run()?;
  303. // Align
  304. let aligned_bam = format!(
  305. "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
  306. barcode
  307. );
  308. let dorado = format!(
  309. "{} aligner --threads 160 {} {trimmed_bam}",
  310. config.align.dorado_bin, config.align.ref_fa,
  311. );
  312. let samtools_view = format!(
  313. "samtools view -h -@ {} -b /dev/stdin",
  314. &config.align.samtools_view_threads
  315. );
  316. let samtools_sort = format!(
  317. "samtools sort -@ {} /dev/stdin -o {aligned_bam}",
  318. &config.align.samtools_sort_threads
  319. );
  320. let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
  321. info!("Running {pipe}");
  322. cmd!("bash", "-c", pipe).run()?;
  323. let d = Dorado::init(case.clone(), config.clone())?;
  324. d.create_directories()?;
  325. if PathBuf::from(&d.bam).exists() {
  326. info!("merge");
  327. d.merge_bam(&PathBuf::from(aligned_bam))?;
  328. } else {
  329. info!("Moving from {} to {}", bam, d.bam);
  330. fs::rename(aligned_bam, d.bam.clone())?;
  331. d.index()?;
  332. }
  333. d.run_cramino()?;
  334. d.run_modkit()?;
  335. }
  336. fs::remove_dir_all(tmp_dir)?;
  337. Ok(())
  338. }
  339. pub fn run_pipe(&mut self) -> anyhow::Result<()> {
  340. let start_time = std::time::SystemTime::now();
  341. self.start_time = start_time;
  342. info!("Running Dorado with params: {:#?}", self.config);
  343. let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado";
  344. self.create_reference_mmi()?;
  345. self.create_directories()?;
  346. info!(
  347. "Reading {} pod5 from: {}",
  348. self.case.time_point, self.config.pod_dir
  349. );
  350. let bam_path = std::path::Path::new(&self.bam);
  351. if !bam_path.exists() {
  352. info!("Creating new bam file");
  353. self.basecall_align(dorado_bin)?;
  354. } else {
  355. // check if merge before call
  356. let new_bam_path = bam_path
  357. .parent()
  358. .unwrap()
  359. .join(format!("{}.bam", Uuid::new_v4()));
  360. warn!("Creating new bam {}", new_bam_path.display());
  361. let bam = self.bam.clone();
  362. self.bam = new_bam_path.clone().to_string_lossy().to_string();
  363. self.basecall_align(dorado_bin)?;
  364. self.bam.clone_from(&bam);
  365. self.merge_bam(&new_bam_path)?;
  366. }
  367. self.index()?;
  368. self.run_cramino()?;
  369. self.run_modkit()?;
  370. // self.create_fastq()?;
  371. let end_time = std::time::SystemTime::now();
  372. self.end_time = end_time;
  373. let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
  374. info!(
  375. "Dorado and Minimap2 execution time: {} seconds",
  376. execution_time
  377. );
  378. self.is_done = true;
  379. Ok(())
  380. }
  381. }