Skip to content

Commit

Permalink
driver connection
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea committed Mar 18, 2021
1 parent 79ef367 commit 087adde
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
5 changes: 1 addition & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ publishArtifact := false
publish := {}
publishLocal := {}

publishArtifact := false
publish := {}
publishLocal := {}

lazy val commonSettings = Seq(
name := "akka-reactivemongo-plugin",
organization := "null-vector",
version := s"1.4.5",
version := s"1.4.6",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
Expand Down
19 changes: 12 additions & 7 deletions core/src/main/scala/org/nullvector/Collections.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ class Collections(databaseProvider: DatabaseProvider, system: ExtendedActorSyste
config.getString("akka-persistence-reactivemongo.collection-name-mapping")
).get.getDeclaredConstructor(classOf[Config]).newInstance(config)

def database = currentDatabaseProvider.database
private def database = currentDatabaseProvider.database

override def receive: Receive = {
case SetDatabaseProvider(databaseProvider, ack) =>
currentDatabaseProvider = databaseProvider
case SetDatabaseProvider(aDatabaseProvider, ack) =>
currentDatabaseProvider = aDatabaseProvider
ack.success(Done)

case GetJournalCollectionNameFor(persistentId, promise) =>
Expand All @@ -51,10 +51,15 @@ class Collections(databaseProvider: DatabaseProvider, system: ExtendedActorSyste
promisedDone success Done

case GetJournals(response) =>
response completeWith (for {
names <- database.collectionNames
collections = names.filter(_.startsWith(journalPrefix)).map(database.collection[BSONCollection](_))
} yield collections)
val collections = database.collectionNames.map(_.filter(_.startsWith(journalPrefix))).flatMap { names =>
Future.traverse(names) { name =>
val promisedCollection = Promise[BSONCollection]
promisedCollection completeWith verifiedJournalCollection(name)
promisedCollection.future
}
}
response completeWith collections

}

private def verifiedJournalCollection(name: String): Future[BSONCollection] = {
Expand Down
31 changes: 23 additions & 8 deletions core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.nullvector.ReactiveMongoDriver.QueryType.QueryType
import org.nullvector.ReactiveMongoDriver.{DatabaseProvider, QueryType}
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.{JsString, Json}
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
Expand Down Expand Up @@ -33,16 +34,30 @@ object ReactiveMongoDriver extends ExtensionId[ReactiveMongoDriver] with Extensi
}

class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
protected val logger: Logger = LoggerFactory.getLogger(getClass)

private val dispatcherName = "akka-persistence-reactivemongo-dispatcher"
protected implicit val dispatcher: ExecutionContext = system.dispatchers.lookup(dispatcherName)
private implicit val timeout: Timeout = Timeout(5.seconds)
private val defaultProvider: DatabaseProvider = new DatabaseProvider {
private lazy val db: DB = {
val mongoUri = system.settings.config.getString("akka-persistence-reactivemongo.mongo-uri")
logger.info("Connecting to {}", mongoUri)
Await.result(
MongoConnection.fromString(mongoUri).flatMap { parsedUri =>
val databaseName = parsedUri.db.getOrElse(throw new Exception("Missing database name"))
AsyncDriver(system.settings.config).connect(parsedUri).flatMap(_.database(databaseName))
parsedUri.db match {
case Some(databaseName) =>
AsyncDriver(system.settings.config).connect(parsedUri).flatMap(_.database(databaseName))
.recover {
case throwable: Throwable =>
logger.error(throwable.getMessage, throwable)
throw throwable
}
case None =>
val exception = new IllegalStateException(s"Missing Database Name in $mongoUri")
logger.error(exception.getMessage, exception)
throw exception
}
},
30.seconds
)
Expand Down Expand Up @@ -96,11 +111,11 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
}

(extractValue("mongodb.explain-all").map(_ => QueryType.All) ::
extractValue("mongodb.explain-recovery").map(_ => QueryType.Recovery) ::
extractValue("mongodb.explain-highest-seq").map(_ => QueryType.HighestSeq) ::
extractValue("mongodb.explain-load-snapshot").map(_ => QueryType.LoadSnapshot) ::
extractValue("mongodb.explain-events-by-tag").map(_ => QueryType.EventsByTag) ::
Nil).flatten
extractValue("mongodb.explain-recovery").map(_ => QueryType.Recovery) ::
extractValue("mongodb.explain-highest-seq").map(_ => QueryType.HighestSeq) ::
extractValue("mongodb.explain-load-snapshot").map(_ => QueryType.LoadSnapshot) ::
extractValue("mongodb.explain-events-by-tag").map(_ => QueryType.EventsByTag) ::
Nil).flatten
}

def explain(collection: BSONCollection)(queryType: QueryType.QueryType, queryBuilder: collection.QueryBuilder) = {
Expand All @@ -114,7 +129,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
(queryType: QueryType.QueryType, stages: List[collection.PipelineOperator], hint: Option[collection.Hint]) = {
if (shoudExplain(queryType)) {
collection
.aggregatorContext[BSONDocument](stages,explain = true, hint = hint)
.aggregatorContext[BSONDocument](stages, explain = true, hint = hint)
.prepared
.cursor
.collect[List]()
Expand Down

0 comments on commit 087adde

Please sign in to comment.