|
|
@@ -1,62 +1,121 @@
|
|
|
+//! # DeepSomatic Somatic Variant Calling Pipeline
|
|
|
+//!
|
|
|
+//! This module provides a pipeline runner for [DeepSomatic](https://github.com/google/deepsomatic),
|
|
|
+//! a deep learning-based somatic variant caller for paired tumor-normal samples.
|
|
|
+//!
|
|
|
+//! ## Overview
|
|
|
+//!
|
|
|
+//! DeepSomatic performs somatic variant calling using:
|
|
|
+//!
|
|
|
+//! - CNN-based variant classification (derived from DeepVariant)
|
|
|
+//! - Paired tumor-normal analysis
|
|
|
+//! - Containerized execution via Singularity
|
|
|
+//!
|
|
|
+//! ## Execution Modes
|
|
|
+//!
|
|
|
+//! Execution mode is automatically selected via `config.slurm_runner`:
|
|
|
+//!
|
|
|
+//! - **Local** — Single-node execution
|
|
|
+//! - **Slurm** — HPC job submission
|
|
|
+//!
|
|
|
+//! Both modes support chunked parallel execution via [`run_deepsomatic_chunked_with_merge`].
|
|
|
+//!
|
|
|
+//! ## Output Files
|
|
|
+//!
|
|
|
+//! PASS-filtered somatic variants:
|
|
|
+//! ```text
|
|
|
+//! {result_dir}/{id}/deepsomatic/{id}_{tumoral_name}_DeepSomatic.pass.vcf.gz
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! ## Usage
|
|
|
+//!
|
|
|
+//! ```ignore
|
|
|
+//! use crate::config::Config;
|
|
|
+//! use crate::pipes::deepsomatic::run_deepsomatic_chunked_with_merge;
|
|
|
+//!
|
|
|
+//! let config = Config::default();
|
|
|
+//! let outputs = run_deepsomatic_chunked_with_merge("sample_001", &config, 30)?;
|
|
|
+//! # Ok::<(), anyhow::Error>(())
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! ## References
|
|
|
+//!
|
|
|
+//! - [DeepSomatic GitHub repository](https://github.com/google/deepsomatic)
|
|
|
+
|
|
|
use std::{
|
|
|
- fs,
|
|
|
- path::Path,
|
|
|
- process::{Command, Stdio},
|
|
|
+ fmt, fs,
|
|
|
+ path::{Path, PathBuf},
|
|
|
+ process::{Command as ProcessCommand, Stdio},
|
|
|
};
|
|
|
|
|
|
use anyhow::Context;
|
|
|
-use log::info;
|
|
|
+use log::{debug, info};
|
|
|
use rayon::prelude::*;
|
|
|
use regex::Regex;
|
|
|
+use rust_htslib::bam::{self, Read};
|
|
|
|
|
|
use crate::{
|
|
|
annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
|
|
|
collection::vcf::Vcf,
|
|
|
- commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
|
|
|
+ commands::{
|
|
|
+ CapturedOutput, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}
|
|
|
+ },
|
|
|
config::Config,
|
|
|
- helpers::{is_file_older, remove_dir_if_exists},
|
|
|
+ helpers::{
|
|
|
+ get_genome_sizes, is_file_older, remove_dir_if_exists, split_genome_into_n_regions_exact,
|
|
|
+ },
|
|
|
io::vcf::read_vcf,
|
|
|
pipes::{Initialize, ShouldRun, Version},
|
|
|
- runners::{run_wait, DockerRun, Run},
|
|
|
+ run, run_many,
|
|
|
+ runners::Run,
|
|
|
variant::{
|
|
|
variant::{Label, Variants},
|
|
|
variant_collection::VariantCollection,
|
|
|
},
|
|
|
};
|
|
|
|
|
|
-/// A pipeline runner for executing DeepSomatic on paired tumor and normal BAM files.
|
|
|
-///
|
|
|
-/// This struct encapsulates the configuration and metadata required to run the DeepSomatic
|
|
|
-/// variant caller inside a Docker container. It integrates into the pipeline ecosystem with:
|
|
|
-/// - Docker-based execution logic
|
|
|
-/// - Output filtering for PASS variants
|
|
|
-/// - Logging and diagnostics tracking
|
|
|
-/// - Annotation tagging for somatic variants
|
|
|
-#[derive(Debug)]
|
|
|
+#[derive(Debug, Clone)]
|
|
|
pub struct DeepSomatic {
|
|
|
pub id: String,
|
|
|
+ pub regions: String,
|
|
|
pub log_dir: String,
|
|
|
pub config: Config,
|
|
|
+ pub part_index: Option<usize>,
|
|
|
+}
|
|
|
+
|
|
|
+impl fmt::Display for DeepSomatic {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ writeln!(f, "🧬 DeepSomatic")?;
|
|
|
+ writeln!(f, " Case ID : {}", self.id)?;
|
|
|
+ writeln!(f, " Regions : {}", self.regions)?;
|
|
|
+ writeln!(f, " Log dir : {}", self.log_dir)?;
|
|
|
+ writeln!(
|
|
|
+ f,
|
|
|
+ " Part : {}",
|
|
|
+ self.part_index
|
|
|
+ .map_or("full".into(), |n| format!("part{n}"))
|
|
|
+ )
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
impl Initialize for DeepSomatic {
|
|
|
- /// Initializes the DeepSomatic runner by setting paths and logging.
|
|
|
- ///
|
|
|
- /// # Arguments
|
|
|
- /// * `id` - Sample ID used for output directory and log tagging.
|
|
|
- /// * `config` - Shared pipeline configuration.
|
|
|
- ///
|
|
|
- /// # Returns
|
|
|
- /// A ready-to-use `DeepSomatic` runner.
|
|
|
- fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
|
|
|
+ fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
|
|
|
let id = id.to_string();
|
|
|
info!("Initializing DeepSomatic for {id}.");
|
|
|
|
|
|
let log_dir = format!("{}/{}/log/deepsomatic", config.result_dir, &id);
|
|
|
+
|
|
|
+ let regions = (1..=22)
|
|
|
+ .map(|i| format!("chr{i}"))
|
|
|
+ .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from))
|
|
|
+ .collect::<Vec<String>>();
|
|
|
+
|
|
|
let deep_somatic = Self {
|
|
|
id,
|
|
|
- config,
|
|
|
+ regions: regions.join(" "),
|
|
|
log_dir,
|
|
|
+ config: config.clone(),
|
|
|
+ part_index: None,
|
|
|
};
|
|
|
|
|
|
if deep_somatic.config.deepsomatic_force {
|
|
|
@@ -67,20 +126,15 @@ impl Initialize for DeepSomatic {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/// Determines whether DeepSomatic should be re-run based on whether
|
|
|
-/// the filtered PASS VCF is older than the input BAMs.
|
|
|
-///
|
|
|
-/// If either input BAM (normal or tumor) is newer than the PASS VCF,
|
|
|
-/// DeepSomatic is considered out of date and should be re-executed.
|
|
|
-///
|
|
|
-/// # Returns
|
|
|
-/// `true` if an update is needed, or if timestamps can't be checked (file doesn't exist)
|
|
|
impl ShouldRun for DeepSomatic {
|
|
|
fn should_run(&self) -> bool {
|
|
|
let passed_vcf = &self.config.deepsomatic_passed_vcf(&self.id);
|
|
|
- let result = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true)
|
|
|
- .unwrap_or(true)
|
|
|
- || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
|
|
|
+ let normal_older =
|
|
|
+ is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
|
|
|
+ let tumor_older =
|
|
|
+ is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
|
|
|
+
|
|
|
+ let result = normal_older || tumor_older;
|
|
|
if result {
|
|
|
info!("DeepSomatic should run for id: {}.", self.id);
|
|
|
}
|
|
|
@@ -88,120 +142,174 @@ impl ShouldRun for DeepSomatic {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl Run for DeepSomatic {
|
|
|
- /// Runs DeepSomatic inside Docker and filters resulting variants with bcftools.
|
|
|
- ///
|
|
|
- /// # Workflow
|
|
|
- /// - Creates output directory
|
|
|
- /// - Executes Docker container for DeepSomatic
|
|
|
- /// - Applies PASS filtering using bcftools
|
|
|
- /// - Stores log outputs in gzipped format
|
|
|
- ///
|
|
|
- /// # Errors
|
|
|
- /// Returns an error if any subprocess or file operation fails.
|
|
|
- fn run(&mut self) -> anyhow::Result<()> {
|
|
|
- let output_vcf = self.config.deepsomatic_output_vcf(&self.id);
|
|
|
- if !Path::new(&output_vcf).exists() {
|
|
|
- let output_dir = self.config.deepsomatic_output_dir(&self.id);
|
|
|
- fs::create_dir_all(&output_dir)
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to create dir: {output_dir}.\n{e}"))?;
|
|
|
-
|
|
|
- let mut docker_run = DockerRun::new(&[
|
|
|
- "run",
|
|
|
- "-d",
|
|
|
- "-v",
|
|
|
- "/data:/data",
|
|
|
- "-v",
|
|
|
- &format!("{output_dir}:/output"),
|
|
|
- &format!("google/deepsomatic:{}", self.config.deepsomatic_bin_version),
|
|
|
- "run_deepsomatic",
|
|
|
- &format!("--model_type={}", self.config.deepsomatic_model_type),
|
|
|
- "--ref",
|
|
|
- &self.config.reference,
|
|
|
- "--reads_normal",
|
|
|
- &self.config.normal_bam(&self.id),
|
|
|
- "--reads_tumor",
|
|
|
- &self.config.tumoral_bam(&self.id),
|
|
|
- "--output_vcf",
|
|
|
- &format!(
|
|
|
- "/output/{}_{}_DeepSomatic.vcf.gz",
|
|
|
- self.id, self.config.tumoral_name
|
|
|
- ),
|
|
|
- // "--output_gvcf",
|
|
|
- // &format!(
|
|
|
- // "/output/{}_{}_DeepSomatic.g.vcf.gz",
|
|
|
- // self.id, self.config.tumoral_name
|
|
|
- // ),
|
|
|
- &format!("--num_shards={}", self.config.deepsomatic_threads),
|
|
|
- "--logging_dir",
|
|
|
- &format!(
|
|
|
- "/output/{}_{}_DeepSomatic_logs",
|
|
|
- self.id, self.config.tumoral_name
|
|
|
- ),
|
|
|
- "--vcf_stats_report=true",
|
|
|
- "--dry_run=false",
|
|
|
- "--sample_name_tumor",
|
|
|
- &format!("{}_{}", self.id, self.config.tumoral_name),
|
|
|
- "--sample_name_normal",
|
|
|
- &format!("{}_{}", self.id, self.config.normal_name),
|
|
|
- ]);
|
|
|
- let report = run_wait(&mut docker_run)
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to run DeepSomatic for {}.\n{e}", self.id))?;
|
|
|
- report
|
|
|
- .save_to_file(&format!("{}/deepsomatic_", self.log_dir))
|
|
|
- .map_err(|e| anyhow::anyhow!("Can't save DeepSomatic logs.\n{e}"))?;
|
|
|
+impl JobCommand for DeepSomatic {
|
|
|
+ fn init(&mut self) -> anyhow::Result<()> {
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ fs::create_dir_all(&output_dir)
|
|
|
+ .with_context(|| format!("Failed to create dir: {output_dir}"))?;
|
|
|
+ fs::create_dir_all(&self.log_dir)
|
|
|
+ .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ fn cmd(&self) -> String {
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ let output_vcf_path = self.output_vcf_path();
|
|
|
+ let sample_name_tumor = format!("{}_{}", self.id, self.config.tumoral_name);
|
|
|
+ let sample_name_normal = format!("{}_{}", self.id, self.config.normal_name);
|
|
|
+
|
|
|
+ let log_subdir = match self.part_index {
|
|
|
+ Some(idx) => format!("{}_DeepSomatic_part{idx}_logs", sample_name_tumor),
|
|
|
+ None => format!("{}_DeepSomatic_logs", sample_name_tumor),
|
|
|
+ };
|
|
|
+
|
|
|
+ let output_vcf = format!(
|
|
|
+ "/output/{}",
|
|
|
+ Path::new(&output_vcf_path)
|
|
|
+ .file_name()
|
|
|
+ .unwrap()
|
|
|
+ .to_string_lossy()
|
|
|
+ );
|
|
|
+
|
|
|
+ format!(
|
|
|
+ "{singularity_bin} exec \
|
|
|
+ --bind /mnt:/mnt \
|
|
|
+ --bind /home/t_steimle:/home/t_steimle \
|
|
|
+ --bind {output_dir}:/output \
|
|
|
+ {image} \
|
|
|
+ /opt/deepvariant/bin/deepsomtic/run_deepsomatic \
|
|
|
+ --model_type={model_type} \
|
|
|
+ --ref={reference} \
|
|
|
+ --reads_normal={normal_bam} \
|
|
|
+ --reads_tumor={tumor_bam} \
|
|
|
+ --regions=\"{regions}\" \
|
|
|
+ --output_vcf={output_vcf} \
|
|
|
+ --num_shards={threads} \
|
|
|
+ --logging_dir=/output/{log_subdir} \
|
|
|
+ --vcf_stats_report=true \
|
|
|
+ --dry_run=false \
|
|
|
+ --sample_name_tumor={sample_name_tumor} \
|
|
|
+ --sample_name_normal={sample_name_normal}",
|
|
|
+ singularity_bin = self.config.singularity_bin,
|
|
|
+ output_dir = output_dir,
|
|
|
+ image = self.config.deepsomatic_image,
|
|
|
+ model_type = self.config.deepsomatic_model_type,
|
|
|
+ reference = self.config.reference,
|
|
|
+ normal_bam = self.config.normal_bam(&self.id),
|
|
|
+ tumor_bam = self.config.tumoral_bam(&self.id),
|
|
|
+ regions = self.regions,
|
|
|
+ threads = self.config.deepsomatic_threads,
|
|
|
+ )
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl LocalRunner for DeepSomatic {}
|
|
|
+impl LocalBatchRunner for DeepSomatic {}
|
|
|
+
|
|
|
+impl SlurmRunner for DeepSomatic {
|
|
|
+ fn slurm_args(&self) -> Vec<String> {
|
|
|
+ SlurmParams {
|
|
|
+ job_name: Some(format!("deepsomatic_{}", self.id)),
|
|
|
+ cpus_per_task: Some(self.config.deepsomatic_threads as u32),
|
|
|
+ mem: Some("60G".into()),
|
|
|
+ partition: Some("batch".into()),
|
|
|
+ gres: None,
|
|
|
+ }.to_args()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl SbatchRunner for DeepSomatic {
|
|
|
+ fn slurm_params(&self) -> SlurmParams {
|
|
|
+ SlurmParams {
|
|
|
+ job_name: Some(format!("deepsomatic_{}", self.id)),
|
|
|
+ cpus_per_task: Some(self.config.deepsomatic_threads as u32),
|
|
|
+ mem: Some("60G".into()),
|
|
|
+ partition: Some("batch".into()),
|
|
|
+ gres: None,
|
|
|
}
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- // Keep PASS
|
|
|
- let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
|
|
|
- if !Path::new(&vcf_passed).exists() {
|
|
|
- info!("Filtering DeepSomatic PASS variants for {}", self.id);
|
|
|
- let report = bcftools_keep_pass(&output_vcf, &vcf_passed, BcftoolsConfig::default())
|
|
|
- .map_err(|e| {
|
|
|
- anyhow::anyhow!("Error while running bcftools pass for {}.\n{e}", output_vcf)
|
|
|
- })?;
|
|
|
- report
|
|
|
- .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
|
|
|
- .map_err(|e| {
|
|
|
- anyhow::anyhow!(
|
|
|
- "Error while saving bcftools report for {}.\n{e}",
|
|
|
- output_vcf
|
|
|
- )
|
|
|
- })?;
|
|
|
+impl DeepSomatic {
|
|
|
+ fn part_output_dir(&self) -> String {
|
|
|
+ let base_dir = self.config.deepsomatic_output_dir(&self.id);
|
|
|
+ match self.part_index {
|
|
|
+ Some(idx) => format!("{base_dir}/part{idx}"),
|
|
|
+ None => base_dir,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn output_vcf_path(&self) -> String {
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ format!(
|
|
|
+ "{output_dir}/{}_{}_DeepSomatic.vcf.gz",
|
|
|
+ self.id, self.config.tumoral_name
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
+ fn passed_vcf_path(&self) -> String {
|
|
|
+ match self.part_index {
|
|
|
+ Some(_) => self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz"),
|
|
|
+ None => self.config.deepsomatic_passed_vcf(&self.id),
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ fn filter_pass(&self) -> anyhow::Result<()> {
|
|
|
+ let output_vcf = self.output_vcf_path();
|
|
|
+ let vcf_passed = self.passed_vcf_path();
|
|
|
+
|
|
|
+ if Path::new(&vcf_passed).exists() {
|
|
|
+ debug!("PASS VCF already exists: {vcf_passed}");
|
|
|
+ return Ok(());
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Filtering DeepSomatic PASS variants for {} (part: {:?})",
|
|
|
+ self.id, self.part_index
|
|
|
+ );
|
|
|
+
|
|
|
+ let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, vcf_passed);
|
|
|
+ let report = run!(&self.config, &mut cmd)
|
|
|
+ .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
|
|
|
+ .context("Can't save bcftools PASS logs")?;
|
|
|
|
|
|
Ok(())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+impl Run for DeepSomatic {
|
|
|
+ fn run(&mut self) -> anyhow::Result<()> {
|
|
|
+ run!(&self.config, self)?;
|
|
|
+ self.filter_pass()?;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl CallerCat for DeepSomatic {
|
|
|
- /// Returns a classification tag for this caller, identifying it as somatic.
|
|
|
fn caller_cat(&self) -> Annotation {
|
|
|
Annotation::Callers(Caller::DeepSomatic, Sample::Somatic)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
impl Variants for DeepSomatic {
|
|
|
- /// Loads and annotates variants from the DeepSomatic VCF (PASS-filtered).
|
|
|
- ///
|
|
|
- /// # Arguments
|
|
|
- /// * `annotations` - The global annotation map to which variants are added.
|
|
|
- ///
|
|
|
- /// # Returns
|
|
|
- /// A `VariantCollection` containing variants, associated VCF path, and caller category.
|
|
|
fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
|
|
|
let caller = self.caller_cat();
|
|
|
- let add = vec![caller.clone()];
|
|
|
let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
|
|
|
|
|
|
info!("Loading variants from {caller}: {vcf_passed}");
|
|
|
+
|
|
|
let variants = read_vcf(&vcf_passed)
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to read DeepSomatic VCF {}.\n{e}", vcf_passed))?;
|
|
|
+ .with_context(|| format!("Failed to read DeepSomatic VCF {vcf_passed}"))?;
|
|
|
|
|
|
variants.par_iter().for_each(|v| {
|
|
|
- annotations.insert_update(v.hash(), &add);
|
|
|
+ annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
|
|
|
});
|
|
|
- info!("{}, {} variants loaded.", caller, variants.len());
|
|
|
+
|
|
|
+ info!("{caller}: {} variants loaded", variants.len());
|
|
|
+
|
|
|
Ok(VariantCollection {
|
|
|
variants,
|
|
|
vcf: Vcf::new(vcf_passed.into())?,
|
|
|
@@ -211,46 +319,231 @@ impl Variants for DeepSomatic {
|
|
|
}
|
|
|
|
|
|
impl Label for DeepSomatic {
|
|
|
- /// Returns the string label for this caller.
|
|
|
fn label(&self) -> String {
|
|
|
self.caller_cat().to_string()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
impl Version for DeepSomatic {
|
|
|
- /// Retrieves the DeepSomatic version by running `run_deepsomatic --version` in its docker environment.
|
|
|
- ///
|
|
|
- /// # Errors
|
|
|
- /// Returns an error if command execution fails or "Version " not found in output.
|
|
|
fn version(config: &Config) -> anyhow::Result<String> {
|
|
|
- let out = Command::new("docker")
|
|
|
- .args([
|
|
|
- "run",
|
|
|
- "--rm",
|
|
|
- "--entrypoint",
|
|
|
- "/opt/deepvariant/bin/deepsomatic/run_deepsomatic",
|
|
|
- &format!("google/deepsomatic:{}", config.deepsomatic_bin_version),
|
|
|
- "--version",
|
|
|
- ])
|
|
|
+ let out = ProcessCommand::new("bash")
|
|
|
+ .arg("-c")
|
|
|
+ .arg(format!(
|
|
|
+ "{} exec {} /opt/deepvariant/bin/deepsomtic/run_deepsomatic --version",
|
|
|
+ config.singularity_bin, config.deepsomatic_image
|
|
|
+ ))
|
|
|
.stdout(Stdio::piped())
|
|
|
.stderr(Stdio::piped())
|
|
|
.output()
|
|
|
- .context("failed to spawn docker")?;
|
|
|
+ .context("Failed to spawn Singularity process")?;
|
|
|
|
|
|
if !out.status.success() {
|
|
|
- let mut log = String::from_utf8_lossy(&out.stdout).to_string();
|
|
|
- log.push_str(&String::from_utf8_lossy(&out.stderr));
|
|
|
- anyhow::bail!("docker run failed: {}\n{}", out.status, log);
|
|
|
+ let combined = format!(
|
|
|
+ "{}{}",
|
|
|
+ String::from_utf8_lossy(&out.stdout),
|
|
|
+ String::from_utf8_lossy(&out.stderr)
|
|
|
+ );
|
|
|
+ anyhow::bail!(
|
|
|
+ "Singularity exec failed with status {}: {}",
|
|
|
+ out.status,
|
|
|
+ combined
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- let mut log = String::from_utf8_lossy(&out.stdout).to_string();
|
|
|
- log.push_str(&String::from_utf8_lossy(&out.stderr));
|
|
|
+ let combined = format!(
|
|
|
+ "{}{}",
|
|
|
+ String::from_utf8_lossy(&out.stdout),
|
|
|
+ String::from_utf8_lossy(&out.stderr)
|
|
|
+ );
|
|
|
+
|
|
|
+ parse_deepsomatic_version(&combined)
|
|
|
+ }
|
|
|
|
|
|
- // e.g. “DeepSomatic version 1.9.0”
|
|
|
- let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)")?;
|
|
|
- let caps = re
|
|
|
- .captures(&log)
|
|
|
- .context("could not parse DeepSomatic version from output")?;
|
|
|
- Ok(caps.get(1).unwrap().as_str().to_string())
|
|
|
+ fn version_slurm(config: &Config) -> anyhow::Result<String> {
|
|
|
+ struct DeepSomaticVersionJob<'a> {
|
|
|
+ config: &'a Config,
|
|
|
+ }
|
|
|
+
|
|
|
+ impl JobCommand for DeepSomaticVersionJob<'_> {
|
|
|
+ fn cmd(&self) -> String {
|
|
|
+ format!(
|
|
|
+ "{} exec {} /opt/deepvariant/bin/deepsomtic/run_deepsomatic --version",
|
|
|
+ self.config.singularity_bin, self.config.deepsomatic_image
|
|
|
+ )
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl SlurmRunner for DeepSomaticVersionJob<'_> {
|
|
|
+ fn slurm_args(&self) -> Vec<String> {
|
|
|
+ SlurmParams {
|
|
|
+ job_name: Some("deepsomatic_version".into()),
|
|
|
+ partition: Some("shortq".into()),
|
|
|
+ cpus_per_task: Some(1),
|
|
|
+ mem: Some("10G".into()),
|
|
|
+ gres: None,
|
|
|
+ }.to_args()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let mut job = DeepSomaticVersionJob { config };
|
|
|
+ let out = crate::commands::SlurmRunner::run(&mut job)
|
|
|
+ .context("Failed to run DeepSomatic --version via Slurm")?;
|
|
|
+
|
|
|
+ let mut combined = out.stdout.clone();
|
|
|
+ if let Some(epilog) = &out.slurm_epilog {
|
|
|
+ combined.push_str(&epilog.to_string());
|
|
|
+ }
|
|
|
+ combined.push_str(&out.stderr);
|
|
|
+
|
|
|
+ parse_deepsomatic_version(&combined)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+fn parse_deepsomatic_version(output: &str) -> anyhow::Result<String> {
|
|
|
+ let re = Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("Version regex is valid");
|
|
|
+
|
|
|
+ let caps = re
|
|
|
+ .captures(output)
|
|
|
+ .context("Could not parse DeepSomatic version from output")?;
|
|
|
+
|
|
|
+ Ok(caps
|
|
|
+ .get(1)
|
|
|
+ .expect("Regex has capture group 1")
|
|
|
+ .as_str()
|
|
|
+ .to_string())
|
|
|
+}
|
|
|
+
|
|
|
+fn merge_deepsomatic_parts(base: &DeepSomatic, n_parts: usize) -> anyhow::Result<()> {
|
|
|
+ let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
|
|
|
+
|
|
|
+ for i in 1..=n_parts {
|
|
|
+ let mut ds = base.clone();
|
|
|
+ ds.part_index = Some(i);
|
|
|
+ let part_pass = ds.passed_vcf_path();
|
|
|
+
|
|
|
+ anyhow::ensure!(
|
|
|
+ Path::new(&part_pass).exists(),
|
|
|
+ "Missing DeepSomatic part {i} PASS VCF: {part_pass}"
|
|
|
+ );
|
|
|
+
|
|
|
+ part_pass_paths.push(PathBuf::from(part_pass));
|
|
|
+ }
|
|
|
+
|
|
|
+ let final_passed_vcf = base.config.deepsomatic_passed_vcf(&base.id);
|
|
|
+ let final_tmp = format!("{final_passed_vcf}.tmp");
|
|
|
+
|
|
|
+ if let Some(parent) = Path::new(&final_passed_vcf).parent() {
|
|
|
+ fs::create_dir_all(parent)?;
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Concatenating {} DeepSomatic part VCFs into {}",
|
|
|
+ n_parts, final_passed_vcf
|
|
|
+ );
|
|
|
+
|
|
|
+ let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
|
|
|
+ run!(&base.config, &mut concat)
|
|
|
+ .context("Failed to run bcftools concat for DeepSomatic parts")?;
|
|
|
+
|
|
|
+ fs::rename(&final_tmp, &final_passed_vcf)
|
|
|
+ .context("Failed to rename merged DeepSomatic PASS VCF")?;
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Successfully merged {} DeepSomatic parts into {}",
|
|
|
+ n_parts, final_passed_vcf
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+/// Runs DeepSomatic in parallel chunks, then merges results.
|
|
|
+///
|
|
|
+/// Execution mode (local vs Slurm) is determined by `config.slurm_runner`.
|
|
|
+pub fn run_deepsomatic_chunked_with_merge(
|
|
|
+ id: &str,
|
|
|
+ config: &Config,
|
|
|
+ n_parts: usize,
|
|
|
+) -> anyhow::Result<Vec<CapturedOutput>> {
|
|
|
+ anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
|
|
|
+
|
|
|
+ let base = DeepSomatic::initialize(id, config)?;
|
|
|
+
|
|
|
+ if !base.should_run() {
|
|
|
+ debug!("DeepSomatic PASS VCF already up-to-date for {id}, skipping.");
|
|
|
+ return Ok(Vec::new());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Get genome sizes from tumor BAM header
|
|
|
+ let tumor_bam = config.tumoral_bam(id);
|
|
|
+ let reader = bam::Reader::from_path(&tumor_bam)
|
|
|
+ .with_context(|| format!("Failed to open BAM: {tumor_bam}"))?;
|
|
|
+ let header = bam::Header::from_template(reader.header());
|
|
|
+ let genome_sizes = get_genome_sizes(&header)?;
|
|
|
+
|
|
|
+ let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
|
|
|
+ .into_iter()
|
|
|
+ .map(|v| v.join(" "))
|
|
|
+ .collect::<Vec<String>>();
|
|
|
+
|
|
|
+ let actual_n_parts = region_chunks.len();
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Running DeepSomatic in {} parallel parts for {}",
|
|
|
+ actual_n_parts, id
|
|
|
+ );
|
|
|
+
|
|
|
+ let mut jobs = Vec::with_capacity(actual_n_parts);
|
|
|
+ for (i, regions) in region_chunks.into_iter().enumerate() {
|
|
|
+ let mut job = base.clone();
|
|
|
+ job.regions = regions;
|
|
|
+ job.part_index = Some(i + 1);
|
|
|
+ job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
|
|
|
+ info!("Planned DeepSomatic job:\n{job}");
|
|
|
+ jobs.push(job);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Run DeepSomatic jobs
|
|
|
+ let outputs = run_many!(config, jobs.clone())?;
|
|
|
+
|
|
|
+ // Filter PASS variants for each part
|
|
|
+ info!("Filtering PASS variants for {} parts", actual_n_parts);
|
|
|
+ for job in &jobs {
|
|
|
+ job.filter_pass()?;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Merge PASS VCFs
|
|
|
+ merge_deepsomatic_parts(&base, actual_n_parts)?;
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "DeepSomatic completed for {}: {} parts merged",
|
|
|
+ id, actual_n_parts
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(outputs)
|
|
|
+}
|
|
|
+
|
|
|
+#[cfg(test)]
|
|
|
+mod tests {
|
|
|
+ use super::*;
|
|
|
+ use crate::helpers::test_init;
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn deepsomatic_version() -> anyhow::Result<()> {
|
|
|
+ test_init();
|
|
|
+ let vl = DeepSomatic::version(&Config::default())?;
|
|
|
+ info!("DeepSomatic local version: {vl}");
|
|
|
+ let vs = DeepSomatic::version_slurm(&Config::default())?;
|
|
|
+ info!("DeepSomatic slurm version: {vs}");
|
|
|
+ assert_eq!(vl, vs);
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn deepsomatic_run() -> anyhow::Result<()> {
|
|
|
+ test_init();
|
|
|
+ let config = Config::default();
|
|
|
+ let outputs = run_deepsomatic_chunked_with_merge("34528", &config, 30)?;
|
|
|
+ info!("Completed with {} job outputs", outputs.len());
|
|
|
+ Ok(())
|
|
|
}
|
|
|
}
|