flowcells.rs 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. use std::{
  2. collections::{HashMap, HashSet},
  3. fmt,
  4. fs::{self, File, OpenOptions},
  5. io::{BufReader, Read, Write},
  6. os::unix::fs::MetadataExt,
  7. path::Path,
  8. };
  9. use anyhow::Context;
  10. use chrono::{DateTime, TimeZone, Utc};
  11. use glob::glob;
  12. use log::{info, warn};
  13. use serde::{Deserialize, Serialize};
  14. use crate::{
  15. collection::minknow::{parse_pore_activity_from_reader, parse_throughput_from_reader},
  16. helpers::{find_files, list_directories},
  17. io::{readers::{get_gz_reader, get_reader}, writers::{get_gz_writer, get_writer}},
  18. };
  19. use super::minknow::{MinKnowSampleSheet, PoreStateEntry, PoreStateEntryExt, ThroughputEntry};
/// A collection of [`IdInput`] records, with utility methods for JSON
/// loading/saving, deduplication, and incremental insertion.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct IdsInput {
    /// The list of ID entries. Uniqueness is *not* enforced by the type;
    /// call `dedup()` (or use `add_input`) to remove duplicates.
    pub data: Vec<IdInput>,
}
/// A unique sample identifier from sequencing metadata.
///
/// Uniqueness is defined by the combination of all fields (the derived
/// `PartialEq`/`Eq`/`Hash` impls compare every field).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct IdInput {
    /// Case or patient ID.
    pub case_id: String,
    /// Time point or sample type.
    pub sample_type: String,
    /// Barcode number (sequencing index).
    pub barcode: String,
    /// Flow cell identifier; used to match entries to a [`FlowCell`]
    /// via its sample sheet's `flow_cell_id`.
    pub flow_cell_id: String,
}
  41. impl IdsInput {
  42. /// Load `IdsInput` from a JSON file.
  43. pub fn load_json<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
  44. let file = File::open(&path)?;
  45. Ok(serde_json::from_reader(file)?)
  46. }
  47. /// Save `IdsInput` to a JSON file (pretty-printed).
  48. pub fn save_json<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
  49. let file = File::create(&path)?;
  50. serde_json::to_writer_pretty(file, self)?;
  51. Ok(())
  52. }
  53. /// Remove duplicate `IdInput` entries, retaining the first occurrence.
  54. pub fn dedup(&mut self) {
  55. let mut seen = HashSet::new();
  56. self.data.retain(|item| seen.insert(item.clone()));
  57. }
  58. /// Add a new `IdInput` and deduplicate.
  59. pub fn add_input(&mut self, entry: IdInput) {
  60. self.data.push(entry);
  61. self.dedup();
  62. }
  63. }
