Browse Source

Flowcells save gz

Thomas 8 months ago
parent
commit
ec89a0e159
3 changed files with 63 additions and 22 deletions
  1. 32 18
      src/collection/flowcells.rs
  2. 29 3
      src/collection/minknow.rs
  3. 2 1
      src/lib.rs

+ 32 - 18
src/collection/flowcells.rs

@@ -15,10 +15,10 @@ use serde::{Deserialize, Serialize};
 
 use crate::{
     collection::minknow::{parse_pore_activity_from_reader, parse_throughput_from_reader},
-    helpers::{find_files, list_directories},
+    helpers::{find_files, list_directories}, io::writers::get_gz_writer,
 };
 
-use super::minknow::{MinKnowSampleSheet, PoreStateEntry, ThroughputEntry};
+use super::minknow::{MinKnowSampleSheet, PoreStateEntry, PoreStateEntryExt, ThroughputEntry};
 
 /// A collection of `IdInput` records, with utility methods
 /// for loading, saving, deduplication, and construction from TSV.
@@ -169,11 +169,13 @@ impl FlowCells {
     }
 
     pub fn save_to_archive(&mut self, archive_path: &str) -> anyhow::Result<()> {
-        let file = OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(archive_path)
+        // let file = OpenOptions::new()
+        //     .write(true)
+        //     .create(true)
+        //     .truncate(true)
+        //     .open(archive_path)
+        //     .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;
+        let file = get_gz_writer(archive_path, true)
             .with_context(|| format!("Failed to open archive file for writing: {archive_path}"))?;
 
         serde_json::to_writer_pretty(file, &self.flow_cells)
@@ -193,7 +195,9 @@ impl FlowCells {
     /// # Errors
     /// Returns an error if scanning fails or if directory layout is invalid.
     pub fn load_from_local(&mut self, local_dir: &str) -> anyhow::Result<()> {
+        info!("Scanning {local_dir} for sample sheets.");
         let sample_sheets = find_files(&format!("{local_dir}/**/sample_sheet*"))?;
+        info!("{} sample sheets found.", sample_sheets.len());
 
         for path in sample_sheets {
             let dir = path
@@ -667,22 +671,31 @@ pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
         if path.contains("pore_activity") {
             let mut reader =
                 File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
-            pore_activity = Some(parse_pore_activity_from_reader(&mut reader).with_context(
-                || {
-                    format!(
-                        "Failed to parse pore activity date from: {}",
-                        file.display()
-                    )
-                },
-            )?);
+            pore_activity = Some(
+                parse_pore_activity_from_reader(&mut reader)
+                    .map(|d| d.group_by_state_and_time(60.0))
+                    .with_context(|| {
+                        format!(
+                            "Failed to parse pore activity date from: {}",
+                            file.display()
+                        )
+                    })?,
+            );
         } else if path.contains("sample_sheet") {
             sample_sheet = Some(path.clone());
         } else if path.contains("throughput") {
             let mut reader =
                 File::open(&file).with_context(|| format!("Failed to open: {}", file.display()))?;
-            throughput = Some(parse_throughput_from_reader(&mut reader).with_context(|| {
-                format!("Failed to parse throughput date from: {}", file.display())
-            })?);
+            throughput = Some(
+                parse_throughput_from_reader(&mut reader)
+                    .with_context(|| {
+                        format!("Failed to parse throughput data from: {}", file.display())
+                    })?
+                    .into_iter()
+                    .enumerate()
+                    .filter_map(|(i, item)| if i % 60 == 0 { Some(item) } else { None })
+                    .collect(),
+            );
         }
 
         result.push((path, size, modified_utc));
@@ -692,6 +705,7 @@ pub fn scan_local(dir: &str) -> anyhow::Result<ExperimentData> {
     let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
         .context(anyhow::anyhow!("Can't parse sample sheet data"))?;
 
+    // info!("{} files found in {}", result.len(), dir);
     Ok((sample_sheet, pore_activity, throughput, result))
 }
 

+ 29 - 3
src/collection/minknow.rs

@@ -1,6 +1,7 @@
-use std::str::FromStr;
+use std::{collections::BTreeMap, str::FromStr};
 
 use anyhow::Context;
+use ordered_float::OrderedFloat;
 use serde::{Deserialize, Serialize};
 
 /// Represents a single entry from a MinKNOW sample sheet CSV file.
@@ -222,15 +223,40 @@ pub struct PoreStateEntry {
 
     /// Time since the start of the experiment, in minutes.
     #[serde(rename = "Experiment Time (minutes)")]
-    pub experiment_time_minutes: f32,
+    pub experiment_time_minutes: OrderedFloat<f32>,
 
     /// Duration the channel has been in this state, in raw sample units.
     #[serde(rename = "State Time (samples)")]
     pub state_time_samples: u64,
 }
 
+pub trait PoreStateEntryExt {
+    fn group_by_state_and_time(&self, t: f32) -> Vec<PoreStateEntry>;
+}
+
+impl PoreStateEntryExt for Vec<PoreStateEntry> {
+    fn group_by_state_and_time(&self, t: f32) -> Vec<PoreStateEntry> {
+        let mut buckets: BTreeMap<(OrderedFloat<f32>, NanoporeChannelStatus), u64> = BTreeMap::new();
+
+        for entry in self {
+            let bucket_start = OrderedFloat((entry.experiment_time_minutes / t).floor() * t);
+            let key = (bucket_start, entry.state);
+            *buckets.entry(key).or_default() += entry.state_time_samples;
+        }
+
+        buckets
+            .into_iter()
+            .map(|((bucket_time, state), total_samples)| PoreStateEntry {
+                state,
+                experiment_time_minutes: OrderedFloat(bucket_time.into_inner()),
+                state_time_samples: total_samples,
+            })
+            .collect()
+    }
+}
+
 /// Represents the current status of a Nanopore sequencing channel.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, PartialOrd, Ord)]
 pub enum NanoporeChannelStatus {
     /// An adapter is detected in the pore, but sequencing hasn't started yet.
     Adapter,

+ 2 - 1
src/lib.rs

@@ -892,7 +892,8 @@ mod tests {
     fn load_fc() -> anyhow::Result<()> {
         init();
         // FlowCells::load_archive_from_scan("/data/lto", "/data/archives.json")?;
-        let r = FlowCells::load("/data/run_data", "/data/inputs_ids.json", "/data/archives.json")?;
+        let r = FlowCells::load("/home/prom/mnt/store", 
+            "/data/pandora_id_inputs.json", "/data/archives.json.gz")?;
         println!("{r:#?}");
         Ok(())
     }