savana.rs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. use crate::{
  2. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  3. collection::{vcf::Vcf, HasOutputs, Initialize, Version},
  4. commands::{
  5. bcftools::{bcftools_keep_pass, BcftoolsConfig},
  6. longphase::{LongphaseConfig, LongphaseHap, LongphasePhase},
  7. },
  8. config::Config,
  9. io::{readers::get_gz_reader, vcf::read_vcf},
  10. positions::{num_to_contig, GenomeRange},
  11. runners::{run_wait, CommandRun, Run},
  12. variant::{
  13. variant::{RunnerVariants, Variants},
  14. variant_collection::VariantCollection,
  15. },
  16. };
  17. use anyhow::Context;
  18. use itertools::Itertools;
  19. use log::info;
  20. use rayon::prelude::*;
  21. use serde::{Deserialize, Serialize};
  22. use std::{
  23. fs::{self},
  24. io::BufRead,
  25. path::Path,
  26. str::FromStr,
  27. };
  28. #[derive(Debug)]
  29. pub struct Savana {
  30. pub id: String,
  31. pub passed_vcf: String,
  32. pub phased_germline_vcf: String,
  33. pub normal_hp_bam: String,
  34. pub tumoral_hp_bam: String,
  35. pub config: Config,
  36. pub log_dir: String,
  37. }
  38. impl Initialize for Savana {
  39. fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
  40. let log_dir = format!("{}/{}/log/savana", config.result_dir, id);
  41. if !Path::new(&log_dir).exists() {
  42. fs::create_dir_all(&log_dir)
  43. .context(format!("Failed to create {log_dir} directory"))?;
  44. }
  45. // Check for phased germline vcf
  46. // no required anymore since >= 1.3.0
  47. let phased_germline_vcf = config.constit_phased_vcf(id);
  48. if !Path::new(&phased_germline_vcf).exists() {
  49. LongphasePhase::initialize(id, config.clone())?.run()?;
  50. }
  51. let normal_hp_bam = config.normal_haplotagged_bam(id);
  52. let tumoral_hp_bam = config.tumoral_haplotagged_bam(id);
  53. let passed_vcf = config.savana_passed_vcf(id);
  54. fs::create_dir_all(config.savana_output_dir(id))?;
  55. Ok(Self {
  56. id: id.to_string(),
  57. passed_vcf,
  58. config,
  59. log_dir,
  60. phased_germline_vcf,
  61. normal_hp_bam,
  62. tumoral_hp_bam,
  63. })
  64. }
  65. }
  66. impl Run for Savana {
  67. fn run(&mut self) -> anyhow::Result<()> {
  68. // force_or_not(
  69. // &self.config.savana_output_vcf(&self.id),
  70. // self.config.savana_force,
  71. // )?;
  72. let id = &self.id;
  73. let output_vcf = &self.config.savana_output_vcf(id);
  74. if !Path::new(&output_vcf).exists() {
  75. info!("Running Savana v{}", Savana::version(&self.config)?);
  76. // Check for haplotagged bam
  77. if !Path::new(&self.normal_hp_bam).exists() {
  78. LongphaseHap::new(
  79. id,
  80. &self.config.normal_bam(id),
  81. &self.phased_germline_vcf,
  82. LongphaseConfig::default(),
  83. )
  84. .run()?;
  85. }
  86. if !Path::new(&self.tumoral_hp_bam).exists() {
  87. LongphaseHap::new(
  88. id,
  89. &self.config.tumoral_bam(id),
  90. &self.phased_germline_vcf,
  91. LongphaseConfig::default(),
  92. )
  93. .run()?;
  94. }
  95. let savana_args = [
  96. // "run",
  97. "--tumour",
  98. &self.tumoral_hp_bam,
  99. "--normal",
  100. &self.normal_hp_bam,
  101. "--outdir",
  102. &self.config.savana_output_dir(id),
  103. "--ref",
  104. &self.config.reference,
  105. "--snp_vcf",
  106. &self.phased_germline_vcf,
  107. "--no_blacklist",
  108. "--threads",
  109. &self.config.savana_threads.to_string(),
  110. ];
  111. let args = [
  112. "-c",
  113. &format!(
  114. "source {} && conda activate savana && {} {}",
  115. self.config.conda_sh,
  116. self.config.savana_bin,
  117. savana_args.join(" ")
  118. ),
  119. ];
  120. let mut cmd_run = CommandRun::new("bash", &args);
  121. let report = run_wait(&mut cmd_run).context(format!(
  122. "Error while running `severus.py {}`",
  123. args.join(" ")
  124. ))?;
  125. let log_file = format!("{}/savana_", self.log_dir);
  126. report
  127. .save_to_file(&log_file)
  128. .context(format!("Error while writing logs into {log_file}"))?;
  129. }
  130. // Keep PASS
  131. if !Path::new(&self.passed_vcf).exists() && Path::new(output_vcf).exists() {
  132. let report =
  133. bcftools_keep_pass(output_vcf, &self.passed_vcf, BcftoolsConfig::default())
  134. .context(format!(
  135. "Error while running bcftools keep PASS for {output_vcf}"
  136. ))?;
  137. let log_file = format!("{}/bcftools_pass", self.log_dir);
  138. report
  139. .save_to_file(&log_file)
  140. .context(format!("Error while writing logs into {log_file}"))?;
  141. }
  142. Ok(())
  143. }
  144. }
  145. impl HasOutputs for Savana {
  146. fn has_output(&self, id: &str) -> (bool, bool) {
  147. let output = &self.config.savana_passed_vcf(id);
  148. let passed = &self.config.savana_passed_vcf(id);
  149. (Path::new(output).exists(), Path::new(passed).exists())
  150. }
  151. }
  152. impl Version for Savana {
  153. fn version(config: &Config) -> anyhow::Result<String> {
  154. let savana_args = ["--version"];
  155. let args = [
  156. "-c",
  157. &format!(
  158. "source {} && conda activate savana && {} {}",
  159. config.conda_sh,
  160. config.savana_bin,
  161. savana_args.join(" ")
  162. ),
  163. ];
  164. let mut cmd_run = CommandRun::new("bash", &args);
  165. let report = run_wait(&mut cmd_run)
  166. .context(format!("Error while running `savana {}`", args.join(" ")))?;
  167. let log = report.log;
  168. let start = log
  169. .find("Version ")
  170. .context("Failed to find 'Version ' in the log")?;
  171. let start_index = start + "Version ".len();
  172. let end = log[start_index..]
  173. .find('\n')
  174. .context("Failed to find newline after 'Version '")?;
  175. Ok(log[start_index..start_index + end]
  176. .to_string()
  177. .trim()
  178. .to_string())
  179. }
  180. }
  181. impl CallerCat for Savana {
  182. fn caller_cat(&self) -> Annotation {
  183. Annotation::Callers(Caller::Savana, Sample::Somatic)
  184. }
  185. }
  186. impl Variants for Savana {
  187. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  188. let caller = self.caller_cat();
  189. let add = vec![caller.clone()];
  190. info!("Loading variants from {}: {}", caller, self.passed_vcf);
  191. let variants = read_vcf(&self.passed_vcf)
  192. .map_err(|e| anyhow::anyhow!("Failed to read Savana VCF {}.\n{e}", self.passed_vcf))?;
  193. variants.par_iter().for_each(|v| {
  194. annotations.insert_update(v.hash(), &add);
  195. });
  196. info!("{}, {} variants loaded.", caller, variants.len());
  197. Ok(VariantCollection {
  198. variants,
  199. vcf: Vcf::new(self.passed_vcf.clone().into())?,
  200. caller,
  201. })
  202. }
  203. }
  204. impl RunnerVariants for Savana {}
  205. pub struct SavanaCNRow {
  206. pub range: GenomeRange,
  207. pub segment_id: String,
  208. pub bin_count: u32,
  209. pub sum_of_bin_lengths: u32,
  210. pub weight: f32,
  211. pub copy_number: f32,
  212. pub minor_allele_copy_number: Option<f64>,
  213. pub mean_baf: Option<f64>,
  214. pub n_het_snps: u32,
  215. }
  216. impl FromStr for SavanaCNRow {
  217. type Err = anyhow::Error;
  218. fn from_str(s: &str) -> anyhow::Result<Self> {
  219. let cells: Vec<&str> = s.split("\t").collect();
  220. let range = GenomeRange::from_1_inclusive_str(
  221. cells
  222. .first()
  223. .context(format!("Error while parsing contig {s}."))?,
  224. cells
  225. .get(1)
  226. .context(format!("Error while parsing start {s}."))?,
  227. cells
  228. .get(2)
  229. .context(format!("Error while parsing end {s}."))?,
  230. )
  231. .map_err(|e| anyhow::anyhow!("Error while parsing range {s}.\n{e}"))?;
  232. Ok(Self {
  233. range,
  234. segment_id: cells
  235. .get(3)
  236. .context(format!("Error while parsing segment_id {s}."))?
  237. .to_string(),
  238. bin_count: cells
  239. .get(4)
  240. .context(format!("Error while parsing bin_count {s}."))?
  241. .parse()
  242. .map_err(|e| anyhow::anyhow!("Error while parsing bin_count {s}.\n{e}"))?,
  243. sum_of_bin_lengths: cells
  244. .get(5)
  245. .context(format!("Error while parsing sum_of_bin_lengths {s}."))?
  246. .parse()
  247. .map_err(|e| anyhow::anyhow!("Error while parsing sum_of_bin_lengths {s}.\n{e}"))?,
  248. weight: cells
  249. .get(6)
  250. .context(format!("Error while parsing weight {s}."))?
  251. .parse()
  252. .map_err(|e| anyhow::anyhow!("Error while parsing weight {s}.\n{e}"))?,
  253. copy_number: cells
  254. .get(7)
  255. .context(format!("Error while parsing copy_number {s}."))?
  256. .parse()
  257. .map_err(|e| anyhow::anyhow!("Error while parsing copy_number {s}.\n{e}"))?,
  258. minor_allele_copy_number: cells
  259. .get(8)
  260. .context(format!("Error while parsing minor_allele_copy_number {s}."))?
  261. .parse::<f64>()
  262. .map(Some)
  263. .unwrap_or(None),
  264. mean_baf: cells
  265. .get(9)
  266. .context(format!("Error while parsing mean_baf {s}."))?
  267. .parse::<f64>()
  268. .map(Some)
  269. .unwrap_or(None),
  270. n_het_snps: cells
  271. .get(10)
  272. .context(format!("Error while parsing n_het_snps {s}."))?
  273. .parse()
  274. .map_err(|e| anyhow::anyhow!("Error while parsing n_het_snps {s}.\n{e}"))?,
  275. })
  276. }
  277. }
  278. pub struct SavanaCopyNumber {
  279. pub data: Vec<SavanaCNRow>,
  280. }
  281. impl SavanaCopyNumber {
  282. pub fn load_id(id: &str, config: Config) -> anyhow::Result<Self> {
  283. let path = config.savana_copy_number(id);
  284. let reader = get_gz_reader(&path)?;
  285. let mut data = Vec::new();
  286. for line in reader.lines() {
  287. let line = line?;
  288. if line.starts_with("chromosome") {
  289. continue;
  290. }
  291. let cn: SavanaCNRow = line.parse()?;
  292. data.push(cn);
  293. }
  294. Ok(Self { data })
  295. }
  296. }
  297. // bin chromosome start end perc_known_bases use_bin tumour_read_count normal_read_count
  298. pub struct SavanaRCRow {
  299. pub range: GenomeRange,
  300. pub perc_known_bases: f32,
  301. pub use_bin: bool,
  302. pub tumour_read_count: u32,
  303. pub normal_read_count: u32,
  304. }
  305. impl FromStr for SavanaRCRow {
  306. type Err = anyhow::Error;
  307. fn from_str(s: &str) -> anyhow::Result<Self> {
  308. let cells: Vec<&str> = s.split("\t").collect();
  309. let bin = cells
  310. .first()
  311. .context(format!("Error while parsing range {s}."))?;
  312. let range = bin
  313. .split_once(':')
  314. .and_then(|(a, b)| {
  315. b.split_once('_').map(|(b, c)| {
  316. GenomeRange::from_1_inclusive_str(a, b, c)
  317. .context(format!("Error while parsing range {}", bin))
  318. })
  319. })
  320. .context(format!("Invalid range format: {}", bin))??;
  321. Ok(Self {
  322. range,
  323. perc_known_bases: cells
  324. .get(4)
  325. .context(format!("Error while parsing perc_known_bases {s}."))?
  326. .parse()
  327. .map_err(|e| anyhow::anyhow!("Error while parsing perc_known_bases {s}.\n{e}"))?,
  328. use_bin: cells
  329. .get(5)
  330. .context(format!("Error while parsing use_bin {s}."))?
  331. .to_lowercase()
  332. .parse()
  333. .map_err(|e| anyhow::anyhow!("Error while parsing use_bin {s}.\n{e}"))?,
  334. tumour_read_count: cells
  335. .get(6)
  336. .context(format!("Error while parsing tumour_read_count {s}."))?
  337. .parse()
  338. .map_err(|e| anyhow::anyhow!("Error while parsing tumour_read_count {s}.\n{e}"))?,
  339. normal_read_count: cells
  340. .get(7)
  341. .context(format!("Error while parsing normal_read_count {s}."))?
  342. .parse()
  343. .map_err(|e| anyhow::anyhow!("Error while parsing normal_read_count {s}.\n{e}"))?,
  344. })
  345. }
  346. }
  347. pub struct SavanaReadCounts {
  348. pub data: Vec<SavanaRCRow>,
  349. }
  350. impl SavanaReadCounts {
  351. pub fn load_id(id: &str, config: Config) -> anyhow::Result<Self> {
  352. let path = config.savana_read_counts(id);
  353. let reader = get_gz_reader(&path)?;
  354. let mut data = Vec::new();
  355. for line in reader.lines() {
  356. let line = line?;
  357. if line.starts_with("bin") {
  358. continue;
  359. }
  360. let cn: SavanaRCRow = line.parse()?;
  361. data.push(cn);
  362. }
  363. Ok(Self { data })
  364. }
  365. pub fn n_tumoral_reads(&self) -> usize {
  366. self.data
  367. .par_iter()
  368. .map(|c| c.tumour_read_count as usize)
  369. .sum::<usize>()
  370. }
  371. pub fn n_normal_reads(&self) -> usize {
  372. self.data
  373. .par_iter()
  374. .map(|c| c.normal_read_count as usize)
  375. .sum::<usize>()
  376. }
  377. pub fn norm_chr_counts(&self) -> Vec<(String, f64)> {
  378. let n_tum = self.n_tumoral_reads();
  379. let n_bins = self.data.len();
  380. self.data
  381. .iter()
  382. .chunk_by(|c| c.range.contig)
  383. .into_iter()
  384. .map(|(contig_num, chr_counts)| {
  385. let chr_counts: Vec<_> = chr_counts.collect();
  386. let chr_n_bins = chr_counts.len();
  387. let chr_counts = chr_counts
  388. .par_iter()
  389. .map(|c| c.tumour_read_count as usize)
  390. .sum::<usize>();
  391. let r = chr_counts as f64 / ((chr_n_bins as f64 * n_tum as f64) / n_bins as f64);
  392. (num_to_contig(contig_num), r)
  393. })
  394. .collect()
  395. }
  396. }
  397. #[derive(Debug, Serialize, Deserialize)]
  398. pub struct SavanaCN {
  399. pub segments: Vec<CNSegment>,
  400. }
  401. #[derive(Debug, Serialize, Deserialize)]
  402. pub struct CNSegment {
  403. pub chromosome: String,
  404. pub start: u64,
  405. pub end: u64,
  406. pub segment_id: String,
  407. pub bin_count: u32,
  408. pub sum_of_bin_lengths: u64,
  409. pub weight: f64,
  410. pub copy_number: f64,
  411. pub minor_allele_copy_number: Option<f64>,
  412. pub mean_baf: Option<f64>,
  413. pub no_het_snps: u32,
  414. }
  415. impl SavanaCN {
  416. pub fn parse_file(id: &str, config: &Config) -> anyhow::Result<Self> {
  417. let path = config.savana_copy_number(id);
  418. let reader = get_gz_reader(&path)?;
  419. // let file = File::open(&path).context("Failed to open the file")?;
  420. // let reader = io::BufReader::new(file);
  421. let mut segments = Vec::new();
  422. for (index, line) in reader.lines().enumerate() {
  423. let line = line.context("Failed to read line from file")?;
  424. // Skip header line
  425. if index == 0 {
  426. continue;
  427. }
  428. let parts: Vec<&str> = line.split('\t').collect();
  429. if parts.len() != 11 {
  430. return Err(anyhow::anyhow!(
  431. "Invalid number of columns at line {} (expected 11, got {})",
  432. index + 1,
  433. parts.len()
  434. ));
  435. }
  436. // Helper closure for parsing with context
  437. let parse_field = |field: &str, name: &str, line_num: usize| -> anyhow::Result<f64> {
  438. field
  439. .parse()
  440. .context(format!("Failed to parse {name} at line {line_num}"))
  441. };
  442. let parse_field_u32 =
  443. |field: &str, name: &str, line_num: usize| -> anyhow::Result<u32> {
  444. field
  445. .parse()
  446. .context(format!("Failed to parse {name} at line {line_num}"))
  447. };
  448. let line_num = index + 1;
  449. let segment = CNSegment {
  450. chromosome: parts[0].to_string(),
  451. start: parse_field_u32(parts[1], "start", line_num)? as u64,
  452. end: parse_field_u32(parts[2], "end", line_num)? as u64,
  453. segment_id: parts[3].to_string(),
  454. bin_count: parse_field_u32(parts[4], "bin_count", line_num)?,
  455. sum_of_bin_lengths: parse_field_u32(parts[5], "sum_of_bin_lengths", line_num)?
  456. as u64,
  457. weight: parse_field(parts[6], "weight", line_num)?,
  458. copy_number: parse_field(parts[7], "copy_number", line_num)?,
  459. minor_allele_copy_number: if parts[8].is_empty() {
  460. None
  461. } else {
  462. Some(parse_field(parts[8], "minor_allele_copy_number", line_num)?)
  463. },
  464. mean_baf: if parts[9].is_empty() {
  465. None
  466. } else {
  467. Some(parse_field(parts[9], "mean_baf", line_num)?)
  468. },
  469. no_het_snps: if parts[10].is_empty() {
  470. 0
  471. } else {
  472. parse_field_u32(parts[10], "no_het_snps", line_num)?
  473. },
  474. };
  475. segments.push(segment);
  476. }
  477. Ok(Self { segments })
  478. }
  479. }