// reference="/data/ref/hs1/chm13v2.0.fa" // bcftools="/data/tools/bcftools-1.18/bcftools" // // DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag" // MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd" // // DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv" // MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv" // // DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam" // MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam" // // DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag" // MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd" // // if [ ! -d "$DIAG_OUTPUT_DIR" ]; then // mkdir "$DIAG_OUTPUT_DIR" // fi // // if [ ! -d "$MRD_OUTPUT_DIR" ]; then // mkdir "$MRD_OUTPUT_DIR" // fi // // # PARSE // if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then // nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX" // fi // // if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then // echo "[pipe] NanomonSV parse ${MRD_BAM}" // nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX" // fi // // # GET // if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then // nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \ // --control_bam "$MRD_BAM" \ // --processes 155 \ // "$DIAG_OUT_PREFIX" \ // "$DIAG_BAM" "$reference" // fi // // # GET MRD // if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then // nanomonsv get \ // --processes 155 \ // "$MRD_OUT_PREFIX" \ // "$MRD_BAM" "$reference" // fi // // vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz" // if [ ! -f "$vcf_passed" ]; then // $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \ // $bcftools view -s "TUMOR" --write-index --threads 20 \ // -i "FILTER='PASS'" \ // -o "$vcf_passed" // fi use std::{ fs, io::{BufRead, BufReader}, path::Path, process::{Command, Stdio}, sync::{Arc, Mutex}, thread, }; use log::info; use crate::commands::bcftools::{bcftools_keep_pass, BcftoolsConfig}; #[derive(Debug)] pub struct NanomonSVConfig { pub bin: String, pub result_dir: String, pub reference: String, pub thread: u8, } impl Default for NanomonSVConfig { fn default() -> Self { Self { bin: "nanomonsv".to_string(), result_dir: "/data/longreads_basic_pipe".to_string(), reference: "/data/ref/hs1/chm13v2.0.fa".to_string(), thread: 155, } } } #[derive(Debug)] pub struct NanomonSV { pub id: String, pub diag_bam: String, pub mrd_bam: String, pub diag_out_dir: String, pub mrd_out_dir: String, pub log: String, pub config: NanomonSVConfig, } impl NanomonSV { pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self { let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir); let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir); NanomonSV { id: id.to_string(), diag_bam: diag_bam.to_string(), mrd_bam: mrd_bam.to_string(), diag_out_dir, mrd_out_dir, log: String::default(), config, } } pub fn run(&mut self) { // Create direcories if !Path::new(&self.diag_out_dir).exists() { fs::create_dir_all(&self.diag_out_dir).unwrap(); } if !Path::new(&self.mrd_out_dir).exists() { fs::create_dir_all(&self.mrd_out_dir).unwrap(); } // Parse let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id); let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id); let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz"); let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz"); let log: Arc> = Arc::new(Mutex::new(String::default())); let mut threads_handles = Vec::new(); if !Path::new(&diag_info_vcf).exists() { let bin = self.config.bin.clone(); let reference = self.config.reference.clone(); let diag_bam = self.diag_bam.clone(); let log = log.clone(); let diag_out_prefix = diag_out_prefix.clone(); let handle = thread::spawn(move || { let mut child = Command::new(bin) .args([ "parse", "--reference_fasta", &reference, &diag_bam, &diag_out_prefix, ]) .stdout(Stdio::piped()) .stderr(Stdio::inherit()) .spawn() .expect("Failed to spawn NanomonSV parse"); if let Some(stdout) = child.stdout.take() { let reader = BufReader::new(stdout); for line in reader.lines().map_while(Result::ok) { info!("{}", line); let mut log_lock = log.lock().unwrap(); log_lock.push_str(&line); log_lock.push('\n'); } } }); threads_handles.push(handle); } if !Path::new(&mrd_info_vcf).exists() { let bin = self.config.bin.clone(); let reference = self.config.reference.clone(); let mrd_bam = self.diag_bam.clone(); let log = log.clone(); let mrd_out_prefix = mrd_out_prefix.clone(); let handle = thread::spawn(move || { let mut child = Command::new(bin) .args([ "parse", "--reference_fasta", &reference, &mrd_bam, &mrd_out_prefix, ]) .stdout(Stdio::inherit()) .stderr(Stdio::piped()) .spawn() .expect("Failed to spawn NanomonSV parse"); if let Some(stdout) = child.stderr.take() { let reader = BufReader::new(stdout); for line in reader.lines().map_while(Result::ok) { info!("{}", line); let mut log_lock = log.lock().unwrap(); log_lock.push_str(&line); log_lock.push('\n'); } } child.wait().unwrap(); }); threads_handles.push(handle); } // Wait for all threads to finish for handle in threads_handles { handle.join().expect("Thread panicked"); } // Get let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf"); let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf"); if !Path::new(&mrd_result_vcf).exists() { let mut child = Command::new(&self.config.bin) .args([ "get", "--process", &self.config.thread.to_string(), &mrd_out_prefix, &self.mrd_bam, &self.config.reference, ]) .stdout(Stdio::inherit()) .stderr(Stdio::piped()) .spawn() .expect("Failed to spawn nanomonsv"); if let Some(stdout) = child.stderr.take() { let reader = BufReader::new(stdout); for line in reader.lines().map_while(Result::ok) { info!("{line}"); let mut log_lock = log.lock().unwrap(); log_lock.push_str(&line); log_lock.push('\n'); } } child.wait().unwrap(); } if !Path::new(&diag_result_vcf).exists() { let mut child = Command::new(&self.config.bin) .args([ "get", "--control_prefix", &mrd_out_prefix, "--control_bam", &self.mrd_bam, "--process", &self.config.thread.to_string(), &diag_out_prefix, &self.diag_bam, &self.config.reference, ]) .stdout(Stdio::inherit()) .stderr(Stdio::piped()) .spawn() .expect("Failed to spawn nanomonsv"); if let Some(stdout) = child.stderr.take() { let reader = BufReader::new(stdout); for line in reader.lines().map_while(Result::ok) { info!("{line}"); let mut log_lock = log.lock().unwrap(); log_lock.push_str(&line); log_lock.push('\n'); } } child.wait().unwrap(); } let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz"); if !Path::new(&vcf_passed).exists() { bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap(); } } }