/// Container for a deduplicated and enriched collection of flowcells ([`FlowCell`]).
///
/// `FlowCells` represents the aggregated result of scanning multiple sources:
/// - A cached archive of flowcells (`archive_store_path`)
/// - A live scan of the local run directory (`local_run_dir`)
///
/// Each [`FlowCell`] contains all necessary metadata for downstream processing,
/// including parsed MinKNOW sample sheet data, `.pod5` file statistics, experiment layout,
/// and optional sample/case annotations from an [`IdsInput`] file.
///
/// The [`FlowCells::load`] method performs the following:
/// - Loads existing flowcells from the archive if available
/// - Scans local directories for new or updated flowcells
/// - Deduplicates flowcells using [`FlowCell::id`]
/// - Retains the most recently modified version of each flowcell
/// - Enriches each flowcell with case-level annotations
///
/// # Fields
/// - `flow_cells`: A deduplicated list of fully parsed [`FlowCell`] instances.
///
/// # Example
/// ```ignore
/// let flow_cells = FlowCells::load(
///     "/mnt/data/runs",
///     "inputs.json",
///     "flowcell_cache.json"
/// )?;
/// println!("Loaded {} unique flowcells", flow_cells.flow_cells.len());
/// ```
///
/// # Deduplication
/// Flowcells are uniquely identified by [`FlowCell::id`], i.e. the directory
/// path that classified the experiment layout (see [`FlowCellExperiment`]).
/// If both archived and local versions exist, the one with the latest `.pod5`
/// modification time is retained.
///
/// # Related Types
/// - [`FlowCell`]: Describes a flowcell, its metadata, and files
/// - [`FlowCellExperiment`]: Muxed vs. demuxed layout classification
/// - [`FlowCellLocation`]: Indicates source (local or archive)
/// - [`MinKnowSampleSheet`]: Parsed sample sheet data
/// - [`IdInput`]: Case-level annotation applied to flowcells
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct FlowCells {
    /// A collection of parsed flowcell metadata records.
    pub flow_cells: Vec<FlowCell>,
}
  110. impl FlowCells {
  111. /// Load and merge flowcells from archive and local directories, then annotate with `IdsInput`.
  112. ///
  113. /// # Arguments
  114. /// * `local_run_dir` - Root directory containing local flowcell run folders.
  115. /// * `inputs_path` - Path to a JSON file with [`IdInput`] annotations.
  116. /// * `archive_store_path` - Path to a cached JSON file of archived flowcells.
  117. ///
  118. /// # Returns
  119. /// A deduplicated and annotated [`FlowCells`] collection.
  120. ///
  121. /// # Errors
  122. /// Returns an error if any input file cannot be read or parsed, or if local scanning fails.
  123. pub fn load(
  124. local_run_dir: &str,
  125. inputs_path: &str,
  126. archive_store_path: &str,
  127. ) -> anyhow::Result<Self> {
  128. let mut instance = Self::default();
  129. instance.load_from_archive(archive_store_path)?;
  130. instance.load_from_local(local_run_dir)?;
  131. instance.annotate_with_inputs(inputs_path)?;
  132. // instance.save_to_archive(archive_store_path)?;
  133. Ok(instance)
  134. }
  135. /// Load flowcells from a cached archive and merge into `self.flow_cells`.
  136. ///
  137. /// Retains only the most recently modified entry for each `flowcell_id`.
  138. ///
  139. /// # Arguments
  140. /// * `archive_path` - Path to archive JSON file.
  141. ///
  142. /// # Errors
  143. /// Returns an error if the file cannot be read or parsed.
  144. pub fn load_from_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
  145. if !Path::new(archive_path).exists() {
  146. return Ok(());
  147. }
  148. let file = get_reader(archive_path)
  149. .with_context(|| format!("Failed to open archive file: {archive_path}"))?;
  150. let archived: Vec<FlowCell> = serde_json::from_reader(BufReader::new(file))
  151. .with_context(|| format!("Failed to parse FlowCells from: {archive_path}"))?;
  152. self.insert_flowcells(archived);
  153. Ok(())
  154. }
  155. pub fn save_to_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
  156. // let file = OpenOptions::new()
  157. // .write(true)
  158. // .create(true)
  159. // .truncate(true)
  160. // .open(archive_path)
  161. // .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;
  162. let tmp_path = format!("/tmp/{}.json.gz", uuid::Uuid::new_v4());
  163. let mut file = get_writer(&tmp_path)
  164. .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;
  165. serde_json::to_writer_pretty(&mut file, &self.flow_cells)
  166. .with_context(|| format!("Failed to write FlowCells to: {archive_path}"))?;
  167. file.flush()?;
  168. // file.get_ref().sync_all()?;
  169. std::fs::copy(&tmp_path, archive_path)?;
  170. std::fs::remove_file(tmp_path)?;
  171. Ok(())
  172. }
  173. /// Scan local run directories for flowcells and merge into `self.flow_cells`.
  174. ///
  175. /// Uses `scan_local()` to parse local metadata and selects the most recently modified
  176. /// version if duplicates exist.
  177. ///
  178. /// # Arguments
  179. /// * `local_dir` - Root directory containing flowcell runs.
  180. ///
  181. /// # Errors
  182. /// Returns an error if scanning fails or if directory layout is invalid.
  183. pub fn load_from_local(&mut self, local_dir: &str) -> anyhow::Result<()> {
  184. info!("Scanning {local_dir} for sample sheets.");
  185. let sample_sheets = find_files(&format!("{local_dir}/**/sample_sheet*"))?;
  186. info!("{} sample sheets found.", sample_sheets.len());
  187. for path in sample_sheets {
  188. let dir = path
  189. .parent()
  190. .ok_or_else(|| anyhow::anyhow!("Invalid sample_sheet path: {}", path.display()))?;
  191. let dir_str = dir.to_string_lossy();
  192. let (sheet, pore, throughput, files) = scan_local(&dir_str)
  193. .with_context(|| format!("Failed to scan local dir: {dir_str}"))?;
  194. let fc = FlowCell::new(
  195. sheet,
  196. pore,
  197. throughput,
  198. FlowCellLocation::Local(dir_str.to_string()),
  199. files,
  200. )
  201. .with_context(|| format!("Failed to create FlowCell from dir: {dir_str}"))?;
  202. self.insert_flowcells(vec![fc]);
  203. }
  204. Ok(())
  205. }
  206. /// Annotate flowcells with matching `IdInput` records from JSON.
  207. ///
  208. /// Assigns to `FlowCell.cases` all matching `IdInput`s by `flow_cell_id`.
  209. ///
  210. /// # Arguments
  211. /// * `inputs_path` - Path to JSON file containing a list of `IdInput` records.
  212. ///
  213. /// # Errors
  214. /// Returns an error if the file is unreadable or malformed.
  215. pub fn annotate_with_inputs(&mut self, inputs_path: &str) -> anyhow::Result<()> {
  216. if !Path::new(inputs_path).exists() {
  217. warn!("IdsInput file not found at {}", inputs_path);
  218. return Ok(());
  219. }
  220. let inputs = IdsInput::load_json(inputs_path)
  221. .with_context(|| format!("Failed to load IdsInput from: {inputs_path}"))?;
  222. self.cross_cases(&inputs)
  223. .with_context(|| format!("Failed to cross with IdsInput from: {inputs_path}"))?;
  224. Ok(())
  225. }
  226. pub fn cross_cases(&mut self, inputs: &IdsInput) -> anyhow::Result<()> {
  227. for fc in &mut self.flow_cells {
  228. fc.cases = inputs
  229. .data
  230. .iter()
  231. .filter(|id| id.flow_cell_id == fc.sample_sheet.flow_cell_id)
  232. .cloned()
  233. .collect();
  234. }
  235. Ok(())
  236. }
  237. /// Insert new flowcells into the current collection, deduplicating by `flowcell_id`.
  238. ///
  239. /// For duplicates, retains the flowcell with the newer `.modified` timestamp.
  240. ///
  241. /// # Arguments
  242. /// * `new_cells` - A vector of parsed `FlowCell` instances.
  243. pub fn insert_flowcells(&mut self, new_cells: Vec<FlowCell>) {
  244. // let mut map: HashMap<String, FlowCell> = self
  245. // .flow_cells
  246. // .drain(..)
  247. // .map(|fc| {
  248. // (fc.sample_sheet.flow_cell_id.clone(), fc)
  249. // })
  250. // .collect();
  251. //
  252. for fc in new_cells {
  253. self.flow_cells.push(fc);
  254. // map.entry(fc.sample_sheet.flow_cell_id.clone())
  255. // .and_modify(|existing| {
  256. // if fc.modified > existing.modified {
  257. // *existing = fc.clone();
  258. // }
  259. // })
  260. // .or_insert(fc);
  261. }
  262. self.sort_by_modified();
  263. self.flow_cells.dedup_by_key(|fc| fc.id());
  264. // self.flow_cells = map.into_values().collect();
  265. }
  266. /// Discover new archived flowcells in `archive_dir` and update the JSON at `store_path`.
  267. ///
  268. /// - Loads existing archive (gzip-aware) into `self`.
  269. /// - Recursively scans **all** `.tar` files under `archive_dir`.
  270. /// - For each archive, runs `scan_archive`, builds a `FlowCell` with
  271. /// `FlowCellLocation::Archived(archive_dir)`, and merges it via `insert_flowcells`.
  272. /// - Logs how many new flowcells were added.
  273. /// - Saves back to `store_path` (gzip-compressed).
  274. pub fn update_archive_from_scan(
  275. &mut self,
  276. archive_dir: &str,
  277. store_path: &str,
  278. ) -> anyhow::Result<()> {
  279. // 1) Load whatever’s already in the archive (if present)
  280. self.load_from_archive(store_path)
  281. .with_context(|| format!("Loading existing archive from {}", store_path))?;
  282. let before = self.flow_cells.len();
  283. info!("Scanning '{}' for .tar archives…", archive_dir);
  284. // 2) Find & process every .tar (recursive)
  285. let pattern = format!("{}/*.tar", archive_dir);
  286. let entries =
  287. glob(&pattern).with_context(|| format!("Invalid glob pattern: {}", &pattern))?;
  288. for entry in entries.filter_map(Result::ok) {
  289. let path = entry.to_string_lossy();
  290. match scan_archive(&path) {
  291. Ok((sheet, pore, throughput, files)) => {
  292. match FlowCell::new(
  293. sheet,
  294. pore,
  295. throughput,
  296. FlowCellLocation::Archived(path.to_string()),
  297. files,
  298. ) {
  299. Ok(fc) => {
  300. // merge & dedup
  301. self.insert_flowcells(vec![fc]);
  302. }
  303. Err(e) => {
  304. warn!("Failed to build FlowCell from '{}': {}", path, e);
  305. }
  306. }
  307. }
  308. Err(e) => warn!("Failed to scan archive '{}': {}", path, e),
  309. }
  310. }
  311. let added = self.flow_cells.len().saturating_sub(before);
  312. info!("Discovered {} new archived flowcell(s).", added);
  313. // 3) Save the merged list back
  314. self.save_to_archive(store_path)
  315. .with_context(|| format!("Saving updated archive to {}", store_path))?;
  316. Ok(())
  317. }
  318. /// Sorts `flow_cells` in-place from oldest (`modified` smallest) to newest.
  319. pub fn sort_by_modified(&mut self) {
  320. self.flow_cells.sort_by_key(|fc| fc.modified);
  321. }
  322. }
