clairs.rs 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987
  1. //! # ClairS Somatic Variant Calling Pipeline
  2. //!
  3. //! This module provides a pipeline runner for [ClairS](https://github.com/HKU-BAL/ClairS),
  4. //! a deep learning-based somatic variant caller designed for long-read sequencing data
  5. //! (ONT and PacBio HiFi).
  6. //!
  7. //! ## Overview
  8. //!
  9. //! ClairS performs somatic variant calling on paired tumor-normal samples using:
  10. //!
  11. //! - Haplotype-aware variant calling with LongPhase integration
  12. //! - Separate SNV and indel calling pipelines
  13. //! - Clair3-based germline variant detection on the normal sample
  14. //!
  15. //! ## Key Features
  16. //!
  17. //! - **Deep learning-based** - CNN models trained on long-read data
  18. //! - **Haplotype-aware** - Uses phased germline variants for improved accuracy
  19. //! - **Dual output** - Somatic and germline variants in a single run
  20. //! - **Platform flexibility** - Supports ONT (R9/R10) and PacBio HiFi
  21. //! - **Parallel execution** - Genome chunking for HPC scalability
  22. //!
  23. //! ## Requirements
  24. //!
  25. //! Before running ClairS, ensure:
  26. //! - Tumor and normal BAMs are indexed (`.bai` files present)
  27. //! - Reference genome is accessible
  28. //! - Singularity/Docker image is available
  29. //! - Platform is correctly specified (`config.clairs_platform`)
  30. //!
  31. //! ## Execution Modes
  32. //!
  33. //! The module supports three execution strategies:
  34. //!
  35. //! - **Local** ([`ClairS::run_local`]) — Single-node execution, useful for debugging
  36. //! - **Slurm** ([`ClairS::run_sbatch`]) — Single HPC job submission
  37. //! - **Chunked** ([`run_clairs_chunked_sbatch_with_merge`]) — Parallel execution across genome regions
  38. //!
  39. //! ### Chunked Parallel Execution
  40. //!
  41. //! For large genomes, the chunked mode provides significant speedup:
  42. //!
  43. //! ```text
  44. //! ┌─────────────────────────────────────────────────────────────────┐
  45. //! │ Genome Splitting │
  46. //! │ chr1:1-50M │ chr1:50M-100M │ chr2:1-50M │ ... │ chrN │
  47. //! └──────┬───────┴────────┬────────┴───────┬──────┴───────┴────┬────┘
  48. //! │ │ │ │
  49. //! ▼ ▼ ▼ ▼
  50. //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
  51. //! │ ClairS │ │ ClairS │ │ ClairS │ ... │ ClairS │
  52. //! │ Part 1 │ │ Part 2 │ │ Part 3 │ │ Part N │
  53. //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
  54. //! │ │ │ │
  55. //! ▼ ▼ ▼ ▼
  56. //! ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
  57. //! │ Postprocess│ │ Postprocess│ │ Postprocess│ ... │ Postprocess│
  58. //! │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │ │ SNV+Indel │
  59. //! │ → PASS │ │ → PASS │ │ → PASS │ │ → PASS │
  60. //! └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
  61. //! │ │ │ │
  62. //! └───────────────┴───────┬───────┴───────────────────┘
  63. //! ▼
  64. //! ┌─────────────────────┐
  65. //! │ bcftools concat │
  66. //! │ (somatic PASS) │
  67. //! └──────────┬──────────┘
  68. //! ▼
  69. //! ┌─────────────────────┐
  70. //! │ Final VCF Output │
  71. //! └─────────────────────┘
  72. //! ```
  73. //!
  74. //! ## Output Files
  75. //!
  76. //! Somatic variants (PASS only):
  77. //! ```text
  78. //! {result_dir}/{id}/clairs/{id}_clairs.pass.vcf.gz
  79. //! ```
  80. //!
  81. //! Germline variants (PASS only):
  82. //! ```text
  83. //! {result_dir}/{id}/clairs/clair3_germline.pass.vcf.gz
  84. //! ```
  85. //!
  86. //! ## Post-processing Pipeline
  87. //!
  88. //! Each ClairS run (or part) undergoes automatic post-processing:
  89. //!
  90. //! 1. **Somatic**: Concatenate SNV + indel VCFs → filter PASS variants
  91. //! 2. **Germline**: Filter PASS variants from Clair3 germline output
  92. //!
  93. //! ## Usage
  94. //!
  95. //! ### Basic Slurm Execution
  96. //!
  97. //! ```ignore
  98. //! use crate::config::Config;
  99. //! use crate::pipes::clairs::ClairS;
  100. //! use crate::pipes::Initialize;
  101. //!
  102. //! let config = Config::default();
  103. //! let mut clairs = ClairS::initialize("sample_001", &config)?;
  104. //! let output = clairs.run_sbatch()?;
  105. //! # Ok::<(), anyhow::Error>(())
  106. //! ```
  107. //!
  108. //! ### Chunked Parallel Execution (Recommended for WGS)
  109. //!
  110. //! ```ignore
  111. //! use crate::config::Config;
  112. //! use crate::pipes::clairs::run_clairs_chunked_sbatch_with_merge;
  113. //!
  114. //! let config = Config::default();
  115. //! let outputs = run_clairs_chunked_sbatch_with_merge("sample_001", &config, 20)?;
  116. //! # Ok::<(), anyhow::Error>(())
  117. //! ```
  118. //!
  119. //! ### Loading Variants for Downstream Analysis
  120. //!
  121. //! ```ignore
  122. //! use crate::annotation::Annotations;
  123. //! use crate::variant::vcf_variant::Variants;
  124. //!
  125. //! let annotations = Annotations::new();
  126. //! let somatic = clairs.variants(&annotations)?;
  127. //! let germline = clairs.germline(&annotations)?;
  128. //!
  129. //! println!("Somatic: {} variants", somatic.variants.len());
  130. //! println!("Germline: {} variants", germline.variants.len());
  131. //! # Ok::<(), anyhow::Error>(())
  132. //! ```
  133. //!
  134. //! ## Configuration Requirements
  135. //!
  136. //! The following [`Config`](crate::config::Config) fields must be set:
  137. //!
  138. //! - `singularity_bin` — Path to Singularity executable
  139. //! - `clairs_image` — ClairS container image path
  140. //! - `reference` — Reference genome FASTA
  141. //! - `clairs_threads` — CPU threads per job
  142. //! - `clairs_platform` — Sequencing platform (`ont_r10_dorado_sup_4khz`, `hifi_revio`, etc.)
  143. //! - `clairs_force` — Force re-run even if outputs exist
  144. //!
  145. //! ## Implemented Traits
  146. //!
  147. //! - [`Initialize`](crate::pipes::Initialize) — Setup and directory creation
  148. //! - [`ShouldRun`](crate::pipes::ShouldRun) — Timestamp-based execution gating
  149. //! - [`JobCommand`](crate::commands::Command) — Command string generation
  150. //! - [`LocalRunner`](crate::commands::LocalRunner) — Local execution support
  151. //! - [`SbatchRunner`](crate::commands::SbatchRunner) — Slurm job submission
  152. //! - [`Run`](crate::runners::Run) — Unified execution interface
  153. //! - [`Variants`](crate::variant::vcf_variant::Variants) — Somatic variant loading
  154. //! - [`CallerCat`](crate::annotation::CallerCat) — Caller annotation category
  155. //! - [`Label`](crate::variant::vcf_variant::Label) — Human-readable identifier
  156. //! - [`Version`](crate::pipes::Version) — Tool version extraction
  157. //!
  158. //! ## Dependencies
  159. //!
  160. //! External tools (containerized or system):
  161. //!
  162. //! - **ClairS** — Somatic variant calling
  163. //! - **bcftools** — VCF concatenation and filtering
  164. //!
  165. //! ## References
  166. //!
  167. //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
  168. //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
  169. use crate::{
  170. annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
  171. Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, samtools::SamtoolsMergeMany
  172. }, config::Config, helpers::{
  173. get_genome_sizes, is_file_older, list_files_recursive, remove_dir_if_exists,
  174. singularity_bind_flags, split_genome_into_n_regions_exact,
  175. }, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, run, run_many, runners::Run, variant::{
  176. variant_collection::VariantCollection,
  177. vcf_variant::{Label, Variants},
  178. }
  179. };
  180. use anyhow::Context;
  181. use log::{debug, info, warn};
  182. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  183. use regex::Regex;
  184. use rust_htslib::bam::{self, Read};
  185. use std::{
  186. fmt, fs,
  187. path::{Path, PathBuf},
  188. process::{Command as ProcessCommand, Stdio},
  189. };
  190. use uuid::Uuid;
  191. /// ClairS haplotype-aware somatic variant caller runner.
  192. ///
  193. /// Executes ClairS for paired tumor-normal variant calling with automatic
  194. /// post-processing (SNV+indel concatenation, PASS filtering, germline extraction).
  195. ///
  196. /// # Fields
  197. ///
  198. /// - `id` - Sample identifier (e.g., "34528")
  199. /// - `config` - Global pipeline configuration
  200. /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/clairs")
  201. /// - `region` - Optional genomic region for targeted calling (e.g., `Some("chr1:1000-2000")`)
  202. /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
  203. ///
  204. /// # Execution Modes
  205. ///
  206. /// - **Local** - Direct execution via `run_local()`
  207. /// - **Slurm** - Single job submission via `run_sbatch()`
  208. /// - **Chunked** - Parallel genome-wide execution via [`run_clairs_chunked_sbatch_with_merge`]
  209. ///
  210. /// # Output Files
  211. ///
  212. /// - Somatic PASS VCF: `{clairs_output_dir}/{id}_clairs.pass.vcf.gz`
  213. /// - Germline PASS VCF: `{clairs_output_dir}/clair3_germline.pass.vcf.gz`
  214. ///
  215. /// # Chunked Execution
  216. ///
  217. /// For whole-genome sequencing, use [`run_clairs_chunked_sbatch_with_merge`] which:
  218. /// 1. Splits genome into N equal-sized regions
  219. /// 2. Runs ClairS in parallel Slurm jobs (one per region)
  220. /// 3. Post-processes each part (concat SNV+indel, filter PASS)
  221. /// 4. Merges all part PASS VCFs into final output
  222. #[derive(Debug, Clone)]
  223. pub struct ClairS {
  224. /// Sample identifier
  225. pub id: String,
  226. /// Global pipeline configuration
  227. pub config: Config,
  228. /// Directory for log file storage
  229. pub log_dir: String,
  230. /// Optional genomic region restriction (format: "chr:start-end")
  231. pub region: Option<String>,
  232. /// Optional part index for chunked parallel runs (1-indexed)
  233. pub part_index: Option<usize>,
  234. }
  235. impl fmt::Display for ClairS {
  236. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  237. writeln!(f, "🧬 ClairS {}", self.id)?;
  238. writeln!(
  239. f,
  240. " Region : {}",
  241. self.region.as_deref().unwrap_or("genome-wide")
  242. )?;
  243. writeln!(
  244. f,
  245. " Part : {}",
  246. self.part_index
  247. .map_or("full".into(), |n| format!("part{n}"))
  248. )?;
  249. writeln!(f, " Log dir : {}", self.log_dir)
  250. }
  251. }
  252. impl Initialize for ClairS {
  253. fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
  254. let id = id.to_string();
  255. info!("Initializing ClairS for {id}");
  256. let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
  257. let clairs = Self {
  258. id,
  259. log_dir,
  260. config: config.clone(),
  261. region: None,
  262. part_index: None,
  263. };
  264. if clairs.config.clairs_force {
  265. remove_dir_if_exists(&clairs.config.clairs_output_dir(&clairs.id))?;
  266. }
  267. Ok(clairs)
  268. }
  269. }
  270. impl ShouldRun for ClairS {
  271. /// Determines whether ClairS should be re-run based on BAM modification timestamps.
  272. fn should_run(&self) -> bool {
  273. let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
  274. let normal_older =
  275. is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
  276. let tumor_older =
  277. is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
  278. let result = normal_older || tumor_older;
  279. if result {
  280. info!("ClairS should run for id: {}.", self.id);
  281. }
  282. result
  283. }
  284. }
  285. impl JobCommand for ClairS {
  286. fn init(&mut self) -> anyhow::Result<()> {
  287. let output_dir = self.part_output_dir();
  288. fs::create_dir_all(&output_dir)
  289. .with_context(|| format!("Failed to create dir: {output_dir}"))?;
  290. fs::create_dir_all(&self.log_dir)
  291. .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
  292. Ok(())
  293. }
  294. fn cmd(&self) -> String {
  295. let output_dir = self.part_output_dir();
  296. let region_arg = self
  297. .region
  298. .as_ref()
  299. .map(|r| format!("-r '{r}'"))
  300. .unwrap_or_default();
  301. let sample_name = format!("{}_diag", self.id);
  302. let bind_flags = singularity_bind_flags([
  303. &self.config.tumoral_bam(&self.id),
  304. &self.config.normal_bam(&self.id),
  305. &self.config.reference,
  306. &output_dir,
  307. &self.log_dir,
  308. &self.config.tmp_dir,
  309. ]);
  310. format!(
  311. "{singularity_bin} exec \
  312. {binds} \
  313. --bind {output_dir}:{output_dir} \
  314. {image} \
  315. /opt/bin/run_clairs \
  316. -T {tumor_bam} \
  317. -N {normal_bam} \
  318. -R {reference} \
  319. -t {threads} \
  320. -p {platform} \
  321. --enable_indel_calling \
  322. --include_all_ctgs \
  323. --print_germline_calls \
  324. --enable_clair3_germline_output \
  325. --use_longphase_for_intermediate_haplotagging true \
  326. --output_dir {output_dir} \
  327. -s {sample_name} \
  328. {region_arg}",
  329. singularity_bin = self.config.singularity_bin,
  330. binds = bind_flags,
  331. image = self.config.clairs_image,
  332. tumor_bam = self.config.tumoral_bam(&self.id),
  333. normal_bam = self.config.normal_bam(&self.id),
  334. reference = self.config.reference,
  335. threads = self.config.clairs_threads,
  336. platform = self.config.clairs_platform,
  337. output_dir = output_dir,
  338. sample_name = sample_name,
  339. region_arg = region_arg,
  340. )
  341. }
  342. }
  343. impl LocalRunner for ClairS {}
  344. impl LocalBatchRunner for ClairS {}
  345. impl SlurmRunner for ClairS {
  346. fn slurm_args(&self) -> Vec<String> {
  347. SlurmParams {
  348. job_name: Some(format!("clairs_{}", self.id)),
  349. cpus_per_task: Some(self.config.clairs_threads as u32),
  350. mem: Some("80G".into()),
  351. partition: Some("shortq".into()),
  352. gres: None,
  353. }
  354. .to_args()
  355. }
  356. }
  357. impl SbatchRunner for ClairS {
  358. fn slurm_params(&self) -> SlurmParams {
  359. SlurmParams {
  360. job_name: Some(format!("clairs_{}", self.id)),
  361. cpus_per_task: Some(self.config.clairs_threads as u32),
  362. mem: Some("70G".into()),
  363. partition: Some("shortq".into()),
  364. gres: None,
  365. }
  366. }
  367. }
  368. impl Run for ClairS {
  369. fn run(&mut self) -> anyhow::Result<()> {
  370. if !self.should_run() {
  371. info!("ClairS is up-to-date.");
  372. return Ok(());
  373. }
  374. // Acquire lock before any work
  375. let lock_dir = format!("{}/locks", self.config.result_dir);
  376. let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
  377. .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
  378. if self.config.slurm_runner {
  379. run_clairs_chunked(&self.id, &self.config, 50)?;
  380. // merge_haplotaged_tmp_bam(&self.config, &self.id)?;
  381. } else {
  382. run!(&self.config, self)?;
  383. let mut germline = self.process_germline()?;
  384. let report = run!(&self.config, &mut germline)
  385. .with_context(|| format!("Failed to filter germline PASS for {}", self.id))?;
  386. report
  387. .save_to_file(format!("{}/bcftools_germline_pass_", self.log_dir))
  388. .context("Failed to save germline PASS logs")?;
  389. let (mut concat, tmp_path) = self.process_somatic_concat()?;
  390. let (snv_vcf, indel_vcf) = self.config.clairs_output_vcfs(&self.id);
  391. let report = run!(&self.config, &mut concat)
  392. .with_context(|| format!("Failed to concat {} and {}", snv_vcf, indel_vcf))?;
  393. report
  394. .save_to_file(format!("{}/bcftools_concat_", self.log_dir))
  395. .context("Failed to save concat logs")?;
  396. let mut keep_pass = self.process_somatic_pass(&tmp_path)?;
  397. let report = run!(&self.config, &mut keep_pass)
  398. .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
  399. report
  400. .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
  401. .context("Failed to save PASS filter logs")?;
  402. // Clean up temporary concatenated VCF
  403. debug!("Removing temporary file: {}", tmp_path);
  404. if let Err(e) = fs::remove_file(&tmp_path) {
  405. warn!("Failed to remove temporary file {}: {}", tmp_path, e);
  406. }
  407. }
  408. Ok(())
  409. }
  410. }
  411. impl ClairS {
  412. /// Returns the germline PASS VCF path.
  413. ///
  414. /// - Part runs: `{part_dir}/clair3_germline.part{N}.pass.vcf.gz`
  415. /// - Full runs: canonical path from config
  416. fn germline_passed_vcf_path(&self) -> String {
  417. match self.part_index {
  418. Some(idx) => {
  419. let outdir = self.part_output_dir();
  420. format!("{outdir}/clair3_germline.part{idx}.pass.vcf.gz")
  421. }
  422. None => self.config.clairs_germline_passed_vcf(&self.id),
  423. }
  424. }
  425. fn process_germline(&self) -> anyhow::Result<BcftoolsKeepPass> {
  426. let germline_passed = self.germline_passed_vcf_path();
  427. let germline_input = match self.part_index {
  428. Some(_) => {
  429. let output_dir = self.part_output_dir();
  430. let base = self.config.clairs_germline_normal_vcf(&self.id);
  431. let base_name = Path::new(&base)
  432. .file_name()
  433. .expect("Germline VCF should have filename")
  434. .to_string_lossy();
  435. format!("{output_dir}/{base_name}")
  436. }
  437. None => self.config.clairs_germline_normal_vcf(&self.id),
  438. };
  439. info!(
  440. "Filtering germline PASS variants for {} part {:?}",
  441. self.id, self.part_index
  442. );
  443. let cmd =
  444. BcftoolsKeepPass::from_config(&self.config, germline_input, germline_passed.clone());
  445. Ok(cmd)
  446. }
  447. fn process_somatic_concat(&self) -> anyhow::Result<(BcftoolsConcat, String)> {
  448. let output_dir = self.part_output_dir();
  449. let (snv_vcf_base, indel_vcf_base) = self.config.clairs_output_vcfs(&self.id);
  450. let snv_vcf = format!(
  451. "{}/{}",
  452. output_dir,
  453. Path::new(&snv_vcf_base)
  454. .file_name()
  455. .expect("SNV VCF should have filename")
  456. .to_string_lossy()
  457. );
  458. let indel_vcf = format!(
  459. "{}/{}",
  460. output_dir,
  461. Path::new(&indel_vcf_base)
  462. .file_name()
  463. .expect("Indel VCF should have filename")
  464. .to_string_lossy()
  465. );
  466. info!(
  467. "Concatenating and filtering somatic variants for {} part {:?}",
  468. self.id, self.part_index
  469. );
  470. let tmp_file =
  471. Path::new(&self.config.tmp_dir).join(format!("pandora-temp-{}.vcf.gz", Uuid::new_v4()));
  472. let tmp_path = tmp_file
  473. .to_str()
  474. .context("Temp path not valid UTF-8")?
  475. .to_string();
  476. // Concat SNV + indel
  477. Ok((
  478. BcftoolsConcat::from_config(
  479. &self.config,
  480. vec![PathBuf::from(&snv_vcf), PathBuf::from(&indel_vcf)],
  481. &tmp_path,
  482. ),
  483. tmp_path,
  484. ))
  485. }
  486. pub fn process_somatic_pass(&self, tmp_path: &str) -> anyhow::Result<BcftoolsKeepPass> {
  487. let passed_vcf = self.somatic_passed_vcf_path();
  488. // Filter PASS
  489. let keep_pass = BcftoolsKeepPass::from_config(&self.config, tmp_path, passed_vcf.clone());
  490. Ok(keep_pass)
  491. }
  492. /// Returns the per-part output directory.
  493. fn part_output_dir(&self) -> String {
  494. let base_dir = self.config.clairs_output_dir(&self.id);
  495. match self.part_index {
  496. Some(idx) => format!("{base_dir}/part{idx}"),
  497. None => base_dir,
  498. }
  499. }
  500. /// Returns the somatic PASS VCF path.
  501. ///
  502. /// - Part runs: `{part_dir}/clairs.part{N}.pass.vcf.gz`
  503. /// - Full runs: canonical path from config
  504. fn somatic_passed_vcf_path(&self) -> String {
  505. match self.part_index {
  506. Some(idx) => {
  507. let outdir = self.part_output_dir();
  508. format!("{outdir}/clairs.part{idx}.pass.vcf.gz")
  509. }
  510. None => self.config.clairs_passed_vcf(&self.id),
  511. }
  512. }
  513. }
  514. impl CallerCat for ClairS {
  515. fn caller_cat(&self) -> Annotation {
  516. Annotation::Callers(Caller::ClairS, Sample::Somatic)
  517. }
  518. }
  519. impl Variants for ClairS {
  520. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  521. let caller = self.caller_cat();
  522. let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
  523. info!("Loading variants from {caller}: {passed_vcf}");
  524. let variants = read_vcf(passed_vcf)
  525. .with_context(|| format!("Failed to read ClairS VCF {passed_vcf}"))?;
  526. variants.par_iter().for_each(|v| {
  527. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  528. });
  529. info!("{caller}: {} variants loaded", variants.len());
  530. Ok(VariantCollection {
  531. variants,
  532. vcf: Vcf::new(passed_vcf.into())?,
  533. caller,
  534. })
  535. }
  536. }
  537. impl ClairS {
  538. /// Loads germline variants from the Clair3 germline output.
  539. pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  540. let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
  541. let germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
  542. info!("Loading germline variants from {caller}: {germline_passed}");
  543. let variants = read_vcf(germline_passed)?;
  544. variants.par_iter().for_each(|v| {
  545. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  546. });
  547. info!("{caller}: {} variants loaded", variants.len());
  548. Ok(VariantCollection {
  549. variants,
  550. vcf: Vcf::new(germline_passed.into())?,
  551. caller,
  552. })
  553. }
  554. }
  555. impl Label for ClairS {
  556. fn label(&self) -> String {
  557. self.caller_cat().to_string()
  558. }
  559. }
  560. impl Version for ClairS {
  561. fn version(config: &Config) -> anyhow::Result<String> {
  562. let out = ProcessCommand::new("bash")
  563. .arg("-c")
  564. .arg(format!(
  565. "{} exec {} /opt/bin/run_clairs --version",
  566. config.singularity_bin, config.clairs_image
  567. ))
  568. .stdout(Stdio::piped())
  569. .stderr(Stdio::piped())
  570. .output()
  571. .context("Failed to spawn Singularity process")?;
  572. if !out.status.success() {
  573. let combined = format!(
  574. "{}{}",
  575. String::from_utf8_lossy(&out.stdout),
  576. String::from_utf8_lossy(&out.stderr)
  577. );
  578. anyhow::bail!(
  579. "Singularity exec failed with status {}: {}",
  580. out.status,
  581. combined
  582. );
  583. }
  584. let combined = format!(
  585. "{}{}",
  586. String::from_utf8_lossy(&out.stdout),
  587. String::from_utf8_lossy(&out.stderr)
  588. );
  589. parse_clairs_version(&combined)
  590. }
  591. fn version_slurm(config: &Config) -> anyhow::Result<String> {
  592. struct ClairSVersionJob<'a> {
  593. config: &'a Config,
  594. }
  595. impl JobCommand for ClairSVersionJob<'_> {
  596. fn cmd(&self) -> String {
  597. format!(
  598. "{} exec {} /opt/bin/run_clairs --version",
  599. self.config.singularity_bin, self.config.clairs_image
  600. )
  601. }
  602. }
  603. impl SlurmRunner for ClairSVersionJob<'_> {
  604. fn slurm_args(&self) -> Vec<String> {
  605. SlurmParams {
  606. job_name: Some("clairs_version".into()),
  607. partition: Some("shortq".into()),
  608. cpus_per_task: Some(1),
  609. mem: Some("10G".into()),
  610. gres: None,
  611. }
  612. .to_args()
  613. }
  614. }
  615. let mut job = ClairSVersionJob { config };
  616. let out =
  617. SlurmRunner::exec(&mut job).context("Failed to run ClairS --version via Slurm")?;
  618. let mut combined = out.stdout.clone();
  619. if let Some(epilog) = &out.slurm_epilog {
  620. combined.push_str(&epilog.to_string());
  621. }
  622. combined.push_str(&out.stderr);
  623. parse_clairs_version(&combined)
  624. }
  625. }
  626. /// Parses ClairS version from command output.
  627. fn parse_clairs_version(output: &str) -> anyhow::Result<String> {
  628. let re = Regex::new(r"(?m)run_clairs\s+([^\s]+)").expect("Version regex is valid");
  629. let caps = re
  630. .captures(output)
  631. .context("Could not parse ClairS version from output")?;
  632. Ok(caps
  633. .get(1)
  634. .expect("Regex has capture group 1")
  635. .as_str()
  636. .to_string())
  637. }
  638. /// Merges N chunked ClairS PASS VCFs into the final output.
  639. fn merge_clairs_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
  640. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  641. for i in 1..=n_parts {
  642. let mut part = base.clone();
  643. part.part_index = Some(i);
  644. let part_pass = part.somatic_passed_vcf_path();
  645. anyhow::ensure!(
  646. Path::new(&part_pass).exists(),
  647. "Missing ClairS part {} PASS VCF: {}",
  648. i,
  649. part_pass
  650. );
  651. part_pass_paths.push(PathBuf::from(part_pass));
  652. }
  653. let final_passed_vcf = base.config.clairs_passed_vcf(&base.id);
  654. let rand = Uuid::new_v4();
  655. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  656. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  657. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  658. fs::create_dir_all(parent)?;
  659. }
  660. info!(
  661. "Concatenating {} ClairS part VCFs into {}",
  662. n_parts, final_passed_vcf
  663. );
  664. let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
  665. run!(&base.config, &mut concat).context("Failed to run bcftools concat for ClairS parts")?;
  666. fs::rename(&final_tmp, &final_passed_vcf).context("Failed to rename merged ClairS PASS VCF")?;
  667. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  668. .context("Failed to rename merged ClairS PASS VCF CSI")?;
  669. info!(
  670. "Successfully merged {} ClairS parts into {}",
  671. n_parts, final_passed_vcf
  672. );
  673. Ok(())
  674. }
  675. fn merge_clairs_germline_parts(base: &ClairS, n_parts: usize) -> anyhow::Result<()> {
  676. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  677. for i in 1..=n_parts {
  678. let mut part = base.clone();
  679. part.part_index = Some(i);
  680. let part_pass = part.germline_passed_vcf_path();
  681. anyhow::ensure!(
  682. Path::new(&part_pass).exists(),
  683. "Missing ClairS germline part {} PASS VCF: {}",
  684. i,
  685. part_pass
  686. );
  687. part_pass_paths.push(PathBuf::from(part_pass));
  688. }
  689. let final_passed_vcf = base.config.clairs_germline_passed_vcf(&base.id);
  690. let rand = Uuid::new_v4();
  691. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  692. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  693. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  694. fs::create_dir_all(parent)?;
  695. }
  696. info!(
  697. "Concatenating {} ClairS germline part VCFs into {}",
  698. n_parts, final_passed_vcf
  699. );
  700. let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
  701. run!(&base.config, &mut concat)
  702. .context("Failed to run bcftools concat for ClairS germline parts")?;
  703. fs::rename(&final_tmp, &final_passed_vcf)
  704. .context("Failed to rename merged ClairS germline PASS VCF")?;
  705. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  706. .context("Failed to rename merged ClairS germline PASS VCF CSI")?;
  707. info!(
  708. "Successfully merged {} ClairS germline parts into {}",
  709. n_parts, final_passed_vcf
  710. );
  711. Ok(())
  712. }
  713. pub fn merge_haplotaged_tmp_bam(config: &Config, id: &str) -> anyhow::Result<()> {
  714. let dir = config.clairs_output_dir(id);
  715. let bams: Vec<PathBuf> = list_files_recursive(dir)
  716. .into_iter()
  717. .filter(|f| f.extension().and_then(|e| e.to_str()) == Some("bam"))
  718. .collect();
  719. let into = Path::new(&config.tumoral_haplotagged_bam(id)).to_path_buf();
  720. info!("Merging {} into: {}", bams.len(), into.display());
  721. let mut merge = SamtoolsMergeMany::from_config(into, bams, config);
  722. run!(config, &mut merge)?;
  723. Ok(())
  724. }
  725. /// Runs ClairS in parallel chunks, then merges results.
  726. ///
  727. /// Splits the genome into N equal-sized regions, runs ClairS on each region
  728. /// in parallel (local or Slurm based on `config.slurm_runner`), post-processes
  729. /// each part (concatenates SNV+indel, filters PASS), and merges both somatic
  730. /// and germline VCFs.
  731. ///
  732. /// # Arguments
  733. ///
  734. /// * `id` - Sample identifier
  735. /// * `config` - Global pipeline configuration
  736. /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
  737. ///
  738. /// # Returns
  739. ///
  740. /// `Ok(())` on success, or an error if any step fails.
  741. ///
  742. /// # Errors
  743. ///
  744. /// Returns an error if:
  745. /// - `n_parts` is 0
  746. /// - Normal BAM file cannot be opened or is corrupted
  747. /// - BAM header is malformed
  748. /// - ClairS execution fails on any part
  749. /// - SNV+indel concatenation fails
  750. /// - PASS filtering fails
  751. /// - Somatic or germline VCF merging fails
  752. /// - Output directory cannot be created
  753. ///
  754. /// # Example
  755. ///
  756. /// ```ignore
  757. /// let config = Config::default();
  758. /// run_clairs_chunked("sample_001", &config, 30)?;
  759. /// ```
  760. pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
  761. anyhow::ensure!(n_parts > 0, "run_clairs_chunked: n_parts must be > 0");
  762. let lock_dir = format!("{}/locks", config.result_dir);
  763. let _lock = SampleLock::acquire(&lock_dir, id, "clairs")
  764. .with_context(|| format!("Cannot start ClairS chunked for {}", id))?;
  765. let base = ClairS::initialize(id, config)?;
  766. if !base.should_run() {
  767. debug!("ClairS PASS VCF already up-to-date for {id}, skipping.");
  768. return Ok(());
  769. }
  770. let normal_bam = config.normal_bam(id);
  771. let reader = bam::Reader::from_path(&normal_bam)
  772. .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
  773. let header = bam::Header::from_template(reader.header());
  774. let genome_sizes = get_genome_sizes(&header)?;
  775. let regions = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
  776. .into_iter()
  777. .flatten()
  778. .collect::<Vec<String>>();
  779. let actual_n_parts = regions.len();
  780. info!(
  781. "Running ClairS in {} parallel parts for {}",
  782. actual_n_parts, id
  783. );
  784. let mut jobs = Vec::with_capacity(actual_n_parts);
  785. for (i, region) in regions.into_iter().enumerate() {
  786. let mut job = base.clone();
  787. job.part_index = Some(i + 1);
  788. job.region = Some(region);
  789. job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
  790. info!("Planned ClairS job:\n{job}");
  791. jobs.push(job);
  792. }
  793. let outputs = run_many!(config, jobs.clone())?;
  794. for output in outputs.iter() {
  795. output.save_to_file(format!("{}/clairs_", base.log_dir))?;
  796. }
  797. // 1) Germline PASS filters (one per part)
  798. let mut germlines = Vec::new();
  799. // 2a) Somatic SNV+indel concats (one per part)
  800. let mut somatic_concats = Vec::new();
  801. let mut somatic_tmp_paths = Vec::new();
  802. for job in &jobs {
  803. // germline PASS
  804. germlines.push(job.process_germline()?);
  805. // somatic concat (SNV+indel -> tmp VCF)
  806. let (concat, tmp_path) = job.process_somatic_concat()?;
  807. somatic_concats.push(concat);
  808. somatic_tmp_paths.push(tmp_path);
  809. }
  810. // Run all germline PASS filters in parallel
  811. run_many!(config, germlines)?;
  812. // Run all somatic concats in parallel
  813. run_many!(config, somatic_concats)?;
  814. // 2b) Somatic PASS filters (one per part, based on tmp VCFs)
  815. let mut somatic_pass_filters = Vec::new();
  816. for (job, tmp_path) in jobs.iter().zip(somatic_tmp_paths.iter()) {
  817. somatic_pass_filters.push(job.process_somatic_pass(tmp_path)?);
  818. }
  819. // Run all somatic PASS filters in parallel
  820. run_many!(config, somatic_pass_filters)?;
  821. merge_clairs_parts(&base, actual_n_parts)?;
  822. merge_clairs_germline_parts(&base, actual_n_parts)?;
  823. info!(
  824. "ClairS completed for {}: {} parts merged (somatic + germline)",
  825. id, actual_n_parts
  826. );
  827. Ok(())
  828. }
  829. #[cfg(test)]
  830. mod tests {
  831. use super::*;
  832. use crate::helpers::test_init;
  833. #[test]
  834. fn clairs_version() -> anyhow::Result<()> {
  835. test_init();
  836. let vl = ClairS::version(&Config::default())?;
  837. info!("ClairS local version: {vl}");
  838. let vs = ClairS::version_slurm(&Config::default())?;
  839. info!("ClairS slurm version: {vs}");
  840. assert_eq!(vl, vs);
  841. Ok(())
  842. }
  843. #[test]
  844. fn clairs_run() -> anyhow::Result<()> {
  845. test_init();
  846. let config = Config::default();
  847. // let clairs = ClairS::initialize("34528", &config)?;
  848. // info!("{clairs}");
  849. run_clairs_chunked("CHAHA", &config, 50)?;
  850. Ok(())
  851. }
  852. #[test]
  853. fn clairs_haplo() -> anyhow::Result<()> {
  854. test_init();
  855. let config = Config::default();
  856. merge_haplotaged_tmp_bam(&config, "DUMCO")?;
  857. Ok(())
  858. }
  859. }