- use anyhow::{anyhow, Context, Result};
- use chrono::{DateTime, Utc};
- use csv::ReaderBuilder;
- use glob::glob;
- use hashbrown::HashMap;
- use log::{info, warn};
- use rayon::prelude::*;
- use serde::{Deserialize, Serialize};
- use std::{
- fmt::Display,
- fs::{self, File, Metadata},
- os::unix::fs::MetadataExt,
- path::{Path, PathBuf},
- };
- use crate::io::pod5_infos::Pod5Info;
/// Represents a collection of Pod5 sequencing runs and associated metadata.
///
/// A `Pod5Collection` groups multiple sequencing runs (`Run`), each consisting of
/// one or more flow cells. It is initialized by scanning a directory of `.pod5` files,
/// optionally mapping flow cell names to corrected identifiers, and assigning BAM and
/// `.pod5` directories.
#[derive(Debug, Default)]
pub struct Pod5Collection {
    /// Timestamp of when this collection was created (set to `Utc::now()` by `new`).
    pub importation_date: DateTime<Utc>,
    /// List of runs with associated flowcells and metadata.
    pub runs: Vec<Run>,
    /// Directory containing BAM files (stored as given; not validated here).
    pub bam_dir: String,
    /// Directory containing `.pod5` files (stored as given; not validated here).
    pub pod5_dir: String,
}
- impl Pod5Collection {
- /// Constructs a new `Pod5Collection` by scanning the given `.pod5` directory,
- /// applying corrected flowcell naming, and grouping data by run.
- ///
- /// # Arguments
- /// - `pod5_dir`: Path to directory containing `.pod5` files.
- /// - `corrected_fc_path`: Path to file with corrected flowcell mappings.
- /// - `bam_dir`: Path to directory containing BAM files.
- ///
- /// # Returns
- /// - `Ok(Pod5Collection)` if the data is consistent and valid.
- /// - `Err(anyhow::Error)` if listing, parsing, or validation fails.
- pub fn new(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result<Self> {
- // Load pod5 files
- let pod5_files = list_pod_files(pod5_dir)?;
- info!("n pod5 {}", pod5_files.len());
- // Group pod5 files by run-flowcell key
- let mut grouped: HashMap<String, Vec<Pod5>> = HashMap::new();
- for pod in pod5_files {
- let key = format!("{}••{}", pod.run_name, pod.flowcell_name);
- grouped.entry(key).or_default().push(pod);
- }
- // Load corrected flowcell mapping
- let corrected_fc = load_flowcells_corrected_names(corrected_fc_path)?;
- // Construct FlowCells in parallel from Pod5 groups
- let flowcells: Vec<FlowCell> = grouped
- .into_values()
- .par_bridge()
- .map(|group| FlowCell::new(group, &corrected_fc))
- .collect::<Result<Vec<_>>>()?;
- // Group FlowCells by run_name (sequential step)
- let mut runs_map: HashMap<String, Vec<FlowCell>> = HashMap::new();
- for fc in flowcells {
- runs_map.entry(fc.run_name.clone()).or_default().push(fc);
- }
- // Convert each run group into a Run
- let runs: Vec<Run> = runs_map
- .into_values()
- .map(|fcs| Run {
- run_name: fcs[0].run_name.clone(),
- flowcells: fcs,
- })
- .collect();
- Ok(Self {
- importation_date: Utc::now(),
- runs,
- bam_dir: bam_dir.to_string(),
- pod5_dir: pod5_dir.to_string(),
- })
- }
- pub fn print_info(&self) {
- self.runs.iter().for_each(|run| {
- run.flowcells.iter().for_each(|fc| {
- let total_size: u64 = fc.pod5.iter().map(|p| p.file_metadata.size()).sum();
- let n_files = fc.pod5.len();
- let dates: Vec<DateTime<Utc>> = fc
- .pod5
- .iter()
- .map(|p| p.file_metadata.modified().unwrap().into())
- .collect();
- let from = dates.iter().min().unwrap();
- let to = dates.iter().max().unwrap();
- let s = [
- run.run_name.clone(),
- from.to_string(),
- to.to_string(),
- n_files.to_string(),
- total_size.to_string(),
- fc.flowcell_name.to_string(),
- fc.pod5_type.to_string(),
- fc.pod5_info.acquisition_id.clone(),
- format!("{:?}", fc.cases),
- ]
- .join("\t");
- println!("{s}");
- });
- });
- }
- /// Returns a sorted and deduplicated list of all unique `FlowCellCase` IDs in the collection.
- pub fn ids(&self) -> Vec<String> {
- let mut ids: Vec<String> = self
- .runs
- .iter()
- .flat_map(|r| r.flowcells.iter())
- .flat_map(|f| f.cases.iter().map(|c| c.id.clone()))
- .collect();
- ids.sort_unstable(); // faster than sort()
- ids.dedup();
- ids
- }
- }
/// Represents a sequencing run, which may contain multiple flowcells.
///
/// A `Run` groups flowcells that were processed together during a sequencing event
/// (e.g., a MinION or PromethION run). It serves as a logical grouping for
/// downstream analysis.
#[derive(Debug)]
pub struct Run {
    /// Unique identifier for the sequencing run (e.g., "20240301_RUN42").
    pub run_name: String,
    /// Flowcells that belong to this run; each shares this `run_name`.
    pub flowcells: Vec<FlowCell>,
}
/// Represents a flowcell and its associated metadata, cases, and `.pod5` files.
///
/// A `FlowCell` encapsulates all relevant information needed to track,
/// identify, and process a physical flowcell, including its corrected name,
/// acquisition metadata, and associated `.pod5` files.
#[derive(Debug, Clone)]
pub struct FlowCell {
    /// Original flowcell name as found in `.pod5` paths (e.g., "FCX123").
    pub flowcell_name: String,
    /// Corrected flowcell name; empty string when no correction is known.
    pub corrected_name: String,
    /// Sample/barcode-level associations (`FlowCellCase`) for this flowcell.
    pub cases: Vec<FlowCellCase>,
    /// The sequencing run this flowcell belongs to.
    pub run_name: String,
    /// Whether the `.pod5` files are raw or demultiplexed (`Pod5Type`).
    pub pod5_type: Pod5Type,
    /// Metadata extracted from one representative `.pod5` file (`Pod5Info`),
    /// including the acquisition ID.
    pub pod5_info: Pod5Info,
    /// All `.pod5` file entries associated with this flowcell.
    pub pod5: Vec<Pod5>,
}
- impl FlowCell {
- /// Constructs a new `FlowCell` from a non-empty vector of `Pod5` entries
- /// and a list of corrected flowcell mappings.
- ///
- /// Ensures that all entries in the vector share the same `run_name`, `flowcell_name`, and `pod5_type`.
- ///
- /// # Arguments
- /// - `pods`: A non-empty vector of `Pod5` entries (moved, not cloned).
- /// - `corrected_fc`: Reference to a list of `FCLine` entries for resolving corrected names.
- ///
- /// # Errors
- /// Returns an error if:
- /// - `pods` is empty
- /// - `.pod5` path is invalid UTF-8
- /// - inconsistent metadata across pod5 entries
- /// - multiple corrected names are found
- /// - parent directory resolution fails
- pub fn new(pods: Vec<Pod5>, corrected_fc: &[FCLine]) -> anyhow::Result<Self> {
- let first = pods.first().context("Empty pod5 list for FlowCell")?;
- let flowcell_name = &first.flowcell_name;
- let run_name = &first.run_name;
- let pod5_type = &first.pod5_type;
- // Consistency check
- let inconsistent = pods.iter().any(|p| {
- p.flowcell_name != *flowcell_name
- || p.run_name != *run_name
- || p.pod5_type != *pod5_type
- });
- if inconsistent {
- return Err(anyhow!(
- "Inconsistent pod5 metadata: all entries must share the same run_name, flowcell_name, and pod5_type"
- ));
- }
- // Extract and validate .pod5 path
- let path_str = first.path.to_str().context("Invalid UTF-8 in pod5 path")?;
- let pod5_info = Pod5Info::from_pod5(path_str);
- // Select corrected entries for this flowcell
- let matched_fc_lines: Vec<_> = corrected_fc
- .iter()
- .filter(|e| e.flow_cell == *flowcell_name)
- .cloned()
- .collect();
- // Resolve unique corrected name
- let corrected_name = {
- let mut names: Vec<_> = matched_fc_lines
- .iter()
- .map(|e| e.ref_flow_cell.clone())
- .filter(|s| !s.is_empty())
- .collect();
- names.dedup();
- match names.len() {
- 0 => String::new(),
- 1 => names[0].clone(),
- _ => {
- return Err(anyhow!(
- "Multiple corrected names for flow cell '{}': {:?}",
- flowcell_name,
- names
- ));
- }
- }
- };
- // Cache parent directories
- let raw_parent = first.path.parent().context("Missing parent for RAW pod5")?;
- let demuxed_grandparent = raw_parent
- .parent()
- .context("Invalid directory structure for DEMUXED pod5")?;
- // Build case list
- let cases = matched_fc_lines
- .iter()
- .map(|e| {
- let pod_dir = match pod5_type {
- Pod5Type::Raw => raw_parent.to_path_buf(),
- Pod5Type::Demuxed => {
- let mut bc_dir = demuxed_grandparent.to_path_buf();
- bc_dir.push(format!("barcode{}", e.barcode_number.replace("NB", "")));
- bc_dir
- }
- };
- Ok(FlowCellCase {
- id: e.id.clone(),
- time_point: e.sample_type.clone(),
- barcode: e.barcode_number.clone(),
- pod_dir,
- })
- })
- .collect::<Result<_>>()?;
- Ok(Self {
- flowcell_name: flowcell_name.clone(),
- corrected_name,
- cases,
- run_name: run_name.clone(),
- pod5_type: pod5_type.clone(),
- pod5_info,
- pod5: pods, // Already moved
- })
- }
- }
/// Represents the type of `.pod5` file: either raw or demultiplexed.
///
/// A unit-variant enum, so it is cheap to copy: `Copy`, `Eq`, and `Hash`
/// are derived in addition to the original `Debug`/`Clone`/`PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Pod5Type {
    /// Raw `.pod5` files directly from acquisition.
    Raw,
    /// Demultiplexed `.pod5` files, post-processed by barcoding.
    Demuxed,
}

