| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- use anyhow::{anyhow, Context, Result};
- use chrono::{DateTime, Utc};
- use csv::ReaderBuilder;
- use glob::glob;
- use hashbrown::HashMap;
- use log::{info, warn};
- use rayon::prelude::*;
- use serde::{Deserialize, Serialize};
- use std::{
- fmt::Display,
- fs::{self, File, Metadata},
- io::{self, BufRead},
- os::unix::fs::MetadataExt,
- path::PathBuf,
- };
- use crate::io::pod5_infos::Pod5Info;
/// A single `.pod5` file on disk together with what its path reveals about it.
///
/// Instances are built by `Pod5::from_path`.
#[derive(Debug, Clone)]
pub struct Pod5 {
    /// Location of the file, as handed to `Pod5::from_path`.
    pub path: PathBuf,
    /// Raw or demuxed, decided by which configured marker the path contains.
    pub pod5_type: Pod5Type,
    /// Path component selected by `Pod5Config::run_dir_n`.
    pub run_name: String,
    /// Path component selected by `Pod5Config::flowcell_dir_n`.
    pub flowcell_name: String,
    /// Result of `fs::metadata` on `path`, captured at construction time.
    pub file_metadata: Metadata,
}
/// Kind of pod5 file, told apart by the directory marker found in its path.
#[derive(Debug, Clone, PartialEq)]
pub enum Pod5Type {
    /// The path contains the configured raw marker (`Pod5Config::type_raw`).
    Raw,
    /// The path contains the configured demuxed marker (`Pod5Config::type_demuxed`).
    Demuxed,
}

impl Display for Pod5Type {
    /// Writes the lowercase label of the variant: `raw` or `demuxed`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Pod5Type::Raw => f.write_str("raw"),
            Pod5Type::Demuxed => f.write_str("demuxed"),
        }
    }
}
/// Settings that tell `Pod5::from_path` how to read a pod5 file path.
#[derive(Debug, Clone)]
pub struct Pod5Config {
    /// Directory text removed from the path before it is split into components.
    pub base_dir: String,
    /// Substring whose presence in the path marks a raw pod5 file.
    pub type_raw: String,
    /// Substring whose presence in the path marks a demuxed pod5 file.
    pub type_demuxed: String,
    /// Index of the path component (after `base_dir`) holding the run name.
    pub run_dir_n: u8,
    /// Index of the path component (after `base_dir`) holding the flow cell name.
    pub flowcell_dir_n: u8,
}

