From 3136680d90683ddf2e72c97ec30c8bff84517dac Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Tue, 11 Jun 2024 22:29:49 +0200 Subject: [PATCH] [feature] Add support for `Blob` Closes #2206 --- doc/ws.md | 18 ++- lib/constants.js | 8 +- lib/receiver.js | 2 + lib/sender.js | 191 ++++++++++++++++------ lib/validation.js | 21 +++ lib/websocket.js | 76 +++++++-- test/fixtures/file.txt | 1 + test/receiver.test.js | 40 ++++- test/sender.test.js | 142 +++++++++++++++-- test/websocket.test.js | 354 ++++++++++++++++++++++++++++++++++++++++- 10 files changed, 760 insertions(+), 93 deletions(-) create mode 100644 test/fixtures/file.txt diff --git a/doc/ws.md b/doc/ws.md index 1189fd02a..f30ad4cae 100644 --- a/doc/ws.md +++ b/doc/ws.md @@ -466,10 +466,11 @@ does nothing if `type` is not one of `'close'`, `'error'`, `'message'`, or - {String} A string indicating the type of binary data being transmitted by the connection. -This should be one of "nodebuffer", "arraybuffer" or "fragments". Defaults to -"nodebuffer". Type "fragments" will emit the array of fragments as received from -the sender, without copyfull concatenation, which is useful for the performance -of binary protocols transferring large messages with multiple fragments. +This should be one of "nodebuffer", "arraybuffer", "blob", or "fragments". +Defaults to "nodebuffer". Type "fragments" will emit the array of fragments as +received from the sender, without copyfull concatenation, which is useful for +the performance of binary protocols transferring large messages with multiple +fragments. ### websocket.bufferedAmount @@ -538,7 +539,8 @@ is a noop if the ready state is `CONNECTING` or `CLOSED`. ### websocket.ping([data[, mask]][, callback]) -- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The +- `data` + {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The data to send in the ping frame. - `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to `true` when `websocket` is not a server client. @@ -550,7 +552,8 @@ Send a ping. This method throws an error if the ready state is `CONNECTING`. ### websocket.pong([data[, mask]][, callback]) -- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The +- `data` + {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The data to send in the pong frame. - `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to `true` when `websocket` is not a server client. @@ -588,7 +591,8 @@ only removes listeners added with ### websocket.send(data[, options][, callback]) -- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The +- `data` + {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The data to send. `Object` values are only supported if they conform to the requirements of [`Buffer.from()`][]. If those constraints are not met, a `TypeError` is thrown. diff --git a/lib/constants.js b/lib/constants.js index d691b30a1..74214d466 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -1,9 +1,15 @@ 'use strict'; +const BINARY_TYPES = ['nodebuffer', 'arraybuffer', 'fragments']; +const hasBlob = typeof Blob !== 'undefined'; + +if (hasBlob) BINARY_TYPES.push('blob'); + module.exports = { - BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'], + BINARY_TYPES, EMPTY_BUFFER: Buffer.alloc(0), GUID: '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', + hasBlob, kForOnEventAttribute: Symbol('kIsForOnEventAttribute'), kListener: Symbol('kListener'), kStatusCode: Symbol('status-code'), diff --git a/lib/receiver.js b/lib/receiver.js index 70dfd9933..54d9b4fad 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -559,6 +559,8 @@ class Receiver extends Writable { data = concat(fragments, messageLength); } else if (this._binaryType === 'arraybuffer') { data = toArrayBuffer(concat(fragments, messageLength)); + } else if (this._binaryType === 'blob') { + data = new Blob(fragments); } else { data = fragments; } diff --git a/lib/sender.js b/lib/sender.js index 5ea2986ee..1c4734df1 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -6,8 +6,8 @@ const { Duplex } = require('stream'); const { randomFillSync } = require('crypto'); const PerMessageDeflate = require('./permessage-deflate'); -const { EMPTY_BUFFER } = require('./constants'); -const { isValidStatusCode } = require('./validation'); +const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants'); +const { isBlob, isValidStatusCode } = require('./validation'); const { mask: applyMask, toBuffer } = require('./buffer-util'); const kByteLength = Symbol('kByteLength'); @@ -16,6 +16,10 @@ const RANDOM_POOL_SIZE = 8 * 1024; let randomPool; let randomPoolPointer = RANDOM_POOL_SIZE; +const DEFAULT = 0; +const DEFLATING = 1; +const GET_BLOB_DATA = 2; + /** * HyBi Sender implementation. */ @@ -42,8 +46,10 @@ class Sender { this._compress = false; this._bufferedBytes = 0; - this._deflating = false; this._queue = []; + this._state = DEFAULT; + this.onerror = NOOP; + this[kWebSocket] = undefined; } /** @@ -205,7 +211,7 @@ class Sender { rsv1: false }; - if (this._deflating) { + if (this._state !== DEFAULT) { this.enqueue([this.dispatch, buf, false, options, cb]); } else { this.sendFrame(Sender.frame(buf, options), cb); @@ -227,6 +233,9 @@ class Sender { if (typeof data === 'string') { byteLength = Buffer.byteLength(data); readOnly = false; + } else if (isBlob(data)) { + byteLength = data.size; + readOnly = false; } else { data = toBuffer(data); byteLength = data.length; @@ -248,7 +257,13 @@ class Sender { rsv1: false }; - if (this._deflating) { + if (isBlob(data)) { + if (this._state !== DEFAULT) { + this.enqueue([this.getBlobData, data, false, options, cb]); + } else { + this.getBlobData(data, false, options, cb); + } + } else if (this._state !== DEFAULT) { this.enqueue([this.dispatch, data, false, options, cb]); } else { this.sendFrame(Sender.frame(data, options), cb); @@ -270,6 +285,9 @@ class Sender { if (typeof data === 'string') { byteLength = Buffer.byteLength(data); readOnly = false; + } else if (isBlob(data)) { + byteLength = data.size; + readOnly = false; } else { data = toBuffer(data); byteLength = data.length; @@ -291,7 +309,13 @@ class Sender { rsv1: false }; - if (this._deflating) { + if (isBlob(data)) { + if (this._state !== DEFAULT) { + this.enqueue([this.getBlobData, data, false, options, cb]); + } else { + this.getBlobData(data, false, options, cb); + } + } else if (this._state !== DEFAULT) { this.enqueue([this.dispatch, data, false, options, cb]); } else { this.sendFrame(Sender.frame(data, options), cb); @@ -325,6 +349,9 @@ class Sender { if (typeof data === 'string') { byteLength = Buffer.byteLength(data); readOnly = false; + } else if (isBlob(data)) { + byteLength = data.size; + readOnly = false; } else { data = toBuffer(data); byteLength = data.length; @@ -352,40 +379,107 @@ class Sender { if (options.fin) this._firstFragment = true; - if (perMessageDeflate) { - const opts = { - [kByteLength]: byteLength, - fin: options.fin, - generateMask: this._generateMask, - mask: options.mask, - maskBuffer: this._maskBuffer, - opcode, - readOnly, - rsv1 - }; - - if (this._deflating) { - this.enqueue([this.dispatch, data, this._compress, opts, cb]); + const opts = { + [kByteLength]: byteLength, + fin: options.fin, + generateMask: this._generateMask, + mask: options.mask, + maskBuffer: this._maskBuffer, + opcode, + readOnly, + rsv1 + }; + + if (isBlob(data)) { + if (this._state !== DEFAULT) { + this.enqueue([this.getBlobData, data, this._compress, opts, cb]); } else { - this.dispatch(data, this._compress, opts, cb); + this.getBlobData(data, this._compress, opts, cb); } + } else if (this._state !== DEFAULT) { + this.enqueue([this.dispatch, data, this._compress, opts, cb]); } else { - this.sendFrame( - Sender.frame(data, { - [kByteLength]: byteLength, - fin: options.fin, - generateMask: this._generateMask, - mask: options.mask, - maskBuffer: this._maskBuffer, - opcode, - readOnly, - rsv1: false - }), - cb - ); + this.dispatch(data, this._compress, opts, cb); } } + /** + * Calls queued callbacks with an error. + * + * @param {Error} err The error to call the callbacks with + * @param {Function} [cb] The first callback + * @private + */ + callCallbacks(err, cb) { + if (typeof cb === 'function') cb(err); + + for (let i = 0; i < this._queue.length; i++) { + const params = this._queue[i]; + const callback = params[params.length - 1]; + + if (typeof callback === 'function') callback(err); + } + } + + /** + * Gets the contents of a blob as binary data. + * + * @param {Blob} blob The blob + * @param {Boolean} [compress=false] Specifies whether or not to compress + * the data + * @param {Object} options Options object + * @param {Boolean} [options.fin=false] Specifies whether or not to set the + * FIN bit + * @param {Function} [options.generateMask] The function used to generate the + * masking key + * @param {Boolean} [options.mask=false] Specifies whether or not to mask + * `data` + * @param {Buffer} [options.maskBuffer] The buffer used to store the masking + * key + * @param {Number} options.opcode The opcode + * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be + * modified + * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the + * RSV1 bit + * @param {Function} [cb] Callback + * @private + */ + getBlobData(blob, compress, options, cb) { + this._bufferedBytes += options[kByteLength]; + this._state = GET_BLOB_DATA; + + blob + .arrayBuffer() + .then((arrayBuffer) => { + if (this._socket.destroyed) { + const err = new Error( + 'The socket was closed while the blob was being read' + ); + + this.callCallbacks(err, cb); + return; + } + + this._bufferedBytes -= options[kByteLength]; + const data = toBuffer(arrayBuffer); + + if (!compress) { + this._state = DEFAULT; + this.sendFrame(Sender.frame(data, options), cb); + this.dequeue(); + } else { + this.dispatch(data, compress, options, cb); + } + }) + .catch((err) => { + // + // `onError` is called in the next tick to not suppress the throwing + // behavior of the `'error'` event emitted by the `WebSocket` object. + // + process.nextTick(onError, this, err, cb); + }); + } + /** * Dispatches a message. * @@ -418,27 +512,19 @@ class Sender { const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; this._bufferedBytes += options[kByteLength]; - this._deflating = true; + this._state = DEFLATING; perMessageDeflate.compress(data, options.fin, (_, buf) => { if (this._socket.destroyed) { const err = new Error( 'The socket was closed while data was being compressed' ); - if (typeof cb === 'function') cb(err); - - for (let i = 0; i < this._queue.length; i++) { - const params = this._queue[i]; - const callback = params[params.length - 1]; - - if (typeof callback === 'function') callback(err); - } - + this.callCallbacks(err, cb); return; } this._bufferedBytes -= options[kByteLength]; - this._deflating = false; + this._state = DEFAULT; options.readOnly = false; this.sendFrame(Sender.frame(buf, options), cb); this.dequeue(); @@ -451,7 +537,7 @@ class Sender { * @private */ dequeue() { - while (!this._deflating && this._queue.length) { + while (this._state === DEFAULT && this._queue.length) { const params = this._queue.shift(); this._bufferedBytes -= params[3][kByteLength]; @@ -490,3 +576,16 @@ class Sender { } module.exports = Sender; + +/** + * Handles a `Sender` error. + * + * @param {Sender} sender The `Sender` instance + * @param {Error} err The error + * @param {Function} [cb] The first pending callback + * @private + */ +function onError(sender, err, cb) { + sender.callCallbacks(err, cb); + sender.onerror(err); +} diff --git a/lib/validation.js b/lib/validation.js index c352e6ea7..5954eecbb 100644 --- a/lib/validation.js +++ b/lib/validation.js @@ -2,6 +2,8 @@ const { isUtf8 } = require('buffer'); +const { hasBlob } = require('./constants'); + // // Allowed token characters: // @@ -107,7 +109,26 @@ function _isValidUTF8(buf) { return true; } +/** + * Determines whether a value is a `Blob`. + * + * @param {*} value The value to be tested + * @return {Boolean} `true` if `value` is a `Blob`, else `false` + * @private + */ +function isBlob(value) { + return ( + hasBlob && + typeof value === 'object' && + typeof value.arrayBuffer === 'function' && + typeof value.type === 'string' && + typeof value.stream === 'function' && + value[Symbol.toStringTag] === 'Blob' + ); +} + module.exports = { + isBlob, isValidStatusCode, isValidUTF8: _isValidUTF8, tokenChars diff --git a/lib/websocket.js b/lib/websocket.js index 709ad825a..49fca886f 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -14,6 +14,8 @@ const { URL } = require('url'); const PerMessageDeflate = require('./permessage-deflate'); const Receiver = require('./receiver'); const Sender = require('./sender'); +const { isBlob } = require('./validation'); + const { BINARY_TYPES, EMPTY_BUFFER, @@ -58,6 +60,7 @@ class WebSocket extends EventEmitter { this._closeFrameSent = false; this._closeMessage = EMPTY_BUFFER; this._closeTimer = null; + this._errorEmitted = false; this._extensions = {}; this._paused = false; this._protocol = ''; @@ -90,9 +93,8 @@ class WebSocket extends EventEmitter { } /** - * This deviates from the WHATWG interface since ws doesn't support the - * required default "blob" type (instead we define a custom "nodebuffer" - * type). + * For historical reasons, the custom "nodebuffer" type is used by the default + * instead of "blob". * * @type {String} */ @@ -213,11 +215,14 @@ class WebSocket extends EventEmitter { skipUTF8Validation: options.skipUTF8Validation }); - this._sender = new Sender(socket, this._extensions, options.generateMask); + const sender = new Sender(socket, this._extensions, options.generateMask); + this._receiver = receiver; + this._sender = sender; this._socket = socket; receiver[kWebSocket] = this; + sender[kWebSocket] = this; socket[kWebSocket] = this; receiver.on('conclude', receiverOnConclude); @@ -227,6 +232,8 @@ class WebSocket extends EventEmitter { receiver.on('ping', receiverOnPing); receiver.on('pong', receiverOnPong); + sender.onerror = senderOnError; + // // These methods may not be available if `socket` is just a `Duplex`. // @@ -322,13 +329,7 @@ class WebSocket extends EventEmitter { } }); - // - // Specify a timeout for the closing handshake to complete. - // - this._closeTimer = setTimeout( - this._socket.destroy.bind(this._socket), - closeTimeout - ); + setCloseTimer(this); } /** @@ -1030,6 +1031,11 @@ function initAsClient(websocket, address, protocols, options) { */ function emitErrorAndClose(websocket, err) { websocket._readyState = WebSocket.CLOSING; + // + // The following assignment is practically useless and is done only for + // consistency. + // + websocket._errorEmitted = true; websocket.emit('error', err); websocket.emitClose(); } @@ -1110,7 +1116,7 @@ function abortHandshake(websocket, stream, message) { */ function sendAfterClose(websocket, data, cb) { if (data) { - const length = toBuffer(data).length; + const length = isBlob(data) ? data.size : toBuffer(data).length; // // The `_bufferedAmount` property is used only when the peer is a client and @@ -1186,7 +1192,10 @@ function receiverOnError(err) { websocket.close(err[kStatusCode]); } - websocket.emit('error', err); + if (!websocket._errorEmitted) { + websocket._errorEmitted = true; + websocket.emit('error', err); + } } /** @@ -1242,6 +1251,47 @@ function resume(stream) { stream.resume(); } +/** + * The `Sender` error event handler. + * + * @param {Error} The error + * @private + */ +function senderOnError(err) { + const websocket = this[kWebSocket]; + + if (websocket.readyState === WebSocket.CLOSED) return; + if (websocket.readyState === WebSocket.OPEN) { + websocket._readyState = WebSocket.CLOSING; + setCloseTimer(websocket); + } + + // + // `socket.end()` is used instead of `socket.destroy()` to allow the other + // peer to finish sending queued data. There is no need to set a timer here + // because `CLOSING` means that it is already set or not needed. + // + this._socket.end(); + + if (!websocket._errorEmitted) { + websocket._errorEmitted = true; + websocket.emit('error', err); + } +} + +/** + * Set a timer to destroy the underlying raw socket of a WebSocket. + * + * @param {WebSocket} websocket The WebSocket instance + * @private + */ +function setCloseTimer(websocket) { + websocket._closeTimer = setTimeout( + websocket._socket.destroy.bind(websocket._socket), + closeTimeout + ); +} + /** * The listener of the socket `'close'` event. * diff --git a/test/fixtures/file.txt b/test/fixtures/file.txt new file mode 100644 index 000000000..4dd1ef756 --- /dev/null +++ b/test/fixtures/file.txt @@ -0,0 +1 @@ +This is a file. diff --git a/test/receiver.test.js b/test/receiver.test.js index 88a6326d1..2433b987e 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -7,7 +7,7 @@ const EventEmitter = require('events'); const PerMessageDeflate = require('../lib/permessage-deflate'); const Receiver = require('../lib/receiver'); const Sender = require('../lib/sender'); -const { EMPTY_BUFFER, kStatusCode } = require('../lib/constants'); +const { EMPTY_BUFFER, hasBlob, kStatusCode } = require('../lib/constants'); describe('Receiver', () => { it('parses an unmasked text message', (done) => { @@ -1061,6 +1061,44 @@ describe('Receiver', () => { }); }); + it("honors the 'blob' binary type", function (done) { + if (!hasBlob) return this.skip(); + + const receiver = new Receiver({ binaryType: 'blob' }); + const frags = [ + crypto.randomBytes(75688), + crypto.randomBytes(2688), + crypto.randomBytes(46753) + ]; + + receiver.on('message', (data, isBinary) => { + assert.ok(data instanceof Blob); + assert.ok(isBinary); + + data + .arrayBuffer() + .then((arrayBuffer) => { + assert.deepStrictEqual( + Buffer.from(arrayBuffer), + Buffer.concat(frags) + ); + + done(); + }) + .catch(done); + }); + + frags.forEach((frag, i) => { + Sender.frame(frag, { + fin: i === frags.length - 1, + opcode: i === 0 ? 2 : 0, + readOnly: true, + mask: false, + rsv1: false + }).forEach((buf) => receiver.write(buf)); + }); + }); + it('honors the `skipUTF8Validation` option (1/2)', (done) => { const receiver = new Receiver({ skipUTF8Validation: true }); diff --git a/test/sender.test.js b/test/sender.test.js index 532239fa1..df9057e8a 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -5,7 +5,7 @@ const assert = require('assert'); const extension = require('../lib/extension'); const PerMessageDeflate = require('../lib/permessage-deflate'); const Sender = require('../lib/sender'); -const { EMPTY_BUFFER } = require('../lib/constants'); +const { EMPTY_BUFFER, hasBlob } = require('../lib/constants'); class MockSocket { constructor({ write } = {}) { @@ -250,17 +250,15 @@ describe('Sender', () => { }); describe('#ping', () => { - it('works with multiple types of data', (done) => { + it('can send a string as ping payload', (done) => { const perMessageDeflate = new PerMessageDeflate(); let count = 0; const mockSocket = new MockSocket({ write: (data) => { if (++count < 3) return; - if (count % 2) { - assert.ok(data.equals(Buffer.from([0x89, 0x02]))); - } else if (count < 8) { - assert.ok(data.equals(Buffer.from([0x68, 0x69]))); + if (count === 3) { + assert.deepStrictEqual(data, Buffer.from([0x89, 0x02])); } else { assert.strictEqual(data, 'hi'); done(); @@ -273,27 +271,81 @@ describe('Sender', () => { perMessageDeflate.accept([{}]); + sender.send('foo', { compress: true, fin: true }); + sender.ping('hi', false); + }); + + it('can send a `TypedArray` as ping payload', (done) => { + let count = 0; + const mockSocket = new MockSocket({ + write: (data) => { + if (++count === 1) { + assert.deepStrictEqual(data, Buffer.from([0x89, 0x02])); + } else { + assert.deepStrictEqual(data, Buffer.from([0x68, 0x69])); + done(); + } + } + }); + + const sender = new Sender(mockSocket); const array = new Uint8Array([0x68, 0x69]); - sender.send('foo', { compress: true, fin: true }); - sender.ping(array.buffer, false); sender.ping(array, false); - sender.ping('hi', false); + }); + + it('can send an `ArrayBuffer` as ping payload', (done) => { + let count = 0; + const mockSocket = new MockSocket({ + write: (data) => { + if (++count === 1) { + assert.deepStrictEqual(data, Buffer.from([0x89, 0x02])); + } else { + assert.deepStrictEqual(data, Buffer.from([0x68, 0x69])); + done(); + } + } + }); + + const sender = new Sender(mockSocket); + const array = new Uint8Array([0x68, 0x69]); + + sender.ping(array.buffer, false); + }); + + it('can send a `Blob` as ping payload', function (done) { + if (!hasBlob) return this.skip(); + + let count = 0; + const mockSocket = new MockSocket({ + write: (data) => { + if (++count % 2) { + assert.deepStrictEqual(data, Buffer.from([0x89, 0x02])); + } else { + assert.deepStrictEqual(data, Buffer.from([0x68, 0x69])); + if (count === 4) done(); + } + } + }); + + const sender = new Sender(mockSocket); + const blob = new Blob(['hi']); + + sender.ping(blob, false); + sender.ping(blob, false); }); }); describe('#pong', () => { - it('works with multiple types of data', (done) => { + it('can send a string as ping payload', (done) => { const perMessageDeflate = new PerMessageDeflate(); let count = 0; const mockSocket = new MockSocket({ write: (data) => { if (++count < 3) return; - if (count % 2) { - assert.ok(data.equals(Buffer.from([0x8a, 0x02]))); - } else if (count < 8) { - assert.ok(data.equals(Buffer.from([0x68, 0x69]))); + if (count === 3) { + assert.deepStrictEqual(data, Buffer.from([0x8a, 0x02])); } else { assert.strictEqual(data, 'hi'); done(); @@ -306,12 +358,68 @@ describe('Sender', () => { perMessageDeflate.accept([{}]); + sender.send('foo', { compress: true, fin: true }); + sender.pong('hi', false); + }); + + it('can send a `TypedArray` as ping payload', (done) => { + let count = 0; + const mockSocket = new MockSocket({ + write: (data) => { + if (++count === 1) { + assert.deepStrictEqual(data, Buffer.from([0x8a, 0x02])); + } else { + assert.deepStrictEqual(data, Buffer.from([0x68, 0x69])); + done(); + } + } + }); + + const sender = new Sender(mockSocket); const array = new Uint8Array([0x68, 0x69]); - sender.send('foo', { compress: true, fin: true }); - sender.pong(array.buffer, false); sender.pong(array, false); - sender.pong('hi', false); + }); + + it('can send an `ArrayBuffer` as ping payload', (done) => { + let count = 0; + const mockSocket = new MockSocket({ + write: (data) => { + if (++count === 1) { + assert.deepStrictEqual(data, Buffer.from([0x8a, 0x02])); + } else { + assert.deepStrictEqual(data, Buffer.from([0x68, 0x69])); + done(); + } + } + }); + + const sender = new Sender(mockSocket); + const array = new Uint8Array([0x68, 0x69]); + + sender.pong(array.buffer, false); + }); + + it('can send a `Blob` as ping payload', function (done) { + if (!hasBlob) return this.skip(); + + let count = 0; + const mockSocket = new MockSocket({ + write: (data) => { + if (++count % 2) { + assert.deepStrictEqual(data, Buffer.from([0x8a, 0x02])); + } else { + assert.deepStrictEqual(data, Buffer.from([0x68, 0x69])); + if (count === 4) done(); + } + } + }); + + const sender = new Sender(mockSocket); + const blob = new Blob(['hi']); + + sender.pong(blob, false); + sender.pong(blob, false); }); }); diff --git a/test/websocket.test.js b/test/websocket.test.js index 5570b1caf..6be86bfa1 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -22,7 +22,13 @@ const { Event, MessageEvent } = require('../lib/event-target'); -const { EMPTY_BUFFER, GUID, kListener, NOOP } = require('../lib/constants'); +const { + EMPTY_BUFFER, + GUID, + hasBlob, + kListener, + NOOP +} = require('../lib/constants'); const highWaterMark = getDefaultHighWaterMark ? getDefaultHighWaterMark(false) @@ -617,7 +623,7 @@ describe('WebSocket', () => { }); describe('Events', () => { - it("emits an 'error' event if an error occurs", (done) => { + it("emits an 'error' event if an error occurs (1/2)", (done) => { let clientCloseEventEmitted = false; let serverClientCloseEventEmitted = false; @@ -655,6 +661,191 @@ describe('WebSocket', () => { }); }); + it("emits an 'error' event if an error occurs (2/2)", function (done) { + if (!fs.openAsBlob) return this.skip(); + + const file = path.join(__dirname, 'fixtures', 'file.txt'); + const data = fs.readFileSync(file); + + fs.openAsBlob(file) + .then((blob) => { + // + // "Modify" the file on disk by writing its own initial contents to + // it. + // + fs.writeFileSync(file, data); + process.nextTick(runTest, blob); + }) + .catch(done); + + function runTest(blob) { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + ws.send(blob); + + ws.on('error', (err) => { + assert.ok(err instanceof DOMException); + assert.strictEqual(err.name, 'NotReadableError'); + assert.strictEqual(err.message, 'The blob could not be read'); + + ws.on('close', () => { + wss.close(done); + }); + }); + }); + } + }); + + it("emits the 'error' event only once (1/2)", function (done) { + if (!fs.openAsBlob) return this.skip(); + + const file = path.join(__dirname, 'fixtures', 'file.txt'); + const data = fs.readFileSync(file); + + fs.openAsBlob(file) + .then((blob) => { + fs.writeFileSync(file, data); + process.nextTick(runTest, blob); + }) + .catch(done); + + function runTest(blob) { + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } + }); + + ws.on('open', () => { + ws.send('foo'); + ws.send(blob); + }); + + ws.on('error', (err) => { + assert.ok(err instanceof RangeError); + assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE'); + assert.strictEqual( + err.message, + 'Invalid WebSocket frame: invalid opcode 5' + ); + + ws.on('close', () => { + wss.close(done); + }); + }); + } + ); + + wss.on('connection', (ws) => { + ws._socket.write(Buffer.from([0x85, 0x00])); + }); + } + }); + + it("emits the 'error' event only once (2/2)", function (done) { + if (!fs.openAsBlob) return this.skip(); + + const file = path.join(__dirname, 'fixtures', 'file.txt'); + const data = fs.readFileSync(file); + + fs.openAsBlob(file) + .then((blob) => { + fs.writeFileSync(file, data); + process.nextTick(runTest, blob); + }) + .catch(done); + + function runTest(blob) { + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws.send(blob); + }); + + ws.on('error', (err) => { + assert.ok(err instanceof DOMException); + assert.strictEqual(err.name, 'NotReadableError'); + assert.strictEqual(err.message, 'The blob could not be read'); + + ws.on('close', () => { + wss.close(done); + }); + }); + } + ); + + wss.on('connection', (ws) => { + const buf = Buffer.from('c10100c101008500', 'hex'); + + ws._socket.write(buf); + }); + } + }); + + it("does not emit 'error' after 'close'", function (done) { + if (!fs.openAsBlob) return this.skip(); + + const randomString = crypto.randomBytes(16).toString('hex'); + const file = path.join(os.tmpdir(), `ws-${randomString}.bin`); + + fs.writeFileSync(file, crypto.randomBytes(1024 * 1024)); + + fs.openAsBlob(file) + .then((blob) => { + process.nextTick(runTest, blob); + }) + .catch(done); + + function runTest(blob) { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws.send(blob, (err) => { + try { + assert.ok(err instanceof DOMException); + assert.strictEqual(err.name, 'NotReadableError'); + assert.strictEqual(err.message, 'The blob could not be read'); + } catch (e) { + ws.removeListener(onClose); + throw e; + } finally { + fs.unlinkSync(file); + } + + wss.close(done); + }); + }); + + ws.on('error', () => { + done(new Error("Unexpected 'error' event")); + }); + ws.on('close', onClose); + + function onClose() { + fs.writeFileSync(file, 'foo'); + } + }); + + wss.on('connection', (ws) => { + ws._socket.end(); + }); + } + }); + it('does not re-emit `net.Socket` errors', function (done) { // // `socket.resetAndDestroy()` is not available in Node.js < 16.17.0. @@ -2094,6 +2285,11 @@ describe('WebSocket', () => { ws.ping(); assert.strictEqual(ws.bufferedAmount, 4); + if (hasBlob) { + ws.ping(new Blob(['hi'])); + assert.strictEqual(ws.bufferedAmount, 6); + } + done(); }); }); @@ -2263,6 +2459,11 @@ describe('WebSocket', () => { ws.pong(); assert.strictEqual(ws.bufferedAmount, 4); + if (hasBlob) { + ws.pong(new Blob(['hi'])); + assert.strictEqual(ws.bufferedAmount, 6); + } + done(); }); }); @@ -2507,6 +2708,11 @@ describe('WebSocket', () => { ws.send(); assert.strictEqual(ws.bufferedAmount, 4); + if (hasBlob) { + ws.send(new Blob(['hi'])); + assert.strictEqual(ws.bufferedAmount, 6); + } + done(); }); }); @@ -2724,6 +2930,39 @@ describe('WebSocket', () => { }); }); + it('can send a `Blob`', function (done) { + if (!hasBlob) return this.skip(); + + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + const messages = []; + + ws.on('open', () => { + ws.send(new Blob(['foo'])); + ws.send(new Blob(['bar'])); + ws.close(); + }); + + ws.on('message', (message, isBinary) => { + assert.ok(isBinary); + messages.push(message.toString()); + + if (messages.length === 2) { + assert.deepStrictEqual(messages, ['foo', 'bar']); + wss.close(done); + } + }); + }); + + wss.on('connection', (ws) => { + ws.on('message', (message, isBinary) => { + assert.ok(isBinary); + ws.send(message); + }); + }); + }); + it('calls the callback when data is written out', (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`); @@ -2741,6 +2980,48 @@ describe('WebSocket', () => { }); }); + it('calls the callback if the socket is forcibly closed', function (done) { + if (!hasBlob) return this.skip(); + + const called = []; + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws.send(new Blob(['foo']), (err) => { + called.push(1); + + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while the blob was being read' + ); + }); + ws.send('bar'); + ws.send('baz', (err) => { + called.push(2); + + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while the blob was being read' + ); + }); + + ws.terminate(); + }); + }); + + wss.on('connection', (ws) => { + ws.on('close', () => { + assert.deepStrictEqual(called, [1, 2]); + wss.close(done); + }); + }); + }); + it('works when the `data` argument is falsy', (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`); @@ -3645,25 +3926,41 @@ describe('WebSocket', () => { ws.onmessage = (evt) => { if (binaryType === 'nodebuffer') { assert.ok(Buffer.isBuffer(evt.data)); - assert.ok(evt.data.equals(buf)); + assert.deepStrictEqual(evt.data, buf); + next(); } else if (binaryType === 'arraybuffer') { assert.ok(evt.data instanceof ArrayBuffer); - assert.ok(Buffer.from(evt.data).equals(buf)); + assert.deepStrictEqual(Buffer.from(evt.data), buf); + next(); } else if (binaryType === 'fragments') { assert.deepStrictEqual(evt.data, [buf]); + next(); + } else if (binaryType === 'blob') { + assert.ok(evt.data instanceof Blob); + evt.data + .arrayBuffer() + .then((arrayBuffer) => { + assert.deepStrictEqual(Buffer.from(arrayBuffer), buf); + next(); + }) + .catch(done); } - next(); }; ws.send(buf); } + function close() { + ws.close(); + wss.close(done); + } + ws.onopen = () => { testType('nodebuffer', () => { testType('arraybuffer', () => { testType('fragments', () => { - ws.close(); - wss.close(done); + if (hasBlob) testType('blob', close); + else close(); }); }); }); @@ -4188,7 +4485,7 @@ describe('WebSocket', () => { ws.on('open', () => { ws._receiver.on('conclude', () => { - assert.ok(ws._sender._deflating); + assert.strictEqual(ws._sender._state, 1); }); ws.send('foo'); @@ -4366,6 +4663,47 @@ describe('WebSocket', () => { }); }); + it('can send a `Blob`', function (done) { + if (!hasBlob) return this.skip(); + + const wss = new WebSocket.Server( + { + perMessageDeflate: { threshold: 0 }, + port: 0 + }, + () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } + }); + + const messages = []; + + ws.on('open', () => { + ws.send(new Blob(['foo'])); + ws.send(new Blob(['bar'])); + ws.close(); + }); + + ws.on('message', (message, isBinary) => { + assert.ok(isBinary); + messages.push(message.toString()); + + if (messages.length === 2) { + assert.deepStrictEqual(messages, ['foo', 'bar']); + wss.close(done); + } + }); + } + ); + + wss.on('connection', (ws) => { + ws.on('message', (message, isBinary) => { + assert.ok(isBinary); + ws.send(message); + }); + }); + }); + it('ignores the `compress` option if the extension is disabled', (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`, {