diff --git a/src/components/dev-config.tsx b/src/components/dev-config.tsx
index 5d666da..0ad8d46 100644
--- a/src/components/dev-config.tsx
+++ b/src/components/dev-config.tsx
@@ -1,4 +1,3 @@
-import * as process from "process"
 import { useState } from "react"
 import { devEnabledAtom } from "../../packages/common-dev/store"
 import { IconContainer } from "../../packages/common-ui/components/icon-container"
diff --git a/src/llm-pusher.manager.ts b/src/llm-pusher.manager.ts
new file mode 100644
index 0000000..2a7bcdd
--- /dev/null
+++ b/src/llm-pusher.manager.ts
@@ -0,0 +1,81 @@
+import {
+  pusherServerConfigs,
+  PusherServerId,
+  initPusherServer,
+  ITransEvent,
+  ResponseFinalStatus,
+  ResponseStatus,
+} from "@cs-magic/common"
+import { redis } from "@cs-magic/common/dist/db/redis.js"
+
+import Pusher from "pusher"
+
+import { ILLMManagerPusher } from "@cs-magic/llm"
+
+/**
+ * Pusher-backed implementation of ILLMManagerPusher.
+ * note:
+ * - because of the server-action mechanism, all state must be kept in centralized storage such as Redis
+ */
+export class PusherLLMManager implements ILLMManagerPusher {
+  private readonly pusher: Pusher
+  private readonly channel: string
+
+  constructor(channelId: string, pusherServerId: PusherServerId) {
+    this.channel = channelId
+    this.pusher = initPusherServer(pusherServerConfigs[pusherServerId])
+  }
+
+  //////////////////////////////
+  // state
+  //////////////////////////////
+  public get status() {
+    return redis.get(this.channel) // NOTE(review): returns a Promise despite getter syntax — callers must await
+  }
+
+  public setStatus(status: ResponseStatus) {
+    return redis.set(this.channel, status)
+  }
+
+  //////////////////////////////
+  // server
+  //////////////////////////////
+  async onTriggerStarts() {
+    await this.setStatus("responding")
+    await this.push({ event: "init", data: {} })
+  }
+
+  async onTriggerEnds(reason: ResponseFinalStatus) {
+    await this.setStatus(reason)
+    await this.push({ event: "close", data: { reason } })
+  }
+
+  async onEvent(event: ITransEvent) {
+    await this.push(event)
+  }
+
+  //////////////////////////////
+  // client
+  //////////////////////////////
+
+  async onClientConnected(clientId: string) {
+    await this.push({ event: "onClientConnected", data: { id: clientId } })
+  }
+
+  async onClientDisconnected(clientId: string) {
+    await this.push({
+      event: "onClientDisconnected",
+      data: { id: clientId },
+    })
+  }
+
+  //////////////////////////////
+  // general
+  //////////////////////////////
+
+  private async push(event: ITransEvent) {
+    // event.data = { time: Date.now(), ...event.data }
+    console.log(`[PUSHER] >> ${this.channel}: `, event)
+    await this.pusher.trigger(this.channel, event.event, event.data)
+  }
+}
diff --git a/src/llm-sse.route.ts b/src/llm-sse.route.ts
new file mode 100644
index 0000000..55bf167
--- /dev/null
+++ b/src/llm-sse.route.ts
@@ -0,0 +1,54 @@
+import type { ITransEvent } from "@cs-magic/common"
+
+import { llmEncoder, StaticLLMManager } from "@cs-magic/llm"
+import { nanoid } from "nanoid"
+import { NextRequest } from "next/server"
+
+export async function GET(req: NextRequest) {
+  const triggerId = new URL(req.url).searchParams.get("r") ?? ""
+  const clientId = nanoid()
+
+  const responseStream = new TransformStream()
+  const writer = responseStream.writable.getWriter()
+
+  const write = async (event: ITransEvent) => {
+    // console.log("[sse] client --> user: ", event)
+    // the extra trailing \n is required by the SSE event-stream format
+    await writer.write(
+      llmEncoder.encode(
+        `event: ${event.event}\ndata: ${JSON.stringify(event?.data ?? "")}\n\n`,
+      ),
+    )
+  }
+
+  const llmManager = new StaticLLMManager(triggerId)
+
+  // NOTE: without this the server errors out, ref: https://github.com/vercel/next/discussions/61972#discussioncomment-8545109
+  req.signal.onabort = async () => {
+    console.log(`Client(id=${clientId}) aborted connection.`)
+
+    // 1. write first — NOTE(review): writing after the client aborted may reject; confirm the stream tolerates this
+    await write({ event: "error", data: { message: "您已中止" } })
+    await writer.close()
+    // 2. then remove (step 2 must come after step 1)
+    await llmManager.onClientDisconnected(clientId)
+  }
+
+  void llmManager.onClientConnected({
+    id: clientId,
+    // todo: onEvent in serialization approach like Redis?
+    onEvent: async (sseEvent) => {
+      // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
+      await write(sseEvent)
+      if (sseEvent.event === "close") await writer.close()
+    },
+  })
+
+  return new Response(responseStream.readable, {
+    headers: {
+      "Content-Type": "text/event-stream",
+      Connection: "keep-alive",
+      "Cache-Control": "no-cache, no-transform",
+    },
+  })
+}