use std::{fs, path::Path}; use log::info; use rayon::prelude::*; use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::{vcf::Vcf, Initialize, ShouldRun}, commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{is_file_older, remove_dir_if_exists}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{ variant::{Label, Variants}, variant_collection::VariantCollection, }, }; /// A pipeline runner for executing DeepSomatic on paired tumor and normal BAM files. /// /// This struct encapsulates the configuration and metadata required to run the DeepSomatic /// variant caller inside a Docker container. It integrates into the pipeline ecosystem with: /// - Docker-based execution logic /// - Output filtering for PASS variants /// - Logging and diagnostics tracking /// - Annotation tagging for somatic variants #[derive(Debug)] pub struct DeepSomatic { pub id: String, pub log_dir: String, pub config: Config, } impl Initialize for DeepSomatic { /// Initializes the DeepSomatic runner by setting paths and logging. /// /// # Arguments /// * `id` - Sample ID used for output directory and log tagging. /// * `config` - Shared pipeline configuration. /// /// # Returns /// A ready-to-use `DeepSomatic` runner. fn initialize(id: &str, config: Config) -> anyhow::Result { let id = id.to_string(); info!("Initializing DeepSomatic for {id}."); let log_dir = format!("{}/{}/log/deepsomatic", config.result_dir, &id); let deep_somatic = Self { id, config, log_dir, }; if deep_somatic.config.deepsomatic_force { remove_dir_if_exists(&deep_somatic.config.deepsomatic_output_dir(&deep_somatic.id))?; } Ok(deep_somatic) } } /// Determines whether DeepSomatic should be re-run based on whether /// the filtered PASS VCF is older than the input BAMs. /// /// If either input BAM (normal or tumor) is newer than the PASS VCF, /// DeepSomatic is considered out of date and should be re-executed. /// /// # Returns /// `true` if an update is needed, or if timestamps can't be checked (file doesn't exist) impl ShouldRun for DeepSomatic { fn should_run(&self) -> bool { let passed_vcf = &self.config.deepsomatic_passed_vcf(&self.id); let result = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true) || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true); if result { info!("DeepSomatic should run for id: {}.", self.id); } result } } impl Run for DeepSomatic { /// Runs DeepSomatic inside Docker and filters resulting variants with bcftools. /// /// # Workflow /// - Creates output directory /// - Executes Docker container for DeepSomatic /// - Applies PASS filtering using bcftools /// - Stores log outputs in gzipped format /// /// # Errors /// Returns an error if any subprocess or file operation fails. fn run(&mut self) -> anyhow::Result<()> { let output_vcf = self.config.deepsomatic_output_vcf(&self.id); if !Path::new(&output_vcf).exists() { let output_dir = self.config.deepsomatic_output_dir(&self.id); fs::create_dir_all(&output_dir) .map_err(|e| anyhow::anyhow!("Failed to create dir: {output_dir}.\n{e}"))?; let mut docker_run = DockerRun::new(&[ "run", "-d", "-v", "/data:/data", "-v", &format!("{}:/output", output_dir), &format!("google/deepsomatic:{}", self.config.deepsomatic_bin_version), "run_deepsomatic", &format!("--model_type={}", self.config.deepsomatic_model_type), "--ref", &self.config.reference, "--reads_normal", &self.config.normal_bam(&self.id), "--reads_tumor", &self.config.tumoral_bam(&self.id), "--output_vcf", &format!( "/output/{}_{}_DeepSomatic.vcf.gz", self.id, self.config.tumoral_name ), "--output_gvcf", &format!( "/output/{}_{}_DeepSomatic.g.vcf.gz", self.id, self.config.tumoral_name ), &format!("--num_shards={}", self.config.deepsomatic_threads), "--logging_dir", &format!( "/output/{}_{}_DeepSomatic_logs", self.id, self.config.tumoral_name ), "--vcf_stats_report=true", "--dry_run=false", "--sample_name_tumor", &format!("{}_{}", self.id, self.config.tumoral_name), "--sample_name_normal", &format!("{}_{}", self.id, self.config.normal_name), ]); let report = run_wait(&mut docker_run) .map_err(|e| anyhow::anyhow!("Failed to run DeepSomatic for {}.\n{e}", self.id))?; report .save_to_file(&format!("{}/deepvariant_", self.log_dir)) .map_err(|e| anyhow::anyhow!("Can't save DeepVariant logs.\n{e}"))?; } // Keep PASS let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id); if !Path::new(&vcf_passed).exists() { info!("Filtering DeepSomatic PASS variants for {}", self.id); let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default()) .map_err(|e| { anyhow::anyhow!("Error while running bcftools pass for {}.\n{e}", output_vcf) })?; report .save_to_file(&format!("{}/bcftools_pass_", self.log_dir)) .map_err(|e| { anyhow::anyhow!( "Error while saving bcftools report for {}.\n{e}", output_vcf ) })?; } Ok(()) } } impl CallerCat for DeepSomatic { /// Returns a classification tag for this caller, identifying it as somatic. fn caller_cat(&self) -> Annotation { Annotation::Callers(Caller::DeepSomatic, Sample::Somatic) } } impl Variants for DeepSomatic { /// Loads and annotates variants from the DeepSomatic VCF (PASS-filtered). /// /// # Arguments /// * `annotations` - The global annotation map to which variants are added. /// /// # Returns /// A `VariantCollection` containing variants, associated VCF path, and caller category. fn variants(&self, annotations: &Annotations) -> anyhow::Result { let caller = self.caller_cat(); let add = vec![caller.clone()]; let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id); info!("Loading variants from {}: {}", caller, vcf_passed); let variants = read_vcf(&vcf_passed) .map_err(|e| anyhow::anyhow!("Failed to read DeepSomatic VCF {}.\n{e}", vcf_passed))?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), &add); }); info!("{}, {} variants loaded.", caller, variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(vcf_passed.into())?, caller, }) } } impl Label for DeepSomatic { fn label(&self) -> String { self.caller_cat().to_string() } }