diff --git a/build.sbt b/build.sbt index 2cbe7b2..8ac4a44 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ publishLocal := {} lazy val commonSettings = Seq( name := "akka-reactivemongo-plugin", organization := "null-vector", - version := s"1.5.1", + version := s"1.5.2", scalaVersion := scala213, crossScalaVersions := supportedScalaVersions, scalacOptions := Seq( diff --git a/core/src/main/scala/org/nullvector/Collections.scala b/core/src/main/scala/org/nullvector/Collections.scala index e4864b8..3d7e326 100644 --- a/core/src/main/scala/org/nullvector/Collections.scala +++ b/core/src/main/scala/org/nullvector/Collections.scala @@ -55,8 +55,15 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging { verifiedNames.clear() promisedDone success Done - case GetJournals(response) => - val collections = database.flatMap(_.collectionNames.map(_.filter(_.startsWith(journalPrefix))).flatMap { names => + case GetJournals(response, collectionNames) => + val collections = database.flatMap(_ + .collectionNames.map { allNames => + val journalNames = allNames.filter(_.startsWith(journalPrefix)) + collectionNames match { + case Nil => journalNames + case _ => journalNames.filter(name => collectionNames.exists(colName => name.endsWith(colName))) + } + }.flatMap { names => Future.traverse(names) { name => val promisedCollection = Promise[BSONCollection] promisedCollection completeWith verifiedJournalCollection(name) @@ -67,7 +74,7 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging { case CheckHealth(ack) => val collections = Promise[List[BSONCollection]] - context.self ! GetJournals(collections) + context.self ! GetJournals(collections, Nil) val eventualDone = collections.future.map(_.headOption).flatMap { case Some(collection) => collection.find(BSONDocument.empty).one.map(_ => Done) case None => Future.successful(Done) @@ -162,10 +169,16 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging { } private def ensureTagIndex(indexesManager: CollectionIndexesManager): Future[Unit] = { - ensureIndex(index(Seq( + val tagsById = ensureIndex(index(Seq( "_id" -> IndexType.Ascending, Fields.tags -> IndexType.Ascending, - ), Some("_tags"), unique = true, sparse = true), indexesManager) + ), Some("tags_by_id"), unique = true, sparse = true), indexesManager) + + val allTags = ensureIndex(index(Seq( + Fields.tags -> IndexType.Ascending, + ), Some("tags"), sparse = true), indexesManager) + + tagsById flatMap (_ => allTags) } private def ensureIndex(index: Aux[BSONSerializationPack.type], indexesManager: CollectionIndexesManager): Future[Unit] = { @@ -209,7 +222,7 @@ object Collections { case class GetSnapshotCollectionNameFor(persistentId: String, response: Promise[BSONCollection]) extends Command - case class GetJournals(response: Promise[List[BSONCollection]]) extends Command + case class GetJournals(response: Promise[List[BSONCollection]], collectionNames: List[String]) extends Command case class SetDatabaseProvider(databaseProvider: DatabaseProvider, ack: Promise[Done]) extends Command diff --git a/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala b/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala index ebe5d3a..8339a27 100644 --- a/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala +++ b/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala @@ -58,9 +58,10 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { promise.future } - def journals(): Future[List[BSONCollection]] = { + + def journals(collectionNames: List[String] = Nil): Future[List[BSONCollection]] = { val promise = Promise[List[BSONCollection]]() - collections ! GetJournals(promise) + collections ! GetJournals(promise, collectionNames) promise.future } diff --git a/core/src/main/scala/org/nullvector/query/EventsQueries.scala b/core/src/main/scala/org/nullvector/query/EventsQueries.scala index 66bd9c7..6e238c5 100644 --- a/core/src/main/scala/org/nullvector/query/EventsQueries.scala +++ b/core/src/main/scala/org/nullvector/query/EventsQueries.scala @@ -108,7 +108,7 @@ trait EventsQueries } def eventsByTagQuery(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = { - Source.future(rxDriver.journals()) + Source.future(rxDriver.journals(collectionNames)) .withAttributes(ActorAttributes.dispatcher(ReactiveMongoPlugin.pluginDispatcherName)) .mapConcat(identity) .splitWhen(_ => true) @@ -127,12 +127,17 @@ trait EventsQueries import collection.AggregationFramework._ + val filterByOffsetExp = filterByOffset(offset) val stages: List[PipelineOperator] = List( - Match(query(Fields.tags) ++ filterByOffset(offset)), + Match(query(Fields.tags) ++ filterByOffsetExp), UnwindField(Fields.events), Match(query(s"${Fields.events}.${Fields.tags}")), ) - val hint = Some(collection.hint(BSONDocument("_id" -> 1, Fields.tags -> 1))) + val hint = filterByOffsetExp match { + case BSONDocument.empty => Some(collection.hint(BSONDocument(Fields.tags -> 1))) + case _ => Some(collection.hint(BSONDocument("_id" -> 1, Fields.tags -> 1))) + } + rxDriver.explainAgg(collection)(QueryType.EventsByTag, stages, hint) def aggregate(implicit producer: CursorProducer[BSONDocument]): producer.ProducedCursor = { diff --git a/core/src/main/scala/org/nullvector/query/ReactiveMongoJournalProvider.scala b/core/src/main/scala/org/nullvector/query/ReactiveMongoJournalProvider.scala index e2c58bf..53f8503 100644 --- a/core/src/main/scala/org/nullvector/query/ReactiveMongoJournalProvider.scala +++ b/core/src/main/scala/org/nullvector/query/ReactiveMongoJournalProvider.scala @@ -16,10 +16,17 @@ class ReactiveMongoJournalProvider(system: ExtendedActorSystem) extends ReadJour import akka.actor.typed.scaladsl.adapter._ - override val scaladslReadJournal: ReactiveMongoScalaReadJournal = UnderlyingPersistenceFactory( - new ReactiveMongoScalaReadJournalImpl(system), - new FromMemoryReadJournal(system.toTyped) - )(system) + override val scaladslReadJournal: ReactiveMongoScalaReadJournal = createUnderlyingFactory(Nil) + + def readJournalFor(collectionNames: List[String]) = createUnderlyingFactory(collectionNames) + + private def createUnderlyingFactory(names: List[String]) = { + UnderlyingPersistenceFactory( + new ReactiveMongoScalaReadJournalImpl(system, names), + new FromMemoryReadJournal(system.toTyped) + )(system) + } + override val javadslReadJournal: ReactiveMongoJavaReadJournal = new ReactiveMongoJavaReadJournal(scaladslReadJournal) } diff --git a/core/src/main/scala/org/nullvector/query/ReactiveMongoScalaReadJournalImpl.scala b/core/src/main/scala/org/nullvector/query/ReactiveMongoScalaReadJournalImpl.scala index b83c520..b3d6cd3 100644 --- a/core/src/main/scala/org/nullvector/query/ReactiveMongoScalaReadJournalImpl.scala +++ b/core/src/main/scala/org/nullvector/query/ReactiveMongoScalaReadJournalImpl.scala @@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class ReactiveMongoScalaReadJournalImpl(system: ExtendedActorSystem) +class ReactiveMongoScalaReadJournalImpl(system: ExtendedActorSystem, protected val collectionNames: List[String]) extends akka.persistence.query.scaladsl.ReadJournal with EventsQueries with PersistenceIdsQueries with ReactiveMongoScalaReadJournal { diff --git a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala index d7dfeea..48e08a0 100644 --- a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala +++ b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala @@ -37,7 +37,7 @@ class ReactiveMongoReadJournalSpec() extends FlatSpec with TestKitBase with Impl val reactiveMongoJournalImpl: ReactiveMongoJournalImpl = new ReactiveMongoJournalImpl(ConfigFactory.load(), system) implicit val materializer: Materializer = Materializer.matFromSystem(system) - val readJournal: ReactiveMongoScalaReadJournal = ReactiveMongoJournalProvider(system).scaladslReadJournal + val readJournal: ReactiveMongoScalaReadJournal = ReactiveMongoJournalProvider(system).readJournalFor(Nil) private val serializer = ReactiveMongoEventSerializer(system.toTyped) serializer.addAdapter(new SomeEventAdapter())