impl Default for Pod5Config {
    /// Layout `<base_dir>/<run>/<flowcell>/...` with `/pod5/` and `/pod5_pass/` markers.
    fn default() -> Self {
        Pod5Config {
            base_dir: String::from("/data/run_data"),
            type_raw: String::from("/pod5/"),
            type_demuxed: String::from("/pod5_pass/"),
            run_dir_n: 0,
            flowcell_dir_n: 1,
        }
    }
}
- impl Pod5 {
- pub fn from_path(path: &PathBuf, config: &Pod5Config) -> Result<Self> {
- let s = path
- .to_str()
- .context("Can't convert PathBuf to str {path:?}")?;
- let pod5_type = if s.contains(&config.type_raw) {
- Pod5Type::Raw
- } else if s.contains(&config.type_demuxed) {
- Pod5Type::Demuxed
- } else {
- return Err(anyhow!("Can't find the pod5 type {s}"));
- };
- let file_metadata = fs::metadata(path)?;
- let sr = s.replace(&config.base_dir, "");
- let components: Vec<&str> = sr.split('/').filter(|c| !c.is_empty()).collect();
- let run_name = components
- .get(config.run_dir_n as usize)
- .context("Can't get run_name")?
- .to_string();
- let flowcell_name = components
- .get(config.flowcell_dir_n as usize)
- .context("Can't get flowcell_name")?
- .to_string();
- Ok(Self {
- path: path.to_path_buf(),
- pod5_type,
- run_name,
- flowcell_name,
- file_metadata,
- })
- }
- }
- pub fn list_pod_files(dir: &str) -> Result<Vec<Pod5>> {
- let pattern = format!("{}/**/*.pod5", dir);
- let mut pod_files = Vec::new();
- let conf = Pod5Config {
- base_dir: if dir.ends_with('/') {
- dir.to_string()
- } else {
- format!("{dir}/")
- },
- ..Pod5Config::default()
- };
- for entry in glob(&pattern).expect("Failed to read glob pattern") {
- match entry {
- Ok(path) => {
- let p = path.to_str().context("Can't parse path to string {path}")?;
- if p.contains("/pod5_fail/") || p.contains("/pod5_skip/") {
- continue;
- }
- match Pod5::from_path(&path, &conf) {
- Ok(pod5) => pod_files.push(pod5),
- Err(e) => warn!("{e}"),
- }
- }
- Err(e) => warn!("Error: {:?}", e),
- }
- }
- Ok(pod_files)
- }
/// The flow cells that share one run name.
#[derive(Debug)]
pub struct Run {
    /// Run name common to every entry of `flowcells`.
    pub run_name: String,
    /// Flow cells grouped under this run by `Pod5Collection::new`.
    pub flowcells: Vec<FlowCell>,
}
/// One flow cell of a run: its pod5 files plus the cases assigned to it.
#[derive(Debug, Clone)]
pub struct FlowCell {
    /// Flow cell name taken from the path of its pod5 files.
    pub flowcell_name: String,
    /// `ref_flow_cell` found in the corrected-names table; empty when there is none.
    pub corrected_name: String,
    /// One entry per row of the corrected-names table whose `flow_cell` matches.
    pub cases: Vec<FlowCellCase>,
    /// Name of the run this flow cell belongs to.
    pub run_name: String,
    /// Type of the first pod5 file of the flow cell.
    pub pod5_type: Pod5Type,
    /// Information obtained with `Pod5Info::from_pod5` on the first pod5 file.
    pub pod5_info: Pod5Info,
    /// Every pod5 file grouped under this run / flow cell pair.
    pub pod5: Vec<Pod5>,
}
- // impl FlowCell {
- // pub fn cases_pod5_dir(&self) -> Vec<PathBuf> {
- // match self.pod5_type {
- // Pod5Type::Raw => {
- // let p = self.pod5.first().unwrap();
- // vec![p.path.parent().unwrap().to_path_buf()]
- // },
- // Pod5Type::Demuxed => {
- // self.cases.iter().map(|c| {
- // let str_barcode = format!("barcode{}", c.barcode);
- // })
- // },
- // }
- // }
- // }
/// Every run found under a pod5 directory, as assembled by `Pod5Collection::new`.
#[derive(Debug, Default)]
pub struct Pod5Collection {
    /// Time at which the collection was built (`Utc::now()` in `new`).
    pub importation_date: DateTime<Utc>,
    /// Runs found, each with its flow cells.
    pub runs: Vec<Run>,
    /// BAM directory, stored as given to `new`.
    pub bam_dir: String,
    /// Directory that was scanned for pod5 files, stored as given to `new`.
    pub pod5_dir: String,
}
/// A case (sample) sequenced on a flow cell, copied from one corrected-names row.
#[derive(Debug, Clone, Default)]
pub struct FlowCellCase {
    /// Case identifier (`FCLine::id`).
    pub id: String,
    /// Time point label (`FCLine::time_point`).
    pub time_point: String,
    /// Barcode as written in the table (`FCLine::barcode_number`).
    pub barcode: String,
    /// Directory of the case's pod5 files: the directory of the flow cell's first
    /// pod5 file for raw data, or a sibling `barcode<N>` directory for demuxed data.
    pub pod_dir: PathBuf,
    // pub basecalled: Option<bool>,
}
- impl Pod5Collection {
- pub fn new(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result<Self> {
- let pod5 = list_pod_files(pod5_dir)?;
- info!("n pod5 {}", pod5.len());
- let mut fc: HashMap<String, Vec<Pod5>> = HashMap::new();
- for pod in pod5 {
- let k = format!("{}-{}", pod.run_name, pod.flowcell_name);
- fc.entry(k).or_default().push(pod);
- }
- let corrected_fc = load_flowcells_corrected_names(corrected_fc_path)?;
- let flow_cells: Vec<FlowCell> = fc
- .par_values()
- .map(|v| {
- let first = &v[0];
- let pod5_info = Pod5Info::from_pod5(first.path.to_str().unwrap());
- let flowcell_name = first.flowcell_name.clone();
- let sel: Vec<FCLine> = corrected_fc
- .iter()
- .filter(|e| e.flow_cell == flowcell_name)
- .cloned()
- .collect();
- let mut corrected_name: Vec<String> = sel
- .clone()
- .into_iter()
- .map(|e| e.ref_flow_cell)
- .filter(|e| !e.is_empty())
- .collect();
- corrected_name.dedup();
- if corrected_name.len() > 1 {
- panic!("Multiple corrected flow_cells for {v:?}");
- }
- let corrected_name = if !corrected_name.is_empty() {
- corrected_name.first().unwrap().to_string()
- } else {
- "".to_string()
- };
- let cases: Vec<FlowCellCase> = sel
- .iter()
- .map(|e| {
- let pod_dir = match first.pod5_type {
- Pod5Type::Raw => first.path.parent().unwrap().to_path_buf(),
- Pod5Type::Demuxed => {
- let mut bc_dir =
- first.path.parent().unwrap().parent().unwrap().to_path_buf();
- bc_dir
- .push(format!("barcode{}", e.barcode_number.replace("NB", "")));
- bc_dir
- }
- };
- FlowCellCase {
- id: e.id.clone(),
- time_point: e.time_point.clone(),
- barcode: e.barcode_number.clone(),
- pod_dir,
- }
- })
- .collect();
- FlowCell {
- flowcell_name,
- corrected_name,
- cases,
- run_name: first.run_name.clone(),
- pod5_type: first.pod5_type.clone(),
- pod5_info,
- pod5: v.to_vec(),
- }
- })
- .collect();
- let mut runs = HashMap::new();
- for fc in flow_cells {
- runs.entry(fc.run_name.clone())
- .or_insert_with(Vec::new)
- .push(fc);
- }
- let runs: Vec<Run> = runs
- .into_values()
- .map(|v| Run {
- run_name: v[0].run_name.clone(),
- flowcells: v.to_vec(),
- })
- .collect();
- Ok(Self {
- importation_date: Utc::now(),
- runs,
- bam_dir: bam_dir.to_string(),
- pod5_dir: pod5_dir.to_string(),
- })
- }
- pub fn print_info(&self) {
- self.runs.iter().for_each(|run| {
- run.flowcells.iter().for_each(|fc| {
- let total_size: u64 = fc.pod5.iter().map(|p| p.file_metadata.size()).sum();
- let n_files = fc.pod5.len();
- let dates: Vec<DateTime<Utc>> = fc
- .pod5
- .iter()
- .map(|p| p.file_metadata.modified().unwrap().into())
- .collect();
- let from = dates.iter().min().unwrap();
- let to = dates.iter().max().unwrap();
- let s = [
- run.run_name.clone(),
- from.to_string(),
- to.to_string(),
- n_files.to_string(),
- total_size.to_string(),
- fc.flowcell_name.to_string(),
- fc.pod5_type.to_string(),
- fc.pod5_info.acquisition_id.clone(),
- format!("{:?}", fc.cases),
- ]
- .join("\t");
- println!("{s}");
- });
- });
- }
- // pub fn check_local(&self) -> anyhow::Result<()> {
- // let mut res = Vec::new();
- // for run in self.runs.iter() {
- // for fc in run.flowcells.iter() {
- // for c in fc.cases.iter() {
- // let bases_called = if let Some(b) = c.basecalled {
- // if b {
- // "✅".to_string()
- // } else {
- // "❌".to_string()
- // }
- // } else {
- // "❌".to_string()
- // };
- //
- // let s = [
- // c.id.to_string(),
- // c.time_point.to_string(),
- // c.barcode.to_string(),
- // run.run_name.clone(),
- // fc.flowcell_name.to_string(),
- // fc.pod5_type.to_string(),
- // fc.pod5_info.acquisition_id.clone(),
- // bases_called,
- // ]
- // .join("\t");
- // res.push(s);
- // }
- // }
- // }
- // res.sort();
- // println!("{}", res.join("\n"));
- // Ok(())
- // }
- // pub fn fc_done(&self) {
- // for run in self.runs.iter() {
- // for fc in run.flowcells.iter() {
- // let n_called = fc
- // .cases
- // .iter()
- // .filter(|c| if let Some(b) = c.basecalled { b } else { false })
- // .count();
- // if n_called != 0 && n_called == fc.cases.len() {
- // let s = [
- // format!("{}/{}", run.run_name, fc.flowcell_name),
- // fc.pod5_info.acquisition_id.to_string(),
- // format!("{:#?}", fc.cases),
- // ]
- // .join("\t");
- // println!("{s}");
- // }
- // }
- // }
- // }
- // pub fn todo(&self) {
- // let run_dir = &self.pod5_dir;
- // for run in self.runs.iter() {
- // for fc in run.flowcells.iter() {
- // let to_call: Vec<_> = fc
- // .cases
- // .iter()
- // .filter(|c| if let Some(b) = c.basecalled { !b } else { true })
- // .collect();
- //
- // if !to_call.is_empty() {
- // if fc.pod5_type == Pod5Type::Raw && to_call.len() != fc.cases.len() {
- // println!("No solution for: {}/{}", run.run_name, fc.flowcell_name);
- // } else {
- // match fc.pod5_type {
- // Pod5Type::Raw => {
- // let cases: Vec<String> = to_call
- // .iter()
- // .map(|c| {
- // let bc = c.barcode.replace("NB", "");
- // let tp = c.time_point.to_lowercase();
- // [bc, c.id.to_string(), tp].join(" ")
- // })
- // .collect();
- // println!(
- // "from_mux.sh {}/{}/{} {}",
- // run_dir,
- // run.run_name,
- // fc.flowcell_name,
- // cases.join(" ")
- // );
- // }
- // Pod5Type::Demuxed => to_call.iter().for_each(|c| {
- // let bc = c.barcode.replace("NB", "");
- // let tp = c.time_point.to_lowercase();
- // let bam = format!(
- // "{}/{}/{}/{}_{}_hs1.bam",
- // self.bam_dir, c.id, c.time_point, c.id, c.time_point
- // );
- // if PathBuf::from(bam).exists() {
- // let pod_dir: Vec<String> = fc
- // .pod5
- // .iter()
- // .filter(|p| {
- // p.path.contains(&format!("barcode{}", bc.clone()))
- // })
- // .take(1)
- // .map(|p| p.path.to_string())
- // .collect();
- //
- // let pod_dir = pod_dir.first().unwrap();
- // let mut pod_dir = PathBuf::from(pod_dir);
- // pod_dir.pop();
- //
- // // TODO sheduler
- // println!(
- // "complete_bam.sh {} {} {}",
- // c.id,
- // tp,
- // pod_dir.to_string_lossy()
- // )
- // } else {
- // let pod_dir: Vec<String> = fc
- // .pod5
- // .iter()
- // .filter(|p| {
- // p.path.contains(&format!("barcode{}", bc.clone()))
- // })
- // .take(1)
- // .map(|p| p.path.to_string())
- // .collect();
- //
- // let pod_dir = pod_dir.first().unwrap();
- // let mut pod_dir = PathBuf::from(pod_dir);
- // pod_dir.pop();
- //
- // println!(
- // "dorado.sh {} {} {}",
- // c.id,
- // tp,
- // pod_dir.to_string_lossy()
- // )
- // }
- // }),
- // };
- // }
- // }
- // }
- // }
- // }
- pub fn ids(&self) -> Vec<String> {
- let mut ids: Vec<String> = self
- .runs
- .iter()
- .flat_map(|r| {
- r.flowcells
- .iter()
- .flat_map(|f| {
- f.cases
- .iter()
- .map(|c| c.id.clone())
- .collect::<Vec<String>>()
- })
- .collect::<Vec<String>>()
- })
- .collect();
- ids.sort();
- ids.dedup();
- ids
- }
- }
/// One row of the tab-delimited table read by `load_flowcells_corrected_names`.
#[derive(Debug, Deserialize, Clone)]
pub struct FCLine {
    /// Case identifier; upper-cased when the table is loaded.
    pub id: String,
    /// Time point label; lower-cased when the table is loaded.
    pub time_point: String,
    /// Barcode label; `Pod5Collection::new` strips "NB" from it to name the barcode directory.
    pub barcode_number: String,
    /// Flow cell name, compared with the flow cell name found in pod5 paths.
    pub flow_cell: String,
    /// Run column of the table (not read by the code in this file).
    pub run: String,
    /// Path column of the table (not read by the code in this file).
    pub path: String,
    /// Corrected flow cell name; empty entries are ignored by `Pod5Collection::new`.
    pub ref_flow_cell: String,
}
- pub fn load_flowcells_corrected_names(file_path: &str) -> anyhow::Result<Vec<FCLine>> {
- let file = File::open(file_path)?;
- let mut rdr = ReaderBuilder::new()
- .delimiter(b'\t')
- .has_headers(true)
- .from_reader(file);
- let mut records = Vec::new();
- for result in rdr.deserialize() {
- let mut record: FCLine = result?;
- // formating
- record.time_point = record.time_point.to_lowercase();
- record.id = record.id.to_uppercase();
- records.push(record);
- }
- Ok(records)
- }
/// One data row of a MinKNOW sample sheet.
///
/// Fields are declared in the column order of the header that
/// `MinKnowSampleSheet::from_path` expects.
#[derive(Debug, Serialize, Deserialize)]
struct MinKnowSampleSheet {
    /// Column 1: `protocol_run_id`.
    pub protocol_run_id: String,
    /// Column 2: `position_id`.
    pub position_id: String,
    /// Column 3: `flow_cell_id`.
    pub flow_cell_id: String,
    /// Column 4: `sample_id`.
    pub sample_id: String,
    /// Column 5: `experiment_id`.
    pub experiment_id: String,
    /// Column 6: `flow_cell_product_code`.
    pub flow_cell_product_code: String,
    /// Column 7: `kit`.
    pub kit: String,
}
- impl TryFrom<&str> for MinKnowSampleSheet {
- type Error = anyhow::Error;
- fn try_from(value: &str) -> anyhow::Result<Self> {
- let cells: Vec<&str> = value.split(",").collect();
- if cells.len() != 7 {
- return Err(anyhow::anyhow!(
- "Number of cells not equal to definition. {value}"
- ));
- }
- Ok(Self {
- protocol_run_id: cells[0].to_string(),
- position_id: cells[1].to_string(),
- flow_cell_id: cells[2].to_string(),
- sample_id: cells[3].to_string(),
- experiment_id: cells[4].to_string(),
- flow_cell_product_code: cells[5].to_string(),
- kit: cells[6].to_string(),
- })
- }
- }
- impl MinKnowSampleSheet {
- pub fn from_path(path: &str) -> anyhow::Result<Self> {
- let file = File::open(path).map_err(|e| format!("Can't open file: {path}\n{e}"))?;
- let reader = io::BufReader::new(file);
- for (i, line) in reader.lines().enumerate() {
- let line = line.map_err(|e| format!("Error parsing line: {line:?}\n\t{e}"))?;
- if i == 0 && line != "protocol_run_id,position_id,flow_cell_id,sample_id,experiment_id,flow_cell_product_code,kit" {
- return Err(anyhow::anyhow!("File header doesnt correspond to MinKnwo sample sheet: {line}"));
- } else if i == 1 {
- return Ok(line.as_str().try_into()?);
- } else {
- return Err(anyhow::anyhow!("Wrong MinKnow sample sheet format."));
- }
- }
- return Err(anyhow::anyhow!("Wrong MinKnow sample sheet format."));
- }
- }
|