| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- // reference="/data/ref/hs1/chm13v2.0.fa"
- // bcftools="/data/tools/bcftools-1.18/bcftools"
- //
- // DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag"
- // MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd"
- //
- // DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv"
- // MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv"
- //
- // DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam"
- // MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam"
- //
- // DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag"
- // MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd"
- //
- // if [ ! -d "$DIAG_OUTPUT_DIR" ]; then
- // mkdir "$DIAG_OUTPUT_DIR"
- // fi
- //
- // if [ ! -d "$MRD_OUTPUT_DIR" ]; then
- // mkdir "$MRD_OUTPUT_DIR"
- // fi
- //
- // # PARSE
- // if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
- // nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX"
- // fi
- //
- // if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
- // echo "[pipe] NanomonSV parse ${MRD_BAM}"
- // nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX"
- // fi
- //
- // # GET
- // if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
- // nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \
- // --control_bam "$MRD_BAM" \
- // --processes 155 \
- // "$DIAG_OUT_PREFIX" \
- // "$DIAG_BAM" "$reference"
- // fi
- //
- // # GET MRD
- // if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
- // nanomonsv get \
- // --processes 155 \
- // "$MRD_OUT_PREFIX" \
- // "$MRD_BAM" "$reference"
- // fi
- //
- // vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz"
- // if [ ! -f "$vcf_passed" ]; then
- // $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \
- // $bcftools view -s "TUMOR" --write-index --threads 20 \
- // -i "FILTER='PASS'" \
- // -o "$vcf_passed"
- // fi
- use std::{
- fs,
- io::{BufRead, BufReader},
- path::Path,
- process::{Command, Stdio},
- sync::{Arc, Mutex},
- thread,
- };
- use log::info;
- use crate::commands::bcftools::{bcftools_keep_pass, BcftoolsConfig};
/// Settings shared by every NanomonSV run: tool location, output root, reference and
/// worker count.
///
/// Derives `Clone` so one configuration can be reused to build runners for several
/// cases (`NanomonSV::new` takes the config by value).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NanomonSVConfig {
    /// Path or name of the `nanomonsv` executable.
    pub bin: String,
    /// Root directory under which per-case output directories are created.
    pub result_dir: String,
    /// Reference FASTA passed to `nanomonsv parse` and `nanomonsv get`.
    pub reference: String,
    /// Number of worker processes requested from `nanomonsv get`.
    pub thread: u8,
}
- impl Default for NanomonSVConfig {
- fn default() -> Self {
- Self {
- bin: "nanomonsv".to_string(),
- result_dir: "/data/longreads_basic_pipe".to_string(),
- reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
- thread: 155,
- }
- }
- }
/// A NanomonSV job for one case: a diagnostic BAM analysed with its MRD BAM as control.
#[derive(Debug)]
pub struct NanomonSV {
    /// Case identifier; `new` uses it in the output directories and file-name prefixes.
    pub id: String,
    /// Diagnostic-sample BAM; passed to `nanomonsv get` as the main (non-control) sample.
    pub diag_bam: String,
    /// MRD-sample BAM; passed to the diagnostic `nanomonsv get` as `--control_bam`.
    pub mrd_bam: String,
    /// Output directory for the diagnostic sample
    /// (`new` sets it to `{result_dir}/{id}/diag/nanomonsv`).
    pub diag_out_dir: String,
    /// Output directory for the MRD sample
    /// (`new` sets it to `{result_dir}/{id}/mrd/nanomonsv`).
    pub mrd_out_dir: String,
    /// Free-form log buffer; `new` initializes it to an empty string.
    pub log: String,
    /// Tool path, output root, reference and worker count.
    pub config: NanomonSVConfig,
}
- impl NanomonSV {
- pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
- let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
- let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
- NanomonSV {
- id: id.to_string(),
- diag_bam: diag_bam.to_string(),
- mrd_bam: mrd_bam.to_string(),
- diag_out_dir,
- mrd_out_dir,
- log: String::default(),
- config,
- }
- }
- pub fn run(&mut self) {
- // Create direcories
- if !Path::new(&self.diag_out_dir).exists() {
- fs::create_dir_all(&self.diag_out_dir).unwrap();
- }
- if !Path::new(&self.mrd_out_dir).exists() {
- fs::create_dir_all(&self.mrd_out_dir).unwrap();
- }
- // Parse
- let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
- let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
- let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
- let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
- let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
- let mut threads_handles = Vec::new();
- if !Path::new(&diag_info_vcf).exists() {
- let bin = self.config.bin.clone();
- let reference = self.config.reference.clone();
- let diag_bam = self.diag_bam.clone();
- let log = log.clone();
- let diag_out_prefix = diag_out_prefix.clone();
- let handle = thread::spawn(move || {
- let mut child = Command::new(bin)
- .args([
- "parse",
- "--reference_fasta",
- &reference,
- &diag_bam,
- &diag_out_prefix,
- ])
- .stdout(Stdio::piped())
- .stderr(Stdio::inherit())
- .spawn()
- .expect("Failed to spawn NanomonSV parse");
- if let Some(stdout) = child.stdout.take() {
- let reader = BufReader::new(stdout);
- for line in reader.lines().map_while(Result::ok) {
- info!("{}", line);
- let mut log_lock = log.lock().unwrap();
- log_lock.push_str(&line);
- log_lock.push('\n');
- }
- }
- });
- threads_handles.push(handle);
- }
- if !Path::new(&mrd_info_vcf).exists() {
- let bin = self.config.bin.clone();
- let reference = self.config.reference.clone();
- let mrd_bam = self.diag_bam.clone();
- let log = log.clone();
- let mrd_out_prefix = mrd_out_prefix.clone();
- let handle = thread::spawn(move || {
- let mut child = Command::new(bin)
- .args([
- "parse",
- "--reference_fasta",
- &reference,
- &mrd_bam,
- &mrd_out_prefix,
- ])
- .stdout(Stdio::inherit())
- .stderr(Stdio::piped())
- .spawn()
- .expect("Failed to spawn NanomonSV parse");
- if let Some(stdout) = child.stderr.take() {
- let reader = BufReader::new(stdout);
- for line in reader.lines().map_while(Result::ok) {
- info!("{}", line);
- let mut log_lock = log.lock().unwrap();
- log_lock.push_str(&line);
- log_lock.push('\n');
- }
- }
- child.wait().unwrap();
- });
- threads_handles.push(handle);
- }
- // Wait for all threads to finish
- for handle in threads_handles {
- handle.join().expect("Thread panicked");
- }
- // Get
- let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
- let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
- if !Path::new(&mrd_result_vcf).exists() {
- let mut child = Command::new(&self.config.bin)
- .args([
- "get",
- "--process",
- &self.config.thread.to_string(),
- &mrd_out_prefix,
- &self.mrd_bam,
- &self.config.reference,
- ])
- .stdout(Stdio::inherit())
- .stderr(Stdio::piped())
- .spawn()
- .expect("Failed to spawn nanomonsv");
- if let Some(stdout) = child.stderr.take() {
- let reader = BufReader::new(stdout);
- for line in reader.lines().map_while(Result::ok) {
- info!("{line}");
- let mut log_lock = log.lock().unwrap();
- log_lock.push_str(&line);
- log_lock.push('\n');
- }
- }
- child.wait().unwrap();
- }
- if !Path::new(&diag_result_vcf).exists() {
- let mut child = Command::new(&self.config.bin)
- .args([
- "get",
- "--control_prefix",
- &mrd_out_prefix,
- "--control_bam",
- &self.mrd_bam,
- "--process",
- &self.config.thread.to_string(),
- &diag_out_prefix,
- &self.diag_bam,
- &self.config.reference,
- ])
- .stdout(Stdio::inherit())
- .stderr(Stdio::piped())
- .spawn()
- .expect("Failed to spawn nanomonsv");
- if let Some(stdout) = child.stderr.take() {
- let reader = BufReader::new(stdout);
- for line in reader.lines().map_while(Result::ok) {
- info!("{line}");
- let mut log_lock = log.lock().unwrap();
- log_lock.push_str(&line);
- log_lock.push('\n');
- }
- }
- child.wait().unwrap();
- }
- let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
- if !Path::new(&vcf_passed).exists() {
- bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
- }
- }
- }
|