deep_somatic.rs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. use std::{fs, path::Path};
  2. use log::info;
  3. use rayon::prelude::*;
  4. use crate::{
  5. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  6. collection::{vcf::Vcf, Initialize, ShouldRun},
  7. commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
  8. config::Config,
  9. helpers::{is_file_older, remove_dir_if_exists},
  10. io::vcf::read_vcf,
  11. runners::{run_wait, DockerRun, Run},
  12. variant::{
  13. variant::{Label, Variants},
  14. variant_collection::VariantCollection,
  15. },
  16. };
  17. /// A pipeline runner for executing DeepSomatic on paired tumor and normal BAM files.
  18. ///
  19. /// This struct encapsulates the configuration and metadata required to run the DeepSomatic
  20. /// variant caller inside a Docker container. It integrates into the pipeline ecosystem with:
  21. /// - Docker-based execution logic
  22. /// - Output filtering for PASS variants
  23. /// - Logging and diagnostics tracking
  24. /// - Annotation tagging for somatic variants
  25. #[derive(Debug)]
  26. pub struct DeepSomatic {
  27. pub id: String,
  28. pub log_dir: String,
  29. pub config: Config,
  30. }
  31. impl Initialize for DeepSomatic {
  32. /// Initializes the DeepSomatic runner by setting paths and logging.
  33. ///
  34. /// # Arguments
  35. /// * `id` - Sample ID used for output directory and log tagging.
  36. /// * `config` - Shared pipeline configuration.
  37. ///
  38. /// # Returns
  39. /// A ready-to-use `DeepSomatic` runner.
  40. fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
  41. let id = id.to_string();
  42. info!("Initializing DeepSomatic for {id}.");
  43. let log_dir = format!("{}/{}/log/deepsomatic", config.result_dir, &id);
  44. let deep_somatic = Self {
  45. id,
  46. config,
  47. log_dir,
  48. };
  49. if deep_somatic.config.deepsomatic_force {
  50. remove_dir_if_exists(&deep_somatic.config.deepsomatic_output_dir(&deep_somatic.id))?;
  51. }
  52. Ok(deep_somatic)
  53. }
  54. }
  55. /// Determines whether DeepSomatic should be re-run based on whether
  56. /// the filtered PASS VCF is older than the input BAMs.
  57. ///
  58. /// If either input BAM (normal or tumor) is newer than the PASS VCF,
  59. /// DeepSomatic is considered out of date and should be re-executed.
  60. ///
  61. /// # Returns
  62. /// `true` if an update is needed, or if timestamps can't be checked (file doesn't exist)
  63. impl ShouldRun for DeepSomatic {
  64. fn should_run(&self) -> bool {
  65. let passed_vcf = &self.config.deepsomatic_passed_vcf(&self.id);
  66. let result = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true)
  67. || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
  68. if result {
  69. info!("DeepSomatic should run for id: {}.", self.id);
  70. }
  71. result
  72. }
  73. }
  74. impl Run for DeepSomatic {
  75. /// Runs DeepSomatic inside Docker and filters resulting variants with bcftools.
  76. ///
  77. /// # Workflow
  78. /// - Creates output directory
  79. /// - Executes Docker container for DeepSomatic
  80. /// - Applies PASS filtering using bcftools
  81. /// - Stores log outputs in gzipped format
  82. ///
  83. /// # Errors
  84. /// Returns an error if any subprocess or file operation fails.
  85. fn run(&mut self) -> anyhow::Result<()> {
  86. let output_vcf = self.config.deepsomatic_output_vcf(&self.id);
  87. if !Path::new(&output_vcf).exists() {
  88. let output_dir = self.config.deepsomatic_output_dir(&self.id);
  89. fs::create_dir_all(&output_dir)
  90. .map_err(|e| anyhow::anyhow!("Failed to create dir: {output_dir}.\n{e}"))?;
  91. let mut docker_run = DockerRun::new(&[
  92. "run",
  93. "-d",
  94. "-v",
  95. "/data:/data",
  96. "-v",
  97. &format!("{}:/output", output_dir),
  98. &format!("google/deepsomatic:{}", self.config.deepsomatic_bin_version),
  99. "run_deepsomatic",
  100. &format!("--model_type={}", self.config.deepsomatic_model_type),
  101. "--ref",
  102. &self.config.reference,
  103. "--reads_normal",
  104. &self.config.normal_bam(&self.id),
  105. "--reads_tumor",
  106. &self.config.tumoral_bam(&self.id),
  107. "--output_vcf",
  108. &format!(
  109. "/output/{}_{}_DeepSomatic.vcf.gz",
  110. self.id, self.config.tumoral_name
  111. ),
  112. "--output_gvcf",
  113. &format!(
  114. "/output/{}_{}_DeepSomatic.g.vcf.gz",
  115. self.id, self.config.tumoral_name
  116. ),
  117. &format!("--num_shards={}", self.config.deepsomatic_threads),
  118. "--logging_dir",
  119. &format!(
  120. "/output/{}_{}_DeepSomatic_logs",
  121. self.id, self.config.tumoral_name
  122. ),
  123. "--vcf_stats_report=true",
  124. "--dry_run=false",
  125. "--sample_name_tumor",
  126. &format!("{}_{}", self.id, self.config.tumoral_name),
  127. "--sample_name_normal",
  128. &format!("{}_{}", self.id, self.config.normal_name),
  129. ]);
  130. let report = run_wait(&mut docker_run)
  131. .map_err(|e| anyhow::anyhow!("Failed to run DeepSomatic for {}.\n{e}", self.id))?;
  132. report
  133. .save_to_file(&format!("{}/deepvariant_", self.log_dir))
  134. .map_err(|e| anyhow::anyhow!("Can't save DeepVariant logs.\n{e}"))?;
  135. }
  136. // Keep PASS
  137. let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
  138. if !Path::new(&vcf_passed).exists() {
  139. info!("Filtering DeepSomatic PASS variants for {}", self.id);
  140. let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default())
  141. .map_err(|e| {
  142. anyhow::anyhow!("Error while running bcftools pass for {}.\n{e}", output_vcf)
  143. })?;
  144. report
  145. .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
  146. .map_err(|e| {
  147. anyhow::anyhow!(
  148. "Error while saving bcftools report for {}.\n{e}",
  149. output_vcf
  150. )
  151. })?;
  152. }
  153. Ok(())
  154. }
  155. }
  156. impl CallerCat for DeepSomatic {
  157. /// Returns a classification tag for this caller, identifying it as somatic.
  158. fn caller_cat(&self) -> Annotation {
  159. Annotation::Callers(Caller::DeepSomatic, Sample::Somatic)
  160. }
  161. }
  162. impl Variants for DeepSomatic {
  163. /// Loads and annotates variants from the DeepSomatic VCF (PASS-filtered).
  164. ///
  165. /// # Arguments
  166. /// * `annotations` - The global annotation map to which variants are added.
  167. ///
  168. /// # Returns
  169. /// A `VariantCollection` containing variants, associated VCF path, and caller category.
  170. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  171. let caller = self.caller_cat();
  172. let add = vec![caller.clone()];
  173. let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
  174. info!("Loading variants from {}: {}", caller, vcf_passed);
  175. let variants = read_vcf(&vcf_passed)
  176. .map_err(|e| anyhow::anyhow!("Failed to read DeepSomatic VCF {}.\n{e}", vcf_passed))?;
  177. variants.par_iter().for_each(|v| {
  178. annotations.insert_update(v.hash(), &add);
  179. });
  180. info!("{}, {} variants loaded.", caller, variants.len());
  181. Ok(VariantCollection {
  182. variants,
  183. vcf: Vcf::new(vcf_passed.into())?,
  184. caller,
  185. })
  186. }
  187. }
  188. impl Label for DeepSomatic {
  189. fn label(&self) -> String {
  190. self.caller_cat().to_string()
  191. }
  192. }