| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502 |
- use rayon::prelude::*;
- use std::{
- fs::{self},
- path::{Path, PathBuf},
- thread,
- };
- use anyhow::Context;
- use log::{debug, error, info, warn};
- use crate::{
- annotation::{Annotation, Annotations, Caller, CallerCat},
- collection::{vcf::Vcf, Initialize, InitializeSolo},
- commands::bcftools::{bcftools_concat, bcftools_keep_pass, BcftoolsConfig},
- config::Config,
- io::vcf::read_vcf,
- runners::{run_wait, CommandRun, Run, RunReport},
- variant::{
- variant::{RunnerVariants, Variants},
- variant_collection::VariantCollection,
- },
- };
- #[derive(Debug)]
- pub struct NanomonSV {
- pub id: String,
- pub diag_bam: String,
- pub mrd_bam: String,
- pub diag_out_dir: String,
- pub mrd_out_dir: String,
- pub log_dir: String,
- pub vcf_passed: String,
- pub config: Config,
- }
- impl Initialize for NanomonSV {
- fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- info!("Initialize Nanomonsv for {id}.");
- let log_dir = format!("{}/{}/log/clairs", config.result_dir, &id);
- if !Path::new(&log_dir).exists() {
- fs::create_dir_all(&log_dir)
- .context(format!("Failed to create {log_dir} directory"))?;
- }
- let diag_bam = config.tumoral_bam(&id);
- let mrd_bam = config.normal_bam(&id);
- let diag_out_dir = config.nanomonsv_output_dir(&id, "diag");
- fs::create_dir_all(&diag_out_dir)?;
- let mrd_out_dir = config.nanomonsv_output_dir(&id, "mrd");
- fs::create_dir_all(&mrd_out_dir)?;
- let vcf_passed = config.nanomonsv_passed_vcf(&id);
- Ok(Self {
- id,
- diag_bam,
- mrd_bam,
- diag_out_dir,
- mrd_out_dir,
- log_dir,
- vcf_passed,
- config,
- })
- }
- }
- impl Run for NanomonSV {
- fn run(&mut self) -> anyhow::Result<()> {
- somatic_parse(self)?;
- let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
- let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
- let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
- let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
- if !Path::new(&mrd_result_vcf).exists() {
- info!("Nanomonsv get from normal bam: {}.", self.mrd_bam);
- let report = nanomonsv_get(&self.mrd_bam, &mrd_out_prefix, None, None, &self.config)
- .context(format!(
- "Error while running NanomonSV get for {mrd_result_vcf}"
- ))?;
- report.save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))?;
- }
- if !Path::new(&diag_result_vcf).exists() {
- info!("NanomonSV get from tumoral bam: {}.", self.diag_bam);
- let report = nanomonsv_get(
- &self.diag_bam,
- &diag_out_prefix,
- Some(&self.mrd_bam),
- Some(&mrd_out_prefix),
- &self.config,
- )
- .context(format!(
- "Error while running NanomonSV get for {diag_result_vcf}"
- ))?;
- report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))?;
- }
- if !Path::new(&self.vcf_passed).exists() {
- let report = bcftools_keep_pass(
- &diag_result_vcf,
- &self.vcf_passed,
- BcftoolsConfig::default(),
- )
- .context(format!("Can't index {}", self.vcf_passed))?;
- report
- .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,))
- .context("Can't save report")?;
- }
- Ok(())
- }
- }
- impl CallerCat for NanomonSV {
- fn caller_cat(&self) -> (Caller, Annotation) {
- (Caller::NanomonSV, Annotation::Somatic)
- }
- }
- impl Variants for NanomonSV {
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let (caller, category) = self.caller_cat();
- let add = vec![Annotation::Callers(caller.clone()), category.clone()];
- info!(
- "Loading variants from {} {}: {}",
- caller, category, self.vcf_passed
- );
- let variants = read_vcf(&self.vcf_passed)?;
- variants.par_iter().for_each(|v| {
- annotations.insert_update(v.hash(), &add);
- });
- info!(
- "{} {}, {} variants loaded.",
- caller,
- category,
- variants.len()
- );
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(self.vcf_passed.clone().into())?,
- caller,
- category,
- })
- }
- }
- impl RunnerVariants for NanomonSV {}
- #[derive(Debug)]
- pub struct NanomonSVSolo {
- pub id: String,
- pub bam: String,
- pub time_point: String,
- pub out_dir: String,
- pub log_dir: String,
- pub vcf_passed: String,
- pub config: Config,
- }
- impl InitializeSolo for NanomonSVSolo {
- fn initialize(id: &str, time: &str, config: Config) -> anyhow::Result<Self> {
- let id = id.to_string();
- info!("Initialize Nanomonsv solo for {id} {time}.");
- let log_dir = format!("{}/{}/log/nanomonsv_solo", config.result_dir, &id);
- if !Path::new(&log_dir).exists() {
- fs::create_dir_all(&log_dir)
- .context(format!("Failed to create {log_dir} directory"))?;
- }
- let out_dir = config.nanomonsv_solo_output_dir(&id, time);
- fs::create_dir_all(&out_dir)?;
- let bam = config.solo_bam(&id, time);
- let vcf_passed = config.nanomonsv_solo_passed_vcf(&id, time);
- Ok(Self {
- id,
- bam,
- time_point: time.to_string(),
- out_dir,
- log_dir,
- vcf_passed,
- config,
- })
- }
- }
- impl Run for NanomonSVSolo {
- fn run(&mut self) -> anyhow::Result<()> {
- // Parse
- info!("Nanomonsv Parse");
- let out_prefix = format!("{}/{}_{}", self.out_dir, self.id, self.time_point);
- let info_vcf = format!("{out_prefix}.bp_info.sorted.bed.gz");
- if !Path::new(&info_vcf).exists() {
- let report = nanomonsv_parse(&self.bam, &out_prefix, &self.config).context(format!(
- "Error while running NanomonSV parse for {}",
- self.bam
- ))?;
- let log_file = format!("{}/nanomonsv_solo_parse_{}_", self.log_dir, self.time_point);
- report
- .save_to_file(&log_file)
- .context(format!("Can't write logs into {log_file}"))?;
- }
- // Get
- let result_vcf = format!("{out_prefix}.nanomonsv.result.vcf");
- if !Path::new(&result_vcf).exists() {
- info!("Nanomonsv Get");
- let report = nanomonsv_get(&self.bam, &out_prefix, None, None, &self.config).context(
- format!("Error while running NanomonSV get for {result_vcf}"),
- )?;
- let log_file = format!("{}/nanomonsv_solo_get_{}_", self.log_dir, self.time_point);
- report
- .save_to_file(&log_file)
- .context(format!("Error while writing log into {log_file}"))?;
- }
- if !Path::new(&self.vcf_passed).exists() {
- let report =
- bcftools_keep_pass(&result_vcf, &self.vcf_passed, BcftoolsConfig::default())
- .context(format!(
- "Error while running bcftools keep PASS for {result_vcf}"
- ))?;
- let log_file = format!("{}/bcftools_pass_", self.log_dir);
- report
- .save_to_file(&log_file)
- .context(format!("Error while writing log into {log_file}"))?;
- }
- Ok(())
- }
- }
- impl CallerCat for NanomonSVSolo {
- fn caller_cat(&self) -> (Caller, Annotation) {
- let Config {
- normal_name,
- tumoral_name,
- ..
- } = &self.config;
- let cat = if *normal_name == self.time_point {
- Annotation::SoloConstit
- } else if *tumoral_name == self.time_point {
- Annotation::SoloTumor
- } else {
- panic!("Error in time_point name: {}", self.time_point);
- };
- (Caller::NanomonSVSolo, cat)
- }
- }
- impl RunnerVariants for NanomonSVSolo {}
- impl Variants for NanomonSVSolo {
- fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
- let (caller, category) = self.caller_cat();
- let add = vec![Annotation::Callers(caller.clone()), category.clone()];
- info!(
- "Loading variant from ClairS {} with annotations: {:?}",
- self.id, add
- );
- let variants = read_vcf(&self.vcf_passed)?;
- variants.par_iter().for_each(|v| {
- let var_cat = v.alteration_category();
- annotations.insert_update(
- v.hash(),
- &vec![
- Annotation::Callers(caller.clone()),
- category.clone(),
- Annotation::AlterationCategory(var_cat),
- ],
- );
- });
- Ok(VariantCollection {
- variants,
- vcf: Vcf::new(self.vcf_passed.clone().into())?,
- caller,
- category,
- })
- }
- }
- // Helper functions
- pub fn nanomonsv_parse(bam: &str, out_prefix: &str, config: &Config) -> anyhow::Result<RunReport> {
- let args = vec![
- "parse",
- "--reference_fasta",
- &config.reference,
- bam,
- out_prefix,
- ];
- let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
- let res = run_wait(&mut cmd_run)?;
- Ok(res)
- }
- pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
- let diag_out_prefix = format!("{}/{}_diag", s.diag_out_dir, s.id);
- let mrd_out_prefix = format!("{}/{}_mrd", s.mrd_out_dir, s.id);
- let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
- let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
- let mut threads_handles = Vec::new();
- if !Path::new(&diag_info_vcf).exists() {
- let diag_bam = s.diag_bam.clone();
- info!("Nanomonsv parse {diag_bam}.");
- let diag_out_prefix = diag_out_prefix.clone();
- let config = s.config.clone();
- let log_dir = s.log_dir.clone();
- let handle = thread::spawn(move || {
- let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, &config).unwrap();
- report
- .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_"))
- .unwrap();
- });
- threads_handles.push(handle);
- } else {
- debug!("Nanonmonsv parse results already exists: {diag_info_vcf}");
- }
- if !Path::new(&mrd_info_vcf).exists() {
- let mrd_bam = s.mrd_bam.clone();
- info!("Nanomonsv parse {mrd_bam}.");
- let mrd_out_prefix = mrd_out_prefix.clone();
- let config = s.config.clone();
- let log_dir = s.log_dir.clone();
- let handle = thread::spawn(move || {
- let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, &config).unwrap();
- report
- .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_"))
- .unwrap();
- });
- threads_handles.push(handle);
- } else {
- debug!("Nanonmonsv parse results already exists: {mrd_info_vcf}");
- }
- // Wait for all threads to finish
- for handle in threads_handles {
- handle.join().expect("Thread panicked for nanomonsv parse");
- }
- Ok(())
- }
- pub fn nanomonsv_get(
- bam: &str,
- out_prefix: &str,
- ctrl_bam: Option<&str>,
- ctrl_prefix: Option<&str>,
- config: &Config,
- ) -> anyhow::Result<RunReport> {
- let threads = config.nanomonsv_threads.to_string();
- let mut args = vec!["get"];
- if let (Some(ctrl_bam), Some(ctrl_prefix)) = (ctrl_bam, ctrl_prefix) {
- args.extend(vec![
- "--control_prefix",
- ctrl_prefix,
- "--control_bam",
- ctrl_bam,
- ])
- }
- args.extend(vec![
- "--process",
- &threads,
- out_prefix,
- bam,
- &config.reference,
- ]);
- let mut cmd_run = CommandRun::new(&config.nanomonsv_bin, &args);
- let res = run_wait(&mut cmd_run)?;
- Ok(res)
- }
- pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> {
- let mut passed_mrd = Vec::new();
- for mrd_dir in find_nanomonsv_dirs(&PathBuf::from(&config.result_dir), "mrd", 0, 3) {
- let mut passed = None;
- let mut passed_csi = None;
- let mut result = None;
- for entry in fs::read_dir(&mrd_dir)
- .with_context(|| format!("Failed to read directory: {}", mrd_dir.display()))?
- {
- let entry =
- entry.with_context(|| format!("Failed to read entry in: {}", mrd_dir.display()))?;
- let file_name = entry.file_name().to_string_lossy().to_string();
- let path = entry.path();
- if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz") {
- passed = Some(path);
- } else if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz.csi") {
- passed_csi = Some(path);
- } else if file_name.ends_with("_mrd.nanomonsv.result.vcf") {
- result = Some(path);
- }
- }
- match (result, passed.clone(), passed_csi) {
- (None, None, None) => error!("No result in {}", mrd_dir.display()),
- (Some(p), None, None) => {
- let output = replace_filename_suffix(
- &p,
- "_mrd.nanomonsv.result.vcf",
- "_mrd_nanomonsv_PASSED.vcf.gz",
- );
- info!("Do pass for {} to {}", p.display(), output.display());
- if let Err(r) = bcftools_keep_pass(
- p.to_str().unwrap(),
- output.to_str().unwrap(),
- BcftoolsConfig::default(),
- ) {
- error!("{r}");
- } else {
- passed_mrd.push(output);
- }
- }
- (Some(_), Some(p), None) => warn!("Do csi for {}", p.display()),
- (Some(_), Some(p), Some(_)) => passed_mrd.push(p),
- _ => {} // All files found
- }
- }
- println!("{} vcf to concat", passed_mrd.len());
- bcftools_concat(
- passed_mrd
- .iter()
- .map(|p| p.to_string_lossy().to_string())
- .collect(),
- pon_path,
- BcftoolsConfig::default(),
- )?;
- Ok(())
- }
- pub fn find_nanomonsv_dirs(
- root: &Path,
- time_point: &str,
- depth: usize,
- max_depth: usize,
- ) -> Vec<PathBuf> {
- if depth >= max_depth {
- return Vec::new();
- }
- let entries: Vec<_> = match fs::read_dir(root) {
- Ok(entries) => entries.filter_map(Result::ok).collect(),
- Err(_) => return Vec::new(),
- };
- let mut result: Vec<PathBuf> = entries
- .iter()
- .filter_map(|entry| {
- let path = entry.path();
- if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false)
- && path
- .to_string_lossy()
- .contains(&format!("{}/nanomonsv", time_point))
- {
- Some(path)
- } else {
- None
- }
- })
- .collect();
- result.extend(
- entries
- .iter()
- .filter(|entry| entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false))
- .flat_map(|dir| find_nanomonsv_dirs(&dir.path(), time_point, depth + 1, max_depth)),
- );
- result
- }
- pub fn replace_filename_suffix(path: &Path, from: &str, to: &str) -> PathBuf {
- let file_name = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
- let new_file_name = file_name.replace(from, to);
- path.with_file_name(new_file_name)
- }
|