Skip to content

Commit

Permalink
Adding created timestamp and updated for crud plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea authored and mosaic-rodrigo-gomez committed Jun 10, 2023
1 parent 66b2f56 commit 41b4ab9
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 42 deletions.
16 changes: 8 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
lazy val scala213 = "2.13.6"
lazy val scala213 = "2.13.11"
lazy val scala300 = "3.0.0"
lazy val supportedScalaVersions = List(scala213)
lazy val akkaVersion = "2.6.17"
lazy val rxmongoVersion = "1.0.7"
lazy val akkaVersion = "2.6.20"
lazy val rxmongoVersion = "1.0.10"

publishArtifact := false
publish := {}
Expand All @@ -11,13 +11,14 @@ publishLocal := {}
lazy val commonSettings = Seq(
name := "akka-reactivemongo-plugin",
organization := "null-vector",
version := s"1.6.0",
version := s"1.6.2",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
"-Xsource:3",
"-encoding",
"UTF-8",
"-target:12",
"-release:17",
"-deprecation",
"-language:experimental.macros",
// "-Ymacro-annotations",
Expand Down Expand Up @@ -55,7 +56,6 @@ lazy val commonSettings = Seq(
).mkString(";"),
Test / fork := true,
Test / javaOptions += "-Xmx4G",
Test / javaOptions += "-XX:+CMSClassUnloadingEnabled",
Test / javaOptions += "-Dfile.encoding=UTF-8"
)

Expand All @@ -65,9 +65,9 @@ lazy val core = (project in file("core"))
commonSettings,
publishTo := Some(
"nullvector" at (if (isSnapshot.value)
"https://nullvector.jfrog.io/artifactory/snapshots"
"https://nullvectormirror.jfrog.io/artifactory/libs-snapshots"
else
"https://nullvector.jfrog.io/artifactory/releases")
"https://nullvectormirror.jfrog.io/artifactory/libs-release")
),
credentials += Credentials(Path.userHome / ".jfrog" / "credentials"),
Compile / packageDoc / publishArtifact := false,
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/scala/org/nullvector/crud/ReactiveMongoCrud.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateSto
import org.nullvector.ReactiveMongoDriver
import org.nullvector.crud.ReactiveMongoCrud.Schema
import org.nullvector.typed.ReactiveMongoEventSerializer
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.{BSONDateTime, BSONDocument}

import java.time.{Clock, Instant}
import scala.concurrent.{ExecutionContext, Future}

object ReactiveMongoCrud {
Expand All @@ -18,15 +19,18 @@ object ReactiveMongoCrud {
val payload = "payload"
val manifest = "manifest"
val revision = "revision"
val created = "created"
val updated = "updated"
val tags = "tags"
}
}

