@@ -3,17 +3,20 @@ use log::{debug, info};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use regex::Regex;
use std::{
-    fs,
+    fmt, fs,
    path::{Path, PathBuf},
    process::{Command as ProcessCommand, Stdio},
};

use crate::{
    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
-    collection::vcf::Vcf,
+    collection::{
+        bam_stats::{Karyotype, WGSBamStats},
+        vcf::Vcf,
+    },
    commands::{
        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
        run_many_sbatch, SlurmRunner,
    },
    config::Config,
    helpers::{is_file_older, remove_dir_if_exists},
@@ -46,21 +49,6 @@ use crate::commands::{
/// - **Quartered parallel execution**: Via [`run_deepvariant_quartered_sbatch_with_merge`]
/// - **Annotation integration**: Implements [`CallerCat`], [`Variants`], and [`Label`]
///
-/// # Example
-///
-/// ```no_run
-/// use pandora_lib_promethion::callers::{DeepVariant, Config};
-/// use pandora_lib_promethion::pipes::InitializeSolo;
-///
-/// let config = Config::default();
-/// let mut dv = DeepVariant::initialize("SAMPLE001", "normal", config)?;
-///
-/// if dv.should_run() {
-///     dv.run_local()?;
-/// }
-/// # Ok::<(), anyhow::Error>(())
-/// ```
-///
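+/// # Example
+///
+/// A minimal solo-calling sketch, updated for the `&Config` signature
+/// (identifiers and crate paths as in the original example):
+///
+/// ```no_run
+/// use pandora_lib_promethion::callers::{DeepVariant, Config};
+/// use pandora_lib_promethion::pipes::InitializeSolo;
+///
+/// let config = Config::default();
+/// let mut dv = DeepVariant::initialize("SAMPLE001", "normal", &config)?;
+///
+/// if dv.should_run() {
+///     dv.run_local()?;
+/// }
+/// # Ok::<(), anyhow::Error>(())
+/// ```
+///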
/// # Parallelization Strategy
///
/// For large genomes, use [`run_deepvariant_quartered_sbatch_with_merge`] which:
@@ -88,27 +76,55 @@ pub struct DeepVariant {
    ///
-    /// When `Some(n)`, output files are suffixed with `.partN` to avoid collisions.
+    /// When `Some(n)`, outputs are written to an isolated `partN/` subdirectory
+    /// to avoid collisions between parallel jobs.
    pub part_index: Option<usize>,
+
+    /// Karyotype used for haploid contig handling, derived from the sample's
+    /// WGS BAM stats.
+    pub karyotype: Karyotype,
+}
+
+impl fmt::Display for DeepVariant {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "🧬 DeepVariant Solo")?;
+        writeln!(f, "  Case ID    : {}", self.id)?;
+        writeln!(f, "  Time point : {}", self.time_point)?;
+        writeln!(f, "  Regions    : {}", self.regions)?;
+        writeln!(f, "  Log dir    : {}", self.log_dir)?;
+        writeln!(
+            f,
+            "  Part       : {}",
+            self.part_index
+                .map_or("full".into(), |n| format!("part{n}"))
+        )?;
+        writeln!(f, "  Karyotype  : {}", self.karyotype)
+    }
}

impl InitializeSolo for DeepVariant {
-    /// Initializes the DeepVariant runner for a specific sample and time point.
-    /// If `force` is true or the output is outdated, the previous output directory is removed.
-    /// Initializes the DeepVariant runner for a given ID and time point.
+    /// Initializes a DeepVariant runner for a specific sample and time point.
+    ///
+    /// Creates necessary directory structures and optionally cleans up previous runs
+    /// when [`Config::deepvariant_force`] is set.
    ///
    /// # Arguments
-    /// * `id` - Sample ID
-    /// * `time_point` - Either the normal or tumor label for this run
-    /// * `config` - Global configuration object
+    ///
+    /// * `id` - Sample identifier
+    /// * `time_point` - Either the normal or tumor label for this run
+    /// * `config` - Global pipeline configuration
    ///
    /// # Returns
-    /// A ready-to-run `DeepVariant` instance with proper path resolution.
+    ///
+    /// A configured [`DeepVariant`] instance ready for execution.
    ///
    /// # Errors
-    /// Will return an error if directory cleanup fails when forced or outdated.
-    fn initialize(id: &str, time_point: &str, config: Config) -> anyhow::Result<Self> {
+    ///
+    /// Returns an error if directory cleanup fails when force mode is enabled.
+    fn initialize(id: &str, time_point: &str, config: &Config) -> anyhow::Result<Self> {
        let id = id.to_string();
        let time_point = time_point.to_string();

+        let karyotype = WGSBamStats::open(&id, &time_point, config)?.karyotype()?;
+
info!("Initializing DeepVariant for {id} {time_point}.");
|
|
|
|
|
|
let log_dir = format!("{}/{}/log/deepvariant", config.result_dir, &id);
|
|
|
@@ -121,9 +137,10 @@ impl InitializeSolo for DeepVariant {
|
|
|
id,
|
|
|
time_point,
|
|
|
log_dir,
|
|
|
- config,
|
|
|
+ config: config.clone(),
|
|
|
regions: regions.join(","),
|
|
|
part_index: None,
|
|
|
+ karyotype,
|
|
|
};
|
|
|
|
|
|
if deepvariant.config.deepvariant_force {
|
|
|
@@ -139,12 +156,17 @@ impl InitializeSolo for DeepVariant {
|
|
|
}
|
|
|
|
|
|
impl ShouldRun for DeepVariant {
-    /// Returns true if the DeepVariant PASS VCF doesn't exist or is outdated vs input BAM.
-    /// Determines whether DeepVariant should be rerun by comparing input BAM
-    /// modification time to the output VCF.
+    /// Determines whether DeepVariant should be (re)run.
    ///
-    /// # Returns
-    /// `true` if the output is outdated or missing.
+    /// Compares the modification time of the input BAM against the output
+    /// PASS-filtered VCF. Returns `true` if:
+    /// - The output VCF does not exist
+    /// - The output VCF is older than the input BAM
+    ///
+    /// # Implementation Note
+    ///
+    /// This is a conservative check: any error during timestamp comparison
+    /// results in `true` (i.e., re-run).
    fn should_run(&self) -> bool {
        let passed_vcf = self
            .config
@@ -162,11 +184,14 @@ impl ShouldRun for DeepVariant {
}

impl JobCommand for DeepVariant {
+    /// Initializes output and log directories for this run.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if directory creation fails.
    fn init(&mut self) -> anyhow::Result<()> {
-        // output dir for DeepVariant
-        let output_dir = self
-            .config
-            .deepvariant_output_dir(&self.id, &self.time_point);
+        // Part-specific output dir
+        let output_dir = self.part_output_dir();

        fs::create_dir_all(&output_dir).context(format!("Failed to create dir: {output_dir}"))?;

@@ -177,13 +202,24 @@ impl JobCommand for DeepVariant {
        Ok(())
    }

+    /// Generates the DeepVariant command string for Singularity execution.
+    ///
+    /// The command binds the output directory and runs DeepVariant with:
+    /// - GPU acceleration (`--nv`)
+    /// - The configured model type
+    /// - Haploid calling on sex chromosomes for XY samples
+    /// - VCF stats report generation
    fn cmd(&self) -> String {
        let bam = self.config.solo_bam(&self.id, &self.time_point);
-        let output_dir = self
-            .config
-            .deepvariant_output_dir(&self.id, &self.time_point);
+        let output_dir = self.part_output_dir();
        let output_vcf_path = self.output_vcf_path();
-        let log_subdir = format!("{}_{}_DeepVariant_logs", self.id, self.time_point);
+
+        // Part-specific log subdir to avoid collisions
+        let log_subdir = match self.part_index {
+            Some(idx) => format!("{}_{}_DeepVariant_part{idx}_logs", self.id, self.time_point),
+            None => format!("{}_{}_DeepVariant_logs", self.id, self.time_point),
+        };
+
        let sample_name = format!("{}_{}", self.id, self.time_point);
        let output_vcf = format!(
            "/output/{}",
@@ -193,6 +229,12 @@ impl JobCommand for DeepVariant {
                .to_string_lossy()
        );

+        // Haploid calling applies only to XY karyotypes; --par_regions_bed
+        // accompanies --haploid_contigs, so the two are emitted together.
+        let haploid_flag = match self.karyotype {
+            Karyotype::XY => format!(
+                "--haploid_contigs='chrX,chrY' --par_regions_bed={}",
+                self.config.pseudoautosomal_regions_bed
+            ),
+            Karyotype::XX => String::new(), // Diploid X: no haploid contigs
+        };
+
        format!(
            "{singularity_bin} exec --nv \
            --bind /data:/data \
@@ -203,10 +245,13 @@ impl JobCommand for DeepVariant {
            --ref={reference} \
            --reads={bam} \
            --regions='{regions}' \
-            --haploid_contigs='chrX,chrY' \
+            {haploid_flag} \
            --output_vcf={output_vcf} \
            --num_shards={threads} \
            --vcf_stats_report=true \
+            --postprocess_cpus={postprocess_cpus} \
            --logging_dir=/output/{log_dir} \
            --dry_run=false \
            --sample_name={sample_name}",
@@ -217,10 +262,12 @@ impl JobCommand for DeepVariant {
            reference = self.config.reference,
            bam = bam,
            regions = self.regions,
            // we mount output_dir as /output; just rewrite path here:
            output_vcf = output_vcf,
            threads = self.config.deepvariant_threads,
            log_dir = log_subdir,
+            postprocess_cpus = self.config.deepvariant_threads,
            sample_name = sample_name,
        )
    }
@@ -231,6 +278,15 @@ impl LocalRunner for DeepVariant {
}

impl SbatchRunner for DeepVariant {
+    /// Returns Slurm parameters optimized for GPU-accelerated DeepVariant.
+    ///
+    /// Automatically selects between H100 and A100 GPUs based on availability,
+    /// preferring H100 for better performance.
+    ///
+    /// # Panics
+    ///
+    /// Panics if neither H100 nor A100 GPUs are configured in Slurm.
+    /// TODO: consider returning `Result` instead of panicking.
    fn slurm_params(&self) -> SlurmParams {
        let gpu = if let (Some(h100_av), Some(a100_av)) =
            (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
@@ -240,7 +296,7 @@ impl SbatchRunner for DeepVariant {
        } else if a100_av > 0 {
            "a100"
        } else {
-            "h100" // waiting for free h100
+            "h100" // Queue for H100 if none immediately available
        }
    } else {
        panic!("Are you running slurm with a100 and h100 GPU ?");
@@ -261,11 +317,18 @@ impl SbatchRunner for DeepVariant {
}

impl DeepVariant {
+    /// Filters variants to keep only PASS entries using local execution.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if bcftools execution or log saving fails.
    fn filter_pass_local(&self) -> anyhow::Result<()> {
        let output_vcf = self.output_vcf_path();
        let vcf_passed = self.passed_vcf_path();

+        // Skip if the PASS VCF already exists (should not happen when the
+        // call is gated by should_run())
        if Path::new(&vcf_passed).exists() {
+            debug!("PASS VCF already exists: {vcf_passed}");
            return Ok(());
        }

@@ -284,7 +347,21 @@ impl DeepVariant {
        Ok(())
    }

-    /// Run DeepVariant directly on the local machine.
+    /// Runs DeepVariant locally on the current machine.
+    ///
+    /// Executes the full pipeline:
+    /// 1. Checks if a run is necessary via [`ShouldRun::should_run`]
+    /// 2. Runs DeepVariant via Singularity
+    /// 3. Filters to PASS variants only
+    ///
+    /// # Returns
+    ///
+    /// Captured stdout/stderr from the DeepVariant process, or an empty
+    /// [`CapturedOutput`] if skipped.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if DeepVariant execution or PASS filtering fails.
    pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
        if !self.should_run() {
            debug!(
@@ -306,6 +383,11 @@ impl DeepVariant {
        Ok(out)
    }

+    /// Filters variants to keep only PASS entries via Slurm.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if Slurm job submission or log saving fails.
    fn filter_pass_sbatch(&self) -> anyhow::Result<()> {
        let output_vcf = self.output_vcf_path();
        let vcf_passed = self.passed_vcf_path();
@@ -329,7 +411,18 @@ impl DeepVariant {
        Ok(())
    }

-    /// Run DeepVariant via sbatch + tailing slurm-<jobid>.out.
+    /// Runs DeepVariant via Slurm batch submission.
+    ///
+    /// Submits the job and waits for completion (tailing the Slurm output).
+    /// Automatically runs PASS filtering after variant calling completes.
+    ///
+    /// # Returns
+    ///
+    /// Captured output from the Slurm job, or empty if skipped.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if job submission, execution, or filtering fails.
    pub fn run_sbatch(&mut self) -> anyhow::Result<CapturedOutput> {
        if !self.should_run() {
            debug!(
@@ -351,34 +444,51 @@ impl DeepVariant {
        Ok(out)
    }

-    /// Per-part output VCF path.
-    fn output_vcf_path(&self) -> String {
-        let output_dir = self
+    /// Returns the per-part output directory.
+    ///
+    /// For partitioned runs, this points to an isolated subdirectory that
+    /// prevents intermediate file collisions between parallel jobs.
+    fn part_output_dir(&self) -> String {
+        let base_dir = self
            .config
            .deepvariant_output_dir(&self.id, &self.time_point);

        match self.part_index {
-            Some(idx) => format!(
-                "{output_dir}/{}_{}_DeepVariant.part{idx}.vcf.gz",
-                self.id, self.time_point
-            ),
-            None => format!(
-                "{output_dir}/{}_{}_DeepVariant.vcf.gz",
-                self.id, self.time_point
-            ),
+            Some(idx) => format!("{base_dir}/part{idx}"),
+            None => base_dir,
        }
    }

-    /// Per-part PASS VCF path (for part runs) or final PASS VCF (single run).
+    /// Returns the output VCF path for this run.
+    ///
+    /// For partitioned runs, the file is placed in a part-specific subdirectory.
+    fn output_vcf_path(&self) -> String {
+        let output_dir = self.part_output_dir();
+
+        format!(
+            "{output_dir}/{}_{}_DeepVariant.vcf.gz",
+            self.id, self.time_point
+        )
+    }
+
+    /// Returns the PASS-filtered VCF path.
+    ///
+    /// - For partitioned runs: part-specific path for the intermediate PASS VCF
+    /// - For single runs or the final output: canonical path from config
+    ///
+    /// The final merged PASS VCF always uses the canonical path regardless
+    /// of whether it was produced by a single run or merged from parts.
    fn passed_vcf_path(&self) -> String {
        match self.part_index {
-            Some(idx) => {
-                let v = self.output_vcf_path();
-                v.replace(".vcf.gz", &format!(".part{idx}.pass.vcf.gz"))
+            Some(_) => {
+                // Part runs: intermediate PASS VCF in the part-specific directory
+                self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz")
+            }
+            None => {
+                // Single run or final merged output: canonical path
+                self.config
+                    .deepvariant_solo_passed_vcf(&self.id, &self.time_point)
            }
-            None => self
-                .config
-                .deepvariant_solo_passed_vcf(&self.id, &self.time_point),
        }
    }
}
@@ -391,11 +501,15 @@ impl Run for DeepVariant {
}

impl CallerCat for DeepVariant {
-    /// Identifies the caller and whether it's for a normal or tumor sample.
-    /// Determines the category for variant annotation (normal or tumor).
+    /// Returns the caller annotation category based on time point.
    ///
-    /// # Returns
-    /// A variant caller classification used for downstream tagging.
+    /// Maps the time point to either [`Sample::SoloConstit`] (normal) or
+    /// [`Sample::SoloTumor`] (tumoral).
+    ///
+    /// # Panics
+    ///
+    /// Panics if `time_point` doesn't match either configured name.
+    /// Consider returning `Result` for robustness.
    fn caller_cat(&self) -> Annotation {
        let Config {
            normal_name,
@@ -413,14 +527,27 @@ impl CallerCat for DeepVariant {
}

impl Variants for DeepVariant {
-    /// Loads variants from DeepVariant VCF and registers annotations.
-    /// Loads and annotates variants from the DeepVariant PASS-filtered VCF.
+    /// Loads variants from the DeepVariant PASS-filtered VCF.
+    ///
+    /// Reads the VCF, registers each variant in the annotation store with
+    /// the appropriate caller tag, and returns a [`VariantCollection`].
    ///
    /// # Arguments
-    /// * `annotations` - A mutable registry to tag loaded variants
+    ///
+    /// * `annotations` - Mutable annotation registry for tagging variants
    ///
    /// # Returns
-    /// A `VariantCollection` with variant data, source VCF, and caller identity.
+    ///
+    /// A [`VariantCollection`] containing:
+    /// - Parsed variant records
+    /// - Source VCF metadata
+    /// - Caller identification
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - VCF file cannot be read or parsed
+    /// - VCF metadata extraction fails
    fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
        let caller = self.caller_cat();
        let vcf_passed = self
@@ -447,13 +574,20 @@ impl Variants for DeepVariant {
}

impl Label for DeepVariant {
-    /// Returns the string label for this caller.
+    /// Returns a string label for this caller (e.g., "DeepVariant_SoloConstit").
    fn label(&self) -> String {
        self.caller_cat().to_string()
    }
}

impl Version for DeepVariant {
+    /// Retrieves the DeepVariant version via local Singularity execution.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - Singularity execution fails
+    /// - Version string cannot be parsed from output
    fn version(config: &Config) -> anyhow::Result<String> {
        let out = ProcessCommand::new("bash")
            .arg("-c")
@@ -472,16 +606,22 @@ impl Version for DeepVariant {
            anyhow::bail!("singularity exec failed: {}\n{}", out.status, log);
        }

-        let mut log = String::from_utf8_lossy(&out.stdout).to_string();
-        log.push_str(&String::from_utf8_lossy(&out.stderr));
+        let combined = format!(
+            "{}{}",
+            String::from_utf8_lossy(&out.stdout),
+            String::from_utf8_lossy(&out.stderr)
+        );

-        let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)")?;
-        let caps = re
-            .captures(&log)
-            .context("could not parse DeepVariant version from output")?;
-        Ok(caps.get(1).unwrap().as_str().to_string())
+        parse_deepvariant_version(&combined)
    }

+    /// Retrieves the DeepVariant version via Slurm job submission.
+    ///
+    /// Useful when local GPU access is unavailable but Slurm has GPU nodes.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if job submission fails or version cannot be parsed.
    fn version_slurm(config: &Config) -> anyhow::Result<String> {
        // Minimal Slurm job just to run the version command
        struct DeepVariantVersionJob<'a> {
@@ -515,21 +655,65 @@ impl Version for DeepVariant {
        SlurmRunner::run(&mut job).context("failed to run DeepVariant --version via Slurm")?;

        // Combine stdout, Slurm epilog (if any), and stderr for parsing
-        let mut log = out.stdout.clone();
+        let mut combined = out.stdout.clone();
        if let Some(epilog) = &out.slurm_epilog {
-            log.push_str(&format!("{epilog}"));
+            combined.push_str(&format!("{epilog}"));
        }
-        log.push_str(&out.stderr);
+        combined.push_str(&out.stderr);

-        let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)")?;
-        let caps = re
-            .captures(&log)
-            .context("could not parse DeepVariant version from Slurm output")?;
-
-        Ok(caps.get(1).unwrap().as_str().to_string())
+        parse_deepvariant_version(&combined)
    }
}

+/// Parses the DeepVariant version from command output.
+///
+/// # Arguments
+///
+/// * `output` - Combined stdout/stderr from DeepVariant --version
+///
+/// # Returns
+///
+/// The version string (e.g., "1.6.1")
+///
+/// # Errors
+///
+/// Returns an error if the version pattern is not found.
+fn parse_deepvariant_version(output: &str) -> anyhow::Result<String> {
+    let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("Version regex is valid");
+
+    let caps = re
+        .captures(output)
+        .context("Could not parse DeepVariant version from output")?;
+
+    Ok(caps
+        .get(1)
+        .expect("Regex has capture group 1")
+        .as_str()
+        .to_string())
+}
+
+/// Splits a slice of regions into `n` approximately equal chunks.
+///
+/// Each chunk is joined with commas into a single string suitable for
+/// DeepVariant's `--regions` argument.
+///
+/// # Arguments
+///
+/// * `all_regions` - Slice of region strings (e.g., `["chr1", "chr2", ...]`)
+/// * `n` - Target number of chunks
+///
+/// # Returns
+///
+/// A vector of comma-separated region strings. Length may be less than `n`
+/// if there are fewer regions than requested chunks.
+///
+/// # Examples
+///
+/// ```
+/// let regions = ["chr1", "chr2", "chr3", "chr4", "chr5"];
+/// let chunks = split_regions_into_n(&regions, 2);
+/// assert_eq!(chunks, vec!["chr1,chr2,chr3", "chr4,chr5"]);
+/// ```
fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
    let len = all_regions.len();
    if n == 0 || len == 0 {
@@ -547,60 +731,92 @@ fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
pub fn run_deepvariant_quartered_sbatch_with_merge(
    id: &str,
    time_point: &str,
-    config: Config,
-) -> anyhow::Result<CapturedOutput> {
+    config: &Config,
+) -> anyhow::Result<Vec<CapturedOutput>> {
    let base = DeepVariant::initialize(id, time_point, config)?;

    // if already up-to-date, nothing to do
    if !base.should_run() {
-        debug!(
-            "DeepVariant PASS VCF already up-to-date for {} {}, skipping.",
-            id, time_point
-        );
-        return Ok(CapturedOutput::default());
+        debug!("DeepVariant PASS VCF already up-to-date for {id} {time_point}, skipping.");
+        return Ok(Vec::new());
    }

    // 1) split regions into 4 chunks
    let all_regions: Vec<&str> = base.regions.split(',').collect();
    let region_chunks = split_regions_into_n(&all_regions, 4);
+    let n_parts = region_chunks.len();

-    // 2) build jobs
-    let mut jobs = Vec::with_capacity(region_chunks.len());
+    // Build parallel jobs
+    let mut jobs = Vec::with_capacity(n_parts);
    for (i, regions) in region_chunks.into_iter().enumerate() {
        let mut job = base.clone();
        job.regions = regions;
        job.part_index = Some(i + 1);
        job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
+        info!("Planned job:\n{job}");
        jobs.push(job);
    }

    // 3) run them concurrently on Slurm
-    let _outputs = run_many_sbatch(jobs)?;
+    let outputs = run_many_sbatch(jobs)?;

    // 4) merge PASS VCFs
-    merge_deepvariant_parts(&base, 4)?;
+    merge_deepvariant_parts(&base, n_parts)?;

-    // we don’t really have a single merged CapturedOutput;
-    // return an empty one or synthesize something from `outputs`.
-    Ok(CapturedOutput {
-        stdout: String::new(),
-        stderr: String::new(),
-        slurm_epilog: None,
-    })
+    // There is no single merged CapturedOutput; return the per-part outputs.
+    Ok(outputs)
}

+/// Merges PASS-filtered VCFs from parallel DeepVariant runs.
+///
+/// Uses `bcftools concat` to combine part VCFs in order, then atomically
+/// renames the result to the final output path. All part directories are
+/// preserved, including VCF stats reports and intermediate files.
+///
+/// # Arguments
+///
+/// * `base` - Base DeepVariant instance (without `part_index`)
+/// * `n_parts` - Number of parts to merge
+///
+/// # Directory Structure After Completion
+///
+/// ```text
+/// {output_dir}/
+/// ├── part1/
+/// │   ├── {id}_{time_point}_DeepVariant.vcf.gz
+/// │   ├── {id}_{time_point}_DeepVariant.pass.vcf.gz
+/// │   ├── {id}_{time_point}_DeepVariant.vcf.gz.visual_report.html
+/// │   └── {id}_{time_point}_DeepVariant_part1_logs/
+/// ├── part2/
+/// │   └── ...
+/// ├── part3/
+/// │   └── ...
+/// ├── part4/
+/// │   └── ...
+/// └── {id}_{time_point}_DeepVariant.pass.vcf.gz   # Final merged output
+/// ```
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - Any part VCF is missing
+/// - `bcftools concat` fails
+/// - File rename fails
fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result<()> {
    let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);

-    // Collect all part PASS VCFs
+    // Collect and verify all part PASS VCFs exist
    for i in 1..=n_parts {
        let mut dv = base.clone();
        dv.part_index = Some(i);
        let part_pass = dv.passed_vcf_path();

-        if !Path::new(&part_pass).exists() {
-            anyhow::bail!("Missing part PASS VCF: {part_pass}");
-        }
+        anyhow::ensure!(
+            Path::new(&part_pass).exists(),
+            "Missing part {i} PASS VCF: {part_pass}"
+        );

        part_pass_paths.push(PathBuf::from(part_pass));
    }
@@ -612,21 +828,28 @@ fn merge_deepvariant_parts(base: &DeepVariant, n_parts: usize) -> anyhow::Result
    let final_tmp = format!("{final_passed_vcf}.tmp");

    // Ensure output directory exists
-    fs::create_dir_all(
-        Path::new(&final_passed_vcf)
-            .parent()
-            .unwrap_or_else(|| Path::new(".")),
-    )?;
+    if let Some(parent) = Path::new(&final_passed_vcf).parent() {
+        fs::create_dir_all(parent)?;
+    }
+
+    info!(
+        "Concatenating {} part VCFs into {}",
+        n_parts, final_passed_vcf
+    );

-    // Run bcftools concat via our Command/SlurmRunner abstraction
+    // Concatenate via bcftools
    let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);

-    let _ = SlurmRunner::run(&mut concat)
-        .context("failed to run bcftools concat for DeepVariant parts")?;
+    SlurmRunner::run(&mut concat)
+        .context("Failed to run bcftools concat for DeepVariant parts")?;

-    // Move tmp → final
+    // Atomic rename to the final path
    fs::rename(&final_tmp, &final_passed_vcf)
-        .context("failed to rename merged DeepVariant PASS VCF")?;
+        .context("Failed to rename merged DeepVariant PASS VCF")?;
+
+    info!(
+        "Successfully merged {} parts into {}",
+        n_parts, final_passed_vcf
+    );

    Ok(())
}
@@ -647,4 +870,15 @@ mod tests {
        assert_eq!(vl, vs);
        Ok(())
    }
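+
+    // Minimal check of the version parser; the banner line below is a
+    // hypothetical example of DeepVariant's `--version` output.
+    #[test]
+    fn parses_deepvariant_version() {
+        let out = "DeepVariant version 1.6.1\nsome trailing log";
+        assert_eq!(parse_deepvariant_version(out).unwrap(), "1.6.1");
+    }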
+
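+    // Path-shape checks for the new part layout; constructs the struct
+    // directly and assumes `Config::default()` plus pure path helpers.
+    #[test]
+    fn part_paths_use_part_subdirectories() {
+        let dv = DeepVariant {
+            id: "ID".into(),
+            time_point: "norm".into(),
+            log_dir: String::new(),
+            config: Config::default(),
+            regions: "chr1".into(),
+            part_index: Some(2),
+            karyotype: Karyotype::XX,
+        };
+        assert!(dv.output_vcf_path().contains("/part2/"));
+        assert!(dv
+            .passed_vcf_path()
+            .ends_with("ID_norm_DeepVariant.pass.vcf.gz"));
+    }
+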
+    // Integration test: requires pipeline data and a Slurm cluster with GPUs,
+    // so it is ignored by default.
+    #[test]
+    #[ignore = "requires pipeline data and a Slurm cluster"]
+    fn deepvariant_run() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+        let dv = DeepVariant::initialize("34528", "norm", &config)?;
+        info!("{dv}");
+
+        let _ = run_deepvariant_quartered_sbatch_with_merge("36167", "norm", &config)?;
+        Ok(())
+    }
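+
+    // Mirrors the doc example on `split_regions_into_n`.
+    #[test]
+    fn splits_regions_into_two_chunks() {
+        let regions = ["chr1", "chr2", "chr3", "chr4", "chr5"];
+        assert_eq!(
+            split_regions_into_n(&regions, 2),
+            vec!["chr1,chr2,chr3", "chr4,chr5"]
+        );
+    }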
}