diff --git a/go.mod b/go.mod
index 447300672d..362b6069fe 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
 	github.com/cenkalti/backoff/v4 v4.1.3
 	github.com/clbanning/mxj/v2 v2.5.5
 	github.com/colinmarc/hdfs v1.1.3
+	github.com/couchbase/gocb/v2 v2.6.0
 	github.com/denisenkom/go-mssqldb v0.11.0
 	github.com/dgraph-io/ristretto v0.1.0
 	github.com/dustin/go-humanize v1.0.0
@@ -162,6 +163,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/cockroachdb/apd/v2 v2.0.1 // indirect
 	github.com/containerd/continuity v0.2.2 // indirect
+	github.com/couchbase/gocbcore/v10 v10.2.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
 	github.com/danieljoos/wincred v1.1.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
diff --git a/go.sum b/go.sum
index 516f707e1c..298d3de7ba 100644
--- a/go.sum
+++ b/go.sum
@@ -338,6 +338,12 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee
 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
 github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/couchbase/gocb/v2 v2.6.0 h1:DhkLNatDcddCcS411D6kNwZspSEAWVeI/N3abzt/HLc=
+github.com/couchbase/gocb/v2 v2.6.0/go.mod h1:5su8b1gBF3V4j07SiGw+CA0bK9a84YWEb6UH7up0MEs=
+github.com/couchbase/gocbcore/v10 v10.2.0 h1:ZoSBLtcmt+lXbxVVT4SAhXDVNR+D48iSOZWNzHucVVk=
+github.com/couchbase/gocbcore/v10 v10.2.0/go.mod h1:qkPnOBziCs0guMEEvd0cRFo+AjOW0yEL99cU3I4n3Ao=
+github.com/couchbaselabs/gocaves/client v0.0.0-20220223122017-22859b310bd2 h1:UlwJ2GWpZQAQCLHyO3xHKcqAjUUcX2w7FKpbxCIUQks=
+github.com/couchbaselabs/gocaves/client v0.0.0-20220223122017-22859b310bd2/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY=
 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
diff --git a/internal/impl/couchbase/client.go b/internal/impl/couchbase/client.go
new file mode 100644
index 0000000000..eef35e0c56
--- /dev/null
+++ b/internal/impl/couchbase/client.go
@@ -0,0 +1,120 @@
+package couchbase
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/couchbase/gocb/v2"
+
+	"github.com/benthosdev/benthos/v4/internal/impl/couchbase/client"
+	"github.com/benthosdev/benthos/v4/public/service"
+)
+
+// ErrInvalidTranscoder is returned if the specified transcoder is not supported.
+var ErrInvalidTranscoder = errors.New("invalid transcoder")
+
+type couchbaseClient struct {
+	collection *gocb.Collection
+	cluster    *gocb.Cluster
+}
+
+func getClient(conf *service.ParsedConfig, mgr *service.Resources) (*couchbaseClient, error) {
+
+	// retrieve params
+	url, err := conf.FieldString("url")
+	if err != nil {
+		return nil, err
+	}
+	bucket, err := conf.FieldString("bucket")
+	if err != nil {
+		return nil, err
+	}
+	timeout, err := conf.FieldDuration("timeout")
+	if err != nil {
+		return nil, err
+	}
+
+	// setup couchbase
+	opts := gocb.ClusterOptions{
+		// TODO add opentracing Tracer:
+		// TODO add metrics Meter:
+	}
+
+	opts.TimeoutsConfig = gocb.TimeoutsConfig{
+		ConnectTimeout:    timeout,
+		KVTimeout:         timeout,
+		KVDurableTimeout:  timeout,
+		ViewTimeout:       timeout,
+		QueryTimeout:      timeout,
+		AnalyticsTimeout:  timeout,
+		SearchTimeout:     timeout,
+		ManagementTimeout: timeout,
+	}
+
+	if conf.Contains("username") {
+		username, err := conf.FieldString("username")
+		if err != nil {
+			return nil, err
+		}
+		password, err := conf.FieldString("password")
+		if err != nil {
+			return nil, err
+		}
+		opts.Authenticator = gocb.PasswordAuthenticator{
+			Username: username,
+			Password: password,
+		}
+	}
+
+	tr, err := conf.FieldString("transcoder")
+	if err != nil {
+		return nil, err
+	}
+	switch client.Transcoder(tr) {
+	case client.TranscoderJSON:
+		opts.Transcoder = gocb.NewJSONTranscoder()
+	case client.TranscoderRaw:
+		opts.Transcoder = gocb.NewRawBinaryTranscoder()
+	case client.TranscoderRawJSON:
+		opts.Transcoder = gocb.NewRawJSONTranscoder()
+	case client.TranscoderRawString:
+		opts.Transcoder = gocb.NewRawStringTranscoder()
+	case client.TranscoderLegacy:
+		opts.Transcoder = gocb.NewLegacyTranscoder()
+	default:
+		return nil, fmt.Errorf("%w: %s", ErrInvalidTranscoder, tr)
+	}
+
+	cluster, err := gocb.Connect(url, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	// wait for the bucket to be ready before accepting operations
+	err = cluster.Bucket(bucket).WaitUntilReady(timeout, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	proc := &couchbaseClient{
+		cluster: cluster,
+	}
+
+	// retrieve collection
+	if conf.Contains("collection") {
+		collectionStr, err := conf.FieldString("collection")
+		if err != nil {
+			return nil, err
+		}
+		proc.collection = cluster.Bucket(bucket).Collection(collectionStr)
+	} else {
+		proc.collection = cluster.Bucket(bucket).DefaultCollection()
+	}
+
+	return proc, nil
+}
+
+func (p *couchbaseClient) Close(ctx context.Context) error {
+	return p.cluster.Close(&gocb.ClusterCloseOptions{})
+}
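
For orientation, the connection-related fields read by `getClient` above map onto YAML along these lines. This is an illustrative sketch, not part of the patch; the values are placeholders and the authoritative spec is defined in `client/docs.go` below.

```yml
# Connection settings shared by couchbase components (placeholder values).
url: 'couchbase://localhost:11210' # connection string
username: 'benthos'                # optional, enables password authentication
password: 'password'               # optional, used together with username
bucket: 'testing'
collection: '_default'             # optional, falls back to the default collection
transcoder: 'legacy'               # one of: raw, rawjson, rawstring, json, legacy
timeout: '15s'                     # applied to every gocb timeout setting
```
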
diff --git a/internal/impl/couchbase/client/config.go b/internal/impl/couchbase/client/config.go
new file mode 100644
index 0000000000..b886e81c43
--- /dev/null
+++ b/internal/impl/couchbase/client/config.go
@@ -0,0 +1,33 @@
+package client
+
+// Transcoder represents the transcoder that will be used by Couchbase.
+type Transcoder string
+
+const (
+	// TranscoderRaw raw transcoder.
+	TranscoderRaw Transcoder = "raw"
+	// TranscoderRawJSON rawjson transcoder.
+	TranscoderRawJSON Transcoder = "rawjson"
+	// TranscoderRawString rawstring transcoder.
+	TranscoderRawString Transcoder = "rawstring"
+	// TranscoderJSON JSON transcoder.
+	TranscoderJSON Transcoder = "json"
+	// TranscoderLegacy Legacy transcoder.
+	TranscoderLegacy Transcoder = "legacy"
+)
+
+// Operation represents the operation that will be performed by Couchbase.
+type Operation string
+
+const (
+	// OperationGet Get operation.
+	OperationGet Operation = "get"
+	// OperationInsert Insert operation.
+	OperationInsert Operation = "insert"
+	// OperationRemove Remove (delete) operation.
+	OperationRemove Operation = "remove"
+	// OperationReplace Replace operation.
+	OperationReplace Operation = "replace"
+	// OperationUpsert Upsert operation.
+	OperationUpsert Operation = "upsert"
+)
diff --git a/internal/impl/couchbase/client/docs.go b/internal/impl/couchbase/client/docs.go
new file mode 100644
index 0000000000..eb3575c6f0
--- /dev/null
+++ b/internal/impl/couchbase/client/docs.go
@@ -0,0 +1,24 @@
+package client
+
+import (
+	"github.com/benthosdev/benthos/v4/public/service"
+)
+
+// NewConfigSpec constructs a new Couchbase ConfigSpec with common config fields.
+func NewConfigSpec() *service.ConfigSpec {
+	return service.NewConfigSpec().
+		// TODO Stable().
+		Field(service.NewStringField("url").Description("Couchbase connection string.").Example("couchbase://localhost:11210")).
+		Field(service.NewStringField("username").Description("Username to connect to the cluster.").Optional()).
+		Field(service.NewStringField("password").Description("Password to connect to the cluster.").Optional()).
+		Field(service.NewStringField("bucket").Description("Couchbase bucket.")).
+		Field(service.NewStringField("collection").Description("Bucket collection.").Default("_default").Advanced().Optional()).
+		Field(service.NewStringAnnotatedEnumField("transcoder", map[string]string{
+			string(TranscoderRaw):       `RawBinaryTranscoder implements passthrough behavior of raw binary data. This transcoder does not apply any serialization. This will apply the following behavior to the value: binary ([]byte) -> binary bytes, binary expectedFlags. default -> error.`,
+			string(TranscoderRawJSON):   `RawJSONTranscoder implements passthrough behavior of JSON data. This transcoder does not apply any serialization. It will forward data across the network without incurring unnecessary parsing costs. This will apply the following behavior to the value: binary ([]byte) -> JSON bytes, JSON expectedFlags. string -> JSON bytes, JSON expectedFlags. default -> error.`,
+			string(TranscoderRawString): `RawStringTranscoder implements passthrough behavior of raw string data. This transcoder does not apply any serialization. This will apply the following behavior to the value: string -> string bytes, string expectedFlags. default -> error.`,
+			string(TranscoderJSON):      `JSONTranscoder implements the default transcoding behavior and applies JSON transcoding to all values. This will apply the following behavior to the value: binary ([]byte) -> error. default -> JSON value, JSON Flags.`,
+			string(TranscoderLegacy):    `LegacyTranscoder implements the behaviour for a backward-compatible transcoder. This transcoder implements behaviour matching that of gocb v1. This will apply the following behavior to the value: binary ([]byte) -> binary bytes, Binary expectedFlags. string -> string bytes, String expectedFlags. default -> JSON value, JSON expectedFlags.`,
+		}).Description("Couchbase transcoder to use.").Default(string(TranscoderLegacy)).Advanced()).
+		Field(service.NewDurationField("timeout").Description("Operation timeout.").Advanced().Default("15s"))
+}
diff --git a/internal/impl/couchbase/couchbase.go b/internal/impl/couchbase/couchbase.go
new file mode 100644
index 0000000000..387361c1ec
--- /dev/null
+++ b/internal/impl/couchbase/couchbase.go
@@ -0,0 +1,61 @@
+package couchbase
+
+import (
+	"fmt"
+
+	"github.com/couchbase/gocb/v2"
+)
+
+func valueFromOp(op gocb.BulkOp) (out any, err error) {
+	switch o := op.(type) {
+	case *gocb.GetOp:
+		if o.Err != nil {
+			return nil, o.Err
+		}
+		err := o.Result.Content(&out)
+		return out, err
+	case *gocb.InsertOp:
+		return nil, o.Err
+	case *gocb.RemoveOp:
+		return nil, o.Err
+	case *gocb.ReplaceOp:
+		return nil, o.Err
+	case *gocb.UpsertOp:
+		return nil, o.Err
+	}
+
+	return nil, fmt.Errorf("type not supported")
+}
+
+func get(key string, _ []byte) gocb.BulkOp {
+	return &gocb.GetOp{
+		ID: key,
+	}
+}
+
+func insert(key string, data []byte) gocb.BulkOp {
+	return &gocb.InsertOp{
+		ID:    key,
+		Value: data,
+	}
+}
+
+func remove(key string, _ []byte) gocb.BulkOp {
+	return &gocb.RemoveOp{
+		ID: key,
+	}
+}
+
+func replace(key string, data []byte) gocb.BulkOp {
+	return &gocb.ReplaceOp{
+		ID:    key,
+		Value: data,
+	}
+}
+
+func upsert(key string, data []byte) gocb.BulkOp {
+	return &gocb.UpsertOp{
+		ID:    key,
+		Value: data,
+	}
+}
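
The constructors above each return a `gocb.BulkOp` that records its own error once executed. A minimal sketch of how they pair with `Collection.Do` and `valueFromOp` (illustrative only, assumed to live in the same package; `processor.go` below does the equivalent work per message batch):

```go
package couchbase

import "github.com/couchbase/gocb/v2"

// upsertAll is a hypothetical helper, not part of the patch: it builds one
// bulk operation per document, executes them in a single Do call, and then
// surfaces per-document errors through valueFromOp.
func upsertAll(collection *gocb.Collection, docs map[string][]byte) error {
	ops := make([]gocb.BulkOp, 0, len(docs))
	for key, payload := range docs {
		ops = append(ops, upsert(key, payload))
	}
	// Do reports transport-level failures; each operation also keeps its own
	// Err field, which valueFromOp inspects afterwards.
	if err := collection.Do(ops, &gocb.BulkOpOptions{}); err != nil {
		return err
	}
	for _, op := range ops {
		if _, err := valueFromOp(op); err != nil {
			return err
		}
	}
	return nil
}
```
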
diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go
new file mode 100644
index 0000000000..28a340247b
--- /dev/null
+++ b/internal/impl/couchbase/processor.go
@@ -0,0 +1,163 @@
+package couchbase
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/couchbase/gocb/v2"
+
+	"github.com/benthosdev/benthos/v4/internal/impl/couchbase/client"
+	"github.com/benthosdev/benthos/v4/public/bloblang"
+	"github.com/benthosdev/benthos/v4/public/service"
+)
+
+var (
+	// ErrInvalidOperation is returned if the specified operation is not supported.
+	ErrInvalidOperation = errors.New("invalid operation")
+	// ErrContentRequired is returned if the content field is missing for a write operation.
+	ErrContentRequired = errors.New("content required")
+)
+
+// ProcessorConfig exports the couchbase processor specification.
+func ProcessorConfig() *service.ConfigSpec {
+	return client.NewConfigSpec().
+		// TODO Stable().
+		Version("4.10.0").
+		Categories("Integration").
+		Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads.").
+		Description("When inserting, replacing or upserting documents, each must have the `content` property set.").
+		Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! json("id") }`)).
+		Field(service.NewBloblangField("content").Description("Document content.").Optional()).
+		Field(service.NewStringAnnotatedEnumField("operation", map[string]string{
+			string(client.OperationGet):     "fetch a document.",
+			string(client.OperationInsert):  "insert a new document.",
+			string(client.OperationRemove):  "delete a document.",
+			string(client.OperationReplace): "replace the contents of a document.",
+			string(client.OperationUpsert):  "create a new document if it does not exist, otherwise update it.",
+		}).Description("Couchbase operation to perform.").Default(string(client.OperationGet))).
+		LintRule(`root = if ((this.operation == "insert" || this.operation == "replace" || this.operation == "upsert") && !this.exists("content")) { [ "content must be set for insert, replace and upsert operations." ] }`)
+}
+
+func init() {
+	err := service.RegisterBatchProcessor("couchbase", ProcessorConfig(),
+		func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) {
+			return NewProcessor(conf, mgr)
+		},
+	)
+	if err != nil {
+		panic(err)
+	}
+}
+
+//------------------------------------------------------------------------------
+
+// Processor stores or retrieves data from couchbase for each message of a
+// batch.
+type Processor struct {
+	*couchbaseClient
+	id      *service.InterpolatedString
+	content *bloblang.Executor
+	op      func(key string, data []byte) gocb.BulkOp
+}
+
+// NewProcessor returns a Couchbase processor.
+func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processor, error) {
+	cl, err := getClient(conf, mgr)
+	if err != nil {
+		return nil, err
+	}
+	p := &Processor{
+		couchbaseClient: cl,
+	}
+
+	if p.id, err = conf.FieldInterpolatedString("id"); err != nil {
+		return nil, err
+	}
+
+	if conf.Contains("content") {
+		if p.content, err = conf.FieldBloblang("content"); err != nil {
+			return nil, err
+		}
+	}
+
+	op, err := conf.FieldString("operation")
+	if err != nil {
+		return nil, err
+	}
+	switch client.Operation(op) {
+	case client.OperationGet:
+		p.op = get
+	case client.OperationRemove:
+		p.op = remove
+	case client.OperationInsert:
+		if p.content == nil {
+			return nil, ErrContentRequired
+		}
+		p.op = insert
+	case client.OperationReplace:
+		if p.content == nil {
+			return nil, ErrContentRequired
+		}
+		p.op = replace
+	case client.OperationUpsert:
+		if p.content == nil {
+			return nil, ErrContentRequired
+		}
+		p.op = upsert
+	default:
+		return nil, fmt.Errorf("%w: %s", ErrInvalidOperation, op)
+	}
+
+	return p, nil
+}
+
+// ProcessBatch applies the processor to a message batch, either creating >0
+// resulting messages or a response to be sent back to the message source.
+func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBatch) ([]service.MessageBatch, error) {
+	newMsg := inBatch.Copy()
+	ops := make([]gocb.BulkOp, len(inBatch))
+
+	// generate one bulk operation per message
+	for index := range newMsg {
+		// generate id
+		k := inBatch.InterpolatedString(index, p.id)
+
+		// generate content
+		var content []byte
+		if p.content != nil {
+			res, err := inBatch.BloblangQuery(index, p.content)
+			if err != nil {
+				return nil, err
+			}
+			content, err = res.AsBytes()
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		ops[index] = p.op(k, content)
+	}
+
+	// execute
+	err := p.collection.Do(ops, &gocb.BulkOpOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	// set results
+	for index, part := range newMsg {
+		out, err := valueFromOp(ops[index])
+		if err != nil {
+			part.SetError(fmt.Errorf("couchbase operation failed: %w", err))
+		}
+
+		if data, ok := out.([]byte); ok {
+			part.SetBytes(data)
+		} else if out != nil {
+			part.SetStructured(out)
+		}
+	}
+
+	return []service.MessageBatch{newMsg}, nil
+}
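
Putting the spec and `ProcessBatch` together, a pipeline using the processor might be configured like this. It is a sketch reusing the field values from the integration tests below; the bucket name and credentials are assumptions, not defaults.

```yml
pipeline:
  processors:
    - couchbase:
        url: 'couchbase://localhost:11210'
        bucket: 'testing'
        username: 'benthos'
        password: 'password'
        id: '${! json("id") }'
        content: 'root = this'
        operation: 'upsert'
```
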
json("id") }' + content: 'root = this' + operation: 'insert' +`, + }, + } + + env := service.NewEnvironment() + for _, test := range configTests { + t.Run(test.name, func(t *testing.T) { + strm := env.NewStreamBuilder() + err := strm.AddProcessorYAML(test.config) + if test.errContains == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + +func TestProcessorIntegration(t *testing.T) { + integration.CheckSkip(t) + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = 30 * time.Second + if deadline, ok := t.Deadline(); ok { + pool.MaxWait = time.Until(deadline) - 100*time.Millisecond + } + + pwd, err := os.Getwd() + if err != nil { + t.Fatalf("failed to get working directory: %s", err) + } + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "couchbase", + Tag: "latest", + Cmd: []string{"/opt/couchbase/configure-server.sh"}, + Env: []string{ + "CLUSTER_NAME=couchbase", + "COUCHBASE_ADMINISTRATOR_USERNAME=benthos", + "COUCHBASE_ADMINISTRATOR_PASSWORD=password", + }, + Mounts: []string{ + fmt.Sprintf("%s/testdata/configure-server.sh:/opt/couchbase/configure-server.sh", pwd), + }, + PortBindings: map[docker.Port][]docker.PortBinding{ + "8091/tcp": { + { + HostIP: "0.0.0.0", HostPort: "8091", + }, + }, + "11210/tcp": { + { + HostIP: "0.0.0.0", HostPort: "11210", + }, + }, + }, + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + _ = resource.Expire(900) + + // Look for readyness + var stderr bytes.Buffer + time.Sleep(15 * time.Second) + for { + time.Sleep(time.Second) + exitCode, err := resource.Exec([]string{"/usr/bin/cat", "/is-ready"}, dockertest.ExecOptions{ + StdErr: &stderr, // without stderr exit code is not reported + }) + if exitCode == 0 && err == nil { + break + } + t.Logf("exit code: %d, err: %s", exitCode, err) + errB, err := io.ReadAll(&stderr) + require.NoError(t, err) + t.Logf("stderr: %s", string(errB)) + } + + t.Logf("couchbase cluster is ready") + + port := resource.GetPort("11210/tcp") + require.NotEmpty(t, port) + + uid := faker.UUIDHyphenated() + payload := fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + + t.Run("Insert", func(t *testing.T) { + testCouchbaseProcessorInsert(uid, payload, port, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, port, t) + }) + t.Run("Remove", func(t *testing.T) { + testCouchbaseProcessorRemove(uid, port, t) + }) + t.Run("GetMissing", func(t *testing.T) { + testCouchbaseProcessorGetMissing(uid, port, t) + }) + + payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + t.Run("Upsert", func(t *testing.T) { + testCouchbaseProcessorUpsert(uid, payload, port, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, port, t) + }) + + payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + t.Run("Replace", func(t *testing.T) { + testCouchbaseProcessorReplace(uid, payload, port, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, port, t) + }) +} + +func getProc(tb testing.TB, config string) *couchbase.Processor { + tb.Helper() + + confSpec := couchbase.ProcessorConfig() + env := service.NewEnvironment() + + pConf, err := confSpec.ParseYAML(config, env) + require.NoError(tb, err) + proc, err := couchbase.NewProcessor(pConf, service.MockResources()) + require.NoError(tb, err) + require.NotNil(tb, proc) + + return proc +} + 
+func testCouchbaseProcessorInsert(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! json("id") }' +content: 'root = this' +operation: 'insert' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(payload)), + }) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) + + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.JSONEq(t, payload, string(dataOut)) +} + +func testCouchbaseProcessorUpsert(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! json("id") }' +content: 'root = this' +operation: 'upsert' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(payload)), + }) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) + + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.JSONEq(t, payload, string(dataOut)) +} + +func testCouchbaseProcessorReplace(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! json("id") }' +content: 'root = this' +operation: 'replace' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(payload)), + }) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) + + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.JSONEq(t, payload, string(dataOut)) +} + +func testCouchbaseProcessorGet(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! content() }' +operation: 'get' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(uid)), + }) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) + + // message should contain expected payload. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.JSONEq(t, payload, string(dataOut)) +} + +func testCouchbaseProcessorRemove(uid, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! content() }' +operation: 'remove' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(uid)), + }) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) + + // message content should stay the same. 
+ dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.Equal(t, uid, string(dataOut)) +} + +func testCouchbaseProcessorGetMissing(uid, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! content() }' +operation: 'get' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(uid)), + }) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) + + // message should contain an error. + assert.Error(t, msgOut[0][0].GetError(), "TODO") + + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.Equal(t, uid, string(dataOut)) +} diff --git a/internal/impl/couchbase/testdata/configure-server.sh b/internal/impl/couchbase/testdata/configure-server.sh new file mode 100755 index 0000000000..4c695a6d84 --- /dev/null +++ b/internal/impl/couchbase/testdata/configure-server.sh @@ -0,0 +1,30 @@ +#!bin/bash + +set -m + +/entrypoint.sh couchbase-server & + +sleep 8 + +# Setup initial cluster/ Initialize Node +couchbase-cli cluster-init -c 127.0.0.1 --cluster-name $CLUSTER_NAME --cluster-username $COUCHBASE_ADMINISTRATOR_USERNAME \ + --cluster-password $COUCHBASE_ADMINISTRATOR_PASSWORD --services data --cluster-ramsize 1024 + +sleep 5 + +# Setup Administrator username and password +curl -s http://127.0.0.1:8091/settings/web -d port=8091 -d username=$COUCHBASE_ADMINISTRATOR_USERNAME -d password=$COUCHBASE_ADMINISTRATOR_PASSWORD + +# Setup buckets +for BUCKET_NAME in testing +do + couchbase-cli bucket-create -c 127.0.0.1:8091 --username $COUCHBASE_ADMINISTRATOR_USERNAME \ + --password $COUCHBASE_ADMINISTRATOR_PASSWORD --bucket $BUCKET_NAME --bucket-type couchbase \ + --bucket-ramsize 128 + + sleep 5 +done + +touch /is-ready + +fg 1 diff --git a/public/components/all/package.go b/public/components/all/package.go index 1c74b95d53..b0253faa5d 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -13,6 +13,7 @@ import ( _ "github.com/benthosdev/benthos/v4/public/components/beanstalkd" _ "github.com/benthosdev/benthos/v4/public/components/cassandra" _ "github.com/benthosdev/benthos/v4/public/components/confluent" + _ "github.com/benthosdev/benthos/v4/public/components/couchbase" _ "github.com/benthosdev/benthos/v4/public/components/dgraph" _ "github.com/benthosdev/benthos/v4/public/components/elasticsearch" _ "github.com/benthosdev/benthos/v4/public/components/gcp" diff --git a/public/components/couchbase/package.go b/public/components/couchbase/package.go new file mode 100644 index 0000000000..75ddf88597 --- /dev/null +++ b/public/components/couchbase/package.go @@ -0,0 +1,6 @@ +package couchbase + +import ( + // Bring in the internal plugin definitions. 
+ _ "github.com/benthosdev/benthos/v4/internal/impl/couchbase" +) diff --git a/website/docs/components/processors/couchbase.md b/website/docs/components/processors/couchbase.md new file mode 100644 index 0000000000..99bd30d0c4 --- /dev/null +++ b/website/docs/components/processors/couchbase.md @@ -0,0 +1,177 @@ +--- +title: couchbase +type: processor +status: experimental +categories: ["Integration"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: +Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads. + +Introduced in version 4.10.0. + + + + + + +```yml +# Common config fields, showing default values +label: "" +couchbase: + url: "" + username: "" + password: "" + bucket: "" + id: "" + content: "" + operation: get +``` + + + + +```yml +# All config fields, showing default values +label: "" +couchbase: + url: "" + username: "" + password: "" + bucket: "" + collection: _default + transcoder: legacy + timeout: 15s + id: "" + content: "" + operation: get +``` + + + + +When inserting, replacing or upserting documents, each must have the `content` property set. + +## Fields + +### `url` + +Couchbase connection string. + + +Type: `string` + +```yml +# Examples + +url: couchbase://localhost:11210 +``` + +### `username` + +Username to connect to the cluster. + + +Type: `string` + +### `password` + +Password to connect to the cluster. + + +Type: `string` + +### `bucket` + +Couchbase bucket. + + +Type: `string` + +### `collection` + +Bucket collection. + + +Type: `string` +Default: `"_default"` + +### `transcoder` + +Couchbase transcoder to use. + + +Type: `string` +Default: `"legacy"` + +| Option | Summary | +|---|---| +| `json` | JSONTranscoder implements the default transcoding behavior and applies JSON transcoding to all values. This will apply the following behavior to the value: binary ([]byte) -> error. default -> JSON value, JSON Flags. | +| `legacy` | LegacyTranscoder implements the behaviour for a backward-compatible transcoder. This transcoder implements behaviour matching that of gocb v1.This will apply the following behavior to the value: binary ([]byte) -> binary bytes, Binary expectedFlags. string -> string bytes, String expectedFlags. default -> JSON value, JSON expectedFlags. | +| `raw` | RawBinaryTranscoder implements passthrough behavior of raw binary data. This transcoder does not apply any serialization. This will apply the following behavior to the value: binary ([]byte) -> binary bytes, binary expectedFlags. default -> error. | +| `rawjson` | RawJSONTranscoder implements passthrough behavior of JSON data. This transcoder does not apply any serialization. It will forward data across the network without incurring unnecessary parsing costs. This will apply the following behavior to the value: binary ([]byte) -> JSON bytes, JSON expectedFlags. string -> JSON bytes, JSON expectedFlags. default -> error. | +| `rawstring` | RawStringTranscoder implements passthrough behavior of raw string data. This transcoder does not apply any serialization. This will apply the following behavior to the value: string -> string bytes, string expectedFlags. default -> error. | + + +### `timeout` + +Operation timeout. + + +Type: `string` +Default: `"15s"` + +### `id` + +Document id. 
+This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `string` + +```yml +# Examples + +id: ${! json("id") } +``` + +### `content` + +Document content. + + +Type: `string` + +### `operation` + +Couchbase operation to perform. + + +Type: `string` +Default: `"get"` + +| Option | Summary | +|---|---| +| `get` | fetch a document. | +| `insert` | insert a new document. | +| `remove` | delete a document. | +| `replace` | replace the contents of a document. | +| `upsert` | creates a new document if it does not exist, if it does exist then it updates it. | + + +
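
As a closing illustration (not part of the generated page above), the `get` path can serve cache-style lookups, where the message body supplies the document id and is replaced by the fetched document, mirroring the integration tests; bucket name and credentials here are assumptions.

```yml
pipeline:
  processors:
    - couchbase:
        url: 'couchbase://localhost:11210'
        bucket: 'testing'
        username: 'benthos'
        password: 'password'
        id: '${! content() }'
        operation: 'get'
```
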