- //! # DeepVariant Variant Calling Pipeline
- //!
- //! This module provides a pipeline runner for [DeepVariant](https://github.com/google/deepvariant),
- //! a deep learning-based variant caller.
- //!
- //! ## Overview
- //!
- //! DeepVariant performs variant calling on a single sample BAM file using:
- //!
- //! - CNN-based variant classification
- //! - Haploid calling for sex chromosomes (configurable by karyotype)
- //! - Containerized execution via Singularity/Apptainer
- //!
- //! ## Key Features
- //!
- //! - **Deep learning-based** - CNN models trained on diverse sequencing platforms
- //! - **Karyotype-aware** - Automatically calls X/Y as haploid based on sample sex
- //! - **Solo mode** - Single-sample germline variant calling
- //! - **Platform flexibility** - Supports ONT, PacBio, and Illumina
- //! - **Parallel execution** - Genome chunking for HPC scalability
- //!
- //! ## Requirements
- //!
- //! Before running DeepVariant, ensure:
- //! - BAM file is indexed (`.bai` file present)
- //! - Reference genome is accessible
- //! - Singularity/Docker image is available
- //! - Model type matches sequencing platform (`config.deepvariant_model_type`)
- //!
- //! ## Execution Modes
- //!
- //! Execution mode is automatically selected via `config.slurm_runner`:
- //!
- //! - **Local** — Single-node execution
- //! - **Slurm** — HPC job submission
- //!
- //! Both modes support chunked parallel execution via [`run_deepvariant_chunked`].
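- //!
- //! A minimal sketch of how the mode is selected; `slurm_runner` is the flag this
- //! module reads, everything else is illustrative:
- //!
- //! ```ignore
- //! let mut config = Config::default();
- //! // `false` runs each chunk locally; `true` submits each chunk as a Slurm job.
- //! config.slurm_runner = true;
- //! ```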
- //!
- //! ## Output Files
- //!
- //! PASS-filtered variants:
- //! ```text
- //! {result_dir}/{id}/deepvariant/{id}_{time_point}_DeepVariant.pass.vcf.gz
- //! ```
- //!
- //! ## Usage
- //!
- //! ### Chunked Parallel Execution (Recommended for WGS)
- //!
- //! ```ignore
- //! use pandora_lib_promethion::callers::deep_variant::run_deepvariant_chunked;
- //! use pandora_lib_promethion::config::Config;
- //!
- //! let config = Config::default();
- //! // Run DeepVariant in 30 parallel chunks for the "norm" time point, then merge the PASS VCFs
- //! run_deepvariant_chunked("sample_001", "norm", &config, 30)?;
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ### Single-Run Execution
- //!
- //! ```ignore
- //! use pandora_lib_promethion::callers::deep_variant::DeepVariant;
- //! use pandora_lib_promethion::config::Config;
- //! use pandora_lib_promethion::pipes::InitializeSolo;
- //! use pandora_lib_promethion::runners::Run;
- //!
- //! let config = Config::default();
- //! let mut caller = DeepVariant::initialize("sample_001", "norm", &config)?;
- //!
- //! if caller.should_run() {
- //! caller.run()?;
- //! }
- //!
- //! // Load variants (using an existing `Annotations` registry)
- //! let variants = caller.variants(&annotations)?;
- //! println!("Found {} germline variants", variants.variants.len());
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ## References
- //!
- //! - [DeepVariant GitHub repository](https://github.com/google/deepvariant)
- //! - [DeepVariant publication (Nature Biotechnology, 2018)](https://doi.org/10.1038/nbt.4235)
- use anyhow::Context;
- use log::{debug, info};
- use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
- use regex::Regex;
- use rust_htslib::bam::{self, Read};
- use std::{
- fmt, fs,
- path::{Path, PathBuf},
- process::{Command as ProcessCommand, Stdio},
- };
- use uuid::Uuid;
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
- collection::{
- bam_stats::{Karyotype, WGSBamStats},
- vcf::Vcf,
- },
- commands::{
- bcftools::{BcftoolsConcat, BcftoolsKeepPass},
- LocalBatchRunner, SlurmRunner,
- },
- config::Config,
- helpers::{
- get_genome_sizes, is_file_older, remove_dir_if_exists, singularity_bind_flags,
- split_genome_into_n_regions_exact,
- },
- io::vcf::read_vcf,
- pipes::{InitializeSolo, ShouldRun, Version},
- run, run_many,
- runners::Run,
- variant::{
- variant_collection::VariantCollection,
- vcf_variant::{Label, Variants},
- },
- };
- use crate::commands::{
- Command as JobCommand, // trait providing init()/cmd()
- LocalRunner,
- SbatchRunner,
- SlurmParams,
- };
- /// DeepVariant solo (single-sample) variant caller.
- ///
- /// Executes DeepVariant for germline variant calling on a single BAM file.
- /// Supports karyotype-aware calling for accurate sex chromosome genotyping.
- ///
- /// # Fields
- ///
- /// - `id` - Sample identifier (e.g., "34528")
- /// - `time_point` - Time point label: typically `config.normal_name` ("norm") or `config.tumoral_name` ("diag")
- /// - `regions` - Genomic regions to process (e.g., "chr1 chr2 chr3" or "chr1:1-1000000")
- /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/deepvariant")
- /// - `config` - Global pipeline configuration
- /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
- /// - `karyotype` - Sex karyotype for haploid calling on X/Y chromosomes (XX or XY)
- ///
- /// # Execution Modes
- ///
- /// - **Local** - Direct execution via `run_local()`
- /// - **Slurm** - Single job submission via `run_sbatch()`
- /// - **Chunked** - Parallel genome-wide execution via [`run_deepvariant_chunked`]
- ///
- /// # Karyotype-Aware Calling
- ///
- /// DeepVariant automatically adjusts ploidy based on `karyotype`:
- /// - **XY karyotype**: chrX and chrY called as haploid
- /// - **XX karyotype**: chrX called as diploid, chrY skipped
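- ///
- /// # Example
- ///
- /// A minimal sketch (sample ID and time point are illustrative; the karyotype is
- /// read from precomputed BAM statistics during initialization, and the
- /// `InitializeSolo` trait must be in scope):
- ///
- /// ```ignore
- /// let config = Config::default();
- /// let dv = DeepVariant::initialize("sample_001", "norm", &config)?;
- /// // X/Y ploidy handling follows the detected karyotype.
- /// println!("karyotype: {}", dv.karyotype);
- /// ```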
- #[derive(Debug, Clone)]
- pub struct DeepVariant {
- /// Sample identifier
- pub id: String,
- /// Time point identifier (e.g., "norm" or "diag")
- pub time_point: String,
- /// Space-separated list of genomic regions to process
- pub regions: String,
- /// Directory for log file storage
- pub log_dir: String,
- /// Global pipeline configuration
- pub config: Config,
- /// Optional part index for chunked parallel runs (1-indexed)
- pub part_index: Option<usize>,
- /// Sex karyotype for haploid contig handling (XX or XY)
- pub karyotype: Karyotype,
- }
- impl fmt::Display for DeepVariant {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "🧬 DeepVariant Solo")?;
- writeln!(f, " Case ID : {}", self.id)?;
- writeln!(f, " Time point : {}", self.time_point)?;
- writeln!(f, " Regions : {}", self.regions)?;
- writeln!(f, " Log dir : {}", self.log_dir)?;
- writeln!(
- f,
- " Part : {}",
- self.part_index
- .map_or("full".into(), |n| format!("part{n}"))
- )?;
- writeln!(f, " Karyotype : {}", self.karyotype)
- }
- }
- impl InitializeSolo for DeepVariant {
- /// Initializes a DeepVariant runner for a specific sample and time point.
- ///
- /// Creates necessary directory structures and optionally cleans up previous runs
- /// when [`Config::deepvariant_force`] is set.
- ///
- /// # Arguments
- ///
- /// * `id` - Sample identifier
- /// * `time_point` - Either normal or tumor label for this run
- /// * `config` - Global pipeline configuration
- ///
- /// # Returns
- ///
- /// A configured [`DeepVariant`] instance ready for execution.
- ///
- /// # Errors
- ///
- /// Returns an error if directory cleanup fails when force mode is enabled.
- fn initialize(id: &str, time_point: &str, config: &Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- let time_point = time_point.to_string();
- // Validate time_point matches configured names
- anyhow::ensure!(
- time_point == config.normal_name || time_point == config.tumoral_name,
- "Invalid time_point '{}': must be either '{}' (normal) or '{}' (tumor)",
- time_point,
- config.normal_name,
- config.tumoral_name
- );
- let karyotype = WGSBamStats::open(&id, &time_point, config)?.karyotype()?;
- info!("Initializing DeepVariant for {id} {time_point}.");
- let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
- let regions = (1..=22)
- .map(|i| format!("chr{i}"))
- .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from))
- .collect::<Vec<String>>();
- let deepvariant = Self {
- id,
- time_point,
- log_dir,
- config: config.clone(),
- regions: regions.join(" "),
- part_index: None,
- karyotype,
- };
- if deepvariant.config.deepvariant_force {
- remove_dir_if_exists(
- &deepvariant
- .config
- .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point),
- )?;
- }
- Ok(deepvariant)
- }
- }
- impl ShouldRun for DeepVariant {
- /// Determines whether DeepVariant should be (re)run.
- ///
- /// Compares the modification time of the input BAM against the output
- /// PASS-filtered VCF. Returns `true` if:
- /// - The output VCF does not exist
- /// - The output VCF is older than the input BAM
- ///
- /// # Implementation Note
- ///
- /// This is a conservative check—any error during timestamp comparison
- /// results in `true` (i.e., re-run).
- fn should_run(&self) -> bool {
- let passed_vcf = self
- .config
- .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
- let bam = self.config.solo_bam(&self.id, &self.time_point);
- let result = is_file_older(&passed_vcf, &bam, true).unwrap_or(true);
- if result {
- info!(
- "DeepVariant should run for: {} {}.",
- self.id, self.time_point
- );
- }
- result
- }
- }
- impl JobCommand for DeepVariant {
- /// Initializes output and log directories for this run.
- ///
- /// # Errors
- ///
- /// Returns an error if directory creation fails.
- fn init(&mut self) -> anyhow::Result<()> {
- // Part-specific output dir
- let output_dir = self.part_output_dir();
- fs::create_dir_all(&output_dir).context(format!("Failed to create dir: {output_dir}"))?;
- // log dir
- fs::create_dir_all(&self.log_dir)
- .context(format!("Failed to create dir: {}", self.log_dir))?;
- Ok(())
- }
- /// Generates the DeepVariant command string for Singularity execution.
- ///
- /// The command binds the output directory and runs DeepVariant with:
- /// - Configured model type
- /// - Haploid calling for sex chromosomes
- /// - VCF stats report generation
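- ///
- /// The assembled command has roughly the following shape (paths, model type, and
- /// shard count are illustrative placeholders filled in from the configuration):
- ///
- /// ```text
- /// singularity exec <bind flags> --bind <output_dir>:/output <image> \
- ///   /opt/deepvariant/bin/run_deepvariant \
- ///   --model_type=<model> --ref=<reference> --reads=<bam> \
- ///   --regions="chr1 chr2 ..." --par_regions_bed=<par.bed> \
- ///   --output_vcf=/output/<id>_<time_point>_DeepVariant.vcf.gz \
- ///   --num_shards=<threads> --sample_name=<id>_<time_point>
- /// ```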
- fn cmd(&self) -> String {
- let bam = self.config.solo_bam(&self.id, &self.time_point);
- let output_dir = self.part_output_dir();
- let output_vcf_path = self.output_vcf_path();
- // Part-specific log subdir to avoid collision
- let log_subdir = match self.part_index {
- Some(idx) => format!("{}_{}_DeepVariant_part{idx}_logs", self.id, self.time_point),
- None => format!("{}_{}_DeepVariant_logs", self.id, self.time_point),
- };
- let sample_name = format!("{}_{}", self.id, self.time_point);
- let output_vcf = format!(
- "/output/{}",
- Path::new(&output_vcf_path)
- .file_name()
- .unwrap()
- .to_string_lossy()
- );
- // Build haploid_contigs flag only for XY karyotype.
- // The command string below collapses to a single line, so no trailing backslash is needed here.
- let haploid_flag = match self.karyotype {
- Karyotype::XY => "--haploid_contigs='chrX,chrY'",
- Karyotype::XX => "", // No haploid contigs for XX
- };
- let bind_flags = singularity_bind_flags([
- &bam,
- &self.config.reference,
- &self.config.pseudoautosomal_regions_bed,
- &output_dir,
- &self.log_dir,
- &self.config.tmp_dir,
- ]);
- format!(
- "{singularity_bin} exec \
- {binds} \
- --bind {output_dir}:/output \
- {image} \
- /opt/deepvariant/bin/run_deepvariant \
- --model_type={model_type} \
- --ref={reference} \
- --reads={bam} \
- --regions=\"{regions}\" \
- {haploid_flag} \
- --par_regions_bed={par_bed} \
- --output_vcf={output_vcf} \
- --num_shards={threads} \
- --vcf_stats_report=true \
- --postprocess_cpus={postprocess_cpus} \
- --logging_dir=/output/{log_dir} \
- --dry_run=false \
- --sample_name={sample_name}",
- singularity_bin = self.config.singularity_bin,
- binds = bind_flags,
- output_dir = output_dir,
- image = self.config.deepvariant_image,
- model_type = self.config.deepvariant_model_type,
- reference = self.config.reference,
- bam = bam,
- regions = self.regions,
- par_bed = self.config.pseudoautosomal_regions_bed,
- // we mount output_dir as /output; just rewrite path here:
- output_vcf = output_vcf,
- threads = self.config.deepvariant_threads,
- log_dir = log_subdir,
- postprocess_cpus = self.config.deepvariant_threads,
- sample_name = sample_name,
- )
- }
- }
- impl LocalRunner for DeepVariant {}
- impl LocalBatchRunner for DeepVariant {}
- impl SlurmRunner for DeepVariant {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
- cpus_per_task: Some(self.config.deepvariant_threads.into()),
- mem: Some("40G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- .to_args()
- }
- }
- impl SbatchRunner for DeepVariant {
- fn slurm_params(&self) -> SlurmParams {
- SlurmParams {
- job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
- cpus_per_task: Some(self.config.deepvariant_threads.into()),
- mem: Some("40G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- }
- }
- impl DeepVariant {
- /// Returns the per-part output directory.
- ///
- /// For partitioned runs, creates an isolated subdirectory to prevent
- /// intermediate file collisions between parallel jobs.
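- ///
- /// Schematic example (the base directory comes from the configuration):
- ///
- /// ```text
- /// full run : {result_dir}/{id}/deepvariant
- /// part 3   : {result_dir}/{id}/deepvariant/part3
- /// ```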
- fn part_output_dir(&self) -> String {
- let base_dir = self
- .config
- .deepvariant_output_dir(&self.id, &self.time_point);
- match self.part_index {
- Some(idx) => format!("{base_dir}/part{idx}"),
- None => base_dir,
- }
- }
- /// Returns the output VCF path for this run.
- ///
- /// For partitioned runs, the file is placed in a part-specific subdirectory.
- fn output_vcf_path(&self) -> String {
- let output_dir = self.part_output_dir();
- format!(
- "{output_dir}/{}_{}_DeepVariant.vcf.gz",
- self.id, self.time_point
- )
- }
- /// Returns the PASS-filtered VCF path.
- ///
- /// - For partitioned runs: part-specific path for intermediate PASS VCF
- /// - For single runs or final output: canonical path from config
- ///
- /// The final merged PASS VCF always uses the canonical path regardless
- /// of whether it was produced by a single run or merged from parts.
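- ///
- /// Schematic example of the two cases (paths follow the output-directory layout above):
- ///
- /// ```text
- /// part 3       : .../deepvariant/part3/{id}_{time_point}_DeepVariant.pass.vcf.gz
- /// single/final : {result_dir}/{id}/deepvariant/{id}_{time_point}_DeepVariant.pass.vcf.gz
- /// ```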
- fn passed_vcf_path(&self) -> String {
- match self.part_index {
- Some(_) => {
- // Part runs: intermediate PASS VCF in part-specific directory
- self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz")
- }
- None => {
- // Single run or final merged output: canonical path
- self.config
- .deepvariant_solo_passed_vcf(&self.id, &self.time_point)
- }
- }
- }
- /// Filters variants to keep only PASS entries.
- fn filter_pass(&self) -> anyhow::Result<()> {
- let output_vcf = self.output_vcf_path();
- let vcf_passed = self.passed_vcf_path();
- info!(
- "Filtering PASS variants for {} {} (part: {:?})",
- self.id, self.time_point, self.part_index
- );
- let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
- let report = run!(&self.config, &mut cmd)?;
- report
- .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
- .context("Can't save bcftools PASS logs")?;
- Ok(())
- }
- }
- impl Run for DeepVariant {
- fn run(&mut self) -> anyhow::Result<()> {
- if !self.should_run() {
- info!("DeepVariant is up-to-data.");
- return Ok(());
- }
- if self.config.slurm_runner {
- run_deepvariant_chunked(&self.id, &self.time_point, &self.config, 30)
- } else {
- run!(&self.config, self)?;
- self.filter_pass()
- }
- }
- }
- impl CallerCat for DeepVariant {
- /// Returns the caller annotation category based on time point.
- ///
- /// Maps the time point to either [`Sample::SoloConstit`] (normal) or
- /// [`Sample::SoloTumor`] (tumoral).
- ///
- /// # Panics
- ///
- /// Panics if `time_point` matches neither the configured normal nor tumoral name.
- /// This cannot happen in practice because `time_point` is validated during
- /// initialization; reaching the panic would indicate a logic error in the code.
- fn caller_cat(&self) -> Annotation {
- let Config {
- normal_name,
- tumoral_name,
- ..
- } = &self.config;
- if *normal_name == self.time_point {
- Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit)
- } else if *tumoral_name == self.time_point {
- Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor)
- } else {
- // SAFETY: time_point is validated in initialize() to be either normal_name or tumoral_name.
- // If we reach here, it's a logic error in the code, not a user error.
- unreachable!(
- "Invalid time_point '{}': expected '{}' or '{}'. This should have been caught during initialization.",
- self.time_point, normal_name, tumoral_name
- )
- }
- }
- }
- impl Variants for DeepVariant {
- /// Loads variants from the DeepVariant PASS-filtered VCF.
- ///
- /// Reads the VCF, registers each variant in the annotation store with
- /// the appropriate caller tag, and returns a [`VariantCollection`].
- ///
- /// # Arguments
- ///
- /// * `annotations` - Mutable annotation registry for tagging variants
- ///
- /// # Returns
- ///
- /// A [`VariantCollection`] containing:
- /// - Parsed variant records
- /// - Source VCF metadata
- /// - Caller identification
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - VCF file cannot be read or parsed
- /// - VCF metadata extraction fails
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = self.caller_cat();
- let vcf_passed = self
- .config
- .deepvariant_solo_passed_vcf(&self.id, &self.time_point);
- info!("Loading variants from {caller}: {vcf_passed}");
- let variants = read_vcf(&vcf_passed)
- .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
- });
- info!("{}, {} variants loaded.", caller, variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| {
- anyhow::anyhow!(
- "Error while creating a VCF representation for {}.\n{e}",
- vcf_passed
- )
- })?,
- caller,
- })
- }
- }
- impl Label for DeepVariant {
- /// Returns a string label for this caller (e.g., "DeepVariant_SoloConstit").
- fn label(&self) -> String {
- self.caller_cat().to_string()
- }
- }
- impl Version for DeepVariant {
- /// Retrieves the DeepVariant version via local Singularity execution.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - Singularity execution fails
- /// - Version string cannot be parsed from output
- fn version(config: &Config) -> anyhow::Result<String> {
- let out = ProcessCommand::new("bash")
- .arg("-c")
- .arg(format!(
- "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
- config.singularity_bin, config.deepvariant_image
- ))
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .output()
- .context("failed to spawn singularity")?;
- if !out.status.success() {
- let mut log = String::from_utf8_lossy(&out.stdout).to_string();
- log.push_str(&String::from_utf8_lossy(&out.stderr));
- anyhow::bail!("singularity exec failed: {}\n{}", out.status, log);
- }
- let combined = format!(
- "{}{}",
- String::from_utf8_lossy(&out.stdout),
- String::from_utf8_lossy(&out.stderr)
- );
- parse_deepvariant_version(&combined)
- }
- /// Retrieves the DeepVariant version via Slurm job submission.
- ///
- /// Useful when local GPU access is unavailable but Slurm has GPU nodes.
- ///
- /// # Errors
- ///
- /// Returns an error if job submission fails or version cannot be parsed.
- fn version_slurm(config: &Config) -> anyhow::Result<String> {
- // Minimal Slurm job just to run the version command
- struct DeepVariantVersionJob<'a> {
- config: &'a Config,
- }
- impl<'a> JobCommand for DeepVariantVersionJob<'a> {
- fn cmd(&self) -> String {
- format!(
- "{} exec {} /opt/deepvariant/bin/run_deepvariant --version",
- self.config.singularity_bin, self.config.deepvariant_image
- )
- }
- }
- impl<'a> SlurmRunner for DeepVariantVersionJob<'a> {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some("deepvariant_version".into()),
- partition: Some("gpgpuq".into()),
- cpus_per_task: Some(1),
- mem: Some("1G".into()),
- gres: None,
- }
- .to_args()
- }
- }
- let mut job = DeepVariantVersionJob { config };
- let out =
- SlurmRunner::exec(&mut job).context("failed to run DeepVariant --version via Slurm")?;
- // Combine stdout, Slurm epilog (if any), and stderr for parsing
- let mut combined = out.stdout.clone();
- if let Some(epilog) = &out.slurm_epilog {
- combined.push_str(epilog);
- }
- combined.push_str(&out.stderr);
- parse_deepvariant_version(&combined)
- }
- }
- /// Parses the DeepVariant version from command output.
- ///
- /// # Arguments
- ///
- /// * `output` - Combined stdout/stderr from DeepVariant --version
- ///
- /// # Returns
- ///
- /// The version string (e.g., "1.6.1")
- ///
- /// # Errors
- ///
- /// Returns an error if the version pattern is not found.
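- ///
- /// # Example
- ///
- /// A minimal sketch; the sample line mirrors the pattern matched by the regex below:
- ///
- /// ```ignore
- /// let v = parse_deepvariant_version("DeepVariant version 1.6.1\n")?;
- /// assert_eq!(v, "1.6.1");
- /// ```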
- fn parse_deepvariant_version(output: &str) -> anyhow::Result<String> {
- let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("Version regex is valid");
- let caps = re
- .captures(output)
- .context("Could not parse DeepVariant version from output")?;
- Ok(caps
- .get(1)
- .expect("Regex has capture group 1")
- .as_str()
- .to_string())
- }
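- /// Concatenates per-part PASS VCFs into the final merged PASS VCF.
- ///
- /// Verifies that all `n_parts` part VCFs exist, concatenates them with
- /// `bcftools concat` into a temporary file, then renames the result (and its
- /// `.csi` index) to the canonical PASS VCF path.
- ///
- /// # Errors
- ///
- /// Returns an error if a part VCF is missing, concatenation fails, or the
- /// final rename fails.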
- fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
- let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
- for i in 1..=n_parts {
- let mut dv = base.clone();
- dv.part_index = Some(i);
- let part_pass = dv.passed_vcf_path();
- anyhow::ensure!(
- Path::new(&part_pass).exists(),
- "Missing part {i} PASS VCF: {part_pass}"
- );
- part_pass_paths.push(PathBuf::from(part_pass));
- }
- let final_passed_vcf = base
- .config
- .deepvariant_solo_passed_vcf(&base.id, &base.time_point);
- let rand = Uuid::new_v4();
- let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
- let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
- if let Some(parent) = Path::new(&final_passed_vcf).parent() {
- fs::create_dir_all(parent)?;
- }
- info!(
- "Concatenating {} part VCFs into {}",
- n_parts, final_passed_vcf
- );
- let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
- run!(&base.config, &mut concat)
- .context("Failed to run bcftools concat for DeepVariant parts")?
- .save_to_file(format!("{}/bcftools_concat", base.log_dir))?;
- fs::rename(&final_tmp, &final_passed_vcf)
- .context("Failed to rename merged DeepVariant PASS VCF")?;
- fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
- .context("Failed to rename merged DeepVariant PASS VCF CSI")?;
- info!(
- "Successfully merged {} parts into {}",
- n_parts, final_passed_vcf
- );
- Ok(())
- }
- /// Runs DeepVariant in parallel chunks, then merges results.
- ///
- /// Splits the genome into N equal-sized regions, runs DeepVariant on each region
- /// in parallel (local or Slurm based on `config.slurm_runner`), filters PASS variants,
- /// and concatenates the final VCF. Karyotype-aware calling ensures X/Y chromosomes
- /// are handled correctly based on sample sex.
- ///
- /// # Arguments
- ///
- /// * `id` - Sample identifier
- /// * `time_point` - Time point label ("norm" or "diag")
- /// * `config` - Global pipeline configuration
- /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
- ///
- /// # Returns
- ///
- /// `Ok(())` on success, or an error if any step fails.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - `n_parts` is 0
- /// - `time_point` is invalid (not matching `config.normal_name` or `config.tumoral_name`)
- /// - BAM file cannot be opened or is corrupted
- /// - BAM header is malformed
- /// - Karyotype detection fails
- /// - DeepVariant execution fails on any part
- /// - PASS filtering fails
- /// - VCF merging fails
- ///
- /// # Example
- ///
- /// ```ignore
- /// let config = Config::default();
- /// run_deepvariant_chunked("sample_001", "norm", &config, 30)?;
- /// ```
- pub fn run_deepvariant_chunked(
- id: &str,
- time_point: &str,
- config: &Config,
- n_parts: usize,
- ) -> anyhow::Result<()> {
- anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
- let base = DeepVariant::initialize(id, time_point, config)?;
- if !base.should_run() {
- debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping.");
- return Ok(());
- }
- let bam_path = config.solo_bam(id, time_point);
- let reader = bam::Reader::from_path(&bam_path)
- .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
- let header = bam::Header::from_template(reader.header());
- let genome_sizes = get_genome_sizes(&header)?;
- let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
- .into_iter()
- .map(|v| v.join(" "))
- .collect::<Vec<String>>();
- let actual_n_parts = region_chunks.len();
- info!(
- "Running DeepVariant in {} parallel parts for {} {}",
- actual_n_parts, id, time_point
- );
- let mut jobs = Vec::with_capacity(actual_n_parts);
- for (i, regions) in region_chunks.into_iter().enumerate() {
- let mut job = base.clone();
- job.regions = regions;
- job.part_index = Some(i + 1);
- job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
- info!("Planned DeepVariant job:\n{job}");
- jobs.push(job);
- }
- // Run all DeepVariant jobs
- let outputs = run_many!(config, jobs.clone())?;
- for output in outputs.iter() {
- output.save_to_file(format!("{}/deepvariant_", base.log_dir))?;
- }
- // Filter PASS variants for each part
- info!(
- "Filtering PASS variants for all {} parts in parallel",
- actual_n_parts
- );
- let filter_jobs: Vec<_> = jobs
- .iter()
- .map(|job| {
- BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
- })
- .collect();
- run_many!(config, filter_jobs)?;
- // Merge PASS VCFs
- merge_deepvariant_parts(&base, actual_n_parts)?;
- info!(
- "DeepVariant completed for {} {}: {} parts merged",
- id, time_point, actual_n_parts
- );
- Ok(())
- }
- #[cfg(test)]
- mod tests {
- use crate::helpers::test_init;
- use super::*;
- #[test]
- fn deepvariant_version() -> anyhow::Result<()> {
- test_init();
- let vl = DeepVariant::version(&Config::default())?;
- info!("DeepVariant local version: {vl}");
- let vs = DeepVariant::version_slurm(&Config::default())?;
- info!("DeepVariant slurm version: {vs}");
- assert_eq!(vl, vs);
- Ok(())
- }
- #[test]
- fn deepvariant_run() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- for id in ["DUMCO", "CHAHA"] {
- run_deepvariant_chunked(id, "diag", &config, 30)?;
- }
- Ok(())
- }
- }