| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- import { spawn } from 'child_process';
- import { cpus } from 'os';
- import fs from 'fs'
- import path from 'path';
- import * as fastq from "fastq";
- import type { queueAsPromised } from "fastq";
/**
 * Runs `prog` with `args` through a shell. Because a shell is used, `args` may
 * contain shell operators such as `|` and `>`. The pieces are joined with
 * single spaces into one command line and are NOT escaped.
 *
 * Each chunk written to stdout or stderr is converted to a string, trimmed and
 * passed to `onData`.
 *
 * @returns a promise resolving with the shell's exit code (`null` if it was
 *   terminated by a signal). It rejects only when the process cannot be spawned.
 */
const async_exec = (
  prog: string,
  args: string[],
  onData: (message: string) => unknown,
): Promise<number | null> => {
  return new Promise((resolve, reject) => {
    // Same command line Node builds itself for `shell: true` with an args
    // array, but without relying on that (deprecated) calling convention.
    const commandLine = [prog, ...args].join(' ')
    const child = spawn(commandLine, { shell: true })
    child.stdout.on('data', (data) => onData(data.toString().trim()))
    child.stderr.on('data', (data) => onData(data.toString().trim()))
    child.on('error', (err) => reject(err))
    // 'close' (unlike 'exit') is emitted only after the stdio streams have
    // closed, so every output chunk has reached onData before we resolve.
    child.on('close', (code) => resolve(code))
  })
}
/**
 * Replaces every character of `string` that does NOT match `regex` with `by`.
 * Iterates by Unicode code point, so a character outside the BMP (e.g. an
 * emoji) becomes a single `by` rather than one per UTF-16 surrogate half.
 */
const invReplace = (regex: RegExp, string: string, by = '_') =>
  Array.from(string, (letter) => (letter.match(regex) ? letter : by)).join('')

/**
 * Writes a single-record FASTA file: a `>name` header line followed by the
 * upper-cased sequence wrapped at `lineN` characters per line.
 *
 * Characters of `sequenceName` outside the allowed set are replaced by `_`.
 * Whitespace such as newlines and tabs is not allowed, because a newline would
 * split the header line.
 *
 * @param sequenceName record name written after `>`
 * @param sequence     residues; an empty sequence yields a header-only record
 * @param filePath     destination file (overwritten)
 * @param lineN        maximum residues per line, a positive integer (default 80)
 * @returns resolves to `true` once the file is written
 * @throws RangeError when `lineN` is not a positive integer; filesystem errors
 *   are logged and re-thrown unchanged
 */
