use anyhow::Context; use log::{debug, info}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{fs, path::Path}; use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::{vcf::Vcf, InitializeSolo, ShouldRun}, commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{is_file_older, remove_dir_if_exists}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{ variant::{Label, Variants}, variant_collection::VariantCollection, }, }; /// A pipeline runner for executing DeepVariant on a single sample and a specific time point (e.g., normal or tumor). /// /// This struct holds all necessary metadata, configuration, and output paths to perform variant calling /// on a given BAM file using DeepVariant in a Dockerized environment. It also supports: /// - Conditional execution via `should_run` /// - Cleanup and preparation via `initialize` /// - Log tracking /// - Integration into annotation and variant aggregation workflows #[derive(Debug, Clone)] pub struct DeepVariant { pub id: String, pub time_point: String, pub log_dir: String, pub config: Config, } impl InitializeSolo for DeepVariant { /// Initializes the DeepVariant runner for a specific sample and time point. /// If `force` is true or the output is outdated, the previous output directory is removed. /// Initializes the DeepVariant runner for a given ID and time point. /// /// # Arguments /// * `id` - Sample ID /// * `time_point` - Either the normal or tumor label for this run /// * `config` - Global configuration object /// /// # Returns /// A ready-to-run `DeepVariant` instance with proper path resolution. /// /// # Errors /// Will return an error if directory cleanup fails when forced or outdated. fn initialize(id: &str, time_point: &str, config: Config) -> anyhow::Result { let id = id.to_string(); let time_point = time_point.to_string(); info!("Initializing DeepVariant for {id} {time_point}."); let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id); let deepvariant = Self { id, time_point, log_dir, config, }; if deepvariant.config.deepvariant_force || deepvariant.should_run() { remove_dir_if_exists( &deepvariant .config .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point), )?; } Ok(deepvariant) } } impl ShouldRun for DeepVariant { /// Returns true if the DeepVariant PASS VCF doesn't exist or is outdated vs input BAM. /// Determines whether DeepVariant should be rerun by comparing input BAM /// modification time to the output VCF. /// /// # Returns /// `true` if the output is outdated or missing. fn should_run(&self) -> bool { let passed_vcf = self .config .deepvariant_solo_passed_vcf(&self.id, &self.time_point); let bam = self.config.solo_bam(&self.id, &self.time_point); let result = is_file_older(&passed_vcf, &bam).unwrap_or(true); if result { info!( "DeepVariant should run for: {} {}.", self.id, self.time_point ); } result } } impl Run for DeepVariant { /// Executes DeepVariant inside Docker and filters PASS variants via bcftools. /// Runs DeepVariant inside Docker and filters variants using bcftools. /// /// This function: /// - Creates necessary output directories /// - Executes DeepVariant through Docker if needed /// - Filters for PASS variants /// - Saves logs and handles caching logic via file existence /// /// # Errors /// Returns an error if any external command or file operation fails. fn run(&mut self) -> anyhow::Result<()> { let bam = self.config.solo_bam(&self.id, &self.time_point); let output_vcf = self .config .deepvariant_solo_output_vcf(&self.id, &self.time_point); // Run Docker command if output VCF doesn't exist if !Path::new(&output_vcf).exists() { let output_dir = self .config .deepvariant_output_dir(&self.id, &self.time_point); fs::create_dir_all(&output_dir) .context(format!("Failed to create dir: {output_dir}"))?; let mut docker_run = DockerRun::new(&[ "run", "-d", "-v", "/data:/data", "-v", &format!("{}:/output", output_dir), &format!("google/deepvariant:{}", self.config.deepvariant_bin_version), "/opt/deepvariant/bin/run_deepvariant", &format!("--model_type={}", self.config.deepvariant_model_type), "--ref", &self.config.reference, "--reads", &bam, "--output_vcf", &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point), "--output_gvcf", &format!( "/output/{}_{}_DeepVariant.g.vcf.gz", self.id, self.time_point ), &format!("--num_shards={}", self.config.deepvariant_threads), "--logging_dir", "--vcf_stats_report=true", &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point), "--dry_run=false", "--sample_name", &format!("{}_{}", self.id, self.time_point), ]); let report = run_wait(&mut docker_run).context(format!( "Erreur while running DeepVariant for {} {}", self.id, self.time_point ))?; report .save_to_file(&format!("{}/deepvariant_", self.log_dir)) .context("Can't save DeepVariant logs")?; } else { debug!( "DeepVariant output already exists for {} {}, skipping execution.", self.id, self.time_point ); } let vcf_passed = self .config .deepvariant_solo_passed_vcf(&self.id, &self.time_point); if !Path::new(&vcf_passed).exists() { info!( "Filtering PASS variants for {} {}", self.id, self.time_point ); let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap(); report .save_to_file(&format!("{}/bcftools_pass_", self.log_dir)) .unwrap(); } Ok(()) } } impl CallerCat for DeepVariant { /// Identifies the caller and whether it's for a normal or tumor sample. /// Determines the category for variant annotation (normal or tumor). /// /// # Returns /// A variant caller classification used for downstream tagging. fn caller_cat(&self) -> Annotation { let Config { normal_name, tumoral_name, .. } = &self.config; if *normal_name == self.time_point { Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit) } else if *tumoral_name == self.time_point { Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor) } else { panic!("Error in time_point name: {}", self.time_point); } } } impl Variants for DeepVariant { /// Loads variants from DeepVariant VCF and registers annotations. /// Loads and annotates variants from the DeepVariant PASS-filtered VCF. /// /// # Arguments /// * `annotations` - A mutable registry to tag loaded variants /// /// # Returns /// A `VariantCollection` with variant data, source VCF, and caller identity. fn variants(&self, annotations: &Annotations) -> anyhow::Result { let caller = self.caller_cat(); let vcf_passed = self .config .deepvariant_solo_passed_vcf(&self.id, &self.time_point); info!("Loading variants from {}: {}", caller, vcf_passed); let variants = read_vcf(&vcf_passed) .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), &[caller.clone()]); }); info!("{}, {} variants loaded.", caller, variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| { anyhow::anyhow!( "Error while creating a VCF representation for {}.\n{e}", vcf_passed ) })?, caller, }) } } impl Label for DeepVariant { fn label(&self) -> String { self.caller_cat().to_string() } }