// flowcells.rs

use std::{
    collections::{HashMap, HashSet},
    fmt,
    fs::{self, File, OpenOptions},
    io::{BufReader, Read},
    os::unix::fs::MetadataExt,
    path::Path,
};

use anyhow::Context;
use chrono::{DateTime, TimeZone, Utc};
use glob::glob;
use log::{info, warn};
use serde::{Deserialize, Serialize};

use crate::{
    collection::minknow::{parse_pore_activity_from_reader, parse_throughput_from_reader},
    helpers::{find_files, list_directories},
};
use super::minknow::{MinKnowSampleSheet, PoreStateEntry, ThroughputEntry};
/// A collection of `IdInput` records, with utility methods
/// for loading, saving, deduplication, and construction from TSV.
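///
/// # Example
/// A minimal usage sketch (the field values below are illustrative, not from a real run):
/// ```ignore
/// let mut ids = IdsInput { data: Vec::new() };
/// let entry = IdInput {
///     case_id: "CASE-001".to_string(),
///     sample_type: "tumor".to_string(),
///     barcode: "barcode01".to_string(),
///     flow_cell_id: "PAO12345".to_string(),
/// };
/// // Adding the same entry twice leaves a single record after deduplication.
/// ids.add_input(entry.clone());
/// ids.add_input(entry);
/// assert_eq!(ids.data.len(), 1);
/// ```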
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IdsInput {
    /// The list of ID entries.
    pub data: Vec<IdInput>,
}

/// A unique sample identifier from sequencing metadata.
///
/// Uniqueness is defined by the combination of all fields.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct IdInput {
    /// Case or patient ID.
    pub case_id: String,
    /// Time point or sample type.
    pub sample_type: String,
    /// Barcode number (sequencing index).
    pub barcode: String,
    /// Flow cell identifier.
    pub flow_cell_id: String,
}

impl IdsInput {
    /// Load `IdsInput` from a JSON file.
    pub fn load_json<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
        let file = File::open(&path)?;
        Ok(serde_json::from_reader(file)?)
    }

    /// Save `IdsInput` to a JSON file (pretty-printed).
    pub fn save_json<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
        let file = File::create(&path)?;
        serde_json::to_writer_pretty(file, self)?;
        Ok(())
    }

    /// Remove duplicate `IdInput` entries, retaining the first occurrence.
    pub fn dedup(&mut self) {
        let mut seen = HashSet::new();
        self.data.retain(|item| seen.insert(item.clone()));
    }

    /// Add a new `IdInput` and deduplicate.
    pub fn add_input(&mut self, entry: IdInput) {
        self.data.push(entry);
        self.dedup();
    }
}
/// Container for a deduplicated and enriched collection of flowcells ([`FlowCell`]).
///
/// `FlowCells` represents the aggregated result of scanning multiple sources:
/// - A cached archive of flowcells (`archive_store_path`)
/// - A live scan of the local run directory (`local_run_dir`)
///
/// Each [`FlowCell`] contains all necessary metadata for downstream processing,
/// including parsed MinKNOW sample sheet data, `.pod5` file statistics, experiment layout,
/// and optional sample/case annotations from an [`IdsInput`] file.
///
/// The [`FlowCells::load`] method performs the following:
/// - Loads existing flowcells from the archive if available
/// - Scans local directories for new or updated flowcells
/// - Deduplicates flowcells using the `flowcell_id`
/// - Retains the most recently modified version of each flowcell
/// - Enriches each flowcell with case-level annotations
///
/// # Fields
/// - `flow_cells`: A deduplicated list of fully parsed [`FlowCell`] instances.
///
/// # Example
/// ```ignore
/// let flow_cells = FlowCells::load(
///     "/mnt/data/runs",
///     "inputs.json",
///     "flowcell_cache.json"
/// )?;
/// println!("Loaded {} unique flowcells", flow_cells.flow_cells.len());
/// ```
///
/// # Deduplication
/// Flowcells are uniquely identified by their `flowcell_id`, a combination of
/// `{experiment_id}/{sample_id}`. If both archived and local versions exist,
/// the one with the latest `.pod5` modification time is retained.
///
/// # Related Types
/// - [`FlowCell`]: Describes a flowcell, its metadata, and files
/// - [`FlowCellExperiment`]: Muxed vs. demuxed layout classification
/// - [`FlowCellLocation`]: Indicates source (local or archive)
/// - [`MinKnowSampleSheet`]: Parsed sample sheet data
/// - [`IdInput`]: Case-level annotation applied to flowcells
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct FlowCells {
    /// A collection of parsed flowcell metadata records.
    pub flow_cells: Vec<FlowCell>,
}
impl FlowCells {
    /// Load and merge flowcells from archive and local directories, then annotate with `IdsInput`.
    ///
    /// # Arguments
    /// * `local_run_dir` - Root directory containing local flowcell run folders.
    /// * `inputs_path` - Path to a JSON file with [`IdInput`] annotations.
    /// * `archive_store_path` - Path to a cached JSON file of archived flowcells.
    ///
    /// # Returns
    /// A deduplicated and annotated [`FlowCells`] collection.
    ///
    /// # Errors
    /// Returns an error if any input file cannot be read or parsed, or if local scanning fails.
    pub fn load(
        local_run_dir: &str,
        inputs_path: &str,
        archive_store_path: &str,
    ) -> anyhow::Result<Self> {
        let mut instance = Self::default();
        instance.load_from_archive(archive_store_path)?;
        instance.load_from_local(local_run_dir)?;
        instance.annotate_with_inputs(inputs_path)?;
        instance.save_to_archive(archive_store_path)?;
        Ok(instance)
    }
    /// Load flowcells from a cached archive and merge into `self.flow_cells`.
    ///
    /// Retains only the most recently modified entry for each `flowcell_id`.
    ///
    /// # Arguments
    /// * `archive_path` - Path to archive JSON file.
    ///
    /// # Errors
    /// Returns an error if the file cannot be read or parsed.
    pub fn load_from_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
        if !Path::new(archive_path).exists() {
            return Ok(());
        }
        let file = File::open(archive_path)
            .with_context(|| format!("Failed to open archive file: {archive_path}"))?;
        let archived: Vec<FlowCell> = serde_json::from_reader(BufReader::new(file))
            .with_context(|| format!("Failed to parse FlowCells from: {archive_path}"))?;
        self.insert_flowcells(archived);
        Ok(())
    }
    /// Write the current collection of flowcells to the archive JSON file (pretty-printed).
    ///
    /// # Arguments
    /// * `archive_path` - Path to the archive JSON file; it is created or truncated.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened for writing or if serialization fails.
    pub fn save_to_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(archive_path)
            .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;
        serde_json::to_writer_pretty(file, &self.flow_cells)
            .with_context(|| format!("Failed to write FlowCells to: {archive_path}"))?;
        Ok(())
    }
    /// Scan local run directories for flowcells and merge into `self.flow_cells`.
    ///
    /// Uses `scan_local()` to parse local metadata and selects the most recently modified
    /// version if duplicates exist.
    ///
    /// # Arguments
    /// * `local_dir` - Root directory containing flowcell runs.
    ///
    /// # Errors
    /// Returns an error if scanning fails or if the directory layout is invalid.
    pub fn load_from_local(&mut self, local_dir: &str) -> anyhow::Result<()> {
        let sample_sheets = find_files(&format!("{local_dir}/**/sample_sheet*"))?;
        for path in sample_sheets {
            let dir = path
                .parent()
                .ok_or_else(|| anyhow::anyhow!("Invalid sample_sheet path: {}", path.display()))?;
            let dir_str = dir.to_string_lossy();
            let (sheet, pore, throughput, files) = scan_local(&dir_str)
                .with_context(|| format!("Failed to scan local dir: {dir_str}"))?;
            let fc = FlowCell::new(
                sheet,
                pore,
                throughput,
                FlowCellLocation::Local(dir_str.to_string()),
                files,
            )
            .with_context(|| format!("Failed to create FlowCell from dir: {dir_str}"))?;
            self.insert_flowcells(vec![fc]);
        }
        Ok(())
    }
    /// Annotate flowcells with matching `IdInput` records from JSON.
    ///
    /// Assigns to `FlowCell.cases` all matching `IdInput`s by `flow_cell_id`.
    ///
    /// # Arguments
    /// * `inputs_path` - Path to JSON file containing a list of `IdInput` records.
    ///
    /// # Errors
    /// Returns an error if the file is unreadable or malformed.
    pub fn annotate_with_inputs(&mut self, inputs_path: &str) -> anyhow::Result<()> {
        if !Path::new(inputs_path).exists() {
            warn!("IdsInput file not found at {}", inputs_path);
            return Ok(());
        }
        let inputs = IdsInput::load_json(inputs_path)
            .with_context(|| format!("Failed to load IdsInput from: {inputs_path}"))?;
        self.cross_cases(&inputs)
            .with_context(|| format!("Failed to cross with IdsInput from: {inputs_path}"))?;
        Ok(())
    }
    /// Assign to each flowcell the `IdInput` records whose `flow_cell_id` matches its sample sheet.
    pub fn cross_cases(&mut self, inputs: &IdsInput) -> anyhow::Result<()> {
        for fc in &mut self.flow_cells {
            fc.cases = inputs
                .data
                .iter()
                .filter(|id| id.flow_cell_id == fc.sample_sheet.flow_cell_id)
                .cloned()
                .collect();
        }
        Ok(())
    }
    /// Insert new flowcells into the current collection, deduplicating by `flowcell_id`.
    ///
    /// For duplicates, retains the flowcell with the newer `.modified` timestamp.
    ///
    /// # Arguments
    /// * `new_cells` - A vector of parsed `FlowCell` instances.
    pub fn insert_flowcells(&mut self, new_cells: Vec<FlowCell>) {
        let mut map: HashMap<String, FlowCell> = self
            .flow_cells
            .drain(..)
            .map(|fc| (fc.sample_sheet.flow_cell_id.clone(), fc))
            .collect();
        for fc in new_cells {
            map.entry(fc.sample_sheet.flow_cell_id.clone())
                .and_modify(|existing| {
                    if fc.modified > existing.modified {
                        *existing = fc.clone();
                    }
                })
                .or_insert(fc);
        }
        self.flow_cells = map.into_values().collect();
    }
    /// Updates a JSON archive of [`FlowCell`] objects by scanning `.tar` archives in a directory.
    ///
    /// This function is used to **discover new archived flowcells** by scanning all `.tar` files
    /// in a given directory, parsing their contents using [`scan_archive`] and [`FlowCell::new`],
    /// and then appending the results to an existing JSON file (if present).
    /// Flowcells are **deduplicated** by `flowcell_id`, and the updated result is saved back to disk.
    ///
    /// # Arguments
    /// - `archive_path`: Path to a directory containing `.tar` archives produced by MinKNOW.
    /// - `save_path`: Path to a JSON file where the deduplicated list of [`FlowCell`] objects will be saved.
    ///
    /// # Behavior
    /// - If `save_path` exists, the function loads existing flowcells from it.
    /// - Then it scans all `.tar` files in `archive_path`, one by one:
    ///   - Extracts `sample_sheet` and `.pod5` file metadata using [`scan_archive`]
    ///   - Builds a new [`FlowCell`] using [`FlowCell::new`] with location `FlowCellLocation::Archived(...)`
    ///   - Logs and skips entries that fail to parse
    /// - All new flowcells are added to the existing list and deduplicated.
    /// - The updated list is sorted and written back to `save_path`.
    ///
    /// # Deduplication
    /// - Flowcells are sorted by `flow_cell_id` and deduplicated using
    ///   `.dedup_by_key(|fc| fc.sample_sheet.flow_cell_id.clone())`.
    /// - Because the sort is stable and `dedup_by_key` keeps the first of each run of equal keys,
    ///   existing entries take precedence over newly scanned duplicates.
    ///
    /// # Returns
    /// - `Ok(())` if scanning and update succeeds.
    /// - `Err` if the archive directory, `.tar` files, or save path cannot be processed.
    ///
    /// # Example
    /// ```ignore
    /// FlowCells::update_archive_from_scan("archives/", "flowcell_cache.json")?;
    /// ```
    ///
    /// # See Also
    /// - [`scan_archive`]
    /// - [`FlowCell`]
    /// - [`FlowCellLocation::Archived`]
    pub fn update_archive_from_scan(archive_path: &str, save_path: &str) -> anyhow::Result<()> {
        // Load existing archive, if any
        let mut all: Vec<FlowCell> = if Path::new(save_path).exists() {
            let file = File::open(save_path)?;
            serde_json::from_reader(BufReader::new(file))?
        } else {
            Vec::new()
        };
        let n_before = all.len();
        let pattern = format!("{archive_path}/*.tar");
        // Scan all .tar archives
        let res: Vec<FlowCell> = glob(&pattern)?
            .filter_map(Result::ok)
            .filter_map(|path| {
                let archive_str = path.to_string_lossy();
                let (sample_sheet, pore_activity, throughput, files) =
                    match scan_archive(&archive_str) {
                        Ok(r) => r,
                        Err(e) => {
                            warn!("Failed to scan archive {}: {e}", archive_str);
                            return None;
                        }
                    };
                match FlowCell::new(
                    sample_sheet,
                    pore_activity,
                    throughput,
                    FlowCellLocation::Archived(archive_path.to_string()),
                    files,
                ) {
                    Ok(fc) => Some(fc),
                    Err(e) => {
                        warn!("Failed to create FlowCell from {}: {e}", archive_str);
                        None
                    }
                }
            })
            .collect();
        // Merge, deduplicate, and write updated archive
        all.extend(res);
        all.sort_by(|a, b| {
            a.sample_sheet
                .flow_cell_id
                .cmp(&b.sample_sheet.flow_cell_id)
        });
        all.dedup_by_key(|v| v.sample_sheet.flow_cell_id.clone());
        let n_final = all.len();
        info!(
            "{} new archive(s) discovered.",
            n_final.saturating_sub(n_before)
        );
        let json = serde_json::to_string_pretty(&all)
            .map_err(|e| anyhow::anyhow!("Can't convert into json.\n{e}"))?;
        fs::write(save_path, json)
            .map_err(|e| anyhow::anyhow!("Can't write file: {save_path}.\n{e}"))?;
        Ok(())
    }
}
/// Represents a fully described flowcell unit, including experimental metadata,
/// physical location (local or archived), sample sheet data, and associated pod5 files.
///
/// A `FlowCell` object serves as the central unit in the data model for sample aggregation
/// and downstream processing.
///
/// # Fields
/// - `sample_sheet`: The original MinKNOW sample sheet metadata (`MinKnowSampleSheet`).
/// - `experiment`: Experiment type inferred from `.pod5` files (see `FlowCellExperiment`).
/// - `location`: Whether the flowcell was loaded from a local directory or archive store.
/// - `modified`: Last modification timestamp among `.pod5` files.
/// - `pod5_size`: Total size (in bytes) of `.pod5` files.
/// - `n_pod5`: Number of `.pod5` files found.
/// - `cases`: List of sample/case-level annotations associated with this flowcell (from `IdsInput`).
/// - `pore_activity`: Optional pore state entries parsed from the run's `pore_activity` report.
/// - `throughput`: Optional throughput entries parsed from the run's `throughput` report.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FlowCell {
    pub sample_sheet: MinKnowSampleSheet,
    pub experiment: FlowCellExperiment,
    pub location: FlowCellLocation,
    pub modified: DateTime<Utc>,
    pub pod5_size: usize,
    pub n_pod5: usize,
    pub cases: Vec<IdInput>,
    pub pore_activity: Option<Vec<PoreStateEntry>>,
    pub throughput: Option<Vec<ThroughputEntry>>,
}
/// Describes the physical origin of a flowcell when loaded.
///
/// This is used to differentiate flowcells discovered during a local scan
/// versus those restored from an archived store.
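///
/// # Example
/// A small sketch of the `Display` output (the paths are illustrative):
/// ```ignore
/// assert_eq!(FlowCellLocation::Local("/data/run001".to_string()).to_string(), "local");
/// assert_eq!(FlowCellLocation::Archived("run001.tar".to_string()).to_string(), "archived");
/// ```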
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellLocation {
    /// Flowcell discovered in a local filesystem path.
    Local(String),
    /// Flowcell restored from a `.tar` archive or a serialized cache.
    Archived(String),
}

impl fmt::Display for FlowCellLocation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                FlowCellLocation::Local(_) => "local",
                FlowCellLocation::Archived(_) => "archived",
            }
        )
    }
}
impl FlowCell {
    /// Constructs a new `FlowCell` from a sample sheet, run reports, and an associated file list.
    ///
    /// This method aggregates information from a parsed `MinKnowSampleSheet` and the
    /// corresponding `.pod5` file metadata, and infers the experiment type from
    /// file paths using `FlowCellExperiment::from_pod5_paths`.
    ///
    /// # Arguments
    /// - `sample_sheet`: Parsed sample sheet metadata.
    /// - `pore_activity`: Optional pore state entries parsed from the run's `pore_activity` report.
    /// - `throughput`: Optional throughput entries parsed from the run's `throughput` report.
    /// - `location`: Origin of the flowcell (local or archived).
    /// - `files`: List of files associated with the flowcell, each with:
    ///   - `String`: file path
    ///   - `u64`: size in bytes
    ///   - `DateTime<Utc>`: modification time
    ///
    /// # Returns
    /// - `Ok(FlowCell)` if experiment type and file metadata are successfully resolved.
    /// - `Err` if the experiment type cannot be determined.
    ///
    /// # Errors
    /// - If `FlowCellExperiment::from_pod5_paths` fails (e.g., unknown layout).
    ///
    /// # Example
    /// ```ignore
    /// let fc = FlowCell::new(sample_sheet, pore_activity, throughput, FlowCellLocation::Local(dir), files)?;
    /// println!("Flowcell ID: {}", fc.sample_sheet.flow_cell_id);
    /// ```
    pub fn new(
        sample_sheet: MinKnowSampleSheet,
        pore_activity: Option<Vec<PoreStateEntry>>,
        throughput: Option<Vec<ThroughputEntry>>,
        location: FlowCellLocation,
        files: Vec<(String, u64, DateTime<Utc>)>,
    ) -> anyhow::Result<Self> {
        let flowcell_id = sample_sheet.flow_cell_id.clone();
        // Filter .pod5 files
        let pod5s: Vec<_> = files
            .iter()
            .filter(|(path, _, _)| path.ends_with(".pod5"))
            .cloned()
            .collect();
        let n_pod5 = pod5s.len();
        // Infer experiment type from pod5 paths
        let experiment = FlowCellExperiment::from_pod5_paths(
            &files.iter().map(|(p, _, _)| p.to_string()).collect(),
        )
        .ok_or_else(|| anyhow::anyhow!("Can't find experiment type for {flowcell_id}"))?;
        // Aggregate pod5 size and latest modification time
        let (pod5_size, modified): (usize, DateTime<Utc>) = files
            .into_iter()
            .filter(|(path, _, _)| path.ends_with(".pod5"))
            .fold(
                (0, DateTime::<Utc>::MIN_UTC),
                |(acc_size, acc_time), (_, size, time)| {
                    (
                        acc_size + size as usize,
                        if acc_time < time { time } else { acc_time },
                    )
                },
            );
        Ok(Self {
            experiment,
            location,
            modified,
            sample_sheet,
            pod5_size,
            n_pod5,
            cases: Vec::new(),
            pore_activity,
            throughput,
        })
    }
}
/// Describes the type of experiment layout based on `.pod5` file structure.
///
/// Used to distinguish between whole-genome sequencing (WGS) `.pod5` files
/// organized in a single (muxed) directory or demultiplexed (`pod5_pass`) structure.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellExperiment {
    /// `.pod5` files are stored in a single unbarcoded directory, typically `/pod5/`.
    WGSPod5Mux(String),
    /// `.pod5` files are organized by barcode in subdirectories, typically `/pod5_pass/`.
    WGSPod5Demux(String),
}
impl fmt::Display for FlowCellExperiment {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                FlowCellExperiment::WGSPod5Mux(_) => "WGS Pod5 Muxed",
                FlowCellExperiment::WGSPod5Demux(_) => "WGS Pod5 Demuxed",
            }
        )
    }
}
impl FlowCellExperiment {
    /// Attempts to infer the experiment type from the immediate subdirectories of the given path.
    ///
    /// This is useful when scanning a flowcell directory directly and checking
    /// whether it contains a `pod5/` or `pod5_pass/` structure.
    ///
    /// # Arguments
    /// - `flowcell_path`: Path to the root of a flowcell directory.
    ///
    /// # Returns
    /// - `Some(FlowCellExperiment)` if a known subdirectory is found.
    /// - `None` if no match is detected.
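    ///
    /// # Example
    /// A minimal sketch; the directory below is hypothetical and assumed to contain a `pod5_pass/` subfolder:
    /// ```ignore
    /// let exp = FlowCellExperiment::from_path("/data/run001/sample1");
    /// assert!(matches!(exp, Some(FlowCellExperiment::WGSPod5Demux(_))));
    /// ```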
    pub fn from_path(flowcell_path: &str) -> Option<Self> {
        for dir in list_directories(flowcell_path).ok().unwrap_or_default() {
            if dir == "pod5" {
                return Some(FlowCellExperiment::WGSPod5Mux(dir.to_string()));
            }
            if dir == "pod5_pass" {
                return Some(FlowCellExperiment::WGSPod5Demux(dir.to_string()));
            }
        }
        None
    }
    /// Attempts to infer the experiment type from a list of `.pod5` file paths.
    ///
    /// This is typically used when files have already been collected and their
    /// parent directories can be checked for naming conventions.
    ///
    /// # Arguments
    /// - `all_paths`: Vector of paths (as strings) to `.pod5` files or directories.
    ///
    /// # Returns
    /// - `Some(FlowCellExperiment)` if a known suffix is detected.
    /// - `None` if no matching pattern is found.
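    ///
    /// # Example
    /// A small sketch with illustrative paths:
    /// ```ignore
    /// let paths = vec!["/runs/exp1/sample1/pod5".to_string()];
    /// assert!(matches!(
    ///     FlowCellExperiment::from_pod5_paths(&paths),
    ///     Some(FlowCellExperiment::WGSPod5Mux(_))
    /// ));
    /// ```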
    pub fn from_pod5_paths(all_paths: &Vec<String>) -> Option<Self> {
        for path in all_paths {
            if path.ends_with("/pod5/") || path.ends_with("/pod5") {
                return Some(FlowCellExperiment::WGSPod5Mux(path.to_string()));
            }
            if path.ends_with("/pod5_pass/") || path.ends_with("/pod5_pass") {
                return Some(FlowCellExperiment::WGSPod5Demux(path.to_string()));
            }
        }
        None
    }

    /// Returns the underlying string (directory path) for the experiment.
    ///
    /// This is useful when you need access to the directory path used to classify the experiment.
    pub fn inner(&self) -> &str {
        match self {
            FlowCellExperiment::WGSPod5Mux(v) => v,
            FlowCellExperiment::WGSPod5Demux(v) => v,
        }
    }
}
/// Represents the result of scanning a MinKNOW experiment source.
///
/// This tuple includes:
/// - `MinKnowSampleSheet`: Parsed metadata describing the experiment/sample.
/// - `Option<Vec<PoreStateEntry>>`: Pore activity entries, if a `pore_activity` report was found.
/// - `Option<Vec<ThroughputEntry>>`: Throughput entries, if a `throughput` report was found.
/// - `Vec<(String, u64, DateTime<Utc>)>`: A list of files with:
///   - `String`: file path
///   - `u64`: file size in bytes
///   - `DateTime<Utc>`: last modification time (UTC)
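///
/// # Example
/// A destructuring sketch (the path is illustrative):
/// ```ignore
/// let (sheet, pore_activity, throughput, files): ExperimentData = scan_local("/data/run001")?;
/// ```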
type ExperimentData = (
    MinKnowSampleSheet,
    Option<Vec<PoreStateEntry>>,
    Option<Vec<ThroughputEntry>>,
    Vec<(String, u64, DateTime<Utc>)>,
);
/// Scans a local directory for MinKNOW experiment files and metadata.
///
/// This function recursively walks a directory using globbing,
/// collects file paths, sizes, and modification timestamps,
/// and identifies a `sample_sheet` file to parse as `MinKnowSampleSheet`.
/// `pore_activity` and `throughput` reports are parsed when present.
///
/// # Arguments
/// - `dir`: Root directory to scan (absolute or relative).
///
/// # Returns
/// - `Ok(ExperimentData)` containing the sample sheet, optional run reports, and a list of file records.
/// - `Err` if the directory can't be accessed, no sample sheet is found, or parsing fails.
///
/// # Requirements
/// - A file path containing `"sample_sheet"` must be present and readable.
/// - The sample sheet must be formatted according to MinKNOW expectations
///   (1 header + 1 data row).
///
/// # Errors
/// - If reading files or metadata fails.
/// - If the sample sheet is missing or invalid.
///
/// # Example
/// ```ignore
/// let (sheet, pore_activity, throughput, files) = scan_local("/data/run001")?;
/// println!("Kit used: {}", sheet.kit);
/// println!("Number of files found: {}", files.len());
/// ```
pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
    let mut result = Vec::new();
    let mut sample_sheet: Option<String> = None;
    let mut pore_activity = None;
    let mut throughput = None;
    for entry in glob(&format!("{}/**/*", dir))? {
        let file = entry.context("Failed to read a glob entry while scanning the directory")?;
        // Extract file properties safely
        let metadata = file.metadata().context(format!(
            "Failed to access file metadata: {}",
            file.display()
        ))?;
        let size = metadata.size();
        let modified = metadata.mtime();
        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
        let path = file.to_string_lossy().into_owned();
        if path.contains("pore_activity") {
            let mut reader =
                File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
            pore_activity = Some(parse_pore_activity_from_reader(&mut reader).with_context(
                || {
                    format!(
                        "Failed to parse pore activity data from: {}",
                        file.display()
                    )
                },
            )?);
        } else if path.contains("sample_sheet") {
            sample_sheet = Some(path.clone());
        } else if path.contains("throughput") {
            let mut reader =
                File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
            throughput = Some(parse_throughput_from_reader(&mut reader).with_context(|| {
                format!("Failed to parse throughput data from: {}", file.display())
            })?);
        }
        result.push((path, size, modified_utc));
    }
    let sample_sheet =
        sample_sheet.ok_or_else(|| anyhow::anyhow!("No sample sheet detected in: {dir}"))?;
    let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
        .context("Can't parse sample sheet data")?;
    Ok((sample_sheet, pore_activity, throughput, result))
}
/// Scans a `.tar` archive containing a MinKNOW sequencing experiment.
///
/// This function opens a TAR archive, searches for the `sample_sheet` CSV file,
/// extracts its metadata, and parses it into a `MinKnowSampleSheet`.
/// `pore_activity` and `throughput` reports are parsed when present.
/// All other entries in the archive are collected with their path, size, and modification time.
///
/// # Arguments
/// - `tar_path`: Path to the `.tar` archive file.
///
/// # Returns
/// - `Ok(ExperimentData)`: A tuple containing the parsed sample sheet, optional run reports, and the list of file metadata.
/// - `Err`: If the archive is unreadable, malformed, or missing the `sample_sheet`.
///
/// # Archive Requirements
/// - Must contain exactly one file matching `"sample_sheet"` in its path.
/// - The sample sheet must contain a valid CSV header and a single data row.
///
/// # Errors
/// - Fails if the archive can't be opened or read.
/// - Fails if any entry is malformed (e.g., missing timestamp).
/// - Fails if no sample sheet is found or if it is malformed.
///
/// # Example
/// ```ignore
/// let (sample_sheet, pore_activity, throughput, files) = scan_archive("archive.tar")?;
/// println!("Sample ID: {}", sample_sheet.sample_id);
/// println!("Total files in archive: {}", files.len());
/// ```
pub fn scan_archive(tar_path: &str) -> anyhow::Result<ExperimentData> {
    info!("Scanning archive: {tar_path}");
    let file = File::open(tar_path)
        .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
    let mut archive = tar::Archive::new(file);
    let mut result = Vec::new();
    let mut sample_sheet: Option<String> = None;
    let mut throughput = None;
    let mut pore_activity = None;
    // Iterate through the entries in the archive
    for entry in archive.entries_with_seek()? {
        let mut file = entry.context("Failed to read an entry from the tar archive")?;
        // Extract file properties safely
        let size = file.size();
        let modified = file
            .header()
            .mtime()
            .context("Failed to get modification time")?;
        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
        let path = file
            .path()
            .context("Failed to get file path from tar entry")?
            .to_string_lossy()
            .into_owned();
        if path.contains("pore_activity") {
            pore_activity = Some(
                parse_pore_activity_from_reader(&mut file)
                    .with_context(|| format!("Failed to read pore_activity data: {path}"))?,
            );
        } else if path.contains("sample_sheet") {
            let mut buffer = String::new();
            file.read_to_string(&mut buffer)
                .context("Failed to read file contents as string")?;
            sample_sheet = Some(buffer);
        } else if path.contains("throughput") {
            throughput = Some(
                parse_throughput_from_reader(&mut file)
                    .with_context(|| format!("Failed to read throughput data: {path}"))?,
            );
        }
        result.push((path, size, modified_utc));
    }
    let sample_sheet = sample_sheet.ok_or_else(|| {
        anyhow::anyhow!("No sample sheet detected in archive: {tar_path}")
    })?;
    let (_, data) = sample_sheet
        .split_once("\n")
        .ok_or_else(|| anyhow::anyhow!("Can't parse sample sheet data"))?;
    let sample_sheet: MinKnowSampleSheet = data
        .try_into()
        .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;
    Ok((sample_sheet, pore_activity, throughput, result))
}
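
// Minimal sanity checks for the pure helpers above (a sketch; the values are
// illustrative and no filesystem or archive access is required).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn experiment_type_is_inferred_from_pod5_directory_suffixes() {
        let muxed = vec!["/runs/exp1/sample1/pod5".to_string()];
        assert!(matches!(
            FlowCellExperiment::from_pod5_paths(&muxed),
            Some(FlowCellExperiment::WGSPod5Mux(_))
        ));
        let demuxed = vec!["/runs/exp1/sample1/pod5_pass/".to_string()];
        assert!(matches!(
            FlowCellExperiment::from_pod5_paths(&demuxed),
            Some(FlowCellExperiment::WGSPod5Demux(_))
        ));
    }

    #[test]
    fn flowcell_location_display_labels() {
        assert_eq!(
            FlowCellLocation::Local("/data/run001".to_string()).to_string(),
            "local"
        );
        assert_eq!(
            FlowCellLocation::Archived("run001.tar".to_string()).to_string(),
            "archived"
        );
    }

    #[test]
    fn ids_input_add_input_deduplicates() {
        let entry = IdInput {
            case_id: "CASE-001".to_string(),
            sample_type: "tumor".to_string(),
            barcode: "barcode01".to_string(),
            flow_cell_id: "PAO12345".to_string(),
        };
        let mut ids = IdsInput { data: Vec::new() };
        ids.add_input(entry.clone());
        ids.add_input(entry);
        assert_eq!(ids.data.len(), 1);
    }
}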