| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- use std::{
- collections::HashMap,
- fmt,
- fs::{self, metadata},
- path::{Path, PathBuf},
- time::SystemTime,
- };
- use anyhow::Context;
- use chrono::{DateTime, Utc};
- use glob::glob;
- use log::{info, warn};
- use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
- use crate::{
- callers::{
- clairs::{ClairS, ClairSConfig},
- deep_variant::{DeepVariant, DeepVariantConfig},
- nanomonsv::{NanomonSV, NanomonSVConfig},
- },
- collection::pod5::FlowCellCase,
- commands::dorado::Dorado as BasecallAlign,
- config::Config,
- functions::{
- assembler::{Assembler, AssemblerConfig},
- variants::{Variants, VariantsConfig},
- whole_scan::{WholeScan, WholeScanConfig},
- },
- };
- pub mod bam;
- pub mod pod5;
- pub mod variants;
- pub mod vcf;
- #[derive(Debug, Clone)]
- pub struct CollectionsConfig {
- pub pod_dir: String,
- pub corrected_fc_path: String,
- pub result_dir: String,
- pub dict_file: String,
- pub min_diag_cov: f32,
- pub min_mrd_cov: f32,
- }
- impl Default for CollectionsConfig {
- fn default() -> Self {
- Self {
- pod_dir: "/data/run_data".to_string(),
- corrected_fc_path: "/data/flow_cells.tsv".to_string(),
- result_dir: "/data/longreads_basic_pipe".to_string(),
- dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(),
- min_diag_cov: 12.0,
- min_mrd_cov: 10.0,
- }
- }
- }
- #[derive(Debug)]
- pub struct Collections {
- pub config: CollectionsConfig,
- pub pod5: Pod5Collection,
- pub bam: BamCollection,
- pub vcf: VcfCollection,
- pub tasks: Vec<CollectionsTasks>,
- }
- impl Collections {
- pub fn new(config: CollectionsConfig) -> anyhow::Result<Self> {
- let CollectionsConfig {
- pod_dir,
- corrected_fc_path,
- result_dir,
- ..
- } = &config;
- let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
- let bam = BamCollection::new(result_dir);
- let vcf = VcfCollection::new(result_dir);
- Ok(Self {
- pod5,
- bam,
- vcf,
- tasks: Vec::new(),
- config,
- })
- }
- pub fn todo(&mut self) -> anyhow::Result<()> {
- info!("Looking for base calling tasks...");
- let mut tasks = Vec::new();
- let mut to_demux = Vec::new();
- for run in self.pod5.runs.iter() {
- for fc in run.flowcells.iter() {
- let acq_id = fc.pod5_info.acquisition_id.clone();
- for case in fc.cases.iter() {
- let bams_ids: Vec<String> = self
- .bam
- .get(&case.id, &case.time_point)
- .iter()
- .flat_map(|b| {
- b.composition
- .iter()
- .map(|c| c.0.clone())
- .collect::<Vec<String>>()
- })
- .filter(|id| *id == acq_id)
- .collect();
- if bams_ids.is_empty() {
- match fc.pod5_type {
- pod5::Pod5Type::Raw => to_demux.push(case.clone()),
- pod5::Pod5Type::Demuxed => {
- tasks.push(CollectionsTasks::Align(case.clone()))
- }
- }
- }
- }
- }
- }
- // Group for muxed and push task with all cases
- let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
- for case in to_demux {
- grouped.entry(case.pod_dir.clone()).or_default().push(case);
- }
- grouped
- .into_values()
- .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
- // Whole scan
- for bam in self
- .bam
- .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
- {
- let config = WholeScanConfig::default();
- let scan_dir = format!(
- "{}/{}/{}/{}",
- &config.result_dir, bam.id, bam.time_point, config.scan_dir
- );
- if PathBuf::from(&scan_dir).exists() {
- let dir_mod: DateTime<Utc> = fs::metadata(&scan_dir)?.modified()?.into();
- if bam.modified > dir_mod {
- fs::remove_dir_all(&scan_dir)?;
- }
- }
- if !PathBuf::from(&scan_dir).exists() {
- tasks.push(CollectionsTasks::WholeScan {
- id: bam.id,
- time_point: bam.time_point,
- bam: bam
- .path
- .to_str()
- .context("Cant convert path to string")?
- .to_string(),
- config: WholeScanConfig::default(),
- });
- }
- }
- // Remove VCF anterior to BAM
- let vcf_by_id = self.vcf.group_by_id();
- vcf_by_id.iter().for_each(|(id, vcfs)| {
- if let (Some(diag), Some(mrd)) = (
- self.bam.get(id, "diag").first(),
- self.bam.get(id, "mrd").first(),
- ) {
- let diag_modified = diag.modified;
- let mrd_modified = mrd.modified;
- let mut rm_paths: Vec<&Path> = vcfs
- .iter()
- .flat_map(|vcf| {
- let vcf_mod: DateTime<Utc> = vcf
- .file_metadata
- .modified()
- .expect("Can't read VCF modified time.")
- .into();
- // For somatic caller erase if one bam (diag or mrd) is more recent.
- if vcf.caller != "DeepVariant" {
- if vcf_mod < diag_modified || vcf_mod < mrd_modified {
- vec![vcf.path.parent().unwrap()]
- } else {
- vec![]
- }
- } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
- || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
- {
- vec![vcf.path.parent().unwrap()]
- } else {
- vec![]
- }
- })
- .collect();
- rm_paths.sort();
- rm_paths.dedup();
- rm_paths
- .iter()
- .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
- }
- });
- // Variant calling
- info!("Looking for variant calling tasks...");
- self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
- if let (Some(diag), Some(mrd)) = (
- self.bam.get(&id, "diag").first(),
- self.bam.get(&id, "mrd").first(),
- ) {
- if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
- if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
- && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
- {
- let caller_time: Vec<(&str, &str)> = vcf_by_id
- .iter()
- .filter(|(i, _)| *i == id)
- .flat_map(|(_, vcfs)| {
- vcfs.iter()
- .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
- })
- .collect();
- if !caller_time.contains(&("clairs", "diag"))
- || !caller_time.contains(&("clairs_indel", "diag"))
- {
- tasks.push(CollectionsTasks::ClairS {
- id: id.to_string(),
- diag_bam: diag.path.to_str().unwrap().to_string(),
- mrd_bam: mrd.path.to_str().unwrap().to_string(),
- config: ClairSConfig::default(),
- });
- }
- if !caller_time.contains(&("DeepVariant", "diag")) {
- tasks.push(CollectionsTasks::DeepVariant {
- id: id.to_string(),
- time_point: "diag".to_string(),
- bam: diag.path.to_str().unwrap().to_string(),
- config: DeepVariantConfig::default(),
- });
- }
- if !caller_time.contains(&("DeepVariant", "mrd")) {
- tasks.push(CollectionsTasks::DeepVariant {
- id: id.to_string(),
- time_point: "mrd".to_string(),
- bam: mrd.path.to_str().unwrap().to_string(),
- config: DeepVariantConfig::default(),
- });
- }
- if !caller_time.contains(&("nanomonsv", "diag")) {
- tasks.push(CollectionsTasks::NanomonSV {
- id: id.to_string(),
- diag_bam: diag.path.to_str().unwrap().to_string(),
- mrd_bam: mrd.path.to_str().unwrap().to_string(),
- config: NanomonSVConfig::default(),
- });
- }
- }
- }
- }
- });
- // Variants aggregation
- // info!("Looking for variants aggregation tasks...");
- // self.bam.bams.iter().filter(|b| b.time_point == "diag" ).for_each(|bam| {
- // let id = bam.id;
- // });
- // de novo
- tasks.extend(self.todo_assembler()?);
- // Tasks sorting and dedup
- let mut hs = HashMap::new();
- tasks.into_iter().for_each(|t| {
- hs.insert(t.to_string(), t);
- });
- self.tasks = hs.into_values().collect();
- Ok(())
- }
- pub fn tasks_dedup(&mut self) {
- let mut hs = HashMap::new();
- self.tasks.clone().into_iter().for_each(|t| {
- hs.insert(t.to_string(), t);
- });
- self.tasks = hs.into_values().collect();
- }
- pub fn todo_assembler(&mut self) -> anyhow::Result<Vec<CollectionsTasks>> {
- let mut tasks = Vec::new();
- let config = AssemblerConfig::default();
- for b in &self.bam.bams {
- let assemblies_dir = format!(
- "{}/{}/{}/{}",
- config.result_dir, b.id, b.time_point, config.output_dir_name
- );
- if !Path::new(&assemblies_dir).exists() {
- tasks.push(CollectionsTasks::Assemble {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- config: config.clone(),
- });
- continue;
- }
- let pattern = format!("{assemblies_dir}/*/*.bam");
- let mut mtimes: Vec<SystemTime> = glob(&pattern)?
- .filter_map(|entry| entry.ok())
- .filter_map(|path| metadata(path).ok()?.modified().ok())
- .collect();
- if mtimes.is_empty() {
- tasks.push(CollectionsTasks::Assemble {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- config: config.clone(),
- });
- continue;
- }
- mtimes.sort_unstable();
- mtimes.dedup();
- let max_mtime: DateTime<Utc> =
- mtimes.last().context("No modified time")?.to_owned().into();
- if b.modified > max_mtime {
- tasks.push(CollectionsTasks::Assemble {
- id: b.id.clone(),
- time_point: b.time_point.clone(),
- config: config.clone(),
- });
- }
- }
- Ok(tasks)
- }
- pub fn run(&mut self) -> anyhow::Result<()> {
- // self.tasks.reverse();
- if self.tasks.is_empty() {
- self.todo()?;
- if self.tasks.is_empty() {
- return Ok(());
- } else {
- self.run()?;
- }
- } else {
- let n_tasks = self.tasks.len();
- warn!("{n_tasks} tasks to run");
- let mut i = 1;
- while let Some(task) = self.tasks.pop() {
- warn!("Running {i}/{n_tasks}");
- info!("{task:#?}");
- task.run()?;
- i += 1;
- }
- }
- Ok(())
- }
- }
- use strum_macros::EnumOrd;
- #[derive(Debug, Clone, EnumOrd)]
- pub enum CollectionsTasks {
- Align(FlowCellCase),
- DemuxAlign(Vec<FlowCellCase>),
- WholeScan {
- id: String,
- time_point: String,
- bam: String,
- config: WholeScanConfig,
- },
- Assemble {
- id: String,
- time_point: String,
- config: AssemblerConfig,
- },
- DeepVariant {
- id: String,
- time_point: String,
- bam: String,
- config: DeepVariantConfig,
- },
- ClairS {
- id: String,
- diag_bam: String,
- mrd_bam: String,
- config: ClairSConfig,
- },
- NanomonSV {
- id: String,
- diag_bam: String,
- mrd_bam: String,
- config: NanomonSVConfig,
- },
- Variants {
- id: String,
- config: VariantsConfig,
- },
- }
- impl CollectionsTasks {
- pub fn run(self) -> anyhow::Result<()> {
- match self {
- CollectionsTasks::Align(case) => {
- BasecallAlign::init(case.clone(), Config::default())?.run_pipe()?;
- }
- CollectionsTasks::DemuxAlign(cases) => {
- BasecallAlign::from_mux(cases, Config::default())?;
- }
- CollectionsTasks::DeepVariant {
- id,
- time_point,
- bam,
- config,
- } => {
- DeepVariant::new(&id, &time_point, &bam, config).run();
- }
- CollectionsTasks::ClairS {
- id,
- diag_bam,
- mrd_bam,
- config,
- } => {
- ClairS::new(&id, &diag_bam, &mrd_bam, config).run();
- }
- CollectionsTasks::NanomonSV {
- id,
- diag_bam,
- mrd_bam,
- config,
- } => {
- NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run();
- }
- CollectionsTasks::WholeScan {
- id,
- time_point,
- bam,
- config,
- } => {
- WholeScan::new(id, time_point, bam, config)?.run()?;
- }
- CollectionsTasks::Variants { id, config } => {
- Variants::new(id, config).run()?;
- }
- CollectionsTasks::Assemble {
- id,
- time_point,
- config,
- } => {
- Assembler::new(id, time_point, config).run()?;
- }
- }
- Ok(())
- }
- }
- // Implement Display for CollectionsTasks
- impl fmt::Display for CollectionsTasks {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- use CollectionsTasks::*;
- match self {
- Align(case) => write!(f, "Align task with: {:#?}", case),
- DemuxAlign(cases) => write!(f, "DemuxAlign task with: {:#?}", cases),
- DeepVariant {
- id,
- time_point,
- bam,
- ..
- } => {
- write!(
- f,
- "DeepVariant task with id: {}, time_point: {}, bam: {}",
- id, time_point, bam
- )
- }
- ClairS {
- id,
- diag_bam,
- mrd_bam,
- ..
- } => {
- write!(
- f,
- "ClairS task with id: {}, diag_bam: {}, mrd_bam: {}",
- id, diag_bam, mrd_bam
- )
- }
- NanomonSV {
- id,
- diag_bam,
- mrd_bam,
- ..
- } => {
- write!(
- f,
- "NanomonSV task with id: {}, diag_bam: {}, mrd_bam: {}",
- id, diag_bam, mrd_bam
- )
- }
- WholeScan { id, bam, .. } => write!(f, "Whole scan for id: {}, bam: {}", id, bam),
- Variants { id, .. } => write!(f, "Variants aggregation for id: {}", id),
- Assemble { id, time_point, .. } => {
- write!(f, "Assembly for id: {}, time point: {}", id, time_point)
- }
- }
- }
- }
- pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
- let mut last_n = Vec::new();
- loop {
- let mut collection = Collections::new(config.clone())?;
- collection.todo()?;
- if collection.tasks.is_empty() {
- warn!("All results are update");
- break;
- }
- let n_tasks = collection.tasks.len();
- warn!("{n_tasks} tasks to run");
- if last_n.len() > 2
- && last_n[last_n.len() - 1] == n_tasks
- && last_n[last_n.len() - 2] == n_tasks
- {
- warn!("Tasks don't progress");
- break;
- }
- last_n.push(n_tasks);
- collection.run()?;
- }
- Ok(())
- }
|