From 087adde69819102b1ecfd9b81ec5cc8579d754b3 Mon Sep 17 00:00:00 2001 From: Rodrigo Date: Thu, 18 Mar 2021 19:28:44 -0300 Subject: [PATCH] driver connection --- build.sbt | 5 +-- .../scala/org/nullvector/Collections.scala | 19 +++++++----- .../org/nullvector/ReactiveMongoDriver.scala | 31 ++++++++++++++----- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/build.sbt b/build.sbt index 2908ad6..72b0ef8 100644 --- a/build.sbt +++ b/build.sbt @@ -8,14 +8,11 @@ publishArtifact := false publish := {} publishLocal := {} -publishArtifact := false -publish := {} -publishLocal := {} lazy val commonSettings = Seq( name := "akka-reactivemongo-plugin", organization := "null-vector", - version := s"1.4.5", + version := s"1.4.6", scalaVersion := scala213, crossScalaVersions := supportedScalaVersions, scalacOptions := Seq( diff --git a/core/src/main/scala/org/nullvector/Collections.scala b/core/src/main/scala/org/nullvector/Collections.scala index cf44a94..13c4c50 100644 --- a/core/src/main/scala/org/nullvector/Collections.scala +++ b/core/src/main/scala/org/nullvector/Collections.scala @@ -29,11 +29,11 @@ class Collections(databaseProvider: DatabaseProvider, system: ExtendedActorSyste config.getString("akka-persistence-reactivemongo.collection-name-mapping") ).get.getDeclaredConstructor(classOf[Config]).newInstance(config) - def database = currentDatabaseProvider.database + private def database = currentDatabaseProvider.database override def receive: Receive = { - case SetDatabaseProvider(databaseProvider, ack) => - currentDatabaseProvider = databaseProvider + case SetDatabaseProvider(aDatabaseProvider, ack) => + currentDatabaseProvider = aDatabaseProvider ack.success(Done) case GetJournalCollectionNameFor(persistentId, promise) => @@ -51,10 +51,15 @@ class Collections(databaseProvider: DatabaseProvider, system: ExtendedActorSyste promisedDone success Done case GetJournals(response) => - response completeWith (for { - names <- database.collectionNames - collections = names.filter(_.startsWith(journalPrefix)).map(database.collection[BSONCollection](_)) - } yield collections) + val collections = database.collectionNames.map(_.filter(_.startsWith(journalPrefix))).flatMap { names => + Future.traverse(names) { name => + val promisedCollection = Promise[BSONCollection] + promisedCollection completeWith verifiedJournalCollection(name) + promisedCollection.future + } + } + response completeWith collections + } private def verifiedJournalCollection(name: String): Future[BSONCollection] = { diff --git a/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala b/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala index 8334d53..dddd24f 100644 --- a/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala +++ b/core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala @@ -6,6 +6,7 @@ import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.nullvector.ReactiveMongoDriver.QueryType.QueryType import org.nullvector.ReactiveMongoDriver.{DatabaseProvider, QueryType} +import org.slf4j.{Logger, LoggerFactory} import play.api.libs.json.{JsString, Json} import reactivemongo.api.bson.BSONDocument import reactivemongo.api.bson.collection.BSONCollection @@ -33,16 +34,30 @@ object ReactiveMongoDriver extends ExtensionId[ReactiveMongoDriver] with Extensi } class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { + protected val logger: Logger = LoggerFactory.getLogger(getClass) + private val dispatcherName = "akka-persistence-reactivemongo-dispatcher" protected implicit val dispatcher: ExecutionContext = system.dispatchers.lookup(dispatcherName) private implicit val timeout: Timeout = Timeout(5.seconds) private val defaultProvider: DatabaseProvider = new DatabaseProvider { private lazy val db: DB = { val mongoUri = system.settings.config.getString("akka-persistence-reactivemongo.mongo-uri") + logger.info("Connecting to {}", mongoUri) Await.result( MongoConnection.fromString(mongoUri).flatMap { parsedUri => - val databaseName = parsedUri.db.getOrElse(throw new Exception("Missing database name")) - AsyncDriver(system.settings.config).connect(parsedUri).flatMap(_.database(databaseName)) + parsedUri.db match { + case Some(databaseName) => + AsyncDriver(system.settings.config).connect(parsedUri).flatMap(_.database(databaseName)) + .recover { + case throwable: Throwable => + logger.error(throwable.getMessage, throwable) + throw throwable + } + case None => + val exception = new IllegalStateException(s"Missing Database Name in $mongoUri") + logger.error(exception.getMessage, exception) + throw exception + } }, 30.seconds ) @@ -96,11 +111,11 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { } (extractValue("mongodb.explain-all").map(_ => QueryType.All) :: - extractValue("mongodb.explain-recovery").map(_ => QueryType.Recovery) :: - extractValue("mongodb.explain-highest-seq").map(_ => QueryType.HighestSeq) :: - extractValue("mongodb.explain-load-snapshot").map(_ => QueryType.LoadSnapshot) :: - extractValue("mongodb.explain-events-by-tag").map(_ => QueryType.EventsByTag) :: - Nil).flatten + extractValue("mongodb.explain-recovery").map(_ => QueryType.Recovery) :: + extractValue("mongodb.explain-highest-seq").map(_ => QueryType.HighestSeq) :: + extractValue("mongodb.explain-load-snapshot").map(_ => QueryType.LoadSnapshot) :: + extractValue("mongodb.explain-events-by-tag").map(_ => QueryType.EventsByTag) :: + Nil).flatten } def explain(collection: BSONCollection)(queryType: QueryType.QueryType, queryBuilder: collection.QueryBuilder) = { @@ -114,7 +129,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension { (queryType: QueryType.QueryType, stages: List[collection.PipelineOperator], hint: Option[collection.Hint]) = { if (shoudExplain(queryType)) { collection - .aggregatorContext[BSONDocument](stages,explain = true, hint = hint) + .aggregatorContext[BSONDocument](stages, explain = true, hint = hint) .prepared .cursor .collect[List]()