mod.rs 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934
  1. use std::{
  2. collections::HashMap,
  3. fmt,
  4. fs::{self, metadata},
  5. path::{Path, PathBuf},
  6. thread,
  7. time::SystemTime,
  8. };
  9. use anyhow::Context;
  10. use chrono::{DateTime, Utc};
  11. use glob::glob;
  12. use log::{error, info, warn};
  13. use modbases::{Dmr, ModBasesCollection, ModType};
  14. use pandora_lib_variants::variants::Variants;
  15. use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
  16. use crate::{
  17. callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::NanomonSV},
  18. collection::pod5::FlowCellCase,
  19. commands::{
  20. dorado::Dorado as BasecallAlign,
  21. modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig},
  22. },
  23. config::Config,
  24. functions::{
  25. assembler::{Assembler, AssemblerConfig},
  26. variants::{RunVariantsAgg, VariantsConfig},
  27. },
  28. runners::Run, scan::scan::{par_whole_scan, par_whole_scan_local},
  29. };
  30. pub mod bam;
  31. pub mod modbases;
  32. pub mod pod5;
  33. pub mod vcf;
  /// Settings telling [`Collections`] where to find its inputs and results.
  #[derive(Debug, Clone)]
  pub struct CollectionsConfig {
      /// Root directory scanned for POD5 runs (passed to `Pod5Collection::new`).
      pub pod_dir: String,
      /// Flow-cell table passed to `Pod5Collection::new`; presumably corrects flow-cell /
      /// case assignments — TODO confirm.
      pub corrected_fc_path: String,
      /// Pipeline output directory, scanned for BAM, VCF and modified-base results.
      pub result_dir: String,
      /// Path to a `.dict` file; not referenced in this module.
      pub dict_file: String,
      /// Diagnostic-sample threshold handed to `BamCollection::by_id_completed`.
      pub min_diag_cov: f32,
      /// MRD-sample threshold handed to `BamCollection::by_id_completed`.
      pub min_mrd_cov: f32,
      /// Sample ids to skip; currently only referenced by disabled (commented-out) code here.
      pub id_black_list: Vec<String>,
  }

  impl Default for CollectionsConfig {
      /// Standard `/data` layout, diag/mrd thresholds of 11.0 / 10.0, empty black list.
      fn default() -> Self {
          Self {
              pod_dir: String::from("/data/run_data"),
              corrected_fc_path: String::from("/data/flow_cells.tsv"),
              result_dir: String::from("/data/longreads_basic_pipe"),
              dict_file: String::from("/data/ref/hs1/chm13v2.0.dict"),
              min_diag_cov: 11.0,
              min_mrd_cov: 10.0,
              id_black_list: Vec::new(),
          }
      }
  }
  57. #[derive(Debug)]
  58. pub struct Collections {
  59. pub config: CollectionsConfig,
  60. pub pod5: Pod5Collection,
  61. pub bam: BamCollection,
  62. pub vcf: VcfCollection,
  63. pub modbases: ModBasesCollection,
  64. pub tasks: Vec<CollectionsTasks>,
  65. }
  66. impl Collections {
  67. pub fn new(config: CollectionsConfig) -> anyhow::Result<Self> {
  68. let CollectionsConfig {
  69. pod_dir,
  70. corrected_fc_path,
  71. result_dir,
  72. ..
  73. } = &config;
  74. let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
  75. // let pod5 = Pod5Collection::default();
  76. let bam = BamCollection::new(result_dir);
  77. let vcf = VcfCollection::new(result_dir);
  78. let modbases = ModBasesCollection::new(result_dir);
  79. Ok(Self {
  80. pod5,
  81. bam,
  82. vcf,
  83. modbases,
  84. tasks: Vec::new(),
  85. config,
  86. })
  87. }
  88. pub fn todo(&mut self) -> anyhow::Result<()> {
  89. info!("Looking for base calling tasks...");
  90. let mut tasks = Vec::new();
  91. let mut to_demux = Vec::new();
  92. for run in self.pod5.runs.iter() {
  93. for fc in run.flowcells.iter() {
  94. let acq_id = fc.pod5_info.acquisition_id.clone();
  95. for case in fc.cases.iter() {
  96. let bams_ids: Vec<String> = self
  97. .bam
  98. .get(&case.id, &case.time_point)
  99. .iter()
  100. .flat_map(|b| {
  101. b.composition
  102. .iter()
  103. .map(|c| c.0.clone())
  104. .collect::<Vec<String>>()
  105. })
  106. .filter(|id| *id == acq_id)
  107. .collect();
  108. if bams_ids.is_empty() {
  109. match fc.pod5_type {
  110. pod5::Pod5Type::Raw => to_demux.push(case.clone()),
  111. pod5::Pod5Type::Demuxed => {
  112. tasks.push(CollectionsTasks::Align(case.clone()))
  113. }
  114. }
  115. }
  116. }
  117. }
  118. }
  119. // Group for muxed and push task with all cases
  120. let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
  121. for case in to_demux {
  122. grouped.entry(case.pod_dir.clone()).or_default().push(case);
  123. }
  124. grouped
  125. .into_values()
  126. .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
  127. // de novo
  128. // tasks.extend(self.todo_assembler()?);
  129. // Remove VCF anterior to BAM
  130. // let vcf_by_id = self.vcf.group_by_id();
  131. // vcf_by_id.iter().for_each(|(id, vcfs)| {
  132. // if let (Some(diag), Some(mrd)) = (
  133. // self.bam.get(id, "diag").first(),
  134. // self.bam.get(id, "mrd").first(),
  135. // ) {
  136. // let diag_modified = diag.modified;
  137. // let mrd_modified = mrd.modified;
  138. // let mut rm_paths: Vec<&Path> = vcfs
  139. // .iter()
  140. // .flat_map(|vcf| {
  141. // let vcf_mod: DateTime<Utc> = vcf
  142. // .file_metadata
  143. // .modified()
  144. // .expect("Can't read VCF modified time.")
  145. // .into();
  146. //
  147. // // For somatic caller erase if one bam (diag or mrd) is more recent.
  148. // if vcf.caller != "DeepVariant" {
  149. // if vcf_mod < diag_modified || vcf_mod < mrd_modified {
  150. // vec![vcf.path.parent().unwrap()]
  151. // } else {
  152. // vec![]
  153. // }
  154. // } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
  155. // || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
  156. // {
  157. // vec![vcf.path.parent().unwrap()]
  158. // } else {
  159. // vec![]
  160. // }
  161. // })
  162. // .collect();
  163. // rm_paths.sort();
  164. // rm_paths.dedup();
  165. // rm_paths
  166. // .iter()
  167. // .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
  168. // }
  169. // });
  170. // Variant calling
  171. info!("Looking for variant calling tasks...");
  172. // self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
  173. // if let (Some(diag), Some(mrd)) = (
  174. // self.bam.get(&id, "diag").first(),
  175. // self.bam.get(&id, "mrd").first(),
  176. // ) {
  177. // if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
  178. // if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
  179. // && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
  180. // && !self.config.id_black_list.contains(&id)
  181. // {
  182. // let caller_time: Vec<(&str, &str)> = vcf_by_id
  183. // .iter()
  184. // .filter(|(i, _)| *i == id)
  185. // .flat_map(|(_, vcfs)| {
  186. // vcfs.iter()
  187. // .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
  188. // })
  189. // .collect();
  190. //
  191. // if !caller_time.contains(&("clairs", "diag"))
  192. // || !caller_time.contains(&("clairs_indel", "diag"))
  193. // {
  194. // tasks.push(CollectionsTasks::ClairS {
  195. // id: id.to_string(),
  196. // diag_bam: diag.path.to_str().unwrap().to_string(),
  197. // mrd_bam: mrd.path.to_str().unwrap().to_string(),
  198. // config: ClairSConfig::default(),
  199. // });
  200. // }
  201. // if !caller_time.contains(&("DeepVariant", "diag")) {
  202. // tasks.push(CollectionsTasks::DeepVariant {
  203. // id: id.to_string(),
  204. // time_point: "diag".to_string(),
  205. // bam: diag.path.to_str().unwrap().to_string(),
  206. // config: DeepVariantConfig::default(),
  207. // });
  208. // }
  209. // if !caller_time.contains(&("DeepVariant", "mrd")) {
  210. // tasks.push(CollectionsTasks::DeepVariant {
  211. // id: id.to_string(),
  212. // time_point: "mrd".to_string(),
  213. // bam: mrd.path.to_str().unwrap().to_string(),
  214. // config: DeepVariantConfig::default(),
  215. // });
  216. // }
  217. // if !caller_time.contains(&("nanomonsv", "diag")) {
  218. // tasks.push(CollectionsTasks::NanomonSV {
  219. // id: id.to_string(),
  220. // diag_bam: diag.path.to_str().unwrap().to_string(),
  221. // mrd_bam: mrd.path.to_str().unwrap().to_string(),
  222. // config: NanomonSVConfig::default(),
  223. // });
  224. // }
  225. // }
  226. // }
  227. // }
  228. // });
  229. // Tasks dedup
  230. let mut hs = HashMap::new();
  231. tasks.into_iter().for_each(|t| {
  232. hs.insert(t.to_string(), t);
  233. });
  234. self.tasks = hs.into_values().collect();
  235. // Variants DeepVariant
  236. // self.tasks.extend(self.todo_deepvariants());
  237. // Variants ClairS
  238. // self.tasks.extend(self.todo_clairs());
  239. // Variants Nanomonsv
  240. // self.tasks.extend(self.todo_nanomonsv());
  241. // Variants aggregation
  242. // self.tasks.extend(self.todo_variants_agg()?);
  243. // ModPileup
  244. // self.tasks.extend(self.todo_mod_pileup());
  245. // DMR C diag vs mrd
  246. // self.tasks.extend(self.todo_dmr_c_diag_mrd());
  247. // Tasks sorting
  248. self.tasks.sort_by_cached_key(|task| task.get_order());
  249. Ok(())
  250. }
  251. pub fn tasks_dedup(&mut self) {
  252. let mut hs = HashMap::new();
  253. self.tasks.clone().into_iter().for_each(|t| {
  254. hs.insert(t.to_string(), t);
  255. });
  256. self.tasks = hs.into_values().collect();
  257. }
  258. pub fn todo_bam_count(&mut self, config: &Config) -> anyhow::Result<()> {
  259. // Whole scan
  260. for wgs_bam in self
  261. .bam
  262. .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
  263. {
  264. let id = wgs_bam.id.as_str();
  265. let count_dir = match wgs_bam.time_point.as_str() {
  266. "diag" => config.tumoral_dir_count(id),
  267. "mrd" => config.normal_dir_count(id),
  268. _ => anyhow::bail!("Unknown bam time point {}", wgs_bam.time_point),
  269. };
  270. if PathBuf::from(&count_dir).exists() {
  271. let dir_mod: DateTime<Utc> = fs::metadata(&count_dir)?.modified()?.into();
  272. if wgs_bam.modified > dir_mod {
  273. fs::remove_dir_all(&count_dir)?;
  274. }
  275. }
  276. if !PathBuf::from(&count_dir).exists() {
  277. self.tasks.push(CollectionsTasks::CountBam {
  278. bam_path: wgs_bam.path.to_string_lossy().to_string(),
  279. count_dir,
  280. config: config.clone(),
  281. });
  282. }
  283. }
  284. Ok(())
  285. }
  286. // No pair needed
  287. pub fn todo_assembler(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
  288. let mut tasks = Vec::new();
  289. let config = AssemblerConfig::default();
  290. for b in &self.bam.bams {
  291. let assemblies_dir = format!(
  292. "{}/{}/{}/{}",
  293. config.result_dir, b.id, b.time_point, config.output_dir_name
  294. );
  295. if !Path::new(&assemblies_dir).exists() {
  296. tasks.push(CollectionsTasks::Assemble {
  297. id: b.id.clone(),
  298. time_point: b.time_point.clone(),
  299. config: config.clone(),
  300. });
  301. continue;
  302. }
  303. let pattern = format!("{assemblies_dir}/*/*.bam");
  304. let mut mtimes: Vec<SystemTime> = glob(&pattern)?
  305. .filter_map(|entry| entry.ok())
  306. .filter_map(|path| metadata(path).ok()?.modified().ok())
  307. .collect();
  308. if mtimes.is_empty() {
  309. tasks.push(CollectionsTasks::Assemble {
  310. id: b.id.clone(),
  311. time_point: b.time_point.clone(),
  312. config: config.clone(),
  313. });
  314. continue;
  315. }
  316. mtimes.sort_unstable();
  317. mtimes.dedup();
  318. let max_mtime: DateTime<Utc> =
  319. mtimes.last().context("No modified time")?.to_owned().into();
  320. if b.modified > max_mtime {
  321. tasks.push(CollectionsTasks::Assemble {
  322. id: b.id.clone(),
  323. time_point: b.time_point.clone(),
  324. config: config.clone(),
  325. });
  326. }
  327. }
  328. Ok(tasks)
  329. }
  330. pub fn todo_deepvariants(&self) -> Vec<CollectionsTasks> {
  331. self.bam
  332. .bams
  333. .iter()
  334. .filter_map(|b| {
  335. if self.vcf.vcfs.iter().any(|v| {
  336. v.caller == "DeepVariant"
  337. && v.id == b.id
  338. && v.time == b.time_point
  339. && v.modified().unwrap_or_default() > b.modified
  340. }) {
  341. None
  342. } else {
  343. Some(CollectionsTasks::DeepVariant {
  344. id: b.id.clone(),
  345. time_point: b.time_point.clone(),
  346. bam: b.path.to_string_lossy().to_string(),
  347. })
  348. }
  349. })
  350. .collect()
  351. }
  352. pub fn todo_clairs(&self) -> Vec<CollectionsTasks> {
  353. self.bam_pairs()
  354. .iter()
  355. .filter_map(|(diag, mrd)| {
  356. if self.vcf.vcfs.iter().any(|v| {
  357. v.caller == "clairs"
  358. && v.id == diag.id
  359. && v.time == diag.time_point
  360. && (v.modified().unwrap_or_default() > diag.modified
  361. || v.modified().unwrap_or_default() > mrd.modified)
  362. }) {
  363. None
  364. } else {
  365. Some(CollectionsTasks::ClairS {
  366. id: diag.id.clone(),
  367. diag_bam: diag.path.to_string_lossy().to_string(),
  368. mrd_bam: mrd.path.to_string_lossy().to_string(),
  369. })
  370. }
  371. })
  372. .collect()
  373. }
  374. pub fn run_clairs(&self) -> anyhow::Result<()> {
  375. for task in self.todo_clairs() {
  376. match task.run() {
  377. Ok(_) => info!("done"),
  378. Err(e) => warn!("{e}"),
  379. }
  380. }
  381. Ok(())
  382. }
  383. pub fn todo_nanomonsv(&self) -> Vec<CollectionsTasks> {
  384. self.bam_pairs()
  385. .iter()
  386. .filter_map(|(diag, mrd)| {
  387. if self.vcf.vcfs.iter().any(|v| {
  388. v.caller == "nanomonsv"
  389. && v.id == diag.id
  390. && v.time == diag.time_point
  391. && (v.modified().unwrap_or_default() > diag.modified
  392. || v.modified().unwrap_or_default() > mrd.modified)
  393. }) {
  394. None
  395. } else {
  396. Some(CollectionsTasks::NanomonSV {
  397. id: diag.id.clone(),
  398. })
  399. }
  400. })
  401. .collect()
  402. }
  403. pub fn todo_mod_pileup(&self) -> Vec<CollectionsTasks> {
  404. let config = ModkitConfig::default();
  405. self.bam
  406. .bams
  407. .iter()
  408. .filter_map(|b| {
  409. if self.modbases.modbases.iter().any(|mb| {
  410. mb.id == b.id && mb.time_point == b.time_point && mb.pileup_modif > b.modified
  411. }) {
  412. None
  413. } else {
  414. Some(CollectionsTasks::ModPileup {
  415. bam: b.path.clone(),
  416. config: config.clone(),
  417. })
  418. }
  419. })
  420. .collect()
  421. }
  422. pub fn todo_dmr_c_diag_mrd(&self) -> Vec<CollectionsTasks> {
  423. let config = ModkitConfig::default();
  424. self.bam
  425. .ids()
  426. .iter()
  427. .filter_map(|id| {
  428. if let Ok((diag, mrd)) = self.modbases.get_diag_mrd(id, ModType::Mod5mC5hmC) {
  429. let dmr: Vec<&Dmr> = diag
  430. .dmr_files
  431. .iter()
  432. .filter(|dmr| dmr.base == "C" && dmr.vs == "mrd")
  433. .collect();
  434. if dmr.len() == 1 {
  435. let dmr = dmr.first().unwrap();
  436. if let Ok(metadata) = dmr.path.metadata() {
  437. if let Ok(modif) = metadata.modified() {
  438. let m: DateTime<Utc> = modif.into();
  439. if diag.pileup_modif > m || mrd.pileup_modif > m {
  440. return Some(CollectionsTasks::DMRCDiagMrd {
  441. id: id.clone(),
  442. config: config.clone(),
  443. });
  444. }
  445. }
  446. }
  447. None
  448. } else {
  449. Some(CollectionsTasks::DMRCDiagMrd {
  450. id: id.clone(),
  451. config: config.clone(),
  452. })
  453. }
  454. } else {
  455. None
  456. }
  457. })
  458. .collect()
  459. }
  460. /// Generates pairs of diagnostic and MRD BAM files.
  461. ///
  462. /// This function performs the following steps:
  463. /// 1. Extracts and deduplicates IDs from all BAM files.
  464. /// 2. For each unique ID, attempts to find a pair of BAM files:
  465. /// - One labeled as "diag" (diagnostic)
  466. /// - One labeled as "mrd" (minimal residual disease)
  467. /// 3. Returns pairs where both "diag" and "mrd" BAMs are found.
  468. ///
  469. /// # Returns
  470. ///
  471. /// * `Vec<(bam::Bam, bam::Bam)>` - A vector of tuples, each containing a pair of BAM files
  472. /// (diagnostic and MRD) for a unique ID.
  473. ///
  474. pub fn bam_pairs(&self) -> Vec<(bam::WGSBam, bam::WGSBam)> {
  475. let mut ids: Vec<String> = self.bam.bams.iter().map(|b| b.id.clone()).collect();
  476. ids.sort();
  477. ids.dedup();
  478. ids.iter()
  479. .filter_map(|id| {
  480. match (
  481. self.bam.get(id, "diag").first(),
  482. self.bam.get(id, "mrd").first(),
  483. ) {
  484. (Some(&diag), Some(&mrd)) => Some((diag.clone(), mrd.clone())),
  485. _ => None,
  486. }
  487. })
  488. .collect()
  489. }
  490. /// Aggregates variant tasks based on BAM pairs and VCF files.
  491. ///
  492. /// This function performs the following operations:
  493. /// 1. Iterates through BAM pairs (DIAG/MRD).
  494. /// 2. Checks for the existence of a _constit.bytes.gz file for each pair.
  495. /// 3. If the file exists, compares its modification time with VCF files.
  496. /// 4. Creates variant tasks if the file is older than one of VCF or if it doesn't exist.
  497. ///
  498. /// # Arguments
  499. ///
  500. /// * `self` - The struct instance containing BAM pairs and VCF information.
  501. ///
  502. /// # Returns
  503. ///
  504. /// * `anyhow::Result<Vec<CollectionsTasks>>` - A Result containing a vector of `CollectionsTasks::Variants`
  505. /// if successful, or an error if file metadata cannot be accessed.
  506. ///
  507. // pub fn todo_variants_agg(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
  508. // let mut tasks = Vec::new();
  509. // let config = VariantsConfig::default();
  510. // let vcfs_ids = self.vcf.group_by_id();
  511. // for pair in &self.bam_pairs() {
  512. // if self.config.id_black_list.contains(&pair.0.id) {
  513. // continue;
  514. // }
  515. // let const_path = format!(
  516. // "{}/{}/diag/{}_constit.bytes.gz",
  517. // &config.result_dir, &pair.0.id, &pair.0.id
  518. // );
  519. // let constit = Path::new(&const_path);
  520. //
  521. // if constit.exists() {
  522. // let vcfs: Vec<_> = vcfs_ids.iter().filter(|(id, _)| id == &pair.0.id).collect();
  523. // if let Some((_, vcfs)) = vcfs.first() {
  524. // let mtime = constit
  525. // .metadata()
  526. // .context(format!("Can't access file metadata {const_path}."))?
  527. // .mtime();
  528. // let n_new = vcfs
  529. // .iter()
  530. // .filter(|vcf| mtime < vcf.file_metadata.mtime())
  531. // .count();
  532. // if n_new > 0 {
  533. // tasks.push(CollectionsTasks::SomaticVariants {
  534. // id: pair.0.id.clone(),
  535. // config: config.clone(),
  536. // });
  537. // }
  538. // }
  539. // } else {
  540. // tasks.push(CollectionsTasks::SomaticVariants {
  541. // id: pair.0.id.clone(),
  542. // config: config.clone(),
  543. // });
  544. // }
  545. // }
  546. // Ok(tasks)
  547. // }
  548. /// Runs all tasks in the collection.
  549. ///
  550. /// This method attempts to execute each task in the collection.
  551. ///
  552. /// # Returns
  553. ///
  554. /// Returns `Ok(())` if the process completes without any critical errors, even if
  555. /// individual tasks fail.
  556. ///
  557. /// # Errors
  558. ///
  559. /// This function will return an error if:
  560. /// - Fetching todo tasks fails when the initial task list is empty.
  561. /// - Any critical error occurs during the execution process.
  562. ///
  563. /// Note that individual task failures do not cause this method to return an error.
  564. pub fn run(&mut self) -> anyhow::Result<()> {
  565. if self.tasks.is_empty() {
  566. self.todo().context("Failed to fetch todo tasks")?;
  567. if self.tasks.is_empty() {
  568. info!("No tasks to run");
  569. return Ok(());
  570. }
  571. }
  572. let n_tasks = self.tasks.len();
  573. warn!("{n_tasks} tasks to run");
  574. let mut completed_tasks = Vec::new();
  575. for (i, task) in self.tasks.iter().enumerate() {
  576. warn!("Running task {}/{}", i + 1, n_tasks);
  577. info!("{task}");
  578. match task.clone().run() {
  579. Ok(_) => {
  580. info!("Task completed successfully");
  581. completed_tasks.push(i);
  582. }
  583. Err(err) => error!("Task failed: {}", err),
  584. }
  585. }
  586. // Remove completed tasks
  587. for &index in completed_tasks.iter().rev() {
  588. self.tasks.remove(index);
  589. }
  590. info!(
  591. "{} tasks completed, {} tasks remaining",
  592. completed_tasks.len(),
  593. self.tasks.len()
  594. );
  595. Ok(())
  596. }
  597. pub fn run_deepvariant(&mut self) -> anyhow::Result<()> {
  598. let tasks = self.todo_deepvariants();
  599. let n_tasks = tasks.len();
  600. warn!("{n_tasks} tasks to run");
  601. for (i, tasks_chunk) in tasks.chunks_exact(2).enumerate() {
  602. match tasks_chunk {
  603. [a, b] => {
  604. warn!("Running task {}/{} and {}/{n_tasks}", i + 1, n_tasks, i + 2);
  605. info!("{a} and {b}");
  606. let a = if let CollectionsTasks::DeepVariant {
  607. id,
  608. time_point,
  609. bam,
  610. ..
  611. } = a
  612. {
  613. CollectionsTasks::DeepVariant {
  614. id: id.to_string(),
  615. time_point: time_point.to_string(),
  616. bam: bam.to_string(),
  617. }
  618. } else {
  619. anyhow::bail!("Err")
  620. };
  621. let b = if let CollectionsTasks::DeepVariant {
  622. id,
  623. time_point,
  624. bam,
  625. ..
  626. } = b
  627. {
  628. CollectionsTasks::DeepVariant {
  629. id: id.to_string(),
  630. time_point: time_point.to_string(),
  631. bam: bam.to_string(),
  632. }
  633. } else {
  634. anyhow::bail!("Err");
  635. };
  636. let handle1 = thread::spawn(|| a.run());
  637. let handle2 = thread::spawn(|| b.run());
  638. let _ = handle1.join().unwrap();
  639. let _ = handle2.join().unwrap();
  640. }
  641. [a] => {
  642. info!("Single task: ({})", a);
  643. let _ = a.clone().run();
  644. }
  645. _ => (),
  646. }
  647. }
  648. Ok(())
  649. }
  650. }
  651. #[derive(Clone, Debug)]
  652. pub enum CollectionsTasks {
  653. Align(FlowCellCase),
  654. DemuxAlign(Vec<FlowCellCase>),
  655. CountBam {
  656. bam_path: String,
  657. count_dir: String,
  658. config: Config,
  659. },
  660. Assemble {
  661. id: String,
  662. time_point: String,
  663. config: AssemblerConfig,
  664. },
  665. ModPileup {
  666. bam: PathBuf,
  667. config: ModkitConfig,
  668. },
  669. DMRCDiagMrd {
  670. id: String,
  671. config: ModkitConfig,
  672. },
  673. DeepVariant {
  674. id: String,
  675. time_point: String,
  676. bam: String,
  677. },
  678. ClairS {
  679. id: String,
  680. diag_bam: String,
  681. mrd_bam: String,
  682. },
  683. NanomonSV {
  684. id: String,
  685. },
  686. SomaticVariants {
  687. id: String,
  688. config: VariantsConfig,
  689. },
  690. }
  691. impl CollectionsTasks {
  692. pub fn run(self) -> anyhow::Result<()> {
  693. match self {
  694. CollectionsTasks::Align(case) => {
  695. BasecallAlign::init(case.clone(), Config::default())?.run_pipe()
  696. }
  697. CollectionsTasks::DemuxAlign(cases) => {
  698. BasecallAlign::from_mux(cases, Config::default())
  699. }
  700. CollectionsTasks::ModPileup { bam, config } => bed_methyl(bam, &config),
  701. CollectionsTasks::DeepVariant { id, time_point, .. } => {
  702. DeepVariant::initialize(&id, &time_point, Config::default())?.run()
  703. }
  704. CollectionsTasks::ClairS { id, .. } => {
  705. ClairS::initialize(&id, Config::default())?.run()
  706. }
  707. CollectionsTasks::NanomonSV { id, .. } => {
  708. NanomonSV::initialize(&id, Config::default())?.run()
  709. }
  710. CollectionsTasks::CountBam {
  711. bam_path,
  712. count_dir,
  713. config,
  714. } => par_whole_scan(&count_dir, &bam_path, &config),
  715. CollectionsTasks::SomaticVariants { id, config } => {
  716. RunVariantsAgg::new(id, config).run()
  717. }
  718. CollectionsTasks::Assemble {
  719. id,
  720. time_point,
  721. config,
  722. } => Assembler::new(id, time_point, config).run(),
  723. CollectionsTasks::DMRCDiagMrd { id, config } => dmr_c_mrd_diag(&id, &config),
  724. }
  725. }
  726. pub fn get_order(&self) -> u8 {
  727. match self {
  728. CollectionsTasks::Align(_) => 0,
  729. CollectionsTasks::DemuxAlign(_) => 1,
  730. CollectionsTasks::ModPileup { .. } => 2,
  731. CollectionsTasks::DMRCDiagMrd { .. } => 3,
  732. CollectionsTasks::CountBam { .. } => 4,
  733. CollectionsTasks::Assemble { .. } => 5,
  734. CollectionsTasks::DeepVariant { .. } => 6,
  735. CollectionsTasks::ClairS { .. } => 7,
  736. CollectionsTasks::NanomonSV { .. } => 8,
  737. CollectionsTasks::SomaticVariants { .. } => 9,
  738. }
  739. }
  740. }
  741. // Implement Display for CollectionsTasks
  742. impl fmt::Display for CollectionsTasks {
  743. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  744. use CollectionsTasks::*;
  745. match self {
  746. Align(case) => write!(
  747. f,
  748. "Alignment task for: {} {} {} {}",
  749. case.id,
  750. case.time_point,
  751. case.barcode,
  752. case.pod_dir.display()
  753. ),
  754. DemuxAlign(cases) => write!(
  755. f,
  756. "Demultiplex and alignment task for: {}",
  757. cases
  758. .iter()
  759. .map(|c| format!("{} {} {}", c.id, c.time_point, c.barcode))
  760. .collect::<Vec<String>>()
  761. .join(", ")
  762. ),
  763. DeepVariant {
  764. id,
  765. time_point,
  766. bam,
  767. ..
  768. } => {
  769. write!(
  770. f,
  771. "DeepVariant calling task for {} {}, from bam: {}",
  772. id, time_point, bam
  773. )
  774. }
  775. ClairS {
  776. id,
  777. diag_bam,
  778. mrd_bam,
  779. ..
  780. } => {
  781. write!(
  782. f,
  783. "ClairS calling task for {}, with diag_bam: {}, mrd_bam: {}",
  784. id, diag_bam, mrd_bam
  785. )
  786. }
  787. NanomonSV { id } => {
  788. write!(f, "NanomonSV calling task for {id}")
  789. }
  790. CountBam {
  791. bam_path,
  792. count_dir,
  793. ..
  794. } => write!(f, "Whole bam count for bam: {bam_path} into {count_dir}"),
  795. SomaticVariants { id, .. } => write!(f, "Variants aggregation for {}", id),
  796. Assemble { id, time_point, .. } => {
  797. write!(f, "De novo assemblage for {} {}", id, time_point)
  798. }
  799. ModPileup { bam, .. } => write!(f, "ModPileup for {}", bam.display()),
  800. DMRCDiagMrd { id, .. } => write!(f, "DMR C methylation diag vs mrd for {id}"),
  801. }
  802. }
  803. }
  804. pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
  805. let mut last_n = Vec::with_capacity(3);
  806. let mut consecutive_same_count = 0;
  807. loop {
  808. let mut collection =
  809. Collections::new(config.clone()).context("Failed to create new Collections")?;
  810. collection.todo().context("Failed to get todo tasks")?;
  811. collection
  812. .tasks
  813. .iter()
  814. .for_each(|t| info!("Planned task: {t}"));
  815. let n_tasks = collection.tasks.len();
  816. if n_tasks == 0 {
  817. info!("All results are up to date");
  818. break;
  819. }
  820. if last_n.len() >= 2 && last_n.iter().rev().take(2).all(|&x| x == n_tasks) {
  821. consecutive_same_count += 1;
  822. if consecutive_same_count >= 2 {
  823. error!("Tasks are not progressing");
  824. break;
  825. }
  826. } else {
  827. consecutive_same_count = 0;
  828. }
  829. last_n.push(n_tasks);
  830. if last_n.len() > 3 {
  831. last_n.remove(0);
  832. }
  833. collection.run().context("Failed to run collection tasks")?;
  834. }
  835. Ok(())
  836. }
  837. pub trait Initialize: Sized {
  838. fn initialize(id: &str, config: Config) -> anyhow::Result<Self>;
  839. }
  840. pub trait InitializeSolo: Sized {
  841. fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self>;
  842. }
  843. pub trait HasOutputs {
  844. fn has_output(&self, id: &str) -> (bool, bool);
  845. }
  846. pub trait Version {
  847. fn version(config: &Config) -> anyhow::Result<String>;
  848. }
  849. pub trait LoadVariants {
  850. fn load_variants(&self) -> anyhow::Result<Variants>;
  851. }
  852. pub fn exists_all(paths: Vec<&str>) -> anyhow::Result<()> {
  853. for path in paths.iter() {
  854. if !Path::new(path).exists() {
  855. anyhow::bail!("{path} should exist")
  856. }
  857. }
  858. Ok(())
  859. }