How would I use Cloudflare D1-Backed Chat Memory AND stream the response back to the user? #7304
-
Checked other resources
Commit to Help
Example Codeconst parser = new HttpResponseOutputParser({ contentType: "text/event-stream" });
try {
const stream = await chain.pipe(parser).stream(chainInput);
await memory.saveContext(chainInput, {
output: stream,
});
return new Response(stream, {
headers: { "Content-Type": "text/event-stream" },
});
} catch (err) {
console.error("Error during chain execution:", err);
} DescriptionI'm trying to use Cloudflare D1-Backed Chat Memory so that I can have persistent storage for each chat session, however I also want to stream the response back to the user using an event stream so that the user can receive the response faster. The problem is i receive back a ReadableStream which is not text so the memory.saveContext function does not accept it. If i read the ReadableStream so that I can get the text and save it, I'm not able to return the stream back to the user as the ReadableStream can only be read once. Is there a better way of approaching this? System Infolangchain@0.3.6 |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
Hey @StavrosHacker! 👋 I'm here to help you with your code and answer any questions you have. Let's dive into this together! To handle both storing the response in Cloudflare D1-Backed Chat Memory and streaming it back to the user, you can use a technique to duplicate the stream. Since a
Here's an example of how you might implement this: async function streamToString(stream: ReadableStream): Promise<string> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let result = '';
let done = false;
while (!done) {
const { value, done: doneReading } = await reader.read();
done = doneReading;
if (value) {
result += decoder.decode(value, { stream: true });
}
}
return result;
}
async function handleRequest(chainInput) {
const parser = new HttpResponseOutputParser({ contentType: "text/event-stream" });
try {
const stream = await chain.pipe(parser).stream(chainInput);
// Convert the stream to a string
const responseText = await streamToString(stream);
// Save the context
await memory.saveContext(chainInput, { output: responseText });
// Create a new stream from the response text
const newStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(responseText));
controller.close();
}
});
return new Response(newStream, {
headers: { "Content-Type": "text/event-stream" },
});
} catch (err) {
console.error("Error during chain execution:", err);
}
} This code reads the original stream into a string, saves it to memory, and then creates a new stream from the stored string to send back to the user. This way, you can both store the response and stream it back to the user efficiently. |
Beta Was this translation helpful? Give feedback.
-
I ended up using a TransformStream to get each chunk and concatenate it together while the stream is being returned to the user, and once the stream is completed, it the saves the full string to D1. Fairly simple, for more info look at this: https://developers.cloudflare.com/workers/runtime-apis/streams/ |
Beta Was this translation helpful? Give feedback.
I ended up using a TransformStream to get each chunk and concatenate it together while the stream is being returned to the user, and once the stream is completed, it the saves the full string to D1. Fairly simple, for more info look at this: https://developers.cloudflare.com/workers/runtime-apis/streams/