From 529ad05fe8a373a43c3ae5fd42d9244304d9f483 Mon Sep 17 00:00:00 2001 From: Rodrigo Date: Tue, 22 Jun 2021 18:45:47 -0300 Subject: [PATCH] Supporting #withTagger() of typed persistence. --- .../org/nullvector/TaggedEventAdapter.scala | 23 +++++++++++++++++++ build.sbt | 2 +- .../typed/ReactiveMongoEventSerializer.scala | 8 ++++++- .../typed/EventAdapterSerializerSpec.scala | 10 ++++++++ 4 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 api/src/main/scala/org/nullvector/TaggedEventAdapter.scala diff --git a/api/src/main/scala/org/nullvector/TaggedEventAdapter.scala b/api/src/main/scala/org/nullvector/TaggedEventAdapter.scala new file mode 100644 index 0000000..3a4a43b --- /dev/null +++ b/api/src/main/scala/org/nullvector/TaggedEventAdapter.scala @@ -0,0 +1,23 @@ +package org.nullvector + +import akka.persistence.journal.Tagged +import reactivemongo.api.bson.BSONDocument + +import scala.reflect.ClassTag + +class TaggedEventAdapter[E](adapter: EventAdapter[E], tags: Set[String]) + (implicit ev: ClassTag[E]) extends EventAdapter[E] { + + override val manifest: String = adapter.manifest + + override def tags(payload: E): Set[String] = tags + + override def payloadToBson(payload: E): BSONDocument = payload match { + case Tagged(realPayload, _) => adapter.toBson(realPayload) + case _ => adapter.payloadToBson(payload) + } + + override def bsonToPayload(doc: BSONDocument): E = adapter.bsonToPayload(doc) + +} + diff --git a/build.sbt b/build.sbt index d9ad075..2cbe7b2 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ publishLocal := {} lazy val commonSettings = Seq( name := "akka-reactivemongo-plugin", organization := "null-vector", - version := s"1.5.0", + version := s"1.5.1", scalaVersion := scala213, crossScalaVersions := supportedScalaVersions, scalacOptions := Seq( diff --git a/core/src/main/scala/org/nullvector/typed/ReactiveMongoEventSerializer.scala b/core/src/main/scala/org/nullvector/typed/ReactiveMongoEventSerializer.scala index 35308bc..53053f5 100644 --- a/core/src/main/scala/org/nullvector/typed/ReactiveMongoEventSerializer.scala +++ b/core/src/main/scala/org/nullvector/typed/ReactiveMongoEventSerializer.scala @@ -4,10 +4,11 @@ import akka.Done import akka.actor.typed.scaladsl.{Behaviors, Routers} import akka.actor.typed._ import akka.persistence.PersistentRepr +import akka.persistence.journal.Tagged import akka.util.Timeout import org.nullvector.logging.LoggerPerClassAware import org.nullvector.typed.ReactiveMongoEventSerializer.SerializerBehavior -import org.nullvector.{AdapterKey, EventAdapter, Fields, BsonEventAdapter, ReactiveMongoPlugin} +import org.nullvector.{AdapterKey, BsonEventAdapter, EventAdapter, Fields, ReactiveMongoPlugin, TaggedEventAdapter} import reactivemongo.api.bson.BSONDocument import scala.collection.concurrent._ @@ -110,6 +111,11 @@ object ReactiveMongoEventSerializer extends ExtensionId[ReactiveMongoEventSerial def adapterByPayload(persistentRepr: PersistentRepr): Try[EventAdapter[_]] = { persistentRepr.payload match { case _: BSONDocument => Success(BsonEventAdapter) + case Tagged(payload, tags) => + adaptersByType.get(AdapterKey(payload.getClass)) + .fold[Try[EventAdapter[_]]](failureByPayload(persistentRepr))(adapter => + Success(new TaggedEventAdapter(adapter, tags))) + case payload => adaptersByType.get(AdapterKey(payload.getClass)) .fold[Try[EventAdapter[_]]](failureByPayload(persistentRepr))(Success(_)) diff --git a/core/src/test/scala/org/nullvector/typed/EventAdapterSerializerSpec.scala b/core/src/test/scala/org/nullvector/typed/EventAdapterSerializerSpec.scala index b6998de..76268ff 100644 --- a/core/src/test/scala/org/nullvector/typed/EventAdapterSerializerSpec.scala +++ b/core/src/test/scala/org/nullvector/typed/EventAdapterSerializerSpec.scala @@ -3,6 +3,7 @@ package org.nullvector.typed import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.Behaviors import akka.persistence.PersistentRepr +import akka.persistence.journal.Tagged import org.nullvector.EventAdapterFactory import org.nullvector.typed.ReactiveMongoEventSerializer.Registry import org.scalatest.Matchers.{a, convertToAnyShouldWrapper, thrownBy} @@ -40,6 +41,15 @@ class EventAdapterSerializerSpec extends FlatSpec { deserialized.head._2 shouldBe Set("TwoEventTag") } + it should " serialize with tagger" in { + val serializer = ReactiveMongoEventSerializer(system) + val taggedEvent = Tagged(TwoEvent("TwoEventNameWithTagger"), Set("TagFromTagged")) + val repr = PersistentRepr(taggedEvent, manifest = "TwoManifest") + val future = serializer.serialize(Seq(repr)) + val deserialized = Await.result(future, 1.second) + deserialized.head._1.payload shouldBe BSONDocument("name" -> "TwoEventNameWithTagger") + deserialized.head._2 shouldBe Set("TagFromTagged") + } case class OneEvent(name: String)