const through = require('through2'); const {Tail} = require('tail'); const {Writable} = require('stream'); const {StringDecoder} = require('string_decoder'); class StringWritable extends Writable { constructor(handler, options) { super(options); this.handler = handler; const state = this._writableState; this._decoder = new StringDecoder(state.defaultEncoding); this.data = ''; } _write(chunk, encoding, callback) { if (encoding === 'buffer') { chunk = this._decoder.write(chunk); for (const line of chunk.split('\n')) if (line.length) { this.handler(line); } } this.data += chunk; callback(); } _final(callback) { this.data += this._decoder.end(); callback(); } } module.exports = (handler) => { return through.obj(function (file, enc, callback) { if (file.contents && file.contents.pipe) { file.contents.pipe(new StringWritable(handler)); } else { const tail = new Tail(file.path); tail.on('line', data => handler(data)); tail.on('error', error => handler('[ERROR]: ', error)); this.on('end', () => tail.unwatch()); } this.push(file); callback(); }); };