Thomas 7 月之前
父节点
当前提交
624448cd60
共有 5 个文件被更改,包括 646 次插入2 次删除
  1. 1 1
      src/collection/flowcells.rs
  2. 1 0
      src/collection/mod.rs
  3. 611 0
      src/collection/run.rs
  4. 23 1
      src/collection/tests.rs
  5. 10 0
      src/config.rs

+ 1 - 1
src/collection/flowcells.rs

@@ -141,7 +141,7 @@ impl FlowCells {
         instance.load_from_archive(archive_store_path)?;
         instance.load_from_local(local_run_dir)?;
         instance.annotate_with_inputs(inputs_path)?;
-        instance.save_to_archive(archive_store_path)?;
+        // instance.save_to_archive(archive_store_path)?;
 
         Ok(instance)
     }

+ 1 - 0
src/collection/mod.rs

@@ -40,6 +40,7 @@ pub mod minknow;
 
 #[cfg(test)]
 mod tests;
+pub mod run;
 
 #[derive(Debug, Clone)]
 pub struct CollectionsConfig {

+ 611 - 0
src/collection/run.rs

@@ -0,0 +1,611 @@
+use std::{
+    collections::{hash_map::Entry, HashMap, HashSet},
+    fs::{self, File},
+    io::{Read, Write},
+    path::{Path, PathBuf},
+};
+
+use anyhow::Context;
+use chrono::{DateTime, TimeZone, Utc};
+use log::{debug, info};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+
+use crate::collection::minknow::{parse_pore_activity_from_reader, parse_throughput_from_reader};
+
+use super::minknow::{MinKnowSampleSheet, PoreStateEntry, ThroughputEntry};
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
+enum PodDir {
+    Muxed {
+        dir: PathBuf,
+        source: Pod5Source,
+        info: Vec<(String, u64, DateTime<Utc>)>,
+    },
+    DeMuxed {
+        dir: PathBuf,
+        source: Pod5Source,
+        info: Vec<(String, u64, DateTime<Utc>)>,
+    },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
+enum Pod5Source {
+    LocalDir,
+    Tar,
+}
+
+pub struct PromRun {
+    sample_sheet: MinKnowSampleSheet,
+    pore_activity: Vec<PoreStateEntry>,
+    throughput: Vec<ThroughputEntry>,
+    pod5_dirs: Vec<PodDir>,
+}
+
+impl PromRun {
+    pub fn from_local_dir(dir: &PathBuf) -> anyhow::Result<Self> {
+        let mut throughput_opt = None;
+        let mut sample_sheet_opt = None;
+        let mut pore_activity_opt = None;
+        let mut pod5_dirs = Vec::new();
+
+        for entry in
+            fs::read_dir(dir).with_context(|| format!("failed to read dir {:?}", dir.display()))?
+        {
+            let entry = entry?;
+            let path = entry.path();
+            println!("{}", path.display());
+
+            if let Some(fname) = path.file_name().and_then(|n| n.to_str()) {
+                let path_str = path
+                    .to_str()
+                    .with_context(|| format!("non-UTF8 path: {:?}", path))?;
+
+                // Sample sheet
+                if fname.starts_with("sample_sheet") {
+                    sample_sheet_opt = Some(
+                        MinKnowSampleSheet::from_path(path_str)
+                            .with_context(|| format!("failed to load {:?}", path.display()))?,
+                    );
+                    println!("Loaded sample_sheet: {}", path.display());
+                }
+                // Throughput
+                else if fname.starts_with("throughput_") {
+                    let mut f = File::open(&path)
+                        .with_context(|| format!("failed to open {:?}", path.display()))?;
+                    throughput_opt = Some(
+                        parse_throughput_from_reader(&mut f)
+                            .with_context(|| format!("failed to load {:?}", path.display()))?,
+                    );
+                    println!("Loaded throughput: {}", path.display());
+                }
+                // Pore activity
+                else if fname.starts_with("pore_activity_") {
+                    let mut f = File::open(&path)
+                        .with_context(|| format!("failed to open {:?}", path.display()))?;
+                    pore_activity_opt = Some(
+                        parse_pore_activity_from_reader(&mut f)
+                            .with_context(|| format!("failed to load {:?}", path.display()))?,
+                    );
+                    println!("Loaded pore_activity: {}", path.display());
+                }
+                // pod5 muxed
+                else if fname == "pod5" && path.is_dir() {
+                    // path.
+                    pod5_dirs.push(PodDir::Muxed {
+                        dir: path.clone(),
+                        source: Pod5Source::LocalDir,
+                        info: read_local_pod5_dir(&path)?,
+                    });
+                } else if fname == "pod5_pass" && path.is_dir() {
+                    pod5_dirs.push(PodDir::DeMuxed {
+                        dir: path.clone(),
+                        source: Pod5Source::LocalDir,
+                        info: read_local_pod5_dir(&path)?,
+                    });
+                }
+            }
+        }
+
+        Ok(Self {
+            sample_sheet: sample_sheet_opt.with_context(|| "sample sheet not found")?,
+            pore_activity: pore_activity_opt.with_context(|| "pore activity not found")?,
+            throughput: throughput_opt.with_context(|| "throughput not found")?,
+            pod5_dirs,
+        })
+    }
+
+    pub fn id(&self) -> String {
+        self.sample_sheet.protocol_run_id.clone()
+    }
+}
+
+pub fn read_local_pod5_dir(dir: &PathBuf) -> anyhow::Result<Vec<(String, u64, DateTime<Utc>)>> {
+    let mut files = Vec::new();
+    for entry in
+        fs::read_dir(dir).with_context(|| format!("failed to read dir {:?}", dir.display()))?
+    {
+        let entry = entry?;
+        let meta = entry.metadata()?;
+        if meta.is_file() {
+            let name = entry.file_name().into_string().unwrap_or_default();
+            let size = meta.len();
+            let modified = meta.modified()?;
+            let datetime: DateTime<Utc> = DateTime::<Utc>::from(modified);
+            files.push((name, size, datetime));
+        }
+    }
+    Ok(files)
+}
+
+/// Recursively find all directories containing required files
+fn find_ready_dirs<P: AsRef<Path>>(root: P) -> anyhow::Result<Vec<PathBuf>> {
+    let mut result = Vec::new();
+
+    fn visit_dir(dir: &Path, out: &mut Vec<PathBuf>) -> anyhow::Result<()> {
+        // Only proceed if directory
+        if !dir.is_dir() {
+            return Ok(());
+        }
+        let mut has_sample_sheet = false;
+        let mut has_throughput = false;
+        let mut has_pore_activity = false;
+
+        for entry in fs::read_dir(dir).with_context(|| format!("failed to read dir {dir:?}"))? {
+            let entry = entry?;
+            let path = entry.path();
+
+            if path.is_file() {
+                if let Some(fname) = path.file_name().and_then(|n| n.to_str()) {
+                    if fname.starts_with("sample_sheet") {
+                        has_sample_sheet = true;
+                    } else if fname.starts_with("throughput_") {
+                        has_throughput = true;
+                    } else if fname.starts_with("pore_activity_") {
+                        has_pore_activity = true;
+                    }
+                }
+            } else if path.is_dir() {
+                visit_dir(&path, out)?; // recurse into subdir
+            }
+        }
+
+        if has_sample_sheet && has_throughput && has_pore_activity {
+            out.push(dir.to_path_buf());
+        }
+        Ok(())
+    }
+
+    visit_dir(root.as_ref(), &mut result)?;
+    Ok(result)
+}
+
+#[derive(Debug, Default)]
+pub struct PromRuns {
+    sample_sheets: HashMap<String, MinKnowSampleSheet>,
+    pore_activities: HashMap<String, Vec<PoreStateEntry>>,
+    throughputs: HashMap<String, Vec<ThroughputEntry>>,
+    pub archive_dir: PathBuf,
+    pod5_dirs: HashMap<String, Vec<PodDir>>,
+}
+
+impl PromRuns {
+    pub fn new(archive_dir: PathBuf) -> Self {
+        Self {
+            sample_sheets: HashMap::new(),
+            pore_activities: HashMap::new(),
+            throughputs: HashMap::new(),
+            pod5_dirs: HashMap::new(),
+            archive_dir,
+        }
+    }
+
+    pub fn scan_local(&mut self, dir: &str) -> anyhow::Result<()> {
+        let dirs = find_ready_dirs(dir)?;
+        println!("{} dirs", dirs.len());
+        for dir in dirs {
+            let PromRun {
+                sample_sheet,
+                pore_activity,
+                throughput,
+                pod5_dirs,
+            } = PromRun::from_local_dir(&dir)?;
+            let id = sample_sheet.protocol_run_id.clone();
+            if self
+                .sample_sheets
+                .insert(id.clone(), sample_sheet)
+                .is_some()
+            {
+                debug!("Sample sheet already loaded for: {id}");
+            }
+            if self
+                .pore_activities
+                .insert(id.clone(), pore_activity)
+                .is_some()
+            {
+                debug!("Pore activities already loaded for: {id}");
+            }
+            if self.throughputs.insert(id.clone(), throughput).is_some() {
+                debug!("Throughputs already loaded for: {id}");
+            }
+            match self.pod5_dirs.entry(id.clone()) {
+                Entry::Occupied(mut e) => {
+                    e.get_mut().extend(pod5_dirs);
+                    debug!("pod5_dirs extended for: {id}");
+                }
+                Entry::Vacant(e) => {
+                    e.insert(pod5_dirs);
+                }
+            }
+        }
+        Ok(())
+    }
+
+    pub fn scan_archives_dir(&mut self, dir: &str) -> anyhow::Result<()> {
+        for path in fs::read_dir(dir).with_context(|| format!("failed to read: {dir}"))? {
+            let e = path?;
+
+            if !(e.file_type()?.is_file()
+                && e.file_name()
+                    .to_str()
+                    .map(|s| s.ends_with(".tar"))
+                    .unwrap_or(false))
+            {
+                info!("{} is not a valid tar file.", e.path().display());
+                continue;
+            }
+
+            info!("Reading archive: {}", e.path().display());
+            let (sample_sheet, pore_activity_opt, throughput_opt, info) = scan_archive(&e.path())?;
+
+            let id = sample_sheet.protocol_run_id.clone();
+
+            if self
+                .sample_sheets
+                .insert(id.clone(), sample_sheet)
+                .is_some()
+            {
+                debug!("Sample sheet already loaded for: {id}");
+            }
+            if let Some(pore_activities) = pore_activity_opt {
+                if self
+                    .pore_activities
+                    .insert(id.clone(), pore_activities)
+                    .is_some()
+                {
+                    debug!("Pore activities already loaded for: {id}");
+                }
+            }
+
+            if let Some(throughput) = throughput_opt {
+                if self.throughputs.insert(id.clone(), throughput).is_some() {
+                    debug!("Throughputs already loaded for: {id}");
+                }
+            }
+
+            let pod5_mux: Vec<_> = info
+                .iter()
+                .filter(|(path_str, _, _)| {
+                    path_str.ends_with("pod5") && path_str.contains("/pod5/")
+                })
+                .cloned()
+                .collect();
+
+            let pod5_demux: Vec<_> = info
+                .iter()
+                .filter(|(path_str, _, _)| {
+                    path_str.ends_with("pod5") && path_str.contains("/pod5_pass/")
+                })
+                .cloned()
+                .collect();
+
+            let mut pod5_dirs = Vec::new();
+            if let Some((path_str, _, _)) = pod5_mux.first() {
+                if let Some(dir) = PathBuf::from(path_str).parent() {
+                    pod5_dirs.push(PodDir::Muxed {
+                        dir: dir.to_path_buf(),
+                        source: Pod5Source::Tar,
+                        info: pod5_mux,
+                    });
+                }
+            }
+
+            if let Some((path_str, _, _)) = pod5_demux.first() {
+                if let Some(dir) = PathBuf::from(path_str).parent() {
+                    pod5_dirs.push(PodDir::DeMuxed {
+                        dir: dir.to_path_buf(),
+                        source: Pod5Source::Tar,
+                        info: pod5_demux,
+                    });
+                }
+            }
+
+            self.add_pod5_dirs(id.clone(), pod5_dirs);
+
+            break;
+        }
+        Ok(())
+    }
+
+    pub fn add_pod5_dirs(&mut self, id: String, pod5_dirs: Vec<PodDir>) {
+        // use std::collections::hash_map::Entry;
+
+        match self.pod5_dirs.entry(id.clone()) {
+            Entry::Occupied(mut e) => {
+                let list = e.get_mut();
+                list.extend(pod5_dirs);
+
+                let mut set = HashSet::new();
+                // Take ownership, replace with deduped
+                let mut deduped = Vec::new();
+                for pod in list.drain(..) {
+                    if set.insert(pod.clone()) {
+                        // requires PodDir: Clone + Eq + Hash
+                        deduped.push(pod);
+                    }
+                }
+                *list = deduped;
+
+                debug!("pod5_dirs extended for: {id}");
+            }
+            Entry::Vacant(e) => {
+                e.insert(pod5_dirs);
+            }
+        }
+    }
+
+    pub fn stats(&self) {
+        println!("sample sheets: {}", self.sample_sheets.len());
+        println!("pore activities: {}", self.pore_activities.len());
+        println!("Throughputs: {}", self.throughputs.len());
+    }
+
+    pub fn load_sample_sheets(&mut self) -> std::io::Result<usize> {
+        load_all(
+            &mut self.sample_sheets,
+            &self.archive_dir,
+            "sample_sheet.json",
+        )
+    }
+
+    pub fn load_pod5_dirs(&mut self) -> std::io::Result<usize> {
+        load_all(&mut self.pod5_dirs, &self.archive_dir, "pod5_dirs.json")
+    }
+
+    pub fn archive(&self, force: bool) -> std::io::Result<()> {
+        self.archive_sample_sheets(force)?;
+        self.archive_pore_activities(force)?;
+        self.archive_throughputs(force)?;
+        self.archive_pod5_dirs(force)
+    }
+
+    pub fn archive_sample_sheets(&self, force: bool) -> std::io::Result<()> {
+        archive_map(
+            &self.sample_sheets,
+            &self.archive_dir,
+            "sample_sheet.json",
+            force,
+        )
+    }
+
+    fn archive_pore_activities(&self, force: bool) -> std::io::Result<()> {
+        archive_map(
+            &self.pore_activities,
+            &self.archive_dir,
+            "pore_activities.json",
+            force,
+        )
+    }
+
+    fn archive_throughputs(&self, force: bool) -> std::io::Result<()> {
+        archive_map(
+            &self.throughputs,
+            &self.archive_dir,
+            "throughputs.json",
+            force,
+        )
+    }
+
+    fn archive_pod5_dirs(&self, force: bool) -> std::io::Result<()> {
+        archive_map(&self.pod5_dirs, &self.archive_dir, "pod5_dirs.json", force)
+    }
+
+    pub fn get_pore_activities(&mut self, key: &str) -> Option<&Vec<PoreStateEntry>> {
+        get_or_load(
+            &mut self.pore_activities,
+            &self.archive_dir,
+            key,
+            "pore_activities.json",
+        )
+    }
+
+    pub fn get_throughputs(&mut self, key: &str) -> Option<&Vec<ThroughputEntry>> {
+        get_or_load(
+            &mut self.throughputs,
+            &self.archive_dir,
+            key,
+            "throughputs.json",
+        )
+    }
+}
+
+pub fn archive_map<T: serde::Serialize>(
+    map: &HashMap<String, T>,
+    archive_dir: &std::path::Path,
+    filename: &str,
+    force: bool,
+) -> std::io::Result<()> {
+    use std::fs::{self, File};
+    use std::io::Write;
+    use std::path::PathBuf;
+
+    for (key, entries) in map {
+        let folder: PathBuf = archive_dir.join(key);
+        fs::create_dir_all(&folder)?;
+
+        let file_path = folder.join(filename);
+        if file_path.exists() && !force {
+            continue;
+        }
+        let json = serde_json::to_string_pretty(entries).expect("Serialization failed");
+        let mut file = File::create(&file_path)?;
+        file.write_all(json.as_bytes())?;
+    }
+    Ok(())
+}
+
+pub fn get_or_load<'a, T>(
+    map: &'a mut HashMap<String, T>,
+    archive_dir: &Path,
+    key: &str,
+    filename: &str,
+) -> Option<&'a T>
+where
+    T: DeserializeOwned,
+{
+    if map.contains_key(key) {
+        return map.get(key);
+    }
+    let path = archive_dir.join(key).join(filename);
+    let mut file = File::open(&path).ok()?;
+    let mut data = Vec::new();
+    file.read_to_end(&mut data).ok()?;
+    let entries: T = serde_json::from_slice(&data).ok()?;
+
+    // Insert and immediately return reference using entry API
+    Some(map.entry(key.to_string()).or_insert(entries))
+}
+
+pub fn load_all<T>(
+    map: &mut HashMap<String, T>,
+    archive_dir: &Path,
+    filename: &str,
+) -> std::io::Result<usize>
+where
+    T: DeserializeOwned,
+{
+    map.clear();
+    let mut count = 0;
+    for entry in fs::read_dir(archive_dir)? {
+        let entry = entry?;
+        if entry.file_type()?.is_dir() {
+            let key = entry.file_name().into_string().ok().unwrap_or_default();
+            let file_path = entry.path().join(filename);
+            if file_path.exists() {
+                let data = fs::read(&file_path)?;
+                if let Ok(vec) = serde_json::from_slice::<T>(&data) {
+                    map.insert(key, vec);
+                    count += 1;
+                }
+            }
+        }
+    }
+    Ok(count)
+}
+
+/// Scans a `.tar` archive containing a MinKNOW sequencing experiment.
+///
+/// This function opens a TAR archive, searches for the `sample_sheet` CSV file,
+/// extracts its metadata, and parses it into a `MinKnowSampleSheet`.
+/// All other entries in the archive are collected with their path, size, and modification time.
+///
+/// # Arguments
+/// - `tar_path`: Path to the `.tar` archive file.
+///
+/// # Returns
+/// - `Ok(ExperimentData)`: A tuple containing the parsed sample sheet and the list of file metadata.
+/// - `Err`: If the archive is unreadable, malformed, or missing the `sample_sheet`.
+///
+/// # Archive Requirements
+/// - Must contain exactly one file matching `"sample_sheet"` in its path.
+/// - The sample sheet must contain a valid CSV header and a single data row.
+///
+/// # Errors
+/// - Fails if the archive can't be opened or read.
+/// - Fails if any entry is malformed (e.g., missing timestamp).
+/// - Fails if no sample sheet is found or if it is malformed.
+///
+/// # Example
+/// ```no_run
+/// let (sample_sheet, files) = scan_archive("archive.tar")?;
+/// println!("Sample ID: {}", sample_sheet.sample_id);
+/// println!("Total files in archive: {}", files.len());
+/// ```
+fn scan_archive(tar_path: &PathBuf) -> anyhow::Result<ExperimentData> {
+    info!("Scanning archive: {}", tar_path.display());
+
+    let file = File::open(tar_path)
+        .with_context(|| format!("Failed to open tar file at path: {}", tar_path.display()))?;
+
+    let mut archive = tar::Archive::new(file);
+    let mut result = Vec::new();
+
+    let mut sample_sheet: Option<String> = None;
+
+    let mut throughput = None;
+    let mut pore_activity = None;
+
+    // Iterate through the entries in the archive
+    for entry in archive.entries()? {
+        let mut file = entry.context("Failed to read an entry from the tar archive")?;
+
+        // Extract file properties safely
+        let size = file.size();
+        let modified = file
+            .header()
+            .mtime()
+            .context("Failed to get modification time")?;
+        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
+
+        let path = file
+            .path()
+            .context("Failed to get file path from tar entry")?;
+
+        let path_str = path.to_string_lossy().into_owned();
+        debug!("archive contains: {path_str}");
+
+        if path_str.contains("pore_activity") {
+            pore_activity = Some(
+                parse_pore_activity_from_reader(&mut file)
+                    .context("Failed to read pore_activity data: {path}")?,
+            );
+        } else if path_str.contains("sample_sheet") {
+            let mut buffer = String::new();
+            file.read_to_string(&mut buffer)
+                .context("Failed to read file contents as string")?;
+            sample_sheet = Some(buffer);
+        } else if path_str.contains("throughput") {
+            throughput = Some(
+                parse_throughput_from_reader(&mut file)
+                    .context("Failed to read throughput data: {path}")?,
+            );
+        }
+        result.push((path_str, size, modified_utc));
+    }
+
+    let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!(
+        "No sample sheet detected in archive: {}",
+        tar_path.display()
+    ))?;
+    let (_, data) = sample_sheet
+        .split_once("\n")
+        .ok_or(anyhow::anyhow!("Can't parse sample sheet data"))?;
+    let sample_sheet: MinKnowSampleSheet = data
+        .try_into()
+        .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;
+    Ok((sample_sheet, pore_activity, throughput, result))
+}
+
+/// Represents the result of scanning a MinKNOW experiment source.
+///
+/// This tuple includes:
+/// - `MinKnowSampleSheet`: Parsed metadata describing the experiment/sample.
+/// - `Vec<(String, u64, DateTime<Utc>)>`: A list of files with:
+///   - `String`: file path
+///   - `u64`: file size in bytes
+///   - `DateTime<Utc>`: last modification time (UTC)
+type ExperimentData = (
+    MinKnowSampleSheet,
+    Option<Vec<PoreStateEntry>>,
+    Option<Vec<ThroughputEntry>>,
+    Vec<(String, u64, DateTime<Utc>)>,
+);

