Skip to content

Commit

Permalink
feat(websocket): use redis bloom filter (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZigBalthazar authored Oct 15, 2024
1 parent 9fac20a commit d773c8c
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ linters:
- asciicheck
- bidichk
- bodyclose
- contextcheck
- decorder
- dogsled
- dupword
Expand Down Expand Up @@ -84,6 +83,7 @@ linters-settings:
enable-all: true
disable:
- fieldalignment
- contextcheck

predeclared:
# Comma-separated list of predeclared identifiers to not report on.
Expand Down
4 changes: 4 additions & 0 deletions config/.develop.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@

# your MongoDB URI for development. you can use dev docker compose to run it.
IMMO_MONGO_URI="mongodb://username:password@host1:27017,host2:27017,host3:27017/mydatabase?replicaSet=myReplicaSet&authSource=admin&readPreference=primary&ssl=true&retryWrites=true&w=majority"


# your Redis URI for development. you can use dev docker compose to run it.
IMMO_REDIS_URI="redis://localhost:6379"
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"

"github.com/dezh-tech/immortal/database"
"github.com/dezh-tech/immortal/relay/redis"
"github.com/dezh-tech/immortal/server/http"
"github.com/dezh-tech/immortal/server/websocket"
"github.com/dezh-tech/immortal/types/nip11"
Expand All @@ -17,6 +18,7 @@ type Config struct {
WebsocketServer websocket.Config `yaml:"ws_server"`
HTTPServer http.Config `yaml:"http_server"`
Database database.Config `yaml:"database"`
RedisConf redis.Config `yaml:"redis"`
Parameters *Parameters
}

Expand Down Expand Up @@ -49,6 +51,7 @@ func Load(path string) (*Config, error) {
}

config.Database.URI = os.Getenv("IMMO_MONGO_URI")
config.RedisConf.URI = os.Getenv("IMMO_REDIS_URI")

if err = config.basicCheck(); err != nil {
return nil, Error{
Expand Down
22 changes: 14 additions & 8 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@ ws_server:
# default is 7777.
port: 7777

# known_bloom_size is the size of bloom filter to check and avoid trying to store/broadcast existing events.
# default is 1M events.
known_bloom_size: 1000000

# bloom_backup_path is the path to store bloom filter when relay shutdown.
# default is immo_bloom_backup.
bloom_backup_path: "immo_bloom_backup"

# http_server contains information about http server.
http_server:
# bind is the IP address to be bind and listen on.
Expand All @@ -49,3 +41,17 @@ database:
# connection_timeout_in_ms specifies the maximum duration (in milliseconds) that is used for creating connections to the server.
# default is 5000.
connection_timeout_in_ms: 5000

# redis contains details of redis connections and limitations.
redis:
# query_timeout_in_ms specifies the maximum duration (in milliseconds) for query execution before timing out.
# default is 3000.
query_timeout_in_ms: 3000

# connection_timeout_in_ms specifies the maximum duration (in milliseconds) that is used for creating connections to the server.
# default is 5000.
connection_timeout_in_ms: 5000

# bloom_name specifies the name of bloom filter key
# default is IMMO_BLOOM.
bloom_name: "IMMO_BLOOM"
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ module github.com/dezh-tech/immortal
go 1.22.5

require (
github.com/bits-and-blooms/bloom/v3 v3.7.0
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/mailru/easyjson v0.7.7
github.com/prometheus/client_golang v1.20.4
github.com/redis/go-redis/v9 v9.6.2
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.18.0
go.mongodb.org/mongo-driver v1.17.1
Expand All @@ -18,12 +18,12 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.14.3 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
Expand Down
15 changes: 8 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.14.3 h1:Gd2c8lSNf9pKXom5JtD7AaKO8o7fGQ2LtFj1436qilA=
github.com/bits-and-blooms/bitset v1.14.3/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bloom/v3 v3.7.0 h1:VfknkqV4xI+PsaDIsoHueyxVDZrfvMn56jeWUzvzdls=
github.com/bits-and-blooms/bloom/v3 v3.7.0/go.mod h1:VKlUSvp0lFIYqxJjzdnSsZEw4iHb1kOL2tfHTgyJBHg=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ=
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ=
Expand All @@ -18,6 +17,8 @@ github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U
github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand Down Expand Up @@ -54,6 +55,8 @@ github.com/prometheus/common v0.60.0 h1:+V9PAREWNvJMAuJ1x1BaWl9dewMW4YrHZQbx0sJN
github.com/prometheus/common v0.60.0/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk=
github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand All @@ -65,8 +68,6 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand Down
39 changes: 39 additions & 0 deletions relay/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package redis

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

type Redis struct {
Client *redis.Client
BloomName string
QueryTimeout time.Duration
}

func New(cfg Config) (*Redis, error) {
opts, err := redis.ParseURL(cfg.URI)
if err != nil {
return nil, err
}

rc := redis.NewClient(opts)

// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.ConnectionTimeout)*time.Millisecond)
defer cancel()

// Test the connection
if err := rc.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("could not connect to Redis: %w", err)
}

return &Redis{
Client: rc,
BloomName: cfg.BloomName,
QueryTimeout: time.Duration(cfg.QueryTimeout) * time.Millisecond,
}, nil
}
8 changes: 8 additions & 0 deletions relay/redis/redis_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package redis

type Config struct {
URI string
BloomName string `yaml:"bloom_name"`
ConnectionTimeout int16 `yaml:"connection_timeout_in_ms"`
QueryTimeout int16 `yaml:"query_timeout_in_ms"`
}
10 changes: 9 additions & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/dezh-tech/immortal/database"
"github.com/dezh-tech/immortal/handler"
"github.com/dezh-tech/immortal/metrics"
"github.com/dezh-tech/immortal/relay/redis"
"github.com/dezh-tech/immortal/server/http"
"github.com/dezh-tech/immortal/server/websocket"
)
Expand All @@ -17,6 +18,7 @@ type Relay struct {
websocketServer *websocket.Server
httpServer *http.Server
database *database.Database
redis *redis.Redis
}

// NewRelay creates a new relay.
Expand All @@ -35,7 +37,12 @@ func New(cfg *config.Config) (*Relay, error) {

m := metrics.New()

ws, err := websocket.New(cfg.WebsocketServer, cfg.GetNIP11Documents(), h, m)
r, err := redis.New(cfg.RedisConf)
if err != nil {
return nil, err
}

ws, err := websocket.New(cfg.WebsocketServer, cfg.GetNIP11Documents(), h, m, r)
if err != nil {
return nil, err
}
Expand All @@ -50,6 +57,7 @@ func New(cfg *config.Config) (*Relay, error) {
websocketServer: ws,
database: db,
httpServer: hs,
redis: r,
}, nil
}

Expand Down
8 changes: 3 additions & 5 deletions server/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ type Limitation struct {
}

type Config struct {
Bind string `yaml:"bind"`
Port uint16 `yaml:"port"`
BloomBackupPath string `yaml:"bloom_backup_path"`
KnownBloomSize uint `yaml:"known_bloom_size"`
Limitation *Limitation
Bind string `yaml:"bind"`
Port uint16 `yaml:"port"`
Limitation *Limitation
}
68 changes: 29 additions & 39 deletions server/websocket/server.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package websocket

import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"sync"

"github.com/bits-and-blooms/bloom/v3"
"github.com/dezh-tech/immortal/handler"
"github.com/dezh-tech/immortal/metrics"
"github.com/dezh-tech/immortal/relay/redis"
"github.com/dezh-tech/immortal/types/filter"
"github.com/dezh-tech/immortal/types/message"
"github.com/dezh-tech/immortal/types/nip11"
Expand All @@ -27,35 +27,25 @@ var upgrader = websocket.Upgrader{
type Server struct {
mu sync.RWMutex

config Config
knownEvents *bloom.BloomFilter
conns map[*websocket.Conn]clientState
handlers *handler.Handler
nip11Doc *nip11.RelayInformationDocument
metrics *metrics.Metrics
config Config
conns map[*websocket.Conn]clientState
handlers *handler.Handler
nip11Doc *nip11.RelayInformationDocument
metrics *metrics.Metrics
redis *redis.Redis
}

func New(cfg Config, nip11Doc *nip11.RelayInformationDocument,
h *handler.Handler, m *metrics.Metrics,
h *handler.Handler, m *metrics.Metrics, r *redis.Redis,
) (*Server, error) {
seb := bloom.NewWithEstimates(cfg.KnownBloomSize, 0.9)

f, err := os.Open(cfg.BloomBackupPath)
if err == nil {
_, err = seb.ReadFrom(f)
if err != nil {
return nil, fmt.Errorf("server: loading bloom: %s", err.Error())
}
}

return &Server{
config: cfg,
knownEvents: seb,
conns: make(map[*websocket.Conn]clientState),
mu: sync.RWMutex{},
nip11Doc: nip11Doc,
handlers: h,
metrics: m,
config: cfg,
conns: make(map[*websocket.Conn]clientState),
mu: sync.RWMutex{},
nip11Doc: nip11Doc,
handlers: h,
metrics: m,
redis: r,
}, nil
}

Expand Down Expand Up @@ -265,7 +255,15 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {

eID := msg.Event.GetRawID()

if s.knownEvents.Test(eID[:]) {
qCtx, cancel := context.WithTimeout(context.Background(), s.redis.QueryTimeout)
defer cancel()

exists, err := s.redis.Client.BFExists(qCtx, s.redis.BloomName, eID[:]).Result()
if err != nil {
log.Printf("error: checking bloom filter: %s", err.Error())
}

if exists {
okm := message.MakeOK(true, msg.Event.ID, "")
_ = conn.WriteMessage(1, okm)

Expand Down Expand Up @@ -299,11 +297,13 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {

return
}

_ = conn.WriteMessage(1, message.MakeOK(true, msg.Event.ID, ""))
}

s.knownEvents.Add(eID[:])
_, err = s.redis.Client.BFAdd(qCtx, s.redis.BloomName, eID[:]).Result()
if err != nil {
log.Printf("error: checking bloom filter: %s", err.Error())
}

// todo::: can we run goroutines per client?
for conn, client := range s.conns {
Expand Down Expand Up @@ -375,15 +375,5 @@ func (s *Server) Stop() error {
client.Unlock()
}

f, err := os.Create(s.config.BloomBackupPath)
if err != nil {
return fmt.Errorf("error: creating new file for blooms: %s", err.Error())
}

_, err = s.knownEvents.WriteTo(f)
if err != nil {
return fmt.Errorf("error: writing bloom filter to disck: %s", err.Error())
}

return nil
}

0 comments on commit d773c8c

Please sign in to comment.