| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646 |
- use std::{
- collections::HashSet,
- fmt, fs,
- path::{Path, PathBuf},
- };
- use chrono::{DateTime, Utc};
- use serde::{Deserialize, Serialize};
- use crate::{
- collection::flowcells::IdInput,
- helpers::{human_size, list_files_with_ext},
- io::pod5_infos::Pod5Info,
- };
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct Pod5 {
- pub name: String,
- pub file_size: u64,
- pub path: PathBuf,
- pub acquisition_id: String,
- pub acquisition_start_time: DateTime<Utc>,
- pub adc_max: i16,
- pub adc_min: i16,
- pub experiment_name: String,
- pub flow_cell_id: String,
- pub flow_cell_product_code: String,
- pub protocol_name: String,
- pub protocol_run_id: String,
- pub protocol_start_time: DateTime<Utc>,
- pub sample_id: String,
- pub sample_rate: u16,
- pub sequencing_kit: String,
- pub sequencer_position: String,
- pub sequencer_position_type: String,
- pub software: String,
- pub system_name: String,
- pub system_type: String,
- }
- impl fmt::Display for Pod5 {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "📁 {}", self.name)?;
- writeln!(f, " size : {} bytes", self.file_size)?;
- writeln!(f, " path : {}", self.path.display())?;
- writeln!(f, " experiment: {}", self.experiment_name)?;
- writeln!(
- f,
- " flow cell : {} ({})",
- self.flow_cell_id, self.flow_cell_product_code
- )?;
- writeln!(f, "🧪 Sample")?;
- writeln!(f, " id : {}", self.sample_id)?;
- writeln!(f, " kit : {}", self.sequencing_kit)?;
- writeln!(f, " rate : {} Hz", self.sample_rate)?;
- writeln!(f, "🔬 Acquisition")?;
- writeln!(f, " id : {}", self.acquisition_id)?;
- writeln!(f, " start : {}", self.acquisition_start_time)?;
- writeln!(f, " ADC : [{} .. {}]", self.adc_min, self.adc_max)?;
- writeln!(f, "⚙️ Protocol")?;
- writeln!(f, " name : {}", self.protocol_name)?;
- writeln!(f, " run id : {}", self.protocol_run_id)?;
- writeln!(f, " started : {}", self.protocol_start_time)?;
- writeln!(f, "🖥️ System")?;
- writeln!(f, " {} / {}", self.system_name, self.system_type)?;
- writeln!(f, " software : {}", self.software)
- }
- }
- impl Pod5 {
- /// Construct a `Pod5` from a filesystem path.
- ///
- /// This loads the metadata using `Pod5Info::from_pod5` and fills the
- /// corresponding fields in `Pod5`.
- pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
- let path_ref = path.as_ref();
- // Convert path to string, returning an error if it contains invalid UTF-8
- let path_str = path_ref.to_str().ok_or_else(|| {
- anyhow::anyhow!("Path contains invalid UTF-8: {}", path_ref.display())
- })?;
- // Pod5Info::from_pod5 now returns Result
- let info = Pod5Info::from_pod5(path_str)?;
- let file_size = std::fs::metadata(path_ref)
- .map_err(|e| {
- anyhow::anyhow!(
- "Failed to read metadata for '{}': {}",
- path_ref.display(),
- e
- )
- })?
- .len();
- Ok(Self {
- name: path_ref
- .file_name()
- .and_then(|s| s.to_str())
- .unwrap_or("")
- .to_string(),
- file_size,
- path: PathBuf::from(path_ref),
- acquisition_id: info.acquisition_id,
- acquisition_start_time: info.acquisition_start_time,
- adc_max: info.adc_max,
- adc_min: info.adc_min,
- experiment_name: info.experiment_name,
- flow_cell_id: info.flow_cell_id,
- flow_cell_product_code: info.flow_cell_product_code,
- protocol_name: info.protocol_name,
- protocol_run_id: info.protocol_run_id,
- protocol_start_time: info.protocol_start_time,
- sample_id: info.sample_id,
- sample_rate: info.sample_rate,
- sequencing_kit: info.sequencing_kit,
- sequencer_position: info.sequencer_position,
- sequencer_position_type: info.sequencer_position_type,
- software: info.software,
- system_name: info.system_name,
- system_type: info.system_type,
- })
- }
- }
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct Pod5sRun {
- pub run_id: String,
- pub flow_cell_id: String,
- pub sequencing_kit: String,
- pub cases: Vec<IdInput>,
- pub pod5s: Vec<Pod5>,
- pub bams_pass: Option<PathBuf>,
- }
- impl fmt::Display for Pod5sRun {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "🚀 Run {}", self.run_id)?;
- writeln!(f, " Flow cell : {}", self.flow_cell_id)?;
- writeln!(f, " Sequencing kit : {}", self.sequencing_kit)?;
- writeln!(f, " Cases : {}", self.cases.len())?;
- for c in &self.cases {
- writeln!(f, " • {}", c)?;
- }
- writeln!(
- f,
- " Pod5 files : {} (showing 0 details)",
- self.pod5s.len()
- )?;
- if let Some(ref bam) = self.bams_pass {
- writeln!(f, " BAM pass : {}", bam.display())?;
- }
- Ok(())
- }
- }
- impl Pod5sRun {
- /// Load all `.pod5` files from a directory and build a collection.
- pub fn load_from_dir<P: AsRef<Path>>(dir: P) -> anyhow::Result<Self> {
- Self::load_from_dirs(std::iter::once(dir))
- }
- /// Load all `.pod5` files from multiple directories and build a collection.
- ///
- /// All files must share the same `run_id`, `flow_cell_id` and `sequencing_kit`.
- pub fn load_from_dirs<P, I>(dirs: I) -> anyhow::Result<Self>
- where
- P: AsRef<Path>,
- I: IntoIterator<Item = P>,
- {
- let mut pod5s = Vec::new();
- let mut flow_cell_id: Option<String> = None;
- let mut sequencing_kit: Option<String> = None;
- let mut run_id: Option<String> = None;
- let mut skipped = 0usize;
- let mut any_pod_files = false;
- for dir in dirs {
- let dir = dir.as_ref();
- let pod_paths = list_files_with_ext(dir, "pod5")?;
- if pod_paths.is_empty() {
- log::debug!("No .pod5 files found in directory: {}", dir.display());
- continue;
- }
- any_pod_files = true;
- for p in pod_paths.iter() {
- let pod = match Pod5::from_path(p) {
- Ok(pod) => pod,
- Err(e) => {
- log::debug!("Skipping corrupted POD5 file '{}': {}", p.display(), e);
- skipped += 1;
- continue;
- }
- };
- // run_id uniqueness check
- match &run_id {
- None => run_id = Some(pod.protocol_run_id.clone()),
- Some(exp) if &pod.protocol_run_id != exp => {
- anyhow::bail!(
- "Mixed run IDs: expected '{}', found '{}' (file: {})",
- exp,
- pod.protocol_run_id,
- pod.path.display()
- );
- }
- _ => {}
- }
- // flow_cell_id uniqueness check
- match &flow_cell_id {
- None => flow_cell_id = Some(pod.flow_cell_id.clone()),
- Some(exp) if &pod.flow_cell_id != exp => {
- anyhow::bail!(
- "Mixed flow cells: expected '{}', found '{}' (file: {})",
- exp,
- pod.flow_cell_id,
- pod.path.display()
- );
- }
- _ => {}
- }
- // sequencing_kit uniqueness check
- match &sequencing_kit {
- None => sequencing_kit = Some(pod.sequencing_kit.clone()),
- Some(exp) if &pod.sequencing_kit != exp => {
- anyhow::bail!(
- "Mixed sequencing kits: expected '{}', found '{}' (file: {})",
- exp,
- pod.sequencing_kit,
- pod.path.display()
- );
- }
- _ => {}
- }
- pod5s.push(pod);
- }
- }
- if !any_pod_files {
- anyhow::bail!("No .pod5 files found in any directory");
- }
- let run_id = run_id.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?;
- let flow_cell_id =
- flow_cell_id.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?;
- let sequencing_kit =
- sequencing_kit.ok_or_else(|| anyhow::anyhow!("No valid pod5 files loaded"))?;
- if skipped > 0 {
- log::debug!(
- "Skipped {} corrupted POD5 file(s) across directories",
- skipped
- );
- }
- Ok(Self {
- run_id,
- flow_cell_id,
- sequencing_kit,
- cases: Vec::new(),
- pod5s,
- bams_pass: None,
- })
- }
- pub fn add_id_input(&mut self, id_input: IdInput) {
- self.cases.push(id_input);
- }
- /// Compute summary statistics for the collection.
- pub fn stats(&self) -> Pod5sFlowCellStats {
- if self.pod5s.is_empty() {
- return Pod5sFlowCellStats {
- run_id: self.run_id.clone(),
- flow_cell_id: self.flow_cell_id.clone(),
- sequencing_kit: self.sequencing_kit.clone(),
- count: 0,
- total_size: 0,
- min_acq: None,
- max_acq: None,
- min_protocol: None,
- max_protocol: None,
- avg_sample_rate: None,
- };
- }
- let count = self.pod5s.len();
- let total_size = self.pod5s.iter().map(|p| p.file_size).sum();
- let (min_acq, max_acq) = self.pod5s.iter().map(|p| p.acquisition_start_time).fold(
- (
- self.pod5s[0].acquisition_start_time,
- self.pod5s[0].acquisition_start_time,
- ),
- |(minv, maxv), t| (minv.min(t), maxv.max(t)),
- );
- let (min_protocol, max_protocol) = self.pod5s.iter().map(|p| p.protocol_start_time).fold(
- (
- self.pod5s[0].protocol_start_time,
- self.pod5s[0].protocol_start_time,
- ),
- |(minv, maxv), t| (minv.min(t), maxv.max(t)),
- );
- let avg_sample_rate =
- Some(self.pod5s.iter().map(|p| p.sample_rate as f64).sum::<f64>() / count as f64);
- Pod5sFlowCellStats {
- run_id: self.run_id.clone(),
- flow_cell_id: self.flow_cell_id.clone(),
- sequencing_kit: self.sequencing_kit.clone(),
- count,
- total_size,
- min_acq: Some(min_acq),
- max_acq: Some(max_acq),
- min_protocol: Some(min_protocol),
- max_protocol: Some(max_protocol),
- avg_sample_rate,
- }
- }
- }
- #[derive(Debug, Clone)]
- pub struct Pod5sFlowCellStats {
- pub run_id: String,
- pub flow_cell_id: String,
- pub sequencing_kit: String,
- pub count: usize,
- pub total_size: u64,
- pub min_acq: Option<DateTime<Utc>>,
- pub max_acq: Option<DateTime<Utc>>,
- pub min_protocol: Option<DateTime<Utc>>,
- pub max_protocol: Option<DateTime<Utc>>,
- pub avg_sample_rate: Option<f64>,
- }
- impl fmt::Display for Pod5sFlowCellStats {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(f, "Pod5 Flow Cell Stats")?;
- writeln!(f, "---------------------")?;
- writeln!(f, "Run ID: {}", self.run_id)?;
- writeln!(f, "Flow Cell ID: {}", self.flow_cell_id)?;
- writeln!(f, "Sequencing kit: {}", self.sequencing_kit)?;
- writeln!(f, "Count: {}", self.count)?;
- writeln!(
- f,
- "Total Size: {} ({} bytes)",
- human_size(self.total_size),
- self.total_size
- )?;
- if let Some(t) = self.min_acq {
- writeln!(f, "Acquisition Start (min): {}", t)?;
- }
- if let Some(t) = self.max_acq {
- writeln!(f, "Acquisition Start (max): {}", t)?;
- }
- if let Some(t) = self.min_protocol {
- writeln!(f, "Protocol Start (min): {}", t)?;
- }
- if let Some(t) = self.max_protocol {
- writeln!(f, "Protocol Start (max): {}", t)?;
- }
- if let Some(avg) = self.avg_sample_rate {
- writeln!(f, "Average Sample Rate: {:.2}", avg)?;
- }
- Ok(())
- }
- }
- #[derive(Debug, Default, Serialize, Deserialize, Clone)]
- pub struct Pod5sRuns {
- pub data: Vec<Pod5sRun>,
- }
- impl Pod5sRuns {
- pub fn new() -> Self {
- Self::default()
- }
- /// Add a new `Pod5sRun` by scanning a directory of `.pod5` files.
- ///
- /// - Builds a `Pod5sRun` via [`Pod5sRun::load_from_dir`].
- /// - If **no** existing run has the same `(run_id, flow_cell_id, sequencing_kit)`,
- /// the new run is appended to `data`.
- /// - If a run **already exists** with these three identifiers, its `pod5s` list is
- /// **merged** with the new one:
- /// - New `Pod5` entries are only added if their file name (from `pod.path.file_name()`)
- /// does **not** already exist in the run.
- /// - Duplicate file names are silently skipped (no error).
- /// Add a new `Pod5sRun` by scanning a directory of `.pod5` files.
- ///
- /// - Builds a `Pod5sRun` via [`Pod5sRun::load_from_dir`].
- /// - Optionally attaches a `bams_pass` directory to the run.
- /// - If **no** existing run has the same `(run_id, flow_cell_id, sequencing_kit)`,
- /// the new run is appended to `data`.
- /// - If a run **already exists** with these three identifiers:
- /// - `pod5s` are merged by file name (duplicates skipped).
- /// - `bams_pass`:
- /// * if existing has `Some` and new is `None` → keep existing.
- /// * if existing is `None` and new is `Some` → set existing to new.
- /// * if both `Some` but different → error (conflicting BAM-pass roots).
- pub fn add_from_dir<P, Q>(&mut self, pod_dir: P, bams_pass: Option<Q>) -> anyhow::Result<()>
- where
- P: AsRef<Path>,
- Q: AsRef<Path>,
- {
- let mut new_run = Pod5sRun::load_from_dir(&pod_dir)?;
- new_run.bams_pass = bams_pass.map(|p| p.as_ref().to_path_buf());
- // Try to find an existing run with same identifiers
- if let Some(existing) = self.data.iter_mut().find(|r| {
- r.run_id == new_run.run_id
- && r.flow_cell_id == new_run.flow_cell_id
- && r.sequencing_kit == new_run.sequencing_kit
- }) {
- // --- merge bams_pass ---
- match (&existing.bams_pass, &new_run.bams_pass) {
- (Some(old), Some(new)) if old != new => {
- anyhow::bail!(
- "Conflicting bam_pass for run {} (flowcell {}): \
- existing='{}', new='{}'",
- existing.run_id,
- existing.flow_cell_id,
- old.display(),
- new.display()
- );
- }
- (None, Some(new)) => {
- existing.bams_pass = Some(new.clone());
- }
- _ => {
- // (Some, None) or (None, None) or (Some, Some equal): nothing to do
- }
- }
- // Build a set of existing Pod5 file names
- let mut existing_names: HashSet<String> = existing
- .pod5s
- .iter()
- .filter_map(|p| p.path.file_name().map(|n| n.to_string_lossy().into_owned()))
- .collect();
- // Keep only Pod5 entries with a new file name
- new_run.pod5s.retain(|p| {
- if let Some(name_os) = p.path.file_name() {
- let name = name_os.to_string_lossy().to_string();
- if existing_names.contains(&name) {
- // duplicate -> skip
- false
- } else {
- existing_names.insert(name);
- true
- }
- } else {
- // No file name, keep it (or change to false if you prefer to drop these)
- true
- }
- });
- // Merge the unique new Pod5s into the existing run
- existing.pod5s.extend(new_run.pod5s);
- Ok(())
- } else {
- // No matching run: add as a new entry
- self.data.push(new_run);
- Ok(())
- }
- }
- /// Save metadata (not raw POD5s) as JSON.
- pub fn save_json<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
- let s = serde_json::to_string_pretty(self)?;
- fs::write(path, s)?;
- Ok(())
- }
- /// Load metadata from a saved JSON file.
- pub fn load_json<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
- let data = std::fs::read_to_string(path)?;
- Ok(serde_json::from_str(&data)?)
- }
- /// Add a run from an ONT run directory.
- ///
- /// Layout assumed:
- /// - `bam_pass/` → attached to `bams_pass` if present.
- /// - `pod5_pass/barcode*/*.pod5` and `pod5_recovered/*.pod5` → added to `pod5s`.
- pub fn add_run_dir<P: AsRef<Path>>(&mut self, run_dir: P) -> anyhow::Result<()> {
- let run_dir = run_dir.as_ref();
- // --- collect POD5 directories ---
- let mut pod_dirs: Vec<PathBuf> = Vec::new();
- // pod5_pass/barcode*/ subdirectories
- let pod5_pass = run_dir.join("pod5_pass");
- if pod5_pass.is_dir() {
- for entry in fs::read_dir(&pod5_pass)? {
- let entry = entry?;
- if entry.file_type()?.is_dir() {
- let name = entry.file_name();
- if name.to_string_lossy().starts_with("barcode") {
- pod_dirs.push(entry.path());
- }
- }
- }
- }
- // pod5_recovered/
- let pod5_recovered = run_dir.join("pod5_recovered");
- if pod5_recovered.is_dir() {
- pod_dirs.push(pod5_recovered);
- }
- if pod_dirs.is_empty() {
- anyhow::bail!(
- "No POD5 directories (pod5_pass/barcode*/ or pod5_recovered/) found under {}",
- run_dir.display()
- );
- }
- // --- bam_pass directory (optional) ---
- let bam_pass_dir = run_dir.join("bam_pass");
- let bams_pass = if bam_pass_dir.is_dir() {
- Some(bam_pass_dir)
- } else {
- None
- };
- // Build the new run from all POD5 directories
- let mut new_run = Pod5sRun::load_from_dirs(&pod_dirs)?;
- new_run.bams_pass = bams_pass;
- // --- merge logic identical to `add_from_dir` ---
- if let Some(existing) = self.data.iter_mut().find(|r| {
- r.run_id == new_run.run_id
- && r.flow_cell_id == new_run.flow_cell_id
- && r.sequencing_kit == new_run.sequencing_kit
- }) {
- // merge bams_pass
- match (&existing.bams_pass, &new_run.bams_pass) {
- (Some(old), Some(new)) if old != new => {
- anyhow::bail!(
- "Conflicting bam_pass for run {} (flowcell {}): \
- existing='{}', new='{}'",
- existing.run_id,
- existing.flow_cell_id,
- old.display(),
- new.display()
- );
- }
- (None, Some(new)) => {
- existing.bams_pass = Some(new.clone());
- }
- _ => {
- // (Some, None) or (None, None) or (Some, Some equal): nothing to do
- }
- }
- // merge pod5s by file name, skipping duplicates
- use std::collections::HashSet;
- let mut existing_names: HashSet<String> = existing
- .pod5s
- .iter()
- .filter_map(|p| p.path.file_name().map(|n| n.to_string_lossy().into_owned()))
- .collect();
- new_run.pod5s.retain(|p| {
- if let Some(name_os) = p.path.file_name() {
- let name = name_os.to_string_lossy().to_string();
- if existing_names.contains(&name) {
- false
- } else {
- existing_names.insert(name);
- true
- }
- } else {
- true
- }
- });
- existing.pod5s.extend(new_run.pod5s);
- Ok(())
- } else {
- // No matching run: add as new entry
- self.data.push(new_run);
- Ok(())
- }
- }
- }
#[cfg(test)]
mod tests {
    use crate::helpers::test_init;

