deep_somatic.rs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. //! # DeepSomatic Somatic Variant Calling Pipeline
  2. //!
  3. //! This module provides a pipeline runner for [DeepSomatic](https://github.com/google/deepsomatic),
  4. //! a deep learning-based somatic variant caller for paired tumor-normal samples.
  5. //!
  6. //! ## Overview
  7. //!
  8. //! DeepSomatic performs somatic variant calling using:
  9. //!
  10. //! - CNN-based variant classification (derived from DeepVariant)
  11. //! - Paired tumor-normal analysis
  12. //! - Containerized execution via Singularity
  13. //!
  14. //! ## Key Features
  15. //!
  16. //! - **Deep learning-based** - CNN models specifically trained for somatic variant detection
  17. //! - **Tumor-normal paired** - Leverages matched control for improved specificity
  18. //! - **Platform flexibility** - Supports ONT, PacBio, and Illumina
  19. //! - **Parallel execution** - Genome chunking for HPC scalability
  20. //!
  21. //! ## Requirements
  22. //!
  23. //! Before running DeepSomatic, ensure:
  24. //! - Tumor and normal BAMs are indexed (`.bai` files present)
  25. //! - Reference genome is accessible
  26. //! - Singularity/Docker image is available
  27. //! - Model type matches sequencing platform (`config.deepsomatic_model_type`)
  28. //!
  29. //! ## Execution Modes
  30. //!
  31. //! Execution mode is automatically selected via `config.slurm_runner`:
  32. //!
  33. //! - **Local** — Single-node execution
  34. //! - **Slurm** — HPC job submission
  35. //!
  36. //! Both modes support chunked parallel execution via [`run_deepsomatic_chunked_with_merge`].
  37. //!
  38. //! ## Output Files
  39. //!
  40. //! PASS-filtered somatic variants:
  41. //! ```text
  42. //! {result_dir}/{id}/deepsomatic/{id}_{tumoral_name}_DeepSomatic.pass.vcf.gz
  43. //! ```
  44. //!
  45. //! ## Usage
  46. //!
  47. //! ### Chunked Parallel Execution (Recommended for WGS)
  48. //!
  49. //! ```ignore
  50. //! use pandora_lib_promethion::callers::deep_somatic::run_deepsomatic_chunked_with_merge;
  51. //! use pandora_lib_promethion::config::Config;
  52. //!
  53. //! let config = Config::default();
  54. //! // Run DeepSomatic in 30 parallel chunks
  55. //! let outputs = run_deepsomatic_chunked_with_merge("sample_001", &config, 30)?;
  56. //! # Ok::<(), anyhow::Error>(())
  57. //! ```
  58. //!
  59. //! ### Single-Run Execution
  60. //!
  61. //! ```ignore
  62. //! use pandora_lib_promethion::callers::deep_somatic::DeepSomatic;
  63. //! use pandora_lib_promethion::config::Config;
  64. //! use pandora_lib_promethion::pipes::Initialize;
  65. //! use pandora_lib_promethion::runners::Run;
  66. //!
  67. //! let config = Config::default();
  68. //! let mut caller = DeepSomatic::initialize("sample_001", &config)?;
  69. //!
  70. //! if caller.should_run() {
  71. //! caller.run()?;
  72. //! }
  73. //!
  74. //! // Load somatic variants
  75. //! let variants = caller.variants(&annotations)?;
  76. //! println!("Found {} somatic variants", variants.variants.len());
  77. //! # Ok::<(), anyhow::Error>(())
  78. //! ```
  79. //!
  80. //! ## References
  81. //!
  82. //! - [DeepSomatic GitHub repository](https://github.com/google/deepsomatic)
  83. use std::{
  84. fmt, fs,
  85. path::{Path, PathBuf},
  86. process::{Command as ProcessCommand, Stdio},
  87. };
  88. use anyhow::Context;
  89. use log::{debug, info};
  90. use rayon::prelude::*;
  91. use regex::Regex;
  92. use rust_htslib::bam::{self, Read};
  93. use uuid::Uuid;
  94. use crate::{
  95. annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
  96. collection::vcf::Vcf,
  97. commands::{
  98. bcftools::{BcftoolsConcat, BcftoolsIndex, BcftoolsKeepPass},
  99. Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
  100. SlurmRunner,
  101. },
  102. config::Config,
  103. helpers::{
  104. get_genome_sizes_in_header_order, is_file_older, remove_dir_if_exists,
  105. singularity_bind_flags, split_ordered_genome_into_n_regions_exact,
  106. },
  107. io::vcf::read_vcf,
  108. locker::SampleLock,
  109. pipes::{Initialize, ShouldRun, Version},
  110. run, run_many,
  111. runners::Run,
  112. variant::{
  113. variant_collection::VariantCollection,
  114. vcf_variant::{Label, Variants},
  115. },
  116. };
  117. /// DeepSomatic paired tumor-normal somatic variant caller.
  118. ///
  119. /// Executes DeepSomatic for somatic SNV and indel detection using matched
  120. /// tumor and normal BAM files with automatic post-processing (PASS filtering).
  121. ///
  122. /// # Fields
  123. ///
  124. /// - `id` - Sample identifier (e.g., "34528")
  125. /// - `regions` - Space-separated list of genomic regions to process (e.g., "chr1 chr2 chr3")
  126. /// - `log_dir` - Directory for execution logs (e.g., "{result_dir}/{id}/log/deepsomatic")
  127. /// - `config` - Global pipeline configuration
  128. /// - `part_index` - Optional chunk index for parallel execution (e.g., `Some(3)` for part 3 of N)
  129. ///
  130. /// # Execution Modes
  131. ///
  132. /// - **Local** - Direct execution
  133. /// - **Slurm** - Single job submission
  134. /// - **Chunked** - Parallel genome-wide execution via [`run_deepsomatic_chunked_with_merge`]
  135. #[derive(Debug, Clone)]
  136. pub struct DeepSomatic {
  137. /// Sample identifier
  138. pub id: String,
  139. /// Space-separated list of genomic regions to process
  140. pub regions: String,
  141. /// Directory for log file storage
  142. pub log_dir: String,
  143. /// Global pipeline configuration
  144. pub config: Config,
  145. /// Optional part index for chunked parallel runs (1-indexed)
  146. pub part_index: Option<usize>,
  147. }
  148. impl fmt::Display for DeepSomatic {
  149. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  150. writeln!(f, "🧬 DeepSomatic")?;
  151. writeln!(f, " Case ID : {}", self.id)?;
  152. writeln!(f, " Regions : {}", self.regions)?;
  153. writeln!(f, " Log dir : {}", self.log_dir)?;
  154. writeln!(
  155. f,
  156. " Part : {}",
  157. self.part_index
  158. .map_or("full".into(), |n| format!("part{n}"))
  159. )
  160. }
  161. }
  162. impl Initialize for DeepSomatic {
  163. fn initialize(id: &str, config: &Config) -> anyhow::Result<Self> {
  164. let id = id.to_string();
  165. info!("Initializing DeepSomatic for {id}.");
  166. let log_dir = format!("{}/{}/log/deepsomatic", config.result_dir, &id);
  167. let regions = (1..=22)
  168. .map(|i| format!("chr{i}"))
  169. .chain(["chrX", "chrY", "chrM"].into_iter().map(String::from))
  170. .collect::<Vec<String>>();
  171. let deep_somatic = Self {
  172. id,
  173. regions: regions.join(" "),
  174. log_dir,
  175. config: config.clone(),
  176. part_index: None,
  177. };
  178. if deep_somatic.config.deepsomatic_force {
  179. remove_dir_if_exists(&deep_somatic.config.deepsomatic_output_dir(&deep_somatic.id))?;
  180. }
  181. Ok(deep_somatic)
  182. }
  183. }
  184. impl ShouldRun for DeepSomatic {
  185. fn should_run(&self) -> bool {
  186. let passed_vcf = &self.config.deepsomatic_passed_vcf(&self.id);
  187. let normal_older =
  188. is_file_older(passed_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true);
  189. let tumor_older =
  190. is_file_older(passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true);
  191. let result = normal_older || tumor_older;
  192. if result {
  193. info!("DeepSomatic should run for id: {}.", self.id);
  194. }
  195. result
  196. }
  197. }
  198. impl JobCommand for DeepSomatic {
  199. fn init(&mut self) -> anyhow::Result<()> {
  200. let output_dir = self.part_output_dir();
  201. fs::create_dir_all(&output_dir)
  202. .with_context(|| format!("Failed to create dir: {output_dir}"))?;
  203. fs::create_dir_all(&self.log_dir)
  204. .with_context(|| format!("Failed to create dir: {}", self.log_dir))?;
  205. Ok(())
  206. }
  207. fn cmd(&self) -> String {
  208. let output_dir = self.part_output_dir();
  209. let output_vcf_path = self.output_vcf_path();
  210. let sample_name_tumor = format!("{}_{}", self.id, self.config.tumoral_name);
  211. let sample_name_normal = format!("{}_{}", self.id, self.config.normal_name);
  212. let log_subdir = match self.part_index {
  213. Some(idx) => format!("{}_DeepSomatic_part{idx}_logs", sample_name_tumor),
  214. None => format!("{}_DeepSomatic_logs", sample_name_tumor),
  215. };
  216. let bind_flags = singularity_bind_flags([
  217. &self.config.normal_bam(&self.id),
  218. &self.config.tumoral_bam(&self.id),
  219. &self.config.reference,
  220. &output_dir,
  221. &self.log_dir,
  222. &self.config.tmp_dir,
  223. ]);
  224. let output_vcf = format!(
  225. "/output/{}",
  226. Path::new(&output_vcf_path)
  227. .file_name()
  228. .unwrap()
  229. .to_string_lossy()
  230. );
  231. format!(
  232. "{singularity_bin} exec --cleanenv \
  233. {binds} \
  234. --bind {output_dir}:/output \
  235. {image} \
  236. /opt/deepvariant/bin/deepsomatic/run_deepsomatic \
  237. --model_type={model_type} \
  238. --ref={reference} \
  239. --reads_normal={normal_bam} \
  240. --reads_tumor={tumor_bam} \
  241. --regions=\"{regions}\" \
  242. --output_vcf={output_vcf} \
  243. --num_shards={threads} \
  244. --logging_dir=/output/{log_subdir} \
  245. --vcf_stats_report=true \
  246. --dry_run=false \
  247. --sample_name_tumor={sample_name_tumor} \
  248. --sample_name_normal={sample_name_normal}",
  249. singularity_bin = self.config.singularity_bin,
  250. binds = bind_flags,
  251. output_dir = output_dir,
  252. image = self.config.deepsomatic_image,
  253. model_type = self.config.deepsomatic_model_type,
  254. reference = self.config.reference,
  255. normal_bam = self.config.normal_bam(&self.id),
  256. tumor_bam = self.config.tumoral_bam(&self.id),
  257. regions = self.regions,
  258. threads = self.config.deepsomatic_threads,
  259. )
  260. }
  261. }
  262. impl LocalRunner for DeepSomatic {}
  263. impl LocalBatchRunner for DeepSomatic {}
  264. impl SlurmRunner for DeepSomatic {
  265. fn slurm_args(&self) -> Vec<String> {
  266. let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
  267. SlurmParams {
  268. job_name: Some(format!("deepsomatic_{}{}", self.id, batch_id)),
  269. cpus_per_task: Some(self.config.deepsomatic_threads as u32),
  270. mem: Some("60G".into()),
  271. partition: Some("shortq".into()),
  272. gres: None,
  273. }
  274. .to_args()
  275. }
  276. }
  277. impl SbatchRunner for DeepSomatic {
  278. fn slurm_params(&self) -> SlurmParams {
  279. let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
  280. SlurmParams {
  281. job_name: Some(format!("deepsomatic_{}{}", self.id, batch_id)),
  282. cpus_per_task: Some(self.config.deepsomatic_threads as u32),
  283. mem: Some("60G".into()),
  284. partition: Some("shortq".into()),
  285. gres: None,
  286. }
  287. }
  288. }
  289. impl DeepSomatic {
  290. fn part_output_dir(&self) -> String {
  291. let base_dir = self.config.deepsomatic_output_dir(&self.id);
  292. match self.part_index {
  293. Some(idx) => format!("{base_dir}/part{idx}"),
  294. None => base_dir,
  295. }
  296. }
  297. fn output_vcf_path(&self) -> String {
  298. let output_dir = self.part_output_dir();
  299. format!(
  300. "{output_dir}/{}_{}_DeepSomatic.vcf.gz",
  301. self.id, self.config.tumoral_name
  302. )
  303. }
  304. fn passed_vcf_path(&self) -> String {
  305. match self.part_index {
  306. Some(_) => self.output_vcf_path().replace(".vcf.gz", ".pass.vcf.gz"),
  307. None => self.config.deepsomatic_passed_vcf(&self.id),
  308. }
  309. }
  310. fn filter_pass(&self) -> anyhow::Result<()> {
  311. let output_vcf = self.output_vcf_path();
  312. let vcf_passed = self.passed_vcf_path();
  313. if Path::new(&vcf_passed).exists() {
  314. debug!("PASS VCF already exists: {vcf_passed}");
  315. return Ok(());
  316. }
  317. info!(
  318. "Filtering DeepSomatic PASS variants for {} (part: {:?})",
  319. self.id, self.part_index
  320. );
  321. let mut cmd = BcftoolsKeepPass::from_config(&self.config, output_vcf, &vcf_passed);
  322. let report = run!(&self.config, &mut cmd)
  323. .with_context(|| format!("Failed to filter PASS for {}", self.id))?;
  324. report
  325. .save_to_file(format!("{}/bcftools_pass_", self.log_dir))
  326. .context("Can't save bcftools PASS logs")?;
  327. let mut cmd = BcftoolsIndex::from_config(&self.config, vcf_passed);
  328. let report = run!(&self.config, &mut cmd)
  329. .with_context(|| format!("Failed to index vcf PASS for {}", self.id))?;
  330. report
  331. .save_to_file(format!("{}/bcftools_index_", self.log_dir))
  332. .context("Failed to save bcftools index logs")?;
  333. Ok(())
  334. }
  335. }
  336. impl Run for DeepSomatic {
  337. fn run(&mut self) -> anyhow::Result<()> {
  338. let lock_dir = format!("{}/locks", self.config.result_dir);
  339. let _lock = SampleLock::acquire(&lock_dir, &self.id, "deepsomatic")
  340. .with_context(|| format!("Cannot start DeepSomatic chunked for {}", self.id))?;
  341. if !self.should_run() {
  342. info!("DeepSomatic is up-to-data.");
  343. return Ok(());
  344. }
  345. if self.config.slurm_runner {
  346. run_deepsomatic_chunked(&self.id, &self.config, 30)
  347. } else {
  348. run!(&self.config, self)?;
  349. self.filter_pass()
  350. }
  351. }
  352. }
  353. impl CallerCat for DeepSomatic {
  354. fn caller_cat(&self) -> Annotation {
  355. Annotation::Callers(Caller::DeepSomatic, Sample::Somatic)
  356. }
  357. }
  358. impl Variants for DeepSomatic {
  359. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  360. let caller = self.caller_cat();
  361. let vcf_passed = self.config.deepsomatic_passed_vcf(&self.id);
  362. info!("Loading variants from {caller}: {vcf_passed}");
  363. let variants = read_vcf(&vcf_passed)
  364. .with_context(|| format!("Failed to read DeepSomatic VCF {vcf_passed}"))?;
  365. variants.par_iter().for_each(|v| {
  366. annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
  367. });
  368. info!("{caller}: {} variants loaded", variants.len());
  369. Ok(VariantCollection {
  370. variants,
  371. vcf: Vcf::new(vcf_passed.into())?,
  372. caller,
  373. })
  374. }
  375. }
  376. impl Label for DeepSomatic {
  377. fn label(&self) -> String {
  378. self.caller_cat().to_string()
  379. }
  380. }
  381. impl Version for DeepSomatic {
  382. fn version(config: &Config) -> anyhow::Result<String> {
  383. let out = ProcessCommand::new("bash")
  384. .arg("-c")
  385. .arg(format!(
  386. "{} exec {} /opt/deepvariant/bin/deepsomatic/run_deepsomatic --version",
  387. config.singularity_bin, config.deepsomatic_image
  388. ))
  389. .stdout(Stdio::piped())
  390. .stderr(Stdio::piped())
  391. .output()
  392. .context("Failed to spawn Singularity process")?;
  393. if !out.status.success() {
  394. let combined = format!(
  395. "{}{}",
  396. String::from_utf8_lossy(&out.stdout),
  397. String::from_utf8_lossy(&out.stderr)
  398. );
  399. anyhow::bail!(
  400. "Singularity exec failed with status {}: {}",
  401. out.status,
  402. combined
  403. );
  404. }
  405. let combined = format!(
  406. "{}{}",
  407. String::from_utf8_lossy(&out.stdout),
  408. String::from_utf8_lossy(&out.stderr)
  409. );
  410. parse_deepsomatic_version(&combined)
  411. }
  412. fn version_slurm(config: &Config) -> anyhow::Result<String> {
  413. struct DeepSomaticVersionJob<'a> {
  414. config: &'a Config,
  415. }
  416. impl JobCommand for DeepSomaticVersionJob<'_> {
  417. fn cmd(&self) -> String {
  418. format!(
  419. "{} exec {} /opt/deepvariant/bin/deepsomatic/run_deepsomatic --version",
  420. self.config.singularity_bin, self.config.deepsomatic_image
  421. )
  422. }
  423. }
  424. impl SlurmRunner for DeepSomaticVersionJob<'_> {
  425. fn slurm_args(&self) -> Vec<String> {
  426. SlurmParams {
  427. job_name: Some("deepsomatic_version".into()),
  428. partition: Some("shortq".into()),
  429. cpus_per_task: Some(1),
  430. mem: Some("10G".into()),
  431. gres: None,
  432. }
  433. .to_args()
  434. }
  435. }
  436. let mut job = DeepSomaticVersionJob { config };
  437. let out = crate::commands::SlurmRunner::exec(&mut job)
  438. .context("Failed to run DeepSomatic --version via Slurm")?;
  439. let mut combined = out.stdout.clone();
  440. if let Some(epilog) = &out.slurm_epilog {
  441. combined.push_str(&epilog.to_string());
  442. }
  443. combined.push_str(&out.stderr);
  444. parse_deepsomatic_version(&combined)
  445. }
  446. }
  447. fn parse_deepsomatic_version(output: &str) -> anyhow::Result<String> {
  448. let deepsomatic_re = Regex::new(r"(?mi)\bdeepsomatic(?:\s+version)?\s*[:=]?\s*([^\s]+)")
  449. .expect("DeepSomatic version regex is valid");
  450. if let Some(caps) = deepsomatic_re.captures(output) {
  451. return Ok(caps
  452. .get(1)
  453. .expect("Regex has capture group 1")
  454. .as_str()
  455. .to_string());
  456. }
  457. // Fallback for containers that only expose the embedded DeepVariant version.
  458. let deepvariant_re =
  459. Regex::new(r"(?m)DeepVariant version\s+([^\s]+)").expect("DeepVariant regex is valid");
  460. let caps = deepvariant_re
  461. .captures(output)
  462. .context("Could not parse DeepSomatic version from output")?;
  463. Ok(caps
  464. .get(1)
  465. .expect("Regex has capture group 1")
  466. .as_str()
  467. .to_string())
  468. }
  469. fn merge_deepsomatic_parts(base: &DeepSomatic, n_parts: usize) -> anyhow::Result<()> {
  470. let mut part_pass_paths: Vec<PathBuf> = Vec::with_capacity(n_parts);
  471. for i in 1..=n_parts {
  472. let mut ds = base.clone();
  473. ds.part_index = Some(i);
  474. let part_pass = ds.passed_vcf_path();
  475. anyhow::ensure!(
  476. Path::new(&part_pass).exists(),
  477. "Missing DeepSomatic part {i} PASS VCF: {part_pass}"
  478. );
  479. part_pass_paths.push(PathBuf::from(part_pass));
  480. }
  481. let final_passed_vcf = base.config.deepsomatic_passed_vcf(&base.id);
  482. let rand = Uuid::new_v4();
  483. let final_tmp = format!("{final_passed_vcf}_{rand}.tmp");
  484. let final_tmp_csi = format!("{final_passed_vcf}_{rand}.tmp.csi");
  485. if let Some(parent) = Path::new(&final_passed_vcf).parent() {
  486. fs::create_dir_all(parent)?;
  487. }
  488. info!(
  489. "Concatenating {} DeepSomatic part VCFs into {}",
  490. n_parts, final_passed_vcf
  491. );
  492. let mut concat = BcftoolsConcat::from_config(&base.config, part_pass_paths, &final_tmp);
  493. run!(&base.config, &mut concat)
  494. .context("Failed to run bcftools concat for DeepSomatic parts")?;
  495. fs::rename(&final_tmp, &final_passed_vcf)
  496. .context("Failed to rename merged DeepSomatic PASS VCF")?;
  497. fs::rename(&final_tmp_csi, format!("{final_passed_vcf}.csi"))
  498. .context("Failed to rename merged DeepVariant PASS VCF CSI")?;
  499. info!(
  500. "Successfully merged {} DeepSomatic parts into {}",
  501. n_parts, final_passed_vcf
  502. );
  503. Ok(())
  504. }
  505. /// Runs DeepSomatic in parallel chunks, then merges results.
  506. ///
  507. /// Splits the genome into N equal-sized regions, runs DeepSomatic on each region
  508. /// in parallel (local or Slurm based on `config.slurm_runner`), filters PASS variants,
  509. /// and concatenates the final VCF.
  510. ///
  511. /// # Arguments
  512. ///
  513. /// * `id` - Sample identifier
  514. /// * `config` - Global pipeline configuration
  515. /// * `n_parts` - Number of parallel chunks (typically 20-30 for whole-genome)
  516. ///
  517. /// # Returns
  518. ///
  519. /// `Ok(())` on success, or an error if any step fails.
  520. ///
  521. /// # Errors
  522. ///
  523. /// Returns an error if:
  524. /// - `n_parts` is 0
  525. /// - Tumor or normal BAM file cannot be opened
  526. /// - BAM header is malformed
  527. /// - DeepSomatic execution fails on any part
  528. /// - PASS filtering fails
  529. /// - VCF merging fails
  530. /// - Output directory cannot be created
  531. ///
  532. /// # Example
  533. ///
  534. /// ```ignore
  535. /// let config = Config::default();
  536. /// run_deepsomatic_chunked("sample_001", &config, 30)?;
  537. /// ```
  538. pub fn run_deepsomatic_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::Result<()> {
  539. anyhow::ensure!(n_parts > 0, "n_parts must be > 0");
  540. // Lock at the chunked level (caller may have already locked, but this is idempotent protection)
  541. // let lock_dir = format!("{}/locks", config.result_dir);
  542. // let _lock = SampleLock::acquire(&lock_dir, id, "deepsomatic")
  543. // .with_context(|| format!("Cannot start DeepSomatic chunked for {}", id))?;
  544. let base = DeepSomatic::initialize(id, config)?;
  545. // if !base.should_run() {
  546. // debug!("DeepSomatic PASS VCF already up-to-date for {id}, skipping.");
  547. // return Ok(());
  548. // }
  549. // Get genome sizes from tumor BAM header
  550. let tumor_bam = config.tumoral_bam(id);
  551. let reader = bam::Reader::from_path(&tumor_bam)
  552. .with_context(|| format!("Failed to open BAM: {tumor_bam}"))?;
  553. let genome_sizes = get_genome_sizes_in_header_order(reader.header())?;
  554. let region_chunks = split_ordered_genome_into_n_regions_exact(&genome_sizes, n_parts)
  555. .into_iter()
  556. .map(|v| v.join(" "))
  557. .collect::<Vec<String>>();
  558. let actual_n_parts = region_chunks.len();
  559. info!(
  560. "Running DeepSomatic in {} parallel parts for {}",
  561. actual_n_parts, id
  562. );
  563. let mut jobs = Vec::with_capacity(actual_n_parts);
  564. for (i, regions) in region_chunks.into_iter().enumerate() {
  565. let mut job = base.clone();
  566. job.regions = regions;
  567. job.part_index = Some(i + 1);
  568. job.log_dir = format!("{}/part{}", base.log_dir, i + 1);
  569. info!("Planned DeepSomatic job:\n{job}");
  570. jobs.push(job);
  571. }
  572. // Run DeepSomatic jobs
  573. let outputs = run_many!(config, jobs.clone())?;
  574. for output in outputs.iter() {
  575. output.save_to_file(format!("{}/deepsomatic_", base.log_dir))?;
  576. }
  577. // Filter PASS variants for each part in parallel
  578. info!(
  579. "Filtering PASS variants for all {} parts in parallel",
  580. actual_n_parts
  581. );
  582. let filter_jobs: Vec<_> = jobs
  583. .iter()
  584. .map(|job| {
  585. BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
  586. })
  587. .collect();
  588. run_many!(config, filter_jobs)?;
  589. // Merge PASS VCFs
  590. merge_deepsomatic_parts(&base, actual_n_parts)?;
  591. info!(
  592. "DeepSomatic completed for {}: {} parts merged",
  593. id, actual_n_parts
  594. );
  595. Ok(())
  596. }
  597. #[cfg(test)]
  598. mod tests {
  599. use super::*;
  600. use crate::helpers::test_init;
  601. #[test]
  602. fn deepsomatic_version() -> anyhow::Result<()> {
  603. test_init();
  604. let vl = DeepSomatic::version(&Config::default())?;
  605. info!("DeepSomatic local version: {vl}");
  606. let vs = DeepSomatic::version_slurm(&Config::default())?;
  607. info!("DeepSomatic slurm version: {vs}");
  608. assert_eq!(vl, vs);
  609. Ok(())
  610. }
  611. #[test]
  612. fn deepsomatic_run() -> anyhow::Result<()> {
  613. test_init();
  614. let config = Config::default();
  615. run_deepsomatic_chunked("DUMCO", &config, 50)
  616. }
  617. }