index.ts 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import { spawn } from 'child_process';
  2. import { cpus } from 'os';
  3. import fs from 'fs'
  4. import path from 'path';
  5. import * as fastq from "fastq";
  6. import type { queueAsPromised } from "fastq";
  /**
   * Runs `prog` with `args` through a shell (`shell: true`) and streams its
   * output to `onData`.
   *
   * @param prog   Program name handed to `spawn`.
   * @param args   Arguments; because a shell is used they are interpreted by
   *               the shell, so they are not escaped in any way here.
   * @param onData Called with every stdout and stderr chunk, converted to a
   *               string and trimmed. Both streams share the same callback.
   * @returns A promise resolving to the exit code reported by the child
   *          (`null` when the child was terminated by a signal).
   * @throws Rejects with the error emitted by the child's `'error'` event.
   */
  const async_exec = (prog: string, args: string[], onData: Function) => {
    return new Promise<number | null>((resolve, reject) => {
      const child = spawn(prog, args, { shell: true })
      const forward = (chunk: { toString(): string }) => onData(chunk.toString().trim())
      child.stdout.on('data', forward)
      child.stderr.on('data', forward)
      child.on('error', reject)
      // Resolve on 'close' rather than 'exit': 'exit' may fire while the stdio
      // streams are still open, which would let the promise settle before the
      // last output chunks have been delivered to onData.
      child.on('close', code => resolve(code))
    })
  }
  /**
   * "Inverse replace": keeps every UTF-16 code unit of `string` that matches
   * `regex` and substitutes `by` for each one that does not.
   *
   * @param regex  Pattern a single character must match to be kept.
   * @param string Text to sanitize.
   * @param by     Replacement for non-matching characters (default `'_'`).
   * @returns The sanitized text.
   */
  const invReplace = (regex: RegExp, string: string, by = '_') => {
    let sanitized = ''
    for (const letter of string.split('')) {
      // String#match is used on purpose: unlike RegExp#test it does not carry
      // lastIndex state from one letter to the next when `regex` is global.
      sanitized += letter.match(regex) ? letter : by
    }
    return sanitized
  }
  /**
   * Writes a single-record FASTA file: a `>` header line followed by the
   * upper-cased sequence wrapped at `lineN` characters per line.
   *
   * Characters of `sequenceName` that are not accepted by
   * `regex_sam_restriction` are replaced with `_` (via `invReplace`).
   *
   * @param sequenceName Name used for the header line.
   * @param sequence     Sequence text; an empty sequence produces a file that
   *                     contains only the header line.
   * @param filePath     Destination file.
   * @param lineN        Maximum sequence characters per line (default 80).
   * @returns Resolves to `true` once the file has been written.
   * @throws Rejects with `false` on any failure; the underlying error is
   *         logged with `console.log`. The `false` rejection value is kept
   *         for compatibility with existing callers.
   */
  const writeSequence = async (sequenceName:string, sequence:string, filePath: string, lineN = 80): Promise<boolean> => {
    try {
      const lineChunker = new RegExp(".{1," + lineN + "}", "g")
      const regex_sam_restriction: RegExp = /[>0-9A-Za-z!#$%&+\./:;?@^_|~-]|[\n\t]/g
      const nSeqName = invReplace(regex_sam_restriction, sequenceName)
      // match() returns null when nothing matches (e.g. an empty sequence);
      // fall back to an empty list so the text "undefined" is never written.
      const wrapped = (sequence.match(lineChunker) ?? []).join('\n').toUpperCase()
      await fs.promises.writeFile(filePath, '>' + nSeqName + '\n' + wrapped)
      return true
    } catch (error) {
      console.log(error)
      throw false
    }
  }
  /**
   * Writes the sequence to `filePath` as FASTA (see `writeSequence`) and then
   * runs `bwa index` on that file.
   *
   * @param sequenceName Name used for the FASTA header.
   * @param sequence     Sequence text.
   * @param filePath     Destination FASTA file, also the file that gets indexed.
   * @param lineN        Maximum sequence characters per line (default 80).
   * @throws Propagates the rejection of `writeSequence` or of the spawned
   *         `bwa index` process.
   */
  const makeReference = async (sequenceName:string, sequence:string, filePath: string, lineN = 80) => {
    if (await writeSequence(sequenceName, sequence, filePath, lineN)) {
      // The callback must actually call console.log; returning the function
      // (as `() => console.log` did) silently discards all bwa output.
      await async_exec('bwa', ['index', filePath], (message: string) => console.log(message))
    }
  }
  /**
   * Aligns reads against `refPath` with a shell pipeline
   * `bwa mem | samblaster | samtools view -Sb - > <bam>` and, unless
   * `options.remove_mapped` is set, sorts the BAM with `sambamba sort` and
   * deletes the unsorted file.
   *
   * @param refPath   Reference FASTA; `bwa index` is run first when
   *                  `refPath + '.amb'` does not exist.
   * @param reads     A plain string is passed to bwa unchanged. An array is
   *                  treated as paired-end `[R1, R2]`, where each mate is a
   *                  file name or a list of file names; each mate becomes a
   *                  quoted `'< cat|zcat files'` input (`zcat` when the joined
   *                  names end in `gz`).
   * @param runName   Written to the `SM` field of the read group.
   * @param libName   Written to the `LB` field of the read group.
   * @param outputDir Directory in which every output file is created.
   * @param onData    Receives tagged output lines and exit codes.
   * @param options   Optional flags merged over the defaults
   *                  `output_discordant`, `output_splitted`, `output_unmapped`
   *                  (all `true`); `remove_mapped` sends the BAM to `/dev/null`
   *                  and skips sorting.
   * @returns An object holding the produced paths under the keys `bamSorted`,
   *          `discordantFile`, `splitterFile` and `unmappedFile` (each present
   *          only when that output was requested).
   *          NOTE(review): the declared type is `Promise<string[]>`, which does
   *          not describe this object — confirm what callers rely on.
   * @throws Rejects with whatever error is raised while building or running
   *         the commands. Non-zero exit codes are only reported via `onData`.
   */
  const asyncBwaMem = async (
    refPath : string,
    reads : string | Array<string> | Array<Array<string>>,
    runName : string,
    libName : string,
    outputDir : string,
    onData : Function,
    options? : any,
  ): Promise<string[]> => {
    // Every output kind is enabled unless the caller switches it off.
    const settings = {
      output_discordant: true,
      output_splitted: true,
      output_unmapped: true,
      ...options,
    }

    const bwa = 'bwa'
    const samblaster = 'samblaster'
    const samtools = 'samtools'
    const sambamba = 'sambamba'

    const refName = path.parse(refPath).name
    const inOutputDir = (fileName: string) => path.join(outputDir, fileName)

    // Any array is taken to be a paired-end [R1, R2] pair.
    const isPairedEnd = Array.isArray(reads)
    let readsIn: string
    if (Array.isArray(reads)) {
      console.log('Assuming paired end reads');
      const asShellInput = (mate: string | Array<string>) => {
        const files = Array.isArray(mate) ? mate.join(' ') : mate
        const reader = files.slice(-2) === 'gz' ? 'zcat' : 'cat'
        return `'< ${reader} ${files}'`
      }
      const [R1, R2] = reads
      readsIn = asShellInput(R1) + ' ' + asShellInput(R2)
    } else {
      readsIn = reads
    }

    // presumably sambamba writes its result next to the input as
    // "<name>.sorted.bam", which is the path reported here — TODO confirm
    const bamSorted = inOutputDir(`bwa_mem_properly_on_${refName}.sorted.bam`)
    const keepMapped = !settings.remove_mapped
    const bam = keepMapped ? inOutputDir(`bwa_mem_properly_on_${refName}.bam`) : '/dev/null'
    const produced: any = keepMapped ? { bamSorted } : {}

    const threads = String(cpus().length)

    // https://github.com/GregoryFaust/samblaster
    const samblasterCmd: Array<string> = [
      '|', samblaster,
      '--addMateTags',
      '-a', // Accept duplicate marks already in input file
      '-e', // Exclude reads marked as duplicates from discordant, splitter, and/or unmapped
    ]

    if (settings.output_discordant || settings.output_splitted) {
      console.log('Using samblaster');
      if (settings.output_discordant && !isPairedEnd) {
        console.log('Discordant reads can be found only in paired reads, skipping')
      } else if (settings.output_discordant) {
        const discordantFile = inOutputDir(`bwa_mem_discordants_on_${refName}.sam`)
        console.log('Discordant reads file path: ', discordantFile);
        samblasterCmd.push('-d', discordantFile)
        produced.discordantFile = discordantFile
      }
      if (!isPairedEnd) {
        samblasterCmd.push('--ignoreUnmated')
      }
      if (settings.output_splitted) {
        const splitterFile = inOutputDir(`bwa_mem_splitters_on_${refName}.sam`)
        console.log('Splitted reads file path: ', splitterFile);
        samblasterCmd.push('-s', splitterFile)
        produced.splitterFile = splitterFile
      }
    }

    if (settings.output_unmapped) {
      const unmappedFile = inOutputDir(`bwa_mem_unmapped_on_${refName}.fq`)
      console.log('Unmapped reads file path: ', unmappedFile);
      samblasterCmd.push('-u', unmappedFile)
      produced.unmappedFile = unmappedFile
    }

    // Build the bwa index on demand; the '.amb' file is used as the marker.
    if (!fs.existsSync(refPath + '.amb')) {
      await async_exec(bwa, ['index', refPath], (message: string) => onData('[BWA-INDEX] ' + message))
    }

    console.log(settings, samblasterCmd);

    // The whole pipeline is one argument list; pipes and the redirect are
    // passed along as plain arguments for the shell to interpret.
    const memArgs = [
      'mem',
      '-t', threads,
      '-R', `"@RG\\tPL:Illumina\\tID:${+(new Date)}\\tSM:${runName}\\tLB:${libName}"`,
      refPath,
      readsIn,
      ...samblasterCmd,
      '|', samtools, 'view', '-Sb', '-',
      '>', bam,
    ]
    const code = await async_exec(bwa, memArgs, (message: string) => onData('[BWA-MEM] ' + message))
    onData('[BWA-MEM][EXIT CODE] ' + code)

    if (keepMapped) {
      const code_sort = await async_exec(
        sambamba,
        ['sort', '-t', threads, bam],
        (message: string) => onData('[SAMBAMBA-SORT] ' + message),
      )
      onData('[SAMBAMBA-SORT][EXIT CODE] ' + code_sort)
      // Only the sorted BAM is kept.
      fs.unlinkSync(bam)
    }

    return produced
  }
  /** One queued alignment job: the arguments the worker forwards to `asyncBwaMem`. */
  type Task = {
    refPath : string;
    reads : string | Array<string> | Array<Array<string>>;
    runName : string;
    libName : string;
    outputDir : string;
    onData : Function;
  }
  /**
   * Queue worker: runs one `Task` through `asyncBwaMem` with default options.
   *
   * @param args The job to run.
   * @returns Whatever `asyncBwaMem` resolves to, so the promise returned by
   *          the queue's `push` carries the job's result.
   */
  const asyncBwaMemWorker = async (args: Task) => {
    const {refPath, reads, runName, libName, outputDir, onData} = args
    return asyncBwaMem(refPath, reads, runName, libName, outputDir, onData)
  }
  /**
   * Queues one alignment job per entry of `allReads` and waits for all of
   * them. Jobs run one at a time (queue concurrency 1).
   *
   * @param allReads     One read set per job.
   * @param refPath      Reference shared by every job.
   * @param runName      Run name shared by every job.
   * @param libName      Library name shared by every job.
   * @param allOutputDir Either one directory per read set (same order as
   *                     `allReads`) or a single directory used by every job.
   *                     A single directory was previously indexed character
   *                     by character, handing each job a one-letter path.
   *                     NOTE(review): output file names appear to depend only
   *                     on the reference name, so jobs sharing a directory
   *                     would overwrite each other — prefer one directory per
   *                     read set.
   * @param onData       Progress callback shared by every job.
   * @throws Error when a list of directories has no entry for some read set;
   *         this is checked before any job is queued.
   */
  const scheduleAsyncBwaMem = async (
    allReads: Array<Array<string>> | Array<Array<Array<string>>>,
    refPath : string,
    runName : string,
    libName : string,
    allOutputDir : string | Array<string>,
    onData : Function
  ) => {
    // Resolve every output directory up front so a bad argument fails fast.
    const outputDirs: string[] = []
    for (let i = 0; i < allReads.length; i++) {
      const outputDir = typeof allOutputDir === 'string' ? allOutputDir : allOutputDir[i]
      if (typeof outputDir !== 'string') {
        throw new Error(`scheduleAsyncBwaMem: no output directory for read set ${i}`)
      }
      outputDirs.push(outputDir)
    }

    const q: queueAsPromised<Task> = fastq.promise(asyncBwaMemWorker, 1)
    await Promise.all(allReads.map((reads, i) => q.push({refPath, reads, runName, libName, outputDir: outputDirs[i], onData})))
  }
  179. export { asyncBwaMem, writeSequence, makeReference, scheduleAsyncBwaMem }