| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- use anyhow::Context;
- use log::{debug, info};
- use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
- use std::{fs, path::Path};
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
- collection::{vcf::Vcf, InitializeSolo, ShouldRun},
- commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
- config::Config,
- helpers::{is_file_older, remove_dir_if_exists},
- io::vcf::read_vcf,
- runners::{run_wait, DockerRun, Run},
- variant::{
- variant::{Label, Variants},
- variant_collection::VariantCollection,
- },
- };
- /// A pipeline runner for executing DeepVariant on a single sample and a specific time point (e.g., normal or tumor).
- ///
- /// This struct holds all necessary metadata, configuration, and output paths to perform variant calling
- /// on a given BAM file using DeepVariant in a Dockerized environment. It also supports:
- /// - Conditional execution via `should_run`
- /// - Cleanup and preparation via `initialize`
- /// - Log tracking
- /// - Integration into annotation and variant aggregation workflows
- #[derive(Debug, Clone)]
- pub struct DeepVariant {
- pub id: String,
- pub time_point: String,
- pub log_dir: String,
- pub config: Config,
- }
- impl InitializeSolo for DeepVariant {
- /// Initializes the DeepVariant runner for a specific sample and time point.
- /// If `force` is true or the output is outdated, the previous output directory is removed.
- /// Initializes the DeepVariant runner for a given ID and time point.
- ///
- /// # Arguments
- /// * `id` - Sample ID
- /// * `time_point` - Either the normal or tumor label for this run
- /// * `config` - Global configuration object
- ///
- /// # Returns
- /// A ready-to-run `DeepVariant` instance with proper path resolution.
- ///
- /// # Errors
- /// Will return an error if directory cleanup fails when forced or outdated.
- fn initialize(id: &str, time_point: &str, config: Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- let time_point = time_point.to_string();
- info!("Initializing DeepVariant for {id} {time_point}.");
- let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
- let deepvariant = Self {
- id,
- time_point,
- log_dir,
- config,
- };
- if deepvariant.config.deepvariant_force || deepvariant.should_run() {
- remove_dir_if_exists(
- &deepvariant
- .config
- .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point),
- )?;
- }
- Ok(deepvariant)
- }
- }
- impl ShouldRun for DeepVariant {
- /// Returns true if the DeepVariant PASS VCF doesn't exist or is outdated vs input BAM.
- /// Determines whether DeepVariant should be rerun by comparing input BAM
- /// modification time to the output VCF.
- ///
- /// # Returns
- /// `true` if the output is outdated or missing.
- fn should_run(&self) -> bool {
- let passed_vcf = self
- .config
- .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
- let bam = self.config.solo_bam(&self.id, &self.time_point);
- let result = is_file_older(&passed_vcf, &bam).unwrap_or(true);
- if result {
- info!(
- "DeepVariant should run for: {} {}.",
- self.id, self.time_point
- );
- }
- result
- }
- }
- impl Run for DeepVariant {
- /// Executes DeepVariant inside Docker and filters PASS variants via bcftools.
- /// Runs DeepVariant inside Docker and filters variants using bcftools.
- ///
- /// This function:
- /// - Creates necessary output directories
- /// - Executes DeepVariant through Docker if needed
- /// - Filters for PASS variants
- /// - Saves logs and handles caching logic via file existence
- ///
- /// # Errors
- /// Returns an error if any external command or file operation fails.
- fn run(&mut self) -> anyhow::Result<()> {
- let bam = self.config.solo_bam(&self.id, &self.time_point);
- let output_vcf = self
- .config
- .deepvariant_solo_output_vcf(&self.id, &self.time_point);
- // Run Docker command if output VCF doesn't exist
- if !Path::new(&output_vcf).exists() {
- let output_dir = self
- .config
- .deepvariant_output_dir(&self.id, &self.time_point);
- fs::create_dir_all(&output_dir)
- .context(format!("Failed to create dir: {output_dir}"))?;
- let mut docker_run = DockerRun::new(&[
- "run",
- "-d",
- "-v",
- "/data:/data",
- "-v",
- &format!("{}:/output", output_dir),
- &format!("google/deepvariant:{}", self.config.deepvariant_bin_version),
- "/opt/deepvariant/bin/run_deepvariant",
- &format!("--model_type={}", self.config.deepvariant_model_type),
- "--ref",
- &self.config.reference,
- "--reads",
- &bam,
- "--output_vcf",
- &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point),
- "--output_gvcf",
- &format!(
- "/output/{}_{}_DeepVariant.g.vcf.gz",
- self.id, self.time_point
- ),
- &format!("--num_shards={}", self.config.deepvariant_threads),
- "--logging_dir",
- "--vcf_stats_report=true",
- &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
- "--dry_run=false",
- "--sample_name",
- &format!("{}_{}", self.id, self.time_point),
- ]);
- let report = run_wait(&mut docker_run).context(format!(
- "Erreur while running DeepVariant for {} {}",
- self.id, self.time_point
- ))?;
- report
- .save_to_file(&format!("{}/deepvariant_", self.log_dir))
- .context("Can't save DeepVariant logs")?;
- } else {
- debug!(
- "DeepVariant output already exists for {} {}, skipping execution.",
- self.id, self.time_point
- );
- }
- let vcf_passed = self
- .config
- .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
- if !Path::new(&vcf_passed).exists() {
- info!(
- "Filtering PASS variants for {} {}",
- self.id, self.time_point
- );
- let report =
- bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
- report
- .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
- .unwrap();
- }
- Ok(())
- }
- }
- impl CallerCat for DeepVariant {
- /// Identifies the caller and whether it's for a normal or tumor sample.
- /// Determines the category for variant annotation (normal or tumor).
- ///
- /// # Returns
- /// A variant caller classification used for downstream tagging.
- fn caller_cat(&self) -> Annotation {
- let Config {
- normal_name,
- tumoral_name,
- ..
- } = &self.config;
- if *normal_name == self.time_point {
- Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit)
- } else if *tumoral_name == self.time_point {
- Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor)
- } else {
- panic!("Error in time_point name: {}", self.time_point);
- }
- }
- }
- impl Variants for DeepVariant {
- /// Loads variants from DeepVariant VCF and registers annotations.
- /// Loads and annotates variants from the DeepVariant PASS-filtered VCF.
- ///
- /// # Arguments
- /// * `annotations` - A mutable registry to tag loaded variants
- ///
- /// # Returns
- /// A `VariantCollection` with variant data, source VCF, and caller identity.
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = self.caller_cat();
- let vcf_passed = self
- .config
- .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
- info!("Loading variants from {}: {}", caller, vcf_passed);
- let variants = read_vcf(&vcf_passed)
- .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), &[caller.clone()]);
- });
- info!("{}, {} variants loaded.", caller, variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| {
- anyhow::anyhow!(
- "Error while creating a VCF representation for {}.\n{e}",
- vcf_passed
- )
- })?,
- caller,
- })
- }
- }
- impl Label for DeepVariant {
- fn label(&self) -> String {
- self.caller_cat().to_string()
- }
- }
|