    use super::*;

    /// `pod5_recovered` directory of a real PromethION run. It only exists on the
    /// cluster, hence the `#[ignore]` on the tests using it
    /// (run them with `cargo test -- --ignored`).
    const POD5_DIR: &str = "/mnt/beegfs02/scratch/t_steimle/prom_runs/A/20251117_0915_P2I-00461-A_PBI55810_22582b29/pod5_recovered";

    /// Root of a test ONT run directory (cluster-only). It is presumably laid out
    /// as `Pod5sRuns::add_run_dir` expects — confirm if the test fails.
    const RUN_DIR: &str = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A";

    #[test]
    #[ignore = "requires cluster-local POD5 data under /mnt/beegfs02"]
    fn load_pod5s() -> anyhow::Result<()> {
        test_init();
        let flow_cell = Pod5sRun::load_from_dir(POD5_DIR)?;
        // A successful load always contains at least one parsed file.
        assert!(!flow_cell.pod5s.is_empty());
        println!("{:#?}", flow_cell.pod5s.first());

        let stats = flow_cell.stats();
        assert_eq!(stats.count, flow_cell.pod5s.len());
        println!("{stats}");
        Ok(())
    }

    #[test]
    #[ignore = "requires cluster-local ONT run data under /mnt/beegfs02"]
    fn load_prom_run() -> anyhow::Result<()> {
        test_init();
        let mut runs = Pod5sRuns::new();
        runs.add_run_dir(RUN_DIR)?;
        // Adding to an empty collection always creates exactly one run.
        assert_eq!(runs.data.len(), 1);

        let run = runs
            .data
            .first()
            .ok_or_else(|| anyhow::anyhow!("add_run_dir succeeded but added no run"))?;
        let stats = run.stats();
        assert!(stats.count > 0);
        println!("{runs:#?}");
        println!("{stats}");
        Ok(())
    }
}
|