//! # DeepVariant Variant Calling Pipeline //! //! This module provides a pipeline runner for [DeepVariant](https://github.com/google/deepvariant), //! a deep learning-based variant caller. //! //! ## Overview //! //! DeepVariant performs variant calling on a single sample BAM file using: //! //! - CNN-based variant classification //! - Haploid calling for sex chromosomes (configurable by karyotype) //! - Containerized execution via Singularity/Apptainer //! //! ## Key Features //! //! - **Deep learning-based** - CNN models trained on diverse sequencing platforms //! - **Karyotype-aware** - Automatically calls X/Y as haploid based on sample sex //! - **Solo mode** - Single-sample germline variant calling //! - **Platform flexibility** - Supports ONT, PacBio, and Illumina //! - **Parallel execution** - Genome chunking for HPC scalability //! //! ## Requirements //! //! Before running DeepVariant, ensure: //! - BAM file is indexed (`.bai` file present) //! - Reference genome is accessible //! - Singularity/Docker image is available //! - Model type matches sequencing platform (`config.deepvariant_model_type`) //! //! ## Execution Modes //! //! Execution mode is automatically selected via `config.slurm_runner`: //! //! - **Local** — Single-node execution //! - **Slurm** — HPC job submission //! //! Both modes support chunked parallel execution via [`run_deepvariant_chunked_with_merge`]. //! //! ## Output Files //! //! PASS-filtered variants: //! ```text //! {result_dir}/{id}/deepvariant/{id}_{time_point}_DeepVariant.pass.vcf.gz //! ``` //! //! ## Usage //! //! ### Chunked Parallel Execution (Recommended for WGS) //! //! ```ignore //! use pandora_lib_promethion::callers::deep_variant::run_deepvariant_chunked_with_merge; //! use pandora_lib_promethion::config::Config; //! //! let config = Config::default(); //! // Run DeepVariant in 30 parallel chunks for "norm" time point //! let outputs = run_deepvariant_chunked_with_merge("sample_001", "norm", &config, 30)?; //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ### Single-Run Execution //! //! ```ignore //! use pandora_lib_promethion::callers::deep_variant::DeepVariant; //! use pandora_lib_promethion::config::Config; //! use pandora_lib_promethion::pipes::InitializeSolo; //! use pandora_lib_promethion::runners::Run; //! //! let config = Config::default(); //! let mut caller = DeepVariant::initialize("sample_001", "norm", &config)?; //! //! if caller.should_run() { //! caller.run()?; //! } //! //! // Load variants //! let variants = caller.variants(&annotations)?; //! println!("Found {} germline variants", variants.variants.len()); //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ## References //! //! - [DeepVariant GitHub repository](https://github.com/google/deepvariant) //! - [DeepVariant publication (Nature Biotechnology, 2018)](https://doi.org/10.1038/nbt.4235) use anyhow::Context; use log::{debug, info}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use rust_htslib::bam::{self, Read}; use std::{ fmt, fs, path::{Path, PathBuf}, process::{Command as ProcessCommand, Stdio}, }; use uuid::Uuid; use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::{ bam_stats::{Karyotype, WGSBamStats}, vcf::Vcf, }, commands::{ bcftools::{BcftoolsConcat, BcftoolsKeepPass}, LocalBatchRunner, SlurmRunner, }, config::Config, helpers::{ get_genome_sizes, is_file_older, remove_dir_if_exists, singularity_bind_flags, split_genome_into_n_regions_exact, }, io::vcf::read_vcf, locker::SampleLock, pipes::{InitializeSolo, ShouldRun, Version}, run, run_many, runners::Run, variant::{ variant_collection::VariantCollection, vcf_variant::{Label, Variants}, }, }; use crate::commands::{ Command as JobCommand, // your trait LocalRunner, SbatchRunner, SlurmParams, }; /// DeepVariant solo (single-sample) variant caller. /// /// Executes DeepVariant for germline variant calling on a single BAM file. /// Supports karyotype-aware calling for accurate sex chromosome genotyping. /// /// # Fields /// /// - `id` - Sample identifier (e.g., "34528") /// - `time_point` - Time point label: typically `config.normal_name` ("norm") or `config.tumoral_name` ("diag") /// - `regions` - Genomic regions to process (e.g., "chr1 chr2 chr3" or "chr1:1-1000000") /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/deepvariant") /// - `config` - Global pipeline configuration /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N) /// - `karyotype` - Sex karyotype for haploid calling on X/Y chromosomes (XX or XY) /// /// # Execution Modes /// /// - **Local** - Direct execution via `run_local()` /// - **Slurm** - Single job submission via `run_sbatch()` /// - **Chunked** - Parallel genome-wide execution via [`run_deepvariant_chunked_with_merge`] /// /// # Karyotype-Aware Calling /// /// DeepVariant automatically adjusts ploidy based on `karyotype`: /// - **XY karyotype**: chrX and chrY called as haploid /// - **XX karyotype**: chrX called as diploid, chrY skipped #[derive(Debug, Clone)] pub struct DeepVariant { /// Sample identifier pub id: String, /// Time point identifier (e.g., "norm" or "diag") pub time_point: String, /// Space-separated list of genomic regions to process pub regions: String, /// Directory for log file storage pub log_dir: String, /// Global pipeline configuration pub config: Config, /// Optional part index for chunked parallel runs (1-indexed) pub part_index: Option, /// Sex karyotype for haploid contig handling (XX or XY) pub karyotype: Karyotype, } impl fmt::Display for DeepVariant { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "🧬 DeepVariant Solo")?; writeln!(f, " Case ID : {}", self.id)?; writeln!(f, " Time point : {}", self.time_point)?; writeln!(f, " Regions : {}", self.regions)?; writeln!(f, " Log dir : {}", self.log_dir)?; writeln!( f, " Part : {}", self.part_index .map_or("full".into(), |n| format!("part{n}")) )?; writeln!(f, " Karyotype : {}", self.karyotype) } } impl InitializeSolo for DeepVariant { /// Initializes a DeepVariant runner for a specific sample and time point. /// /// Creates necessary directory structures and optionally cleans up previous runs /// when [`Config::deepvariant_force`] is set. /// /// # Arguments /// /// * `id` - Sample identifier /// * `time_point` - Either normal or tumor label for this run /// * `config` - Global pipeline configuration /// /// # Returns /// /// A configured [`DeepVariant`] instance ready for execution. /// /// # Errors /// /// Returns an error if directory cleanup fails when force mode is enabled. fn initialize(id: &str, time_point: &str, config: &Config) -> anyhow::Result { let id = id.to_string(); let time_point = time_point.to_string(); // Validate time_point matches configured names anyhow::ensure!( time_point == config.normal_name || time_point == config.tumoral_name, "Invalid time_point '{}': must be either '{}' (normal) or '{}' (tumor)", time_point, config.normal_name, config.tumoral_name ); let karyotype = WGSBamStats::open(&id, &time_point, config)?.karyotype()?; info!("Initializing DeepVariant for {id} {time_point}."); let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id); let regions = (1..=22) .map(|i| format!("chr{i}")) .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from)) .collect::>(); let deepvariant = Self { id, time_point, log_dir, config: config.clone(), regions: regions.join(" "), part_index: None, karyotype, }; if deepvariant.config.deepvariant_force { remove_dir_if_exists( &deepvariant .config .deepvariant_output_dir(&deepvariant.id, &deepvariant.time_point), )?; } Ok(deepvariant) } } impl ShouldRun for DeepVariant { /// Determines whether DeepVariant should be (re)run. /// /// Compares the modification time of the input BAM against the output /// PASS-filtered VCF. Returns `true` if: /// - The output VCF does not exist /// - The output VCF is older than the input BAM /// /// # Implementation Note /// /// This is a conservative check—any error during timestamp comparison /// results in `true` (i.e., re-run). fn should_run(&self) -> bool { let passed_vcf = self .config .deepvariant_solo_passed_vcf(&self.id, &self.time_point); let bam = self.config.solo_bam(&self.id, &self.time_point); let result = is_file_older(&passed_vcf, &bam, true).unwrap_or(true); if result { info!( "DeepVariant should run for: {} {}.", self.id, self.time_point ); } result } } impl JobCommand for DeepVariant { /// Initializes output and log directories for this run. /// /// # Errors /// /// Returns an error if directory creation fails. fn init(&mut self) -> anyhow::Result<()> { // Part-specific output dir let output_dir = self.part_output_dir(); fs::create_dir_all(&output_dir).context(format!("Failed to create dir: {output_dir}"))?; // log dir fs::create_dir_all(&self.log_dir) .context(format!("Failed to create dir: {}", self.log_dir))?; Ok(()) } /// Generates the DeepVariant command string for Singularity execution. /// /// The command binds the output directory and runs DeepVariant with: /// - Configured model type /// - Haploid calling for sex chromosomes /// - VCF stats report generation fn cmd(&self) -> String { let bam = self.config.solo_bam(&self.id, &self.time_point); let output_dir = self.part_output_dir(); let output_vcf_path = self.output_vcf_path(); // Part-specific log subdir to avoid collision let log_subdir = match self.part_index { Some(idx) => format!("{}_{}_DeepVariant_part{idx}_logs", self.id, self.time_point), None => format!("{}_{}_DeepVariant_logs", self.id, self.time_point), }; let sample_name = format!("{}_{}", self.id, self.time_point); let output_vcf = format!( "/output/{}", Path::new(&output_vcf_path) .file_name() .unwrap() .to_string_lossy() ); // Build haploid_contigs flag only for XY karyotype let haploid_flag = match self.karyotype { Karyotype::XY => "--haploid_contigs='chrX,chrY' \\", Karyotype::XX => "", // No haploid contigs for XX }; let bind_flags = singularity_bind_flags([ &bam, &self.config.reference, &self.config.pseudoautosomal_regions_bed, &output_dir, &self.log_dir, &self.config.tmp_dir, ]); format!( "{singularity_bin} exec \ {binds} \ --bind {output_dir}:/output \ {image} \ /opt/deepvariant/bin/run_deepvariant \ --model_type={model_type} \ --ref={reference} \ --reads={bam} \ --regions=\"{regions}\" \ {haploid_flag} \ --par_regions_bed={par_bed} \ --output_vcf={output_vcf} \ --num_shards={threads} \ --vcf_stats_report=true \ --postprocess_cpus={postprocess_cpus} \ --logging_dir=/output/{log_dir} \ --dry_run=false \ --sample_name={sample_name}", singularity_bin = self.config.singularity_bin, binds = bind_flags, output_dir = output_dir, image = self.config.deepvariant_image, model_type = self.config.deepvariant_model_type, reference = self.config.reference, bam = bam, regions = self.regions, par_bed = self.config.pseudoautosomal_regions_bed, // we mount output_dir as /output; just rewrite path here: output_vcf = output_vcf, threads = self.config.deepvariant_threads, log_dir = log_subdir, postprocess_cpus = self.config.deepvariant_threads, sample_name = sample_name, ) } } impl LocalRunner for DeepVariant {} impl LocalBatchRunner for DeepVariant {} impl SlurmRunner for DeepVariant { fn slurm_args(&self) -> Vec { let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default(); SlurmParams { job_name: Some(format!( "deepvariant_{}_{}{}", self.id, self.time_point, batch_id )), cpus_per_task: Some(self.config.deepvariant_threads.into()), mem: Some("40G".into()), partition: Some("shortq".into()), gres: None, } .to_args() } } impl SbatchRunner for DeepVariant { fn slurm_params(&self) -> SlurmParams { let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default(); SlurmParams { job_name: Some(format!( "deepvariant_{}_{}{}", self.id, self.time_point, batch_id )), cpus_per_task: Some(self.config.deepvariant_threads.into()), mem: Some("40G".into()), partition: Some("shortq".into()), gres: None, } } } impl DeepVariant { /// Returns the per-part output directory. /// /// For partitioned runs, creates an isolated subdirectory to prevent /// intermediate file collisions between parallel jobs. fn part_output_dir(&self) -> String { let base_dir = self .config .deepvariant_output_dir(&self.id, &self.time_point); match self.part_index { Some(idx) => format!("{base_dir}/part{idx}"), None => base_dir, } } /// Returns the output VCF path for this run. /// /// For partitioned runs, the file is placed in a part-specific subdirectory. fn output_vcf_path(&self) -> String { let output_dir = self.part_output_dir(); format!( "{output_dir}/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point ) } /// Returns the PASS-filtered VCF path. /// /// - For partitioned runs: part-specific path for intermediate PASS VCF /// - For single runs or final output: canonical path from config /// /// The final merged PASS VCF always uses the canonical path regardless /// of whether it was produced by a single run or merged from parts. fn passed_vcf_path(&self) -> String { match self.part_index { Some(_) => { // Part runs: intermediate PASS VCF in part-specific directory self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz") } None => { // Single run or final merged output: canonical path self.config .deepvariant_solo_passed_vcf(&self.id, &self.time_point) } } } /// Filters variants to keep only PASS entries. fn filter_pass(&self) -> anyhow::Result<()> { let output_vcf = self.output_vcf_path(); let vcf_passed = self.passed_vcf_path(); info!( "Filtering PASS variants for {} {} (part: {:?})", self.id, self.time_point, self.part_index ); let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed); let report = run!(&self.config, &mut cmd)?; report .save_to_file(format!("{}/bcftools_pass_", self.log_dir)) .context("Can't save bcftools PASS logs")?; Ok(()) } } impl Run for DeepVariant { fn run(&mut self) -> anyhow::Result<()> { let lock_dir = format!("{}/locks", self.config.result_dir); let _lock = SampleLock::acquire(&lock_dir, &self.id, "deepvariant") .with_context(|| format!("Cannot start DeepVariant chunked for {}", self.id))?; if !self.should_run() { info!("DeepVariant is up-to-data."); return Ok(()); } if self.config.slurm_runner { run_deepvariant_chunked(&self.id, &self.time_point, &self.config, 30) } else { run!(&self.config, self)?; self.filter_pass() } } } impl CallerCat for DeepVariant { /// Returns the caller annotation category based on time point. /// /// Maps the time point to either [`Sample::SoloConstit`] (normal) or /// [`Sample::SoloTumor`] (tumoral). /// /// # Safety /// /// The time_point is validated during initialization, so this can never fail. /// If it does, it indicates a serious logic error in the code. fn caller_cat(&self) -> Annotation { let Config { normal_name, tumoral_name, .. } = &self.config; if *normal_name == self.time_point { Annotation::Callers(Caller::DeepVariant, Sample::SoloConstit) } else if *tumoral_name == self.time_point { Annotation::Callers(Caller::DeepVariant, Sample::SoloTumor) } else { // SAFETY: time_point is validated in initialize() to be either normal_name or tumoral_name. // If we reach here, it's a logic error in the code, not a user error. unreachable!( "Invalid time_point '{}': expected '{}' or '{}'. This should have been caught during initialization.", self.time_point, normal_name, tumoral_name ) } } } impl Variants for DeepVariant { /// Loads variants from the DeepVariant PASS-filtered VCF. /// /// Reads the VCF, registers each variant in the annotation store with /// the appropriate caller tag, and returns a [`VariantCollection`]. /// /// # Arguments /// /// * `annotations` - Mutable annotation registry for tagging variants /// /// # Returns /// /// A [`VariantCollection`] containing: /// - Parsed variant records /// - Source VCF metadata /// - Caller identification /// /// # Errors /// /// Returns an error if: /// - VCF file cannot be read or parsed /// - VCF metadata extraction fails fn variants(&self, annotations: &Annotations) -> anyhow::Result { let caller = self.caller_cat(); let vcf_passed = self .config .deepvariant_solo_passed_vcf(&self.id, &self.time_point); info!("Loading variants from {caller}: {vcf_passed}"); let variants = read_vcf(&vcf_passed) .map_err(|e| anyhow::anyhow!("Failed to read DeepVariant VCF {}.\n{e}", vcf_passed))?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), std::slice::from_ref(&caller)); }); info!("{}, {} variants loaded.", caller, variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(vcf_passed.clone().into()).map_err(|e| { anyhow::anyhow!( "Error while creating a VCF representation for {}.\n{e}", vcf_passed ) })?, caller, }) } } impl Label for DeepVariant { /// Returns a string label for this caller (e.g., "DeepVariant_SoloConstit"). fn label(&self) -> String { self.caller_cat().to_string() } } impl Version for DeepVariant { /// Retrieves the DeepVariant version via local Singularity execution. /// /// # Errors /// /// Returns an error if: /// - Singularity execution fails /// - Version string cannot be parsed from output fn version(config: &Config) -> anyhow::Result { let out = ProcessCommand::new("bash") .arg("-c") .arg(format!( "{} exec {} /opt/deepvariant/bin/run_deepvariant --version", config.singularity_bin, config.deepvariant_image )) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .context("failed to spawn singularity")?; if !out.status.success() { let mut log = String::from_utf8_lossy(&out.stdout).to_string(); log.push_str(&String::from_utf8_lossy(&out.stderr)); anyhow::bail!("singularity exec failed: {}\n{}", out.status, log); } let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); parse_deepvariant_version(&combined) } /// Retrieves the DeepVariant version via Slurm job submission. /// /// Useful when local GPU access is unavailable but Slurm has GPU nodes. /// /// # Errors /// /// Returns an error if job submission fails or version cannot be parsed. fn version_slurm(config: &Config) -> anyhow::Result { // Minimal Slurm job just to run the version command struct DeepVariantVersionJob<'a> { config: &'a Config, } impl<'a> JobCommand for DeepVariantVersionJob<'a> { fn cmd(&self) -> String { format!( "{} exec {} /opt/deepvariant/bin/run_deepvariant --version", self.config.singularity_bin, self.config.deepvariant_image ) } } impl<'a> SlurmRunner for DeepVariantVersionJob<'a> { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("deepvariant_version".into()), partition: Some("gpgpuq".into()), cpus_per_task: Some(1), mem: Some("1G".into()), gres: None, } .to_args() } } let mut job = DeepVariantVersionJob { config }; let out = SlurmRunner::exec(&mut job).context("failed to run DeepVariant --version via Slurm")?; // Combine stdout, Slurm epilog (if any), and stderr for parsing let mut combined = out.stdout.clone(); if let Some(epilog) = &out.slurm_epilog { combined.push_str(&format!("{epilog}")); } combined.push_str(&out.stderr); parse_deepvariant_version(&combined) } } /// Parses the DeepVariant version from command output. /// /// # Arguments /// /// * `output` - Combined stdout/stderr from DeepVariant --version /// /// # Returns /// /// The version string (e.g., "1.6.1") /// /// # Errors /// /// Returns an error if the version pattern is not found. fn parse_deepvariant_version(output: &str) -> anyhow::Result { let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("Version regex is valid"); let caps = re .captures(output) .context("Could not parse DeepVariant version from output")?; Ok(caps .get(1) .expect("Regex has capture group 1") .as_str() .to_string()) } fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> { let mut part_pass_paths: Vec = Vec::with_capacity(n_parts); for i in 1..=n_parts { let mut dv = base.clone(); dv.part_index = Some(i); let part_pass = dv.passed_vcf_path(); anyhow::ensure!( Path::new(&part_pass).exists(), "Missing part {i} PASS VCF: {part_pass}" ); part_pass_paths.push(PathBuf::from(part_pass)); } let final_passed_vcf = base .config .deepvariant_solo_passed_vcf(&base.id, &base.time_point); let rand = Uuid::new_v4(); let final_tmp = format!("{final_passed_vcf}_{rand}.tmp"); let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi"); if let Some(parent) = Path::new(&final_passed_vcf).parent() { fs::create_dir_all(parent)?; } info!( "Concatenating {} part VCFs into {}", n_parts, final_passed_vcf ); let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp); run!(&base.config, &mut concat) .context("Failed to run bcftools concat for DeepVariant parts")? .save_to_file(format!("{}/bcftools_concat", base.log_dir))?; fs::rename(&final_tmp, &final_passed_vcf) .context("Failed to rename merged DeepVariant PASS VCF")?; fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi")) .context("Failed to rename merged DeepVariant PASS VCF CSI")?; info!( "Successfully merged {} parts into {}", n_parts, final_passed_vcf ); Ok(()) } /// Runs DeepVariant in parallel chunks, then merges results. /// /// Splits the genome into N equal-sized regions, runs DeepVariant on each region /// in parallel (local or Slurm based on `config.slurm_runner`), filters PASS variants, /// and concatenates the final VCF. Karyotype-aware calling ensures X/Y chromosomes /// are handled correctly based on sample sex. /// /// # Arguments /// /// * `id` - Sample identifier /// * `time_point` - Time point label ("norm" or "diag") /// * `config` - Global pipeline configuration /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome) /// /// # Returns /// /// `Ok(())` on success, or an error if any step fails. /// /// # Errors /// /// Returns an error if: /// - `n_parts` is 0 /// - `time_point` is invalid (not matching `config.normal_name` or `config.tumoral_name`) /// - BAM file cannot be opened or is corrupted /// - BAM header is malformed /// - Karyotype detection fails /// - DeepVariant execution fails on any part /// - PASS filtering fails /// - VCF merging fails /// /// # Example /// /// ```ignore /// let config = Config::default(); /// run_deepvariant_chunked("sample_001", "norm", &config, 30)?; /// ``` pub fn run_deepvariant_chunked( id: &str, time_point: &str, config: &Config, n_parts: usize, ) -> anyhow::Result<()> { anyhow::ensure!(n_parts > 0, "n_parts must be > 0"); // let lock_dir = format!("{}/locks", config.result_dir); // let _lock = SampleLock::acquire(&lock_dir, id, "deepvariant") // .with_context(|| format!("Cannot start DeepVariant chunked for {}", id))?; let base = DeepVariant::initialize(id, time_point, config)?; if !base.should_run() { debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping."); return Ok(()); } let bam_path = config.solo_bam(id, time_point); let reader = bam::Reader::from_path(&bam_path) .with_context(|| format!("Failed to open BAM: {bam_path}"))?; let header = bam::Header::from_template(reader.header()); let genome_sizes = get_genome_sizes(&header)?; let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts) .into_iter() .map(|v| v.join(" ")) .collect::>(); let actual_n_parts = region_chunks.len(); info!( "Running DeepVariant in {} parallel parts for {} {}", actual_n_parts, id, time_point ); let mut jobs = Vec::with_capacity(actual_n_parts); for (i, regions) in region_chunks.into_iter().enumerate() { let mut job = base.clone(); job.regions = regions; job.part_index = Some(i + 1); job.log_dir = format!("{}/part{}", base.log_dir, i + 1); info!("Planned DeepVariant job:\n{job}"); jobs.push(job); } // Run all DeepVariant jobs let outputs = run_many!(config, jobs.clone())?; for output in outputs.iter() { output.save_to_file(format!("{}/deepvariant_", base.log_dir))?; } // Filter PASS variants for each part info!( "Filtering PASS variants for all {} parts in parallel", actual_n_parts ); let filter_jobs: Vec<_> = jobs .iter() .map(|job| { BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path()) }) .collect(); run_many!(config, filter_jobs)?; // Merge PASS VCFs merge_deepvariant_parts(&base, actual_n_parts)?; info!( "DeepVariant completed for {} {}: {} parts merged", id, time_point, actual_n_parts ); Ok(()) } #[cfg(test)] mod tests { use crate::helpers::test_init; use super::*; #[test] fn deepvariant_version() -> anyhow::Result<()> { test_init(); let vl = DeepVariant::version(&Config::default())?; info!("DeepVariant local version: {vl}"); let vs = DeepVariant::version_slurm(&Config::default())?; info!("DeepVariant slurm version: {vs}"); assert_eq!(vl, vs); Ok(()) } #[test] fn deepvariant_run() -> anyhow::Result<()> { test_init(); let config = Config::default(); for id in ["DUMCO", "CHAHA"] { run_deepvariant_chunked(id, "diag", &config, 30)?; } Ok(()) } }