const writeSequence = async (
  sequenceName: string,
  sequence: string,
  filePath: string,
  lineN = 80,
): Promise<boolean> => {
  try {
    if (!Number.isInteger(lineN) || lineN < 1) {
      throw new RangeError(`lineN must be a positive integer, got ${lineN}`)
    }
    const lineRegex = new RegExp('.{1,' + lineN + '}', 'g')
    const allowedNameChars = /[>0-9A-Za-z!#$%&+./:;?@^_|~-]/
    const name = invReplace(allowedNameChars, sequenceName)
    // match() returns null for an empty sequence; without the fallback the
    // string "undefined" would be written as the sequence.
    const body = (sequence.match(lineRegex) ?? []).join('\n').toUpperCase()
    await fs.promises.writeFile(filePath, '>' + name + '\n' + body)
    return true
  } catch (error) {
    console.log(error)
    throw error
  }
}
/**
 * Writes `sequence` as a single-record FASTA at `filePath` (see writeSequence)
 * and then runs `bwa index` on it.
 *
 * bwa's output is printed with console.log. A non-zero exit code from
 * `bwa index` is reported on console.error and is not thrown.
 */
const makeReference = async (sequenceName: string, sequence: string, filePath: string, lineN = 80) => {
  if (await writeSequence(sequenceName, sequence, filePath, lineN)) {
    // The callback must actually call console.log; returning the function
    // reference (as `() => console.log` did) silently drops every message.
    const code = await async_exec('bwa', ['index', filePath], (message: string) => console.log(message))
    if (code !== 0) {
      console.error(`[BWA-INDEX] bwa index exited with code ${code} for ${filePath}`)
    }
  }
}
/** Options for asyncBwaMem. Every flag is optional; see asyncBwaMem for the defaults. */
export type BwaMemOptions = {
  /** Write discordant pairs to a SAM file (samblaster `-d`); paired-end input only. */
  output_discordant?: boolean
  /** Write split reads to a SAM file (samblaster `-s`). */
  output_splitted?: boolean
  /** Write unmapped reads to a FASTQ file (samblaster `-u`). */
  output_unmapped?: boolean
  /** Send the main BAM to /dev/null and skip sorting (no `bamSorted` in the result). */
  remove_mapped?: boolean
}

/** Paths of the files written by asyncBwaMem; a key is present only when that file was requested. */
export type BwaMemOutputs = {
  bamSorted?: string
  discordantFile?: string
  splitterFile?: string
  unmappedFile?: string
}

/**
 * Builds bwa mem's read argument(s).
 * - A string is passed through verbatim and treated as single-end input.
 * - An array is treated as paired-end `[R1, R2]`. Each side is one path or a
 *   list of paths (joined with spaces) wrapped as `'< zcat …'` when the joined
 *   text ends in "gz", otherwise `'< cat …'`. This relies on bwa accepting a
 *   `<command` argument as its input.
 * @throws Error when an array input does not have exactly two elements
 */
const bwaReadsInput = (
  reads: string | Array<string> | Array<Array<string>>,
): { readsIn: string; isPairedEnd: boolean } => {
  if (!Array.isArray(reads)) {
    return { readsIn: reads, isPairedEnd: false }
  }
  console.log('Assuming paired end reads');
  if (reads.length !== 2) {
    throw new Error(`Paired end reads must be given as [R1, R2], got ${reads.length} element(s)`)
  }
  // Safe after the length check above.
  const [R1, R2] = reads as [string | Array<string>, string | Array<string>]

  const asPipe = (files: string | Array<string>): string => {
    const joined = Array.isArray(files) ? files.join(' ') : files
    // Only the end of the joined list is inspected, so every file of one side
    // is expected to share the same compression.
    const kitty = joined.endsWith('gz') ? 'zcat' : 'cat'
    return `'< ${kitty} ${joined}'`
  }
  return { readsIn: asPipe(R1) + ' ' + asPipe(R2), isPairedEnd: true }
}

/**
 * Builds the `| samblaster …` segment of the alignment pipeline, plus the paths
 * of the side-output files it will write into `outputDir`.
 */
const samblasterStage = (
  opts: Pick<BwaMemOptions, 'output_discordant' | 'output_splitted' | 'output_unmapped'>,
  isPairedEnd: boolean,
  outputDir: string,
  refName: string,
): { args: string[]; outputs: BwaMemOutputs } => {
  // https://github.com/GregoryFaust/samblaster
  const args = [
    '|', 'samblaster',
    '--addMateTags',
    '-a', // Accept duplicate marks already in input file
    '-e', // Exclude reads marked as duplicates from discordant, splitter, and/or unmapped
  ]
  const outputs: BwaMemOutputs = {}

  if (opts.output_discordant || opts.output_splitted) {
    console.log('Using samblaster');
  }

  if (opts.output_discordant) {
    if (!isPairedEnd) {
      console.log('Discordant reads can be found only in paired reads, skipping')
    } else {
      const discordantFile = path.join(outputDir, `bwa_mem_discordants_on_${refName}.sam`)
      console.log('Discordant reads file path: ', discordantFile);
      args.push('-d', discordantFile)
      outputs.discordantFile = discordantFile
    }
  }

  // samblaster is part of the pipe regardless of the options, and it aborts on
  // unmated alignments unless told otherwise. Single-end input therefore always
  // needs --ignoreUnmated, not only when side outputs are requested.
  if (!isPairedEnd) {
    args.push('--ignoreUnmated')
  }

  if (opts.output_splitted) {
    const splitterFile = path.join(outputDir, `bwa_mem_splitters_on_${refName}.sam`)
    console.log('Splitted reads file path: ', splitterFile);
    args.push('-s', splitterFile)
    outputs.splitterFile = splitterFile
  }

  if (opts.output_unmapped) {
    const unmappedFile = path.join(outputDir, `bwa_mem_unmapped_on_${refName}.fq`)
    console.log('Unmapped reads file path: ', unmappedFile);
    args.push('-u', unmappedFile)
    outputs.unmappedFile = unmappedFile
  }

  return { args, outputs }
}

/**
 * Aligns reads against `refPath` with one shell pipeline:
 *
 *   bwa mem -t <cpus> -R <read group> <ref> <reads> | samblaster … | samtools view -Sb - > <bam>
 *
 * It then sorts the BAM with `sambamba sort` and deletes the unsorted BAM, but
 * only when the sort exits with code 0. If the sort fails, the unsorted BAM is
 * kept. The reference is indexed first (`bwa index`) when `<refPath>.amb` is
 * missing. Output file names are derived from the reference file's base name.
 *
 * Exit codes are reported through `onData`, not thrown. With a plain sh pipe,
 * the code reported for the alignment is that of the last stage (samtools).
 * `bamSorted` is assumed to be sambamba's default output path — verify if the
 * sambamba version changes.
 *
 * NOTE(review): paths and names are joined into a shell command line without
 * quoting. Values containing spaces or shell metacharacters will break the
 * pipeline, so never pass untrusted input.
 *
 * @param refPath   reference FASTA
 * @param reads     single-end input (string, passed verbatim to bwa) or paired-end `[R1, R2]`
 * @param runName   read-group sample tag (SM)
 * @param libName   read-group library tag (LB)
 * @param outputDir directory receiving every output file
 * @param onData    receives tool output prefixed with the tool name, plus exit codes
 *                  (kept as `Function` for caller compatibility)
 * @param options   output_discordant, output_splitted and output_unmapped default to true
 * @returns paths of the files produced
 */
const asyncBwaMem = async (
  refPath: string,
  reads: string | Array<string> | Array<Array<string>>,
  runName: string,
  libName: string,
  outputDir: string,
  onData: Function,
  options?: BwaMemOptions,
): Promise<BwaMemOutputs> => {
  const opts = {
    output_discordant: true,
    output_splitted: true,
    output_unmapped: true,
    ...options,
  }

  const refName = path.parse(refPath).name
  const { readsIn, isPairedEnd } = bwaReadsInput(reads)

  const bamSorted = path.join(outputDir, `bwa_mem_properly_on_${refName}.sorted.bam`)
  const bam = opts.remove_mapped
    ? '/dev/null'
    : path.join(outputDir, `bwa_mem_properly_on_${refName}.bam`)

  // cpus() can return an empty list on some platforms/containers; never ask for 0 threads.
  const threads = String(Math.max(1, cpus().length))

  const stage = samblasterStage(opts, isPairedEnd, outputDir, refName)
  const outputs: BwaMemOutputs = opts.remove_mapped
    ? { ...stage.outputs }
    : { bamSorted, ...stage.outputs }

  if (!fs.existsSync(refPath + '.amb')) {
    await async_exec('bwa', ['index', refPath], (message: string) => onData('[BWA-INDEX] ' + message))
  }
  console.log(opts, stage.args);

  const code = await async_exec(
    'bwa', [
      'mem',
      '-t', threads,
      '-R', `"@RG\\tPL:Illumina\\tID:${+(new Date)}\\tSM:${runName}\\tLB:${libName}"`,
      refPath,
      readsIn,
      ...stage.args,
      '|', 'samtools', 'view', '-Sb', '-',
      '>', bam,
    ], (message: string) => onData('[BWA-MEM] ' + message))
  onData('[BWA-MEM][EXIT CODE] ' + code)

  if (outputs.bamSorted) {
    const sortCode = await async_exec(
      'sambamba', ['sort', '-t', threads, bam],
      (message: string) => onData('[SAMBAMBA-SORT] ' + message))
    onData('[SAMBAMBA-SORT][EXIT CODE] ' + sortCode)
    if (sortCode === 0) {
      await fs.promises.unlink(bam)
    } else {
      // Keep the only copy of the alignments when sorting failed.
      onData('[SAMBAMBA-SORT] sort failed, keeping unsorted BAM ' + bam)
    }
  }

  return outputs
}
/**
 * One queued alignment job: the item type of the queue built in
 * scheduleAsyncBwaMem. Each field is forwarded to the asyncBwaMem parameter
 * of the same name.
 */
type Task = {
  refPath: string;
  reads: string | string[] | string[][];
  runName: string;
  libName: string;
  outputDir: string;
  onData: Function;
};
/**
 * Queue worker: runs asyncBwaMem for one queued task with default options.
 * The alignment outputs are discarded; the returned promise only signals
 * completion, or rejects with asyncBwaMem's error.
 */
const asyncBwaMemWorker = async (args: any) => {
  const task = args
  await asyncBwaMem(
    task.refPath,
    task.reads,
    task.runName,
    task.libName,
    task.outputDir,
    task.onData,
  )
}
/**
 * Aligns several read sets against the same reference, one at a time.
 * Jobs go through a fastq queue with concurrency 1, so only one alignment
 * pipeline runs at once.
 *
 * @param allReads     read sets; `allReads[i]` is passed as `reads` to asyncBwaMem
 * @param refPath      reference FASTA shared by every job
 * @param runName      read-group sample tag shared by every job
 * @param libName      read-group library tag shared by every job
 * @param allOutputDir output directory of each read set (`allOutputDir[i]` for `allReads[i]`)
 * @param onData       progress callback shared by every job
 * @returns resolves when every job has finished; rejects with the first job error
 * @throws Error, before any job starts, when there are fewer output
 *   directories than read sets
 */
const scheduleAsyncBwaMem = async (
  allReads: Array<Array<string>> | Array<Array<Array<string>>>,
  refPath: string,
  runName: string,
  libName: string,
  allOutputDir: string[],
  onData: Function,
) => {
  // Fail fast: a missing directory would otherwise only surface inside a later
  // job, after the earlier (long) alignments have already run.
  if (allOutputDir.length < allReads.length) {
    throw new Error(
      `scheduleAsyncBwaMem: ${allReads.length} read set(s) but only ${allOutputDir.length} output director(y/ies)`,
    )
  }
  const q: queueAsPromised<Task> = fastq.promise(asyncBwaMemWorker, 1)
  await Promise.all(
    allReads.map((reads, i) => q.push({ refPath, reads, runName, libName, outputDir: allOutputDir[i], onData })),
  )
}
- export { asyncBwaMem, writeSequence, makeReference, scheduleAsyncBwaMem }
|