nanomonsv.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. use rayon::prelude::*;
  2. use std::{
  3. fs::{self},
  4. path::{Path, PathBuf},
  5. thread,
  6. };
  7. use anyhow::Context;
  8. use log::{debug, error, info, warn};
  9. use crate::{
  10. annotation::{Annotation, Annotations, Caller, CallerCat},
  11. collection::{vcf::Vcf, Initialize, InitializeSolo},
  12. commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
  13. config::Config,
  14. io::vcf::read_vcf,
  15. runners::{run_wait, CommandRun, Run, RunReport},
  16. variant::{
  17. variant::{RunnerVariants, Variants},
  18. variant_collection::VariantCollection,
  19. },
  20. };
  21. #[derive(Debug)]
  22. pub struct NanomonSV {
  23. pub id: String,
  24. pub diag_bam: String,
  25. pub mrd_bam: String,
  26. pub diag_out_dir: String,
  27. pub mrd_out_dir: String,
  28. pub log_dir: String,
  29. pub vcf_passed: String,
  30. pub config: Config,
  31. }
  32. impl Initialize for NanomonSV {
  33. fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
  34. let id = id.to_string();
  35. info!("Initialize Nanomonsv for {id}.");
  36. let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
  37. if !Path::new(&log_dir).exists() {
  38. fs::create_dir_all(&log_dir)
  39. .context(format!("Failed to create {log_dir} directory"))?;
  40. }
  41. let diag_bam = config.tumoral_bam(&id);
  42. let mrd_bam = config.normal_bam(&id);
  43. let diag_out_dir = config.nanomonsv_output_dir(&id, "diag");
  44. fs::create_dir_all(&diag_out_dir)?;
  45. let mrd_out_dir = config.nanomonsv_output_dir(&id, "mrd");
  46. fs::create_dir_all(&mrd_out_dir)?;
  47. let vcf_passed = config.nanomonsv_passed_vcf(&id);
  48. Ok(Self {
  49. id,
  50. diag_bam,
  51. mrd_bam,
  52. diag_out_dir,
  53. mrd_out_dir,
  54. log_dir,
  55. vcf_passed,
  56. config,
  57. })
  58. }
  59. }
  60. impl Run for NanomonSV {
  61. fn run(&mut self) -> anyhow::Result<()> {
  62. somatic_parse(self)?;
  63. let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
  64. let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
  65. let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
  66. let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
  67. if !Path::new(&mrd_result_vcf).exists() {
  68. info!("Nanomonsv get from normal bam: {}.", self.mrd_bam);
  69. let report = nanomonsv_get(&self.mrd_bam, &mrd_out_prefix, None, None, &self.config)
  70. .context(format!(
  71. "Error while running NanomonSV get for {mrd_result_vcf}"
  72. ))?;
  73. report.save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))?;
  74. }
  75. if !Path::new(&diag_result_vcf).exists() {
  76. info!("NanomonSV get from tumoral bam: {}.", self.diag_bam);
  77. let report = nanomonsv_get(
  78. &self.diag_bam,
  79. &diag_out_prefix,
  80. Some(&self.mrd_bam),
  81. Some(&mrd_out_prefix),
  82. &self.config,
  83. )
  84. .context(format!(
  85. "Error while running NanomonSV get for {diag_result_vcf}"
  86. ))?;
  87. report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))?;
  88. }
  89. if !Path::new(&self.vcf_passed).exists() {
  90. let report = bcftools_keep_pass(
  91. &diag_result_vcf,
  92. &self.vcf_passed,
  93. BcftoolsConfig::default(),
  94. )
  95. .context(format!("Can't index {}", self.vcf_passed))?;
  96. report
  97. .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,))
  98. .context("Can't save report")?;
  99. }
  100. Ok(())
  101. }
  102. }
  103. impl CallerCat for NanomonSV {
  104. fn caller_cat(&self) -> (Caller, Annotation) {
  105. (Caller::NanomonSV, Annotation::Somatic)
  106. }
  107. }
  108. impl Variants for NanomonSV {
  109. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  110. let (caller, category) = self.caller_cat();
  111. let add = vec![Annotation::Callers(caller.clone()), category.clone()];
  112. info!(
  113. "Loading variants from {} {}: {}",
  114. caller, category, self.vcf_passed
  115. );
  116. let variants = read_vcf(&self.vcf_passed)?;
  117. variants.par_iter().for_each(|v| {
  118. annotations.insert_update(v.hash(), &add);
  119. });
  120. info!(
  121. "{} {}, {} variants loaded.",
  122. caller,
  123. category,
  124. variants.len()
  125. );
  126. Ok(VariantCollection {
  127. variants,
  128. vcf: Vcf::new(self.vcf_passed.clone().into())?,
  129. caller,
  130. category,
  131. })
  132. }
  133. }
  134. impl RunnerVariants for NanomonSV {}
  135. #[derive(Debug)]
  136. pub struct NanomonSVSolo {
  137. pub id: String,
  138. pub bam: String,
  139. pub time_point: String,
  140. pub out_dir: String,
  141. pub log_dir: String,
  142. pub vcf_passed: String,
  143. pub config: Config,
  144. }
  145. impl InitializeSolo for NanomonSVSolo {
  146. fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
  147. let id = id.to_string();
  148. info!("Initialize Nanomonsv solo for {id} {time}.");
  149. let log_dir = format!("{}/{}/log/nanomonsv_solo", config.result_dir, &id);
  150. if !Path::new(&log_dir).exists() {
  151. fs::create_dir_all(&log_dir)
  152. .context(format!("Failed to create {log_dir} directory"))?;
  153. }
  154. let out_dir = config.nanomonsv_solo_output_dir(&id, time);
  155. fs::create_dir_all(&out_dir)?;
  156. let bam = config.solo_bam(&id, time);
  157. let vcf_passed = config.nanomonsv_solo_passed_vcf(&id, time);
  158. Ok(Self {
  159. id,
  160. bam,
  161. time_point: time.to_string(),
  162. out_dir,
  163. log_dir,
  164. vcf_passed,
  165. config,
  166. })
  167. }
  168. }
  169. impl Run for NanomonSVSolo {
  170. fn run(&mut self) -> anyhow::Result<()> {
  171. // Parse
  172. info!("Nanomonsv Parse");
  173. let out_prefix = format!("{}/{}_{}", self.out_dir, self.id, self.time_point);
  174. let info_vcf = format!("{out_prefix}.bp_info.sorted.bed.gz");
  175. if !Path::new(&info_vcf).exists() {
  176. let report = nanomonsv_parse(&self.bam, &out_prefix, &self.config).context(format!(
  177. "Error while running NanomonSV parse for {}",
  178. self.bam
  179. ))?;
  180. let log_file = format!("{}/nanomonsv_solo_parse_{}_", self.log_dir, self.time_point);
  181. report
  182. .save_to_file(&log_file)
  183. .context(format!("Can't write logs into {log_file}"))?;
  184. }
  185. // Get
  186. let result_vcf = format!("{out_prefix}.nanomonsv.result.vcf");
  187. if !Path::new(&result_vcf).exists() {
  188. info!("Nanomonsv Get");
  189. let report = nanomonsv_get(&self.bam, &out_prefix, None, None, &self.config).context(
  190. format!("Error while running NanomonSV get for {result_vcf}"),
  191. )?;
  192. let log_file = format!("{}/nanomonsv_solo_get_{}_", self.log_dir, self.time_point);
  193. report
  194. .save_to_file(&log_file)
  195. .context(format!("Error while writing log into {log_file}"))?;
  196. }
  197. if !Path::new(&self.vcf_passed).exists() {
  198. let report =
  199. bcftools_keep_pass(&result_vcf, &self.vcf_passed, BcftoolsConfig::default())
  200. .context(format!(
  201. "Error while running bcftools keep PASS for {result_vcf}"
  202. ))?;
  203. let log_file = format!("{}/bcftools_pass_", self.log_dir);
  204. report
  205. .save_to_file(&log_file)
  206. .context(format!("Error while writing log into {log_file}"))?;
  207. }
  208. Ok(())
  209. }
  210. }
  211. impl CallerCat for NanomonSVSolo {
  212. fn caller_cat(&self) -> (Caller, Annotation) {
  213. let Config {
  214. normal_name,
  215. tumoral_name,
  216. ..
  217. } = &self.config;
  218. let cat = if *normal_name == self.time_point {
  219. Annotation::SoloConstit
  220. } else if *tumoral_name == self.time_point {
  221. Annotation::SoloTumor
  222. } else {
  223. panic!("Error in time_point name: {}", self.time_point);
  224. };
  225. (Caller::NanomonSVSolo, cat)
  226. }
  227. }
  228. impl RunnerVariants for NanomonSVSolo {}
  229. impl Variants for NanomonSVSolo {
  230. fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
  231. let (caller, category) = self.caller_cat();
  232. let add = vec![Annotation::Callers(caller.clone()), category.clone()];
  233. info!(
  234. "Loading variant from ClairS {} with annotations: {:?}",
  235. self.id, add
  236. );
  237. let variants = read_vcf(&self.vcf_passed)?;
  238. variants.par_iter().for_each(|v| {
  239. let var_cat = v.alteration_category();
  240. annotations.insert_update(
  241. v.hash(),
  242. &vec![
  243. Annotation::Callers(caller.clone()),
  244. category.clone(),
  245. Annotation::AlterationCategory(var_cat),
  246. ],
  247. );
  248. });
  249. Ok(VariantCollection {
  250. variants,
  251. vcf: Vcf::new(self.vcf_passed.clone().into())?,
  252. caller,
  253. category,
  254. })
  255. }
  256. }
  257. // Helper functions
  258. pub fn nanomonsv_parse(bam: &str, out_prefix: &str, config: &Config) -> anyhow::Result<RunReport> {
  259. let args = vec![
  260. "parse",
  261. "--reference_fasta",
  262. &config.reference,
  263. bam,
  264. out_prefix,
  265. ];
  266. let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
  267. let res = run_wait(&mut cmd_run)?;
  268. Ok(res)
  269. }
  270. pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
  271. let diag_out_prefix = format!("{}/{}_diag", s.diag_out_dir, s.id);
  272. let mrd_out_prefix = format!("{}/{}_mrd", s.mrd_out_dir, s.id);
  273. let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
  274. let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
  275. let mut threads_handles = Vec::new();
  276. if !Path::new(&diag_info_vcf).exists() {
  277. let diag_bam = s.diag_bam.clone();
  278. info!("Nanomonsv parse {diag_bam}.");
  279. let diag_out_prefix = diag_out_prefix.clone();
  280. let config = s.config.clone();
  281. let log_dir = s.log_dir.clone();
  282. let handle = thread::spawn(move || {
  283. let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, &config).unwrap();
  284. report
  285. .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_"))
  286. .unwrap();
  287. });
  288. threads_handles.push(handle);
  289. } else {
  290. debug!("Nanonmonsv parse results already exists: {diag_info_vcf}");
  291. }
  292. if !Path::new(&mrd_info_vcf).exists() {
  293. let mrd_bam = s.mrd_bam.clone();
  294. info!("Nanomonsv parse {mrd_bam}.");
  295. let mrd_out_prefix = mrd_out_prefix.clone();
  296. let config = s.config.clone();
  297. let log_dir = s.log_dir.clone();
  298. let handle = thread::spawn(move || {
  299. let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, &config).unwrap();
  300. report
  301. .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_"))
  302. .unwrap();
  303. });
  304. threads_handles.push(handle);
  305. } else {
  306. debug!("Nanonmonsv parse results already exists: {mrd_info_vcf}");
  307. }
  308. // Wait for all threads to finish
  309. for handle in threads_handles {
  310. handle.join().expect("Thread panicked for nanomonsv parse");
  311. }
  312. Ok(())
  313. }
  314. pub fn nanomonsv_get(
  315. bam: &str,
  316. out_prefix: &str,
  317. ctrl_bam: Option<&str>,
  318. ctrl_prefix: Option<&str>,
  319. config: &Config,
  320. ) -> anyhow::Result<RunReport> {
  321. let threads = config.nanomonsv_threads.to_string();
  322. let mut args = vec!["get"];
  323. if let (Some(ctrl_bam), Some(ctrl_prefix)) = (ctrl_bam, ctrl_prefix) {
  324. args.extend(vec![
  325. "--control_prefix",
  326. ctrl_prefix,
  327. "--control_bam",
  328. ctrl_bam,
  329. ])
  330. }
  331. args.extend(vec![
  332. "--process",
  333. &threads,
  334. out_prefix,
  335. bam,
  336. &config.reference,
  337. ]);
  338. let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
  339. let res = run_wait(&mut cmd_run)?;
  340. Ok(res)
  341. }
  342. pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> {
  343. let mut passed_mrd = Vec::new();
  344. for mrd_dir in find_nanomonsv_dirs(&PathBuf::from(&config.result_dir), "mrd", 0, 3) {
  345. let mut passed = None;
  346. let mut passed_csi = None;
  347. let mut result = None;
  348. for entry in fs::read_dir(&mrd_dir)
  349. .with_context(|| format!("Failed to read directory: {}", mrd_dir.display()))?
  350. {
  351. let entry =
  352. entry.with_context(|| format!("Failed to read entry in: {}", mrd_dir.display()))?;
  353. let file_name = entry.file_name().to_string_lossy().to_string();
  354. let path = entry.path();
  355. if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz") {
  356. passed = Some(path);
  357. } else if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz.csi") {
  358. passed_csi = Some(path);
  359. } else if file_name.ends_with("_mrd.nanomonsv.result.vcf") {
  360. result = Some(path);
  361. }
  362. }
  363. match (result, passed.clone(), passed_csi) {
  364. (None, None, None) => error!("No result in {}", mrd_dir.display()),
  365. (Some(p), None, None) => {
  366. let output = replace_filename_suffix(
  367. &p,
  368. "_mrd.nanomonsv.result.vcf",
  369. "_mrd_nanomonsv_PASSED.vcf.gz",
  370. );
  371. info!("Do pass for {} to {}", p.display(), output.display());
  372. if let Err(r) = bcftools_keep_pass(
  373. p.to_str().unwrap(),
  374. output.to_str().unwrap(),
  375. BcftoolsConfig::default(),
  376. ) {
  377. error!("{r}");
  378. } else {
  379. passed_mrd.push(output);
  380. }
  381. }
  382. (Some(_), Some(p), None) => warn!("Do csi for {}", p.display()),
  383. (Some(_), Some(p), Some(_)) => passed_mrd.push(p),
  384. _ => {} // All files found
  385. }
  386. }
  387. println!("{} vcf to concat", passed_mrd.len());
  388. bcftools_concat(
  389. passed_mrd
  390. .iter()
  391. .map(|p| p.to_string_lossy().to_string())
  392. .collect(),
  393. pon_path,
  394. BcftoolsConfig::default(),
  395. )?;
  396. Ok(())
  397. }
  398. pub fn find_nanomonsv_dirs(
  399. root: &Path,
  400. time_point: &str,
  401. depth: usize,
  402. max_depth: usize,
  403. ) -> Vec<PathBuf> {
  404. if depth >= max_depth {
  405. return Vec::new();
  406. }
  407. let entries: Vec<_> = match fs::read_dir(root) {
  408. Ok(entries) => entries.filter_map(Result::ok).collect(),
  409. Err(_) => return Vec::new(),
  410. };
  411. let mut result: Vec<PathBuf> = entries
  412. .iter()
  413. .filter_map(|entry| {
  414. let path = entry.path();
  415. if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false)
  416. && path
  417. .to_string_lossy()
  418. .contains(&format!("{}/nanomonsv", time_point))
  419. {
  420. Some(path)
  421. } else {
  422. None
  423. }
  424. })
  425. .collect();
  426. result.extend(
  427. entries
  428. .iter()
  429. .filter(|entry| entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false))
  430. .flat_map(|dir| find_nanomonsv_dirs(&dir.path(), time_point, depth + 1, max_depth)),
  431. );
  432. result
  433. }
  434. pub fn replace_filename_suffix(path: &Path, from: &str, to: &str) -> PathBuf {
  435. let file_name = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
  436. let new_file_name = file_name.replace(from, to);
  437. path.with_file_name(new_file_name)
  438. }