| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- use std::{fs, path::PathBuf};
- use anyhow::Context;
- use log::info;
- use uuid::Uuid;
- use crate::{
- collection::bam::bam_composition,
- commands::{SbatchRunner, SlurmParams},
- config::Config,
- };
- /// Wrapper around a `samtools index` invocation.
- ///
- /// This struct is meant to be used with the [`super::Command`] trait to
- /// build a command-line string of the form:
- ///
- /// ```text
- /// <bin> index -@ <threads> <bam>
- /// ```
- #[derive(Debug)]
- pub struct SamtoolsIndex {
- /// Path to the `samtools` binary.
- pub bin: String,
- /// Number of threads passed to `-@`.
- pub threads: u8,
- /// Path to the BAM file to index.
- pub bam: String,
- }
- impl super::Command for SamtoolsIndex {
- /// Build the `samtools index` command line as a single string.
- fn cmd(&self) -> String {
- format!("{} index -@ {} {}", self.bin, self.threads, self.bam)
- }
- }
- impl super::Runner for SamtoolsIndex {}
- impl super::SlurmRunner for SamtoolsIndex {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some("samtools_index".into()),
- cpus_per_task: Some(self.threads as u32),
- mem: Some("60G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- .to_args()
- }
- }
- /// Wrapper around a `samtools merge` invocation used to append one BAM into
- /// another while preserving read group (RG) uniqueness.
- ///
- /// Typical lifecycle with the [`super::Command`] trait:
- ///
- /// 1. [`init`](super::Command::init):
- /// - Checks that `into` and `bam` exist.
- /// - Checks that there is no RG overlap between `into` and `bam`.
- /// - Renames `into` and its index to temporary files in the same directory.
- /// 2. [`cmd`](super::Command::cmd):
- /// - Returns the `samtools merge` command line using:
- /// - `-h <bam>`
- /// - Output: `<into>`
- /// - Inputs: `<bam> <tmp_bam>`
- /// 3. [`clean_up`](super::Command::clean_up):
- /// - Removes the temporary BAM, its index, and the source `bam`.
- ///
- /// # Invariants
- ///
- /// - `init()` must be called successfully before `cmd()`/`clean_up()`.
- /// - `tmp_bam` and `tmp_bam_index` are only valid after `init()` has run.
- pub struct SamtoolsMerge {
- /// Path to the `samtools` binary.
- pub bin: String,
- /// Number of threads passed to `-@`.
- pub threads: u8,
- /// Destination BAM (final merged output path).
- pub into: PathBuf,
- /// Source BAM to merge into `into`.
- pub bam: PathBuf,
- /// Temporary location for the original `into` BAM (created in `init()`).
- tmp_bam: PathBuf,
- /// Temporary location for the original `into` BAI (created in `init()`).
- tmp_bam_index: PathBuf,
- }
- impl super::Command for SamtoolsMerge {
- /// Prepare the filesystem and validate preconditions for the merge.
- ///
- /// This method:
- /// - Ensures both `into` and `bam` BAM files exist.
- /// - Ensures there is no overlap of RG IDs between the two BAMs.
- /// - Ensures a BAI index exists for `into`.
- /// - Moves `into` and its BAI to temporary files in the same directory.
- fn init(&mut self) -> anyhow::Result<()> {
- // Collect RG IDs from destination BAM.
- let composition_a: Vec<String> =
- bam_composition(self.into.to_string_lossy().as_ref(), 20000)?
- .iter()
- .map(|(rg, _, _)| rg.clone())
- .collect();
- // Collect RG IDs from source BAM.
- let composition_b: Vec<String> =
- bam_composition(self.bam.to_string_lossy().as_ref(), 20000)?
- .iter()
- .map(|(rg, _, _)| rg.clone())
- .collect();
- // Check for overlapping RGs between the two BAM files.
- let n_id = composition_a
- .iter()
- .filter(|id| composition_b.contains(id))
- .count();
- if n_id > 0 {
- anyhow::bail!(
- "{} {} are already merged, reads with the same RG in the destination BAM.",
- self.into.display(),
- self.bam.display()
- );
- }
- if !self.into.exists() {
- anyhow::bail!("BAM file doesn't exists for: {}", self.into.display());
- }
- if !self.bam.exists() {
- anyhow::bail!("BAM file doesn't exists for: {}", self.bam.display());
- }
- let dir = self.into.parent().context(format!(
- "Failed to find parent dir of: {}",
- self.into.display()
- ))?;
- let into_file_name = self
- .into
- .file_name()
- .context(format!(
- "Failed to get file name of: {}",
- self.into.display()
- ))?
- .to_string_lossy()
- .to_string();
- // For merging, the destination BAM must be indexed.
- let from_index = dir.join(format!("{into_file_name}.bai"));
- if !from_index.exists() {
- anyhow::bail!("BAI file is required for: {}", self.into.display());
- }
- let tmp_original_file = format!("{}.bam", Uuid::new_v4());
- self.tmp_bam = dir.join(&tmp_original_file);
- self.tmp_bam_index = dir.join(format!("{tmp_original_file}.bai"));
- // Move BAM and BAI to temporary names.
- info!(
- "Moving {} to {}",
- self.into.display(),
- self.tmp_bam.display()
- );
- fs::rename(&self.into, &self.tmp_bam).context(format!(
- "Failed to move {} to {}",
- self.into.display(),
- self.tmp_bam.display()
- ))?;
- info!(
- "Moving {} to {}",
- from_index.display(),
- self.tmp_bam_index.display()
- );
- fs::rename(&from_index, &self.tmp_bam_index).context(format!(
- "Failed to move {} to {}",
- from_index.display(),
- self.tmp_bam_index.display()
- ))?;
- Ok(())
- }
- /// Build the `samtools merge` command line as a single string.
- ///
- /// Output: `self.into`
- /// Inputs (in order): `self.bam`, `self.tmp_bam`
- fn cmd(&self) -> String {
- format!(
- "{} merge -@ {} -h {} {} {} {}",
- self.bin,
- self.threads,
- self.bam.display(),
- self.into.display(),
- self.bam.display(),
- self.tmp_bam.display()
- )
- }
- /// Clean up temporary files after a successful merge.
- ///
- /// This removes:
- /// - The temporary original BAM (`tmp_bam`)
- /// - The temporary original index (`tmp_bam_index`)
- /// - The source BAM (`bam`)
- fn clean_up(&self) -> anyhow::Result<()> {
- fs::remove_file(&self.tmp_bam)?;
- fs::remove_file(&self.tmp_bam_index)?;
- fs::remove_file(&self.bam)?;
- Ok(())
- }
- }
- impl super::Runner for SamtoolsMerge {}
- impl super::SlurmRunner for SamtoolsMerge {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some("samtools_merge".into()),
- cpus_per_task: Some(self.threads as u32),
- mem: Some("60G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- .to_args()
- }
- }
- #[derive(Debug)]
- pub struct SamtoolsSplit {
- /// Path to the `samtools` executable.
- pub samtools: PathBuf,
- /// Number of threads to use with `samtools split` (`-@` option).
- pub threads: u8,
- /// Input BAM file to demultiplex.
- pub input_bam: PathBuf,
- /// Output directory where split BAM files will be written.
- pub output_dir: PathBuf,
- }
- impl SamtoolsSplit {
- pub fn from_config(config: &Config, input_bam: PathBuf, output_dir: PathBuf) -> Self {
- Self {
- samtools: (&config.align.samtools_bin).into(),
- threads: config.align.samtools_split_threads,
- input_bam,
- output_dir,
- }
- }
- }
- impl super::Command for SamtoolsSplit {
- fn init(&mut self) -> anyhow::Result<()> {
- if !self.input_bam.exists() {
- anyhow::bail!("No BAM file at: {}", self.input_bam.display());
- }
- if self.output_dir.exists() {
- anyhow::bail!("Output dir already exists: {}", self.output_dir.display());
- } else {
- fs::create_dir(&self.output_dir).context(format!(
- "Failed to create dir: {}",
- self.output_dir.display()
- ))?
- }
- Ok(())
- }
- /// Build the `samtools split` command line as a single shell string.
- fn cmd(&self) -> String {
- format!(
- "{} split -@ {} -f '{}/%*_%!.bam' {}",
- self.samtools.display(),
- self.threads,
- self.output_dir.display(),
- self.input_bam.display()
- )
- }
- }
- impl super::Runner for SamtoolsSplit {}
- impl super::SlurmRunner for SamtoolsSplit {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some("samtools_split".into()),
- cpus_per_task: Some(self.threads as u32),
- mem: Some("60G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- .to_args()
- }
- }
- #[derive(Debug)]
- pub struct SamtoolsSort {
- /// Path to the `samtools` executable.
- pub samtools: PathBuf,
- /// Number of threads to use with `samtools sort` (`-@` option).
- pub threads: u8,
- /// Input BAM file to sort.
- pub input_bam: PathBuf,
- /// Output sorted BAM.
- pub output_bam: PathBuf,
- /// Slurm params
- pub slurm_params: SlurmParams,
- }
- impl SamtoolsSort {
- pub fn from_config(config: &Config, input_bam: PathBuf, output_bam: PathBuf) -> Self {
- let threads = config.align.samtools_sort_threads;
- Self {
- samtools: (&config.align.samtools_bin).into(),
- threads,
- input_bam,
- output_bam,
- slurm_params: SlurmParams {
- job_name: Some("samtools_split".into()),
- cpus_per_task: Some(threads as u32),
- mem: Some("60G".into()),
- partition: Some("shortq".into()),
- gres: None,
- },
- }
- }
- }
- impl super::Command for SamtoolsSort {
- fn init(&mut self) -> anyhow::Result<()> {
- if !self.input_bam.exists() {
- anyhow::bail!("No BAM file at: {}", self.input_bam.display());
- }
- if self.output_bam.exists() {
- anyhow::bail!("Output BAM already exists: {}", self.output_bam.display());
- }
- Ok(())
- }
- /// Build the `samtools sort` command line as a single shell string.
- fn cmd(&self) -> String {
- format!(
- "{} sort -@ {} -o {} {}",
- self.samtools.display(),
- self.threads,
- self.input_bam.display(),
- self.output_bam.display(),
- )
- }
- }
- impl super::Runner for SamtoolsSort {}
- impl super::SlurmRunner for SamtoolsSort {
- fn slurm_args(&self) -> Vec<String> {
- self.slurm_params.to_args()
- }
- }
- impl SbatchRunner for SamtoolsSort {
- fn sbatch_extra_args(&self) -> Vec<String> {
- self.slurm_params.to_args()
- }
- }
- struct TestRun;
- impl super::Command for TestRun {
- fn cmd(&self) -> String {
- "echo 'hello from sbatch'".to_string()
- }
- }
- impl SbatchRunner for TestRun {
- fn slurm_params(&self) -> SlurmParams {
- SlurmParams {
- job_name: Some("test-sbatch".into()),
- // Make sure this partition exists on your cluster
- partition: Some("gpgpuq".into()), // or your default compute partition
- // Often required: cpus + mem, maybe account
- cpus_per_task: Some(1),
- mem: Some("1G".into()),
- gres: None,
- }
- }
- fn sbatch_extra_args(&self) -> Vec<String> {
- // If your cluster requires account:
- // vec!["--account=myaccount".into()]
- Vec::new()
- }
- }
- // pub struct TestRun {}
- // impl super::Command for TestRun {
- // fn cmd(&self) -> String {
- // "echo 'test'".to_string()
- // }
- // }
- // impl super::Runner for TestRun {}
- // impl super::SlurmRunner for TestRun {}
- #[cfg(test)]
- mod tests {
- use super::*;
- use log::info;
- use crate::{
- commands::{run_many_sbatch, SlurmRunner},
- config::Config,
- helpers::test_init,
- TEST_DIR,
- };
- #[test]
- fn sbatch_test_run() -> anyhow::Result<()> {
- let mut t = TestRun;
- let out = SbatchRunner::run(&mut t)?;
- println!("{out:#?}");
- // assert!(out.stdout.contains("hello from sbatch"));
- Ok(())
- }
- #[test]
- fn samtools_index_merge() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- info!("Copying the test BAM 1.");
- fs::copy(
- format!("{}/inputs/fcA_NB02.bam", TEST_DIR.as_str()),
- format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
- )?;
- info!("Copying the test BAM 2.");
- fs::copy(
- format!("{}/inputs/fcB_NB02.bam", TEST_DIR.as_str()),
- format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
- )?;
- info!("Indexing BAM 1.");
- let mut idx = SamtoolsIndex {
- bin: config.align.samtools_bin.clone(),
- threads: config.align.samtools_view_threads,
- bam: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()),
- };
- let captured_output = SlurmRunner::run(&mut idx)?;
- assert_eq!(captured_output.stderr, String::new());
- info!("Indexing BAM 2.");
- let mut idx = SamtoolsIndex {
- bin: config.align.samtools_bin.clone(),
- threads: config.align.samtools_view_threads,
- bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()),
- };
- let captured_output = SlurmRunner::run(&mut idx)?;
- assert_eq!(captured_output.stderr, String::new());
- info!("Mergin both BAM.");
- let mut idx = SamtoolsMerge {
- bin: config.align.samtools_bin,
- threads: config.align.samtools_merge_threads as u8,
- bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()).into(),
- into: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()).into(),
- tmp_bam: "".into(),
- tmp_bam_index: "".into(),
- };
- let captured_output = SlurmRunner::run(&mut idx)?;
- assert_eq!(captured_output.stderr, String::new());
- // TODO: add number of reads verification use bam_compo
- Ok(())
- }
- #[test]
- fn samtools_split() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- let mut cmd = SamtoolsSplit::from_config(
- &config,
- format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(),
- format!("{}/outputs/by_rg_splitted", TEST_DIR.as_str()).into(),
- );
- let captured_output = SlurmRunner::run(&mut cmd)?;
- assert_eq!(captured_output.stderr, String::new());
- Ok(())
- }
- #[test]
- fn samtools_sort_sbatch() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- let sort_1 = SamtoolsSort::from_config(
- &config,
- format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode02.bam", TEST_DIR.as_str()).into(),
- format!("{}/outputs/01_sorted.bam", TEST_DIR.as_str()).into(),
- );
- let sort_2 = SamtoolsSort::from_config(
- &config,
- format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode04.bam", TEST_DIR.as_str()).into(),
- format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()).into(),
- );
- let r = run_many_sbatch(vec![sort_1, sort_2])?;
- println!("{r:#?}");
- Ok(())
- }
- }
|