/// Represents a fully described flowcell unit, including experimental metadata,
/// physical location (local or archived), sample sheet data, and associated pod5 files.
///
/// A `FlowCell` object serves as the central unit in the data model for sample aggregation
/// and downstream processing. Instances are built by [`FlowCell::new`] from the
/// output of `scan_local` / `scan_archive`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FlowCell {
    /// The original MinKNOW sample sheet metadata.
    pub sample_sheet: MinKnowSampleSheet,
    /// Experiment layout inferred from `.pod5` paths (muxed vs. demuxed).
    pub experiment: FlowCellExperiment,
    /// Whether the flowcell was loaded from a local directory or an archive.
    pub location: FlowCellLocation,
    /// Latest modification timestamp among the `.pod5` files.
    pub modified: DateTime<Utc>,
    /// Total size (in bytes) of all `.pod5` files.
    pub pod5_size: usize,
    /// Number of `.pod5` files found.
    pub n_pod5: usize,
    /// Case-level annotations matched from an [`IdsInput`] file
    /// (empty until `FlowCells::cross_cases` runs).
    pub cases: Vec<IdInput>,
    /// Pore-activity time series parsed from a `pore_activity` file, if one was found.
    pub pore_activity: Option<Vec<PoreStateEntry>>,
    /// Throughput time series parsed from a `throughput` file, if one was found.
    pub throughput: Option<Vec<ThroughputEntry>>,
}
/// Describes the physical origin of a flowcell when loaded.
///
/// This is used to differentiate flowcells discovered during a local scan
/// versus those restored from an archived store. The payload is the source
/// path (directory or `.tar` file, respectively).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellLocation {
    /// Flowcell discovered in a local filesystem path.
    Local(String),
    /// Flowcell restored from a `.tar` archive or a serialized cache.
    Archived(String),
}
  360. impl fmt::Display for FlowCellLocation {
  361. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  362. write!(
  363. f,
  364. "{}",
  365. match self {
  366. FlowCellLocation::Local(_) => "local",
  367. FlowCellLocation::Archived(_) => "archived",
  368. }
  369. )
  370. }
  371. }
  372. impl FlowCell {
  373. /// Constructs a new `FlowCell` from a sample sheet and associated file list.
  374. ///
  375. /// This method aggregates information from a parsed `MinKnowSampleSheet` and the
  376. /// corresponding `.pod5` file metadata, and infers the experiment type from
  377. /// file paths using `FlowCellExperiment::from_pod5_paths`.
  378. ///
  379. /// # Arguments
  380. /// - `sample_sheet`: Parsed sample sheet metadata.
  381. /// - `location`: Origin of the flowcell (local or archived).
  382. /// - `files`: List of files associated with the flowcell, each with:
  383. /// - `String`: file path
  384. /// - `u64`: size in bytes
  385. /// - `DateTime<Utc>`: modification time
  386. ///
  387. /// # Returns
  388. /// - `Ok(FlowCell)` if experiment type and file metadata are successfully resolved.
  389. /// - `Err` if the experiment type cannot be determined.
  390. ///
  391. /// # Errors
  392. /// - If `FlowCellExperiment::from_pod5_paths` fails (e.g., unknown layout).
  393. ///
  394. /// # Example
  395. /// ```
  396. /// let fc = FlowCell::new(sample_sheet, FlowCellLocation::Local(dir), files)?;
  397. /// println!("Flowcell ID: {}", fc.flowcell_id);
  398. /// ```
  399. pub fn new(
  400. sample_sheet: MinKnowSampleSheet,
  401. pore_activity: Option<Vec<PoreStateEntry>>,
  402. throughput: Option<Vec<ThroughputEntry>>,
  403. location: FlowCellLocation,
  404. files: Vec<(String, u64, DateTime<Utc>)>,
  405. ) -> anyhow::Result<Self> {
  406. let flowcell_id = sample_sheet.flow_cell_id.clone();
  407. // Filter .pod5 files
  408. let pod5s: Vec<_> = files
  409. .iter()
  410. .filter(|(path, _, _)| path.ends_with(".pod5"))
  411. .cloned()
  412. .collect();
  413. let n_pod5 = pod5s.len();
  414. // Infer experiment type from pod5 paths
  415. let experiment = FlowCellExperiment::from_pod5_paths(
  416. &files.iter().map(|(p, _, _)| p.to_string()).collect(),
  417. )
  418. .ok_or_else(|| anyhow::anyhow!("Can't find experiment type for {flowcell_id}"))?;
  419. // Aggregate pod5 size and latest modification time
  420. let (pod5_size, modified): (usize, DateTime<Utc>) = files
  421. .into_iter()
  422. .filter(|(path, _, _)| path.ends_with(".pod5"))
  423. .fold(
  424. (0, DateTime::<Utc>::MIN_UTC),
  425. |(acc_size, acc_time), (_, size, time)| {
  426. (
  427. acc_size + size as usize,
  428. if acc_time < time { time } else { acc_time },
  429. )
  430. },
  431. );
  432. Ok(Self {
  433. experiment,
  434. location,
  435. modified,
  436. sample_sheet,
  437. pod5_size,
  438. n_pod5,
  439. cases: Vec::new(),
  440. pore_activity,
  441. throughput,
  442. })
  443. }
  444. pub fn id(&self) -> String {
  445. match &self.experiment {
  446. FlowCellExperiment::WGSPod5Mux(p) => p,
  447. FlowCellExperiment::WGSPod5Demux(p) => p,
  448. }.to_string()
  449. }
  450. }
