import { spawn } from 'child_process';
import { cpus } from 'os';
import fs from 'fs';
import path from 'path';
import * as fastq from 'fastq';
import type { queueAsPromised } from 'fastq';

/**
 * Read input accepted by {@link asyncBwaMem}.
 *
 * - a plain string is passed to `bwa mem` verbatim;
 * - an array is destructured as `[R1, R2]`, where each mate is either one
 *   file path or a list of file paths.
 *
 * NOTE(review): the generic arguments were missing from the original
 * declaration; they are reconstructed here from how the value is used
 * (`Array.isArray`, `join(' ')`) — confirm against callers.
 */
type Reads = string | Array<string> | Array<Array<string>>;

/** Option keys read by {@link asyncBwaMem}. */
interface BwaMemOptions {
  /** Write discordant pairs to a SAM file (paired-end input only). Default `true`. */
  output_discordant?: boolean;
  /** Write split reads to a SAM file. Default `true`. */
  output_splitted?: boolean;
  /** Write unmapped reads to a FASTQ file. Default `true`. */
  output_unmapped?: boolean;
  /** Send the alignment itself to `/dev/null` and skip sorting. */
  remove_mapped?: boolean;
}

/** Paths of the files a run of {@link asyncBwaMem} was asked to produce. */
interface BwaMemResult {
  bamSorted?: string;
  discordantFile?: string;
  splitterFile?: string;
  unmappedFile?: string;
}

/**
 * Runs `prog args...` through a shell and streams its output.
 *
 * Because `shell: true` is used, the arguments are joined into a single
 * command line, so shell operators such as `|` and `>` inside `args` take
 * effect. Nothing is escaped here.
 *
 * @param prog   Executable name.
 * @param args   Arguments (may contain shell operators).
 * @param onData Called with every trimmed stdout/stderr chunk.
 * @returns The exit code, or `null` when the process was ended by a signal.
 *          A non-zero code does not reject; only a spawn error does.
 */
const async_exec = (
  prog: string,
  args: string[],
  onData: (message: string) => void,
): Promise<number | null> =>
  new Promise((resolve, reject) => {
    const child = spawn(prog, args, { shell: true });
    const forward = (chunk: Buffer): void => {
      onData(chunk.toString().trim());
    };

    child.stdout.on('data', forward);
    child.stderr.on('data', forward);
    child.on('error', (err) => reject(err));
    // 'close' (unlike 'exit') fires only after the stdio streams have ended,
    // so every chunk has been forwarded before the promise settles.
    child.on('close', (code) => resolve(code));
  });

/**
 * Keeps every UTF-16 code unit of `string` that matches `regex` and replaces
 * all others with `by`.
 *
 * `String.prototype.match` is used rather than `RegExp.prototype.test`
 * because `test` is stateful on regular expressions carrying the `g` flag.
 */
const invReplace = (regex: RegExp, string: string, by = '_'): string =>
  string
    .split('')
    .map((letter) => (letter.match(regex) ? letter : by))
    .join('');

/**
 * Writes a single-record FASTA file.
 *
 * The header is `>` followed by `sequenceName` with disallowed characters
 * replaced by `_`; the sequence is upper-cased and wrapped at `lineN`
 * characters per line. An empty sequence yields a header-only record.
 *
 * @returns `true` once the file has been written.
 * @throws The underlying error (after logging it) when writing fails or
 *         `lineN` does not form a valid regular-expression quantifier.
 */
