samtools.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. use std::{fs, path::PathBuf};
  2. use anyhow::Context;
  3. use log::info;
  4. use uuid::Uuid;
  5. use crate::{
  6. collection::bam::bam_composition,
  7. commands::{SbatchRunner, SlurmParams},
  8. config::Config,
  9. };
  10. /// Wrapper around a `samtools index` invocation.
  11. ///
  12. /// This struct is meant to be used with the [`super::Command`] trait to
  13. /// build a command-line string of the form:
  14. ///
  15. /// ```text
  16. /// <bin> index -@ <threads> <bam>
  17. /// ```
  18. #[derive(Debug)]
  19. pub struct SamtoolsIndex {
  20. /// Path to the `samtools` binary.
  21. pub bin: String,
  22. /// Number of threads passed to `-@`.
  23. pub threads: u8,
  24. /// Path to the BAM file to index.
  25. pub bam: String,
  26. }
  27. impl super::Command for SamtoolsIndex {
  28. /// Build the `samtools index` command line as a single string.
  29. fn cmd(&self) -> String {
  30. format!("{} index -@ {} {}", self.bin, self.threads, self.bam)
  31. }
  32. }
  33. impl super::Runner for SamtoolsIndex {}
  34. impl super::SlurmRunner for SamtoolsIndex {
  35. fn slurm_args(&self) -> Vec<String> {
  36. SlurmParams {
  37. job_name: Some("samtools_index".into()),
  38. cpus_per_task: Some(self.threads as u32),
  39. mem: Some("60G".into()),
  40. partition: Some("shortq".into()),
  41. gres: None,
  42. }
  43. .to_args()
  44. }
  45. }
  46. /// Wrapper around a `samtools merge` invocation used to append one BAM into
  47. /// another while preserving read group (RG) uniqueness.
  48. ///
  49. /// Typical lifecycle with the [`super::Command`] trait:
  50. ///
  51. /// 1. [`init`](super::Command::init):
  52. /// - Checks that `into` and `bam` exist.
  53. /// - Checks that there is no RG overlap between `into` and `bam`.
  54. /// - Renames `into` and its index to temporary files in the same directory.
  55. /// 2. [`cmd`](super::Command::cmd):
  56. /// - Returns the `samtools merge` command line using:
  57. /// - `-h <bam>`
  58. /// - Output: `<into>`
  59. /// - Inputs: `<bam> <tmp_bam>`
  60. /// 3. [`clean_up`](super::Command::clean_up):
  61. /// - Removes the temporary BAM, its index, and the source `bam`.
  62. ///
  63. /// # Invariants
  64. ///
  65. /// - `init()` must be called successfully before `cmd()`/`clean_up()`.
  66. /// - `tmp_bam` and `tmp_bam_index` are only valid after `init()` has run.
  67. pub struct SamtoolsMerge {
  68. /// Path to the `samtools` binary.
  69. pub bin: String,
  70. /// Number of threads passed to `-@`.
  71. pub threads: u8,
  72. /// Destination BAM (final merged output path).
  73. pub into: PathBuf,
  74. /// Source BAM to merge into `into`.
  75. pub bam: PathBuf,
  76. /// Temporary location for the original `into` BAM (created in `init()`).
  77. tmp_bam: PathBuf,
  78. /// Temporary location for the original `into` BAI (created in `init()`).
  79. tmp_bam_index: PathBuf,
  80. }
  81. impl super::Command for SamtoolsMerge {
  82. /// Prepare the filesystem and validate preconditions for the merge.
  83. ///
  84. /// This method:
  85. /// - Ensures both `into` and `bam` BAM files exist.
  86. /// - Ensures there is no overlap of RG IDs between the two BAMs.
  87. /// - Ensures a BAI index exists for `into`.
  88. /// - Moves `into` and its BAI to temporary files in the same directory.
  89. fn init(&mut self) -> anyhow::Result<()> {
  90. // Collect RG IDs from destination BAM.
  91. let composition_a: Vec<String> =
  92. bam_composition(self.into.to_string_lossy().as_ref(), 20000)?
  93. .iter()
  94. .map(|(rg, _, _)| rg.clone())
  95. .collect();
  96. // Collect RG IDs from source BAM.
  97. let composition_b: Vec<String> =
  98. bam_composition(self.bam.to_string_lossy().as_ref(), 20000)?
  99. .iter()
  100. .map(|(rg, _, _)| rg.clone())
  101. .collect();
  102. // Check for overlapping RGs between the two BAM files.
  103. let n_id = composition_a
  104. .iter()
  105. .filter(|id| composition_b.contains(id))
  106. .count();
  107. if n_id > 0 {
  108. anyhow::bail!(
  109. "{} {} are already merged, reads with the same RG in the destination BAM.",
  110. self.into.display(),
  111. self.bam.display()
  112. );
  113. }
  114. if !self.into.exists() {
  115. anyhow::bail!("BAM file doesn't exists for: {}", self.into.display());
  116. }
  117. if !self.bam.exists() {
  118. anyhow::bail!("BAM file doesn't exists for: {}", self.bam.display());
  119. }
  120. let dir = self.into.parent().context(format!(
  121. "Failed to find parent dir of: {}",
  122. self.into.display()
  123. ))?;
  124. let into_file_name = self
  125. .into
  126. .file_name()
  127. .context(format!(
  128. "Failed to get file name of: {}",
  129. self.into.display()
  130. ))?
  131. .to_string_lossy()
  132. .to_string();
  133. // For merging, the destination BAM must be indexed.
  134. let from_index = dir.join(format!("{into_file_name}.bai"));
  135. if !from_index.exists() {
  136. anyhow::bail!("BAI file is required for: {}", self.into.display());
  137. }
  138. let tmp_original_file = format!("{}.bam", Uuid::new_v4());
  139. self.tmp_bam = dir.join(&tmp_original_file);
  140. self.tmp_bam_index = dir.join(format!("{tmp_original_file}.bai"));
  141. // Move BAM and BAI to temporary names.
  142. info!(
  143. "Moving {} to {}",
  144. self.into.display(),
  145. self.tmp_bam.display()
  146. );
  147. fs::rename(&self.into, &self.tmp_bam).context(format!(
  148. "Failed to move {} to {}",
  149. self.into.display(),
  150. self.tmp_bam.display()
  151. ))?;
  152. info!(
  153. "Moving {} to {}",
  154. from_index.display(),
  155. self.tmp_bam_index.display()
  156. );
  157. fs::rename(&from_index, &self.tmp_bam_index).context(format!(
  158. "Failed to move {} to {}",
  159. from_index.display(),
  160. self.tmp_bam_index.display()
  161. ))?;
  162. Ok(())
  163. }
  164. /// Build the `samtools merge` command line as a single string.
  165. ///
  166. /// Output: `self.into`
  167. /// Inputs (in order): `self.bam`, `self.tmp_bam`
  168. fn cmd(&self) -> String {
  169. format!(
  170. "{} merge -@ {} -h {} {} {} {}",
  171. self.bin,
  172. self.threads,
  173. self.bam.display(),
  174. self.into.display(),
  175. self.bam.display(),
  176. self.tmp_bam.display()
  177. )
  178. }
  179. /// Clean up temporary files after a successful merge.
  180. ///
  181. /// This removes:
  182. /// - The temporary original BAM (`tmp_bam`)
  183. /// - The temporary original index (`tmp_bam_index`)
  184. /// - The source BAM (`bam`)
  185. fn clean_up(&self) -> anyhow::Result<()> {
  186. fs::remove_file(&self.tmp_bam)?;
  187. fs::remove_file(&self.tmp_bam_index)?;
  188. fs::remove_file(&self.bam)?;
  189. Ok(())
  190. }
  191. }
  192. impl super::Runner for SamtoolsMerge {}
  193. impl super::SlurmRunner for SamtoolsMerge {
  194. fn slurm_args(&self) -> Vec<String> {
  195. SlurmParams {
  196. job_name: Some("samtools_merge".into()),
  197. cpus_per_task: Some(self.threads as u32),
  198. mem: Some("60G".into()),
  199. partition: Some("shortq".into()),
  200. gres: None,
  201. }
  202. .to_args()
  203. }
  204. }
  205. #[derive(Debug)]
  206. pub struct SamtoolsSplit {
  207. /// Path to the `samtools` executable.
  208. pub samtools: PathBuf,
  209. /// Number of threads to use with `samtools split` (`-@` option).
  210. pub threads: u8,
  211. /// Input BAM file to demultiplex.
  212. pub input_bam: PathBuf,
  213. /// Output directory where split BAM files will be written.
  214. pub output_dir: PathBuf,
  215. }
  216. impl SamtoolsSplit {
  217. pub fn from_config(config: &Config, input_bam: PathBuf, output_dir: PathBuf) -> Self {
  218. Self {
  219. samtools: (&config.align.samtools_bin).into(),
  220. threads: config.align.samtools_split_threads,
  221. input_bam,
  222. output_dir,
  223. }
  224. }
  225. }
  226. impl super::Command for SamtoolsSplit {
  227. fn init(&mut self) -> anyhow::Result<()> {
  228. if !self.input_bam.exists() {
  229. anyhow::bail!("No BAM file at: {}", self.input_bam.display());
  230. }
  231. if self.output_dir.exists() {
  232. anyhow::bail!("Output dir already exists: {}", self.output_dir.display());
  233. } else {
  234. fs::create_dir(&self.output_dir).context(format!(
  235. "Failed to create dir: {}",
  236. self.output_dir.display()
  237. ))?
  238. }
  239. Ok(())
  240. }
  241. /// Build the `samtools split` command line as a single shell string.
  242. fn cmd(&self) -> String {
  243. format!(
  244. "{} split -@ {} -f '{}/%*_%!.bam' {}",
  245. self.samtools.display(),
  246. self.threads,
  247. self.output_dir.display(),
  248. self.input_bam.display()
  249. )
  250. }
  251. }
  252. impl super::Runner for SamtoolsSplit {}
  253. impl super::SlurmRunner for SamtoolsSplit {
  254. fn slurm_args(&self) -> Vec<String> {
  255. SlurmParams {
  256. job_name: Some("samtools_split".into()),
  257. cpus_per_task: Some(self.threads as u32),
  258. mem: Some("60G".into()),
  259. partition: Some("shortq".into()),
  260. gres: None,
  261. }
  262. .to_args()
  263. }
  264. }
  265. #[derive(Debug)]
  266. pub struct SamtoolsSort {
  267. /// Path to the `samtools` executable.
  268. pub samtools: PathBuf,
  269. /// Number of threads to use with `samtools sort` (`-@` option).
  270. pub threads: u8,
  271. /// Input BAM file to sort.
  272. pub input_bam: PathBuf,
  273. /// Output sorted BAM.
  274. pub output_bam: PathBuf,
  275. /// Slurm params
  276. pub slurm_params: SlurmParams,
  277. }
  278. impl SamtoolsSort {
  279. pub fn from_config(config: &Config, input_bam: PathBuf, output_bam: PathBuf) -> Self {
  280. let threads = config.align.samtools_sort_threads;
  281. Self {
  282. samtools: (&config.align.samtools_bin).into(),
  283. threads,
  284. input_bam,
  285. output_bam,
  286. slurm_params: SlurmParams {
  287. job_name: Some("samtools_split".into()),
  288. cpus_per_task: Some(threads as u32),
  289. mem: Some("60G".into()),
  290. partition: Some("shortq".into()),
  291. gres: None,
  292. },
  293. }
  294. }
  295. }
  296. impl super::Command for SamtoolsSort {
  297. fn init(&mut self) -> anyhow::Result<()> {
  298. if !self.input_bam.exists() {
  299. anyhow::bail!("No BAM file at: {}", self.input_bam.display());
  300. }
  301. if self.output_bam.exists() {
  302. anyhow::bail!("Output BAM already exists: {}", self.output_bam.display());
  303. }
  304. Ok(())
  305. }
  306. /// Build the `samtools sort` command line as a single shell string.
  307. fn cmd(&self) -> String {
  308. format!(
  309. "{} sort -@ {} -o {} {}",
  310. self.samtools.display(),
  311. self.threads,
  312. self.input_bam.display(),
  313. self.output_bam.display(),
  314. )
  315. }
  316. }
  317. impl super::Runner for SamtoolsSort {}
  318. impl super::SlurmRunner for SamtoolsSort {
  319. fn slurm_args(&self) -> Vec<String> {
  320. self.slurm_params.to_args()
  321. }
  322. }
  323. impl SbatchRunner for SamtoolsSort {
  324. fn sbatch_extra_args(&self) -> Vec<String> {
  325. self.slurm_params.to_args()
  326. }
  327. }
  328. struct TestRun;
  329. impl super::Command for TestRun {
  330. fn cmd(&self) -> String {
  331. "echo 'hello from sbatch'".to_string()
  332. }
  333. }
  334. impl SbatchRunner for TestRun {
  335. fn slurm_params(&self) -> SlurmParams {
  336. SlurmParams {
  337. job_name: Some("test-sbatch".into()),
  338. // Make sure this partition exists on your cluster
  339. partition: Some("gpgpuq".into()), // or your default compute partition
  340. // Often required: cpus + mem, maybe account
  341. cpus_per_task: Some(1),
  342. mem: Some("1G".into()),
  343. gres: None,
  344. }
  345. }
  346. fn sbatch_extra_args(&self) -> Vec<String> {
  347. // If your cluster requires account:
  348. // vec!["--account=myaccount".into()]
  349. Vec::new()
  350. }
  351. }
  352. // pub struct TestRun {}
  353. // impl super::Command for TestRun {
  354. // fn cmd(&self) -> String {
  355. // "echo 'test'".to_string()
  356. // }
  357. // }
  358. // impl super::Runner for TestRun {}
  359. // impl super::SlurmRunner for TestRun {}
  360. #[cfg(test)]
  361. mod tests {
  362. use super::*;
  363. use log::info;
  364. use crate::{
  365. commands::{run_many_sbatch, SlurmRunner},
  366. config::Config,
  367. helpers::test_init,
  368. TEST_DIR,
  369. };
  370. #[test]
  371. fn sbatch_test_run() -> anyhow::Result<()> {
  372. let mut t = TestRun;
  373. let out = SbatchRunner::run(&mut t)?;
  374. println!("{out:#?}");
  375. // assert!(out.stdout.contains("hello from sbatch"));
  376. Ok(())
  377. }
  378. #[test]
  379. fn samtools_index_merge() -> anyhow::Result<()> {
  380. test_init();
  381. let config = Config::default();
  382. info!("Copying the test BAM 1.");
  383. fs::copy(
  384. format!("{}/inputs/fcA_NB02.bam", TEST_DIR.as_str()),
  385. format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
  386. )?;
  387. info!("Copying the test BAM 2.");
  388. fs::copy(
  389. format!("{}/inputs/fcB_NB02.bam", TEST_DIR.as_str()),
  390. format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
  391. )?;
  392. info!("Indexing BAM 1.");
  393. let mut idx = SamtoolsIndex {
  394. bin: config.align.samtools_bin.clone(),
  395. threads: config.align.samtools_view_threads,
  396. bam: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
  397. };
  398. let captured_output = SlurmRunner::run(&mut idx)?;
  399. assert_eq!(captured_output.stderr, String::new());
  400. info!("Indexing BAM 2.");
  401. let mut idx = SamtoolsIndex {
  402. bin: config.align.samtools_bin.clone(),
  403. threads: config.align.samtools_view_threads,
  404. bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
  405. };
  406. let captured_output = SlurmRunner::run(&mut idx)?;
  407. assert_eq!(captured_output.stderr, String::new());
  408. info!("Mergin both BAM.");
  409. let mut idx = SamtoolsMerge {
  410. bin: config.align.samtools_bin,
  411. threads: config.align.samtools_merge_threads as u8,
  412. bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()).into(),
  413. into: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()).into(),
  414. tmp_bam: "".into(),
  415. tmp_bam_index: "".into(),
  416. };
  417. let captured_output = SlurmRunner::run(&mut idx)?;
  418. assert_eq!(captured_output.stderr, String::new());
  419. // TODO: add number of reads verification use bam_compo
  420. Ok(())
  421. }
  422. #[test]
  423. fn samtools_split() -> anyhow::Result<()> {
  424. test_init();
  425. let config = Config::default();
  426. let mut cmd = SamtoolsSplit::from_config(
  427. &config,
  428. format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(),
  429. format!("{}/outputs/by_rg_splitted", TEST_DIR.as_str()).into(),
  430. );
  431. let captured_output = SlurmRunner::run(&mut cmd)?;
  432. assert_eq!(captured_output.stderr, String::new());
  433. Ok(())
  434. }
  435. #[test]
  436. fn samtools_sort_sbatch() -> anyhow::Result<()> {
  437. test_init();
  438. let config = Config::default();
  439. let sort_1 = SamtoolsSort::from_config(
  440. &config,
  441. format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode02.bam", TEST_DIR.as_str()).into(),
  442. format!("{}/outputs/01_sorted.bam", TEST_DIR.as_str()).into(),
  443. );
  444. let sort_2 = SamtoolsSort::from_config(
  445. &config,
  446. format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode04.bam", TEST_DIR.as_str()).into(),
  447. format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()).into(),
  448. );
  449. let r = run_many_sbatch(vec![sort_1, sort_2])?;
  450. println!("{r:#?}");
  451. Ok(())
  452. }
  453. }