| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- use anyhow::Context;
- use log::info;
- use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
- use std::{fs, path::Path};
- use crate::{
- annotation::{Annotation, Annotations, Caller},
- collection::{vcf::Vcf, InitializeSolo},
- commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
- config::Config,
- helpers::{force_or_not, path_prefix},
- io::vcf::read_vcf,
- runners::{run_wait, DockerRun, Run},
- variant::{
- variant::{RunnerVariants, Variants},
- variant_collection::VariantCollection,
- },
- };
- #[derive(Debug, Clone)]
- pub struct DeepVariant {
- pub id: String,
- pub time: String,
- pub bam: String,
- pub output_dir: String,
- pub output_vcf: String,
- pub vcf_passed: String,
- pub log_dir: String,
- pub config: Config,
- }
- impl InitializeSolo for DeepVariant {
- fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- let time = time.to_string();
- info!("Initializing DeepVariant for {id} {time}.");
- let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
- if !Path::new(&log_dir).exists() {
- fs::create_dir_all(&log_dir)
- .context(format!("Failed to create {log_dir} directory"))?;
- }
- let bam = config.solo_bam(&id, &time);
- if !Path::new(&bam).exists() {
- anyhow::bail!("Bam files doesn't exists: {bam}")
- }
- let output_dir = config.deepvariant_output_dir(&id, &time);
- fs::create_dir_all(&output_dir).context(format!("Can't create dir: {output_dir}"))?;
- let output_vcf = config.deepvariant_output_vcf(&id, &time);
- let vcf_passed = format!("{}_PASSED.vcf.gz", path_prefix(&output_vcf)?);
- Ok(Self {
- id,
- time,
- bam,
- output_dir,
- output_vcf,
- vcf_passed,
- log_dir,
- config,
- })
- }
- }
- impl Run for DeepVariant {
- fn run(&mut self) -> anyhow::Result<()> {
- force_or_not(&self.vcf_passed, self.config.deepvariant_force)?;
- // Run Docker command if output VCF doesn't exist
- if !Path::new(&self.output_vcf).exists() {
- let mut docker_run = DockerRun::new(&[
- "run",
- "-d",
- "-v",
- "/data:/data",
- "-v",
- &format!("{}:/output", self.output_dir),
- &format!("google/deepvariant:{}", self.config.deepvariant_bin_version),
- "/opt/deepvariant/bin/run_deepvariant",
- &format!("--model_type={}", self.config.deepvariant_model_type),
- "--ref",
- &self.config.reference,
- "--reads",
- &self.bam,
- "--output_vcf",
- &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time),
- "--output_gvcf",
- &format!("/output/{}_{}_DeepVariant.g.vcf.gz", self.id, self.time),
- &format!("--num_shards={}", self.config.deepvariant_threads),
- "--logging_dir",
- "--vcf_stats_report=true",
- &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time),
- "--dry_run=false",
- "--sample_name",
- &format!("{}_{}", self.id, self.time),
- ]);
- let report = run_wait(&mut docker_run).context(format!(
- "Erreur while running DeepVariant for {} {}",
- self.id, self.time
- ))?;
- report
- .save_to_file(&format!("{}/deepvariant_", self.log_dir))
- .context("Can't save DeepVariant logs")?;
- }
- // Keep PASS
- if !Path::new(&self.vcf_passed).exists() {
- info!("Filtering PASS variants");
- let report = bcftools_keep_pass(
- &self.output_vcf,
- &self.vcf_passed,
- BcftoolsConfig::default(),
- )
- .unwrap();
- report
- .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
- .unwrap();
- }
- Ok(())
- }
- }
- impl Variants for DeepVariant {
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let solo = match self.time.as_str() {
- "diag" => Annotation::SoloDiag,
- "mrd" => Annotation::SoloConstit,
- _ => return Err(anyhow::anyhow!("Invalid time point.")),
- };
- let add = vec![Annotation::Callers(Caller::DeepVariant), solo];
- info!("Loading variant from DeepVariant {} {} with annotations: {:?}", self.id, self.time, add);
- let variants = read_vcf(&self.vcf_passed)?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash_variant(), &add);
- });
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(self.vcf_passed.clone().into())?,
- })
- }
- }
- impl RunnerVariants for DeepVariant {}
|