Skip to content

Commit

Permalink
wip: sse send channels broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
katallaxie authored Jul 8, 2024
1 parent b681c94 commit e700d82
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 77 deletions.
114 changes: 58 additions & 56 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package main

import (
"bufio"
"bytes"
"context"
"fmt"
"log"
"os"
"strconv"
"time"

"github.com/valyala/fasthttp"
"github.com/katallaxie/pkg/server"
htmx "github.com/zeiss/fiber-htmx"
"github.com/zeiss/fiber-htmx/components/alerts"
"github.com/zeiss/fiber-htmx/components/avatars"
Expand All @@ -26,6 +24,7 @@ import (
"github.com/zeiss/fiber-htmx/components/tables"
"github.com/zeiss/fiber-htmx/components/tailwind"
"github.com/zeiss/fiber-htmx/components/toasts"
"github.com/zeiss/fiber-htmx/sse"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
Expand Down Expand Up @@ -354,7 +353,7 @@ func (c *exampleController) Get() error {
htmx.Div(
htmx.HxSSE(),
htmx.HxSSEConnect("/sse"),
htmx.HxSSESwap("message"),
htmx.HxSSESwap("demo"),
),
htmx.Div(
htmx.ID("events"),
Expand Down Expand Up @@ -535,65 +534,68 @@ func (c *exampleController) Get() error {
)
}

func run(_ context.Context) error {
log.SetFlags(0)
log.SetOutput(os.Stderr)
type webSrv struct {
factory sse.SenderFactory
}

app := fiber.New()
app.Use(requestid.New())
app.Use(logger.New())
app.Use(recover.New())
app.Use(htmx.NewHtmxMessageHandler())

app.Get("/", htmx.NewHxControllerHandler(func() htmx.Controller {
return &exampleController{}
}))

app.Get("/sse", func(c *fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")

c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
fmt.Println("WRITER")
var i int
for {
i++
msg := fmt.Sprintf("%d - the time is %v", i, time.Now())

var buf bytes.Buffer
err := htmx.Div(htmx.ID("message"), htmx.Text(msg)).Render(&buf)
if err != nil {
break
}

fmt.Fprintf(w, "event: %s\n", "message")
fmt.Fprintf(w, "data: %s\n\n", buf.String())

err = w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
fmt.Printf("Error while flushing: %v. Closing http connection.\n", err)

break
}
time.Sleep(2 * time.Second)
}
func (w *webSrv) Start(ctx context.Context, ready server.ReadyFunc, run server.RunFunc) func() error {
return func() error {
app := fiber.New()
app.Use(requestid.New())
app.Use(logger.New())
app.Use(recover.New())
app.Use(htmx.NewHtmxMessageHandler())

app.Get("/", htmx.NewHxControllerHandler(func() htmx.Controller {
return &exampleController{}
}))

app.Get("/sse", sse.NewSSEHandler(w.factory))

app.Post("/error", htmx.NewHxControllerHandler(func() htmx.Controller {
return &exampleController{}
}))

err := app.Listen(cfg.Flags.Addr)
if err != nil {
log.Fatal(err)
}

return nil
})
}
}

func run(ctx context.Context) error {
log.SetFlags(0)
log.SetOutput(os.Stderr)

broadcast := sse.NewBroadcastManager(5)

app.Post("/error", htmx.NewHxControllerHandler(func() htmx.Controller {
return &exampleController{}
}))
webSrv := &webSrv{
factory: broadcast.CreateSender(),
}

s, _ := server.WithContext(ctx)

s.Listen(broadcast, true)
s.Listen(webSrv, true)

ticker := time.NewTicker(2 * time.Second)

go func() {
for {
select {
case <-ctx.Done():
return
case t := <-ticker.C:
broadcast.Send(sse.NewMessage("demo", fmt.Sprintf("Hello, World! %s", t)))
}
}
}()

err := app.Listen(cfg.Flags.Addr)
err := s.Wait()
if err != nil {
log.Fatal(err)
return err
}

return nil
Expand Down
34 changes: 13 additions & 21 deletions sse/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *ClientImpl) Events() chan Event {

// Close closes the client.
func (c *ClientImpl) Close() {
close(c.events)
<-c.events
}

// NewClient creates a new client.
Expand Down Expand Up @@ -177,31 +177,23 @@ func NewSSEHandler(sender SenderFactory, config ...Config) fiber.Handler {
c.Set(fiber.HeaderTransferEncoding, "chunked")

s := sender(c) // return a sender
defer s.Close()

c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
for {
select {
case msg, ok := <-s.Events():
if !ok {
return
}

_, err := fmt.Fprint(w, msg.String())
if err != nil {
return
}
defer s.Close()
for msg := range s.Events() {
_, err := fmt.Fprint(w, msg.String())
if err != nil {
return
}

err = w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
return
}
case <-c.Context().Done():
err = w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
return
}

}
}))

Expand Down

0 comments on commit e700d82

Please sign in to comment.