Skip to content

Commit

Permalink
eventstore: fix using of hints in mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Feb 22, 2021
1 parent a604a86 commit 6fbfb87
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions resource-aggregate/cqrs/eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ var snapshotsQueryIndex = bson.D{
}

var eventsQueryIndex = bson.D{
{versionKey, 1},
{aggregateIDKey, 1},
{versionKey, 1},
}

type signOperator string
Expand Down Expand Up @@ -279,21 +279,21 @@ func (s *EventStore) Save(ctx context.Context, collectionID, aggregateID string,
return false, errors.New("cannot save events without AggregateId")
}

col := s.client.Database(s.DBName()).Collection(getEventCollectionName(collectionID))
if events[0].Version() == 0 {
concurrencyException, err = s.SaveSnapshotQuery(ctx, collectionID, aggregateID, 0)
err = s.ensureIndex(ctx, col, eventsQueryIndex)
if err != nil {
return false, fmt.Errorf("cannot save events without snapshot query for version 0: %w", err)
}
if concurrencyException {
return concurrencyException, nil
return false, fmt.Errorf("cannot save events: %w", err)
}
}

col := s.client.Database(s.DBName()).Collection(getEventCollectionName(collectionID))
if events[0].Version() == 0 {
err = s.ensureIndex(ctx, col, eventsQueryIndex)
concurrencyException, err = s.SaveSnapshotQuery(ctx, collectionID, aggregateID, 0)
if err != nil {
return false, fmt.Errorf("cannot save events: %w", err)
return false, fmt.Errorf("cannot save events without snapshot query for version 0: %w", err)
}
if concurrencyException {
return concurrencyException, nil
}
}

Expand Down Expand Up @@ -349,8 +349,7 @@ func (i *iterator) Err() error {
}

func versionQueriesToMgoQuery(queries []eventstore.VersionQuery, op signOperator) (bson.M, error) {
orQueries := make([]bson.M, 0, 32)

orQueries := make([]bson.D, 0, 32)
if len(queries) == 0 {
return bson.M{}, fmt.Errorf("empty []eventstore.VersionQuery")
}
Expand All @@ -368,10 +367,10 @@ func versionQueriesToMgoQuery(queries []eventstore.VersionQuery, op signOperator
return bson.M{"$or": orQueries}, nil
}

func versionQueryToMgoQuery(query eventstore.VersionQuery, op signOperator) bson.M {
return bson.M{
versionKey: bson.M{string(op): query.Version},
aggregateIDKey: aggregateID2Hash(query.AggregateID),
func versionQueryToMgoQuery(query eventstore.VersionQuery, op signOperator) bson.D {
return bson.D{
{Key: aggregateIDKey, Value: aggregateID2Hash(query.AggregateID)},
{Key: versionKey, Value: bson.M{string(op): query.Version}},
}
}

Expand Down Expand Up @@ -682,26 +681,24 @@ func getSnapshotID(aggregateID string) string {
return aggregateID + ".s"
}

func snapshotQueriesToMgoQuery(queries []eventstore.SnapshotQuery) (bson.M, *options.FindOptions) {
func snapshotQueriesToMgoQuery(queries []eventstore.SnapshotQuery) (interface{}, *options.FindOptions) {
if len(queries) == 0 {
opts := options.FindOptions{}
opts.SetHint(eventsQueryIndex)
return bson.M{aggregateIDKey: aggregateID2Hash("snapshot"), versionKey: -1}, &opts
return bson.D{{Key: aggregateIDKey, Value: aggregateID2Hash("snapshot")}, {Key: versionKey, Value: -1}}, &opts
}

if len(queries) == 1 {
opts := options.FindOptions{}
opts.SetHint(snapshotsQueryIndex)
return bson.M{idKey: getSnapshotID(queries[0].AggregateID)}, &opts
return bson.D{{Key: idKey, Value: getSnapshotID(queries[0].AggregateID)}}, &opts
}

orQueries := make([]bson.M, 0, 32)
for _, q := range queries {
andQueries := make([]bson.M, 0, 4)
if q.AggregateID != "" {
andQueries = append(andQueries, bson.M{idKey: getSnapshotID(q.AggregateID)})
orQueries = append(orQueries, bson.M{idKey: getSnapshotID(q.AggregateID)})
}
orQueries = append(orQueries, bson.M{"$and": andQueries})
}
opts := options.FindOptions{}
opts.SetHint(snapshotsQueryIndex)
Expand Down

0 comments on commit 6fbfb87

Please sign in to comment.