From c900e1c5267960f06058f5a7544e722b4865ceb1 Mon Sep 17 00:00:00 2001 From: Rodrigo Date: Fri, 19 Mar 2021 16:53:39 -0300 Subject: [PATCH] Using hint for recovery --- README.md | 2 +- build.sbt | 2 +- .../org/nullvector/ReactiveMongoDriver.scala | 6 ++-- .../journal/ReactiveMongoAsyncReplay.scala | 10 +++++- .../journal/PersistentActorSpec.scala | 31 +++++++++++++------ 5 files changed, 35 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 95fcab4..9e0249b 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Add in your `build.sbt` the following lines: resolvers += "null-vector" at "https://nullvector.jfrog.io/artifactory/releases" ``` ```scala -libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.5" +libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.7" ``` ## Configuration diff --git a/build.sbt b/build.sbt index 72b0ef8..3b16208 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ publishLocal := {} lazy val commonSettings = Seq( name := "akka-reactivemongo-plugin", organization := "null-vector", - version := s"1.4.6", + version := s"1.4.7", scalaVersion := scala213, crossScalaVersions := supportedScalaVersions, scalacOptions := Seq( diff --git a/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala b/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala index dddd24f..1f1872f 100644 --- a/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala +++ b/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala @@ -119,7 +119,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { } def explain(collection: BSONCollection)(queryType: QueryType.QueryType, queryBuilder: collection.QueryBuilder) = { - if (shoudExplain(queryType)) { + if (shouldExplain(queryType)) { queryBuilder.explain().cursor().collect[List]() .map(docs => Try(Json.parse(BsonTextNormalizer(docs.head))).foreach(println)) } @@ -127,7 +127,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { def explainAgg(collection: BSONCollection) (queryType: QueryType.QueryType, stages: List[collection.PipelineOperator], hint: Option[collection.Hint]) = { - if (shoudExplain(queryType)) { + if (shouldExplain(queryType)) { collection .aggregatorContext[BSONDocument](stages, explain = true, hint = hint) .prepared @@ -138,7 +138,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { } - private def shoudExplain(queryType: QueryType) = { + private def shouldExplain(queryType: QueryType) = { explainOptions.exists(shouldType => shouldType == QueryType.All || shouldType == queryType) } } \ No newline at end of file diff --git a/core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncReplay.scala b/core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncReplay.scala index 4aec3ec..e08db45 100644 --- a/core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncReplay.scala +++ b/core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncReplay.scala @@ -23,8 +23,16 @@ trait ReactiveMongoAsyncReplay { Fields.to_sn -> BSONDocument("$gte" -> fromSequenceNr), Fields.from_sn -> BSONDocument("$lte" -> toSequenceNr), ) - val queryBuilder = collection.find(query) + val queryBuilder = collection + .find(query) + .hint(collection.hint(BSONDocument( + Fields.persistenceId -> 1, + Fields.to_sn -> 1, + Fields.from_sn -> 1, + ))) + rxDriver.explain(collection)(QueryType.Recovery, queryBuilder) + queryBuilder .cursor[BSONDocument]() .documentSource(if (max >= Int.MaxValue) Int.MaxValue else max.intValue()) diff --git a/core/src/test/scala/org/nullvector/journal/PersistentActorSpec.scala b/core/src/test/scala/org/nullvector/journal/PersistentActorSpec.scala index e65a66e..33031af 100644 --- a/core/src/test/scala/org/nullvector/journal/PersistentActorSpec.scala +++ b/core/src/test/scala/org/nullvector/journal/PersistentActorSpec.scala @@ -40,17 +40,19 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender actorRef ! Command("get_state") //Will recover Nothing expectMsg(13.seconds, None) - actorRef ! Command("Command1") - actorRef ! Command("Command2") - actorRef ! Command("Command3") - actorRef ! Command("Command4") - actorRef ! Command("Command5") - actorRef ! Command("Command6") - actorRef ! Command("Command7") - receiveN(7, 15.seconds) + actorRef ! Command("A") + actorRef ! CommandAll(Seq("B","C","D")) + actorRef ! Command("E") + actorRef ! CommandAll(Seq("F","G","H")) + actorRef ! Command("I") + receiveN(4, 15.seconds) + + actorRef ! Kill + + Thread.sleep(1000) actorRef ! Command("get_state") - expectMsg(Some("Command7")) + expectMsg(Some("I")) } "PersistAll" in { @@ -116,6 +118,8 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender case class Command(action: Any) + case class CommandAll(actions: Seq[Any]) + case class MultiCommand(action1: String, action2: String, action3: String) case class AnEvent(string: String) @@ -134,13 +138,20 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender sender() ! state case Command(action) => - persist(AnEvent(action.toString)) { event => state = Some(event.string) sender() ! Done if (lastSequenceNr % 13 == 0) saveSnapshot(AnEvent(action.toString)) } + case CommandAll(actions) => + persistAll(actions.map(a => AnEvent(a.toString))) { event => + } + defer(actions){ _ => + sender() ! Done + state = Some(actions.last.toString) + } + case MultiCommand(action1, action2, action3) => persistAll(Seq(AnEvent(action1), AnEvent(action2), AnEvent(action3))) { _ => } deferAsync(()) { _ =>