From e7e90c3abf62a765441471eb32d640995cf83ec9 Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Thu, 18 May 2023 15:45:05 +0530 Subject: [PATCH] Support returning streams from callbacks You can now pass a `Readable` stream into `rawBody` inside `thenCallback`. --- src/rules/passthrough-handling.ts | 20 +++++++++- .../requests/request-handler-definitions.ts | 2 +- src/rules/requests/request-handlers.ts | 9 ++++- src/serialization/body-serialization.ts | 5 ++- .../handlers/callback-response.spec.ts | 37 +++++++++++++++++++ 5 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/rules/passthrough-handling.ts b/src/rules/passthrough-handling.ts index 0da7c61b2..688d486b7 100644 --- a/src/rules/passthrough-handling.ts +++ b/src/rules/passthrough-handling.ts @@ -13,6 +13,7 @@ import { CallbackRequestResult, CallbackResponseMessageResult } from './requests/request-handler-definitions'; +import { Readable } from 'stream'; // TLS settings for proxied connections, intended to avoid TLS fingerprint blocking // issues so far as possible, by closely emulating a Firefox Client Hello: @@ -86,7 +87,24 @@ export async function buildOverriddenBody( headers: Headers ) { // Raw bodies are easy: use them as is. - if (callbackResult?.rawBody) return callbackResult?.rawBody!; + if (callbackResult?.rawBody instanceof Readable) { + return new Promise((resolve, reject) => { + let buf = new Array(); + let readable = callbackResult?.rawBody as Readable; + readable.on('data', data => { + buf.push(data); + }); + + readable.on('end', () => { + resolve(Buffer.from(buf)); + }); + + readable.on('error', (err) => { + reject(err); + }) + }); + } else if (callbackResult?.rawBody) return callbackResult?.rawBody!; + // In the json/body case, we need to get the body and transform it into a buffer // for consistent handling later, and encode it to match the headers. diff --git a/src/rules/requests/request-handler-definitions.ts b/src/rules/requests/request-handler-definitions.ts index 93f232fef..b3faa5a86 100644 --- a/src/rules/requests/request-handler-definitions.ts +++ b/src/rules/requests/request-handler-definitions.ts @@ -204,7 +204,7 @@ export interface CallbackResponseMessageResult { * You should only return one body field: either `body`, `rawBody` or * `json`. */ - rawBody?: Buffer | Uint8Array; + rawBody?: Buffer | Uint8Array | Readable; /** * A JSON value, which will be stringified and send as a JSON-encoded diff --git a/src/rules/requests/request-handlers.ts b/src/rules/requests/request-handlers.ts index 79aa610f1..11d5b4895 100644 --- a/src/rules/requests/request-handlers.ts +++ b/src/rules/requests/request-handlers.ts @@ -9,7 +9,7 @@ import * as fs from 'fs/promises'; import * as h2Client from 'http2-wrapper'; import CacheableLookup from 'cacheable-lookup'; import { decode as decodeBase64 } from 'base64-arraybuffer'; -import { Transform } from 'stream'; +import { Readable, Transform } from 'stream'; import { stripIndent, oneLine } from 'common-tags'; import { TypedError } from 'typed-error'; @@ -183,7 +183,12 @@ async function writeResponseFromCallback(result: CallbackResponseMessageResult, result.statusMessage, result.headers ); - response.end(result.rawBody || ""); + + if (result.rawBody instanceof Readable) { + (result.rawBody as Readable).pipe(response); + } else { + response.end(result.rawBody || ""); + } } export class CallbackHandler extends CallbackHandlerDefinition { diff --git a/src/serialization/body-serialization.ts b/src/serialization/body-serialization.ts index 27b385245..9c5644bab 100644 --- a/src/serialization/body-serialization.ts +++ b/src/serialization/body-serialization.ts @@ -7,6 +7,7 @@ import { buildBodyReader, isMockttpBody } from "../util/request-utils"; import { Replace } from "../util/type-utils"; import { deserializeBuffer, serializeBuffer } from "./serialization"; +import { Readable } from 'stream'; export function withSerializedBodyReader(input: T): Replace { let serializedBody: string | undefined; @@ -52,7 +53,7 @@ export function withSerializedCallbackBuffers server.start()); afterEach(() => server.stop()); + it("should allow returning a stream", async () => { + await server.forGet("/upstream").thenJson( + 200, + { "hello": "world!" }, + { "x-test": "success" } + ); + + await server.forGet("/mocked-endpoint").thenCallback(async () => { + let { protocol, hostname, pathname, port } = new URL(server.urlFor("/upstream")); + let opts = { + hostname, method: "GET", path: pathname, protocol, port + }; + + let upstreamMessage = await new Promise((resolve, reject) => { + let upstreamReq = http.request(opts, (res) => { + resolve(res); + }); + upstreamReq.end(); + }); + + return { + headers: upstreamMessage.headers, + rawBody: upstreamMessage as Readable + }; + }); + + let response = await fetch(server.urlFor("/mocked-endpoint")); + + expect(response.status).to.equal(200); + let resJson = await response.json(); + expect(resJson["hello"]).to.equal("world!"); + expect(response.headers.get("x-test")).to.equal("success"); + }); + it("should allow mocking the status with a callback", async () => { await server.forGet("/mocked-endpoint").thenCallback(() => { return { statusCode: 204, statusMessage: 'all good' }