deep_variant.rs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. use anyhow::Context;
  2. use log::info;
  3. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  4. use std::{fs, path::Path};
  5. use crate::{
  6. annotation::{Annotation, Annotations, Caller},
  7. collection::{vcf::Vcf, InitializeSolo},
  8. commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
  9. config::Config,
  10. helpers::{force_or_not, path_prefix},
  11. io::vcf::read_vcf,
  12. runners::{run_wait, DockerRun, Run},
  13. variant::{
  14. variant::{RunnerVariants, Variants},
  15. variant_collection::VariantCollection,
  16. },
  17. };
  18. #[derive(Debug, Clone)]
  19. pub struct DeepVariant {
  20. pub id: String,
  21. pub time: String,
  22. pub bam: String,
  23. pub output_dir: String,
  24. pub output_vcf: String,
  25. pub vcf_passed: String,
  26. pub log_dir: String,
  27. pub config: Config,
  28. }
  29. impl InitializeSolo for DeepVariant {
  30. fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
  31. let id = id.to_string();
  32. let time = time.to_string();
  33. info!("Initializing DeepVariant for {id} {time}.");
  34. let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
  35. if !Path::new(&log_dir).exists() {
  36. fs::create_dir_all(&log_dir)
  37. .context(format!("Failed to create {log_dir} directory"))?;
  38. }
  39. let bam = config.solo_bam(&id, &time);
  40. if !Path::new(&bam).exists() {
  41. anyhow::bail!("Bam files doesn't exists: {bam}")
  42. }
  43. let output_dir = config.deepvariant_output_dir(&id, &time);
  44. fs::create_dir_all(&output_dir).context(format!("Can't create dir: {output_dir}"))?;
  45. let output_vcf = config.deepvariant_output_vcf(&id, &time);
  46. let vcf_passed = format!("{}_PASSED.vcf.gz", path_prefix(&output_vcf)?);
  47. Ok(Self {
  48. id,
  49. time,
  50. bam,
  51. output_dir,
  52. output_vcf,
  53. vcf_passed,
  54. log_dir,
  55. config,
  56. })
  57. }
  58. }
  59. impl Run for DeepVariant {
  60. fn run(&mut self) -> anyhow::Result<()> {
  61. force_or_not(&self.vcf_passed, self.config.deepvariant_force)?;
  62. // Run Docker command if output VCF doesn't exist
  63. if !Path::new(&self.output_vcf).exists() {
  64. let mut docker_run = DockerRun::new(&[
  65. "run",
  66. "-d",
  67. "-v",
  68. "/data:/data",
  69. "-v",
  70. &format!("{}:/output", self.output_dir),
  71. &format!("google/deepvariant:{}", self.config.deepvariant_bin_version),
  72. "/opt/deepvariant/bin/run_deepvariant",
  73. &format!("--model_type={}", self.config.deepvariant_model_type),
  74. "--ref",
  75. &self.config.reference,
  76. "--reads",
  77. &self.bam,
  78. "--output_vcf",
  79. &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time),
  80. "--output_gvcf",
  81. &format!("/output/{}_{}_DeepVariant.g.vcf.gz", self.id, self.time),
  82. &format!("--num_shards={}", self.config.deepvariant_threads),
  83. "--logging_dir",
  84. "--vcf_stats_report=true",
  85. &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time),
  86. "--dry_run=false",
  87. "--sample_name",
  88. &format!("{}_{}", self.id, self.time),
  89. ]);
  90. let report = run_wait(&mut docker_run).context(format!(
  91. "Erreur while running DeepVariant for {} {}",
  92. self.id, self.time
  93. ))?;
  94. report
  95. .save_to_file(&format!("{}/deepvariant_", self.log_dir))
  96. .context("Can't save DeepVariant logs")?;
  97. }
  98. // Keep PASS
  99. if !Path::new(&self.vcf_passed).exists() {
  100. info!("Filtering PASS variants");
  101. let report = bcftools_keep_pass(
  102. &self.output_vcf,
  103. &self.vcf_passed,
  104. BcftoolsConfig::default(),
  105. )
  106. .unwrap();
  107. report
  108. .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
  109. .unwrap();
  110. }
  111. Ok(())
  112. }
  113. }
  114. impl Variants for DeepVariant {
  115. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  116. let solo = match self.time.as_str() {
  117. "diag" => Annotation::SoloDiag,
  118. "mrd" => Annotation::SoloConstit,
  119. _ => return Err(anyhow::anyhow!("Invalid time point.")),
  120. };
  121. let add = vec![Annotation::Callers(Caller::DeepVariant), solo];
  122. info!("Loading variant from DeepVariant {} {} with annotations: {:?}", self.id, self.time, add);
  123. let variants = read_vcf(&self.vcf_passed)?;
  124. variants.par_iter().for_each(|v| {
  125. annotations.insert_update(v.hash_variant(), &add);
  126. });
  127. Ok(VariantCollection {
  128. variants,
  129. vcf: Vcf::new(self.vcf_passed.clone().into())?,
  130. })
  131. }
  132. }
  133. impl RunnerVariants for DeepVariant {}