diff --git a/.gitignore b/.gitignore
index c7de436..863e30a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
 /project/target
 .DS_Store
 /project/project/
+/.bsp/
diff --git a/build.sbt b/build.sbt
index da09219..e2a5e23 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,4 +1,5 @@
 lazy val scala213 = "2.13.4"
+lazy val scala300 = "3.0.0-M2"
 lazy val supportedScalaVersions = List(scala213)
 lazy val akkaVersion = "2.6.10"
 lazy val rxmongoVersion = "1.0.1"
@@ -6,7 +7,7 @@ lazy val rxmongoVersion = "1.0.1"
 lazy val commonSettings = Seq(
   name := "akka-reactivemongo-plugin",
   organization := "null-vector",
-  version := "1.4.3",
+  version := "1.4.4",
   scalaVersion := scala213,
   crossScalaVersions := supportedScalaVersions,
   scalacOptions := Seq(
diff --git a/core/src/main/scala/org/nullvector/CollectionNameMapping.scala b/core/src/main/scala/org/nullvector/CollectionNameMapping.scala
index 8219dc4..5300f1f 100644
--- a/core/src/main/scala/org/nullvector/CollectionNameMapping.scala
+++ b/core/src/main/scala/org/nullvector/CollectionNameMapping.scala
@@ -10,13 +10,18 @@ trait CollectionNameMapping {
 
 class DefaultCollectionNameMapping(config: Config) extends CollectionNameMapping {
   private val separator: String = config.getString("akka-persistence-reactivemongo.persistence-id-separator")
-  private val pattern: Regex = buildPattern(separator.head)
+  private val pattern: Regex = buildPattern(separator.headOption)
 
   override def collectionNameOf(persistentId: String): Option[String] = persistentId match {
+    case pattern(name, _) if name.isEmpty => None
     case pattern(name, _) => Some(name)
     case _ => None
   }
 
-  private def buildPattern(separator: Char) = s"(\\w+)[$separator](.+)".r
+  private def buildPattern(maybeSeparator: Option[Char]) = maybeSeparator match {
+    case Some(char) => s"(\\w+)[$char](.+)".r
+    case None => "()(.+)".r
+  }
+
 }
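With an empty `persistence-id-separator`, the old `separator.head` threw `NoSuchElementException`; `headOption` now degrades to a match-all pattern whose name group is empty, so such persistence ids map to no dedicated collection. A minimal sketch of the resulting behaviour (the config key and class are from this diff; the persistence ids are illustrative):

```scala
import com.typesafe.config.ConfigFactory
import org.nullvector.DefaultCollectionNameMapping

val withSeparator = new DefaultCollectionNameMapping(
  ConfigFactory.parseString("""akka-persistence-reactivemongo.persistence-id-separator = "-" """))
withSeparator.collectionNameOf("shopping-cart-42") // Some("shopping"): the text before the first separator

val withoutSeparator = new DefaultCollectionNameMapping(
  ConfigFactory.parseString("""akka-persistence-reactivemongo.persistence-id-separator = "" """))
withoutSeparator.collectionNameOf("shopping-cart-42") // None: empty name group, falls back to the default collection
```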
diff --git a/core/src/main/scala/org/nullvector/query/EventsQueries.scala b/core/src/main/scala/org/nullvector/query/EventsQueries.scala
index af9e236..fd73ce1 100644
--- a/core/src/main/scala/org/nullvector/query/EventsQueries.scala
+++ b/core/src/main/scala/org/nullvector/query/EventsQueries.scala
@@ -41,7 +41,7 @@ trait EventsQueries
         (_, fromToSequences) => fromToSequences,
         offset => currentEventsByPersistenceId(persistenceId, offset._1, offset._2)
       ))
-      .flatMapConcat(identity)
+      .mapConcat(identity)
   }
 
   override def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
@@ -54,8 +54,8 @@ trait EventsQueries
   override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
     Source
       .fromGraph(new PullerGraph[EventEnvelope, Offset](
-        offset, defaultRefreshInterval, _.offset, greaterOffsetOf, o => currentEventsByTag(tag, o)))
-      .flatMapConcat(identity)
+        offset, defaultRefreshInterval, _.offset, greaterOffsetOf, offset => currentEventsByTag(tag, offset)))
+      .mapConcat(identity)
 
   /*
   * Query events that have a specific tag. Those events matching target tags would
@@ -83,11 +83,11 @@ trait EventsQueries
   }
 
   private def eventsByTagQuery(tags: Seq[String], offset: Offset)(implicit serializableMethod: (BSONDocument, BSONDocument) => Future[Any]): Source[EventEnvelope, NotUsed] = {
-    Source.lazyFuture(() => rxDriver.journals())
+    Source.future(rxDriver.journals())
       .mapConcat(identity)
       .splitWhen(_ => true)
       .flatMapConcat(buildFindEventsByTagsQuery(_, offset, tags))
-      .mergeSubstreams
+      .mergeSubstreamsWithParallelism(amountOfCores)
       .via(document2Envelope(serializableMethod))
   }
 
@@ -110,7 +110,6 @@ trait EventsQueries
 
   private def buildFindEventsByTagsQuery(coll: collection.BSONCollection, offset: Offset, tags: Seq[String]) = {
     def query(field: String) = BSONDocument(field -> BSONDocument("$in" -> tags))
-
     coll
       .aggregateWith[BSONDocument]()(framework =>
         List(
diff --git a/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala b/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala
index 543a459..8c859c8 100644
--- a/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala
+++ b/core/src/main/scala/org/nullvector/query/FromMemoryReadJournal.scala
@@ -3,6 +3,7 @@ package org.nullvector.query
 import akka.NotUsed
 import akka.actor.typed.ActorSystem
 import akka.persistence.query.{EventEnvelope, NoOffset, Offset}
+import akka.stream.Materializer
 import akka.stream.scaladsl.Source
 import org.nullvector.PersistInMemory.EventWithOffset
 import org.nullvector.{PersistInMemory, ReactiveMongoEventSerializer}
@@ -12,6 +13,7 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
 
 class FromMemoryReadJournal(actorSystem: ActorSystem[_]) extends ReactiveMongoScalaReadJournal {
   private implicit val ec: ExecutionContextExecutor = actorSystem.executionContext
+  private implicit val mat: Materializer = Materializer.matFromSystem(actorSystem)
   private val memory: PersistInMemory = PersistInMemory(actorSystem)
   private val serializer: ReactiveMongoEventSerializer = ReactiveMongoEventSerializer(actorSystem)
   val defaultRefreshInterval: FiniteDuration = 500.millis
@@ -28,7 +30,7 @@ class FromMemoryReadJournal(actorSystem: ActorSystem[_]) extends ReactiveMongoSc
       .fromGraph(
         new PullerGraph[EventEnvelope, Offset](offset, defaultRefreshInterval, _.offset, greaterOffsetOf, offset => currentEventsByTag(tag, offset))
       )
-      .flatMapConcat(identity)
+      .mapConcat(identity)
 
   override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
     currentEventsByTags(Seq(tag), offset)
diff --git a/core/src/main/scala/org/nullvector/query/ObjectIdOffset.scala b/core/src/main/scala/org/nullvector/query/ObjectIdOffset.scala
index 3133f19..ea11138 100644
--- a/core/src/main/scala/org/nullvector/query/ObjectIdOffset.scala
+++ b/core/src/main/scala/org/nullvector/query/ObjectIdOffset.scala
@@ -17,7 +17,7 @@ case class ObjectIdOffset(id: BSONObjectID) extends Offset with Ordered[ObjectId
 
   override val toString: String = s"Offset(${id.stringify})"
 
-  override def compare(that: ObjectIdOffset): Int = BigInt(id.byteArray).compare(BigInt(that.id.byteArray))
+  override def compare(that: ObjectIdOffset): Int = id.stringify.compare(that.id.stringify)
 
 }
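The offset ordering change drops the per-comparison `BigInt` allocation and its signed interpretation of the raw bytes: `stringify` is the fixed-width lowercase-hex rendering of the 12-byte ObjectId, so plain string comparison agrees with unsigned byte order, and the leading timestamp bytes keep later offsets greater. A quick sanity sketch (assumes ReactiveMongo 1.0's `BSONObjectID` API):

```scala
import reactivemongo.api.bson.BSONObjectID

// Ids generated in sequence: the 4 leading timestamp bytes (plus the trailing
// counter) make the second one compare greater or equal, as hex strings
// exactly as it would as unsigned bytes.
val earlier = ObjectIdOffset(BSONObjectID.generate())
val later   = ObjectIdOffset(BSONObjectID.generate())
assert(earlier.compare(later) <= 0)
```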
diff --git a/core/src/main/scala/org/nullvector/query/PersistenceIdsQueries.scala b/core/src/main/scala/org/nullvector/query/PersistenceIdsQueries.scala
index 1b1e629..908eaa8 100644
--- a/core/src/main/scala/org/nullvector/query/PersistenceIdsQueries.scala
+++ b/core/src/main/scala/org/nullvector/query/PersistenceIdsQueries.scala
@@ -32,7 +32,7 @@ trait PersistenceIdsQueries
         greaterOffsetOf,
         o => currentPersistenceIds(o)
       ))
-      .flatMapConcat(identity)
+      .mapConcat(identity)
       .map(_.persistenceId)
   }
 
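All three read journals switch from `flatMapConcat(identity)` to `mapConcat(identity)` because the reworked `PullerGraph` below no longer emits one inner `Source` per poll; it drains each query eagerly and emits the result as an in-memory `Seq[Element]` batch, which only needs flattening, not sub-stream concatenation. Consumption now looks like this (a sketch inside a journal trait, where `greaterOffsetOf` and `currentEventsByTag` are defined and an implicit `ExecutionContext` and `Materializer` are in scope):

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, NoOffset, Offset}
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

val envelopes: Source[EventEnvelope, NotUsed] =
  Source
    .fromGraph(new PullerGraph[EventEnvelope, Offset](
      NoOffset, 500.millis, _.offset, greaterOffsetOf, offset => currentEventsByTag("some_tag", offset)))
    .mapConcat(identity) // flatten each Seq[EventEnvelope] batch into single envelopes
```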
diff --git a/core/src/main/scala/org/nullvector/query/PullerGraph.scala b/core/src/main/scala/org/nullvector/query/PullerGraph.scala
index 5fc600f..00939bb 100644
--- a/core/src/main/scala/org/nullvector/query/PullerGraph.scala
+++ b/core/src/main/scala/org/nullvector/query/PullerGraph.scala
@@ -3,54 +3,70 @@ package org.nullvector.query
 import akka.NotUsed
 import akka.stream.scaladsl.Source
 import akka.stream.stage._
-import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.{Attributes, Materializer, Outlet, SourceShape}
+import org.slf4j.{Logger, LoggerFactory}
 
-import scala.concurrent.ExecutionContext
+import scala.collection.mutable
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future}
 
-class PullerGraph[D, O](
-                         initialOffset: O,
-                         refreshInterval: FiniteDuration,
-                         offsetOf: D => O,
-                         graterOf: (O, O) => O,
-                         nextChunk: O => Source[D, NotUsed],
-                       )(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Source[D, NotUsed]]] {
+class PullerGraph[Element, Offset](
+                                    initialOffset: Offset,
+                                    refreshInterval: FiniteDuration,
+                                    offsetOf: Element => Offset,
+                                    greaterOf: (Offset, Offset) => Offset,
+                                    query: Offset => Source[Element, NotUsed],
+                                  )(implicit ec: ExecutionContext, mat: Materializer) extends GraphStage[SourceShape[Seq[Element]]] {
 
-  private val outlet: Outlet[Source[D, NotUsed]] = Outlet[Source[D, NotUsed]]("PullerGraph.OUT")
+  private val outlet: Outlet[Seq[Element]] = Outlet[Seq[Element]]("PullerGraph.OUT")
 
-  override def shape: SourceShape[Source[D, NotUsed]] = SourceShape.of(outlet)
+  override def shape: SourceShape[Seq[Element]] = SourceShape.of(outlet)
 
   override def createLogic(attributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
-
+    var currentOffset: Offset = initialOffset
     private val effectiveRefreshInterval: FiniteDuration = attributes.get[RefreshInterval].fold(refreshInterval)(_.interval)
-    var currentOffset: O = initialOffset
-    var eventStreamConsuming = false
-
-    private val updateConsumingState: AsyncCallback[Boolean] = createAsyncCallback[Boolean](eventStreamConsuming = _)
-    private val updateCurrentOffset: AsyncCallback[D] =
-      createAsyncCallback[D](event => currentOffset = graterOf(currentOffset, offsetOf(event)))
+    private val updateCurrentOffset = createAsyncCallback[Offset](offset => currentOffset = offset)
+    private val failAsync = createAsyncCallback[Throwable](throwable => failStage(throwable))
+    private val pushElements = createAsyncCallback[Seq[Element]](elements => push(outlet, elements))
+    private val timerName = "timer"
 
     setHandler(outlet, new OutHandler {
-      override def onPull(): Unit = {}
+      override def onPull() = scheduleNext()
 
-      override def onDownstreamFinish(cause: Throwable): Unit = cancelTimer("timer")
+      override def onDownstreamFinish(cause: Throwable) = cancelTimer(timerName)
     })
 
-    override def preStart(): Unit = scheduleWithFixedDelay("timer", effectiveRefreshInterval, effectiveRefreshInterval)
-
-    override protected def onTimer(timerKey: Any): Unit = {
-      if (isAvailable(outlet) && !eventStreamConsuming) {
-        eventStreamConsuming = true
-        val source = nextChunk(currentOffset)
-          .mapAsync(1)(entry => updateCurrentOffset.invokeWithFeedback(entry).map(_ => entry))
-          .watchTermination() { (mat, future) =>
-            future.onComplete { _ => updateConsumingState.invoke(false) }
-            mat
-          }
-        push(outlet, source)
+    override protected def onTimer(timerKey: Any) = {
+      query(currentOffset)
+        .runFold(new Accumulator(currentOffset))((acc, element) => acc.update(element))
+        .flatMap(_.pushOrScheduleNext())
+        .recover { case throwable: Throwable => failAsync.invoke(throwable) }
+    }
+
+    private def scheduleNext() = {
+      if (!isTimerActive(timerName)) {
+        scheduleOnce(timerName, effectiveRefreshInterval)
       }
     }
-  }
 
+    class Accumulator(private var latestOffset: Offset, private val elements: mutable.Buffer[Element] = mutable.Buffer.empty) {
+      def update(anElement: Element): Accumulator = {
+        latestOffset = greaterOf(latestOffset, offsetOf(anElement))
+        elements.append(anElement)
+        this
+      }
+
+      def pushOrScheduleNext(): Future[Unit] = {
+        if (elements.nonEmpty) {
+          for {
+            _ <- updateCurrentOffset.invokeWithFeedback(latestOffset)
+            _ <- pushElements.invokeWithFeedback(elements.toSeq)
+          } yield ()
+        }
+        else Future.successful(scheduleNext())
+      }
+    }
+
+  }
 }
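The rewritten stage inverts the old push model: `onPull` arms a one-shot timer, the timer runs the query to completion while folding every element and the greatest offset seen into an `Accumulator`, and a non-empty batch is pushed downstream through async callbacks (the fold completes on the materializer's dispatcher, outside the stage), while an empty one simply re-arms the timer. Because the effective interval is resolved from stream `Attributes`, callers can tune the polling rate per stream, which the updated tests below rely on:

```scala
// Per-stream override of the polling interval; RefreshInterval is this
// plugin's stream attribute, readJournal as obtained from the plugin.
readJournal
  .eventsByTag("event_tag_1", NoOffset)
  .addAttributes(RefreshInterval(300.millis))
  .runWith(Sink.foreach(println))
```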
diff --git a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala
index 1a007d8..824578d 100644
--- a/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala
+++ b/core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala
@@ -2,12 +2,11 @@ package org.nullvector.queries
 
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
-
 import akka.actor.{ActorSystem, typed}
 import akka.actor.typed.scaladsl.Behaviors
 import akka.persistence.query.{EventEnvelope, NoOffset}
 import akka.persistence.{AtomicWrite, PersistentRepr}
-import akka.stream.Materializer
+import akka.stream.{DelayOverflowStrategy, Materializer}
 import akka.stream.scaladsl.{Sink, Source}
 import akka.testkit.{ImplicitSender, TestKit, TestKitBase}
 import com.typesafe.config.{Config, ConfigFactory}
@@ -26,7 +25,12 @@ import scala.util.Random
 
 class ReactiveMongoReadJournalSpec() extends TestKitBase with ImplicitSender
   with WordSpecLike with Matchers with BeforeAndAfterAll {
-  override lazy val system = typed.ActorSystem(Behaviors.empty,"ReactiveMongoPlugin").classicSystem
+
+  private lazy val config: Config = ConfigFactory
+    .parseString("akka-persistence-reactivemongo.persistence-id-separator = \"\"")
+    .withFallback(ConfigFactory.load())
+
+  override lazy val system = typed.ActorSystem[Any](Behaviors.empty, "ReactiveMongoPlugin", config).classicSystem
   implicit lazy val dispatcher: ExecutionContextExecutor = system.dispatcher
   protected lazy val rxDriver: ReactiveMongoDriver = ReactiveMongoDriver(system)
   private val amountOfCores: Int = 10
@@ -127,20 +131,22 @@ class ReactiveMongoReadJournalSpec() extends TestKitBase with ImplicitSender
     "Events by tag from a given Offset" in {
       val prefixReadColl = "ReadCollection"
       dropAll()
-      Await.ready(Source(1 to 10).mapAsync(10) { idx =>
+      Await.ready(Source(1 to 10).mapAsync(amountOfCores) { idx =>
         val pId = s"${prefixReadColl}_$idx-${Random.nextLong().abs}"
         reactiveMongoJournalImpl.asyncWriteMessages((1 to 25).map(jIdx =>
           AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = jIdx))
         ))
-      }.runWith(Sink.ignore), 14.seconds)
-      Thread.sleep(700)
+      }.run(), 14.seconds)
+      Thread.sleep(1500)
       val offset = ObjectIdOffset.newOffset()
-      Await.ready(Source(1 to 10).mapAsync(10) { idx =>
+      Thread.sleep(500)
+      Await.ready(Source(1 to 10).mapAsync(amountOfCores) { idx =>
         val pId = s"${prefixReadColl}_$idx-${Random.nextLong().abs}"
         reactiveMongoJournalImpl.asyncWriteMessages((26 to 50).map(jIdx =>
           AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = jIdx))
         ))
-      }.runWith(Sink.ignore), 14.seconds)
+      }.run(), 14.seconds)
+      Thread.sleep(500)
 
       val eventualDone = readJournal.currentEventsByTag("event_tag_1", offset).runWith(Sink.seq)
       val envelopes = Await.result(eventualDone, 14.seconds)
@@ -150,55 +156,45 @@ class ReactiveMongoReadJournalSpec() extends TestKitBase with ImplicitSender
     "Infinite Events by tag" in {
       val prefixReadColl = "ReadCollection"
      dropAll()
-      val envelopes = new ConcurrentLinkedQueue[EventEnvelope]()
-      readJournal.eventsByTag("event_tag_1", NoOffset).async.runWith(Sink.foreach(e => envelopes.add(e))).recover {
+      val counter = new AtomicInteger()
+      readJournal.eventsByTag("event_tag_1", NoOffset)
+        .addAttributes(RefreshInterval(300.millis))
+        .runWith(Sink.foreach(_ => counter.incrementAndGet())).recover {
         case e: Throwable => e.printStackTrace()
       }
-      Thread.sleep(700)
-      Await.ready(Source(1 to 10).mapAsync(amountOfCores) { idx =>
-        val pId = s"${prefixReadColl}_$idx-${Random.nextLong().abs}"
-        reactiveMongoJournalImpl.asyncWriteMessages((1 to 25).map(jIdx =>
-          AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = jIdx))
-        ))
-      }.runWith(Sink.ignore), 14.seconds)
-      Thread.sleep(700)
-      Await.ready(Source(1 to 10).mapAsync(amountOfCores) { idx =>
-        val pId = s"${prefixReadColl}_$idx-${Random.nextLong().abs}"
-        reactiveMongoJournalImpl.asyncWriteMessages((26 to 50).map(jIdx =>
-          AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = jIdx))
-        ))
-      }.runWith(Sink.ignore), 14.seconds)
-      Thread.sleep(700)
-      envelopes.size shouldBe 500
+
+      val eventualDone = Source(1 to 125)
+        .delay(100.millis, DelayOverflowStrategy.backpressure)
+        .mapAsync(amountOfCores) { idx =>
+          val collId = (idx % 7) + 1
+          val pId = s"${prefixReadColl}_$collId-${Random.nextLong().abs}"
+          reactiveMongoJournalImpl.asyncWriteMessages((1 to 4).map(jIdx =>
+            AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = jIdx))
+          ))
+        }.run()
+      Await.ready(eventualDone, 7.seconds)
+      Thread.sleep(1000)
+      counter.get() shouldBe 500
     }
 
     "Infinite Events by tag with Custom RefreshInterval" in {
       val prefixReadColl = "ReadCollection"
       dropAll()
-      val envelopes = new ConcurrentLinkedQueue[EventEnvelope]()
+      val counter = new AtomicInteger(0)
       readJournal
         .eventsByTag("event_tag_1", NoOffset)
-        .async
         .addAttributes(RefreshInterval(5.millis))
-        .runWith(Sink.foreach(e => envelopes.add(e))).recover {
+        .runWith(Sink.foreach(_ => counter.incrementAndGet())).recover {
         case e: Throwable => e.printStackTrace()
       }
-      readJournal
-        .eventsByTag("some_tag", NoOffset)
-        .async
-        .addAttributes(RefreshInterval(5.millis))
-        .runWith(Sink.foreach(println))
-      Thread.sleep(500)
-
+      Thread.sleep(1000)
       Await.ready(Source(1 to 3).mapAsync(amountOfCores) { idx =>
         val pId = s"${prefixReadColl}_$idx-${Random.nextLong().abs}"
         reactiveMongoJournalImpl.asyncWriteMessages((1 to 25).map(jIdx =>
           AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = jIdx))
         ))
       }.runWith(Sink.ignore), 14.seconds)
-
-      Thread.sleep(500)
-
+      Thread.sleep(1000)
       Await.ready(Source(1 to 3).mapAsync(amountOfCores) { idx =>
         val pId = s"${prefixReadColl}_$idx-${Random.nextLong().abs}"
         reactiveMongoJournalImpl.asyncWriteMessages((26 to 50).map(jIdx =>
@@ -206,9 +202,9 @@ class ReactiveMongoReadJournalSpec() extends TestKitBase with ImplicitSender
         ))
       }.runWith(Sink.ignore), 14.seconds)
 
-      Thread.sleep(600)
+      Thread.sleep(1000)
 
-      envelopes.size shouldBe 150
+      counter.get() shouldBe 150
     }
 
     "Infinite Events by Id" in {
@@ -218,23 +214,22 @@ class ReactiveMongoReadJournalSpec() extends TestKitBase with ImplicitSender
       val pId = s"$prefixReadColl-123"
       readJournal
         .eventsByPersistenceId(pId, 0L, Long.MaxValue)
-        .async
+        .addAttributes(RefreshInterval(5.millis))
         .runWith(Sink.foreach(e => envelopes.add(e))).recover {
         case e: Throwable => e.printStackTrace()
       }
-      Thread.sleep(700)
       Await.ready(Source(1 to 10).mapAsync(amountOfCores) { idx =>
-        reactiveMongoJournalImpl.asyncWriteMessages(immutable.Seq(
+        reactiveMongoJournalImpl.asyncWriteMessages(Seq(
          AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = idx))
        ))
      }.runWith(Sink.ignore), 14.seconds)
       Thread.sleep(700)
       Await.ready(Source(11 to 20).mapAsync(amountOfCores) { idx =>
-        reactiveMongoJournalImpl.asyncWriteMessages(immutable.Seq(
+        reactiveMongoJournalImpl.asyncWriteMessages(Seq(
           AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$idx", 23.45), persistenceId = pId, sequenceNr = idx))
         ))
       }.runWith(Sink.ignore), 14.seconds)
-      Thread.sleep(700)
+      Thread.sleep(500)
       envelopes.peek().persistenceId shouldBe pId
       envelopes.size shouldBe 20
     }
@@ -276,14 +271,14 @@ class ReactiveMongoReadJournalSpec() extends TestKitBase with ImplicitSender
           AtomicWrite(PersistentRepr(payload = SomeEvent(s"lechuga_$collId", 23.45), persistenceId = pId, sequenceNr = 1))
         })
       }.runWith(Sink.ignore), 14.seconds)
-      val ids = Await.result(readJournal.currentPersistenceIds().async.runWith(Sink.seq), 14.seconds)
+      val ids = Await.result(readJournal.currentPersistenceIds().runWith(Sink.seq), 14.seconds)
       ids.size shouldBe 250
     }
   }
 
-  private def dropAll() = {
-    Await.ready(Source.future(rxDriver.journals())
-      .mapConcat(identity)
+  private def dropAll(prefix: Option[String] = None) = {
+    Await.result(Source.future(rxDriver.journals())
+      .mapConcat(colls => prefix.fold(colls)(x => colls.filter(_.name.startsWith(x))))
       .mapAsync(amountOfCores)(_.drop(failIfNotFound = false))
       .runWith(Sink.ignore), 14.seconds)
   }
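The assertions still rely on `Thread.sleep` to give the polling streams time to catch up, which is the usual source of flakiness in timing-based tests. Not part of this diff, but ScalaTest's `Eventually` would let the counters be asserted by retrying instead; a sketch, assuming the scalatest version in use ships it and the spec's `counter` and Matchers are in scope:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

// Retries the block until it passes or the timeout expires,
// replacing a fixed sleep with an upper bound.
eventually(timeout(Span(5, Seconds))) {
  counter.get() shouldBe 500
}
```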
diff --git a/project/build.properties b/project/build.properties
index 8b8de72..7de0a93 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version = 1.3.5
\ No newline at end of file
+sbt.version=1.4.4
diff --git a/project/plugins.sbt b/project/plugins.sbt
index b49a6e4..b621176 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1 +1,2 @@
 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
+addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.4.6")
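The Scala 3 groundwork is only half wired up: `scala300` is declared in build.sbt and sbt-dotty is on the plugin classpath, but `supportedScalaVersions` still lists only `scala213`, so nothing cross-builds yet. Presumably the follow-up, once the Akka and ReactiveMongo dependencies publish for `3.0.0-M2`, is just:

```scala
// build.sbt (hypothetical follow-up; not part of this diff)
lazy val supportedScalaVersions = List(scala213, scala300)
```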