impl Display for Pod5Type {
    /// Formats as the lowercase name used in reports: `"raw"` / `"demuxed"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            Pod5Type::Raw => "raw",
            Pod5Type::Demuxed => "demuxed",
        };
        f.write_str(s)
    }
}
/// Configuration for interpreting file paths when parsing `.pod5` files.
#[derive(Debug, Clone)]
pub struct Pod5Config {
    /// Base directory (prefix to strip from full paths).
    pub base_dir: String,
    /// Substring used to detect "raw" pod5 files.
    pub type_raw: String,
    /// Substring used to detect "demuxed" pod5 files.
    pub type_demuxed: String,
    /// Index (in path components) where `run_name` is expected.
    pub run_dir_n: u8,
    /// Index (in path components) where `flowcell_name` is expected.
    pub flowcell_dir_n: u8,
}

impl Default for Pod5Config {
    /// Defaults matching the standard layout under `/data/run_data`,
    /// with `run/flowcell/...` as the first two relative components.
    fn default() -> Self {
        Pod5Config {
            base_dir: String::from("/data/run_data"),
            type_raw: String::from("/pod5/"),
            type_demuxed: String::from("/pod5_pass/"),
            run_dir_n: 0,
            flowcell_dir_n: 1,
        }
    }
}
/// Represents a `.pod5` file and its associated metadata and location info.
///
/// Built by `Pod5::from_path`; used as the base object for flowcell and run
/// aggregation.
#[derive(Debug, Clone)]
pub struct Pod5 {
    /// Full path to the `.pod5` file.
    pub path: PathBuf,
    /// Whether the file is raw or demultiplexed, inferred from path markers
    /// configured in `Pod5Config`.
    pub pod5_type: Pod5Type,
    /// Name of the sequencing run, taken from a path component.
    pub run_name: String,
    /// Name of the flowcell, taken from a path component.
    pub flowcell_name: String,
    /// Filesystem metadata (e.g., size, modified time) captured at parse time.
    pub file_metadata: Metadata,
}
- impl Pod5 {
- /// Constructs a `Pod5` instance from a file path, using a `Pod5Config` to infer type and extract metadata.
- ///
- /// # Arguments
- /// - `path`: Path to the `.pod5` file.
- /// - `config`: Configuration used to interpret the path structure.
- ///
- /// # Returns
- /// - `Ok(Pod5)` if type and components can be extracted.
- /// - `Err` if path is malformed, missing components, or type is unrecognized.
- pub fn from_path(path: impl AsRef<Path>, config: &Pod5Config) -> Result<Self> {
- let path = path.as_ref();
- let path_str = path
- .to_str()
- .context(format!("Can't convert path to UTF-8 string: {:?}", path))?;
- // Determine Pod5 type by pattern matching
- let pod5_type = if path_str.contains(&config.type_raw) {
- Pod5Type::Raw
- } else if path_str.contains(&config.type_demuxed) {
- Pod5Type::Demuxed
- } else {
- return Err(anyhow!(
- "Unable to determine pod5 type from path: {}",
- path_str
- ));
- };
- // Extract metadata from filesystem
- let file_metadata =
- fs::metadata(path).with_context(|| format!("Failed to get metadata for {:?}", path))?;
- // Strip base_dir and split into components
- let relative_path = path_str.strip_prefix(&config.base_dir).unwrap_or(path_str); // fallback to full path if base_dir is not a prefix
- let components: Vec<&str> = relative_path.split('/').filter(|c| !c.is_empty()).collect();
- // Extract run_name and flowcell_name from path components
- let run_name = components
- .get(config.run_dir_n as usize)
- .context("Missing run_name in path")?
- .to_string();
- let flowcell_name = components
- .get(config.flowcell_dir_n as usize)
- .context("Missing flowcell_name in path")?
- .to_string();
- Ok(Self {
- path: path.to_path_buf(),
- pod5_type,
- run_name,
- flowcell_name,
- file_metadata,
- })
- }
- }
- /// Recursively scans a directory for `.pod5` files and parses them into `Pod5` objects.
- ///
- /// This function uses glob-based search to find all `.pod5` files under the given directory
- /// (including subdirectories), then filters out unwanted paths (e.g., `pod5_fail/`, `pod5_skip/`)
- /// and attempts to parse each remaining file using `Pod5::from_path`.
- ///
- /// Any file that fails to parse is skipped with a warning.
- ///
- /// # Arguments
- /// - `dir`: Path to the root directory to search (absolute or relative).
- ///
- /// # Returns
- /// - `Ok(Vec<Pod5>)` on success, with all successfully parsed `.pod5` files.
- /// - `Err(anyhow::Error)` if path parsing fails (e.g., invalid UTF-8).
- ///
- /// # Errors
- /// - Fails early if the glob pattern itself is invalid.
- /// - Skips over files that fail to parse, but logs warnings.
- ///
- /// # Notes
- /// - Directories containing `/pod5_fail/` or `/pod5_skip/` are excluded.
- /// - The glob pattern used is `{dir}/**/*.pod5`.
- ///
- /// # Example
- /// ```
- /// let pod_files = list_pod_files("/data/pods")?;
- /// for pod in pod_files {
- /// println!("{}", pod.path.display());
- /// }
- /// ```
- pub fn list_pod_files(dir: &str) -> Result<Vec<Pod5>> {
- let pattern = format!("{}/**/*.pod5", dir);
- let mut pod_files = Vec::new();
- let conf = Pod5Config {
- base_dir: if dir.ends_with('/') {
- dir.to_string()
- } else {
- format!("{dir}/")
- },
- ..Pod5Config::default()
- };
- for entry in glob(&pattern).expect("Failed to read glob pattern") {
- match entry {
- Ok(path) => {
- let p = path.to_str().context("Can't parse path to string {path}")?;
- if p.contains("/pod5_fail/") || p.contains("/pod5_skip/") {
- continue;
- }
- match Pod5::from_path(&path, &conf) {
- Ok(pod5) => pod_files.push(pod5),
- Err(e) => warn!("{e}"),
- }
- }
- Err(e) => warn!("Error: {:?}", e),
- }
- }
- Ok(pod_files)
- }
- // impl FlowCell {
- // pub fn cases_pod5_dir(&self) -> Vec<PathBuf> {
- // match self.pod5_type {
- // Pod5Type::Raw => {
- // let p = self.pod5.first().unwrap();
- // vec![p.path.parent().unwrap().to_path_buf()]
- // },
- // Pod5Type::Demuxed => {
- // self.cases.iter().map(|c| {
- // let str_barcode = format!("barcode{}", c.barcode);
- // })
- // },
- // }
- // }
- // }
/// A single sample/barcode association for a flowcell, resolved from the
/// corrected-flowcell TSV (`FCLine`) during `FlowCell::new`.
#[derive(Debug, Clone, Default)]
pub struct FlowCellCase {
    /// Sample identifier (from `FCLine::id`, uppercased on load; e.g., "P001X03").
    pub id: String,
    /// Experimental time point; populated from `FCLine::sample_type` (lowercased on load).
    pub time_point: String,
    /// Barcode label as given in the TSV (e.g., "NB01").
    pub barcode: String,
    /// Directory holding this case's `.pod5` files: the file's parent dir for
    /// RAW data, or a sibling `barcode<N>` directory for DEMUXED data.
    pub pod_dir: PathBuf,
    // pub basecalled: Option<bool>,
}
- // #[derive(Debug, Serialize, Deserialize, Clone)]
- // pub struct IdsInput {
- // pub data: Vec<IdInput>,
- // }
- //
- // #[derive(Debug, Serialize, Deserialize, Clone)]
- // pub struct IdInput {
- // pub id: String,
- // pub time_point: String,
- // pub barcode: String,
- // pub flow_cell: String,
- // pub run: String,
- // }
- //
- // // Implement PartialEq and Eq for IdInput
- // impl PartialEq for IdInput {
- // fn eq(&self, other: &Self) -> bool {
- // self.id == other.id
- // && self.time_point == other.time_point
- // && self.barcode == other.barcode
- // && self.flow_cell == other.flow_cell
- // && self.run == other.run
- // }
- // }
- //
- // impl Eq for IdInput {}
- //
- // // Implement Hash for IdInput
- // impl Hash for IdInput {
- // fn hash<H: Hasher>(&self, state: &mut H) {
- // self.id.hash(state);
- // self.time_point.hash(state);
- // self.barcode.hash(state);
- // self.flow_cell.hash(state);
- // self.run.hash(state);
- // }
- // }
- //
- // impl IdsInput {
- // pub fn load_json(path: &str) -> anyhow::Result<Self> {
- // let f = File::open(path)?;
- // let s: Self = serde_json::from_reader(f)?;
- // Ok(s)
- // }
- //
- // pub fn save_json(&self, path: &str) -> anyhow::Result<()> {
- // let f = File::create(path)?;
- // serde_json::to_writer(f, self)?;
- // Ok(())
- // }
- //
- // pub fn dedup(&mut self) {
- // let mut unique = HashSet::new();
- // self.data.retain(|item| unique.insert(item.clone()));
- // }
- //
- // pub fn load_from_tsv(path: &str) -> anyhow::Result<Self> {
- // let inputs = load_flowcells_corrected_names(path)?;
- // let data = inputs
- // .iter()
- // .map(|line| IdInput {
- // id: line.id.to_string(),
- // time_point: line.sample_type.to_string(),
- // barcode: line.barcode_number.to_string(),
- // flow_cell: line.flow_cell.to_string(),
- // run: line.run.to_string(),
- // })
- // .collect();
- //
- // let mut res = Self { data };
- // res.dedup();
- // Ok(res)
- // }
- //
- // pub fn add_input(&mut self, values: IdInput) {
- // self.data.push(values);
- // self.dedup();
- // }
- // }
/// Serializable record pairing a run/flowcell/sample with the locations of
/// its pod5 data (latest directory plus archived copies).
///
/// NOTE(review): this type is not constructed anywhere in this file; field
/// semantics below are inferred from names — confirm against the code that
/// builds and consumes it.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Pod5Run {
    /// Protocol run identifier (presumably from pod5 metadata — confirm).
    pub protocol_run_id: String,
    /// Sequencer position identifier (assumed — verify).
    pub position_id: String,
    /// Flowcell ID as reported by the instrument (assumed — verify).
    pub flow_cell_id: String,
    /// Sample identifier (cf. `FCLine::id`).
    pub id: String,
    /// Experimental time point (cf. `FCLine::sample_type`).
    pub time_point: String,
    /// Barcode label (cf. `FCLine::barcode_number`).
    pub barcode_number: String,
    /// Flowcell name (cf. `FCLine::flow_cell`).
    pub flow_cell: String,
    /// Run name (cf. `FCLine::run`).
    pub run: String,
    /// Timestamp and path of the most recent pod5 directory — TODO confirm tuple meaning.
    pub last_pod_dir: (DateTime<Utc>, String),
    /// Archive entries as (name, timestamp, path) tuples — TODO confirm ordering.
    pub archives: Vec<(String, DateTime<Utc>, String)>,
}
- /// Loads corrected flowcell metadata from a tab-delimited file.
- ///
- /// This function parses a TSV file where each row is deserialized into an `FCLine`.
- /// It also normalizes some fields (e.g., lowercases `sample_type`, uppercases `id`)
- /// for consistency in downstream processing.
- ///
- /// # Arguments
- /// - `file_path`: Path to the TSV file containing flowcell correction data.
- ///
- /// # Returns
- /// A vector of `FCLine` records, one per line in the file.
- ///
- /// # Errors
- /// Returns an error if the file cannot be opened or if any line fails to deserialize.
- ///
- /// # Expected Format (TSV with header)
- /// ```text
- /// id sample_type barcode_number flow_cell run_path ref_flow_cell
- /// P001X03 tumoral NB01 FC123 RUN123 /path/to/data FC123_CORR
- /// ```
- ///
- /// # Example
- /// ```
- /// let fc_lines = load_flowcells_corrected_names("flowcells.tsv")?;
- /// assert!(!fc_lines.is_empty());
- /// ```
- pub fn load_flowcells_corrected_names(file_path: &str) -> anyhow::Result<Vec<FCLine>> {
- let file = File::open(file_path)?;
- let mut rdr = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(true)
- .from_reader(file);
- let mut records = Vec::new();
- for result in rdr.deserialize() {
- let mut record: FCLine = result?;
- // formating
- record.sample_type = record.sample_type.to_lowercase();
- record.id = record.id.to_uppercase();
- records.push(record);
- }
- Ok(records)
- }
/// Represents a single record describing a barcode-flowcell pairing,
/// including original and corrected metadata.
///
/// Deserialized from a TSV row by `load_flowcells_corrected_names`, which
/// also normalizes `sample_type` to lowercase and `id` to uppercase.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FCLine {
    /// Unique identifier for the sample or barcode group (e.g., "P001X03");
    /// uppercased on load.
    pub id: String,
    /// Sample type associated with this record (e.g., "normal", "tumoral");
    /// lowercased on load.
    pub sample_type: String,
    /// The barcode number (e.g., "NB01", "NB02").
    pub barcode_number: String,
    /// Original flowcell name as found in the raw `.pod5` metadata.
    pub flow_cell: String,
    /// Sequencing run name this flowcell belongs to (e.g., "20240101_FAB123").
    pub run: String,
    /// Original path to data (can be absolute or relative).
    pub path: String,
    /// Corrected flowcell name used to resolve naming inconsistencies;
    /// may be empty when no correction applies.
    pub ref_flow_cell: String,
}