diff --git a/index.js b/index.js index f190c689..ee68967a 100644 --- a/index.js +++ b/index.js @@ -64,6 +64,7 @@ function compression (options) { var _end = res.end var _on = res.on + var _removeListener = res.removeListener var _write = res.write // flush @@ -131,6 +132,33 @@ function compression (options) { return this } + res.addListener = res.on + + res.removeListener = function removeListener (type, listener) { + if (!listeners || type !== 'drain') { + return _removeListener.call(this, type, listener) + } + + if (stream) { + return stream.removeListener(type, listener) + } + + // remove buffered listener + for (var i = listeners.length - 1; i >= 0; i--) { + if (listeners[i][0] === type && listeners[i][1] === listener) { + listeners.splice(i, 1) + } + } + + return this + } + + /* istanbul ignore next */ + if (res.off) { + // emitter.off was added in Node.js v10+; don't add it to earlier versions + res.off = res.removeListener + } + function nocompress (msg) { debug('no compression: %s', msg) addListeners(res, _on, listeners) diff --git a/test/compression.js b/test/compression.js index 5c5ed080..969111f8 100644 --- a/test/compression.js +++ b/test/compression.js @@ -305,6 +305,192 @@ describe('compression()', function () { .expect(200, done) }) + it('should support removeListener("drain") after on("drain"); stream present', function (done) { + // compression doesn't proxy listenerCount() to the compression stream, so + // instead watch for a MaxListenersExceededWarning + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain") after addListener("drain")', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support off("drain") after addListener("drain")', function (done) { + if (!require('events').EventEmitter.prototype.off) { // off was added in Node.js v10 + this.skip() + } + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.off('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); buffered', function (done) { + // Variant of above tests for scenario when the listener is buffered (stream + // is not yet present). + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + res.on('end', function () {}) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) { + // Variant of above test for scenario when the listener is buffered (stream + // is not yet present) and the same listener is added two or more times. + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should not leak event listeners when res.unpipe() is used (#135)', function (done) { + // unpipe and stream.Readable were added in v0.9.4 + var stream = require('stream') + if (!(stream.Readable && stream.Readable.prototype.unpipe)) { + this.skip() + } + + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + var server = createServer({threshold: 0}, function (req, res) { + var times = 0 + var int = setInterval(function () { + var rs = require('fs').createReadStream('does not exist') + rs.on('error', function (e) { + rs.unpipe(res) + }) + rs.pipe(res) + if (times++ > res.getMaxListeners()) { + clearInterval(int) + res.end('hello, world') + } + }) + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + describe('threshold', function () { it('should not compress responses below the threshold size', function (done) { var server = createServer({ threshold: '1kb' }, function (req, res) { @@ -669,6 +855,13 @@ function createServer (opts, fn) { return } + if (typeof res.getMaxListeners !== 'function') { + // Added in v0.11.2 + res.getMaxListeners = function getMaxListeners () { + return 10 + } + } + fn(req, res) }) })