Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support res.removeListener('drain'), res.once('drain') #153

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 122 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,39 @@ function compression (options) {
var stream

var _end = res.end
var _on = res.on
var _write = res.write

// proxy drain events from stream
var _addListener = interceptAddListener(res, function (type, listener) {
if (!listeners || type !== 'drain') {
// skip intercept
return false
} else if (stream) {
// add listener to stream instead
stream.on(type, listener)
} else {
// buffer listeners for future stream
listeners.push([type, listener])
}
})

interceptRemoveListener(res, function (type, listener) {
if (!listeners || type !== 'drain') {
// skip intercept
return false
} else if (stream) {
// remove listener from stream
stream.removeListener(type, listener)
} else {
// remove buffered listener
for (var i = listeners.length - 1; i >= 0; i--) {
if (listeners[i][0] === type && listeners[i][1] === listener) {
listeners.splice(i, 1)
}
}
}
})

// flush
res.flush = function flush () {
if (stream) {
Expand Down Expand Up @@ -116,24 +146,9 @@ function compression (options) {
: stream.end()
}

res.on = function on (type, listener) {
if (!listeners || type !== 'drain') {
return _on.call(this, type, listener)
}

if (stream) {
return stream.on(type, listener)
}

// buffer listeners for future stream
listeners.push([type, listener])

return this
}

function nocompress (msg) {
debug('no compression: %s', msg)
addListeners(res, _on, listeners)
addListeners(res, _addListener, listeners)
listeners = null
}

Expand Down Expand Up @@ -207,7 +222,7 @@ function compression (options) {
_end.call(res)
})

_on.call(res, 'drain', function onResponseDrain () {
_addListener.call(res, 'drain', function onResponseDrain () {
stream.resume()
})
})
Expand All @@ -221,9 +236,9 @@ function compression (options) {
* @private
*/

function addListeners (stream, on, listeners) {
function addListeners (stream, addListener, listeners) {
for (var i = 0; i < listeners.length; i++) {
on.apply(stream, listeners[i])
addListener.apply(stream, listeners[i])
}
}

Expand All @@ -241,6 +256,93 @@ function chunkLength (chunk, encoding) {
: Buffer.byteLength(chunk, encoding)
}

/**
* Intercept add listener on event emitter.
* @private
*/

function interceptAddListener (ee, fn) {
var _addListener = ee.addListener
var _on = ee.on

if (_addListener) {
Object.defineProperty(ee, 'addListener', {
configurable: true,
value: addListener,
writable: true
})
}

if (_on) {
Object.defineProperty(ee, 'on', {
configurable: true,
value: on,
writable: true
})
}

return _addListener || _on || noop

function addListener (type, listener) {
return fn.call(this, type, listener) === false
? _addListener.call(this, type, listener)
: this
}

function on (type, listener) {
return fn.call(this, type, listener) === false
? _on.call(this, type, listener)
: this
}
}

/**
* Intercept add listener on event emitter.
* @private
*/

function interceptRemoveListener (ee, fn) {
var _removeListener = ee.removeListener
var _off = ee.off

if (_removeListener) {
Object.defineProperty(ee, 'removeListener', {
configurable: true,
value: removeListener,
writable: true
})
}

if (_off) {
Object.defineProperty(ee, 'off', {
configurable: true,
value: off,
writable: true
})
}

return _removeListener || _off || noop

function removeListener (type, listener) {
return fn.call(this, type, listener) === false
? _removeListener.call(this, type, listener)
: this
}

function off (type, listener) {
return fn.call(this, type, listener) === false
? _off.call(this, type, listener)
: this
}
}

/**
* Reusable no-op function.
* @private
*/

function noop () {}

/**
* Default filter function.
* @private
Expand Down
193 changes: 193 additions & 0 deletions test/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,192 @@ describe('compression()', function () {
.expect(200, done)
})

it('should support removeListener("drain") after on("drain"); stream present', function (done) {
// compression doesn't proxy listenerCount() to the compression stream, so
// instead watch for a MaxListenersExceededWarning
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain") after addListener("drain")', function (done) {
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.addListener('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support off("drain") after addListener("drain")', function (done) {
if (!require('events').EventEmitter.prototype.off) { // off was added in Node.js v10
this.skip()
}
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.addListener('drain', listener)
res.off('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain"); buffered', function (done) {
// Variant of above tests for scenario when the listener is buffered (stream
// is not yet present).
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
res.on('end', function () {})
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) {
// Variant of above test for scenario when the listener is buffered (stream
// is not yet present) and the same listener is added two or more times.
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should not leak event listeners when res.unpipe() is used (#135)', function (done) {
// unpipe and stream.Readable were added in v0.9.4
var stream = require('stream')
if (!(stream.Readable && stream.Readable.prototype.unpipe)) {
this.skip()
}

var hasWarned = false
var onWarning = function () {
hasWarned = true
}
var server = createServer({ threshold: 0 }, function (req, res) {
var times = 0
var int = setInterval(function () {
var rs = require('fs').createReadStream('does not exist')
rs.on('error', function (e) {
rs.unpipe(res)
})
rs.pipe(res)
if (times++ > res.getMaxListeners()) {
clearInterval(int)
res.end('hello, world')
}
})
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

describeHttp2('http2', function () {
it('should work with http2 server', function (done) {
var server = createHttp2Server({ threshold: 0 }, function (req, res) {
Expand Down Expand Up @@ -714,6 +900,13 @@ function createServer (opts, fn) {
return
}

if (typeof res.getMaxListeners !== 'function') {
// Added in v0.11.2
res.getMaxListeners = function getMaxListeners () {
return 10
}
}

fn(req, res)
})
})
Expand Down
Loading