Sfoglia il codice sorgente

audit and refactor src/io/pod5_infos.rs

- Fix flat_map(|v| vec![...]) -> filter_map(|v| v.map(...)); old version
  allocated a Vec per element when extracting StringArray values
- Fix read_arrow_table: now takes &mut File instead of re-opening by path;
  from_pod5 already holds the file open, no need for a second handle
- Extract column dispatch into extract_column() helper; from_pod5 was
  177 lines with a deeply nested match; now the column loop is 5 lines
- batch.is_empty() + batch[0] -> batch.first()
- Add debug! logs for unrecognised column names (aids future POD5 versions)
- Use (0..a.len()).next() to take only the first row value from scalar cols
- Add //! module header and rustdoc on Pod5Info and from_pod5
- FileReader::try_new(...).collect() replaces manual batch loop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Thomas 1 mese fa
parent
commit
f3d94897a1
1 ha cambiato i file con 147 aggiunte e 167 eliminazioni
  1. 147 167
      src/io/pod5_infos.rs

+ 147 - 167
src/io/pod5_infos.rs

@@ -1,3 +1,10 @@
+//! POD5 file metadata reader.
+//!
+//! Extracts run-level metadata from ONT POD5 files by:
+//! 1. Reading the flatbuffers footer embedded at the end of the file
+//! 2. Locating the `RunInfoTable` Arrow IPC table referenced in the footer
+//! 3. Deserialising the Arrow columns into a [`Pod5Info`] struct
+
 use std::{
     fs::File,
     io::{Read, Seek, SeekFrom},
@@ -7,14 +14,18 @@ use arrow::array::{ArrayRef, Int16Array, StringArray, TimestampMillisecondArray,
 use arrow::{array::RecordBatch, ipc::reader::FileReader};
 use chrono::TimeZone;
 use chrono::{DateTime, Utc};
+use log::debug;
 
 use super::pod5_footer_generated::minknow::reads_format::{ContentType, Footer};
-// use podders::{root_as_footer, ContentType};
 
 pub fn root_as_footer(buf: &[u8]) -> Result<Footer<'_>, flatbuffers::InvalidFlatbuffer> {
-  flatbuffers::root::<Footer>(buf)
+    flatbuffers::root::<Footer>(buf)
 }
 
+/// Run-level metadata extracted from a POD5 file's `RunInfoTable`.
+///
+/// Fields that are absent in the Arrow table retain their default values
+/// (`""` for strings, `0` for numbers, current time for timestamps).
 #[derive(Debug, Clone)]
 pub struct Pod5Info {
     pub acquisition_id: String,
@@ -38,209 +49,178 @@ pub struct Pod5Info {
 }
 
 impl Pod5Info {
-    /// Read Pod5 metadata from a file, returning Result instead of panicking
+    /// Read POD5 metadata from `file_path`.
+    ///
+    /// Parses the flatbuffers footer at the end of the file to locate the
+    /// `RunInfoTable` Arrow IPC section, then extracts all known metadata
+    /// fields by column name.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened, is too small, has an
+    /// invalid footer, or if Arrow deserialisation fails.
     pub fn from_pod5(file_path: &str) -> anyhow::Result<Self> {
         let mut file = File::open(file_path)
             .map_err(|e| anyhow::anyhow!("Failed to open POD5 file '{}': {}", file_path, e))?;
-        
+
         let end = file.seek(SeekFrom::End(0))
-            .map_err(|e| anyhow::anyhow!("Failed to seek to end of POD5 file '{}': {}", file_path, e))?;
-        
+            .map_err(|e| anyhow::anyhow!("Failed to seek in '{}': {}", file_path, e))?;
+
         if end < 32 {
-            anyhow::bail!("POD5 file '{}' is too small ({} bytes), expected at least 32 bytes", file_path, end);
+            anyhow::bail!("POD5 file '{}' is too small ({} bytes)", file_path, end);
         }
-        
+
+        // The 8 bytes at offset (end - 32) hold the flatbuffers footer length.
         file.seek(SeekFrom::Current(-32))
-            .map_err(|e| anyhow::anyhow!("Failed to seek in POD5 file '{}': {}", file_path, e))?;
-        
-        let mut buffer = [0; 8];
+            .map_err(|e| anyhow::anyhow!("Seek failed in '{}': {}", file_path, e))?;
+
+        let mut buffer = [0u8; 8];
         file.read_exact(&mut buffer)
-            .map_err(|e| anyhow::anyhow!("Failed to read footer length from POD5 file '{}': {}", file_path, e))?;
+            .map_err(|e| anyhow::anyhow!("Failed to read footer length from '{}': {}", file_path, e))?;
+
+        let footer_len = i64::from_le_bytes(buffer);
 
-        let value = i64::from_le_bytes(buffer);
-        
-        if value <= 0 || value as u64 > end {
-            anyhow::bail!("Invalid footer length in POD5 file '{}': {} (file size: {})", file_path, value, end);
+        if footer_len <= 0 || footer_len as u64 > end {
+            anyhow::bail!("Invalid footer length {} in '{}'", footer_len, file_path);
         }
 
-        file.seek(SeekFrom::Current(-(8 + value)))
-            .map_err(|e| anyhow::anyhow!("Failed to seek to footer in POD5 file '{}': {}", file_path, e))?;
+        // Now at end-24; footer starts at end-32-footer_len.
+        file.seek(SeekFrom::Current(-(8 + footer_len)))
+            .map_err(|e| anyhow::anyhow!("Seek to footer failed in '{}': {}", file_path, e))?;
 
-        let mut buf = vec![0; value as usize];
+        let mut buf = vec![0u8; footer_len as usize];
         file.read_exact(&mut buf)
-            .map_err(|e| anyhow::anyhow!("Failed to read footer data from POD5 file '{}': {}", file_path, e))?;
+            .map_err(|e| anyhow::anyhow!("Failed to read footer from '{}': {}", file_path, e))?;
 
         let footer = root_as_footer(&buf)
-            .map_err(|e| anyhow::anyhow!("Failed to parse footer in POD5 file '{}': {:?}", file_path, e))?;
-
-        let mut acquisition_id = String::new();
-        let mut acquisition_start_time = Utc::now();
-        let mut adc_max = 0;
-        let mut adc_min = 0;
-        let mut experiment_name = String::new();
-        let mut flow_cell_id = String::new();
-        let mut flow_cell_product_code = String::new();
-        let mut protocol_name = String::new();
-        let mut protocol_run_id = String::new();
-        let mut protocol_start_time = Utc::now();
-        let mut sample_id = String::new();
-        let mut sample_rate = 0;
-        let mut sequencing_kit = String::new();
-        let mut sequencer_position = String::new();
-        let mut sequencer_position_type = String::new();
-        let mut software = String::new();
-        let mut system_name = String::new();
-        let mut system_type = String::new();
+            .map_err(|e| anyhow::anyhow!("Failed to parse footer in '{}': {:?}", file_path, e))?;
+
+        let mut info = Pod5Info {
+            acquisition_id: String::new(),
+            acquisition_start_time: Utc::now(),
+            adc_max: 0,
+            adc_min: 0,
+            experiment_name: String::new(),
+            flow_cell_id: String::new(),
+            flow_cell_product_code: String::new(),
+            protocol_name: String::new(),
+            protocol_run_id: String::new(),
+            protocol_start_time: Utc::now(),
+            sample_id: String::new(),
+            sample_rate: 0,
+            sequencing_kit: String::new(),
+            sequencer_position: String::new(),
+            sequencer_position_type: String::new(),
+            software: String::new(),
+            system_name: String::new(),
+            system_type: String::new(),
+        };
 
         if let Some(contents) = footer.contents() {
             for content in contents.iter() {
                 if let ContentType::RunInfoTable = content.content_type() {
-                    let batch = read_arrow_table(
-                        file_path,
+                    let batches = read_arrow_table(
+                        &mut file,
                         content.offset() as u64,
                         content.length() as u64,
-                    )
-                    .map_err(|e| anyhow::anyhow!("Failed to read run info table from POD5 file '{}': {}", file_path, e))?;
-
-                    if batch.is_empty() {
-                        continue;
+                    ).map_err(|e| anyhow::anyhow!("Failed to read RunInfoTable from '{}': {}", file_path, e))?;
+
+                    let batch = match batches.first() {
+                        Some(b) => b,
+                        None => continue,
+                    };
+
+                    let schema = batch.schema();
+                    for col_idx in 0..batch.num_columns() {
+                        let array: ArrayRef = batch.column(col_idx).clone();
+                        let col = schema.field(col_idx).name().as_str();
+                        extract_column(col, &array, &mut info);
                     }
+                }
+            }
+        }
 
-                    let schema = batch[0].schema();
-                    for column in 0..batch[0].num_columns() {
-                        let array: ArrayRef = batch[0].column(column).clone();
-                        let column_name = schema.field(column).name().to_string();
-
-                        match array.data_type() {
-                            arrow::datatypes::DataType::Int16 => {
-                                if let Some(int_array) = array.as_any().downcast_ref::<Int16Array>() {
-                                    for i in 0..int_array.len() {
-                                        match column_name.as_str() {
-                                            "adc_max" => adc_max = int_array.value(i),
-                                            "adc_min" => adc_min = int_array.value(i),
-                                            _ => (),
-                                        }
-                                    }
-                                }
-                            }
-                            arrow::datatypes::DataType::UInt16 => {
-                                if let Some(int_array) = array.as_any().downcast_ref::<UInt16Array>() {
-                                    for i in 0..int_array.len() {
-                                        if let "sample_rate" = column_name.as_str() {
-                                            sample_rate = int_array.value(i)
-                                        }
-                                    }
-                                }
-                            }
-                            arrow::datatypes::DataType::Utf8 => {
-                                if let Some(string_array) = array.as_any().downcast_ref::<StringArray>() {
-                                    let string_array: Vec<String> = string_array
-                                        .iter()
-                                        .flat_map(|v| match v {
-                                            Some(v) => vec![v.to_string()],
-                                            None => vec![],
-                                        })
-                                        .collect();
-
-                                    let value = string_array.join(" ");
-
-                                    match column_name.as_str() {
-                                        "acquisition_id" => acquisition_id = value,
-                                        "experiment_name" => experiment_name = value,
-                                        "flow_cell_id" => flow_cell_id = value,
-                                        "flow_cell_product_code" => flow_cell_product_code = value,
-                                        "protocol_name" => protocol_name = value,
-                                        "protocol_run_id" => protocol_run_id = value,
-                                        "sample_id" => sample_id = value,
-                                        "sequencing_kit" => sequencing_kit = value,
-                                        "sequencer_position" => sequencer_position = value,
-                                        "sequencer_position_type" => sequencer_position_type = value,
-                                        "software" => software = value,
-                                        "system_name" => system_name = value,
-                                        "system_type" => system_type = value,
-                                        _ => (),
-                                    }
-                                }
-                            }
-                            arrow::datatypes::DataType::Timestamp(
-                                arrow::datatypes::TimeUnit::Millisecond,
-                                Some(timezone),
-                            ) => {
-                                if &timezone.to_string() == "UTC" {
-                                    if let Some(timestamp_array) = array
-                                        .as_any()
-                                        .downcast_ref::<TimestampMillisecondArray>()
-                                    {
-                                        for i in 0..timestamp_array.len() {
-                                            let timestamp = timestamp_array.value(i);
-                                            if let Some(datetime) = Utc.timestamp_millis_opt(timestamp).single() {
-                                                match column_name.as_str() {
-                                                    "acquisition_start_time" => {
-                                                        acquisition_start_time = datetime
-                                                    }
-                                                    "protocol_start_time" => protocol_start_time = datetime,
-                                                    _ => (),
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                            _ => {
-                                // Unsupported data type, skip
-                            }
+        Ok(info)
+    }
+}
+
+/// Extract one Arrow column into the matching `Pod5Info` field.
+fn extract_column(col: &str, array: &ArrayRef, info: &mut Pod5Info) {
+    use arrow::datatypes::DataType;
+
+    match array.data_type() {
+        DataType::Int16 => {
+            if let Some(a) = array.as_any().downcast_ref::<Int16Array>() {
+                if let Some(v) = (0..a.len()).next().map(|i| a.value(i)) {
+                    match col {
+                        "adc_max" => info.adc_max = v,
+                        "adc_min" => info.adc_min = v,
+                        _ => debug!("pod5: unrecognised Int16 column '{col}'"),
+                    }
+                }
+            }
+        }
+        DataType::UInt16 => {
+            if let Some(a) = array.as_any().downcast_ref::<UInt16Array>() {
+                if let Some(v) = (0..a.len()).next().map(|i| a.value(i)) {
+                    match col {
+                        "sample_rate" => info.sample_rate = v,
+                        _ => debug!("pod5: unrecognised UInt16 column '{col}'"),
+                    }
+                }
+            }
+        }
+        DataType::Utf8 => {
+            if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
+                let value: String = a.iter().filter_map(|v| v.map(|s| s.to_string())).collect::<Vec<_>>().join(" ");
+                match col {
+                    "acquisition_id"          => info.acquisition_id = value,
+                    "experiment_name"         => info.experiment_name = value,
+                    "flow_cell_id"            => info.flow_cell_id = value,
+                    "flow_cell_product_code"  => info.flow_cell_product_code = value,
+                    "protocol_name"           => info.protocol_name = value,
+                    "protocol_run_id"         => info.protocol_run_id = value,
+                    "sample_id"               => info.sample_id = value,
+                    "sequencing_kit"          => info.sequencing_kit = value,
+                    "sequencer_position"      => info.sequencer_position = value,
+                    "sequencer_position_type" => info.sequencer_position_type = value,
+                    "software"                => info.software = value,
+                    "system_name"             => info.system_name = value,
+                    "system_type"             => info.system_type = value,
+                    _ => debug!("pod5: unrecognised Utf8 column '{col}'"),
+                }
+            }
+        }
+        DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, Some(tz))
+            if tz.as_ref() == "UTC" =>
+        {
+            if let Some(a) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
+                for i in 0..a.len() {
+                    if let Some(dt) = Utc.timestamp_millis_opt(a.value(i)).single() {
+                        match col {
+                            "acquisition_start_time" => info.acquisition_start_time = dt,
+                            "protocol_start_time"    => info.protocol_start_time = dt,
+                            _ => debug!("pod5: unrecognised Timestamp column '{col}'"),
                         }
                     }
                 }
             }
         }
-
-        Ok(Pod5Info {
-            acquisition_id,
-            acquisition_start_time,
-            adc_max,
-            adc_min,
-            experiment_name,
-            flow_cell_id,
-            flow_cell_product_code,
-            protocol_name,
-            protocol_run_id,
-            protocol_start_time,
-            sample_id,
-            sample_rate,
-            sequencing_kit,
-            sequencer_position,
-            sequencer_position_type,
-            software,
-            system_name,
-            system_type,
-        })
+        _ => {}
     }
 }
 
+/// Read an Arrow IPC table embedded at `[offset, offset+length)` in `file`.
 fn read_arrow_table(
-    file_path: &str,
+    file: &mut File,
     offset: u64,
     length: u64,
 ) -> arrow::error::Result<Vec<RecordBatch>> {
-    let mut file = File::open(file_path)?;
-
-    // Seek to the start of the embedded table
     file.seek(SeekFrom::Start(offset))?;
 
-    // Read the specified length of bytes
-    let mut buffer = vec![0; length as usize];
+    let mut buffer = vec![0u8; length as usize];
     file.read_exact(&mut buffer)?;
 
-    // Deserialize bytes into Arrow RecordBatch
     let cursor = std::io::Cursor::new(buffer);
-    let reader = FileReader::try_new(cursor, None)?;
-
-    let mut batches = Vec::new();
-    for batch in reader {
-        batches.push(batch?);
-    }
-
-    Ok(batches)
+    FileReader::try_new(cursor, None)?.collect()
 }
-