| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987 |
- //! # ClairS Somatic Variant Calling Pipeline
- //!
- //! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS),
- //! a deep learning-based somatic variant caller designed for long-read sequencing data
- //! (ONT and PacBio HiFi).
- //!
- //! ## Overview
- //!
- //! ClairS performs somatic variant calling on paired tumor-normal samples using:
- //!
- //! - Haplotype-aware variant calling with LongPhase integration
- //! - Separate SNV and indel calling pipelines
- //! - Clair3-based germline variant detection on the normal sample
- //!
- //! ## Key Features
- //!
- //! - **Deep learning-based** - CNN models trained on long-read data
- //! - **Haplotype-aware** - Uses phased germline variants for improved accuracy
- //! - **Dual output** - Somatic and germline variants in a single run
- //! - **Platform flexibility** - Supports ONT (R9/R10) and PacBio HiFi
- //! - **Parallel execution** - Genome chunking for HPC scalability
- //!
- //! ## Requirements
- //!
- //! Before running ClairS, ensure:
- //! - Tumor and normal BAMs are indexed (`.bai` files present)
- //! - Reference genome is accessible
- //! - Singularity/Docker image is available
- //! - Platform is correctly specified (`config.clairs_platform`)
- //!
- //! ## Execution Modes
- //!
- //! The module supports three execution strategies:
- //!
- //! - **Local** ([`ClairS::run_local`]) — Single-node execution, useful for debugging
- //! - **Slurm** ([`ClairS::run_sbatch`]) — Single HPC job submission
- //! - **Chunked** ([`run_clairs_chunked_sbatch_with_merge`]) — Parallel execution across genome regions
- //!
- //! ### Chunked Parallel Execution
- //!
- //! For large genomes, the chunked mode provides significant speedup:
- //!
- //! ```text
- //! ┌─────────────────────────────────────────────────────────────────┐
- //! │ Genome Splitting │
- //! │ chr1:1-50M │ chr1:50M-100M │ chr2:1-50M │ ... │ chrN │
- //! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘
- //! │ │ │ │
- //! ▼ ▼ ▼ ▼
- //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
- //! │ ClairS │ │ ClairS │ │ ClairS │ ... │ ClairS │
- //! │ Part 1 │ │ Part 2 │ │ Part 3 │ │ Part N │
- //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
- //! │ │ │ │
- //! ▼ ▼ ▼ ▼
- //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
- //! │ Postprocess│ │ Postprocess│ │ Postprocess│ ... │ Postprocess│
- //! │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │
- //! │ → PASS │ │ → PASS │ │ → PASS │ │ → PASS │
- //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
- //! │ │ │ │
- //! └───────────────┴───────┬───────┴───────────────────┘
- //! ▼
- //! ┌─────────────────────┐
- //! │ bcftools concat │
- //! │ (somatic PASS) │
- //! └──────────┬──────────┘
- //! ▼
- //! ┌─────────────────────┐
- //! │ Final VCF Output │
- //! └─────────────────────┘
- //! ```
- //!
- //! ## Output Files
- //!
- //! Somatic variants (PASS only):
- //! ```text
- //! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz
- //! ```
- //!
- //! Germline variants (PASS only):
- //! ```text
- //! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz
- //! ```
- //!
- //! ## Post-processing Pipeline
- //!
- //! Each ClairS run (or part) undergoes automatic post-processing:
- //!
- //! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants
- //! 2. **Germline**: Filter PASS variants from Clair3 germline output
- //!
- //! ## Usage
- //!
- //! ### Basic Slurm Execution
- //!
- //! ```ignore
- //! use crate::config::Config;
- //! use crate::pipes::clairs::ClairS;
- //! use crate::pipes::Initialize;
- //!
- //! let config = Config::default();
- //! let mut clairs = ClairS::initialize("sample_001", &config)?;
- //! let output = clairs.run_sbatch()?;
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ### Chunked Parallel Execution (Recommended for WGS)
- //!
- //! ```ignore
- //! use crate::config::Config;
- //! use crate::pipes::clairs::run_clairs_chunked_sbatch_with_merge;
- //!
- //! let config = Config::default();
- //! let outputs = run_clairs_chunked_sbatch_with_merge("sample_001", &config, 20)?;
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ### Loading Variants for Downstream Analysis
- //!
- //! ```ignore
- //! use crate::annotation::Annotations;
- //! use crate::variant::vcf_variant::Variants;
- //!
- //! let annotations = Annotations::new();
- //! let somatic = clairs.variants(&annotations)?;
- //! let germline = clairs.germline(&annotations)?;
- //!
- //! println!("Somatic: {} variants", somatic.variants.len());
- //! println!("Germline: {} variants", germline.variants.len());
- //! # Ok::<(), anyhow::Error>(())
- //! ```
- //!
- //! ## Configuration Requirements
- //!
- //! The following [`Config`](crate::config::Config) fields must be set:
- //!
- //! - `singularity_bin` — Path to Singularity executable
- //! - `clairs_image` — ClairS container image path
- //! - `reference` — Reference genome FASTA
- //! - `clairs_threads` — CPU threads per job
- //! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.)
- //! - `clairs_force` — Force re-run even if outputs exist
- //!
- //! ## Implemented Traits
- //!
- //! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation
- //! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating
- //! - [`JobCommand`](crate::commands::Command) — Command string generation
- //! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support
- //! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission
- //! - [`Run`](crate::runners::Run) — Unified execution interface
- //! - [`Variants`](crate::variant::vcf_variant::Variants) — Somatic variant loading
- //! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category
- //! - [`Label`](crate::variant::vcf_variant::Label) — Human-readable identifier
- //! - [`Version`](crate::pipes::Version) — Tool version extraction
- //!
- //! ## Dependencies
- //!
- //! External tools (containerized or system):
- //!
- //! - **ClairS** — Somatic variant calling
- //! - **bcftools** — VCF concatenation and filtering
- //!
- //! ## References
- //!
- //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
- //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
- Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, samtools::SamtoolsMergeMany
- }, config::Config, helpers::{
- get_genome_sizes, is_file_older, list_files_recursive, remove_dir_if_exists,
- singularity_bind_flags, split_genome_into_n_regions_exact,
- }, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, run, run_many, runners::Run, variant::{
- variant_collection::VariantCollection,
- vcf_variant::{Label, Variants},
- }
- };
- use anyhow::Context;
- use log::{debug, info, warn};
- use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
- use regex::Regex;
- use rust_htslib::bam::{self, Read};
- use std::{
- fmt, fs,
- path::{Path, PathBuf},
- process::{Command as ProcessCommand, Stdio},
- };
- use uuid::Uuid;
- /// ClairS haplotype-aware somatic variant caller runner.
- ///
- /// Executes ClairS for paired tumor-normal variant calling with automatic
- /// post-processing (SNV+indel concatenation, PASS filtering, germline extraction).
- ///
- /// # Fields
- ///
- /// - `id` - Sample identifier (e.g., "34528")
- /// - `config` - Global pipeline configuration
- /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/clairs")
- /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`)
- /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
- ///
- /// # Execution Modes
- ///
- /// - **Local** - Direct execution via `run_local()`
- /// - **Slurm** - Single job submission via `run_sbatch()`
- /// - **Chunked** - Parallel genome-wide execution via [`run_clairs_chunked_sbatch_with_merge`]
- ///
- /// # Output Files
- ///
- /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz`
- /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz`
- ///
- /// # Chunked Execution
- ///
- /// For whole-genome sequencing, use [`run_clairs_chunked_sbatch_with_merge`] which:
- /// 1. Splits genome into N equal-sized regions
- /// 2. Runs ClairS in parallel Slurm jobs (one per region)
- /// 3. Post-processes each part (concat SNV+indel, filter PASS)
- /// 4. Merges all part PASS VCFs into final output
- #[derive(Debug, Clone)]
- pub struct ClairS {
- /// Sample identifier
- pub id: String,
- /// Global pipeline configuration
- pub config: Config,
- /// Directory for log file storage
- pub log_dir: String,
- /// Optional genomic region restriction (format: "chr:start-end")
- pub region: Option<String>,
- /// Optional part index for chunked parallel runs (1-indexed)
- pub part_index: Option<usize>,
- }
- impl fmt::Display for ClairS {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "🧬 ClairS {}", self.id)?;
- writeln!(
- f,
- " Region : {}",
- self.region.as_deref().unwrap_or("genome-wide")
- )?;
- writeln!(
- f,
- " Part : {}",
- self.part_index
- .map_or("full".into(), |n| format!("part{n}"))
- )?;
- writeln!(f, " Log dir : {}", self.log_dir)
- }
- }
- impl Initialize for ClairS {
- fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- info!("Initializing ClairS for {id}");
- let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
- let clairs = Self {
- id,
- log_dir,
- config: config.clone(),
- region: None,
- part_index: None,
- };
- if clairs.config.clairs_force {
- remove_dir_if_exists(&clairs.config.clairs_output_dir(&clairs.id))?;
- }
- Ok(clairs)
- }
- }
- impl ShouldRun for ClairS {
- /// Determines whether ClairS should be re-run based on BAM modification timestamps.
- fn should_run(&self) -> bool {
- let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
- let normal_older =
- is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
- let tumor_older =
- is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
- let result = normal_older || tumor_older;
- if result {
- info!("ClairS should run for id: {}.", self.id);
- }
- result
- }
- }
- impl JobCommand for ClairS {
- fn init(&mut self) -> anyhow::Result<()> {
- let output_dir = self.part_output_dir();
- fs::create_dir_all(&output_dir)
- .with_context(|| format!("Failed to create dir: {output_dir}"))?;
- fs::create_dir_all(&self.log_dir)
- .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
- Ok(())
- }
- fn cmd(&self) -> String {
- let output_dir = self.part_output_dir();
- let region_arg = self
- .region
- .as_ref()
- .map(|r| format!("-r '{r}'"))
- .unwrap_or_default();
- let sample_name = format!("{}_diag", self.id);
- let bind_flags = singularity_bind_flags([
- &self.config.tumoral_bam(&self.id),
- &self.config.normal_bam(&self.id),
- &self.config.reference,
- &output_dir,
- &self.log_dir,
- &self.config.tmp_dir,
- ]);
- format!(
- "{singularity_bin} exec \
- {binds} \
- --bind {output_dir}:{output_dir} \
- {image} \
- /opt/bin/run_clairs \
- -T {tumor_bam} \
- -N {normal_bam} \
- -R {reference} \
- -t {threads} \
- -p {platform} \
- --enable_indel_calling \
- --include_all_ctgs \
- --print_germline_calls \
- --enable_clair3_germline_output \
- --use_longphase_for_intermediate_haplotagging true \
- --output_dir {output_dir} \
- -s {sample_name} \
- {region_arg}",
- singularity_bin = self.config.singularity_bin,
- binds = bind_flags,
- image = self.config.clairs_image,
- tumor_bam = self.config.tumoral_bam(&self.id),
- normal_bam = self.config.normal_bam(&self.id),
- reference = self.config.reference,
- threads = self.config.clairs_threads,
- platform = self.config.clairs_platform,
- output_dir = output_dir,
- sample_name = sample_name,
- region_arg = region_arg,
- )
- }
- }
- impl LocalRunner for ClairS {}
- impl LocalBatchRunner for ClairS {}
- impl SlurmRunner for ClairS {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some(format!("clairs_{}", self.id)),
- cpus_per_task: Some(self.config.clairs_threads as u32),
- mem: Some("80G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- .to_args()
- }
- }
- impl SbatchRunner for ClairS {
- fn slurm_params(&self) -> SlurmParams {
- SlurmParams {
- job_name: Some(format!("clairs_{}", self.id)),
- cpus_per_task: Some(self.config.clairs_threads as u32),
- mem: Some("70G".into()),
- partition: Some("shortq".into()),
- gres: None,
- }
- }
- }
- impl Run for ClairS {
- fn run(&mut self) -> anyhow::Result<()> {
- if !self.should_run() {
- info!("ClairS is up-to-date.");
- return Ok(());
- }
- // Acquire lock before any work
- let lock_dir = format!("{}/locks", self.config.result_dir);
- let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
- .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
- if self.config.slurm_runner {
- run_clairs_chunked(&self.id, &self.config, 50)?;
- // merge_haplotaged_tmp_bam(&self.config, &self.id)?;
- } else {
- run!(&self.config, self)?;
- let mut germline = self.process_germline()?;
- let report = run!(&self.config, &mut germline)
- .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
- report
- .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir))
- .context("Failed to save germline PASS logs")?;
- let (mut concat, tmp_path) = self.process_somatic_concat()?;
- let (snv_vcf, indel_vcf) = self.config.clairs_output_vcfs(&self.id);
- let report = run!(&self.config, &mut concat)
- .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
- report
- .save_to_file(format!("{}/bcftools_concat_", self.log_dir))
- .context("Failed to save concat logs")?;
- let mut keep_pass = self.process_somatic_pass(&tmp_path)?;
- let report = run!(&self.config, &mut keep_pass)
- .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
- report
- .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
- .context("Failed to save PASS filter logs")?;
- // Clean up temporary concatenated VCF
- debug!("Removing temporary file: {}", tmp_path);
- if let Err(e) = fs::remove_file(&tmp_path) {
- warn!("Failed to remove temporary file {}: {}", tmp_path, e);
- }
- }
- Ok(())
- }
- }
- impl ClairS {
- /// Returns the germline PASS VCF path.
- ///
- /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz`
- /// - Full runs: canonical path from config
- fn germline_passed_vcf_path(&self) -> String {
- match self.part_index {
- Some(idx) => {
- let outdir = self.part_output_dir();
- format!("{outdir}/clair3_germline.part{idx}.pass.vcf.gz")
- }
- None => self.config.clairs_germline_passed_vcf(&self.id),
- }
- }
- fn process_germline(&self) -> anyhow::Result<BcftoolsKeepPass> {
- let germline_passed = self.germline_passed_vcf_path();
- let germline_input = match self.part_index {
- Some(_) => {
- let output_dir = self.part_output_dir();
- let base = self.config.clairs_germline_normal_vcf(&self.id);
- let base_name = Path::new(&base)
- .file_name()
- .expect("Germline VCF should have filename")
- .to_string_lossy();
- format!("{output_dir}/{base_name}")
- }
- None => self.config.clairs_germline_normal_vcf(&self.id),
- };
- info!(
- "Filtering germline PASS variants for {} part {:?}",
- self.id, self.part_index
- );
- let cmd =
- BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
- Ok(cmd)
- }
- fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> {
- let output_dir = self.part_output_dir();
- let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id);
- let snv_vcf = format!(
- "{}/{}",
- output_dir,
- Path::new(&snv_vcf_base)
- .file_name()
- .expect("SNV VCF should have filename")
- .to_string_lossy()
- );
- let indel_vcf = format!(
- "{}/{}",
- output_dir,
- Path::new(&indel_vcf_base)
- .file_name()
- .expect("Indel VCF should have filename")
- .to_string_lossy()
- );
- info!(
- "Concatenating and filtering somatic variants for {} part {:?}",
- self.id, self.part_index
- );
- let tmp_file =
- Path::new(&self.config.tmp_dir).join(format!("pandora-temp-{}.vcf.gz", Uuid::new_v4()));
- let tmp_path = tmp_file
- .to_str()
- .context("Temp path not valid UTF-8")?
- .to_string();
- // Concat SNV + indel
- Ok((
- BcftoolsConcat::from_config(
- &self.config,
- vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
- &tmp_path,
- ),
- tmp_path,
- ))
- }
- pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result<BcftoolsKeepPass> {
- let passed_vcf = self.somatic_passed_vcf_path();
- // Filter PASS
- let keep_pass = BcftoolsKeepPass::from_config(&self.config, tmp_path, passed_vcf.clone());
- Ok(keep_pass)
- }
- /// Returns the per-part output directory.
- fn part_output_dir(&self) -> String {
- let base_dir = self.config.clairs_output_dir(&self.id);
- match self.part_index {
- Some(idx) => format!("{base_dir}/part{idx}"),
- None => base_dir,
- }
- }
- /// Returns the somatic PASS VCF path.
- ///
- /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz`
- /// - Full runs: canonical path from config
- fn somatic_passed_vcf_path(&self) -> String {
- match self.part_index {
- Some(idx) => {
- let outdir = self.part_output_dir();
- format!("{outdir}/clairs.part{idx}.pass.vcf.gz")
- }
- None => self.config.clairs_passed_vcf(&self.id),
- }
- }
- }
- impl CallerCat for ClairS {
- fn caller_cat(&self) -> Annotation {
- Annotation::Callers(Caller::ClairS, Sample::Somatic)
- }
- }
- impl Variants for ClairS {
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = self.caller_cat();
- let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
- info!("Loading variants from {caller}: {passed_vcf}");
- let variants = read_vcf(passed_vcf)
- .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
- });
- info!("{caller}: {} variants loaded", variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(passed_vcf.into())?,
- caller,
- })
- }
- }
- impl ClairS {
- /// Loads germline variants from the Clair3 germline output.
- pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
- let germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
- info!("Loading germline variants from {caller}: {germline_passed}");
- let variants = read_vcf(germline_passed)?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
- });
- info!("{caller}: {} variants loaded", variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(germline_passed.into())?,
- caller,
- })
- }
- }
- impl Label for ClairS {
- fn label(&self) -> String {
- self.caller_cat().to_string()
- }
- }
- impl Version for ClairS {
- fn version(config: &Config) -> anyhow::Result<String> {
- let out = ProcessCommand::new("bash")
- .arg("-c")
- .arg(format!(
- "{} exec {} /opt/bin/run_clairs --version",
- config.singularity_bin, config.clairs_image
- ))
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .output()
- .context("Failed to spawn Singularity process")?;
- if !out.status.success() {
- let combined = format!(
- "{}{}",
- String::from_utf8_lossy(&out.stdout),
- String::from_utf8_lossy(&out.stderr)
- );
- anyhow::bail!(
- "Singularity exec failed with status {}: {}",
- out.status,
- combined
- );
- }
- let combined = format!(
- "{}{}",
- String::from_utf8_lossy(&out.stdout),
- String::from_utf8_lossy(&out.stderr)
- );
- parse_clairs_version(&combined)
- }
- fn version_slurm(config: &Config) -> anyhow::Result<String> {
- struct ClairSVersionJob<'a> {
- config: &'a Config,
- }
- impl JobCommand for ClairSVersionJob<'_> {
- fn cmd(&self) -> String {
- format!(
- "{} exec {} /opt/bin/run_clairs --version",
- self.config.singularity_bin, self.config.clairs_image
- )
- }
- }
- impl SlurmRunner for ClairSVersionJob<'_> {
- fn slurm_args(&self) -> Vec<String> {
- SlurmParams {
- job_name: Some("clairs_version".into()),
- partition: Some("shortq".into()),
- cpus_per_task: Some(1),
- mem: Some("10G".into()),
- gres: None,
- }
- .to_args()
- }
- }
- let mut job = ClairSVersionJob { config };
- let out =
- SlurmRunner::exec(&mut job).context("Failed to run ClairS --version via Slurm")?;
- let mut combined = out.stdout.clone();
- if let Some(epilog) = &out.slurm_epilog {
- combined.push_str(&epilog.to_string());
- }
- combined.push_str(&out.stderr);
- parse_clairs_version(&combined)
- }
- }
- /// Parses ClairS version from command output.
- fn parse_clairs_version(output: &str) -> anyhow::Result<String> {
- let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid");
- let caps = re
- .captures(output)
- .context("Could not parse ClairS version from output")?;
- Ok(caps
- .get(1)
- .expect("Regex has capture group 1")
- .as_str()
- .to_string())
- }
- /// Merges N chunked ClairS PASS VCFs into the final output.
- fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
- let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
- for i in 1..=n_parts {
- let mut part = base.clone();
- part.part_index = Some(i);
- let part_pass = part.somatic_passed_vcf_path();
- anyhow::ensure!(
- Path::new(&part_pass).exists(),
- "Missing ClairS part {} PASS VCF: {}",
- i,
- part_pass
- );
- part_pass_paths.push(PathBuf::from(part_pass));
- }
- let final_passed_vcf = base.config.clairs_passed_vcf(&base.id);
- let rand = Uuid::new_v4();
- let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
- let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
- if let Some(parent) = Path::new(&final_passed_vcf).parent() {
- fs::create_dir_all(parent)?;
- }
- info!(
- "Concatenating {} ClairS part VCFs into {}",
- n_parts, final_passed_vcf
- );
- let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
- run!(&base.config, &mut concat).context("Failed to run bcftools concat for ClairS parts")?;
- fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?;
- fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
- .context("Failed to rename merged ClairS PASS VCF CSI")?;
- info!(
- "Successfully merged {} ClairS parts into {}",
- n_parts, final_passed_vcf
- );
- Ok(())
- }
- fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
- let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
- for i in 1..=n_parts {
- let mut part = base.clone();
- part.part_index = Some(i);
- let part_pass = part.germline_passed_vcf_path();
- anyhow::ensure!(
- Path::new(&part_pass).exists(),
- "Missing ClairS germline part {} PASS VCF: {}",
- i,
- part_pass
- );
- part_pass_paths.push(PathBuf::from(part_pass));
- }
- let final_passed_vcf = base.config.clairs_germline_passed_vcf(&base.id);
- let rand = Uuid::new_v4();
- let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
- let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
- if let Some(parent) = Path::new(&final_passed_vcf).parent() {
- fs::create_dir_all(parent)?;
- }
- info!(
- "Concatenating {} ClairS germline part VCFs into {}",
- n_parts, final_passed_vcf
- );
- let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
- run!(&base.config, &mut concat)
- .context("Failed to run bcftools concat for ClairS germline parts")?;
- fs::rename(&final_tmp, &final_passed_vcf)
- .context("Failed to rename merged ClairS germline PASS VCF")?;
- fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
- .context("Failed to rename merged ClairS germline PASS VCF CSI")?;
- info!(
- "Successfully merged {} ClairS germline parts into {}",
- n_parts, final_passed_vcf
- );
- Ok(())
- }
- pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()> {
- let dir = config.clairs_output_dir(id);
- let bams: Vec<PathBuf> = list_files_recursive(dir)
- .into_iter()
- .filter(|f| f.extension().and_then(|e| e.to_str()) == Some("bam"))
- .collect();
- let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf();
- info!("Merging {} into: {}", bams.len(), into.display());
- let mut merge = SamtoolsMergeMany::from_config(into, bams, config);
- run!(config, &mut merge)?;
- Ok(())
- }
- /// Runs ClairS in parallel chunks, then merges results.
- ///
- /// Splits the genome into N equal-sized regions, runs ClairS on each region
- /// in parallel (local or Slurm based on `config.slurm_runner`), post-processes
- /// each part (concatenates SNV+indel, filters PASS), and merges both somatic
- /// and germline VCFs.
- ///
- /// # Arguments
- ///
- /// * `id` - Sample identifier
- /// * `config` - Global pipeline configuration
- /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
- ///
- /// # Returns
- ///
- /// `Ok(())` on success, or an error if any step fails.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - `n_parts` is 0
- /// - Normal BAM file cannot be opened or is corrupted
- /// - BAM header is malformed
- /// - ClairS execution fails on any part
- /// - SNV+indel concatenation fails
- /// - PASS filtering fails
- /// - Somatic or germline VCF merging fails
- /// - Output directory cannot be created
- ///
- /// # Example
- ///
- /// ```ignore
- /// let config = Config::default();
- /// run_clairs_chunked("sample_001", &config, 30)?;
- /// ```
- pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
- anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0");
- let lock_dir = format!("{}/locks", config.result_dir);
- let _lock = SampleLock::acquire(&lock_dir, id, "clairs")
- .with_context(|| format!("Cannot start ClairS chunked for {}", id))?;
- let base = ClairS::initialize(id, config)?;
- if !base.should_run() {
- debug!("ClairS PASS VCF already up-to-date for {id}, skipping.");
- return Ok(());
- }
- let normal_bam = config.normal_bam(id);
- let reader = bam::Reader::from_path(&normal_bam)
- .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
- let header = bam::Header::from_template(reader.header());
- let genome_sizes = get_genome_sizes(&header)?;
- let regions = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
- .into_iter()
- .flatten()
- .collect::<Vec<String>>();
- let actual_n_parts = regions.len();
- info!(
- "Running ClairS in {} parallel parts for {}",
- actual_n_parts, id
- );
- let mut jobs = Vec::with_capacity(actual_n_parts);
- for (i, region) in regions.into_iter().enumerate() {
- let mut job = base.clone();
- job.part_index = Some(i + 1);
- job.region = Some(region);
- job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
- info!("Planned ClairS job:\n{job}");
- jobs.push(job);
- }
- let outputs = run_many!(config, jobs.clone())?;
- for output in outputs.iter() {
- output.save_to_file(format!("{}/clairs_", base.log_dir))?;
- }
- // 1) Germline PASS filters (one per part)
- let mut germlines = Vec::new();
- // 2a) Somatic SNV+indel concats (one per part)
- let mut somatic_concats = Vec::new();
- let mut somatic_tmp_paths = Vec::new();
- for job in &jobs {
- // germline PASS
- germlines.push(job.process_germline()?);
- // somatic concat (SNV+indel -> tmp VCF)
- let (concat, tmp_path) = job.process_somatic_concat()?;
- somatic_concats.push(concat);
- somatic_tmp_paths.push(tmp_path);
- }
- // Run all germline PASS filters in parallel
- run_many!(config, germlines)?;
- // Run all somatic concats in parallel
- run_many!(config, somatic_concats)?;
- // 2b) Somatic PASS filters (one per part, based on tmp VCFs)
- let mut somatic_pass_filters = Vec::new();
- for (job, tmp_path) in jobs.iter().zip(somatic_tmp_paths.iter()) {
- somatic_pass_filters.push(job.process_somatic_pass(tmp_path)?);
- }
- // Run all somatic PASS filters in parallel
- run_many!(config, somatic_pass_filters)?;
- merge_clairs_parts(&base, actual_n_parts)?;
- merge_clairs_germline_parts(&base, actual_n_parts)?;
- info!(
- "ClairS completed for {}: {} parts merged (somatic + germline)",
- id, actual_n_parts
- );
- Ok(())
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use crate::helpers::test_init;
- #[test]
- fn clairs_version() -> anyhow::Result<()> {
- test_init();
- let vl = ClairS::version(&Config::default())?;
- info!("ClairS local version: {vl}");
- let vs = ClairS::version_slurm(&Config::default())?;
- info!("ClairS slurm version: {vs}");
- assert_eq!(vl, vs);
- Ok(())
- }
- #[test]
- fn clairs_run() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- // let clairs = ClairS::initialize("34528", &config)?;
- // info!("{clairs}");
- run_clairs_chunked("CHAHA", &config, 50)?;
- Ok(())
- }
- #[test]
- fn clairs_haplo() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
- merge_haplotaged_tmp_bam(&config, "DUMCO")?;
- Ok(())
- }
- }
|