| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202 |
- //! # ClairS Somatic Variant Calling Pipeline
- //!
- //! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS),
- //! a deep learning-based somatic variant caller designed for long-read sequencing data
- //! (ONT and PacBio HiFi).
- //!
- //! ## Overview
- //!
- //! ClairS performs somatic variant calling on paired tumor-normal samples using:
- //!
- //! - Haplotype-aware variant calling with LongPhase integration
- //! - Separate SNV and indel calling pipelines
- //! - Clair3-based germline variant detection on the normal sample
- //!
- //! ## Key Features
- //!
- //! - **Deep learning-based** - CNN models trained on long-read data
- //! - **Haplotype-aware** - Uses phased germline variants for improved accuracy
- //! - **Dual output** - Somatic and germline variants in a single run
- //! - **Platform flexibility** - Supports ONT (R9/R10) and PacBio HiFi
- //! - **Parallel execution** - Genome chunking for HPC scalability
- //!
- //! ## Requirements
- //!
- //! Before running ClairS, ensure:
- //! - Tumor and normal BAMs are indexed (`.bai` files present)
- //! - Reference genome is accessible
- //! - Singularity/Docker image is available
- //! - Platform is correctly specified (`config.clairs_platform`)
- //!
- //! ## Execution Modes
- //!
- //! The module supports three execution strategies, all dispatched through [`Run::run`]:
- //!
- //! - **Local** — Single-node execution via [`LocalRunner`](crate::commands::LocalRunner)
- //! - **Slurm** — Single HPC job submission via [`SbatchRunner`](crate::commands::SbatchRunner)
- //! - **Chunked** — Parallel execution across genome regions (enabled automatically when
- //! `config.slurm_runner = true`, calls the internal `run_chunked` function)
- //!
- //! ### Chunked Parallel Execution
- //!
- //! For large genomes, the chunked mode provides significant speedup:
- //!
- //! ```text
- //! ┌─────────────────────────────────────────────────────────────────┐
- //! │ Genome Splitting │
- //! │ chr1:1-50M │ chr1:50M-100M │ chr2:1-50M │ ... │ chrN │
- //! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘
- //! │ │ │ │
- //! ▼ ▼ ▼ ▼
- //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
- //! │ ClairS │ │ ClairS │ │ ClairS │ ... │ ClairS │
- //! │ Part 1 │ │ Part 2 │ │ Part 3 │ │ Part N │
- //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
- //! │ │ │ │
- //! ▼ ▼ ▼ ▼
- //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
- //! │ Postprocess│ │ Postprocess│ │ Postprocess│ ... │ Postprocess│
- //! │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │
- //! │ → PASS │ │ → PASS │ │ → PASS │ │ → PASS │
- //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
- //! │ │ │ │
- //! └───────────────┴───────┬───────┴───────────────────┘
- //! ▼
- //! ┌─────────────────────┐
- //! │ bcftools concat │
- //! │ (somatic PASS) │
- //! └──────────┬──────────┘
- //! ▼
- //! ┌─────────────────────┐
- //! │ Final VCF Output │
- //! └─────────────────────┘
- //! ```
- //!
- //! ## Output Files
- //!
- //! Somatic variants (PASS only):
- //! ```text
- //! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz
- //! ```
- //!
- //! Germline variants (PASS only):
- //! ```text
- //! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz
- //! ```
- //!
- //! ## Post-processing Pipeline
- //!
- //! Each ClairS run (or part) undergoes automatic post-processing:
- //!
- //! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants
- //! 2. **Germline**: Filter PASS variants from Clair3 germline output
- //!
- //! ## Usage
- //!
- //! ### Basic Execution
- //!
- //! ```ignore
- //! use pandora_lib_promethion::callers::clairs::ClairS;
- //! use pandora_lib_promethion::config::Config;
- //! use pandora_lib_promethion::pipes::Initialize;
- //! use pandora_lib_promethion::runners::Run;
- //!
- //! let config = Config::default();
- //! // config.slurm_runner = true → chunked Slurm execution (60 parts)
- //! // config.slurm_runner = false → local single-node execution
- //! let mut clairs = ClairS::initialize("sample_001", &config)?;
- //! clairs.run()?;
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ### Loading Variants for Downstream Analysis
- //!
- //! ```ignore
- //! use pandora_lib_promethion::annotation::Annotations;
- //! use pandora_lib_promethion::variant::vcf_variant::Variants;
- //!
- //! let annotations = Annotations::new();
- //! let somatic = clairs.variants(&annotations)?;
- //! let germline = clairs.germline(&annotations)?;
- //!
- //! println!("Somatic: {} variants", somatic.variants.len());
- //! println!("Germline: {} variants", germline.variants.len());
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ## Configuration Requirements
- //!
- //! The following [`Config`](crate::config::Config) fields must be set:
- //!
- //! - `singularity_bin` — Path to Singularity executable
- //! - `clairs_image` — ClairS container image path
- //! - `reference` — Reference genome FASTA
- //! - `clairs_threads` — CPU threads per job
- //! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.)
- //! - `clairs_force` — Force re-run even if outputs exist
- //!
- //! ## Implemented Traits
- //!
- //! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation
- //! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating
- //! - [`JobCommand`](crate::commands::Command) — Command string generation
- //! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support
- //! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission
- //! - [`Run`](crate::runners::Run) — Unified execution interface
- //! - [`Variants`](crate::variant::vcf_variant::Variants) — Somatic variant loading
- //! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category
- //! - [`Label`](crate::variant::vcf_variant::Label) — Human-readable identifier
- //! - [`Version`](crate::pipes::Version) — Tool version extraction
- //!
- //! ## Dependencies
- //!
- //! External tools (containerized or system):
- //!
- //! - **ClairS** — Somatic variant calling
- //! - **bcftools** — VCF concatenation and filtering
- //!
- //! ## References
- //!
- //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
- //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
- collection::vcf::Vcf,
- commands::{
- bcftools::{BcftoolsConcat, BcftoolsKeepPass},
- exec_jobs,
- samtools::SamtoolsMergeMany,
- AnyJob, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
- SlurmRunner,
- },
- config::Config,
- helpers::{
- get_genome_sizes_in_header_order, is_file_older, list_files_recursive,
- remove_dir_if_exists, singularity_bind_flags, split_ordered_genome_into_n_regions_exact,
- },
- io::vcf::read_vcf,
- jobs_seq,
- locker::SampleLock,
- pipes::{Initialize, ShouldRun, Version},
- runners::Run,
- variant::{
- variant_collection::VariantCollection,
- vcf_variant::{Label, Variants},
- },
- };
- use anyhow::Context;
- use log::{debug, info, warn};
- use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
- use regex::Regex;
- use rust_htslib::bam::{self, Read};
- use std::{
- fmt, fs,
- path::{Path, PathBuf},
- process::{Command as ProcessCommand, Stdio},
- };
- use uuid::Uuid;
- /// ClairS haplotype-aware somatic variant caller runner.
- ///
- /// Executes ClairS for paired tumor-normal variant calling with automatic
- /// post-processing (SNV+indel concatenation, PASS filtering, germline extraction).
- ///
- /// # Fields
- ///
- /// - `id` - Sample identifier (e.g., "34528")
- /// - `config` - All resolved paths, tool settings, and Slurm parameters (see [`ClairsConfig`])
- /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`)
- /// - `part_index` - Chunk index when running in parallel (e.g., `Some(3)` for part 3 of N);
- /// `None` for full-genome runs
- ///
- /// # Execution
- ///
- /// Call [`Run::run`] — it dispatches automatically:
- /// - `config.slurm_runner = false` → local single-node execution
- /// - `config.slurm_runner = true` → chunked parallel Slurm execution (60 genome parts)
- ///
- /// # Output Files
- ///
- /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz`
- /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz`
- #[derive(Debug, Clone)]
- pub struct ClairS {
- pub id: String,
- pub config: ClairsConfig,
- pub region: Option<String>,
- pub part_index: Option<usize>,
- }
- /// Resolved configuration for a single ClairS run.
- ///
- /// Built once at initialisation via [`ClairsConfig::from_config`] and carried
- /// inside [`ClairS`]. All path templates from [`Config`] are expanded here so
- /// that `ClairS` methods never need to re-expand them.
- #[derive(Debug, Clone)]
- pub struct ClairsConfig {
- // Execution
- pub singularity_bin: String,
- pub clairs_image: String,
- pub clairs_threads: u8,
- pub clairs_platform: String,
- pub clairs_force: bool,
- pub clairs_keep_parts: bool,
- // Paths
- pub reference: String,
- pub tmp_dir: String,
- pub result_dir: String,
- // Slurm
- pub slurm_runner: bool,
- pub slurm_max_par: u8,
- // Sample-scoped paths, computed once at init
- pub normal_bam: String,
- pub tumoral_bam: String,
- pub passed_vcf: String,
- pub germline_passed_vcf: String,
- pub germline_normal_vcf: String,
- pub output_dir: String,
- pub snv_vcf: String,
- pub indel_vcf: String,
- pub log_dir: String,
- pub lock_dir: String,
- // BcftoolsConcat
- pub bcftools_bin: String,
- pub bcftools_threads: u8,
- }
- impl ClairsConfig {
- pub fn from_config(config: &Config, id: &str) -> Self {
- let output_dir = config.clairs_output_dir(id);
- let (snv_vcf, indel_vcf) = config.clairs_output_vcfs(id);
- Self {
- singularity_bin: config.singularity_bin.clone(),
- clairs_image: config.clairs_image.clone(),
- clairs_threads: config.clairs_threads,
- clairs_platform: config.clairs_platform.clone(),
- clairs_force: config.clairs_force,
- clairs_keep_parts: config.clairs_keep_parts,
- reference: config.reference.clone(),
- tmp_dir: config.tmp_dir.clone(),
- result_dir: config.result_dir.clone(),
- slurm_runner: config.slurm_runner,
- slurm_max_par: config.slurm_max_par,
- normal_bam: config.normal_bam(id),
- tumoral_bam: config.tumoral_bam(id),
- passed_vcf: config.clairs_passed_vcf(id),
- germline_passed_vcf: config.clairs_germline_passed_vcf(id),
- germline_normal_vcf: config.clairs_germline_normal_vcf(id),
- output_dir,
- snv_vcf,
- indel_vcf,
- log_dir: format!("{}/{}/log/clairs", config.result_dir, id),
- lock_dir: format!("{}/locks", config.result_dir),
- bcftools_bin: config.bcftools_bin.clone(),
- bcftools_threads: config.bcftools_threads,
- }
- }
- }
- impl fmt::Display for ClairS {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "🧬 ClairS {}", self.id)?;
- writeln!(
- f,
- " Region : {}",
- self.region.as_deref().unwrap_or("genome-wide")
- )?;
- writeln!(
- f,
- " Part : {}",
- self.part_index
- .map_or("full".into(), |n| format!("part{n}"))
- )?;
- writeln!(f, " Log dir : {}", self.config.log_dir)
- }
- }
- impl Initialize for ClairS {
- fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- info!("Initializing ClairS for {id}");
- let clairs = Self {
- config: ClairsConfig::from_config(config, &id),
- id,
- region: None,
- part_index: None,
- };
- if config.clairs_force {
- remove_dir_if_exists(&config.clairs_output_dir(&clairs.id))?;
- }
- Ok(clairs)
- }
- }
- impl ShouldRun for ClairS {
- /// Determines whether ClairS should be re-run based on BAM modification timestamps.
- fn should_run(&self) -> bool {
- let passed_vcf = &self.config.passed_vcf;
- let normal_older = is_file_older(passed_vcf, &self.config.normal_bam, true).unwrap_or(true);
- let tumor_older = is_file_older(passed_vcf, &self.config.tumoral_bam, true).unwrap_or(true);
- let result = normal_older || tumor_older;
- if result {
- info!("ClairS should run for id: {}.", self.id);
- }
- result
- }
- }
- impl JobCommand for ClairS {
- fn init(&mut self) -> anyhow::Result<()> {
- let output_dir = &self.config.output_dir;
- fs::create_dir_all(output_dir)
- .with_context(|| format!("Failed to create dir: {output_dir}"))?;
- fs::create_dir_all(&self.config.log_dir)
- .with_context(|| format!("Failed to create dir: {}", self.config.log_dir))?;
- Ok(())
- }
- fn cmd(&self) -> String {
- let output_dir = &self.config.output_dir;
- let region_arg = self
- .region
- .as_ref()
- .map(|r| format!("-r '{r}'"))
- .unwrap_or_default();
- let sample_name = format!("{}_diag", self.id);
- let bind_flags = singularity_bind_flags([
- &self.config.tumoral_bam,
- &self.config.normal_bam,
- &self.config.reference,
- output_dir,
- &self.config.log_dir,
- &self.config.tmp_dir,
- ]);
- format!(
- "{singularity_bin} exec --cleanenv \
- {binds} \
- --bind {output_dir}:{output_dir} \
- {image} \
- /opt/bin/run_clairs \
- -T {tumor_bam} \
- -N {normal_bam} \
- -R {reference} \
- -t {threads} \
- -p {platform} \
- --enable_indel_calling \
- --include_all_ctgs \
- --print_germline_calls \
- --enable_clair3_germline_output \
- --use_longphase_for_intermediate_haplotagging true \
- --output_dir {output_dir} \
- -s {sample_name} \
- {region_arg}",
- singularity_bin = self.config.singularity_bin,
- binds = bind_flags,
- image = self.config.clairs_image,
- tumor_bam = self.config.tumoral_bam,
- normal_bam = self.config.normal_bam,
- reference = self.config.reference,
- threads = self.config.clairs_threads,
- platform = self.config.clairs_platform,
- output_dir = output_dir,
- sample_name = sample_name,
- region_arg = region_arg,
- )
- }
- }
- impl LocalRunner for ClairS {}
- // impl SlurmRunner for ClairS {
- // fn slurm_args(&self) -> Vec<String> {
- // self.slurm_params().to_args()
- // }
- // }
- impl SbatchRunner for ClairS {
- fn slurm_params(&self) -> SlurmParams {
- let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
- SlurmParams {
- job_name: Some(format!("clairs_{}{}", self.id, batch_id)),
- cpus_per_task: Some(self.config.clairs_threads as u32),
- mem: Some("50G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- }
- }
- impl Run for ClairS {
- fn run(&mut self) -> anyhow::Result<()> {
- if !self.should_run() {
- info!("ClairS is up-to-date.");
- return Ok(());
- }
- // Acquire lock before any work
- let lock_dir = format!("{}/locks", self.config.result_dir);
- let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
- .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
- if self.config.slurm_runner {
- run_chunked(&self.id, &self.config, 60)?;
- } else {
- let (somatic_concat, somatic_tmp_path) = self.process_somatic_concat()?;
- let somatic_pass = self.process_somatic_pass(&somatic_tmp_path)?;
- let germline = self.process_germline()?;
- let outputs = exec_jobs(
- vec![jobs_seq![
- self.clone(),
- germline,
- somatic_concat,
- somatic_pass,
- ]],
- false,
- 1,
- )
- .context("Failed to run ClairS local pipeline")?;
- for output in &outputs {
- output.save_to_file(format!("{}/clairs_", self.config.log_dir))?;
- }
- // Clean up temporary concatenated VCF
- debug!("Removing temporary file: {}", somatic_tmp_path);
- if let Err(e) = fs::remove_file(&somatic_tmp_path) {
- warn!(
- "Failed to remove temporary file {}: {}",
- somatic_tmp_path, e
- );
- }
- }
- Ok(())
- }
- }
- impl ClairS {
- /// Returns the germline PASS VCF path.
- ///
- /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz`
- /// - Full runs: canonical path from config
- fn germline_passed_vcf_path(&self) -> String {
- match self.part_index {
- Some(idx) => {
- format!(
- "{}/clair3_germline.part{idx}.pass.vcf.gz",
- self.config.output_dir
- )
- }
- None => self.config.germline_passed_vcf.clone(),
- }
- }
- fn process_germline(&self) -> anyhow::Result<BcftoolsKeepPass> {
- let germline_passed = self.germline_passed_vcf_path();
- let germline_input = match self.part_index {
- Some(_) => {
- let output_dir = self.config.output_dir.clone();
- let base = self.config.germline_normal_vcf.clone();
- let base_name = Path::new(&base)
- .file_name()
- .with_context(|| {
- format!("germline VCF path has no filename component: {base}")
- })?
- .to_string_lossy();
- format!("{output_dir}/{base_name}")
- }
- None => self.config.germline_normal_vcf.clone(),
- };
- info!(
- "Filtering germline PASS variants for {} part {:?}",
- self.id, self.part_index
- );
- Ok(BcftoolsKeepPass {
- bin: self.config.bcftools_bin.clone(),
- threads: self.config.bcftools_threads,
- input: germline_input.into(),
- output: germline_passed.clone().into(),
- tmp_dir: self.config.tmp_dir.clone().into(),
- tmp_sort: PathBuf::new(),
- tmp_norm: PathBuf::new(),
- })
- }
- fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> {
- let output_dir = self.config.output_dir.clone();
- let snv_vcf_base = self.config.snv_vcf.clone();
- let indel_vcf_base = self.config.indel_vcf.clone();
- let snv_vcf = format!(
- "{}/{}",
- output_dir,
- Path::new(&snv_vcf_base)
- .file_name()
- .with_context(|| format!("SNV VCF path has no filename component: {snv_vcf_base}"))?
- .to_string_lossy()
- );
- let indel_vcf = format!(
- "{}/{}",
- output_dir,
- Path::new(&indel_vcf_base)
- .file_name()
- .with_context(|| {
- format!("indel VCF path has no filename component: {indel_vcf_base}")
- })?
- .to_string_lossy()
- );
- info!(
- "Concatenating and filtering somatic variants for {} part {:?}",
- self.id, self.part_index
- );
- let tmp_file =
- Path::new(&self.config.tmp_dir).join(format!("pandora-temp-{}.vcf.gz", Uuid::new_v4()));
- let tmp_path = tmp_file
- .to_str()
- .context("Temp path not valid UTF-8")?
- .to_string();
- // Concat SNV + indel
- let concat = BcftoolsConcat {
- bin: self.config.bcftools_bin.clone(),
- threads: self.config.bcftools_threads,
- inputs: vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
- output: tmp_path.clone().into(),
- tmp_dir: self.config.tmp_dir.clone().into(),
- tmp_list: PathBuf::new(),
- };
- Ok((concat, tmp_path))
- }
- pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result<BcftoolsKeepPass> {
- let passed_vcf = self.somatic_passed_vcf_path();
- // Filter PASS
- let keep_pass = BcftoolsKeepPass {
- bin: self.config.bcftools_bin.clone(),
- threads: self.config.bcftools_threads,
- input: tmp_path.into(),
- output: passed_vcf.clone().into(),
- tmp_dir: self.config.tmp_dir.clone().into(),
- tmp_sort: PathBuf::new(),
- tmp_norm: PathBuf::new(),
- };
- Ok(keep_pass)
- }
- fn for_part(&self, idx: usize) -> Self {
- let mut part = self.clone();
- part.part_index = Some(idx);
- part.config.output_dir = format!("{}/part{idx}", self.config.output_dir);
- part.config.log_dir = format!("{}/part{idx}", self.config.log_dir);
- Self { ..part }
- }
- /// Returns the somatic PASS VCF path.
- ///
- /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz`
- /// - Full runs: canonical path from config
- fn somatic_passed_vcf_path(&self) -> String {
- match self.part_index {
- Some(idx) => {
- format!("{}/clairs.part{idx}.pass.vcf.gz", self.config.output_dir)
- }
- None => self.config.passed_vcf.clone(),
- }
- }
- pub fn run_chunked_resume(&self, n_parts: usize) -> anyhow::Result<()> {
- run_chunked_resume(&self.id, &self.config, n_parts)
- }
- }
- impl CallerCat for ClairS {
- fn caller_cat(&self) -> Annotation {
- Annotation::Callers(Caller::ClairS, Sample::Somatic)
- }
- }
- impl Variants for ClairS {
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = self.caller_cat();
- let passed_vcf = &self.config.passed_vcf.clone();
- info!("Loading variants from {caller}: {passed_vcf}");
- let variants = read_vcf(passed_vcf)
- .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
- });
- info!("{caller}: {} variants loaded", variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(passed_vcf.into())?,
- caller,
- })
- }
- }
- impl ClairS {
- /// Loads germline variants from the Clair3 germline output.
- pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
- let germline_passed = &self.config.germline_passed_vcf;
- info!("Loading germline variants from {caller}: {germline_passed}");
- let variants = read_vcf(germline_passed)?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
- });
- info!("{caller}: {} variants loaded", variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(germline_passed.into())?,
- caller,
- })
- }
- }
- impl Label for ClairS {
- fn label(&self) -> String {
- self.caller_cat().to_string()
- }
- }
- impl Version for ClairS {
- fn version(config: &Config) -> anyhow::Result<String> {
- let out = ProcessCommand::new("bash")
- .arg("-c")
- .arg(format!(
- "{} exec {} /opt/bin/run_clairs --version",
- config.singularity_bin, config.clairs_image
- ))
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .output()
- .context("Failed to spawn Singularity process")?;
- if !out.status.success() {
- let combined = format!(
- "{}{}",
- String::from_utf8_lossy(&out.stdout),
- String::from_utf8_lossy(&out.stderr)
- );
- anyhow::bail!(
- "Singularity exec failed with status {}: {}",
- out.status,
- combined
- );
- }
- let combined = format!(
- "{}{}",
- String::from_utf8_lossy(&out.stdout),
- String::from_utf8_lossy(&out.stderr)
- );
- parse_clairs_version(&combined)
- }
- fn version_slurm(config: &Config) -> anyhow::Result<String> {
- struct ClairSVersionJob<'a> {
- config: &'a Config,
- }
- impl JobCommand for ClairSVersionJob<'_> {
- fn cmd(&self) -> String {
- format!(
- "{} exec {} /opt/bin/run_clairs --version",
- self.config.singularity_bin, self.config.clairs_image
- )
- }
- }
- impl SlurmRunner for ClairSVersionJob<'_> {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some("clairs_version".into()),
- partition: Some("shortq".into()),
- cpus_per_task: Some(1),
- mem: Some("10G".into()),
- gres: None,
- }
- .to_args()
- }
- }
- let mut job = ClairSVersionJob { config };
- let out =
- SlurmRunner::exec(&mut job).context("Failed to run ClairS --version via Slurm")?;
- let mut combined = out.stdout.clone();
- if let Some(epilog) = &out.slurm_epilog {
- combined.push_str(&epilog.to_string());
- }
- combined.push_str(&out.stderr);
- parse_clairs_version(&combined)
- }
- }
- /// Parses ClairS version from command output.
- fn parse_clairs_version(output: &str) -> anyhow::Result<String> {
- let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid");
- let caps = re
- .captures(output)
- .context("Could not parse ClairS version from output")?;
- Ok(caps
- .get(1)
- .expect("Regex has capture group 1")
- .as_str()
- .to_string())
- }
- /// Merges N chunked ClairS PASS VCFs into the final output.
- fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
- let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
- for i in 1..=n_parts {
- let part = base.for_part(i);
- let part_pass = part.somatic_passed_vcf_path();
- anyhow::ensure!(
- Path::new(&part_pass).exists(),
- "Missing ClairS part {} PASS VCF: {}",
- i,
- part_pass
- );
- part_pass_paths.push(PathBuf::from(part_pass));
- }
- let final_passed_vcf = base.config.passed_vcf.clone();
- let rand = Uuid::new_v4();
- let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
- let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
- if let Some(parent) = Path::new(&final_passed_vcf).parent() {
- fs::create_dir_all(parent)?;
- }
- info!(
- "Concatenating {} ClairS part VCFs into {}",
- n_parts, final_passed_vcf
- );
- let concat = BcftoolsConcat {
- bin: base.config.bcftools_bin.clone(),
- threads: base.config.bcftools_threads,
- inputs: part_pass_paths,
- output: final_tmp.clone().into(),
- tmp_dir: base.config.tmp_dir.clone().into(),
- tmp_list: PathBuf::new(),
- };
- let outputs = exec_jobs(
- vec![jobs_seq![concat]],
- base.config.slurm_runner,
- base.config.slurm_max_par.into(),
- )
- .context("Failed to run bcftools concat for ClairS germline parts")?;
- for output in &outputs {
- output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?;
- }
- anyhow::ensure!(
- Path::new(&final_tmp_csi).exists(),
- "CSI index not found after concat: {}",
- final_tmp_csi
- );
- fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?;
- fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
- .context("Failed to rename merged ClairS PASS VCF CSI")?;
- info!(
- "Successfully merged {} ClairS parts into {}",
- n_parts, final_passed_vcf
- );
- Ok(())
- }
- fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
- let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
- for i in 1..=n_parts {
- let part = base.for_part(i);
- let part_pass = part.germline_passed_vcf_path();
- anyhow::ensure!(
- Path::new(&part_pass).exists(),
- "Missing ClairS germline part {} PASS VCF: {}",
- i,
- part_pass
- );
- part_pass_paths.push(PathBuf::from(part_pass));
- }
- let final_passed_vcf = base.config.germline_passed_vcf.clone();
- let rand = Uuid::new_v4();
- let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
- let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
- if let Some(parent) = Path::new(&final_passed_vcf).parent() {
- fs::create_dir_all(parent)?;
- }
- info!(
- "Concatenating {} ClairS germline part VCFs into {}",
- n_parts, final_passed_vcf
- );
- let concat = BcftoolsConcat {
- bin: base.config.bcftools_bin.clone(),
- threads: base.config.bcftools_threads,
- inputs: part_pass_paths,
- output: final_tmp.clone().into(),
- tmp_dir: base.config.tmp_dir.clone().into(),
- tmp_list: PathBuf::new(),
- };
- let outputs = exec_jobs(
- vec![jobs_seq![concat]],
- base.config.slurm_runner,
- base.config.slurm_max_par.into(),
- )
- .context("Failed to run bcftools concat for ClairS germline parts")?;
- for output in &outputs {
- output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?;
- }
- anyhow::ensure!(
- Path::new(&final_tmp_csi).exists(),
- "CSI index not found after concat: {}",
- final_tmp_csi
- );
- fs::rename(&final_tmp, &final_passed_vcf)
- .context("Failed to rename merged ClairS germline PASS VCF")?;
- fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
- .context("Failed to rename merged ClairS germline PASS VCF CSI")?;
- info!(
- "Successfully merged {} ClairS germline parts into {}",
- n_parts, final_passed_vcf
- );
- Ok(())
- }
- pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()> {
- let dir = config.clairs_output_dir(id);
- let bams: Vec<PathBuf> = list_files_recursive(dir)
- .into_iter()
- .filter(|f| f.extension().and_then(|e| e.to_str()) == Some("bam"))
- .collect();
- let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf();
- info!("Merging {} into: {}", bams.len(), into.display());
- let merge = SamtoolsMergeMany::from_config(into, bams, config);
- exec_jobs(
- vec![jobs_seq![merge]],
- config.slurm_runner,
- config.slurm_max_par.into(),
- )?;
- Ok(())
- }
- /// Runs ClairS in parallel chunks with sequential per-part post-processing, then merges results.
- ///
- /// # Pipeline per part (sequential)
- /// `ClairS main → germline PASS filter → somatic SNV+indel concat → somatic PASS filter`
- ///
- /// # Parallelism
- /// Parts run in parallel (bounded by `config.slurm_max_par`), dispatched locally or via
- /// SLURM depending on `config.slurm_runner`.
- ///
- /// # Errors
- /// Fails fast within a part on any step error. Already-running jobs are not cancelled.
- fn run_chunked(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> {
- anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0");
- let base = ClairS {
- id: id.to_string(),
- config: config.clone(),
- region: None,
- part_index: None,
- };
- let normal_bam = base.config.normal_bam.clone();
- let reader = bam::Reader::from_path(&normal_bam)
- .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
- let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
- let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
- .into_iter()
- .flatten()
- .collect::<Vec<String>>();
- let actual_n_parts = regions.len();
- info!(
- "Running ClairS in {} parallel parts for {}",
- actual_n_parts, id
- );
- let groups: anyhow::Result<Vec<Vec<Box<dyn AnyJob>>>> = regions
- .into_iter()
- .enumerate()
- .map(|(i, region)| {
- let mut job = base.for_part(i + 1);
- job.region = Some(region);
- info!("Planned ClairS job:\n{job}");
- let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?;
- let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?;
- let germline = job.process_germline()?;
- Ok(jobs_seq![job, germline, somatic_concat, somatic_pass])
- })
- .collect();
- let outputs = exec_jobs(
- groups?,
- base.config.slurm_runner,
- base.config.slurm_max_par.into(),
- )?;
- for output in &outputs {
- output.save_to_file(format!("{}/clairs_", base.config.log_dir))?;
- }
- merge_clairs_parts(&base, actual_n_parts)?;
- merge_clairs_germline_parts(&base, actual_n_parts)?;
- if !config.clairs_keep_parts {
- let somatic_final = config.passed_vcf.clone();
- let germline_final = config.germline_passed_vcf.clone();
- anyhow::ensure!(
- Path::new(&somatic_final).exists(),
- "Refusing to clean up parts: merged somatic VCF not found at {}",
- somatic_final
- );
- anyhow::ensure!(
- Path::new(&germline_final).exists(),
- "Refusing to clean up parts: merged germline VCF not found at {}",
- germline_final
- );
- let base_dir = config.output_dir.clone();
- for i in 1..=actual_n_parts {
- let part_dir = format!("{base_dir}/part{i}");
- if Path::new(&part_dir).exists() {
- fs::remove_dir_all(&part_dir)
- .with_context(|| format!("Failed to remove part directory: {part_dir}"))?;
- debug!("Removed part directory: {part_dir}");
- }
- }
- info!("Cleaned up {} part directories for {}", actual_n_parts, id);
- }
- info!(
- "ClairS completed for {}: {} parts merged (somatic + germline)",
- id, actual_n_parts
- );
- Ok(())
- }
- fn part_is_complete(part: &ClairS) -> bool {
- Path::new(&part.somatic_passed_vcf_path()).exists()
- && Path::new(&format!("{}.csi", part.somatic_passed_vcf_path())).exists()
- && Path::new(&part.germline_passed_vcf_path()).exists()
- && Path::new(&format!("{}.csi", part.germline_passed_vcf_path())).exists()
- }
- fn run_chunked_resume(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> {
- anyhow::ensure!(n_parts > 0, "run_chunked_resume: n_parts must be > 0");
- let base = ClairS {
- id: id.to_string(),
- config: config.clone(),
- region: None,
- part_index: None,
- };
- let normal_bam = base.config.normal_bam.clone();
- let reader = bam::Reader::from_path(&normal_bam)
- .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
- let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
- let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
- .into_iter()
- .flatten()
- .collect::<Vec<String>>();
- let actual_n_parts = regions.len();
- info!(
- "Resuming ClairS chunked run for {} with {} parts",
- id, actual_n_parts
- );
- let groups: anyhow::Result<Vec<Vec<Box<dyn AnyJob>>>> = regions
- .into_iter()
- .enumerate()
- .filter_map(|(i, region)| {
- let mut job = base.for_part(i + 1);
- job.region = Some(region);
- if part_is_complete(&job) {
- info!("Skipping completed ClairS part {} for {}", i + 1, id);
- return None;
- }
- info!(
- "Scheduling missing/incomplete ClairS part {}:\n{job}",
- i + 1
- );
- Some((|| {
- let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?;
- let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?;
- let germline = job.process_germline()?;
- Ok(jobs_seq![job, germline, somatic_concat, somatic_pass])
- })())
- })
- .collect();
- let groups = groups?;
- if groups.is_empty() {
- info!("All ClairS parts already complete for {}", id);
- } else {
- let outputs = exec_jobs(
- groups,
- base.config.slurm_runner,
- base.config.slurm_max_par.into(),
- )?;
- for output in &outputs {
- output.save_to_file(format!("{}/clairs_", base.config.log_dir))?;
- }
- }
- merge_clairs_parts(&base, actual_n_parts)?;
- merge_clairs_germline_parts(&base, actual_n_parts)?;
- if !config.clairs_keep_parts {
- let somatic_final = config.passed_vcf.clone();
- let germline_final = config.germline_passed_vcf.clone();
- anyhow::ensure!(
- Path::new(&somatic_final).exists(),
- "Refusing to clean up parts: merged somatic VCF not found at {}",
- somatic_final
- );
- anyhow::ensure!(
- Path::new(&germline_final).exists(),
- "Refusing to clean up parts: merged germline VCF not found at {}",
- germline_final
- );
- let base_dir = config.output_dir.clone();
- for i in 1..=actual_n_parts {
- let part_dir = format!("{base_dir}/part{i}");
- if Path::new(&part_dir).exists() {
- fs::remove_dir_all(&part_dir)
- .with_context(|| format!("Failed to remove part directory: {part_dir}"))?;
- }
- }
- info!("Cleaned up {} part directories for {}", actual_n_parts, id);
- }
- info!(
- "ClairS resumed/completed for {}: {} parts merged",
- id, actual_n_parts
- );
- Ok(())
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use crate::helpers::test_init;
- #[test]
- fn clairs_version() -> anyhow::Result<()> {
- test_init();
- let vl = ClairS::version(&Config::default())?;
- info!("ClairS local version: {vl}");
- let vs = ClairS::version_slurm(&Config::default())?;
- info!("ClairS slurm version: {vs}");
- assert_eq!(vl, vs);
- Ok(())
- }
- #[test]
- fn clairs_run() -> anyhow::Result<()> {
- test_init();
- let id = "CHAHA";
- let config = Config::default();
- let mut clairs = ClairS::initialize(id, &config)?;
- crate::runners::Run::run(&mut clairs)?;
- Ok(())
- }
- #[test]
- fn clairs_resume() -> anyhow::Result<()> {
- test_init();
- let id = "CHAHA";
- let config = Config::default();
- let mut clairs = ClairS::initialize(id, &config)?;
- clairs.run_chunked_resume(60);
- Ok(())
- }
- }
|