
archive and local flowcells with identification

Thomas committed 10 months ago
fa16660814
3 files changed, 378 insertions(+), 119 deletions(-)
  1. src/collection/pod5.rs (+305 -93)
  2. src/helpers.rs (+64 -6)
  3. src/lib.rs (+9 -20)

+ 305 - 93
src/collection/pod5.rs

@@ -1,5 +1,5 @@
 use anyhow::{anyhow, Context, Result};
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, TimeZone, Utc};
 use csv::ReaderBuilder;
 use glob::glob;
 use hashbrown::HashMap;
@@ -11,12 +11,15 @@ use std::{
     fmt::Display,
     fs::{self, File, Metadata},
     hash::{Hash, Hasher},
-    io::{self, BufRead},
+    io::{BufReader, Read},
     os::unix::fs::MetadataExt,
-    path::PathBuf,
+    path::{Path, PathBuf},
 };
 
-use crate::io::pod5_infos::Pod5Info;
+use crate::{
+    helpers::{find_files, list_directories},
+    io::pod5_infos::Pod5Info,
+};
 
 #[derive(Debug, Clone)]
 pub struct Pod5 {
@@ -685,30 +688,302 @@ pub struct Pod5Run {
 }
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
-pub struct Pod5Archived {
+pub enum FlowCellExperiment {
+    WGSUnalignedMux(String),
+    WGSUnalignedDemux(String),
+}
+
+impl FlowCellExperiment {
+    pub fn from_path(flowcell_path: &str) -> Option<Self> {
+        for dir in list_directories(flowcell_path).ok()? {
+            if dir == "pod5" {
+                return Some(FlowCellExperiment::WGSUnalignedMux(dir));
+            }
+            if dir == "pod5_pass" {
+                return Some(FlowCellExperiment::WGSUnalignedDemux(dir));
+            }
+        }
+        None
+    }
+
+    pub fn from_pod5_paths(all_paths: &Vec<String>) -> Option<Self> {
+        for path in all_paths {
+            if path.ends_with("/pod5/") || path.ends_with("/pod5") {
+                return Some(FlowCellExperiment::WGSUnalignedMux(path.to_string()));
+            }
+
+            if path.ends_with("/pod5_pass/") || path.ends_with("/pod5_pass") {
+                return Some(FlowCellExperiment::WGSUnalignedDemux(path.to_string()));
+            }
+        }
+        None
+    }
+
+    pub fn inner(&self) -> &str {
+        match self {
+            FlowCellExperiment::WGSUnalignedMux(v) => v,
+            FlowCellExperiment::WGSUnalignedDemux(v) => v,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct FlowCellArchived {
     pub archive_id: String,
     pub last_seen: DateTime<Utc>,
     pub run: MinKnowSampleSheet,
 }
 
+impl FlowCellArchived {
+    pub fn from_tar(tar_path: &str) -> Result<Vec<(String, u64, u64)>> {
+        // Open the tar file safely with context for errors
+        let file = File::open(tar_path)
+            .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
+
+        let mut archive = tar::Archive::new(file);
+        let mut result = Vec::new();
+
+        // Iterate through the entries in the archive
+        for entry in archive.entries_with_seek()? {
+            let file = entry.context("Failed to read an entry from the tar archive")?;
+
+            // Extract file properties safely
+            let size = file.size();
+            let modified = file
+                .header()
+                .mtime()
+                .context("Failed to get modification time")?;
+            let path = file
+                .path()
+                .context("Failed to get file path from tar entry")?
+                .to_string_lossy()
+                .into_owned();
+
+            println!("{path}");
+            result.push((path, size, modified));
+        }
+
+        Ok(result)
+    }
+}
+
+pub fn scan_archive(
+    tar_path: &str,
+) -> anyhow::Result<(MinKnowSampleSheet, Vec<(String, u64, DateTime<Utc>)>)> {
+    info!("Scanning archive: {tar_path}");
+
+    let file = File::open(tar_path)
+        .with_context(|| format!("Failed to open tar file at path: {}", tar_path))?;
+
+    let mut archive = tar::Archive::new(file);
+    let mut result = Vec::new();
+    let mut sample_sheet: Option<String> = None;
+
+    // Iterate through the entries in the archive
+    for entry in archive.entries_with_seek()? {
+        let mut file = entry.context("Failed to read an entry from the tar archive")?;
+
+        // Extract file properties safely
+        let size = file.size();
+        let modified = file
+            .header()
+            .mtime()
+            .context("Failed to get modification time")?;
+        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
+
+        let path = file
+            .path()
+            .context("Failed to get file path from tar entry")?
+            .to_string_lossy()
+            .into_owned();
+
+        if path.contains("sample_sheet") {
+            let mut buffer = String::new();
+            file.read_to_string(&mut buffer)
+                .context("Failed to read file contents as string")?;
+            sample_sheet = Some(buffer);
+        }
+
+        result.push((path, size, modified_utc));
+    }
+
+    let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!(
+        "No sample sheet detected in archive: {tar_path}"
+    ))?;
+    let (_, data) = sample_sheet
+        .split_once("\n")
+        .ok_or(anyhow::anyhow!("Can't parse sample sheet data"))?;
+    let sample_sheet: MinKnowSampleSheet = data
+        .try_into()
+        .map_err(|e| anyhow::anyhow!("Can't parse sample sheet.\n{e}"))?;
+    Ok((sample_sheet, result))
+}
+
+pub fn scan_local(
+    dir: &str,
+) -> anyhow::Result<(MinKnowSampleSheet, Vec<(String, u64, DateTime<Utc>)>)> {
+    let mut result = Vec::new();
+    let mut sample_sheet: Option<String> = None;
+
+    for entry in glob(&format!("{}/**/*", dir))? {
+        let file = entry.context("Failed to read an entry from the glob results")?;
+
+        // Extract file properties safely
+        let metadata = file.metadata().context(format!(
+            "Failed to access file metadata: {}",
+            file.display()
+        ))?;
+        let size = metadata.size();
+        let modified = metadata.mtime();
+        let modified_utc: DateTime<Utc> = Utc.timestamp_opt(modified as i64, 0).unwrap();
+
+        let path = file.to_string_lossy().into_owned();
+
+        if path.contains("sample_sheet") {
+            sample_sheet = Some(path.clone());
+        }
+
+        result.push((path, size, modified_utc));
+    }
+
+    let sample_sheet = sample_sheet.ok_or(anyhow::anyhow!("No sample sheet detected in: {dir}"))?;
+    let sample_sheet = MinKnowSampleSheet::from_path(&sample_sheet)
+        .context(anyhow::anyhow!("Can't parse sample sheet data"))?;
+
+    Ok((sample_sheet, result))
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct FlowCel {
+    pub flowcell_id: String,
+    pub experiment: FlowCellExperiment,
+    pub location: FlowCellLocation,
+    pub modified: DateTime<Utc>,
+    pub run: MinKnowSampleSheet,
+    pub pod5_size: usize,
+    pub n_pod5: usize,
+    pub cases: Vec<IdInput>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub enum FlowCellLocation {
+    Local(String),
+    Archived(String),
+}
+
+impl FlowCel {
+    pub fn new(
+        sample_sheet: MinKnowSampleSheet,
+        location: FlowCellLocation,
+        files: Vec<(String, u64, DateTime<Utc>)>,
+    ) -> anyhow::Result<Self> {
+        let flowcell_id = format!("{}/{}", sample_sheet.experiment_id, sample_sheet.sample_id);
+
+        let pod5s: Vec<(String, u64, DateTime<Utc>)> = files
+            .iter()
+            .filter(|(path, _, _)| path.ends_with(".pod5"))
+            .cloned()
+            .collect();
+        let n_pod5 = pod5s.len();
+
+        let experiment = FlowCellExperiment::from_pod5_paths(
+            &files.iter().map(|(p, _, _)| p.to_string()).collect(),
+        )
+        .ok_or(anyhow::anyhow!(
+            "Can't find experiment type for {flowcell_id}"
+        ))?;
+
+        let (pod5_size, modified): (usize, DateTime<Utc>) = files
+            .into_iter()
+            .filter(|(path, _, _)| path.ends_with(".pod5"))
+            .fold(
+                (0usize, DateTime::<Utc>::MIN_UTC),
+                |(acc_size, acc_time), (_, size, time)| {
+                    (
+                        acc_size + size as usize,
+                        if acc_time < time { time } else { acc_time },
+                    )
+                },
+            );
+
+        Ok(Self {
+            flowcell_id,
+            experiment,
+            location,
+            modified,
+            run: sample_sheet,
+            pod5_size,
+            n_pod5,
+            cases: Vec::new(),
+        })
+    }
+}
 #[derive(Debug, Serialize, Deserialize, Clone)]
-pub struct Pod5s {
-    pub locals: Vec<MinKnowSampleSheet>,
-    pub archived: Vec<Pod5Archived>,
-    pub inputs: IdsInput,
-    pub runs: Vec<Pod5Run>,
-}
-
-impl Pod5s {
-    pub fn load_from_local(local_run_dir: &str, inputs_path: &str) -> anyhow::Result<Self> {
-        let pattern = format!("{local_run_dir}/*/*/*/sample_sheet_*.csv");
-        let locals: Vec<MinKnowSampleSheet> = glob(&pattern)
-            .expect("Failed to read glob pattern")
-            .par_bridge()
+pub struct FlowCells {
+    pub flow_cells: Vec<FlowCel>,
+}
+
+impl FlowCells {
+    pub fn load(
+        local_run_dir: &str,
+        inputs_path: &str,
+        archive_store_path: &str,
+    ) -> anyhow::Result<Self> {
+        let mut flow_cells: Vec<FlowCel> = if Path::new(archive_store_path).exists() {
+            let file = File::open(archive_store_path)?;
+            serde_json::from_reader(BufReader::new(file))?
+        } else {
+            Vec::new()
+        };
+
+        let sample_sheets = find_files(&format!("{local_run_dir}/**/sample_sheet*"));
+
+        for sample_sheet in sample_sheets {
+            let dir = sample_sheet.parent().ok_or_else(|| {
+                anyhow::anyhow!("Failed to get dir from {}", sample_sheet.display())
+            })?;
+            let dir = dir.to_string_lossy().to_string();
+
+            let (sample_sheet, files) = scan_local(&dir)?;
+            let fc = FlowCel::new(sample_sheet, FlowCellLocation::Local(dir), files)?;
+            flow_cells.push(fc);
+        }
+
+        let inputs = IdsInput::load_json(inputs_path)?;
+        flow_cells.iter_mut().for_each(|fc| {
+            fc.cases = inputs.data.iter().filter(|info| {
+                info.flow_cell == fc.run.sample_id
+                    && info.run == fc.run.experiment_id
+            }).cloned().collect();
+        });
+
+        Ok(Self { flow_cells })
+    }
+
+    pub fn update_archive_from_scan(archive_path: &str, save_path: &str) -> anyhow::Result<()> {
+        let mut all: Vec<FlowCel> = if Path::new(save_path).exists() {
+            let file = File::open(save_path)?;
+            serde_json::from_reader(BufReader::new(file))?
+        } else {
+            Vec::new()
+        };
+        let n_before = all.len();
+
+        let pattern = format!("{archive_path}/*.tar");
+
+        let res: Vec<FlowCel> = glob(&pattern)
+            .map_err(|e| anyhow::anyhow!("Failed to read: {pattern}.\n{e}"))?
             .filter_map(|entry| {
                 match entry {
-                    Ok(path) => match MinKnowSampleSheet::from_path(path.to_str().unwrap()) {
-                        Ok(sample_sheet) => return Some(sample_sheet),
+                    Ok(path) => match scan_archive(path.to_str().unwrap()) {
+                        Ok((sample_sheet, files)) => match FlowCel::new(
+                            sample_sheet,
+                            FlowCellLocation::Archived(archive_path.to_string()),
+                            files,
+                        ) {
+                            Ok(r) => return Some(r),
+                            Err(e) => warn!("{e}"),
+                        },
                         Err(e) => warn!("{e}"),
                     },
                     Err(e) => warn!("Error: {:?}", e),
@@ -717,82 +992,19 @@ impl Pod5s {
             })
             .collect();
 
-        // let inputs = load_flowcells_corrected_names(inputs_path)?;
-        let inputs = IdsInput::load_json(inputs_path)?;
-
-        let runs = locals
-            .iter()
-            .flat_map(|local_run| {
-                inputs
-                    .data
-                    .iter()
-                    .filter(|info| {
-                        info.flow_cell == local_run.sample_id && info.run == local_run.experiment_id
-                    })
-                    .map(|info| Pod5Run {
-                        protocol_run_id: local_run.protocol_run_id.to_string(),
-                        position_id: local_run.position_id.to_string(),
-                        flow_cell_id: local_run.flow_cell_id.to_string(),
-                        id: info.id.to_string(),
-                        time_point: info.time_point.to_string(),
-                        barcode_number: info.barcode.to_string(),
-                        flow_cell: info.flow_cell.to_string(),
-                        run: info.run.to_string(),
-                        last_pod_dir: (Utc::now(), local_run_dir.to_string()),
-                        archives: Vec::new(),
-                    })
-            })
-            .collect();
+        all.extend(res);
+        all.sort_by(|a, b| a.flowcell_id.cmp(&b.flowcell_id));
+        all.dedup_by_key(|v| v.flowcell_id.clone());
 
-        Ok(Self {
-            locals,
-            archived: Vec::new(),
-            inputs,
-            runs,
-        })
-    }
+        let n_final = all.len();
+        info!("{} new archives discovered.", n_final - n_before);
 
-    pub fn runs_without_input(&self) -> Vec<MinKnowSampleSheet> {
-        self.locals
-            .iter()
-            .filter_map(|local_run| {
-                let input_count = self.count_matching_inputs(local_run);
-
-                if input_count == 0 {
-                    warn!(
-                        "No input information for {}/{}",
-                        local_run.experiment_id, local_run.sample_id
-                    );
-                    Some(local_run.clone())
-                } else if self.is_lacking_inputs(local_run, input_count) {
-                    warn!(
-                        "Lacking input information for {}/{}",
-                        local_run.experiment_id, local_run.sample_id
-                    );
-                    Some(local_run.clone())
-                } else {
-                    None
-                }
-            })
-            .collect()
-    }
+        let json = serde_json::to_string_pretty(&all)
+            .map_err(|e| anyhow::anyhow!("Can't convert into json.\n{e}"))?;
 
-    pub fn count_matching_inputs(&self, local_run: &MinKnowSampleSheet) -> usize {
-        self.inputs
-            .data
-            .iter()
-            .filter(|info| {
-                info.flow_cell == local_run.sample_id && info.run == local_run.experiment_id
-            })
-            .count()
-    }
+        fs::write(save_path, json)
+            .map_err(|e| anyhow::anyhow!("Can't write file: {save_path}.\n{e}"))?;
 
-    pub fn is_lacking_inputs(&self, local_run: &MinKnowSampleSheet, input_count: usize) -> bool {
-        let parts: Vec<&str> = local_run.sample_id.split(&['_', '-']).collect();
-        if parts.len() % 3 == 0 {
-            let expected_count = parts.len() / 3;
-            return expected_count != input_count;
-        }
-        false
+        Ok(())
     }
 }
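
A minimal sketch of how the new pieces compose (the tar path is illustrative): scan_archive makes a single pass over an archive, collecting (path, size, mtime) for every entry and parsing the embedded sample sheet, and FlowCel::new then derives the flowcell id, experiment type, pod5 count/size and newest pod5 mtime from that listing.

    use crate::collection::pod5::{scan_archive, FlowCel, FlowCellLocation};

    fn index_one_archive() -> anyhow::Result<FlowCel> {
        // Illustrative path; any run tar containing a sample_sheet entry works.
        let tar_path = "/data/lto/example_run.tar";

        // Single pass over the tar: file listing plus the parsed MinKNOW sample sheet.
        let (sample_sheet, files) = scan_archive(tar_path)?;

        // Summarise the listing into a FlowCel record tagged as archived.
        FlowCel::new(
            sample_sheet,
            FlowCellLocation::Archived(tar_path.to_string()),
            files,
        )
    }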

+ 64 - 6
src/helpers.rs

@@ -1,4 +1,6 @@
 use anyhow::Context;
+use chrono::{DateTime, TimeZone, Utc};
+use glob::glob;
 use serde::{Deserialize, Serialize};
 use std::{
     cmp::Ordering,
@@ -7,6 +9,7 @@ use std::{
     iter::Sum,
     ops::{Add, Div},
     path::{Path, PathBuf},
+    time::{SystemTime, UNIX_EPOCH},
 };
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
@@ -286,7 +289,7 @@ where
     let sum: T = values.iter().copied().sum();
 
     // Calculate and return the average as f64
-   sum.into() / count as f64
+    sum.into() / count as f64
 }
 
 pub fn app_storage_dir() -> anyhow::Result<PathBuf> {
@@ -294,12 +297,11 @@ pub fn app_storage_dir() -> anyhow::Result<PathBuf> {
     let app_dir = dirs::data_dir()
         .context("Failed to get data directory")?
         .join(app_name);
-    
+
     if !app_dir.exists() {
-        fs::create_dir_all(&app_dir)
-            .context("Failed to create application directory")?;
+        fs::create_dir_all(&app_dir).context("Failed to create application directory")?;
     }
-    
+
     Ok(app_dir)
 }
 use blake3::Hasher as Blake3Hasher;
@@ -318,7 +320,7 @@ impl Hasher for Blake3Hash {
     }
 }
 
-/// Default Hasher 
+/// Default Hasher
 #[derive(Default, Clone)]
 pub struct Blake3BuildHasher;
 
@@ -347,3 +349,59 @@ impl Hash128 {
         self.0
     }
 }
+
+pub fn get_dir_size(path: &Path) -> std::io::Result<u64> {
+    let mut total_size = 0;
+    if path.is_dir() {
+        for entry in fs::read_dir(path)? {
+            let entry = entry?;
+            let path = entry.path();
+            if path.is_file() {
+                total_size += path.metadata()?.len();
+            } else if path.is_dir() {
+                total_size += get_dir_size(&path)?;
+            }
+        }
+    }
+    Ok(total_size)
+}
+
+// "**/*.pod5"
+pub fn find_files(pattern: &str) -> Vec<PathBuf> {
+    glob(pattern)
+        .expect("Failed to read glob pattern")
+        .filter_map(Result::ok)
+        .collect()
+}
+
+fn system_time_to_utc(system_time: SystemTime) -> Option<DateTime<Utc>> {
+    system_time
+        .duration_since(UNIX_EPOCH)
+        .ok()
+        .and_then(|duration| {
+            Utc.timestamp_opt(duration.as_secs() as i64, duration.subsec_nanos())
+                .single()
+        })
+}
+
+pub fn list_directories(dir_path: &str) -> std::io::Result<Vec<String>> {
+    let mut directories = Vec::new();
+
+    for entry in fs::read_dir(dir_path)? {
+        let entry = entry?;
+        let path = entry.path();
+
+        // Check if the path is a directory
+        if path.is_dir() {
+            if let Some(dir_name) = path.file_name().and_then(|name| name.to_str()) {
+                directories.push(dir_name.to_string());
+            }
+        }
+    }
+
+    Ok(directories)
+}
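
The two generic helpers are straightforward to use on their own; a short sketch with a placeholder directory: find_files swallows glob errors and returns only the matching paths, while get_dir_size recurses through sub-directories, summing file sizes in bytes.

    use std::path::Path;

    use crate::helpers::{find_files, get_dir_size};

    fn summarise_run_dir(dir: &str) -> std::io::Result<()> {
        // Same pattern FlowCells::load uses to locate MinKNOW sample sheets.
        let sheets = find_files(&format!("{dir}/**/sample_sheet*"));
        println!("{} sample sheet(s) under {dir}", sheets.len());

        // Recursive byte count of everything below `dir`.
        let bytes = get_dir_size(Path::new(dir))?;
        println!("{dir}: {bytes} bytes");
        Ok(())
    }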

+ 9 - 20
src/lib.rs

@@ -29,7 +29,7 @@ mod tests {
 
     use annotation::{vep::{VepLine, VEP}, Annotations};
     use callers::{nanomonsv::nanomonsv_create_pon, savana::{Savana, SavanaReadCounts}, severus::{Severus, SeverusSolo}};
-    use collection::{bam::{counts_at, counts_ins_at, nt_pileup, WGSBam, WGSBamStats}, pod5::{Pod5, Pod5Config}, Initialize, InitializeSolo, Version};
+    use collection::{bam::{counts_at, counts_ins_at, nt_pileup, WGSBam, WGSBamStats}, Initialize, InitializeSolo, Version};
     use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}};
     use functions::assembler::{Assembler, AssemblerConfig};
     use helpers::estimate_shannon_entropy;
@@ -43,7 +43,7 @@ mod tests {
 
     use self::{collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}}, collection::{bam, pod5::{IdsInput, MinKnowSampleSheet, Pod5s}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
+    use crate::{callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}}, collection::{bam, pod5::{scan_archive, FlowCells}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado};
 
     // export RUST_LOG="debug"
     fn init() {
@@ -627,19 +627,10 @@ mod tests {
 
 
     #[test]
-    fn tar() {
+    fn tar() -> anyhow::Result<()> {
         init();
-        let mut ar = tar::Archive::new(fs::File::open("/data/lto/20241030_CUNVI-DG-N20_SEBZA-DG-N21.tar").unwrap());
-
-        for file in ar.entries().unwrap() {
-            let file = file.unwrap();
-            let p = String::from_utf8(file.path_bytes().into_owned()).unwrap();
-            if p.contains("/pod5/") {
-                let u = Pod5::from_path(&file.path().unwrap().into_owned(), &Pod5Config::default());
-                println!("{p}\n{u:#?}");
-            }
-        }
-
+        scan_archive("/data/lto/20241030_CUNVI-DG-N20_SEBZA-DG-N21.tar")?;
+        Ok(())
     }
 
     #[test]
@@ -694,13 +685,11 @@ mod tests {
     }
 
     #[test]
-    fn load_sample_sheet() -> anyhow::Result<()> {
+    fn load_fc() -> anyhow::Result<()> {
         init();
-
-        let pod5s = Pod5s::load_from_local("/data/run_data", "/data/inputs_ids.json")?;
-        let res = pod5s.runs_without_input();
-        println!("runs: {}", pod5s.runs.len());
-        println!("lacking info runs: {:#?}", res);
+        // FlowCells::update_archive_from_scan("/data/lto", "/data/archives.json")?;
+        let r = FlowCells::load("/data/run_data", "/data/inputs_ids.json", "/data/archives.json")?;
+        println!("{r:#?}");
         Ok(())
     }
 }
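
Taken together with the tests above, the intended flow appears to be: refresh the persisted archive store from the tar staging area, then merge it with freshly scanned local runs and attach case inputs. A sketch using the same paths as the tests:

    use crate::collection::pod5::FlowCells;

    fn refresh_flowcell_index() -> anyhow::Result<()> {
        // Scan /data/lto for *.tar archives, merge into /data/archives.json and rewrite it.
        FlowCells::update_archive_from_scan("/data/lto", "/data/archives.json")?;

        // Load the archive store, scan local runs and attach matching IdInput cases.
        let fcs = FlowCells::load("/data/run_data", "/data/inputs_ids.json", "/data/archives.json")?;
        println!("{} flowcells indexed", fcs.flow_cells.len());
        Ok(())
    }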