From 6fbfb87d90a05e2870b78bdea004696cee071dab Mon Sep 17 00:00:00 2001 From: Jozef Kralik Date: Wed, 17 Feb 2021 08:33:19 +0000 Subject: [PATCH] eventstore: fix using of hints in mongo --- .../cqrs/eventstore/mongodb/eventstore.go | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go b/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go index c35600bc2..4cd899f19 100644 --- a/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go +++ b/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go @@ -36,8 +36,8 @@ var snapshotsQueryIndex = bson.D{ } var eventsQueryIndex = bson.D{ - {versionKey, 1}, {aggregateIDKey, 1}, + {versionKey, 1}, } type signOperator string @@ -279,21 +279,21 @@ func (s *EventStore) Save(ctx context.Context, collectionID, aggregateID string, return false, errors.New("cannot save events without AggregateId") } + col := s.client.Database(s.DBName()).Collection(getEventCollectionName(collectionID)) if events[0].Version() == 0 { - concurrencyException, err = s.SaveSnapshotQuery(ctx, collectionID, aggregateID, 0) + err = s.ensureIndex(ctx, col, eventsQueryIndex) if err != nil { - return false, fmt.Errorf("cannot save events without snapshot query for version 0: %w", err) - } - if concurrencyException { - return concurrencyException, nil + return false, fmt.Errorf("cannot save events: %w", err) } } - col := s.client.Database(s.DBName()).Collection(getEventCollectionName(collectionID)) if events[0].Version() == 0 { - err = s.ensureIndex(ctx, col, eventsQueryIndex) + concurrencyException, err = s.SaveSnapshotQuery(ctx, collectionID, aggregateID, 0) if err != nil { - return false, fmt.Errorf("cannot save events: %w", err) + return false, fmt.Errorf("cannot save events without snapshot query for version 0: %w", err) + } + if concurrencyException { + return concurrencyException, nil } } @@ -349,8 +349,7 @@ func (i *iterator) Err() error { } func versionQueriesToMgoQuery(queries []eventstore.VersionQuery, op signOperator) (bson.M, error) { - orQueries := make([]bson.M, 0, 32) - + orQueries := make([]bson.D, 0, 32) if len(queries) == 0 { return bson.M{}, fmt.Errorf("empty []eventstore.VersionQuery") } @@ -368,10 +367,10 @@ func versionQueriesToMgoQuery(queries []eventstore.VersionQuery, op signOperator return bson.M{"$or": orQueries}, nil } -func versionQueryToMgoQuery(query eventstore.VersionQuery, op signOperator) bson.M { - return bson.M{ - versionKey: bson.M{string(op): query.Version}, - aggregateIDKey: aggregateID2Hash(query.AggregateID), +func versionQueryToMgoQuery(query eventstore.VersionQuery, op signOperator) bson.D { + return bson.D{ + {Key: aggregateIDKey, Value: aggregateID2Hash(query.AggregateID)}, + {Key: versionKey, Value: bson.M{string(op): query.Version}}, } } @@ -682,26 +681,24 @@ func getSnapshotID(aggregateID string) string { return aggregateID + ".s" } -func snapshotQueriesToMgoQuery(queries []eventstore.SnapshotQuery) (bson.M, *options.FindOptions) { +func snapshotQueriesToMgoQuery(queries []eventstore.SnapshotQuery) (interface{}, *options.FindOptions) { if len(queries) == 0 { opts := options.FindOptions{} opts.SetHint(eventsQueryIndex) - return bson.M{aggregateIDKey: aggregateID2Hash("snapshot"), versionKey: -1}, &opts + return bson.D{{Key: aggregateIDKey, Value: aggregateID2Hash("snapshot")}, {Key: versionKey, Value: -1}}, &opts } if len(queries) == 1 { opts := options.FindOptions{} opts.SetHint(snapshotsQueryIndex) - return bson.M{idKey: getSnapshotID(queries[0].AggregateID)}, &opts + return bson.D{{Key: idKey, Value: getSnapshotID(queries[0].AggregateID)}}, &opts } orQueries := make([]bson.M, 0, 32) for _, q := range queries { - andQueries := make([]bson.M, 0, 4) if q.AggregateID != "" { - andQueries = append(andQueries, bson.M{idKey: getSnapshotID(q.AggregateID)}) + orQueries = append(orQueries, bson.M{idKey: getSnapshotID(q.AggregateID)}) } - orQueries = append(orQueries, bson.M{"$and": andQueries}) } opts := options.FindOptions{} opts.SetHint(snapshotsQueryIndex)