modkit.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. use std::{
  2. fs::{self, File}, io::{BufRead, BufReader}, path::{Path, PathBuf}, str::FromStr
  3. };
  4. use anyhow::Context;
  5. use crate::{
  6. collection::InitializeSolo,
  7. runners::{run_wait, CommandRun, Run},
  8. };
  9. #[derive(Debug, Clone)]
  10. pub struct ModkitConfig {
  11. pub bin: String,
  12. pub crabz_bin: String,
  13. pub tabix_bin: String,
  14. pub reference: String,
  15. pub cgi: String,
  16. pub threads: u8,
  17. pub result_dir: String,
  18. }
  19. impl Default for ModkitConfig {
  20. fn default() -> Self {
  21. Self {
  22. bin: "modkit".to_string(),
  23. crabz_bin: "crabz".to_string(),
  24. tabix_bin: "tabix".to_string(),
  25. reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
  26. cgi: "/data/ref/hs1/chm13v2.0_CGI_corrected.bed".to_string(),
  27. threads: 155,
  28. result_dir: "/data/longreads_basic_pipe".to_string(),
  29. }
  30. }
  31. }
  32. pub fn bed_methyl(bam: PathBuf, config: &ModkitConfig) -> anyhow::Result<()> {
  33. let dir = bam
  34. .parent()
  35. .context(format!("No parent dir for {}", bam.display()))?;
  36. let meth_dir = dir.join("5mC_5hmC");
  37. // create modified base dir
  38. if !meth_dir.exists() {
  39. fs::create_dir(&meth_dir).context(format!("Can't create {}", meth_dir.display()))?;
  40. }
  41. // let time_point_dir = dir
  42. // .parent()
  43. // .context(format!("No parent dir for {}", dir.display()))?;
  44. let time_point = dir.file_name().unwrap().to_str().unwrap();
  45. let id_dir = dir
  46. .parent()
  47. .context(format!("No parent dir for {}", dir.display()))?;
  48. let id = id_dir.file_name().unwrap().to_str().unwrap();
  49. let pileup_bed = meth_dir.join(format!("{id}_{time_point}_5mC_5hmC_modPileup.bed"));
  50. let pileup_bed_str = pileup_bed.to_str().unwrap();
  51. let mut cmd_run = CommandRun::new(
  52. &config.bin,
  53. &[
  54. "pileup",
  55. "-t",
  56. &config.threads.to_string(),
  57. bam.to_str().unwrap(),
  58. pileup_bed_str,
  59. "--log-filepath",
  60. meth_dir.join("pileup.log").to_str().unwrap(),
  61. ],
  62. );
  63. let _ = run_wait(&mut cmd_run)?;
  64. let mut cmd_compress =
  65. CommandRun::new(&config.crabz_bin, &["-I", "-f", "bgzf", pileup_bed_str]);
  66. let _ = run_wait(&mut cmd_compress).context(format!(
  67. "Error while runnig crabz for {}",
  68. pileup_bed.display()
  69. ))?;
  70. let final_bed = format!("{pileup_bed_str}.gz");
  71. let mut cmd_tabix = CommandRun::new(&config.tabix_bin, &[&final_bed]);
  72. let _ =
  73. run_wait(&mut cmd_tabix).context(format!("Error while runnig tabix for {final_bed}"))?;
  74. Ok(())
  75. }
  76. pub fn modkit_dmr(
  77. a: &str,
  78. b: &str,
  79. out: &str,
  80. base: &str,
  81. config: &ModkitConfig,
  82. ) -> anyhow::Result<()> {
  83. let args = [
  84. "dmr",
  85. "pair",
  86. "-m",
  87. base,
  88. "--ref",
  89. &config.reference,
  90. "-r",
  91. &config.cgi,
  92. "-t",
  93. &config.threads.to_string(),
  94. "-a",
  95. a,
  96. "-b",
  97. b,
  98. "-o",
  99. out,
  100. ];
  101. let mut cmd_run = CommandRun::new(&config.bin, &args);
  102. run_wait(&mut cmd_run).context(format!("Error while running modkit {}", args.join(" ")))?;
  103. let mut cmd_tabix = CommandRun::new(&config.tabix_bin, &[out]);
  104. let _ = run_wait(&mut cmd_tabix).context(format!("Error while runnig tabix for {out}"))?;
  105. Ok(())
  106. }
  107. pub fn dmr_c_mrd_diag(id: &str, config: &ModkitConfig) -> anyhow::Result<()> {
  108. let mrd = format!(
  109. "{}/{id}/mrd/5mC_5hmC/{id}_mrd_5mC_5hmC_modPileup.bed.gz",
  110. config.result_dir
  111. );
  112. let diag = format!(
  113. "{}/{id}/diag/5mC_5hmC/{id}_diag_5mC_5hmC_modPileup.bed.gz",
  114. config.result_dir
  115. );
  116. let out = format!(
  117. "{}/{id}/diag/5mC_5hmC/{id}_diag_5mC_5hmC_DMR_C_mrd.bed.gz",
  118. config.result_dir
  119. );
  120. modkit_dmr(&mrd, &diag, &out, "C", config)
  121. }
  122. pub struct ModkitSummary {
  123. pub id: String,
  124. pub time: String,
  125. pub bam: String,
  126. pub threads: u8,
  127. pub log_dir: String,
  128. pub summary_file: String,
  129. pub result: Option<ModkitSummaryResult>,
  130. }
  131. impl InitializeSolo for ModkitSummary {
  132. fn initialize(id: &str, time: &str, config: crate::config::Config) -> anyhow::Result<Self> {
  133. let id = id.to_string();
  134. let time = time.to_string();
  135. let log_dir = format!("{}/{}/log/modkit_summary", config.result_dir, id);
  136. if !Path::new(&log_dir).exists() {
  137. fs::create_dir_all(&log_dir)
  138. .context(format!("Failed to create {log_dir} directory"))?;
  139. }
  140. let bam = config.solo_bam(&id, &time);
  141. if !Path::new(&bam).exists() {
  142. anyhow::bail!("Required bam file doesn't exist: {bam}");
  143. }
  144. let summary_file = config.modkit_summary_file(&id, &time);
  145. Ok(ModkitSummary {
  146. id,
  147. time,
  148. bam,
  149. log_dir,
  150. result: None,
  151. summary_file,
  152. threads: config.modkit_summary_threads,
  153. })
  154. }
  155. }
  156. impl Run for ModkitSummary {
  157. fn run(&mut self) -> anyhow::Result<()> {
  158. let modkit_args = [
  159. "summary",
  160. "-t",
  161. &self.threads.to_string(),
  162. &self.bam,
  163. ">",
  164. &self.summary_file
  165. ];
  166. let args = [
  167. "-c",
  168. &modkit_args.join(" ")
  169. ];
  170. let mut cmd_run = CommandRun::new("bash", &args);
  171. let report = run_wait(&mut cmd_run).context(format!(
  172. "Error while running `modkit summary {}`",
  173. args.join(" ")
  174. ))?;
  175. let log_file = format!("{}/modkit_summary_", self.log_dir);
  176. report
  177. .save_to_file(&log_file)
  178. .context(format!("Error while writing logs into {log_file}"))?;
  179. Ok(())
  180. }
  181. }
  182. impl ModkitSummary {
  183. pub fn load(&mut self) -> anyhow::Result<()> {
  184. if !Path::new(&self.summary_file).exists() {
  185. self.run()?;
  186. }
  187. self.result = Some(ModkitSummaryResult::parse_file(&self.summary_file)?);
  188. Ok(())
  189. }
  190. }
  191. #[derive(Debug)]
  192. pub struct ModkitSummaryResult {
  193. pub base: char,
  194. pub total_reads_used: u64,
  195. pub count_reads: u64,
  196. pub pass_threshold: f64,
  197. pub base_data: Vec<ModkitSummaryBase>,
  198. }
  199. #[derive(Debug)]
  200. pub struct ModkitSummaryBase {
  201. pub base: char,
  202. pub code: char,
  203. pub pass_count: u64,
  204. pub pass_frac: f64,
  205. pub all_count: u64,
  206. pub all_frac: f64,
  207. }
  208. impl ModkitSummaryResult {
  209. fn parse_file(filename: &str) -> anyhow::Result<Self> {
  210. let file = File::open(filename).context("Failed to open file")?;
  211. let reader = BufReader::new(file);
  212. let mut lines = reader.lines();
  213. let base = lines
  214. .next()
  215. .context("Missing base line")?
  216. .context("Failed to read base line")?
  217. .split_whitespace()
  218. .last()
  219. .context("Invalid base line")?
  220. .chars()
  221. .next()
  222. .context("Invalid base character")?;
  223. let total_reads_used = parse_value(&mut lines, "total_reads_used")?;
  224. let count_reads = parse_value(&mut lines, "count_reads_C")?;
  225. let pass_threshold = parse_value(&mut lines, "pass_threshold_C")?;
  226. lines.next(); // Skip header line
  227. let mut base_data = Vec::new();
  228. for line in lines {
  229. let line = line.context("Failed to read line")?;
  230. let parts: Vec<&str> = line.split_whitespace().collect();
  231. if parts.len() == 6 {
  232. base_data.push(ModkitSummaryBase {
  233. base: parts[0].chars().next().context("Invalid base character")?,
  234. code: parts[1].chars().next().context("Invalid code character")?,
  235. pass_count: u64::from_str(parts[2]).context("Invalid pass_count")?,
  236. pass_frac: f64::from_str(parts[3]).context("Invalid pass_frac")?,
  237. all_count: u64::from_str(parts[4]).context("Invalid all_count")?,
  238. all_frac: f64::from_str(parts[5]).context("Invalid all_frac")?,
  239. });
  240. }
  241. }
  242. Ok(ModkitSummaryResult {
  243. base,
  244. total_reads_used,
  245. count_reads,
  246. pass_threshold,
  247. base_data,
  248. })
  249. }
  250. }
  251. fn parse_value<T: FromStr>(
  252. lines: &mut std::io::Lines<BufReader<File>>,
  253. key: &str,
  254. ) -> anyhow::Result<T>
  255. where
  256. T::Err: std::error::Error + Send + Sync + 'static,
  257. {
  258. let line = lines
  259. .next()
  260. .context(format!("Missing {} line", key))?
  261. .context(format!("Failed to read {} line", key))?;
  262. let value = line
  263. .split_whitespace()
  264. .last()
  265. .context(format!("Invalid {} line", key))?;
  266. T::from_str(value).context(format!("Failed to parse {} value", key))
  267. }