Skip to content

Commit

Permalink
Supporting #withTagger() of typed persistence.
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea committed Jun 22, 2021
1 parent 9427c33 commit 529ad05
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 2 deletions.
23 changes: 23 additions & 0 deletions api/src/main/scala/org/nullvector/TaggedEventAdapter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.nullvector

import akka.persistence.journal.Tagged
import reactivemongo.api.bson.BSONDocument

import scala.reflect.ClassTag

class TaggedEventAdapter[E](adapter: EventAdapter[E], tags: Set[String])
(implicit ev: ClassTag[E]) extends EventAdapter[E] {

override val manifest: String = adapter.manifest

override def tags(payload: E): Set[String] = tags

override def payloadToBson(payload: E): BSONDocument = payload match {
case Tagged(realPayload, _) => adapter.toBson(realPayload)
case _ => adapter.payloadToBson(payload)
}

override def bsonToPayload(doc: BSONDocument): E = adapter.bsonToPayload(doc)

}

2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ publishLocal := {}
lazy val commonSettings = Seq(
name := "akka-reactivemongo-plugin",
organization := "null-vector",
version := s"1.5.0",
version := s"1.5.1",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import akka.Done
import akka.actor.typed.scaladsl.{Behaviors, Routers}
import akka.actor.typed._
import akka.persistence.PersistentRepr
import akka.persistence.journal.Tagged
import akka.util.Timeout
import org.nullvector.logging.LoggerPerClassAware
import org.nullvector.typed.ReactiveMongoEventSerializer.SerializerBehavior
import org.nullvector.{AdapterKey, EventAdapter, Fields, BsonEventAdapter, ReactiveMongoPlugin}
import org.nullvector.{AdapterKey, BsonEventAdapter, EventAdapter, Fields, ReactiveMongoPlugin, TaggedEventAdapter}
import reactivemongo.api.bson.BSONDocument

import scala.collection.concurrent._
Expand Down Expand Up @@ -110,6 +111,11 @@ object ReactiveMongoEventSerializer extends ExtensionId[ReactiveMongoEventSerial
def adapterByPayload(persistentRepr: PersistentRepr): Try[EventAdapter[_]] = {
persistentRepr.payload match {
case _: BSONDocument => Success(BsonEventAdapter)
case Tagged(payload, tags) =>
adaptersByType.get(AdapterKey(payload.getClass))
.fold[Try[EventAdapter[_]]](failureByPayload(persistentRepr))(adapter =>
Success(new TaggedEventAdapter(adapter, tags)))

case payload =>
adaptersByType.get(AdapterKey(payload.getClass))
.fold[Try[EventAdapter[_]]](failureByPayload(persistentRepr))(Success(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.nullvector.typed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.PersistentRepr
import akka.persistence.journal.Tagged
import org.nullvector.EventAdapterFactory
import org.nullvector.typed.ReactiveMongoEventSerializer.Registry
import org.scalatest.Matchers.{a, convertToAnyShouldWrapper, thrownBy}
Expand Down Expand Up @@ -40,6 +41,15 @@ class EventAdapterSerializerSpec extends FlatSpec {
deserialized.head._2 shouldBe Set("TwoEventTag")
}

it should " serialize with tagger" in {
val serializer = ReactiveMongoEventSerializer(system)
val taggedEvent = Tagged(TwoEvent("TwoEventNameWithTagger"), Set("TagFromTagged"))
val repr = PersistentRepr(taggedEvent, manifest = "TwoManifest")
val future = serializer.serialize(Seq(repr))
val deserialized = Await.result(future, 1.second)
deserialized.head._1.payload shouldBe BSONDocument("name" -> "TwoEventNameWithTagger")
deserialized.head._2 shouldBe Set("TagFromTagged")
}

case class OneEvent(name: String)

Expand Down

0 comments on commit 529ad05

Please sign in to comment.