From c5736ea737e6f435e1b34bcee566c75cf0b30813 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Fri, 2 Dec 2022 00:27:45 +0100 Subject: [PATCH 1/5] lay down bases --- go.mod | 2 + go.sum | 6 + internal/impl/couchbase/client.go | 126 +++++ internal/impl/couchbase/client/config.go | 33 ++ internal/impl/couchbase/client/docs.go | 24 + internal/impl/couchbase/couchbase.go | 61 +++ internal/impl/couchbase/processor.go | 166 ++++++ internal/impl/couchbase/processor_test.go | 481 ++++++++++++++++++ public/components/all/package.go | 1 + public/components/couchbase/package.go | 6 + .../docs/components/processors/couchbase.md | 176 +++++++ 11 files changed, 1082 insertions(+) create mode 100644 internal/impl/couchbase/client.go create mode 100644 internal/impl/couchbase/client/config.go create mode 100644 internal/impl/couchbase/client/docs.go create mode 100644 internal/impl/couchbase/couchbase.go create mode 100644 internal/impl/couchbase/processor.go create mode 100644 internal/impl/couchbase/processor_test.go create mode 100644 public/components/couchbase/package.go create mode 100644 website/docs/components/processors/couchbase.md diff --git a/go.mod b/go.mod index 447300672d..362b6069fe 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/clbanning/mxj/v2 v2.5.5 github.com/colinmarc/hdfs v1.1.3 + github.com/couchbase/gocb/v2 v2.6.0 github.com/denisenkom/go-mssqldb v0.11.0 github.com/dgraph-io/ristretto v0.1.0 github.com/dustin/go-humanize v1.0.0 @@ -162,6 +163,7 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cockroachdb/apd/v2 v2.0.1 // indirect github.com/containerd/continuity v0.2.2 // indirect + github.com/couchbase/gocbcore/v10 v10.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 516f707e1c..298d3de7ba 100644 --- a/go.sum +++ b/go.sum @@ -338,6 +338,12 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/couchbase/gocb/v2 v2.6.0 h1:DhkLNatDcddCcS411D6kNwZspSEAWVeI/N3abzt/HLc= +github.com/couchbase/gocb/v2 v2.6.0/go.mod h1:5su8b1gBF3V4j07SiGw+CA0bK9a84YWEb6UH7up0MEs= +github.com/couchbase/gocbcore/v10 v10.2.0 h1:ZoSBLtcmt+lXbxVVT4SAhXDVNR+D48iSOZWNzHucVVk= +github.com/couchbase/gocbcore/v10 v10.2.0/go.mod h1:qkPnOBziCs0guMEEvd0cRFo+AjOW0yEL99cU3I4n3Ao= +github.com/couchbaselabs/gocaves/client v0.0.0-20220223122017-22859b310bd2 h1:UlwJ2GWpZQAQCLHyO3xHKcqAjUUcX2w7FKpbxCIUQks= +github.com/couchbaselabs/gocaves/client v0.0.0-20220223122017-22859b310bd2/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= diff --git a/internal/impl/couchbase/client.go b/internal/impl/couchbase/client.go new file mode 100644 index 0000000000..2512c68582 --- /dev/null +++ b/internal/impl/couchbase/client.go @@ -0,0 +1,126 @@ +package couchbase + +import ( + "context" + "errors" + "fmt" + + "github.com/couchbase/gocb/v2" + + "github.com/benthosdev/benthos/v4/internal/impl/couchbase/client" + "github.com/benthosdev/benthos/v4/public/service" +) + +// ErrInvalidTranscoder specified transcoder is not supported. +var ErrInvalidTranscoder = errors.New("invalid transcoder") + +type couchbaseClient struct { + collection *gocb.Collection + cluster *gocb.Cluster +} + +func getClient(conf *service.ParsedConfig, mgr *service.Resources) (*couchbaseClient, error) { + + // retrieve params + url, err := conf.FieldString("url") + if err != nil { + return nil, err + } + bucket, err := conf.FieldString("bucket") + if err != nil { + return nil, err + } + timeout, err := conf.FieldDuration("timeout") + if err != nil { + return nil, err + } + + // setup couchbase + opts := gocb.ClusterOptions{ + // TODO Tracer: mgr.OtelTracer().Tracer(name).Start(context.Background(), operationName) + // TODO Meter: mgr.Metrics(), + } + + if conf.Contains("timeout") { + opts.TimeoutsConfig = gocb.TimeoutsConfig{ + ConnectTimeout: timeout, + KVTimeout: timeout, + KVDurableTimeout: timeout, + ViewTimeout: timeout, + QueryTimeout: timeout, + AnalyticsTimeout: timeout, + SearchTimeout: timeout, + ManagementTimeout: timeout, + } + } + + if conf.Contains("username") { + username, err := conf.FieldString("username") + if err != nil { + return nil, err + } + password, err := conf.FieldString("password") + if err != nil { + return nil, err + } + opts.Authenticator = gocb.PasswordAuthenticator{ + Username: username, + Password: password, + } + } + + if conf.Contains("transcoder") { // TODO is this really needed with default + tr, err := conf.FieldString("transcoder") + if err != nil { + return nil, err + } + switch client.Transcoder(tr) { + case client.TranscoderJSON: + opts.Transcoder = gocb.NewJSONTranscoder() + case client.TranscoderRaw: + opts.Transcoder = gocb.NewRawBinaryTranscoder() + case client.TranscoderRawJSON: + opts.Transcoder = gocb.NewRawJSONTranscoder() + case client.TranscoderRawString: + opts.Transcoder = gocb.NewRawStringTranscoder() + case client.TranscoderLegacy: + opts.Transcoder = gocb.NewLegacyTranscoder() + default: + return nil, fmt.Errorf("%w: %s", ErrInvalidTranscoder, tr) // TODO is this really needed with enum + } + } else { + opts.Transcoder = gocb.NewLegacyTranscoder() + } + + cluster, err := gocb.Connect(url, opts) + if err != nil { + return nil, err + } + + // check that we can do query + err = cluster.Bucket(bucket).WaitUntilReady(timeout, nil) + if err != nil { + return nil, err + } + + proc := &couchbaseClient{ + cluster: cluster, + } + + // retrieve collection + if conf.Contains("collection") { + collectionStr, err := conf.FieldString("collection") + if err != nil { + return nil, err + } + proc.collection = cluster.Bucket(bucket).Collection(collectionStr) + } else { + proc.collection = cluster.Bucket(bucket).DefaultCollection() + } + + return proc, nil +} + +func (p *couchbaseClient) Close(ctx context.Context) error { + return p.cluster.Close(&gocb.ClusterCloseOptions{}) +} diff --git a/internal/impl/couchbase/client/config.go b/internal/impl/couchbase/client/config.go new file mode 100644 index 0000000000..b886e81c43 --- /dev/null +++ b/internal/impl/couchbase/client/config.go @@ -0,0 +1,33 @@ +package client + +// Transcoder represents the transcoder that will be used by Couchbase. +type Transcoder string + +const ( + // TranscoderRaw raw operation. + TranscoderRaw Transcoder = "raw" + // TranscoderRawJSON rawjson transcoder. + TranscoderRawJSON Transcoder = "rawjson" + // TranscoderRawString rawstring transcoder. + TranscoderRawString Transcoder = "rawstring" + // TranscoderJSON JSON transcoder. + TranscoderJSON Transcoder = "json" + // TranscoderLegacy Legacy transcoder. + TranscoderLegacy Transcoder = "legacy" +) + +// Operation represents the operation that will be performed by Couchbase. +type Operation string + +const ( + // OperationGet Get operation. + OperationGet Operation = "get" + // OperationInsert Insert operation. + OperationInsert Operation = "insert" + // OperationRemove Delete operation. + OperationRemove Operation = "remove" + // OperationReplace Replace operation. + OperationReplace Operation = "replace" + // OperationUpsert Upsert operation. + OperationUpsert Operation = "upsert" +) diff --git a/internal/impl/couchbase/client/docs.go b/internal/impl/couchbase/client/docs.go new file mode 100644 index 0000000000..9c78901815 --- /dev/null +++ b/internal/impl/couchbase/client/docs.go @@ -0,0 +1,24 @@ +package client + +import ( + "github.com/benthosdev/benthos/v4/public/service" +) + +// NewConfigSpec constructs a new Couchbase ConfigSpec with common config fields +func NewConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + // TODO Stable(). + Field(service.NewStringField("url").Description("Couchbase connection string.").Example("couchbase://localhost:11210")). + Field(service.NewStringField("username").Description("Username to connect to the cluster.").Optional()). + Field(service.NewStringField("password").Description("Password to connect to the cluster.").Optional()). + Field(service.NewStringField("bucket").Description("Couchbase bucket.")). + Field(service.NewStringField("collection").Description("Bucket collection.").Default("_default").Advanced().Optional()). + Field(service.NewStringAnnotatedEnumField("transcoder", map[string]string{ + string(TranscoderRaw): "fetch a document.", + string(TranscoderRawJSON): "delete a document.", + string(TranscoderRawString): "replace the contents of a document.", + string(TranscoderJSON): "insert a new document.", + string(TranscoderLegacy): "creates a new document if it does not exist, if it does exist then it updates it.", + }).Description("Couchbase transcoder to use.").Default(string(TranscoderLegacy)).Advanced()). + Field(service.NewDurationField("timeout").Description("Operation timeout.").Advanced().Optional()) +} diff --git a/internal/impl/couchbase/couchbase.go b/internal/impl/couchbase/couchbase.go new file mode 100644 index 0000000000..387361c1ec --- /dev/null +++ b/internal/impl/couchbase/couchbase.go @@ -0,0 +1,61 @@ +package couchbase + +import ( + "fmt" + + "github.com/couchbase/gocb/v2" +) + +func valueFromOp(op gocb.BulkOp) (out any, err error) { + switch o := op.(type) { + case *gocb.GetOp: + if o.Err != nil { + return nil, o.Err + } + err := o.Result.Content(&out) + return out, err + case *gocb.InsertOp: + return nil, o.Err + case *gocb.RemoveOp: + return nil, o.Err + case *gocb.ReplaceOp: + return nil, o.Err + case *gocb.UpsertOp: + return nil, o.Err + } + + return nil, fmt.Errorf("type not supported") +} + +func get(key string, _ []byte) gocb.BulkOp { + return &gocb.GetOp{ + ID: key, + } +} + +func insert(key string, data []byte) gocb.BulkOp { + return &gocb.InsertOp{ + ID: key, + Value: data, + } +} + +func remove(key string, _ []byte) gocb.BulkOp { + return &gocb.RemoveOp{ + ID: key, + } +} + +func replace(key string, data []byte) gocb.BulkOp { + return &gocb.ReplaceOp{ + ID: key, + Value: data, + } +} + +func upsert(key string, data []byte) gocb.BulkOp { + return &gocb.UpsertOp{ + ID: key, + Value: data, + } +} diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go new file mode 100644 index 0000000000..96038ffa37 --- /dev/null +++ b/internal/impl/couchbase/processor.go @@ -0,0 +1,166 @@ +package couchbase + +import ( + "context" + "errors" + "fmt" + + "github.com/couchbase/gocb/v2" + + "github.com/benthosdev/benthos/v4/internal/impl/couchbase/client" + "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/benthosdev/benthos/v4/public/service" +) + +var ( + // ErrInvalidOperation specified operation is not supported. + ErrInvalidOperation = errors.New("invalid operation") + // ErrContentRequired content field is required. + ErrContentRequired = errors.New("content required") +) + +func couchbaseProcessorConfig() *service.ConfigSpec { + return client.NewConfigSpec(). + // TODO Stable(). + Version("4.10.0"). + Categories("Integration"). + Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads."). + Description("When inserting, replacing or upserting documents, each must have the `content` property set."). + Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! meta("id") }`)). + Field(service.NewBloblangField("content").Description("Document content.").Optional()). + Field(service.NewStringAnnotatedEnumField("operation", map[string]string{ + string(client.OperationGet): "fetch a document.", + string(client.OperationInsert): "insert a new document.", + string(client.OperationRemove): "delete a document.", + string(client.OperationReplace): "replace the contents of a document.", + string(client.OperationUpsert): "creates a new document if it does not exist, if it does exist then it updates it.", + }).Description("Couchbase operation to perform.").Default(string(client.OperationGet))). + LintRule(`root = if ((this.operation == "insert" || this.operation == "replace" || this.operation == "upsert") && !this.exists("content")) { [ "content must be set for insert, replace and upsert operations." ] }`) +} + +func init() { + err := service.RegisterBatchProcessor("couchbase", couchbaseProcessorConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { + return NewProcessor(conf, mgr) + }, + ) + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +// Processor stores or retrieves data from couchbase for each message of a +// batch. +type Processor struct { + *couchbaseClient + id *service.InterpolatedString + content *bloblang.Executor + op func(key string, data []byte) gocb.BulkOp +} + +// NewProcessor returns a Couchbase processor. +func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processor, error) { + cl, err := getClient(conf, mgr) + if err != nil { + return nil, err + } + p := &Processor{ + couchbaseClient: cl, + } + + if p.id, err = conf.FieldInterpolatedString("id"); err != nil { + return nil, err + } + + if conf.Contains("content") { + if p.content, err = conf.FieldBloblang("content"); err != nil { + return nil, err + } + } + + if conf.Contains("operation") { // TODO is this really needed with default + op, err := conf.FieldString("operation") + if err != nil { + return nil, err + } + switch client.Operation(op) { + case client.OperationGet: + p.op = get + case client.OperationRemove: + p.op = remove + case client.OperationInsert: + if p.content == nil { + return nil, ErrContentRequired // TODO is this really needed with lint + } + p.op = insert + case client.OperationReplace: + if p.content == nil { + return nil, ErrContentRequired // TODO is this really needed with lint + } + p.op = replace + case client.OperationUpsert: + if p.content == nil { + return nil, ErrContentRequired // TODO is this really needed with lint + } + p.op = upsert + default: + return nil, fmt.Errorf("%w: %s", ErrInvalidOperation, op) // TODO is this really needed with enum + } + } else { + p.op = get + } + + return p, nil +} + +// ProcessBatch applies the processor to a message batch, either creating >0 +// resulting messages or a response to be sent back to the message source. +func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBatch) ([]service.MessageBatch, error) { + newMsg := inBatch.Copy() + ops := make([]gocb.BulkOp, len(inBatch)) + + // generate query + for index := range newMsg { + // generate id + k := inBatch.InterpolatedString(index, p.id) + + // generate content + var content []byte + if p.content != nil { + res, err := inBatch.BloblangQuery(index, p.content) + if err != nil { + return nil, err + } + content, err = res.AsBytes() + if err != nil { + return nil, err + } + } + + ops[index] = p.op(k, content) + } + + // execute + err := p.collection.Do(ops, &gocb.BulkOpOptions{}) + if err != nil { + return nil, err + } + + // set results + for index, part := range newMsg { + out, err := valueFromOp(ops[index]) + if err != nil { + part.SetError(fmt.Errorf("couchbase operator failed: %w", err)) + } + + if data, ok := out.([]byte); ok { + part.SetBytes(data) + } else if out != nil { + part.SetStructured(out) + } + } + + return []service.MessageBatch{newMsg}, nil +} diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go new file mode 100644 index 0000000000..032a45dece --- /dev/null +++ b/internal/impl/couchbase/processor_test.go @@ -0,0 +1,481 @@ +package couchbase_test + +/* +func TestProcessorIntegration(t *testing.T) { + integration.CheckSkip(t) + + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + pool, err := dockertest.NewPool("") + if err != nil { + t.Skipf("Could not connect to docker: %s", err) + } + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "mongo", + Tag: "latest", + Env: []string{ + "MONGO_INITDB_ROOT_USERNAME=mongoadmin", + "MONGO_INITDB_ROOT_PASSWORD=secret", + }, + ExposedPorts: []string{"27017"}, + }) + require.NoError(t, err) + + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + var mongoClient *mongo.Client + require.NoError(t, pool.Retry(func() error { + url := "mongodb://localhost:" + resource.GetPort("27017/tcp") + conf := client.NewConfig() + conf.URL = url + conf.Username = "mongoadmin" + conf.Password = "secret" + + if mongoClient == nil { + mongoClient, err = conf.Client() + if err != nil { + return err + } + } + + if err := mongoClient.Connect(context.Background()); err != nil { + return err + } + + return mongoClient.Database("TestDB").CreateCollection(context.Background(), "TestCollection") + })) + + port := resource.GetPort("27017/tcp") + t.Run("insert", func(t *testing.T) { + testMongoDBProcessorInsert(port, t) + }) + t.Run("delete one", func(t *testing.T) { + testMongoDBProcessorDeleteOne(port, t) + }) + t.Run("delete many", func(t *testing.T) { + testMongoDBProcessorDeleteMany(port, t) + }) + t.Run("replace one", func(t *testing.T) { + testMongoDBProcessorReplaceOne(port, t) + }) + t.Run("update one", func(t *testing.T) { + testMongoDBProcessorUpdateOne(port, t) + }) + t.Run("find one", func(t *testing.T) { + testMongoDBProcessorFindOne(port, t) + }) +} + +func testMongoDBProcessorInsert(port string, t *testing.T) { + conf := processor.NewConfig() + conf.Type = "mongodb" + + c := client.Config{ + URL: "mongodb://localhost:" + port, + Database: "TestDB", + Collection: "TestCollection", + Username: "mongoadmin", + Password: "secret", + } + + mongoConfig := processor.MongoDBConfig{ + MongoDB: c, + WriteConcern: client.WriteConcern{ + W: "1", + J: false, + WTimeout: "", + }, + Operation: "insert-one", + DocumentMap: "root.a = this.foo\nroot.b = this.bar", + } + + conf.MongoDB = mongoConfig + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + m, err := mongodb.NewProcessor(conf, mgr) + require.NoError(t, err) + + parts := [][]byte{ + []byte(`{"foo":"foo1","bar":"bar1"}`), + []byte(`{"foo":"foo2","bar":"bar2"}`), + } + + resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) + require.Nil(t, response) + require.Len(t, resMsgs, 1) + + expectedResult := [][]byte{ + []byte(`{"foo":"foo1","bar":"bar1"}`), + []byte(`{"foo":"foo2","bar":"bar2"}`), + } + + assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) + + // Validate the record is in the MongoDB + mongoClient, err := c.Client() + require.NoError(t, err) + err = mongoClient.Connect(context.Background()) + require.NoError(t, err) + collection := mongoClient.Database("TestDB").Collection("TestCollection") + + result := collection.FindOne(context.Background(), bson.M{"a": "foo1", "b": "bar1"}) + b, err := result.DecodeBytes() + assert.NoError(t, err) + aVal := b.Lookup("a") + bVal := b.Lookup("b") + assert.Equal(t, `"foo1"`, aVal.String()) + assert.Equal(t, `"bar1"`, bVal.String()) + + result = collection.FindOne(context.Background(), bson.M{"a": "foo2", "b": "bar2"}) + b, err = result.DecodeBytes() + assert.NoError(t, err) + aVal = b.Lookup("a") + bVal = b.Lookup("b") + assert.Equal(t, `"foo2"`, aVal.String()) + assert.Equal(t, `"bar2"`, bVal.String()) +} + +func testMongoDBProcessorDeleteOne(port string, t *testing.T) { + conf := processor.NewConfig() + conf.Type = "mongodb" + + c := client.Config{ + URL: "mongodb://localhost:" + port, + Database: "TestDB", + Collection: "TestCollection", + Username: "mongoadmin", + Password: "secret", + } + + mongoConfig := processor.MongoDBConfig{ + MongoDB: c, + WriteConcern: client.WriteConcern{ + W: "1", + J: false, + WTimeout: "100s", + }, + Operation: "delete-one", + FilterMap: "root.a = this.foo\nroot.b = this.bar", + } + + mongoClient, err := c.Client() + require.NoError(t, err) + err = mongoClient.Connect(context.Background()) + require.NoError(t, err) + collection := mongoClient.Database("TestDB").Collection("TestCollection") + _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_delete", "b": "bar_delete"}) + assert.NoError(t, err) + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + conf.MongoDB = mongoConfig + m, err := mongodb.NewProcessor(conf, mgr) + require.NoError(t, err) + + parts := [][]byte{ + []byte(`{"foo":"foo_delete","bar":"bar_delete"}`), + } + + resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) + require.Nil(t, response) + require.Len(t, resMsgs, 1) + + expectedResult := [][]byte{ + []byte(`{"foo":"foo_delete","bar":"bar_delete"}`), + } + + assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) + + // Validate the record has been deleted from the db + result := collection.FindOne(context.Background(), bson.M{"a": "foo_delete", "b": "bar_delete"}) + b, err := result.DecodeBytes() + assert.Nil(t, b) + assert.Error(t, err, "mongo: no documents in result") +} + +func testMongoDBProcessorDeleteMany(port string, t *testing.T) { + conf := processor.NewConfig() + conf.Type = "mongodb" + + c := client.Config{ + URL: "mongodb://localhost:" + port, + Database: "TestDB", + Collection: "TestCollection", + Username: "mongoadmin", + Password: "secret", + } + + mongoConfig := processor.MongoDBConfig{ + MongoDB: c, + WriteConcern: client.WriteConcern{ + W: "1", + J: false, + WTimeout: "100s", + }, + Operation: "delete-many", + FilterMap: "root.a = this.foo\nroot.b = this.bar", + } + + mongoClient, err := c.Client() + require.NoError(t, err) + err = mongoClient.Connect(context.Background()) + require.NoError(t, err) + collection := mongoClient.Database("TestDB").Collection("TestCollection") + _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_delete_many", "b": "bar_delete_many", "c": "c1"}) + assert.NoError(t, err) + _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_delete_many", "b": "bar_delete_many", "c": "c2"}) + assert.NoError(t, err) + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + conf.MongoDB = mongoConfig + m, err := mongodb.NewProcessor(conf, mgr) + require.NoError(t, err) + + parts := [][]byte{ + []byte(`{"foo":"foo_delete_many","bar":"bar_delete_many"}`), + } + + resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) + require.Nil(t, response) + require.Len(t, resMsgs, 1) + + expectedResult := [][]byte{ + []byte(`{"foo":"foo_delete_many","bar":"bar_delete_many"}`), + } + assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) + + // Validate the record has been deleted from the db + result := collection.FindOne(context.Background(), bson.M{"a": "foo_delete_many", "b": "bar_delete_many"}) + b, err := result.DecodeBytes() + assert.Nil(t, b) + assert.Error(t, err, "mongo: no documents in result") +} + +func testMongoDBProcessorReplaceOne(port string, t *testing.T) { + conf := processor.NewConfig() + conf.Type = "mongodb" + + c := client.Config{ + URL: "mongodb://localhost:" + port, + Database: "TestDB", + Collection: "TestCollection", + Username: "mongoadmin", + Password: "secret", + } + + mongoConfig := processor.MongoDBConfig{ + MongoDB: c, + WriteConcern: client.WriteConcern{ + W: "1", + J: false, + WTimeout: "", + }, + Operation: "replace-one", + DocumentMap: "root.a = this.foo\nroot.b = this.bar", + FilterMap: "root.a = this.foo", + } + + mongoClient, err := c.Client() + require.NoError(t, err) + err = mongoClient.Connect(context.Background()) + require.NoError(t, err) + collection := mongoClient.Database("TestDB").Collection("TestCollection") + _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_replace", "b": "bar_old", "c": "c1"}) + assert.NoError(t, err) + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + conf.MongoDB = mongoConfig + m, err := mongodb.NewProcessor(conf, mgr) + require.NoError(t, err) + + parts := [][]byte{ + []byte(`{"foo":"foo_replace","bar":"bar_new"}`), + } + + resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) + require.Nil(t, response) + require.Len(t, resMsgs, 1) + + expectedResult := [][]byte{ + []byte(`{"foo":"foo_replace","bar":"bar_new"}`), + } + assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) + + // Validate the record has been updated in the db + result := collection.FindOne(context.Background(), bson.M{"a": "foo_replace", "b": "bar_new"}) + b, err := result.DecodeBytes() + assert.NoError(t, err) + aVal := b.Lookup("a") + bVal := b.Lookup("b") + cVal := b.Lookup("c") + assert.Equal(t, `"foo_replace"`, aVal.String()) + assert.Equal(t, `"bar_new"`, bVal.String()) + assert.Equal(t, bson.RawValue{}, cVal) +} + +func testMongoDBProcessorUpdateOne(port string, t *testing.T) { + conf := processor.NewConfig() + conf.Type = "mongodb" + + c := client.Config{ + URL: "mongodb://localhost:" + port, + Database: "TestDB", + Collection: "TestCollection", + Username: "mongoadmin", + Password: "secret", + } + + mongoConfig := processor.MongoDBConfig{ + MongoDB: c, + WriteConcern: client.WriteConcern{ + W: "1", + J: false, + WTimeout: "100s", + }, + Operation: "update-one", + DocumentMap: `root = {"$set": {"a": this.foo, "b": this.bar}}`, + FilterMap: "root.a = this.foo", + } + + mongoClient, err := c.Client() + require.NoError(t, err) + err = mongoClient.Connect(context.Background()) + require.NoError(t, err) + collection := mongoClient.Database("TestDB").Collection("TestCollection") + _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_update", "b": "bar_update_old", "c": "c1"}) + assert.NoError(t, err) + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + conf.MongoDB = mongoConfig + m, err := mongodb.NewProcessor(conf, mgr) + require.NoError(t, err) + + parts := [][]byte{ + []byte(`{"foo":"foo_update","bar":"bar_update_new"}`), + } + + resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) + require.Nil(t, response) + require.Len(t, resMsgs, 1) + + expectedResult := [][]byte{ + []byte(`{"foo":"foo_update","bar":"bar_update_new"}`), + } + assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) + + // Validate the record has been updated in the db + result := collection.FindOne(context.Background(), bson.M{"a": "foo_update", "b": "bar_update_new"}) + b, err := result.DecodeBytes() + assert.NoError(t, err) + aVal := b.Lookup("a") + bVal := b.Lookup("b") + cVal := b.Lookup("c") + assert.Equal(t, `"foo_update"`, aVal.String()) + assert.Equal(t, `"bar_update_new"`, bVal.String()) + assert.Equal(t, `"c1"`, cVal.String()) +} + +func testMongoDBProcessorFindOne(port string, t *testing.T) { + conf := processor.NewConfig() + conf.Type = "mongodb" + + c := client.Config{ + URL: "mongodb://localhost:" + port, + Database: "TestDB", + Collection: "TestCollection", + Username: "mongoadmin", + Password: "secret", + } + + conf.MongoDB = processor.NewMongoDBConfig() + conf.MongoDB.MongoDB = c + conf.MongoDB.WriteConcern = client.WriteConcern{ + W: "1", + J: false, + WTimeout: "100s", + } + conf.MongoDB.Operation = "find-one" + conf.MongoDB.FilterMap = "root.a = this.a" + + mongoClient, err := c.Client() + require.NoError(t, err) + err = mongoClient.Connect(context.Background()) + require.NoError(t, err) + collection := mongoClient.Database("TestDB").Collection("TestCollection") + _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo", "b": "bar", "c": "baz", "answer_to_everything": 42}) + assert.NoError(t, err) + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + for _, tt := range []struct { + name string + message string + marshalMode client.JSONMarshalMode + collection string + expected string + expectedErr error + }{ + { + name: "canonical marshal mode", + marshalMode: client.JSONMarshalModeCanonical, + message: `{"a":"foo","x":"ignore_me_via_filter_map"}`, + expected: `{"a":"foo","b":"bar","c":"baz","answer_to_everything":{"$numberInt":"42"}}`, + }, + { + name: "relaxed marshal mode", + marshalMode: client.JSONMarshalModeRelaxed, + message: `{"a":"foo","x":"ignore_me_via_filter_map"}`, + expected: `{"a":"foo","b":"bar","c":"baz","answer_to_everything":42}`, + }, + { + name: "no documents found", + message: `{"a":"notfound"}`, + expectedErr: mongo.ErrNoDocuments, + }, + { + name: "collection interpolation", + marshalMode: client.JSONMarshalModeCanonical, + collection: `${!json("col")}`, + message: `{"col":"TestCollection","a":"foo"}`, + expected: `{"a":"foo","b":"bar","c":"baz","answer_to_everything":{"$numberInt":"42"}}`, + }, + } { + if tt.collection != "" { + conf.MongoDB.MongoDB.Collection = tt.collection + } + + conf.MongoDB.JSONMarshalMode = tt.marshalMode + + m, err := mongodb.NewProcessor(conf, mgr) + require.NoError(t, err) + resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, 1), message.QuickBatch([][]byte{[]byte(tt.message)})) + require.Nil(t, response) + require.Len(t, resMsgs, 1) + if tt.expectedErr != nil { + tmpErr := resMsgs[0].Get(0).ErrorGet() + require.Error(t, tmpErr) + require.Equal(t, mongo.ErrNoDocuments.Error(), tmpErr.Error()) + continue + } + + jdopts := jsondiff.DefaultJSONOptions() + diff, explanation := jsondiff.Compare(resMsgs[0].Get(0).AsBytes(), []byte(tt.expected), &jdopts) + assert.Equalf(t, jsondiff.SupersetMatch.String(), diff.String(), "%s: %s", tt.name, explanation) + } +} +*/ diff --git a/public/components/all/package.go b/public/components/all/package.go index 1c74b95d53..b0253faa5d 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -13,6 +13,7 @@ import ( _ "github.com/benthosdev/benthos/v4/public/components/beanstalkd" _ "github.com/benthosdev/benthos/v4/public/components/cassandra" _ "github.com/benthosdev/benthos/v4/public/components/confluent" + _ "github.com/benthosdev/benthos/v4/public/components/couchbase" _ "github.com/benthosdev/benthos/v4/public/components/dgraph" _ "github.com/benthosdev/benthos/v4/public/components/elasticsearch" _ "github.com/benthosdev/benthos/v4/public/components/gcp" diff --git a/public/components/couchbase/package.go b/public/components/couchbase/package.go new file mode 100644 index 0000000000..75ddf88597 --- /dev/null +++ b/public/components/couchbase/package.go @@ -0,0 +1,6 @@ +package couchbase + +import ( + // Bring in the internal plugin definitions. + _ "github.com/benthosdev/benthos/v4/internal/impl/couchbase" +) diff --git a/website/docs/components/processors/couchbase.md b/website/docs/components/processors/couchbase.md new file mode 100644 index 0000000000..d0c6613b46 --- /dev/null +++ b/website/docs/components/processors/couchbase.md @@ -0,0 +1,176 @@ +--- +title: couchbase +type: processor +status: experimental +categories: ["Integration"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: +Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads. + +Introduced in version 4.10.0. + + + + + + +```yml +# Common config fields, showing default values +label: "" +couchbase: + url: "" + username: "" + password: "" + bucket: "" + id: "" + content: "" + operation: get +``` + + + + +```yml +# All config fields, showing default values +label: "" +couchbase: + url: "" + username: "" + password: "" + bucket: "" + collection: _default + transcoder: legacy + timeout: "" + id: "" + content: "" + operation: get +``` + + + + +When inserting, replacing or upserting documents, each must have the `content` property set. + +## Fields + +### `url` + +Couchbase connection string. + + +Type: `string` + +```yml +# Examples + +url: couchbase://localhost:11210 +``` + +### `username` + +Username to connect to the cluster. + + +Type: `string` + +### `password` + +Password to connect to the cluster. + + +Type: `string` + +### `bucket` + +Couchbase bucket. + + +Type: `string` + +### `collection` + +Bucket collection. + + +Type: `string` +Default: `"_default"` + +### `transcoder` + +Couchbase transcoder to use. + + +Type: `string` +Default: `"legacy"` + +| Option | Summary | +|---|---| +| `json` | insert a new document. | +| `legacy` | creates a new document if it does not exist, if it does exist then it updates it. | +| `raw` | fetch a document. | +| `rawjson` | delete a document. | +| `rawstring` | replace the contents of a document. | + + +### `timeout` + +Operation timeout. + + +Type: `string` + +### `id` + +Document id. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `string` + +```yml +# Examples + +id: ${! meta("id") } +``` + +### `content` + +Document content. + + +Type: `string` + +### `operation` + +Couchbase operation to perform. + + +Type: `string` +Default: `"get"` + +| Option | Summary | +|---|---| +| `get` | fetch a document. | +| `insert` | insert a new document. | +| `remove` | delete a document. | +| `replace` | replace the contents of a document. | +| `upsert` | creates a new document if it does not exist, if it does exist then it updates it. | + + + From db44f35400b7e0a8db49acefbfc79ae84853d599 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Sat, 3 Dec 2022 20:57:04 +0100 Subject: [PATCH 2/5] add basic testing --- internal/impl/couchbase/client.go | 56 +- internal/impl/couchbase/client/docs.go | 2 +- internal/impl/couchbase/processor.go | 58 +- internal/impl/couchbase/processor_test.go | 724 ++++++++---------- .../couchbase/testdata/configure-server.sh | 30 + 5 files changed, 395 insertions(+), 475 deletions(-) create mode 100755 internal/impl/couchbase/testdata/configure-server.sh diff --git a/internal/impl/couchbase/client.go b/internal/impl/couchbase/client.go index 2512c68582..6fe5790850 100644 --- a/internal/impl/couchbase/client.go +++ b/internal/impl/couchbase/client.go @@ -41,17 +41,15 @@ func getClient(conf *service.ParsedConfig, mgr *service.Resources) (*couchbaseCl // TODO Meter: mgr.Metrics(), } - if conf.Contains("timeout") { - opts.TimeoutsConfig = gocb.TimeoutsConfig{ - ConnectTimeout: timeout, - KVTimeout: timeout, - KVDurableTimeout: timeout, - ViewTimeout: timeout, - QueryTimeout: timeout, - AnalyticsTimeout: timeout, - SearchTimeout: timeout, - ManagementTimeout: timeout, - } + opts.TimeoutsConfig = gocb.TimeoutsConfig{ + ConnectTimeout: timeout, + KVTimeout: timeout, + KVDurableTimeout: timeout, + ViewTimeout: timeout, + QueryTimeout: timeout, + AnalyticsTimeout: timeout, + SearchTimeout: timeout, + ManagementTimeout: timeout, } if conf.Contains("username") { @@ -69,27 +67,23 @@ func getClient(conf *service.ParsedConfig, mgr *service.Resources) (*couchbaseCl } } - if conf.Contains("transcoder") { // TODO is this really needed with default - tr, err := conf.FieldString("transcoder") - if err != nil { - return nil, err - } - switch client.Transcoder(tr) { - case client.TranscoderJSON: - opts.Transcoder = gocb.NewJSONTranscoder() - case client.TranscoderRaw: - opts.Transcoder = gocb.NewRawBinaryTranscoder() - case client.TranscoderRawJSON: - opts.Transcoder = gocb.NewRawJSONTranscoder() - case client.TranscoderRawString: - opts.Transcoder = gocb.NewRawStringTranscoder() - case client.TranscoderLegacy: - opts.Transcoder = gocb.NewLegacyTranscoder() - default: - return nil, fmt.Errorf("%w: %s", ErrInvalidTranscoder, tr) // TODO is this really needed with enum - } - } else { + tr, err := conf.FieldString("transcoder") + if err != nil { + return nil, err + } + switch client.Transcoder(tr) { + case client.TranscoderJSON: + opts.Transcoder = gocb.NewJSONTranscoder() + case client.TranscoderRaw: + opts.Transcoder = gocb.NewRawBinaryTranscoder() + case client.TranscoderRawJSON: + opts.Transcoder = gocb.NewRawJSONTranscoder() + case client.TranscoderRawString: + opts.Transcoder = gocb.NewRawStringTranscoder() + case client.TranscoderLegacy: opts.Transcoder = gocb.NewLegacyTranscoder() + default: + return nil, fmt.Errorf("%w: %s", ErrInvalidTranscoder, tr) // TODO is this really needed with enum } cluster, err := gocb.Connect(url, opts) diff --git a/internal/impl/couchbase/client/docs.go b/internal/impl/couchbase/client/docs.go index 9c78901815..63d30099a5 100644 --- a/internal/impl/couchbase/client/docs.go +++ b/internal/impl/couchbase/client/docs.go @@ -20,5 +20,5 @@ func NewConfigSpec() *service.ConfigSpec { string(TranscoderJSON): "insert a new document.", string(TranscoderLegacy): "creates a new document if it does not exist, if it does exist then it updates it.", }).Description("Couchbase transcoder to use.").Default(string(TranscoderLegacy)).Advanced()). - Field(service.NewDurationField("timeout").Description("Operation timeout.").Advanced().Optional()) + Field(service.NewDurationField("timeout").Description("Operation timeout.").Advanced().Default("15s")) } diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go index 96038ffa37..20f0e2d40d 100644 --- a/internal/impl/couchbase/processor.go +++ b/internal/impl/couchbase/processor.go @@ -19,14 +19,14 @@ var ( ErrContentRequired = errors.New("content required") ) -func couchbaseProcessorConfig() *service.ConfigSpec { +func ProcessorConfig() *service.ConfigSpec { return client.NewConfigSpec(). // TODO Stable(). Version("4.10.0"). Categories("Integration"). Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads."). Description("When inserting, replacing or upserting documents, each must have the `content` property set."). - Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! meta("id") }`)). + Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! json("id") }`)). Field(service.NewBloblangField("content").Description("Document content.").Optional()). Field(service.NewStringAnnotatedEnumField("operation", map[string]string{ string(client.OperationGet): "fetch a document.", @@ -39,7 +39,7 @@ func couchbaseProcessorConfig() *service.ConfigSpec { } func init() { - err := service.RegisterBatchProcessor("couchbase", couchbaseProcessorConfig(), + err := service.RegisterBatchProcessor("couchbase", ProcessorConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { return NewProcessor(conf, mgr) }, @@ -80,36 +80,32 @@ func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processo } } - if conf.Contains("operation") { // TODO is this really needed with default - op, err := conf.FieldString("operation") - if err != nil { - return nil, err + op, err := conf.FieldString("operation") + if err != nil { + return nil, err + } + switch client.Operation(op) { + case client.OperationGet: + p.op = get + case client.OperationRemove: + p.op = remove + case client.OperationInsert: + if p.content == nil { + return nil, ErrContentRequired // TODO is this really needed with lint } - switch client.Operation(op) { - case client.OperationGet: - p.op = get - case client.OperationRemove: - p.op = remove - case client.OperationInsert: - if p.content == nil { - return nil, ErrContentRequired // TODO is this really needed with lint - } - p.op = insert - case client.OperationReplace: - if p.content == nil { - return nil, ErrContentRequired // TODO is this really needed with lint - } - p.op = replace - case client.OperationUpsert: - if p.content == nil { - return nil, ErrContentRequired // TODO is this really needed with lint - } - p.op = upsert - default: - return nil, fmt.Errorf("%w: %s", ErrInvalidOperation, op) // TODO is this really needed with enum + p.op = insert + case client.OperationReplace: + if p.content == nil { + return nil, ErrContentRequired // TODO is this really needed with lint } - } else { - p.op = get + p.op = replace + case client.OperationUpsert: + if p.content == nil { + return nil, ErrContentRequired // TODO is this really needed with lint + } + p.op = upsert + default: + return nil, fmt.Errorf("%w: %s", ErrInvalidOperation, op) // TODO is this really needed with enum } return p, nil diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index 032a45dece..b56ce93364 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -1,481 +1,381 @@ package couchbase_test -/* +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/benthosdev/benthos/v4/internal/impl/couchbase" + "github.com/benthosdev/benthos/v4/internal/integration" + "github.com/benthosdev/benthos/v4/public/service" + "github.com/bxcodec/faker/v3" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessorConfigLinting(t *testing.T) { + configTests := []struct { + name string + config string + errContains string + }{ + { + name: "get content not required", + config: ` +couchbase: + url: 'url' + bucket: 'bucket' + id: '${! json("id") }' + operation: 'get' +`, + }, + { + name: "remove content not required", + config: ` +couchbase: + url: 'url' + bucket: 'bucket' + id: '${! json("id") }' + operation: 'remove' +`, + }, + { + name: "missing insert content", + config: ` +couchbase: + url: 'url' + bucket: 'bucket' + id: '${! json("id") }' + operation: 'insert' +`, + errContains: `content must be set for insert, replace and upsert operations.`, + }, + { + name: "missing replace content", + config: ` +couchbase: + url: 'url' + bucket: 'bucket' + id: '${! json("id") }' + operation: 'replace' +`, + errContains: `content must be set for insert, replace and upsert operations.`, + }, + { + name: "missing upsert content", + config: ` +couchbase: + url: 'url' + bucket: 'bucket' + id: '${! json("id") }' + operation: 'upsert' +`, + errContains: `content must be set for insert, replace and upsert operations.`, + }, + { + name: "insert with content", + config: ` +couchbase: + url: 'url' + bucket: 'bucket' + id: '${! json("id") }' + content: 'root = this' + operation: 'insert' +`, + }, + } + + env := service.NewEnvironment() + for _, test := range configTests { + t.Run(test.name, func(t *testing.T) { + strm := env.NewStreamBuilder() + err := strm.AddProcessorYAML(test.config) + if test.errContains == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + func TestProcessorIntegration(t *testing.T) { integration.CheckSkip(t) - if testing.Short() { - t.Skip("Skipping integration test in short mode") + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = 30 * time.Second + if deadline, ok := t.Deadline(); ok { + pool.MaxWait = time.Until(deadline) - 100*time.Millisecond } - pool, err := dockertest.NewPool("") + pwd, err := os.Getwd() if err != nil { - t.Skipf("Could not connect to docker: %s", err) + t.Fatalf("failed to get working directory: %s", err) } resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "mongo", + Repository: "couchbase", Tag: "latest", + Cmd: []string{"/opt/couchbase/configure-server.sh"}, Env: []string{ - "MONGO_INITDB_ROOT_USERNAME=mongoadmin", - "MONGO_INITDB_ROOT_PASSWORD=secret", + "CLUSTER_NAME=couchbase", + "COUCHBASE_ADMINISTRATOR_USERNAME=benthos", + "COUCHBASE_ADMINISTRATOR_PASSWORD=password", + }, + Mounts: []string{ + fmt.Sprintf("%s/testdata/configure-server.sh:/opt/couchbase/configure-server.sh", pwd), + }, + PortBindings: map[docker.Port][]docker.PortBinding{ + "8091/tcp": { + { + HostIP: "0.0.0.0", HostPort: "8091", + }, + }, + "11210/tcp": { + { + HostIP: "0.0.0.0", HostPort: "11210", + }, + }, }, - ExposedPorts: []string{"27017"}, }) require.NoError(t, err) - t.Cleanup(func() { assert.NoError(t, pool.Purge(resource)) }) - var mongoClient *mongo.Client - require.NoError(t, pool.Retry(func() error { - url := "mongodb://localhost:" + resource.GetPort("27017/tcp") - conf := client.NewConfig() - conf.URL = url - conf.Username = "mongoadmin" - conf.Password = "secret" - - if mongoClient == nil { - mongoClient, err = conf.Client() - if err != nil { - return err - } + _ = resource.Expire(900) + + // Look for readyness + var stderr bytes.Buffer + time.Sleep(15 * time.Second) + for { + time.Sleep(time.Second) + exitCode, err := resource.Exec([]string{"/usr/bin/cat", "/is-ready"}, dockertest.ExecOptions{ + StdErr: &stderr, // without stderr exit code is not reported + }) + if exitCode == 0 && err == nil { + break } + t.Logf("exit code: %d, err: %s", exitCode, err) + errB, err := io.ReadAll(&stderr) + require.NoError(t, err) + t.Logf("stderr: %s", string(errB)) + } - if err := mongoClient.Connect(context.Background()); err != nil { - return err - } + t.Logf("couchbase cluster is ready") + + port := resource.GetPort("11210/tcp") + require.NotEmpty(t, port) - return mongoClient.Database("TestDB").CreateCollection(context.Background(), "TestCollection") - })) + uid := faker.UUIDHyphenated() + payload := fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) - port := resource.GetPort("27017/tcp") - t.Run("insert", func(t *testing.T) { - testMongoDBProcessorInsert(port, t) + t.Run("Insert", func(t *testing.T) { + testCouchbaseProcessorInsert(uid, payload, port, t) }) - t.Run("delete one", func(t *testing.T) { - testMongoDBProcessorDeleteOne(port, t) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, port, t) }) - t.Run("delete many", func(t *testing.T) { - testMongoDBProcessorDeleteMany(port, t) + t.Run("Remove", func(t *testing.T) { + testCouchbaseProcessorRemove(uid, port, t) }) - t.Run("replace one", func(t *testing.T) { - testMongoDBProcessorReplaceOne(port, t) + t.Run("GetMissing", func(t *testing.T) { + testCouchbaseProcessorGetMissing(uid, port, t) }) - t.Run("update one", func(t *testing.T) { - testMongoDBProcessorUpdateOne(port, t) + + payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + t.Run("Upsert", func(t *testing.T) { + testCouchbaseProcessorUpsert(uid, payload, port, t) }) - t.Run("find one", func(t *testing.T) { - testMongoDBProcessorFindOne(port, t) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, port, t) }) -} - -func testMongoDBProcessorInsert(port string, t *testing.T) { - conf := processor.NewConfig() - conf.Type = "mongodb" - - c := client.Config{ - URL: "mongodb://localhost:" + port, - Database: "TestDB", - Collection: "TestCollection", - Username: "mongoadmin", - Password: "secret", - } - - mongoConfig := processor.MongoDBConfig{ - MongoDB: c, - WriteConcern: client.WriteConcern{ - W: "1", - J: false, - WTimeout: "", - }, - Operation: "insert-one", - DocumentMap: "root.a = this.foo\nroot.b = this.bar", - } - - conf.MongoDB = mongoConfig - mgr, err := manager.New(manager.NewResourceConfig()) - require.NoError(t, err) + payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + t.Run("Replace", func(t *testing.T) { + testCouchbaseProcessorReplace(uid, payload, port, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, port, t) + }) +} - m, err := mongodb.NewProcessor(conf, mgr) - require.NoError(t, err) +func getProc(tb testing.TB, config string) *couchbase.Processor { + tb.Helper() - parts := [][]byte{ - []byte(`{"foo":"foo1","bar":"bar1"}`), - []byte(`{"foo":"foo2","bar":"bar2"}`), - } + confSpec := couchbase.ProcessorConfig() + env := service.NewEnvironment() - resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) - require.Nil(t, response) - require.Len(t, resMsgs, 1) + pConf, err := confSpec.ParseYAML(config, env) + require.NoError(tb, err) + proc, err := couchbase.NewProcessor(pConf, service.MockResources()) + require.NoError(tb, err) + require.NotNil(tb, proc) - expectedResult := [][]byte{ - []byte(`{"foo":"foo1","bar":"bar1"}`), - []byte(`{"foo":"foo2","bar":"bar2"}`), - } - - assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) + return proc +} - // Validate the record is in the MongoDB - mongoClient, err := c.Client() - require.NoError(t, err) - err = mongoClient.Connect(context.Background()) - require.NoError(t, err) - collection := mongoClient.Database("TestDB").Collection("TestCollection") +func testCouchbaseProcessorInsert(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! json("id") }' +content: 'root = this' +operation: 'insert' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(payload)), + }) - result := collection.FindOne(context.Background(), bson.M{"a": "foo1", "b": "bar1"}) - b, err := result.DecodeBytes() + // batch processing should be fine and contain one message. assert.NoError(t, err) - aVal := b.Lookup("a") - bVal := b.Lookup("b") - assert.Equal(t, `"foo1"`, aVal.String()) - assert.Equal(t, `"bar1"`, bVal.String()) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) - result = collection.FindOne(context.Background(), bson.M{"a": "foo2", "b": "bar2"}) - b, err = result.DecodeBytes() + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) - aVal = b.Lookup("a") - bVal = b.Lookup("b") - assert.Equal(t, `"foo2"`, aVal.String()) - assert.Equal(t, `"bar2"`, bVal.String()) + assert.JSONEq(t, payload, string(dataOut)) } -func testMongoDBProcessorDeleteOne(port string, t *testing.T) { - conf := processor.NewConfig() - conf.Type = "mongodb" - - c := client.Config{ - URL: "mongodb://localhost:" + port, - Database: "TestDB", - Collection: "TestCollection", - Username: "mongoadmin", - Password: "secret", - } - - mongoConfig := processor.MongoDBConfig{ - MongoDB: c, - WriteConcern: client.WriteConcern{ - W: "1", - J: false, - WTimeout: "100s", - }, - Operation: "delete-one", - FilterMap: "root.a = this.foo\nroot.b = this.bar", - } +func testCouchbaseProcessorUpsert(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! json("id") }' +content: 'root = this' +operation: 'upsert' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(payload)), + }) - mongoClient, err := c.Client() - require.NoError(t, err) - err = mongoClient.Connect(context.Background()) - require.NoError(t, err) - collection := mongoClient.Database("TestDB").Collection("TestCollection") - _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_delete", "b": "bar_delete"}) + // batch processing should be fine and contain one message. assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) - mgr, err := manager.New(manager.NewResourceConfig()) - require.NoError(t, err) - - conf.MongoDB = mongoConfig - m, err := mongodb.NewProcessor(conf, mgr) - require.NoError(t, err) - - parts := [][]byte{ - []byte(`{"foo":"foo_delete","bar":"bar_delete"}`), - } - - resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) - require.Nil(t, response) - require.Len(t, resMsgs, 1) - - expectedResult := [][]byte{ - []byte(`{"foo":"foo_delete","bar":"bar_delete"}`), - } - - assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) - - // Validate the record has been deleted from the db - result := collection.FindOne(context.Background(), bson.M{"a": "foo_delete", "b": "bar_delete"}) - b, err := result.DecodeBytes() - assert.Nil(t, b) - assert.Error(t, err, "mongo: no documents in result") + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.JSONEq(t, payload, string(dataOut)) } -func testMongoDBProcessorDeleteMany(port string, t *testing.T) { - conf := processor.NewConfig() - conf.Type = "mongodb" - - c := client.Config{ - URL: "mongodb://localhost:" + port, - Database: "TestDB", - Collection: "TestCollection", - Username: "mongoadmin", - Password: "secret", - } - - mongoConfig := processor.MongoDBConfig{ - MongoDB: c, - WriteConcern: client.WriteConcern{ - W: "1", - J: false, - WTimeout: "100s", - }, - Operation: "delete-many", - FilterMap: "root.a = this.foo\nroot.b = this.bar", - } +func testCouchbaseProcessorReplace(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! json("id") }' +content: 'root = this' +operation: 'replace' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(payload)), + }) - mongoClient, err := c.Client() - require.NoError(t, err) - err = mongoClient.Connect(context.Background()) - require.NoError(t, err) - collection := mongoClient.Database("TestDB").Collection("TestCollection") - _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_delete_many", "b": "bar_delete_many", "c": "c1"}) - assert.NoError(t, err) - _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_delete_many", "b": "bar_delete_many", "c": "c2"}) + // batch processing should be fine and contain one message. assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) - mgr, err := manager.New(manager.NewResourceConfig()) - require.NoError(t, err) - - conf.MongoDB = mongoConfig - m, err := mongodb.NewProcessor(conf, mgr) - require.NoError(t, err) - - parts := [][]byte{ - []byte(`{"foo":"foo_delete_many","bar":"bar_delete_many"}`), - } - - resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) - require.Nil(t, response) - require.Len(t, resMsgs, 1) - - expectedResult := [][]byte{ - []byte(`{"foo":"foo_delete_many","bar":"bar_delete_many"}`), - } - assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) - - // Validate the record has been deleted from the db - result := collection.FindOne(context.Background(), bson.M{"a": "foo_delete_many", "b": "bar_delete_many"}) - b, err := result.DecodeBytes() - assert.Nil(t, b) - assert.Error(t, err, "mongo: no documents in result") + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.JSONEq(t, payload, string(dataOut)) } -func testMongoDBProcessorReplaceOne(port string, t *testing.T) { - conf := processor.NewConfig() - conf.Type = "mongodb" - - c := client.Config{ - URL: "mongodb://localhost:" + port, - Database: "TestDB", - Collection: "TestCollection", - Username: "mongoadmin", - Password: "secret", - } - - mongoConfig := processor.MongoDBConfig{ - MongoDB: c, - WriteConcern: client.WriteConcern{ - W: "1", - J: false, - WTimeout: "", - }, - Operation: "replace-one", - DocumentMap: "root.a = this.foo\nroot.b = this.bar", - FilterMap: "root.a = this.foo", - } +func testCouchbaseProcessorGet(uid, payload, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! content() }' +operation: 'get' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(uid)), + }) - mongoClient, err := c.Client() - require.NoError(t, err) - err = mongoClient.Connect(context.Background()) - require.NoError(t, err) - collection := mongoClient.Database("TestDB").Collection("TestCollection") - _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_replace", "b": "bar_old", "c": "c1"}) + // batch processing should be fine and contain one message. assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) - mgr, err := manager.New(manager.NewResourceConfig()) - require.NoError(t, err) - - conf.MongoDB = mongoConfig - m, err := mongodb.NewProcessor(conf, mgr) - require.NoError(t, err) - - parts := [][]byte{ - []byte(`{"foo":"foo_replace","bar":"bar_new"}`), - } - - resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) - require.Nil(t, response) - require.Len(t, resMsgs, 1) - - expectedResult := [][]byte{ - []byte(`{"foo":"foo_replace","bar":"bar_new"}`), - } - assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) - - // Validate the record has been updated in the db - result := collection.FindOne(context.Background(), bson.M{"a": "foo_replace", "b": "bar_new"}) - b, err := result.DecodeBytes() + // message should contain expected payload. + dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) - aVal := b.Lookup("a") - bVal := b.Lookup("b") - cVal := b.Lookup("c") - assert.Equal(t, `"foo_replace"`, aVal.String()) - assert.Equal(t, `"bar_new"`, bVal.String()) - assert.Equal(t, bson.RawValue{}, cVal) + assert.JSONEq(t, payload, string(dataOut)) } -func testMongoDBProcessorUpdateOne(port string, t *testing.T) { - conf := processor.NewConfig() - conf.Type = "mongodb" - - c := client.Config{ - URL: "mongodb://localhost:" + port, - Database: "TestDB", - Collection: "TestCollection", - Username: "mongoadmin", - Password: "secret", - } - - mongoConfig := processor.MongoDBConfig{ - MongoDB: c, - WriteConcern: client.WriteConcern{ - W: "1", - J: false, - WTimeout: "100s", - }, - Operation: "update-one", - DocumentMap: `root = {"$set": {"a": this.foo, "b": this.bar}}`, - FilterMap: "root.a = this.foo", - } +func testCouchbaseProcessorRemove(uid, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! content() }' +operation: 'remove' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(uid)), + }) - mongoClient, err := c.Client() - require.NoError(t, err) - err = mongoClient.Connect(context.Background()) - require.NoError(t, err) - collection := mongoClient.Database("TestDB").Collection("TestCollection") - _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo_update", "b": "bar_update_old", "c": "c1"}) + // batch processing should be fine and contain one message. assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) - mgr, err := manager.New(manager.NewResourceConfig()) - require.NoError(t, err) - - conf.MongoDB = mongoConfig - m, err := mongodb.NewProcessor(conf, mgr) - require.NoError(t, err) - - parts := [][]byte{ - []byte(`{"foo":"foo_update","bar":"bar_update_new"}`), - } - - resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, len(parts)), message.QuickBatch(parts)) - require.Nil(t, response) - require.Len(t, resMsgs, 1) - - expectedResult := [][]byte{ - []byte(`{"foo":"foo_update","bar":"bar_update_new"}`), - } - assert.Equal(t, expectedResult, message.GetAllBytes(resMsgs[0])) - - // Validate the record has been updated in the db - result := collection.FindOne(context.Background(), bson.M{"a": "foo_update", "b": "bar_update_new"}) - b, err := result.DecodeBytes() + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) - aVal := b.Lookup("a") - bVal := b.Lookup("b") - cVal := b.Lookup("c") - assert.Equal(t, `"foo_update"`, aVal.String()) - assert.Equal(t, `"bar_update_new"`, bVal.String()) - assert.Equal(t, `"c1"`, cVal.String()) + assert.Equal(t, uid, string(dataOut)) } -func testMongoDBProcessorFindOne(port string, t *testing.T) { - conf := processor.NewConfig() - conf.Type = "mongodb" - - c := client.Config{ - URL: "mongodb://localhost:" + port, - Database: "TestDB", - Collection: "TestCollection", - Username: "mongoadmin", - Password: "secret", - } - - conf.MongoDB = processor.NewMongoDBConfig() - conf.MongoDB.MongoDB = c - conf.MongoDB.WriteConcern = client.WriteConcern{ - W: "1", - J: false, - WTimeout: "100s", - } - conf.MongoDB.Operation = "find-one" - conf.MongoDB.FilterMap = "root.a = this.a" +func testCouchbaseProcessorGetMissing(uid, port string, t *testing.T) { + config := fmt.Sprintf(` +url: 'couchbase://localhost:%s' +bucket: 'testing' +username: benthos +password: password +id: '${! content() }' +operation: 'get' +`, port) + + msgOut, err := getProc(t, config).ProcessBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(uid)), + }) - mongoClient, err := c.Client() - require.NoError(t, err) - err = mongoClient.Connect(context.Background()) - require.NoError(t, err) - collection := mongoClient.Database("TestDB").Collection("TestCollection") - _, err = collection.InsertOne(context.Background(), bson.M{"a": "foo", "b": "bar", "c": "baz", "answer_to_everything": 42}) + // batch processing should be fine and contain one message. assert.NoError(t, err) + assert.Len(t, msgOut, 1) + assert.Len(t, msgOut[0], 1) - mgr, err := manager.New(manager.NewResourceConfig()) - require.NoError(t, err) + // message should contain an error. + assert.Error(t, msgOut[0][0].GetError(), "TODO") - for _, tt := range []struct { - name string - message string - marshalMode client.JSONMarshalMode - collection string - expected string - expectedErr error - }{ - { - name: "canonical marshal mode", - marshalMode: client.JSONMarshalModeCanonical, - message: `{"a":"foo","x":"ignore_me_via_filter_map"}`, - expected: `{"a":"foo","b":"bar","c":"baz","answer_to_everything":{"$numberInt":"42"}}`, - }, - { - name: "relaxed marshal mode", - marshalMode: client.JSONMarshalModeRelaxed, - message: `{"a":"foo","x":"ignore_me_via_filter_map"}`, - expected: `{"a":"foo","b":"bar","c":"baz","answer_to_everything":42}`, - }, - { - name: "no documents found", - message: `{"a":"notfound"}`, - expectedErr: mongo.ErrNoDocuments, - }, - { - name: "collection interpolation", - marshalMode: client.JSONMarshalModeCanonical, - collection: `${!json("col")}`, - message: `{"col":"TestCollection","a":"foo"}`, - expected: `{"a":"foo","b":"bar","c":"baz","answer_to_everything":{"$numberInt":"42"}}`, - }, - } { - if tt.collection != "" { - conf.MongoDB.MongoDB.Collection = tt.collection - } - - conf.MongoDB.JSONMarshalMode = tt.marshalMode - - m, err := mongodb.NewProcessor(conf, mgr) - require.NoError(t, err) - resMsgs, response := m.ProcessBatch(context.Background(), make([]*tracing.Span, 1), message.QuickBatch([][]byte{[]byte(tt.message)})) - require.Nil(t, response) - require.Len(t, resMsgs, 1) - if tt.expectedErr != nil { - tmpErr := resMsgs[0].Get(0).ErrorGet() - require.Error(t, tmpErr) - require.Equal(t, mongo.ErrNoDocuments.Error(), tmpErr.Error()) - continue - } - - jdopts := jsondiff.DefaultJSONOptions() - diff, explanation := jsondiff.Compare(resMsgs[0].Get(0).AsBytes(), []byte(tt.expected), &jdopts) - assert.Equalf(t, jsondiff.SupersetMatch.String(), diff.String(), "%s: %s", tt.name, explanation) - } + // message content should stay the same. + dataOut, err := msgOut[0][0].AsBytes() + assert.NoError(t, err) + assert.Equal(t, uid, string(dataOut)) } -*/ diff --git a/internal/impl/couchbase/testdata/configure-server.sh b/internal/impl/couchbase/testdata/configure-server.sh new file mode 100755 index 0000000000..4c695a6d84 --- /dev/null +++ b/internal/impl/couchbase/testdata/configure-server.sh @@ -0,0 +1,30 @@ +#!bin/bash + +set -m + +/entrypoint.sh couchbase-server & + +sleep 8 + +# Setup initial cluster/ Initialize Node +couchbase-cli cluster-init -c 127.0.0.1 --cluster-name $CLUSTER_NAME --cluster-username $COUCHBASE_ADMINISTRATOR_USERNAME \ + --cluster-password $COUCHBASE_ADMINISTRATOR_PASSWORD --services data --cluster-ramsize 1024 + +sleep 5 + +# Setup Administrator username and password +curl -s http://127.0.0.1:8091/settings/web -d port=8091 -d username=$COUCHBASE_ADMINISTRATOR_USERNAME -d password=$COUCHBASE_ADMINISTRATOR_PASSWORD + +# Setup buckets +for BUCKET_NAME in testing +do + couchbase-cli bucket-create -c 127.0.0.1:8091 --username $COUCHBASE_ADMINISTRATOR_USERNAME \ + --password $COUCHBASE_ADMINISTRATOR_PASSWORD --bucket $BUCKET_NAME --bucket-type couchbase \ + --bucket-ramsize 128 + + sleep 5 +done + +touch /is-ready + +fg 1 From a7c9512db6558d753d31ec70d2d7d92ed2154562 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Sat, 3 Dec 2022 21:01:21 +0100 Subject: [PATCH 3/5] cleanup --- internal/impl/couchbase/client.go | 6 +++--- internal/impl/couchbase/processor.go | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/impl/couchbase/client.go b/internal/impl/couchbase/client.go index 6fe5790850..eef35e0c56 100644 --- a/internal/impl/couchbase/client.go +++ b/internal/impl/couchbase/client.go @@ -37,8 +37,8 @@ func getClient(conf *service.ParsedConfig, mgr *service.Resources) (*couchbaseCl // setup couchbase opts := gocb.ClusterOptions{ - // TODO Tracer: mgr.OtelTracer().Tracer(name).Start(context.Background(), operationName) - // TODO Meter: mgr.Metrics(), + // TODO add opentracing Tracer: + // TODO add metrics Meter: } opts.TimeoutsConfig = gocb.TimeoutsConfig{ @@ -83,7 +83,7 @@ func getClient(conf *service.ParsedConfig, mgr *service.Resources) (*couchbaseCl case client.TranscoderLegacy: opts.Transcoder = gocb.NewLegacyTranscoder() default: - return nil, fmt.Errorf("%w: %s", ErrInvalidTranscoder, tr) // TODO is this really needed with enum + return nil, fmt.Errorf("%w: %s", ErrInvalidTranscoder, tr) } cluster, err := gocb.Connect(url, opts) diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go index 20f0e2d40d..28a340247b 100644 --- a/internal/impl/couchbase/processor.go +++ b/internal/impl/couchbase/processor.go @@ -19,6 +19,7 @@ var ( ErrContentRequired = errors.New("content required") ) +// ProcessorConfig export couchbase processor specification. func ProcessorConfig() *service.ConfigSpec { return client.NewConfigSpec(). // TODO Stable(). @@ -91,21 +92,21 @@ func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processo p.op = remove case client.OperationInsert: if p.content == nil { - return nil, ErrContentRequired // TODO is this really needed with lint + return nil, ErrContentRequired } p.op = insert case client.OperationReplace: if p.content == nil { - return nil, ErrContentRequired // TODO is this really needed with lint + return nil, ErrContentRequired } p.op = replace case client.OperationUpsert: if p.content == nil { - return nil, ErrContentRequired // TODO is this really needed with lint + return nil, ErrContentRequired } p.op = upsert default: - return nil, fmt.Errorf("%w: %s", ErrInvalidOperation, op) // TODO is this really needed with enum + return nil, fmt.Errorf("%w: %s", ErrInvalidOperation, op) } return p, nil From ff589b36359d20a037f7c6109bbc4c36dca92dc8 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Sat, 3 Dec 2022 21:02:19 +0100 Subject: [PATCH 4/5] update docs --- website/docs/components/processors/couchbase.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/website/docs/components/processors/couchbase.md b/website/docs/components/processors/couchbase.md index d0c6613b46..cbeb51071d 100644 --- a/website/docs/components/processors/couchbase.md +++ b/website/docs/components/processors/couchbase.md @@ -56,7 +56,7 @@ couchbase: bucket: "" collection: _default transcoder: legacy - timeout: "" + timeout: 15s id: "" content: "" operation: get @@ -134,6 +134,7 @@ Operation timeout. Type: `string` +Default: `"15s"` ### `id` @@ -146,7 +147,7 @@ Type: `string` ```yml # Examples -id: ${! meta("id") } +id: ${! json("id") } ``` ### `content` From a5aee6c0c69d6340b7756c1bf55c06048c090371 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Wed, 7 Dec 2022 00:34:54 +0100 Subject: [PATCH 5/5] fix transcoder docs --- internal/impl/couchbase/client/docs.go | 10 +++++----- website/docs/components/processors/couchbase.md | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/impl/couchbase/client/docs.go b/internal/impl/couchbase/client/docs.go index 63d30099a5..eb3575c6f0 100644 --- a/internal/impl/couchbase/client/docs.go +++ b/internal/impl/couchbase/client/docs.go @@ -14,11 +14,11 @@ func NewConfigSpec() *service.ConfigSpec { Field(service.NewStringField("bucket").Description("Couchbase bucket.")). Field(service.NewStringField("collection").Description("Bucket collection.").Default("_default").Advanced().Optional()). Field(service.NewStringAnnotatedEnumField("transcoder", map[string]string{ - string(TranscoderRaw): "fetch a document.", - string(TranscoderRawJSON): "delete a document.", - string(TranscoderRawString): "replace the contents of a document.", - string(TranscoderJSON): "insert a new document.", - string(TranscoderLegacy): "creates a new document if it does not exist, if it does exist then it updates it.", + string(TranscoderRaw): `RawBinaryTranscoder implements passthrough behavior of raw binary data. This transcoder does not apply any serialization. This will apply the following behavior to the value: binary ([]byte) -> binary bytes, binary expectedFlags. default -> error.`, + string(TranscoderRawJSON): `RawJSONTranscoder implements passthrough behavior of JSON data. This transcoder does not apply any serialization. It will forward data across the network without incurring unnecessary parsing costs. This will apply the following behavior to the value: binary ([]byte) -> JSON bytes, JSON expectedFlags. string -> JSON bytes, JSON expectedFlags. default -> error.`, + string(TranscoderRawString): `RawStringTranscoder implements passthrough behavior of raw string data. This transcoder does not apply any serialization. This will apply the following behavior to the value: string -> string bytes, string expectedFlags. default -> error.`, + string(TranscoderJSON): `JSONTranscoder implements the default transcoding behavior and applies JSON transcoding to all values. This will apply the following behavior to the value: binary ([]byte) -> error. default -> JSON value, JSON Flags.`, + string(TranscoderLegacy): `LegacyTranscoder implements the behaviour for a backward-compatible transcoder. This transcoder implements behaviour matching that of gocb v1.This will apply the following behavior to the value: binary ([]byte) -> binary bytes, Binary expectedFlags. string -> string bytes, String expectedFlags. default -> JSON value, JSON expectedFlags.`, }).Description("Couchbase transcoder to use.").Default(string(TranscoderLegacy)).Advanced()). Field(service.NewDurationField("timeout").Description("Operation timeout.").Advanced().Default("15s")) } diff --git a/website/docs/components/processors/couchbase.md b/website/docs/components/processors/couchbase.md index cbeb51071d..99bd30d0c4 100644 --- a/website/docs/components/processors/couchbase.md +++ b/website/docs/components/processors/couchbase.md @@ -121,11 +121,11 @@ Default: `"legacy"` | Option | Summary | |---|---| -| `json` | insert a new document. | -| `legacy` | creates a new document if it does not exist, if it does exist then it updates it. | -| `raw` | fetch a document. | -| `rawjson` | delete a document. | -| `rawstring` | replace the contents of a document. | +| `json` | JSONTranscoder implements the default transcoding behavior and applies JSON transcoding to all values. This will apply the following behavior to the value: binary ([]byte) -> error. default -> JSON value, JSON Flags. | +| `legacy` | LegacyTranscoder implements the behaviour for a backward-compatible transcoder. This transcoder implements behaviour matching that of gocb v1.This will apply the following behavior to the value: binary ([]byte) -> binary bytes, Binary expectedFlags. string -> string bytes, String expectedFlags. default -> JSON value, JSON expectedFlags. | +| `raw` | RawBinaryTranscoder implements passthrough behavior of raw binary data. This transcoder does not apply any serialization. This will apply the following behavior to the value: binary ([]byte) -> binary bytes, binary expectedFlags. default -> error. | +| `rawjson` | RawJSONTranscoder implements passthrough behavior of JSON data. This transcoder does not apply any serialization. It will forward data across the network without incurring unnecessary parsing costs. This will apply the following behavior to the value: binary ([]byte) -> JSON bytes, JSON expectedFlags. string -> JSON bytes, JSON expectedFlags. default -> error. | +| `rawstring` | RawStringTranscoder implements passthrough behavior of raw string data. This transcoder does not apply any serialization. This will apply the following behavior to the value: string -> string bytes, string expectedFlags. default -> error. | ### `timeout`