deep_variant.rs 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900
  1. //! # DeepVariant Variant Calling Pipeline
  2. //!
  3. //! This module provides a pipeline runner for [DeepVariant](https://github.com/google/deepvariant),
  4. //! a deep learning-based variant caller.
  5. //!
  6. //! ## Overview
  7. //!
  8. //! DeepVariant performs variant calling on a single sample BAM file using:
  9. //!
  10. //! - CNN-based variant classification
  11. //! - Haploid calling for sex chromosomes (configurable by karyotype)
  12. //! - Containerized execution via Singularity/Apptainer
  13. //!
  14. //! ## Key Features
  15. //!
  16. //! - **Deep learning-based** - CNN models trained on diverse sequencing platforms
  17. //! - **Karyotype-aware** - Automatically calls X/Y as haploid based on sample sex
  18. //! - **Solo mode** - Single-sample germline variant calling
  19. //! - **Platform flexibility** - Supports ONT, PacBio, and Illumina
  20. //! - **Parallel execution** - Genome chunking for HPC scalability
  21. //!
  22. //! ## Requirements
  23. //!
  24. //! Before running DeepVariant, ensure:
  25. //! - BAM file is indexed (`.bai` file present)
  26. //! - Reference genome is accessible
  27. //! - Singularity/Docker image is available
  28. //! - Model type matches sequencing platform (`config.deepvariant_model_type`)
  29. //!
  30. //! ## Execution Modes
  31. //!
  32. //! Execution mode is automatically selected via `config.slurm_runner`:
  33. //!
  34. //! - **Local** — Single-node execution
  35. //! - **Slurm** — HPC job submission
  36. //!
  37. //! Both modes support chunked parallel execution via [`run_deepvariant_chunked_with_merge`].
  38. //!
  39. //! ## Output Files
  40. //!
  41. //! PASS-filtered variants:
  42. //! ```text
  43. //! {result_dir}/{id}/deepvariant/{id}_{time_point}_DeepVariant.pass.vcf.gz
  44. //! ```
  45. //!
  46. //! ## Usage
  47. //!
  48. //! ### Chunked Parallel Execution (Recommended for WGS)
  49. //!
  50. //! ```ignore
  51. //! use pandora_lib_promethion::callers::deep_variant::run_deepvariant_chunked_with_merge;
  52. //! use pandora_lib_promethion::config::Config;
  53. //!
  54. //! let config = Config::default();
  55. //! // Run DeepVariant in 30 parallel chunks for "norm" time point
  56. //! let outputs = run_deepvariant_chunked_with_merge("sample_001", "norm", &config, 30)?;
  57. //! # Ok::<(), anyhow::Error>(())
  58. //! ```
  59. //!
  60. //! ### Single-Run Execution
  61. //!
  62. //! ```ignore
  63. //! use pandora_lib_promethion::callers::deep_variant::DeepVariant;
  64. //! use pandora_lib_promethion::config::Config;
  65. //! use pandora_lib_promethion::pipes::InitializeSolo;
  66. //! use pandora_lib_promethion::runners::Run;
  67. //!
  68. //! let config = Config::default();
  69. //! let mut caller = DeepVariant::initialize("sample_001", "norm", &config)?;
  70. //!
  71. //! if caller.should_run() {
  72. //! caller.run()?;
  73. //! }
  74. //!
  75. //! // Load variants
  76. //! let variants = caller.variants(&annotations)?;
  77. //! println!("Found {} germline variants", variants.variants.len());
  78. //! # Ok::<(), anyhow::Error>(())
  79. //! ```
  80. //!
  81. //! ## References
  82. //!
  83. //! - [DeepVariant GitHub repository](https://github.com/google/deepvariant)
  84. //! - [DeepVariant publication (Nature Biotechnology, 2018)](https://doi.org/10.1038/nbt.4235)
  85. use anyhow::Context;
  86. use log::{debug, info};
  87. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  88. use regex::Regex;
  89. use rust_htslib::bam::{self, Read};
  90. use std::{
  91. fmt, fs,
  92. path::{Path, PathBuf},
  93. process::{Command as ProcessCommand, Stdio},
  94. };
  95. use uuid::Uuid;
  96. use crate::{
  97. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  98. collection::{
  99. bam_stats::{Karyotype, WGSBamStats},
  100. vcf::Vcf,
  101. },
  102. commands::{
  103. bcftools::{BcftoolsConcat, BcftoolsKeepPass},
  104. LocalBatchRunner, SlurmRunner,
  105. },
  106. config::Config,
  107. helpers::{
  108. get_genome_sizes, is_file_older, remove_dir_if_exists, singularity_bind_flags,
  109. split_genome_into_n_regions_exact,
  110. },
  111. io::vcf::read_vcf,
  112. locker::SampleLock,
  113. pipes::{InitializeSolo, ShouldRun, Version},
  114. run, run_many,
  115. runners::Run,
  116. variant::{
  117. variant_collection::VariantCollection,
  118. vcf_variant::{Label, Variants},
  119. },
  120. };
  121. use crate::commands::{
  122. Command as JobCommand, // your trait
  123. LocalRunner,
  124. SbatchRunner,
  125. SlurmParams,
  126. };
  127. /// DeepVariant solo (single-sample) variant caller.
  128. ///
  129. /// Executes DeepVariant for germline variant calling on a single BAM file.
  130. /// Supports karyotype-aware calling for accurate sex chromosome genotyping.
  131. ///
  132. /// # Fields
  133. ///
  134. /// - `id` - Sample identifier (e.g., "34528")
  135. /// - `time_point` - Time point label: typically `config.normal_name` ("norm") or `config.tumoral_name` ("diag")
  136. /// - `regions` - Genomic regions to process (e.g., "chr1 chr2 chr3" or "chr1:1-1000000")
  137. /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/deepvariant")
  138. /// - `config` - Global pipeline configuration
  139. /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
  140. /// - `karyotype` - Sex karyotype for haploid calling on X/Y chromosomes (XX or XY)
  141. ///
  142. /// # Execution Modes
  143. ///
  144. /// - **Local** - Direct execution via `run_local()`
  145. /// - **Slurm** - Single job submission via `run_sbatch()`
  146. /// - **Chunked** - Parallel genome-wide execution via [`run_deepvariant_chunked_with_merge`]
  147. ///
  148. /// # Karyotype-Aware Calling
  149. ///
  150. /// DeepVariant automatically adjusts ploidy based on `karyotype`:
  151. /// - **XY karyotype**: chrX and chrY called as haploid
  152. /// - **XX karyotype**: chrX called as diploid, chrY skipped
  153. #[derive(Debug, Clone)]
  154. pub struct DeepVariant {
  155. /// Sample identifier
  156. pub id: String,
  157. /// Time point identifier (e.g., "norm" or "diag")
  158. pub time_point: String,
  159. /// Space-separated list of genomic regions to process
  160. pub regions: String,
  161. /// Directory for log file storage
  162. pub log_dir: String,
  163. /// Global pipeline configuration
  164. pub config: Config,
  165. /// Optional part index for chunked parallel runs (1-indexed)
  166. pub part_index: Option<usize>,
  167. /// Sex karyotype for haploid contig handling (XX or XY)
  168. pub karyotype: Karyotype,
  169. }
  170. impl fmt::Display for DeepVariant {
  171. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  172. writeln!(f, "🧬 DeepVariant Solo")?;
  173. writeln!(f, " Case ID : {}", self.id)?;
  174. writeln!(f, " Time point : {}", self.time_point)?;
  175. writeln!(f, " Regions : {}", self.regions)?;
  176. writeln!(f, " Log dir : {}", self.log_dir)?;
  177. writeln!(
  178. f,
  179. " Part : {}",
  180. self.part_index
  181. .map_or("full".into(), |n| format!("part{n}"))
  182. )?;
  183. writeln!(f, " Karyotype : {}", self.karyotype)
  184. }
  185. }
  186. impl InitializeSolo for DeepVariant {
  187. /// Initializes a DeepVariant runner for a specific sample and time point.
  188. ///
  189. /// Creates necessary directory structures and optionally cleans up previous runs
  190. /// when [`Config::deepvariant_force`] is set.
  191. ///
  192. /// # Arguments
  193. ///
  194. /// * `id` - Sample identifier
  195. /// * `time_point` - Either normal or tumor label for this run
  196. /// * `config` - Global pipeline configuration
  197. ///
  198. /// # Returns
  199. ///
  200. /// A configured [`DeepVariant`] instance ready for execution.
  201. ///
  202. /// # Errors
  203. ///
  204. /// Returns an error if directory cleanup fails when force mode is enabled.
  205. fn initialize(id: &str, time_point: &str, config: &Config) -> anyhow::Result<Self> {
  206. let id = id.to_string();
  207. let time_point = time_point.to_string();
  208. // Validate time_point matches configured names
  209. anyhow::ensure!(
  210. time_point == config.normal_name || time_point == config.tumoral_name,
  211. "Invalid time_point '{}': must be either '{}' (normal) or '{}' (tumor)",
  212. time_point,
  213. config.normal_name,
  214. config.tumoral_name
  215. );
  216. let karyotype = WGSBamStats::open(&id, &time_point, config)?.karyotype()?;
  217. info!("Initializing DeepVariant for {id} {time_point}.");
  218. let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
  219. let regions = (1..=22)
  220. .map(|i| format!("chr{i}"))
  221. .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from))
  222. .collect::<Vec<String>>();
  223. let deepvariant = Self {
  224. id,
  225. time_point,
  226. log_dir,
  227. config: config.clone(),
  228. regions: regions.join(" "),
  229. part_index: None,
  230. karyotype,
  231. };
  232. if deepvariant.config.deepvariant_force {
  233. remove_dir_if_exists(
  234. &deepvariant
  235. .config
  236. .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point),
  237. )?;
  238. }
  239. Ok(deepvariant)
  240. }
  241. }
  242. impl ShouldRun for DeepVariant {
  243. /// Determines whether DeepVariant should be (re)run.
  244. ///
  245. /// Compares the modification time of the input BAM against the output
  246. /// PASS-filtered VCF. Returns `true` if:
  247. /// - The output VCF does not exist
  248. /// - The output VCF is older than the input BAM
  249. ///
  250. /// # Implementation Note
  251. ///
  252. /// This is a conservative check—any error during timestamp comparison
  253. /// results in `true` (i.e., re-run).
  254. fn should_run(&self) -> bool {
  255. let passed_vcf = self
  256. .config
  257. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  258. let bam = self.config.solo_bam(&self.id, &self.time_point);
  259. let result = is_file_older(&passed_vcf, &bam, true).unwrap_or(true);
  260. if result {
  261. info!(
  262. "DeepVariant should run for: {} {}.",
  263. self.id, self.time_point
  264. );
  265. }
  266. result
  267. }
  268. }
  269. impl JobCommand for DeepVariant {
  270. /// Initializes output and log directories for this run.
  271. ///
  272. /// # Errors
  273. ///
  274. /// Returns an error if directory creation fails.
  275. fn init(&mut self) -> anyhow::Result<()> {
  276. // Part-specific output dir
  277. let output_dir = self.part_output_dir();
  278. fs::create_dir_all(&output_dir).context(format!("Failed to create dir: {output_dir}"))?;
  279. // log dir
  280. fs::create_dir_all(&self.log_dir)
  281. .context(format!("Failed to create dir: {}", self.log_dir))?;
  282. Ok(())
  283. }
  284. /// Generates the DeepVariant command string for Singularity execution.
  285. ///
  286. /// The command binds the output directory and runs DeepVariant with:
  287. /// - Configured model type
  288. /// - Haploid calling for sex chromosomes
  289. /// - VCF stats report generation
  290. fn cmd(&self) -> String {
  291. let bam = self.config.solo_bam(&self.id, &self.time_point);
  292. let output_dir = self.part_output_dir();
  293. let output_vcf_path = self.output_vcf_path();
  294. // Part-specific log subdir to avoid collision
  295. let log_subdir = match self.part_index {
  296. Some(idx) => format!("{}_{}_DeepVariant_part{idx}_logs", self.id, self.time_point),
  297. None => format!("{}_{}_DeepVariant_logs", self.id, self.time_point),
  298. };
  299. let sample_name = format!("{}_{}", self.id, self.time_point);
  300. let output_vcf = format!(
  301. "/output/{}",
  302. Path::new(&output_vcf_path)
  303. .file_name()
  304. .unwrap()
  305. .to_string_lossy()
  306. );
  307. // Build haploid_contigs flag only for XY karyotype
  308. let haploid_flag = match self.karyotype {
  309. Karyotype::XY => "--haploid_contigs='chrX,chrY' \\",
  310. Karyotype::XX => "", // No haploid contigs for XX
  311. };
  312. let bind_flags = singularity_bind_flags([
  313. &bam,
  314. &self.config.reference,
  315. &self.config.pseudoautosomal_regions_bed,
  316. &output_dir,
  317. &self.log_dir,
  318. &self.config.tmp_dir,
  319. ]);
  320. format!(
  321. "{singularity_bin} exec \
  322. {binds} \
  323. --bind {output_dir}:/output \
  324. {image} \
  325. /opt/deepvariant/bin/run_deepvariant \
  326. --model_type={model_type} \
  327. --ref={reference} \
  328. --reads={bam} \
  329. --regions=\"{regions}\" \
  330. {haploid_flag} \
  331. --par_regions_bed={par_bed} \
  332. --output_vcf={output_vcf} \
  333. --num_shards={threads} \
  334. --vcf_stats_report=true \
  335. --postprocess_cpus={postprocess_cpus} \
  336. --logging_dir=/output/{log_dir} \
  337. --dry_run=false \
  338. --sample_name={sample_name}",
  339. singularity_bin = self.config.singularity_bin,
  340. binds = bind_flags,
  341. output_dir = output_dir,
  342. image = self.config.deepvariant_image,
  343. model_type = self.config.deepvariant_model_type,
  344. reference = self.config.reference,
  345. bam = bam,
  346. regions = self.regions,
  347. par_bed = self.config.pseudoautosomal_regions_bed,
  348. // we mount output_dir as /output; just rewrite path here:
  349. output_vcf = output_vcf,
  350. threads = self.config.deepvariant_threads,
  351. log_dir = log_subdir,
  352. postprocess_cpus = self.config.deepvariant_threads,
  353. sample_name = sample_name,
  354. )
  355. }
  356. }
  357. impl LocalRunner for DeepVariant {}
  358. impl LocalBatchRunner for DeepVariant {}
  359. impl SlurmRunner for DeepVariant {
  360. fn slurm_args(&self) -> Vec<String> {
  361. let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
  362. SlurmParams {
  363. job_name: Some(format!(
  364. "deepvariant_{}_{}{}",
  365. self.id, self.time_point, batch_id
  366. )),
  367. cpus_per_task: Some(self.config.deepvariant_threads.into()),
  368. mem: Some("40G".into()),
  369. partition: Some("shortq".into()),
  370. gres: None,
  371. }
  372. .to_args()
  373. }
  374. }
  375. impl SbatchRunner for DeepVariant {
  376. fn slurm_params(&self) -> SlurmParams {
  377. let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
  378. SlurmParams {
  379. job_name: Some(format!(
  380. "deepvariant_{}_{}{}",
  381. self.id, self.time_point, batch_id
  382. )),
  383. cpus_per_task: Some(self.config.deepvariant_threads.into()),
  384. mem: Some("40G".into()),
  385. partition: Some("shortq".into()),
  386. gres: None,
  387. }
  388. }
  389. }
  390. impl DeepVariant {
  391. /// Returns the per-part output directory.
  392. ///
  393. /// For partitioned runs, creates an isolated subdirectory to prevent
  394. /// intermediate file collisions between parallel jobs.
  395. fn part_output_dir(&self) -> String {
  396. let base_dir = self
  397. .config
  398. .deepvariant_output_dir(&self.id, &self.time_point);
  399. match self.part_index {
  400. Some(idx) => format!("{base_dir}/part{idx}"),
  401. None => base_dir,
  402. }
  403. }
  404. /// Returns the output VCF path for this run.
  405. ///
  406. /// For partitioned runs, the file is placed in a part-specific subdirectory.
  407. fn output_vcf_path(&self) -> String {
  408. let output_dir = self.part_output_dir();
  409. format!(
  410. "{output_dir}/{}_{}_DeepVariant.vcf.gz",
  411. self.id, self.time_point
  412. )
  413. }
  414. /// Returns the PASS-filtered VCF path.
  415. ///
  416. /// - For partitioned runs: part-specific path for intermediate PASS VCF
  417. /// - For single runs or final output: canonical path from config
  418. ///
  419. /// The final merged PASS VCF always uses the canonical path regardless
  420. /// of whether it was produced by a single run or merged from parts.
  421. fn passed_vcf_path(&self) -> String {
  422. match self.part_index {
  423. Some(_) => {
  424. // Part runs: intermediate PASS VCF in part-specific directory
  425. self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz")
  426. }
  427. None => {
  428. // Single run or final merged output: canonical path
  429. self.config
  430. .deepvariant_solo_passed_vcf(&self.id, &self.time_point)
  431. }
  432. }
  433. }
  434. /// Filters variants to keep only PASS entries.
  435. fn filter_pass(&self) -> anyhow::Result<()> {
  436. let output_vcf = self.output_vcf_path();
  437. let vcf_passed = self.passed_vcf_path();
  438. info!(
  439. "Filtering PASS variants for {} {} (part: {:?})",
  440. self.id, self.time_point, self.part_index
  441. );
  442. let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
  443. let report = run!(&self.config, &mut cmd)?;
  444. report
  445. .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
  446. .context("Can't save bcftools PASS logs")?;
  447. Ok(())
  448. }
  449. }
  450. impl Run for DeepVariant {
  451. fn run(&mut self) -> anyhow::Result<()> {
  452. let lock_dir = format!("{}/locks", self.config.result_dir);
  453. let _lock = SampleLock::acquire(&lock_dir, &self.id, "deepvariant")
  454. .with_context(|| format!("Cannot start DeepVariant chunked for {}", self.id))?;
  455. if !self.should_run() {
  456. info!("DeepVariant is up-to-data.");
  457. return Ok(());
  458. }
  459. if self.config.slurm_runner {
  460. run_deepvariant_chunked(&self.id, &self.time_point, &self.config, 30)
  461. } else {
  462. run!(&self.config, self)?;
  463. self.filter_pass()
  464. }
  465. }
  466. }
  467. impl CallerCat for DeepVariant {
  468. /// Returns the caller annotation category based on time point.
  469. ///
  470. /// Maps the time point to either [`Sample::SoloConstit`] (normal) or
  471. /// [`Sample::SoloTumor`] (tumoral).
  472. ///
  473. /// # Safety
  474. ///
  475. /// The time_point is validated during initialization, so this can never fail.
  476. /// If it does, it indicates a serious logic error in the code.
  477. fn caller_cat(&self) -> Annotation {
  478. let Config {
  479. normal_name,
  480. tumoral_name,
  481. ..
  482. } = &self.config;
  483. if *normal_name == self.time_point {
  484. Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit)
  485. } else if *tumoral_name == self.time_point {
  486. Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor)
  487. } else {
  488. // SAFETY: time_point is validated in initialize() to be either normal_name or tumoral_name.
  489. // If we reach here, it's a logic error in the code, not a user error.
  490. unreachable!(
  491. "Invalid time_point '{}': expected '{}' or '{}'. This should have been caught during initialization.",
  492. self.time_point, normal_name, tumoral_name
  493. )
  494. }
  495. }
  496. }
  497. impl Variants for DeepVariant {
  498. /// Loads variants from the DeepVariant PASS-filtered VCF.
  499. ///
  500. /// Reads the VCF, registers each variant in the annotation store with
  501. /// the appropriate caller tag, and returns a [`VariantCollection`].
  502. ///
  503. /// # Arguments
  504. ///
  505. /// * `annotations` - Mutable annotation registry for tagging variants
  506. ///
  507. /// # Returns
  508. ///
  509. /// A [`VariantCollection`] containing:
  510. /// - Parsed variant records
  511. /// - Source VCF metadata
  512. /// - Caller identification
  513. ///
  514. /// # Errors
  515. ///
  516. /// Returns an error if:
  517. /// - VCF file cannot be read or parsed
  518. /// - VCF metadata extraction fails
  519. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  520. let caller = self.caller_cat();
  521. let vcf_passed = self
  522. .config
  523. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  524. info!("Loading variants from {caller}: {vcf_passed}");
  525. let variants = read_vcf(&vcf_passed)
  526. .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?;
  527. variants.par_iter().for_each(|v| {
  528. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  529. });
  530. info!("{}, {} variants loaded.", caller, variants.len());
  531. Ok(VariantCollection {
  532. variants,
  533. vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| {
  534. anyhow::anyhow!(
  535. "Error while creating a VCF representation for {}.\n{e}",
  536. vcf_passed
  537. )
  538. })?,
  539. caller,
  540. })
  541. }
  542. }
  543. impl Label for DeepVariant {
  544. /// Returns a string label for this caller (e.g., "DeepVariant_SoloConstit").
  545. fn label(&self) -> String {
  546. self.caller_cat().to_string()
  547. }
  548. }
  549. impl Version for DeepVariant {
  550. /// Retrieves the DeepVariant version via local Singularity execution.
  551. ///
  552. /// # Errors
  553. ///
  554. /// Returns an error if:
  555. /// - Singularity execution fails
  556. /// - Version string cannot be parsed from output
  557. fn version(config: &Config) -> anyhow::Result<String> {
  558. let out = ProcessCommand::new("bash")
  559. .arg("-c")
  560. .arg(format!(
  561. "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
  562. config.singularity_bin, config.deepvariant_image
  563. ))
  564. .stdout(Stdio::piped())
  565. .stderr(Stdio::piped())
  566. .output()
  567. .context("failed to spawn singularity")?;
  568. if !out.status.success() {
  569. let mut log = String::from_utf8_lossy(&out.stdout).to_string();
  570. log.push_str(&String::from_utf8_lossy(&out.stderr));
  571. anyhow::bail!("singularity exec failed: {}\n{}", out.status, log);
  572. }
  573. let combined = format!(
  574. "{}{}",
  575. String::from_utf8_lossy(&out.stdout),
  576. String::from_utf8_lossy(&out.stderr)
  577. );
  578. parse_deepvariant_version(&combined)
  579. }
  580. /// Retrieves the DeepVariant version via Slurm job submission.
  581. ///
  582. /// Useful when local GPU access is unavailable but Slurm has GPU nodes.
  583. ///
  584. /// # Errors
  585. ///
  586. /// Returns an error if job submission fails or version cannot be parsed.
  587. fn version_slurm(config: &Config) -> anyhow::Result<String> {
  588. // Minimal Slurm job just to run the version command
  589. struct DeepVariantVersionJob<'a> {
  590. config: &'a Config,
  591. }
  592. impl<'a> JobCommand for DeepVariantVersionJob<'a> {
  593. fn cmd(&self) -> String {
  594. format!(
  595. "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
  596. self.config.singularity_bin, self.config.deepvariant_image
  597. )
  598. }
  599. }
  600. impl<'a> SlurmRunner for DeepVariantVersionJob<'a> {
  601. fn slurm_args(&self) -> Vec<String> {
  602. SlurmParams {
  603. job_name: Some("deepvariant_version".into()),
  604. partition: Some("gpgpuq".into()),
  605. cpus_per_task: Some(1),
  606. mem: Some("1G".into()),
  607. gres: None,
  608. }
  609. .to_args()
  610. }
  611. }
  612. let mut job = DeepVariantVersionJob { config };
  613. let out =
  614. SlurmRunner::exec(&mut job).context("failed to run DeepVariant --version via Slurm")?;
  615. // Combine stdout, Slurm epilog (if any), and stderr for parsing
  616. let mut combined = out.stdout.clone();
  617. if let Some(epilog) = &out.slurm_epilog {
  618. combined.push_str(&format!("{epilog}"));
  619. }
  620. combined.push_str(&out.stderr);
  621. parse_deepvariant_version(&combined)
  622. }
  623. }
  624. /// Parses the DeepVariant version from command output.
  625. ///
  626. /// # Arguments
  627. ///
  628. /// * `output` - Combined stdout/stderr from DeepVariant --version
  629. ///
  630. /// # Returns
  631. ///
  632. /// The version string (e.g., "1.6.1")
  633. ///
  634. /// # Errors
  635. ///
  636. /// Returns an error if the version pattern is not found.
  637. fn parse_deepvariant_version(output: &str) -> anyhow::Result<String> {
  638. let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("Version regex is valid");
  639. let caps = re
  640. .captures(output)
  641. .context("Could not parse DeepVariant version from output")?;
  642. Ok(caps
  643. .get(1)
  644. .expect("Regex has capture group 1")
  645. .as_str()
  646. .to_string())
  647. }
  648. fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
  649. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  650. for i in 1..=n_parts {
  651. let mut dv = base.clone();
  652. dv.part_index = Some(i);
  653. let part_pass = dv.passed_vcf_path();
  654. anyhow::ensure!(
  655. Path::new(&part_pass).exists(),
  656. "Missing part {i} PASS VCF: {part_pass}"
  657. );
  658. part_pass_paths.push(PathBuf::from(part_pass));
  659. }
  660. let final_passed_vcf = base
  661. .config
  662. .deepvariant_solo_passed_vcf(&base.id, &base.time_point);
  663. let rand = Uuid::new_v4();
  664. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  665. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  666. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  667. fs::create_dir_all(parent)?;
  668. }
  669. info!(
  670. "Concatenating {} part VCFs into {}",
  671. n_parts, final_passed_vcf
  672. );
  673. let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
  674. run!(&base.config, &mut concat)
  675. .context("Failed to run bcftools concat for DeepVariant parts")?
  676. .save_to_file(format!("{}/bcftools_concat", base.log_dir))?;
  677. fs::rename(&final_tmp, &final_passed_vcf)
  678. .context("Failed to rename merged DeepVariant PASS VCF")?;
  679. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  680. .context("Failed to rename merged DeepVariant PASS VCF CSI")?;
  681. info!(
  682. "Successfully merged {} parts into {}",
  683. n_parts, final_passed_vcf
  684. );
  685. Ok(())
  686. }
  687. /// Runs DeepVariant in parallel chunks, then merges results.
  688. ///
  689. /// Splits the genome into N equal-sized regions, runs DeepVariant on each region
  690. /// in parallel (local or Slurm based on `config.slurm_runner`), filters PASS variants,
  691. /// and concatenates the final VCF. Karyotype-aware calling ensures X/Y chromosomes
  692. /// are handled correctly based on sample sex.
  693. ///
  694. /// # Arguments
  695. ///
  696. /// * `id` - Sample identifier
  697. /// * `time_point` - Time point label ("norm" or "diag")
  698. /// * `config` - Global pipeline configuration
  699. /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
  700. ///
  701. /// # Returns
  702. ///
  703. /// `Ok(())` on success, or an error if any step fails.
  704. ///
  705. /// # Errors
  706. ///
  707. /// Returns an error if:
  708. /// - `n_parts` is 0
  709. /// - `time_point` is invalid (not matching `config.normal_name` or `config.tumoral_name`)
  710. /// - BAM file cannot be opened or is corrupted
  711. /// - BAM header is malformed
  712. /// - Karyotype detection fails
  713. /// - DeepVariant execution fails on any part
  714. /// - PASS filtering fails
  715. /// - VCF merging fails
  716. ///
  717. /// # Example
  718. ///
  719. /// ```ignore
  720. /// let config = Config::default();
  721. /// run_deepvariant_chunked("sample_001", "norm", &config, 30)?;
  722. /// ```
  723. pub fn run_deepvariant_chunked(
  724. id: &str,
  725. time_point: &str,
  726. config: &Config,
  727. n_parts: usize,
  728. ) -> anyhow::Result<()> {
  729. anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
  730. // let lock_dir = format!("{}/locks", config.result_dir);
  731. // let _lock = SampleLock::acquire(&lock_dir, id, "deepvariant")
  732. // .with_context(|| format!("Cannot start DeepVariant chunked for {}", id))?;
  733. let base = DeepVariant::initialize(id, time_point, config)?;
  734. if !base.should_run() {
  735. debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping.");
  736. return Ok(());
  737. }
  738. let bam_path = config.solo_bam(id, time_point);
  739. let reader = bam::Reader::from_path(&bam_path)
  740. .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
  741. let header = bam::Header::from_template(reader.header());
  742. let genome_sizes = get_genome_sizes(&header)?;
  743. let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
  744. .into_iter()
  745. .map(|v| v.join(" "))
  746. .collect::<Vec<String>>();
  747. let actual_n_parts = region_chunks.len();
  748. info!(
  749. "Running DeepVariant in {} parallel parts for {} {}",
  750. actual_n_parts, id, time_point
  751. );
  752. let mut jobs = Vec::with_capacity(actual_n_parts);
  753. for (i, regions) in region_chunks.into_iter().enumerate() {
  754. let mut job = base.clone();
  755. job.regions = regions;
  756. job.part_index = Some(i + 1);
  757. job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
  758. info!("Planned DeepVariant job:\n{job}");
  759. jobs.push(job);
  760. }
  761. // Run all DeepVariant jobs
  762. let outputs = run_many!(config, jobs.clone())?;
  763. for output in outputs.iter() {
  764. output.save_to_file(format!("{}/deepvariant_", base.log_dir))?;
  765. }
  766. // Filter PASS variants for each part
  767. info!(
  768. "Filtering PASS variants for all {} parts in parallel",
  769. actual_n_parts
  770. );
  771. let filter_jobs: Vec<_> = jobs
  772. .iter()
  773. .map(|job| {
  774. BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
  775. })
  776. .collect();
  777. run_many!(config, filter_jobs)?;
  778. // Merge PASS VCFs
  779. merge_deepvariant_parts(&base, actual_n_parts)?;
  780. info!(
  781. "DeepVariant completed for {} {}: {} parts merged",
  782. id, time_point, actual_n_parts
  783. );
  784. Ok(())
  785. }
  786. #[cfg(test)]
  787. mod tests {
  788. use crate::helpers::test_init;
  789. use super::*;
  790. #[test]
  791. fn deepvariant_version() -> anyhow::Result<()> {
  792. test_init();
  793. let vl = DeepVariant::version(&Config::default())?;
  794. info!("DeepVariant local version: {vl}");
  795. let vs = DeepVariant::version_slurm(&Config::default())?;
  796. info!("DeepVariant slurm version: {vs}");
  797. assert_eq!(vl, vs);
  798. Ok(())
  799. }
  800. #[test]
  801. fn deepvariant_run() -> anyhow::Result<()> {
  802. test_init();
  803. let config = Config::default();
  804. for id in ["DUMCO", "CHAHA"] {
  805. run_deepvariant_chunked(id, "diag", &config, 30)?;
  806. }
  807. Ok(())
  808. }
  809. }