+ 23 - 1
src/collection/tests.rs

@@ -1,10 +1,15 @@
 use log::info;
 
 use crate::{
-    collection::bam::{bam_composition, WGSBam, WGSBamStats},
+    collection::{
+        bam::{bam_composition, WGSBam, WGSBamStats},
+        run::PromRuns,
+    },
     config::Config,
 };
 
+use super::run::PromRun;
+
 fn init() {
     let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
         .is_test(true)
@@ -68,3 +73,20 @@ fn bam_c() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn samplerun() -> anyhow::Result<()> {
+    init();
+
+    // let r = PromRun::from_local_dir("/home/prom/mnt/store/20250428_AgC/FIANXA-DG-N06_MONCA-DG-N07/20250428_1600_2E_PBC92750_b7090451")?;
+    // println!("{}", r.id());
+
+    let mut rs = PromRuns::new("/data/promethion-runs-metadata".into());
+    // rs.scan_local("/home/prom/mnt/store/20250428_AgC")?;
+    rs.scan_archives_dir("/home/prom/mnt/lto")?;
+    rs.archive(true)?;
+
+    // rs.load_sample_sheets()?;
+    rs.stats();
+    Ok(())
+}

+ 10 - 0
src/config.rs

@@ -267,6 +267,16 @@ impl Config {
         )
     }
 
+    pub fn solo_bam_info_json(&self, id: &str, time: &str) -> String {
+        format!(
+            "{}/{}_{}_{}_info.json",
+            self.solo_dir(id, time),
+            id,
+            time,
+            self.reference_name,
+        )
+    }
+
     pub fn tumoral_bam(&self, id: &str) -> String {
         format!(
             "{}/{}_{}_{}.bam",