Skip to content

Commit

Permalink
added pooling support for wombat
Browse files Browse the repository at this point in the history
Signed-off-by: Daan Gerits <daan.gerits@gmail.com>
  • Loading branch information
calmera committed Jun 11, 2024
1 parent cdf7d3f commit d692a5c
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 51 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.0
require (
github.com/Jeffail/shutdown v1.0.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/google/uuid v1.6.0
github.com/nats-io/nats.go v1.35.0
github.com/nats-io/nkeys v0.4.7
github.com/ory/dockertest/v3 v3.10.0
Expand Down Expand Up @@ -165,7 +166,6 @@ require (
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/css v1.0.0 // indirect
Expand Down
54 changes: 32 additions & 22 deletions internal/components/nats/connection.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nats

import (
"context"
"crypto/tls"
"github.com/redpanda-data/benthos/v4/public/service"
"strings"
Expand Down Expand Up @@ -30,60 +29,71 @@ func connectionHeadFields() []*service.ConfigField {
Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.").
Example([]string{"nats://127.0.0.1:4222"}).
Example([]string{"nats://username:password@127.0.0.1:4222"}),
service.NewStringField("name").
Description("An optional name to assign to the connection. If not set, will default to the label").
Default(""),
}
}

func connectionTailFields() []*service.ConfigField {
return []*service.ConfigField{
service.NewTLSToggledField("tls"),
authFieldSpec(),
service.NewStringField("pool_key").
Description("The connection pool key to use. Components using the same poolKey will share their connection").
Default("default").
Advanced(),
}
}

type connectionDetails struct {
label string

Check failure on line 50 in internal/components/nats/connection.go

View workflow job for this annotation

GitHub Actions / golangci-lint

field `label` is unused (unused)
logger *service.Logger

Check failure on line 51 in internal/components/nats/connection.go

View workflow job for this annotation

GitHub Actions / golangci-lint

field `logger` is unused (unused)
tlsConf *tls.Config

Check failure on line 52 in internal/components/nats/connection.go

View workflow job for this annotation

GitHub Actions / golangci-lint

field `tlsConf` is unused (unused)
authConf authConfig
fs *service.FS

Check failure on line 53 in internal/components/nats/connection.go

View workflow job for this annotation

GitHub Actions / golangci-lint

field `fs` is unused (unused)
poolKey string
urls string
opts []nats.Option
authConf authConfig
}

func connectionDetailsFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (c connectionDetails, err error) {
c.label = mgr.Label()
c.fs = mgr.FS()
c.logger = mgr.Logger()

func connectionDetailsFromParsed(conf *service.ParsedConfig, mgr *service.Resources, extraOpts ...nats.Option) (c connectionDetails, err error) {
var urlList []string
if urlList, err = conf.FieldStringList("urls"); err != nil {
return
}
c.urls = strings.Join(urlList, ",")

if c.poolKey, err = conf.FieldString("pool_key"); err != nil {
return
}

var name string
if name, err = conf.FieldString("name"); err != nil {
return
}
if name == "" {
name = mgr.Label()
}
c.opts = append(c.opts, nats.Name(name))

var tlsEnabled bool
if c.tlsConf, tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil {
var tlsConf *tls.Config
if tlsConf, tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil {
return
}
if !tlsEnabled {
c.tlsConf = nil
if tlsEnabled && tlsConf != nil {
c.opts = append(c.opts, nats.Secure(tlsConf))
}

if c.authConf, err = authFromParsedConfig(conf.Namespace("auth")); err != nil {
return
}
return
}

func (c *connectionDetails) get(_ context.Context, extraOpts ...nats.Option) (*nats.Conn, error) {
var opts []nats.Option
if c.tlsConf != nil {
opts = append(opts, nats.Secure(c.tlsConf))
}
opts = append(opts, nats.Name(c.label))
opts = append(opts, errorHandlerOption(c.logger))
opts = append(opts, authConfToOptions(c.authConf, c.fs)...)
opts = append(opts, extraOpts...)
return nats.Connect(c.urls, opts...)
c.opts = append(c.opts, authConfToOptions(c.authConf, mgr.FS())...)
c.opts = append(c.opts, errorHandlerOption(mgr.Logger()))
c.opts = append(c.opts, extraOpts...)
return
}

func errorHandlerOption(logger *service.Logger) nats.Option {
Expand Down
10 changes: 8 additions & 2 deletions internal/components/nats/core_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nats
import (
"context"
"errors"
"github.com/google/uuid"
"github.com/redpanda-data/benthos/v4/public/service"
"sync"
"time"
Expand Down Expand Up @@ -88,12 +89,17 @@ type natsReader struct {
natsChan chan *nats.Msg
interruptChan chan struct{}
interruptOnce sync.Once

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newNATSReader(conf *service.ParsedConfig, mgr *service.Resources) (*natsReader, error) {
n := natsReader{
log: mgr.Logger(),
interruptChan: make(chan struct{}),
pcid: uuid.New().String(),
}

var err error
Expand Down Expand Up @@ -139,7 +145,7 @@ func (n *natsReader) Connect(ctx context.Context) error {
var natsSub *nats.Subscription
var err error

if natsConn, err = n.connDetails.get(ctx); err != nil {
if natsConn, err = pool.Get(ctx, n.pcid, n.connDetails); err != nil {
return err
}

Expand Down Expand Up @@ -170,7 +176,7 @@ func (n *natsReader) disconnect() {
n.natsSub = nil
}
if n.natsConn != nil {
n.natsConn.Close()
_ = pool.Release(n.pcid, n.connDetails)
n.natsConn = nil
}
n.natsChan = nil
Expand Down
12 changes: 9 additions & 3 deletions internal/components/nats/core_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/redpanda-data/benthos/v4/public/service"
"sync"

Expand Down Expand Up @@ -69,12 +70,17 @@ type natsWriter struct {

natsConn *nats.Conn
connMut sync.RWMutex

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newNATSWriter(conf *service.ParsedConfig, mgr *service.Resources) (*natsWriter, error) {
n := natsWriter{
log: mgr.Logger(),
headers: make(map[string]*service.InterpolatedString),
pcid: uuid.New().String(),
}

var err error
Expand Down Expand Up @@ -111,7 +117,7 @@ func (n *natsWriter) Connect(ctx context.Context) error {
}

var err error
if n.natsConn, err = n.connDetails.get(ctx); err != nil {
if n.natsConn, err = pool.Get(ctx, n.pcid, n.connDetails); err != nil {
return err
}
return err
Expand Down Expand Up @@ -156,7 +162,7 @@ func (n *natsWriter) Write(context context.Context, msg *service.Message) error
}

if err = conn.PublishMsg(nMsg); errors.Is(err, nats.ErrConnectionClosed) {
conn.Close()
_ = pool.Release(n.pcid, n.connDetails)
n.connMut.Lock()
n.natsConn = nil
n.connMut.Unlock()
Expand All @@ -170,7 +176,7 @@ func (n *natsWriter) Close(context.Context) (err error) {
defer n.connMut.Unlock()

if n.natsConn != nil {
n.natsConn.Close()
_ = pool.Release(n.pcid, n.connDetails)
n.natsConn = nil
}
return
Expand Down
12 changes: 9 additions & 3 deletions internal/components/nats/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nats
import (
"context"
"errors"
"github.com/google/uuid"
"github.com/nats-io/nats.go/jetstream"
"github.com/redpanda-data/benthos/v4/public/service"
"sync"
Expand Down Expand Up @@ -45,12 +46,17 @@ type kvCache struct {
connMut sync.RWMutex
natsConn *nats.Conn
kv jetstream.KeyValue

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newKVCache(conf *service.ParsedConfig, mgr *service.Resources) (*kvCache, error) {
p := &kvCache{
log: mgr.Logger(),
shutSig: shutdown.NewSignaller(),
pcid: uuid.New().String(),
}

var err error
Expand All @@ -71,7 +77,7 @@ func (p *kvCache) disconnect() {
defer p.connMut.Unlock()

if p.natsConn != nil {
p.natsConn.Close()
_ = pool.Release(p.pcid, p.connDetails)
p.natsConn = nil
}
p.kv = nil
Expand All @@ -86,13 +92,13 @@ func (p *kvCache) connect(ctx context.Context) error {
}

var err error
if p.natsConn, err = p.connDetails.get(ctx); err != nil {
if p.natsConn, err = pool.Get(ctx, p.pcid, p.connDetails); err != nil {
return err
}

defer func() {
if err != nil {
p.natsConn.Close()
_ = pool.Release(p.pcid, p.connDetails)
p.natsConn = nil
}
}()
Expand Down
12 changes: 9 additions & 3 deletions internal/components/nats/kv_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nats

import (
"context"
"github.com/google/uuid"
"github.com/redpanda-data/benthos/v4/public/service"
"sync"

Expand Down Expand Up @@ -91,12 +92,17 @@ type kvReader struct {
connMut sync.Mutex
natsConn *nats.Conn
watcher jetstream.KeyWatcher

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader, error) {
r := &kvReader{
log: mgr.Logger(),
shutSig: shutdown.NewSignaller(),
pcid: uuid.New().String(),
}

var err error
Expand Down Expand Up @@ -141,12 +147,12 @@ func (r *kvReader) Connect(ctx context.Context) (err error) {
_ = r.watcher.Stop()
}
if r.natsConn != nil {
r.natsConn.Close()
_ = pool.Release(r.pcid, r.connDetails)
}
}
}()

if r.natsConn, err = r.connDetails.get(ctx); err != nil {
if r.natsConn, err = pool.Get(ctx, r.pcid, r.connDetails); err != nil {
return err
}

Expand Down Expand Up @@ -187,7 +193,7 @@ func (r *kvReader) disconnect() {
r.watcher = nil
}
if r.natsConn != nil {
r.natsConn.Close()
_ = pool.Release(r.pcid, r.connDetails)
r.natsConn = nil
}
}
Expand Down
12 changes: 9 additions & 3 deletions internal/components/nats/kv_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nats

import (
"context"
"github.com/google/uuid"
"github.com/redpanda-data/benthos/v4/public/service"
"sync"

Expand Down Expand Up @@ -68,12 +69,17 @@ type kvOutput struct {
keyValue jetstream.KeyValue

shutSig *shutdown.Signaller

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newKVOutput(conf *service.ParsedConfig, mgr *service.Resources) (*kvOutput, error) {
kv := kvOutput{
log: mgr.Logger(),
shutSig: shutdown.NewSignaller(),
pcid: uuid.New().String(),
}

var err error
Expand Down Expand Up @@ -109,11 +115,11 @@ func (kv *kvOutput) Connect(ctx context.Context) (err error) {

defer func() {
if err != nil && natsConn != nil {
natsConn.Close()
_ = pool.Release(kv.pcid, kv.connDetails)
}
}()

if natsConn, err = kv.connDetails.get(ctx); err != nil {
if natsConn, err = pool.Get(ctx, kv.pcid, kv.connDetails); err != nil {
return err
}

Expand All @@ -136,7 +142,7 @@ func (kv *kvOutput) disconnect() {
defer kv.connMut.Unlock()

if kv.natsConn != nil {
kv.natsConn.Close()
_ = pool.Release(kv.pcid, kv.connDetails)
kv.natsConn = nil
}
kv.keyValue = nil
Expand Down
Loading

0 comments on commit d692a5c

Please sign in to comment.