//! # ClairS Somatic Variant Calling Pipeline //! //! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS), //! a deep learning-based somatic variant caller designed for long-read sequencing data //! (ONT and PacBio HiFi). //! //! ## Overview //! //! ClairS performs somatic variant calling on paired tumor-normal samples using: //! //! - Haplotype-aware variant calling with LongPhase integration //! - Separate SNV and indel calling pipelines //! - Clair3-based germline variant detection on the normal sample //! //! ## Key Features //! //! - **Deep learning-based** - CNN models trained on long-read data //! - **Haplotype-aware** - Uses phased germline variants for improved accuracy //! - **Dual output** - Somatic and germline variants in a single run //! - **Platform flexibility** - Supports ONT (R9/R10) and PacBio HiFi //! - **Parallel execution** - Genome chunking for HPC scalability //! //! ## Requirements //! //! Before running ClairS, ensure: //! - Tumor and normal BAMs are indexed (`.bai` files present) //! - Reference genome is accessible //! - Singularity/Docker image is available //! - Platform is correctly specified (`config.clairs_platform`) //! //! ## Execution Modes //! //! The module supports three execution strategies, all dispatched through [`Run::run`]: //! //! - **Local** — Single-node execution via [`LocalRunner`](crate::commands::LocalRunner) //! - **Slurm** — Single HPC job submission via [`SbatchRunner`](crate::commands::SbatchRunner) //! - **Chunked** — Parallel execution across genome regions (enabled automatically when //! `config.slurm_runner = true`, calls the internal `run_chunked` function) //! //! ### Chunked Parallel Execution //! //! For large genomes, the chunked mode provides significant speedup: //! //! ```text //! ┌─────────────────────────────────────────────────────────────────┐ //! │ Genome Splitting │ //! │ chr1:1-50M │ chr1:50M-100M │ chr2:1-50M │ ... │ chrN │ //! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘ //! │ │ │ │ //! ▼ ▼ ▼ ▼ //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ //! │ ClairS │ │ ClairS │ │ ClairS │ ... │ ClairS │ //! │ Part 1 │ │ Part 2 │ │ Part 3 │ │ Part N │ //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ //! │ │ │ │ //! ▼ ▼ ▼ ▼ //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ //! │ Postprocess│ │ Postprocess│ │ Postprocess│ ... │ Postprocess│ //! │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ //! │ → PASS │ │ → PASS │ │ → PASS │ │ → PASS │ //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ //! │ │ │ │ //! └───────────────┴───────┬───────┴───────────────────┘ //! ▼ //! ┌─────────────────────┐ //! │ bcftools concat │ //! │ (somatic PASS) │ //! └──────────┬──────────┘ //! ▼ //! ┌─────────────────────┐ //! │ Final VCF Output │ //! └─────────────────────┘ //! ``` //! //! ## Output Files //! //! Somatic variants (PASS only): //! ```text //! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz //! ``` //! //! Germline variants (PASS only): //! ```text //! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz //! ``` //! //! ## Post-processing Pipeline //! //! Each ClairS run (or part) undergoes automatic post-processing: //! //! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants //! 2. **Germline**: Filter PASS variants from Clair3 germline output //! //! ## Usage //! //! ### Basic Execution //! //! ```ignore //! use pandora_lib_promethion::callers::clairs::ClairS; //! use pandora_lib_promethion::config::Config; //! use pandora_lib_promethion::pipes::Initialize; //! use pandora_lib_promethion::runners::Run; //! //! let config = Config::default(); //! // config.slurm_runner = true → chunked Slurm execution (60 parts) //! // config.slurm_runner = false → local single-node execution //! let mut clairs = ClairS::initialize("sample_001", &config)?; //! clairs.run()?; //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ### Loading Variants for Downstream Analysis //! //! ```ignore //! use pandora_lib_promethion::annotation::Annotations; //! use pandora_lib_promethion::variant::vcf_variant::Variants; //! //! let annotations = Annotations::new(); //! let somatic = clairs.variants(&annotations)?; //! let germline = clairs.germline(&annotations)?; //! //! println!("Somatic: {} variants", somatic.variants.len()); //! println!("Germline: {} variants", germline.variants.len()); //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ## Configuration Requirements //! //! The following [`Config`](crate::config::Config) fields must be set: //! //! - `singularity_bin` — Path to Singularity executable //! - `clairs_image` — ClairS container image path //! - `reference` — Reference genome FASTA //! - `clairs_threads` — CPU threads per job //! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.) //! - `clairs_force` — Force re-run even if outputs exist //! //! ## Implemented Traits //! //! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation //! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating //! - [`JobCommand`](crate::commands::Command) — Command string generation //! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support //! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission //! - [`Run`](crate::runners::Run) — Unified execution interface //! - [`Variants`](crate::variant::vcf_variant::Variants) — Somatic variant loading //! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category //! - [`Label`](crate::variant::vcf_variant::Label) — Human-readable identifier //! - [`Version`](crate::pipes::Version) — Tool version extraction //! //! ## Dependencies //! //! External tools (containerized or system): //! //! - **ClairS** — Somatic variant calling //! - **bcftools** — VCF concatenation and filtering //! //! ## References //! //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS) //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2) use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{ bcftools::{BcftoolsConcat, BcftoolsKeepPass}, exec_jobs, samtools::SamtoolsMergeMany, AnyJob, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, }, config::Config, helpers::{ get_genome_sizes_in_header_order, is_file_older, list_files_recursive, remove_dir_if_exists, singularity_bind_flags, split_ordered_genome_into_n_regions_exact, }, io::vcf::read_vcf, jobs_seq, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, runners::Run, variant::{ variant_collection::VariantCollection, vcf_variant::{Label, Variants}, }, }; use anyhow::Context; use log::{debug, info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use rust_htslib::bam::{self, Read}; use std::{ fmt, fs, path::{Path, PathBuf}, process::{Command as ProcessCommand, Stdio}, }; use uuid::Uuid; /// ClairS haplotype-aware somatic variant caller runner. /// /// Executes ClairS for paired tumor-normal variant calling with automatic /// post-processing (SNV+indel concatenation, PASS filtering, germline extraction). /// /// # Fields /// /// - `id` - Sample identifier (e.g., "34528") /// - `config` - All resolved paths, tool settings, and Slurm parameters (see [`ClairsConfig`]) /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`) /// - `part_index` - Chunk index when running in parallel (e.g., `Some(3)` for part 3 of N); /// `None` for full-genome runs /// /// # Execution /// /// Call [`Run::run`] — it dispatches automatically: /// - `config.slurm_runner = false` → local single-node execution /// - `config.slurm_runner = true` → chunked parallel Slurm execution (60 genome parts) /// /// # Output Files /// /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz` /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz` #[derive(Debug, Clone)] pub struct ClairS { pub id: String, pub config: ClairsConfig, pub region: Option, pub part_index: Option, } /// Resolved configuration for a single ClairS run. /// /// Built once at initialisation via [`ClairsConfig::from_config`] and carried /// inside [`ClairS`]. All path templates from [`Config`] are expanded here so /// that `ClairS` methods never need to re-expand them. #[derive(Debug, Clone)] pub struct ClairsConfig { // Execution pub singularity_bin: String, pub clairs_image: String, pub clairs_threads: u8, pub clairs_platform: String, pub clairs_force: bool, pub clairs_keep_parts: bool, // Paths pub reference: String, pub tmp_dir: String, pub result_dir: String, // Slurm pub slurm_runner: bool, pub slurm_max_par: u8, // Sample-scoped paths, computed once at init pub normal_bam: String, pub tumoral_bam: String, pub passed_vcf: String, pub germline_passed_vcf: String, pub germline_normal_vcf: String, pub output_dir: String, pub snv_vcf: String, pub indel_vcf: String, pub log_dir: String, pub lock_dir: String, // BcftoolsConcat pub bcftools_bin: String, pub bcftools_threads: u8, } impl ClairsConfig { pub fn from_config(config: &Config, id: &str) -> Self { let output_dir = config.clairs_output_dir(id); let (snv_vcf, indel_vcf) = config.clairs_output_vcfs(id); Self { singularity_bin: config.singularity_bin.clone(), clairs_image: config.clairs_image.clone(), clairs_threads: config.clairs_threads, clairs_platform: config.clairs_platform.clone(), clairs_force: config.clairs_force, clairs_keep_parts: config.clairs_keep_parts, reference: config.reference.clone(), tmp_dir: config.tmp_dir.clone(), result_dir: config.result_dir.clone(), slurm_runner: config.slurm_runner, slurm_max_par: config.slurm_max_par, normal_bam: config.normal_bam(id), tumoral_bam: config.tumoral_bam(id), passed_vcf: config.clairs_passed_vcf(id), germline_passed_vcf: config.clairs_germline_passed_vcf(id), germline_normal_vcf: config.clairs_germline_normal_vcf(id), output_dir, snv_vcf, indel_vcf, log_dir: format!("{}/{}/log/clairs", config.result_dir, id), lock_dir: format!("{}/locks", config.result_dir), bcftools_bin: config.bcftools_bin.clone(), bcftools_threads: config.bcftools_threads, } } } impl fmt::Display for ClairS { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "🧬 ClairS {}", self.id)?; writeln!( f, " Region : {}", self.region.as_deref().unwrap_or("genome-wide") )?; writeln!( f, " Part : {}", self.part_index .map_or("full".into(), |n| format!("part{n}")) )?; writeln!(f, " Log dir : {}", self.config.log_dir) } } impl Initialize for ClairS { fn initialize(id: &str, config: &Config) -> anyhow::Result { let id = id.to_string(); info!("Initializing ClairS for {id}"); let clairs = Self { config: ClairsConfig::from_config(config, &id), id, region: None, part_index: None, }; if config.clairs_force { remove_dir_if_exists(&config.clairs_output_dir(&clairs.id))?; } Ok(clairs) } } impl ShouldRun for ClairS { /// Determines whether ClairS should be re-run based on BAM modification timestamps. fn should_run(&self) -> bool { let passed_vcf = &self.config.passed_vcf; let normal_older = is_file_older(passed_vcf, &self.config.normal_bam, true).unwrap_or(true); let tumor_older = is_file_older(passed_vcf, &self.config.tumoral_bam, true).unwrap_or(true); let result = normal_older || tumor_older; if result { info!("ClairS should run for id: {}.", self.id); } result } } impl JobCommand for ClairS { fn init(&mut self) -> anyhow::Result<()> { let output_dir = &self.config.output_dir; fs::create_dir_all(output_dir) .with_context(|| format!("Failed to create dir: {output_dir}"))?; fs::create_dir_all(&self.config.log_dir) .with_context(|| format!("Failed to create dir: {}", self.config.log_dir))?; Ok(()) } fn cmd(&self) -> String { let output_dir = &self.config.output_dir; let region_arg = self .region .as_ref() .map(|r| format!("-r '{r}'")) .unwrap_or_default(); let sample_name = format!("{}_diag", self.id); let bind_flags = singularity_bind_flags([ &self.config.tumoral_bam, &self.config.normal_bam, &self.config.reference, output_dir, &self.config.log_dir, &self.config.tmp_dir, ]); format!( "{singularity_bin} exec --cleanenv \ {binds} \ --bind {output_dir}:{output_dir} \ {image} \ /opt/bin/run_clairs \ -T {tumor_bam} \ -N {normal_bam} \ -R {reference} \ -t {threads} \ -p {platform} \ --enable_indel_calling \ --include_all_ctgs \ --print_germline_calls \ --enable_clair3_germline_output \ --use_longphase_for_intermediate_haplotagging true \ --output_dir {output_dir} \ -s {sample_name} \ {region_arg}", singularity_bin = self.config.singularity_bin, binds = bind_flags, image = self.config.clairs_image, tumor_bam = self.config.tumoral_bam, normal_bam = self.config.normal_bam, reference = self.config.reference, threads = self.config.clairs_threads, platform = self.config.clairs_platform, output_dir = output_dir, sample_name = sample_name, region_arg = region_arg, ) } } impl LocalRunner for ClairS {} // impl SlurmRunner for ClairS { // fn slurm_args(&self) -> Vec { // self.slurm_params().to_args() // } // } impl SbatchRunner for ClairS { fn slurm_params(&self) -> SlurmParams { let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default(); SlurmParams { job_name: Some(format!("clairs_{}{}", self.id, batch_id)), cpus_per_task: Some(self.config.clairs_threads as u32), mem: Some("50G".into()), partition: Some("shortq".into()), gres: None, } } } impl Run for ClairS { fn run(&mut self) -> anyhow::Result<()> { if !self.should_run() { info!("ClairS is up-to-date."); return Ok(()); } // Acquire lock before any work let lock_dir = format!("{}/locks", self.config.result_dir); let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs") .with_context(|| format!("Cannot start ClairS for {}", self.id))?; if self.config.slurm_runner { run_chunked(&self.id, &self.config, 60)?; } else { let (somatic_concat, somatic_tmp_path) = self.process_somatic_concat()?; let somatic_pass = self.process_somatic_pass(&somatic_tmp_path)?; let germline = self.process_germline()?; let outputs = exec_jobs( vec![jobs_seq![ self.clone(), germline, somatic_concat, somatic_pass, ]], false, 1, ) .context("Failed to run ClairS local pipeline")?; for output in &outputs { output.save_to_file(format!("{}/clairs_", self.config.log_dir))?; } // Clean up temporary concatenated VCF debug!("Removing temporary file: {}", somatic_tmp_path); if let Err(e) = fs::remove_file(&somatic_tmp_path) { warn!( "Failed to remove temporary file {}: {}", somatic_tmp_path, e ); } } Ok(()) } } impl ClairS { /// Returns the germline PASS VCF path. /// /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz` /// - Full runs: canonical path from config fn germline_passed_vcf_path(&self) -> String { match self.part_index { Some(idx) => { format!( "{}/clair3_germline.part{idx}.pass.vcf.gz", self.config.output_dir ) } None => self.config.germline_passed_vcf.clone(), } } fn process_germline(&self) -> anyhow::Result { let germline_passed = self.germline_passed_vcf_path(); let germline_input = match self.part_index { Some(_) => { let output_dir = self.config.output_dir.clone(); let base = self.config.germline_normal_vcf.clone(); let base_name = Path::new(&base) .file_name() .with_context(|| { format!("germline VCF path has no filename component: {base}") })? .to_string_lossy(); format!("{output_dir}/{base_name}") } None => self.config.germline_normal_vcf.clone(), }; info!( "Filtering germline PASS variants for {} part {:?}", self.id, self.part_index ); Ok(BcftoolsKeepPass { bin: self.config.bcftools_bin.clone(), threads: self.config.bcftools_threads, input: germline_input.into(), output: germline_passed.clone().into(), tmp_dir: self.config.tmp_dir.clone().into(), tmp_sort: PathBuf::new(), tmp_norm: PathBuf::new(), }) } fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> { let output_dir = self.config.output_dir.clone(); let snv_vcf_base = self.config.snv_vcf.clone(); let indel_vcf_base = self.config.indel_vcf.clone(); let snv_vcf = format!( "{}/{}", output_dir, Path::new(&snv_vcf_base) .file_name() .with_context(|| format!("SNV VCF path has no filename component: {snv_vcf_base}"))? .to_string_lossy() ); let indel_vcf = format!( "{}/{}", output_dir, Path::new(&indel_vcf_base) .file_name() .with_context(|| { format!("indel VCF path has no filename component: {indel_vcf_base}") })? .to_string_lossy() ); info!( "Concatenating and filtering somatic variants for {} part {:?}", self.id, self.part_index ); let tmp_file = Path::new(&self.config.tmp_dir).join(format!("pandora-temp-{}.vcf.gz", Uuid::new_v4())); let tmp_path = tmp_file .to_str() .context("Temp path not valid UTF-8")? .to_string(); // Concat SNV + indel let concat = BcftoolsConcat { bin: self.config.bcftools_bin.clone(), threads: self.config.bcftools_threads, inputs: vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)], output: tmp_path.clone().into(), tmp_dir: self.config.tmp_dir.clone().into(), tmp_list: PathBuf::new(), }; Ok((concat, tmp_path)) } pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result { let passed_vcf = self.somatic_passed_vcf_path(); // Filter PASS let keep_pass = BcftoolsKeepPass { bin: self.config.bcftools_bin.clone(), threads: self.config.bcftools_threads, input: tmp_path.into(), output: passed_vcf.clone().into(), tmp_dir: self.config.tmp_dir.clone().into(), tmp_sort: PathBuf::new(), tmp_norm: PathBuf::new(), }; Ok(keep_pass) } fn for_part(&self, idx: usize) -> Self { let mut part = self.clone(); part.part_index = Some(idx); part.config.output_dir = format!("{}/part{idx}", self.config.output_dir); part.config.log_dir = format!("{}/part{idx}", self.config.log_dir); Self { ..part } } /// Returns the somatic PASS VCF path. /// /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz` /// - Full runs: canonical path from config fn somatic_passed_vcf_path(&self) -> String { match self.part_index { Some(idx) => { format!("{}/clairs.part{idx}.pass.vcf.gz", self.config.output_dir) } None => self.config.passed_vcf.clone(), } } pub fn run_chunked_resume(&self, n_parts: usize) -> anyhow::Result<()> { run_chunked_resume(&self.id, &self.config, n_parts) } } impl CallerCat for ClairS { fn caller_cat(&self) -> Annotation { Annotation::Callers(Caller::ClairS, Sample::Somatic) } } impl Variants for ClairS { fn variants(&self, annotations: &Annotations) -> anyhow::Result { let caller = self.caller_cat(); let passed_vcf = &self.config.passed_vcf.clone(); info!("Loading variants from {caller}: {passed_vcf}"); let variants = read_vcf(passed_vcf) .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), std::slice::from_ref(&caller)); }); info!("{caller}: {} variants loaded", variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(passed_vcf.into())?, caller, }) } } impl ClairS { /// Loads germline variants from the Clair3 germline output. pub fn germline(&self, annotations: &Annotations) -> anyhow::Result { let caller = Annotation::Callers(Caller::ClairS, Sample::Germline); let germline_passed = &self.config.germline_passed_vcf; info!("Loading germline variants from {caller}: {germline_passed}"); let variants = read_vcf(germline_passed)?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), std::slice::from_ref(&caller)); }); info!("{caller}: {} variants loaded", variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(germline_passed.into())?, caller, }) } } impl Label for ClairS { fn label(&self) -> String { self.caller_cat().to_string() } } impl Version for ClairS { fn version(config: &Config) -> anyhow::Result { let out = ProcessCommand::new("bash") .arg("-c") .arg(format!( "{} exec {} /opt/bin/run_clairs --version", config.singularity_bin, config.clairs_image )) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .context("Failed to spawn Singularity process")?; if !out.status.success() { let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); anyhow::bail!( "Singularity exec failed with status {}: {}", out.status, combined ); } let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); parse_clairs_version(&combined) } fn version_slurm(config: &Config) -> anyhow::Result { struct ClairSVersionJob<'a> { config: &'a Config, } impl JobCommand for ClairSVersionJob<'_> { fn cmd(&self) -> String { format!( "{} exec {} /opt/bin/run_clairs --version", self.config.singularity_bin, self.config.clairs_image ) } } impl SlurmRunner for ClairSVersionJob<'_> { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("clairs_version".into()), partition: Some("shortq".into()), cpus_per_task: Some(1), mem: Some("10G".into()), gres: None, } .to_args() } } let mut job = ClairSVersionJob { config }; let out = SlurmRunner::exec(&mut job).context("Failed to run ClairS --version via Slurm")?; let mut combined = out.stdout.clone(); if let Some(epilog) = &out.slurm_epilog { combined.push_str(&epilog.to_string()); } combined.push_str(&out.stderr); parse_clairs_version(&combined) } } /// Parses ClairS version from command output. fn parse_clairs_version(output: &str) -> anyhow::Result { let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid"); let caps = re .captures(output) .context("Could not parse ClairS version from output")?; Ok(caps .get(1) .expect("Regex has capture group 1") .as_str() .to_string()) } /// Merges N chunked ClairS PASS VCFs into the final output. fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> { let mut part_pass_paths: Vec = Vec::with_capacity(n_parts); for i in 1..=n_parts { let part = base.for_part(i); let part_pass = part.somatic_passed_vcf_path(); anyhow::ensure!( Path::new(&part_pass).exists(), "Missing ClairS part {} PASS VCF: {}", i, part_pass ); part_pass_paths.push(PathBuf::from(part_pass)); } let final_passed_vcf = base.config.passed_vcf.clone(); let rand = Uuid::new_v4(); let final_tmp = format!("{final_passed_vcf}_{rand}.tmp"); let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi"); if let Some(parent) = Path::new(&final_passed_vcf).parent() { fs::create_dir_all(parent)?; } info!( "Concatenating {} ClairS part VCFs into {}", n_parts, final_passed_vcf ); let concat = BcftoolsConcat { bin: base.config.bcftools_bin.clone(), threads: base.config.bcftools_threads, inputs: part_pass_paths, output: final_tmp.clone().into(), tmp_dir: base.config.tmp_dir.clone().into(), tmp_list: PathBuf::new(), }; let outputs = exec_jobs( vec![jobs_seq![concat]], base.config.slurm_runner, base.config.slurm_max_par.into(), ) .context("Failed to run bcftools concat for ClairS germline parts")?; for output in &outputs { output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?; } anyhow::ensure!( Path::new(&final_tmp_csi).exists(), "CSI index not found after concat: {}", final_tmp_csi ); fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?; fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi")) .context("Failed to rename merged ClairS PASS VCF CSI")?; info!( "Successfully merged {} ClairS parts into {}", n_parts, final_passed_vcf ); Ok(()) } fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> { let mut part_pass_paths: Vec = Vec::with_capacity(n_parts); for i in 1..=n_parts { let part = base.for_part(i); let part_pass = part.germline_passed_vcf_path(); anyhow::ensure!( Path::new(&part_pass).exists(), "Missing ClairS germline part {} PASS VCF: {}", i, part_pass ); part_pass_paths.push(PathBuf::from(part_pass)); } let final_passed_vcf = base.config.germline_passed_vcf.clone(); let rand = Uuid::new_v4(); let final_tmp = format!("{final_passed_vcf}_{rand}.tmp"); let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi"); if let Some(parent) = Path::new(&final_passed_vcf).parent() { fs::create_dir_all(parent)?; } info!( "Concatenating {} ClairS germline part VCFs into {}", n_parts, final_passed_vcf ); let concat = BcftoolsConcat { bin: base.config.bcftools_bin.clone(), threads: base.config.bcftools_threads, inputs: part_pass_paths, output: final_tmp.clone().into(), tmp_dir: base.config.tmp_dir.clone().into(), tmp_list: PathBuf::new(), }; let outputs = exec_jobs( vec![jobs_seq![concat]], base.config.slurm_runner, base.config.slurm_max_par.into(), ) .context("Failed to run bcftools concat for ClairS germline parts")?; for output in &outputs { output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?; } anyhow::ensure!( Path::new(&final_tmp_csi).exists(), "CSI index not found after concat: {}", final_tmp_csi ); fs::rename(&final_tmp, &final_passed_vcf) .context("Failed to rename merged ClairS germline PASS VCF")?; fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi")) .context("Failed to rename merged ClairS germline PASS VCF CSI")?; info!( "Successfully merged {} ClairS germline parts into {}", n_parts, final_passed_vcf ); Ok(()) } pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()> { let dir = config.clairs_output_dir(id); let bams: Vec = list_files_recursive(dir) .into_iter() .filter(|f| f.extension().and_then(|e| e.to_str()) == Some("bam")) .collect(); let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf(); info!("Merging {} into: {}", bams.len(), into.display()); let merge = SamtoolsMergeMany::from_config(into, bams, config); exec_jobs( vec![jobs_seq![merge]], config.slurm_runner, config.slurm_max_par.into(), )?; Ok(()) } /// Runs ClairS in parallel chunks with sequential per-part post-processing, then merges results. /// /// # Pipeline per part (sequential) /// `ClairS main → germline PASS filter → somatic SNV+indel concat → somatic PASS filter` /// /// # Parallelism /// Parts run in parallel (bounded by `config.slurm_max_par`), dispatched locally or via /// SLURM depending on `config.slurm_runner`. /// /// # Errors /// Fails fast within a part on any step error. Already-running jobs are not cancelled. fn run_chunked(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> { anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0"); let base = ClairS { id: id.to_string(), config: config.clone(), region: None, part_index: None, }; let normal_bam = base.config.normal_bam.clone(); let reader = bam::Reader::from_path(&normal_bam) .with_context(|| format!("Failed to open BAM: {normal_bam}"))?; let genome_sizes = get_genome_sizes_in_header_order(reader.header())?; let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts) .into_iter() .flatten() .collect::>(); let actual_n_parts = regions.len(); info!( "Running ClairS in {} parallel parts for {}", actual_n_parts, id ); let groups: anyhow::Result>>> = regions .into_iter() .enumerate() .map(|(i, region)| { let mut job = base.for_part(i + 1); job.region = Some(region); info!("Planned ClairS job:\n{job}"); let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?; let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?; let germline = job.process_germline()?; Ok(jobs_seq![job, germline, somatic_concat, somatic_pass]) }) .collect(); let outputs = exec_jobs( groups?, base.config.slurm_runner, base.config.slurm_max_par.into(), )?; for output in &outputs { output.save_to_file(format!("{}/clairs_", base.config.log_dir))?; } merge_clairs_parts(&base, actual_n_parts)?; merge_clairs_germline_parts(&base, actual_n_parts)?; if !config.clairs_keep_parts { let somatic_final = config.passed_vcf.clone(); let germline_final = config.germline_passed_vcf.clone(); anyhow::ensure!( Path::new(&somatic_final).exists(), "Refusing to clean up parts: merged somatic VCF not found at {}", somatic_final ); anyhow::ensure!( Path::new(&germline_final).exists(), "Refusing to clean up parts: merged germline VCF not found at {}", germline_final ); let base_dir = config.output_dir.clone(); for i in 1..=actual_n_parts { let part_dir = format!("{base_dir}/part{i}"); if Path::new(&part_dir).exists() { fs::remove_dir_all(&part_dir) .with_context(|| format!("Failed to remove part directory: {part_dir}"))?; debug!("Removed part directory: {part_dir}"); } } info!("Cleaned up {} part directories for {}", actual_n_parts, id); } info!( "ClairS completed for {}: {} parts merged (somatic + germline)", id, actual_n_parts ); Ok(()) } fn part_is_complete(part: &ClairS) -> bool { Path::new(&part.somatic_passed_vcf_path()).exists() && Path::new(&format!("{}.csi", part.somatic_passed_vcf_path())).exists() && Path::new(&part.germline_passed_vcf_path()).exists() && Path::new(&format!("{}.csi", part.germline_passed_vcf_path())).exists() } fn run_chunked_resume(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> { anyhow::ensure!(n_parts > 0, "run_chunked_resume: n_parts must be > 0"); let base = ClairS { id: id.to_string(), config: config.clone(), region: None, part_index: None, }; let normal_bam = base.config.normal_bam.clone(); let reader = bam::Reader::from_path(&normal_bam) .with_context(|| format!("Failed to open BAM: {normal_bam}"))?; let genome_sizes = get_genome_sizes_in_header_order(reader.header())?; let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts) .into_iter() .flatten() .collect::>(); let actual_n_parts = regions.len(); info!( "Resuming ClairS chunked run for {} with {} parts", id, actual_n_parts ); let groups: anyhow::Result>>> = regions .into_iter() .enumerate() .filter_map(|(i, region)| { let mut job = base.for_part(i + 1); job.region = Some(region); if part_is_complete(&job) { info!("Skipping completed ClairS part {} for {}", i + 1, id); return None; } info!( "Scheduling missing/incomplete ClairS part {}:\n{job}", i + 1 ); Some((|| { let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?; let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?; let germline = job.process_germline()?; Ok(jobs_seq![job, germline, somatic_concat, somatic_pass]) })()) }) .collect(); let groups = groups?; if groups.is_empty() { info!("All ClairS parts already complete for {}", id); } else { let outputs = exec_jobs( groups, base.config.slurm_runner, base.config.slurm_max_par.into(), )?; for output in &outputs { output.save_to_file(format!("{}/clairs_", base.config.log_dir))?; } } merge_clairs_parts(&base, actual_n_parts)?; merge_clairs_germline_parts(&base, actual_n_parts)?; if !config.clairs_keep_parts { let somatic_final = config.passed_vcf.clone(); let germline_final = config.germline_passed_vcf.clone(); anyhow::ensure!( Path::new(&somatic_final).exists(), "Refusing to clean up parts: merged somatic VCF not found at {}", somatic_final ); anyhow::ensure!( Path::new(&germline_final).exists(), "Refusing to clean up parts: merged germline VCF not found at {}", germline_final ); let base_dir = config.output_dir.clone(); for i in 1..=actual_n_parts { let part_dir = format!("{base_dir}/part{i}"); if Path::new(&part_dir).exists() { fs::remove_dir_all(&part_dir) .with_context(|| format!("Failed to remove part directory: {part_dir}"))?; } } info!("Cleaned up {} part directories for {}", actual_n_parts, id); } info!( "ClairS resumed/completed for {}: {} parts merged", id, actual_n_parts ); Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::helpers::test_init; #[test] fn clairs_version() -> anyhow::Result<()> { test_init(); let vl = ClairS::version(&Config::default())?; info!("ClairS local version: {vl}"); let vs = ClairS::version_slurm(&Config::default())?; info!("ClairS slurm version: {vs}"); assert_eq!(vl, vs); Ok(()) } #[test] fn clairs_run() -> anyhow::Result<()> { test_init(); let id = "CHAHA"; let config = Config::default(); let mut clairs = ClairS::initialize(id, &config)?; crate::runners::Run::run(&mut clairs)?; Ok(()) } #[test] fn clairs_resume() -> anyhow::Result<()> { test_init(); let id = "CHAHA"; let config = Config::default(); let mut clairs = ClairS::initialize(id, &config)?; clairs.run_chunked_resume(60); Ok(()) } }