deep_variant.rs 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. use anyhow::Context;
  2. use log::{debug, info};
  3. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  4. use std::{fs, path::Path};
  5. use crate::{
  6. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  7. collection::{vcf::Vcf, InitializeSolo, ShouldRun},
  8. commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
  9. config::Config,
  10. helpers::{is_file_older, remove_dir_if_exists},
  11. io::vcf::read_vcf,
  12. runners::{run_wait, DockerRun, Run},
  13. variant::{
  14. variant::{Label, Variants},
  15. variant_collection::VariantCollection,
  16. },
  17. };
  18. /// A pipeline runner for executing DeepVariant on a single sample and a specific time point (e.g., normal or tumor).
  19. ///
  20. /// This struct holds all necessary metadata, configuration, and output paths to perform variant calling
  21. /// on a given BAM file using DeepVariant in a Dockerized environment. It also supports:
  22. /// - Conditional execution via `should_run`
  23. /// - Cleanup and preparation via `initialize`
  24. /// - Log tracking
  25. /// - Integration into annotation and variant aggregation workflows
  26. #[derive(Debug, Clone)]
  27. pub struct DeepVariant {
  28. pub id: String,
  29. pub time_point: String,
  30. pub log_dir: String,
  31. pub config: Config,
  32. }
  33. impl InitializeSolo for DeepVariant {
  34. /// Initializes the DeepVariant runner for a specific sample and time point.
  35. /// If `force` is true or the output is outdated, the previous output directory is removed.
  36. /// Initializes the DeepVariant runner for a given ID and time point.
  37. ///
  38. /// # Arguments
  39. /// * `id` - Sample ID
  40. /// * `time_point` - Either the normal or tumor label for this run
  41. /// * `config` - Global configuration object
  42. ///
  43. /// # Returns
  44. /// A ready-to-run `DeepVariant` instance with proper path resolution.
  45. ///
  46. /// # Errors
  47. /// Will return an error if directory cleanup fails when forced or outdated.
  48. fn initialize(id: &str, time_point: &str, config: Config) -> anyhow::Result<Self> {
  49. let id = id.to_string();
  50. let time_point = time_point.to_string();
  51. info!("Initializing DeepVariant for {id} {time_point}.");
  52. let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
  53. let deepvariant = Self {
  54. id,
  55. time_point,
  56. log_dir,
  57. config,
  58. };
  59. if deepvariant.config.deepvariant_force || deepvariant.should_run() {
  60. remove_dir_if_exists(
  61. &deepvariant
  62. .config
  63. .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point),
  64. )?;
  65. }
  66. Ok(deepvariant)
  67. }
  68. }
  69. impl ShouldRun for DeepVariant {
  70. /// Returns true if the DeepVariant PASS VCF doesn't exist or is outdated vs input BAM.
  71. /// Determines whether DeepVariant should be rerun by comparing input BAM
  72. /// modification time to the output VCF.
  73. ///
  74. /// # Returns
  75. /// `true` if the output is outdated or missing.
  76. fn should_run(&self) -> bool {
  77. let passed_vcf = self
  78. .config
  79. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  80. let bam = self.config.solo_bam(&self.id, &self.time_point);
  81. let result = is_file_older(&passed_vcf, &bam).unwrap_or(true);
  82. if result {
  83. info!(
  84. "DeepVariant should run for: {} {}.",
  85. self.id, self.time_point
  86. );
  87. }
  88. result
  89. }
  90. }
  91. impl Run for DeepVariant {
  92. /// Executes DeepVariant inside Docker and filters PASS variants via bcftools.
  93. /// Runs DeepVariant inside Docker and filters variants using bcftools.
  94. ///
  95. /// This function:
  96. /// - Creates necessary output directories
  97. /// - Executes DeepVariant through Docker if needed
  98. /// - Filters for PASS variants
  99. /// - Saves logs and handles caching logic via file existence
  100. ///
  101. /// # Errors
  102. /// Returns an error if any external command or file operation fails.
  103. fn run(&mut self) -> anyhow::Result<()> {
  104. let bam = self.config.solo_bam(&self.id, &self.time_point);
  105. let output_vcf = self
  106. .config
  107. .deepvariant_solo_output_vcf(&self.id, &self.time_point);
  108. // Run Docker command if output VCF doesn't exist
  109. if !Path::new(&output_vcf).exists() {
  110. let output_dir = self
  111. .config
  112. .deepvariant_output_dir(&self.id, &self.time_point);
  113. fs::create_dir_all(&output_dir)
  114. .context(format!("Failed to create dir: {output_dir}"))?;
  115. let mut docker_run = DockerRun::new(&[
  116. "run",
  117. "-d",
  118. "-v",
  119. "/data:/data",
  120. "-v",
  121. &format!("{}:/output", output_dir),
  122. &format!("google/deepvariant:{}", self.config.deepvariant_bin_version),
  123. "/opt/deepvariant/bin/run_deepvariant",
  124. &format!("--model_type={}", self.config.deepvariant_model_type),
  125. "--ref",
  126. &self.config.reference,
  127. "--reads",
  128. &bam,
  129. "--output_vcf",
  130. &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point),
  131. "--output_gvcf",
  132. &format!(
  133. "/output/{}_{}_DeepVariant.g.vcf.gz",
  134. self.id, self.time_point
  135. ),
  136. &format!("--num_shards={}", self.config.deepvariant_threads),
  137. "--logging_dir",
  138. "--vcf_stats_report=true",
  139. &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
  140. "--dry_run=false",
  141. "--sample_name",
  142. &format!("{}_{}", self.id, self.time_point),
  143. ]);
  144. let report = run_wait(&mut docker_run).context(format!(
  145. "Erreur while running DeepVariant for {} {}",
  146. self.id, self.time_point
  147. ))?;
  148. report
  149. .save_to_file(&format!("{}/deepvariant_", self.log_dir))
  150. .context("Can't save DeepVariant logs")?;
  151. } else {
  152. debug!(
  153. "DeepVariant output already exists for {} {}, skipping execution.",
  154. self.id, self.time_point
  155. );
  156. }
  157. let vcf_passed = self
  158. .config
  159. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  160. if !Path::new(&vcf_passed).exists() {
  161. info!(
  162. "Filtering PASS variants for {} {}",
  163. self.id, self.time_point
  164. );
  165. let report =
  166. bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
  167. report
  168. .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
  169. .unwrap();
  170. }
  171. Ok(())
  172. }
  173. }
  174. impl CallerCat for DeepVariant {
  175. /// Identifies the caller and whether it's for a normal or tumor sample.
  176. /// Determines the category for variant annotation (normal or tumor).
  177. ///
  178. /// # Returns
  179. /// A variant caller classification used for downstream tagging.
  180. fn caller_cat(&self) -> Annotation {
  181. let Config {
  182. normal_name,
  183. tumoral_name,
  184. ..
  185. } = &self.config;
  186. if *normal_name == self.time_point {
  187. Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit)
  188. } else if *tumoral_name == self.time_point {
  189. Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor)
  190. } else {
  191. panic!("Error in time_point name: {}", self.time_point);
  192. }
  193. }
  194. }
  195. impl Variants for DeepVariant {
  196. /// Loads variants from DeepVariant VCF and registers annotations.
  197. /// Loads and annotates variants from the DeepVariant PASS-filtered VCF.
  198. ///
  199. /// # Arguments
  200. /// * `annotations` - A mutable registry to tag loaded variants
  201. ///
  202. /// # Returns
  203. /// A `VariantCollection` with variant data, source VCF, and caller identity.
  204. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  205. let caller = self.caller_cat();
  206. let vcf_passed = self
  207. .config
  208. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  209. info!("Loading variants from {}: {}", caller, vcf_passed);
  210. let variants = read_vcf(&vcf_passed)
  211. .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?;
  212. variants.par_iter().for_each(|v| {
  213. annotations.insert_update(v.hash(), &[caller.clone()]);
  214. });
  215. info!("{}, {} variants loaded.", caller, variants.len());
  216. Ok(VariantCollection {
  217. variants,
  218. vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| {
  219. anyhow::anyhow!(
  220. "Error while creating a VCF representation for {}.\n{e}",
  221. vcf_passed
  222. )
  223. })?,
  224. caller,
  225. })
  226. }
  227. }
  228. impl Label for DeepVariant {
  229. fn label(&self) -> String {
  230. self.caller_cat().to_string()
  231. }
  232. }