/// Describes the type of experiment layout based on `.pod5` file structure.
///
/// Used to distinguish between whole-genome sequencing (WGS) `.pod5` files
/// organized in a single (muxed) directory or demultiplexed (`pod5_pass`) structure.
/// The payload is the path/directory name that triggered the classification.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellExperiment {
    /// `.pod5` files are stored in a single unbarcoded directory, typically `/pod5/`.
    WGSPod5Mux(String),
    /// `.pod5` files are organized by barcode in subdirectories, typically `/pod5_pass/`.
    WGSPod5Demux(String),
}
  462. impl fmt::Display for FlowCellExperiment {
  463. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  464. write!(
  465. f,
  466. "{}",
  467. match self {
  468. FlowCellExperiment::WGSPod5Mux(_) => "WGS Pod5 Muxed",
  469. FlowCellExperiment::WGSPod5Demux(_) => "WGS Pod5 Demuxed",
  470. }
  471. )
  472. }
  473. }
  474. impl FlowCellExperiment {
  475. /// Attempts to infer the experiment type from the immediate subdirectories of the given path.
  476. ///
  477. /// This is useful when scanning a flowcell directory directly and checking
  478. /// whether it contains a `pod5/` or `pod5_pass/` structure.
  479. ///
  480. /// # Arguments
  481. /// - `flowcell_path`: Path to the root of a flowcell directory.
  482. ///
  483. /// # Returns
  484. /// - `Some(FlowCellExperiment)` if a known subdirectory is found.
  485. /// - `None` if no match is detected.
  486. pub fn from_path(flowcell_path: &str) -> Option<Self> {
  487. for dir in list_directories(flowcell_path).ok().unwrap_or_default() {
  488. if dir == "pod5" {
  489. return Some(FlowCellExperiment::WGSPod5Mux(dir.to_string()));
  490. }
  491. if dir == "pod5_pass" {
  492. return Some(FlowCellExperiment::WGSPod5Demux(dir.to_string()));
  493. }
  494. }
  495. None
  496. }
  497. /// Attempts to infer the experiment type from a list of `.pod5` file paths.
  498. ///
  499. /// This is typically used when files have already been collected and their
  500. /// parent directories can be checked for naming conventions.
  501. ///
  502. /// # Arguments
  503. /// - `all_paths`: Vector of paths (as strings) to `.pod5` files or directories.
  504. ///
  505. /// # Returns
  506. /// - `Some(FlowCellExperiment)` if a known suffix is detected.
  507. /// - `None` if no matching pattern is found.
  508. pub fn from_pod5_paths(all_paths: &Vec<String>) -> Option<Self> {
  509. for path in all_paths {
  510. if path.ends_with("/pod5/") || path.ends_with("/pod5") {
  511. return Some(FlowCellExperiment::WGSPod5Mux(path.to_string()));
  512. }
  513. if path.ends_with("/pod5_pass/") || path.ends_with("/pod5_pass") {
  514. return Some(FlowCellExperiment::WGSPod5Demux(path.to_string()));
  515. }
  516. }
  517. None
  518. }
  519. /// Returns the underlying string (directory path) for the experiment.
  520. ///
  521. /// This is useful when you need access to the directory path used to classify the experiment.
  522. pub fn inner(&self) -> &str {
  523. match self {
  524. FlowCellExperiment::WGSPod5Mux(v) => v,
  525. FlowCellExperiment::WGSPod5Demux(v) => v,
  526. }
  527. }
  528. }
