diff --git a/examples/main.go b/examples/main.go index 66157e8..40d087c 100644 --- a/examples/main.go +++ b/examples/main.go @@ -1,8 +1,6 @@ package main import ( - "bufio" - "bytes" "context" "fmt" "log" @@ -10,7 +8,7 @@ import ( "strconv" "time" - "github.com/valyala/fasthttp" + "github.com/katallaxie/pkg/server" htmx "github.com/zeiss/fiber-htmx" "github.com/zeiss/fiber-htmx/components/alerts" "github.com/zeiss/fiber-htmx/components/avatars" @@ -26,6 +24,7 @@ import ( "github.com/zeiss/fiber-htmx/components/tables" "github.com/zeiss/fiber-htmx/components/tailwind" "github.com/zeiss/fiber-htmx/components/toasts" + "github.com/zeiss/fiber-htmx/sse" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/logger" @@ -354,7 +353,7 @@ func (c *exampleController) Get() error { htmx.Div( htmx.HxSSE(), htmx.HxSSEConnect("/sse"), - htmx.HxSSESwap("message"), + htmx.HxSSESwap("demo"), ), htmx.Div( htmx.ID("events"), @@ -535,65 +534,68 @@ func (c *exampleController) Get() error { ) } -func run(_ context.Context) error { - log.SetFlags(0) - log.SetOutput(os.Stderr) +type webSrv struct { + factory sse.SenderFactory +} - app := fiber.New() - app.Use(requestid.New()) - app.Use(logger.New()) - app.Use(recover.New()) - app.Use(htmx.NewHtmxMessageHandler()) - - app.Get("/", htmx.NewHxControllerHandler(func() htmx.Controller { - return &exampleController{} - })) - - app.Get("/sse", func(c *fiber.Ctx) error { - c.Set("Content-Type", "text/event-stream") - c.Set("Cache-Control", "no-cache") - c.Set("Connection", "keep-alive") - c.Set("Transfer-Encoding", "chunked") - - c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - fmt.Println("WRITER") - var i int - for { - i++ - msg := fmt.Sprintf("%d - the time is %v", i, time.Now()) - - var buf bytes.Buffer - err := htmx.Div(htmx.ID("message"), htmx.Text(msg)).Render(&buf) - if err != nil { - break - } - - fmt.Fprintf(w, "event: %s\n", "message") - fmt.Fprintf(w, "data: %s\n\n", buf.String()) - - err = w.Flush() - if err != nil { - // Refreshing page in web browser will establish a new - // SSE connection, but only (the last) one is alive, so - // dead connections must be closed here. - fmt.Printf("Error while flushing: %v. Closing http connection.\n", err) - - break - } - time.Sleep(2 * time.Second) - } +func (w *webSrv) Start(ctx context.Context, ready server.ReadyFunc, run server.RunFunc) func() error { + return func() error { + app := fiber.New() + app.Use(requestid.New()) + app.Use(logger.New()) + app.Use(recover.New()) + app.Use(htmx.NewHtmxMessageHandler()) + + app.Get("/", htmx.NewHxControllerHandler(func() htmx.Controller { + return &exampleController{} })) + app.Get("/sse", sse.NewSSEHandler(w.factory)) + + app.Post("/error", htmx.NewHxControllerHandler(func() htmx.Controller { + return &exampleController{} + })) + + err := app.Listen(cfg.Flags.Addr) + if err != nil { + log.Fatal(err) + } + return nil - }) + } +} + +func run(ctx context.Context) error { + log.SetFlags(0) + log.SetOutput(os.Stderr) + + broadcast := sse.NewBroadcastManager(5) - app.Post("/error", htmx.NewHxControllerHandler(func() htmx.Controller { - return &exampleController{} - })) + webSrv := &webSrv{ + factory: broadcast.CreateSender(), + } + + s, _ := server.WithContext(ctx) + + s.Listen(broadcast, true) + s.Listen(webSrv, true) + + ticker := time.NewTicker(2 * time.Second) + + go func() { + for { + select { + case <-ctx.Done(): + return + case t := <-ticker.C: + broadcast.Send(sse.NewMessage("demo", fmt.Sprintf("Hello, World! %s", t))) + } + } + }() - err := app.Listen(cfg.Flags.Addr) + err := s.Wait() if err != nil { - log.Fatal(err) + return err } return nil diff --git a/sse/sse.go b/sse/sse.go index bb20479..952693d 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -111,7 +111,7 @@ func (c *ClientImpl) Events() chan Event { // Close closes the client. func (c *ClientImpl) Close() { - close(c.events) + <-c.events } // NewClient creates a new client. @@ -177,31 +177,23 @@ func NewSSEHandler(sender SenderFactory, config ...Config) fiber.Handler { c.Set(fiber.HeaderTransferEncoding, "chunked") s := sender(c) // return a sender - defer s.Close() c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - for { - select { - case msg, ok := <-s.Events(): - if !ok { - return - } - - _, err := fmt.Fprint(w, msg.String()) - if err != nil { - return - } + defer s.Close() + for msg := range s.Events() { + _, err := fmt.Fprint(w, msg.String()) + if err != nil { + return + } - err = w.Flush() - if err != nil { - // Refreshing page in web browser will establish a new - // SSE connection, but only (the last) one is alive, so - // dead connections must be closed here. - return - } - case <-c.Context().Done(): + err = w.Flush() + if err != nil { + // Refreshing page in web browser will establish a new + // SSE connection, but only (the last) one is alive, so + // dead connections must be closed here. return } + } }))