deep_variant.rs 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. //! # DeepVariant Variant Calling Pipeline
  2. //!
  3. //! This module provides a pipeline runner for [DeepVariant](https://github.com/google/deepvariant),
  4. //! a deep learning-based variant caller.
  5. //!
  6. //! ## Overview
  7. //!
  8. //! DeepVariant performs variant calling on a single sample BAM file using:
  9. //!
  10. //! - CNN-based variant classification
  11. //! - Haploid calling for sex chromosomes (configurable by karyotype)
  12. //! - Containerized execution via Singularity/Apptainer
  13. //!
  14. //! ## Key Features
  15. //!
  16. //! - **Deep learning-based** - CNN models trained on diverse sequencing platforms
  17. //! - **Karyotype-aware** - Automatically calls X/Y as haploid based on sample sex
  18. //! - **Solo mode** - Single-sample germline variant calling
  19. //! - **Platform flexibility** - Supports ONT, PacBio, and Illumina
  20. //! - **Parallel execution** - Genome chunking for HPC scalability
  21. //!
  22. //! ## Requirements
  23. //!
  24. //! Before running DeepVariant, ensure:
  25. //! - BAM file is indexed (`.bai` file present)
  26. //! - Reference genome is accessible
  27. //! - Singularity/Docker image is available
  28. //! - Model type matches sequencing platform (`config.deepvariant_model_type`)
  29. //!
  30. //! ## Execution Modes
  31. //!
  32. //! Execution mode is automatically selected via `config.slurm_runner`:
  33. //!
  34. //! - **Local** — Single-node execution
  35. //! - **Slurm** — HPC job submission
  36. //!
  37. //! Both modes support chunked parallel execution via [`run_deepvariant_chunked_with_merge`].
  38. //!
  39. //! ## Output Files
  40. //!
  41. //! PASS-filtered variants:
  42. //! ```text
  43. //! {result_dir}/{id}/deepvariant/{id}_{time_point}_DeepVariant.pass.vcf.gz
  44. //! ```
  45. //!
  46. //! ## Usage
  47. //!
  48. //! ### Chunked Parallel Execution (Recommended for WGS)
  49. //!
  50. //! ```ignore
  51. //! use pandora_lib_promethion::callers::deep_variant::run_deepvariant_chunked_with_merge;
  52. //! use pandora_lib_promethion::config::Config;
  53. //!
  54. //! let config = Config::default();
  55. //! // Run DeepVariant in 30 parallel chunks for "norm" time point
  56. //! let outputs = run_deepvariant_chunked_with_merge("sample_001", "norm", &config, 30)?;
  57. //! # Ok::<(), anyhow::Error>(())
  58. //! ```
  59. //!
  60. //! ### Single-Run Execution
  61. //!
  62. //! ```ignore
  63. //! use pandora_lib_promethion::callers::deep_variant::DeepVariant;
  64. //! use pandora_lib_promethion::config::Config;
  65. //! use pandora_lib_promethion::pipes::InitializeSolo;
  66. //! use pandora_lib_promethion::runners::Run;
  67. //!
  68. //! let config = Config::default();
  69. //! let mut caller = DeepVariant::initialize("sample_001", "norm", &config)?;
  70. //!
  71. //! if caller.should_run() {
  72. //! caller.run()?;
  73. //! }
  74. //!
  75. //! // Load variants
  76. //! let variants = caller.variants(&annotations)?;
  77. //! println!("Found {} germline variants", variants.variants.len());
  78. //! # Ok::<(), anyhow::Error>(())
  79. //! ```
  80. //!
  81. //! ## References
  82. //!
  83. //! - [DeepVariant GitHub repository](https://github.com/google/deepvariant)
  84. //! - [DeepVariant publication (Nature Biotechnology, 2018)](https://doi.org/10.1038/nbt.4235)
  85. use anyhow::Context;
  86. use log::{debug, info};
  87. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  88. use regex::Regex;
  89. use rust_htslib::bam::{self, Read};
  90. use std::{
  91. fmt, fs,
  92. path::{Path, PathBuf},
  93. process::{Command as ProcessCommand, Stdio},
  94. };
  95. use uuid::Uuid;
  96. use crate::{
  97. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  98. collection::{
  99. bam_stats::{Karyotype, WGSBamStats},
  100. vcf::Vcf,
  101. },
  102. commands::{
  103. bcftools::{BcftoolsConcat, BcftoolsKeepPass},
  104. LocalBatchRunner, SlurmRunner,
  105. },
  106. config::Config,
  107. helpers::{
  108. get_genome_sizes, is_file_older, remove_dir_if_exists, singularity_bind_flags,
  109. split_genome_into_n_regions_exact,
  110. },
  111. io::vcf::read_vcf,
  112. pipes::{InitializeSolo, ShouldRun, Version},
  113. run, run_many,
  114. runners::Run,
  115. variant::{
  116. variant_collection::VariantCollection,
  117. vcf_variant::{Label, Variants},
  118. },
  119. };
  120. use crate::commands::{
  121. Command as JobCommand, // your trait
  122. LocalRunner,
  123. SbatchRunner,
  124. SlurmParams,
  125. };
  126. /// DeepVariant solo (single-sample) variant caller.
  127. ///
  128. /// Executes DeepVariant for germline variant calling on a single BAM file.
  129. /// Supports karyotype-aware calling for accurate sex chromosome genotyping.
  130. ///
  131. /// # Fields
  132. ///
  133. /// - `id` - Sample identifier (e.g., "34528")
  134. /// - `time_point` - Time point label: typically `config.normal_name` ("norm") or `config.tumoral_name` ("diag")
  135. /// - `regions` - Genomic regions to process (e.g., "chr1 chr2 chr3" or "chr1:1-1000000")
  136. /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/deepvariant")
  137. /// - `config` - Global pipeline configuration
  138. /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
  139. /// - `karyotype` - Sex karyotype for haploid calling on X/Y chromosomes (XX or XY)
  140. ///
  141. /// # Execution Modes
  142. ///
  143. /// - **Local** - Direct execution via `run_local()`
  144. /// - **Slurm** - Single job submission via `run_sbatch()`
  145. /// - **Chunked** - Parallel genome-wide execution via [`run_deepvariant_chunked_with_merge`]
  146. ///
  147. /// # Karyotype-Aware Calling
  148. ///
  149. /// DeepVariant automatically adjusts ploidy based on `karyotype`:
  150. /// - **XY karyotype**: chrX and chrY called as haploid
  151. /// - **XX karyotype**: chrX called as diploid, chrY skipped
  152. #[derive(Debug, Clone)]
  153. pub struct DeepVariant {
  154. /// Sample identifier
  155. pub id: String,
  156. /// Time point identifier (e.g., "norm" or "diag")
  157. pub time_point: String,
  158. /// Space-separated list of genomic regions to process
  159. pub regions: String,
  160. /// Directory for log file storage
  161. pub log_dir: String,
  162. /// Global pipeline configuration
  163. pub config: Config,
  164. /// Optional part index for chunked parallel runs (1-indexed)
  165. pub part_index: Option<usize>,
  166. /// Sex karyotype for haploid contig handling (XX or XY)
  167. pub karyotype: Karyotype,
  168. }
  169. impl fmt::Display for DeepVariant {
  170. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  171. writeln!(f, "🧬 DeepVariant Solo")?;
  172. writeln!(f, " Case ID : {}", self.id)?;
  173. writeln!(f, " Time point : {}", self.time_point)?;
  174. writeln!(f, " Regions : {}", self.regions)?;
  175. writeln!(f, " Log dir : {}", self.log_dir)?;
  176. writeln!(
  177. f,
  178. " Part : {}",
  179. self.part_index
  180. .map_or("full".into(), |n| format!("part{n}"))
  181. )?;
  182. writeln!(f, " Karyotype : {}", self.karyotype)
  183. }
  184. }
  185. impl InitializeSolo for DeepVariant {
  186. /// Initializes a DeepVariant runner for a specific sample and time point.
  187. ///
  188. /// Creates necessary directory structures and optionally cleans up previous runs
  189. /// when [`Config::deepvariant_force`] is set.
  190. ///
  191. /// # Arguments
  192. ///
  193. /// * `id` - Sample identifier
  194. /// * `time_point` - Either normal or tumor label for this run
  195. /// * `config` - Global pipeline configuration
  196. ///
  197. /// # Returns
  198. ///
  199. /// A configured [`DeepVariant`] instance ready for execution.
  200. ///
  201. /// # Errors
  202. ///
  203. /// Returns an error if directory cleanup fails when force mode is enabled.
  204. fn initialize(id: &str, time_point: &str, config: &Config) -> anyhow::Result<Self> {
  205. let id = id.to_string();
  206. let time_point = time_point.to_string();
  207. // Validate time_point matches configured names
  208. anyhow::ensure!(
  209. time_point == config.normal_name || time_point == config.tumoral_name,
  210. "Invalid time_point '{}': must be either '{}' (normal) or '{}' (tumor)",
  211. time_point,
  212. config.normal_name,
  213. config.tumoral_name
  214. );
  215. let karyotype = WGSBamStats::open(&id, &time_point, config)?.karyotype()?;
  216. info!("Initializing DeepVariant for {id} {time_point}.");
  217. let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
  218. let regions = (1..=22)
  219. .map(|i| format!("chr{i}"))
  220. .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from))
  221. .collect::<Vec<String>>();
  222. let deepvariant = Self {
  223. id,
  224. time_point,
  225. log_dir,
  226. config: config.clone(),
  227. regions: regions.join(" "),
  228. part_index: None,
  229. karyotype,
  230. };
  231. if deepvariant.config.deepvariant_force {
  232. remove_dir_if_exists(
  233. &deepvariant
  234. .config
  235. .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point),
  236. )?;
  237. }
  238. Ok(deepvariant)
  239. }
  240. }
  241. impl ShouldRun for DeepVariant {
  242. /// Determines whether DeepVariant should be (re)run.
  243. ///
  244. /// Compares the modification time of the input BAM against the output
  245. /// PASS-filtered VCF. Returns `true` if:
  246. /// - The output VCF does not exist
  247. /// - The output VCF is older than the input BAM
  248. ///
  249. /// # Implementation Note
  250. ///
  251. /// This is a conservative check—any error during timestamp comparison
  252. /// results in `true` (i.e., re-run).
  253. fn should_run(&self) -> bool {
  254. let passed_vcf = self
  255. .config
  256. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  257. let bam = self.config.solo_bam(&self.id, &self.time_point);
  258. let result = is_file_older(&passed_vcf, &bam, true).unwrap_or(true);
  259. if result {
  260. info!(
  261. "DeepVariant should run for: {} {}.",
  262. self.id, self.time_point
  263. );
  264. }
  265. result
  266. }
  267. }
  268. impl JobCommand for DeepVariant {
  269. /// Initializes output and log directories for this run.
  270. ///
  271. /// # Errors
  272. ///
  273. /// Returns an error if directory creation fails.
  274. fn init(&mut self) -> anyhow::Result<()> {
  275. // Part-specific output dir
  276. let output_dir = self.part_output_dir();
  277. fs::create_dir_all(&output_dir).context(format!("Failed to create dir: {output_dir}"))?;
  278. // log dir
  279. fs::create_dir_all(&self.log_dir)
  280. .context(format!("Failed to create dir: {}", self.log_dir))?;
  281. Ok(())
  282. }
  283. /// Generates the DeepVariant command string for Singularity execution.
  284. ///
  285. /// The command binds the output directory and runs DeepVariant with:
  286. /// - Configured model type
  287. /// - Haploid calling for sex chromosomes
  288. /// - VCF stats report generation
  289. fn cmd(&self) -> String {
  290. let bam = self.config.solo_bam(&self.id, &self.time_point);
  291. let output_dir = self.part_output_dir();
  292. let output_vcf_path = self.output_vcf_path();
  293. // Part-specific log subdir to avoid collision
  294. let log_subdir = match self.part_index {
  295. Some(idx) => format!("{}_{}_DeepVariant_part{idx}_logs", self.id, self.time_point),
  296. None => format!("{}_{}_DeepVariant_logs", self.id, self.time_point),
  297. };
  298. let sample_name = format!("{}_{}", self.id, self.time_point);
  299. let output_vcf = format!(
  300. "/output/{}",
  301. Path::new(&output_vcf_path)
  302. .file_name()
  303. .unwrap()
  304. .to_string_lossy()
  305. );
  306. // Build haploid_contigs flag only for XY karyotype
  307. let haploid_flag = match self.karyotype {
  308. Karyotype::XY => "--haploid_contigs='chrX,chrY' \\",
  309. Karyotype::XX => "", // No haploid contigs for XX
  310. };
  311. let bind_flags = singularity_bind_flags([
  312. &bam,
  313. &self.config.reference,
  314. &self.config.pseudoautosomal_regions_bed,
  315. &output_dir,
  316. &self.log_dir,
  317. &self.config.tmp_dir,
  318. ]);
  319. format!(
  320. "{singularity_bin} exec \
  321. {binds} \
  322. --bind {output_dir}:/output \
  323. {image} \
  324. /opt/deepvariant/bin/run_deepvariant \
  325. --model_type={model_type} \
  326. --ref={reference} \
  327. --reads={bam} \
  328. --regions=\"{regions}\" \
  329. {haploid_flag} \
  330. --par_regions_bed={par_bed} \
  331. --output_vcf={output_vcf} \
  332. --num_shards={threads} \
  333. --vcf_stats_report=true \
  334. --postprocess_cpus={postprocess_cpus} \
  335. --logging_dir=/output/{log_dir} \
  336. --dry_run=false \
  337. --sample_name={sample_name}",
  338. singularity_bin = self.config.singularity_bin,
  339. binds = bind_flags,
  340. output_dir = output_dir,
  341. image = self.config.deepvariant_image,
  342. model_type = self.config.deepvariant_model_type,
  343. reference = self.config.reference,
  344. bam = bam,
  345. regions = self.regions,
  346. par_bed = self.config.pseudoautosomal_regions_bed,
  347. // we mount output_dir as /output; just rewrite path here:
  348. output_vcf = output_vcf,
  349. threads = self.config.deepvariant_threads,
  350. log_dir = log_subdir,
  351. postprocess_cpus = self.config.deepvariant_threads,
  352. sample_name = sample_name,
  353. )
  354. }
  355. }
  356. impl LocalRunner for DeepVariant {}
  357. impl LocalBatchRunner for DeepVariant {}
  358. impl SlurmRunner for DeepVariant {
  359. fn slurm_args(&self) -> Vec<String> {
  360. SlurmParams {
  361. job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
  362. cpus_per_task: Some(self.config.deepvariant_threads.into()),
  363. mem: Some("40G".into()),
  364. partition: Some("shortq".into()),
  365. gres: None,
  366. }
  367. .to_args()
  368. }
  369. }
  370. impl SbatchRunner for DeepVariant {
  371. fn slurm_params(&self) -> SlurmParams {
  372. SlurmParams {
  373. job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
  374. cpus_per_task: Some(self.config.deepvariant_threads.into()),
  375. mem: Some("40G".into()),
  376. partition: Some("shortq".into()),
  377. gres: None,
  378. }
  379. }
  380. }
  381. impl DeepVariant {
  382. /// Returns the per-part output directory.
  383. ///
  384. /// For partitioned runs, creates an isolated subdirectory to prevent
  385. /// intermediate file collisions between parallel jobs.
  386. fn part_output_dir(&self) -> String {
  387. let base_dir = self
  388. .config
  389. .deepvariant_output_dir(&self.id, &self.time_point);
  390. match self.part_index {
  391. Some(idx) => format!("{base_dir}/part{idx}"),
  392. None => base_dir,
  393. }
  394. }
  395. /// Returns the output VCF path for this run.
  396. ///
  397. /// For partitioned runs, the file is placed in a part-specific subdirectory.
  398. fn output_vcf_path(&self) -> String {
  399. let output_dir = self.part_output_dir();
  400. format!(
  401. "{output_dir}/{}_{}_DeepVariant.vcf.gz",
  402. self.id, self.time_point
  403. )
  404. }
  405. /// Returns the PASS-filtered VCF path.
  406. ///
  407. /// - For partitioned runs: part-specific path for intermediate PASS VCF
  408. /// - For single runs or final output: canonical path from config
  409. ///
  410. /// The final merged PASS VCF always uses the canonical path regardless
  411. /// of whether it was produced by a single run or merged from parts.
  412. fn passed_vcf_path(&self) -> String {
  413. match self.part_index {
  414. Some(_) => {
  415. // Part runs: intermediate PASS VCF in part-specific directory
  416. self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz")
  417. }
  418. None => {
  419. // Single run or final merged output: canonical path
  420. self.config
  421. .deepvariant_solo_passed_vcf(&self.id, &self.time_point)
  422. }
  423. }
  424. }
  425. /// Filters variants to keep only PASS entries.
  426. fn filter_pass(&self) -> anyhow::Result<()> {
  427. let output_vcf = self.output_vcf_path();
  428. let vcf_passed = self.passed_vcf_path();
  429. info!(
  430. "Filtering PASS variants for {} {} (part: {:?})",
  431. self.id, self.time_point, self.part_index
  432. );
  433. let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
  434. let report = run!(&self.config, &mut cmd)?;
  435. report
  436. .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
  437. .context("Can't save bcftools PASS logs")?;
  438. Ok(())
  439. }
  440. }
  441. impl Run for DeepVariant {
  442. fn run(&mut self) -> anyhow::Result<()> {
  443. if !self.should_run() {
  444. info!("DeepVariant is up-to-data.");
  445. return Ok(());
  446. }
  447. if self.config.slurm_runner {
  448. run_deepvariant_chunked(&self.id, &self.time_point, &self.config, 30)
  449. } else {
  450. run!(&self.config, self)?;
  451. self.filter_pass()
  452. }
  453. }
  454. }
  455. impl CallerCat for DeepVariant {
  456. /// Returns the caller annotation category based on time point.
  457. ///
  458. /// Maps the time point to either [`Sample::SoloConstit`] (normal) or
  459. /// [`Sample::SoloTumor`] (tumoral).
  460. ///
  461. /// # Safety
  462. ///
  463. /// The time_point is validated during initialization, so this can never fail.
  464. /// If it does, it indicates a serious logic error in the code.
  465. fn caller_cat(&self) -> Annotation {
  466. let Config {
  467. normal_name,
  468. tumoral_name,
  469. ..
  470. } = &self.config;
  471. if *normal_name == self.time_point {
  472. Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit)
  473. } else if *tumoral_name == self.time_point {
  474. Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor)
  475. } else {
  476. // SAFETY: time_point is validated in initialize() to be either normal_name or tumoral_name.
  477. // If we reach here, it's a logic error in the code, not a user error.
  478. unreachable!(
  479. "Invalid time_point '{}': expected '{}' or '{}'. This should have been caught during initialization.",
  480. self.time_point, normal_name, tumoral_name
  481. )
  482. }
  483. }
  484. }
  485. impl Variants for DeepVariant {
  486. /// Loads variants from the DeepVariant PASS-filtered VCF.
  487. ///
  488. /// Reads the VCF, registers each variant in the annotation store with
  489. /// the appropriate caller tag, and returns a [`VariantCollection`].
  490. ///
  491. /// # Arguments
  492. ///
  493. /// * `annotations` - Mutable annotation registry for tagging variants
  494. ///
  495. /// # Returns
  496. ///
  497. /// A [`VariantCollection`] containing:
  498. /// - Parsed variant records
  499. /// - Source VCF metadata
  500. /// - Caller identification
  501. ///
  502. /// # Errors
  503. ///
  504. /// Returns an error if:
  505. /// - VCF file cannot be read or parsed
  506. /// - VCF metadata extraction fails
  507. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  508. let caller = self.caller_cat();
  509. let vcf_passed = self
  510. .config
  511. .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
  512. info!("Loading variants from {caller}: {vcf_passed}");
  513. let variants = read_vcf(&vcf_passed)
  514. .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?;
  515. variants.par_iter().for_each(|v| {
  516. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  517. });
  518. info!("{}, {} variants loaded.", caller, variants.len());
  519. Ok(VariantCollection {
  520. variants,
  521. vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| {
  522. anyhow::anyhow!(
  523. "Error while creating a VCF representation for {}.\n{e}",
  524. vcf_passed
  525. )
  526. })?,
  527. caller,
  528. })
  529. }
  530. }
  531. impl Label for DeepVariant {
  532. /// Returns a string label for this caller (e.g., "DeepVariant_SoloConstit").
  533. fn label(&self) -> String {
  534. self.caller_cat().to_string()
  535. }
  536. }
  537. impl Version for DeepVariant {
  538. /// Retrieves the DeepVariant version via local Singularity execution.
  539. ///
  540. /// # Errors
  541. ///
  542. /// Returns an error if:
  543. /// - Singularity execution fails
  544. /// - Version string cannot be parsed from output
  545. fn version(config: &Config) -> anyhow::Result<String> {
  546. let out = ProcessCommand::new("bash")
  547. .arg("-c")
  548. .arg(format!(
  549. "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
  550. config.singularity_bin, config.deepvariant_image
  551. ))
  552. .stdout(Stdio::piped())
  553. .stderr(Stdio::piped())
  554. .output()
  555. .context("failed to spawn singularity")?;
  556. if !out.status.success() {
  557. let mut log = String::from_utf8_lossy(&out.stdout).to_string();
  558. log.push_str(&String::from_utf8_lossy(&out.stderr));
  559. anyhow::bail!("singularity exec failed: {}\n{}", out.status, log);
  560. }
  561. let combined = format!(
  562. "{}{}",
  563. String::from_utf8_lossy(&out.stdout),
  564. String::from_utf8_lossy(&out.stderr)
  565. );
  566. parse_deepvariant_version(&combined)
  567. }
  568. /// Retrieves the DeepVariant version via Slurm job submission.
  569. ///
  570. /// Useful when local GPU access is unavailable but Slurm has GPU nodes.
  571. ///
  572. /// # Errors
  573. ///
  574. /// Returns an error if job submission fails or version cannot be parsed.
  575. fn version_slurm(config: &Config) -> anyhow::Result<String> {
  576. // Minimal Slurm job just to run the version command
  577. struct DeepVariantVersionJob<'a> {
  578. config: &'a Config,
  579. }
  580. impl<'a> JobCommand for DeepVariantVersionJob<'a> {
  581. fn cmd(&self) -> String {
  582. format!(
  583. "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
  584. self.config.singularity_bin, self.config.deepvariant_image
  585. )
  586. }
  587. }
  588. impl<'a> SlurmRunner for DeepVariantVersionJob<'a> {
  589. fn slurm_args(&self) -> Vec<String> {
  590. SlurmParams {
  591. job_name: Some("deepvariant_version".into()),
  592. partition: Some("gpgpuq".into()),
  593. cpus_per_task: Some(1),
  594. mem: Some("1G".into()),
  595. gres: None,
  596. }
  597. .to_args()
  598. }
  599. }
  600. let mut job = DeepVariantVersionJob { config };
  601. let out =
  602. SlurmRunner::exec(&mut job).context("failed to run DeepVariant --version via Slurm")?;
  603. // Combine stdout, Slurm epilog (if any), and stderr for parsing
  604. let mut combined = out.stdout.clone();
  605. if let Some(epilog) = &out.slurm_epilog {
  606. combined.push_str(&format!("{epilog}"));
  607. }
  608. combined.push_str(&out.stderr);
  609. parse_deepvariant_version(&combined)
  610. }
  611. }
  612. /// Parses the DeepVariant version from command output.
  613. ///
  614. /// # Arguments
  615. ///
  616. /// * `output` - Combined stdout/stderr from DeepVariant --version
  617. ///
  618. /// # Returns
  619. ///
  620. /// The version string (e.g., "1.6.1")
  621. ///
  622. /// # Errors
  623. ///
  624. /// Returns an error if the version pattern is not found.
  625. fn parse_deepvariant_version(output: &str) -> anyhow::Result<String> {
  626. let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("Version regex is valid");
  627. let caps = re
  628. .captures(output)
  629. .context("Could not parse DeepVariant version from output")?;
  630. Ok(caps
  631. .get(1)
  632. .expect("Regex has capture group 1")
  633. .as_str()
  634. .to_string())
  635. }
  636. fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
  637. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  638. for i in 1..=n_parts {
  639. let mut dv = base.clone();
  640. dv.part_index = Some(i);
  641. let part_pass = dv.passed_vcf_path();
  642. anyhow::ensure!(
  643. Path::new(&part_pass).exists(),
  644. "Missing part {i} PASS VCF: {part_pass}"
  645. );
  646. part_pass_paths.push(PathBuf::from(part_pass));
  647. }
  648. let final_passed_vcf = base
  649. .config
  650. .deepvariant_solo_passed_vcf(&base.id, &base.time_point);
  651. let rand = Uuid::new_v4();
  652. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  653. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  654. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  655. fs::create_dir_all(parent)?;
  656. }
  657. info!(
  658. "Concatenating {} part VCFs into {}",
  659. n_parts, final_passed_vcf
  660. );
  661. let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
  662. run!(&base.config, &mut concat)
  663. .context("Failed to run bcftools concat for DeepVariant parts")?
  664. .save_to_file(format!("{}/bcftools_concat", base.log_dir))?;
  665. fs::rename(&final_tmp, &final_passed_vcf)
  666. .context("Failed to rename merged DeepVariant PASS VCF")?;
  667. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  668. .context("Failed to rename merged DeepVariant PASS VCF CSI")?;
  669. info!(
  670. "Successfully merged {} parts into {}",
  671. n_parts, final_passed_vcf
  672. );
  673. Ok(())
  674. }
  675. /// Runs DeepVariant in parallel chunks, then merges results.
  676. ///
  677. /// Splits the genome into N equal-sized regions, runs DeepVariant on each region
  678. /// in parallel (local or Slurm based on `config.slurm_runner`), filters PASS variants,
  679. /// and concatenates the final VCF. Karyotype-aware calling ensures X/Y chromosomes
  680. /// are handled correctly based on sample sex.
  681. ///
  682. /// # Arguments
  683. ///
  684. /// * `id` - Sample identifier
  685. /// * `time_point` - Time point label ("norm" or "diag")
  686. /// * `config` - Global pipeline configuration
  687. /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
  688. ///
  689. /// # Returns
  690. ///
  691. /// `Ok(())` on success, or an error if any step fails.
  692. ///
  693. /// # Errors
  694. ///
  695. /// Returns an error if:
  696. /// - `n_parts` is 0
  697. /// - `time_point` is invalid (not matching `config.normal_name` or `config.tumoral_name`)
  698. /// - BAM file cannot be opened or is corrupted
  699. /// - BAM header is malformed
  700. /// - Karyotype detection fails
  701. /// - DeepVariant execution fails on any part
  702. /// - PASS filtering fails
  703. /// - VCF merging fails
  704. ///
  705. /// # Example
  706. ///
  707. /// ```ignore
  708. /// let config = Config::default();
  709. /// run_deepvariant_chunked("sample_001", "norm", &config, 30)?;
  710. /// ```
  711. pub fn run_deepvariant_chunked(
  712. id: &str,
  713. time_point: &str,
  714. config: &Config,
  715. n_parts: usize,
  716. ) -> anyhow::Result<()> {
  717. anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
  718. let base = DeepVariant::initialize(id, time_point, config)?;
  719. if !base.should_run() {
  720. debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping.");
  721. return Ok(());
  722. }
  723. let bam_path = config.solo_bam(id, time_point);
  724. let reader = bam::Reader::from_path(&bam_path)
  725. .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
  726. let header = bam::Header::from_template(reader.header());
  727. let genome_sizes = get_genome_sizes(&header)?;
  728. let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
  729. .into_iter()
  730. .map(|v| v.join(" "))
  731. .collect::<Vec<String>>();
  732. let actual_n_parts = region_chunks.len();
  733. info!(
  734. "Running DeepVariant in {} parallel parts for {} {}",
  735. actual_n_parts, id, time_point
  736. );
  737. let mut jobs = Vec::with_capacity(actual_n_parts);
  738. for (i, regions) in region_chunks.into_iter().enumerate() {
  739. let mut job = base.clone();
  740. job.regions = regions;
  741. job.part_index = Some(i + 1);
  742. job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
  743. info!("Planned DeepVariant job:\n{job}");
  744. jobs.push(job);
  745. }
  746. // Run all DeepVariant jobs
  747. let outputs = run_many!(config, jobs.clone())?;
  748. for output in outputs.iter() {
  749. output.save_to_file(format!("{}/deepvariant_", base.log_dir))?;
  750. }
  751. // Filter PASS variants for each part
  752. info!(
  753. "Filtering PASS variants for all {} parts in parallel",
  754. actual_n_parts
  755. );
  756. let filter_jobs: Vec<_> = jobs
  757. .iter()
  758. .map(|job| {
  759. BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
  760. })
  761. .collect();
  762. run_many!(config, filter_jobs)?;
  763. // Merge PASS VCFs
  764. merge_deepvariant_parts(&base, actual_n_parts)?;
  765. info!(
  766. "DeepVariant completed for {} {}: {} parts merged",
  767. id, time_point, actual_n_parts
  768. );
  769. Ok(())
  770. }
  771. #[cfg(test)]
  772. mod tests {
  773. use crate::helpers::test_init;
  774. use super::*;
  775. #[test]
  776. fn deepvariant_version() -> anyhow::Result<()> {
  777. test_init();
  778. let vl = DeepVariant::version(&Config::default())?;
  779. info!("DeepVariant local version: {vl}");
  780. let vs = DeepVariant::version_slurm(&Config::default())?;
  781. info!("DeepVariant slurm version: {vs}");
  782. assert_eq!(vl, vs);
  783. Ok(())
  784. }
  785. #[test]
  786. fn deepvariant_run() -> anyhow::Result<()> {
  787. test_init();
  788. let config = Config::default();
  789. for id in ["DUMCO", "CHAHA"] {
  790. run_deepvariant_chunked(id, "diag", &config, 30)?;
  791. }
  792. Ok(())
  793. }
  794. }