//! # ClairS Somatic Variant Calling Pipeline //! //! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS), //! a deep learning-based somatic variant caller designed for long-read sequencing data //! (ONT and PacBio HiFi). //! //! ## Overview //! //! ClairS performs somatic variant calling on paired tumor-normal samples using: //! //! - Haplotype-aware variant calling with LongPhase integration //! - Separate SNV and indel calling pipelines //! - Clair3-based germline variant detection on the normal sample //! //! ## Key Features //! //! - **Deep learning-based** - CNN models trained on long-read data //! - **Haplotype-aware** - Uses phased germline variants for improved accuracy //! - **Dual output** - Somatic and germline variants in a single run //! - **Platform flexibility** - Supports ONT (R9/R10) and PacBio HiFi //! - **Parallel execution** - Genome chunking for HPC scalability //! //! ## Requirements //! //! Before running ClairS, ensure: //! - Tumor and normal BAMs are indexed (`.bai` files present) //! - Reference genome is accessible //! - Singularity/Docker image is available //! - Platform is correctly specified (`config.clairs_platform`) //! //! ## Execution Modes //! //! The module supports three execution strategies: //! //! - **Local** ([`ClairS::run_local`]) — Single-node execution, useful for debugging //! - **Slurm** ([`ClairS::run_sbatch`]) — Single HPC job submission //! - **Chunked** ([`run_clairs_chunked_sbatch_with_merge`]) — Parallel execution across genome regions //! //! ### Chunked Parallel Execution //! //! For large genomes, the chunked mode provides significant speedup: //! //! ```text //! ┌─────────────────────────────────────────────────────────────────┐ //! │ Genome Splitting │ //! │ chr1:1-50M │ chr1:50M-100M │ chr2:1-50M │ ... │ chrN │ //! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘ //! │ │ │ │ //! ▼ ▼ ▼ ▼ //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ //! │ ClairS │ │ ClairS │ │ ClairS │ ... │ ClairS │ //! │ Part 1 │ │ Part 2 │ │ Part 3 │ │ Part N │ //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ //! │ │ │ │ //! ▼ ▼ ▼ ▼ //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ //! │ Postprocess│ │ Postprocess│ │ Postprocess│ ... │ Postprocess│ //! │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ //! │ → PASS │ │ → PASS │ │ → PASS │ │ → PASS │ //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ //! │ │ │ │ //! └───────────────┴───────┬───────┴───────────────────┘ //! ▼ //! ┌─────────────────────┐ //! │ bcftools concat │ //! │ (somatic PASS) │ //! └──────────┬──────────┘ //! ▼ //! ┌─────────────────────┐ //! │ Final VCF Output │ //! └─────────────────────┘ //! ``` //! //! ## Output Files //! //! Somatic variants (PASS only): //! ```text //! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz //! ``` //! //! Germline variants (PASS only): //! ```text //! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz //! ``` //! //! ## Post-processing Pipeline //! //! Each ClairS run (or part) undergoes automatic post-processing: //! //! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants //! 2. **Germline**: Filter PASS variants from Clair3 germline output //! //! ## Usage //! //! ### Basic Slurm Execution //! //! ```ignore //! use crate::config::Config; //! use crate::pipes::clairs::ClairS; //! use crate::pipes::Initialize; //! //! let config = Config::default(); //! let mut clairs = ClairS::initialize("sample_001", &config)?; //! let output = clairs.run_sbatch()?; //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ### Chunked Parallel Execution (Recommended for WGS) //! //! ```ignore //! use crate::config::Config; //! use crate::pipes::clairs::run_clairs_chunked_sbatch_with_merge; //! //! let config = Config::default(); //! let outputs = run_clairs_chunked_sbatch_with_merge("sample_001", &config, 20)?; //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ### Loading Variants for Downstream Analysis //! //! ```ignore //! use crate::annotation::Annotations; //! use crate::variant::vcf_variant::Variants; //! //! let annotations = Annotations::new(); //! let somatic = clairs.variants(&annotations)?; //! let germline = clairs.germline(&annotations)?; //! //! println!("Somatic: {} variants", somatic.variants.len()); //! println!("Germline: {} variants", germline.variants.len()); //! # Ok::<(), anyhow::Error>(()) //! ``` //! //! ## Configuration Requirements //! //! The following [`Config`](crate::config::Config) fields must be set: //! //! - `singularity_bin` — Path to Singularity executable //! - `clairs_image` — ClairS container image path //! - `reference` — Reference genome FASTA //! - `clairs_threads` — CPU threads per job //! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.) //! - `clairs_force` — Force re-run even if outputs exist //! //! ## Implemented Traits //! //! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation //! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating //! - [`JobCommand`](crate::commands::Command) — Command string generation //! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support //! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission //! - [`Run`](crate::runners::Run) — Unified execution interface //! - [`Variants`](crate::variant::vcf_variant::Variants) — Somatic variant loading //! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category //! - [`Label`](crate::variant::vcf_variant::Label) — Human-readable identifier //! - [`Version`](crate::pipes::Version) — Tool version extraction //! //! ## Dependencies //! //! External tools (containerized or system): //! //! - **ClairS** — Somatic variant calling //! - **bcftools** — VCF concatenation and filtering //! //! ## References //! //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS) //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2) use crate::{ annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{ Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, samtools::SamtoolsMergeMany }, config::Config, helpers::{ get_genome_sizes, is_file_older, list_files_recursive, remove_dir_if_exists, singularity_bind_flags, split_genome_into_n_regions_exact, }, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, run, run_many, runners::Run, variant::{ variant_collection::VariantCollection, vcf_variant::{Label, Variants}, } }; use anyhow::Context; use log::{debug, info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use rust_htslib::bam::{self, Read}; use std::{ fmt, fs, path::{Path, PathBuf}, process::{Command as ProcessCommand, Stdio}, }; use uuid::Uuid; /// ClairS haplotype-aware somatic variant caller runner. /// /// Executes ClairS for paired tumor-normal variant calling with automatic /// post-processing (SNV+indel concatenation, PASS filtering, germline extraction). /// /// # Fields /// /// - `id` - Sample identifier (e.g., "34528") /// - `config` - Global pipeline configuration /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/clairs") /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`) /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N) /// /// # Execution Modes /// /// - **Local** - Direct execution via `run_local()` /// - **Slurm** - Single job submission via `run_sbatch()` /// - **Chunked** - Parallel genome-wide execution via [`run_clairs_chunked_sbatch_with_merge`] /// /// # Output Files /// /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz` /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz` /// /// # Chunked Execution /// /// For whole-genome sequencing, use [`run_clairs_chunked_sbatch_with_merge`] which: /// 1. Splits genome into N equal-sized regions /// 2. Runs ClairS in parallel Slurm jobs (one per region) /// 3. Post-processes each part (concat SNV+indel, filter PASS) /// 4. Merges all part PASS VCFs into final output #[derive(Debug, Clone)] pub struct ClairS { /// Sample identifier pub id: String, /// Global pipeline configuration pub config: Config, /// Directory for log file storage pub log_dir: String, /// Optional genomic region restriction (format: "chr:start-end") pub region: Option, /// Optional part index for chunked parallel runs (1-indexed) pub part_index: Option, } impl fmt::Display for ClairS { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "🧬 ClairS {}", self.id)?; writeln!( f, " Region : {}", self.region.as_deref().unwrap_or("genome-wide") )?; writeln!( f, " Part : {}", self.part_index .map_or("full".into(), |n| format!("part{n}")) )?; writeln!(f, " Log dir : {}", self.log_dir) } } impl Initialize for ClairS { fn initialize(id: &str, config: &Config) -> anyhow::Result { let id = id.to_string(); info!("Initializing ClairS for {id}"); let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id); let clairs = Self { id, log_dir, config: config.clone(), region: None, part_index: None, }; if clairs.config.clairs_force { remove_dir_if_exists(&clairs.config.clairs_output_dir(&clairs.id))?; } Ok(clairs) } } impl ShouldRun for ClairS { /// Determines whether ClairS should be re-run based on BAM modification timestamps. fn should_run(&self) -> bool { let passed_vcf = &self.config.clairs_passed_vcf(&self.id); let normal_older = is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true); let tumor_older = is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true); let result = normal_older || tumor_older; if result { info!("ClairS should run for id: {}.", self.id); } result } } impl JobCommand for ClairS { fn init(&mut self) -> anyhow::Result<()> { let output_dir = self.part_output_dir(); fs::create_dir_all(&output_dir) .with_context(|| format!("Failed to create dir: {output_dir}"))?; fs::create_dir_all(&self.log_dir) .with_context(|| format!("Failed to create dir: {}", self.log_dir))?; Ok(()) } fn cmd(&self) -> String { let output_dir = self.part_output_dir(); let region_arg = self .region .as_ref() .map(|r| format!("-r '{r}'")) .unwrap_or_default(); let sample_name = format!("{}_diag", self.id); let bind_flags = singularity_bind_flags([ &self.config.tumoral_bam(&self.id), &self.config.normal_bam(&self.id), &self.config.reference, &output_dir, &self.log_dir, &self.config.tmp_dir, ]); format!( "{singularity_bin} exec \ {binds} \ --bind {output_dir}:{output_dir} \ {image} \ /opt/bin/run_clairs \ -T {tumor_bam} \ -N {normal_bam} \ -R {reference} \ -t {threads} \ -p {platform} \ --enable_indel_calling \ --include_all_ctgs \ --print_germline_calls \ --enable_clair3_germline_output \ --use_longphase_for_intermediate_haplotagging true \ --output_dir {output_dir} \ -s {sample_name} \ {region_arg}", singularity_bin = self.config.singularity_bin, binds = bind_flags, image = self.config.clairs_image, tumor_bam = self.config.tumoral_bam(&self.id), normal_bam = self.config.normal_bam(&self.id), reference = self.config.reference, threads = self.config.clairs_threads, platform = self.config.clairs_platform, output_dir = output_dir, sample_name = sample_name, region_arg = region_arg, ) } } impl LocalRunner for ClairS {} impl LocalBatchRunner for ClairS {} impl SlurmRunner for ClairS { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some(format!("clairs_{}", self.id)), cpus_per_task: Some(self.config.clairs_threads as u32), mem: Some("80G".into()), partition: Some("shortq".into()), gres: None, } .to_args() } } impl SbatchRunner for ClairS { fn slurm_params(&self) -> SlurmParams { SlurmParams { job_name: Some(format!("clairs_{}", self.id)), cpus_per_task: Some(self.config.clairs_threads as u32), mem: Some("70G".into()), partition: Some("shortq".into()), gres: None, } } } impl Run for ClairS { fn run(&mut self) -> anyhow::Result<()> { if !self.should_run() { info!("ClairS is up-to-date."); return Ok(()); } // Acquire lock before any work let lock_dir = format!("{}/locks", self.config.result_dir); let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs") .with_context(|| format!("Cannot start ClairS for {}", self.id))?; if self.config.slurm_runner { run_clairs_chunked(&self.id, &self.config, 50)?; // merge_haplotaged_tmp_bam(&self.config, &self.id)?; } else { run!(&self.config, self)?; let mut germline = self.process_germline()?; let report = run!(&self.config, &mut germline) .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?; report .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir)) .context("Failed to save germline PASS logs")?; let (mut concat, tmp_path) = self.process_somatic_concat()?; let (snv_vcf, indel_vcf) = self.config.clairs_output_vcfs(&self.id); let report = run!(&self.config, &mut concat) .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?; report .save_to_file(format!("{}/bcftools_concat_", self.log_dir)) .context("Failed to save concat logs")?; let mut keep_pass = self.process_somatic_pass(&tmp_path)?; let report = run!(&self.config, &mut keep_pass) .with_context(|| format!("Failed to filter PASS for {}", self.id))?; report .save_to_file(format!("{}/bcftools_pass_", self.log_dir)) .context("Failed to save PASS filter logs")?; // Clean up temporary concatenated VCF debug!("Removing temporary file: {}", tmp_path); if let Err(e) = fs::remove_file(&tmp_path) { warn!("Failed to remove temporary file {}: {}", tmp_path, e); } } Ok(()) } } impl ClairS { /// Returns the germline PASS VCF path. /// /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz` /// - Full runs: canonical path from config fn germline_passed_vcf_path(&self) -> String { match self.part_index { Some(idx) => { let outdir = self.part_output_dir(); format!("{outdir}/clair3_germline.part{idx}.pass.vcf.gz") } None => self.config.clairs_germline_passed_vcf(&self.id), } } fn process_germline(&self) -> anyhow::Result { let germline_passed = self.germline_passed_vcf_path(); let germline_input = match self.part_index { Some(_) => { let output_dir = self.part_output_dir(); let base = self.config.clairs_germline_normal_vcf(&self.id); let base_name = Path::new(&base) .file_name() .expect("Germline VCF should have filename") .to_string_lossy(); format!("{output_dir}/{base_name}") } None => self.config.clairs_germline_normal_vcf(&self.id), }; info!( "Filtering germline PASS variants for {} part {:?}", self.id, self.part_index ); let cmd = BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone()); Ok(cmd) } fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> { let output_dir = self.part_output_dir(); let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id); let snv_vcf = format!( "{}/{}", output_dir, Path::new(&snv_vcf_base) .file_name() .expect("SNV VCF should have filename") .to_string_lossy() ); let indel_vcf = format!( "{}/{}", output_dir, Path::new(&indel_vcf_base) .file_name() .expect("Indel VCF should have filename") .to_string_lossy() ); info!( "Concatenating and filtering somatic variants for {} part {:?}", self.id, self.part_index ); let tmp_file = Path::new(&self.config.tmp_dir).join(format!("pandora-temp-{}.vcf.gz", Uuid::new_v4())); let tmp_path = tmp_file .to_str() .context("Temp path not valid UTF-8")? .to_string(); // Concat SNV + indel Ok(( BcftoolsConcat::from_config( &self.config, vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)], &tmp_path, ), tmp_path, )) } pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result { let passed_vcf = self.somatic_passed_vcf_path(); // Filter PASS let keep_pass = BcftoolsKeepPass::from_config(&self.config, tmp_path, passed_vcf.clone()); Ok(keep_pass) } /// Returns the per-part output directory. fn part_output_dir(&self) -> String { let base_dir = self.config.clairs_output_dir(&self.id); match self.part_index { Some(idx) => format!("{base_dir}/part{idx}"), None => base_dir, } } /// Returns the somatic PASS VCF path. /// /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz` /// - Full runs: canonical path from config fn somatic_passed_vcf_path(&self) -> String { match self.part_index { Some(idx) => { let outdir = self.part_output_dir(); format!("{outdir}/clairs.part{idx}.pass.vcf.gz") } None => self.config.clairs_passed_vcf(&self.id), } } } impl CallerCat for ClairS { fn caller_cat(&self) -> Annotation { Annotation::Callers(Caller::ClairS, Sample::Somatic) } } impl Variants for ClairS { fn variants(&self, annotations: &Annotations) -> anyhow::Result { let caller = self.caller_cat(); let passed_vcf = &self.config.clairs_passed_vcf(&self.id); info!("Loading variants from {caller}: {passed_vcf}"); let variants = read_vcf(passed_vcf) .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), std::slice::from_ref(&caller)); }); info!("{caller}: {} variants loaded", variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(passed_vcf.into())?, caller, }) } } impl ClairS { /// Loads germline variants from the Clair3 germline output. pub fn germline(&self, annotations: &Annotations) -> anyhow::Result { let caller = Annotation::Callers(Caller::ClairS, Sample::Germline); let germline_passed = &self.config.clairs_germline_passed_vcf(&self.id); info!("Loading germline variants from {caller}: {germline_passed}"); let variants = read_vcf(germline_passed)?; variants.par_iter().for_each(|v| { annotations.insert_update(v.hash(), std::slice::from_ref(&caller)); }); info!("{caller}: {} variants loaded", variants.len()); Ok(VariantCollection { variants, vcf: Vcf::new(germline_passed.into())?, caller, }) } } impl Label for ClairS { fn label(&self) -> String { self.caller_cat().to_string() } } impl Version for ClairS { fn version(config: &Config) -> anyhow::Result { let out = ProcessCommand::new("bash") .arg("-c") .arg(format!( "{} exec {} /opt/bin/run_clairs --version", config.singularity_bin, config.clairs_image )) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .context("Failed to spawn Singularity process")?; if !out.status.success() { let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); anyhow::bail!( "Singularity exec failed with status {}: {}", out.status, combined ); } let combined = format!( "{}{}", String::from_utf8_lossy(&out.stdout), String::from_utf8_lossy(&out.stderr) ); parse_clairs_version(&combined) } fn version_slurm(config: &Config) -> anyhow::Result { struct ClairSVersionJob<'a> { config: &'a Config, } impl JobCommand for ClairSVersionJob<'_> { fn cmd(&self) -> String { format!( "{} exec {} /opt/bin/run_clairs --version", self.config.singularity_bin, self.config.clairs_image ) } } impl SlurmRunner for ClairSVersionJob<'_> { fn slurm_args(&self) -> Vec { SlurmParams { job_name: Some("clairs_version".into()), partition: Some("shortq".into()), cpus_per_task: Some(1), mem: Some("10G".into()), gres: None, } .to_args() } } let mut job = ClairSVersionJob { config }; let out = SlurmRunner::exec(&mut job).context("Failed to run ClairS --version via Slurm")?; let mut combined = out.stdout.clone(); if let Some(epilog) = &out.slurm_epilog { combined.push_str(&epilog.to_string()); } combined.push_str(&out.stderr); parse_clairs_version(&combined) } } /// Parses ClairS version from command output. fn parse_clairs_version(output: &str) -> anyhow::Result { let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid"); let caps = re .captures(output) .context("Could not parse ClairS version from output")?; Ok(caps .get(1) .expect("Regex has capture group 1") .as_str() .to_string()) } /// Merges N chunked ClairS PASS VCFs into the final output. fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> { let mut part_pass_paths: Vec = Vec::with_capacity(n_parts); for i in 1..=n_parts { let mut part = base.clone(); part.part_index = Some(i); let part_pass = part.somatic_passed_vcf_path(); anyhow::ensure!( Path::new(&part_pass).exists(), "Missing ClairS part {} PASS VCF: {}", i, part_pass ); part_pass_paths.push(PathBuf::from(part_pass)); } let final_passed_vcf = base.config.clairs_passed_vcf(&base.id); let rand = Uuid::new_v4(); let final_tmp = format!("{final_passed_vcf}_{rand}.tmp"); let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi"); if let Some(parent) = Path::new(&final_passed_vcf).parent() { fs::create_dir_all(parent)?; } info!( "Concatenating {} ClairS part VCFs into {}", n_parts, final_passed_vcf ); let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp); run!(&base.config, &mut concat).context("Failed to run bcftools concat for ClairS parts")?; fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?; fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi")) .context("Failed to rename merged ClairS PASS VCF CSI")?; info!( "Successfully merged {} ClairS parts into {}", n_parts, final_passed_vcf ); Ok(()) } fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> { let mut part_pass_paths: Vec = Vec::with_capacity(n_parts); for i in 1..=n_parts { let mut part = base.clone(); part.part_index = Some(i); let part_pass = part.germline_passed_vcf_path(); anyhow::ensure!( Path::new(&part_pass).exists(), "Missing ClairS germline part {} PASS VCF: {}", i, part_pass ); part_pass_paths.push(PathBuf::from(part_pass)); } let final_passed_vcf = base.config.clairs_germline_passed_vcf(&base.id); let rand = Uuid::new_v4(); let final_tmp = format!("{final_passed_vcf}_{rand}.tmp"); let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi"); if let Some(parent) = Path::new(&final_passed_vcf).parent() { fs::create_dir_all(parent)?; } info!( "Concatenating {} ClairS germline part VCFs into {}", n_parts, final_passed_vcf ); let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp); run!(&base.config, &mut concat) .context("Failed to run bcftools concat for ClairS germline parts")?; fs::rename(&final_tmp, &final_passed_vcf) .context("Failed to rename merged ClairS germline PASS VCF")?; fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi")) .context("Failed to rename merged ClairS germline PASS VCF CSI")?; info!( "Successfully merged {} ClairS germline parts into {}", n_parts, final_passed_vcf ); Ok(()) } pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()> { let dir = config.clairs_output_dir(id); let bams: Vec = list_files_recursive(dir) .into_iter() .filter(|f| f.extension().and_then(|e| e.to_str()) == Some("bam")) .collect(); let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf(); info!("Merging {} into: {}", bams.len(), into.display()); let mut merge = SamtoolsMergeMany::from_config(into, bams, config); run!(config, &mut merge)?; Ok(()) } /// Runs ClairS in parallel chunks, then merges results. /// /// Splits the genome into N equal-sized regions, runs ClairS on each region /// in parallel (local or Slurm based on `config.slurm_runner`), post-processes /// each part (concatenates SNV+indel, filters PASS), and merges both somatic /// and germline VCFs. /// /// # Arguments /// /// * `id` - Sample identifier /// * `config` - Global pipeline configuration /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome) /// /// # Returns /// /// `Ok(())` on success, or an error if any step fails. /// /// # Errors /// /// Returns an error if: /// - `n_parts` is 0 /// - Normal BAM file cannot be opened or is corrupted /// - BAM header is malformed /// - ClairS execution fails on any part /// - SNV+indel concatenation fails /// - PASS filtering fails /// - Somatic or germline VCF merging fails /// - Output directory cannot be created /// /// # Example /// /// ```ignore /// let config = Config::default(); /// run_clairs_chunked("sample_001", &config, 30)?; /// ``` pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> { anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0"); let lock_dir = format!("{}/locks", config.result_dir); let _lock = SampleLock::acquire(&lock_dir, id, "clairs") .with_context(|| format!("Cannot start ClairS chunked for {}", id))?; let base = ClairS::initialize(id, config)?; if !base.should_run() { debug!("ClairS PASS VCF already up-to-date for {id}, skipping."); return Ok(()); } let normal_bam = config.normal_bam(id); let reader = bam::Reader::from_path(&normal_bam) .with_context(|| format!("Failed to open BAM: {normal_bam}"))?; let header = bam::Header::from_template(reader.header()); let genome_sizes = get_genome_sizes(&header)?; let regions = split_genome_into_n_regions_exact(&genome_sizes, n_parts) .into_iter() .flatten() .collect::>(); let actual_n_parts = regions.len(); info!( "Running ClairS in {} parallel parts for {}", actual_n_parts, id ); let mut jobs = Vec::with_capacity(actual_n_parts); for (i, region) in regions.into_iter().enumerate() { let mut job = base.clone(); job.part_index = Some(i + 1); job.region = Some(region); job.log_dir = format!("{}/part{}", base.log_dir, i + 1); info!("Planned ClairS job:\n{job}"); jobs.push(job); } let outputs = run_many!(config, jobs.clone())?; for output in outputs.iter() { output.save_to_file(format!("{}/clairs_", base.log_dir))?; } // 1) Germline PASS filters (one per part) let mut germlines = Vec::new(); // 2a) Somatic SNV+indel concats (one per part) let mut somatic_concats = Vec::new(); let mut somatic_tmp_paths = Vec::new(); for job in &jobs { // germline PASS germlines.push(job.process_germline()?); // somatic concat (SNV+indel -> tmp VCF) let (concat, tmp_path) = job.process_somatic_concat()?; somatic_concats.push(concat); somatic_tmp_paths.push(tmp_path); } // Run all germline PASS filters in parallel run_many!(config, germlines)?; // Run all somatic concats in parallel run_many!(config, somatic_concats)?; // 2b) Somatic PASS filters (one per part, based on tmp VCFs) let mut somatic_pass_filters = Vec::new(); for (job, tmp_path) in jobs.iter().zip(somatic_tmp_paths.iter()) { somatic_pass_filters.push(job.process_somatic_pass(tmp_path)?); } // Run all somatic PASS filters in parallel run_many!(config, somatic_pass_filters)?; merge_clairs_parts(&base, actual_n_parts)?; merge_clairs_germline_parts(&base, actual_n_parts)?; info!( "ClairS completed for {}: {} parts merged (somatic + germline)", id, actual_n_parts ); Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::helpers::test_init; #[test] fn clairs_version() -> anyhow::Result<()> { test_init(); let vl = ClairS::version(&Config::default())?; info!("ClairS local version: {vl}"); let vs = ClairS::version_slurm(&Config::default())?; info!("ClairS slurm version: {vs}"); assert_eq!(vl, vs); Ok(()) } #[test] fn clairs_run() -> anyhow::Result<()> { test_init(); let config = Config::default(); // let clairs = ClairS::initialize("34528", &config)?; // info!("{clairs}"); run_clairs_chunked("CHAHA", &config, 50)?; Ok(()) } #[test] fn clairs_haplo() -> anyhow::Result<()> { test_init(); let config = Config::default(); merge_haplotaged_tmp_bam(&config, "DUMCO")?; Ok(()) } }