use std::{
    collections::{HashMap, HashSet},
    fmt,
    fs::{self, File, OpenOptions},
    io::{BufReader, Read},
    os::unix::fs::MetadataExt,
    path::Path,
};

use anyhow::Context;
use chrono::{DateTime, TimeZone, Utc};
use glob::glob;
use log::{info, warn};
use serde::{Deserialize, Serialize};

use crate::{
    collection::minknow::{parse_pore_activity_from_reader, parse_throughput_from_reader},
    helpers::{find_files, list_directories},
};

use super::minknow::{MinKnowSampleSheet, PoreStateEntry, ThroughputEntry};

/// A collection of `IdInput` records, with utility methods
/// for loading, saving, deduplication, and construction from TSV.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IdsInput {
    /// The list of ID entries.
    pub data: Vec<IdInput>,
}

/// A unique sample identifier from sequencing metadata.
///
/// Uniqueness is defined by the combination of all fields.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct IdInput {
    /// Case or patient ID.
    pub case_id: String,
    /// Time point or sample type.
    pub sample_type: String,
    /// Barcode number (sequencing index).
    pub barcode: String,
    /// Flow cell identifier.
    pub flow_cell_id: String,
}

impl IdsInput {
    /// Load `IdsInput` from a JSON file.
    pub fn load_json<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
        let file = File::open(&path)?;
        Ok(serde_json::from_reader(file)?)
    }

    /// Save `IdsInput` to a JSON file (pretty-printed).
    pub fn save_json<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
        let file = File::create(&path)?;
        serde_json::to_writer_pretty(file, self)?;
        Ok(())
    }

    /// Remove duplicate `IdInput` entries, retaining the first occurrence.
    pub fn dedup(&mut self) {
        let mut seen = HashSet::new();
        self.data.retain(|item| seen.insert(item.clone()));
    }

    /// Add a new `IdInput` and deduplicate.
    pub fn add_input(&mut self, entry: IdInput) {
        self.data.push(entry);
        self.dedup();
    }
}
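// A minimal sanity check of the dedup semantics above. The `IdInput` values
// are made-up fixtures, not real sequencing metadata.
#[cfg(test)]
mod ids_input_tests {
    use super::*;

    fn entry(case_id: &str) -> IdInput {
        IdInput {
            case_id: case_id.to_string(),
            sample_type: "T1".to_string(),
            barcode: "barcode01".to_string(),
            flow_cell_id: "FC001".to_string(),
        }
    }

    #[test]
    fn add_input_drops_exact_duplicates() {
        let mut ids = IdsInput { data: Vec::new() };
        ids.add_input(entry("case_a"));
        ids.add_input(entry("case_a")); // identical in all fields: removed by dedup
        ids.add_input(entry("case_b")); // differs in `case_id`: retained
        assert_eq!(ids.data.len(), 2);
    }
}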
/// Container for a deduplicated and enriched collection of flowcells ([`FlowCell`]).
///
/// `FlowCells` represents the aggregated result of scanning multiple sources:
/// - A cached archive of flowcells (`archive_store_path`)
/// - A live scan of the local run directory (`local_run_dir`)
///
/// Each [`FlowCell`] contains all necessary metadata for downstream processing,
/// including parsed MinKNOW sample sheet data, `.pod5` file statistics, experiment layout,
/// and optional sample/case annotations from an [`IdsInput`] file.
///
/// The [`FlowCells::load`] method performs the following:
/// - Loads existing flowcells from the archive if available
/// - Scans local directories for new or updated flowcells
/// - Deduplicates flowcells using the `flowcell_id`
/// - Retains the most recently modified version of each flowcell
/// - Enriches each flowcell with case-level annotations
///
/// # Fields
/// - `flow_cells`: A deduplicated list of fully parsed [`FlowCell`] instances.
///
/// # Example
/// ```
/// let flow_cells = FlowCells::load(
///     "/mnt/data/runs",
///     "inputs.json",
///     "flowcell_cache.json"
/// )?;
/// println!("Loaded {} unique flowcells", flow_cells.flow_cells.len());
/// ```
///
/// # Deduplication
/// Flowcells are uniquely identified by their `flowcell_id`, a combination of
/// `{experiment_id}/{sample_id}`. If both archived and local versions exist,
/// the one with the latest `.pod5` modification time is retained.
///
/// # Related Types
/// - [`FlowCell`]: Describes a flowcell, its metadata, and files
/// - [`FlowCellExperiment`]: Muxed vs. demuxed layout classification
/// - [`FlowCellLocation`]: Indicates source (local or archive)
/// - [`MinKnowSampleSheet`]: Parsed sample sheet data
/// - [`IdInput`]: Case-level annotation applied to flowcells
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct FlowCells {
    /// A collection of parsed flowcell metadata records.
    pub flow_cells: Vec<FlowCell>,
}

impl FlowCells {
    /// Load and merge flowcells from archive and local directories, then annotate with `IdsInput`.
    ///
    /// # Arguments
    /// * `local_run_dir` - Root directory containing local flowcell run folders.
    /// * `inputs_path` - Path to a JSON file with [`IdInput`] annotations.
    /// * `archive_store_path` - Path to a cached JSON file of archived flowcells.
    ///
    /// # Returns
    /// A deduplicated and annotated [`FlowCells`] collection.
    ///
    /// # Errors
    /// Returns an error if any input file cannot be read or parsed, or if local scanning fails.
    pub fn load(
        local_run_dir: &str,
        inputs_path: &str,
        archive_store_path: &str,
    ) -> anyhow::Result<Self> {
        let mut instance = Self::default();
        instance.load_from_archive(archive_store_path)?;
        instance.load_from_local(local_run_dir)?;
        instance.annotate_with_inputs(inputs_path)?;
        instance.save_to_archive(archive_store_path)?;
        Ok(instance)
    }

    /// Load flowcells from a cached archive and merge into `self.flow_cells`.
    ///
    /// Retains only the most recently modified entry for each `flowcell_id`.
    ///
    /// # Arguments
    /// * `archive_path` - Path to archive JSON file.
    ///
    /// # Errors
    /// Returns an error if the file cannot be read or parsed.
    pub fn load_from_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
        if !Path::new(archive_path).exists() {
            return Ok(());
        }

        let file = File::open(archive_path)
            .with_context(|| format!("Failed to open archive file: {archive_path}"))?;
        let archived: Vec<FlowCell> = serde_json::from_reader(BufReader::new(file))
            .with_context(|| format!("Failed to parse FlowCells from: {archive_path}"))?;

        self.insert_flowcells(archived);
        Ok(())
    }

    /// Write the current flowcell collection to the archive JSON file (pretty-printed).
    pub fn save_to_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(archive_path)
            .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;

        serde_json::to_writer_pretty(file, &self.flow_cells)
            .with_context(|| format!("Failed to write FlowCells to: {archive_path}"))?;
        Ok(())
    }

    /// Scan local run directories for flowcells and merge into `self.flow_cells`.
    ///
    /// Uses `scan_local()` to parse local metadata and selects the most recently modified
    /// version if duplicates exist.
    ///
    /// # Arguments
    /// * `local_dir` - Root directory containing flowcell runs.
    ///
    /// # Errors
    /// Returns an error if scanning fails or if the directory layout is invalid.
    pub fn load_from_local(&mut self, local_dir: &str) -> anyhow::Result<()> {
        let sample_sheets = find_files(&format!("{local_dir}/**/sample_sheet*"))?;

        for path in sample_sheets {
            let dir = path
                .parent()
                .ok_or_else(|| anyhow::anyhow!("Invalid sample_sheet path: {}", path.display()))?;
            let dir_str = dir.to_string_lossy();

            let (sheet, pore, throughput, files) = scan_local(&dir_str)
                .with_context(|| format!("Failed to scan local dir: {dir_str}"))?;

            let fc = FlowCell::new(
                sheet,
                pore,
                throughput,
                FlowCellLocation::Local(dir_str.to_string()),
                files,
            )
            .with_context(|| format!("Failed to create FlowCell from dir: {dir_str}"))?;

            self.insert_flowcells(vec![fc]);
        }

        Ok(())
    }
}
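// A minimal usage sketch of the load pipeline above; the three paths are
// hypothetical placeholders, not a fixed convention of this crate.
#[allow(dead_code)]
fn example_load_pipeline() -> anyhow::Result<()> {
    // Merges archived and locally scanned flowcells, annotates them with
    // `IdInput` records, and refreshes the archive cache on disk.
    let flow_cells = FlowCells::load("/mnt/data/runs", "inputs.json", "flowcell_cache.json")?;
    for fc in &flow_cells.flow_cells {
        println!(
            "{} [{}] {} ({} pod5 files)",
            fc.sample_sheet.flow_cell_id, fc.location, fc.experiment, fc.n_pod5
        );
    }
    Ok(())
}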
impl FlowCells {
    /// Annotate flowcells with matching `IdInput` records from JSON.
    ///
    /// Assigns to `FlowCell.cases` all matching `IdInput`s by `flow_cell_id`.
    ///
    /// # Arguments
    /// * `inputs_path` - Path to a JSON file containing a list of `IdInput` records.
    ///
    /// # Errors
    /// Returns an error if the file is unreadable or malformed.
    pub fn annotate_with_inputs(&mut self, inputs_path: &str) -> anyhow::Result<()> {
        if !Path::new(inputs_path).exists() {
            warn!("IdsInput file not found at {}", inputs_path);
            return Ok(());
        }

        let inputs = IdsInput::load_json(inputs_path)
            .with_context(|| format!("Failed to load IdsInput from: {inputs_path}"))?;

        self.cross_cases(&inputs)
            .with_context(|| format!("Failed to cross with IdsInput from: {inputs_path}"))?;

        Ok(())
    }

    /// Attach to each flowcell the `IdInput` records whose `flow_cell_id` matches.
    pub fn cross_cases(&mut self, inputs: &IdsInput) -> anyhow::Result<()> {
        for fc in &mut self.flow_cells {
            fc.cases = inputs
                .data
                .iter()
                .filter(|id| id.flow_cell_id == fc.sample_sheet.flow_cell_id)
                .cloned()
                .collect();
        }
        Ok(())
    }

    /// Insert new flowcells into the current collection, deduplicating by `flowcell_id`.
    ///
    /// For duplicates, retains the flowcell with the newer `.modified` timestamp.
    ///
    /// # Arguments
    /// * `new_cells` - A vector of parsed `FlowCell` instances.
    pub fn insert_flowcells(&mut self, new_cells: Vec<FlowCell>) {
        let mut map: HashMap<String, FlowCell> = self
            .flow_cells
            .drain(..)
            .map(|fc| (fc.sample_sheet.flow_cell_id.clone(), fc))
            .collect();

        for fc in new_cells {
            map.entry(fc.sample_sheet.flow_cell_id.clone())
                .and_modify(|existing| {
                    if fc.modified > existing.modified {
                        *existing = fc.clone();
                    }
                })
                .or_insert(fc);
        }

        self.flow_cells = map.into_values().collect();
    }

    /// Updates a JSON archive of `FlowCell` objects by scanning `.tar` archives in a directory.
    ///
    /// This function is used to **discover new archived flowcells** by scanning all `.tar` files
    /// in a given directory, parsing their contents using [`scan_archive`] and [`FlowCell::new`],
    /// and then appending the results to an existing JSON file (if present).
    /// Flowcells are **deduplicated** by `flowcell_id`, and the updated result is saved back to disk.
    ///
    /// # Arguments
    /// - `archive_path`: Path to a directory containing `.tar` archives produced by MinKNOW.
    /// - `save_path`: Path to a JSON file where the deduplicated list of `FlowCell` objects will be saved.
    ///
    /// # Behavior
    /// - If `save_path` exists, the function loads existing flowcells from it.
    /// - Then it scans all `.tar` files in `archive_path`, one by one:
    ///   - Extracts `sample_sheet` and `.pod5` file metadata using [`scan_archive`]
    ///   - Builds a new [`FlowCell`] using [`FlowCell::new`] with location `FlowCellLocation::Archived(...)`
    ///   - Logs and skips entries that fail to parse
    /// - All new flowcells are added to the existing list and deduplicated.
    /// - The updated list is sorted and written back to `save_path`.
    ///
    /// # Deduplication
    /// - Flowcells are deduplicated using `.dedup_by_key(|fc| fc.sample_sheet.flow_cell_id.clone())`.
    /// - Since the sort is stable and `dedup_by_key` keeps the first of consecutive
    ///   duplicates, previously archived entries take precedence over newly scanned ones.
    ///
    /// # Returns
    /// - `Ok(())` if scanning and update succeeds.
    /// - `Err` if the archive directory, `.tar` files, or save path cannot be processed.
    ///
    /// # Example
    /// ```
    /// FlowCells::update_archive_from_scan("archives/", "flowcell_cache.json")?;
    /// ```
    ///
    /// # See Also
    /// - [`scan_archive`]
    /// - [`FlowCell`]
    /// - [`FlowCellLocation::Archived`]
    pub fn update_archive_from_scan(archive_path: &str, save_path: &str) -> anyhow::Result<()> {
        // Load existing archive, if any
        let mut all: Vec<FlowCell> = if Path::new(save_path).exists() {
            let file = File::open(save_path)?;
            serde_json::from_reader(BufReader::new(file))?
        } else {
            Vec::new()
        };

        let n_before = all.len();
        let pattern = format!("{archive_path}/*.tar");

        // Scan all .tar archives
        let res: Vec<FlowCell> = glob(&pattern)?
            .filter_map(Result::ok)
            .filter_map(|path| {
                let archive_str = path.to_string_lossy();

                let (sample_sheet, pore_activity, throughput, files) =
                    match scan_archive(&archive_str) {
                        Ok(r) => r,
                        Err(e) => {
                            warn!("Failed to scan archive {}: {e}", archive_str);
                            return None;
                        }
                    };

                match FlowCell::new(
                    sample_sheet,
                    pore_activity,
                    throughput,
                    FlowCellLocation::Archived(archive_path.to_string()),
                    files,
                ) {
                    Ok(fc) => Some(fc),
                    Err(e) => {
                        warn!("Failed to create FlowCell from {}: {e}", archive_str);
                        None
                    }
                }
            })
            .collect();

        // Merge, deduplicate, and write updated archive
        all.extend(res);
        all.sort_by(|a, b| {
            a.sample_sheet
                .flow_cell_id
                .cmp(&b.sample_sheet.flow_cell_id)
        });
        all.dedup_by_key(|v| v.sample_sheet.flow_cell_id.clone());

        let n_final = all.len();
        info!(
            "{} new archive(s) discovered.",
            n_final.saturating_sub(n_before)
        );

        let json = serde_json::to_string_pretty(&all)
            .map_err(|e| anyhow::anyhow!("Can't convert into json.\n{e}"))?;
        fs::write(save_path, json)
            .map_err(|e| anyhow::anyhow!("Can't write file: {save_path}.\n{e}"))?;

        Ok(())
    }
}

/// Represents a fully described flowcell unit, including experimental metadata,
/// physical location (local or archived), sample sheet data, and associated pod5 files.
///
/// A `FlowCell` object serves as the central unit in the data model for sample aggregation
/// and downstream processing.
///
/// # Fields
/// - `sample_sheet`: The original MinKNOW sample sheet metadata (`MinKnowSampleSheet`).
/// - `experiment`: Experiment type inferred from `.pod5` files (see `FlowCellExperiment`).
/// - `location`: Whether the flowcell was loaded from a local directory or archive store.
/// - `modified`: Last modification timestamp among `.pod5` files.
/// - `pod5_size`: Total size (in bytes) of `.pod5` files.
/// - `n_pod5`: Number of `.pod5` files found.
/// - `cases`: List of sample/case-level annotations associated with this flowcell (from `IdsInput`).
/// - `pore_activity`: Parsed pore activity records, if found during scanning.
/// - `throughput`: Parsed throughput records, if found during scanning.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FlowCell {
    pub sample_sheet: MinKnowSampleSheet,
    pub experiment: FlowCellExperiment,
    pub location: FlowCellLocation,
    pub modified: DateTime<Utc>,
    pub pod5_size: usize,
    pub n_pod5: usize,
    pub cases: Vec<IdInput>,
    pub pore_activity: Option<Vec<PoreStateEntry>>,
    pub throughput: Option<Vec<ThroughputEntry>>,
}

/// Describes the physical origin of a flowcell when loaded.
///
/// This is used to differentiate flowcells discovered during a local scan
/// versus those restored from an archived store.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellLocation {
    /// Flowcell discovered in a local filesystem path.
    Local(String),
    /// Flowcell restored from a `.tar` archive or a serialized cache.
    Archived(String),
}

impl fmt::Display for FlowCellLocation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                FlowCellLocation::Local(_) => "local",
                FlowCellLocation::Archived(_) => "archived",
            }
        )
    }
}
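// A small check of the `Display` labels defined above; the wrapped paths are
// arbitrary fixtures.
#[cfg(test)]
mod flowcell_location_tests {
    use super::*;

    #[test]
    fn display_reports_origin_kind() {
        let local = FlowCellLocation::Local("/data/run01".to_string());
        let archived = FlowCellLocation::Archived("/archives/run01.tar".to_string());
        assert_eq!(local.to_string(), "local");
        assert_eq!(archived.to_string(), "archived");
    }
}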
impl FlowCell {
    /// Constructs a new `FlowCell` from a sample sheet and associated file list.
    ///
    /// This method aggregates information from a parsed `MinKnowSampleSheet` and the
    /// corresponding `.pod5` file metadata, and infers the experiment type from
    /// file paths using `FlowCellExperiment::from_pod5_paths`.
    ///
    /// # Arguments
    /// - `sample_sheet`: Parsed sample sheet metadata.
    /// - `pore_activity`: Optional parsed pore activity records.
    /// - `throughput`: Optional parsed throughput records.
    /// - `location`: Origin of the flowcell (local or archived).
    /// - `files`: List of files associated with the flowcell, each with:
    ///   - `String`: file path
    ///   - `u64`: size in bytes
    ///   - `DateTime<Utc>`: modification time
    ///
    /// # Returns
    /// - `Ok(FlowCell)` if experiment type and file metadata are successfully resolved.
    /// - `Err` if the experiment type cannot be determined.
    ///
    /// # Errors
    /// - If `FlowCellExperiment::from_pod5_paths` fails (e.g., unknown layout).
    ///
    /// # Example
    /// ```
    /// let fc = FlowCell::new(sample_sheet, None, None, FlowCellLocation::Local(dir), files)?;
    /// println!("Flowcell ID: {}", fc.sample_sheet.flow_cell_id);
    /// ```
    pub fn new(
        sample_sheet: MinKnowSampleSheet,
        pore_activity: Option<Vec<PoreStateEntry>>,
        throughput: Option<Vec<ThroughputEntry>>,
        location: FlowCellLocation,
        files: Vec<(String, u64, DateTime<Utc>)>,
    ) -> anyhow::Result<Self> {
        let flowcell_id = sample_sheet.flow_cell_id.clone();

        // Filter .pod5 files
        let pod5s: Vec<_> = files
            .iter()
            .filter(|(path, _, _)| path.ends_with(".pod5"))
            .cloned()
            .collect();
        let n_pod5 = pod5s.len();

        // Infer experiment type from pod5 paths
        let experiment = FlowCellExperiment::from_pod5_paths(
            &files.iter().map(|(p, _, _)| p.to_string()).collect(),
        )
        .ok_or_else(|| anyhow::anyhow!("Can't find experiment type for {flowcell_id}"))?;

        // Aggregate pod5 size and latest modification time
        let (pod5_size, modified): (usize, DateTime<Utc>) = files
            .into_iter()
            .filter(|(path, _, _)| path.ends_with(".pod5"))
            .fold(
                (0, DateTime::<Utc>::MIN_UTC),
                |(acc_size, acc_time), (_, size, time)| {
                    (
                        acc_size + size as usize,
                        if acc_time < time { time } else { acc_time },
                    )
                },
            );

        Ok(Self {
            experiment,
            location,
            modified,
            sample_sheet,
            pod5_size,
            n_pod5,
            cases: Vec::new(),
            pore_activity,
            throughput,
        })
    }
}
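// A minimal sketch of constructing a `FlowCell`, assuming a `MinKnowSampleSheet`
// parsed elsewhere; the file listing, sizes, and directory names are
// hypothetical fixtures. A muxed layout is inferred from the `pod5` directory.
#[allow(dead_code)]
fn example_flowcell_from_parts(sheet: MinKnowSampleSheet) -> anyhow::Result<FlowCell> {
    let files = vec![
        // The bare `pod5` directory entry is what `from_pod5_paths` matches on.
        ("run01/pod5".to_string(), 0u64, Utc::now()),
        // Only paths ending in `.pod5` count toward `pod5_size` and `n_pod5`.
        ("run01/pod5/reads_0.pod5".to_string(), 1_048_576u64, Utc::now()),
    ];
    // No pore activity or throughput data in this sketch.
    FlowCell::new(sheet, None, None, FlowCellLocation::Local("run01".into()), files)
}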
/// Describes the type of experiment layout based on `.pod5` file structure.
///
/// Used to distinguish between whole-genome sequencing (WGS) `.pod5` files
/// organized in a single (muxed) directory or a demultiplexed (`pod5_pass`) structure.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum FlowCellExperiment {
    /// `.pod5` files are stored in a single unbarcoded directory, typically `/pod5/`.
    WGSPod5Mux(String),
    /// `.pod5` files are organized by barcode in subdirectories, typically `/pod5_pass/`.
    WGSPod5Demux(String),
}

impl fmt::Display for FlowCellExperiment {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                FlowCellExperiment::WGSPod5Mux(_) => "WGS Pod5 Muxed",
                FlowCellExperiment::WGSPod5Demux(_) => "WGS Pod5 Demuxed",
            }
        )
    }
}

impl FlowCellExperiment {
    /// Attempts to infer the experiment type from the immediate subdirectories of the given path.
    ///
    /// This is useful when scanning a flowcell directory directly and checking
    /// whether it contains a `pod5/` or `pod5_pass/` structure.
    ///
    /// # Arguments
    /// - `flowcell_path`: Path to the root of a flowcell directory.
    ///
    /// # Returns
    /// - `Some(FlowCellExperiment)` if a known subdirectory is found.
    /// - `None` if no match is detected.
    pub fn from_path(flowcell_path: &str) -> Option<Self> {
        for dir in list_directories(flowcell_path).ok().unwrap_or_default() {
            if dir == "pod5" {
                return Some(FlowCellExperiment::WGSPod5Mux(dir.to_string()));
            }
            if dir == "pod5_pass" {
                return Some(FlowCellExperiment::WGSPod5Demux(dir.to_string()));
            }
        }
        None
    }

    /// Attempts to infer the experiment type from a list of `.pod5` file paths.
    ///
    /// This is typically used when files have already been collected and their
    /// parent directories can be checked for naming conventions.
    ///
    /// # Arguments
    /// - `all_paths`: Vector of paths (as strings) to `.pod5` files or directories.
    ///
    /// # Returns
    /// - `Some(FlowCellExperiment)` if a known suffix is detected.
    /// - `None` if no matching pattern is found.
    pub fn from_pod5_paths(all_paths: &Vec<String>) -> Option<Self> {
        for path in all_paths {
            if path.ends_with("/pod5/") || path.ends_with("/pod5") {
                return Some(FlowCellExperiment::WGSPod5Mux(path.to_string()));
            }
            if path.ends_with("/pod5_pass/") || path.ends_with("/pod5_pass") {
                return Some(FlowCellExperiment::WGSPod5Demux(path.to_string()));
            }
        }
        None
    }

    /// Returns the underlying string (directory path) for the experiment.
    ///
    /// This is useful when you need access to the directory path used to classify the experiment.
    pub fn inner(&self) -> &str {
        match self {
            FlowCellExperiment::WGSPod5Mux(v) => v,
            FlowCellExperiment::WGSPod5Demux(v) => v,
        }
    }
}
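// Checks the path-suffix rules in `from_pod5_paths`; all paths are fabricated.
#[cfg(test)]
mod experiment_inference_tests {
    use super::*;

    #[test]
    fn classifies_muxed_and_demuxed_layouts() {
        let muxed = vec!["run01/pod5".to_string()];
        assert!(matches!(
            FlowCellExperiment::from_pod5_paths(&muxed),
            Some(FlowCellExperiment::WGSPod5Mux(_))
        ));

        let demuxed = vec!["run01/pod5_pass/".to_string()];
        assert!(matches!(
            FlowCellExperiment::from_pod5_paths(&demuxed),
            Some(FlowCellExperiment::WGSPod5Demux(_))
        ));

        // Unrelated directories yield no classification.
        let other = vec!["run01/fastq_pass".to_string()];
        assert!(FlowCellExperiment::from_pod5_paths(&other).is_none());
    }
}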
/// Represents the result of scanning a MinKNOW experiment source.
///
/// This tuple includes:
/// - `MinKnowSampleSheet`: Parsed metadata describing the experiment/sample.
/// - `Option<Vec<PoreStateEntry>>`: Parsed pore activity records, if found.
/// - `Option<Vec<ThroughputEntry>>`: Parsed throughput records, if found.
/// - `Vec<(String, u64, DateTime<Utc>)>`: A list of files with:
///   - `String`: file path
///   - `u64`: file size in bytes
///   - `DateTime<Utc>`: last modification time (UTC)
type ExperimentData = (
    MinKnowSampleSheet,
    Option<Vec<PoreStateEntry>>,
    Option<Vec<ThroughputEntry>>,
    Vec<(String, u64, DateTime<Utc>)>,
);

/// Scans a local directory for MinKNOW experiment files and metadata.
///
/// This function recursively walks a directory using globbing,
/// collects file paths, sizes, and modification timestamps,
/// and identifies a `sample_sheet` file to parse as `MinKnowSampleSheet`.
///
/// # Arguments
/// - `dir`: Root directory to scan (absolute or relative).
///
/// # Returns
/// - `Ok(ExperimentData)` containing the sample sheet, optional pore activity and
///   throughput records, and a list of file records.
/// - `Err` if the directory can't be accessed, no sample sheet is found, or parsing fails.
///
/// # Requirements
/// - A file path containing `"sample_sheet"` must be present and readable.
/// - The sample sheet must be formatted according to MinKNOW expectations
///   (1 header + 1 data row).
///
/// # Errors
/// - If reading files or metadata fails.
/// - If the sample sheet is missing or invalid.
///
/// # Example
/// ```
/// let (sheet, pore_activity, throughput, files) = scan_local("/data/run001")?;
/// println!("Kit used: {}", sheet.kit);
/// println!("Number of files found: {}", files.len());
/// ```
pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
    let mut result = Vec::new();
    let mut sample_sheet: Option<String> = None;
    let mut pore_activity = None;
    let mut throughput = None;

    for entry in glob(&format!("{}/**/*", dir))? {
        let file = entry.context("Failed to read a glob entry")?;

        // Extract file properties safely
        let metadata = file.metadata().context(format!(
            "Failed to access file metadata: {}",
            file.display()
        ))?;
        let size = metadata.size();
        let modified = metadata.mtime();
        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
        let path = file.to_string_lossy().into_owned();

        if path.contains("pore_activity") {
            let mut reader =
                File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
            pore_activity = Some(parse_pore_activity_from_reader(&mut reader).with_context(
                || {
                    format!(
                        "Failed to parse pore activity data from: {}",
                        file.display()
                    )
                },
            )?);
        } else if path.contains("sample_sheet") {
            sample_sheet = Some(path.clone());
        } else if path.contains("throughput") {
            let mut reader =
                File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
            throughput = Some(parse_throughput_from_reader(&mut reader).with_context(|| {
                format!("Failed to parse throughput data from: {}", file.display())
            })?);
        }

        result.push((path, size, modified_utc));
    }

    let sample_sheet =
        sample_sheet.ok_or(anyhow::anyhow!("No sample sheet detected in: {dir}"))?;
    let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
        .context(anyhow::anyhow!("Can't parse sample sheet data"))?;

    Ok((sample_sheet, pore_activity, throughput, result))
}
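// Sanity check of the `mtime` -> `DateTime<Utc>` conversion used by `scan_local`
// above and `scan_archive` below; the timestamp is an arbitrary fixture.
#[cfg(test)]
mod mtime_conversion_tests {
    use super::*;

    #[test]
    fn unix_seconds_roundtrip() {
        let mtime: i64 = 1_700_000_000;
        let converted: DateTime<Utc> = Utc.timestamp_opt(mtime, 0).unwrap();
        assert_eq!(converted.timestamp(), mtime);
    }
}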
/// Scans a `.tar` archive containing a MinKNOW sequencing experiment.
///
/// This function opens a TAR archive, searches for the `sample_sheet` CSV file,
/// extracts its metadata, and parses it into a `MinKnowSampleSheet`.
/// All other entries in the archive are collected with their path, size, and modification time.
///
/// # Arguments
/// - `tar_path`: Path to the `.tar` archive file.
///
/// # Returns
/// - `Ok(ExperimentData)`: A tuple containing the parsed sample sheet and the list of file metadata.
/// - `Err`: If the archive is unreadable, malformed, or missing the `sample_sheet`.
///
/// # Archive Requirements
/// - Must contain exactly one file matching `"sample_sheet"` in its path.
/// - The sample sheet must contain a valid CSV header and a single data row.
///
/// # Errors
/// - Fails if the archive can't be opened or read.
/// - Fails if any entry is malformed (e.g., missing timestamp).
/// - Fails if no sample sheet is found or if it is malformed.
///
/// # Example
/// ```no_run
/// let (sample_sheet, pore_activity, throughput, files) = scan_archive("archive.tar")?;
/// println!("Sample ID: {}", sample_sheet.sample_id);
/// println!("Total files in archive: {}", files.len());
/// ```
pub fn scan_archive(tar_path: &str) -> anyhow::Result<ExperimentData> {
    info!("Scanning archive: {tar_path}");

    let file = File::open(tar_path)
        .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
    let mut archive = tar::Archive::new(file);

    let mut result = Vec::new();
    let mut sample_sheet: Option<String> = None;
    let mut throughput = None;
    let mut pore_activity = None;

    // Iterate through the entries in the archive
    for entry in archive.entries_with_seek()? {
        let mut file = entry.context("Failed to read an entry from the tar archive")?;

        // Extract file properties safely
        let size = file.size();
        let modified = file
            .header()
            .mtime()
            .context("Failed to get modification time")?;
        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
        let path = file
            .path()
            .context("Failed to get file path from tar entry")?
            .to_string_lossy()
            .into_owned();

        if path.contains("pore_activity") {
            pore_activity = Some(
                parse_pore_activity_from_reader(&mut file)
                    .with_context(|| format!("Failed to read pore_activity data: {path}"))?,
            );
        } else if path.contains("sample_sheet") {
            let mut buffer = String::new();
            file.read_to_string(&mut buffer)
                .context("Failed to read file contents as string")?;
            sample_sheet = Some(buffer);
        } else if path.contains("throughput") {
            throughput = Some(
                parse_throughput_from_reader(&mut file)
                    .with_context(|| format!("Failed to read throughput data: {path}"))?,
            );
        }

        result.push((path, size, modified_utc));
    }

    let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!(
        "No sample sheet detected in archive: {tar_path}"
    ))?;

    let (_, data) = sample_sheet
        .split_once("\n")
        .ok_or(anyhow::anyhow!("Can't parse sample sheet data"))?;
    let sample_sheet: MinKnowSampleSheet = data
        .try_into()
        .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;

    Ok((sample_sheet, pore_activity, throughput, result))
}
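// A minimal usage sketch with a hypothetical archive path; it just surfaces a
// few fields from the returned `ExperimentData` tuple.
#[allow(dead_code)]
fn example_scan_archive_usage() -> anyhow::Result<()> {
    let (sample_sheet, pore_activity, throughput, files) = scan_archive("archive.tar")?;
    println!("Sample ID: {}", sample_sheet.sample_id);
    println!("Pore activity parsed: {}", pore_activity.is_some());
    println!("Throughput parsed: {}", throughput.is_some());
    println!("Total files in archive: {}", files.len());
    Ok(())
}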