diff --git a/index.js b/index.js index 01b15bd..40f8ef9 100644 --- a/index.js +++ b/index.js @@ -63,9 +63,39 @@ function compression (options) { var stream var _end = res.end - var _on = res.on var _write = res.write + // proxy drain events from stream + var _addListener = interceptAddListener(res, function (type, listener) { + if (!listeners || type !== 'drain') { + // skip intercept + return false + } else if (stream) { + // add listener to stream instead + stream.on(type, listener) + } else { + // buffer listeners for future stream + listeners.push([type, listener]) + } + }) + + interceptRemoveListener(res, function (type, listener) { + if (!listeners || type !== 'drain') { + // skip intercept + return false + } else if (stream) { + // remove listener from stream + stream.removeListener(type, listener) + } else { + // remove buffered listener + for (var i = listeners.length - 1; i >= 0; i--) { + if (listeners[i][0] === type && listeners[i][1] === listener) { + listeners.splice(i, 1) + } + } + } + }) + // flush res.flush = function flush () { if (stream) { @@ -116,24 +146,9 @@ function compression (options) { : stream.end() } - res.on = function on (type, listener) { - if (!listeners || type !== 'drain') { - return _on.call(this, type, listener) - } - - if (stream) { - return stream.on(type, listener) - } - - // buffer listeners for future stream - listeners.push([type, listener]) - - return this - } - function nocompress (msg) { debug('no compression: %s', msg) - addListeners(res, _on, listeners) + addListeners(res, _addListener, listeners) listeners = null } @@ -207,7 +222,7 @@ function compression (options) { _end.call(res) }) - _on.call(res, 'drain', function onResponseDrain () { + _addListener.call(res, 'drain', function onResponseDrain () { stream.resume() }) }) @@ -221,9 +236,9 @@ function compression (options) { * @private */ -function addListeners (stream, on, listeners) { +function addListeners (stream, addListener, listeners) { for (var i = 0; i < listeners.length; i++) { - on.apply(stream, listeners[i]) + addListener.apply(stream, listeners[i]) } } @@ -241,6 +256,93 @@ function chunkLength (chunk, encoding) { : Buffer.byteLength(chunk, encoding) } +/** + * Intercept add listener on event emitter. + * @private + */ + +function interceptAddListener (ee, fn) { + var _addListener = ee.addListener + var _on = ee.on + + if (_addListener) { + Object.defineProperty(ee, 'addListener', { + configurable: true, + value: addListener, + writable: true + }) + } + + if (_on) { + Object.defineProperty(ee, 'on', { + configurable: true, + value: on, + writable: true + }) + } + + return _addListener || _on || noop + + function addListener (type, listener) { + return fn.call(this, type, listener) === false + ? _addListener.call(this, type, listener) + : this + } + + function on (type, listener) { + return fn.call(this, type, listener) === false + ? _on.call(this, type, listener) + : this + } +} + +/** + * Intercept add listener on event emitter. + * @private + */ + +function interceptRemoveListener (ee, fn) { + var _removeListener = ee.removeListener + var _off = ee.off + + if (_removeListener) { + Object.defineProperty(ee, 'removeListener', { + configurable: true, + value: removeListener, + writable: true + }) + } + + if (_off) { + Object.defineProperty(ee, 'off', { + configurable: true, + value: off, + writable: true + }) + } + + return _removeListener || _off || noop + + function removeListener (type, listener) { + return fn.call(this, type, listener) === false + ? _removeListener.call(this, type, listener) + : this + } + + function off (type, listener) { + return fn.call(this, type, listener) === false + ? _off.call(this, type, listener) + : this + } +} + +/** + * Reusable no-op function. + * @private + */ + +function noop () {} + /** * Default filter function. * @private diff --git a/test/compression.js b/test/compression.js index b6166eb..0318fe8 100644 --- a/test/compression.js +++ b/test/compression.js @@ -315,6 +315,192 @@ describe('compression()', function () { .expect(200, done) }) + it('should support removeListener("drain") after on("drain"); stream present', function (done) { + // compression doesn't proxy listenerCount() to the compression stream, so + // instead watch for a MaxListenersExceededWarning + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain") after addListener("drain")', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support off("drain") after addListener("drain")', function (done) { + if (!require('events').EventEmitter.prototype.off) { // off was added in Node.js v10 + this.skip() + } + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.off('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); buffered', function (done) { + // Variant of above tests for scenario when the listener is buffered (stream + // is not yet present). + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + res.on('end', function () {}) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) { + // Variant of above test for scenario when the listener is buffered (stream + // is not yet present) and the same listener is added two or more times. + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should not leak event listeners when res.unpipe() is used (#135)', function (done) { + // unpipe and stream.Readable were added in v0.9.4 + var stream = require('stream') + if (!(stream.Readable && stream.Readable.prototype.unpipe)) { + this.skip() + } + + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + var server = createServer({ threshold: 0 }, function (req, res) { + var times = 0 + var int = setInterval(function () { + var rs = require('fs').createReadStream('does not exist') + rs.on('error', function (e) { + rs.unpipe(res) + }) + rs.pipe(res) + if (times++ > res.getMaxListeners()) { + clearInterval(int) + res.end('hello, world') + } + }) + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + describeHttp2('http2', function () { it('should work with http2 server', function (done) { var server = createHttp2Server({ threshold: 0 }, function (req, res) { @@ -714,6 +900,13 @@ function createServer (opts, fn) { return } + if (typeof res.getMaxListeners !== 'function') { + // Added in v0.11.2 + res.getMaxListeners = function getMaxListeners () { + return 10 + } + } + fn(req, res) }) })