From 625e8352ef18d0dd2ffbe4918b1b5618ff3f780d Mon Sep 17 00:00:00 2001 From: Rodrigo Date: Thu, 2 Sep 2021 17:03:32 -0300 Subject: [PATCH] some refactors --- README.md | 2 +- .../org/nullvector/query/CustomReadOps.scala | 2 +- .../org/nullvector/query/EventsQueries.scala | 2 +- .../query/FromMemoryReadJournal.scala | 2 +- .../queries/ReactiveMongoReadJournalSpec.scala | 17 +++++++---------- 5 files changed, 11 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 74a69f3..f9c608a 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,7 @@ If you want different refresh intervals from different query, you can add a `Ref ```scala val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders")) readJournal - .currentEventsByTag("TAG", NoOffset, BSONDocument("events.p.customerId" -> customerId), None) + .currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None) .mapAsyc(envelope => someEventualWork(envelope)) .run() ``` diff --git a/core/src/main/scala/org/nullvector/query/CustomReadOps.scala b/core/src/main/scala/org/nullvector/query/CustomReadOps.scala index 89f563c..c85168e 100644 --- a/core/src/main/scala/org/nullvector/query/CustomReadOps.scala +++ b/core/src/main/scala/org/nullvector/query/CustomReadOps.scala @@ -21,7 +21,7 @@ trait CustomReadOps { * an optional hint index to use with filter * @return */ - def currentEventsByTag( + def currentEventsByTags( tag: Seq[String], offset: Offset, eventFilter: BSONDocument, diff --git a/core/src/main/scala/org/nullvector/query/EventsQueries.scala b/core/src/main/scala/org/nullvector/query/EventsQueries.scala index a05e46c..7d66744 100644 --- a/core/src/main/scala/org/nullvector/query/EventsQueries.scala +++ b/core/src/main/scala/org/nullvector/query/EventsQueries.scala @@ -123,7 +123,7 @@ trait EventsQueries eventsByTagQuery(tags, offset, BSONDocument.empty, None) } - override def currentEventsByTag( + override def currentEventsByTags( tag: Seq[String], offset: Offset, eventFilter: BSONDocument, diff --git a/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala b/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala index 7e5b2ba..dbdf9de 100644 --- a/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala +++ b/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala @@ -158,7 +158,7 @@ class FromMemoryReadJournal(actorSystem: ActorSystem[_]) extends ReactiveMongoSc * a document filter for events * @return */ - override def currentEventsByTag( + override def currentEventsByTags( tag: Seq[String], offset: Offset, eventFilter: BSONDocument, diff --git a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala index 52e9f48..b448de1 100644 --- a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala +++ b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala @@ -136,7 +136,7 @@ class ReactiveMongoReadJournalSpec() ) { - val future = readJournal.currentEventsByTag(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> "5"), None).runWith(Sink.seq) + val future = readJournal.currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> "5"), None).runWith(Sink.seq) Await.result(future, 1.second).size shouldBe 10 } } @@ -270,9 +270,9 @@ class ReactiveMongoReadJournalSpec() .run(), 14.seconds ) - Thread.sleep(1500) + Thread.sleep(200) val offset = ObjectIdOffset.newOffset() - Thread.sleep(1000) + Thread.sleep(200) Await.ready( Source(1 to 10) .mapAsync(amountOfCores) { idx => @@ -292,7 +292,7 @@ class ReactiveMongoReadJournalSpec() .run(), 14.seconds ) - Thread.sleep(1500) + Thread.sleep(200) val eventualDone = readJournal.currentEventsByTag("event_tag_1", offset).runWith(Sink.seq) val envelopes = Await.result(eventualDone, 14.seconds) @@ -400,11 +400,9 @@ class ReactiveMongoReadJournalSpec() val pId = s"$prefixReadColl-123" readJournal .eventsByPersistenceId(pId, 0L, Long.MaxValue) - .addAttributes(RefreshInterval(5.millis)) + .addAttributes(RefreshInterval(1.millis)) .runWith(Sink.foreach(e => envelopes.add(e))) - .recover { case e: Throwable => - e.printStackTrace() - } + Await.ready( Source(1 to 10) .mapAsync(amountOfCores) { idx => @@ -423,7 +421,6 @@ class ReactiveMongoReadJournalSpec() .runWith(Sink.ignore), 14.seconds ) - Thread.sleep(1000) Await.ready( Source(11 to 20) .mapAsync(amountOfCores) { idx => @@ -442,7 +439,7 @@ class ReactiveMongoReadJournalSpec() .runWith(Sink.ignore), 14.seconds ) - Thread.sleep(1000) + Thread.sleep(100) envelopes.peek().persistenceId shouldBe pId envelopes.size shouldBe 20 }