diff --git a/build.sbt b/build.sbt index 835298b..ce94d2f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ -lazy val scala213 = "2.13.6" +lazy val scala213 = "2.13.11" lazy val scala300 = "3.0.0" lazy val supportedScalaVersions = List(scala213) -lazy val akkaVersion = "2.6.17" -lazy val rxmongoVersion = "1.0.7" +lazy val akkaVersion = "2.6.20" +lazy val rxmongoVersion = "1.0.10" publishArtifact := false publish := {} @@ -11,13 +11,14 @@ publishLocal := {} lazy val commonSettings = Seq( name := "akka-reactivemongo-plugin", organization := "null-vector", - version := s"1.6.0", + version := s"1.6.2", scalaVersion := scala213, crossScalaVersions := supportedScalaVersions, scalacOptions := Seq( + "-Xsource:3", "-encoding", "UTF-8", - "-target:12", + "-release:17", "-deprecation", "-language:experimental.macros", // "-Ymacro-annotations", @@ -55,7 +56,6 @@ lazy val commonSettings = Seq( ).mkString(";"), Test / fork := true, Test / javaOptions += "-Xmx4G", - Test / javaOptions += "-XX:+CMSClassUnloadingEnabled", Test / javaOptions += "-Dfile.encoding=UTF-8" ) @@ -65,9 +65,9 @@ lazy val core = (project in file("core")) commonSettings, publishTo := Some( "nullvector" at (if (isSnapshot.value) - "https://nullvector.jfrog.io/artifactory/snapshots" + "https://nullvectormirror.jfrog.io/artifactory/libs-snapshots" else - "https://nullvector.jfrog.io/artifactory/releases") + "https://nullvectormirror.jfrog.io/artifactory/libs-release") ), credentials += Credentials(Path.userHome / ".jfrog" / "credentials"), Compile / packageDoc / publishArtifact := false, diff --git a/core/src/main/scala/org/nullvector/crud/ReactiveMongoCrud.scala b/core/src/main/scala/org/nullvector/crud/ReactiveMongoCrud.scala index 8f52278..af304f4 100644 --- a/core/src/main/scala/org/nullvector/crud/ReactiveMongoCrud.scala +++ b/core/src/main/scala/org/nullvector/crud/ReactiveMongoCrud.scala @@ -7,8 +7,9 @@ import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateSto import org.nullvector.ReactiveMongoDriver import org.nullvector.crud.ReactiveMongoCrud.Schema import org.nullvector.typed.ReactiveMongoEventSerializer -import reactivemongo.api.bson.BSONDocument +import reactivemongo.api.bson.{BSONDateTime, BSONDocument} +import java.time.{Clock, Instant} import scala.concurrent.{ExecutionContext, Future} object ReactiveMongoCrud { @@ -18,15 +19,18 @@ object ReactiveMongoCrud { val payload = "payload" val manifest = "manifest" val revision = "revision" + val created = "created" + val updated = "updated" val tags = "tags" } } -class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] with DurableStateUpdateStore[Any] { +class ReactiveMongoCrud(system: ActorSystem[?]) extends DurableStateStore[Any] with DurableStateUpdateStore[Any] { private implicit lazy val dispatcher: ExecutionContext = system.dispatchers.lookup(DispatcherSelector.fromConfig("akka-persistence-reactivemongo-dispatcher")) private val driver: ReactiveMongoDriver = ReactiveMongoDriver(system) private val serializer: ReactiveMongoEventSerializer = ReactiveMongoEventSerializer(system) + private val utcClock: Clock = Clock.systemUTC() override def getObject(persistenceId: String): Future[GetObjectResult[Any]] = { for { @@ -39,11 +43,12 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w serializer .deserialize(PersistentRepr(payload = payload, manifest = manifest)) .map(rep => Some(rep.payload) -> revision) - case None => Future.successful(None, 1L) + case None => Future.successful(None, 0L) } } yield GetObjectResult(found, revision) } override def upsertObject(persistenceId: String, revision: Long, value: Any, tag: String): Future[Done] = { + val nowBsonDateTime = BSONDateTime(Instant.now(utcClock).toEpochMilli) for { coll <- driver.crudCollection(persistenceId) rep <- serializer.serialize(PersistentRepr(value)) @@ -51,12 +56,13 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w .findAndUpdate( BSONDocument(Schema.persistenceId -> persistenceId, Schema.revision -> (revision - 1)), BSONDocument( - "$set" -> BSONDocument( + "$set" -> (BSONDocument( Schema.payload -> rep._1.payload.asInstanceOf[BSONDocument], Schema.manifest -> rep._1.manifest, Schema.revision -> revision, - Schema.tags -> rep._2 - ) + Schema.tags -> rep._2, + Schema.updated -> nowBsonDateTime + ) ++ (if (revision == 1) BSONDocument(Schema.created -> nowBsonDateTime) else BSONDocument())) ), upsert = true ) @@ -68,4 +74,11 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w _ <- coll.findAndRemove(BSONDocument(Schema.persistenceId -> persistenceId)) } yield Done } + + override def deleteObject(persistenceId: String, revision: Long): Future[Done] = { + for { + coll <- driver.crudCollection(persistenceId) + _ <- coll.findAndRemove(BSONDocument(Schema.persistenceId -> persistenceId, Schema.revision -> revision)) + } yield Done + } } diff --git a/core/src/main/scala/org/nullvector/journal/InMemoryAsyncWriteJournal.scala b/core/src/main/scala/org/nullvector/journal/InMemoryAsyncWriteJournal.scala index 3620bb4..0302198 100644 --- a/core/src/main/scala/org/nullvector/journal/InMemoryAsyncWriteJournal.scala +++ b/core/src/main/scala/org/nullvector/journal/InMemoryAsyncWriteJournal.scala @@ -4,20 +4,20 @@ import akka.actor.ActorSystem import akka.persistence.{AtomicWrite, PersistentRepr} import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} +import org.nullvector.PersistInMemory import org.nullvector.PersistInMemory.EventEntry import org.nullvector.typed.ReactiveMongoEventSerializer -import org.nullvector.{PersistInMemory} import reactivemongo.api.bson.BSONDocument import scala.collection.immutable -import scala.concurrent.Future -import scala.util.{Success, Try} +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.Try class InMemoryAsyncWriteJournal(val system: ActorSystem) extends AsyncWriteJournalOps { - import akka.actor.typed.scaladsl.adapter._ + import akka.actor.typed.scaladsl.adapter.* - private implicit val ec = system.dispatcher + private implicit val ec: ExecutionContextExecutor = system.dispatcher private implicit val materializer: Materializer = Materializer.matFromSystem(system) private val eventSerializer: ReactiveMongoEventSerializer = diff --git a/core/src/test/scala/org/nullvector/crud/ReactiveMongoCrudTest.scala b/core/src/test/scala/org/nullvector/crud/ReactiveMongoCrudTest.scala index 988bf20..3ba0dd7 100644 --- a/core/src/test/scala/org/nullvector/crud/ReactiveMongoCrudTest.scala +++ b/core/src/test/scala/org/nullvector/crud/ReactiveMongoCrudTest.scala @@ -25,8 +25,7 @@ import scala.util.Random class ReactiveMongoCrudTest extends AsyncFlatSpec { implicit private val system: ActorSystem = ActorSystem("Crud") - val crud = DurableStateStoreRegistry - .get(system) + val crud = DurableStateStoreRegistry(system) .durableStateStoreFor[ReactiveMongoCrud](ReactiveMongoCrud.pluginId) private val driver: ReactiveMongoDriver = ReactiveMongoDriver(system) @@ -37,7 +36,7 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec { it should "insert an object" in { crud - .upsertObject(randomPersistenceId, 2, ChessBoard(Map("a1" -> "R")), "") + .upsertObject(randomPersistenceId, 1, ChessBoard(Map("a1" -> "R")), "") .map(_ shouldBe Done) } @@ -57,13 +56,13 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec { val originalChessBoard = ChessBoard(Map("a1" -> "RB")) val updatedChessBoard = originalChessBoard.copy(piecePositions = Map("4b" -> "KW")) for { - _ <- crud.upsertObject(pid, 35, originalChessBoard, "") + _ <- crud.upsertObject(pid, 1, originalChessBoard, "") result1 <- crud.getObject(pid) - _ <- crud.upsertObject(pid, 36, updatedChessBoard, "") + _ <- crud.upsertObject(pid, 2, updatedChessBoard, "") result2 <- crud.getObject(pid) } yield { - result1 shouldBe GetObjectResult(Some(originalChessBoard), 35) - result2 shouldBe GetObjectResult(Some(updatedChessBoard), 36) + result1 shouldBe GetObjectResult(Some(originalChessBoard), 1) + result2 shouldBe GetObjectResult(Some(updatedChessBoard), 2) } } @@ -77,7 +76,7 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec { result2 <- crud.getObject(pid) } yield { result1 shouldBe GetObjectResult(Some(chessBoard), 35) - result2 shouldBe GetObjectResult(None, 1) + result2 shouldBe GetObjectResult(None, 0) } } @@ -104,8 +103,8 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec { boardV2 <- chessBoardRef.ask(ChessBoardBehavior.GetBoard(_)) resultV2 <- crud.getObject(PersistenceId("ChessBoard", chessBoardId).id) } yield { - resultV1 shouldBe GetObjectResult(Some(boardV1), 2L) - resultV2 shouldBe GetObjectResult(Some(boardV2), 3L) + resultV1 shouldBe GetObjectResult(Some(boardV1), 1L) + resultV2 shouldBe GetObjectResult(Some(boardV2), 2L) } } diff --git a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala index fdf8917..099c6be 100644 --- a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala +++ b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala @@ -53,6 +53,8 @@ class ReactiveMongoReadJournalSpec() Await.result(rxDriver.journalCollection("j-0"), 1.second) } + behavior of "Read Journal" + it should "Events by tag from NoOffset" in { val prefixReadColl = "ReadCollection_A" @@ -439,7 +441,7 @@ class ReactiveMongoReadJournalSpec() .runWith(Sink.ignore), 14.seconds ) - Thread.sleep(100) + Thread.sleep(200) envelopes.peek().persistenceId shouldBe pId envelopes.size shouldBe 20 } @@ -450,14 +452,10 @@ class ReactiveMongoReadJournalSpec() val ids = new AtomicInteger() readJournal .persistenceIds() - .async .runWith(Sink.foreach(e => ids.incrementAndGet())) - .recover { case e: Throwable => - e.printStackTrace() - } Await.ready( Source(1 to 10) - .mapAsync(10) { collId => + .mapAsync(Runtime.getRuntime.availableProcessors()) { collId => reactiveMongoJournalImpl.asyncWriteMessages((1 to 25).map { jIdx => val pId = s"${prefixReadColl}_$collId-${Random.nextLong().abs}" AtomicWrite( @@ -469,7 +467,7 @@ class ReactiveMongoReadJournalSpec() ) }) } - .runWith(Sink.ignore), + .run(), 14.seconds ) Thread.sleep(2000) diff --git a/core/src/test/scala/org/nullvector/snapshot/ReactiveMongoSnapshotSpec.scala b/core/src/test/scala/org/nullvector/snapshot/ReactiveMongoSnapshotSpec.scala index 2894b34..7c5a320 100644 --- a/core/src/test/scala/org/nullvector/snapshot/ReactiveMongoSnapshotSpec.scala +++ b/core/src/test/scala/org/nullvector/snapshot/ReactiveMongoSnapshotSpec.scala @@ -1,7 +1,7 @@ package org.nullvector.snapshot -import akka.actor.typed import akka.actor.typed.scaladsl.Behaviors +import akka.actor.{ActorSystem, typed} import akka.persistence.{SnapshotMetadata, SnapshotSelectionCriteria} import akka.testkit.{ImplicitSender, TestKitBase} import com.typesafe.config.ConfigFactory @@ -14,15 +14,15 @@ import reactivemongo.api.bson.{BSONDocument, Macros} import util.Collections import java.util.Date -import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.concurrent.duration.* +import scala.concurrent.{Await, ExecutionContextExecutor} class ReactiveMongoSnapshotSpec() extends TestKitBase with ImplicitSender with AnyWordSpecLike with Matchers with BeforeAndAfterAll { private lazy implicit val typedAs: typed.ActorSystem[Nothing] = typed.ActorSystem(Behaviors.empty, "ReactiveMongoPlugin") - override lazy val system = typedAs.classicSystem - implicit lazy val ec = system.dispatcher + override lazy val system: ActorSystem = typedAs.classicSystem + implicit lazy val ec: ExecutionContextExecutor = system.dispatcher val snapshotter: ReactiveMongoSnapshotImpl = new ReactiveMongoSnapshotImpl(ConfigFactory.load(), system) diff --git a/project/build.properties b/project/build.properties index 5b6d073..fd5b157 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.5.0 \ No newline at end of file +sbt.version=1.9.0 \ No newline at end of file