use std::{ fs::{self, File}, io::{BufRead, BufReader}, path::{Path, PathBuf}, str::FromStr }; use anyhow::Context; use crate::{ collection::InitializeSolo, runners::{run_wait, CommandRun, Run}, }; #[derive(Debug, Clone)] pub struct ModkitConfig { pub bin: String, pub crabz_bin: String, pub tabix_bin: String, pub reference: String, pub cgi: String, pub threads: u8, pub result_dir: String, } impl Default for ModkitConfig { fn default() -> Self { Self { bin: "modkit".to_string(), crabz_bin: "crabz".to_string(), tabix_bin: "tabix".to_string(), reference: "/data/ref/hs1/chm13v2.0.fa".to_string(), cgi: "/data/ref/hs1/chm13v2.0_CGI_corrected.bed".to_string(), threads: 155, result_dir: "/data/longreads_basic_pipe".to_string(), } } } pub fn bed_methyl(bam: PathBuf, config: &ModkitConfig) -> anyhow::Result<()> { let dir = bam .parent() .context(format!("No parent dir for {}", bam.display()))?; let meth_dir = dir.join("5mC_5hmC"); // create modified base dir if !meth_dir.exists() { fs::create_dir(&meth_dir).context(format!("Can't create {}", meth_dir.display()))?; } // let time_point_dir = dir // .parent() // .context(format!("No parent dir for {}", dir.display()))?; let time_point = dir.file_name().unwrap().to_str().unwrap(); let id_dir = dir .parent() .context(format!("No parent dir for {}", dir.display()))?; let id = id_dir.file_name().unwrap().to_str().unwrap(); let pileup_bed = meth_dir.join(format!("{id}_{time_point}_5mC_5hmC_modPileup.bed")); let pileup_bed_str = pileup_bed.to_str().unwrap(); let mut cmd_run = CommandRun::new( &config.bin, &[ "pileup", "-t", &config.threads.to_string(), bam.to_str().unwrap(), pileup_bed_str, "--log-filepath", meth_dir.join("pileup.log").to_str().unwrap(), ], ); let _ = run_wait(&mut cmd_run)?; let mut cmd_compress = CommandRun::new(&config.crabz_bin, &["-I", "-f", "bgzf", pileup_bed_str]); let _ = run_wait(&mut cmd_compress).context(format!( "Error while runnig crabz for {}", pileup_bed.display() ))?; let final_bed = format!("{pileup_bed_str}.gz"); let mut cmd_tabix = CommandRun::new(&config.tabix_bin, &[&final_bed]); let _ = run_wait(&mut cmd_tabix).context(format!("Error while runnig tabix for {final_bed}"))?; Ok(()) } pub fn modkit_dmr( a: &str, b: &str, out: &str, base: &str, config: &ModkitConfig, ) -> anyhow::Result<()> { let args = [ "dmr", "pair", "-m", base, "--ref", &config.reference, "-r", &config.cgi, "-t", &config.threads.to_string(), "-a", a, "-b", b, "-o", out, ]; let mut cmd_run = CommandRun::new(&config.bin, &args); run_wait(&mut cmd_run).context(format!("Error while running modkit {}", args.join(" ")))?; let mut cmd_tabix = CommandRun::new(&config.tabix_bin, &[out]); let _ = run_wait(&mut cmd_tabix).context(format!("Error while runnig tabix for {out}"))?; Ok(()) } pub fn dmr_c_mrd_diag(id: &str, config: &ModkitConfig) -> anyhow::Result<()> { let mrd = format!( "{}/{id}/mrd/5mC_5hmC/{id}_mrd_5mC_5hmC_modPileup.bed.gz", config.result_dir ); let diag = format!( "{}/{id}/diag/5mC_5hmC/{id}_diag_5mC_5hmC_modPileup.bed.gz", config.result_dir ); let out = format!( "{}/{id}/diag/5mC_5hmC/{id}_diag_5mC_5hmC_DMR_C_mrd.bed.gz", config.result_dir ); modkit_dmr(&mrd, &diag, &out, "C", config) } pub struct ModkitSummary { pub id: String, pub time: String, pub bam: String, pub threads: u8, pub log_dir: String, pub summary_file: String, pub result: Option, } impl InitializeSolo for ModkitSummary { fn initialize(id: &str, time: &str, config: crate::config::Config) -> anyhow::Result { let id = id.to_string(); let time = time.to_string(); let log_dir = format!("{}/{}/log/modkit_summary", config.result_dir, id); if !Path::new(&log_dir).exists() { fs::create_dir_all(&log_dir) .context(format!("Failed to create {log_dir} directory"))?; } let bam = config.solo_bam(&id, &time); if !Path::new(&bam).exists() { anyhow::bail!("Required bam file doesn't exist: {bam}"); } let summary_file = config.modkit_summary_file(&id, &time); Ok(ModkitSummary { id, time, bam, log_dir, result: None, summary_file, threads: config.modkit_summary_threads, }) } } impl Run for ModkitSummary { fn run(&mut self) -> anyhow::Result<()> { let modkit_args = [ "summary", "-t", &self.threads.to_string(), &self.bam, ">", &self.summary_file ]; let args = [ "-c", &modkit_args.join(" ") ]; let mut cmd_run = CommandRun::new("bash", &args); let report = run_wait(&mut cmd_run).context(format!( "Error while running `modkit summary {}`", args.join(" ") ))?; let log_file = format!("{}/modkit_summary_", self.log_dir); report .save_to_file(&log_file) .context(format!("Error while writing logs into {log_file}"))?; Ok(()) } } impl ModkitSummary { pub fn load(&mut self) -> anyhow::Result<()> { if !Path::new(&self.summary_file).exists() { self.run()?; } self.result = Some(ModkitSummaryResult::parse_file(&self.summary_file)?); Ok(()) } } #[derive(Debug)] pub struct ModkitSummaryResult { pub base: char, pub total_reads_used: u64, pub count_reads: u64, pub pass_threshold: f64, pub base_data: Vec, } #[derive(Debug)] pub struct ModkitSummaryBase { pub base: char, pub code: char, pub pass_count: u64, pub pass_frac: f64, pub all_count: u64, pub all_frac: f64, } impl ModkitSummaryResult { fn parse_file(filename: &str) -> anyhow::Result { let file = File::open(filename).context("Failed to open file")?; let reader = BufReader::new(file); let mut lines = reader.lines(); let base = lines .next() .context("Missing base line")? .context("Failed to read base line")? .split_whitespace() .last() .context("Invalid base line")? .chars() .next() .context("Invalid base character")?; let total_reads_used = parse_value(&mut lines, "total_reads_used")?; let count_reads = parse_value(&mut lines, "count_reads_C")?; let pass_threshold = parse_value(&mut lines, "pass_threshold_C")?; lines.next(); // Skip header line let mut base_data = Vec::new(); for line in lines { let line = line.context("Failed to read line")?; let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() == 6 { base_data.push(ModkitSummaryBase { base: parts[0].chars().next().context("Invalid base character")?, code: parts[1].chars().next().context("Invalid code character")?, pass_count: u64::from_str(parts[2]).context("Invalid pass_count")?, pass_frac: f64::from_str(parts[3]).context("Invalid pass_frac")?, all_count: u64::from_str(parts[4]).context("Invalid all_count")?, all_frac: f64::from_str(parts[5]).context("Invalid all_frac")?, }); } } Ok(ModkitSummaryResult { base, total_reads_used, count_reads, pass_threshold, base_data, }) } } fn parse_value( lines: &mut std::io::Lines>, key: &str, ) -> anyhow::Result where T::Err: std::error::Error + Send + Sync + 'static, { let line = lines .next() .context(format!("Missing {} line", key))? .context(format!("Failed to read {} line", key))?; let value = line .split_whitespace() .last() .context(format!("Invalid {} line", key))?; T::from_str(value).context(format!("Failed to parse {} value", key)) }