Skip to content

Commit

Permalink
support res.removeLister("drain"), res.once("drain")
Browse files Browse the repository at this point in the history
Fixes #152
Fixes #135
  • Loading branch information
zbjornson committed Mar 12, 2019
1 parent edb43f3 commit 9d85331
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 0 deletions.
28 changes: 28 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function compression (options) {

var _end = res.end
var _on = res.on
var _removeListener = res.removeListener
var _write = res.write

// flush
Expand Down Expand Up @@ -131,6 +132,33 @@ function compression (options) {
return this
}

res.addListener = res.on

res.removeListener = function removeListener (type, listener) {
if (!listeners || type !== 'drain') {
return _removeListener.call(this, type, listener)
}

if (stream) {
return stream.removeListener(type, listener)
}

// remove buffered listener
for (var i = listeners.length - 1; i >= 0; i--) {
if (listeners[i][0] === type && listeners[i][1] === listener) {
listeners.splice(i, 1)
}
}

return this
}

/* istanbul ignore next */
if (res.off) {
// emitter.off was added in Node.js v10+; don't add it to earlier versions
res.off = res.removeListener
}

function nocompress (msg) {
debug('no compression: %s', msg)
addListeners(res, _on, listeners)
Expand Down
193 changes: 193 additions & 0 deletions test/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,192 @@ describe('compression()', function () {
.expect(200, done)
})

it('should support removeListener("drain") after on("drain"); stream present', function (done) {
// compression doesn't proxy listenerCount() to the compression stream, so
// instead watch for a MaxListenersExceededWarning
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({threshold: 0}, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain") after addListener("drain")', function (done) {
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({threshold: 0}, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.addListener('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support off("drain") after addListener("drain")', function (done) {
if (!require('events').EventEmitter.prototype.off) { // off was added in Node.js v10
this.skip()
}
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({threshold: 0}, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.addListener('drain', listener)
res.off('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain"); buffered', function (done) {
// Variant of above tests for scenario when the listener is buffered (stream
// is not yet present).
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({threshold: 0}, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
res.on('end', function () {})
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) {
// Variant of above test for scenario when the listener is buffered (stream
// is not yet present) and the same listener is added two or more times.
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({threshold: 0}, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should not leak event listeners when res.unpipe() is used (#135)', function (done) {
// unpipe and stream.Readable were added in v0.9.4
var stream = require('stream')
if (!(stream.Readable && stream.Readable.prototype.unpipe)) {
this.skip()
}

var hasWarned = false
var onWarning = function () {
hasWarned = true
}
var server = createServer({threshold: 0}, function (req, res) {
var times = 0
var int = setInterval(function () {
var rs = require('fs').createReadStream('does not exist')
rs.on('error', function (e) {
rs.unpipe(res)
})
rs.pipe(res)
if (times++ > res.getMaxListeners()) {
clearInterval(int)
res.end('hello, world')
}
})
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

describe('threshold', function () {
it('should not compress responses below the threshold size', function (done) {
var server = createServer({ threshold: '1kb' }, function (req, res) {
Expand Down Expand Up @@ -669,6 +855,13 @@ function createServer (opts, fn) {
return
}

if (typeof res.getMaxListeners !== 'function') {
// Added in v0.11.2
res.getMaxListeners = function getMaxListeners () {
return 10
}
}

fn(req, res)
})
})
Expand Down

0 comments on commit 9d85331

Please sign in to comment.