- use std::{
- collections::{HashMap, HashSet},
- fmt,
- fs::{self, File, OpenOptions},
- io::{BufReader, Read},
- os::unix::fs::MetadataExt,
- path::Path,
- };
- use anyhow::Context;
- use chrono::{DateTime, TimeZone, Utc};
- use glob::glob;
- use log::{info, warn};
- use serde::{Deserialize, Serialize};
- use crate::{
- collection::minknow::{parse_pore_activity_from_reader, parse_throughput_from_reader},
- helpers::{find_files, list_directories},
- };
- use super::minknow::{MinKnowSampleSheet, PoreStateEntry, ThroughputEntry};
- /// A collection of `IdInput` records, with utility methods
- /// for loading, saving, deduplication, and construction from TSV.
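- ///
- /// # Example
- /// An illustrative sketch (the path and field values are placeholders):
- /// ```
- /// let mut ids = IdsInput { data: Vec::new() };
- /// ids.add_input(IdInput {
- /// case_id: "CASE-001".to_string(),
- /// sample_type: "tumor".to_string(),
- /// barcode: "barcode01".to_string(),
- /// flow_cell_id: "PAO12345".to_string(),
- /// });
- /// ids.save_json("ids_input.json")?;
- /// let reloaded = IdsInput::load_json("ids_input.json")?;
- /// ```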
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct IdsInput {
- /// The list of ID entries.
- pub data: Vec<IdInput>,
- }
- /// A unique sample identifier from sequencing metadata.
- ///
- /// Uniqueness is defined by the combination of all fields.
- #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
- pub struct IdInput {
- /// Case or patient ID.
- pub case_id: String,
- /// Time point or sample type.
- pub sample_type: String,
- /// Barcode number (sequencing index).
- pub barcode: String,
- /// Flow cell identifier.
- pub flow_cell_id: String,
- }
- impl IdsInput {
- /// Load `IdsInput` from a JSON file.
- pub fn load_json<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
- let file = File::open(&path)?;
- Ok(serde_json::from_reader(file)?)
- }
- /// Save `IdsInput` to a JSON file (pretty-printed).
- pub fn save_json<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
- let file = File::create(&path)?;
- serde_json::to_writer_pretty(file, self)?;
- Ok(())
- }
- /// Remove duplicate `IdInput` entries, retaining the first occurrence.
- pub fn dedup(&mut self) {
- let mut seen = HashSet::new();
- self.data.retain(|item| seen.insert(item.clone()));
- }
- /// Add a new `IdInput` and deduplicate.
- pub fn add_input(&mut self, entry: IdInput) {
- self.data.push(entry);
- self.dedup();
- }
- }
- /// Container for a deduplicated and enriched collection of flowcells (`FlowCell`).
- ///
- /// `FlowCells` represents the aggregated result of scanning multiple sources:
- /// - A cached archive of flowcells (`archive_store_path`)
- /// - A live scan of the local run directory (`local_run_dir`)
- ///
- /// Each [`FlowCell`] contains all necessary metadata for downstream processing,
- /// including parsed MinKNOW sample sheet data, `.pod5` file statistics, experiment layout,
- /// and optional sample/case annotations from an [`IdsInput`] file.
- ///
- /// The [`FlowCells::load`] method performs the following:
- /// - Loads existing flowcells from the archive if available
- /// - Scans local directories for new or updated flowcells
- /// - Deduplicates flowcells using the `flowcell_id`
- /// - Retains the most recently modified version of each flowcell
- /// - Enriches each flowcell with case-level annotations
- ///
- /// # Fields
- /// - `flow_cells`: A deduplicated list of fully parsed [`FlowCell`] instances.
- ///
- /// # Example
- /// ```
- /// let flow_cells = FlowCells::load(
- /// "/mnt/data/runs",
- /// "inputs.json",
- /// "flowcell_cache.json"
- /// )?;
- /// println!("Loaded {} unique flowcells", flow_cells.flow_cells.len());
- /// ```
- ///
- /// # Deduplication
- /// Flowcells are uniquely identified by the `flow_cell_id` field of their parsed
- /// sample sheet. If both archived and local versions exist, the one with the
- /// latest `.pod5` modification time is retained.
- ///
- /// # Related Types
- /// - [`FlowCell`]: Describes a flowcell, its metadata, and files
- /// - [`FlowCellExperiment`]: Muxed vs. demuxed layout classification
- /// - [`FlowCellLocation`]: Indicates source (local or archive)
- /// - [`MinKnowSampleSheet`]: Parsed sample sheet data
- /// - [`IdInput`]: Case-level annotation applied to flowcells
- #[derive(Debug, Serialize, Deserialize, Clone, Default)]
- pub struct FlowCells {
- /// A collection of parsed flowcell metadata records.
- pub flow_cells: Vec<FlowCell>,
- }
- impl FlowCells {
- /// Load and merge flowcells from the archive and local directories, annotate with
- /// `IdsInput`, and persist the merged collection back to the archive.
- ///
- /// # Arguments
- /// * `local_run_dir` - Root directory containing local flowcell run folders.
- /// * `inputs_path` - Path to a JSON file with [`IdInput`] annotations.
- /// * `archive_store_path` - Path to a cached JSON file of archived flowcells.
- ///
- /// # Returns
- /// A deduplicated and annotated [`FlowCells`] collection.
- ///
- /// # Errors
- /// Returns an error if any input file cannot be read or parsed, or if local scanning fails.
- pub fn load(
- local_run_dir: &str,
- inputs_path: &str,
- archive_store_path: &str,
- ) -> anyhow::Result<Self> {
- let mut instance = Self::default();
- instance.load_from_archive(archive_store_path)?;
- instance.load_from_local(local_run_dir)?;
- instance.annotate_with_inputs(inputs_path)?;
- instance.save_to_archive(archive_store_path)?;
- Ok(instance)
- }
- /// Load flowcells from a cached archive and merge into `self.flow_cells`.
- ///
- /// Retains only the most recently modified entry for each `flowcell_id`.
- ///
- /// # Arguments
- /// * `archive_path` - Path to archive JSON file.
- ///
- /// # Errors
- /// Returns an error if the file cannot be read or parsed.
- pub fn load_from_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
- if !Path::new(archive_path).exists() {
- return Ok(());
- }
- let file = File::open(archive_path)
- .with_context(|| format!("Failed to open archive file: {archive_path}"))?;
- let archived: Vec<FlowCell> = serde_json::from_reader(BufReader::new(file))
- .with_context(|| format!("Failed to parse FlowCells from: {archive_path}"))?;
- self.insert_flowcells(archived);
- Ok(())
- }
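- /// Serialize the current `flow_cells` collection to `archive_path` as pretty-printed JSON.
- ///
- /// The file is created if it does not exist and truncated otherwise.
- ///
- /// # Arguments
- /// * `archive_path` - Path to the archive JSON file.
- ///
- /// # Errors
- /// Returns an error if the file cannot be opened for writing or if serialization fails.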
- pub fn save_to_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
- let file = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(archive_path)
- .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;
- serde_json::to_writer_pretty(file, &self.flow_cells)
- .with_context(|| format!("Failed to write FlowCells to: {archive_path}"))?;
- Ok(())
- }
- /// Scan local run directories for flowcells and merge into `self.flow_cells`.
- ///
- /// Uses `scan_local()` to parse local metadata and selects the most recently modified
- /// version if duplicates exist.
- ///
- /// # Arguments
- /// * `local_dir` - Root directory containing flowcell runs.
- ///
- /// # Errors
- /// Returns an error if scanning fails or if directory layout is invalid.
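- ///
- /// # Example
- /// An illustrative sketch (the directory is a placeholder):
- /// ```
- /// let mut flow_cells = FlowCells::default();
- /// flow_cells.load_from_local("/mnt/data/runs")?;
- /// println!("Found {} flowcells locally", flow_cells.flow_cells.len());
- /// ```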
- pub fn load_from_local(&mut self, local_dir: &str) -> anyhow::Result<()> {
- let sample_sheets = find_files(&format!("{local_dir}/**/sample_sheet*"))?;
- for path in sample_sheets {
- let dir = path
- .parent()
- .ok_or_else(|| anyhow::anyhow!("Invalid sample_sheet path: {}", path.display()))?;
- let dir_str = dir.to_string_lossy();
- let (sheet, pore, throughput, files) = scan_local(&dir_str)
- .with_context(|| format!("Failed to scan local dir: {dir_str}"))?;
- let fc = FlowCell::new(
- sheet,
- pore,
- throughput,
- FlowCellLocation::Local(dir_str.to_string()),
- files,
- )
- .with_context(|| format!("Failed to create FlowCell from dir: {dir_str}"))?;
- self.insert_flowcells(vec![fc]);
- }
- Ok(())
- }
- /// Annotate flowcells with matching `IdInput` records from JSON.
- ///
- /// Assigns to `FlowCell.cases` all matching `IdInput`s by `flow_cell_id`.
- ///
- /// # Arguments
- /// * `inputs_path` - Path to JSON file containing a list of `IdInput` records.
- ///
- /// # Errors
- /// Returns an error if the file is unreadable or malformed.
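- ///
- /// # Example
- /// An illustrative sketch; if the file does not exist, a warning is logged and the
- /// collection is left unchanged:
- /// ```
- /// let mut flow_cells = FlowCells::default();
- /// flow_cells.load_from_archive("flowcell_cache.json")?;
- /// flow_cells.annotate_with_inputs("inputs.json")?;
- /// ```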
- pub fn annotate_with_inputs(&mut self, inputs_path: &str) -> anyhow::Result<()> {
- if !Path::new(inputs_path).exists() {
- warn!("IdsInput file not found at {}", inputs_path);
- return Ok(());
- }
- let inputs = IdsInput::load_json(inputs_path)
- .with_context(|| format!("Failed to load IdsInput from: {inputs_path}"))?;
- self.cross_cases(&inputs)
- .with_context(|| format!("Failed to cross with IdsInput from: {inputs_path}"))?;
- Ok(())
- }
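- /// Attach matching [`IdInput`] records to each flowcell.
- ///
- /// For every flowcell, `cases` is replaced by all entries in `inputs` whose
- /// `flow_cell_id` equals the flowcell's `sample_sheet.flow_cell_id`.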
- pub fn cross_cases(&mut self, inputs: &IdsInput) -> anyhow::Result<()> {
- for fc in &mut self.flow_cells {
- fc.cases = inputs
- .data
- .iter()
- .filter(|id| id.flow_cell_id == fc.sample_sheet.flow_cell_id)
- .cloned()
- .collect();
- }
- Ok(())
- }
- /// Insert new flowcells into the current collection, deduplicating by `flowcell_id`.
- ///
- /// For duplicates, retains the flowcell with the newer `.modified` timestamp.
- ///
- /// # Arguments
- /// * `new_cells` - A vector of parsed `FlowCell` instances.
- pub fn insert_flowcells(&mut self, new_cells: Vec<FlowCell>) {
- let mut map: HashMap<String, FlowCell> = self
- .flow_cells
- .drain(..)
- .map(|fc| (fc.sample_sheet.flow_cell_id.clone(), fc))
- .collect();
- for fc in new_cells {
- map.entry(fc.sample_sheet.flow_cell_id.clone())
- .and_modify(|existing| {
- if fc.modified > existing.modified {
- *existing = fc.clone();
- }
- })
- .or_insert(fc);
- }
- self.flow_cells = map.into_values().collect();
- }
- /// Updates a JSON archive of `FlowCell` objects by scanning `.tar` archives in a directory.
- ///
- /// This function is used to **discover new archived flowcells** by scanning all `.tar` files
- /// in a given directory, parsing their contents using [`scan_archive`] and [`FlowCell::new`],
- /// and then appending the results to an existing JSON file (if present).
- /// Flowcells are **deduplicated** by `flowcell_id`, and the updated result is saved back to disk.
- ///
- /// # Arguments
- /// - `archive_path`: Path to a directory containing `.tar` archives produced by MinKNOW.
- /// - `save_path`: Path to a JSON file where the deduplicated list of `FlowCell` objects will be saved.
- ///
- /// # Behavior
- /// - If `save_path` exists, the function loads existing flowcells from it.
- /// - Then it scans all `.tar` files in `archive_path`, one by one:
- /// - Extracts `sample_sheet` and `.pod5` file metadata using [`scan_archive`]
- /// - Builds a new [`FlowCell`] using [`FlowCell::new`] with location `FlowCellLocation::Archived(...)`
- /// - Logs and skips entries that fail to parse
- /// - All new flowcells are added to the existing list and deduplicated.
- /// - The updated list is sorted and written back to `save_path`.
- ///
- /// # Deduplication
- /// - Flowcells are deduplicated with `.dedup_by_key(|fc| fc.sample_sheet.flow_cell_id.clone())`
- /// after sorting by `flow_cell_id`.
- /// - Because the sort is stable and existing entries precede newly scanned ones, the
- /// previously archived entry is kept when a duplicate `flow_cell_id` is found.
- ///
- /// # Returns
- /// - `Ok(())` if scanning and update succeeds.
- /// - `Err` if the archive directory, `.tar` files, or save path cannot be processed.
- ///
- /// # Example
- /// ```
- /// FlowCells::update_archive_from_scan("archives/", "flowcell_cache.json")?;
- /// ```
- ///
- /// # See Also
- /// - [`scan_archive`]
- /// - [`FlowCell`]
- /// - [`FlowCellLocation::Archived`]
- pub fn update_archive_from_scan(archive_path: &str, save_path: &str) -> anyhow::Result<()> {
- // Load existing archive, if any
- let mut all: Vec<FlowCell> = if Path::new(save_path).exists() {
- let file = File::open(save_path)?;
- serde_json::from_reader(BufReader::new(file))?
- } else {
- Vec::new()
- };
- let n_before = all.len();
- let pattern = format!("{archive_path}/*.tar");
- // Scan all .tar archives
- let res: Vec<FlowCell> = glob(&pattern)?
- .filter_map(Result::ok)
- .filter_map(|path| {
- let archive_str = path.to_string_lossy();
- let (sample_sheet, pore_activity, throughput, files) =
- match scan_archive(&archive_str) {
- Ok(r) => r,
- Err(e) => {
- warn!("Failed to scan archive {}: {e}", archive_str);
- return None;
- }
- };
- match FlowCell::new(
- sample_sheet,
- pore_activity,
- throughput,
- FlowCellLocation::Archived(archive_path.to_string()),
- files,
- ) {
- Ok(fc) => Some(fc),
- Err(e) => {
- warn!("Failed to create FlowCell from {}: {e}", archive_str);
- None
- }
- }
- })
- .collect();
- // Merge, deduplicate, and write updated archive
- all.extend(res);
- all.sort_by(|a, b| {
- a.sample_sheet
- .flow_cell_id
- .cmp(&b.sample_sheet.flow_cell_id)
- });
- all.dedup_by_key(|v| v.sample_sheet.flow_cell_id.clone());
- let n_final = all.len();
- info!(
- "{} new archive(s) discovered.",
- n_final.saturating_sub(n_before)
- );
- let json = serde_json::to_string_pretty(&all)
- .map_err(|e| anyhow::anyhow!("Can't convert into json.\n{e}"))?;
- fs::write(save_path, json)
- .map_err(|e| anyhow::anyhow!("Can't write file: {save_path}.\n{e}"))?;
- Ok(())
- }
- }
- /// Represents a fully described flowcell unit, including experimental metadata,
- /// physical location (local or archived), sample sheet data, and associated pod5 files.
- ///
- /// A `FlowCell` object serves as the central unit in the data model for sample aggregation
- /// and downstream processing.
- ///
- /// # Fields
- /// - `sample_sheet`: The original MinKNOW sample sheet metadata (`MinKnowSampleSheet`).
- /// - `experiment`: Experiment type inferred from `.pod5` files (see `FlowCellExperiment`).
- /// - `location`: Whether the flowcell was loaded from a local directory or archive store.
- /// - `modified`: Last modification timestamp among `.pod5` files.
- /// - `pod5_size`: Total size (in bytes) of `.pod5` files.
- /// - `n_pod5`: Number of `.pod5` files found.
- /// - `cases`: List of sample/case-level annotations associated with this flowcell (from `IdsInput`).
- /// - `pore_activity`: Optional pore-state entries parsed from a `pore_activity` file, if found.
- /// - `throughput`: Optional throughput entries parsed from a `throughput` file, if found.
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct FlowCell {
- pub sample_sheet: MinKnowSampleSheet,
- pub experiment: FlowCellExperiment,
- pub location: FlowCellLocation,
- pub modified: DateTime<Utc>,
- pub pod5_size: usize,
- pub n_pod5: usize,
- pub cases: Vec<IdInput>,
- pub pore_activity: Option<Vec<PoreStateEntry>>,
- pub throughput: Option<Vec<ThroughputEntry>>,
- }
- /// Describes the physical origin of a flowcell when loaded.
- ///
- /// This is used to differentiate flowcells discovered during a local scan
- /// versus those restored from an archived store.
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub enum FlowCellLocation {
- /// Flowcell discovered in a local filesystem path.
- Local(String),
- /// Flowcell restored from a `.tar` archive or a serialized cache.
- Archived(String),
- }
- impl fmt::Display for FlowCellLocation {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "{}",
- match self {
- FlowCellLocation::Local(_) => "local",
- FlowCellLocation::Archived(_) => "archived",
- }
- )
- }
- }
- impl FlowCell {
- /// Constructs a new `FlowCell` from a sample sheet and associated file list.
- ///
- /// This method aggregates information from a parsed `MinKnowSampleSheet` and the
- /// corresponding `.pod5` file metadata, and infers the experiment type from
- /// file paths using `FlowCellExperiment::from_pod5_paths`.
- ///
- /// # Arguments
- /// - `sample_sheet`: Parsed sample sheet metadata.
- /// - `pore_activity`: Optional pore-state entries parsed from a `pore_activity` file.
- /// - `throughput`: Optional throughput entries parsed from a `throughput` file.
- /// - `location`: Origin of the flowcell (local or archived).
- /// - `files`: List of files associated with the flowcell, each with:
- /// - `String`: file path
- /// - `u64`: size in bytes
- /// - `DateTime<Utc>`: modification time
- ///
- /// # Returns
- /// - `Ok(FlowCell)` if experiment type and file metadata are successfully resolved.
- /// - `Err` if the experiment type cannot be determined.
- ///
- /// # Errors
- /// - If `FlowCellExperiment::from_pod5_paths` fails (e.g., unknown layout).
- ///
- /// # Example
- /// ```
- /// let fc = FlowCell::new(sample_sheet, pore_activity, throughput, FlowCellLocation::Local(dir), files)?;
- /// println!("Flowcell ID: {}", fc.sample_sheet.flow_cell_id);
- /// ```
- pub fn new(
- sample_sheet: MinKnowSampleSheet,
- pore_activity: Option<Vec<PoreStateEntry>>,
- throughput: Option<Vec<ThroughputEntry>>,
- location: FlowCellLocation,
- files: Vec<(String, u64, DateTime<Utc>)>,
- ) -> anyhow::Result<Self> {
- let flowcell_id = sample_sheet.flow_cell_id.clone();
- // Filter .pod5 files
- let pod5s: Vec<_> = files
- .iter()
- .filter(|(path, _, _)| path.ends_with(".pod5"))
- .cloned()
- .collect();
- let n_pod5 = pod5s.len();
- // Infer experiment type from pod5 paths
- let experiment = FlowCellExperiment::from_pod5_paths(
- &files.iter().map(|(p, _, _)| p.to_string()).collect(),
- )
- .ok_or_else(|| anyhow::anyhow!("Can't find experiment type for {flowcell_id}"))?;
- // Aggregate pod5 size and latest modification time
- let (pod5_size, modified): (usize, DateTime<Utc>) = pod5s
- .into_iter()
- .fold(
- (0, DateTime::<Utc>::MIN_UTC),
- |(acc_size, acc_time), (_, size, time)| {
- (
- acc_size + size as usize,
- if acc_time < time { time } else { acc_time },
- )
- },
- );
- Ok(Self {
- experiment,
- location,
- modified,
- sample_sheet,
- pod5_size,
- n_pod5,
- cases: Vec::new(),
- pore_activity,
- throughput,
- })
- }
- }
- /// Describes the type of experiment layout based on `.pod5` file structure.
- ///
- /// Used to distinguish between whole-genome sequencing (WGS) `.pod5` files
- /// organized in a single (muxed) directory or demultiplexed (`pod5_pass`) structure.
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub enum FlowCellExperiment {
- /// `.pod5` files are stored in a single unbarcoded directory, typically `/pod5/`.
- WGSPod5Mux(String),
- /// `.pod5` files are organized by barcode in subdirectories, typically `/pod5_pass/`.
- WGSPod5Demux(String),
- }
- impl fmt::Display for FlowCellExperiment {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "{}",
- match self {
- FlowCellExperiment::WGSPod5Mux(_) => "WGS Pod5 Muxed",
- FlowCellExperiment::WGSPod5Demux(_) => "WGS Pod5 Demuxed",
- }
- )
- }
- }
- impl FlowCellExperiment {
- /// Attempts to infer the experiment type from the immediate subdirectories of the given path.
- ///
- /// This is useful when scanning a flowcell directory directly and checking
- /// whether it contains a `pod5/` or `pod5_pass/` structure.
- ///
- /// # Arguments
- /// - `flowcell_path`: Path to the root of a flowcell directory.
- ///
- /// # Returns
- /// - `Some(FlowCellExperiment)` if a known subdirectory is found.
- /// - `None` if no match is detected.
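- ///
- /// # Example
- /// An illustrative sketch (the directory is a placeholder and is assumed to contain
- /// either a `pod5/` or a `pod5_pass/` subdirectory):
- /// ```
- /// match FlowCellExperiment::from_path("/mnt/data/runs/run001") {
- /// Some(experiment) => println!("Detected layout: {experiment}"),
- /// None => println!("No known pod5 layout found"),
- /// }
- /// ```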
- pub fn from_path(flowcell_path: &str) -> Option<Self> {
- for dir in list_directories(flowcell_path).unwrap_or_default() {
- if dir == "pod5" {
- return Some(FlowCellExperiment::WGSPod5Mux(dir.to_string()));
- }
- if dir == "pod5_pass" {
- return Some(FlowCellExperiment::WGSPod5Demux(dir.to_string()));
- }
- }
- None
- }
- /// Attempts to infer the experiment type from a list of `.pod5` file paths.
- ///
- /// This is typically used when files have already been collected and their
- /// parent directories can be checked for naming conventions.
- ///
- /// # Arguments
- /// - `all_paths`: Vector of paths (as strings) to `.pod5` files or directories.
- ///
- /// # Returns
- /// - `Some(FlowCellExperiment)` if a known suffix is detected.
- /// - `None` if no matching pattern is found.
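- ///
- /// # Example
- /// An illustrative sketch (the paths are placeholders):
- /// ```
- /// let paths = vec![
- /// "/mnt/data/runs/run001/other_reports".to_string(),
- /// "/mnt/data/runs/run001/pod5".to_string(),
- /// ];
- /// assert!(matches!(
- /// FlowCellExperiment::from_pod5_paths(&paths),
- /// Some(FlowCellExperiment::WGSPod5Mux(_))
- /// ));
- /// ```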
- pub fn from_pod5_paths(all_paths: &Vec<String>) -> Option<Self> {
- for path in all_paths {
- if path.ends_with("/pod5/") || path.ends_with("/pod5") {
- return Some(FlowCellExperiment::WGSPod5Mux(path.to_string()));
- }
- if path.ends_with("/pod5_pass/") || path.ends_with("/pod5_pass") {
- return Some(FlowCellExperiment::WGSPod5Demux(path.to_string()));
- }
- }
- None
- }
- /// Returns the underlying string (directory path) for the experiment.
- ///
- /// This is useful when you need access to the directory path used to classify the experiment.
- pub fn inner(&self) -> &str {
- match self {
- FlowCellExperiment::WGSPod5Mux(v) => v,
- FlowCellExperiment::WGSPod5Demux(v) => v,
- }
- }
- }
- /// Represents the result of scanning a MinKNOW experiment source.
- ///
- /// This tuple includes:
- /// - `MinKnowSampleSheet`: Parsed metadata describing the experiment/sample.
- /// - `Option<Vec<PoreStateEntry>>`: Pore-state entries parsed from a `pore_activity` file, if found.
- /// - `Option<Vec<ThroughputEntry>>`: Throughput entries parsed from a `throughput` file, if found.
- /// - `Vec<(String, u64, DateTime<Utc>)>`: A list of files with:
- /// - `String`: file path
- /// - `u64`: file size in bytes
- /// - `DateTime<Utc>`: last modification time (UTC)
- type ExperimentData = (
- MinKnowSampleSheet,
- Option<Vec<PoreStateEntry>>,
- Option<Vec<ThroughputEntry>>,
- Vec<(String, u64, DateTime<Utc>)>,
- );
- /// Scans a local directory for MinKNOW experiment files and metadata.
- ///
- /// This function recursively walks a directory using globbing,
- /// collects file paths, sizes, and modification timestamps,
- /// and identifies a `sample_sheet` file to parse as `MinKnowSampleSheet`.
- ///
- /// # Arguments
- /// - `dir`: Root directory to scan (absolute or relative).
- ///
- /// # Returns
- /// - `Ok(ExperimentData)` containing the sample sheet and a list of file records.
- /// - `Err` if the directory can't be accessed, no sample sheet is found, or parsing fails.
- ///
- /// # Requirements
- /// - A file path containing `"sample_sheet"` must be present and readable.
- /// - The sample sheet must be formatted according to MinKNOW expectations
- /// (1 header + 1 data row).
- ///
- /// # Errors
- /// - If reading files or metadata fails.
- /// - If the sample sheet is missing or invalid.
- ///
- /// # Example
- /// ```
- /// let (sheet, pore_activity, throughput, files) = scan_local("/data/run001")?;
- /// println!("Kit used: {}", sheet.kit);
- /// println!("Number of files found: {}", files.len());
- /// ```
- pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
- let mut result = Vec::new();
- let mut sample_sheet: Option<String> = None;
- let mut pore_activity = None;
- let mut throughput = None;
- for entry in glob(&format!("{}/**/*", dir))? {
- let file = entry.context("Failed to read a glob entry")?;
- // Extract file properties safely
- let metadata = file.metadata().context(format!(
- "Failed to access file metadata: {}",
- file.display()
- ))?;
- let size = metadata.size();
- let modified = metadata.mtime();
- let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
- let path = file.to_string_lossy().into_owned();
- if path.contains("pore_activity") {
- let mut reader =
- File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
- pore_activity = Some(parse_pore_activity_from_reader(&mut reader).with_context(
- || {
- format!(
- "Failed to parse pore activity data from: {}",
- file.display()
- )
- },
- )?);
- } else if path.contains("sample_sheet") {
- sample_sheet = Some(path.clone());
- } else if path.contains("throughput") {
- let mut reader =
- File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
- throughput = Some(parse_throughput_from_reader(&mut reader).with_context(|| {
- format!("Failed to parse throughput data from: {}", file.display())
- })?);
- }
- result.push((path, size, modified_utc));
- }
- let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!("No sample sheet detected in: {dir}"))?;
- let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
- .context(anyhow::anyhow!("Can't parse sample sheet data"))?;
- Ok((sample_sheet, pore_activity, throughput, result))
- }
- /// Scans a `.tar` archive containing a MinKNOW sequencing experiment.
- ///
- /// This function opens a TAR archive, searches for the `sample_sheet` CSV file,
- /// extracts its metadata, and parses it into a `MinKnowSampleSheet`.
- /// All other entries in the archive are collected with their path, size, and modification time.
- ///
- /// # Arguments
- /// - `tar_path`: Path to the `.tar` archive file.
- ///
- /// # Returns
- /// - `Ok(ExperimentData)`: A tuple containing the parsed sample sheet and the list of file metadata.
- /// - `Err`: If the archive is unreadable, malformed, or missing the `sample_sheet`.
- ///
- /// # Archive Requirements
- /// - Must contain a file whose path contains `"sample_sheet"`; if several match, the last one encountered is used.
- /// - The sample sheet must contain a valid CSV header and a single data row.
- ///
- /// # Errors
- /// - Fails if the archive can't be opened or read.
- /// - Fails if any entry is malformed (e.g., missing timestamp).
- /// - Fails if no sample sheet is found or if it is malformed.
- ///
- /// # Example
- /// ```no_run
- /// let (sample_sheet, pore_activity, throughput, files) = scan_archive("archive.tar")?;
- /// println!("Sample ID: {}", sample_sheet.sample_id);
- /// println!("Total files in archive: {}", files.len());
- /// ```
- pub fn scan_archive(tar_path: &str) -> anyhow::Result<ExperimentData> {
- info!("Scanning archive: {tar_path}");
- let file = File::open(tar_path)
- .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
- let mut archive = tar::Archive::new(file);
- let mut result = Vec::new();
- let mut sample_sheet: Option<String> = None;
- let mut throughput = None;
- let mut pore_activity = None;
- // Iterate through the entries in the archive
- for entry in archive.entries_with_seek()? {
- let mut file = entry.context("Failed to read an entry from the tar archive")?;
- // Extract file properties safely
- let size = file.size();
- let modified = file
- .header()
- .mtime()
- .context("Failed to get modification time")?;
- let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
- let path = file
- .path()
- .context("Failed to get file path from tar entry")?
- .to_string_lossy()
- .into_owned();
- if path.contains("pore_activity") {
- pore_activity = Some(
- parse_pore_activity_from_reader(&mut file)
- .with_context(|| format!("Failed to parse pore_activity data from: {path}"))?,
- );
- } else if path.contains("sample_sheet") {
- let mut buffer = String::new();
- file.read_to_string(&mut buffer)
- .context("Failed to read file contents as string")?;
- sample_sheet = Some(buffer);
- } else if path.contains("throughput") {
- throughput = Some(
- parse_throughput_from_reader(&mut file)
- .with_context(|| format!("Failed to parse throughput data from: {path}"))?,
- );
- }
- result.push((path, size, modified_utc));
- }
- let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!(
- "No sample sheet detected in archive: {tar_path}"
- ))?;
- let (_, data) = sample_sheet
- .split_once("\n")
- .ok_or(anyhow::anyhow!("Can't parse sample sheet data"))?;
- let sample_sheet: MinKnowSampleSheet = data
- .try_into()
- .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;
- Ok((sample_sheet, pore_activity, throughput, result))
- }