clairs.rs 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202
  1. //! # ClairS Somatic Variant Calling Pipeline
  2. //!
  3. //! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS),
  4. //! a deep learning-based somatic variant caller designed for long-read sequencing data
  5. //! (ONT and PacBio HiFi).
  6. //!
  7. //! ## Overview
  8. //!
  9. //! ClairS performs somatic variant calling on paired tumor-normal samples using:
  10. //!
  11. //! - Haplotype-aware variant calling with LongPhase integration
  12. //! - Separate SNV and indel calling pipelines
  13. //! - Clair3-based germline variant detection on the normal sample
  14. //!
  15. //! ## Key Features
  16. //!
  17. //! - **Deep learning-based** - CNN models trained on long-read data
  18. //! - **Haplotype-aware** - Uses phased germline variants for improved accuracy
  19. //! - **Dual output** - Somatic and germline variants in a single run
  20. //! - **Platform flexibility** - Supports ONT (R9/R10) and PacBio HiFi
  21. //! - **Parallel execution** - Genome chunking for HPC scalability
  22. //!
  23. //! ## Requirements
  24. //!
  25. //! Before running ClairS, ensure:
  26. //! - Tumor and normal BAMs are indexed (`.bai` files present)
  27. //! - Reference genome is accessible
  28. //! - Singularity/Docker image is available
  29. //! - Platform is correctly specified (`config.clairs_platform`)
  30. //!
  31. //! ## Execution Modes
  32. //!
  33. //! The module supports three execution strategies, all dispatched through [`Run::run`]:
  34. //!
  35. //! - **Local** — Single-node execution via [`LocalRunner`](crate::commands::LocalRunner)
  36. //! - **Slurm** — Single HPC job submission via [`SbatchRunner`](crate::commands::SbatchRunner)
  37. //! - **Chunked** — Parallel execution across genome regions (enabled automatically when
  38. //! `config.slurm_runner = true`, calls the internal `run_chunked` function)
  39. //!
  40. //! ### Chunked Parallel Execution
  41. //!
  42. //! For large genomes, the chunked mode provides significant speedup:
  43. //!
  44. //! ```text
  45. //! ┌─────────────────────────────────────────────────────────────────┐
  46. //! │ Genome Splitting │
  47. //! │ chr1:1-50M │ chr1:50M-100M │ chr2:1-50M │ ... │ chrN │
  48. //! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘
  49. //! │ │ │ │
  50. //! ▼ ▼ ▼ ▼
  51. //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
  52. //! │ ClairS │ │ ClairS │ │ ClairS │ ... │ ClairS │
  53. //! │ Part 1 │ │ Part 2 │ │ Part 3 │ │ Part N │
  54. //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
  55. //! │ │ │ │
  56. //! ▼ ▼ ▼ ▼
  57. //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
  58. //! │ Postprocess│ │ Postprocess│ │ Postprocess│ ... │ Postprocess│
  59. //! │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │
  60. //! │ → PASS │ │ → PASS │ │ → PASS │ │ → PASS │
  61. //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
  62. //! │ │ │ │
  63. //! └───────────────┴───────┬───────┴───────────────────┘
  64. //! ▼
  65. //! ┌─────────────────────┐
  66. //! │ bcftools concat │
  67. //! │ (somatic PASS) │
  68. //! └──────────┬──────────┘
  69. //! ▼
  70. //! ┌─────────────────────┐
  71. //! │ Final VCF Output │
  72. //! └─────────────────────┘
  73. //! ```
  74. //!
  75. //! ## Output Files
  76. //!
  77. //! Somatic variants (PASS only):
  78. //! ```text
  79. //! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz
  80. //! ```
  81. //!
  82. //! Germline variants (PASS only):
  83. //! ```text
  84. //! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz
  85. //! ```
  86. //!
  87. //! ## Post-processing Pipeline
  88. //!
  89. //! Each ClairS run (or part) undergoes automatic post-processing:
  90. //!
  91. //! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants
  92. //! 2. **Germline**: Filter PASS variants from Clair3 germline output
  93. //!
  94. //! ## Usage
  95. //!
  96. //! ### Basic Execution
  97. //!
  98. //! ```ignore
  99. //! use pandora_lib_promethion::callers::clairs::ClairS;
  100. //! use pandora_lib_promethion::config::Config;
  101. //! use pandora_lib_promethion::pipes::Initialize;
  102. //! use pandora_lib_promethion::runners::Run;
  103. //!
  104. //! let config = Config::default();
  105. //! // config.slurm_runner = true → chunked Slurm execution (60 parts)
  106. //! // config.slurm_runner = false → local single-node execution
  107. //! let mut clairs = ClairS::initialize("sample_001", &config)?;
  108. //! clairs.run()?;
  109. //! # Ok::<(), anyhow::Error>(())
  110. //! ```
  111. //!
  112. //! ### Loading Variants for Downstream Analysis
  113. //!
  114. //! ```ignore
  115. //! use pandora_lib_promethion::annotation::Annotations;
  116. //! use pandora_lib_promethion::variant::vcf_variant::Variants;
  117. //!
  118. //! let annotations = Annotations::new();
  119. //! let somatic = clairs.variants(&annotations)?;
  120. //! let germline = clairs.germline(&annotations)?;
  121. //!
  122. //! println!("Somatic: {} variants", somatic.variants.len());
  123. //! println!("Germline: {} variants", germline.variants.len());
  124. //! # Ok::<(), anyhow::Error>(())
  125. //! ```
  126. //!
  127. //! ## Configuration Requirements
  128. //!
  129. //! The following [`Config`](crate::config::Config) fields must be set:
  130. //!
  131. //! - `singularity_bin` — Path to Singularity executable
  132. //! - `clairs_image` — ClairS container image path
  133. //! - `reference` — Reference genome FASTA
  134. //! - `clairs_threads` — CPU threads per job
  135. //! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.)
  136. //! - `clairs_force` — Force re-run even if outputs exist
  137. //!
  138. //! ## Implemented Traits
  139. //!
  140. //! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation
  141. //! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating
  142. //! - [`JobCommand`](crate::commands::Command) — Command string generation
  143. //! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support
  144. //! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission
  145. //! - [`Run`](crate::runners::Run) — Unified execution interface
  146. //! - [`Variants`](crate::variant::vcf_variant::Variants) — Somatic variant loading
  147. //! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category
  148. //! - [`Label`](crate::variant::vcf_variant::Label) — Human-readable identifier
  149. //! - [`Version`](crate::pipes::Version) — Tool version extraction
  150. //!
  151. //! ## Dependencies
  152. //!
  153. //! External tools (containerized or system):
  154. //!
  155. //! - **ClairS** — Somatic variant calling
  156. //! - **bcftools** — VCF concatenation and filtering
  157. //!
  158. //! ## References
  159. //!
  160. //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
  161. //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
  162. use crate::{
  163. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  164. collection::vcf::Vcf,
  165. commands::{
  166. bcftools::{BcftoolsConcat, BcftoolsKeepPass},
  167. exec_jobs,
  168. samtools::SamtoolsMergeMany,
  169. AnyJob, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
  170. SlurmRunner,
  171. },
  172. config::Config,
  173. helpers::{
  174. get_genome_sizes_in_header_order, is_file_older, list_files_recursive,
  175. remove_dir_if_exists, singularity_bind_flags, split_ordered_genome_into_n_regions_exact,
  176. },
  177. io::vcf::read_vcf,
  178. jobs_seq,
  179. locker::SampleLock,
  180. pipes::{Initialize, ShouldRun, Version},
  181. runners::Run,
  182. variant::{
  183. variant_collection::VariantCollection,
  184. vcf_variant::{Label, Variants},
  185. },
  186. };
  187. use anyhow::Context;
  188. use log::{debug, info, warn};
  189. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  190. use regex::Regex;
  191. use rust_htslib::bam::{self, Read};
  192. use std::{
  193. fmt, fs,
  194. path::{Path, PathBuf},
  195. process::{Command as ProcessCommand, Stdio},
  196. };
  197. use uuid::Uuid;
  198. /// ClairS haplotype-aware somatic variant caller runner.
  199. ///
  200. /// Executes ClairS for paired tumor-normal variant calling with automatic
  201. /// post-processing (SNV+indel concatenation, PASS filtering, germline extraction).
  202. ///
  203. /// # Fields
  204. ///
  205. /// - `id` - Sample identifier (e.g., "34528")
  206. /// - `config` - All resolved paths, tool settings, and Slurm parameters (see [`ClairsConfig`])
  207. /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`)
  208. /// - `part_index` - Chunk index when running in parallel (e.g., `Some(3)` for part 3 of N);
  209. /// `None` for full-genome runs
  210. ///
  211. /// # Execution
  212. ///
  213. /// Call [`Run::run`] — it dispatches automatically:
  214. /// - `config.slurm_runner = false` → local single-node execution
  215. /// - `config.slurm_runner = true` → chunked parallel Slurm execution (60 genome parts)
  216. ///
  217. /// # Output Files
  218. ///
  219. /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz`
  220. /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz`
  221. #[derive(Debug, Clone)]
  222. pub struct ClairS {
  223. pub id: String,
  224. pub config: ClairsConfig,
  225. pub region: Option<String>,
  226. pub part_index: Option<usize>,
  227. }
  228. /// Resolved configuration for a single ClairS run.
  229. ///
  230. /// Built once at initialisation via [`ClairsConfig::from_config`] and carried
  231. /// inside [`ClairS`]. All path templates from [`Config`] are expanded here so
  232. /// that `ClairS` methods never need to re-expand them.
  233. #[derive(Debug, Clone)]
  234. pub struct ClairsConfig {
  235. // Execution
  236. pub singularity_bin: String,
  237. pub clairs_image: String,
  238. pub clairs_threads: u8,
  239. pub clairs_platform: String,
  240. pub clairs_force: bool,
  241. pub clairs_keep_parts: bool,
  242. // Paths
  243. pub reference: String,
  244. pub tmp_dir: String,
  245. pub result_dir: String,
  246. // Slurm
  247. pub slurm_runner: bool,
  248. pub slurm_max_par: u8,
  249. // Sample-scoped paths, computed once at init
  250. pub normal_bam: String,
  251. pub tumoral_bam: String,
  252. pub passed_vcf: String,
  253. pub germline_passed_vcf: String,
  254. pub germline_normal_vcf: String,
  255. pub output_dir: String,
  256. pub snv_vcf: String,
  257. pub indel_vcf: String,
  258. pub log_dir: String,
  259. pub lock_dir: String,
  260. // BcftoolsConcat
  261. pub bcftools_bin: String,
  262. pub bcftools_threads: u8,
  263. }
  264. impl ClairsConfig {
  265. pub fn from_config(config: &Config, id: &str) -> Self {
  266. let output_dir = config.clairs_output_dir(id);
  267. let (snv_vcf, indel_vcf) = config.clairs_output_vcfs(id);
  268. Self {
  269. singularity_bin: config.singularity_bin.clone(),
  270. clairs_image: config.clairs_image.clone(),
  271. clairs_threads: config.clairs_threads,
  272. clairs_platform: config.clairs_platform.clone(),
  273. clairs_force: config.clairs_force,
  274. clairs_keep_parts: config.clairs_keep_parts,
  275. reference: config.reference.clone(),
  276. tmp_dir: config.tmp_dir.clone(),
  277. result_dir: config.result_dir.clone(),
  278. slurm_runner: config.slurm_runner,
  279. slurm_max_par: config.slurm_max_par,
  280. normal_bam: config.normal_bam(id),
  281. tumoral_bam: config.tumoral_bam(id),
  282. passed_vcf: config.clairs_passed_vcf(id),
  283. germline_passed_vcf: config.clairs_germline_passed_vcf(id),
  284. germline_normal_vcf: config.clairs_germline_normal_vcf(id),
  285. output_dir,
  286. snv_vcf,
  287. indel_vcf,
  288. log_dir: format!("{}/{}/log/clairs", config.result_dir, id),
  289. lock_dir: format!("{}/locks", config.result_dir),
  290. bcftools_bin: config.bcftools_bin.clone(),
  291. bcftools_threads: config.bcftools_threads,
  292. }
  293. }
  294. }
  295. impl fmt::Display for ClairS {
  296. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  297. writeln!(f, "🧬 ClairS {}", self.id)?;
  298. writeln!(
  299. f,
  300. " Region : {}",
  301. self.region.as_deref().unwrap_or("genome-wide")
  302. )?;
  303. writeln!(
  304. f,
  305. " Part : {}",
  306. self.part_index
  307. .map_or("full".into(), |n| format!("part{n}"))
  308. )?;
  309. writeln!(f, " Log dir : {}", self.config.log_dir)
  310. }
  311. }
  312. impl Initialize for ClairS {
  313. fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
  314. let id = id.to_string();
  315. info!("Initializing ClairS for {id}");
  316. let clairs = Self {
  317. config: ClairsConfig::from_config(config, &id),
  318. id,
  319. region: None,
  320. part_index: None,
  321. };
  322. if config.clairs_force {
  323. remove_dir_if_exists(&config.clairs_output_dir(&clairs.id))?;
  324. }
  325. Ok(clairs)
  326. }
  327. }
  328. impl ShouldRun for ClairS {
  329. /// Determines whether ClairS should be re-run based on BAM modification timestamps.
  330. fn should_run(&self) -> bool {
  331. let passed_vcf = &self.config.passed_vcf;
  332. let normal_older = is_file_older(passed_vcf, &self.config.normal_bam, true).unwrap_or(true);
  333. let tumor_older = is_file_older(passed_vcf, &self.config.tumoral_bam, true).unwrap_or(true);
  334. let result = normal_older || tumor_older;
  335. if result {
  336. info!("ClairS should run for id: {}.", self.id);
  337. }
  338. result
  339. }
  340. }
  341. impl JobCommand for ClairS {
  342. fn init(&mut self) -> anyhow::Result<()> {
  343. let output_dir = &self.config.output_dir;
  344. fs::create_dir_all(output_dir)
  345. .with_context(|| format!("Failed to create dir: {output_dir}"))?;
  346. fs::create_dir_all(&self.config.log_dir)
  347. .with_context(|| format!("Failed to create dir: {}", self.config.log_dir))?;
  348. Ok(())
  349. }
  350. fn cmd(&self) -> String {
  351. let output_dir = &self.config.output_dir;
  352. let region_arg = self
  353. .region
  354. .as_ref()
  355. .map(|r| format!("-r '{r}'"))
  356. .unwrap_or_default();
  357. let sample_name = format!("{}_diag", self.id);
  358. let bind_flags = singularity_bind_flags([
  359. &self.config.tumoral_bam,
  360. &self.config.normal_bam,
  361. &self.config.reference,
  362. output_dir,
  363. &self.config.log_dir,
  364. &self.config.tmp_dir,
  365. ]);
  366. format!(
  367. "{singularity_bin} exec --cleanenv \
  368. {binds} \
  369. --bind {output_dir}:{output_dir} \
  370. {image} \
  371. /opt/bin/run_clairs \
  372. -T {tumor_bam} \
  373. -N {normal_bam} \
  374. -R {reference} \
  375. -t {threads} \
  376. -p {platform} \
  377. --enable_indel_calling \
  378. --include_all_ctgs \
  379. --print_germline_calls \
  380. --enable_clair3_germline_output \
  381. --use_longphase_for_intermediate_haplotagging true \
  382. --output_dir {output_dir} \
  383. -s {sample_name} \
  384. {region_arg}",
  385. singularity_bin = self.config.singularity_bin,
  386. binds = bind_flags,
  387. image = self.config.clairs_image,
  388. tumor_bam = self.config.tumoral_bam,
  389. normal_bam = self.config.normal_bam,
  390. reference = self.config.reference,
  391. threads = self.config.clairs_threads,
  392. platform = self.config.clairs_platform,
  393. output_dir = output_dir,
  394. sample_name = sample_name,
  395. region_arg = region_arg,
  396. )
  397. }
  398. }
  399. impl LocalRunner for ClairS {}
  400. // impl SlurmRunner for ClairS {
  401. // fn slurm_args(&self) -> Vec<String> {
  402. // self.slurm_params().to_args()
  403. // }
  404. // }
  405. impl SbatchRunner for ClairS {
  406. fn slurm_params(&self) -> SlurmParams {
  407. let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
  408. SlurmParams {
  409. job_name: Some(format!("clairs_{}{}", self.id, batch_id)),
  410. cpus_per_task: Some(self.config.clairs_threads as u32),
  411. mem: Some("50G".into()),
  412. partition: Some("shortq".into()),
  413. gres: None,
  414. }
  415. }
  416. }
  417. impl Run for ClairS {
  418. fn run(&mut self) -> anyhow::Result<()> {
  419. if !self.should_run() {
  420. info!("ClairS is up-to-date.");
  421. return Ok(());
  422. }
  423. // Acquire lock before any work
  424. let lock_dir = format!("{}/locks", self.config.result_dir);
  425. let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
  426. .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
  427. if self.config.slurm_runner {
  428. run_chunked(&self.id, &self.config, 60)?;
  429. } else {
  430. let (somatic_concat, somatic_tmp_path) = self.process_somatic_concat()?;
  431. let somatic_pass = self.process_somatic_pass(&somatic_tmp_path)?;
  432. let germline = self.process_germline()?;
  433. let outputs = exec_jobs(
  434. vec![jobs_seq![
  435. self.clone(),
  436. germline,
  437. somatic_concat,
  438. somatic_pass,
  439. ]],
  440. false,
  441. 1,
  442. )
  443. .context("Failed to run ClairS local pipeline")?;
  444. for output in &outputs {
  445. output.save_to_file(format!("{}/clairs_", self.config.log_dir))?;
  446. }
  447. // Clean up temporary concatenated VCF
  448. debug!("Removing temporary file: {}", somatic_tmp_path);
  449. if let Err(e) = fs::remove_file(&somatic_tmp_path) {
  450. warn!(
  451. "Failed to remove temporary file {}: {}",
  452. somatic_tmp_path, e
  453. );
  454. }
  455. }
  456. Ok(())
  457. }
  458. }
  459. impl ClairS {
  460. /// Returns the germline PASS VCF path.
  461. ///
  462. /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz`
  463. /// - Full runs: canonical path from config
  464. fn germline_passed_vcf_path(&self) -> String {
  465. match self.part_index {
  466. Some(idx) => {
  467. format!(
  468. "{}/clair3_germline.part{idx}.pass.vcf.gz",
  469. self.config.output_dir
  470. )
  471. }
  472. None => self.config.germline_passed_vcf.clone(),
  473. }
  474. }
  475. fn process_germline(&self) -> anyhow::Result<BcftoolsKeepPass> {
  476. let germline_passed = self.germline_passed_vcf_path();
  477. let germline_input = match self.part_index {
  478. Some(_) => {
  479. let output_dir = self.config.output_dir.clone();
  480. let base = self.config.germline_normal_vcf.clone();
  481. let base_name = Path::new(&base)
  482. .file_name()
  483. .with_context(|| {
  484. format!("germline VCF path has no filename component: {base}")
  485. })?
  486. .to_string_lossy();
  487. format!("{output_dir}/{base_name}")
  488. }
  489. None => self.config.germline_normal_vcf.clone(),
  490. };
  491. info!(
  492. "Filtering germline PASS variants for {} part {:?}",
  493. self.id, self.part_index
  494. );
  495. Ok(BcftoolsKeepPass {
  496. bin: self.config.bcftools_bin.clone(),
  497. threads: self.config.bcftools_threads,
  498. input: germline_input.into(),
  499. output: germline_passed.clone().into(),
  500. tmp_dir: self.config.tmp_dir.clone().into(),
  501. tmp_sort: PathBuf::new(),
  502. tmp_norm: PathBuf::new(),
  503. })
  504. }
  505. fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> {
  506. let output_dir = self.config.output_dir.clone();
  507. let snv_vcf_base = self.config.snv_vcf.clone();
  508. let indel_vcf_base = self.config.indel_vcf.clone();
  509. let snv_vcf = format!(
  510. "{}/{}",
  511. output_dir,
  512. Path::new(&snv_vcf_base)
  513. .file_name()
  514. .with_context(|| format!("SNV VCF path has no filename component: {snv_vcf_base}"))?
  515. .to_string_lossy()
  516. );
  517. let indel_vcf = format!(
  518. "{}/{}",
  519. output_dir,
  520. Path::new(&indel_vcf_base)
  521. .file_name()
  522. .with_context(|| {
  523. format!("indel VCF path has no filename component: {indel_vcf_base}")
  524. })?
  525. .to_string_lossy()
  526. );
  527. info!(
  528. "Concatenating and filtering somatic variants for {} part {:?}",
  529. self.id, self.part_index
  530. );
  531. let tmp_file =
  532. Path::new(&self.config.tmp_dir).join(format!("pandora-temp-{}.vcf.gz", Uuid::new_v4()));
  533. let tmp_path = tmp_file
  534. .to_str()
  535. .context("Temp path not valid UTF-8")?
  536. .to_string();
  537. // Concat SNV + indel
  538. let concat = BcftoolsConcat {
  539. bin: self.config.bcftools_bin.clone(),
  540. threads: self.config.bcftools_threads,
  541. inputs: vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
  542. output: tmp_path.clone().into(),
  543. tmp_dir: self.config.tmp_dir.clone().into(),
  544. tmp_list: PathBuf::new(),
  545. };
  546. Ok((concat, tmp_path))
  547. }
  548. pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result<BcftoolsKeepPass> {
  549. let passed_vcf = self.somatic_passed_vcf_path();
  550. // Filter PASS
  551. let keep_pass = BcftoolsKeepPass {
  552. bin: self.config.bcftools_bin.clone(),
  553. threads: self.config.bcftools_threads,
  554. input: tmp_path.into(),
  555. output: passed_vcf.clone().into(),
  556. tmp_dir: self.config.tmp_dir.clone().into(),
  557. tmp_sort: PathBuf::new(),
  558. tmp_norm: PathBuf::new(),
  559. };
  560. Ok(keep_pass)
  561. }
  562. fn for_part(&self, idx: usize) -> Self {
  563. let mut part = self.clone();
  564. part.part_index = Some(idx);
  565. part.config.output_dir = format!("{}/part{idx}", self.config.output_dir);
  566. part.config.log_dir = format!("{}/part{idx}", self.config.log_dir);
  567. Self { ..part }
  568. }
  569. /// Returns the somatic PASS VCF path.
  570. ///
  571. /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz`
  572. /// - Full runs: canonical path from config
  573. fn somatic_passed_vcf_path(&self) -> String {
  574. match self.part_index {
  575. Some(idx) => {
  576. format!("{}/clairs.part{idx}.pass.vcf.gz", self.config.output_dir)
  577. }
  578. None => self.config.passed_vcf.clone(),
  579. }
  580. }
  581. pub fn run_chunked_resume(&self, n_parts: usize) -> anyhow::Result<()> {
  582. run_chunked_resume(&self.id, &self.config, n_parts)
  583. }
  584. }
  585. impl CallerCat for ClairS {
  586. fn caller_cat(&self) -> Annotation {
  587. Annotation::Callers(Caller::ClairS, Sample::Somatic)
  588. }
  589. }
  590. impl Variants for ClairS {
  591. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  592. let caller = self.caller_cat();
  593. let passed_vcf = &self.config.passed_vcf.clone();
  594. info!("Loading variants from {caller}: {passed_vcf}");
  595. let variants = read_vcf(passed_vcf)
  596. .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?;
  597. variants.par_iter().for_each(|v| {
  598. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  599. });
  600. info!("{caller}: {} variants loaded", variants.len());
  601. Ok(VariantCollection {
  602. variants,
  603. vcf: Vcf::new(passed_vcf.into())?,
  604. caller,
  605. })
  606. }
  607. }
  608. impl ClairS {
  609. /// Loads germline variants from the Clair3 germline output.
  610. pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  611. let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
  612. let germline_passed = &self.config.germline_passed_vcf;
  613. info!("Loading germline variants from {caller}: {germline_passed}");
  614. let variants = read_vcf(germline_passed)?;
  615. variants.par_iter().for_each(|v| {
  616. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  617. });
  618. info!("{caller}: {} variants loaded", variants.len());
  619. Ok(VariantCollection {
  620. variants,
  621. vcf: Vcf::new(germline_passed.into())?,
  622. caller,
  623. })
  624. }
  625. }
  626. impl Label for ClairS {
  627. fn label(&self) -> String {
  628. self.caller_cat().to_string()
  629. }
  630. }
  631. impl Version for ClairS {
  632. fn version(config: &Config) -> anyhow::Result<String> {
  633. let out = ProcessCommand::new("bash")
  634. .arg("-c")
  635. .arg(format!(
  636. "{} exec {} /opt/bin/run_clairs --version",
  637. config.singularity_bin, config.clairs_image
  638. ))
  639. .stdout(Stdio::piped())
  640. .stderr(Stdio::piped())
  641. .output()
  642. .context("Failed to spawn Singularity process")?;
  643. if !out.status.success() {
  644. let combined = format!(
  645. "{}{}",
  646. String::from_utf8_lossy(&out.stdout),
  647. String::from_utf8_lossy(&out.stderr)
  648. );
  649. anyhow::bail!(
  650. "Singularity exec failed with status {}: {}",
  651. out.status,
  652. combined
  653. );
  654. }
  655. let combined = format!(
  656. "{}{}",
  657. String::from_utf8_lossy(&out.stdout),
  658. String::from_utf8_lossy(&out.stderr)
  659. );
  660. parse_clairs_version(&combined)
  661. }
  662. fn version_slurm(config: &Config) -> anyhow::Result<String> {
  663. struct ClairSVersionJob<'a> {
  664. config: &'a Config,
  665. }
  666. impl JobCommand for ClairSVersionJob<'_> {
  667. fn cmd(&self) -> String {
  668. format!(
  669. "{} exec {} /opt/bin/run_clairs --version",
  670. self.config.singularity_bin, self.config.clairs_image
  671. )
  672. }
  673. }
  674. impl SlurmRunner for ClairSVersionJob<'_> {
  675. fn slurm_args(&self) -> Vec<String> {
  676. SlurmParams {
  677. job_name: Some("clairs_version".into()),
  678. partition: Some("shortq".into()),
  679. cpus_per_task: Some(1),
  680. mem: Some("10G".into()),
  681. gres: None,
  682. }
  683. .to_args()
  684. }
  685. }
  686. let mut job = ClairSVersionJob { config };
  687. let out =
  688. SlurmRunner::exec(&mut job).context("Failed to run ClairS --version via Slurm")?;
  689. let mut combined = out.stdout.clone();
  690. if let Some(epilog) = &out.slurm_epilog {
  691. combined.push_str(&epilog.to_string());
  692. }
  693. combined.push_str(&out.stderr);
  694. parse_clairs_version(&combined)
  695. }
  696. }
  697. /// Parses ClairS version from command output.
  698. fn parse_clairs_version(output: &str) -> anyhow::Result<String> {
  699. let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid");
  700. let caps = re
  701. .captures(output)
  702. .context("Could not parse ClairS version from output")?;
  703. Ok(caps
  704. .get(1)
  705. .expect("Regex has capture group 1")
  706. .as_str()
  707. .to_string())
  708. }
  709. /// Merges N chunked ClairS PASS VCFs into the final output.
  710. fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
  711. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  712. for i in 1..=n_parts {
  713. let part = base.for_part(i);
  714. let part_pass = part.somatic_passed_vcf_path();
  715. anyhow::ensure!(
  716. Path::new(&part_pass).exists(),
  717. "Missing ClairS part {} PASS VCF: {}",
  718. i,
  719. part_pass
  720. );
  721. part_pass_paths.push(PathBuf::from(part_pass));
  722. }
  723. let final_passed_vcf = base.config.passed_vcf.clone();
  724. let rand = Uuid::new_v4();
  725. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  726. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  727. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  728. fs::create_dir_all(parent)?;
  729. }
  730. info!(
  731. "Concatenating {} ClairS part VCFs into {}",
  732. n_parts, final_passed_vcf
  733. );
  734. let concat = BcftoolsConcat {
  735. bin: base.config.bcftools_bin.clone(),
  736. threads: base.config.bcftools_threads,
  737. inputs: part_pass_paths,
  738. output: final_tmp.clone().into(),
  739. tmp_dir: base.config.tmp_dir.clone().into(),
  740. tmp_list: PathBuf::new(),
  741. };
  742. let outputs = exec_jobs(
  743. vec![jobs_seq![concat]],
  744. base.config.slurm_runner,
  745. base.config.slurm_max_par.into(),
  746. )
  747. .context("Failed to run bcftools concat for ClairS germline parts")?;
  748. for output in &outputs {
  749. output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?;
  750. }
  751. anyhow::ensure!(
  752. Path::new(&final_tmp_csi).exists(),
  753. "CSI index not found after concat: {}",
  754. final_tmp_csi
  755. );
  756. fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?;
  757. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  758. .context("Failed to rename merged ClairS PASS VCF CSI")?;
  759. info!(
  760. "Successfully merged {} ClairS parts into {}",
  761. n_parts, final_passed_vcf
  762. );
  763. Ok(())
  764. }
  765. fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
  766. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  767. for i in 1..=n_parts {
  768. let part = base.for_part(i);
  769. let part_pass = part.germline_passed_vcf_path();
  770. anyhow::ensure!(
  771. Path::new(&part_pass).exists(),
  772. "Missing ClairS germline part {} PASS VCF: {}",
  773. i,
  774. part_pass
  775. );
  776. part_pass_paths.push(PathBuf::from(part_pass));
  777. }
  778. let final_passed_vcf = base.config.germline_passed_vcf.clone();
  779. let rand = Uuid::new_v4();
  780. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  781. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  782. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  783. fs::create_dir_all(parent)?;
  784. }
  785. info!(
  786. "Concatenating {} ClairS germline part VCFs into {}",
  787. n_parts, final_passed_vcf
  788. );
  789. let concat = BcftoolsConcat {
  790. bin: base.config.bcftools_bin.clone(),
  791. threads: base.config.bcftools_threads,
  792. inputs: part_pass_paths,
  793. output: final_tmp.clone().into(),
  794. tmp_dir: base.config.tmp_dir.clone().into(),
  795. tmp_list: PathBuf::new(),
  796. };
  797. let outputs = exec_jobs(
  798. vec![jobs_seq![concat]],
  799. base.config.slurm_runner,
  800. base.config.slurm_max_par.into(),
  801. )
  802. .context("Failed to run bcftools concat for ClairS germline parts")?;
  803. for output in &outputs {
  804. output.save_to_file(format!("{}/bcftools_concat_", base.config.log_dir))?;
  805. }
  806. anyhow::ensure!(
  807. Path::new(&final_tmp_csi).exists(),
  808. "CSI index not found after concat: {}",
  809. final_tmp_csi
  810. );
  811. fs::rename(&final_tmp, &final_passed_vcf)
  812. .context("Failed to rename merged ClairS germline PASS VCF")?;
  813. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  814. .context("Failed to rename merged ClairS germline PASS VCF CSI")?;
  815. info!(
  816. "Successfully merged {} ClairS germline parts into {}",
  817. n_parts, final_passed_vcf
  818. );
  819. Ok(())
  820. }
  821. pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()> {
  822. let dir = config.clairs_output_dir(id);
  823. let bams: Vec<PathBuf> = list_files_recursive(dir)
  824. .into_iter()
  825. .filter(|f| f.extension().and_then(|e| e.to_str()) == Some("bam"))
  826. .collect();
  827. let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf();
  828. info!("Merging {} into: {}", bams.len(), into.display());
  829. let merge = SamtoolsMergeMany::from_config(into, bams, config);
  830. exec_jobs(
  831. vec![jobs_seq![merge]],
  832. config.slurm_runner,
  833. config.slurm_max_par.into(),
  834. )?;
  835. Ok(())
  836. }
  837. /// Runs ClairS in parallel chunks with sequential per-part post-processing, then merges results.
  838. ///
  839. /// # Pipeline per part (sequential)
  840. /// `ClairS main → germline PASS filter → somatic SNV+indel concat → somatic PASS filter`
  841. ///
  842. /// # Parallelism
  843. /// Parts run in parallel (bounded by `config.slurm_max_par`), dispatched locally or via
  844. /// SLURM depending on `config.slurm_runner`.
  845. ///
  846. /// # Errors
  847. /// Fails fast within a part on any step error. Already-running jobs are not cancelled.
  848. fn run_chunked(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> {
  849. anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0");
  850. let base = ClairS {
  851. id: id.to_string(),
  852. config: config.clone(),
  853. region: None,
  854. part_index: None,
  855. };
  856. let normal_bam = base.config.normal_bam.clone();
  857. let reader = bam::Reader::from_path(&normal_bam)
  858. .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
  859. let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
  860. let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
  861. .into_iter()
  862. .flatten()
  863. .collect::<Vec<String>>();
  864. let actual_n_parts = regions.len();
  865. info!(
  866. "Running ClairS in {} parallel parts for {}",
  867. actual_n_parts, id
  868. );
  869. let groups: anyhow::Result<Vec<Vec<Box<dyn AnyJob>>>> = regions
  870. .into_iter()
  871. .enumerate()
  872. .map(|(i, region)| {
  873. let mut job = base.for_part(i + 1);
  874. job.region = Some(region);
  875. info!("Planned ClairS job:\n{job}");
  876. let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?;
  877. let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?;
  878. let germline = job.process_germline()?;
  879. Ok(jobs_seq![job, germline, somatic_concat, somatic_pass])
  880. })
  881. .collect();
  882. let outputs = exec_jobs(
  883. groups?,
  884. base.config.slurm_runner,
  885. base.config.slurm_max_par.into(),
  886. )?;
  887. for output in &outputs {
  888. output.save_to_file(format!("{}/clairs_", base.config.log_dir))?;
  889. }
  890. merge_clairs_parts(&base, actual_n_parts)?;
  891. merge_clairs_germline_parts(&base, actual_n_parts)?;
  892. if !config.clairs_keep_parts {
  893. let somatic_final = config.passed_vcf.clone();
  894. let germline_final = config.germline_passed_vcf.clone();
  895. anyhow::ensure!(
  896. Path::new(&somatic_final).exists(),
  897. "Refusing to clean up parts: merged somatic VCF not found at {}",
  898. somatic_final
  899. );
  900. anyhow::ensure!(
  901. Path::new(&germline_final).exists(),
  902. "Refusing to clean up parts: merged germline VCF not found at {}",
  903. germline_final
  904. );
  905. let base_dir = config.output_dir.clone();
  906. for i in 1..=actual_n_parts {
  907. let part_dir = format!("{base_dir}/part{i}");
  908. if Path::new(&part_dir).exists() {
  909. fs::remove_dir_all(&part_dir)
  910. .with_context(|| format!("Failed to remove part directory: {part_dir}"))?;
  911. debug!("Removed part directory: {part_dir}");
  912. }
  913. }
  914. info!("Cleaned up {} part directories for {}", actual_n_parts, id);
  915. }
  916. info!(
  917. "ClairS completed for {}: {} parts merged (somatic + germline)",
  918. id, actual_n_parts
  919. );
  920. Ok(())
  921. }
  922. fn part_is_complete(part: &ClairS) -> bool {
  923. Path::new(&part.somatic_passed_vcf_path()).exists()
  924. && Path::new(&format!("{}.csi", part.somatic_passed_vcf_path())).exists()
  925. && Path::new(&part.germline_passed_vcf_path()).exists()
  926. && Path::new(&format!("{}.csi", part.germline_passed_vcf_path())).exists()
  927. }
  928. fn run_chunked_resume(id: &str, config: &ClairsConfig, n_parts: usize) -> anyhow::Result<()> {
  929. anyhow::ensure!(n_parts > 0, "run_chunked_resume: n_parts must be > 0");
  930. let base = ClairS {
  931. id: id.to_string(),
  932. config: config.clone(),
  933. region: None,
  934. part_index: None,
  935. };
  936. let normal_bam = base.config.normal_bam.clone();
  937. let reader = bam::Reader::from_path(&normal_bam)
  938. .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
  939. let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
  940. let regions = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
  941. .into_iter()
  942. .flatten()
  943. .collect::<Vec<String>>();
  944. let actual_n_parts = regions.len();
  945. info!(
  946. "Resuming ClairS chunked run for {} with {} parts",
  947. id, actual_n_parts
  948. );
  949. let groups: anyhow::Result<Vec<Vec<Box<dyn AnyJob>>>> = regions
  950. .into_iter()
  951. .enumerate()
  952. .filter_map(|(i, region)| {
  953. let mut job = base.for_part(i + 1);
  954. job.region = Some(region);
  955. if part_is_complete(&job) {
  956. info!("Skipping completed ClairS part {} for {}", i + 1, id);
  957. return None;
  958. }
  959. info!(
  960. "Scheduling missing/incomplete ClairS part {}:\n{job}",
  961. i + 1
  962. );
  963. Some((|| {
  964. let (somatic_concat, somatic_tmp_path) = job.process_somatic_concat()?;
  965. let somatic_pass = job.process_somatic_pass(&somatic_tmp_path)?;
  966. let germline = job.process_germline()?;
  967. Ok(jobs_seq![job, germline, somatic_concat, somatic_pass])
  968. })())
  969. })
  970. .collect();
  971. let groups = groups?;
  972. if groups.is_empty() {
  973. info!("All ClairS parts already complete for {}", id);
  974. } else {
  975. let outputs = exec_jobs(
  976. groups,
  977. base.config.slurm_runner,
  978. base.config.slurm_max_par.into(),
  979. )?;
  980. for output in &outputs {
  981. output.save_to_file(format!("{}/clairs_", base.config.log_dir))?;
  982. }
  983. }
  984. merge_clairs_parts(&base, actual_n_parts)?;
  985. merge_clairs_germline_parts(&base, actual_n_parts)?;
  986. if !config.clairs_keep_parts {
  987. let somatic_final = config.passed_vcf.clone();
  988. let germline_final = config.germline_passed_vcf.clone();
  989. anyhow::ensure!(
  990. Path::new(&somatic_final).exists(),
  991. "Refusing to clean up parts: merged somatic VCF not found at {}",
  992. somatic_final
  993. );
  994. anyhow::ensure!(
  995. Path::new(&germline_final).exists(),
  996. "Refusing to clean up parts: merged germline VCF not found at {}",
  997. germline_final
  998. );
  999. let base_dir = config.output_dir.clone();
  1000. for i in 1..=actual_n_parts {
  1001. let part_dir = format!("{base_dir}/part{i}");
  1002. if Path::new(&part_dir).exists() {
  1003. fs::remove_dir_all(&part_dir)
  1004. .with_context(|| format!("Failed to remove part directory: {part_dir}"))?;
  1005. }
  1006. }
  1007. info!("Cleaned up {} part directories for {}", actual_n_parts, id);
  1008. }
  1009. info!(
  1010. "ClairS resumed/completed for {}: {} parts merged",
  1011. id, actual_n_parts
  1012. );
  1013. Ok(())
  1014. }
  1015. #[cfg(test)]
  1016. mod tests {
  1017. use super::*;
  1018. use crate::helpers::test_init;
  1019. #[test]
  1020. fn clairs_version() -> anyhow::Result<()> {
  1021. test_init();
  1022. let vl = ClairS::version(&Config::default())?;
  1023. info!("ClairS local version: {vl}");
  1024. let vs = ClairS::version_slurm(&Config::default())?;
  1025. info!("ClairS slurm version: {vs}");
  1026. assert_eq!(vl, vs);
  1027. Ok(())
  1028. }
  1029. #[test]
  1030. fn clairs_run() -> anyhow::Result<()> {
  1031. test_init();
  1032. let id = "CHAHA";
  1033. let config = Config::default();
  1034. let mut clairs = ClairS::initialize(id, &config)?;
  1035. crate::runners::Run::run(&mut clairs)?;
  1036. Ok(())
  1037. }
  1038. #[test]
  1039. fn clairs_resume() -> anyhow::Result<()> {
  1040. test_init();
  1041. let id = "CHAHA";
  1042. let config = Config::default();
  1043. let mut clairs = ClairS::initialize(id, &config)?;
  1044. clairs.run_chunked_resume(60);
  1045. Ok(())
  1046. }
  1047. }