Filter events by an event attribute
From a regular (terminating) stream
val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))
readJournal
.currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None)
.mapAsync(1)(envelope => someEventualWork(envelope)) // mapAsync requires a parallelism argument; 1 is an assumed value
.run()
From a non-terminating stream
val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))
readJournal
.eventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None, 5.seconds)
.mapAsync(1)(envelope => someEventualWork(envelope)) // mapAsync requires a parallelism argument; 1 is an assumed value
.run()