Skip to content

Commit

Permalink
Important fix using indices in events by tag.
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea committed Jul 21, 2021
1 parent 529ad05 commit 46c36ee
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ publishLocal := {}
lazy val commonSettings = Seq(
name := "akka-reactivemongo-plugin",
organization := "null-vector",
version := s"1.5.1",
version := s"1.5.2",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/scala/org/nullvector/Collections.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging {
verifiedNames.clear()
promisedDone success Done

case GetJournals(response) =>
val collections = database.flatMap(_.collectionNames.map(_.filter(_.startsWith(journalPrefix))).flatMap { names =>
case GetJournals(response, collectionNames) =>
val collections = database.flatMap(_
.collectionNames.map { allNames =>
val journalNames = allNames.filter(_.startsWith(journalPrefix))
collectionNames match {
case Nil => journalNames
case _ => journalNames.filter(name => collectionNames.exists(colName => name.endsWith(colName)))
}
}.flatMap { names =>
Future.traverse(names) { name =>
val promisedCollection = Promise[BSONCollection]
promisedCollection completeWith verifiedJournalCollection(name)
Expand All @@ -67,7 +74,7 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging {

case CheckHealth(ack) =>
val collections = Promise[List[BSONCollection]]
context.self ! GetJournals(collections)
context.self ! GetJournals(collections, Nil)
val eventualDone = collections.future.map(_.headOption).flatMap {
case Some(collection) => collection.find(BSONDocument.empty).one.map(_ => Done)
case None => Future.successful(Done)
Expand Down Expand Up @@ -162,10 +169,16 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging {
}

private def ensureTagIndex(indexesManager: CollectionIndexesManager): Future[Unit] = {
ensureIndex(index(Seq(
val tagsById = ensureIndex(index(Seq(
"_id" -> IndexType.Ascending,
Fields.tags -> IndexType.Ascending,
), Some("_tags"), unique = true, sparse = true), indexesManager)
), Some("tags_by_id"), unique = true, sparse = true), indexesManager)

val allTags = ensureIndex(index(Seq(
Fields.tags -> IndexType.Ascending,
), Some("tags"), sparse = true), indexesManager)

tagsById flatMap (_ => allTags)
}

private def ensureIndex(index: Aux[BSONSerializationPack.type], indexesManager: CollectionIndexesManager): Future[Unit] = {
Expand Down Expand Up @@ -209,7 +222,7 @@ object Collections {

case class GetSnapshotCollectionNameFor(persistentId: String, response: Promise[BSONCollection]) extends Command

case class GetJournals(response: Promise[List[BSONCollection]]) extends Command
case class GetJournals(response: Promise[List[BSONCollection]], collectionNames: List[String]) extends Command

case class SetDatabaseProvider(databaseProvider: DatabaseProvider, ack: Promise[Done]) extends Command

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
promise.future
}

def journals(): Future[List[BSONCollection]] = {

def journals(collectionNames: List[String] = Nil): Future[List[BSONCollection]] = {
val promise = Promise[List[BSONCollection]]()
collections ! GetJournals(promise)
collections ! GetJournals(promise, collectionNames)
promise.future
}

Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/org/nullvector/query/EventsQueries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ trait EventsQueries
}

def eventsByTagQuery(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = {
Source.future(rxDriver.journals())
Source.future(rxDriver.journals(collectionNames))
.withAttributes(ActorAttributes.dispatcher(ReactiveMongoPlugin.pluginDispatcherName))
.mapConcat(identity)
.splitWhen(_ => true)
Expand All @@ -127,12 +127,17 @@ trait EventsQueries

import collection.AggregationFramework._

val filterByOffsetExp = filterByOffset(offset)
val stages: List[PipelineOperator] = List(
Match(query(Fields.tags) ++ filterByOffset(offset)),
Match(query(Fields.tags) ++ filterByOffsetExp),
UnwindField(Fields.events),
Match(query(s"${Fields.events}.${Fields.tags}")),
)
val hint = Some(collection.hint(BSONDocument("_id" -> 1, Fields.tags -> 1)))
val hint = filterByOffsetExp match {
case BSONDocument.empty => Some(collection.hint(BSONDocument(Fields.tags -> 1)))
case _ => Some(collection.hint(BSONDocument("_id" -> 1, Fields.tags -> 1)))
}

rxDriver.explainAgg(collection)(QueryType.EventsByTag, stages, hint)

def aggregate(implicit producer: CursorProducer[BSONDocument]): producer.ProducedCursor = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ class ReactiveMongoJournalProvider(system: ExtendedActorSystem) extends ReadJour

import akka.actor.typed.scaladsl.adapter._

override val scaladslReadJournal: ReactiveMongoScalaReadJournal = UnderlyingPersistenceFactory(
new ReactiveMongoScalaReadJournalImpl(system),
new FromMemoryReadJournal(system.toTyped)
)(system)
override val scaladslReadJournal: ReactiveMongoScalaReadJournal = createUnderlyingFactory(Nil)

def readJournalFor(collectionNames: List[String]) = createUnderlyingFactory(collectionNames)

private def createUnderlyingFactory(names: List[String]) = {
UnderlyingPersistenceFactory(
new ReactiveMongoScalaReadJournalImpl(system, names),
new FromMemoryReadJournal(system.toTyped)
)(system)
}


override val javadslReadJournal: ReactiveMongoJavaReadJournal = new ReactiveMongoJavaReadJournal(scaladslReadJournal)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class ReactiveMongoScalaReadJournalImpl(system: ExtendedActorSystem)
class ReactiveMongoScalaReadJournalImpl(system: ExtendedActorSystem, protected val collectionNames: List[String])
extends akka.persistence.query.scaladsl.ReadJournal
with EventsQueries
with PersistenceIdsQueries with ReactiveMongoScalaReadJournal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ReactiveMongoReadJournalSpec() extends FlatSpec with TestKitBase with Impl
val reactiveMongoJournalImpl: ReactiveMongoJournalImpl = new ReactiveMongoJournalImpl(ConfigFactory.load(), system)

implicit val materializer: Materializer = Materializer.matFromSystem(system)
val readJournal: ReactiveMongoScalaReadJournal = ReactiveMongoJournalProvider(system).scaladslReadJournal
val readJournal: ReactiveMongoScalaReadJournal = ReactiveMongoJournalProvider(system).readJournalFor(Nil)
private val serializer = ReactiveMongoEventSerializer(system.toTyped)
serializer.addAdapter(new SomeEventAdapter())

Expand Down

0 comments on commit 46c36ee

Please sign in to comment.