Skip to content

Commit

Permalink
fix(RedisQueue): call ack and limit the number of messages to prevent…
Browse files Browse the repository at this point in the history
… message loss (#138)
  • Loading branch information
kwkwc authored May 29, 2024
1 parent 89ed6fd commit ab2dddb
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion queues/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (q *RedisQueue) handleMessage(ctx context.Context) {
Group: q.Group,
Consumer: q.Consumer,
Streams: []string{q.Stream, ">"},
Count: int64(10),
Count: int64(1),
Block: 0,
NoAck: false,
}).Result()
Expand All @@ -119,6 +119,12 @@ func (q *RedisQueue) handleMessage(ctx context.Context) {
for _, msg := range messages[0].Messages {
bJ := []byte(fmt.Sprintf("%v", msg.Values["job"]))
q.jobC <- bJ
err := q.RDB.XAck(ctx, q.Stream, q.Group, msg.ID).Err()
if err != nil {
slog.Error(fmt.Sprintf("RedisQueue ack error: `%s`", err))
time.Sleep(1 * time.Second)
continue
}
}
}
}
Expand Down

0 comments on commit ab2dddb

Please sign in to comment.