:a 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. use anyhow::{anyhow, Context, Result};
  2. use chrono::{DateTime, Utc};
  3. use csv::ReaderBuilder;
  4. use glob::glob;
  5. use hashbrown::HashMap;
  6. use log::{info, warn};
  7. use rayon::prelude::*;
  8. use serde::{Deserialize, Serialize};
  9. use std::{
  10. fmt::Display,
  11. fs::{self, File, Metadata},
  12. io::{self, BufRead},
  13. os::unix::fs::MetadataExt,
  14. path::PathBuf,
  15. };
  16. use crate::io::pod5_infos::Pod5Info;
  17. #[derive(Debug, Clone)]
  18. pub struct Pod5 {
  19. pub path: PathBuf,
  20. pub pod5_type: Pod5Type,
  21. pub run_name: String,
  22. pub flowcell_name: String,
  23. pub file_metadata: Metadata,
  24. }
  25. #[derive(Debug, Clone, PartialEq)]
  26. pub enum Pod5Type {
  27. Raw,
  28. Demuxed,
  29. }
  30. impl Display for Pod5Type {
  31. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  32. let s = match self {
  33. Pod5Type::Raw => "raw",
  34. Pod5Type::Demuxed => "demuxed",
  35. };
  36. f.write_str(s)
  37. }
  38. }
  39. #[derive(Debug, Clone)]
  40. pub struct Pod5Config {
  41. pub base_dir: String,
  42. pub type_raw: String,
  43. pub type_demuxed: String,
  44. pub run_dir_n: u8,
  45. pub flowcell_dir_n: u8,
  46. }
  47. impl Default for Pod5Config {
  48. fn default() -> Self {
  49. Self {
  50. base_dir: "/data/run_data".to_string(),
  51. type_raw: "/pod5/".to_string(),
  52. type_demuxed: "/pod5_pass/".to_string(),
  53. run_dir_n: 0,
  54. flowcell_dir_n: 1,
  55. }
  56. }
  57. }
  58. impl Pod5 {
  59. pub fn from_path(path: &PathBuf, config: &Pod5Config) -> Result<Self> {
  60. let s = path
  61. .to_str()
  62. .context("Can't convert PathBuf to str {path:?}")?;
  63. let pod5_type = if s.contains(&config.type_raw) {
  64. Pod5Type::Raw
  65. } else if s.contains(&config.type_demuxed) {
  66. Pod5Type::Demuxed
  67. } else {
  68. return Err(anyhow!("Can't find the pod5 type {s}"));
  69. };
  70. let file_metadata = fs::metadata(path)?;
  71. let sr = s.replace(&config.base_dir, "");
  72. let components: Vec<&str> = sr.split('/').filter(|c| !c.is_empty()).collect();
  73. let run_name = components
  74. .get(config.run_dir_n as usize)
  75. .context("Can't get run_name")?
  76. .to_string();
  77. let flowcell_name = components
  78. .get(config.flowcell_dir_n as usize)
  79. .context("Can't get flowcell_name")?
  80. .to_string();
  81. Ok(Self {
  82. path: path.to_path_buf(),
  83. pod5_type,
  84. run_name,
  85. flowcell_name,
  86. file_metadata,
  87. })
  88. }
  89. }
  90. pub fn list_pod_files(dir: &str) -> Result<Vec<Pod5>> {
  91. let pattern = format!("{}/**/*.pod5", dir);
  92. let mut pod_files = Vec::new();
  93. let conf = Pod5Config {
  94. base_dir: if dir.ends_with('/') {
  95. dir.to_string()
  96. } else {
  97. format!("{dir}/")
  98. },
  99. ..Pod5Config::default()
  100. };
  101. for entry in glob(&pattern).expect("Failed to read glob pattern") {
  102. match entry {
  103. Ok(path) => {
  104. let p = path.to_str().context("Can't parse path to string {path}")?;
  105. if p.contains("/pod5_fail/") || p.contains("/pod5_skip/") {
  106. continue;
  107. }
  108. match Pod5::from_path(&path, &conf) {
  109. Ok(pod5) => pod_files.push(pod5),
  110. Err(e) => warn!("{e}"),
  111. }
  112. }
  113. Err(e) => warn!("Error: {:?}", e),
  114. }
  115. }
  116. Ok(pod_files)
  117. }
  118. #[derive(Debug)]
  119. pub struct Run {
  120. pub run_name: String,
  121. pub flowcells: Vec<FlowCell>,
  122. }
  123. #[derive(Debug, Clone)]
  124. pub struct FlowCell {
  125. pub flowcell_name: String,
  126. pub corrected_name: String,
  127. pub cases: Vec<FlowCellCase>,
  128. pub run_name: String,
  129. pub pod5_type: Pod5Type,
  130. pub pod5_info: Pod5Info,
  131. pub pod5: Vec<Pod5>,
  132. }
  133. // impl FlowCell {
  134. // pub fn cases_pod5_dir(&self) -> Vec<PathBuf> {
  135. // match self.pod5_type {
  136. // Pod5Type::Raw => {
  137. // let p = self.pod5.first().unwrap();
  138. // vec![p.path.parent().unwrap().to_path_buf()]
  139. // },
  140. // Pod5Type::Demuxed => {
  141. // self.cases.iter().map(|c| {
  142. // let str_barcode = format!("barcode{}", c.barcode);
  143. // })
  144. // },
  145. // }
  146. // }
  147. // }
  148. #[derive(Debug, Default)]
  149. pub struct Pod5Collection {
  150. pub importation_date: DateTime<Utc>,
  151. pub runs: Vec<Run>,
  152. pub bam_dir: String,
  153. pub pod5_dir: String,
  154. }
  155. #[derive(Debug, Clone, Default)]
  156. pub struct FlowCellCase {
  157. pub id: String,
  158. pub time_point: String,
  159. pub barcode: String,
  160. pub pod_dir: PathBuf,
  161. // pub basecalled: Option<bool>,
  162. }
  163. impl Pod5Collection {
  164. pub fn new(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result<Self> {
  165. let pod5 = list_pod_files(pod5_dir)?;
  166. info!("n pod5 {}", pod5.len());
  167. let mut fc: HashMap<String, Vec<Pod5>> = HashMap::new();
  168. for pod in pod5 {
  169. let k = format!("{}-{}", pod.run_name, pod.flowcell_name);
  170. fc.entry(k).or_default().push(pod);
  171. }
  172. let corrected_fc = load_flowcells_corrected_names(corrected_fc_path)?;
  173. let flow_cells: Vec<FlowCell> = fc
  174. .par_values()
  175. .map(|v| {
  176. let first = &v[0];
  177. let pod5_info = Pod5Info::from_pod5(first.path.to_str().unwrap());
  178. let flowcell_name = first.flowcell_name.clone();
  179. let sel: Vec<FCLine> = corrected_fc
  180. .iter()
  181. .filter(|e| e.flow_cell == flowcell_name)
  182. .cloned()
  183. .collect();
  184. let mut corrected_name: Vec<String> = sel
  185. .clone()
  186. .into_iter()
  187. .map(|e| e.ref_flow_cell)
  188. .filter(|e| !e.is_empty())
  189. .collect();
  190. corrected_name.dedup();
  191. if corrected_name.len() > 1 {
  192. panic!("Multiple corrected flow_cells for {v:?}");
  193. }
  194. let corrected_name = if !corrected_name.is_empty() {
  195. corrected_name.first().unwrap().to_string()
  196. } else {
  197. "".to_string()
  198. };
  199. let cases: Vec<FlowCellCase> = sel
  200. .iter()
  201. .map(|e| {
  202. let pod_dir = match first.pod5_type {
  203. Pod5Type::Raw => first.path.parent().unwrap().to_path_buf(),
  204. Pod5Type::Demuxed => {
  205. let mut bc_dir =
  206. first.path.parent().unwrap().parent().unwrap().to_path_buf();
  207. bc_dir
  208. .push(format!("barcode{}", e.barcode_number.replace("NB", "")));
  209. bc_dir
  210. }
  211. };
  212. FlowCellCase {
  213. id: e.id.clone(),
  214. time_point: e.time_point.clone(),
  215. barcode: e.barcode_number.clone(),
  216. pod_dir,
  217. }
  218. })
  219. .collect();
  220. FlowCell {
  221. flowcell_name,
  222. corrected_name,
  223. cases,
  224. run_name: first.run_name.clone(),
  225. pod5_type: first.pod5_type.clone(),
  226. pod5_info,
  227. pod5: v.to_vec(),
  228. }
  229. })
  230. .collect();
  231. let mut runs = HashMap::new();
  232. for fc in flow_cells {
  233. runs.entry(fc.run_name.clone())
  234. .or_insert_with(Vec::new)
  235. .push(fc);
  236. }
  237. let runs: Vec<Run> = runs
  238. .into_values()
  239. .map(|v| Run {
  240. run_name: v[0].run_name.clone(),
  241. flowcells: v.to_vec(),
  242. })
  243. .collect();
  244. Ok(Self {
  245. importation_date: Utc::now(),
  246. runs,
  247. bam_dir: bam_dir.to_string(),
  248. pod5_dir: pod5_dir.to_string(),
  249. })
  250. }
  251. pub fn print_info(&self) {
  252. self.runs.iter().for_each(|run| {
  253. run.flowcells.iter().for_each(|fc| {
  254. let total_size: u64 = fc.pod5.iter().map(|p| p.file_metadata.size()).sum();
  255. let n_files = fc.pod5.len();
  256. let dates: Vec<DateTime<Utc>> = fc
  257. .pod5
  258. .iter()
  259. .map(|p| p.file_metadata.modified().unwrap().into())
  260. .collect();
  261. let from = dates.iter().min().unwrap();
  262. let to = dates.iter().max().unwrap();
  263. let s = [
  264. run.run_name.clone(),
  265. from.to_string(),
  266. to.to_string(),
  267. n_files.to_string(),
  268. total_size.to_string(),
  269. fc.flowcell_name.to_string(),
  270. fc.pod5_type.to_string(),
  271. fc.pod5_info.acquisition_id.clone(),
  272. format!("{:?}", fc.cases),
  273. ]
  274. .join("\t");
  275. println!("{s}");
  276. });
  277. });
  278. }
  279. // pub fn check_local(&self) -> anyhow::Result<()> {
  280. // let mut res = Vec::new();
  281. // for run in self.runs.iter() {
  282. // for fc in run.flowcells.iter() {
  283. // for c in fc.cases.iter() {
  284. // let bases_called = if let Some(b) = c.basecalled {
  285. // if b {
  286. // "✅".to_string()
  287. // } else {
  288. // "❌".to_string()
  289. // }
  290. // } else {
  291. // "❌".to_string()
  292. // };
  293. //
  294. // let s = [
  295. // c.id.to_string(),
  296. // c.time_point.to_string(),
  297. // c.barcode.to_string(),
  298. // run.run_name.clone(),
  299. // fc.flowcell_name.to_string(),
  300. // fc.pod5_type.to_string(),
  301. // fc.pod5_info.acquisition_id.clone(),
  302. // bases_called,
  303. // ]
  304. // .join("\t");
  305. // res.push(s);
  306. // }
  307. // }
  308. // }
  309. // res.sort();
  310. // println!("{}", res.join("\n"));
  311. // Ok(())
  312. // }
  313. // pub fn fc_done(&self) {
  314. // for run in self.runs.iter() {
  315. // for fc in run.flowcells.iter() {
  316. // let n_called = fc
  317. // .cases
  318. // .iter()
  319. // .filter(|c| if let Some(b) = c.basecalled { b } else { false })
  320. // .count();
  321. // if n_called != 0 && n_called == fc.cases.len() {
  322. // let s = [
  323. // format!("{}/{}", run.run_name, fc.flowcell_name),
  324. // fc.pod5_info.acquisition_id.to_string(),
  325. // format!("{:#?}", fc.cases),
  326. // ]
  327. // .join("\t");
  328. // println!("{s}");
  329. // }
  330. // }
  331. // }
  332. // }
  333. // pub fn todo(&self) {
  334. // let run_dir = &self.pod5_dir;
  335. // for run in self.runs.iter() {
  336. // for fc in run.flowcells.iter() {
  337. // let to_call: Vec<_> = fc
  338. // .cases
  339. // .iter()
  340. // .filter(|c| if let Some(b) = c.basecalled { !b } else { true })
  341. // .collect();
  342. //
  343. // if !to_call.is_empty() {
  344. // if fc.pod5_type == Pod5Type::Raw && to_call.len() != fc.cases.len() {
  345. // println!("No solution for: {}/{}", run.run_name, fc.flowcell_name);
  346. // } else {
  347. // match fc.pod5_type {
  348. // Pod5Type::Raw => {
  349. // let cases: Vec<String> = to_call
  350. // .iter()
  351. // .map(|c| {
  352. // let bc = c.barcode.replace("NB", "");
  353. // let tp = c.time_point.to_lowercase();
  354. // [bc, c.id.to_string(), tp].join(" ")
  355. // })
  356. // .collect();
  357. // println!(
  358. // "from_mux.sh {}/{}/{} {}",
  359. // run_dir,
  360. // run.run_name,
  361. // fc.flowcell_name,
  362. // cases.join(" ")
  363. // );
  364. // }
  365. // Pod5Type::Demuxed => to_call.iter().for_each(|c| {
  366. // let bc = c.barcode.replace("NB", "");
  367. // let tp = c.time_point.to_lowercase();
  368. // let bam = format!(
  369. // "{}/{}/{}/{}_{}_hs1.bam",
  370. // self.bam_dir, c.id, c.time_point, c.id, c.time_point
  371. // );
  372. // if PathBuf::from(bam).exists() {
  373. // let pod_dir: Vec<String> = fc
  374. // .pod5
  375. // .iter()
  376. // .filter(|p| {
  377. // p.path.contains(&format!("barcode{}", bc.clone()))
  378. // })
  379. // .take(1)
  380. // .map(|p| p.path.to_string())
  381. // .collect();
  382. //
  383. // let pod_dir = pod_dir.first().unwrap();
  384. // let mut pod_dir = PathBuf::from(pod_dir);
  385. // pod_dir.pop();
  386. //
  387. // // TODO sheduler
  388. // println!(
  389. // "complete_bam.sh {} {} {}",
  390. // c.id,
  391. // tp,
  392. // pod_dir.to_string_lossy()
  393. // )
  394. // } else {
  395. // let pod_dir: Vec<String> = fc
  396. // .pod5
  397. // .iter()
  398. // .filter(|p| {
  399. // p.path.contains(&format!("barcode{}", bc.clone()))
  400. // })
  401. // .take(1)
  402. // .map(|p| p.path.to_string())
  403. // .collect();
  404. //
  405. // let pod_dir = pod_dir.first().unwrap();
  406. // let mut pod_dir = PathBuf::from(pod_dir);
  407. // pod_dir.pop();
  408. //
  409. // println!(
  410. // "dorado.sh {} {} {}",
  411. // c.id,
  412. // tp,
  413. // pod_dir.to_string_lossy()
  414. // )
  415. // }
  416. // }),
  417. // };
  418. // }
  419. // }
  420. // }
  421. // }
  422. // }
  423. pub fn ids(&self) -> Vec<String> {
  424. let mut ids: Vec<String> = self
  425. .runs
  426. .iter()
  427. .flat_map(|r| {
  428. r.flowcells
  429. .iter()
  430. .flat_map(|f| {
  431. f.cases
  432. .iter()
  433. .map(|c| c.id.clone())
  434. .collect::<Vec<String>>()
  435. })
  436. .collect::<Vec<String>>()
  437. })
  438. .collect();
  439. ids.sort();
  440. ids.dedup();
  441. ids
  442. }
  443. }
  444. #[derive(Debug, Deserialize, Clone)]
  445. pub struct FCLine {
  446. pub id: String,
  447. pub time_point: String,
  448. pub barcode_number: String,
  449. pub flow_cell: String,
  450. pub run: String,
  451. pub path: String,
  452. pub ref_flow_cell: String,
  453. }
  454. pub fn load_flowcells_corrected_names(file_path: &str) -> anyhow::Result<Vec<FCLine>> {
  455. let file = File::open(file_path)?;
  456. let mut rdr = ReaderBuilder::new()
  457. .delimiter(b'\t')
  458. .has_headers(true)
  459. .from_reader(file);
  460. let mut records = Vec::new();
  461. for result in rdr.deserialize() {
  462. let mut record: FCLine = result?;
  463. // formating
  464. record.time_point = record.time_point.to_lowercase();
  465. record.id = record.id.to_uppercase();
  466. records.push(record);
  467. }
  468. Ok(records)
  469. }
  470. #[derive(Debug, Serialize, Deserialize)]
  471. struct MinKnowSampleSheet {
  472. pub protocol_run_id: String,
  473. pub position_id: String,
  474. pub flow_cell_id: String,
  475. pub sample_id: String,
  476. pub experiment_id: String,
  477. pub flow_cell_product_code: String,
  478. pub kit: String,
  479. }
  480. impl TryFrom<&str> for MinKnowSampleSheet {
  481. type Error = anyhow::Error;
  482. fn try_from(value: &str) -> anyhow::Result<Self> {
  483. let cells: Vec<&str> = value.split(",").collect();
  484. if cells.len() != 7 {
  485. return Err(anyhow::anyhow!(
  486. "Number of cells not equal to definition. {value}"
  487. ));
  488. }
  489. Ok(Self {
  490. protocol_run_id: cells[0].to_string(),
  491. position_id: cells[1].to_string(),
  492. flow_cell_id: cells[2].to_string(),
  493. sample_id: cells[3].to_string(),
  494. experiment_id: cells[4].to_string(),
  495. flow_cell_product_code: cells[5].to_string(),
  496. kit: cells[6].to_string(),
  497. })
  498. }
  499. }
  500. impl MinKnowSampleSheet {
  501. pub fn from_path(path: &str) -> anyhow::Result<Self> {
  502. let file = File::open(path).map_err(|e| format!("Can't open file: {path}\n{e}"))?;
  503. let reader = io::BufReader::new(file);
  504. for (i, line) in reader.lines().enumerate() {
  505. let line = line.map_err(|e| format!("Error parsing line: {line:?}\n\t{e}"))?;
  506. if i == 0 && line != "protocol_run_id,position_id,flow_cell_id,sample_id,experiment_id,flow_cell_product_code,kit" {
  507. return Err(anyhow::anyhow!("File header doesnt correspond to MinKnwo sample sheet: {line}"));
  508. } else if i == 1 {
  509. return Ok(line.as_str().try_into()?);
  510. } else {
  511. return Err(anyhow::anyhow!("Wrong MinKnow sample sheet format."));
  512. }
  513. }
  514. return Err(anyhow::anyhow!("Wrong MinKnow sample sheet format."));
  515. }
  516. }