|
|
@@ -1,9 +1,9 @@
|
|
|
use crate::{
|
|
|
annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
|
|
|
- collection::{vcf::Vcf, Initialize},
|
|
|
+ collection::{vcf::Vcf, Initialize, ShouldRun},
|
|
|
commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
|
|
|
config::Config,
|
|
|
- helpers::{force_or_not, temp_file_path},
|
|
|
+ helpers::{is_file_older, temp_file_path},
|
|
|
io::vcf::read_vcf,
|
|
|
runners::{run_wait, DockerRun, Run},
|
|
|
variant::{
|
|
|
@@ -16,82 +16,103 @@ use log::{debug, info};
|
|
|
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
|
|
use std::{fs, path::Path};
|
|
|
|
|
|
+/// A pipeline runner for executing ClairS on paired tumor and normal samples.
|
|
|
+///
|
|
|
+/// ClairS is a somatic variant caller that uses haplotype tagging from LongPhase.
|
|
|
+/// This struct manages:
|
|
|
+/// - Dockerized execution of the ClairS pipeline
|
|
|
+/// - Handling and filtering of output VCFs
|
|
|
+/// - Logging and diagnostic tracking
|
|
|
+/// - Integration with variant annotation workflows
|
|
|
#[derive(Debug, Clone)]
|
|
|
pub struct ClairS {
|
|
|
pub id: String,
|
|
|
- pub output_dir: String,
|
|
|
- pub output_vcf: String,
|
|
|
- pub output_indels_vcf: String,
|
|
|
- pub vcf_passed: String,
|
|
|
- pub diag_bam: String,
|
|
|
- pub mrd_bam: String,
|
|
|
- pub log_dir: String,
|
|
|
pub config: Config,
|
|
|
- pub clair3_germline_normal: String,
|
|
|
- pub clair3_germline_tumor: String,
|
|
|
- pub clair3_germline_passed: String,
|
|
|
+ pub log_dir: String,
|
|
|
}
|
|
|
|
|
|
impl Initialize for ClairS {
|
|
|
+ /// Initializes the ClairS runner.
|
|
|
+ ///
|
|
|
+ /// This method constructs a `ClairS` instance with logging and configuration setup,
|
|
|
+ /// and ensures the output directory is cleaned up if the results are outdated or force execution is enabled.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `id` - The identifier for the sample being analyzed.
|
|
|
+ /// * `config` - Pipeline-wide configuration object containing paths, resources, and settings.
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// A fully initialized `ClairS` instance ready for execution.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Returns an error if removing the output directory fails; note that `fs::remove_dir_all` also errors when the directory does not exist.
|
|
|
+ ///
|
|
|
+ /// Deletion is skipped only when `should_run()` is false and a forced re-run (`clairs_force` with an existing PASSED VCF) does not apply.
|
|
|
+ /// Otherwise, the output directory is cleared for a fresh run.
|
|
|
fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
|
|
|
let id = id.to_string();
|
|
|
info!("Initialize ClairS for {id}.");
|
|
|
let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
|
|
|
|
|
|
- if !Path::new(&log_dir).exists() {
|
|
|
- fs::create_dir_all(&log_dir)
|
|
|
- .context(format!("Failed to create {log_dir} directory"))?;
|
|
|
- }
|
|
|
-
|
|
|
- let output_dir = config.clairs_output_dir(&id);
|
|
|
- fs::create_dir_all(&output_dir).context(format!("Can't create dir: {output_dir}"))?;
|
|
|
-
|
|
|
- let (output_vcf, output_indels_vcf) = config.clairs_output_vcfs(&id);
|
|
|
- let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz");
|
|
|
-
|
|
|
- let diag_bam = config.tumoral_bam(&id);
|
|
|
- let mrd_bam = config.normal_bam(&id);
|
|
|
-
|
|
|
- let clair3_germline_normal = config.clairs_germline_normal_vcf(&id);
|
|
|
- let clair3_germline_tumor = config.clairs_germline_tumor_vcf(&id);
|
|
|
- let clair3_germline_passed = config.clairs_germline_passed_vcf(&id);
|
|
|
-
|
|
|
- Ok(Self {
|
|
|
+ let clairs = Self {
|
|
|
id,
|
|
|
- output_dir,
|
|
|
- output_vcf,
|
|
|
- output_indels_vcf,
|
|
|
- vcf_passed,
|
|
|
- diag_bam,
|
|
|
- mrd_bam,
|
|
|
log_dir,
|
|
|
config,
|
|
|
- clair3_germline_normal,
|
|
|
- clair3_germline_tumor,
|
|
|
- clair3_germline_passed,
|
|
|
- })
|
|
|
+ };
|
|
|
+
|
|
|
+ let passed_vcf = clairs.config.clairs_passed_vcf(&clairs.id);
|
|
|
+ if (clairs.config.clairs_force && Path::new(&passed_vcf).exists()) || clairs.should_run() {
|
|
|
+ fs::remove_dir_all(clairs.config.clairs_output_dir(&clairs.id))?;
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(clairs)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl ShouldRun for ClairS {
|
|
|
+ /// Determines whether ClairS should be re-run: `true` if either `is_file_older` check of the PASSED VCF against the normal or tumoral BAM is `true` or fails (`unwrap_or(true)`).
|
|
|
+ fn should_run(&self) -> bool {
|
|
|
+ let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
|
|
|
+ is_file_older(passed_vcf, &self.config.normal_bam(&self.id)).unwrap_or(true)
|
|
|
+ || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id)).unwrap_or(true)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
impl Run for ClairS {
|
|
|
+ /// Executes the ClairS variant calling pipeline and post-processes its output.
|
|
|
+ ///
|
|
|
+ /// # Pipeline Overview
|
|
|
+ /// - Runs ClairS in a Docker container using paired tumor and normal BAMs
|
|
|
+ /// - Generates both somatic and germline variant VCFs
|
|
|
+ /// - Applies bcftools filtering to keep only PASS variants
|
|
|
+ /// - Concatenates separate VCFs (e.g., SNPs and INDELs) into a single somatic file
|
|
|
+ /// - Tracks all operations via logs saved to disk
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Returns an error if:
|
|
|
+ /// - Docker execution fails
|
|
|
+ /// - bcftools fails to process or filter VCFs
|
|
|
+ /// - Temporary files can't be removed or written
|
|
|
fn run(&mut self) -> anyhow::Result<()> {
|
|
|
- force_or_not(&self.vcf_passed, self.config.clairs_force)?;
|
|
|
-
|
|
|
// Run Docker command if output VCF doesn't exist
|
|
|
- if !Path::new(&self.output_vcf).exists() || !Path::new(&self.output_indels_vcf).exists() {
|
|
|
+ let (output_vcf, output_indels_vcf) = self.config.clairs_output_vcfs(&self.id);
|
|
|
+ if !Path::new(&output_vcf).exists() || !Path::new(&output_indels_vcf).exists() {
|
|
|
+ let output_dir = self.config.clairs_output_dir(&self.id);
|
|
|
+ fs::create_dir_all(&output_dir).context(format!("Failed create dir: {output_dir}"))?;
|
|
|
+
|
|
|
let mut docker_run = DockerRun::new(&[
|
|
|
"run",
|
|
|
"-d",
|
|
|
"-v",
|
|
|
"/data:/data",
|
|
|
"-v",
|
|
|
- &format!("{}:{}", self.output_dir, self.output_dir),
|
|
|
+ &format!("{}:{}", output_dir, output_dir),
|
|
|
"hkubal/clairs:latest",
|
|
|
"/opt/bin/run_clairs",
|
|
|
"-T",
|
|
|
- &self.diag_bam,
|
|
|
+ &self.config.tumoral_bam(&self.id),
|
|
|
"-N",
|
|
|
- &self.mrd_bam,
|
|
|
+ &self.config.normal_bam(&self.id),
|
|
|
"-R",
|
|
|
&self.config.reference,
|
|
|
"-t",
|
|
|
@@ -105,34 +126,40 @@ impl Run for ClairS {
|
|
|
"--use_longphase_for_intermediate_haplotagging",
|
|
|
"true",
|
|
|
"--output_dir",
|
|
|
- &self.output_dir,
|
|
|
+ &output_dir,
|
|
|
"-s",
|
|
|
&format!("{}_diag", self.id),
|
|
|
]);
|
|
|
- let report = run_wait(&mut docker_run).context(format!(
|
|
|
- "Error while running ClairS in docker for {} and {}",
|
|
|
- &self.diag_bam, &self.mrd_bam
|
|
|
- ))?;
|
|
|
+ let report = run_wait(&mut docker_run)
|
|
|
+ .context(format!("Failed to run ClairS for {}", &self.id))?;
|
|
|
|
|
|
let log_file = format!("{}/clairs_", self.log_dir);
|
|
|
report
|
|
|
.save_to_file(&log_file)
|
|
|
.context(format!("Error while writing logs into {log_file}"))?;
|
|
|
} else {
|
|
|
- debug!("ClairS VCFs exist.");
|
|
|
+ debug!(
|
|
|
+ "ClairS output VCF already exists for {}, skipping execution.",
|
|
|
+ self.id
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- // Germline
|
|
|
- if !Path::new(&self.clair3_germline_passed).exists() {
|
|
|
+ // Germline PASS
|
|
|
+ let clair3_germline_passed = self.config.clairs_germline_passed_vcf(&self.id);
|
|
|
+ if !Path::new(&clair3_germline_passed).exists() {
|
|
|
+ let clair3_germline_normal = self.config.clairs_germline_normal_vcf(&self.id);
|
|
|
+ // let clair3_germline_tumor = self.config.clairs_germline_tumor_vcf(&self.id);
|
|
|
+
|
|
|
let report = bcftools_keep_pass(
|
|
|
- &self.clair3_germline_normal,
|
|
|
- &self.clair3_germline_passed,
|
|
|
+ &clair3_germline_normal,
|
|
|
+ &clair3_germline_passed,
|
|
|
BcftoolsConfig::default(),
|
|
|
)
|
|
|
.context(format!(
|
|
|
"Error while running bcftools keep PASS for {}",
|
|
|
- &self.clair3_germline_passed
|
|
|
+ &clair3_germline_passed
|
|
|
))?;
|
|
|
+
|
|
|
let log_file = format!("{}/bcftools_pass_", self.log_dir);
|
|
|
report
|
|
|
.save_to_file(&log_file)
|
|
|
@@ -140,34 +167,35 @@ impl Run for ClairS {
|
|
|
|
|
|
// fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
|
|
|
} else {
|
|
|
- debug!("ClairS germline VCF exists.");
|
|
|
+ debug!(
|
|
|
+ "ClairS Germline PASSED VCF already exists for {}, skipping execution.",
|
|
|
+ self.id
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- if !Path::new(&self.vcf_passed).exists() {
|
|
|
+ let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
|
|
|
+ if !Path::new(&passed_vcf).exists() {
|
|
|
// Concat output and indels
|
|
|
let tmp_file = temp_file_path(".vcf.gz")?.to_str().unwrap().to_string();
|
|
|
let report = bcftools_concat(
|
|
|
- vec![
|
|
|
- self.output_vcf.to_string(),
|
|
|
- self.output_indels_vcf.to_string(),
|
|
|
- ],
|
|
|
+ vec![output_vcf.to_string(), output_indels_vcf.to_string()],
|
|
|
&tmp_file,
|
|
|
BcftoolsConfig::default(),
|
|
|
)
|
|
|
.context(format!(
|
|
|
- "Error while running bcftools concat for {} and {}",
|
|
|
- &self.output_vcf, &self.output_indels_vcf
|
|
|
+ "Failed to run bcftools concat for {} and {}",
|
|
|
+ &output_vcf, &output_indels_vcf
|
|
|
))?;
|
|
|
let log_file = format!("{}/bcftools_concat_", self.log_dir);
|
|
|
report
|
|
|
.save_to_file(&log_file)
|
|
|
.context(format!("Error while writing logs into {log_file}"))?;
|
|
|
|
|
|
- let report = bcftools_keep_pass(&tmp_file, &self.vcf_passed, BcftoolsConfig::default())
|
|
|
- .context(format!(
|
|
|
- "Error while running bcftools keep PASS for {}",
|
|
|
- &self.output_vcf
|
|
|
- ))?;
|
|
|
+ let report =
|
|
|
+ bcftools_keep_pass(&tmp_file, passed_vcf, BcftoolsConfig::default()).context(
|
|
|
+ format!("Error while running bcftools keep PASS for {}", &output_vcf),
|
|
|
+ )?;
|
|
|
+
|
|
|
let log_file = format!("{}/bcftools_pass_", self.log_dir);
|
|
|
report
|
|
|
.save_to_file(&log_file)
|
|
|
@@ -175,7 +203,10 @@ impl Run for ClairS {
|
|
|
|
|
|
fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
|
|
|
} else {
|
|
|
- debug!("ClairS PASSED vcf exists.");
|
|
|
+ debug!(
|
|
|
+ "ClairS PASSED VCF already exists for {}, skipping execution.",
|
|
|
+ self.id
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
Ok(())
|
|
|
@@ -183,18 +214,34 @@ impl Run for ClairS {
|
|
|
}
|
|
|
|
|
|
impl CallerCat for ClairS {
|
|
|
+ /// Tags this runner as somatic, used for annotation classification.
|
|
|
fn caller_cat(&self) -> Annotation {
|
|
|
Annotation::Callers(Caller::ClairS, Sample::Somatic)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
impl Variants for ClairS {
|
|
|
+ /// Loads and annotates somatic variants from the ClairS filtered VCF.
|
|
|
+ ///
|
|
|
+ /// This method reads the filtered PASS VCF file generated by ClairS for somatic variants.
|
|
|
+ /// It tags each variant with the ClairS somatic annotation and adds it to the shared `annotations` map.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `annotations` - A reference to the global annotations structure used to store variant metadata.
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// A `VariantCollection` with the list of variants, the source VCF file, and the associated caller tag.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Will return an error if the VCF file is unreadable, missing, or malformed.
|
|
|
fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
|
|
|
let caller = self.caller_cat();
|
|
|
let add = vec![caller.clone()];
|
|
|
- info!("Loading variants from {}: {}", caller, self.vcf_passed);
|
|
|
- let variants = read_vcf(&self.vcf_passed)
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to read ClairS VCF {}.\n{e}", self.vcf_passed))?;
|
|
|
+ let passed_vcf = &self.config.clairs_passed_vcf(&self.id);
|
|
|
+
|
|
|
+ info!("Loading variants from {}: {}", caller, passed_vcf);
|
|
|
+ let variants = read_vcf(passed_vcf)
|
|
|
+ .map_err(|e| anyhow::anyhow!("Failed to read ClairS VCF {}.\n{e}", passed_vcf))?;
|
|
|
|
|
|
variants.par_iter().for_each(|v| {
|
|
|
annotations.insert_update(v.hash(), &add);
|
|
|
@@ -203,19 +250,37 @@ impl Variants for ClairS {
|
|
|
|
|
|
Ok(VariantCollection {
|
|
|
variants,
|
|
|
- vcf: Vcf::new(self.vcf_passed.clone().into())?,
|
|
|
+ vcf: Vcf::new(passed_vcf.into())?,
|
|
|
caller,
|
|
|
})
|
|
|
}
|
|
|
}
|
|
|
|
|
|
impl ClairS {
|
|
|
+ /// Loads and annotates germline variants from the ClairS germline output.
|
|
|
+ ///
|
|
|
+ /// This function loads a pre-filtered VCF file containing germline variants called by ClairS.
|
|
|
+ /// It updates the provided `annotations` structure with a tag indicating these are germline variants.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `annotations` - A shared annotation structure to update with variant hashes and tags.
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// A [`VariantCollection`] object containing the loaded variants, associated VCF metadata, and caller category.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Will return an error if the VCF file cannot be read or parsed.
|
|
|
pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
|
|
|
let caller = Annotation::Callers(Caller::ClairS, Sample::Germline);
|
|
|
let add = vec![caller.clone()];
|
|
|
- info!("Loading variants from {}: {}", caller, self.vcf_passed);
|
|
|
+ let clair3_germline_passed = &self.config.clairs_germline_passed_vcf(&self.id);
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "Loading variants from {}: {}",
|
|
|
+ caller, clair3_germline_passed
|
|
|
+ );
|
|
|
|
|
|
- let variants = read_vcf(&self.clair3_germline_passed)?;
|
|
|
+ let variants = read_vcf(clair3_germline_passed)?;
|
|
|
variants.par_iter().for_each(|v| {
|
|
|
annotations.insert_update(v.hash(), &add);
|
|
|
});
|
|
|
@@ -223,10 +288,11 @@ impl ClairS {
|
|
|
|
|
|
Ok(VariantCollection {
|
|
|
variants,
|
|
|
- vcf: Vcf::new(self.clair3_germline_passed.clone().into())?,
|
|
|
+ vcf: Vcf::new(clair3_germline_passed.into())?,
|
|
|
caller,
|
|
|
})
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/// Marker trait implementation to signal ClairS supports variant export.
|
|
|
impl RunnerVariants for ClairS {}
|