dorado.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. use std::{
  2. fs,
  3. io::{Read, Write},
  4. path::{Path, PathBuf},
  5. time::SystemTime,
  6. };
  7. use duct::cmd;
  8. use log::{info, warn};
  9. use uuid::Uuid;
  10. use crate::{config::Config, pod5::FlowCellCase};
  11. pub trait Run {
  12. fn run(self) -> anyhow::Result<()>;
  13. }
  14. #[derive(Debug, Clone)]
  15. pub struct DoradoParams {
  16. pub ref_fa: String,
  17. pub ref_mmi: String,
  18. pub name: String,
  19. pub time: String,
  20. pub pod_dir: String,
  21. pub samtools_view_threads: u16,
  22. pub samtools_sort_threads: u16,
  23. }
  24. pub struct Dorado {
  25. config: Config,
  26. case: FlowCellCase,
  27. case_dir: String,
  28. time_dir: String,
  29. bam: String,
  30. start_time: SystemTime,
  31. end_time: SystemTime,
  32. is_done: bool,
  33. log: Vec<String>,
  34. }
  35. impl Dorado {
  36. pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
  37. let data_dir = "/data/longreads_basic_pipe";
  38. let case_dir = format!("{}/{}", data_dir, case.id);
  39. let time_dir = format!("{}/{}", case_dir, case.time_point);
  40. let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
  41. Ok(Self {
  42. config,
  43. start_time: SystemTime::now(),
  44. end_time: SystemTime::now(),
  45. is_done: false,
  46. log: Vec::new(),
  47. case_dir,
  48. time_dir,
  49. bam,
  50. case,
  51. })
  52. }
  53. fn create_reference_mmi(&self) -> anyhow::Result<()> {
  54. if !std::path::Path::new(&self.config.align.ref_mmi).exists() {
  55. cmd!(
  56. "minimap2",
  57. "-x",
  58. "map-ont",
  59. "-d",
  60. &self.config.align.ref_mmi,
  61. &self.config.align.ref_fa
  62. )
  63. .run()?;
  64. }
  65. Ok(())
  66. }
  67. fn create_directories(&self) -> anyhow::Result<()> {
  68. if !std::path::Path::new(&self.case_dir).exists() {
  69. fs::create_dir(&self.case_dir)?;
  70. }
  71. if !std::path::Path::new(&self.time_dir).exists() {
  72. fs::create_dir(&self.time_dir)?;
  73. }
  74. Ok(())
  75. }
  76. fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
  77. let pod_dir = &self.case.pod_dir.display();
  78. let ref_mmi = &self.config.align.ref_mmi;
  79. let bam = &self.bam;
  80. let samtools_view_threads = self.config.align.samtools_view_threads;
  81. let samtools_sort_threads = self.config.align.samtools_sort_threads;
  82. let dorado = format!(
  83. "{dorado_bin} basecaller sup,5mC_5hmC {pod_dir} --trim all --reference {ref_mmi}"
  84. );
  85. let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
  86. let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
  87. let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
  88. info!("Running: {pipe}");
  89. let pipe_cmd = cmd!("bash", "-c", pipe);
  90. let mut reader = pipe_cmd.stdout_capture().reader()?;
  91. let mut buffer = [0; 1];
  92. let mut line = String::new();
  93. loop {
  94. match reader.read(&mut buffer) {
  95. Ok(0) => break, // End of output
  96. Ok(_) => {
  97. let char = buffer[0] as char;
  98. eprint!("{}", char);
  99. std::io::stderr().flush()?;
  100. if char == '\n' {
  101. // Send the complete line
  102. self.log.push(line.clone());
  103. line.clear();
  104. } else {
  105. line.push(char);
  106. }
  107. }
  108. Err(err) => {
  109. warn!("Error reading from stderr: {}", err);
  110. break;
  111. }
  112. }
  113. }
  114. Ok(())
  115. }
  116. pub fn index(&self) -> anyhow::Result<()> {
  117. let t = self.config.align.samtools_view_threads.to_string();
  118. let cmd = format!("index -@ {t} {}", &self.bam);
  119. info!("Running samtools {cmd}");
  120. cmd!("samtools", "index", "-@", &t, &self.bam).run()?;
  121. Ok(())
  122. }
  123. fn run_cramino(&self) -> anyhow::Result<()> {
  124. let cramino_out = format!(
  125. "{}/{}_{}_hs1_cramino.txt",
  126. self.time_dir, self.case.id, self.case.time_point
  127. );
  128. // if !Path::new(&cramino_out).exists() {
  129. info!("Quality control of BAM: {}", self.bam);
  130. let output = duct::cmd!(
  131. "cramino",
  132. "-t",
  133. "150",
  134. "--hist",
  135. "--checksum",
  136. "--karyotype",
  137. &self.bam
  138. )
  139. .stdout_capture()
  140. .unchecked()
  141. .run()?;
  142. fs::write(cramino_out, output.stdout)?;
  143. // }
  144. Ok(())
  145. }
  146. fn run_modkit(&self) -> anyhow::Result<()> {
  147. let mod_summary = format!(
  148. "{}/{}_{}_5mC_5hmC_summary.txt",
  149. self.time_dir, self.case.id, self.case.time_point
  150. );
  151. // if !Path::new(&mod_summary).exists() {
  152. info!("Generating base modification summary for BAM: {}", self.bam);
  153. let output = cmd!("modkit", "summary", "-t", "50", &self.bam)
  154. .stdout_capture()
  155. .unchecked()
  156. .run()?;
  157. fs::write(mod_summary, output.stdout)?;
  158. // }
  159. Ok(())
  160. }
  161. fn create_fastq(&self) -> anyhow::Result<()> {
  162. let bam = &self.bam;
  163. let fastq = format!(
  164. "{}/{}/{}/{}_{}.fastq.gz",
  165. self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
  166. );
  167. if !std::path::Path::new(&fastq).exists() {
  168. let samtools = format!("samtools fastq -@ 150 {bam}");
  169. let crabz = format!("crabz -f bgzf - -o {fastq}");
  170. let pipe = format!("{samtools} | {crabz}");
  171. info!("Running: {pipe}");
  172. let pipe_cmd = duct::cmd!("bash", "-c", pipe);
  173. pipe_cmd.run()?;
  174. }
  175. Ok(())
  176. }
  177. fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
  178. let into = PathBuf::from(&self.bam);
  179. let dir = into.parent().unwrap();
  180. let original_file = into.file_name().unwrap().to_string_lossy().to_string();
  181. let original_i = dir.join(format!("{original_file}.bai"));
  182. if !original_i.exists() {
  183. self.index()?;
  184. }
  185. let tmp_original_file = format!("{}.bam", Uuid::new_v4());
  186. let tmp_original = dir.join(tmp_original_file.clone());
  187. let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
  188. info!("Moving {} to {}", &into.display(), &tmp_original.display());
  189. fs::rename(&into, &tmp_original)?;
  190. info!(
  191. "Moving {} to {}",
  192. &original_i.display(),
  193. &tmp_original_i.display()
  194. );
  195. fs::rename(original_i, tmp_original_i.clone())?;
  196. let cmd = format!(
  197. "samtools merge -@ 160 -h {} {} {} {}",
  198. bam.display(),
  199. into.display(),
  200. bam.display(),
  201. tmp_original.display()
  202. );
  203. info!("Running {cmd}");
  204. cmd!(
  205. "samtools",
  206. "merge",
  207. "-@",
  208. "160",
  209. "-h",
  210. bam,
  211. into,
  212. bam,
  213. tmp_original.clone()
  214. )
  215. .run()?;
  216. fs::remove_file(tmp_original)?;
  217. fs::remove_file(tmp_original_i)?;
  218. self.index()?;
  219. Ok(())
  220. }
  221. pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
  222. // Creating a temporary directory
  223. let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
  224. info!("Creating tmp dir {tmp_dir}");
  225. fs::create_dir(&tmp_dir)?;
  226. // Dorado base calling and align into a temporary bam file
  227. let muxed_bam = format!("{tmp_dir}/muxed.bam");
  228. let dorado_bin = &config.align.dorado_bin;
  229. let dorado_arg = &config.align.dorado_basecall_arg;
  230. let pod_dir = cases[0].pod_dir.display();
  231. let ref_mmi = &config.align.ref_mmi;
  232. let samtools_view_threads = config.align.samtools_view_threads;
  233. let dorado = format!(
  234. "{dorado_bin} basecaller {dorado_arg} {pod_dir} --trim all --reference {ref_mmi}"
  235. );
  236. let samtools_view =
  237. format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
  238. let pipe = format!("{dorado} | {samtools_view}");
  239. info!("Running: {pipe}");
  240. let pipe_cmd = cmd!("bash", "-c", pipe);
  241. pipe_cmd.run()?;
  242. info!("Basecalling ✅");
  243. // Demux the temporary bam file
  244. // Get the sequencing kit from the first pod5 file
  245. let muxed_pod_dir = &cases.first().unwrap().pod_dir;
  246. let pod_path = fs::read_dir(muxed_pod_dir)?
  247. .filter_map(|p| p.ok())
  248. .map(|p| p.path())
  249. .filter(|p| p.extension().unwrap() == "pod5")
  250. .take(1)
  251. .collect::<Vec<PathBuf>>()
  252. .pop()
  253. .unwrap();
  254. let sequencing_kit = pandora_lib_pod5::Pod5Info::from_pod5(pod_path.to_str().unwrap())
  255. .sequencing_kit
  256. .to_uppercase();
  257. let tmp_demux_dir = format!("{tmp_dir}/demuxed");
  258. fs::create_dir(&tmp_demux_dir)?;
  259. info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
  260. duct::cmd!(
  261. &config.align.dorado_bin,
  262. "demux",
  263. "--output-dir",
  264. &tmp_demux_dir,
  265. "--kit-name",
  266. &sequencing_kit,
  267. &tmp_dir,
  268. )
  269. .run()?;
  270. info!("Demux ✅");
  271. for case in cases.iter() {
  272. let bam = format!(
  273. "{tmp_demux_dir}/{sequencing_kit}_barcode{}.bam",
  274. case.barcode
  275. );
  276. // Trim
  277. let trimmed_bam = format!(
  278. "{tmp_demux_dir}/{sequencing_kit}_barcode{}_trimmed.bam",
  279. case.barcode
  280. );
  281. let pipe = format!(
  282. "{} trim {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
  283. config.align.dorado_bin, &config.align.samtools_view_threads
  284. );
  285. cmd!("bash", "-c", pipe).run()?;
  286. // Align
  287. let aligned_bam = format!(
  288. "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
  289. case.barcode
  290. );
  291. let dorado = format!(
  292. "{} aligner --threads 160 {} {trimmed_bam}",
  293. config.align.dorado_bin, config.align.ref_fa,
  294. );
  295. let samtools_view = format!(
  296. "samtools view -h -@ {} -b /dev/stdin",
  297. &config.align.samtools_view_threads
  298. );
  299. let samtools_sort = format!(
  300. "samtools sort -@ {} /dev/stdin -o {aligned_bam}",
  301. &config.align.samtools_sort_threads
  302. );
  303. let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
  304. info!("Running {pipe}");
  305. cmd!("bash", "-c", pipe).run()?;
  306. let d = Dorado::init(case.clone(), config.clone())?;
  307. d.create_directories()?;
  308. if PathBuf::from(&d.bam).exists() {
  309. info!("merge");
  310. d.merge_bam(&PathBuf::from(aligned_bam))?;
  311. } else {
  312. info!("Moving from {} to {}", bam, d.bam);
  313. fs::rename(aligned_bam, d.bam.clone())?;
  314. d.index()?;
  315. }
  316. d.run_cramino()?;
  317. d.run_modkit()?;
  318. }
  319. fs::remove_dir(tmp_dir)?;
  320. Ok(())
  321. }
  322. pub fn run_pipe(&mut self) -> anyhow::Result<()> {
  323. let start_time = std::time::SystemTime::now();
  324. self.start_time = start_time;
  325. info!("Running Dorado with params: {:#?}", self.config);
  326. let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado";
  327. self.create_reference_mmi()?;
  328. self.create_directories()?;
  329. info!(
  330. "Reading {} pod5 from: {}",
  331. self.case.time_point, self.config.pod_dir
  332. );
  333. let bam_path = std::path::Path::new(&self.bam);
  334. if !bam_path.exists() {
  335. self.basecall_align(dorado_bin)?;
  336. } else {
  337. let new_bam_path = bam_path
  338. .parent()
  339. .unwrap()
  340. .join(format!("{}.bam", Uuid::new_v4()));
  341. warn!("Creating new bam {}", new_bam_path.display());
  342. self.basecall_align(dorado_bin)?;
  343. self.merge_bam(&new_bam_path)?;
  344. }
  345. self.run_cramino()?;
  346. self.run_modkit()?;
  347. self.create_fastq()?;
  348. let end_time = std::time::SystemTime::now();
  349. self.end_time = end_time;
  350. let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
  351. info!(
  352. "Dorado and Minimap2 execution time: {} seconds",
  353. execution_time
  354. );
  355. self.is_done = true;
  356. Ok(())
  357. }
  358. }