use std::{fs, path::PathBuf}; use anyhow::Context; use log::info; use uuid::Uuid; use crate::{ collection::bam::bam_composition, commands::{SbatchRunner, SlurmParams}, config::Config, }; /// Wrapper around a `samtools index` invocation. /// /// This struct is meant to be used with the [`super::Command`] trait to /// build a command-line string of the form: /// /// ```text /// index -@ /// ``` #[derive(Debug)] pub struct SamtoolsIndex { /// Path to the `samtools` binary. pub bin: String, /// Number of threads passed to `-@`. pub threads: u8, /// Path to the BAM file to index. pub bam: String, } impl super::Command for SamtoolsIndex { /// Build the `samtools index` command line as a single string. fn cmd(&self) -> String { format!("{} index -@ {} {}", self.bin, self.threads, self.bam) } } impl super::Runner for SamtoolsIndex {} impl super::SlurmRunner for SamtoolsIndex { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("samtools_index".into()), cpus_per_task: Some(self.threads as u32), mem: Some("60G".into()), partition: Some("shortq".into()), gres: None, } .to_args() } } /// Wrapper around a `samtools merge` invocation used to append one BAM into /// another while preserving read group (RG) uniqueness. /// /// Typical lifecycle with the [`super::Command`] trait: /// /// 1. [`init`](super::Command::init): /// - Checks that `into` and `bam` exist. /// - Checks that there is no RG overlap between `into` and `bam`. /// - Renames `into` and its index to temporary files in the same directory. /// 2. [`cmd`](super::Command::cmd): /// - Returns the `samtools merge` command line using: /// - `-h ` /// - Output: `` /// - Inputs: ` ` /// 3. [`clean_up`](super::Command::clean_up): /// - Removes the temporary BAM, its index, and the source `bam`. /// /// # Invariants /// /// - `init()` must be called successfully before `cmd()`/`clean_up()`. /// - `tmp_bam` and `tmp_bam_index` are only valid after `init()` has run. pub struct SamtoolsMerge { /// Path to the `samtools` binary. pub bin: String, /// Number of threads passed to `-@`. pub threads: u8, /// Destination BAM (final merged output path). pub into: PathBuf, /// Source BAM to merge into `into`. pub bam: PathBuf, /// Temporary location for the original `into` BAM (created in `init()`). tmp_bam: PathBuf, /// Temporary location for the original `into` BAI (created in `init()`). tmp_bam_index: PathBuf, } impl super::Command for SamtoolsMerge { /// Prepare the filesystem and validate preconditions for the merge. /// /// This method: /// - Ensures both `into` and `bam` BAM files exist. /// - Ensures there is no overlap of RG IDs between the two BAMs. /// - Ensures a BAI index exists for `into`. /// - Moves `into` and its BAI to temporary files in the same directory. fn init(&mut self) -> anyhow::Result<()> { // Collect RG IDs from destination BAM. let composition_a: Vec = bam_composition(self.into.to_string_lossy().as_ref(), 20000)? .iter() .map(|(rg, _, _)| rg.clone()) .collect(); // Collect RG IDs from source BAM. let composition_b: Vec = bam_composition(self.bam.to_string_lossy().as_ref(), 20000)? .iter() .map(|(rg, _, _)| rg.clone()) .collect(); // Check for overlapping RGs between the two BAM files. let n_id = composition_a .iter() .filter(|id| composition_b.contains(id)) .count(); if n_id > 0 { anyhow::bail!( "{} {} are already merged, reads with the same RG in the destination BAM.", self.into.display(), self.bam.display() ); } if !self.into.exists() { anyhow::bail!("BAM file doesn't exists for: {}", self.into.display()); } if !self.bam.exists() { anyhow::bail!("BAM file doesn't exists for: {}", self.bam.display()); } let dir = self.into.parent().context(format!( "Failed to find parent dir of: {}", self.into.display() ))?; let into_file_name = self .into .file_name() .context(format!( "Failed to get file name of: {}", self.into.display() ))? .to_string_lossy() .to_string(); // For merging, the destination BAM must be indexed. let from_index = dir.join(format!("{into_file_name}.bai")); if !from_index.exists() { anyhow::bail!("BAI file is required for: {}", self.into.display()); } let tmp_original_file = format!("{}.bam", Uuid::new_v4()); self.tmp_bam = dir.join(&tmp_original_file); self.tmp_bam_index = dir.join(format!("{tmp_original_file}.bai")); // Move BAM and BAI to temporary names. info!( "Moving {} to {}", self.into.display(), self.tmp_bam.display() ); fs::rename(&self.into, &self.tmp_bam).context(format!( "Failed to move {} to {}", self.into.display(), self.tmp_bam.display() ))?; info!( "Moving {} to {}", from_index.display(), self.tmp_bam_index.display() ); fs::rename(&from_index, &self.tmp_bam_index).context(format!( "Failed to move {} to {}", from_index.display(), self.tmp_bam_index.display() ))?; Ok(()) } /// Build the `samtools merge` command line as a single string. /// /// Output: `self.into` /// Inputs (in order): `self.bam`, `self.tmp_bam` fn cmd(&self) -> String { format!( "{} merge -@ {} -h {} {} {} {}", self.bin, self.threads, self.bam.display(), self.into.display(), self.bam.display(), self.tmp_bam.display() ) } /// Clean up temporary files after a successful merge. /// /// This removes: /// - The temporary original BAM (`tmp_bam`) /// - The temporary original index (`tmp_bam_index`) /// - The source BAM (`bam`) fn clean_up(&self) -> anyhow::Result<()> { fs::remove_file(&self.tmp_bam)?; fs::remove_file(&self.tmp_bam_index)?; fs::remove_file(&self.bam)?; Ok(()) } } impl super::Runner for SamtoolsMerge {} impl super::SlurmRunner for SamtoolsMerge { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("samtools_merge".into()), cpus_per_task: Some(self.threads as u32), mem: Some("60G".into()), partition: Some("shortq".into()), gres: None, } .to_args() } } #[derive(Debug)] pub struct SamtoolsSplit { /// Path to the `samtools` executable. pub samtools: PathBuf, /// Number of threads to use with `samtools split` (`-@` option). pub threads: u8, /// Input BAM file to demultiplex. pub input_bam: PathBuf, /// Output directory where split BAM files will be written. pub output_dir: PathBuf, } impl SamtoolsSplit { pub fn from_config(config: &Config, input_bam: PathBuf, output_dir: PathBuf) -> Self { Self { samtools: (&config.align.samtools_bin).into(), threads: config.align.samtools_split_threads, input_bam, output_dir, } } } impl super::Command for SamtoolsSplit { fn init(&mut self) -> anyhow::Result<()> { if !self.input_bam.exists() { anyhow::bail!("No BAM file at: {}", self.input_bam.display()); } if self.output_dir.exists() { anyhow::bail!("Output dir already exists: {}", self.output_dir.display()); } else { fs::create_dir(&self.output_dir).context(format!( "Failed to create dir: {}", self.output_dir.display() ))? } Ok(()) } /// Build the `samtools split` command line as a single shell string. fn cmd(&self) -> String { format!( "{} split -@ {} -f '{}/%*_%!.bam' {}", self.samtools.display(), self.threads, self.output_dir.display(), self.input_bam.display() ) } } impl super::Runner for SamtoolsSplit {} impl super::SlurmRunner for SamtoolsSplit { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("samtools_split".into()), cpus_per_task: Some(self.threads as u32), mem: Some("60G".into()), partition: Some("shortq".into()), gres: None, } .to_args() } } #[derive(Debug)] pub struct SamtoolsSort { /// Path to the `samtools` executable. pub samtools: PathBuf, /// Number of threads to use with `samtools sort` (`-@` option). pub threads: u8, /// Input BAM file to sort. pub input_bam: PathBuf, /// Output sorted BAM. pub output_bam: PathBuf, /// Slurm params pub slurm_params: SlurmParams, } impl SamtoolsSort { pub fn from_config(config: &Config, input_bam: PathBuf, output_bam: PathBuf) -> Self { let threads = config.align.samtools_sort_threads; Self { samtools: (&config.align.samtools_bin).into(), threads, input_bam, output_bam, slurm_params: SlurmParams { job_name: Some("samtools_split".into()), cpus_per_task: Some(threads as u32), mem: Some("60G".into()), partition: Some("shortq".into()), gres: None, }, } } } impl super::Command for SamtoolsSort { fn init(&mut self) -> anyhow::Result<()> { if !self.input_bam.exists() { anyhow::bail!("No BAM file at: {}", self.input_bam.display()); } if self.output_bam.exists() { anyhow::bail!("Output BAM already exists: {}", self.output_bam.display()); } Ok(()) } /// Build the `samtools sort` command line as a single shell string. fn cmd(&self) -> String { format!( "{} sort -@ {} -o {} {}", self.samtools.display(), self.threads, self.input_bam.display(), self.output_bam.display(), ) } } impl super::Runner for SamtoolsSort {} impl super::SlurmRunner for SamtoolsSort { fn slurm_args(&self) -> Vec { self.slurm_params.to_args() } } impl SbatchRunner for SamtoolsSort { fn sbatch_extra_args(&self) -> Vec { self.slurm_params.to_args() } } struct TestRun; impl super::Command for TestRun { fn cmd(&self) -> String { "echo 'hello from sbatch'".to_string() } } impl SbatchRunner for TestRun { fn slurm_params(&self) -> SlurmParams { SlurmParams { job_name: Some("test-sbatch".into()), // Make sure this partition exists on your cluster partition: Some("gpgpuq".into()), // or your default compute partition // Often required: cpus + mem, maybe account cpus_per_task: Some(1), mem: Some("1G".into()), gres: None, } } fn sbatch_extra_args(&self) -> Vec { // If your cluster requires account: // vec!["--account=myaccount".into()] Vec::new() } } // pub struct TestRun {} // impl super::Command for TestRun { // fn cmd(&self) -> String { // "echo 'test'".to_string() // } // } // impl super::Runner for TestRun {} // impl super::SlurmRunner for TestRun {} #[cfg(test)] mod tests { use super::*; use log::info; use crate::{ commands::{run_many_sbatch, SlurmRunner}, config::Config, helpers::test_init, TEST_DIR, }; #[test] fn sbatch_test_run() -> anyhow::Result<()> { let mut t = TestRun; let out = SbatchRunner::run(&mut t)?; println!("{out:#?}"); // assert!(out.stdout.contains("hello from sbatch")); Ok(()) } #[test] fn samtools_index_merge() -> anyhow::Result<()> { test_init(); let config = Config::default(); info!("Copying the test BAM 1."); fs::copy( format!("{}/inputs/fcA_NB02.bam", TEST_DIR.as_str()), format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()), )?; info!("Copying the test BAM 2."); fs::copy( format!("{}/inputs/fcB_NB02.bam", TEST_DIR.as_str()), format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()), )?; info!("Indexing BAM 1."); let mut idx = SamtoolsIndex { bin: config.align.samtools_bin.clone(), threads: config.align.samtools_view_threads, bam: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()), }; let captured_output = SlurmRunner::run(&mut idx)?; assert_eq!(captured_output.stderr, String::new()); info!("Indexing BAM 2."); let mut idx = SamtoolsIndex { bin: config.align.samtools_bin.clone(), threads: config.align.samtools_view_threads, bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()), }; let captured_output = SlurmRunner::run(&mut idx)?; assert_eq!(captured_output.stderr, String::new()); info!("Mergin both BAM."); let mut idx = SamtoolsMerge { bin: config.align.samtools_bin, threads: config.align.samtools_merge_threads as u8, bam: format!("{}/outputs/to_be_merged_2.bam", TEST_DIR.as_str()).into(), into: format!("{}/outputs/to_be_merged.bam", TEST_DIR.as_str()).into(), tmp_bam: "".into(), tmp_bam_index: "".into(), }; let captured_output = SlurmRunner::run(&mut idx)?; assert_eq!(captured_output.stderr, String::new()); // TODO: add number of reads verification use bam_compo Ok(()) } #[test] fn samtools_split() -> anyhow::Result<()> { test_init(); let config = Config::default(); let mut cmd = SamtoolsSplit::from_config( &config, format!("{}/outputs/10_hs1_sorted.bam", TEST_DIR.as_str()).into(), format!("{}/outputs/by_rg_splitted", TEST_DIR.as_str()).into(), ); let captured_output = SlurmRunner::run(&mut cmd)?; assert_eq!(captured_output.stderr, String::new()); Ok(()) } #[test] fn samtools_sort_sbatch() -> anyhow::Result<()> { test_init(); let config = Config::default(); let sort_1 = SamtoolsSort::from_config( &config, format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode02.bam", TEST_DIR.as_str()).into(), format!("{}/outputs/01_sorted.bam", TEST_DIR.as_str()).into(), ); let sort_2 = SamtoolsSort::from_config( &config, format!("{}/outputs/by_rg_splitted/10_hs1_sorted_22582b29-2858-43a6-86ee-47ed858dbcde_dna_r10.4.1_e8.2_400bps_sup@v5.2.0_SQK-NBD114-24_barcode04.bam", TEST_DIR.as_str()).into(), format!("{}/outputs/02_sorted.bam", TEST_DIR.as_str()).into(), ); let r = run_many_sbatch(vec![sort_1, sort_2])?; println!("{r:#?}"); Ok(()) } }