Skip to content

Commit

Permalink
Using hint for recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea committed Mar 19, 2021
1 parent 087adde commit c900e1c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Add in your `build.sbt` the following lines:
resolvers += "null-vector" at "https://nullvector.jfrog.io/artifactory/releases"
```
```scala
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.5"
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.7"
```

## Configuration
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ publishLocal := {}
lazy val commonSettings = Seq(
name := "akka-reactivemongo-plugin",
organization := "null-vector",
version := s"1.4.6",
version := s"1.4.7",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
}

def explain(collection: BSONCollection)(queryType: QueryType.QueryType, queryBuilder: collection.QueryBuilder) = {
if (shoudExplain(queryType)) {
if (shouldExplain(queryType)) {
queryBuilder.explain().cursor().collect[List]()
.map(docs => Try(Json.parse(BsonTextNormalizer(docs.head))).foreach(println))
}
}

def explainAgg(collection: BSONCollection)
(queryType: QueryType.QueryType, stages: List[collection.PipelineOperator], hint: Option[collection.Hint]) = {
if (shoudExplain(queryType)) {
if (shouldExplain(queryType)) {
collection
.aggregatorContext[BSONDocument](stages, explain = true, hint = hint)
.prepared
Expand All @@ -138,7 +138,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
}


private def shoudExplain(queryType: QueryType) = {
private def shouldExplain(queryType: QueryType) = {
explainOptions.exists(shouldType => shouldType == QueryType.All || shouldType == queryType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,16 @@ trait ReactiveMongoAsyncReplay {
Fields.to_sn -> BSONDocument("$gte" -> fromSequenceNr),
Fields.from_sn -> BSONDocument("$lte" -> toSequenceNr),
)
val queryBuilder = collection.find(query)
val queryBuilder = collection
.find(query)
.hint(collection.hint(BSONDocument(
Fields.persistenceId -> 1,
Fields.to_sn -> 1,
Fields.from_sn -> 1,
)))

rxDriver.explain(collection)(QueryType.Recovery, queryBuilder)

queryBuilder
.cursor[BSONDocument]()
.documentSource(if (max >= Int.MaxValue) Int.MaxValue else max.intValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender
actorRef ! Command("get_state") //Will recover Nothing
expectMsg(13.seconds, None)

actorRef ! Command("Command1")
actorRef ! Command("Command2")
actorRef ! Command("Command3")
actorRef ! Command("Command4")
actorRef ! Command("Command5")
actorRef ! Command("Command6")
actorRef ! Command("Command7")
receiveN(7, 15.seconds)
actorRef ! Command("A")
actorRef ! CommandAll(Seq("B","C","D"))
actorRef ! Command("E")
actorRef ! CommandAll(Seq("F","G","H"))
actorRef ! Command("I")
receiveN(4, 15.seconds)

actorRef ! Kill

Thread.sleep(1000)

actorRef ! Command("get_state")
expectMsg(Some("Command7"))
expectMsg(Some("I"))
}

"PersistAll" in {
Expand Down Expand Up @@ -116,6 +118,8 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender

case class Command(action: Any)

case class CommandAll(actions: Seq[Any])

case class MultiCommand(action1: String, action2: String, action3: String)

case class AnEvent(string: String)
Expand All @@ -134,13 +138,20 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender
sender() ! state

case Command(action) =>

persist(AnEvent(action.toString)) { event =>
state = Some(event.string)
sender() ! Done
if (lastSequenceNr % 13 == 0) saveSnapshot(AnEvent(action.toString))
}

case CommandAll(actions) =>
persistAll(actions.map(a => AnEvent(a.toString))) { event =>
}
defer(actions){ _ =>
sender() ! Done
state = Some(actions.last.toString)
}

case MultiCommand(action1, action2, action3) =>
persistAll(Seq(AnEvent(action1), AnEvent(action2), AnEvent(action3))) { _ => }
deferAsync(()) { _ =>
Expand Down

0 comments on commit c900e1c

Please sign in to comment.