| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- use std::str::FromStr;
- use anyhow::Context;
- use serde::{Deserialize, Serialize};
- /// Represents a single entry from a MinKNOW sample sheet CSV file.
- ///
- /// This structure captures the metadata associated with a sample flowcell
- /// as defined by the Oxford Nanopore MinKNOW software.
- ///
- /// Expected format (CSV):
- /// ```text
- /// protocol_run_id,position_id,flow_cell_id,sample_id,experiment_id,flow_cell_product_code,kit
- /// 0ef3f65c-aa2b-4936-b49b-e55d361e9d85,1F,PBC97196,Projet_143,Projet_143,FLO-PRO114M,
- /// ```
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct MinKnowSampleSheet {
- /// Unique identifier for the protocol run.
- pub protocol_run_id: String,
- /// Identifier for the flowcell position (e.g., port or device slot).
- pub position_id: String,
- /// Flowcell barcode or identifier (e.g., FAB123).
- pub flow_cell_id: String,
- /// Sample ID associated with this run.
- pub sample_id: String,
- /// Experiment ID assigned in the sample sheet.
- pub experiment_id: String,
- /// Product code for the flowcell (e.g., FLO-MIN106).
- pub flow_cell_product_code: String,
- /// Kit identifier used for sample preparation (e.g., SQK-LSK109).
- pub kit: String,
- }
- impl TryFrom<&str> for MinKnowSampleSheet {
- type Error = anyhow::Error;
- /// Attempts to parse a single comma-separated line (CSV row) into a `MinKnowSampleSheet`.
- ///
- /// # Arguments
- /// - `value`: A CSV-formatted string representing a single data row (excluding the header).
- ///
- /// # Returns
- /// - `Ok(MinKnowSampleSheet)` if parsing succeeds.
- /// - `Err` if the row does not contain exactly 7 fields.
- ///
- /// # Example
- /// ```
- /// let row = "1234-ABCD,ABC123,FAB001,SAMPLE001,EXP001,FCP001,KIT001";
- /// let sheet = MinKnowSampleSheet::try_from(row)?;
- /// assert_eq!(sheet.sample_id, "SAMPLE001");
- /// ```
- fn try_from(value: &str) -> anyhow::Result<Self> {
- let cells: Vec<&str> = value.split(',').collect();
- if cells.len() != 7 {
- return Err(anyhow::anyhow!(
- "Number of fields is not equal to 7: {value}"
- ));
- }
- Ok(Self {
- protocol_run_id: cells[0].to_string(),
- position_id: cells[1].to_string(),
- flow_cell_id: cells[2].to_string(),
- sample_id: cells[3].to_string(),
- experiment_id: cells[4].to_string(),
- flow_cell_product_code: cells[5].to_string(),
- kit: cells[6].to_string(),
- })
- }
- }
- impl MinKnowSampleSheet {
- /// Loads a `MinKnowSampleSheet` from a file path containing a 2-line CSV:
- /// a header and a single data row.
- ///
- /// # Arguments
- /// - `path`: Path to the MinKNOW sample sheet file.
- ///
- /// # Returns
- /// - `Ok(MinKnowSampleSheet)` if the file is well-formed.
- /// - `Err` if the file is missing, malformed, or has an invalid header.
- ///
- /// # Expected Format
- /// The file must contain:
- /// - A single header line:
- /// `"protocol_run_id,position_id,flow_cell_id,sample_id,experiment_id,flow_cell_product_code,kit"`
- /// - One data line corresponding to a sample.
- ///
- /// # Errors
- /// - If the file is missing, empty, or contains malformed data.
- ///
- /// # Example
- /// ```
- /// let sheet = MinKnowSampleSheet::from_path("samplesheet.csv")?;
- /// println!("Sample ID: {}", sheet.sample_id);
- /// ```
- pub fn from_path(path: &str) -> anyhow::Result<Self> {
- use std::fs::File;
- use std::io::{self, BufRead};
- let file =
- File::open(path).map_err(|e| anyhow::anyhow!("Can't open file: {path}\n\t{e}"))?;
- let reader = io::BufReader::new(file);
- let mut lines = reader.lines();
- // Validate header line
- if let Some(header_line) = lines.next() {
- let header_line =
- header_line.map_err(|e| anyhow::anyhow!("Error reading header line: {e}"))?;
- if header_line
- != "protocol_run_id,position_id,flow_cell_id,sample_id,experiment_id,flow_cell_product_code,kit"
- {
- return Err(anyhow::anyhow!(
- "File header doesn't match MinKnow sample sheet format: {header_line}"
- ));
- }
- } else {
- return Err(anyhow::anyhow!("File is empty or missing a header."));
- }
- // Parse the data row
- if let Some(data_line) = lines.next() {
- let data_line =
- data_line.map_err(|e| anyhow::anyhow!("Error reading data line: {e}"))?;
- return data_line.as_str().try_into();
- }
- Err(anyhow::anyhow!(
- "File doesn't contain the expected second line (data row)."
- ))
- }
- }
- /// Loads Nanopore channel state entries from a CSV file.
- ///
- /// This function parses a CSV file that contains time-series data for
- /// individual sequencing channels, including their status (`adapter`, `strand`, etc.),
- /// time since experiment start, and duration in samples.
- ///
- /// The CSV file is expected to have the following headers:
- /// - `Channel`
- /// - `State`
- /// - `Experiment Time (minutes)`
- /// - `State Time (samples)`
- ///
- /// The `State` column is deserialized into a `NanoporeChannelStatus` enum.
- ///
- /// # Arguments
- ///
- /// * `path` - Path to the CSV file containing the channel state data.
- ///
- /// # Returns
- ///
- /// A `Result` containing:
- /// - `Ok(Vec<ChannelStateEntry>)` on success
- /// - `Err(Box<dyn Error>)` if the file can't be opened or parsed
- ///
- /// # Errors
- ///
- /// This function will return an error if:
- /// - The file cannot be opened
- /// - The CSV is malformed or missing expected headers
- /// - A status value cannot be parsed into `NanoporeChannelStatus`
- ///
- /// # Example
- ///
- /// ```rust
- /// let entries = load_channel_states("nanopore_data.csv")?;
- /// for entry in entries {
- /// println!("{:?}", entry);
- /// }
- /// ```
- ///
- /// # Dependencies
- ///
- /// Requires the [`csv`](https://docs.rs/csv), [`serde`](https://docs.rs/serde), and [`serde_derive`](https://docs.rs/serde_derive) crates.
- ///
- /// # See Also
- ///
- /// - [`ChannelStateEntry`]
- /// - [`NanoporeChannelStatus`]
- pub fn parse_pore_activity_from_reader<R: std::io::Read>(
- r: &mut R,
- ) -> anyhow::Result<Vec<PoreStateEntry>> {
- let reader = std::io::BufReader::new(r);
- let mut rdr = csv::ReaderBuilder::new()
- .delimiter(b',')
- .has_headers(true)
- .from_reader(reader);
- let mut records = Vec::new();
- for result in rdr.deserialize() {
- let record: PoreStateEntry = result?;
- records.push(record);
- }
- Ok(records)
- }
- /// One record from a Nanopore pore activity CSV file.
- ///
- /// This structure represents the state of a single sequencing channel at a given
- /// timepoint in the experiment. Each entry includes the channel number, the current
- /// pore state (e.g., `adapter`, `strand`, etc.), the experiment time in minutes, and
- /// the duration the channel has been in that state (in raw sample counts).
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct PoreStateEntry {
- /// Current status of the pore in this channel.
- ///
- /// Values include: `adapter`, `strand`, `unavailable`, etc.
- #[serde(rename = "Channel State", deserialize_with = "deserialize_status")]
- pub state: NanoporeChannelStatus,
- /// Time since the start of the experiment, in minutes.
- #[serde(rename = "Experiment Time (minutes)")]
- pub experiment_time_minutes: f32,
- /// Duration the channel has been in this state, in raw sample units.
- #[serde(rename = "State Time (samples)")]
- pub state_time_samples: u64,
- }
- /// Represents the current status of a Nanopore sequencing channel.
- #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
- pub enum NanoporeChannelStatus {
- /// An adapter is detected in the pore, but sequencing hasn't started yet.
- Adapter,
- /// Channel has been disabled by the system or user.
- Disabled,
- /// Channel is reserved or blocked, typically during transitions like muxing.
- Locked,
- /// More than one nanopore detected in the channel, which is undesirable.
- Multiple,
- /// No nanopore detected in the channel.
- NoPore,
- /// Channel is flagged for manual reset due to an error or inactivity.
- PendingManualReset,
- /// Channel is about to undergo a multiplexer change.
- PendingMuxChange,
- /// A functional nanopore is detected and ready to start sequencing.
- Pore,
- /// Signal is saturated, often due to blockage or abnormal activity.
- Saturated,
- /// Channel is actively sequencing a strand of DNA or RNA.
- Strand,
- /// Channel is not accessible or not reporting status.
- Unavailable,
- /// Channel is currently undergoing unblocking to restore functionality.
- Unblocking,
- /// Channel is in an undefined or unclassified state.
- Unclassified,
- /// Channel is unclassified following a reset attempt.
- UnclassifiedFollowingReset,
- /// Unknown status with a negative signal or pattern.
- UnknownNegative,
- /// Unknown status with a positive signal or pattern.
- UnknownPositive,
- /// No signal detected; the channel appears inactive or disconnected.
- Zero,
- }
- impl FromStr for NanoporeChannelStatus {
- type Err = String;
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- use NanoporeChannelStatus::*;
- match s.trim().to_lowercase().as_str() {
- "adapter" => Ok(Adapter),
- "disabled" => Ok(Disabled),
- "locked" => Ok(Locked),
- "multiple" => Ok(Multiple),
- "no_pore" => Ok(NoPore),
- "pending_manual_reset" => Ok(PendingManualReset),
- "pending_mux_change" => Ok(PendingMuxChange),
- "pore" => Ok(Pore),
- "saturated" => Ok(Saturated),
- "strand" => Ok(Strand),
- "unavailable" => Ok(Unavailable),
- "unblocking" => Ok(Unblocking),
- "unclassified" => Ok(Unclassified),
- "unclassified_following_reset" => Ok(UnclassifiedFollowingReset),
- "unknown_negative" => Ok(UnknownNegative),
- "unknown_positive" => Ok(UnknownPositive),
- "zero" => Ok(Zero),
- _ => Err(format!("Unknown channel status: {}", s)),
- }
- }
- }
- // Used by serde to parse the status field
- fn deserialize_status<'de, D>(deserializer: D) -> Result<NanoporeChannelStatus, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- let s = String::deserialize(deserializer)?;
- NanoporeChannelStatus::from_str(&s).map_err(serde::de::Error::custom)
- }
- /// Represents a single timepoint of flowcell throughput metrics from a Nanopore sequencing run.
- ///
- /// Each record summarizes various metrics such as the number of reads,
- /// basecalled reads, raw samples, and throughput estimates at a given minute
- /// of the experiment.
- #[derive(Debug, Serialize, Deserialize, Clone)]
- pub struct ThroughputEntry {
- /// Time since the start of the experiment, in minutes.
- #[serde(rename = "Experiment Time (minutes)")]
- pub experiment_time_minutes: u32,
- /// Total number of reads detected.
- #[serde(rename = "Reads")]
- pub reads: u64,
- /// Number of reads that passed basecalling filters.
- #[serde(rename = "Basecalled Reads Passed")]
- pub basecalled_reads_passed: u64,
- /// Number of reads that failed basecalling filters.
- #[serde(rename = "Basecalled Reads Failed")]
- pub basecalled_reads_failed: u64,
- /// Number of reads skipped during basecalling.
- #[serde(rename = "Basecalled Reads Skipped")]
- pub basecalled_reads_skipped: u64,
- /// Number of raw signal samples selected for processing.
- #[serde(rename = "Selected Raw Samples")]
- pub selected_raw_samples: u64,
- /// Number of events (e.g., current transitions) selected.
- #[serde(rename = "Selected Events")]
- pub selected_events: u64,
- /// Estimated number of base pairs sequenced.
- #[serde(rename = "Estimated Bases")]
- pub estimated_bases: u64,
- /// Actual number of basecalled base pairs.
- #[serde(rename = "Basecalled Bases")]
- pub basecalled_bases: u64,
- /// Number of raw signal samples used for basecalling.
- #[serde(rename = "Basecalled Samples")]
- pub basecalled_samples: u64,
- }
- /// Loads flowcell throughput statistics from a CSV file.
- ///
- /// This function reads per-minute summary statistics from a CSV file
- /// generated by a Nanopore sequencing run. These metrics include the number
- /// of reads, basecalled results, and estimated throughput.
- ///
- /// The CSV file must have the following headers:
- /// - `Experiment Time (minutes)`
- /// - `Reads`
- /// - `Basecalled Reads Passed`
- /// - `Basecalled Reads Failed`
- /// - `Basecalled Reads Skipped`
- /// - `Selected Raw Samples`
- /// - `Selected Events`
- /// - `Estimated Bases`
- /// - `Basecalled Bases`
- /// - `Basecalled Samples`
- ///
- /// # Arguments
- ///
- /// * `reader` - Any reader that implements `Read`, such as a file, buffer, or
- /// decompressed tar entry, containing CSV-formatted throughput metrics.
- ///
- /// # Returns
- ///
- /// A `Result` containing:
- /// - `Ok(Vec<ThroughputEntry>)` if parsing succeeds
- /// - `Err(anyhow::Error)` if an I/O or CSV deserialization error occurs
- ///
- pub fn parse_throughput_from_reader<R: std::io::Read>(
- r: &mut R,
- ) -> anyhow::Result<Vec<ThroughputEntry>> {
- let reader = std::io::BufReader::new(r);
- let mut csv_reader = csv::ReaderBuilder::new()
- .delimiter(b',')
- .has_headers(true)
- .from_reader(reader);
- let mut records = Vec::new();
- for result in csv_reader.deserialize() {
- let record: ThroughputEntry = result.context("CSV deserialization failed")?;
- records.push(record);
- }
- Ok(records)
- }
|