Skip to content

Commit

Permalink
Add tests for slow streams and chunk concatenation by network
Browse files Browse the repository at this point in the history
  • Loading branch information
bencmbrook committed Oct 22, 2023
1 parent bbdbe97 commit db54619
Showing 1 changed file with 54 additions and 1 deletion.
55 changes: 54 additions & 1 deletion test/mux-web-streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,61 @@ describe('mux/demux', () => {
assert.deepStrictEqual(demuxedData, inputData);
});

test('`mux` works on `Uint8Array` streams', async () => {
const uint8Stream = new Response('hello world').body!;
const muxedStream = muxer([uint8Stream]);
const demuxedStream = demuxer(muxedStream, 1)[0]!;

let result = '';
for await (const chunk of demuxedStream as any as AsyncIterable<Uint8Array>) {
assert.ok(chunk instanceof Uint8Array);
result += new TextDecoder().decode(chunk);
}
assert.equal('hello world', result);
});

test('`demux` can handle chunks that were concatenated by network pipes', async () => {
const testData = ['a', 'b', 'c'];
const stream = createStreamFromArray(testData);
const muxedStream = muxer([stream]);

// Concatenate the multiplexed chunks (e.g., something that can happen over a network pipe)
// e.g., when `fetch`ing a multiplexed stream from a server
const chunksToConcat: Uint8Array[] = [];
const concatenatedMuxedStream = muxedStream.pipeThrough(
new TransformStream({
transform(chunk, controller) {
chunksToConcat.push(chunk);
controller.enqueue(new Uint8Array([]));
},
flush(controller) {
// Get total length of Uint8Arrays in `chunksToConcat`
const totalLength = chunksToConcat.reduce(
(acc, chunk) => acc + chunk.length,
0,
);
// Create a new array with total length and merge all source arrays.\
const concatenatedChunks = new Uint8Array(totalLength);
let offset = 0;
for (let chunk of chunksToConcat) {
concatenatedChunks.set(chunk, offset);
offset += chunk.length;
}

controller.enqueue(concatenatedChunks);
},
}),
);

// Demux normally
const demuxedStream = demuxer(concatenatedMuxedStream, 1)[0]!;
const resultData = await readStreamToArray(demuxedStream);

assert.deepStrictEqual(resultData, testData);
});

// Test async is not blocking
test('async is not blocking - streams do not wait on slowest stream', async () => {
test('asynchronous input streams do not block - `mux` continues with fast streams while waiting on slow streams', async () => {
// Create test data
const originalData = [
[1, 2, 3],
Expand Down

0 comments on commit db54619

Please sign in to comment.