Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkShawn2020 committed Aug 13, 2024
1 parent be24627 commit 163d1ab
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
1 change: 0 additions & 1 deletion src/components/dev-config.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import * as process from "process"
import { useState } from "react"
import { devEnabledAtom } from "../../packages/common-dev/store"
import { IconContainer } from "../../packages/common-ui/components/icon-container"
Expand Down
81 changes: 81 additions & 0 deletions src/llm-pusher.manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import {
pusherServerConfigs,
PusherServerId,
initPusherServer,
ITransEvent,
ResponseFinalStatus,
ResponseStatus,
} from "@cs-magic/common"
import { redis } from "@cs-magic/common/dist/db/redis.js"

import Pusher from "pusher"

import { ILLMManagerPusher } from "@cs-magic/llm"

/**
*
* note:
* - 因为server-action的机制, 所有state需要用redis等中心化管理
*/
export class PusherLLMManager implements ILLMManagerPusher {
private pusher: Pusher
private channel: string

constructor(channelId: string, pusherServerId: PusherServerId) {
this.channel = channelId
this.pusher = initPusherServer(pusherServerConfigs[pusherServerId])
}

//////////////////////////////
// state
//////////////////////////////
public get status() {
return redis.get(this.channel)
}

public setStatus(status: ResponseStatus) {
return redis.set(this.channel, status)
}

//////////////////////////////
// server
//////////////////////////////
async onTriggerStarts() {
await this.setStatus("responding")
await this.push({ event: "init", data: {} })
}

async onTriggerEnds(reason: ResponseFinalStatus) {
await this.setStatus(reason)
await this.push({ event: "close", data: { reason } })
}

async onEvent(event: ITransEvent) {
await this.push(event)
}

//////////////////////////////
// client
//////////////////////////////

async onClientConnected(clientId: string) {
await this.push({ event: "onClientConnected", data: { id: clientId } })
}

async onClientDisconnected(clientId: string) {
await this.push({
event: "onClientDisconnected",
data: { id: clientId },
})
}

//////////////////////////////
// general
//////////////////////////////

private async push(event: ITransEvent) {
// event.data = { time: Date.now(), ...event.data }
console.log(`[PUSHER] >> ${this.channel}: `, event)
await this.pusher.trigger(this.channel, event.event, event.data)
}
}
54 changes: 54 additions & 0 deletions src/llm-sse.route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { ITransEvent } from "@cs-magic/common"

import { llmEncoder, StaticLLMManager } from "@cs-magic/llm"
import { nanoid } from "nanoid"
import { NextRequest } from "next/server"

export async function GET(req: NextRequest) {
const triggerId = new URL(req.url as string).searchParams.get("r") ?? ""
const clientId = nanoid()

const responseStream = new TransformStream()
const writer = responseStream.writable.getWriter()

const write = async (event: ITransEvent) => {
// console.log("[sse] client --> user: ", event)
// 要额外加一个 \n,否则不符合格式规范
await writer.write(
llmEncoder.encode(
`event: ${event.event}\ndata: ${JSON.stringify(event?.data ?? "")}\n\n`,
),
)
}

const llmManager = new StaticLLMManager(triggerId)

// NOTE: 不这么做服务器会报错,ref: https://github.com/vercel/next/discussions/61972#discussioncomment-8545109
req.signal.onabort = async () => {
console.log(`Client(id=${clientId}) aborted connection.`)

// 1. 先写
await write({ event: "error", data: { message: "您已中止" } })
await writer.close()
// 2. 再移除(2要在1之后)
await llmManager.onClientDisconnected(clientId)
}

void llmManager.onClientConnected({
id: clientId,
// todo: onEvent in serialization approach like Redis?
onEvent: async (sseEvent) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await write(sseEvent)
if (sseEvent.event === "close") await writer.close()
},
})

return new Response(responseStream.readable, {
headers: {
"Content-Type": "text/event-stream",
Connection: "keep-alive",
"Cache-Control": "no-cache, no-transform",
},
})
}

0 comments on commit 163d1ab

Please sign in to comment.