| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
- collection::{vcf::Vcf, Initialize, ShouldRun, Version},
- commands::{
- bcftools::{bcftools_keep_pass, BcftoolsConfig},
- longphase::{LongphaseConfig, LongphaseHap, LongphasePhase},
- },
- config::Config,
- helpers::is_file_older,
- io::{readers::get_gz_reader, vcf::read_vcf},
- positions::{num_to_contig, GenomeRange},
- runners::{run_wait, CommandRun, Run},
- variant::{
- variant::{RunnerVariants, Variants},
- variant_collection::VariantCollection,
- },
- };
- use anyhow::Context;
- use itertools::Itertools;
- use log::{debug, info};
- use rayon::prelude::*;
- use serde::{Deserialize, Serialize};
- use std::{
- fs::{self},
- io::BufRead,
- path::Path,
- str::FromStr,
- };
- /// The `Savana` struct orchestrates the haplotype-aware somatic variant calling
- /// pipeline using the Savana tool. It manages initialization, conditional execution,
- /// phasing dependencies, haplotagging, and output filtering.
- #[derive(Debug)]
- pub struct Savana {
- pub id: String,
- pub config: Config,
- pub log_dir: String,
- }
- impl Initialize for Savana {
- /// Initializes a new `Savana` instance for a given sample ID and configuration.
- ///
- /// If `savana_force` is enabled in [`Config`] and the output VCF exists, or if the run
- /// should be re-triggered (based on file freshness), the output directory
- /// will be cleaned.
- ///
- /// # Arguments
- ///
- /// * `id` - Sample identifier
- /// * `config` - Pipeline execution configuration
- ///
- /// # Returns
- ///
- /// A new `Savana` instance, or an error if cleanup fails.
- fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
- info!("Initialize Savana for {id}.");
- let log_dir = format!("{}/{}/log/savana", config.result_dir, id);
- let savana = Self {
- id: id.to_string(),
- config,
- log_dir,
- };
- let output_vcf_exists = Path::new(&savana.config.savana_output_vcf(id)).exists();
- if (savana.config.savana_force && output_vcf_exists) || savana.should_run() {
- fs::remove_dir_all(savana.config.savana_output_dir(id))?;
- }
- Ok(savana)
- }
- }
- impl Run for Savana {
- /// Executes the Savana pipeline, including prerequisite phasing and haplotagging steps.
- ///
- /// If the output VCF does not already exist, it ensures that the phased VCF and
- /// haplotagged BAMs exist (or runs the modules to produce them), then runs Savana.
- /// Finally, it filters the resulting VCF to retain only PASS variants.
- ///
- /// # Returns
- ///
- /// `Ok(())` if the run completes successfully, or an error otherwise.
- fn run(&mut self) -> anyhow::Result<()> {
- let id = &self.id;
- let output_vcf = &self.config.savana_output_vcf(id);
- if !Path::new(&output_vcf).exists() {
- info!("Running Savana v{}", Savana::version(&self.config)?);
- let output_dir = self.config.savana_output_dir(id);
- fs::create_dir_all(&output_dir).with_context(|| {
- format!("Failed to create output dir for Savana run: {output_dir}")
- })?;
- // Check for phased germline vcf
- // no required anymore since >= 1.3.0
- let phased_germline_vcf = self.config.constit_phased_vcf(id);
- if !Path::new(&phased_germline_vcf).exists() {
- LongphasePhase::initialize(id, self.config.clone())?.run()?;
- }
- let normal_hp_bam = self.config.normal_haplotagged_bam(id);
- let tumoral_hp_bam = self.config.tumoral_haplotagged_bam(id);
- // Check for haplotagged bam
- if !Path::new(&normal_hp_bam).exists() {
- LongphaseHap::new(
- id,
- &self.config.normal_bam(id),
- &phased_germline_vcf,
- LongphaseConfig::default(),
- )
- .run()?;
- }
- if !Path::new(&tumoral_hp_bam).exists() {
- LongphaseHap::new(
- id,
- &self.config.tumoral_bam(id),
- &phased_germline_vcf,
- LongphaseConfig::default(),
- )
- .run()?;
- }
- let savana_args = [
- // "run",
- "--tumour",
- &tumoral_hp_bam,
- "--normal",
- &normal_hp_bam,
- "--outdir",
- &self.config.savana_output_dir(id),
- "--ref",
- &self.config.reference,
- "--snp_vcf",
- &phased_germline_vcf,
- "--no_blacklist",
- "--threads",
- &self.config.savana_threads.to_string(),
- ];
- let args = [
- "-c",
- &format!(
- "source {} && conda activate savana && {} {}",
- self.config.conda_sh,
- self.config.savana_bin,
- savana_args.join(" ")
- ),
- ];
- let mut cmd_run = CommandRun::new("bash", &args);
- let report = run_wait(&mut cmd_run).context(format!(
- "Error while running `severus.py {}`",
- args.join(" ")
- ))?;
- let log_file = format!("{}/savana_", self.log_dir);
- report
- .save_to_file(&log_file)
- .context(format!("Error while writing logs into {log_file}"))?;
- } else {
- debug!(
- "Savana output already exists for {}, skipping execution.",
- self.id
- );
- }
- // Keep PASS
- let passed_vcf = self.config.savana_passed_vcf(id);
- if !Path::new(&passed_vcf).exists() && Path::new(output_vcf).exists() {
- let report = bcftools_keep_pass(output_vcf, &passed_vcf, BcftoolsConfig::default())
- .context(format!(
- "Error while running bcftools keep PASS for {output_vcf}"
- ))?;
- let log_file = format!("{}/bcftools_pass", self.log_dir);
- report
- .save_to_file(&log_file)
- .context(format!("Error while writing logs into {log_file}"))?;
- }
- Ok(())
- }
- }
- impl ShouldRun for Savana {
- /// Determines whether Savana should be re-run based on whether
- /// the filtered PASS VCF is older than the input BAMs.
- ///
- /// If either input BAM (normal or tumor) is newer than the PASS VCF,
- /// Savana is considered out of date and should be re-executed.
- ///
- /// # Returns
- /// `true` if an update is needed, or if timestamps can't be checked (file doesn't exist)
- fn should_run(&self) -> bool {
- let passed_vcf = &self.config.savana_passed_vcf(&self.id);
- is_file_older(passed_vcf, &self.config.normal_bam(&self.id)).unwrap_or(true)
- || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id)).unwrap_or(true)
- }
- }
- impl Version for Savana {
- fn version(config: &Config) -> anyhow::Result<String> {
- let savana_args = ["--version"];
- let args = [
- "-c",
- &format!(
- "source {} && conda activate savana && {} {}",
- config.conda_sh,
- config.savana_bin,
- savana_args.join(" ")
- ),
- ];
- let mut cmd_run = CommandRun::new("bash", &args);
- let report = run_wait(&mut cmd_run)
- .context(format!("Error while running `savana {}`", args.join(" ")))?;
- let log = report.log;
- let start = log
- .find("Version ")
- .context("Failed to find 'Version ' in the log")?;
- let start_index = start + "Version ".len();
- let end = log[start_index..]
- .find('\n')
- .context("Failed to find newline after 'Version '")?;
- Ok(log[start_index..start_index + end]
- .to_string()
- .trim()
- .to_string())
- }
- }
- impl CallerCat for Savana {
- fn caller_cat(&self) -> Annotation {
- Annotation::Callers(Caller::Savana, Sample::Somatic)
- }
- }
- impl Variants for Savana {
- /// Loads variants from the PASS-filtered VCF produced by Savana and annotates them.
- ///
- /// Reads the VCF file, applies the caller tag via the `Annotations` object, and
- /// wraps the parsed data into a `VariantCollection`.
- ///
- /// # Arguments
- /// * `annotations` - Global annotation registry shared across callers.
- ///
- /// # Returns
- /// A populated `VariantCollection`, or an error if the VCF fails to parse.
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let caller = self.caller_cat();
- let add = vec![caller.clone()];
- let passed_vcf = self.config.savana_passed_vcf(&self.id);
- info!("Loading variants from {}: {}", caller, &passed_vcf);
- let variants = read_vcf(&passed_vcf)
- .map_err(|e| anyhow::anyhow!("Failed to read Savana VCF {}.\n{e}", passed_vcf))?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), &add);
- });
- info!("{}, {} variants loaded.", caller, variants.len());
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(passed_vcf.clone().into())?,
- caller,
- })
- }
- }
- impl RunnerVariants for Savana {}
- pub struct SavanaCNRow {
- pub range: GenomeRange,
- pub segment_id: String,
- pub bin_count: u32,
- pub sum_of_bin_lengths: u32,
- pub weight: f32,
- pub copy_number: f32,
- pub minor_allele_copy_number: Option<f64>,
- pub mean_baf: Option<f64>,
- pub n_het_snps: u32,
- }
- impl FromStr for SavanaCNRow {
- type Err = anyhow::Error;
- fn from_str(s: &str) -> anyhow::Result<Self> {
- let cells: Vec<&str> = s.split("\t").collect();
- let range = GenomeRange::from_1_inclusive_str(
- cells
- .first()
- .context(format!("Error while parsing contig {s}."))?,
- cells
- .get(1)
- .context(format!("Error while parsing start {s}."))?,
- cells
- .get(2)
- .context(format!("Error while parsing end {s}."))?,
- )
- .map_err(|e| anyhow::anyhow!("Error while parsing range {s}.\n{e}"))?;
- Ok(Self {
- range,
- segment_id: cells
- .get(3)
- .context(format!("Error while parsing segment_id {s}."))?
- .to_string(),
- bin_count: cells
- .get(4)
- .context(format!("Error while parsing bin_count {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing bin_count {s}.\n{e}"))?,
- sum_of_bin_lengths: cells
- .get(5)
- .context(format!("Error while parsing sum_of_bin_lengths {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing sum_of_bin_lengths {s}.\n{e}"))?,
- weight: cells
- .get(6)
- .context(format!("Error while parsing weight {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing weight {s}.\n{e}"))?,
- copy_number: cells
- .get(7)
- .context(format!("Error while parsing copy_number {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing copy_number {s}.\n{e}"))?,
- minor_allele_copy_number: cells
- .get(8)
- .context(format!("Error while parsing minor_allele_copy_number {s}."))?
- .parse::<f64>()
- .map(Some)
- .unwrap_or(None),
- mean_baf: cells
- .get(9)
- .context(format!("Error while parsing mean_baf {s}."))?
- .parse::<f64>()
- .map(Some)
- .unwrap_or(None),
- n_het_snps: cells
- .get(10)
- .context(format!("Error while parsing n_het_snps {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing n_het_snps {s}.\n{e}"))?,
- })
- }
- }
- pub struct SavanaCopyNumber {
- pub data: Vec<SavanaCNRow>,
- }
- impl SavanaCopyNumber {
- pub fn load_id(id: &str, config: Config) -> anyhow::Result<Self> {
- let path = config.savana_copy_number(id);
- let reader = get_gz_reader(&path)?;
- let mut data = Vec::new();
- for line in reader.lines() {
- let line = line?;
- if line.starts_with("chromosome") {
- continue;
- }
- let cn: SavanaCNRow = line.parse()?;
- data.push(cn);
- }
- Ok(Self { data })
- }
- }
- // bin chromosome start end perc_known_bases use_bin tumour_read_count normal_read_count
- pub struct SavanaRCRow {
- pub range: GenomeRange,
- pub perc_known_bases: f32,
- pub use_bin: bool,
- pub tumour_read_count: u32,
- pub normal_read_count: u32,
- }
- impl FromStr for SavanaRCRow {
- type Err = anyhow::Error;
- fn from_str(s: &str) -> anyhow::Result<Self> {
- let cells: Vec<&str> = s.split("\t").collect();
- let bin = cells
- .first()
- .context(format!("Error while parsing range {s}."))?;
- let range = bin
- .split_once(':')
- .and_then(|(a, b)| {
- b.split_once('_').map(|(b, c)| {
- GenomeRange::from_1_inclusive_str(a, b, c)
- .context(format!("Error while parsing range {}", bin))
- })
- })
- .context(format!("Invalid range format: {}", bin))??;
- Ok(Self {
- range,
- perc_known_bases: cells
- .get(4)
- .context(format!("Error while parsing perc_known_bases {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing perc_known_bases {s}.\n{e}"))?,
- use_bin: cells
- .get(5)
- .context(format!("Error while parsing use_bin {s}."))?
- .to_lowercase()
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing use_bin {s}.\n{e}"))?,
- tumour_read_count: cells
- .get(6)
- .context(format!("Error while parsing tumour_read_count {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing tumour_read_count {s}.\n{e}"))?,
- normal_read_count: cells
- .get(7)
- .context(format!("Error while parsing normal_read_count {s}."))?
- .parse()
- .map_err(|e| anyhow::anyhow!("Error while parsing normal_read_count {s}.\n{e}"))?,
- })
- }
- }
- pub struct SavanaReadCounts {
- pub data: Vec<SavanaRCRow>,
- }
- impl SavanaReadCounts {
- pub fn load_id(id: &str, config: Config) -> anyhow::Result<Self> {
- let path = config.savana_read_counts(id);
- let reader = get_gz_reader(&path)?;
- let mut data = Vec::new();
- for line in reader.lines() {
- let line = line?;
- if line.starts_with("bin") {
- continue;
- }
- let cn: SavanaRCRow = line.parse()?;
- data.push(cn);
- }
- Ok(Self { data })
- }
- pub fn n_tumoral_reads(&self) -> usize {
- self.data
- .par_iter()
- .map(|c| c.tumour_read_count as usize)
- .sum::<usize>()
- }
- pub fn n_normal_reads(&self) -> usize {
- self.data
- .par_iter()
- .map(|c| c.normal_read_count as usize)
- .sum::<usize>()
- }
- pub fn norm_chr_counts(&self) -> Vec<(String, f64)> {
- let n_tum = self.n_tumoral_reads();
- let n_bins = self.data.len();
- self.data
- .iter()
- .chunk_by(|c| c.range.contig)
- .into_iter()
- .map(|(contig_num, chr_counts)| {
- let chr_counts: Vec<_> = chr_counts.collect();
- let chr_n_bins = chr_counts.len();
- let chr_counts = chr_counts
- .par_iter()
- .map(|c| c.tumour_read_count as usize)
- .sum::<usize>();
- let r = chr_counts as f64 / ((chr_n_bins as f64 * n_tum as f64) / n_bins as f64);
- (num_to_contig(contig_num), r)
- })
- .collect()
- }
- }
- #[derive(Debug, Serialize, Deserialize)]
- pub struct SavanaCN {
- pub segments: Vec<CNSegment>,
- }
- #[derive(Debug, Serialize, Deserialize)]
- pub struct CNSegment {
- pub chromosome: String,
- pub start: u64,
- pub end: u64,
- pub segment_id: String,
- pub bin_count: u32,
- pub sum_of_bin_lengths: u64,
- pub weight: f64,
- pub copy_number: f64,
- pub minor_allele_copy_number: Option<f64>,
- pub mean_baf: Option<f64>,
- pub no_het_snps: u32,
- }
- impl SavanaCN {
- pub fn parse_file(id: &str, config: &Config) -> anyhow::Result<Self> {
- let path = config.savana_copy_number(id);
- let reader = get_gz_reader(&path)?;
- // let file = File::open(&path).context("Failed to open the file")?;
- // let reader = io::BufReader::new(file);
- let mut segments = Vec::new();
- for (index, line) in reader.lines().enumerate() {
- let line = line.context("Failed to read line from file")?;
- // Skip header line
- if index == 0 {
- continue;
- }
- let parts: Vec<&str> = line.split('\t').collect();
- if parts.len() != 11 {
- return Err(anyhow::anyhow!(
- "Invalid number of columns at line {} (expected 11, got {})",
- index + 1,
- parts.len()
- ));
- }
- // Helper closure for parsing with context
- let parse_field = |field: &str, name: &str, line_num: usize| -> anyhow::Result<f64> {
- field
- .parse()
- .context(format!("Failed to parse {name} at line {line_num}"))
- };
- let parse_field_u32 =
- |field: &str, name: &str, line_num: usize| -> anyhow::Result<u32> {
- field
- .parse()
- .context(format!("Failed to parse {name} at line {line_num}"))
- };
- let line_num = index + 1;
- let segment = CNSegment {
- chromosome: parts[0].to_string(),
- start: parse_field_u32(parts[1], "start", line_num)? as u64,
- end: parse_field_u32(parts[2], "end", line_num)? as u64,
- segment_id: parts[3].to_string(),
- bin_count: parse_field_u32(parts[4], "bin_count", line_num)?,
- sum_of_bin_lengths: parse_field_u32(parts[5], "sum_of_bin_lengths", line_num)?
- as u64,
- weight: parse_field(parts[6], "weight", line_num)?,
- copy_number: parse_field(parts[7], "copy_number", line_num)?,
- minor_allele_copy_number: if parts[8].is_empty() {
- None
- } else {
- Some(parse_field(parts[8], "minor_allele_copy_number", line_num)?)
- },
- mean_baf: if parts[9].is_empty() {
- None
- } else {
- Some(parse_field(parts[9], "mean_baf", line_num)?)
- },
- no_het_snps: if parts[10].is_empty() {
- 0
- } else {
- parse_field_u32(parts[10], "no_het_snps", line_num)?
- },
- };
- segments.push(segment);
- }
- Ok(Self { segments })
- }
- }
|