From f1c58bade518627f748aacc14865b3172bafec64 Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Tue, 9 Nov 2021 13:29:03 +0100 Subject: [PATCH] feature: Postgres support --- Makefile | 6 +- docker-compose.yml | 10 + eventstore/postgres/eventmaintenance.go.off | 133 ++++++ .../postgres/eventmaintenance_test.go.off | 57 +++ eventstore/postgres/eventstore.go | 432 ++++++++++++++++++ eventstore/postgres/eventstore_test.go | 201 ++++++++ examples/guestlist/postgres/postgres.go | 16 + examples/guestlist/postgres/postgres_test.go | 248 ++++++++++ go.mod | 3 + go.sum | 23 +- repo/postrges/repo.go | 377 +++++++++++++++ repo/postrges/repo_test.go | 233 ++++++++++ 12 files changed, 1736 insertions(+), 3 deletions(-) create mode 100644 eventstore/postgres/eventmaintenance.go.off create mode 100644 eventstore/postgres/eventmaintenance_test.go.off create mode 100644 eventstore/postgres/eventstore.go create mode 100644 eventstore/postgres/eventstore_test.go create mode 100644 examples/guestlist/postgres/postgres.go create mode 100644 examples/guestlist/postgres/postgres_test.go create mode 100644 repo/postrges/repo.go create mode 100644 repo/postrges/repo_test.go diff --git a/Makefile b/Makefile index ec6dcd90..41dc2db1 100644 --- a/Makefile +++ b/Makefile @@ -42,12 +42,16 @@ upload_coverage: .PHONY: run run: - docker-compose up -d mongodb gpubsub kafka redis nats + docker-compose up -d mongodb postgres gpubsub kafka redis nats .PHONY: run_mongodb run_mongodb: docker-compose up -d mongodb +.PHONY: run_postgres +run_postgres: + docker-compose up -d postgres + .PHONY: run_gpubsub run_gpubsub: docker-compose up -d gpubsub diff --git a/docker-compose.yml b/docker-compose.yml index c4020d79..94403b3d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,6 +43,16 @@ services: volumes: - mongodb:/data/db + postgres: + image: postgres:14 + ports: + - "5432:5432" + environment: + POSTGRES_PASSWORD: "password" + # PGDATA: "/data/postgres" + # volumes: + # - postgres:/data/postgres + gpubsub: image: gcr.io/google.com/cloudsdktool/cloud-sdk:355.0.0-emulators ports: diff --git a/eventstore/postgres/eventmaintenance.go.off b/eventstore/postgres/eventmaintenance.go.off new file mode 100644 index 00000000..9b094951 --- /dev/null +++ b/eventstore/postgres/eventmaintenance.go.off @@ -0,0 +1,133 @@ +// Copyright (c) 2021 - The Event Horizon authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + + // Register uuid.UUID as BSON type. + _ "github.com/looplab/eventhorizon/codec/bson" + + eh "github.com/looplab/eventhorizon" +) + +// Replace implements the Replace method of the eventhorizon.EventStore interface. +func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { + sess, err := s.client.StartSession(nil) + if err != nil { + return eh.EventStoreError{ + Err: eh.ErrCouldNotSaveEvents, + BaseErr: err, + } + } + defer sess.EndSession(ctx) + + if _, err := sess.WithTransaction(ctx, func(txCtx mongo.SessionContext) (interface{}, error) { + // First check if the aggregate exists, the not found error in the update + // query can mean both that the aggregate or the event is not found. + if n, err := s.events.CountDocuments(ctx, + bson.M{"aggregate_id": event.AggregateID()}); n == 0 { + return nil, eh.ErrAggregateNotFound + } else if err != nil { + return nil, err + } + + // Create the event record for the Database. + e, err := newEvt(ctx, event) + if err != nil { + return nil, err + } + + // Copy the event position from the old event (and set in metadata). + res := s.events.FindOne(ctx, bson.M{ + "aggregate_id": event.AggregateID(), + "version": event.Version(), + }) + if res.Err() != nil { + if res.Err() == mongo.ErrNoDocuments { + return nil, eh.ErrInvalidEvent + } + return nil, fmt.Errorf("could not find event to replace: %w", res.Err()) + } + var eventToReplace evt + if err := res.Decode(&eventToReplace); err != nil { + return nil, fmt.Errorf("could not decode event to replace: %w", err) + } + e.Position = eventToReplace.Position + e.Metadata["position"] = eventToReplace.Position + + // Find and replace the event. + if r, err := s.events.ReplaceOne(ctx, bson.M{ + "aggregate_id": event.AggregateID(), + "version": event.Version(), + }, e); err != nil { + return nil, err + } else if r.MatchedCount == 0 { + return nil, eh.ErrInvalidEvent + } + return nil, nil + }); err != nil { + // Return some errors intact. + if err == eh.ErrAggregateNotFound || err == eh.ErrInvalidEvent { + return err + } + return eh.EventStoreError{ + Err: eh.ErrCouldNotSaveEvents, + BaseErr: err, + } + } + + return nil +} + +// RenameEvent implements the RenameEvent method of the eventhorizon.EventStore interface. +func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) error { + // Find and rename all events. + // TODO: Maybe use change info. + if _, err := s.events.UpdateMany(ctx, + bson.M{ + "event_type": from.String(), + }, + bson.M{ + "$set": bson.M{"event_type": to.String()}, + }, + ); err != nil { + return eh.EventStoreError{ + Err: eh.ErrCouldNotSaveEvents, + BaseErr: err, + } + } + + return nil +} + +// Clear clears the event storage. +func (s *EventStore) Clear(ctx context.Context) error { + if err := s.events.Drop(ctx); err != nil { + return eh.EventStoreError{ + Err: fmt.Errorf("could not clear events collection: %w", err), + } + } + if err := s.streams.Drop(ctx); err != nil { + return eh.EventStoreError{ + Err: fmt.Errorf("could not clear streams collection: %w", err), + } + } + return nil +} diff --git a/eventstore/postgres/eventmaintenance_test.go.off b/eventstore/postgres/eventmaintenance_test.go.off new file mode 100644 index 00000000..27392c66 --- /dev/null +++ b/eventstore/postgres/eventmaintenance_test.go.off @@ -0,0 +1,57 @@ +// Copyright (c) 2021 - The Event Horizon authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "crypto/rand" + "encoding/hex" + "os" + "testing" + + "github.com/looplab/eventhorizon/eventstore" +) + +func TestEventStoreMaintenanceIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + // Use MongoDB in Docker with fallback to localhost. + addr := os.Getenv("MONGODB_ADDR") + if addr == "" { + addr = "localhost:27017" + } + url := "mongodb://" + addr + + // Get a random DB name. + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + t.Fatal(err) + } + db := "test-" + hex.EncodeToString(b) + t.Log("using DB:", db) + + store, err := NewEventStore(url, db) + if err != nil { + t.Fatal("there should be no error:", err) + } + if store == nil { + t.Fatal("there should be a store") + } + defer store.Close(context.Background()) + + eventstore.MaintenanceAcceptanceTest(t, store, store, context.Background()) +} diff --git a/eventstore/postgres/eventstore.go b/eventstore/postgres/eventstore.go new file mode 100644 index 00000000..12983ad3 --- /dev/null +++ b/eventstore/postgres/eventstore.go @@ -0,0 +1,432 @@ +// Copyright (c) 2021 - The Event Horizon authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect/pgdialect" + "github.com/uptrace/bun/driver/pgdriver" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" +) + +// EventStore is an eventhorizon.EventStore for PostgreSQL, using one collection +// for all events and another to keep track of all aggregates/streams. It also +// keep tracks of the global position of events, stored as metadata. +type EventStore struct { + db *bun.DB + eventHandlerAfterSave eh.EventHandler + eventHandlerInTX eh.EventHandler +} + +// NewEventStore creates a new EventStore with a Postgres URI: +// `postgres://user:password@hostname:port/db?options` +func NewEventStore(uri string, options ...Option) (*EventStore, error) { + sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(uri))) + db := bun.NewDB(sqldb, pgdialect.New()) + + // NOTE: For debug logging only. + // db.AddQueryHook(bundebug.NewQueryHook(bundebug.WithVerbose())) + + return NewEventStoreWithDB(db, options...) +} + +// NewEventStoreWithDB creates a new EventStore with a DB. +func NewEventStoreWithDB(db *bun.DB, options ...Option) (*EventStore, error) { + if db == nil { + return nil, fmt.Errorf("missing DB") + } + + s := &EventStore{ + db: db, + } + + for _, option := range options { + if err := option(s); err != nil { + return nil, fmt.Errorf("error while applying option: %w", err) + } + } + + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("could not connect to DB: %w", err) + } + + // Add the UUID extention. + if _, err := db.Exec(`CREATE EXTENSION IF NOT EXISTS "uuid-ossp"`); err != nil { + return nil, fmt.Errorf("could not add the UUID extention: %w", err) + } + + // Make sure event tables exists. + db.RegisterModel((*stream)(nil)) + db.RegisterModel((*evt)(nil)) + ctx := context.Background() + if _, err := db.NewCreateTable(). + Model((*stream)(nil)). + IfNotExists(). + Exec(ctx); err != nil { + return nil, fmt.Errorf("could not create stream table: %w", err) + } + if _, err := db.NewCreateTable(). + Model((*evt)(nil)). + IfNotExists(). + Exec(ctx); err != nil { + return nil, fmt.Errorf("could not create event table: %w", err) + } + + return s, nil +} + +// Option is an option setter used to configure creation. +type Option func(*EventStore) error + +// WithEventHandler adds an event handler that will be called after saving events. +// An example would be to add an event bus to publish events. +func WithEventHandler(h eh.EventHandler) Option { + return func(s *EventStore) error { + if s.eventHandlerAfterSave != nil { + return fmt.Errorf("another event handler is already set") + } + + if s.eventHandlerInTX != nil { + return fmt.Errorf("another TX event handler is already set") + } + + s.eventHandlerAfterSave = h + + return nil + } +} + +// WithEventHandlerInTX adds an event handler that will be called during saving of +// events. An example would be to add an outbox to further process events. +// For an outbox to be atomic it needs to use the same transaction as the save +// operation, which is passed down using the context. +func WithEventHandlerInTX(h eh.EventHandler) Option { + return func(s *EventStore) error { + if s.eventHandlerAfterSave != nil { + return fmt.Errorf("another event handler is already set") + } + + if s.eventHandlerInTX != nil { + return fmt.Errorf("another TX event handler is already set") + } + + s.eventHandlerInTX = h + + return nil + } +} + +// Save implements the Save method of the eventhorizon.EventStore interface. +func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error { + if len(events) == 0 { + return &eh.EventStoreError{ + Err: eh.ErrMissingEvents, + Op: eh.EventStoreOpSave, + } + } + + dbEvents := make([]*evt, len(events)) + id := events[0].AggregateID() + at := events[0].AggregateType() + + // Build all event records, with incrementing versions starting from the + // original aggregate version. + for i, event := range events { + // Only accept events belonging to the same aggregate. + if event.AggregateID() != id { + return &eh.EventStoreError{ + Err: eh.ErrMismatchedEventAggregateIDs, + Op: eh.EventStoreOpSave, + AggregateType: at, + AggregateID: id, + AggregateVersion: originalVersion, + Events: events, + } + } + + if event.AggregateType() != at { + return &eh.EventStoreError{ + Err: eh.ErrMismatchedEventAggregateTypes, + Op: eh.EventStoreOpSave, + AggregateType: at, + AggregateID: id, + AggregateVersion: originalVersion, + Events: events, + } + } + + // Only accept events that apply to the correct aggregate version. + if event.Version() != originalVersion+i+1 { + return &eh.EventStoreError{ + Err: eh.ErrIncorrectEventVersion, + Op: eh.EventStoreOpSave, + AggregateType: at, + AggregateID: id, + AggregateVersion: originalVersion, + Events: events, + } + } + + // Create the event record for the DB. + e, err := newEvt(ctx, event) + if err != nil { + return &eh.EventStoreError{ + Err: fmt.Errorf("could not copy event: %w", err), + Op: eh.EventStoreOpSave, + AggregateType: at, + AggregateID: id, + AggregateVersion: originalVersion, + Events: events, + } + } + + dbEvents[i] = e + } + + if err := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + // Insert all the events, will auto-increment and return the position. + if _, err := tx.NewInsert(). + Model(&dbEvents). + Returning("position"). + Exec(ctx); err != nil { + return fmt.Errorf("could not insert events: %w", err) + } + + // Grab the last event to use for position, returned from the insert above. + lastEvent := dbEvents[len(dbEvents)-1] + + // Store the stream, based on the last event for position etc. + strm := &stream{ + ID: lastEvent.AggregateID, + Position: lastEvent.Position, + AggregateType: lastEvent.AggregateType, + Version: lastEvent.Version, + UpdatedAt: lastEvent.Timestamp, + } + if _, err := tx.NewInsert(). + Model(strm). + On("CONFLICT (id) DO UPDATE"). + Set("position = EXCLUDED.position"). + Set("version = EXCLUDED.version"). + Set("updated_at = EXCLUDED.updated_at"). + Exec(ctx); err != nil { + return fmt.Errorf("could not update stream: %w", err) + } + + // Store the position in the event metadata. + for _, event := range dbEvents { + event.Metadata["position"] = event.Position + } + if _, err := tx.NewUpdate(). + Model(&dbEvents). + Column("metadata"). + Bulk(). + Exec(ctx); err != nil { + return fmt.Errorf("could not update event metedata: %w", err) + } + + if s.eventHandlerInTX != nil { + for _, e := range events { + if err := s.eventHandlerInTX.HandleEvent(ctx, e); err != nil { + return fmt.Errorf("could not handle event in transaction: %w", err) + } + } + } + + return nil + }); err != nil { + return &eh.EventStoreError{ + Err: err, + Op: eh.EventStoreOpSave, + AggregateType: at, + AggregateID: id, + AggregateVersion: originalVersion, + Events: events, + } + } + + // Let the optional event handler handle the events. + if s.eventHandlerAfterSave != nil { + for _, e := range events { + if err := s.eventHandlerAfterSave.HandleEvent(ctx, e); err != nil { + return &eh.EventHandlerError{ + Err: err, + Event: e, + } + } + } + } + + return nil +} + +// Load implements the Load method of the eventhorizon.EventStore interface. +func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) { + rows, err := s.db.NewSelect(). + Model((*evt)(nil)). + Where("aggregate_id = ?", id). + Rows(ctx) + if err != nil { + return nil, &eh.EventStoreError{ + Err: fmt.Errorf("could not find event: %w", err), + Op: eh.EventStoreOpLoad, + AggregateID: id, + } + } + defer rows.Close() + + var events []eh.Event + + for rows.Next() { + e := new(evt) + if err := s.db.ScanRow(ctx, rows, e); err != nil { + return nil, &eh.EventStoreError{ + Err: fmt.Errorf("could not scan event: %w", err), + Op: eh.EventStoreOpLoad, + AggregateID: id, + Events: events, + } + } + + // Create an event of the correct type and decode from raw BSON. + if len(e.RawData) > 0 { + var err error + if e.data, err = eh.CreateEventData(e.EventType); err != nil { + return nil, &eh.EventStoreError{ + Err: fmt.Errorf("could not create event data: %w", err), + Op: eh.EventStoreOpLoad, + AggregateType: e.AggregateType, + AggregateID: id, + AggregateVersion: e.Version, + Events: events, + } + } + + if err := json.Unmarshal(e.RawData, e.data); err != nil { + return nil, &eh.EventStoreError{ + Err: fmt.Errorf("could not unmarshal event data: %w", err), + Op: eh.EventStoreOpLoad, + AggregateType: e.AggregateType, + AggregateID: id, + AggregateVersion: e.Version, + Events: events, + } + } + + e.RawData = nil + } + + event := eh.NewEvent( + e.EventType, + e.data, + e.Timestamp, + eh.ForAggregate( + e.AggregateType, + e.AggregateID, + e.Version, + ), + eh.WithMetadata(e.Metadata), + ) + events = append(events, event) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error when scanning events: %w", err) + } + + if len(events) == 0 { + return nil, &eh.EventStoreError{ + Err: eh.ErrAggregateNotFound, + Op: eh.EventStoreOpLoad, + AggregateID: id, + } + } + + return events, nil +} + +// Close closes the database client. +func (s *EventStore) Close() error { + if err := s.db.Close(); err != nil { + return fmt.Errorf("could not close DB connection: %w", err) + } + + return nil +} + +// stream is a stream of events, often containing the events for an aggregate. +type stream struct { + bun.BaseModel `bun:"streams,alias:s"` + + ID uuid.UUID `bun:"type:uuid,pk"` + AggregateType eh.AggregateType `bun:",notnull"` + Position int64 `bun:"type:integer,notnull"` + Version int `bun:"type:integer,notnull"` + UpdatedAt time.Time `bun:""` +} + +// evt is the internal event record for the event store used +// to save and load events from the DB. +type evt struct { + bun.BaseModel `bun:"events,alias:e"` + + Position int64 `bun:",pk,autoincrement"` + EventType eh.EventType `bun:",notnull"` + Timestamp time.Time `bun:",notnull"` + AggregateType eh.AggregateType `bun:",notnull"` + AggregateID uuid.UUID `bun:"type:uuid,notnull"` + Version int `bun:"type:integer,notnull"` + RawData json.RawMessage `bun:"type:jsonb,nullzero"` + data eh.EventData `bun:"-"` + Metadata map[string]interface{} `bun:""` +} + +// newEvt returns a new evt for an event. +func newEvt(ctx context.Context, event eh.Event) (*evt, error) { + e := &evt{ + EventType: event.EventType(), + Timestamp: event.Timestamp(), + AggregateType: event.AggregateType(), + AggregateID: event.AggregateID(), + Version: event.Version(), + Metadata: event.Metadata(), + } + + if e.Metadata == nil { + e.Metadata = map[string]interface{}{} + } + + // Marshal event data if there is any. + if event.Data() != nil { + var err error + + e.RawData, err = json.Marshal(event.Data()) + if err != nil { + return nil, &eh.EventStoreError{ + Err: fmt.Errorf("could not marshal event data: %w", err), + } + } + } + + return e, nil +} diff --git a/eventstore/postgres/eventstore_test.go b/eventstore/postgres/eventstore_test.go new file mode 100644 index 00000000..2c2f61be --- /dev/null +++ b/eventstore/postgres/eventstore_test.go @@ -0,0 +1,201 @@ +// Copyright (c) 2021 - The Event Horizon authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "crypto/rand" + "database/sql" + "encoding/hex" + "os" + "testing" + "time" + + "github.com/uptrace/bun/driver/pgdriver" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/eventstore" + "github.com/looplab/eventhorizon/mocks" + "github.com/looplab/eventhorizon/uuid" +) + +func TestEventStoreIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + // Use Postgres in Docker with fallback to localhost. + addr := os.Getenv("POSTGRES_ADDR") + if addr == "" { + addr = "localhost:5432" + } + + // Get a random DB name. + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + t.Fatal(err) + } + db := "test_" + hex.EncodeToString(b) + t.Log("using DB:", db) + + rootURI := "postgres://postgres:password@" + addr + "/postgres?sslmode=disable" + rootDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(rootURI))) + if _, err := rootDB.Exec("CREATE DATABASE " + db); err != nil { + t.Fatal("could not create test DB:", err) + } + if err := rootDB.Close(); err != nil { + t.Error("could not close DB:", err) + } + + uri := "postgres://postgres:password@" + addr + "/" + db + "?sslmode=disable" + store, err := NewEventStore(uri) + if err != nil { + t.Fatal("there should be no error:", err) + } + if store == nil { + t.Fatal("there should be a store") + } + + defer store.Close() + + eventstore.AcceptanceTest(t, store, context.Background()) +} + +func TestWithEventHandlerIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + // Use Postgres in Docker with fallback to localhost. + addr := os.Getenv("POSTGRES_ADDR") + if addr == "" { + addr = "localhost:5432" + } + + // Get a random DB name. + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + t.Fatal(err) + } + db := "test_" + hex.EncodeToString(b) + t.Log("using DB:", db) + + rootURI := "postgres://postgres:password@" + addr + "/postgres?sslmode=disable" + rootDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(rootURI))) + if _, err := rootDB.Exec("CREATE DATABASE " + db); err != nil { + t.Fatal("could not create test DB:", err) + } + if err := rootDB.Close(); err != nil { + t.Error("could not close DB:", err) + } + + h := &mocks.EventBus{} + + uri := "postgres://postgres:password@" + addr + "/" + db + "?sslmode=disable" + store, err := NewEventStore(uri, + WithEventHandler(h), + ) + if err != nil { + t.Fatal("there should be no error:", err) + } + if store == nil { + t.Fatal("there should be a store") + } + + defer store.Close() + + ctx := context.Background() + + // The event handler should be called. + id1 := uuid.New() + timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, + timestamp, mocks.AggregateType, id1, 1) + err = store.Save(ctx, []eh.Event{event1}, 0) + if err != nil { + t.Error("there should be no error:", err) + } + expected := []eh.Event{event1} + // The saved events should be ok. + events, err := store.Load(ctx, id1) + if err != nil { + t.Error("there should be no error:", err) + } + // The stored events should be ok. + if len(events) != len(expected) { + t.Errorf("incorrect number of loaded events: %d", len(events)) + } + for i, event := range events { + if err := eh.CompareEvents(event, expected[i], + eh.IgnoreVersion(), + eh.IgnorePositionMetadata(), + ); err != nil { + t.Error("the stored event was incorrect:", err) + } + if event.Version() != i+1 { + t.Error("the event version should be correct:", event, event.Version()) + } + } + // The handled events should be ok. + if len(h.Events) != len(expected) { + t.Errorf("incorrect number of loaded events: %d", len(events)) + } + for i, event := range h.Events { + if err := eh.CompareEvents(event, expected[i], eh.IgnoreVersion()); err != nil { + t.Error("the handeled event was incorrect:", err) + } + if event.Version() != i+1 { + t.Error("the event version should be correct:", event, event.Version()) + } + } +} + +func BenchmarkEventStore(b *testing.B) { + // Use Postgres in Docker with fallback to localhost. + addr := os.Getenv("POSTGRES_ADDR") + if addr == "" { + addr = "localhost:5432" + } + + // Get a random DB name. + bs := make([]byte, 4) + if _, err := rand.Read(bs); err != nil { + b.Fatal(err) + } + db := "test_" + hex.EncodeToString(bs) + b.Log("using DB:", db) + + rootURI := "postgres://postgres:password@" + addr + "/postgres?sslmode=disable" + rootDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(rootURI))) + if _, err := rootDB.Exec("CREATE DATABASE " + db); err != nil { + b.Fatal("could not create test DB:", err) + } + if err := rootDB.Close(); err != nil { + b.Error("could not close DB:", err) + } + + uri := "postgres://postgres:password@" + addr + "/" + db + "?sslmode=disable" + store, err := NewEventStore(uri) + if err != nil { + b.Fatal("there should be no error:", err) + } + if store == nil { + b.Fatal("there should be a store") + } + + defer store.Close() + + eventstore.Benchmark(b, store) +} diff --git a/examples/guestlist/postgres/postgres.go b/examples/guestlist/postgres/postgres.go new file mode 100644 index 00000000..9c06b63c --- /dev/null +++ b/examples/guestlist/postgres/postgres.go @@ -0,0 +1,16 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package postgres contains an example of a CQRS/ES app using the Postgres adapter. +package postgres diff --git a/examples/guestlist/postgres/postgres_test.go b/examples/guestlist/postgres/postgres_test.go new file mode 100644 index 00000000..be0cd083 --- /dev/null +++ b/examples/guestlist/postgres/postgres_test.go @@ -0,0 +1,248 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "fmt" + "log" + "os" + "sort" + "sync" + "testing" + "time" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/commandhandler/bus" + localEventBus "github.com/looplab/eventhorizon/eventbus/local" + postgresEventStore "github.com/looplab/eventhorizon/eventstore/postgres" + "github.com/looplab/eventhorizon/repo/mongodb" + "github.com/looplab/eventhorizon/repo/version" + "github.com/looplab/eventhorizon/uuid" + "github.com/uptrace/bun/driver/pgdriver" + + "github.com/looplab/eventhorizon/examples/guestlist/domains/guestlist" +) + +func ExampleIntegration() { + if testing.Short() { + // Skip test when not running integration, fake success by printing. + fmt.Println(`invitation: Athena - confirmed +invitation: Hades - confirmed +invitation: Poseidon - denied +invitation: Zeus - declined +guest list: 4 invited - 3 accepted, 1 declined - 2 confirmed, 1 denied`) + return + } + + // Use Postgres in Docker with fallback to localhost. + addr := os.Getenv("POSTGRES_ADDR") + if addr == "" { + addr = "localhost:5432" + } + + db := "guestlist_example" + + rootURI := "postgres://postgres:password@" + addr + "/postgres?sslmode=disable" + rootDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(rootURI))) + if _, err := rootDB.Exec("CREATE DATABASE " + db); err != nil { + log.Fatal("could not create DB: ", err) + } + if err := rootDB.Close(); err != nil { + log.Fatal("could not close DB:", err) + } + + uri := "postgres://postgres:password@" + addr + "/" + db + "?sslmode=disable" + + // Create the event bus that distributes events. + eventBus := localEventBus.NewEventBus(nil) + go func() { + for e := range eventBus.Errors() { + log.Printf("eventbus: %s", e.Error()) + } + }() + + // Create the event store. + eventStore, err := postgresEventStore.NewEventStore(uri, + postgresEventStore.WithEventHandler(eventBus), // Add the event bus as a handler after save. + ) + if err != nil { + log.Fatalf("could not create event store: %s", err) + } + + // Create the command bus. + commandBus := bus.NewCommandHandler() + + // TODO: Use Postgres repos. + mongoAddr := os.Getenv("MONGODB_ADDR") + if mongoAddr == "" { + mongoAddr = "localhost:27017" + } + url := "mongodb://" + mongoAddr + + // Create the read repositories. + invitationRepo, err := mongodb.NewRepo(url, db, "invitations") + if err != nil { + log.Fatalf("could not create invitation repository: %s", err) + } + invitationRepo.SetEntityFactory(func() eh.Entity { return &guestlist.Invitation{} }) + // A version repo is needed for the projector to handle eventual consistency. + invitationVersionRepo := version.NewRepo(invitationRepo) + + guestListRepo, err := mongodb.NewRepo(url, db, "guest_lists") + if err != nil { + log.Fatalf("could not create guest list repository: %s", err) + } + guestListRepo.SetEntityFactory(func() eh.Entity { return &guestlist.GuestList{} }) + + ctx := context.Background() + + // Setup the guestlist. + eventID := uuid.New() + guestlist.Setup( + ctx, + eventStore, + eventBus, // Use the event bus both as local and global handler. + eventBus, + commandBus, + invitationVersionRepo, guestListRepo, + eventID, + ) + + // Setup a test utility waiter that waits for all 11 events to occur before + // evaluating results. + var wg sync.WaitGroup + wg.Add(11) + if err := eventBus.AddHandler(ctx, eh.MatchAll{}, eh.EventHandlerFunc( + func(ctx context.Context, e eh.Event) error { + wg.Done() + return nil + }, + )); err != nil { + log.Fatal("could not add test sync to event bus:", err) + } + + // Clear DB collections. + // if err := eventStore.Clear(ctx); err != nil { + // log.Fatal("could not clear event store:", err) + // } + if err := invitationRepo.Clear(ctx); err != nil { + log.Fatal("could not clear invitation repo:", err) + } + if err := guestListRepo.Clear(ctx); err != nil { + log.Fatal("could not clear guest list repo:", err) + } + + // --- Execute commands on the domain -------------------------------------- + + // IDs for all the guests. + athenaID := uuid.New() + hadesID := uuid.New() + zeusID := uuid.New() + poseidonID := uuid.New() + + // Issue some invitations and responses. Error checking omitted here. + if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: athenaID, Name: "Athena", Age: 42}); err != nil { + log.Println("error:", err) + } + if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: hadesID, Name: "Hades"}); err != nil { + log.Println("error:", err) + } + if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: zeusID, Name: "Zeus"}); err != nil { + log.Println("error:", err) + } + if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: poseidonID, Name: "Poseidon"}); err != nil { + log.Println("error:", err) + } + + // The invited guests accept and decline the event. + // Note that Athena tries to decline the event after first accepting, but + // that is not allowed by the domain logic in InvitationAggregate. The + // result is that she is still accepted. + if err := commandBus.HandleCommand(ctx, &guestlist.AcceptInvite{ID: athenaID}); err != nil { + log.Println("error:", err) + } + if err = commandBus.HandleCommand(ctx, &guestlist.DeclineInvite{ID: athenaID}); err != nil { + // NOTE: This error is supposed to be printed! + log.Println("error:", err) + } + if err := commandBus.HandleCommand(ctx, &guestlist.AcceptInvite{ID: hadesID}); err != nil { + log.Println("error:", err) + } + if err := commandBus.HandleCommand(ctx, &guestlist.DeclineInvite{ID: zeusID}); err != nil { + log.Println("error:", err) + } + + // Poseidon is a bit late to the party, will not be accepted... + if err := commandBus.HandleCommand(ctx, &guestlist.AcceptInvite{ID: poseidonID}); err != nil { + log.Println("error:", err) + } + + // Wait for simulated eventual consistency before reading. + wg.Wait() + time.Sleep(1000 * time.Millisecond) + + // Read all invites. + invitationStrs := []string{} + invitations, err := invitationRepo.FindAll(ctx) + if err != nil { + log.Println("error:", err) + } + for _, i := range invitations { + if i, ok := i.(*guestlist.Invitation); ok { + invitationStrs = append(invitationStrs, fmt.Sprintf("%s - %s", i.Name, i.Status)) + } + } + + // Sort the output to be able to compare test results. + sort.Strings(invitationStrs) + for _, s := range invitationStrs { + log.Printf("invitation: %s\n", s) + fmt.Printf("invitation: %s\n", s) + } + + // Read the guest list. + l, err := guestListRepo.Find(ctx, eventID) + if err != nil { + log.Println("error:", err) + } + if l, ok := l.(*guestlist.GuestList); ok { + log.Printf("guest list: %d invited - %d accepted, %d declined - %d confirmed, %d denied\n", + l.NumGuests, l.NumAccepted, l.NumDeclined, l.NumConfirmed, l.NumDenied) + fmt.Printf("guest list: %d invited - %d accepted, %d declined - %d confirmed, %d denied\n", + l.NumGuests, l.NumAccepted, l.NumDeclined, l.NumConfirmed, l.NumDenied) + } + + if err := eventBus.Close(); err != nil { + log.Println("error closing event bus:", err) + } + if err := invitationRepo.Close(); err != nil { + log.Println("error closing invitation repo:", err) + } + if err := guestListRepo.Close(); err != nil { + log.Println("error closing guest list repo:", err) + } + if err := eventStore.Close(); err != nil { + log.Println("error closing event store:", err) + } + + // Output: + // invitation: Athena - confirmed + // invitation: Hades - confirmed + // invitation: Poseidon - denied + // invitation: Zeus - declined + // guest list: 4 invited - 3 accepted, 1 declined - 2 confirmed, 1 denied +} diff --git a/go.mod b/go.mod index 7748cf52..19424954 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,9 @@ require ( github.com/segmentio/kafka-go v0.4.23 github.com/uber/jaeger-client-go v2.29.1+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect + github.com/uptrace/bun v1.0.16 + github.com/uptrace/bun/dialect/pgdialect v1.0.16 + github.com/uptrace/bun/driver/pgdriver v1.0.16 go.mongodb.org/mongo-driver v1.7.1 go.uber.org/atomic v1.9.0 // indirect google.golang.org/api v0.54.0 diff --git a/go.sum b/go.sum index e4da2c0c..d479ef1b 100644 --- a/go.sum +++ b/go.sum @@ -204,6 +204,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jinzhu/copier v0.3.2 h1:QdBOCbaouLDYaIPFfi1bKv5F5tPpeTwXe4sD0jqtz5w= github.com/jinzhu/copier v0.3.2/go.mod h1:24xnZezI2Yqac9J61UC6/dG/k76ttpq0DdJI3QmUvro= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -296,10 +298,22 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= +github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= github.com/uber/jaeger-client-go v2.29.1+incompatible h1:R9ec3zO3sGpzs0abd43Y+fBZRJ9uiH6lXyR/+u6brW4= github.com/uber/jaeger-client-go v2.29.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/uptrace/bun v1.0.16 h1:tqxmeQvYxiJgH2kL8MWuMvXQRr+fs3Aagu5jbfj6YXc= +github.com/uptrace/bun v1.0.16/go.mod h1:rs7qARtH6aqLtcfmloWM6qn90iyY53N2ScLQssIrevw= +github.com/uptrace/bun/dialect/pgdialect v1.0.16 h1:VULqKYP2e/+zlsolmp5/3LKWHzg/dml72khlJJ8nlHU= +github.com/uptrace/bun/dialect/pgdialect v1.0.16/go.mod h1:N8hUQPkPnrjVS4PcAW5mtAbdWuVXtky1NUdVQirt6jE= +github.com/uptrace/bun/driver/pgdriver v1.0.16 h1:q5noacvJBOY6ifBUFw9YraKq9E2i1HZfK5lwbKf9JEM= +github.com/uptrace/bun/driver/pgdriver v1.0.16/go.mod h1:PYpxsueZy6+7hIbiDKbi3c0jJtz9ekMRVWs7z0wUSYk= +github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= +github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w= @@ -331,6 +345,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -341,8 +356,9 @@ golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -505,8 +521,9 @@ golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -740,6 +757,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +mellium.im/sasl v0.2.1 h1:nspKSRg7/SyO0cRGY71OkfHab8tf9kCts6a6oTDut0w= +mellium.im/sasl v0.2.1/go.mod h1:ROaEDLQNuf9vjKqE1SrAfnsobm2YKXT1gnN1uDp1PjQ= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/repo/postrges/repo.go b/repo/postrges/repo.go new file mode 100644 index 00000000..7b89ab32 --- /dev/null +++ b/repo/postrges/repo.go @@ -0,0 +1,377 @@ +// Copyright (c) 2021 - The Event Horizon authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "go.mongodb.org/mongo-driver/mongo" + + // Register uuid.UUID as BSON type. + _ "github.com/looplab/eventhorizon/codec/bson" + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect/pgdialect" + "github.com/uptrace/bun/driver/pgdriver" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" +) + +var ( + // ErrModelNotSet is when an model factory is not set on the Repo. + ErrModelNotSet = errors.New("model not set") + // ErrInvalidQuery is when a query was not returned from the callback to FindCustom. + ErrInvalidQuery = errors.New("invalid query") +) + +// Repo implements an MongoDB repository for entities. +type Repo struct { + db *bun.DB + newEntity func() eh.Entity +} + +// NewRepo creates a new Repo with a Postgres URI: +// `postgres://user:password@hostname:port/db?options` +func NewRepo(uri string, options ...Option) (*Repo, error) { + sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(uri))) + db := bun.NewDB(sqldb, pgdialect.New()) + + return NewRepoWithDB(db, options...) +} + +// NewRepoWithDB creates a new Repo with a DB. +func NewRepoWithDB(db *bun.DB, options ...Option) (*Repo, error) { + if db == nil { + return nil, fmt.Errorf("missing DB") + } + + r := &Repo{ + db: db, + } + + for _, option := range options { + if err := option(r); err != nil { + return nil, fmt.Errorf("error while applying option: %w", err) + } + } + + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("could not connect to DB: %w", err) + } + + // Add the UUID extension. + if _, err := db.Exec(`CREATE EXTENSION IF NOT EXISTS "uuid-ossp"`); err != nil { + return nil, fmt.Errorf("could not add the UUID extension: %w", err) + } + + return r, nil +} + +// Option is an option setter used to configure creation. +type Option func(*Repo) error + +// InnerRepo implements the InnerRepo method of the eventhorizon.ReadRepo interface. +func (r *Repo) InnerRepo(ctx context.Context) eh.ReadRepo { + return nil +} + +// IntoRepo tries to convert a eh.ReadRepo into a Repo by recursively looking at +// inner repos. Returns nil if none was found. +func IntoRepo(ctx context.Context, repo eh.ReadRepo) *Repo { + if repo == nil { + return nil + } + + if r, ok := repo.(*Repo); ok { + return r + } + + return IntoRepo(ctx, repo.InnerRepo(ctx)) +} + +// Find implements the Find method of the eventhorizon.ReadRepo interface. +func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { + // if r.newEntity == nil { + // return nil, eh.RepoError{ + // Err: ErrModelNotSet, + // } + // } + + // entity := r.newEntity() + // if err := r.entities.FindOne(ctx, bson.M{"_id": id.String()}).Decode(entity); err == mongo.ErrNoDocuments { + // return nil, eh.RepoError{ + // Err: eh.ErrEntityNotFound, + // BaseErr: err, + // } + // } else if err != nil { + // return nil, eh.RepoError{ + // Err: eh.ErrCouldNotLoadEntity, + // BaseErr: err, + // } + // } + + // return entity, nil + + return nil, nil +} + +// FindAll implements the FindAll method of the eventhorizon.ReadRepo interface. +func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { + // if r.newEntity == nil { + // return nil, eh.RepoError{ + // Err: ErrModelNotSet, + // } + // } + + // cursor, err := r.entities.Find(ctx, bson.M{}) + // if err != nil { + // return nil, eh.RepoError{ + // Err: eh.ErrCouldNotLoadEntity, + // BaseErr: err, + // } + // } + + // result := []eh.Entity{} + // for cursor.Next(ctx) { + // entity := r.newEntity() + // if err := cursor.Decode(entity); err != nil { + // return nil, eh.RepoError{ + // Err: eh.ErrCouldNotLoadEntity, + // BaseErr: err, + // } + // } + // result = append(result, entity) + // } + + // if err := cursor.Close(ctx); err != nil { + // return nil, eh.RepoError{ + // Err: eh.ErrCouldNotLoadEntity, + // BaseErr: err, + // } + // } + + // return result, nil + + return nil, nil +} + +// The iterator is not thread safe. +type iter struct { + cursor *mongo.Cursor + data eh.Entity + newEntity func() eh.Entity + decodeErr error +} + +func (i *iter) Next(ctx context.Context) bool { + if !i.cursor.Next(ctx) { + return false + } + + item := i.newEntity() + i.decodeErr = i.cursor.Decode(item) + i.data = item + + return true +} + +func (i *iter) Value() interface{} { + return i.data +} + +func (i *iter) Close(ctx context.Context) error { + if err := i.cursor.Close(ctx); err != nil { + return err + } + + return i.decodeErr +} + +// FindCustomIter returns a mgo cursor you can use to stream results of very large datasets +// func (r *Repo) FindCustomIter(ctx context.Context, f func(context.Context, *mongo.Collection) (*mongo.Cursor, error)) (eh.Iter, error) { +// if r.newEntity == nil { +// return nil, eh.RepoError{ +// Err: ErrModelNotSet, +// } +// } + +// cursor, err := f(ctx, r.entities) +// if err != nil { +// return nil, eh.RepoError{ +// Err: ErrInvalidQuery, +// BaseErr: err, +// } +// } +// if cursor == nil { +// return nil, eh.RepoError{ +// Err: ErrInvalidQuery, +// } +// } + +// return &iter{ +// cursor: cursor, +// newEntity: r.newEntity, +// }, nil +// } + +// FindCustom uses a callback to specify a custom query for returning models. +// It can also be used to do queries that does not map to the model by executing +// the query in the callback and returning nil to block a second execution of +// the same query in FindCustom. Expect a ErrInvalidQuery if returning a nil +// query from the callback. +// func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Collection) (*mongo.Cursor, error)) ([]interface{}, error) { +// if r.newEntity == nil { +// return nil, eh.RepoError{ +// Err: ErrModelNotSet, +// } +// } + +// cursor, err := f(ctx, r.entities) +// if err != nil { +// return nil, eh.RepoError{ +// Err: ErrInvalidQuery, +// BaseErr: err, +// } +// } +// if cursor == nil { +// return nil, eh.RepoError{ +// Err: ErrInvalidQuery, +// } +// } + +// result := []interface{}{} +// entity := r.newEntity() +// for cursor.Next(ctx) { +// if err := cursor.Decode(entity); err != nil { +// return nil, eh.RepoError{ +// Err: eh.ErrCouldNotLoadEntity, +// BaseErr: err, +// } +// } +// result = append(result, entity) +// entity = r.newEntity() +// } +// if err := cursor.Close(ctx); err != nil { +// return nil, eh.RepoError{ +// Err: eh.ErrCouldNotLoadEntity, +// BaseErr: err, +// } +// } + +// return result, nil +// } + +// Save implements the Save method of the eventhorizon.WriteRepo interface. +func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { + // if entity.EntityID() == uuid.Nil { + // return eh.RepoError{ + // Err: eh.ErrCouldNotSaveEntity, + // BaseErr: eh.ErrMissingEntityID, + // } + // } + + // if _, err := r.entities.UpdateOne(ctx, + // bson.M{ + // "_id": entity.EntityID().String(), + // }, + // bson.M{ + // "$set": entity, + // }, + // options.Update().SetUpsert(true), + // ); err != nil { + // return eh.RepoError{ + // Err: eh.ErrCouldNotSaveEntity, + // BaseErr: err, + // } + // } + return nil +} + +// Remove implements the Remove method of the eventhorizon.WriteRepo interface. +func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { + // if r, err := r.entities.DeleteOne(ctx, bson.M{"_id": id.String()}); err != nil { + // return eh.RepoError{ + // Err: eh.ErrCouldNotRemoveEntity, + // BaseErr: err, + // } + // } else if r.DeletedCount == 0 { + // return eh.RepoError{ + // Err: eh.ErrEntityNotFound, + // } + // } + + return nil +} + +// Collection lets the function do custom actions on the collection. +// func (r *Repo) Collection(ctx context.Context, f func(context.Context, *mongo.Collection) error) error { +// if err := f(ctx, r.entities); err != nil { +// return eh.RepoError{ +// Err: err, +// } +// } + +// return nil +// } + +// CreateIndex creates an index for a field. +// func (r *Repo) CreateIndex(ctx context.Context, field string) error { +// index := mongo.IndexModel{Keys: bson.M{field: 1}} +// if _, err := r.entities.Indexes().CreateOne(ctx, index); err != nil { +// return fmt.Errorf("could not create index: %s", err) +// } +// return nil +// } + +// SetEntityFactory sets a factory function that creates concrete entity types. +func (r *Repo) SetEntityFactory(f func() eh.Entity) { + r.newEntity = f + + m := f() + + r.db.RegisterModel(m) + + // Make sure event tables exists. + if _, err := r.db.NewCreateTable(). + Model(m). + IfNotExists(). + Exec(context.Background()); err != nil { + // TODO: Return error. + // return nil, fmt.Errorf("could not create stream table: %w", err) + } +} + +// Clear clears the read model database. +// func (r *Repo) Clear(ctx context.Context) error { +// if err := r.entities.Drop(ctx); err != nil { +// return eh.RepoError{ +// Err: ErrCouldNotClearDB, +// BaseErr: err, +// } +// } +// return nil +// } + +// Close closes a database session. +func (r *Repo) Close() error { + if err := r.db.Close(); err != nil { + return fmt.Errorf("could not close DB: %w", err) + } + + return nil +} diff --git a/repo/postrges/repo_test.go b/repo/postrges/repo_test.go new file mode 100644 index 00000000..c9ce0318 --- /dev/null +++ b/repo/postrges/repo_test.go @@ -0,0 +1,233 @@ +// Copyright (c) 2021 - The Event Horizon authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "crypto/rand" + "database/sql" + "encoding/hex" + "os" + "testing" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/mocks" + "github.com/looplab/eventhorizon/repo" + "github.com/uptrace/bun/driver/pgdriver" +) + +func TestReadRepoIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + // Use Postgres in Docker with fallback to localhost. + addr := os.Getenv("POSTGRES_ADDR") + if addr == "" { + addr = "localhost:5432" + } + + // Get a random DB name. + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + t.Fatal(err) + } + + db := "test_" + hex.EncodeToString(b) + + t.Log("using DB:", db) + + rootURI := "postgres://postgres:password@" + addr + "/postgres?sslmode=disable" + rootDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(rootURI))) + + if _, err := rootDB.Exec("CREATE DATABASE " + db); err != nil { + t.Fatal("could not create test DB:", err) + } + + if err := rootDB.Close(); err != nil { + t.Error("could not close DB:", err) + } + + uri := "postgres://postgres:password@" + addr + "/" + db + "?sslmode=disable" + + r, err := NewRepo(uri) + if err != nil { + t.Error("there should be no error:", err) + } + + if r == nil { + t.Error("there should be a repository") + } + + defer r.Close() + + r.SetEntityFactory(func() eh.Entity { + return &mocks.Model{} + }) + + if r.InnerRepo(context.Background()) != nil { + t.Error("the inner repo should be nil") + } + + repo.AcceptanceTest(t, r, context.Background()) + // extraRepoTests(t, r) +} + +// func extraRepoTests(t *testing.T, r *Repo) { +// ctx := context.Background() + +// // Insert a custom item. +// modelCustom := &mocks.Model{ +// ID: uuid.New(), +// Content: "modelCustom", +// CreatedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), +// } +// if err := r.Save(ctx, modelCustom); err != nil { +// t.Error("there should be no error:", err) +// } + +// // FindCustom by content. +// result, err := r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { +// return c.Find(ctx, bson.M{"content": "modelCustom"}) +// }) +// if len(result) != 1 { +// t.Error("there should be one item:", len(result)) +// } +// if !reflect.DeepEqual(result[0], modelCustom) { +// t.Error("the item should be correct:", modelCustom) +// } + +// // FindCustom with no query. +// result, err = r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { +// return nil, nil +// }) +// var repoErr eh.RepoError +// if !errors.As(err, &repoErr) || !errors.Is(err, ErrInvalidQuery) { +// t.Error("there should be a invalid query error:", err) +// } + +// var count int64 +// // FindCustom with query execution in the callback. +// _, err = r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { +// if count, err = c.CountDocuments(ctx, bson.M{}); err != nil { +// t.Error("there should be no error:", err) +// } + +// // Be sure to return nil to not execute the query again in FindCustom. +// return nil, nil +// }) +// if !errors.As(err, &repoErr) || !errors.Is(err, ErrInvalidQuery) { +// t.Error("there should be a invalid query error:", err) +// } +// if count != 2 { +// t.Error("the count should be correct:", count) +// } + +// modelCustom2 := &mocks.Model{ +// ID: uuid.New(), +// Content: "modelCustom2", +// } +// if err := r.Collection(ctx, func(ctx context.Context, c *mongo.Collection) error { +// _, err := c.InsertOne(ctx, modelCustom2) +// return err +// }); err != nil { +// t.Error("there should be no error:", err) +// } +// model, err := r.Find(ctx, modelCustom2.ID) +// if err != nil { +// t.Error("there should be no error:", err) +// } +// if !reflect.DeepEqual(model, modelCustom2) { +// t.Error("the item should be correct:", model) +// } + +// // FindCustomIter by content. +// iter, err := r.FindCustomIter(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { +// return c.Find(ctx, bson.M{"content": "modelCustom"}) +// }) +// if err != nil { +// t.Error("there should be no error:", err) +// } + +// if iter.Next(ctx) != true { +// t.Error("the iterator should have results") +// } +// if !reflect.DeepEqual(iter.Value(), modelCustom) { +// t.Error("the item should be correct:", modelCustom) +// } +// if iter.Next(ctx) == true { +// t.Error("the iterator should have no results") +// } +// err = iter.Close(ctx) +// if err != nil { +// t.Error("there should be no error:", err) +// } +// } + +func TestIntoRepoIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + if r := IntoRepo(context.Background(), nil); r != nil { + t.Error("the repository should be nil:", r) + } + + other := &mocks.Repo{} + if r := IntoRepo(context.Background(), other); r != nil { + t.Error("the repository should be correct:", r) + } + + // Use Postgres in Docker with fallback to localhost. + addr := os.Getenv("POSTGRES_ADDR") + if addr == "" { + addr = "localhost:5432" + } + + // Get a random DB name. + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + t.Fatal(err) + } + + db := "test_" + hex.EncodeToString(b) + + t.Log("using DB:", db) + + rootURI := "postgres://postgres:password@" + addr + "/postgres?sslmode=disable" + rootDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(rootURI))) + + if _, err := rootDB.Exec("CREATE DATABASE " + db); err != nil { + t.Fatal("could not create test DB:", err) + } + + if err := rootDB.Close(); err != nil { + t.Error("could not close DB:", err) + } + + uri := "postgres://postgres:password@" + addr + "/" + db + "?sslmode=disable" + + inner, err := NewRepo(uri) + if err != nil { + t.Error("there should be no error:", err) + } + + defer inner.Close() + + outer := &mocks.Repo{ParentRepo: inner} + if r := IntoRepo(context.Background(), outer); r != inner { + t.Error("the repository should be correct:", r) + } +}