| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- use std::{fs, path::Path};
- use log::info;
- use rayon::prelude::*;
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
- collection::{vcf::Vcf, Initialize, ShouldRun},
- commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
- config::Config,
- helpers::{is_file_older, remove_dir_if_exists},
- io::vcf::read_vcf,
- runners::{run_wait, DockerRun, Run},
- variant::{
- variant::{Label, Variants},
- variant_collection::VariantCollection,
- },
- };
- /// A pipeline runner for executing DeepSomatic on paired tumor and normal BAM files.
- ///
- /// This struct encapsulates the configuration and metadata required to run the DeepSomatic
- /// variant caller inside a Docker container. It integrates into the pipeline ecosystem with:
- /// - Docker-based execution logic
- /// - Output filtering for PASS variants
- /// - Logging and diagnostics tracking
- /// - Annotation tagging for somatic variants
- #[derive(Debug)]
- pub struct DeepSomatic {
- pub id: String,
- pub log_dir: String,
- pub config: Config,
- }
- impl Initialize for DeepSomatic {
- /// Initializes the DeepSomatic runner by setting paths and logging.
- ///
- /// # Arguments
- /// * `id` - Sample ID used for output directory and log tagging.
- /// * `config` - Shared pipeline configuration.
- ///
- /// # Returns
- /// A ready-to-use `DeepSomatic` runner.
- fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- info!("Initializing DeepSomatic for {id}.");
- let log_dir = format!("{}/{}/log/deepsomatic", config.result_dir, &id);
- let deep_somatic = Self {
- id,
- config,
- log_dir,
- };
- if deep_somatic.config.deepsomatic_force {
- remove_dir_if_exists(&deep_somatic.config.deepsomatic_output_dir(&deep_somatic.id))?;
- }
- Ok(deep_somatic)
- }
- }
- /// Determines whether DeepSomatic should be re-run based on whether
- /// the filtered PASS VCF is older than the input BAMs.
- ///
- /// If either input BAM (normal or tumor) is newer than the PASS VCF,
- /// DeepSomatic is considered out of date and should be re-executed.
- ///
- /// # Returns
- /// `true` if an update is needed, or if timestamps can't be checked (file doesn't exist)
- impl ShouldRun for DeepSomatic {
- fn should_run(&self) -> bool {
- let passed_vcf = &self.config.deepsomatic_passed_vcf(&self.id);
- let result = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true)
- || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
- if result {
- info!("DeepSomatic should run for id: {}.", self.id);
- }
- result
- }
- }
- impl Run for DeepSomatic {
- /// Runs DeepSomatic inside Docker and filters resulting variants with bcftools.
- ///
- /// # Workflow
- /// - Creates output directory
- /// - Executes Docker container for DeepSomatic
- /// - Applies PASS filtering using bcftools
- /// - Stores log outputs in gzipped format
- ///
- /// # Errors
- /// Returns an error if any subprocess or file operation fails.
- fn run(&mut self) -> anyhow::Result<()> {
- let output_vcf = self.config.deepsomatic_output_vcf(&self.id);
- if !Path::new(&output_vcf).exists() {
- let output_dir = self.config.deepsomatic_output_dir(&self.id);
- fs::create_dir_all(&output_dir)
- .map_err(|e| anyhow::anyhow!("Failed to create dir: {output_dir}.\n{e}"))?;
- let mut docker_run = DockerRun::new(&[
- "run",
- "-d",
- "-v",
- "/data:/data",
- "-v",
- &format!("{}:/output", output_dir),
- &format!("google/deepsomatic:{}", self.config.deepsomatic_bin_version),
- "run_deepsomatic",
- &format!("--model_type={}", self.config.deepsomatic_model_type),
- "--ref",
- &self.config.reference,
- "--reads_normal",
- &self.config.normal_bam(&self.id),
- "--reads_tumor",
- &self.config.tumoral_bam(&self.id),
- "--output_vcf",
- &format!(
- "/output/{}_{}_DeepSomatic.vcf.gz",
- self.id, self.config.tumoral_name
- ),
- "--output_gvcf",
- &format!(
- "/output/{}_{}_DeepSomatic.g.vcf.gz",
- self.id, self.config.tumoral_name
- ),
- &format!("--num_shards={}", self.config.deepsomatic_threads),
- "--logging_dir",
- &format!(
- "/output/{}_{}_DeepSomatic_logs",
- self.id, self.config.tumoral_name
- ),
- "--vcf_stats_report=true",
- "--dry_run=false",
- "--sample_name_tumor",
- &format!("{}_{}", self.id, self.config.tumoral_name),
- "--sample_name_normal",
- &format!("{}_{}", self.id, self.config.normal_name),
- ]);
- let report = run_wait(&mut docker_run)
- .map_err(|e| anyhow::anyhow!("Failed to run DeepSomatic for {}.\n{e}", self.id))?;
- report
- .save_to_file(&format!("{}/deepvariant_", self.log_dir))
- .map_err(|e| anyhow::anyhow!("Can't save DeepVariant logs.\n{e}"))?;
- }
- // Keep PASS
- let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
- if !Path::new(&vcf_passed).exists() {
- info!("Filtering DeepSomatic PASS variants for {}", self.id);
- let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default())
- .map_err(|e| {
- anyhow::anyhow!("Error while running bcftools pass for {}.\n{e}", output_vcf)
- })?;
- report
- .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
- .map_err(|e| {
- anyhow::anyhow!(
- "Error while saving bcftools report for {}.\n{e}",
- output_vcf
- )
- })?;
- }
- Ok(())
- }
- }
- impl CallerCat for DeepSomatic {
- /// Returns a classification tag for this caller, identifying it as somatic.
- fn caller_cat(&self) -> Annotation {
- Annotation::Callers(Caller::DeepSomatic, Sample::Somatic)
- }
- }
- impl Variants for DeepSomatic {
- /// Loads and annotates variants from the DeepSomatic VCF (PASS-filtered).
- ///
- /// # Arguments
- /// * `annotations` - The global annotation map to which variants are added.
- ///
- /// # Returns
- /// A `VariantCollection` containing variants, associated VCF path, and caller category.
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = self.caller_cat();
- let add = vec![caller.clone()];
- let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
- info!("Loading variants from {}: {}", caller, vcf_passed);
- let variants = read_vcf(&vcf_passed)
- .map_err(|e| anyhow::anyhow!("Failed to read DeepSomatic VCF {}.\n{e}", vcf_passed))?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), &add);
- });
- info!("{}, {} variants loaded.", caller, variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(vcf_passed.into())?,
- caller,
- })
- }
- }
- impl Label for DeepSomatic {
- fn label(&self) -> String {
- self.caller_cat().to_string()
- }
- }
|