|
|
@@ -0,0 +1,760 @@
|
|
|
+use std::{collections::{HashMap, HashSet}, fmt, fs::{self, File}, io::{BufReader, Read}, os::unix::fs::MetadataExt, path::Path};
|
|
|
+
|
|
|
+use anyhow::Context;
|
|
|
+use chrono::{DateTime, TimeZone, Utc};
|
|
|
+use glob::glob;
|
|
|
+use csv::ReaderBuilder;
|
|
|
+use log::{info, warn};
|
|
|
+use serde::{Deserialize, Serialize};
|
|
|
+
|
|
|
+use crate::helpers::{find_files, list_directories};
|
|
|
+
|
|
|
+use super::minknow::MinKnowSampleSheet;
|
|
|
+
|
|
|
/// A collection of `IdInput` records, with utility methods
/// for loading, saving, deduplication, and construction from TSV.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IdsInput {
    /// The list of ID entries; may contain duplicates until `dedup` is called.
    pub data: Vec<IdInput>,
}
|
|
|
+
|
|
|
+impl IdsInput {
|
|
|
+ /// Load `IdsInput` from a JSON file.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `path` - Path to a JSON file containing an array of `IdInput`.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Returns an error if the file cannot be opened or parsed.
|
|
|
+ pub fn load_json(path: &str) -> anyhow::Result<Self> {
|
|
|
+ let f = File::open(path)?;
|
|
|
+ let s: Self = serde_json::from_reader(f)?;
|
|
|
+ Ok(s)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Save `IdsInput` to a JSON file.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `path` - Destination file path.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Returns an error if the file cannot be created or written.
|
|
|
+ pub fn save_json(&self, path: &str) -> anyhow::Result<()> {
|
|
|
+ let f = File::create(path)?;
|
|
|
+ serde_json::to_writer(f, self)?;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Remove duplicate `IdInput` entries.
|
|
|
+ ///
|
|
|
+ /// Keeps the first occurrence of each unique `IdInput`.
|
|
|
+ pub fn dedup(&mut self) {
|
|
|
+ let mut unique = HashSet::new();
|
|
|
+ self.data.retain(|item| unique.insert(item.clone()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Load `IdsInput` from a TSV file using corrected flowcell names.
|
|
|
+ ///
|
|
|
+ /// This method internally deduplicates the data.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `path` - Path to the TSV file.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Returns an error if loading or parsing fails.
|
|
|
+ pub fn load_from_tsv(path: &str) -> anyhow::Result<Self> {
|
|
|
+ let inputs = load_flowcells_corrected_names(path)?;
|
|
|
+ let data = inputs
|
|
|
+ .iter()
|
|
|
+ .map(|line| IdInput {
|
|
|
+ id: line.id.to_string(),
|
|
|
+ time_point: line.sample_type.to_string(),
|
|
|
+ barcode: line.barcode_number.to_string(),
|
|
|
+ flow_cell: line.flow_cell.to_string(),
|
|
|
+ run: line.run.to_string(),
|
|
|
+ })
|
|
|
+ .collect();
|
|
|
+
|
|
|
+ let mut res = Self { data };
|
|
|
+ res.dedup();
|
|
|
+ Ok(res)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Add a new `IdInput` and deduplicate the collection.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `values` - A new `IdInput` record.
|
|
|
+ pub fn add_input(&mut self, values: IdInput) {
|
|
|
+ self.data.push(values);
|
|
|
+ self.dedup();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// A unique sample identifier from sequencing metadata.
///
/// Uniqueness is defined by the combination of all fields
/// (the derived `PartialEq`/`Eq`/`Hash` cover every field).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct IdInput {
    /// Sample or patient ID.
    pub id: String,
    /// Time point or sample type.
    pub time_point: String,
    /// Barcode number (sequencing index).
    pub barcode: String,
    /// Flow cell identifier.
    pub flow_cell: String,
    /// Run identifier.
    pub run: String,
}
|
|
|
+
|
|
|
/// Represents a single record describing a barcode-flowcell pairing,
/// including original and corrected metadata.
///
/// This struct is typically deserialized from a TSV file and used to map
/// `.pod5` files to metadata like corrected flowcell names and experimental time points.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FCLine {
    /// Unique identifier for the sample or barcode group (e.g., "P001X03").
    pub id: String,

    /// Sample type associated with this record (e.g., "normal", "tumoral").
    pub sample_type: String,

    /// The barcode number (e.g., "NB01", "NB02").
    pub barcode_number: String,

    /// Original flowcell name as found in the raw `.pod5` metadata.
    pub flow_cell: String,

    /// Sequencing run name this flowcell belongs to (e.g., "20240101_FAB123").
    pub run: String,

    /// Original path to data (can be absolute or relative).
    pub path: String,

    /// Corrected flowcell name used to resolve naming inconsistencies.
    pub ref_flow_cell: String,
}
|
|
|
+
|
|
|
+/// Loads corrected flowcell metadata from a tab-delimited file.
|
|
|
+///
|
|
|
+/// This function parses a TSV file where each row is deserialized into an `FCLine`.
|
|
|
+/// It also normalizes some fields (e.g., lowercases `sample_type`, uppercases `id`)
|
|
|
+/// for consistency in downstream processing.
|
|
|
+///
|
|
|
+/// # Arguments
|
|
|
+/// - `file_path`: Path to the TSV file containing flowcell correction data.
|
|
|
+///
|
|
|
+/// # Returns
|
|
|
+/// A vector of `FCLine` records, one per line in the file.
|
|
|
+///
|
|
|
+/// # Errors
|
|
|
+/// Returns an error if the file cannot be opened or if any line fails to deserialize.
|
|
|
+///
|
|
|
+/// # Expected Format (TSV with header)
|
|
|
+/// ```text
|
|
|
+/// id sample_type barcode_number flow_cell run_path ref_flow_cell
|
|
|
+/// P001X03 tumoral NB01 FC123 RUN123 /path/to/data FC123_CORR
|
|
|
+/// ```
|
|
|
+///
|
|
|
+/// # Example
|
|
|
+/// ```
|
|
|
+/// let fc_lines = load_flowcells_corrected_names("flowcells.tsv")?;
|
|
|
+/// assert!(!fc_lines.is_empty());
|
|
|
+/// ```
|
|
|
+pub fn load_flowcells_corrected_names(file_path: &str) -> anyhow::Result<Vec<FCLine>> {
|
|
|
+ let file = File::open(file_path)?;
|
|
|
+
|
|
|
+ let mut rdr = ReaderBuilder::new()
|
|
|
+ .delimiter(b'\t')
|
|
|
+ .has_headers(true)
|
|
|
+ .from_reader(file);
|
|
|
+
|
|
|
+ let mut records = Vec::new();
|
|
|
+ for result in rdr.deserialize() {
|
|
|
+ let mut record: FCLine = result?;
|
|
|
+
|
|
|
+ // formating
|
|
|
+ record.sample_type = record.sample_type.to_lowercase();
|
|
|
+ record.id = record.id.to_uppercase();
|
|
|
+
|
|
|
+ records.push(record);
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(records)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
/// Container for a deduplicated and enriched collection of flowcells (`FlowCel`).
///
/// `FlowCells` represents the aggregated result of scanning multiple sources:
/// - A cached archive of flowcells (`archive_store_path`)
/// - A live scan of the local run directory (`local_run_dir`)
///
/// Each [`FlowCel`] contains all necessary metadata for downstream processing,
/// including parsed MinKNOW sample sheet data, `.pod5` file statistics, experiment layout,
/// and optional sample/case annotations from an [`IdsInput`] file.
///
/// The [`FlowCells::load`] method performs the following:
/// - Loads existing flowcells from the archive if available
/// - Scans local directories for new or updated flowcells
/// - Deduplicates flowcells using the `flowcell_id`
/// - Retains the most recently modified version of each flowcell
/// - Enriches each flowcell with case-level annotations
///
/// # Fields
/// - `flow_cells`: A deduplicated list of fully parsed [`FlowCel`] instances.
///
/// # Example
/// ```
/// let flow_cells = FlowCells::load(
///     "/mnt/data/runs",
///     "inputs.json",
///     "flowcell_cache.json"
/// )?;
/// println!("Loaded {} unique flowcells", flow_cells.flow_cells.len());
/// ```
///
/// # Deduplication
/// Flowcells are uniquely identified by their `flowcell_id`, a combination of
/// `{experiment_id}/{sample_id}`. If both archived and local versions exist,
/// the one with the latest `.pod5` modification time is retained.
///
/// # Related Types
/// - [`FlowCel`]: Describes a flowcell, its metadata, and files
/// - [`FlowCellExperiment`]: Muxed vs. demuxed layout classification
/// - [`FlowCellLocation`]: Indicates source (local or archive)
/// - [`MinKnowSampleSheet`]: Parsed sample sheet data
/// - [`IdInput`]: Case-level annotation applied to flowcells
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FlowCells {
    /// A collection of parsed flowcell metadata records.
    pub flow_cells: Vec<FlowCel>,
}
|
|
|
+
|
|
|
impl FlowCells {
    /// Loads and merges `FlowCel` objects from both archive and local filesystem, deduplicating by `flowcell_id`.
    ///
    /// This function combines flowcells from:
    /// - a precomputed archive (JSON),
    /// - and a dynamic scan of local run directories.
    ///
    /// The result is deduplicated by `flowcell_id`, and enriched with case-level annotations
    /// from an `IdsInput` file based on `sample_id` and `experiment_id`.
    ///
    /// # Deduplication Logic
    /// If a flowcell appears in both sources, the one with the more recent `modified` timestamp is retained.
    pub fn load(
        local_run_dir: &str,
        inputs_path: &str,
        archive_store_path: &str,
    ) -> anyhow::Result<Self> {
        // Keyed by `flowcell_id`; archived entries are inserted first so the
        // local scan below can compare timestamps against them.
        let mut merged_map: HashMap<String, FlowCel> = HashMap::new();

        // Load from archive if present (missing cache file is not an error).
        if Path::new(archive_store_path).exists() {
            let file = File::open(archive_store_path)?;
            let archived: Vec<FlowCel> = serde_json::from_reader(BufReader::new(file))?;

            for fc in archived {
                merged_map.insert(fc.flowcell_id.clone(), fc);
            }
        }

        // Scan local run directories: each sample_sheet* file marks one
        // flowcell directory to parse.
        let sample_sheets = find_files(&format!("{local_run_dir}/**/sample_sheet*"))?;
        for sample_sheet_path in sample_sheets {
            let dir = sample_sheet_path.parent().ok_or_else(|| {
                anyhow::anyhow!(
                    "Failed to get directory from path: {}",
                    sample_sheet_path.display()
                )
            })?;
            let dir_str = dir.to_string_lossy().to_string();

            let (sample_sheet, files) = scan_local(&dir_str)?;
            let fc = FlowCel::new(sample_sheet, FlowCellLocation::Local(dir_str), files)?;

            // Dedup by flowcell_id, retain most recently modified
            merged_map
                .entry(fc.flowcell_id.clone())
                .and_modify(|existing| {
                    if fc.modified > existing.modified {
                        *existing = fc.clone();
                    }
                })
                .or_insert(fc);
        }

        // Load input metadata and annotate flowcells with matching cases.
        let inputs = IdsInput::load_json(inputs_path)?;
        for fc in merged_map.values_mut() {
            fc.cases = inputs
                .data
                .iter()
                .filter(|info| {
                    // NOTE(review): this matches `IdInput.flow_cell` against the
                    // sheet's `sample_id`, and `IdInput.run` against
                    // `experiment_id` — a cross-field mapping. Confirm this is
                    // intentional and not a field mix-up.
                    info.flow_cell == fc.run.sample_id && info.run == fc.run.experiment_id
                })
                .cloned()
                .collect();
        }

        Ok(Self {
            flow_cells: merged_map.into_values().collect(),
        })
    }

    /// Updates a JSON archive of `FlowCel` objects by scanning `.tar` archives in a directory.
    ///
    /// This function is used to **discover new archived flowcells** by scanning all `.tar` files
    /// in a given directory, parsing their contents using [`scan_archive`] and [`FlowCel::new`],
    /// and then appending the results to an existing JSON file (if present).
    /// Flowcells are **deduplicated** by `flowcell_id`, and the updated result is saved back to disk.
    ///
    /// # Arguments
    /// - `archive_path`: Path to a directory containing `.tar` archives produced by MinKNOW.
    /// - `save_path`: Path to a JSON file where the deduplicated list of `FlowCel` objects will be saved.
    ///
    /// # Behavior
    /// - If `save_path` exists, the function loads existing flowcells from it.
    /// - Then it scans all `.tar` files in `archive_path`, one by one:
    ///   - Extracts `sample_sheet` and `.pod5` file metadata using [`scan_archive`]
    ///   - Builds a new [`FlowCel`] using [`FlowCel::new`] with location `FlowCellLocation::Archived(...)`
    ///   - Logs and skips entries that fail to parse
    /// - All new flowcells are added to the existing list and deduplicated.
    /// - The updated list is sorted and written back to `save_path`.
    ///
    /// # Deduplication
    /// - Flowcells are deduplicated using `.dedup_by_key(|fc| fc.flowcell_id.clone())`.
    /// - `Vec::dedup_by_key` removes all but the *first* of consecutive duplicates,
    ///   and the preceding sort is stable, so a pre-existing archived entry wins
    ///   over a newly scanned one with the same `flowcell_id`.
    ///
    /// # Returns
    /// - `Ok(())` if scanning and update succeeds.
    /// - `Err` if the archive directory, `.tar` files, or save path cannot be processed.
    ///
    /// # Example
    /// ```
    /// update_archive_from_scan("archives/", "flowcell_cache.json")?;
    /// ```
    ///
    /// # See Also
    /// - [`scan_archive`]
    /// - [`FlowCel`]
    /// - [`FlowCellLocation::Archived`]
    pub fn update_archive_from_scan(archive_path: &str, save_path: &str) -> anyhow::Result<()> {
        // Load existing archive, if any
        let mut all: Vec<FlowCel> = if Path::new(save_path).exists() {
            let file = File::open(save_path)?;
            serde_json::from_reader(BufReader::new(file))?
        } else {
            Vec::new()
        };

        let n_before = all.len();
        let pattern = format!("{archive_path}/*.tar");

        // Scan all .tar archives; failures are logged and skipped so one bad
        // archive does not abort the whole update.
        let res: Vec<FlowCel> = glob(&pattern)?
            .filter_map(Result::ok)
            .filter_map(|path| {
                let archive_str = path.to_string_lossy();
                let (sample_sheet, files) = match scan_archive(&archive_str) {
                    Ok(r) => r,
                    Err(e) => {
                        warn!("Failed to scan archive {}: {e}", archive_str);
                        return None;
                    }
                };
                // NOTE(review): the location records the containing *directory*
                // (`archive_path`), not the specific `.tar` file (`archive_str`)
                // — confirm losing the per-archive path is intended.
                match FlowCel::new(
                    sample_sheet,
                    FlowCellLocation::Archived(archive_path.to_string()),
                    files,
                ) {
                    Ok(fc) => Some(fc),
                    Err(e) => {
                        warn!("Failed to create FlowCel from {}: {e}", archive_str);
                        None
                    }
                }
            })
            .collect();

        // Merge, deduplicate, and write updated archive
        all.extend(res);
        all.sort_by(|a, b| a.flowcell_id.cmp(&b.flowcell_id));
        all.dedup_by_key(|v| v.flowcell_id.clone());

        let n_final = all.len();
        info!("{} new archive(s) discovered.", n_final.saturating_sub(n_before));

        let json = serde_json::to_string_pretty(&all)
            .map_err(|e| anyhow::anyhow!("Can't convert into json.\n{e}"))?;

        fs::write(save_path, json)
            .map_err(|e| anyhow::anyhow!("Can't write file: {save_path}.\n{e}"))?;

        Ok(())
    }
}
|
|
|
+
|
|
|
/// Represents a fully described flowcell unit, including experimental metadata,
/// physical location (local or archived), sample sheet data, and associated pod5 files.
///
/// A `FlowCel` object serves as the central unit in the data model for sample aggregation
/// and downstream processing.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FlowCel {
    /// Compound identifier, formatted as `{experiment_id}/{sample_id}`.
    pub flowcell_id: String,
    /// Experiment type inferred from `.pod5` file paths (see `FlowCellExperiment`).
    pub experiment: FlowCellExperiment,
    /// Whether the flowcell was loaded from a local directory or archive store.
    pub location: FlowCellLocation,
    /// Last modification timestamp among the flowcell's `.pod5` files.
    pub modified: DateTime<Utc>,
    /// The original MinKNOW sample sheet metadata.
    pub run: MinKnowSampleSheet,
    /// Total size (in bytes) of `.pod5` files.
    pub pod5_size: usize,
    /// Number of `.pod5` files found.
    pub n_pod5: usize,
    /// Case-level annotations associated with this flowcell (from `IdsInput`).
    pub cases: Vec<IdInput>,
}
|
|
|
+
|
|
|
/// Describes the physical origin of a flowcell when loaded.
///
/// This is used to differentiate flowcells discovered during a local scan
/// versus those restored from an archived store.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellLocation {
    /// Flowcell discovered in a local filesystem path.
    Local(String),

    /// Flowcell restored from a `.tar` archive or a serialized cache.
    Archived(String),
}
|
|
|
+
|
|
|
+impl fmt::Display for FlowCellLocation {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(
|
|
|
+ f,
|
|
|
+ "{}",
|
|
|
+ match self {
|
|
|
+ FlowCellLocation::Local(_) => "local",
|
|
|
+ FlowCellLocation::Archived(_) => "archived",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ }
|
|
|
+}
|
|
|
+impl FlowCel {
|
|
|
+ /// Constructs a new `FlowCel` from a sample sheet and associated file list.
|
|
|
+ ///
|
|
|
+ /// This method aggregates information from a parsed `MinKnowSampleSheet` and the
|
|
|
+ /// corresponding `.pod5` file metadata, and infers the experiment type from
|
|
|
+ /// file paths using `FlowCellExperiment::from_pod5_paths`.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// - `sample_sheet`: Parsed sample sheet metadata.
|
|
|
+ /// - `location`: Origin of the flowcell (local or archived).
|
|
|
+ /// - `files`: List of files associated with the flowcell, each with:
|
|
|
+ /// - `String`: file path
|
|
|
+ /// - `u64`: size in bytes
|
|
|
+ /// - `DateTime<Utc>`: modification time
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// - `Ok(FlowCel)` if experiment type and file metadata are successfully resolved.
|
|
|
+ /// - `Err` if the experiment type cannot be determined.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// - If `FlowCellExperiment::from_pod5_paths` fails (e.g., unknown layout).
|
|
|
+ ///
|
|
|
+ /// # Example
|
|
|
+ /// ```
|
|
|
+ /// let fc = FlowCel::new(sample_sheet, FlowCellLocation::Local(dir), files)?;
|
|
|
+ /// println!("Flowcell ID: {}", fc.flowcell_id);
|
|
|
+ /// ```
|
|
|
+ pub fn new(
|
|
|
+ sample_sheet: MinKnowSampleSheet,
|
|
|
+ location: FlowCellLocation,
|
|
|
+ files: Vec<(String, u64, DateTime<Utc>)>,
|
|
|
+ ) -> anyhow::Result<Self> {
|
|
|
+ let flowcell_id = format!("{}/{}", sample_sheet.experiment_id, sample_sheet.sample_id);
|
|
|
+
|
|
|
+ // Filter .pod5 files
|
|
|
+ let pod5s: Vec<_> = files
|
|
|
+ .iter()
|
|
|
+ .filter(|(path, _, _)| path.ends_with(".pod5"))
|
|
|
+ .cloned()
|
|
|
+ .collect();
|
|
|
+ let n_pod5 = pod5s.len();
|
|
|
+
|
|
|
+ // Infer experiment type from pod5 paths
|
|
|
+ let experiment = FlowCellExperiment::from_pod5_paths(
|
|
|
+ &files.iter().map(|(p, _, _)| p.to_string()).collect(),
|
|
|
+ )
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("Can't find experiment type for {flowcell_id}"))?;
|
|
|
+
|
|
|
+ // Aggregate pod5 size and latest modification time
|
|
|
+ let (pod5_size, modified): (usize, DateTime<Utc>) = files
|
|
|
+ .into_iter()
|
|
|
+ .filter(|(path, _, _)| path.ends_with(".pod5"))
|
|
|
+ .fold(
|
|
|
+ (0, DateTime::<Utc>::MIN_UTC),
|
|
|
+ |(acc_size, acc_time), (_, size, time)| {
|
|
|
+ (
|
|
|
+ acc_size + size as usize,
|
|
|
+ if acc_time < time { time } else { acc_time },
|
|
|
+ )
|
|
|
+ },
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(Self {
|
|
|
+ flowcell_id,
|
|
|
+ experiment,
|
|
|
+ location,
|
|
|
+ modified,
|
|
|
+ run: sample_sheet,
|
|
|
+ pod5_size,
|
|
|
+ n_pod5,
|
|
|
+ cases: Vec::new(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// Describes the type of experiment layout based on `.pod5` file structure.
///
/// Used to distinguish between whole-genome sequencing (WGS) `.pod5` files
/// organized in a single (muxed) directory or demultiplexed (`pod5_pass`) structure.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellExperiment {
    /// `.pod5` files are stored in a single unbarcoded directory, typically `/pod5/`.
    /// The payload is the directory path used to classify the experiment.
    WGSPod5Mux(String),

    /// `.pod5` files are organized by barcode in subdirectories, typically `/pod5_pass/`.
    /// The payload is the directory path used to classify the experiment.
    WGSPod5Demux(String),
}
|
|
|
+
|
|
|
+impl fmt::Display for FlowCellExperiment {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(
|
|
|
+ f,
|
|
|
+ "{}",
|
|
|
+ match self {
|
|
|
+ FlowCellExperiment::WGSPod5Mux(_) => "WGS Pod5 Muxed",
|
|
|
+ FlowCellExperiment::WGSPod5Demux(_) => "WGS Pod5 Demuxed",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl FlowCellExperiment {
|
|
|
+ /// Attempts to infer the experiment type from the immediate subdirectories of the given path.
|
|
|
+ ///
|
|
|
+ /// This is useful when scanning a flowcell directory directly and checking
|
|
|
+ /// whether it contains a `pod5/` or `pod5_pass/` structure.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// - `flowcell_path`: Path to the root of a flowcell directory.
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// - `Some(FlowCellExperiment)` if a known subdirectory is found.
|
|
|
+ /// - `None` if no match is detected.
|
|
|
+ pub fn from_path(flowcell_path: &str) -> Option<Self> {
|
|
|
+ for dir in list_directories(flowcell_path).ok().unwrap_or_default() {
|
|
|
+ if dir == "pod5" {
|
|
|
+ return Some(FlowCellExperiment::WGSPod5Mux(dir.to_string()));
|
|
|
+ }
|
|
|
+ if dir == "pod5_pass" {
|
|
|
+ return Some(FlowCellExperiment::WGSPod5Demux(dir.to_string()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ None
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Attempts to infer the experiment type from a list of `.pod5` file paths.
|
|
|
+ ///
|
|
|
+ /// This is typically used when files have already been collected and their
|
|
|
+ /// parent directories can be checked for naming conventions.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// - `all_paths`: Vector of paths (as strings) to `.pod5` files or directories.
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// - `Some(FlowCellExperiment)` if a known suffix is detected.
|
|
|
+ /// - `None` if no matching pattern is found.
|
|
|
+ pub fn from_pod5_paths(all_paths: &Vec<String>) -> Option<Self> {
|
|
|
+ for path in all_paths {
|
|
|
+ if path.ends_with("/pod5/") || path.ends_with("/pod5") {
|
|
|
+ return Some(FlowCellExperiment::WGSPod5Mux(path.to_string()));
|
|
|
+ }
|
|
|
+
|
|
|
+ if path.ends_with("/pod5_pass/") || path.ends_with("/pod5_pass") {
|
|
|
+ return Some(FlowCellExperiment::WGSPod5Demux(path.to_string()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ None
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the underlying string (directory path) for the experiment.
|
|
|
+ ///
|
|
|
+ /// This is useful when you need access to the directory path used to classify the experiment.
|
|
|
+ pub fn inner(&self) -> &str {
|
|
|
+ match self {
|
|
|
+ FlowCellExperiment::WGSPod5Mux(v) => v,
|
|
|
+ FlowCellExperiment::WGSPod5Demux(v) => v,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// Represents the result of scanning a MinKNOW experiment source.
///
/// This tuple includes:
/// - `MinKnowSampleSheet`: Parsed metadata describing the experiment/sample.
/// - `Vec<(String, u64, DateTime<Utc>)>`: A list of files with:
///   - `String`: file path
///   - `u64`: file size in bytes
///   - `DateTime<Utc>`: last modification time (UTC)
type ExperimentData = (MinKnowSampleSheet, Vec<(String, u64, DateTime<Utc>)>);
|
|
|
+
|
|
|
+/// Scans a local directory for MinKNOW experiment files and metadata.
|
|
|
+///
|
|
|
+/// This function recursively walks a directory using globbing,
|
|
|
+/// collects file paths, sizes, and modification timestamps,
|
|
|
+/// and identifies a `sample_sheet` file to parse as `MinKnowSampleSheet`.
|
|
|
+///
|
|
|
+/// # Arguments
|
|
|
+/// - `dir`: Root directory to scan (absolute or relative).
|
|
|
+///
|
|
|
+/// # Returns
|
|
|
+/// - `Ok(ExperimentData)` containing the sample sheet and a list of file records.
|
|
|
+/// - `Err` if the directory can't be accessed, no sample sheet is found, or parsing fails.
|
|
|
+///
|
|
|
+/// # Requirements
|
|
|
+/// - A file path containing `"sample_sheet"` must be present and readable.
|
|
|
+/// - The sample sheet must be formatted according to MinKNOW expectations
|
|
|
+/// (1 header + 1 data row).
|
|
|
+///
|
|
|
+/// # Errors
|
|
|
+/// - If reading files or metadata fails.
|
|
|
+/// - If the sample sheet is missing or invalid.
|
|
|
+///
|
|
|
+/// # Example
|
|
|
+/// ```
|
|
|
+/// let (sheet, files) = scan_local("/data/run001")?;
|
|
|
+/// println!("Kit used: {}", sheet.kit);
|
|
|
+/// println!("Number of files found: {}", files.len());
|
|
|
+/// ```
|
|
|
+pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
|
|
|
+ let mut result = Vec::new();
|
|
|
+ let mut sample_sheet: Option<String> = None;
|
|
|
+
|
|
|
+ for entry in glob(&format!("{}/**/*", dir))? {
|
|
|
+ let file = entry.context("Failed to read an entry from the tar archive")?;
|
|
|
+
|
|
|
+ // Extract file properties safely
|
|
|
+ let metadata = file.metadata().context(format!(
|
|
|
+ "Failed to access file metadata: {}",
|
|
|
+ file.display()
|
|
|
+ ))?;
|
|
|
+ let size = metadata.size();
|
|
|
+ let modified = metadata.mtime();
|
|
|
+ let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
|
|
|
+
|
|
|
+ let path = file.to_string_lossy().into_owned();
|
|
|
+
|
|
|
+ if path.contains("sample_sheet") {
|
|
|
+ sample_sheet = Some(path.clone());
|
|
|
+ }
|
|
|
+
|
|
|
+ result.push((path, size, modified_utc));
|
|
|
+ }
|
|
|
+
|
|
|
+ let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!("No sample sheet detected in: {dir}"))?;
|
|
|
+ let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
|
|
|
+ .context(anyhow::anyhow!("Can't parse sample sheet data"))?;
|
|
|
+
|
|
|
+ Ok((sample_sheet, result))
|
|
|
+}
|
|
|
+
|
|
|
+/// Scans a `.tar` archive containing a MinKNOW sequencing experiment.
|
|
|
+///
|
|
|
+/// This function opens a TAR archive, searches for the `sample_sheet` CSV file,
|
|
|
+/// extracts its metadata, and parses it into a `MinKnowSampleSheet`.
|
|
|
+/// All other entries in the archive are collected with their path, size, and modification time.
|
|
|
+///
|
|
|
+/// # Arguments
|
|
|
+/// - `tar_path`: Path to the `.tar` archive file.
|
|
|
+///
|
|
|
+/// # Returns
|
|
|
+/// - `Ok(ExperimentData)`: A tuple containing the parsed sample sheet and the list of file metadata.
|
|
|
+/// - `Err`: If the archive is unreadable, malformed, or missing the `sample_sheet`.
|
|
|
+///
|
|
|
+/// # Archive Requirements
|
|
|
+/// - Must contain exactly one file matching `"sample_sheet"` in its path.
|
|
|
+/// - The sample sheet must contain a valid CSV header and a single data row.
|
|
|
+///
|
|
|
+/// # Errors
|
|
|
+/// - Fails if the archive can't be opened or read.
|
|
|
+/// - Fails if any entry is malformed (e.g., missing timestamp).
|
|
|
+/// - Fails if no sample sheet is found or if it is malformed.
|
|
|
+///
|
|
|
+/// # Example
|
|
|
+/// ```no_run
|
|
|
+/// let (sample_sheet, files) = scan_archive("archive.tar")?;
|
|
|
+/// println!("Sample ID: {}", sample_sheet.sample_id);
|
|
|
+/// println!("Total files in archive: {}", files.len());
|
|
|
+/// ```
|
|
|
+pub fn scan_archive(tar_path: &str) -> anyhow::Result<ExperimentData> {
|
|
|
+ info!("Scanning archive: {tar_path}");
|
|
|
+
|
|
|
+ let file = File::open(tar_path)
|
|
|
+ .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
|
|
|
+
|
|
|
+ let mut archive = tar::Archive::new(file);
|
|
|
+ let mut result = Vec::new();
|
|
|
+ let mut sample_sheet: Option<String> = None;
|
|
|
+
|
|
|
+ // Iterate through the entries in the archive
|
|
|
+ for entry in archive.entries_with_seek()? {
|
|
|
+ let mut file = entry.context("Failed to read an entry from the tar archive")?;
|
|
|
+
|
|
|
+ // Extract file properties safely
|
|
|
+ let size = file.size();
|
|
|
+ let modified = file
|
|
|
+ .header()
|
|
|
+ .mtime()
|
|
|
+ .context("Failed to get modification time")?;
|
|
|
+ let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
|
|
|
+
|
|
|
+ let path = file
|
|
|
+ .path()
|
|
|
+ .context("Failed to get file path from tar entry")?
|
|
|
+ .to_string_lossy()
|
|
|
+ .into_owned();
|
|
|
+
|
|
|
+ if path.contains("sample_sheet") {
|
|
|
+ let mut buffer = String::new();
|
|
|
+ file.read_to_string(&mut buffer)
|
|
|
+ .context("Failed to read file contents as string")?;
|
|
|
+ sample_sheet = Some(buffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ result.push((path, size, modified_utc));
|
|
|
+ }
|
|
|
+
|
|
|
+ let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!(
|
|
|
+ "No sample sheet detected in archive: {tar_path}"
|
|
|
+ ))?;
|
|
|
+ let (_, data) = sample_sheet
|
|
|
+ .split_once("\n")
|
|
|
+ .ok_or(anyhow::anyhow!("Can't parse sample sheet data"))?;
|
|
|
+ let sample_sheet: MinKnowSampleSheet = data
|
|
|
+ .try_into()
|
|
|
+ .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;
|
|
|
+ Ok((sample_sheet, result))
|
|
|
+}
|