//! # DeepSomatic Somatic Variant Calling Pipeline //! //! This module provides a pipeline runner for [DeepSomatic](https://github.com/google/deepsomatic), //! a deep learning-based somatic variant caller for paired tumor-normal samples. //! //! ## Overview //! //! DeepSomatic performs somatic variant calling using: //! //! - CNN-based variant classification (derived from DeepVariant) //! - Paired tumor-normal analysis //! - Containerized execution via Singularity //! //! ## Key Features //! //! - **Deep learning-based** - CNN models specifically trained for somatic variant detection //! - **Tumor-normal paired** - Leverages matched control for improved specificity //! - **Platform flexibility** - Supports ONT, PacBio, and Illumina //! - **Parallel execution** - Genome chunking for HPC scalability //! //! ## Requirements //! //! Before running DeepSomatic, ensure: //! - Tumor and normal BAMs are indexed (`.bai` files present) //! - Reference genome is accessible //! - Singularity/Docker image is available //! - Model type matches sequencing platform (`config.deepsomatic_model_type`) //! //! ## Execution Modes //! //! Execution mode is automatically selected via `config.slurm_runner`: //! //! - **Local** — Single-node execution //! - **Slurm** — HPC job submission //! //! Both modes support chunked parallel execution via [`run_deepsomatic_chunked_with_merge`]. //! //! ## Output Files //! //! PASS-filtered somatic variants: //! ```text //! {result_dir}/{id}/deepsomatic/{id}_{tumoral_name}_DeepSomatic.pass.vcf.gz //! ``` //! //! ## Usage //! //! ### Chunked Parallel Execution (Recommended for WGS) //! //! ```ignore //! use pandora_lib_promethion::callers::deep_somatic::run_deepsomatic_chunked_with_merge; //! use pandora_lib_promethion::config::Config; //! //! let config = Config::default(); //! // Run DeepSomatic in 30 parallel chunks //! let outputs = run_deepsomatic_chunked_with_merge("sample_001", &config, 30)?; //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ### Single-Run Execution //! //! ```ignore //! use pandora_lib_promethion::callers::deep_somatic::DeepSomatic; //! use pandora_lib_promethion::config::Config; //! use pandora_lib_promethion::pipes::Initialize; //! use pandora_lib_promethion::runners::Run; //! //! let config = Config::default(); //! let mut caller = DeepSomatic::initialize("sample_001", &config)?; //! //! if caller.should_run() { //! caller.run()?; //! } //! //! // Load somatic variants //! let variants = caller.variants(&annotations)?; //! println!("Found {} somatic variants", variants.variants.len()); //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ## References //! //! - [DeepSomatic GitHub repository](https://github.com/google/deepsomatic) use std::{ fmt, fs, path::{Path, PathBuf}, process::{Command as ProcessCommand, Stdio}, }; use anyhow::Context; use log::{debug, info}; use rayon::prelude::*; use regex::Regex; use rust_htslib::bam::{self, Read}; use uuid::Uuid; use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{ bcftools::{BcftoolsConcat, BcftoolsIndex, BcftoolsKeepPass}, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, }, config::Config, helpers::{ get_genome_sizes_in_header_order, is_file_older, remove_dir_if_exists, singularity_bind_flags, split_ordered_genome_into_n_regions_exact, }, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, run, run_many, runners::Run, variant::{ variant_collection::VariantCollection, vcf_variant::{Label, Variants}, }, }; /// DeepSomatic paired tumor-normal somatic variant caller. /// /// Executes DeepSomatic for somatic SNV and indel detection using matched /// tumor and normal BAM files with automatic post-processing (PASS filtering). /// /// # Fields /// /// - `id` - Sample identifier (e.g., "34528") /// - `regions` - Space-separated list of genomic regions to process (e.g., "chr1 chr2 chr3") /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/deepsomatic") /// - `config` - Global pipeline configuration /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N) /// /// # Execution Modes /// /// - **Local** - Direct execution /// - **Slurm** - Single job submission /// - **Chunked** - Parallel genome-wide execution via [`run_deepsomatic_chunked_with_merge`] #[derive(Debug, Clone)] pub struct DeepSomatic { /// Sample identifier pub id: String, /// Space-separated list of genomic regions to process pub regions: String, /// Directory for log file storage pub log_dir: String, /// Global pipeline configuration pub config: Config, /// Optional part index for chunked parallel runs (1-indexed) pub part_index: Option, } impl fmt::Display for DeepSomatic { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "🧬 DeepSomatic")?; writeln!(f, " Case ID : {}", self.id)?; writeln!(f, " Regions : {}", self.regions)?; writeln!(f, " Log dir : {}", self.log_dir)?; writeln!( f, " Part : {}", self.part_index .map_or("full".into(), |n| format!("part{n}")) ) } } impl Initialize for DeepSomatic { fn initialize(id: &str, config: &Config) -> anyhow::Result { let id = id.to_string(); info!("Initializing DeepSomatic for {id}."); let log_dir = format!("{}/{}/log/deepsomatic", config.result_dir, &id); let regions = (1..=22) .map(|i| format!("chr{i}")) .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from)) .collect::>(); let deep_somatic = Self { id, regions: regions.join(" "), log_dir, config: config.clone(), part_index: None, }; if deep_somatic.config.deepsomatic_force { remove_dir_if_exists(&deep_somatic.config.deepsomatic_output_dir(&deep_somatic.id))?; } Ok(deep_somatic) } } impl ShouldRun for DeepSomatic { fn should_run(&self) -> bool { let passed_vcf = &self.config.deepsomatic_passed_vcf(&self.id); let normal_older = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true); let tumor_older = is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true); let result = normal_older || tumor_older; if result { info!("DeepSomatic should run for id: {}.", self.id); } result } } impl JobCommand for DeepSomatic { fn init(&mut self) -> anyhow::Result<()> { let output_dir = self.part_output_dir(); fs::create_dir_all(&output_dir) .with_context(|| format!("Failed to create dir: {output_dir}"))?; fs::create_dir_all(&self.log_dir) .with_context(|| format!("Failed to create dir: {}", self.log_dir))?; Ok(()) } fn cmd(&self) -> String { let output_dir = self.part_output_dir(); let output_vcf_path = self.output_vcf_path(); let sample_name_tumor = format!("{}_{}", self.id, self.config.tumoral_name); let sample_name_normal = format!("{}_{}", self.id, self.config.normal_name); let log_subdir = match self.part_index { Some(idx) => format!("{}_DeepSomatic_part{idx}_logs", sample_name_tumor), None => format!("{}_DeepSomatic_logs", sample_name_tumor), }; let bind_flags = singularity_bind_flags([ &self.config.normal_bam(&self.id), &self.config.tumoral_bam(&self.id), &self.config.reference, &output_dir, &self.log_dir, &self.config.tmp_dir, ]); let output_vcf = format!( "/output/{}", Path::new(&output_vcf_path) .file_name() .unwrap() .to_string_lossy() ); format!( "{singularity_bin} exec --cleanenv \ {binds} \ --bind {output_dir}:/output \ {image} \ /opt/deepvariant/bin/deepsomatic/run_deepsomatic \ --model_type={model_type} \ --ref={reference} \ --reads_normal={normal_bam} \ --reads_tumor={tumor_bam} \ --regions=\"{regions}\" \ --output_vcf={output_vcf} \ --num_shards={threads} \ --logging_dir=/output/{log_subdir} \ --vcf_stats_report=true \ --dry_run=false \ --sample_name_tumor={sample_name_tumor} \ --sample_name_normal={sample_name_normal}", singularity_bin = self.config.singularity_bin, binds = bind_flags, output_dir = output_dir, image = self.config.deepsomatic_image, model_type = self.config.deepsomatic_model_type, reference = self.config.reference, normal_bam = self.config.normal_bam(&self.id), tumor_bam = self.config.tumoral_bam(&self.id), regions = self.regions, threads = self.config.deepsomatic_threads, ) } } impl LocalRunner for DeepSomatic {} impl LocalBatchRunner for DeepSomatic {} impl SlurmRunner for DeepSomatic { fn slurm_args(&self) -> Vec { let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default(); SlurmParams { job_name: Some(format!("deepsomatic_{}{}", self.id, batch_id)), cpus_per_task: Some(self.config.deepsomatic_threads as u32), mem: Some("60G".into()), partition: Some("shortq".into()), gres: None, } .to_args() } } impl SbatchRunner for DeepSomatic { fn slurm_params(&self) -> SlurmParams { let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default(); SlurmParams { job_name: Some(format!("deepsomatic_{}{}", self.id, batch_id)), cpus_per_task: Some(self.config.deepsomatic_threads as u32), mem: Some("60G".into()), partition: Some("shortq".into()), gres: None, } } } impl DeepSomatic { fn part_output_dir(&self) -> String { let base_dir = self.config.deepsomatic_output_dir(&self.id); match self.part_index { Some(idx) => format!("{base_dir}/part{idx}"), None => base_dir, } } fn output_vcf_path(&self) -> String { let output_dir = self.part_output_dir(); format!( "{output_dir}/{}_{}_DeepSomatic.vcf.gz", self.id, self.config.tumoral_name ) } fn passed_vcf_path(&self) -> String { match self.part_index { Some(_) => self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz"), None => self.config.deepsomatic_passed_vcf(&self.id), } } fn filter_pass(&self) -> anyhow::Result<()> { let output_vcf = self.output_vcf_path(); let vcf_passed = self.passed_vcf_path(); if Path::new(&vcf_passed).exists() { debug!("PASS VCF already exists: {vcf_passed}"); return Ok(()); } info!( "Filtering DeepSomatic PASS variants for {} (part: {:?})", self.id, self.part_index ); let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, &vcf_passed); let report = run!(&self.config, &mut cmd) .with_context(|| format!("Failed to filter PASS for {}", self.id))?; report .save_to_file(format!("{}/bcftools_pass_", self.log_dir)) .context("Can't save bcftools PASS logs")?; let mut cmd = BcftoolsIndex::from_config(&self.config, vcf_passed); let report = run!(&self.config, &mut cmd) .with_context(|| format!("Failed to index vcf PASS for {}", self.id))?; report .save_to_file(format!("{}/bcftools_index_", self.log_dir)) .context("Failed to save bcftools index logs")?; Ok(()) } } impl Run for DeepSomatic { fn run(&mut self) -> anyhow::Result<()> { let lock_dir = format!("{}/locks", self.config.result_dir); let _lock = SampleLock::acquire(&lock_dir, &self.id, "deepsomatic") .with_context(|| format!("Cannot start DeepSomatic chunked for {}", self.id))?; if !self.should_run() { info!("DeepSomatic is up-to-data."); return Ok(()); } if self.config.slurm_runner { run_deepsomatic_chunked(&self.id, &self.config, 30) } else { run!(&self.config, self)?; self.filter_pass() } } } impl CallerCat for DeepSomatic { fn caller_cat(&self) -> Annotation { Annotation::Callers(Caller::DeepSomatic, Sample::Somatic) } } impl Variants for DeepSomatic { fn variants(&self, annotations: &Annotations) -> anyhow::Result { let caller = self.caller_cat(); let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id); info!("Loading variants from {caller}: {vcf_passed}"); let variants = read_vcf(&vcf_passed) .with_context(|| format!("Failed to read DeepSomatic VCF {vcf_passed}"))?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), std::slice::from_ref(&caller)); }); info!("{caller}: {} variants loaded", variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(vcf_passed.into())?, caller, }) } } impl Label for DeepSomatic { fn label(&self) -> String { self.caller_cat().to_string() } } impl Version for DeepSomatic { fn version(config: &Config) -> anyhow::Result { let out = ProcessCommand::new("bash") .arg("-c") .arg(format!( "{} exec {} /opt/deepvariant/bin/deepsomatic/run_deepsomatic --version", config.singularity_bin, config.deepsomatic_image )) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .context("Failed to spawn Singularity process")?; if !out.status.success() { let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); anyhow::bail!( "Singularity exec failed with status {}: {}", out.status, combined ); } let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); parse_deepsomatic_version(&combined) } fn version_slurm(config: &Config) -> anyhow::Result { struct DeepSomaticVersionJob<'a> { config: &'a Config, } impl JobCommand for DeepSomaticVersionJob<'_> { fn cmd(&self) -> String { format!( "{} exec {} /opt/deepvariant/bin/deepsomatic/run_deepsomatic --version", self.config.singularity_bin, self.config.deepsomatic_image ) } } impl SlurmRunner for DeepSomaticVersionJob<'_> { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("deepsomatic_version".into()), partition: Some("shortq".into()), cpus_per_task: Some(1), mem: Some("10G".into()), gres: None, } .to_args() } } let mut job = DeepSomaticVersionJob { config }; let out = crate::commands::SlurmRunner::exec(&mut job) .context("Failed to run DeepSomatic --version via Slurm")?; let mut combined = out.stdout.clone(); if let Some(epilog) = &out.slurm_epilog { combined.push_str(&epilog.to_string()); } combined.push_str(&out.stderr); parse_deepsomatic_version(&combined) } } fn parse_deepsomatic_version(output: &str) -> anyhow::Result { let deepsomatic_re = Regex::new(r"(?mi)\bdeepsomatic(?:\s+version)?\s*[:=]?\s*([^\s]+)") .expect("DeepSomatic version regex is valid"); if let Some(caps) = deepsomatic_re.captures(output) { return Ok(caps .get(1) .expect("Regex has capture group 1") .as_str() .to_string()); } // Fallback for containers that only expose the embedded DeepVariant version. let deepvariant_re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("DeepVariant regex is valid"); let caps = deepvariant_re .captures(output) .context("Could not parse DeepSomatic version from output")?; Ok(caps .get(1) .expect("Regex has capture group 1") .as_str() .to_string()) } fn merge_deepsomatic_parts(base: &DeepSomatic, n_parts: usize) -> anyhow::Result<()> { let mut part_pass_paths: Vec = Vec::with_capacity(n_parts); for i in 1..=n_parts { let mut ds = base.clone(); ds.part_index = Some(i); let part_pass = ds.passed_vcf_path(); anyhow::ensure!( Path::new(&part_pass).exists(), "Missing DeepSomatic part {i} PASS VCF: {part_pass}" ); part_pass_paths.push(PathBuf::from(part_pass)); } let final_passed_vcf = base.config.deepsomatic_passed_vcf(&base.id); let rand = Uuid::new_v4(); let final_tmp = format!("{final_passed_vcf}_{rand}.tmp"); let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi"); if let Some(parent) = Path::new(&final_passed_vcf).parent() { fs::create_dir_all(parent)?; } info!( "Concatenating {} DeepSomatic part VCFs into {}", n_parts, final_passed_vcf ); let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp); run!(&base.config, &mut concat) .context("Failed to run bcftools concat for DeepSomatic parts")?; fs::rename(&final_tmp, &final_passed_vcf) .context("Failed to rename merged DeepSomatic PASS VCF")?; fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi")) .context("Failed to rename merged DeepVariant PASS VCF CSI")?; info!( "Successfully merged {} DeepSomatic parts into {}", n_parts, final_passed_vcf ); Ok(()) } /// Runs DeepSomatic in parallel chunks, then merges results. /// /// Splits the genome into N equal-sized regions, runs DeepSomatic on each region /// in parallel (local or Slurm based on `config.slurm_runner`), filters PASS variants, /// and concatenates the final VCF. /// /// # Arguments /// /// * `id` - Sample identifier /// * `config` - Global pipeline configuration /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome) /// /// # Returns /// /// `Ok(())` on success, or an error if any step fails. /// /// # Errors /// /// Returns an error if: /// - `n_parts` is 0 /// - Tumor or normal BAM file cannot be opened /// - BAM header is malformed /// - DeepSomatic execution fails on any part /// - PASS filtering fails /// - VCF merging fails /// - Output directory cannot be created /// /// # Example /// /// ```ignore /// let config = Config::default(); /// run_deepsomatic_chunked("sample_001", &config, 30)?; /// ``` pub fn run_deepsomatic_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> { anyhow::ensure!(n_parts > 0, "n_parts must be > 0"); // Lock at the chunked level (caller may have already locked, but this is idempotent protection) // let lock_dir = format!("{}/locks", config.result_dir); // let _lock = SampleLock::acquire(&lock_dir, id, "deepsomatic") // .with_context(|| format!("Cannot start DeepSomatic chunked for {}", id))?; let base = DeepSomatic::initialize(id, config)?; // if !base.should_run() { // debug!("DeepSomatic PASS VCF already up-to-date for {id}, skipping."); // return Ok(()); // } // Get genome sizes from tumor BAM header let tumor_bam = config.tumoral_bam(id); let reader = bam::Reader::from_path(&tumor_bam) .with_context(|| format!("Failed to open BAM: {tumor_bam}"))?; let genome_sizes = get_genome_sizes_in_header_order(reader.header())?; let region_chunks = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts) .into_iter() .map(|v| v.join(" ")) .collect::>(); let actual_n_parts = region_chunks.len(); info!( "Running DeepSomatic in {} parallel parts for {}", actual_n_parts, id ); let mut jobs = Vec::with_capacity(actual_n_parts); for (i, regions) in region_chunks.into_iter().enumerate() { let mut job = base.clone(); job.regions = regions; job.part_index = Some(i + 1); job.log_dir = format!("{}/part{}", base.log_dir, i + 1); info!("Planned DeepSomatic job:\n{job}"); jobs.push(job); } // Run DeepSomatic jobs let outputs = run_many!(config, jobs.clone())?; for output in outputs.iter() { output.save_to_file(format!("{}/deepsomatic_", base.log_dir))?; } // Filter PASS variants for each part in parallel info!( "Filtering PASS variants for all {} parts in parallel", actual_n_parts ); let filter_jobs: Vec<_> = jobs .iter() .map(|job| { BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path()) }) .collect(); run_many!(config, filter_jobs)?; // Merge PASS VCFs merge_deepsomatic_parts(&base, actual_n_parts)?; info!( "DeepSomatic completed for {}: {} parts merged", id, actual_n_parts ); Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::helpers::test_init; #[test] fn deepsomatic_version() -> anyhow::Result<()> { test_init(); let vl = DeepSomatic::version(&Config::default())?; info!("DeepSomatic local version: {vl}"); let vs = DeepSomatic::version_slurm(&Config::default())?; info!("DeepSomatic slurm version: {vs}"); assert_eq!(vl, vs); Ok(()) } #[test] fn deepsomatic_run() -> anyhow::Result<()> { test_init(); let config = Config::default(); run_deepsomatic_chunked("DUMCO", &config, 50) } }