dorado.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. use std::{
  2. fs::{self, File},
  3. path::{Path, PathBuf},
  4. time::SystemTime,
  5. };
  6. use duct::cmd;
  7. use log::{debug, info, warn};
  8. use uuid::Uuid;
  9. use crate::{
  10. collection::{bam::bam_compo, flowcells::FlowCell, pod5::FlowCellCase},
  11. config::Config,
  12. helpers::find_unique_file,
  13. io::pod5_infos::Pod5Info,
  14. };
  15. #[derive(Debug, Clone)]
  16. pub struct DoradoParams {
  17. pub ref_fa: String,
  18. pub ref_mmi: String,
  19. pub name: String,
  20. pub time: String,
  21. pub pod_dir: String,
  22. pub samtools_view_threads: u16,
  23. pub samtools_sort_threads: u16,
  24. }
  25. pub struct Dorado {
  26. config: Config,
  27. case: FlowCellCase,
  28. case_dir: String,
  29. time_dir: String,
  30. bam: String,
  31. start_time: SystemTime,
  32. end_time: SystemTime,
  33. is_done: bool,
  34. }
  35. impl Dorado {
  36. pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
  37. let data_dir = &config.result_dir;
  38. let case_dir = format!("{}/{}", data_dir, case.id);
  39. let time_dir = format!("{}/{}", case_dir, case.time_point);
  40. let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
  41. debug!("Dorado init with config: {config:#?}");
  42. info!("Final BAM file: {bam}");
  43. Ok(Self {
  44. config,
  45. start_time: SystemTime::now(),
  46. end_time: SystemTime::now(),
  47. is_done: false,
  48. case_dir,
  49. time_dir,
  50. bam,
  51. case,
  52. })
  53. }
  54. fn create_reference_mmi(&self) -> anyhow::Result<()> {
  55. if !std::path::Path::new(&self.config.align.ref_mmi).exists() {
  56. cmd!(
  57. "minimap2",
  58. "-x",
  59. "map-ont",
  60. "-d",
  61. &self.config.align.ref_mmi,
  62. &self.config.align.ref_fa
  63. )
  64. .run()?;
  65. }
  66. Ok(())
  67. }
  68. fn create_directories(&self) -> anyhow::Result<()> {
  69. if !std::path::Path::new(&self.case_dir).exists() {
  70. info!("Creating directory {}", self.case_dir);
  71. fs::create_dir(&self.case_dir)?;
  72. }
  73. if !std::path::Path::new(&self.time_dir).exists() {
  74. info!("Creating directory {}", self.time_dir);
  75. fs::create_dir(&self.time_dir)?;
  76. }
  77. Ok(())
  78. }
  79. fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
  80. let pod_dir = &self.case.pod_dir;
  81. let ref_mmi = &self.config.align.ref_mmi;
  82. let bam = &self.bam;
  83. let samtools_view_threads = self.config.align.samtools_view_threads;
  84. let samtools_sort_threads = self.config.align.samtools_sort_threads;
  85. let dorado_arg = self.config.align.dorado_basecall_arg.clone();
  86. let pod_path = fs::read_dir(pod_dir)
  87. .map_err(|e| anyhow::anyhow!("Failed to read pod5 dir: {}.\n\t{e}", pod_dir.display()))?
  88. .filter_map(|p| p.ok())
  89. .map(|p| p.path())
  90. .filter(|p| p.extension().unwrap() == "pod5")
  91. .take(1)
  92. .collect::<Vec<PathBuf>>()
  93. .pop()
  94. .unwrap();
  95. let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
  96. .sequencing_kit
  97. .to_uppercase();
  98. let dorado = format!(
  99. "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_mmi} ",
  100. pod_dir.display()
  101. );
  102. info!("running Dorado: {dorado}");
  103. let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
  104. let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
  105. let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
  106. info!("Running: {pipe}");
  107. let pipe_cmd = cmd!("bash", "-c", &pipe);
  108. pipe_cmd
  109. .run()
  110. .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?;
  111. // let pipe_cmd = cmd!("bash", "-c", pipe);
  112. // let mut reader = pipe_cmd.stderr_capture().reader()?;
  113. //
  114. // let mut buffer = [0; 1];
  115. // let mut line = String::new();
  116. //
  117. // loop {
  118. // match reader.read(&mut buffer) {
  119. // Ok(0) => break, // End of output
  120. // Ok(_) => {
  121. // let char = buffer[0] as char;
  122. // eprint!("-{}", char);
  123. // std::io::stderr().flush()?;
  124. //
  125. // if char == '\n' {
  126. // // Send the complete line
  127. // self.log.push(line.clone());
  128. // line.clear();
  129. // } else {
  130. // line.push(char);
  131. // }
  132. // }
  133. // Err(err) => {
  134. // warn!("Error reading from stderr: {}", err);
  135. // break;
  136. // }
  137. // }
  138. // }
  139. Ok(())
  140. }
  141. pub fn index(&self) -> anyhow::Result<()> {
  142. let t = self.config.align.samtools_view_threads.to_string();
  143. let cmd = format!("index -@ {t} {}", &self.bam);
  144. info!("Running samtools {cmd}");
  145. cmd!("samtools", "index", "-@", &t, &self.bam).run()?;
  146. Ok(())
  147. }
  148. pub fn run_cramino(&self) -> anyhow::Result<()> {
  149. let cramino_out = format!(
  150. "{}/{}_{}_hs1_cramino.txt",
  151. self.time_dir, self.case.id, self.case.time_point
  152. );
  153. info!("Quality control with cramino for BAM: {}", self.bam);
  154. let output = duct::cmd!("cramino", "-t", "150", "--karyotype", &self.bam)
  155. .stdout_capture()
  156. .unchecked()
  157. .run()?;
  158. fs::write(cramino_out, output.stdout)?;
  159. Ok(())
  160. }
  161. pub fn run_modkit(&self) -> anyhow::Result<()> {
  162. let mod_summary = format!(
  163. "{}/{}_{}_5mC_5hmC_summary.txt",
  164. self.time_dir, self.case.id, self.case.time_point
  165. );
  166. info!("Generating base modification summary for BAM: {}", self.bam);
  167. let output = cmd!("modkit", "summary", "-t", "50", &self.bam)
  168. .stdout_capture()
  169. .unchecked()
  170. .run()?;
  171. fs::write(mod_summary, output.stdout)?;
  172. Ok(())
  173. }
  174. pub fn create_fastq(&self) -> anyhow::Result<()> {
  175. let bam = &self.bam;
  176. let fastq = format!(
  177. "{}/{}/{}/{}_{}.fastq.gz",
  178. self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
  179. );
  180. if !std::path::Path::new(&fastq).exists() {
  181. let samtools = format!("samtools fastq -@ 150 {bam}");
  182. let crabz = format!("crabz -f bgzf - -o {fastq}");
  183. let pipe = format!("{samtools} | {crabz}");
  184. info!("Running: {pipe}");
  185. let pipe_cmd = duct::cmd!("bash", "-c", pipe);
  186. pipe_cmd.run()?;
  187. }
  188. Ok(())
  189. }
  190. pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
  191. let composition_a: Vec<String> = bam_compo(bam.to_string_lossy().as_ref(), 20000)?
  192. .iter()
  193. .map(|(i, _, _)| i.clone())
  194. .collect();
  195. let composition_b: Vec<String> = bam_compo(&self.bam, 20000)?
  196. .iter()
  197. .map(|(i, _, _)| i.clone())
  198. .collect();
  199. let n_id = composition_a
  200. .iter()
  201. .filter(|id| composition_b.contains(id))
  202. .count();
  203. if n_id > 0 {
  204. warn!(
  205. "{} is already merged, reads with the same run_id in the destination BAM.",
  206. self.case.id
  207. );
  208. return Ok(());
  209. }
  210. let into = PathBuf::from(&self.bam);
  211. let dir = into.parent().unwrap();
  212. let original_file = into.file_name().unwrap().to_string_lossy().to_string();
  213. let original_i = dir.join(format!("{original_file}.bai"));
  214. if !original_i.exists() {
  215. self.index()?;
  216. }
  217. let tmp_original_file = format!("{}.bam", Uuid::new_v4());
  218. let tmp_original = dir.join(tmp_original_file.clone());
  219. let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
  220. info!("Moving {} to {}", &into.display(), &tmp_original.display());
  221. fs::rename(&into, &tmp_original)?;
  222. info!(
  223. "Moving {} to {}",
  224. &original_i.display(),
  225. &tmp_original_i.display()
  226. );
  227. fs::rename(original_i, tmp_original_i.clone())?;
  228. let cmd = format!(
  229. "samtools merge -@ 160 -h {} {} {} {}",
  230. bam.display(),
  231. into.display(),
  232. bam.display(),
  233. tmp_original.display()
  234. );
  235. info!("Running {cmd}");
  236. cmd!(
  237. "samtools",
  238. "merge",
  239. "-@",
  240. "160",
  241. "-h",
  242. bam,
  243. into,
  244. bam,
  245. tmp_original.clone()
  246. )
  247. .run()?;
  248. fs::remove_file(tmp_original)?;
  249. fs::remove_file(tmp_original_i)?;
  250. fs::remove_file(bam)?;
  251. self.index()?;
  252. Ok(())
  253. }
  254. pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
  255. // Creating a temporary directory
  256. let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
  257. info!("Creating tmp dir {tmp_dir}");
  258. fs::create_dir(&tmp_dir)?;
  259. // Dorado base calling and align into a temporary bam file
  260. let muxed_bam = format!("{tmp_dir}/muxed.bam");
  261. let dorado_bin = &config.align.dorado_bin;
  262. let dorado_arg = &config.align.dorado_basecall_arg;
  263. let pod_dir = cases[0].pod_dir.display();
  264. let ref_mmi = &config.align.ref_mmi;
  265. let samtools_view_threads = config.align.samtools_view_threads;
  266. // Get the sequencing kit from the first pod5 file
  267. let muxed_pod_dir = &cases.first().unwrap().pod_dir;
  268. let pod_path = fs::read_dir(muxed_pod_dir)
  269. .map_err(|e| {
  270. anyhow::anyhow!(
  271. "Failed to read pod5 dir: {}.\n\t{e}",
  272. muxed_pod_dir.display()
  273. )
  274. })?
  275. .filter_map(|p| p.ok())
  276. .map(|p| p.path())
  277. .filter(|p| p.extension().unwrap() == "pod5")
  278. .take(1)
  279. .collect::<Vec<PathBuf>>()
  280. .pop()
  281. .unwrap();
  282. let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
  283. .sequencing_kit
  284. .to_uppercase();
  285. let dorado = format!(
  286. "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
  287. );
  288. let samtools_view =
  289. format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
  290. let pipe = format!("{dorado} | {samtools_view}");
  291. info!("Running: {pipe}");
  292. let pipe_cmd = cmd!("bash", "-c", &pipe);
  293. pipe_cmd
  294. .run()
  295. .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?;
  296. info!("Basecalling ✅");
  297. // Demux the temporary bam file
  298. let tmp_demux_dir = format!("{tmp_dir}/demuxed");
  299. fs::create_dir(&tmp_demux_dir)?;
  300. let pipe = format!(
  301. "samtools split -@ {} -f '{tmp_demux_dir}/%*_%!.bam' {muxed_bam}",
  302. config.align.samtools_view_threads
  303. );
  304. info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
  305. info!("Running: {pipe}");
  306. let pipe_cmd = cmd!("bash", "-c", pipe);
  307. pipe_cmd.run()?;
  308. info!("Demux ✅");
  309. for case in cases.iter() {
  310. let barcode = case.barcode.replace("NB", "");
  311. let bam = find_unique_file(
  312. &tmp_demux_dir,
  313. &format!("{sequencing_kit}_barcode{}.bam", barcode),
  314. )?;
  315. // Align
  316. let aligned_bam = format!(
  317. "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
  318. barcode
  319. );
  320. let dorado = format!(
  321. // "{} aligner --threads 160 {} {trimmed_bam}",
  322. "{} aligner --threads 160 {} {bam}",
  323. config.align.dorado_bin, config.align.ref_fa,
  324. );
  325. let samtools_view = format!(
  326. "samtools view -h -@ {} -b /dev/stdin",
  327. &config.align.samtools_view_threads
  328. );
  329. let samtools_sort = format!(
  330. "samtools sort -@ {} /dev/stdin -o {aligned_bam}",
  331. &config.align.samtools_sort_threads
  332. );
  333. let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
  334. info!("Running {pipe}");
  335. cmd!("bash", "-c", pipe).run()?;
  336. info!("Alignement ✅");
  337. let d = Dorado::init(case.clone(), config.clone())?;
  338. d.create_directories()?;
  339. if PathBuf::from(&d.bam).exists() {
  340. info!("Merging");
  341. d.merge_bam(&PathBuf::from(aligned_bam))?;
  342. } else {
  343. info!("Moving from {} to {}", bam, d.bam);
  344. fs::rename(aligned_bam, d.bam.clone())?;
  345. d.index()?;
  346. }
  347. d.run_cramino()?;
  348. d.run_modkit()?;
  349. }
  350. fs::remove_dir_all(tmp_dir)?;
  351. Ok(())
  352. }
  353. pub fn run_pipe(&mut self) -> anyhow::Result<()> {
  354. let start_time = std::time::SystemTime::now();
  355. self.start_time = start_time;
  356. debug!("Running Dorado with config: {:#?}", self.config);
  357. let dorado_bin = self.config.align.dorado_bin.clone();
  358. self.create_reference_mmi()?;
  359. self.create_directories()?;
  360. info!(
  361. "Reading {} pod5 from: {}",
  362. self.case.time_point, self.config.pod_dir
  363. );
  364. let bam_path = std::path::Path::new(&self.bam);
  365. if !bam_path.exists() {
  366. info!("Creating new bam file");
  367. self.basecall_align(&dorado_bin)?;
  368. self.index()?;
  369. } else {
  370. // check if merge before call
  371. let new_bam_path = bam_path
  372. .parent()
  373. .unwrap()
  374. .join(format!("{}.bam", Uuid::new_v4()));
  375. warn!("Creating new bam {}", new_bam_path.display());
  376. let bam = self.bam.clone();
  377. self.bam = new_bam_path.clone().to_string_lossy().to_string();
  378. self.basecall_align(&dorado_bin)?;
  379. self.bam.clone_from(&bam);
  380. self.merge_bam(&new_bam_path)?;
  381. }
  382. self.run_cramino()?;
  383. self.run_modkit()?;
  384. // self.create_fastq()?;
  385. let end_time = std::time::SystemTime::now();
  386. self.end_time = end_time;
  387. let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
  388. info!(
  389. "Dorado and Minimap2 execution time: {} seconds",
  390. execution_time
  391. );
  392. self.is_done = true;
  393. Ok(())
  394. }
  395. pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> {
  396. let tp_conv = |time_point: &str| -> String {
  397. match time_point {
  398. "normal" => config.normal_name.clone(),
  399. "tumoral" => config.tumoral_name.clone(),
  400. _ => panic!("Error time point name"),
  401. }
  402. };
  403. use crate::collection::flowcells::FlowCellLocation::*;
  404. let base_pod_dir = match &flowcell.location {
  405. Local(_) => None,
  406. Archived(pod_tar) => {
  407. let file = File::open(pod_tar)
  408. .map_err(|e| anyhow::anyhow!("Failed to open tar file: {pod_tar}\n\t{e}"))?;
  409. let mut archive = tar::Archive::new(file);
  410. info!("Un-tar of archived {pod_tar}");
  411. archive
  412. .unpack(&config.unarchive_tmp_dir)
  413. .map_err(|e| anyhow::anyhow!("Failed to un-tar: {pod_tar}\n\t{e}"))?;
  414. info!("Un-tar of archived {pod_tar} Done.");
  415. Some(config.unarchive_tmp_dir.to_string())
  416. }
  417. };
  418. use crate::collection::flowcells::FlowCellExperiment::*;
  419. match &flowcell.experiment {
  420. WGSPod5Mux(pod_dir) => {
  421. let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
  422. format!("{base_pod_dir}/{pod_dir}")
  423. } else {
  424. pod_dir.clone()
  425. };
  426. let cases = flowcell
  427. .cases
  428. .iter()
  429. .map(|c| FlowCellCase {
  430. id: c.case_id.clone(),
  431. time_point: tp_conv(&c.sample_type),
  432. barcode: c.barcode.clone(),
  433. pod_dir: pod_dir.clone().into(),
  434. })
  435. .collect();
  436. info!("Starting basecaller for muxed pod5: {cases:#?}");
  437. Dorado::from_mux(cases, config.clone())?;
  438. }
  439. WGSPod5Demux(pod_dir) => {
  440. let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
  441. format!("{base_pod_dir}/{pod_dir}")
  442. } else {
  443. pod_dir.clone()
  444. };
  445. for c in flowcell.cases.iter() {
  446. let pod_dir = format!("{pod_dir}/barcode{}", c.barcode.replace("NB", ""));
  447. info!("Starting basecaller for demuxed pod5: {pod_dir}");
  448. let mut d = Dorado::init(
  449. FlowCellCase {
  450. id: c.case_id.clone(),
  451. time_point: tp_conv(&c.sample_type),
  452. barcode: c.barcode.clone(),
  453. pod_dir: pod_dir.into(),
  454. },
  455. config.clone(),
  456. )?;
  457. d.run_pipe()?;
  458. }
  459. }
  460. }
  461. Ok(())
  462. }
  463. }