: 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. // reference="/data/ref/hs1/chm13v2.0.fa"
  2. // bcftools="/data/tools/bcftools-1.18/bcftools"
  3. //
  4. // DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag"
  5. // MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd"
  6. //
  7. // DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv"
  8. // MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv"
  9. //
  10. // DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam"
  11. // MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam"
  12. //
  13. // DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag"
  14. // MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd"
  15. //
  16. // if [ ! -d "$DIAG_OUTPUT_DIR" ]; then
  17. // mkdir "$DIAG_OUTPUT_DIR"
  18. // fi
  19. //
  20. // if [ ! -d "$MRD_OUTPUT_DIR" ]; then
  21. // mkdir "$MRD_OUTPUT_DIR"
  22. // fi
  23. //
  24. // # PARSE
  25. // if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
  26. // nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX"
  27. // fi
  28. //
  29. // if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
  30. // echo "[pipe] NanomonSV parse ${MRD_BAM}"
  31. // nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX"
  32. // fi
  33. //
  34. // # GET
  35. // if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
  36. // nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \
  37. // --control_bam "$MRD_BAM" \
  38. // --processes 155 \
  39. // "$DIAG_OUT_PREFIX" \
  40. // "$DIAG_BAM" "$reference"
  41. // fi
  42. //
  43. // # GET MRD
  44. // if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
  45. // nanomonsv get \
  46. // --processes 155 \
  47. // "$MRD_OUT_PREFIX" \
  48. // "$MRD_BAM" "$reference"
  49. // fi
  50. //
  51. // vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz"
  52. // if [ ! -f "$vcf_passed" ]; then
  53. // $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \
  54. // $bcftools view -s "TUMOR" --write-index --threads 20 \
  55. // -i "FILTER='PASS'" \
  56. // -o "$vcf_passed"
  57. // fi
  58. use std::{
  59. fs,
  60. io::{BufRead, BufReader},
  61. path::Path,
  62. process::{Command, Stdio},
  63. sync::{Arc, Mutex},
  64. thread,
  65. };
  66. use log::info;
  67. use crate::commands::bcftools::{bcftools_keep_pass, BcftoolsConfig};
  68. #[derive(Debug)]
  69. pub struct NanomonSVConfig {
  70. pub bin: String,
  71. pub result_dir: String,
  72. pub reference: String,
  73. pub thread: u8,
  74. }
  75. impl Default for NanomonSVConfig {
  76. fn default() -> Self {
  77. Self {
  78. bin: "nanomonsv".to_string(),
  79. result_dir: "/data/longreads_basic_pipe".to_string(),
  80. reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
  81. thread: 155,
  82. }
  83. }
  84. }
  85. #[derive(Debug)]
  86. pub struct NanomonSV {
  87. pub id: String,
  88. pub diag_bam: String,
  89. pub mrd_bam: String,
  90. pub diag_out_dir: String,
  91. pub mrd_out_dir: String,
  92. pub log: String,
  93. pub config: NanomonSVConfig,
  94. }
  95. impl NanomonSV {
  96. pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
  97. let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
  98. let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
  99. NanomonSV {
  100. id: id.to_string(),
  101. diag_bam: diag_bam.to_string(),
  102. mrd_bam: mrd_bam.to_string(),
  103. diag_out_dir,
  104. mrd_out_dir,
  105. log: String::default(),
  106. config,
  107. }
  108. }
  109. pub fn run(&mut self) {
  110. // Create direcories
  111. if !Path::new(&self.diag_out_dir).exists() {
  112. fs::create_dir_all(&self.diag_out_dir).unwrap();
  113. }
  114. if !Path::new(&self.mrd_out_dir).exists() {
  115. fs::create_dir_all(&self.mrd_out_dir).unwrap();
  116. }
  117. // Parse
  118. let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
  119. let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
  120. let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
  121. let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
  122. let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
  123. let mut threads_handles = Vec::new();
  124. if !Path::new(&diag_info_vcf).exists() {
  125. let bin = self.config.bin.clone();
  126. let reference = self.config.reference.clone();
  127. let diag_bam = self.diag_bam.clone();
  128. let log = log.clone();
  129. let diag_out_prefix = diag_out_prefix.clone();
  130. let handle = thread::spawn(move || {
  131. let mut child = Command::new(bin)
  132. .args([
  133. "parse",
  134. "--reference_fasta",
  135. &reference,
  136. &diag_bam,
  137. &diag_out_prefix,
  138. ])
  139. .stdout(Stdio::piped())
  140. .stderr(Stdio::inherit())
  141. .spawn()
  142. .expect("Failed to spawn NanomonSV parse");
  143. if let Some(stdout) = child.stdout.take() {
  144. let reader = BufReader::new(stdout);
  145. for line in reader.lines().map_while(Result::ok) {
  146. info!("{}", line);
  147. let mut log_lock = log.lock().unwrap();
  148. log_lock.push_str(&line);
  149. log_lock.push('\n');
  150. }
  151. }
  152. });
  153. threads_handles.push(handle);
  154. }
  155. if !Path::new(&mrd_info_vcf).exists() {
  156. let bin = self.config.bin.clone();
  157. let reference = self.config.reference.clone();
  158. let mrd_bam = self.diag_bam.clone();
  159. let log = log.clone();
  160. let mrd_out_prefix = mrd_out_prefix.clone();
  161. let handle = thread::spawn(move || {
  162. let mut child = Command::new(bin)
  163. .args([
  164. "parse",
  165. "--reference_fasta",
  166. &reference,
  167. &mrd_bam,
  168. &mrd_out_prefix,
  169. ])
  170. .stdout(Stdio::inherit())
  171. .stderr(Stdio::piped())
  172. .spawn()
  173. .expect("Failed to spawn NanomonSV parse");
  174. if let Some(stdout) = child.stderr.take() {
  175. let reader = BufReader::new(stdout);
  176. for line in reader.lines().map_while(Result::ok) {
  177. info!("{}", line);
  178. let mut log_lock = log.lock().unwrap();
  179. log_lock.push_str(&line);
  180. log_lock.push('\n');
  181. }
  182. }
  183. child.wait().unwrap();
  184. });
  185. threads_handles.push(handle);
  186. }
  187. // Wait for all threads to finish
  188. for handle in threads_handles {
  189. handle.join().expect("Thread panicked");
  190. }
  191. // Get
  192. let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
  193. let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
  194. if !Path::new(&mrd_result_vcf).exists() {
  195. let mut child = Command::new(&self.config.bin)
  196. .args([
  197. "get",
  198. "--process",
  199. &self.config.thread.to_string(),
  200. &mrd_out_prefix,
  201. &self.mrd_bam,
  202. &self.config.reference,
  203. ])
  204. .stdout(Stdio::inherit())
  205. .stderr(Stdio::piped())
  206. .spawn()
  207. .expect("Failed to spawn nanomonsv");
  208. if let Some(stdout) = child.stderr.take() {
  209. let reader = BufReader::new(stdout);
  210. for line in reader.lines().map_while(Result::ok) {
  211. info!("{line}");
  212. let mut log_lock = log.lock().unwrap();
  213. log_lock.push_str(&line);
  214. log_lock.push('\n');
  215. }
  216. }
  217. child.wait().unwrap();
  218. }
  219. if !Path::new(&diag_result_vcf).exists() {
  220. let mut child = Command::new(&self.config.bin)
  221. .args([
  222. "get",
  223. "--control_prefix",
  224. &mrd_out_prefix,
  225. "--control_bam",
  226. &self.mrd_bam,
  227. "--process",
  228. &self.config.thread.to_string(),
  229. &diag_out_prefix,
  230. &self.diag_bam,
  231. &self.config.reference,
  232. ])
  233. .stdout(Stdio::inherit())
  234. .stderr(Stdio::piped())
  235. .spawn()
  236. .expect("Failed to spawn nanomonsv");
  237. if let Some(stdout) = child.stderr.take() {
  238. let reader = BufReader::new(stdout);
  239. for line in reader.lines().map_while(Result::ok) {
  240. info!("{line}");
  241. let mut log_lock = log.lock().unwrap();
  242. log_lock.push_str(&line);
  243. log_lock.push('\n');
  244. }
  245. }
  246. child.wait().unwrap();
  247. }
  248. let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
  249. if !Path::new(&vcf_passed).exists() {
  250. bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
  251. }
  252. }
  253. }