use anyhow::Context; use log::info; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{fs, path::Path}; use crate::{ annotation::{Annotation, Annotations, Caller}, collection::{vcf::Vcf, InitializeSolo}, commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}, config::Config, helpers::{force_or_not, path_prefix}, io::vcf::read_vcf, runners::{run_wait, DockerRun, Run}, variant::{ variant::{RunnerVariants, Variants}, variant_collection::VariantCollection, }, }; #[derive(Debug, Clone)] pub struct DeepVariant { pub id: String, pub time: String, pub bam: String, pub output_dir: String, pub output_vcf: String, pub vcf_passed: String, pub log_dir: String, pub config: Config, } impl InitializeSolo for DeepVariant { fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result { let id = id.to_string(); let time = time.to_string(); info!("Initializing DeepVariant for {id} {time}."); let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id); if !Path::new(&log_dir).exists() { fs::create_dir_all(&log_dir) .context(format!("Failed to create {log_dir} directory"))?; } let bam = config.solo_bam(&id, &time); if !Path::new(&bam).exists() { anyhow::bail!("Bam files doesn't exists: {bam}") } let output_dir = config.deepvariant_output_dir(&id, &time); fs::create_dir_all(&output_dir).context(format!("Can't create dir: {output_dir}"))?; let output_vcf = config.deepvariant_output_vcf(&id, &time); let vcf_passed = format!("{}_PASSED.vcf.gz", path_prefix(&output_vcf)?); Ok(Self { id, time, bam, output_dir, output_vcf, vcf_passed, log_dir, config, }) } } impl Run for DeepVariant { fn run(&mut self) -> anyhow::Result<()> { force_or_not(&self.vcf_passed, self.config.deepvariant_force)?; // Run Docker command if output VCF doesn't exist if !Path::new(&self.output_vcf).exists() { let mut docker_run = DockerRun::new(&[ "run", "-d", "-v", "/data:/data", "-v", &format!("{}:/output", self.output_dir), &format!("google/deepvariant:{}", self.config.deepvariant_bin_version), "/opt/deepvariant/bin/run_deepvariant", &format!("--model_type={}", self.config.deepvariant_model_type), "--ref", &self.config.reference, "--reads", &self.bam, "--output_vcf", &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time), "--output_gvcf", &format!("/output/{}_{}_DeepVariant.g.vcf.gz", self.id, self.time), &format!("--num_shards={}", self.config.deepvariant_threads), "--logging_dir", "--vcf_stats_report=true", &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time), "--dry_run=false", "--sample_name", &format!("{}_{}", self.id, self.time), ]); let report = run_wait(&mut docker_run).context(format!( "Erreur while running DeepVariant for {} {}", self.id, self.time ))?; report .save_to_file(&format!("{}/deepvariant_", self.log_dir)) .context("Can't save DeepVariant logs")?; } // Keep PASS if !Path::new(&self.vcf_passed).exists() { info!("Filtering PASS variants"); let report = bcftools_keep_pass( &self.output_vcf, &self.vcf_passed, BcftoolsConfig::default(), ) .unwrap(); report .save_to_file(&format!("{}/bcftools_pass_", self.log_dir)) .unwrap(); } Ok(()) } } impl Variants for DeepVariant { fn variants(&self, annotations: &Annotations) -> anyhow::Result { let solo = match self.time.as_str() { "diag" => Annotation::SoloDiag, "mrd" => Annotation::SoloConstit, _ => return Err(anyhow::anyhow!("Invalid time point.")), }; let add = vec![Annotation::Callers(Caller::DeepVariant), solo]; info!("Loading variant from DeepVariant {} {} with annotations: {:?}", self.id, self.time, add); let variants = read_vcf(&self.vcf_passed)?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash_variant(), &add); }); Ok(VariantCollection { variants, vcf: Vcf::new(self.vcf_passed.clone().into())?, }) } } impl RunnerVariants for DeepVariant {}