use std::{ collections::HashSet, fmt, fs, path::{Path, PathBuf}, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::{ collection::flowcells::IdInput, helpers::{human_size, list_files_with_ext}, io::pod5_infos::Pod5Info, }; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Pod5 { pub name: String, pub file_size: u64, pub path: PathBuf, pub acquisition_id: String, pub acquisition_start_time: DateTime, pub adc_max: i16, pub adc_min: i16, pub experiment_name: String, pub flow_cell_id: String, pub flow_cell_product_code: String, pub protocol_name: String, pub protocol_run_id: String, pub protocol_start_time: DateTime, pub sample_id: String, pub sample_rate: u16, pub sequencing_kit: String, pub sequencer_position: String, pub sequencer_position_type: String, pub software: String, pub system_name: String, pub system_type: String, } impl fmt::Display for Pod5 { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "๐Ÿ“ {}", self.name)?; writeln!(f, " size : {} bytes", self.file_size)?; writeln!(f, " path : {}", self.path.display())?; writeln!(f, " experiment: {}", self.experiment_name)?; writeln!( f, " flow cell : {} ({})", self.flow_cell_id, self.flow_cell_product_code )?; writeln!(f, "๐Ÿงช Sample")?; writeln!(f, " id : {}", self.sample_id)?; writeln!(f, " kit : {}", self.sequencing_kit)?; writeln!(f, " rate : {} Hz", self.sample_rate)?; writeln!(f, "๐Ÿ”ฌ Acquisition")?; writeln!(f, " id : {}", self.acquisition_id)?; writeln!(f, " start : {}", self.acquisition_start_time)?; writeln!(f, " ADC : [{} .. {}]", self.adc_min, self.adc_max)?; writeln!(f, "โš™๏ธ Protocol")?; writeln!(f, " name : {}", self.protocol_name)?; writeln!(f, " run id : {}", self.protocol_run_id)?; writeln!(f, " started : {}", self.protocol_start_time)?; writeln!(f, "๐Ÿ–ฅ๏ธ System")?; writeln!(f, " {} / {}", self.system_name, self.system_type)?; writeln!(f, " software : {}", self.software) } } impl Pod5 { /// Construct a `Pod5` from a filesystem path. /// /// This loads the metadata using `Pod5Info::from_pod5` and fills the /// corresponding fields in `Pod5`. pub fn from_path>(path: P) -> anyhow::Result { let path_ref = path.as_ref(); // Convert path to string, returning an error if it contains invalid UTF-8 let path_str = path_ref.to_str().ok_or_else(|| { anyhow::anyhow!("Path contains invalid UTF-8: {}", path_ref.display()) })?; // Pod5Info::from_pod5 now returns Result let info = Pod5Info::from_pod5(path_str)?; let file_size = std::fs::metadata(path_ref) .map_err(|e| { anyhow::anyhow!( "Failed to read metadata for '{}': {}", path_ref.display(), e ) })? .len(); Ok(Self { name: path_ref .file_name() .and_then(|s| s.to_str()) .unwrap_or("") .to_string(), file_size, path: PathBuf::from(path_ref), acquisition_id: info.acquisition_id, acquisition_start_time: info.acquisition_start_time, adc_max: info.adc_max, adc_min: info.adc_min, experiment_name: info.experiment_name, flow_cell_id: info.flow_cell_id, flow_cell_product_code: info.flow_cell_product_code, protocol_name: info.protocol_name, protocol_run_id: info.protocol_run_id, protocol_start_time: info.protocol_start_time, sample_id: info.sample_id, sample_rate: info.sample_rate, sequencing_kit: info.sequencing_kit, sequencer_position: info.sequencer_position, sequencer_position_type: info.sequencer_position_type, software: info.software, system_name: info.system_name, system_type: info.system_type, }) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Pod5sRun { pub run_id: String, pub flow_cell_id: String, pub sequencing_kit: String, pub cases: Vec, pub pod5s: Vec, pub bams_pass: Option, } impl fmt::Display for Pod5sRun { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "๐Ÿš€ Run {}", self.run_id)?; writeln!(f, " Flow cell : {}", self.flow_cell_id)?; writeln!(f, " Sequencing kit : {}", self.sequencing_kit)?; writeln!(f, " Cases : {}", self.cases.len())?; for c in &self.cases { writeln!(f, " โ€ข {}", c)?; } writeln!( f, " Pod5 files : {} (showing 0 details)", self.pod5s.len() )?; if let Some(ref bam) = self.bams_pass { writeln!(f, " BAM pass : {}", bam.display())?; } Ok(()) } } impl Pod5sRun { /// Load all `.pod5` files from a directory and build a collection. pub fn load_from_dir>(dir: P) -> anyhow::Result { Self::load_from_dirs(std::iter::once(dir)) } /// Load all `.pod5` files from multiple directories and build a collection. /// /// All files must share the same `run_id`, `flow_cell_id` and `sequencing_kit`. pub fn load_from_dirs(dirs: I) -> anyhow::Result where P: AsRef, I: IntoIterator, { let mut pod5s = Vec::new(); let mut flow_cell_id: Option = None; let mut sequencing_kit: Option = None; let mut run_id: Option = None; let mut skipped = 0usize; let mut any_pod_files = false; for dir in dirs { let dir = dir.as_ref(); let pod_paths = list_files_with_ext(dir, "pod5")?; if pod_paths.is_empty() { log::debug!("No .pod5 files found in directory: {}", dir.display()); continue; } any_pod_files = true; for p in pod_paths.iter() { let pod = match Pod5::from_path(p) { Ok(pod) => pod, Err(e) => { log::debug!("Skipping corrupted POD5 file '{}': {}", p.display(), e); skipped += 1; continue; } }; // run_id uniqueness check match &run_id { None => run_id = Some(pod.protocol_run_id.clone()), Some(exp) if &pod.protocol_run_id != exp => { anyhow::bail!( "Mixed run IDs: expected '{}', found '{}' (file: {})", exp, pod.protocol_run_id, pod.path.display() ); } _ => {} } // flow_cell_id uniqueness check match &flow_cell_id { None => flow_cell_id = Some(pod.flow_cell_id.clone()), Some(exp) if &pod.flow_cell_id != exp => { anyhow::bail!( "Mixed flow cells: expected '{}', found '{}' (file: {})", exp, pod.flow_cell_id, pod.path.display() ); } _ => {} } // sequencing_kit uniqueness check match &sequencing_kit { None => sequencing_kit = Some(pod.sequencing_kit.clone()), Some(exp) if &pod.sequencing_kit != exp => { anyhow::bail!( "Mixed sequencing kits: expected '{}', found '{}' (file: {})", exp, pod.sequencing_kit, pod.path.display() ); } _ => {} } pod5s.push(pod); } } if !any_pod_files { anyhow::bail!("No .pod5 files found in any directory"); } let run_id = run_id.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?; let flow_cell_id = flow_cell_id.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?; let sequencing_kit = sequencing_kit.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?; if skipped > 0 { log::debug!( "Skipped {} corrupted POD5 file(s) across directories", skipped ); } Ok(Self { run_id, flow_cell_id, sequencing_kit, cases: Vec::new(), pod5s, bams_pass: None, }) } pub fn add_id_input(&mut self, id_input: IdInput) { self.cases.push(id_input); } /// Compute summary statistics for the collection. pub fn stats(&self) -> Pod5sFlowCellStats { if self.pod5s.is_empty() { return Pod5sFlowCellStats { run_id: self.run_id.clone(), flow_cell_id: self.flow_cell_id.clone(), sequencing_kit: self.sequencing_kit.clone(), count: 0, total_size: 0, min_acq: None, max_acq: None, min_protocol: None, max_protocol: None, avg_sample_rate: None, }; } let count = self.pod5s.len(); let total_size = self.pod5s.iter().map(|p| p.file_size).sum(); let (min_acq, max_acq) = self.pod5s.iter().map(|p| p.acquisition_start_time).fold( ( self.pod5s[0].acquisition_start_time, self.pod5s[0].acquisition_start_time, ), |(minv, maxv), t| (minv.min(t), maxv.max(t)), ); let (min_protocol, max_protocol) = self.pod5s.iter().map(|p| p.protocol_start_time).fold( ( self.pod5s[0].protocol_start_time, self.pod5s[0].protocol_start_time, ), |(minv, maxv), t| (minv.min(t), maxv.max(t)), ); let avg_sample_rate = Some(self.pod5s.iter().map(|p| p.sample_rate as f64).sum::() / count as f64); Pod5sFlowCellStats { run_id: self.run_id.clone(), flow_cell_id: self.flow_cell_id.clone(), sequencing_kit: self.sequencing_kit.clone(), count, total_size, min_acq: Some(min_acq), max_acq: Some(max_acq), min_protocol: Some(min_protocol), max_protocol: Some(max_protocol), avg_sample_rate, } } } #[derive(Debug, Clone)] pub struct Pod5sFlowCellStats { pub run_id: String, pub flow_cell_id: String, pub sequencing_kit: String, pub count: usize, pub total_size: u64, pub min_acq: Option>, pub max_acq: Option>, pub min_protocol: Option>, pub max_protocol: Option>, pub avg_sample_rate: Option, } impl fmt::Display for Pod5sFlowCellStats { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "Pod5 Flow Cell Stats")?; writeln!(f, "---------------------")?; writeln!(f, "Run ID: {}", self.run_id)?; writeln!(f, "Flow Cell ID: {}", self.flow_cell_id)?; writeln!(f, "Sequencing kit: {}", self.sequencing_kit)?; writeln!(f, "Count: {}", self.count)?; writeln!( f, "Total Size: {} ({} bytes)", human_size(self.total_size), self.total_size )?; if let Some(t) = self.min_acq { writeln!(f, "Acquisition Start (min): {}", t)?; } if let Some(t) = self.max_acq { writeln!(f, "Acquisition Start (max): {}", t)?; } if let Some(t) = self.min_protocol { writeln!(f, "Protocol Start (min): {}", t)?; } if let Some(t) = self.max_protocol { writeln!(f, "Protocol Start (max): {}", t)?; } if let Some(avg) = self.avg_sample_rate { writeln!(f, "Average Sample Rate: {:.2}", avg)?; } Ok(()) } } #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct Pod5sRuns { pub data: Vec, } impl Pod5sRuns { pub fn new() -> Self { Self::default() } /// Add a new `Pod5sRun` by scanning a directory of `.pod5` files. /// /// - Builds a `Pod5sRun` via [`Pod5sRun::load_from_dir`]. /// - If **no** existing run has the same `(run_id, flow_cell_id, sequencing_kit)`, /// the new run is appended to `data`. /// - If a run **already exists** with these three identifiers, its `pod5s` list is /// **merged** with the new one: /// - New `Pod5` entries are only added if their file name (from `pod.path.file_name()`) /// does **not** already exist in the run. /// - Duplicate file names are silently skipped (no error). /// Add a new `Pod5sRun` by scanning a directory of `.pod5` files. /// /// - Builds a `Pod5sRun` via [`Pod5sRun::load_from_dir`]. /// - Optionally attaches a `bams_pass` directory to the run. /// - If **no** existing run has the same `(run_id, flow_cell_id, sequencing_kit)`, /// the new run is appended to `data`. /// - If a run **already exists** with these three identifiers: /// - `pod5s` are merged by file name (duplicates skipped). /// - `bams_pass`: /// * if existing has `Some` and new is `None` โ†’ keep existing. /// * if existing is `None` and new is `Some` โ†’ set existing to new. /// * if both `Some` but different โ†’ error (conflicting BAM-pass roots). pub fn add_from_dir(&mut self, pod_dir: P, bams_pass: Option) -> anyhow::Result<()> where P: AsRef, Q: AsRef, { let mut new_run = Pod5sRun::load_from_dir(&pod_dir)?; new_run.bams_pass = bams_pass.map(|p| p.as_ref().to_path_buf()); // Try to find an existing run with same identifiers if let Some(existing) = self.data.iter_mut().find(|r| { r.run_id == new_run.run_id && r.flow_cell_id == new_run.flow_cell_id && r.sequencing_kit == new_run.sequencing_kit }) { // --- merge bams_pass --- match (&existing.bams_pass, &new_run.bams_pass) { (Some(old), Some(new)) if old != new => { anyhow::bail!( "Conflicting bam_pass for run {} (flowcell {}): \ existing='{}', new='{}'", existing.run_id, existing.flow_cell_id, old.display(), new.display() ); } (None, Some(new)) => { existing.bams_pass = Some(new.clone()); } _ => { // (Some, None) or (None, None) or (Some, Some equal): nothing to do } } // Build a set of existing Pod5 file names let mut existing_names: HashSet = existing .pod5s .iter() .filter_map(|p| p.path.file_name().map(|n| n.to_string_lossy().into_owned())) .collect(); // Keep only Pod5 entries with a new file name new_run.pod5s.retain(|p| { if let Some(name_os) = p.path.file_name() { let name = name_os.to_string_lossy().to_string(); if existing_names.contains(&name) { // duplicate -> skip false } else { existing_names.insert(name); true } } else { // No file name, keep it (or change to false if you prefer to drop these) true } }); // Merge the unique new Pod5s into the existing run existing.pod5s.extend(new_run.pod5s); Ok(()) } else { // No matching run: add as a new entry self.data.push(new_run); Ok(()) } } /// Save metadata (not raw POD5s) as JSON. pub fn save_json>(&self, path: P) -> anyhow::Result<()> { let s = serde_json::to_string_pretty(self)?; fs::write(path, s)?; Ok(()) } /// Load metadata from a saved JSON file. pub fn load_json>(path: P) -> anyhow::Result { let data = std::fs::read_to_string(path)?; Ok(serde_json::from_str(&data)?) } /// Add a run from an ONT run directory. /// /// Layout assumed: /// - `bam_pass/` โ†’ attached to `bams_pass` if present. /// - `pod5_pass/barcode*/*.pod5` and `pod5_recovered/*.pod5` โ†’ added to `pod5s`. pub fn add_run_dir>(&mut self, run_dir: P) -> anyhow::Result<()> { let run_dir = run_dir.as_ref(); // --- collect POD5 directories --- let mut pod_dirs: Vec = Vec::new(); // pod5_pass/barcode*/ subdirectories let pod5_pass = run_dir.join("pod5_pass"); if pod5_pass.is_dir() { for entry in fs::read_dir(&pod5_pass)? { let entry = entry?; if entry.file_type()?.is_dir() { let name = entry.file_name(); if name.to_string_lossy().starts_with("barcode") { pod_dirs.push(entry.path()); } } } } // pod5_recovered/ let pod5_recovered = run_dir.join("pod5_recovered"); if pod5_recovered.is_dir() { pod_dirs.push(pod5_recovered); } if pod_dirs.is_empty() { anyhow::bail!( "No POD5 directories (pod5_pass/barcode*/ or pod5_recovered/) found under {}", run_dir.display() ); } // --- bam_pass directory (optional) --- let bam_pass_dir = run_dir.join("bam_pass"); let bams_pass = if bam_pass_dir.is_dir() { Some(bam_pass_dir) } else { None }; // Build the new run from all POD5 directories let mut new_run = Pod5sRun::load_from_dirs(&pod_dirs)?; new_run.bams_pass = bams_pass; // --- merge logic identical to `add_from_dir` --- if let Some(existing) = self.data.iter_mut().find(|r| { r.run_id == new_run.run_id && r.flow_cell_id == new_run.flow_cell_id && r.sequencing_kit == new_run.sequencing_kit }) { // merge bams_pass match (&existing.bams_pass, &new_run.bams_pass) { (Some(old), Some(new)) if old != new => { anyhow::bail!( "Conflicting bam_pass for run {} (flowcell {}): \ existing='{}', new='{}'", existing.run_id, existing.flow_cell_id, old.display(), new.display() ); } (None, Some(new)) => { existing.bams_pass = Some(new.clone()); } _ => { // (Some, None) or (None, None) or (Some, Some equal): nothing to do } } // merge pod5s by file name, skipping duplicates use std::collections::HashSet; let mut existing_names: HashSet = existing .pod5s .iter() .filter_map(|p| p.path.file_name().map(|n| n.to_string_lossy().into_owned())) .collect(); new_run.pod5s.retain(|p| { if let Some(name_os) = p.path.file_name() { let name = name_os.to_string_lossy().to_string(); if existing_names.contains(&name) { false } else { existing_names.insert(name); true } } else { true } }); existing.pod5s.extend(new_run.pod5s); Ok(()) } else { // No matching run: add as new entry self.data.push(new_run); Ok(()) } } } #[cfg(test)] mod tests { use crate::helpers::test_init; use super::*; #[test] fn load_pod5s() -> anyhow::Result<()> { test_init(); let dir = "/mnt/beegfs02/scratch/t_steimle/prom_runs/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/pod5_recovered"; let flow_cell = Pod5sRun::load_from_dir(dir)?; println!("{:#?}", flow_cell.pod5s.first()); let stats = flow_cell.stats(); println!("{stats}"); Ok(()) } #[test] fn load_prom_run() -> anyhow::Result<()> { test_init(); let dir = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A"; let mut runs = Pod5sRuns::new(); runs.add_run_dir(dir)?; let stats = runs.data[0].stats(); println!("{runs:#?}"); println!("{stats}"); Ok(()) } }