use anyhow::{anyhow, Context, Result}; use chrono::{DateTime, Utc}; use csv::ReaderBuilder; use glob::glob; use hashbrown::HashMap; use log::{info, warn}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use std::{ fmt::Display, fs::{self, File, Metadata}, os::unix::fs::MetadataExt, path::{Path, PathBuf}, }; use crate::io::pod5_infos::Pod5Info; /// Represents a collection of Pod5 sequencing runs and associated metadata. /// /// A `Pod5Collection` groups multiple sequencing runs (`Run`), each consisting of /// one or more flow cells. It is initialized by scanning a directory of `.pod5` files, /// optionally mapping flow cell names to corrected identifiers, and assigning BAM and /// `.pod5` directories. /// /// # Fields /// - `importation_date`: Timestamp of when this collection was created. /// - `runs`: List of runs with associated flow cells and metadata. /// - `bam_dir`: Directory containing BAM files. /// - `pod5_dir`: Directory containing `.pod5` files. #[derive(Debug, Default)] pub struct Pod5Collection { pub importation_date: DateTime, pub runs: Vec, pub bam_dir: String, pub pod5_dir: String, } impl Pod5Collection { /// Constructs a new `Pod5Collection` by scanning the given `.pod5` directory, /// applying corrected flowcell naming, and grouping data by run. /// /// # Arguments /// - `pod5_dir`: Path to directory containing `.pod5` files. /// - `corrected_fc_path`: Path to file with corrected flowcell mappings. /// - `bam_dir`: Path to directory containing BAM files. /// /// # Returns /// - `Ok(Pod5Collection)` if the data is consistent and valid. /// - `Err(anyhow::Error)` if listing, parsing, or validation fails. pub fn new(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result { // Load pod5 files let pod5_files = list_pod_files(pod5_dir)?; info!("n pod5 {}", pod5_files.len()); // Group pod5 files by run-flowcell key let mut grouped: HashMap> = HashMap::new(); for pod in pod5_files { let key = format!("{}••{}", pod.run_name, pod.flowcell_name); grouped.entry(key).or_default().push(pod); } // Load corrected flowcell mapping let corrected_fc = load_flowcells_corrected_names(corrected_fc_path)?; // Construct FlowCells in parallel from Pod5 groups let flowcells: Vec = grouped .into_values() .par_bridge() .map(|group| FlowCell::new(group, &corrected_fc)) .collect::>>()?; // Group FlowCells by run_name (sequential step) let mut runs_map: HashMap> = HashMap::new(); for fc in flowcells { runs_map.entry(fc.run_name.clone()).or_default().push(fc); } // Convert each run group into a Run let runs: Vec = runs_map .into_values() .map(|fcs| Run { run_name: fcs[0].run_name.clone(), flowcells: fcs, }) .collect(); Ok(Self { importation_date: Utc::now(), runs, bam_dir: bam_dir.to_string(), pod5_dir: pod5_dir.to_string(), }) } pub fn print_info(&self) { self.runs.iter().for_each(|run| { run.flowcells.iter().for_each(|fc| { let total_size: u64 = fc.pod5.iter().map(|p| p.file_metadata.size()).sum(); let n_files = fc.pod5.len(); let dates: Vec> = fc .pod5 .iter() .map(|p| p.file_metadata.modified().unwrap().into()) .collect(); let from = dates.iter().min().unwrap(); let to = dates.iter().max().unwrap(); let s = [ run.run_name.clone(), from.to_string(), to.to_string(), n_files.to_string(), total_size.to_string(), fc.flowcell_name.to_string(), fc.pod5_type.to_string(), fc.pod5_info.acquisition_id.clone(), format!("{:?}", fc.cases), ] .join("\t"); println!("{s}"); }); }); } /// Returns a sorted and deduplicated list of all unique `FlowCellCase` IDs in the collection. pub fn ids(&self) -> Vec { let mut ids: Vec = self .runs .iter() .flat_map(|r| r.flowcells.iter()) .flat_map(|f| f.cases.iter().map(|c| c.id.clone())) .collect(); ids.sort_unstable(); // faster than sort() ids.dedup(); ids } } /// Represents a sequencing run, which may contain multiple flowcells. /// /// A `Run` groups flowcells that were processed together during a sequencing event /// (e.g., a MinION or PromethION run). It serves as a logical grouping for downstream analysis. /// /// # Fields /// - `run_name`: Unique identifier for the sequencing run (e.g., "20240301_RUN42"). /// - `flowcells`: List of `FlowCell` objects associated with this run. #[derive(Debug)] pub struct Run { /// Name of the sequencing run. pub run_name: String, /// Flowcells that belong to this run. pub flowcells: Vec, } /// Represents a flowcell and its associated metadata, cases, and `.pod5` files. /// /// A `FlowCell` encapsulates all relevant information needed to track, /// identify, and process a physical flowcell, including its corrected name, /// acquisition metadata, and associated `.pod5` files. /// /// # Fields /// - `flowcell_name`: Original name of the flowcell as found in `.pod5` files. /// - `corrected_name`: Normalized or corrected version of the flowcell name (if available). /// - `cases`: Associated cases (`FlowCellCase`) for this flowcell, usually representing samples or barcodes. /// - `run_name`: Name of the sequencing run this flowcell belongs to. /// - `pod5_type`: Whether the `.pod5` files are raw or demultiplexed (`Pod5Type`). /// - `pod5_info`: Metadata extracted from one representative `.pod5` file (`Pod5Info`). /// - `pod5`: All `.pod5` file entries associated with this flowcell. #[derive(Debug, Clone)] pub struct FlowCell { /// Original flowcell name (e.g., "FCX123"). pub flowcell_name: String, /// Corrected flowcell name, if normalization was applied. pub corrected_name: String, /// Sample/barcode-level associations for this flowcell. pub cases: Vec, /// The sequencing run this flowcell belongs to. pub run_name: String, /// Type of pod5 data: raw or demuxed. pub pod5_type: Pod5Type, /// Metadata extracted from a `.pod5` file, including acquisition ID. pub pod5_info: Pod5Info, /// The list of `.pod5` files linked to this flowcell. pub pod5: Vec, } impl FlowCell { /// Constructs a new `FlowCell` from a non-empty vector of `Pod5` entries /// and a list of corrected flowcell mappings. /// /// Ensures that all entries in the vector share the same `run_name`, `flowcell_name`, and `pod5_type`. /// /// # Arguments /// - `pods`: A non-empty vector of `Pod5` entries (moved, not cloned). /// - `corrected_fc`: Reference to a list of `FCLine` entries for resolving corrected names. /// /// # Errors /// Returns an error if: /// - `pods` is empty /// - `.pod5` path is invalid UTF-8 /// - inconsistent metadata across pod5 entries /// - multiple corrected names are found /// - parent directory resolution fails pub fn new(pods: Vec, corrected_fc: &[FCLine]) -> anyhow::Result { let first = pods.first().context("Empty pod5 list for FlowCell")?; let flowcell_name = &first.flowcell_name; let run_name = &first.run_name; let pod5_type = &first.pod5_type; // Consistency check let inconsistent = pods.iter().any(|p| { p.flowcell_name != *flowcell_name || p.run_name != *run_name || p.pod5_type != *pod5_type }); if inconsistent { return Err(anyhow!( "Inconsistent pod5 metadata: all entries must share the same run_name, flowcell_name, and pod5_type" )); } // Extract and validate .pod5 path let path_str = first.path.to_str().context("Invalid UTF-8 in pod5 path")?; let pod5_info = Pod5Info::from_pod5(path_str); // Select corrected entries for this flowcell let matched_fc_lines: Vec<_> = corrected_fc .iter() .filter(|e| e.flow_cell == *flowcell_name) .cloned() .collect(); // Resolve unique corrected name let corrected_name = { let mut names: Vec<_> = matched_fc_lines .iter() .map(|e| e.ref_flow_cell.clone()) .filter(|s| !s.is_empty()) .collect(); names.dedup(); match names.len() { 0 => String::new(), 1 => names[0].clone(), _ => { return Err(anyhow!( "Multiple corrected names for flow cell '{}': {:?}", flowcell_name, names )); } } }; // Cache parent directories let raw_parent = first.path.parent().context("Missing parent for RAW pod5")?; let demuxed_grandparent = raw_parent .parent() .context("Invalid directory structure for DEMUXED pod5")?; // Build case list let cases = matched_fc_lines .iter() .map(|e| { let pod_dir = match pod5_type { Pod5Type::Raw => raw_parent.to_path_buf(), Pod5Type::Demuxed => { let mut bc_dir = demuxed_grandparent.to_path_buf(); bc_dir.push(format!("barcode{}", e.barcode_number.replace("NB", ""))); bc_dir } }; Ok(FlowCellCase { id: e.id.clone(), time_point: e.sample_type.clone(), barcode: e.barcode_number.clone(), pod_dir, }) }) .collect::>()?; Ok(Self { flowcell_name: flowcell_name.clone(), corrected_name, cases, run_name: run_name.clone(), pod5_type: pod5_type.clone(), pod5_info, pod5: pods, // Already moved }) } } /// Represents the type of `.pod5` file: either raw or demultiplexed. #[derive(Debug, Clone, PartialEq)] pub enum Pod5Type { /// Raw `.pod5` files directly from acquisition. Raw, /// Demultiplexed `.pod5` files, post-processed by barcoding. Demuxed, } impl Display for Pod5Type { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let s = match self { Pod5Type::Raw => "raw", Pod5Type::Demuxed => "demuxed", }; f.write_str(s) } } /// Configuration for interpreting file paths when parsing `.pod5` files. #[derive(Debug, Clone)] pub struct Pod5Config { /// Base directory (prefix to strip from full paths). pub base_dir: String, /// Substring used to detect "raw" pod5 files. pub type_raw: String, /// Substring used to detect "demuxed" pod5 files. pub type_demuxed: String, /// Index (in path components) where `run_name` is expected. pub run_dir_n: u8, /// Index (in path components) where `flowcell_name` is expected. pub flowcell_dir_n: u8, } impl Default for Pod5Config { fn default() -> Self { Self { base_dir: "/data/run_data".to_string(), type_raw: "/pod5/".to_string(), type_demuxed: "/pod5_pass/".to_string(), run_dir_n: 0, flowcell_dir_n: 1, } } } /// Represents a `.pod5` file and its associated metadata and location info. /// /// Used as the base object for flowcell and run aggregation. #[derive(Debug, Clone)] pub struct Pod5 { /// Full path to the `.pod5` file. pub path: PathBuf, /// Whether the file is raw or demultiplexed. pub pod5_type: Pod5Type, /// Name of the sequencing run this file belongs to. pub run_name: String, /// Name of the flowcell associated with this file. pub flowcell_name: String, /// Filesystem metadata (e.g., size, modified time). pub file_metadata: Metadata, } impl Pod5 { /// Constructs a `Pod5` instance from a file path, using a `Pod5Config` to infer type and extract metadata. /// /// # Arguments /// - `path`: Path to the `.pod5` file. /// - `config`: Configuration used to interpret the path structure. /// /// # Returns /// - `Ok(Pod5)` if type and components can be extracted. /// - `Err` if path is malformed, missing components, or type is unrecognized. pub fn from_path(path: impl AsRef, config: &Pod5Config) -> Result { let path = path.as_ref(); let path_str = path .to_str() .context(format!("Can't convert path to UTF-8 string: {:?}", path))?; // Determine Pod5 type by pattern matching let pod5_type = if path_str.contains(&config.type_raw) { Pod5Type::Raw } else if path_str.contains(&config.type_demuxed) { Pod5Type::Demuxed } else { return Err(anyhow!( "Unable to determine pod5 type from path: {}", path_str )); }; // Extract metadata from filesystem let file_metadata = fs::metadata(path).with_context(|| format!("Failed to get metadata for {:?}", path))?; // Strip base_dir and split into components let relative_path = path_str.strip_prefix(&config.base_dir).unwrap_or(path_str); // fallback to full path if base_dir is not a prefix let components: Vec<&str> = relative_path.split('/').filter(|c| !c.is_empty()).collect(); // Extract run_name and flowcell_name from path components let run_name = components .get(config.run_dir_n as usize) .context("Missing run_name in path")? .to_string(); let flowcell_name = components .get(config.flowcell_dir_n as usize) .context("Missing flowcell_name in path")? .to_string(); Ok(Self { path: path.to_path_buf(), pod5_type, run_name, flowcell_name, file_metadata, }) } } /// Recursively scans a directory for `.pod5` files and parses them into `Pod5` objects. /// /// This function uses glob-based search to find all `.pod5` files under the given directory /// (including subdirectories), then filters out unwanted paths (e.g., `pod5_fail/`, `pod5_skip/`) /// and attempts to parse each remaining file using `Pod5::from_path`. /// /// Any file that fails to parse is skipped with a warning. /// /// # Arguments /// - `dir`: Path to the root directory to search (absolute or relative). /// /// # Returns /// - `Ok(Vec)` on success, with all successfully parsed `.pod5` files. /// - `Err(anyhow::Error)` if path parsing fails (e.g., invalid UTF-8). /// /// # Errors /// - Fails early if the glob pattern itself is invalid. /// - Skips over files that fail to parse, but logs warnings. /// /// # Notes /// - Directories containing `/pod5_fail/` or `/pod5_skip/` are excluded. /// - The glob pattern used is `{dir}/**/*.pod5`. /// /// # Example /// ``` /// let pod_files = list_pod_files("/data/pods")?; /// for pod in pod_files { /// println!("{}", pod.path.display()); /// } /// ``` pub fn list_pod_files(dir: &str) -> Result> { let pattern = format!("{}/**/*.pod5", dir); let mut pod_files = Vec::new(); let conf = Pod5Config { base_dir: if dir.ends_with('/') { dir.to_string() } else { format!("{dir}/") }, ..Pod5Config::default() }; for entry in glob(&pattern).expect("Failed to read glob pattern") { match entry { Ok(path) => { let p = path.to_str().context("Can't parse path to string {path}")?; if p.contains("/pod5_fail/") || p.contains("/pod5_skip/") { continue; } match Pod5::from_path(&path, &conf) { Ok(pod5) => pod_files.push(pod5), Err(e) => warn!("{e}"), } } Err(e) => warn!("Error: {:?}", e), } } Ok(pod_files) } // impl FlowCell { // pub fn cases_pod5_dir(&self) -> Vec { // match self.pod5_type { // Pod5Type::Raw => { // let p = self.pod5.first().unwrap(); // vec![p.path.parent().unwrap().to_path_buf()] // }, // Pod5Type::Demuxed => { // self.cases.iter().map(|c| { // let str_barcode = format!("barcode{}", c.barcode); // }) // }, // } // } // } #[derive(Debug, Clone, Default)] pub struct FlowCellCase { pub id: String, pub time_point: String, pub barcode: String, pub pod_dir: PathBuf, // pub basecalled: Option, } // #[derive(Debug, Serialize, Deserialize, Clone)] // pub struct IdsInput { // pub data: Vec, // } // // #[derive(Debug, Serialize, Deserialize, Clone)] // pub struct IdInput { // pub id: String, // pub time_point: String, // pub barcode: String, // pub flow_cell: String, // pub run: String, // } // // // Implement PartialEq and Eq for IdInput // impl PartialEq for IdInput { // fn eq(&self, other: &Self) -> bool { // self.id == other.id // && self.time_point == other.time_point // && self.barcode == other.barcode // && self.flow_cell == other.flow_cell // && self.run == other.run // } // } // // impl Eq for IdInput {} // // // Implement Hash for IdInput // impl Hash for IdInput { // fn hash(&self, state: &mut H) { // self.id.hash(state); // self.time_point.hash(state); // self.barcode.hash(state); // self.flow_cell.hash(state); // self.run.hash(state); // } // } // // impl IdsInput { // pub fn load_json(path: &str) -> anyhow::Result { // let f = File::open(path)?; // let s: Self = serde_json::from_reader(f)?; // Ok(s) // } // // pub fn save_json(&self, path: &str) -> anyhow::Result<()> { // let f = File::create(path)?; // serde_json::to_writer(f, self)?; // Ok(()) // } // // pub fn dedup(&mut self) { // let mut unique = HashSet::new(); // self.data.retain(|item| unique.insert(item.clone())); // } // // pub fn load_from_tsv(path: &str) -> anyhow::Result { // let inputs = load_flowcells_corrected_names(path)?; // let data = inputs // .iter() // .map(|line| IdInput { // id: line.id.to_string(), // time_point: line.sample_type.to_string(), // barcode: line.barcode_number.to_string(), // flow_cell: line.flow_cell.to_string(), // run: line.run.to_string(), // }) // .collect(); // // let mut res = Self { data }; // res.dedup(); // Ok(res) // } // // pub fn add_input(&mut self, values: IdInput) { // self.data.push(values); // self.dedup(); // } // } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Pod5Run { pub protocol_run_id: String, pub position_id: String, pub flow_cell_id: String, pub id: String, pub time_point: String, pub barcode_number: String, pub flow_cell: String, pub run: String, pub last_pod_dir: (DateTime, String), pub archives: Vec<(String, DateTime, String)>, } /// Loads corrected flowcell metadata from a tab-delimited file. /// /// This function parses a TSV file where each row is deserialized into an `FCLine`. /// It also normalizes some fields (e.g., lowercases `sample_type`, uppercases `id`) /// for consistency in downstream processing. /// /// # Arguments /// - `file_path`: Path to the TSV file containing flowcell correction data. /// /// # Returns /// A vector of `FCLine` records, one per line in the file. /// /// # Errors /// Returns an error if the file cannot be opened or if any line fails to deserialize. /// /// # Expected Format (TSV with header) /// ```text /// id sample_type barcode_number flow_cell run_path ref_flow_cell /// P001X03 tumoral NB01 FC123 RUN123 /path/to/data FC123_CORR /// ``` /// /// # Example /// ``` /// let fc_lines = load_flowcells_corrected_names("flowcells.tsv")?; /// assert!(!fc_lines.is_empty()); /// ``` pub fn load_flowcells_corrected_names(file_path: &str) -> anyhow::Result> { let file = File::open(file_path)?; let mut rdr = ReaderBuilder::new() .delimiter(b'\t') .has_headers(true) .from_reader(file); let mut records = Vec::new(); for result in rdr.deserialize() { let mut record: FCLine = result?; // formating record.sample_type = record.sample_type.to_lowercase(); record.id = record.id.to_uppercase(); records.push(record); } Ok(records) } /// Represents a single record describing a barcode-flowcell pairing, /// including original and corrected metadata. /// /// This struct is typically deserialized from a TSV file and used to map /// `.pod5` files to metadata like corrected flowcell names and experimental time points. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct FCLine { /// Unique identifier for the sample or barcode group (e.g., "P001X03"). pub id: String, /// Sample type associated with this record (e.g., "normal", "tumoral"). pub sample_type: String, /// The barcode number (e.g., "NB01", "NB02"). pub barcode_number: String, /// Original flowcell name as found in the raw `.pod5` metadata. pub flow_cell: String, /// Sequencing run name this flowcell belongs to (e.g., "20240101_FAB123"). pub run: String, /// Original path to data (can be absolute or relative). pub path: String, /// Corrected flowcell name used to resolve naming inconsistencies. pub ref_flow_cell: String, }