prom_run.rs 58 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767
  1. //! # PromRun Module
  2. //!
  3. //! Import, validation, and processing pipeline for Oxford Nanopore PromethION sequencing runs.
  4. //!
  5. //! This module provides the [`PromRun`] struct which orchestrates the complete workflow from
  6. //! raw MinKNOW output ingestion through alignment, sorting, and per-case BAM finalization.
  7. //!
  8. //! ## Supported Kit Types
  9. //!
  10. //! - **Multiplexed** (`SQK-NBD114-24`, `SQK-NBD112`, `RBK`): BAMs demultiplexed by barcode
  11. //! - **Non-multiplexed** (`SQK-LSK114`, `SQK-LSK109`): Single-sample runs
  12. //!
  13. //! ## Pipeline Stages
  14. //!
  15. //! 1. **Import** — Recursive directory scan for BAM, POD5, sample sheets, pore activity logs
  16. //! 2. **Validation** — Verify BAMs are unaligned (no reference sequences in header)
  17. //! 3. **Case mapping** — Associate BAMs with cases via barcode extraction or single-case assignment
  18. //! 4. **Alignment** — Parallel Dorado alignment against reference genome
  19. //! 5. **Sort/Index** — Coordinate sort and index via samtools (Slurm batch)
  20. //! 6. **Finalize** — Merge chunks per case, atomic replacement of existing outputs
  21. //!
  22. //! ## Directory Structure
  23. //!
  24. //! Files are discovered recursively; typical MinKNOW layouts:
  25. //!
  26. //! ```text
  27. //! run_dir/
  28. //! ├── sample_sheet_*.csv # Required
  29. //! ├── pore_activity_*.csv # Optional
  30. //! ├── pod5_pass/barcode*/*.pod5 # Multiplexed
  31. //! ├── bam_pass/barcode*/*.bam # Multiplexed
  32. //! ├── pod5/*.pod5 # Non-multiplexed
  33. //! └── bam/*.bam # Non-multiplexed
  34. //! ```
  35. //!
  36. //! ## Example
  37. //!
  38. //! ```rust,ignore
  39. //! use crate::config::Config;
  40. //! use crate::collection::promrun::PromRun;
  41. //!
  42. //! let config = Config::default();
  43. //!
  44. //! // Import and process
  45. //! let mut run = PromRun::from_dir("/data/prom/20240101_run", &config)?;
  46. //! run.cases.push(IdInput { case_id: "CASE001".into(), sample_type: "TUM".into(), barcode: "barcode01".into() });
  47. //! run.process_bams(&config)?;
  48. //!
  49. //! // Or load cached
  50. //! let run = PromRun::open("protocol_run_id", &config)?;
  51. //! ```
  52. use std::{
  53. collections::{BTreeMap, HashMap},
  54. fmt,
  55. fs::{self, File},
  56. io::{BufReader, BufWriter},
  57. path::{Path, PathBuf},
  58. time::{Duration, Instant},
  59. };
  60. use anyhow::{bail, Context};
  61. use chrono::{DateTime, Utc};
  62. use log::{info, warn};
  63. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  64. use rust_htslib::bam::{self, Read};
  65. use rustc_hash::FxHashSet;
  66. use serde::{Deserialize, Serialize};
  67. use uuid::Uuid;
  68. use crate::{
  69. collection::{
  70. bam_stats::WGSBamStats,
  71. flowcells::IdInput,
  72. minknow::{parse_pore_activity_from_reader, MinKnowSampleSheet, PoreStateEntry},
  73. pod5::Pod5,
  74. },
  75. commands::{
  76. dorado::DoradoAlign,
  77. modkit::ModkitSummary,
  78. samtools::{SamtoolsIndex, SamtoolsMergeMany, SamtoolsSort},
  79. },
  80. config::Config,
  81. helpers::{get_genome_sizes, list_files_recursive, remove_bam_with_index, TempFileGuard},
  82. io::bam::read_sm_tag_or_inject,
  83. locker::SampleLock,
  84. pipes::InitializeSolo,
  85. run, run_many,
  86. };
  87. /// Represent a complete ONT PromethION sequencing run with all associated files.
  88. ///
  89. /// Represents an imported run directory containing BAM files, POD5 raw signal
  90. /// files, sample sheets, and pore activity logs. Provides JSON serialization
  91. /// for caching parsed metadata.
  92. ///
  93. /// # Directory Structure
  94. ///
  95. /// The importer recursively scans for files, supporting various MinKNOW output layouts:
  96. ///
  97. /// ```text
  98. /// run_dir/
  99. /// ├── sample_sheet_*.csv # Required: MinKNOW sample sheet
  100. /// ├── pore_activity_*.csv # Optional: pore state logs
  101. /// │
  102. /// │ # POD5 files (any structure supported):
  103. /// ├── pod5/ # Non-multiplexed runs
  104. /// │ └── *.pod5
  105. /// ├── pod5_pass/ # Multiplexed runs (pass reads)
  106. /// │ └── barcode*/
  107. /// │ └── *.pod5
  108. /// ├── pod5_fail/ # Failed reads (optional)
  109. /// │ └── *.pod5
  110. /// │
  111. /// │ # BAM files (any structure supported):
  112. /// ├── bam/ # Non-multiplexed runs
  113. /// │ └── *.bam
  114. /// └── bam_pass/ # Multiplexed runs
  115. /// └── barcode*/
  116. /// └── *.bam
  117. /// ```
  118. ///
  119. /// Files are discovered recursively regardless of subdirectory naming.
  120. ///
  121. /// # Example
  122. ///
  123. /// ```rust,ignore
  124. /// let config = Config::default();
  125. ///
  126. /// // Import a new run
  127. /// let run = PromRun::from_dir("/data/runs/20240101_run", &config)?;
  128. ///
  129. /// // Load a cached run
  130. /// let run = PromRun::open("protocol_run_id_here", &config)?;
  131. ///
  132. /// println!("Flowcell: {}", run.flow_cell_id);
  133. /// println!("BAM files: {}", run.bams.len());
  134. /// println!("POD5 files: {}", run.pod5s.len());
  135. /// ```
  136. #[derive(Debug, Clone, Serialize, Deserialize)]
  137. pub struct PromRun {
  138. /// Root directory of the sequencing run.
  139. pub dir: PathBuf,
  140. /// Timestamp when this run was imported/parsed.
  141. pub import_date: DateTime<Utc>,
  142. /// Unique identifier for the protocol run (from sample sheet).
  143. pub protocol_run_id: String,
  144. /// Flowcell position identifier (e.g., device slot or port).
  145. pub position_id: String,
  146. /// Flowcell barcode/identifier (e.g., `FAB12345`).
  147. pub flow_cell_id: String,
  148. /// Sample ID from the sample sheet.
  149. pub sample_id: String,
  150. /// Experiment ID from the sample sheet.
  151. pub experiment_id: String,
  152. /// Flowcell product code (e.g., `FLO-MIN106`, `FLO-PRO002`).
  153. pub flow_cell_product_code: String,
  154. /// Library preparation kit identifier (e.g., `SQK-LSK114`).
  155. pub kit: String,
  156. /// Associated case identifiers for clinical/research tracking.
  157. pub cases: Vec<IdInput>,
  158. /// All POD5 raw signal files found in the run directory.
  159. pub pod5s: Vec<Pod5>,
  160. /// All BAM files found in the run directory.
  161. pub bams: Vec<PromBam>,
  162. /// Pore activity/state log entries, if available.
  163. pub pore_activity: Option<Vec<PoreStateEntry>>,
  164. }
  165. impl PromRun {
  166. /// Imports a sequencing run from a directory.
  167. ///
  168. /// Recursively scans the directory for BAM files, POD5 files, sample sheets,
  169. /// and pore activity logs. Parses all files in parallel and caches the result.
  170. ///
  171. /// # Arguments
  172. ///
  173. /// * `dir` - Path to the run directory (must exist and be a directory)
  174. /// * `config` - Application configuration (provides thread count and cache paths)
  175. ///
  176. /// # Returns
  177. ///
  178. /// Returns `Ok(PromRun)` on success, or an error if:
  179. /// - `dir` is not a directory
  180. /// - No sample sheet (`sample_sheet_*.csv`) is found
  181. /// - Sample sheet parsing fails
  182. /// - Thread pool creation fails
  183. ///
  184. ///
  185. /// # Performance
  186. ///
  187. /// BAM and POD5 file parsing is parallelized using Rayon with the thread
  188. /// count from `config.threads`.
  189. ///
  190. /// # Example
  191. ///
  192. /// ```rust,ignore
  193. /// let config = Config::default();
  194. /// let run = PromRun::from_dir("/data/runs/my_run", &config)?;
  195. /// println!("Imported {} BAM files", run.bams.len());
  196. /// ```
  197. pub fn from_dir(dir: impl AsRef<Path>) -> anyhow::Result<Self> {
  198. let dir = dir.as_ref().to_path_buf();
  199. if !dir.is_dir() {
  200. anyhow::bail!(
  201. "Failed to import Run: input path is not a directory: {}",
  202. dir.display()
  203. );
  204. }
  205. // Collect file paths by type
  206. let mut bam_paths = Vec::new();
  207. let mut sample_sheet_path = None;
  208. let mut pod5_paths = Vec::new();
  209. let mut pore_activity_path = None;
  210. for file in list_files_recursive(&dir) {
  211. let name = file.file_name().and_then(|s| s.to_str()).unwrap_or("");
  212. let ext = file.extension().and_then(|e| e.to_str()).unwrap_or("");
  213. match ext {
  214. "bam" => bam_paths.push(file),
  215. "pod5" => pod5_paths.push(file),
  216. "csv" => {
  217. if name.starts_with("sample_sheet") {
  218. sample_sheet_path = Some(file);
  219. } else if name.starts_with("pore_activity") {
  220. pore_activity_path = Some(file);
  221. }
  222. }
  223. _ => {}
  224. }
  225. }
  226. // Parse required sample sheet
  227. let sample_sheet = sample_sheet_path
  228. .ok_or_else(|| anyhow::anyhow!("No sample_sheet_*.csv found in {}", dir.display()))
  229. .and_then(MinKnowSampleSheet::from_path)?;
  230. // Parse optional pore activity
  231. let pore_activity = pore_activity_path
  232. .and_then(|path| File::open(path).ok())
  233. .and_then(|mut reader| parse_pore_activity_from_reader(&mut reader).ok());
  234. // Parse BAM files in parallel
  235. let bams: Vec<PromBam> = bam_paths
  236. .par_iter()
  237. .filter_map(|p| match PromBam::from_path(p) {
  238. Ok(bam) => Some(bam),
  239. Err(e) => {
  240. log::warn!("Failed to parse BAM {}: {}", p.display(), e);
  241. None
  242. }
  243. })
  244. .collect();
  245. // Parse POD5 files in parallel
  246. let pod5s: Vec<Pod5> = pod5_paths
  247. .par_iter()
  248. .filter_map(|p| match Pod5::from_path(p) {
  249. Ok(pod5) => Some(pod5),
  250. Err(e) => {
  251. log::warn!("Failed to parse POD5 {}: {}", p.display(), e);
  252. None
  253. }
  254. })
  255. .collect();
  256. let prom_run = Self {
  257. dir,
  258. import_date: Utc::now(),
  259. protocol_run_id: sample_sheet.protocol_run_id,
  260. position_id: sample_sheet.position_id,
  261. flow_cell_id: sample_sheet.flow_cell_id,
  262. sample_id: sample_sheet.sample_id,
  263. experiment_id: sample_sheet.experiment_id,
  264. flow_cell_product_code: sample_sheet.flow_cell_product_code,
  265. kit: sample_sheet.kit,
  266. cases: Vec::new(),
  267. pod5s,
  268. bams,
  269. pore_activity,
  270. };
  271. // Cache to disk
  272. // prom_run.save_json(prom_run.cache_path(config))?;
  273. Ok(prom_run)
  274. }
  275. /// Partitions BAMs into unaligned and already-aligned groups.
  276. ///
  277. /// Returns (unaligned, already_aligned) BAM lists.
  278. fn partition_bams_by_alignment<'a>(
  279. &self,
  280. pass_bams: &[&'a PromBam],
  281. ) -> (Vec<&'a PromBam>, Vec<&'a PromBam>) {
  282. let results: Vec<_> = pass_bams
  283. .par_iter()
  284. .map(|&bam| {
  285. let is_aligned = match bam::Reader::from_path(&bam.path) {
  286. Ok(reader) => {
  287. let header = bam::Header::from_template(reader.header());
  288. match get_genome_sizes(&header) {
  289. Ok(sizes) => !sizes.is_empty(),
  290. Err(_) => {
  291. warn!(
  292. "Failed to parse header for {}, assuming unaligned",
  293. bam.path.display()
  294. );
  295. false
  296. }
  297. }
  298. }
  299. Err(e) => {
  300. warn!("Failed to open BAM {}: {}", bam.path.display(), e);
  301. false
  302. }
  303. };
  304. (bam, is_aligned)
  305. })
  306. .collect();
  307. let mut unaligned = Vec::new();
  308. let mut aligned = Vec::new();
  309. for (bam, is_aligned) in results {
  310. if is_aligned {
  311. aligned.push(bam);
  312. } else {
  313. unaligned.push(bam);
  314. }
  315. }
  316. (unaligned, aligned)
  317. }
  318. /// Maps BAM files to cases based on kit type (multiplexed vs non-multiplexed).
  319. ///
  320. /// Returns a mapping from BAM path to case, plus a list of unmatched BAMs.
  321. fn map_bams_to_cases<'a>(
  322. &'a self,
  323. pass_bams: &[&PromBam],
  324. kit_type: KitType,
  325. ) -> anyhow::Result<(HashMap<PathBuf, &'a IdInput>, Vec<PathBuf>)> {
  326. let mut bam_to_case: HashMap<PathBuf, &IdInput> = HashMap::new();
  327. let mut unmatched: Vec<PathBuf> = Vec::new();
  328. match kit_type {
  329. KitType::NonMultiplexed => {
  330. if self.cases.len() != 1 {
  331. bail!("Non-multiplexed run requires exactly one case");
  332. }
  333. let single_case = self.cases.first().ok_or_else(|| {
  334. anyhow::anyhow!("Non-multiplexed run requires exactly one case")
  335. })?;
  336. for bam in pass_bams {
  337. bam_to_case.insert(bam.path.clone(), single_case);
  338. }
  339. }
  340. KitType::Multiplexed => {
  341. // Construct map(normalized barcode) -> IdInput
  342. let barcode_to_case: HashMap<String, &IdInput> = self
  343. .cases
  344. .iter()
  345. .filter(|c| !c.barcode.is_empty())
  346. .map(|c| (normalize_barcode(&c.barcode), c))
  347. .collect();
  348. for bam in pass_bams {
  349. let filename = bam.path.file_name().and_then(|n| n.to_str()).unwrap_or("");
  350. let barcode_opt = extract_and_normalize_barcode(filename).or_else(|| {
  351. bam.path
  352. .parent()
  353. .and_then(|p| p.file_name())
  354. .and_then(|n| n.to_str())
  355. .and_then(extract_and_normalize_barcode)
  356. });
  357. match barcode_opt.and_then(|bc| barcode_to_case.get(&bc)) {
  358. Some(case) => {
  359. bam_to_case.insert(bam.path.clone(), *case);
  360. }
  361. None => {
  362. unmatched.push(bam.path.clone());
  363. }
  364. }
  365. }
  366. }
  367. }
  368. Ok((bam_to_case, unmatched))
  369. }
  370. /// Process BAM files from a PromethION run: align, sort, merge, and index per case.
  371. ///
  372. /// This method handles both multiplexed (SQK-NBD114-24) and non-multiplexed (SQK-LSK114)
  373. /// sequencing runs. It skips basecalling and works directly with existing unaligned BAMs.
  374. ///
  375. /// # Workflow
  376. ///
  377. /// 1. **Validate cases** based on kit type (multiplexed vs non-multiplexed)
  378. /// 2. **Map BAMs to cases** using barcode extraction or single-case assignment
  379. /// 3. **Align BAMs in parallel** using Dorado
  380. /// 4. **Sort aligned chunks in parallel**
  381. /// 5. **Merge and finalize** per case with sorting and indexing
  382. ///
  383. /// # Kit Types
  384. ///
  385. /// - **SQK-NBD114-24** (multiplexed): BAMs in `bam_pass/barcodeXX/`, matched by barcode
  386. /// - **SQK-LSK114** (non-multiplexed): All BAMs in `bam_pass/` assigned to single case
  387. ///
  388. /// # Arguments
  389. ///
  390. /// * `config` - Application configuration with alignment settings and paths
  391. ///
  392. /// # Returns
  393. ///
  394. /// Returns `Ok(())` on success, or an error if validation or processing fails.
  395. ///
  396. /// # Errors
  397. ///
  398. /// This function will return an error if:
  399. /// - No cases are associated with the run
  400. /// - Non-multiplexed run has != 1 case
  401. /// - No BAMs are available for processing
  402. /// - BAM files are already aligned
  403. /// - External commands fail (Dorado, samtools, Slurm)
  404. pub fn process_bams(&self, config: &Config) -> anyhow::Result<()> {
  405. let lock_dir = format!("{}/locks", config.result_dir);
  406. let _lock = SampleLock::acquire(&lock_dir, &self.protocol_run_id, "process_bams")
  407. .with_context(|| format!("Cannot start Processing BAMs for {}", self.dir.display()))?;
  408. let pipeline_start = Timer::start();
  409. let mut metrics = PipelineMetrics::default();
  410. let tmp_dir = PathBuf::from(&config.tmp_dir);
  411. let mut temp_guard = TempFileGuard::new();
  412. // =====================================================================
  413. // Step 1: Validate preconditions
  414. // =====================================================================
  415. if self.cases.is_empty() {
  416. bail!(
  417. "Run {} has no associated cases. Add cases before processing.",
  418. self.protocol_run_id
  419. );
  420. }
  421. let kit_type = KitType::detect(&self.kit)?;
  422. if kit_type == KitType::NonMultiplexed && self.cases.len() != 1 {
  423. bail!(
  424. "Non-multiplexed run (kit: {}) requires exactly 1 case, found {}",
  425. self.kit,
  426. self.cases.len()
  427. );
  428. }
  429. info!(
  430. "Processing PromRun: {} (flowcell: {}, kit: {} [{:?}], {} BAMs, {} cases)",
  431. self.protocol_run_id,
  432. self.flow_cell_id,
  433. self.kit,
  434. kit_type,
  435. self.bams.len(),
  436. self.cases.len()
  437. );
  438. // =====================================================================
  439. // Step 2: Filter and validate BAMs
  440. // =====================================================================
  441. let validation_start = Timer::start();
  442. // Filter out BAMs already merged into final outputs
  443. let candidate_bams = self.filter_already_merged(config);
  444. if candidate_bams.is_empty() {
  445. info!(
  446. "🎉 Run {}: all BAMs already merged — nothing to do",
  447. self.protocol_run_id
  448. );
  449. return Ok(());
  450. }
  451. let pass_bams = filter_pass_bams(&candidate_bams, kit_type);
  452. if pass_bams.is_empty() {
  453. bail!("No BAM files found in bam_pass directories");
  454. }
  455. info!("Filtered to {} BAM files from bam_pass", pass_bams.len());
  456. info!("Step 1/5: Validating BAMs are unaligned");
  457. let (unaligned_bams, aligned_bams) = self.partition_bams_by_alignment(&pass_bams);
  458. if !aligned_bams.is_empty() {
  459. info!(
  460. "Found {} already-aligned BAMs — will skip alignment for these",
  461. aligned_bams.len()
  462. );
  463. }
  464. if unaligned_bams.is_empty() && aligned_bams.is_empty() {
  465. bail!("No valid BAM files to process");
  466. }
  467. metrics.validation_duration = validation_start.elapsed();
  468. metrics.bams_processed = pass_bams.len();
  469. metrics.bytes_input = pass_bams.iter().map(|b| b.bam_size).sum();
  470. info!(
  471. "✅ {} unaligned, {} already aligned",
  472. unaligned_bams.len(),
  473. aligned_bams.len()
  474. );
  475. // =====================================================================
  476. // Step 3: Map BAMs to cases
  477. // =====================================================================
  478. info!("Step 2/5: Mapping BAM files to cases");
  479. // Map unaligned BAMs (need alignment)
  480. let (unaligned_bam_to_case, unmatched_unaligned) =
  481. self.map_bams_to_cases(&unaligned_bams, kit_type)?;
  482. // Map aligned BAMs (skip alignment, go straight to sort/merge)
  483. let (aligned_bam_to_case, unmatched_aligned) =
  484. self.map_bams_to_cases(&aligned_bams, kit_type)?;
  485. let total_mapped = unaligned_bam_to_case.len() + aligned_bam_to_case.len();
  486. let total_unmatched = unmatched_unaligned.len() + unmatched_aligned.len();
  487. if total_mapped == 0 {
  488. bail!("No BAMs were successfully mapped to cases");
  489. }
  490. info!(
  491. "✅ Mapped {} BAMs to cases ({} unmatched)",
  492. total_mapped, total_unmatched
  493. );
  494. // =====================================================================
  495. // Step 4: Prepare and run alignment
  496. // =====================================================================
  497. let align_start = Timer::start();
  498. let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new();
  499. // Add already-aligned BAMs directly to the map (no alignment needed)
  500. for (bam_path, case) in &aligned_bam_to_case {
  501. let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
  502. let key = format!("{}_{}", case.case_id, sample_type_dir);
  503. case_bam_map.entry(key).or_default().push(bam_path.clone());
  504. }
  505. if !unaligned_bam_to_case.is_empty() {
  506. let (align_jobs, aligned_case_map) =
  507. prepare_alignment_jobs(&unaligned_bam_to_case, &tmp_dir, config)?;
  508. // Track temp files for cleanup
  509. for bams in aligned_case_map.values() {
  510. temp_guard.track_many(bams.clone());
  511. }
  512. info!("Step 3/5: Running {} alignment jobs", align_jobs.len());
  513. run_many!(config, align_jobs).context("Alignment batch failed")?;
  514. self.verify_outputs_exist(&aligned_case_map, "alignment")?;
  515. // Merge newly aligned BAMs into the case map
  516. for (key, bams) in aligned_case_map {
  517. case_bam_map.entry(key).or_default().extend(bams);
  518. }
  519. info!("✅ Alignments completed");
  520. } else {
  521. info!("Step 3/5: Skipping alignment — all BAMs already aligned");
  522. }
  523. metrics.alignment_duration = align_start.elapsed();
  524. // =====================================================================
  525. // Step 5: Sort and index chunks
  526. // =====================================================================
  527. info!("Step 4/5: Sorting and indexing chunks");
  528. let sort_start = Timer::start();
  529. let sorted_bam_map = sort_and_index_chunks(&case_bam_map, config)?;
  530. // Update guard to track sorted files instead
  531. temp_guard.clear();
  532. for bams in sorted_bam_map.values() {
  533. temp_guard.track_many(bams.clone());
  534. }
  535. metrics.sort_index_duration = sort_start.elapsed();
  536. info!("✅ Sorted {} chunk BAMs", sorted_bam_map.len());
  537. // =====================================================================
  538. // Step 6: Merge and finalize per case
  539. // =====================================================================
  540. info!("Step 5/5: Finalizing per-case BAMs");
  541. let finalize_start = Timer::start();
  542. let finalized = finalize_case_bams(&self.cases, &sorted_bam_map, &tmp_dir, config)?;
  543. metrics.finalize_duration = finalize_start.elapsed();
  544. metrics.cases_finalized = finalized.len();
  545. metrics.bytes_output = finalized
  546. .iter()
  547. .filter_map(|p| fs::metadata(p).ok())
  548. .map(|m| m.len())
  549. .sum();
  550. // Success — disarm cleanup
  551. temp_guard.disarm();
  552. metrics.total_duration = pipeline_start.elapsed();
  553. info!("{metrics}");
  554. // TODO: save metrics
  555. info!(
  556. "🎉 Pipeline completed for run: {} (flowcell: {})",
  557. self.protocol_run_id, self.flow_cell_id
  558. );
  559. Ok(())
  560. }
  561. fn verify_outputs_exist(
  562. &self,
  563. case_bam_map: &HashMap<String, Vec<PathBuf>>,
  564. stage: &str,
  565. ) -> anyhow::Result<()> {
  566. let missing: Vec<_> = case_bam_map
  567. .values()
  568. .flatten()
  569. .filter(|p| !p.exists())
  570. .collect();
  571. if !missing.is_empty() {
  572. bail!(
  573. "{} failed: {} output file(s) not created. First missing: {}",
  574. stage,
  575. missing.len(),
  576. missing[0].display()
  577. );
  578. }
  579. Ok(())
  580. }
  581. /// Serializes this run to a JSON file.
  582. ///
  583. /// # Arguments
  584. ///
  585. /// * `path` - Output file path (will be created or overwritten)
  586. ///
  587. /// # Errors
  588. ///
  589. /// Returns an error if file creation or JSON serialization fails.
  590. pub fn save_json(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
  591. let path = path.as_ref();
  592. let file = BufWriter::new(
  593. File::create(path)
  594. .with_context(|| format!("Failed to create JSON file: {}", path.display()))?,
  595. );
  596. serde_json::to_writer_pretty(file, self)
  597. .with_context(|| format!("Failed to serialize PromRun to {}", path.display()))?;
  598. Ok(())
  599. }
  600. /// Deserializes a run from a JSON file.
  601. ///
  602. /// # Arguments
  603. ///
  604. /// * `path` - Path to a previously saved JSON cache file
  605. ///
  606. /// # Errors
  607. ///
  608. /// Returns an error if the file cannot be read or parsed.
  609. pub fn load_json(path: impl AsRef<Path>) -> anyhow::Result<Self> {
  610. let path = path.as_ref();
  611. let file = BufReader::new(
  612. File::open(path)
  613. .with_context(|| format!("Failed to open JSON file: {}", path.display()))?,
  614. );
  615. let data = serde_json::from_reader(file)
  616. .with_context(|| format!("Failed to parse PromRun from {}", path.display()))?;
  617. Ok(data)
  618. }
  619. /// Returns the cache file path for this run.
  620. ///
  621. /// The cache path is `{config.run_cache_dir}/{protocol_run_id}.json`.
  622. #[must_use]
  623. pub fn cache_path(&self, config: &Config) -> PathBuf {
  624. let mut path = PathBuf::from(&config.run_cache_dir);
  625. path.push(format!("{}.json", self.protocol_run_id));
  626. path
  627. }
  628. /// Opens a previously cached run by its protocol run ID.
  629. ///
  630. /// # Arguments
  631. ///
  632. /// * `run_id` - The protocol run ID (filename stem in the cache directory)
  633. /// * `config` - Application configuration with `run_cache_dir`
  634. ///
  635. /// # Example
  636. ///
  637. /// ```rust,ignore
  638. /// let run = PromRun::open("abc123-def456", &config)?;
  639. /// ```
  640. pub fn open(run_id: &str, config: &Config) -> anyhow::Result<Self> {
  641. let mut path = PathBuf::from(&config.run_cache_dir);
  642. path.push(format!("{run_id}.json"));
  643. Self::load_json(path)
  644. }
  645. /// Returns the total size of all BAM files in bytes.
  646. #[must_use]
  647. pub fn total_bam_size(&self) -> u64 {
  648. self.bams.iter().map(|b| b.bam_size).sum()
  649. }
  650. /// Returns the total size of all POD5 files in bytes.
  651. #[must_use]
  652. pub fn total_pod5_size(&self) -> u64 {
  653. self.pod5s.iter().map(|p| p.file_size).sum()
  654. }
  655. /// Returns BAMs not yet merged into any case's final output.
  656. ///
  657. /// For each case, checks which source files are already in the final BAM
  658. /// and excludes them from processing.
  659. fn filter_already_merged(&self, config: &Config) -> Vec<&PromBam> {
  660. // Collect all source filenames already merged across all cases
  661. let mut all_existing_sources: FxHashSet<String> = FxHashSet::default();
  662. for case in &self.cases {
  663. let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
  664. if !final_bam_path.exists() {
  665. info!(
  666. "No existing final BAM for case {} — all matching inputs are new",
  667. case.case_id
  668. );
  669. continue;
  670. }
  671. match WGSBamStats::from_bam(&final_bam_path, config) {
  672. Ok(stats) => {
  673. let sources = stats.source_filenames();
  674. info!(
  675. "Case {} has {} existing source files",
  676. case.case_id,
  677. sources.len()
  678. );
  679. all_existing_sources.extend(sources);
  680. }
  681. Err(e) => {
  682. warn!(
  683. "Could not load stats for case {} ({}): {}",
  684. case.case_id,
  685. final_bam_path.display(),
  686. e
  687. );
  688. }
  689. }
  690. }
  691. if all_existing_sources.is_empty() {
  692. info!(
  693. "Run {}: no existing sources found, all {} BAMs are new",
  694. self.protocol_run_id,
  695. self.bams.len()
  696. );
  697. return self.bams.iter().collect();
  698. }
  699. let (new_bams, skipped): (Vec<_>, Vec<_>) = self.bams.iter().partition(|bam| {
  700. let filename = bam.path.file_name().and_then(|n| n.to_str()).unwrap_or("");
  701. !all_existing_sources.contains(filename)
  702. });
  703. if !skipped.is_empty() {
  704. info!(
  705. "Run {}: skipping {} BAM(s) already merged: {}{}",
  706. self.protocol_run_id,
  707. skipped.len(),
  708. skipped
  709. .iter()
  710. .take(3)
  711. .filter_map(|b| b.path.file_name())
  712. .map(|n| n.to_string_lossy())
  713. .collect::<Vec<_>>()
  714. .join(", "),
  715. if skipped.len() > 3 { "..." } else { "" }
  716. );
  717. }
  718. info!(
  719. "Run {}: {} new BAM(s) to process",
  720. self.protocol_run_id,
  721. new_bams.len()
  722. );
  723. new_bams
  724. }
  725. }
  726. /// Kit type classification for workflow branching.
  727. #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  728. pub enum KitType {
  729. Multiplexed,
  730. NonMultiplexed,
  731. }
  732. impl KitType {
  733. /// Detects kit type from kit name string.
  734. ///
  735. /// Known multiplexed kits: NBD114, NBD112, RBK
  736. /// Known non-multiplexed kits: LSK114, LSK109
  737. pub fn detect(kit: &str) -> anyhow::Result<Self> {
  738. const MULTIPLEXED_PATTERNS: &[&str] = &["NBD114", "NBD112", "RBK"];
  739. const NON_MULTIPLEXED_PATTERNS: &[&str] = &["LSK114", "LSK109"];
  740. let kit_upper = kit.to_uppercase();
  741. for pattern in MULTIPLEXED_PATTERNS {
  742. if kit_upper.contains(pattern) {
  743. return Ok(Self::Multiplexed);
  744. }
  745. }
  746. for pattern in NON_MULTIPLEXED_PATTERNS {
  747. if kit_upper.contains(pattern) {
  748. return Ok(Self::NonMultiplexed);
  749. }
  750. }
  751. bail!(
  752. "Unknown kit type '{}'. Expected one of: {:?} (multiplexed) or {:?} (non-multiplexed)",
  753. kit,
  754. MULTIPLEXED_PATTERNS,
  755. NON_MULTIPLEXED_PATTERNS
  756. )
  757. }
  758. }
  759. /// Statistics for files in a single directory.
  760. #[derive(Default)]
  761. struct DirStats {
  762. pod5_count: usize,
  763. pod5_size: u64,
  764. bam_count: usize,
  765. bam_size: u64,
  766. }
  767. impl fmt::Display for PromRun {
  768. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  769. /// Converts bytes to GiB.
  770. fn to_gib(bytes: u64) -> f64 {
  771. bytes as f64 / 1024.0_f64.powi(3)
  772. }
  773. let mut dir_stats: BTreeMap<String, DirStats> = BTreeMap::new();
  774. // Aggregate POD5 stats by directory
  775. for pod5 in &self.pod5s {
  776. let rel = pod5.path.strip_prefix(&self.dir).unwrap_or(&pod5.path);
  777. let dir = rel.parent().unwrap_or(Path::new("."));
  778. let key = dir.display().to_string();
  779. let entry = dir_stats.entry(key).or_default();
  780. entry.pod5_count += 1;
  781. entry.pod5_size += pod5.file_size;
  782. }
  783. // Aggregate BAM stats by directory
  784. for bam in &self.bams {
  785. let rel = bam.path.strip_prefix(&self.dir).unwrap_or(&bam.path);
  786. let dir = rel.parent().unwrap_or(Path::new("."));
  787. let key = dir.display().to_string();
  788. let entry = dir_stats.entry(key).or_default();
  789. entry.bam_count += 1;
  790. entry.bam_size += bam.bam_size;
  791. }
  792. // Header information
  793. writeln!(f, "📦 PromRun")?;
  794. writeln!(f, " dir : {}", self.dir.display())?;
  795. writeln!(f, " imported : {}", self.import_date)?;
  796. writeln!(f, " run id : {}", self.protocol_run_id)?;
  797. writeln!(f, " position : {}", self.position_id)?;
  798. writeln!(
  799. f,
  800. " flow cell : {} ({})",
  801. self.flow_cell_id, self.flow_cell_product_code
  802. )?;
  803. writeln!(f, " sample id : {}", self.sample_id)?;
  804. writeln!(f, " experiment : {}", self.experiment_id)?;
  805. writeln!(f, " kit : {}", self.kit)?;
  806. writeln!(f, " cases : {}", self.cases.len())?;
  807. match &self.pore_activity {
  808. Some(pa) => writeln!(f, " pore act. : {} entries", pa.len())?,
  809. None => writeln!(f, " pore act. : none")?,
  810. }
  811. writeln!(f)?;
  812. writeln!(
  813. f,
  814. "📁 Files by directory (relative to {})",
  815. self.dir.display()
  816. )?;
  817. if dir_stats.is_empty() {
  818. writeln!(f, " (no files)")?;
  819. return Ok(());
  820. }
  821. for (dir, stats) in dir_stats {
  822. writeln!(f, " - {dir}")?;
  823. if stats.pod5_count > 0 {
  824. writeln!(
  825. f,
  826. " POD5 : {:>3} files, {:6.2} GiB",
  827. stats.pod5_count,
  828. to_gib(stats.pod5_size)
  829. )?;
  830. }
  831. if stats.bam_count > 0 {
  832. writeln!(
  833. f,
  834. " BAM : {:>3} files, {:6.2} GiB",
  835. stats.bam_count,
  836. to_gib(stats.bam_size)
  837. )?;
  838. }
  839. }
  840. Ok(())
  841. }
  842. }
  843. /// Map sample type to output directory name.
  844. fn map_sample_type_to_dir(sample_type: &str, config: &Config) -> String {
  845. let normalized = sample_type.to_lowercase();
  846. match normalized.as_str() {
  847. "nor" | "normal" | "mrd" => config.normal_name.clone(),
  848. _ => config.tumoral_name.clone(),
  849. }
  850. }
  851. /// Metadata extracted from an ONT PromethION BAM header.
  852. ///
  853. /// Wraps standard SAM header tags specifically for MinKNOW outputs.
  854. ///
  855. /// # Parsed Tags
  856. /// * `@RG` (Read Group): Extracts run ID, basecall model, flowcell ID, and sample info.
  857. /// * `@PG` (Program): Extracts MinKNOW version and GPU telemetry.
  858. #[derive(Debug, Clone, Serialize, Deserialize)]
  859. pub struct PromBam {
  860. /// Absolute path to the BAM file.
  861. pub path: PathBuf,
  862. /// Last modification timestamp of the file.
  863. pub modified: DateTime<Utc>,
  864. /// File size in bytes.
  865. pub bam_size: u64,
  866. /// Unique run identifier from the `@RG DS:runid=` field.
  867. pub run_id: String,
  868. /// Basecalling model name (e.g., `dna_r10.4.1_e8.2_400bps_sup@v4.2.0`).
  869. pub basecall_model: String,
  870. /// Modified base detection models, space-separated if multiple.
  871. pub modbase_models: String,
  872. /// Instrument model (e.g., `PromethION`, `P2I`).
  873. pub instrument: String,
  874. /// Flowcell barcode identifier (e.g., `PBI55810`).
  875. pub flowcell_id: String,
  876. /// Sample identifier from the sample sheet.
  877. pub sample: String,
  878. /// Library identifier.
  879. pub library: String,
  880. /// Full read group ID string from `@RG ID:`.
  881. pub read_group_id: String,
  882. /// Sequencing timestamp in ISO 8601 format from `@RG DT:`.
  883. pub timestamp: String,
  884. /// GPU information from MinKNOW `@PG DS:`.
  885. pub gpu_info: String,
  886. /// MinKNOW version.
  887. pub minknow_version: String,
  888. }
  889. impl PromBam {
  890. /// Parses BAM header metadata from the file at the given path.
  891. ///
  892. /// Opens the BAM file, reads the SAM header, and extracts ONT-specific
  893. /// metadata from `@RG` and `@PG` records.
  894. ///
  895. /// # Arguments
  896. ///
  897. /// * `path` - Path to a BAM file (must be readable and valid BAM format)
  898. ///
  899. /// # Returns
  900. ///
  901. /// Returns `Ok(PromBam)` with populated fields, or an error if:
  902. /// - The file cannot be opened or read
  903. /// - The file is not a valid BAM format
  904. /// - The header cannot be parsed as UTF-8
  905. ///
  906. /// # Notes
  907. ///
  908. /// - Fields may be empty strings if the corresponding header tags are missing
  909. /// - Only processes `@RG` records with `PL:ONT` (Oxford Nanopore platform)
  910. /// - Only processes `@PG` records with `PN:minknow`
  911. ///
  912. /// # Example
  913. ///
  914. /// ```rust,ignore
  915. /// let bam = PromBam::from_path("sample.bam")?;
  916. /// assert!(!bam.run_id.is_empty());
  917. /// ```
  918. pub fn from_path(path: impl AsRef<Path>) -> anyhow::Result<Self> {
  919. let path = path.as_ref().to_path_buf();
  920. // Retrieve file metadata
  921. let meta = fs::metadata(&path)
  922. .with_context(|| format!("Failed to read metadata for {}", path.display()))?;
  923. let modified: DateTime<Utc> = meta
  924. .modified()
  925. .map(DateTime::<Utc>::from)
  926. .with_context(|| format!("Failed to get modification time for {}", path.display()))?;
  927. let bam_size = meta.len();
  928. // Open BAM and read header
  929. let reader = bam::Reader::from_path(&path)
  930. .with_context(|| format!("Failed to open BAM file: {}", path.display()))?;
  931. let header = reader.header().clone();
  932. let text =
  933. std::str::from_utf8(header.as_bytes()).context("BAM header contains invalid UTF-8")?;
  934. // Initialize with defaults
  935. let mut prom = PromBam {
  936. path: path.clone(),
  937. modified,
  938. bam_size,
  939. run_id: String::new(),
  940. basecall_model: String::new(),
  941. modbase_models: String::new(),
  942. instrument: String::new(),
  943. flowcell_id: String::new(),
  944. sample: String::new(),
  945. library: String::new(),
  946. read_group_id: String::new(),
  947. timestamp: String::new(),
  948. gpu_info: String::new(),
  949. minknow_version: String::new(),
  950. };
  951. let mut found_ont_rg = false;
  952. let mut found_other_rg = false;
  953. // Parse header lines
  954. for line in text.lines() {
  955. if !line.starts_with('@') {
  956. continue;
  957. }
  958. let mut fields = line.split('\t');
  959. let Some(tag) = fields.next() else {
  960. continue;
  961. };
  962. let tag = tag.trim_start_matches('@');
  963. // Build key-value map from tab-separated fields
  964. let kv: Vec<(&str, &str)> = fields
  965. .filter_map(|f| {
  966. let mut it = f.splitn(2, ':');
  967. Some((it.next()?, it.next().unwrap_or("")))
  968. })
  969. .collect();
  970. match tag {
  971. "PG" => Self::parse_pg_record(&kv, &mut prom),
  972. "RG" => {
  973. if Self::parse_rg_record(&kv, &mut prom) {
  974. found_ont_rg = true;
  975. } else {
  976. found_other_rg = true;
  977. }
  978. }
  979. _ => {}
  980. }
  981. }
  982. if !found_ont_rg && found_other_rg {
  983. warn!(
  984. "BAM {} contains non-ONT read groups but no ONT read groups. \
  985. This may not be a Nanopore BAM file.",
  986. path.display()
  987. );
  988. }
  989. if !found_ont_rg && !found_other_rg {
  990. warn!(
  991. "BAM {} contains no read groups. Metadata will be incomplete.",
  992. path.display()
  993. );
  994. }
  995. Ok(prom)
  996. }
  997. /// Parses `@PG` (Program) header record for MinKNOW-specific fields.
  998. fn parse_pg_record(kv: &[(&str, &str)], prom: &mut PromBam) {
  999. // Only process MinKNOW program records
  1000. let is_minknow = kv.iter().any(|(k, v)| *k == "PN" && *v == "minknow");
  1001. if !is_minknow {
  1002. return;
  1003. }
  1004. for (k, v) in kv {
  1005. match *k {
  1006. "DS" => prom.gpu_info = (*v).to_string(),
  1007. "VN" => prom.minknow_version = (*v).to_string(),
  1008. _ => {}
  1009. }
  1010. }
  1011. }
  1012. /// Parses `@RG` (Read Group) header record for ONT-specific fields.
  1013. fn parse_rg_record(kv: &[(&str, &str)], prom: &mut PromBam) -> bool {
  1014. // Only process ONT platform read groups
  1015. let platform = kv
  1016. .iter()
  1017. .find(|(k, _)| *k == "PL")
  1018. .map(|(_, v)| *v)
  1019. .unwrap_or("");
  1020. if platform != "ONT" {
  1021. return false;
  1022. }
  1023. // Parse composite DS field: runid=... basecall_model=... modbase_models=...
  1024. if let Some((_, ds_raw)) = kv.iter().find(|(k, _)| *k == "DS") {
  1025. for part in ds_raw.split_whitespace() {
  1026. if let Some(val) = part.strip_prefix("runid=") {
  1027. prom.run_id = val.to_string();
  1028. } else if let Some(val) = part.strip_prefix("basecall_model=") {
  1029. prom.basecall_model = val.to_string();
  1030. } else if let Some(val) = part.strip_prefix("modbase_models=") {
  1031. // Remove commas from modbase_models list
  1032. prom.modbase_models = val.replace(',', " ");
  1033. }
  1034. }
  1035. }
  1036. // Extract standard SAM tags
  1037. for (k, v) in kv {
  1038. match *k {
  1039. "ID" => prom.read_group_id = (*v).to_string(),
  1040. "DT" => prom.timestamp = (*v).to_string(),
  1041. "LB" => prom.library = (*v).to_string(),
  1042. "PM" => prom.instrument = (*v).to_string(),
  1043. "PU" => prom.flowcell_id = (*v).to_string(),
  1044. "SM" => prom.sample = (*v).to_string(),
  1045. _ => {}
  1046. }
  1047. }
  1048. true
  1049. }
  1050. }
  1051. impl fmt::Display for PromBam {
  1052. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  1053. writeln!(f, "PromBam")?;
  1054. writeln!(f, " Path: {}", self.path.display())?;
  1055. writeln!(f, " Modified: {}", self.modified)?;
  1056. writeln!(
  1057. f,
  1058. " BamSize: {:.2} GiB",
  1059. self.bam_size as f64 / (1024.0_f64.powi(3))
  1060. )?;
  1061. writeln!(f)?;
  1062. writeln!(f, " Run ID: {}", self.run_id)?;
  1063. writeln!(f, " Basecaller: {}", self.basecall_model)?;
  1064. writeln!(f, " Modbase Models: {}", self.modbase_models)?;
  1065. writeln!(f)?;
  1066. writeln!(f, " Instrument: {}", self.instrument)?;
  1067. writeln!(f, " Flowcell ID: {}", self.flowcell_id)?;
  1068. writeln!(f, " Sample: {}", self.sample)?;
  1069. writeln!(f, " Library: {}", self.library)?;
  1070. writeln!(f)?;
  1071. writeln!(f, " RG ID: {}", self.read_group_id)?;
  1072. writeln!(f, " Timestamp: {}", self.timestamp)?;
  1073. writeln!(f)?;
  1074. writeln!(f, " GPU: {}", self.gpu_info)?;
  1075. writeln!(f, " MinKNOW Ver: {}", self.minknow_version)
  1076. }
  1077. }
  1078. /// Filters BAMs to only include those from bam_pass directories.
  1079. fn filter_pass_bams<'a>(bams: &[&'a PromBam], kit_type: KitType) -> Vec<&'a PromBam> {
  1080. bams.iter()
  1081. .filter(|bam| {
  1082. let p = bam.path.to_string_lossy();
  1083. if p.contains("bam_fail") {
  1084. return false;
  1085. }
  1086. match kit_type {
  1087. KitType::Multiplexed => p.contains("bam_pass"),
  1088. KitType::NonMultiplexed => p.contains("/bam/") || p.contains("bam_pass"),
  1089. }
  1090. })
  1091. .copied()
  1092. .collect()
  1093. }
  1094. /// Builds the final BAM path for a case.
  1095. fn build_final_bam_path(case: &IdInput, config: &Config) -> anyhow::Result<PathBuf> {
  1096. let final_bam_path = PathBuf::from(config.solo_bam(&case.case_id, &case.sample_type));
  1097. if let Some(parent) = final_bam_path.parent() {
  1098. fs::create_dir_all(parent)
  1099. .with_context(|| format!("Failed to create output directory: {}", parent.display()))?;
  1100. }
  1101. Ok(final_bam_path)
  1102. }
  1103. /// Merges multiple chunk BAMs into a single BAM.
  1104. ///
  1105. /// If only one chunk exists, returns it directly (no merge needed).
  1106. fn merge_chunk_bams(
  1107. chunk_bams: &[PathBuf],
  1108. case: &IdInput,
  1109. sample_type_dir: &str,
  1110. tmp_dir: &Path,
  1111. config: &Config,
  1112. ) -> anyhow::Result<PathBuf> {
  1113. if chunk_bams.len() == 1 {
  1114. return Ok(chunk_bams[0].clone());
  1115. }
  1116. let merged_path = tmp_dir.join(format!(
  1117. "{}_{}_{}_merged.bam",
  1118. Uuid::new_v4(),
  1119. case.case_id,
  1120. sample_type_dir
  1121. ));
  1122. info!(
  1123. " Merging {} chunks for case {} → {}",
  1124. chunk_bams.len(),
  1125. case.case_id,
  1126. merged_path.file_name().unwrap().to_string_lossy()
  1127. );
  1128. let mut merge_cmd =
  1129. SamtoolsMergeMany::from_config(merged_path.clone(), chunk_bams.to_vec(), config);
  1130. run!(config, &mut merge_cmd).with_context(|| {
  1131. format!(
  1132. "Failed to merge {} chunk BAMs for case {}",
  1133. chunk_bams.len(),
  1134. case.case_id
  1135. )
  1136. })?;
  1137. if !merged_path.exists() {
  1138. bail!(
  1139. "Merge produced no output for case {}: expected {}",
  1140. case.case_id,
  1141. merged_path.display()
  1142. );
  1143. }
  1144. Ok(merged_path)
  1145. }
  1146. /// Atomically replaces a destination file with a source file.
  1147. ///
  1148. /// Uses rename for atomicity. Both files must be on the same filesystem.
  1149. fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> {
  1150. if destination.exists() {
  1151. let backup = destination.with_extension("bam.backup");
  1152. fs::rename(destination, &backup).with_context(|| {
  1153. format!(
  1154. "Failed to backup {} to {}",
  1155. destination.display(),
  1156. backup.display()
  1157. )
  1158. })?;
  1159. if let Err(e) = fs::rename(source, destination) {
  1160. // Attempt restore, but propagate original error regardless
  1161. if let Err(restore_err) = fs::rename(&backup, destination) {
  1162. warn!(
  1163. "Failed to restore backup {} after failed replacement: {}",
  1164. backup.display(),
  1165. restore_err
  1166. );
  1167. }
  1168. return Err(e).with_context(|| {
  1169. format!(
  1170. "Failed to replace {} with {}",
  1171. destination.display(),
  1172. source.display()
  1173. )
  1174. });
  1175. }
  1176. // Remove backup and its orphaned index
  1177. if let Err(e) = remove_bam_with_index(&backup) {
  1178. log::warn!("Failed to clean up backup file {}: {}", backup.display(), e);
  1179. }
  1180. } else {
  1181. fs::rename(source, destination).with_context(|| {
  1182. format!(
  1183. "Failed to move {} to {}",
  1184. source.display(),
  1185. destination.display()
  1186. )
  1187. })?;
  1188. }
  1189. Ok(())
  1190. }
  1191. /// Indexes a BAM file using samtools.
  1192. fn index_bam(bam: &Path, config: &Config) -> anyhow::Result<()> {
  1193. let mut index_cmd = SamtoolsIndex::from_config(config, bam.to_string_lossy().as_ref());
  1194. run!(config, &mut index_cmd)?;
  1195. let index_path = PathBuf::from(format!("{}.bai", bam.display()));
  1196. if !index_path.exists() {
  1197. bail!("Index file not created: {}", index_path.display());
  1198. }
  1199. Ok(())
  1200. }
  1201. /// Merges a new BAM into an existing final BAM atomically.
  1202. ///
  1203. /// Assumes both BAMs are coordinate-sorted. Skips re-sorting since
  1204. /// samtools merge preserves sort order from sorted inputs.
  1205. fn merge_into_existing_final(
  1206. source: &Path,
  1207. destination: &Path,
  1208. tmp_dir: &Path,
  1209. config: &Config,
  1210. ) -> anyhow::Result<()> {
  1211. info!(
  1212. " Merging into existing final BAM: {}",
  1213. destination.display()
  1214. );
  1215. index_bam(source, config)
  1216. .with_context(|| format!("Failed to index source BAM: {}", source.display()))?;
  1217. index_bam(destination, config)
  1218. .with_context(|| format!("Failed to index destination BAM: {}", destination.display()))?;
  1219. let temp_merged = tmp_dir.join(format!(".merging_{}.bam", Uuid::new_v4()));
  1220. let input_bams = vec![source.to_path_buf(), destination.to_path_buf()];
  1221. let mut merge_cmd = SamtoolsMergeMany::from_config(temp_merged.clone(), input_bams, config);
  1222. run!(config, &mut merge_cmd)
  1223. .with_context(|| "Failed to merge source into existing final BAM")?;
  1224. // No sort needed — merge preserves coordinate order from sorted inputs
  1225. atomic_replace(&temp_merged, destination)?;
  1226. index_bam(destination, config)
  1227. .with_context(|| format!("Failed to index final BAM: {}", destination.display()))?;
  1228. if let Err(e) = remove_bam_with_index(source) {
  1229. log::warn!(
  1230. "Failed to clean up merged source {}: {}",
  1231. source.display(),
  1232. e
  1233. );
  1234. }
  1235. Ok(())
  1236. }
  1237. /// Creates a new final BAM from a merged per-case BAM.
  1238. ///
  1239. /// Source is already sorted (from sort_and_index_chunks), so only
  1240. /// move and index are needed.
  1241. fn create_new_final(
  1242. source: &Path,
  1243. destination: &Path,
  1244. case: &IdInput,
  1245. config: &Config,
  1246. ) -> anyhow::Result<()> {
  1247. info!(" Creating new final BAM: {}", destination.display());
  1248. // Source already sorted — just move and index
  1249. atomic_replace(source, destination)?;
  1250. // Adding SM tag in BAM without (if not demultiplexed if so it should be the barcode name)
  1251. // let _ = read_sm_tag_or_inject(
  1252. // &destination.to_string_lossy(),
  1253. // &format!("{}_{}", case.case_id, case.sample_type),
  1254. // config,
  1255. // )?;
  1256. index_bam(destination, config)
  1257. .with_context(|| format!("Failed to index final BAM: {}", destination.display()))?;
  1258. Ok(())
  1259. }
  1260. type AlignJobsCase = (Vec<DoradoAlign>, HashMap<String, Vec<PathBuf>>);
  1261. /// Prepares alignment jobs and builds the case-to-BAM mapping.
  1262. fn prepare_alignment_jobs(
  1263. bam_to_case: &HashMap<PathBuf, &IdInput>,
  1264. tmp_dir: &Path,
  1265. config: &Config,
  1266. ) -> anyhow::Result<AlignJobsCase> {
  1267. let mut align_jobs = Vec::new();
  1268. let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new();
  1269. for (bam_path, case) in bam_to_case {
  1270. let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
  1271. let key = format!("{}_{}", case.case_id, sample_type_dir);
  1272. let aligned_bam = tmp_dir.join(format!(
  1273. "{}_{}_{}_{}.bam",
  1274. Uuid::new_v4(),
  1275. case.case_id,
  1276. sample_type_dir,
  1277. bam_path.file_stem().unwrap().to_string_lossy()
  1278. ));
  1279. align_jobs.push(DoradoAlign::from_config(
  1280. config,
  1281. bam_path,
  1282. aligned_bam.clone(),
  1283. ));
  1284. case_bam_map.entry(key).or_default().push(aligned_bam);
  1285. }
  1286. Ok((align_jobs, case_bam_map))
  1287. }
  1288. /// Sorts and indexes all aligned chunk BAMs in parallel via Slurm.
  1289. ///
  1290. /// Returns an updated map with sorted BAM paths replacing the original aligned paths.
  1291. fn sort_and_index_chunks(
  1292. case_bam_map: &HashMap<String, Vec<PathBuf>>,
  1293. config: &Config,
  1294. ) -> anyhow::Result<HashMap<String, Vec<PathBuf>>> {
  1295. let mut sort_jobs = Vec::new();
  1296. let mut original_to_sorted: HashMap<PathBuf, PathBuf> = HashMap::new();
  1297. // Sort job for each BAM.
  1298. for bams in case_bam_map.values() {
  1299. for bam in bams {
  1300. let sorted_bam = bam.with_extension("sorted.bam");
  1301. original_to_sorted.insert(bam.clone(), sorted_bam.clone());
  1302. sort_jobs.push(SamtoolsSort::from_config(config, bam, &sorted_bam));
  1303. }
  1304. }
  1305. info!("Submitting {} sort jobs", sort_jobs.len());
  1306. run_many!(config, sort_jobs).context("Sort batch failed")?;
  1307. // Verify sorted BAM exists.
  1308. let missing: Vec<_> = original_to_sorted
  1309. .values()
  1310. .filter(|p| !p.exists())
  1311. .collect();
  1312. if !missing.is_empty() {
  1313. bail!(
  1314. "Sorting failed: {} file(s) not created. First missing: {}",
  1315. missing.len(),
  1316. missing[0].display()
  1317. );
  1318. }
  1319. // Index every sorted BAM.
  1320. let index_jobs: Vec<SamtoolsIndex> = original_to_sorted
  1321. .values()
  1322. .map(|sorted| SamtoolsIndex::from_config(config, sorted.to_string_lossy().as_ref()))
  1323. .collect();
  1324. info!("Submitting {} index jobs", index_jobs.len());
  1325. run_many!(config, index_jobs).context("Sort batch failed")?;
  1326. // Verify index exists for each BAM.
  1327. let missing_indices: Vec<_> = original_to_sorted
  1328. .values()
  1329. .map(|p| PathBuf::from(format!("{}.bai", p.display())))
  1330. .filter(|p| !p.exists())
  1331. .collect();
  1332. if !missing_indices.is_empty() {
  1333. bail!(
  1334. "Indexing failed: {} index file(s) not created. First missing: {}",
  1335. missing_indices.len(),
  1336. missing_indices[0].display()
  1337. );
  1338. }
  1339. // Remove input BAMs.
  1340. for original in original_to_sorted.keys() {
  1341. if let Err(e) = fs::remove_file(original) {
  1342. log::warn!(
  1343. "Failed to remove unsorted BAM {}: {}",
  1344. original.display(),
  1345. e
  1346. );
  1347. }
  1348. }
  1349. // Construct the new map(case_id) -> BAMs
  1350. let sorted_map: HashMap<String, Vec<PathBuf>> = case_bam_map
  1351. .iter()
  1352. .map(|(key, bams)| {
  1353. let sorted_bams: Vec<PathBuf> = bams
  1354. .iter()
  1355. .filter_map(|b| original_to_sorted.get(b).cloned())
  1356. .collect();
  1357. (key.clone(), sorted_bams)
  1358. })
  1359. .collect();
  1360. Ok(sorted_map)
  1361. }
  1362. /// Finalizes per-case BAM files by merging chunks and creating indexed outputs.
  1363. fn finalize_case_bams(
  1364. cases: &[IdInput],
  1365. sorted_bam_map: &HashMap<String, Vec<PathBuf>>,
  1366. tmp_dir: &Path,
  1367. config: &Config,
  1368. ) -> anyhow::Result<Vec<PathBuf>> {
  1369. let mut finalized = Vec::new();
  1370. for case in cases {
  1371. let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
  1372. let key = format!("{}_{}", case.case_id, sample_type_dir);
  1373. let Some(chunk_bams) = sorted_bam_map.get(&key) else {
  1374. info!(
  1375. "No aligned BAMs found for case {} ({}) — skipping",
  1376. case.case_id, sample_type_dir
  1377. );
  1378. continue;
  1379. };
  1380. if chunk_bams.is_empty() {
  1381. continue;
  1382. }
  1383. let final_bam_path = build_final_bam_path(case, config)?;
  1384. info!(
  1385. "Finalizing case {} ({}): {} chunks → {}",
  1386. case.case_id,
  1387. sample_type_dir,
  1388. chunk_bams.len(),
  1389. final_bam_path.display()
  1390. );
  1391. let merged_case_bam =
  1392. merge_chunk_bams(chunk_bams, case, &sample_type_dir, tmp_dir, config)?;
  1393. if final_bam_path.exists() {
  1394. merge_into_existing_final(&merged_case_bam, &final_bam_path, tmp_dir, config)?;
  1395. } else {
  1396. create_new_final(&merged_case_bam, &final_bam_path, case, config)?;
  1397. }
  1398. for chunk in chunk_bams {
  1399. if chunk.exists() {
  1400. if let Err(e) = remove_bam_with_index(chunk) {
  1401. log::warn!("Failed to clean up temp chunk {}: {}", chunk.display(), e);
  1402. }
  1403. }
  1404. }
  1405. let stats = WGSBamStats::open(&case.case_id, &case.sample_type, config)?;
  1406. ModkitSummary::initialize(&case.case_id, &case.sample_type, config)?.run()?;
  1407. info!("✅ Output: {}\n{stats}", final_bam_path.display());
  1408. finalized.push(final_bam_path);
  1409. }
  1410. Ok(finalized)
  1411. }
  1412. /// Normalize barcode strings for matching.
  1413. ///
  1414. /// Handles various formats:
  1415. /// - "barcode01" → "1"
  1416. /// - "NB01" → "1"
  1417. /// - "01" → "1"
  1418. /// - "BC01" → "1"
  1419. fn normalize_barcode(barcode: &str) -> String {
  1420. let lower = barcode.to_lowercase();
  1421. // Strip known prefixes
  1422. let stripped = lower
  1423. .trim_start_matches("barcode")
  1424. .trim_start_matches("nb")
  1425. .trim_start_matches("bc")
  1426. .trim_start_matches('_')
  1427. .trim_start_matches('-')
  1428. .trim();
  1429. // Parse as number to remove leading zeros
  1430. if let Ok(num) = stripped.parse::<u32>() {
  1431. return num.to_string();
  1432. }
  1433. // Fallback: return cleaned string
  1434. stripped.to_string()
  1435. }
  1436. lazy_static! {
  1437. static ref BARCODE_PATTERNS: [regex::Regex; 3] = [
  1438. regex::Regex::new(r"(?i)barcode(\d+)").unwrap(),
  1439. regex::Regex::new(r"(?i)nb(\d+)").unwrap(),
  1440. regex::Regex::new(r"(?i)bc(\d+)").unwrap(),
  1441. ];
  1442. }
  1443. /// Extract barcode number from various filename patterns.
  1444. ///
  1445. /// Recognizes:
  1446. /// - `*_barcode01_*`
  1447. /// - `*_NB01_*`
  1448. /// - `barcode01/`
  1449. fn extract_and_normalize_barcode(text: &str) -> Option<String> {
  1450. for pattern in BARCODE_PATTERNS.iter() {
  1451. if let Some(caps) = pattern.captures(text) {
  1452. if let Some(num_str) = caps.get(1) {
  1453. if let Ok(num) = num_str.as_str().parse::<u32>() {
  1454. return Some(num.to_string());
  1455. }
  1456. }
  1457. }
  1458. }
  1459. None
  1460. }
  1461. #[derive(Debug, Clone, Default, Serialize, Deserialize)]
  1462. pub struct PipelineMetrics {
  1463. pub validation_duration: Duration,
  1464. pub alignment_duration: Duration,
  1465. pub sort_index_duration: Duration,
  1466. pub finalize_duration: Duration,
  1467. pub total_duration: Duration,
  1468. pub bams_processed: usize,
  1469. pub cases_finalized: usize,
  1470. pub bytes_input: u64,
  1471. pub bytes_output: u64,
  1472. }
  1473. impl PipelineMetrics {
  1474. pub fn throughput_mbps(&self) -> f64 {
  1475. if self.total_duration.as_secs_f64() == 0.0 {
  1476. return 0.0;
  1477. }
  1478. (self.bytes_input as f64 / 1_000_000.0) / self.total_duration.as_secs_f64()
  1479. }
  1480. pub fn save(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
  1481. let file = BufWriter::new(File::create(path.as_ref())?);
  1482. serde_json::to_writer_pretty(file, self)?;
  1483. Ok(())
  1484. }
  1485. }
  1486. impl std::fmt::Display for PipelineMetrics {
  1487. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  1488. writeln!(f, "📊 Pipeline Metrics")?;
  1489. writeln!(f, " Validation: {:>8.2?}", self.validation_duration)?;
  1490. writeln!(f, " Alignment: {:>8.2?}", self.alignment_duration)?;
  1491. writeln!(f, " Sort/Index: {:>8.2?}", self.sort_index_duration)?;
  1492. writeln!(f, " Finalize: {:>8.2?}", self.finalize_duration)?;
  1493. writeln!(f, " ─────────────────────")?;
  1494. writeln!(f, " Total: {:>8.2?}", self.total_duration)?;
  1495. writeln!(f)?;
  1496. writeln!(f, " BAMs processed: {:>6}", self.bams_processed)?;
  1497. writeln!(f, " Cases finalized: {:>6}", self.cases_finalized)?;
  1498. writeln!(
  1499. f,
  1500. " Input: {:>8.2} GiB",
  1501. self.bytes_input as f64 / 1024.0_f64.powi(3)
  1502. )?;
  1503. writeln!(
  1504. f,
  1505. " Output: {:>8.2} GiB",
  1506. self.bytes_output as f64 / 1024.0_f64.powi(3)
  1507. )?;
  1508. writeln!(f, " Throughput: {:>8.2} MB/s", self.throughput_mbps())
  1509. }
  1510. }
  1511. struct Timer(Instant);
  1512. impl Timer {
  1513. fn start() -> Self {
  1514. Self(Instant::now())
  1515. }
  1516. fn elapsed(&self) -> Duration {
  1517. self.0.elapsed()
  1518. }
  1519. }
  1520. #[cfg(test)]
  1521. mod tests {
  1522. use super::*;
  1523. use log::info;
  1524. use crate::helpers::test_init;
  1525. #[test]
  1526. fn prom_run_process() -> anyhow::Result<()> {
  1527. test_init();
  1528. let config = Config::default();
  1529. let dir = "/mnt/beegfs02/scratch/t_steimle/data/prom/20251121_001_01_CD/03/20251121_1531_P2I-00461-B_PBI56020_efa567ea";
  1530. let prom_run = PromRun::from_dir(dir)?;
  1531. let prom = PromRun::open(&prom_run.protocol_run_id, &config)?;
  1532. info!("{prom}");
  1533. prom.process_bams(&config)?;
  1534. Ok(())
  1535. }
  1536. }