savana.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. use crate::{
  2. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  3. collection::{vcf::Vcf, Initialize, ShouldRun, Version},
  4. commands::{
  5. bcftools::{bcftools_keep_pass, BcftoolsConfig},
  6. longphase::{LongphaseConfig, LongphaseHap, LongphasePhase},
  7. },
  8. config::Config,
  9. helpers::is_file_older,
  10. io::{readers::get_gz_reader, vcf::read_vcf},
  11. positions::{num_to_contig, GenomeRange},
  12. runners::{run_wait, CommandRun, Run},
  13. variant::{
  14. variant::{RunnerVariants, Variants},
  15. variant_collection::VariantCollection,
  16. },
  17. };
  18. use anyhow::Context;
  19. use itertools::Itertools;
  20. use log::{debug, info};
  21. use rayon::prelude::*;
  22. use serde::{Deserialize, Serialize};
  23. use std::{
  24. fs::{self},
  25. io::BufRead,
  26. path::Path,
  27. str::FromStr,
  28. };
  29. /// The `Savana` struct orchestrates the haplotype-aware somatic variant calling
  30. /// pipeline using the Savana tool. It manages initialization, conditional execution,
  31. /// phasing dependencies, haplotagging, and output filtering.
  32. #[derive(Debug)]
  33. pub struct Savana {
  34. pub id: String,
  35. pub config: Config,
  36. pub log_dir: String,
  37. }
  38. impl Initialize for Savana {
  39. /// Initializes a new `Savana` instance for a given sample ID and configuration.
  40. ///
  41. /// If `savana_force` is enabled in [`Config`] and the output VCF exists, or if the run
  42. /// should be re-triggered (based on file freshness), the output directory
  43. /// will be cleaned.
  44. ///
  45. /// # Arguments
  46. ///
  47. /// * `id` - Sample identifier
  48. /// * `config` - Pipeline execution configuration
  49. ///
  50. /// # Returns
  51. ///
  52. /// A new `Savana` instance, or an error if cleanup fails.
  53. fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
  54. info!("Initialize Savana for {id}.");
  55. let log_dir = format!("{}/{}/log/savana", config.result_dir, id);
  56. let savana = Self {
  57. id: id.to_string(),
  58. config,
  59. log_dir,
  60. };
  61. let output_vcf_exists = Path::new(&savana.config.savana_output_vcf(id)).exists();
  62. if (savana.config.savana_force && output_vcf_exists) || savana.should_run() {
  63. fs::remove_dir_all(savana.config.savana_output_dir(id))?;
  64. }
  65. Ok(savana)
  66. }
  67. }
  68. impl Run for Savana {
  69. /// Executes the Savana pipeline, including prerequisite phasing and haplotagging steps.
  70. ///
  71. /// If the output VCF does not already exist, it ensures that the phased VCF and
  72. /// haplotagged BAMs exist (or runs the modules to produce them), then runs Savana.
  73. /// Finally, it filters the resulting VCF to retain only PASS variants.
  74. ///
  75. /// # Returns
  76. ///
  77. /// `Ok(())` if the run completes successfully, or an error otherwise.
  78. fn run(&mut self) -> anyhow::Result<()> {
  79. let id = &self.id;
  80. let output_vcf = &self.config.savana_output_vcf(id);
  81. if !Path::new(&output_vcf).exists() {
  82. info!("Running Savana v{}", Savana::version(&self.config)?);
  83. let output_dir = self.config.savana_output_dir(id);
  84. fs::create_dir_all(&output_dir).with_context(|| {
  85. format!("Failed to create output dir for Savana run: {output_dir}")
  86. })?;
  87. // Check for phased germline vcf
  88. // no required anymore since >= 1.3.0
  89. let phased_germline_vcf = self.config.constit_phased_vcf(id);
  90. if !Path::new(&phased_germline_vcf).exists() {
  91. LongphasePhase::initialize(id, self.config.clone())?.run()?;
  92. }
  93. let normal_hp_bam = self.config.normal_haplotagged_bam(id);
  94. let tumoral_hp_bam = self.config.tumoral_haplotagged_bam(id);
  95. // Check for haplotagged bam
  96. if !Path::new(&normal_hp_bam).exists() {
  97. LongphaseHap::new(
  98. id,
  99. &self.config.normal_bam(id),
  100. &phased_germline_vcf,
  101. LongphaseConfig::default(),
  102. )
  103. .run()?;
  104. }
  105. if !Path::new(&tumoral_hp_bam).exists() {
  106. LongphaseHap::new(
  107. id,
  108. &self.config.tumoral_bam(id),
  109. &phased_germline_vcf,
  110. LongphaseConfig::default(),
  111. )
  112. .run()?;
  113. }
  114. let savana_args = [
  115. // "run",
  116. "--tumour",
  117. &tumoral_hp_bam,
  118. "--normal",
  119. &normal_hp_bam,
  120. "--outdir",
  121. &self.config.savana_output_dir(id),
  122. "--ref",
  123. &self.config.reference,
  124. "--snp_vcf",
  125. &phased_germline_vcf,
  126. "--no_blacklist",
  127. "--threads",
  128. &self.config.savana_threads.to_string(),
  129. ];
  130. let args = [
  131. "-c",
  132. &format!(
  133. "source {} && conda activate savana && {} {}",
  134. self.config.conda_sh,
  135. self.config.savana_bin,
  136. savana_args.join(" ")
  137. ),
  138. ];
  139. let mut cmd_run = CommandRun::new("bash", &args);
  140. let report = run_wait(&mut cmd_run).context(format!(
  141. "Error while running `severus.py {}`",
  142. args.join(" ")
  143. ))?;
  144. let log_file = format!("{}/savana_", self.log_dir);
  145. report
  146. .save_to_file(&log_file)
  147. .context(format!("Error while writing logs into {log_file}"))?;
  148. } else {
  149. debug!(
  150. "Savana output already exists for {}, skipping execution.",
  151. self.id
  152. );
  153. }
  154. // Keep PASS
  155. let passed_vcf = self.config.savana_passed_vcf(id);
  156. if !Path::new(&passed_vcf).exists() && Path::new(output_vcf).exists() {
  157. let report = bcftools_keep_pass(output_vcf, &passed_vcf, BcftoolsConfig::default())
  158. .context(format!(
  159. "Error while running bcftools keep PASS for {output_vcf}"
  160. ))?;
  161. let log_file = format!("{}/bcftools_pass", self.log_dir);
  162. report
  163. .save_to_file(&log_file)
  164. .context(format!("Error while writing logs into {log_file}"))?;
  165. }
  166. Ok(())
  167. }
  168. }
  169. impl ShouldRun for Savana {
  170. /// Determines whether Savana should be re-run based on whether
  171. /// the filtered PASS VCF is older than the input BAMs.
  172. ///
  173. /// If either input BAM (normal or tumor) is newer than the PASS VCF,
  174. /// Savana is considered out of date and should be re-executed.
  175. ///
  176. /// # Returns
  177. /// `true` if an update is needed, or if timestamps can't be checked (file doesn't exist)
  178. fn should_run(&self) -> bool {
  179. let passed_vcf = &self.config.savana_passed_vcf(&self.id);
  180. is_file_older(passed_vcf, &self.config.normal_bam(&self.id)).unwrap_or(true)
  181. || is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id)).unwrap_or(true)
  182. }
  183. }
  184. impl Version for Savana {
  185. fn version(config: &Config) -> anyhow::Result<String> {
  186. let savana_args = ["--version"];
  187. let args = [
  188. "-c",
  189. &format!(
  190. "source {} && conda activate savana && {} {}",
  191. config.conda_sh,
  192. config.savana_bin,
  193. savana_args.join(" ")
  194. ),
  195. ];
  196. let mut cmd_run = CommandRun::new("bash", &args);
  197. let report = run_wait(&mut cmd_run)
  198. .context(format!("Error while running `savana {}`", args.join(" ")))?;
  199. let log = report.log;
  200. let start = log
  201. .find("Version ")
  202. .context("Failed to find 'Version ' in the log")?;
  203. let start_index = start + "Version ".len();
  204. let end = log[start_index..]
  205. .find('\n')
  206. .context("Failed to find newline after 'Version '")?;
  207. Ok(log[start_index..start_index + end]
  208. .to_string()
  209. .trim()
  210. .to_string())
  211. }
  212. }
  213. impl CallerCat for Savana {
  214. fn caller_cat(&self) -> Annotation {
  215. Annotation::Callers(Caller::Savana, Sample::Somatic)
  216. }
  217. }
  218. impl Variants for Savana {
  219. /// Loads variants from the PASS-filtered VCF produced by Savana and annotates them.
  220. ///
  221. /// Reads the VCF file, applies the caller tag via the `Annotations` object, and
  222. /// wraps the parsed data into a `VariantCollection`.
  223. ///
  224. /// # Arguments
  225. /// * `annotations` - Global annotation registry shared across callers.
  226. ///
  227. /// # Returns
  228. /// A populated `VariantCollection`, or an error if the VCF fails to parse.
  229. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  230. let caller = self.caller_cat();
  231. let add = vec![caller.clone()];
  232. let passed_vcf = self.config.savana_passed_vcf(&self.id);
  233. info!("Loading variants from {}: {}", caller, &passed_vcf);
  234. let variants = read_vcf(&passed_vcf)
  235. .map_err(|e| anyhow::anyhow!("Failed to read Savana VCF {}.\n{e}", passed_vcf))?;
  236. variants.par_iter().for_each(|v| {
  237. annotations.insert_update(v.hash(), &add);
  238. });
  239. info!("{}, {} variants loaded.", caller, variants.len());
  240. Ok(VariantCollection {
  241. variants,
  242. vcf: Vcf::new(passed_vcf.clone().into())?,
  243. caller,
  244. })
  245. }
  246. }
  247. impl RunnerVariants for Savana {}
  248. pub struct SavanaCNRow {
  249. pub range: GenomeRange,
  250. pub segment_id: String,
  251. pub bin_count: u32,
  252. pub sum_of_bin_lengths: u32,
  253. pub weight: f32,
  254. pub copy_number: f32,
  255. pub minor_allele_copy_number: Option<f64>,
  256. pub mean_baf: Option<f64>,
  257. pub n_het_snps: u32,
  258. }
  259. impl FromStr for SavanaCNRow {
  260. type Err = anyhow::Error;
  261. fn from_str(s: &str) -> anyhow::Result<Self> {
  262. let cells: Vec<&str> = s.split("\t").collect();
  263. let range = GenomeRange::from_1_inclusive_str(
  264. cells
  265. .first()
  266. .context(format!("Error while parsing contig {s}."))?,
  267. cells
  268. .get(1)
  269. .context(format!("Error while parsing start {s}."))?,
  270. cells
  271. .get(2)
  272. .context(format!("Error while parsing end {s}."))?,
  273. )
  274. .map_err(|e| anyhow::anyhow!("Error while parsing range {s}.\n{e}"))?;
  275. Ok(Self {
  276. range,
  277. segment_id: cells
  278. .get(3)
  279. .context(format!("Error while parsing segment_id {s}."))?
  280. .to_string(),
  281. bin_count: cells
  282. .get(4)
  283. .context(format!("Error while parsing bin_count {s}."))?
  284. .parse()
  285. .map_err(|e| anyhow::anyhow!("Error while parsing bin_count {s}.\n{e}"))?,
  286. sum_of_bin_lengths: cells
  287. .get(5)
  288. .context(format!("Error while parsing sum_of_bin_lengths {s}."))?
  289. .parse()
  290. .map_err(|e| anyhow::anyhow!("Error while parsing sum_of_bin_lengths {s}.\n{e}"))?,
  291. weight: cells
  292. .get(6)
  293. .context(format!("Error while parsing weight {s}."))?
  294. .parse()
  295. .map_err(|e| anyhow::anyhow!("Error while parsing weight {s}.\n{e}"))?,
  296. copy_number: cells
  297. .get(7)
  298. .context(format!("Error while parsing copy_number {s}."))?
  299. .parse()
  300. .map_err(|e| anyhow::anyhow!("Error while parsing copy_number {s}.\n{e}"))?,
  301. minor_allele_copy_number: cells
  302. .get(8)
  303. .context(format!("Error while parsing minor_allele_copy_number {s}."))?
  304. .parse::<f64>()
  305. .map(Some)
  306. .unwrap_or(None),
  307. mean_baf: cells
  308. .get(9)
  309. .context(format!("Error while parsing mean_baf {s}."))?
  310. .parse::<f64>()
  311. .map(Some)
  312. .unwrap_or(None),
  313. n_het_snps: cells
  314. .get(10)
  315. .context(format!("Error while parsing n_het_snps {s}."))?
  316. .parse()
  317. .map_err(|e| anyhow::anyhow!("Error while parsing n_het_snps {s}.\n{e}"))?,
  318. })
  319. }
  320. }
  321. pub struct SavanaCopyNumber {
  322. pub data: Vec<SavanaCNRow>,
  323. }
  324. impl SavanaCopyNumber {
  325. pub fn load_id(id: &str, config: Config) -> anyhow::Result<Self> {
  326. let path = config.savana_copy_number(id);
  327. let reader = get_gz_reader(&path)?;
  328. let mut data = Vec::new();
  329. for line in reader.lines() {
  330. let line = line?;
  331. if line.starts_with("chromosome") {
  332. continue;
  333. }
  334. let cn: SavanaCNRow = line.parse()?;
  335. data.push(cn);
  336. }
  337. Ok(Self { data })
  338. }
  339. }
  340. // bin chromosome start end perc_known_bases use_bin tumour_read_count normal_read_count
  341. pub struct SavanaRCRow {
  342. pub range: GenomeRange,
  343. pub perc_known_bases: f32,
  344. pub use_bin: bool,
  345. pub tumour_read_count: u32,
  346. pub normal_read_count: u32,
  347. }
  348. impl FromStr for SavanaRCRow {
  349. type Err = anyhow::Error;
  350. fn from_str(s: &str) -> anyhow::Result<Self> {
  351. let cells: Vec<&str> = s.split("\t").collect();
  352. let bin = cells
  353. .first()
  354. .context(format!("Error while parsing range {s}."))?;
  355. let range = bin
  356. .split_once(':')
  357. .and_then(|(a, b)| {
  358. b.split_once('_').map(|(b, c)| {
  359. GenomeRange::from_1_inclusive_str(a, b, c)
  360. .context(format!("Error while parsing range {}", bin))
  361. })
  362. })
  363. .context(format!("Invalid range format: {}", bin))??;
  364. Ok(Self {
  365. range,
  366. perc_known_bases: cells
  367. .get(4)
  368. .context(format!("Error while parsing perc_known_bases {s}."))?
  369. .parse()
  370. .map_err(|e| anyhow::anyhow!("Error while parsing perc_known_bases {s}.\n{e}"))?,
  371. use_bin: cells
  372. .get(5)
  373. .context(format!("Error while parsing use_bin {s}."))?
  374. .to_lowercase()
  375. .parse()
  376. .map_err(|e| anyhow::anyhow!("Error while parsing use_bin {s}.\n{e}"))?,
  377. tumour_read_count: cells
  378. .get(6)
  379. .context(format!("Error while parsing tumour_read_count {s}."))?
  380. .parse()
  381. .map_err(|e| anyhow::anyhow!("Error while parsing tumour_read_count {s}.\n{e}"))?,
  382. normal_read_count: cells
  383. .get(7)
  384. .context(format!("Error while parsing normal_read_count {s}."))?
  385. .parse()
  386. .map_err(|e| anyhow::anyhow!("Error while parsing normal_read_count {s}.\n{e}"))?,
  387. })
  388. }
  389. }
  390. pub struct SavanaReadCounts {
  391. pub data: Vec<SavanaRCRow>,
  392. }
  393. impl SavanaReadCounts {
  394. pub fn load_id(id: &str, config: Config) -> anyhow::Result<Self> {
  395. let path = config.savana_read_counts(id);
  396. let reader = get_gz_reader(&path)?;
  397. let mut data = Vec::new();
  398. for line in reader.lines() {
  399. let line = line?;
  400. if line.starts_with("bin") {
  401. continue;
  402. }
  403. let cn: SavanaRCRow = line.parse()?;
  404. data.push(cn);
  405. }
  406. Ok(Self { data })
  407. }
  408. pub fn n_tumoral_reads(&self) -> usize {
  409. self.data
  410. .par_iter()
  411. .map(|c| c.tumour_read_count as usize)
  412. .sum::<usize>()
  413. }
  414. pub fn n_normal_reads(&self) -> usize {
  415. self.data
  416. .par_iter()
  417. .map(|c| c.normal_read_count as usize)
  418. .sum::<usize>()
  419. }
  420. pub fn norm_chr_counts(&self) -> Vec<(String, f64)> {
  421. let n_tum = self.n_tumoral_reads();
  422. let n_bins = self.data.len();
  423. self.data
  424. .iter()
  425. .chunk_by(|c| c.range.contig)
  426. .into_iter()
  427. .map(|(contig_num, chr_counts)| {
  428. let chr_counts: Vec<_> = chr_counts.collect();
  429. let chr_n_bins = chr_counts.len();
  430. let chr_counts = chr_counts
  431. .par_iter()
  432. .map(|c| c.tumour_read_count as usize)
  433. .sum::<usize>();
  434. let r = chr_counts as f64 / ((chr_n_bins as f64 * n_tum as f64) / n_bins as f64);
  435. (num_to_contig(contig_num), r)
  436. })
  437. .collect()
  438. }
  439. }
  440. #[derive(Debug, Serialize, Deserialize)]
  441. pub struct SavanaCN {
  442. pub segments: Vec<CNSegment>,
  443. }
  444. #[derive(Debug, Serialize, Deserialize)]
  445. pub struct CNSegment {
  446. pub chromosome: String,
  447. pub start: u64,
  448. pub end: u64,
  449. pub segment_id: String,
  450. pub bin_count: u32,
  451. pub sum_of_bin_lengths: u64,
  452. pub weight: f64,
  453. pub copy_number: f64,
  454. pub minor_allele_copy_number: Option<f64>,
  455. pub mean_baf: Option<f64>,
  456. pub no_het_snps: u32,
  457. }
  458. impl SavanaCN {
  459. pub fn parse_file(id: &str, config: &Config) -> anyhow::Result<Self> {
  460. let path = config.savana_copy_number(id);
  461. let reader = get_gz_reader(&path)?;
  462. // let file = File::open(&path).context("Failed to open the file")?;
  463. // let reader = io::BufReader::new(file);
  464. let mut segments = Vec::new();
  465. for (index, line) in reader.lines().enumerate() {
  466. let line = line.context("Failed to read line from file")?;
  467. // Skip header line
  468. if index == 0 {
  469. continue;
  470. }
  471. let parts: Vec<&str> = line.split('\t').collect();
  472. if parts.len() != 11 {
  473. return Err(anyhow::anyhow!(
  474. "Invalid number of columns at line {} (expected 11, got {})",
  475. index + 1,
  476. parts.len()
  477. ));
  478. }
  479. // Helper closure for parsing with context
  480. let parse_field = |field: &str, name: &str, line_num: usize| -> anyhow::Result<f64> {
  481. field
  482. .parse()
  483. .context(format!("Failed to parse {name} at line {line_num}"))
  484. };
  485. let parse_field_u32 =
  486. |field: &str, name: &str, line_num: usize| -> anyhow::Result<u32> {
  487. field
  488. .parse()
  489. .context(format!("Failed to parse {name} at line {line_num}"))
  490. };
  491. let line_num = index + 1;
  492. let segment = CNSegment {
  493. chromosome: parts[0].to_string(),
  494. start: parse_field_u32(parts[1], "start", line_num)? as u64,
  495. end: parse_field_u32(parts[2], "end", line_num)? as u64,
  496. segment_id: parts[3].to_string(),
  497. bin_count: parse_field_u32(parts[4], "bin_count", line_num)?,
  498. sum_of_bin_lengths: parse_field_u32(parts[5], "sum_of_bin_lengths", line_num)?
  499. as u64,
  500. weight: parse_field(parts[6], "weight", line_num)?,
  501. copy_number: parse_field(parts[7], "copy_number", line_num)?,
  502. minor_allele_copy_number: if parts[8].is_empty() {
  503. None
  504. } else {
  505. Some(parse_field(parts[8], "minor_allele_copy_number", line_num)?)
  506. },
  507. mean_baf: if parts[9].is_empty() {
  508. None
  509. } else {
  510. Some(parse_field(parts[9], "mean_baf", line_num)?)
  511. },
  512. no_het_snps: if parts[10].is_empty() {
  513. 0
  514. } else {
  515. parse_field_u32(parts[10], "no_het_snps", line_num)?
  516. },
  517. };
  518. segments.push(segment);
  519. }
  520. Ok(Self { segments })
  521. }
  522. }