const writeSequence = async (
  sequenceName: string,
  sequence: string,
  filePath: string,
  lineN = 80,
): Promise<boolean> => {
  try {
    const lineChunks = new RegExp('.{1,' + lineN + '}', 'g');
    const regex_sam_restriction = /[>0-9A-Za-z!#$%&+\./:;?@^_|~-]|[\n\t]/g;
    const nSeqName = invReplace(regex_sam_restriction, sequenceName);
    // match() returns null for an empty sequence; fall back to no lines
    // instead of letting `undefined` be concatenated into the file.
    const body = (sequence.match(lineChunks) ?? []).join('\n').toUpperCase();
    await fs.promises.writeFile(filePath, '>' + nSeqName + '\n' + body);
    return true;
  } catch (error) {
    console.log(error);
    throw error;
  }
};

/**
 * Writes the sequence as FASTA to `filePath` and then runs `bwa index` on
 * it, logging the indexer's output to the console.
 */
const makeReference = async (
  sequenceName: string,
  sequence: string,
  filePath: string,
  lineN = 80,
): Promise<void> => {
  if (await writeSequence(sequenceName, sequence, filePath, lineN)) {
    await async_exec('bwa', ['index', filePath], (message: string) => console.log(message));
  }
};

/**
 * Aligns reads with `bwa mem`, pipes the result through `samblaster` and
 * `samtools view`, then sorts the BAM with `sambamba sort`.
 *
 * @param refPath   Reference FASTA; it is indexed first when
 *                  `refPath + '.amb'` does not exist.
 * @param reads     See {@link Reads}. Any array is treated as paired-end.
 * @param runName   Used as the `SM` field of the read group.
 * @param libName   Used as the `LB` field of the read group.
 * @param outputDir Directory receiving every output file.
 * @param onData    Called with prefixed progress lines and exit codes.
 * @param options   See {@link BwaMemOptions}; merged over the defaults.
 * @returns Paths of the requested outputs. Non-zero exit codes of the
 *          external tools are reported through `onData` only; they do not
 *          reject.
 * @throws When paired input lacks a mate, or a process cannot be spawned.
 */
const asyncBwaMem = async (
  refPath: string,
  reads: Reads,
  runName: string,
  libName: string,
  outputDir: string,
  onData: Function,
  options?: any,
): Promise<BwaMemResult> => {
  const defaultOptions = {
    output_discordant: true,
    output_splitted: true,
    output_unmapped: true,
  };
  const opts: BwaMemOptions = { ...defaultOptions, ...options };

  const refName = path.parse(refPath).name;
  const bwa = 'bwa';
  const samblaster = 'samblaster';
  const samtools = 'samtools';
  const sambamba = 'sambamba';

  let readsIn: string;
  let isPairedEnd = false;
  if (Array.isArray(reads)) {
    isPairedEnd = true;
    console.log('Assuming paired end reads');
    const [R1, R2] = reads;
    if (R1 === undefined || R2 === undefined) {
      throw new Error('Paired-end input requires both R1 and R2 reads');
    }
    const R1_arr = Array.isArray(R1) ? R1.join(' ') : R1;
    const R2_arr = Array.isArray(R2) ? R2.join(' ') : R2;
    // Compression is detected from the end of the (joined) path list only.
    const R1_kitty = R1_arr.endsWith('gz') ? 'zcat' : 'cat';
    const R2_kitty = R2_arr.endsWith('gz') ? 'zcat' : 'cat';
    // presumably bwa treats a file name starting with '<' as a command whose
    // output it reads — confirm against the bwa version in use.
    readsIn = `'< ${R1_kitty} ${R1_arr}'` + ' ' + `'< ${R2_kitty} ${R2_arr}'`;
  } else {
    readsIn = reads;
  }

  let bam = path.join(outputDir, `bwa_mem_properly_on_${refName}.bam`);
  const bamSorted = path.join(outputDir, `bwa_mem_properly_on_${refName}.sorted.bam`);
  const retObj: BwaMemResult = {};
  if (opts.remove_mapped) {
    bam = '/dev/null';
  } else {
    retObj.bamSorted = bamSorted;
  }

  const threads = String(cpus().length);

  // https://github.com/GregoryFaust/samblaster
  const samblasterCmd: string[] = [
    '|',
    samblaster,
    '--addMateTags',
    '-a', // Accept duplicate marks already in input file
    '-e', // Exclude reads marked as duplicates from discordant, splitter, and/or unmapped
  ];

  if (opts.output_discordant || opts.output_splitted) {
    console.log('Using samblaster');
    if (opts.output_discordant) {
      if (!isPairedEnd) {
        console.log('Discordant reads can be found only in paired reads, skipping');
      } else {
        const discordantFile = path.join(outputDir, `bwa_mem_discordants_on_${refName}.sam`);
        console.log('Discordant reads file path: ', discordantFile);
        samblasterCmd.push('-d', discordantFile);
        retObj.discordantFile = discordantFile;
      }
    }
    // NOTE(review): --ignoreUnmated is only added when discordant or split
    // output is requested, although samblaster is always part of the
    // pipeline — confirm this is intended for single-end input.
    if (!isPairedEnd) {
      samblasterCmd.push('--ignoreUnmated');
    }
    if (opts.output_splitted) {
      const splitterFile = path.join(outputDir, `bwa_mem_splitters_on_${refName}.sam`);
      console.log('Splitted reads file path: ', splitterFile);
      samblasterCmd.push('-s', splitterFile);
      retObj.splitterFile = splitterFile;
    }
  }

  if (opts.output_unmapped) {
    const unmappedFile = path.join(outputDir, `bwa_mem_unmapped_on_${refName}.fq`);
    console.log('Unmapped reads file path: ', unmappedFile);
    samblasterCmd.push('-u', unmappedFile);
    retObj.unmappedFile = unmappedFile;
  }

  if (!fs.existsSync(refPath + '.amb')) {
    await async_exec(bwa, ['index', refPath], (message: string) =>
      onData('[BWA-INDEX] ' + message),
    );
  }

  console.log(opts, samblasterCmd);

  const code = await async_exec(
    bwa,
    [
      'mem',
      '-t', threads,
      '-R', `"@RG\\tPL:Illumina\\tID:${Date.now()}\\tSM:${runName}\\tLB:${libName}"`,
      refPath,
      readsIn,
      ...samblasterCmd,
      '|', samtools, 'view', '-Sb', '-',
      '>', bam,
    ],
    (message: string) => onData('[BWA-MEM] ' + message),
  );
  onData('[BWA-MEM][EXIT CODE] ' + code);

  if (retObj.bamSorted) {
    // No output name is given; the code relies on sambamba writing to
    // `<input without .bam>.sorted.bam`, which is what `bamSorted` holds.
    const code_sort = await async_exec(
      sambamba,
      ['sort', '-t', threads, bam],
      (message: string) => onData('[SAMBAMBA-SORT] ' + message),
    );
    onData('[SAMBAMBA-SORT][EXIT CODE] ' + code_sort);
    // Keep the unsorted BAM when sorting failed: it is then the only copy
    // of the alignment.
    if (code_sort === 0) {
      fs.unlinkSync(bam);
    }
  }

  return retObj;
};

/** One queued alignment job; mirrors the leading parameters of `asyncBwaMem`. */
type Task = {
  refPath: string;
  reads: Reads;
  runName: string;
  libName: string;
  outputDir: string;
  onData: Function;
};

/** Queue worker: runs one alignment with the default options. */
const asyncBwaMemWorker = async (args: Task): Promise<void> => {
  const { refPath, reads, runName, libName, outputDir, onData } = args;
  await asyncBwaMem(refPath, reads, runName, libName, outputDir, onData);
};

/**
 * Runs one alignment per entry of `allReads`, one at a time (queue
 * concurrency 1), writing the i-th result into `allOutputDir[i]`.
 *
 * @throws Before anything is queued when an entry has no output directory;
 *         otherwise rejects with the first failing job's error.
 */
const scheduleAsyncBwaMem = async (
  allReads: Array<Array<string>> | Array<Array<Array<string>>>,
  refPath: string,
  runName: string,
  libName: string,
  allOutputDir: string[],
  onData: Function,
): Promise<void> => {
  const q: queueAsPromised<Task> = fastq.promise(asyncBwaMemWorker, 1);
  const readSets: ReadonlyArray<Array<string> | Array<Array<string>>> = allReads;

  // Build every task up front so a missing directory fails fast instead of
  // surfacing from the middle of the queue.
  const tasks: Task[] = readSets.map((reads, i) => {
    const outputDir = allOutputDir[i];
    if (outputDir === undefined) {
      throw new Error(`No output directory provided for reads at index ${i}`);
    }
    return { refPath, reads, runName, libName, outputDir, onData };
  });

  await Promise.all(tasks.map((task) => q.push(task)));
};

export { asyncBwaMem, writeSequence, makeReference, scheduleAsyncBwaMem };