Skip to content

Commit

Permalink
some refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea committed Sep 2, 2021
1 parent 54fc957 commit 625e835
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ If you want different refresh intervals from different query, you can add a `Ref
```scala
val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))
readJournal
.currentEventsByTag("TAG", NoOffset, BSONDocument("events.p.customerId" -> customerId), None)
.currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None)
.mapAsyc(envelope => someEventualWork(envelope))
.run()
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trait CustomReadOps {
* an optional hint index to use with filter
* @return
*/
def currentEventsByTag(
def currentEventsByTags(
tag: Seq[String],
offset: Offset,
eventFilter: BSONDocument,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ trait EventsQueries
eventsByTagQuery(tags, offset, BSONDocument.empty, None)
}

override def currentEventsByTag(
override def currentEventsByTags(
tag: Seq[String],
offset: Offset,
eventFilter: BSONDocument,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class FromMemoryReadJournal(actorSystem: ActorSystem[_]) extends ReactiveMongoSc
* a document filter for events
* @return
*/
override def currentEventsByTag(
override def currentEventsByTags(
tag: Seq[String],
offset: Offset,
eventFilter: BSONDocument,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class ReactiveMongoReadJournalSpec()
)

{
val future = readJournal.currentEventsByTag(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> "5"), None).runWith(Sink.seq)
val future = readJournal.currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> "5"), None).runWith(Sink.seq)
Await.result(future, 1.second).size shouldBe 10
}
}
Expand Down Expand Up @@ -270,9 +270,9 @@ class ReactiveMongoReadJournalSpec()
.run(),
14.seconds
)
Thread.sleep(1500)
Thread.sleep(200)
val offset = ObjectIdOffset.newOffset()
Thread.sleep(1000)
Thread.sleep(200)
Await.ready(
Source(1 to 10)
.mapAsync(amountOfCores) { idx =>
Expand All @@ -292,7 +292,7 @@ class ReactiveMongoReadJournalSpec()
.run(),
14.seconds
)
Thread.sleep(1500)
Thread.sleep(200)
val eventualDone =
readJournal.currentEventsByTag("event_tag_1", offset).runWith(Sink.seq)
val envelopes = Await.result(eventualDone, 14.seconds)
Expand Down Expand Up @@ -400,11 +400,9 @@ class ReactiveMongoReadJournalSpec()
val pId = s"$prefixReadColl-123"
readJournal
.eventsByPersistenceId(pId, 0L, Long.MaxValue)
.addAttributes(RefreshInterval(5.millis))
.addAttributes(RefreshInterval(1.millis))
.runWith(Sink.foreach(e => envelopes.add(e)))
.recover { case e: Throwable =>
e.printStackTrace()
}

Await.ready(
Source(1 to 10)
.mapAsync(amountOfCores) { idx =>
Expand All @@ -423,7 +421,6 @@ class ReactiveMongoReadJournalSpec()
.runWith(Sink.ignore),
14.seconds
)
Thread.sleep(1000)
Await.ready(
Source(11 to 20)
.mapAsync(amountOfCores) { idx =>
Expand All @@ -442,7 +439,7 @@ class ReactiveMongoReadJournalSpec()
.runWith(Sink.ignore),
14.seconds
)
Thread.sleep(1000)
Thread.sleep(100)
envelopes.peek().persistenceId shouldBe pId
envelopes.size shouldBe 20
}
Expand Down

0 comments on commit 625e835

Please sign in to comment.