/// Represents the result of scanning a MinKNOW experiment source
/// (a local directory or a `.tar` archive).
///
/// Tuple elements, in order:
/// - `MinKnowSampleSheet`: parsed metadata describing the experiment/sample.
/// - `Option<Vec<PoreStateEntry>>`: pore-activity records, present when a
///   `pore_activity` file was found.
/// - `Option<Vec<ThroughputEntry>>`: throughput records, present when a
///   `throughput` file was found.
/// - `Vec<(String, u64, DateTime<Utc>)>`: every scanned entry with:
///   - `String`: file path
///   - `u64`: file size in bytes
///   - `DateTime<Utc>`: last modification time (UTC)
type ExperimentData = (
    MinKnowSampleSheet,
    Option<Vec<PoreStateEntry>>,
    Option<Vec<ThroughputEntry>>,
    Vec<(String, u64, DateTime<Utc>)>,
);
  543. /// Scans a local directory for MinKNOW experiment files and metadata.
  544. ///
  545. /// This function recursively walks a directory using globbing,
  546. /// collects file paths, sizes, and modification timestamps,
  547. /// and identifies a `sample_sheet` file to parse as `MinKnowSampleSheet`.
  548. ///
  549. /// # Arguments
  550. /// - `dir`: Root directory to scan (absolute or relative).
  551. ///
  552. /// # Returns
  553. /// - `Ok(ExperimentData)` containing the sample sheet and a list of file records.
  554. /// - `Err` if the directory can't be accessed, no sample sheet is found, or parsing fails.
  555. ///
  556. /// # Requirements
  557. /// - A file path containing `"sample_sheet"` must be present and readable.
  558. /// - The sample sheet must be formatted according to MinKNOW expectations
  559. /// (1 header + 1 data row).
  560. ///
  561. /// # Errors
  562. /// - If reading files or metadata fails.
  563. /// - If the sample sheet is missing or invalid.
  564. ///
  565. /// # Example
  566. /// ```
  567. /// let (sheet, files) = scan_local("/data/run001")?;
  568. /// println!("Kit used: {}", sheet.kit);
  569. /// println!("Number of files found: {}", files.len());
  570. /// ```
  571. pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
  572. let mut result = Vec::new();
  573. let mut sample_sheet: Option<String> = None;
  574. let mut pore_activity = None;
  575. let mut throughput = None;
  576. for entry in glob(&format!("{}/**/*", dir))? {
  577. let file = entry.context("Failed to read an entry from the tar archive")?;
  578. // Extract file properties safely
  579. let metadata = file.metadata().context(format!(
  580. "Failed to access file metadata: {}",
  581. file.display()
  582. ))?;
  583. let size = metadata.size();
  584. let modified = metadata.mtime();
  585. let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
  586. let path = file.to_string_lossy().into_owned();
  587. if path.contains("pore_activity") {
  588. let mut reader =
  589. File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
  590. pore_activity = Some(
  591. parse_pore_activity_from_reader(&mut reader)
  592. .map(|d| d.group_by_state_and_time(60.0))
  593. .with_context(|| {
  594. format!(
  595. "Failed to parse pore activity date from: {}",
  596. file.display()
  597. )
  598. })?,
  599. );
  600. } else if path.contains("sample_sheet") {
  601. sample_sheet = Some(path.clone());
  602. } else if path.contains("throughput") {
  603. let mut reader =
  604. File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
  605. throughput = Some(
  606. parse_throughput_from_reader(&mut reader)
  607. .with_context(|| {
  608. format!("Failed to parse throughput data from: {}", file.display())
  609. })?
  610. .into_iter()
  611. .enumerate()
  612. .filter_map(|(i, item)| if i % 60 == 0 { Some(item) } else { None })
  613. .collect(),
  614. );
  615. }
  616. result.push((path, size, modified_utc));
  617. }
  618. let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!("No sample sheet detected in: {dir}"))?;
  619. let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
  620. .context(anyhow::anyhow!("Can't parse sample sheet data"))?;
  621. // info!("{} files found in {}", result.len(), dir);
  622. Ok((sample_sheet, pore_activity, throughput, result))
  623. }
  624. /// Scans a `.tar` archive containing a MinKNOW sequencing experiment.
  625. ///
  626. /// This function opens a TAR archive, searches for the `sample_sheet` CSV file,
  627. /// extracts its metadata, and parses it into a `MinKnowSampleSheet`.
  628. /// All other entries in the archive are collected with their path, size, and modification time.
  629. ///
  630. /// # Arguments
  631. /// - `tar_path`: Path to the `.tar` archive file.
  632. ///
  633. /// # Returns
  634. /// - `Ok(ExperimentData)`: A tuple containing the parsed sample sheet and the list of file metadata.
  635. /// - `Err`: If the archive is unreadable, malformed, or missing the `sample_sheet`.
  636. ///
  637. /// # Archive Requirements
  638. /// - Must contain exactly one file matching `"sample_sheet"` in its path.
  639. /// - The sample sheet must contain a valid CSV header and a single data row.
  640. ///
  641. /// # Errors
  642. /// - Fails if the archive can't be opened or read.
  643. /// - Fails if any entry is malformed (e.g., missing timestamp).
  644. /// - Fails if no sample sheet is found or if it is malformed.
  645. ///
  646. /// # Example
  647. /// ```no_run
  648. /// let (sample_sheet, files) = scan_archive("archive.tar")?;
  649. /// println!("Sample ID: {}", sample_sheet.sample_id);
  650. /// println!("Total files in archive: {}", files.len());
  651. /// ```
  652. pub fn scan_archive(tar_path: &str) -> anyhow::Result<ExperimentData> {
  653. info!("Scanning archive: {tar_path}");
  654. let file = File::open(tar_path)
  655. .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
  656. let mut archive = tar::Archive::new(file);
  657. let mut result = Vec::new();
  658. let mut sample_sheet: Option<String> = None;
  659. let mut throughput = None;
  660. let mut pore_activity = None;
  661. // Iterate through the entries in the archive
  662. for entry in archive.entries_with_seek()? {
  663. let mut file = entry.context("Failed to read an entry from the tar archive")?;
  664. // Extract file properties safely
  665. let size = file.size();
  666. let modified = file
  667. .header()
  668. .mtime()
  669. .context("Failed to get modification time")?;
  670. let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
  671. let path = file
  672. .path()
  673. .context("Failed to get file path from tar entry")?
  674. .to_string_lossy()
  675. .into_owned();
  676. if path.contains("pore_activity") {
  677. pore_activity = Some(
  678. parse_pore_activity_from_reader(&mut file)
  679. .context("Failed to read pore_activity data: {path}")?,
  680. );
  681. } else if path.contains("sample_sheet") {
  682. let mut buffer = String::new();
  683. file.read_to_string(&mut buffer)
  684. .context("Failed to read file contents as string")?;
  685. sample_sheet = Some(buffer);
  686. } else if path.contains("throughput") {
  687. throughput = Some(
  688. parse_throughput_from_reader(&mut file)
  689. .context("Failed to read throughput data: {path}")?,
  690. );
  691. }
  692. result.push((path, size, modified_utc));
  693. }
  694. let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!(
  695. "No sample sheet detected in archive: {tar_path}"
  696. ))?;
  697. let (_, data) = sample_sheet
  698. .split_once("\n")
  699. .ok_or(anyhow::anyhow!("Can't parse sample sheet data"))?;
  700. let sample_sheet: MinKnowSampleSheet = data
  701. .try_into()
  702. .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;
  703. Ok((sample_sheet, pore_activity, throughput, result))
  704. }