use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Utc};
use csv::ReaderBuilder;
use glob::glob;
use hashbrown::HashMap;
use log::{info, warn};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
    fmt::Display,
    fs::{self, File, Metadata},
    io::{self, BufRead},
    os::unix::fs::MetadataExt,
    path::PathBuf,
};

use crate::io::pod5_infos::Pod5Info;

#[derive(Debug, Clone)]
pub struct Pod5 {
    pub path: PathBuf,
    pub pod5_type: Pod5Type,
    pub run_name: String,
    pub flowcell_name: String,
    pub file_metadata: Metadata,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Pod5Type {
    Raw,
    Demuxed,
}

impl Display for Pod5Type {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            Pod5Type::Raw => "raw",
            Pod5Type::Demuxed => "demuxed",
        };
        f.write_str(s)
    }
}

#[derive(Debug, Clone)]
pub struct Pod5Config {
    pub base_dir: String,
    pub type_raw: String,
    pub type_demuxed: String,
    pub run_dir_n: u8,
    pub flowcell_dir_n: u8,
}

impl Default for Pod5Config {
    fn default() -> Self {
        Self {
            base_dir: "/data/run_data".to_string(),
            type_raw: "/pod5/".to_string(),
            type_demuxed: "/pod5_pass/".to_string(),
            run_dir_n: 0,
            flowcell_dir_n: 1,
        }
    }
}

impl Pod5 {
    pub fn from_path(path: &PathBuf, config: &Pod5Config) -> Result<Self> {
        let s = path
            .to_str()
            .with_context(|| format!("Can't convert PathBuf to str {path:?}"))?;
        let pod5_type = if s.contains(&config.type_raw) {
            Pod5Type::Raw
        } else if s.contains(&config.type_demuxed) {
            Pod5Type::Demuxed
        } else {
            return Err(anyhow!("Can't find the pod5 type {s}"));
        };
        let file_metadata = fs::metadata(path)?;
        let sr = s.replace(&config.base_dir, "");
        let components: Vec<&str> = sr.split('/').filter(|c| !c.is_empty()).collect();
        let run_name = components
            .get(config.run_dir_n as usize)
            .context("Can't get run_name")?
            .to_string();
        let flowcell_name = components
            .get(config.flowcell_dir_n as usize)
            .context("Can't get flowcell_name")?
            .to_string();
        Ok(Self {
            path: path.to_path_buf(),
            pod5_type,
            run_name,
            flowcell_name,
            file_metadata,
        })
    }
}
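// A minimal sketch of how `Pod5::from_path` splits a path into run and flowcell
// components, written as a unit test. The temp-dir layout
// `<base>/<run>/<flowcell>/pod5/<file>.pod5` is an assumption made for
// illustration; only `fs::metadata` is touched, so an empty file is enough.
#[cfg(test)]
mod pod5_from_path_tests {
    use super::*;

    #[test]
    fn parses_run_and_flowcell_from_a_raw_path() -> anyhow::Result<()> {
        // Hypothetical layout under the system temp dir.
        let base = std::env::temp_dir().join("pod5_from_path_demo");
        let dir = base.join("run_2024_01").join("FC001").join("pod5");
        std::fs::create_dir_all(&dir)?;
        let file = dir.join("reads.pod5");
        std::fs::write(&file, b"")?;

        let config = Pod5Config {
            base_dir: format!("{}/", base.to_string_lossy()),
            ..Pod5Config::default()
        };
        let pod5 = Pod5::from_path(&file, &config)?;
        assert_eq!(pod5.pod5_type, Pod5Type::Raw);
        assert_eq!(pod5.run_name, "run_2024_01");
        assert_eq!(pod5.flowcell_name, "FC001");

        std::fs::remove_dir_all(&base)?;
        Ok(())
    }
}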
pub fn list_pod_files(dir: &str) -> Result<Vec<Pod5>> {
    let pattern = format!("{}/**/*.pod5", dir);
    let mut pod_files = Vec::new();
    let conf = Pod5Config {
        base_dir: if dir.ends_with('/') {
            dir.to_string()
        } else {
            format!("{dir}/")
        },
        ..Pod5Config::default()
    };
    for entry in glob(&pattern).context("Failed to read glob pattern")? {
        match entry {
            Ok(path) => {
                let p = path
                    .to_str()
                    .with_context(|| format!("Can't parse path to string {path:?}"))?;
                if p.contains("/pod5_fail/") || p.contains("/pod5_skip/") {
                    continue;
                }
                match Pod5::from_path(&path, &conf) {
                    Ok(pod5) => pod_files.push(pod5),
                    Err(e) => warn!("{e}"),
                }
            }
            Err(e) => warn!("Error: {:?}", e),
        }
    }
    Ok(pod_files)
}

#[derive(Debug)]
pub struct Run {
    pub run_name: String,
    pub flowcells: Vec<FlowCell>,
}

#[derive(Debug, Clone)]
pub struct FlowCell {
    pub flowcell_name: String,
    pub corrected_name: String,
    pub cases: Vec<FlowCellCase>,
    pub run_name: String,
    pub pod5_type: Pod5Type,
    pub pod5_info: Pod5Info,
    pub pod5: Vec<Pod5>,
}

// impl FlowCell {
//     pub fn cases_pod5_dir(&self) -> Vec<PathBuf> {
//         match self.pod5_type {
//             Pod5Type::Raw => {
//                 let p = self.pod5.first().unwrap();
//                 vec![p.path.parent().unwrap().to_path_buf()]
//             }
//             Pod5Type::Demuxed => {
//                 self.cases.iter().map(|c| {
//                     let str_barcode = format!("barcode{}", c.barcode);
//                 })
//             }
//         }
//     }
// }

#[derive(Debug, Default)]
pub struct Pod5Collection {
    pub importation_date: DateTime<Utc>,
    pub runs: Vec<Run>,
    pub bam_dir: String,
    pub pod5_dir: String,
}

#[derive(Debug, Clone, Default)]
pub struct FlowCellCase {
    pub id: String,
    pub time_point: String,
    pub barcode: String,
    pub pod_dir: PathBuf,
    // pub basecalled: Option<bool>,
}
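// A small sketch of the on-disk layout `list_pod_files` is meant to walk,
// expressed as a unit test. The directory names under the temp dir are
// assumptions for illustration; the point is that `/pod5_fail/` (and
// `/pod5_skip/`) entries are ignored while `/pod5/` entries are kept as Raw.
#[cfg(test)]
mod list_pod_files_tests {
    use super::*;

    #[test]
    fn skips_fail_and_skip_directories() -> anyhow::Result<()> {
        let base = std::env::temp_dir().join("pod5_list_demo");
        for sub in ["run_a/FC001/pod5", "run_a/FC001/pod5_fail"] {
            let dir = base.join(sub);
            std::fs::create_dir_all(&dir)?;
            std::fs::write(dir.join("reads.pod5"), b"")?;
        }

        let found = list_pod_files(base.to_str().unwrap())?;
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].pod5_type, Pod5Type::Raw);
        assert_eq!(found[0].flowcell_name, "FC001");

        std::fs::remove_dir_all(&base)?;
        Ok(())
    }
}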
impl Pod5Collection {
    pub fn new(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result<Self> {
        let pod5 = list_pod_files(pod5_dir)?;
        info!("n pod5 {}", pod5.len());
        let mut fc: HashMap<String, Vec<Pod5>> = HashMap::new();
        for pod in pod5 {
            let k = format!("{}-{}", pod.run_name, pod.flowcell_name);
            fc.entry(k).or_default().push(pod);
        }
        let corrected_fc = load_flowcells_corrected_names(corrected_fc_path)?;
        let flow_cells: Vec<FlowCell> = fc
            .par_values()
            .map(|v| {
                let first = &v[0];
                let pod5_info = Pod5Info::from_pod5(first.path.to_str().unwrap());
                let flowcell_name = first.flowcell_name.clone();
                let sel: Vec<FCLine> = corrected_fc
                    .iter()
                    .filter(|e| e.flow_cell == flowcell_name)
                    .cloned()
                    .collect();
                let mut corrected_name: Vec<String> = sel
                    .clone()
                    .into_iter()
                    .map(|e| e.ref_flow_cell)
                    .filter(|e| !e.is_empty())
                    .collect();
                corrected_name.dedup();
                if corrected_name.len() > 1 {
                    panic!("Multiple corrected flow_cells for {v:?}");
                }
                let corrected_name = if !corrected_name.is_empty() {
                    corrected_name.first().unwrap().to_string()
                } else {
                    "".to_string()
                };
                let cases: Vec<FlowCellCase> = sel
                    .iter()
                    .map(|e| {
                        let pod_dir = match first.pod5_type {
                            Pod5Type::Raw => first.path.parent().unwrap().to_path_buf(),
                            Pod5Type::Demuxed => {
                                let mut bc_dir = first
                                    .path
                                    .parent()
                                    .unwrap()
                                    .parent()
                                    .unwrap()
                                    .to_path_buf();
                                bc_dir
                                    .push(format!("barcode{}", e.barcode_number.replace("NB", "")));
                                bc_dir
                            }
                        };
                        FlowCellCase {
                            id: e.id.clone(),
                            time_point: e.time_point.clone(),
                            barcode: e.barcode_number.clone(),
                            pod_dir,
                        }
                    })
                    .collect();
                FlowCell {
                    flowcell_name,
                    corrected_name,
                    cases,
                    run_name: first.run_name.clone(),
                    pod5_type: first.pod5_type.clone(),
                    pod5_info,
                    pod5: v.to_vec(),
                }
            })
            .collect();
        let mut runs = HashMap::new();
        for fc in flow_cells {
            runs.entry(fc.run_name.clone())
                .or_insert_with(Vec::new)
                .push(fc);
        }
        let runs: Vec<Run> = runs
            .into_values()
            .map(|v| Run {
                run_name: v[0].run_name.clone(),
                flowcells: v.to_vec(),
            })
            .collect();
        Ok(Self {
            importation_date: Utc::now(),
            runs,
            bam_dir: bam_dir.to_string(),
            pod5_dir: pod5_dir.to_string(),
        })
    }

    pub fn print_info(&self) {
        self.runs.iter().for_each(|run| {
            run.flowcells.iter().for_each(|fc| {
                let total_size: u64 = fc.pod5.iter().map(|p| p.file_metadata.size()).sum();
                let n_files = fc.pod5.len();
                let dates: Vec<DateTime<Utc>> = fc
                    .pod5
                    .iter()
                    .map(|p| p.file_metadata.modified().unwrap().into())
                    .collect();
                let from = dates.iter().min().unwrap();
                let to = dates.iter().max().unwrap();
                let s = [
                    run.run_name.clone(),
                    from.to_string(),
                    to.to_string(),
                    n_files.to_string(),
                    total_size.to_string(),
                    fc.flowcell_name.to_string(),
                    fc.pod5_type.to_string(),
                    fc.pod5_info.acquisition_id.clone(),
                    format!("{:?}", fc.cases),
                ]
                .join("\t");
                println!("{s}");
            });
        });
    }

    // pub fn check_local(&self) -> anyhow::Result<()> {
    //     let mut res = Vec::new();
    //     for run in self.runs.iter() {
    //         for fc in run.flowcells.iter() {
    //             for c in fc.cases.iter() {
    //                 let bases_called = if let Some(b) = c.basecalled {
    //                     if b {
    //                         "✅".to_string()
    //                     } else {
    //                         "❌".to_string()
    //                     }
    //                 } else {
    //                     "❌".to_string()
    //                 };
    //
    //                 let s = [
    //                     c.id.to_string(),
    //                     c.time_point.to_string(),
    //                     c.barcode.to_string(),
    //                     run.run_name.clone(),
    //                     fc.flowcell_name.to_string(),
    //                     fc.pod5_type.to_string(),
    //                     fc.pod5_info.acquisition_id.clone(),
    //                     bases_called,
    //                 ]
    //                 .join("\t");
    //                 res.push(s);
    //             }
    //         }
    //     }
    //     res.sort();
    //     println!("{}", res.join("\n"));
    //     Ok(())
    // }

    // pub fn fc_done(&self) {
    //     for run in self.runs.iter() {
    //         for fc in run.flowcells.iter() {
    //             let n_called = fc
    //                 .cases
    //                 .iter()
    //                 .filter(|c| if let Some(b) = c.basecalled { b } else { false })
    //                 .count();
    //             if n_called != 0 && n_called == fc.cases.len() {
    //                 let s = [
    //                     format!("{}/{}", run.run_name, fc.flowcell_name),
    //                     fc.pod5_info.acquisition_id.to_string(),
    //                     format!("{:#?}", fc.cases),
    //                 ]
    //                 .join("\t");
    //                 println!("{s}");
    //             }
    //         }
    //     }
    // }

    // pub fn todo(&self) {
    //     let run_dir = &self.pod5_dir;
    //     for run in self.runs.iter() {
    //         for fc in run.flowcells.iter() {
    //             let to_call: Vec<_> = fc
    //                 .cases
    //                 .iter()
    //                 .filter(|c| if let Some(b) = c.basecalled { !b } else { true })
    //                 .collect();
    //
    //             if !to_call.is_empty() {
    //                 if fc.pod5_type == Pod5Type::Raw && to_call.len() != fc.cases.len() {
    //                     println!("No solution for: {}/{}", run.run_name, fc.flowcell_name);
    //                 } else {
    //                     match fc.pod5_type {
    //                         Pod5Type::Raw => {
    //                             let cases: Vec<String> = to_call
    //                                 .iter()
    //                                 .map(|c| {
    //                                     let bc = c.barcode.replace("NB", "");
    //                                     let tp = c.time_point.to_lowercase();
    //                                     [bc, c.id.to_string(), tp].join(" ")
    //                                 })
    //                                 .collect();
    //                             println!(
    //                                 "from_mux.sh {}/{}/{} {}",
    //                                 run_dir,
    //                                 run.run_name,
    //                                 fc.flowcell_name,
    //                                 cases.join(" ")
    //                             );
    //                         }
    //                         Pod5Type::Demuxed => to_call.iter().for_each(|c| {
    //                             let bc = c.barcode.replace("NB", "");
    //                             let tp = c.time_point.to_lowercase();
    //                             let bam = format!(
    //                                 "{}/{}/{}/{}_{}_hs1.bam",
    //                                 self.bam_dir, c.id, c.time_point, c.id, c.time_point
    //                             );
    //                             if PathBuf::from(bam).exists() {
    //                                 let pod_dir: Vec<String> = fc
    //                                     .pod5
    //                                     .iter()
    //                                     .filter(|p| {
    //                                         p.path.contains(&format!("barcode{}", bc.clone()))
    //                                     })
    //                                     .take(1)
    //                                     .map(|p| p.path.to_string())
    //                                     .collect();
    //
    //                                 let pod_dir = pod_dir.first().unwrap();
    //                                 let mut pod_dir = PathBuf::from(pod_dir);
    //                                 pod_dir.pop();
    //
    //                                 // TODO scheduler
    //                                 println!(
    //                                     "complete_bam.sh {} {} {}",
    //                                     c.id,
    //                                     tp,
    //                                     pod_dir.to_string_lossy()
    //                                 )
    //                             } else {
    //                                 let pod_dir: Vec<String> = fc
    //                                     .pod5
    //                                     .iter()
    //                                     .filter(|p| {
    //                                         p.path.contains(&format!("barcode{}", bc.clone()))
    //                                     })
    //                                     .take(1)
    //                                     .map(|p| p.path.to_string())
    //                                     .collect();
    //
    //                                 let pod_dir = pod_dir.first().unwrap();
    //                                 let mut pod_dir = PathBuf::from(pod_dir);
    //                                 pod_dir.pop();
    //
    //                                 println!(
    //                                     "dorado.sh {} {} {}",
    //                                     c.id,
    //                                     tp,
    //                                     pod_dir.to_string_lossy()
    //                                 )
    //                             }
    //                         }),
    //                     };
    //                 }
    //             }
    //         }
    //     }
    // }

    pub fn ids(&self) -> Vec<String> {
        let mut ids: Vec<String> = self
            .runs
            .iter()
            .flat_map(|r| {
                r.flowcells
                    .iter()
                    .flat_map(|f| {
                        f.cases
                            .iter()
                            .map(|c| c.id.clone())
                            .collect::<Vec<String>>()
                    })
                    .collect::<Vec<String>>()
            })
            .collect();
        ids.sort();
        ids.dedup();
        ids
    }
}
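// A sketch of the tab-separated layout `load_flowcells_corrected_names`
// (defined below) expects: one header row matching the `FCLine` field names,
// then one line per case. The values are invented for illustration; the test
// checks the id/time_point normalisation applied while loading.
#[cfg(test)]
mod fc_line_tests {
    use super::*;
    use std::io::Write;

    #[test]
    fn loads_and_normalises_fc_lines() -> anyhow::Result<()> {
        let path = std::env::temp_dir().join("fc_lines_demo.tsv");
        let mut f = std::fs::File::create(&path)?;
        writeln!(f, "id\ttime_point\tbarcode_number\tflow_cell\trun\tpath\tref_flow_cell")?;
        writeln!(f, "p001\tDIAGNOSIS\tNB01\tFC001\trun_a\t/data/run_data/run_a\tFC001_bis")?;

        let records = load_flowcells_corrected_names(path.to_str().unwrap())?;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id, "P001"); // uppercased on load
        assert_eq!(records[0].time_point, "diagnosis"); // lowercased on load
        assert_eq!(records[0].barcode_number, "NB01");

        std::fs::remove_file(&path)?;
        Ok(())
    }
}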
#[derive(Debug, Deserialize, Clone)]
pub struct FCLine {
    pub id: String,
    pub time_point: String,
    pub barcode_number: String,
    pub flow_cell: String,
    pub run: String,
    pub path: String,
    pub ref_flow_cell: String,
}

pub fn load_flowcells_corrected_names(file_path: &str) -> anyhow::Result<Vec<FCLine>> {
    let file = File::open(file_path)?;
    let mut rdr = ReaderBuilder::new()
        .delimiter(b'\t')
        .has_headers(true)
        .from_reader(file);
    let mut records = Vec::new();
    for result in rdr.deserialize() {
        let mut record: FCLine = result?;
        // Normalise formatting: lowercase time_point, uppercase id.
        record.time_point = record.time_point.to_lowercase();
        record.id = record.id.to_uppercase();
        records.push(record);
    }
    Ok(records)
}

#[derive(Debug, Serialize, Deserialize)]
struct MinKnowSampleSheet {
    pub protocol_run_id: String,
    pub position_id: String,
    pub flow_cell_id: String,
    pub sample_id: String,
    pub experiment_id: String,
    pub flow_cell_product_code: String,
    pub kit: String,
}

impl TryFrom<&str> for MinKnowSampleSheet {
    type Error = anyhow::Error;

    fn try_from(value: &str) -> anyhow::Result<Self> {
        let cells: Vec<&str> = value.split(',').collect();
        if cells.len() != 7 {
            return Err(anyhow!(
                "Number of cells not equal to definition. {value}"
            ));
        }
        Ok(Self {
            protocol_run_id: cells[0].to_string(),
            position_id: cells[1].to_string(),
            flow_cell_id: cells[2].to_string(),
            sample_id: cells[3].to_string(),
            experiment_id: cells[4].to_string(),
            flow_cell_product_code: cells[5].to_string(),
            kit: cells[6].to_string(),
        })
    }
}

impl MinKnowSampleSheet {
    pub fn from_path(path: &str) -> anyhow::Result<Self> {
        let file = File::open(path).with_context(|| format!("Can't open file: {path}"))?;
        let reader = io::BufReader::new(file);
        for (i, line) in reader.lines().enumerate() {
            let line = line.with_context(|| format!("Error reading line {i} of {path}"))?;
            match i {
                0 => {
                    if line != "protocol_run_id,position_id,flow_cell_id,sample_id,experiment_id,flow_cell_product_code,kit" {
                        return Err(anyhow!(
                            "File header doesn't correspond to a MinKnow sample sheet: {line}"
                        ));
                    }
                }
                1 => return Self::try_from(line.as_str()),
                _ => return Err(anyhow!("Wrong MinKnow sample sheet format.")),
            }
        }
        Err(anyhow!("Wrong MinKnow sample sheet format."))
    }
}
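// A minimal sketch of the single CSV row `MinKnowSampleSheet::try_from` parses.
// The field values are invented for illustration; the tests only check that a
// 7-cell row maps onto the struct and that a short row is rejected.
#[cfg(test)]
mod minknow_sample_sheet_tests {
    use super::*;

    #[test]
    fn parses_a_seven_cell_row() -> anyhow::Result<()> {
        let row = "0f1d2c3b,1A,FC001,sample_1,exp_1,FLO-PRO114M,SQK-NBD114-24";
        let sheet = MinKnowSampleSheet::try_from(row)?;
        assert_eq!(sheet.flow_cell_id, "FC001");
        assert_eq!(sheet.kit, "SQK-NBD114-24");
        Ok(())
    }

    #[test]
    fn rejects_a_row_with_missing_cells() {
        assert!(MinKnowSampleSheet::try_from("only,three,cells").is_err());
    }
}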