|
|
@@ -21,7 +21,7 @@ use crate::{
|
|
|
};
|
|
|
|
|
|
use anyhow::Context;
|
|
|
-use log::{debug, info, warn};
|
|
|
+use log::{debug, info};
|
|
|
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
|
|
use regex::Regex;
|
|
|
use rust_htslib::bam::{self, Read};
|
|
|
@@ -37,21 +37,32 @@ use std::{
|
|
|
/// This struct supports:
|
|
|
/// - Local execution via `run_local`
|
|
|
/// - Slurm execution via `run_sbatch`
|
|
|
-/// - Optional region restriction via `-r` (for downstream batched runners)
|
|
|
+/// - Chunked parallel execution via `run_clairs_chunked_sbatch_with_merge`
|
|
|
/// - bcftools post-processing (germline + somatic PASS)
|
|
|
+///
|
|
|
+/// # Output Files
|
|
|
+///
|
|
|
+/// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz`
|
|
|
+/// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz`
|
|
|
+///
|
|
|
+/// # Chunked Execution
|
|
|
+///
|
|
|
+/// For large genomes, use [`run_clairs_chunked_sbatch_with_merge`] which:
|
|
|
+/// 1. Splits genome into N regions
|
|
|
+/// 2. Runs ClairS in parallel Slurm jobs
|
|
|
+/// 3. Post-processes each part (concat SNV+indel, filter PASS)
|
|
|
+/// 4. Merges all part PASS VCFs into final output
|
|
|
#[derive(Debug, Clone)]
|
|
|
pub struct ClairS {
|
|
|
+ /// Sample identifier
|
|
|
pub id: String,
|
|
|
+ /// Pipeline configuration
|
|
|
pub config: Config,
|
|
|
+ /// Log directory for this run
|
|
|
pub log_dir: String,
|
|
|
- /// Optional region passed as `-r REGION` (`ctg:start-end`).
|
|
|
- /// When `None`, ClairS runs genome-wide.
|
|
|
+ /// Optional region for restricted runs (format: `ctg:start-end`)
|
|
|
pub region: Option<String>,
|
|
|
-
|
|
|
- /// Optional part index for chunked parallel runs (1-indexed).
|
|
|
- ///
|
|
|
- /// When `Some(n)`, output files go into a `part{n}` subdirectory and
|
|
|
- /// PASS VCFs are per-part, later merged into the canonical VCF.
|
|
|
+ /// Optional part index for chunked parallel runs (1-indexed)
|
|
|
pub part_index: Option<usize>,
|
|
|
}
|
|
|
|
|
|
@@ -60,16 +71,16 @@ impl fmt::Display for ClairS {
|
|
|
writeln!(f, "🧬 ClairS {}", self.id)?;
|
|
|
writeln!(
|
|
|
f,
|
|
|
- " Region : {}",
|
|
|
+ " Region : {}",
|
|
|
self.region.as_deref().unwrap_or("genome-wide")
|
|
|
)?;
|
|
|
writeln!(
|
|
|
f,
|
|
|
- " Part : {}",
|
|
|
+ " Part : {}",
|
|
|
self.part_index
|
|
|
.map_or("full".into(), |n| format!("part{n}"))
|
|
|
)?;
|
|
|
- writeln!(f, " Log dir : {}", self.log_dir)
|
|
|
+ writeln!(f, " Log dir : {}", self.log_dir)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -77,7 +88,7 @@ impl Initialize for ClairS {
|
|
|
fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
|
|
|
let id = id.to_string();
|
|
|
|
|
|
- info!("Initialize ClairS for {id}.");
|
|
|
+ info!("Initializing ClairS for {id}");
|
|
|
let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
|
|
|
|
|
|
let clairs = Self {
|
|
|
@@ -100,27 +111,28 @@ impl ShouldRun for ClairS {
|
|
|
/// Determines whether ClairS should be re-run based on BAM modification timestamps.
|
|
|
fn should_run(&self) -> bool {
|
|
|
let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
|
|
|
- let result = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true)
|
|
|
- .unwrap_or(true)
|
|
|
- || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
|
|
|
+ let normal_older =
|
|
|
+ is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
|
|
|
+ let tumor_older =
|
|
|
+ is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
|
|
|
+
|
|
|
+ let result = normal_older || tumor_older;
|
|
|
if result {
|
|
|
- warn!("ClairS should run for id: {}.", self.id);
|
|
|
+ info!("ClairS should run for id: {}.", self.id);
|
|
|
}
|
|
|
result
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/* ---------------- JobCommand / LocalRunner / SbatchRunner ---------------- */
|
|
|
-
|
|
|
impl JobCommand for ClairS {
|
|
|
fn init(&mut self) -> anyhow::Result<()> {
|
|
|
let output_dir = self.part_output_dir();
|
|
|
|
|
|
fs::create_dir_all(&output_dir)
|
|
|
- .with_context(|| format!("Failed create dir: {output_dir}"))?;
|
|
|
+ .with_context(|| format!("Failed to create dir: {output_dir}"))?;
|
|
|
|
|
|
fs::create_dir_all(&self.log_dir)
|
|
|
- .with_context(|| format!("Failed create dir: {}", self.log_dir))?;
|
|
|
+ .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
|
|
|
|
|
|
Ok(())
|
|
|
}
|
|
|
@@ -128,11 +140,10 @@ impl JobCommand for ClairS {
|
|
|
fn cmd(&self) -> String {
|
|
|
let output_dir = self.part_output_dir();
|
|
|
|
|
|
- // Build repeated -r REGION args if any regions were set (for batched runs)
|
|
|
let region_arg = self
|
|
|
.region
|
|
|
.as_ref()
|
|
|
- .map(|r| format!("-r {r}"))
|
|
|
+ .map(|r| format!("-r '{r}'"))
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
let sample_name = format!("{}_diag", self.id);
|
|
|
@@ -170,18 +181,16 @@ impl JobCommand for ClairS {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl LocalRunner for ClairS {
|
|
|
- // default shell() is fine ("bash")
|
|
|
-}
|
|
|
+impl LocalRunner for ClairS {}
|
|
|
|
|
|
impl SbatchRunner for ClairS {
|
|
|
fn slurm_params(&self) -> SlurmParams {
|
|
|
SlurmParams {
|
|
|
job_name: Some(format!("clairs_{}", self.id)),
|
|
|
cpus_per_task: Some(self.config.clairs_threads as u32),
|
|
|
- mem: Some("60G".into()), // tune as needed
|
|
|
- partition: Some("batch".into()), // CPU partition, no GPU
|
|
|
- gres: None, // ClairS does not use GPU
|
|
|
+ mem: Some("60G".into()),
|
|
|
+ partition: Some("batch".into()),
|
|
|
+ gres: None,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -197,188 +206,270 @@ impl Run for ClairS {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/* ---------------- Post-processing helpers (germline + somatic) ----------- */
|
|
|
-
|
|
|
impl ClairS {
|
|
|
- fn postprocess_local(&self) -> anyhow::Result<()> {
|
|
|
- // Germline PASS only once (full run, not per-part)
|
|
|
- if self.part_index.is_none() {
|
|
|
- let clair3_germline_passed = self.config.clairs_germline_passed_vcf(&self.id);
|
|
|
- if !Path::new(&clair3_germline_passed).exists() {
|
|
|
- let clair3_germline_normal = self.config.clairs_germline_normal_vcf(&self.id);
|
|
|
-
|
|
|
- let mut cmd = BcftoolsKeepPass::from_config(
|
|
|
- &self.config,
|
|
|
- clair3_germline_normal,
|
|
|
- clair3_germline_passed.clone(),
|
|
|
- );
|
|
|
- let report =
|
|
|
- <BcftoolsKeepPass as LocalRunner>::run(&mut cmd).with_context(|| {
|
|
|
- format!(
|
|
|
- "Failed to run `bcftools keep PASS` for {}.",
|
|
|
- clair3_germline_passed
|
|
|
- )
|
|
|
- })?;
|
|
|
-
|
|
|
- let log_file = format!("{}/bcftools_germline_pass_", self.log_dir);
|
|
|
- report
|
|
|
- .save_to_file(&log_file)
|
|
|
- .with_context(|| format!("Error while writing logs into {log_file}"))?;
|
|
|
- } else {
|
|
|
- debug!(
|
|
|
- "ClairS Germline PASSED VCF already exists for {}, skipping.",
|
|
|
- self.id
|
|
|
- );
|
|
|
+ /// Returns the germline PASS VCF path.
|
|
|
+ ///
|
|
|
+ /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz`
|
|
|
+ /// - Full runs: canonical path from config
|
|
|
+ fn germline_passed_vcf_path(&self) -> String {
|
|
|
+ match self.part_index {
|
|
|
+ Some(idx) => {
|
|
|
+ let outdir = self.part_output_dir();
|
|
|
+ format!("{outdir}/clair3_germline.part{idx}.pass.vcf.gz")
|
|
|
}
|
|
|
+ None => self.config.clairs_germline_passed_vcf(&self.id),
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // Somatic concat + PASS (per-part or full)
|
|
|
- let passed_vcf = self.somatic_passed_vcf_path();
|
|
|
- if !Path::new(&passed_vcf).exists() {
|
|
|
- let (output_vcf, output_indels_vcf) = self.config.clairs_output_vcfs(&self.id);
|
|
|
- let output_dir = self.part_output_dir();
|
|
|
- let output_vcf = format!(
|
|
|
- "{output_dir}/{}",
|
|
|
- Path::new(&output_vcf)
|
|
|
- .file_name()
|
|
|
- .unwrap()
|
|
|
- .to_string_lossy()
|
|
|
+ /// Post-processes ClairS output locally (concat SNV+indel, filter PASS).
|
|
|
+ fn postprocess_local(&self) -> anyhow::Result<()> {
|
|
|
+ self.process_germline_local()?;
|
|
|
+ self.process_somatic_local()?;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Processes germline VCF (PASS filter only).
|
|
|
+ fn process_germline_local(&self) -> anyhow::Result<()> {
|
|
|
+ let germline_passed = self.germline_passed_vcf_path();
|
|
|
+
|
|
|
+ if Path::new(&germline_passed).exists() {
|
|
|
+ debug!(
|
|
|
+ "ClairS germline PASS VCF already exists for {} part {:?}",
|
|
|
+ self.id, self.part_index
|
|
|
);
|
|
|
- let output_indels_vcf = format!(
|
|
|
- "{output_dir}/{}",
|
|
|
- Path::new(&output_indels_vcf)
|
|
|
+ return Ok(());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Input path depends on whether this is a part run
|
|
|
+ let germline_input = match self.part_index {
|
|
|
+ Some(_) => {
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ let b = self.config.clairs_germline_normal_vcf(&self.id);
|
|
|
+ let base_name = Path::new(&b)
|
|
|
.file_name()
|
|
|
- .unwrap()
|
|
|
- .to_string_lossy()
|
|
|
- );
|
|
|
+ .expect("Germline VCF should have filename")
|
|
|
+ .to_string_lossy();
|
|
|
+ format!("{output_dir}/{base_name}")
|
|
|
+ }
|
|
|
+ None => self.config.clairs_germline_normal_vcf(&self.id),
|
|
|
+ };
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Filtering germline PASS variants for {} part {:?}",
|
|
|
+ self.id, self.part_index
|
|
|
+ );
|
|
|
|
|
|
- let tmp_file = temp_file_path(".vcf.gz")?.to_str().unwrap().to_string();
|
|
|
+ let mut cmd =
|
|
|
+ BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
|
|
|
|
|
|
- let mut concat = BcftoolsConcat::from_config(
|
|
|
- &self.config,
|
|
|
- vec![output_vcf.clone(), output_indels_vcf.clone()],
|
|
|
- &tmp_file,
|
|
|
- );
|
|
|
- let report = <BcftoolsConcat as LocalRunner>::run(&mut concat).with_context(|| {
|
|
|
- format!(
|
|
|
- "Failed to run bcftools concat for {} and {}.",
|
|
|
- output_vcf, output_indels_vcf
|
|
|
- )
|
|
|
- })?;
|
|
|
-
|
|
|
- let log_file = format!("{}/bcftools_concat_", self.log_dir);
|
|
|
- report
|
|
|
- .save_to_file(&log_file)
|
|
|
- .with_context(|| format!("Error while writing logs into {log_file}"))?;
|
|
|
-
|
|
|
- let mut keep_pass =
|
|
|
- BcftoolsKeepPass::from_config(&self.config, tmp_file.clone(), passed_vcf.clone());
|
|
|
- let report =
|
|
|
- <BcftoolsKeepPass as LocalRunner>::run(&mut keep_pass).with_context(|| {
|
|
|
- format!("Error while running bcftools keep PASS for {}.", output_vcf)
|
|
|
- })?;
|
|
|
-
|
|
|
- let log_file = format!("{}/bcftools_pass_", self.log_dir);
|
|
|
- report
|
|
|
- .save_to_file(&log_file)
|
|
|
- .with_context(|| format!("Error while writing logs into {log_file}"))?;
|
|
|
-
|
|
|
- fs::remove_file(&tmp_file)
|
|
|
- .with_context(|| format!("Failed to remove temporary file {tmp_file}"))?;
|
|
|
- } else {
|
|
|
+ let report = <BcftoolsKeepPass as LocalRunner>::run(&mut cmd)
|
|
|
+ .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir))
|
|
|
+ .context("Failed to save germline PASS logs")?;
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Processes somatic VCFs (concat SNV+indel, then PASS filter).
|
|
|
+ fn process_somatic_local(&self) -> anyhow::Result<()> {
|
|
|
+ let passed_vcf = self.somatic_passed_vcf_path();
|
|
|
+
|
|
|
+ if Path::new(&passed_vcf).exists() {
|
|
|
debug!(
|
|
|
- "ClairS PASSED VCF already exists for {}, part {:?}, skipping.",
|
|
|
+ "ClairS somatic PASS VCF already exists for {} part {:?}",
|
|
|
self.id, self.part_index
|
|
|
);
|
|
|
+ return Ok(());
|
|
|
}
|
|
|
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id);
|
|
|
+
|
|
|
+ let snv_vcf = format!(
|
|
|
+ "{}/{}",
|
|
|
+ output_dir,
|
|
|
+ Path::new(&snv_vcf_base)
|
|
|
+ .file_name()
|
|
|
+ .expect("SNV VCF should have filename")
|
|
|
+ .to_string_lossy()
|
|
|
+ );
|
|
|
+ let indel_vcf = format!(
|
|
|
+ "{}/{}",
|
|
|
+ output_dir,
|
|
|
+ Path::new(&indel_vcf_base)
|
|
|
+ .file_name()
|
|
|
+ .expect("Indel VCF should have filename")
|
|
|
+ .to_string_lossy()
|
|
|
+ );
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Concatenating and filtering somatic variants for {} part {:?}",
|
|
|
+ self.id, self.part_index
|
|
|
+ );
|
|
|
+
|
|
|
+ // Create temp file for intermediate concat result
|
|
|
+ let tmp_file = temp_file_path(".vcf.gz")?;
|
|
|
+ let tmp_path = tmp_file
|
|
|
+ .to_str()
|
|
|
+ .context("Temp path not valid UTF-8")?
|
|
|
+ .to_string();
|
|
|
+
|
|
|
+ // Concat SNV + indel
|
|
|
+ let mut concat = BcftoolsConcat::from_config(
|
|
|
+ &self.config,
|
|
|
+ vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
|
|
|
+ &tmp_path,
|
|
|
+ );
|
|
|
+
|
|
|
+ let report = <BcftoolsConcat as LocalRunner>::run(&mut concat)
|
|
|
+ .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_concat_", self.log_dir))
|
|
|
+ .context("Failed to save concat logs")?;
|
|
|
+
|
|
|
+ // Filter PASS
|
|
|
+ let mut keep_pass =
|
|
|
+ BcftoolsKeepPass::from_config(&self.config, tmp_path.clone(), passed_vcf.clone());
|
|
|
+
|
|
|
+ let report = <BcftoolsKeepPass as LocalRunner>::run(&mut keep_pass)
|
|
|
+ .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
|
|
|
+ .context("Failed to save PASS filter logs")?;
|
|
|
+
|
|
|
+ fs::remove_file(&tmp_path).ok();
|
|
|
+
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- fn postprocess_sbatch(&self) -> anyhow::Result<()> {
|
|
|
- // Germline PASS only once
|
|
|
- if self.part_index.is_none() {
|
|
|
- let clair3_germline_passed = self.config.clairs_germline_passed_vcf(&self.id);
|
|
|
- if !Path::new(&clair3_germline_passed).exists() {
|
|
|
- let clair3_germline_normal = self.config.clairs_germline_normal_vcf(&self.id);
|
|
|
-
|
|
|
- let mut cmd = BcftoolsKeepPass::from_config(
|
|
|
- &self.config,
|
|
|
- clair3_germline_normal,
|
|
|
- clair3_germline_passed.clone(),
|
|
|
- );
|
|
|
- let report = SlurmRunner::run(&mut cmd)
|
|
|
- .context("Failed to run `bcftools keep PASS` on Slurm")?;
|
|
|
-
|
|
|
- let log_file = format!("{}/bcftools_germline_pass_", self.log_dir);
|
|
|
- report
|
|
|
- .save_to_file(&log_file)
|
|
|
- .context("Error while writing logs")?;
|
|
|
- } else {
|
|
|
- debug!(
|
|
|
- "ClairS Germline PASSED VCF already exists for {}, skipping.",
|
|
|
- self.id
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
+ /// Processes germline VCF via Slurm.
|
|
|
+ fn process_germline_sbatch(&self) -> anyhow::Result<()> {
|
|
|
+ let germline_passed = self.germline_passed_vcf_path();
|
|
|
|
|
|
- // Somatic concat + PASS (per-part or full)
|
|
|
- let passed_vcf = self.somatic_passed_vcf_path();
|
|
|
- if !Path::new(&passed_vcf).exists() {
|
|
|
- let (output_vcf, output_indels_vcf) = self.config.clairs_output_vcfs(&self.id);
|
|
|
- let output_dir = self.part_output_dir();
|
|
|
- let output_vcf = format!(
|
|
|
- "{output_dir}/{}",
|
|
|
- Path::new(&output_vcf)
|
|
|
- .file_name()
|
|
|
- .unwrap()
|
|
|
- .to_string_lossy()
|
|
|
+ if Path::new(&germline_passed).exists() {
|
|
|
+ debug!(
|
|
|
+ "ClairS germline PASS VCF already exists for {} part {:?}",
|
|
|
+ self.id, self.part_index
|
|
|
);
|
|
|
- let output_indels_vcf = format!(
|
|
|
- "{output_dir}/{}",
|
|
|
- Path::new(&output_indels_vcf)
|
|
|
+ return Ok(());
|
|
|
+ }
|
|
|
+
|
|
|
+ let germline_input = match self.part_index {
|
|
|
+ Some(_) => {
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ let germline_file = self.config.clairs_germline_normal_vcf(&self.id);
|
|
|
+ let base_name = Path::new(&germline_file)
|
|
|
.file_name()
|
|
|
- .unwrap()
|
|
|
- .to_string_lossy()
|
|
|
- );
|
|
|
+ .expect("Germline VCF should have filename")
|
|
|
+ .to_string_lossy();
|
|
|
+ format!("{output_dir}/{base_name}")
|
|
|
+ }
|
|
|
+ None => self.config.clairs_germline_normal_vcf(&self.id),
|
|
|
+ };
|
|
|
|
|
|
- let tmp_file = temp_file_path(".vcf.gz")?.to_str().unwrap().to_string();
|
|
|
+ info!(
|
|
|
+ "Filtering germline PASS variants via Slurm for {} part {:?}",
|
|
|
+ self.id, self.part_index
|
|
|
+ );
|
|
|
|
|
|
- let mut concat = BcftoolsConcat::from_config(
|
|
|
- &self.config,
|
|
|
- vec![output_vcf.clone(), output_indels_vcf.clone()],
|
|
|
- &tmp_file,
|
|
|
- );
|
|
|
- let report = SlurmRunner::run(&mut concat)
|
|
|
- .context("Failed to run bcftools concat for ClairS somatic on Slurm")?;
|
|
|
-
|
|
|
- let log_file = format!("{}/bcftools_concat_", self.log_dir);
|
|
|
- report
|
|
|
- .save_to_file(&log_file)
|
|
|
- .context("Error while writing concat logs")?;
|
|
|
-
|
|
|
- let mut keep_pass =
|
|
|
- BcftoolsKeepPass::from_config(&self.config, tmp_file.clone(), passed_vcf.clone());
|
|
|
- let report = SlurmRunner::run(&mut keep_pass)
|
|
|
- .context("Failed to run bcftools keep PASS for ClairS somatic on Slurm")?;
|
|
|
-
|
|
|
- let log_file = format!("{}/bcftools_pass_", self.log_dir);
|
|
|
- report
|
|
|
- .save_to_file(&log_file)
|
|
|
- .context("Error while writing PASS logs")?;
|
|
|
-
|
|
|
- fs::remove_file(&tmp_file).context("Failed to remove temporary merged VCF")?;
|
|
|
- } else {
|
|
|
+ let mut cmd =
|
|
|
+ BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
|
|
|
+
|
|
|
+ let report = SlurmRunner::run(&mut cmd)
|
|
|
+ .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir))
|
|
|
+ .context("Failed to save germline PASS logs")?;
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Processes somatic VCFs via Slurm.
|
|
|
+ fn process_somatic_sbatch(&self) -> anyhow::Result<()> {
|
|
|
+ let passed_vcf = self.somatic_passed_vcf_path();
|
|
|
+
|
|
|
+ if Path::new(&passed_vcf).exists() {
|
|
|
debug!(
|
|
|
- "ClairS PASSED VCF already exists for {}, part {:?}, skipping.",
|
|
|
+ "ClairS somatic PASS VCF already exists for {} part {:?}",
|
|
|
self.id, self.part_index
|
|
|
);
|
|
|
+ return Ok(());
|
|
|
}
|
|
|
|
|
|
+ let output_dir = self.part_output_dir();
|
|
|
+ let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id);
|
|
|
+
|
|
|
+ let snv_vcf = format!(
|
|
|
+ "{}/{}",
|
|
|
+ output_dir,
|
|
|
+ Path::new(&snv_vcf_base)
|
|
|
+ .file_name()
|
|
|
+ .expect("SNV VCF should have filename")
|
|
|
+ .to_string_lossy()
|
|
|
+ );
|
|
|
+ let indel_vcf = format!(
|
|
|
+ "{}/{}",
|
|
|
+ output_dir,
|
|
|
+ Path::new(&indel_vcf_base)
|
|
|
+ .file_name()
|
|
|
+ .expect("Indel VCF should have filename")
|
|
|
+ .to_string_lossy()
|
|
|
+ );
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Concatenating and filtering somatic variants via Slurm for {} part {:?}",
|
|
|
+ self.id, self.part_index
|
|
|
+ );
|
|
|
+
|
|
|
+ let tmp_file = temp_file_path(".vcf.gz")?;
|
|
|
+ let tmp_path = tmp_file
|
|
|
+ .to_str()
|
|
|
+ .context("Temp path not valid UTF-8")?
|
|
|
+ .to_string();
|
|
|
+
|
|
|
+ // Concat SNV + indel
|
|
|
+ let mut concat = BcftoolsConcat::from_config(
|
|
|
+ &self.config,
|
|
|
+ vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
|
|
|
+ &tmp_path,
|
|
|
+ );
|
|
|
+
|
|
|
+ let report = SlurmRunner::run(&mut concat)
|
|
|
+ .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_concat_", self.log_dir))
|
|
|
+ .context("Failed to save concat logs")?;
|
|
|
+
|
|
|
+ // Filter PASS
|
|
|
+ let mut keep_pass =
|
|
|
+ BcftoolsKeepPass::from_config(&self.config, tmp_path.clone(), passed_vcf.clone());
|
|
|
+
|
|
|
+ let report = SlurmRunner::run(&mut keep_pass)
|
|
|
+ .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
|
|
|
+
|
|
|
+ report
|
|
|
+ .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
|
|
|
+ .context("Failed to save PASS filter logs")?;
|
|
|
+
|
|
|
+ fs::remove_file(&tmp_path).context("Failed to remove temporary concat VCF")?;
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Post-processes ClairS output via Slurm.
|
|
|
+ fn postprocess_sbatch(&self) -> anyhow::Result<()> {
|
|
|
+ self.process_germline_sbatch()?;
|
|
|
+ self.process_somatic_sbatch()?;
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- /// Local execution: runs ClairS via `LocalRunner` then post-processes VCFs locally.
|
|
|
+ /// Runs ClairS locally with post-processing.
|
|
|
pub fn run_local(&mut self) -> anyhow::Result<CapturedOutput> {
|
|
|
if !self.should_run() {
|
|
|
debug!(
|
|
|
@@ -395,7 +486,7 @@ impl ClairS {
|
|
|
Ok(out)
|
|
|
}
|
|
|
|
|
|
- /// Slurm execution: submits ClairS via sbatch (Docker inside job) then bcftools via Slurm.
|
|
|
+ /// Runs ClairS via Slurm with post-processing.
|
|
|
pub fn run_sbatch(&mut self) -> anyhow::Result<CapturedOutput> {
|
|
|
if !self.should_run() {
|
|
|
debug!(
|
|
|
@@ -412,10 +503,7 @@ impl ClairS {
|
|
|
Ok(out)
|
|
|
}
|
|
|
|
|
|
- /// Per-part output directory.
|
|
|
- ///
|
|
|
- /// For chunked runs, this is `{clairs_output_dir(id)}/part{idx}`.
|
|
|
- /// For full-genome runs, just `clairs_output_dir(id)`.
|
|
|
+ /// Returns the per-part output directory.
|
|
|
fn part_output_dir(&self) -> String {
|
|
|
let base_dir = self.config.clairs_output_dir(&self.id);
|
|
|
match self.part_index {
|
|
|
@@ -424,15 +512,13 @@ impl ClairS {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// Somatic PASS VCF path for this run.
|
|
|
+ /// Returns the somatic PASS VCF path.
|
|
|
///
|
|
|
- /// - When `part_index.is_some()`: per-part intermediate PASS VCF
|
|
|
- /// (inside the part dir), later merged.
|
|
|
- /// - When `None`: canonical final path from `Config::clairs_passed_vcf`.
|
|
|
+ /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz`
|
|
|
+ /// - Full runs: canonical path from config
|
|
|
fn somatic_passed_vcf_path(&self) -> String {
|
|
|
match self.part_index {
|
|
|
Some(idx) => {
|
|
|
- // Example: {clairs_output_dir(id)}/part{idx}/clairs.part{idx}.pass.vcf.gz
|
|
|
let outdir = self.part_output_dir();
|
|
|
format!("{outdir}/clairs.part{idx}.pass.vcf.gz")
|
|
|
}
|
|
|
@@ -441,8 +527,6 @@ impl ClairS {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/* ---------------- Variant / Label / Version impls ------------------------ */
|
|
|
-
|
|
|
impl CallerCat for ClairS {
|
|
|
fn caller_cat(&self) -> Annotation {
|
|
|
Annotation::Callers(Caller::ClairS, Sample::Somatic)
|
|
|
@@ -452,17 +536,18 @@ impl CallerCat for ClairS {
|
|
|
impl Variants for ClairS {
|
|
|
fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
|
|
|
let caller = self.caller_cat();
|
|
|
- let add = vec![caller.clone()];
|
|
|
let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
|
|
|
|
|
|
info!("Loading variants from {caller}: {passed_vcf}");
|
|
|
+
|
|
|
let variants = read_vcf(passed_vcf)
|
|
|
.with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?;
|
|
|
|
|
|
variants.par_iter().for_each(|v| {
|
|
|
- annotations.insert_update(v.hash(), &add);
|
|
|
+ annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
|
|
|
});
|
|
|
- info!("{}, {} variants loaded.", caller, variants.len());
|
|
|
+
|
|
|
+ info!("{caller}: {} variants loaded", variants.len());
|
|
|
|
|
|
Ok(VariantCollection {
|
|
|
variants,
|
|
|
@@ -473,21 +558,24 @@ impl Variants for ClairS {
|
|
|
}
|
|
|
|
|
|
impl ClairS {
|
|
|
+ /// Loads germline variants from the Clair3 germline output.
|
|
|
pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
|
|
|
let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
|
|
|
- let add = vec![caller.clone()];
|
|
|
- let clair3_germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
|
|
|
+ let germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
|
|
|
+
|
|
|
+ info!("Loading germline variants from {caller}: {germline_passed}");
|
|
|
+
|
|
|
+ let variants = read_vcf(germline_passed)?;
|
|
|
|
|
|
- info!("Loading variants from {caller}: {clair3_germline_passed}");
|
|
|
- let variants = read_vcf(clair3_germline_passed)?;
|
|
|
variants.par_iter().for_each(|v| {
|
|
|
- annotations.insert_update(v.hash(), &add);
|
|
|
+ annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
|
|
|
});
|
|
|
- info!("{}, {} variants loaded.", caller, variants.len());
|
|
|
+
|
|
|
+ info!("{caller}: {} variants loaded", variants.len());
|
|
|
|
|
|
Ok(VariantCollection {
|
|
|
variants,
|
|
|
- vcf: Vcf::new(clair3_germline_passed.into())?,
|
|
|
+ vcf: Vcf::new(germline_passed.into())?,
|
|
|
caller,
|
|
|
})
|
|
|
}
|
|
|
@@ -510,74 +598,86 @@ impl Version for ClairS {
|
|
|
.stdout(Stdio::piped())
|
|
|
.stderr(Stdio::piped())
|
|
|
.output()
|
|
|
- .context("failed to spawn singularity")?;
|
|
|
+ .context("Failed to spawn Singularity process")?;
|
|
|
|
|
|
if !out.status.success() {
|
|
|
- let mut log = String::from_utf8_lossy(&out.stdout).to_string();
|
|
|
- log.push_str(&String::from_utf8_lossy(&out.stderr));
|
|
|
- anyhow::bail!("singularity run failed: {}\n{}", out.status, log);
|
|
|
+ let combined = format!(
|
|
|
+ "{}{}",
|
|
|
+ String::from_utf8_lossy(&out.stdout),
|
|
|
+ String::from_utf8_lossy(&out.stderr)
|
|
|
+ );
|
|
|
+ anyhow::bail!(
|
|
|
+ "Singularity exec failed with status {}: {}",
|
|
|
+ out.status,
|
|
|
+ combined
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- let mut log = String::from_utf8_lossy(&out.stdout).to_string();
|
|
|
- log.push_str(&String::from_utf8_lossy(&out.stderr));
|
|
|
+ let combined = format!(
|
|
|
+ "{}{}",
|
|
|
+ String::from_utf8_lossy(&out.stdout),
|
|
|
+ String::from_utf8_lossy(&out.stderr)
|
|
|
+ );
|
|
|
|
|
|
- let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)")?;
|
|
|
- let caps = re
|
|
|
- .captures(&log)
|
|
|
- .context("could not parse ClairS version from output")?;
|
|
|
- Ok(caps.get(1).unwrap().as_str().to_string())
|
|
|
+ parse_clairs_version(&combined)
|
|
|
}
|
|
|
|
|
|
- /// Slurm: run `/opt/bin/run_clairs --version` inside a small sbatch job.
|
|
|
fn version_slurm(config: &Config) -> anyhow::Result<String> {
|
|
|
- // Minimal Slurm job just to run the version command
|
|
|
struct ClairSVersionJob<'a> {
|
|
|
config: &'a Config,
|
|
|
}
|
|
|
|
|
|
- impl<'a> JobCommand for ClairSVersionJob<'a> {
|
|
|
+ impl JobCommand for ClairSVersionJob<'_> {
|
|
|
fn cmd(&self) -> String {
|
|
|
format!(
|
|
|
- "{} exec {} /opt/bin/run_clairs \
|
|
|
- --version",
|
|
|
+ "{} exec {} /opt/bin/run_clairs --version",
|
|
|
self.config.singularity_bin, self.config.clairs_image
|
|
|
)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- impl<'a> SlurmRunner for ClairSVersionJob<'a> {
|
|
|
+ impl SlurmRunner for ClairSVersionJob<'_> {
|
|
|
fn slurm_args(&self) -> Vec<String> {
|
|
|
SlurmParams {
|
|
|
job_name: Some("clairs_version".into()),
|
|
|
- partition: Some("shortq".into()), // adjust to your CPU partition
|
|
|
+ partition: Some("shortq".into()),
|
|
|
cpus_per_task: Some(1),
|
|
|
mem: Some("10G".into()),
|
|
|
- gres: None, // no GPU
|
|
|
+ gres: None,
|
|
|
}
|
|
|
.to_args()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
let mut job = ClairSVersionJob { config };
|
|
|
- let out = SlurmRunner::run(&mut job).context("failed to run ClairS --version via Slurm")?;
|
|
|
+ let out = SlurmRunner::run(&mut job).context("Failed to run ClairS --version via Slurm")?;
|
|
|
|
|
|
- // Combine stdout, Slurm epilog (if any), and stderr for parsing
|
|
|
- let mut log = out.stdout.clone();
|
|
|
+ let mut combined = out.stdout.clone();
|
|
|
if let Some(epilog) = &out.slurm_epilog {
|
|
|
- log.push_str(&epilog.to_string());
|
|
|
+ combined.push_str(&epilog.to_string());
|
|
|
}
|
|
|
- log.push_str(&out.stderr);
|
|
|
+ combined.push_str(&out.stderr);
|
|
|
|
|
|
- let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)")?;
|
|
|
- let caps = re
|
|
|
- .captures(&log)
|
|
|
- .context("could not parse ClairS version from Slurm output")?;
|
|
|
-
|
|
|
- Ok(caps.get(1).unwrap().as_str().to_string())
|
|
|
+ parse_clairs_version(&combined)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/// Merge N chunked ClairS PASS VCFs into the final clairs_passed_vcf().
|
|
|
+/// Parses ClairS version from command output.
|
|
|
+fn parse_clairs_version(output: &str) -> anyhow::Result<String> {
|
|
|
+ let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid");
|
|
|
+
|
|
|
+ let caps = re
|
|
|
+ .captures(output)
|
|
|
+ .context("Could not parse ClairS version from output")?;
|
|
|
+
|
|
|
+ Ok(caps
|
|
|
+ .get(1)
|
|
|
+ .expect("Regex has capture group 1")
|
|
|
+ .as_str()
|
|
|
+ .to_string())
|
|
|
+}
|
|
|
+
|
|
|
+/// Merges N chunked ClairS PASS VCFs into the final output.
|
|
|
fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
|
|
|
let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
|
|
|
|
|
|
@@ -588,7 +688,9 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
|
|
|
|
|
|
anyhow::ensure!(
|
|
|
Path::new(&part_pass).exists(),
|
|
|
- "Missing ClairS part {i} PASS VCF: {part_pass}"
|
|
|
+ "Missing ClairS part {} PASS VCF: {}",
|
|
|
+ i,
|
|
|
+ part_pass
|
|
|
);
|
|
|
|
|
|
part_pass_paths.push(PathBuf::from(part_pass));
|
|
|
@@ -619,30 +721,103 @@ fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
+/// Merges N chunked ClairS germline PASS VCFs into the final output.
|
|
|
+fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
|
|
|
+ let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
|
|
|
+
|
|
|
+ for i in 1..=n_parts {
|
|
|
+ let mut part = base.clone();
|
|
|
+ part.part_index = Some(i);
|
|
|
+ let part_pass = part.germline_passed_vcf_path();
|
|
|
+
|
|
|
+ anyhow::ensure!(
|
|
|
+ Path::new(&part_pass).exists(),
|
|
|
+ "Missing ClairS germline part {} PASS VCF: {}",
|
|
|
+ i,
|
|
|
+ part_pass
|
|
|
+ );
|
|
|
+
|
|
|
+ part_pass_paths.push(PathBuf::from(part_pass));
|
|
|
+ }
|
|
|
+
|
|
|
+ let final_passed_vcf = base.config.clairs_germline_passed_vcf(&base.id);
|
|
|
+ let final_tmp = format!("{final_passed_vcf}.tmp");
|
|
|
+
|
|
|
+ if let Some(parent) = Path::new(&final_passed_vcf).parent() {
|
|
|
+ fs::create_dir_all(parent)?;
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Concatenating {} ClairS germline part VCFs into {}",
|
|
|
+ n_parts, final_passed_vcf
|
|
|
+ );
|
|
|
+
|
|
|
+ let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
|
|
|
+ SlurmRunner::run(&mut concat)
|
|
|
+ .context("Failed to run bcftools concat for ClairS germline parts")?;
|
|
|
+
|
|
|
+ fs::rename(&final_tmp, &final_passed_vcf)
|
|
|
+ .context("Failed to rename merged ClairS germline PASS VCF")?;
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Successfully merged {} ClairS germline parts into {}",
|
|
|
+ n_parts, final_passed_vcf
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+/// Runs ClairS in parallel chunks via Slurm, then merges results.
|
|
|
+///
|
|
|
+/// # Steps
|
|
|
+///
|
|
|
+/// 1. Split genome into N regions
|
|
|
+/// 2. Submit parallel Slurm jobs (one per region)
|
|
|
+/// 3. Post-process each part (concat SNV+indel, filter PASS)
|
|
|
+/// 4. Merge all part PASS VCFs into final output
|
|
|
+/// 5. Process germline VCF (from first part or full run)
|
|
|
+///
|
|
|
+/// # Arguments
|
|
|
+///
|
|
|
+/// * `id` - Sample identifier
|
|
|
+/// * `config` - Pipeline configuration
|
|
|
+/// * `n_parts` - Target number of parallel jobs
|
|
|
+///
|
|
|
+/// # Returns
|
|
|
+///
|
|
|
+/// Vector of captured outputs from each Slurm job.
|
|
|
pub fn run_clairs_chunked_sbatch_with_merge(
|
|
|
id: &str,
|
|
|
config: &Config,
|
|
|
n_parts: usize,
|
|
|
) -> anyhow::Result<Vec<CapturedOutput>> {
|
|
|
+ anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
|
|
|
+
|
|
|
let base = ClairS::initialize(id, config)?;
|
|
|
|
|
|
- // If final VCF already up-to-date, skip (uses full run ShouldRun logic)
|
|
|
if !base.should_run() {
|
|
|
debug!("ClairS PASS VCF already up-to-date for {id}, skipping.");
|
|
|
return Ok(Vec::new());
|
|
|
}
|
|
|
|
|
|
- // Genome sizes from normal BAM header
|
|
|
+ // Get genome sizes from normal BAM header
|
|
|
let normal_bam = config.normal_bam(id);
|
|
|
- let reader =
|
|
|
- bam::Reader::from_path(&normal_bam).with_context(|| format!("Opening BAM {normal_bam}"))?;
|
|
|
+ let reader = bam::Reader::from_path(&normal_bam)
|
|
|
+ .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
|
|
|
let header = bam::Header::from_template(reader.header());
|
|
|
let genome_sizes = get_genome_sizes(&header)?;
|
|
|
|
|
|
+ // Split genome into regions
|
|
|
let regions = split_genome_into_n_regions(&genome_sizes, n_parts);
|
|
|
- let n_jobs = regions.len();
|
|
|
+ let actual_n_parts = regions.len();
|
|
|
|
|
|
- let mut jobs = Vec::with_capacity(n_jobs);
|
|
|
+ info!(
|
|
|
+ "Running ClairS in {} parallel parts for {}",
|
|
|
+ actual_n_parts, id
|
|
|
+ );
|
|
|
+
|
|
|
+ // Build jobs
|
|
|
+ let mut jobs = Vec::with_capacity(actual_n_parts);
|
|
|
for (i, region) in regions.into_iter().enumerate() {
|
|
|
let mut job = base.clone();
|
|
|
job.part_index = Some(i + 1);
|
|
|
@@ -653,10 +828,23 @@ pub fn run_clairs_chunked_sbatch_with_merge(
|
|
|
}
|
|
|
|
|
|
// Run all parts via Slurm
|
|
|
- let outputs = run_many_sbatch(jobs)?;
|
|
|
-
|
|
|
- // Merge somatic PASS VCFs into final clairs_passed_vcf()
|
|
|
- merge_clairs_parts(&base, n_parts)?;
|
|
|
+ let outputs = run_many_sbatch(jobs.clone())?;
|
|
|
+
|
|
|
+ // Post-process each part (creates .pass.vcf.gz files for both somatic and germline)
|
|
|
+ for job in &jobs {
|
|
|
+ job.postprocess_sbatch()?;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Merge somatic PASS VCFs
|
|
|
+ merge_clairs_parts(&base, actual_n_parts)?;
|
|
|
+
|
|
|
+ // Merge germline PASS VCFs
|
|
|
+ merge_clairs_germline_parts(&base, actual_n_parts)?;
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "ClairS completed for {}: {} parts merged (somatic + germline)",
|
|
|
+ id, actual_n_parts
|
|
|
+ );
|
|
|
|
|
|
Ok(outputs)
|
|
|
}
|
|
|
@@ -681,10 +869,11 @@ mod tests {
|
|
|
fn clairs_run() -> anyhow::Result<()> {
|
|
|
test_init();
|
|
|
let config = Config::default();
|
|
|
- let u = ClairS::initialize("34528", &config)?;
|
|
|
- info!("{u}");
|
|
|
+ let clairs = ClairS::initialize("34528", &config)?;
|
|
|
+ info!("{clairs}");
|
|
|
|
|
|
- let _ = run_clairs_chunked_sbatch_with_merge("34528", &config, 5)?;
|
|
|
+ let outputs = run_clairs_chunked_sbatch_with_merge("34528", &config, 5)?;
|
|
|
+ info!("Completed with {} job outputs", outputs.len());
|
|
|
|
|
|
Ok(())
|
|
|
}
|