Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support returning streams from callbacks #145

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/rules/passthrough-handling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
CallbackRequestResult,
CallbackResponseMessageResult
} from './requests/request-handler-definitions';
import { Readable } from 'stream';

// TLS settings for proxied connections, intended to avoid TLS fingerprint blocking
// issues so far as possible, by closely emulating a Firefox Client Hello:
Expand Down Expand Up @@ -86,7 +87,24 @@ export async function buildOverriddenBody(
headers: Headers
) {
// Raw bodies are easy: use them as is.
if (callbackResult?.rawBody) return callbackResult?.rawBody!;
if (callbackResult?.rawBody instanceof Readable) {
return new Promise<Buffer>((resolve, reject) => {
let buf = new Array<any>();
let readable = callbackResult?.rawBody as Readable;
readable.on('data', data => {
buf.push(data);
});

readable.on('end', () => {
resolve(Buffer.from(buf));
});

readable.on('error', (err) => {
reject(err);
})
});
} else if (callbackResult?.rawBody) return callbackResult?.rawBody!;


// In the json/body case, we need to get the body and transform it into a buffer
// for consistent handling later, and encode it to match the headers.
Expand Down
2 changes: 1 addition & 1 deletion src/rules/requests/request-handler-definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ export interface CallbackResponseMessageResult {
* You should only return one body field: either `body`, `rawBody` or
* `json`.
*/
rawBody?: Buffer | Uint8Array;
rawBody?: Buffer | Uint8Array | Readable;

/**
* A JSON value, which will be stringified and send as a JSON-encoded
Expand Down
9 changes: 7 additions & 2 deletions src/rules/requests/request-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as fs from 'fs/promises';
import * as h2Client from 'http2-wrapper';
import CacheableLookup from 'cacheable-lookup';
import { decode as decodeBase64 } from 'base64-arraybuffer';
import { Transform } from 'stream';
import { Readable, Transform } from 'stream';
import { stripIndent, oneLine } from 'common-tags';
import { TypedError } from 'typed-error';

Expand Down Expand Up @@ -183,7 +183,12 @@ async function writeResponseFromCallback(result: CallbackResponseMessageResult,
result.statusMessage,
result.headers
);
response.end(result.rawBody || "");

if (result.rawBody instanceof Readable) {
(result.rawBody as Readable).pipe(response);
} else {
response.end(result.rawBody || "");
}
}

export class CallbackHandler extends CallbackHandlerDefinition {
Expand Down
5 changes: 3 additions & 2 deletions src/serialization/body-serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { buildBodyReader, isMockttpBody } from "../util/request-utils";
import { Replace } from "../util/type-utils";

import { deserializeBuffer, serializeBuffer } from "./serialization";
import { Readable } from 'stream';

export function withSerializedBodyReader<T extends {
body: CompletedBody
Expand All @@ -33,7 +34,7 @@ export function withDeserializedBodyReader<T extends { headers: Headers, body: C
*/
export function withSerializedCallbackBuffers<T extends {
body?: CompletedBody | Buffer | Uint8Array | ArrayBuffer | string,
rawBody?: Buffer | Uint8Array
rawBody?: Buffer | Uint8Array | Readable
}>(input: T): Replace<T, { body: string | undefined }> {
let serializedBody: string | undefined;

Expand All @@ -52,7 +53,7 @@ export function withSerializedCallbackBuffers<T extends {
return {
...input,
body: serializedBody,
rawBody: input.rawBody
rawBody: input.rawBody && !(input.rawBody instanceof Readable)
? serializeBuffer(asBuffer(input.rawBody))
: undefined
};
Expand Down
37 changes: 37 additions & 0 deletions test/integration/handlers/callback-response.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import * as zlib from 'zlib';
import { getLocal } from "../../..";
import { expect, fetch, isNode, isWeb, headersToObject } from "../../test-utils";
import * as http from 'node:http';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests are failing because of this - this spec runs in browsers too, so you can't use Node modules like this.

You could make this test node-only, and then this would work if you import this directly as http (the tests are already configured to exclude that in browser builds) but it would be better to write this so it will pass in both node & browsers.

I think you can just use streams directly without http.request to test this equally well, in the same way we do testing in https://github.com/httptoolkit/mockttp/blob/main/test/integration/handlers/stream-response.spec.ts.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops :) The one part of the dev environment that I hadn't setup happens to be exactly where I created a test failure. I picked node:http instinctively because that's what I was going to use in my planned use case for this feature. I'll try to figure this out today.

import { Readable } from 'stream';
import { IncomingMessage } from 'http2-wrapper';

describe("Callback response handlers", function () {

Expand All @@ -13,6 +16,40 @@ describe("Callback response handlers", function () {
beforeEach(() => server.start());
afterEach(() => server.stop());

it("should allow returning a stream", async () => {
await server.forGet("/upstream").thenJson(
200,
{ "hello": "world!" },
{ "x-test": "success" }
);

await server.forGet("/mocked-endpoint").thenCallback(async () => {
let { protocol, hostname, pathname, port } = new URL(server.urlFor("/upstream"));
let opts = {
hostname, method: "GET", path: pathname, protocol, port
};

let upstreamMessage = await new Promise<IncomingMessage>((resolve, reject) => {
let upstreamReq = http.request(opts, (res) => {
resolve(res);
});
upstreamReq.end();
});

return {
headers: upstreamMessage.headers,
rawBody: upstreamMessage as Readable
};
});

let response = await fetch(server.urlFor("/mocked-endpoint"));

expect(response.status).to.equal(200);
let resJson = await response.json();
expect(resJson["hello"]).to.equal("world!");
expect(response.headers.get("x-test")).to.equal("success");
});

it("should allow mocking the status with a callback", async () => {
await server.forGet("/mocked-endpoint").thenCallback(() => {
return { statusCode: 204, statusMessage: 'all good' }
Expand Down