class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] with DurableStateUpdateStore[Any] {
class ReactiveMongoCrud(system: ActorSystem[?]) extends DurableStateStore[Any] with DurableStateUpdateStore[Any] {
private implicit lazy val dispatcher: ExecutionContext =
system.dispatchers.lookup(DispatcherSelector.fromConfig("akka-persistence-reactivemongo-dispatcher"))
private val driver: ReactiveMongoDriver = ReactiveMongoDriver(system)
private val serializer: ReactiveMongoEventSerializer = ReactiveMongoEventSerializer(system)
private val utcClock: Clock = Clock.systemUTC()

override def getObject(persistenceId: String): Future[GetObjectResult[Any]] = {
for {
Expand All @@ -39,24 +43,26 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w
serializer
.deserialize(PersistentRepr(payload = payload, manifest = manifest))
.map(rep => Some(rep.payload) -> revision)
case None => Future.successful(None, 1L)
case None => Future.successful(None, 0L)
}
} yield GetObjectResult(found, revision)
}
override def upsertObject(persistenceId: String, revision: Long, value: Any, tag: String): Future[Done] = {
val nowBsonDateTime = BSONDateTime(Instant.now(utcClock).toEpochMilli)
for {
coll <- driver.crudCollection(persistenceId)
rep <- serializer.serialize(PersistentRepr(value))
_ <- coll
.findAndUpdate(
BSONDocument(Schema.persistenceId -> persistenceId, Schema.revision -> (revision - 1)),
BSONDocument(
"$set" -> BSONDocument(
"$set" -> (BSONDocument(
Schema.payload -> rep._1.payload.asInstanceOf[BSONDocument],
Schema.manifest -> rep._1.manifest,
Schema.revision -> revision,
Schema.tags -> rep._2
)
Schema.tags -> rep._2,
Schema.updated -> nowBsonDateTime
) ++ (if (revision == 1) BSONDocument(Schema.created -> nowBsonDateTime) else BSONDocument()))
),
upsert = true
)
Expand All @@ -68,4 +74,11 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w
_ <- coll.findAndRemove(BSONDocument(Schema.persistenceId -> persistenceId))
} yield Done
}

override def deleteObject(persistenceId: String, revision: Long): Future[Done] = {
for {
coll <- driver.crudCollection(persistenceId)
_ <- coll.findAndRemove(BSONDocument(Schema.persistenceId -> persistenceId, Schema.revision -> revision))
} yield Done
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import akka.actor.ActorSystem
import akka.persistence.{AtomicWrite, PersistentRepr}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import org.nullvector.PersistInMemory
import org.nullvector.PersistInMemory.EventEntry
import org.nullvector.typed.ReactiveMongoEventSerializer
import org.nullvector.{PersistInMemory}
import reactivemongo.api.bson.BSONDocument

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{Success, Try}
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.Try

class InMemoryAsyncWriteJournal(val system: ActorSystem) extends AsyncWriteJournalOps {

import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.adapter.*

private implicit val ec = system.dispatcher
private implicit val ec: ExecutionContextExecutor = system.dispatcher
private implicit val materializer: Materializer =
Materializer.matFromSystem(system)
private val eventSerializer: ReactiveMongoEventSerializer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import scala.util.Random
class ReactiveMongoCrudTest extends AsyncFlatSpec {

implicit private val system: ActorSystem = ActorSystem("Crud")
val crud = DurableStateStoreRegistry
.get(system)
val crud = DurableStateStoreRegistry(system)
.durableStateStoreFor[ReactiveMongoCrud](ReactiveMongoCrud.pluginId)
private val driver: ReactiveMongoDriver = ReactiveMongoDriver(system)

Expand All @@ -37,7 +36,7 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec {

it should "insert an object" in {
crud
.upsertObject(randomPersistenceId, 2, ChessBoard(Map("a1" -> "R")), "")
.upsertObject(randomPersistenceId, 1, ChessBoard(Map("a1" -> "R")), "")
.map(_ shouldBe Done)
}

Expand All @@ -57,13 +56,13 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec {
val originalChessBoard = ChessBoard(Map("a1" -> "RB"))
val updatedChessBoard = originalChessBoard.copy(piecePositions = Map("4b" -> "KW"))
for {
_ <- crud.upsertObject(pid, 35, originalChessBoard, "")
_ <- crud.upsertObject(pid, 1, originalChessBoard, "")
result1 <- crud.getObject(pid)
_ <- crud.upsertObject(pid, 36, updatedChessBoard, "")
_ <- crud.upsertObject(pid, 2, updatedChessBoard, "")
result2 <- crud.getObject(pid)
} yield {
result1 shouldBe GetObjectResult(Some(originalChessBoard), 35)
result2 shouldBe GetObjectResult(Some(updatedChessBoard), 36)
result1 shouldBe GetObjectResult(Some(originalChessBoard), 1)
result2 shouldBe GetObjectResult(Some(updatedChessBoard), 2)
}
}

Expand All @@ -77,7 +76,7 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec {
result2 <- crud.getObject(pid)
} yield {
result1 shouldBe GetObjectResult(Some(chessBoard), 35)
result2 shouldBe GetObjectResult(None, 1)
result2 shouldBe GetObjectResult(None, 0)
}
}

Expand All @@ -104,8 +103,8 @@ class ReactiveMongoCrudTest extends AsyncFlatSpec {
boardV2 <- chessBoardRef.ask(ChessBoardBehavior.GetBoard(_))
resultV2 <- crud.getObject(PersistenceId("ChessBoard", chessBoardId).id)
} yield {
resultV1 shouldBe GetObjectResult(Some(boardV1), 2L)
resultV2 shouldBe GetObjectResult(Some(boardV2), 3L)
resultV1 shouldBe GetObjectResult(Some(boardV1), 1L)
resultV2 shouldBe GetObjectResult(Some(boardV2), 2L)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class ReactiveMongoReadJournalSpec()
Await.result(rxDriver.journalCollection("j-0"), 1.second)
}

behavior of "Read Journal"

it should "Events by tag from NoOffset" in {
val prefixReadColl = "ReadCollection_A"

Expand Down Expand Up @@ -439,7 +441,7 @@ class ReactiveMongoReadJournalSpec()
.runWith(Sink.ignore),
14.seconds
)
Thread.sleep(100)
Thread.sleep(200)
envelopes.peek().persistenceId shouldBe pId
envelopes.size shouldBe 20
}
Expand All @@ -450,14 +452,10 @@ class ReactiveMongoReadJournalSpec()
val ids = new AtomicInteger()
readJournal
.persistenceIds()
.async
.runWith(Sink.foreach(e => ids.incrementAndGet()))
.recover { case e: Throwable =>
e.printStackTrace()
}
Await.ready(
Source(1 to 10)
.mapAsync(10) { collId =>
.mapAsync(Runtime.getRuntime.availableProcessors()) { collId =>
reactiveMongoJournalImpl.asyncWriteMessages((1 to 25).map { jIdx =>
val pId = s"${prefixReadColl}_$collId-${Random.nextLong().abs}"
AtomicWrite(
Expand All @@ -469,7 +467,7 @@ class ReactiveMongoReadJournalSpec()
)
})
}
.runWith(Sink.ignore),
.run(),
14.seconds
)
Thread.sleep(2000)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.nullvector.snapshot

import akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.{ActorSystem, typed}
import akka.persistence.{SnapshotMetadata, SnapshotSelectionCriteria}
import akka.testkit.{ImplicitSender, TestKitBase}
import com.typesafe.config.ConfigFactory
Expand All @@ -14,15 +14,15 @@ import reactivemongo.api.bson.{BSONDocument, Macros}
import util.Collections

import java.util.Date
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.duration.*
import scala.concurrent.{Await, ExecutionContextExecutor}

class ReactiveMongoSnapshotSpec() extends TestKitBase with ImplicitSender with AnyWordSpecLike with Matchers with BeforeAndAfterAll {

private lazy implicit val typedAs: typed.ActorSystem[Nothing] =
typed.ActorSystem(Behaviors.empty, "ReactiveMongoPlugin")
override lazy val system = typedAs.classicSystem
implicit lazy val ec = system.dispatcher
override lazy val system: ActorSystem = typedAs.classicSystem
implicit lazy val ec: ExecutionContextExecutor = system.dispatcher

val snapshotter: ReactiveMongoSnapshotImpl =
new ReactiveMongoSnapshotImpl(ConfigFactory.load(), system)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.0
sbt.version=1.9.0

0 comments on commit 41b4ab9

Please sign in to comment.