Skip to content

Commit

Permalink
Fix memory leak when using a non-terminating persistence query.
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogdea committed Feb 1, 2021
1 parent fea6000 commit ec126ac
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 96 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/project/target
.DS_Store
/project/project/
/.bsp/
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
lazy val scala213 = "2.13.4"
lazy val scala300 = "3.0.0-M2"
lazy val supportedScalaVersions = List(scala213)
lazy val akkaVersion = "2.6.10"
lazy val rxmongoVersion = "1.0.1"

lazy val commonSettings = Seq(
name := "akka-reactivemongo-plugin",
organization := "null-vector",
version := "1.4.3",
version := s"1.4.4",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
scalacOptions := Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ trait CollectionNameMapping {

/** Maps a persistence id to the Mongo collection that stores its events.
  *
  * The collection name is the prefix of the persistence id up to the configured
  * separator, e.g. "shopping-cart-42" with separator '-' maps to "shopping".
  */
class DefaultCollectionNameMapping(config: Config) extends CollectionNameMapping {
  private val separator: String = config.getString("akka-persistence-reactivemongo.persistence-id-separator")
  // headOption tolerates an empty separator setting instead of throwing on "".head.
  private val pattern: Regex = buildPattern(separator.headOption)

  /** Returns the collection name encoded in the persistence id, or None when the id
    * carries no (non-empty) collection prefix. */
  override def collectionNameOf(persistentId: String): Option[String] = persistentId match {
    case pattern(name, _) if name.isEmpty => None
    case pattern(name, _) => Some(name)
    case _ => None
  }

  // Regex.quote keeps the separator literal even when it is a regex metacharacter
  // such as '\' or ']' (the previous character-class form "[$char]" broke on those).
  // With no separator configured, group 1 is always empty and collectionNameOf
  // uniformly returns None.
  private def buildPattern(maybeSeparator: Option[Char]) = maybeSeparator match {
    case Some(char) => s"(\\w+)${Regex.quote(char.toString)}(.+)".r
    case None => s"()(.+)".r
  }

}

11 changes: 5 additions & 6 deletions core/src/main/scala/org/nullvector/query/EventsQueries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ trait EventsQueries
(_, fromToSequences) => fromToSequences,
offset => currentEventsByPersistenceId(persistenceId, offset._1, offset._2)
))
.flatMapConcat(identity)
.mapConcat(identity)
}

override def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
Expand All @@ -54,8 +54,8 @@ trait EventsQueries
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
Source
.fromGraph(new PullerGraph[EventEnvelope, Offset](
offset, defaultRefreshInterval, _.offset, greaterOffsetOf, o => currentEventsByTag(tag, o)))
.flatMapConcat(identity)
offset, defaultRefreshInterval, _.offset, greaterOffsetOf, offset => currentEventsByTag(tag, offset)))
.mapConcat(identity)

/*
* Query events that have a specific tag. Those events matching target tags would
Expand Down Expand Up @@ -83,11 +83,11 @@ trait EventsQueries
}

/** Streams, from every journal collection, the events matching any of the given
  * tags, starting from the given offset.
  *
  * @param tags   tags to match (an event qualifies when it carries at least one).
  * @param offset exclusive lower bound for the query.
  * @param serializableMethod deserialization hook applied downstream by document2Envelope.
  */
private def eventsByTagQuery(tags: Seq[String], offset: Offset)(implicit serializableMethod: (BSONDocument, BSONDocument) => Future[Any]): Source[EventEnvelope, NotUsed] = {
  // Source.future (not lazyFuture): the journals lookup starts eagerly, once per materialization.
  Source.future(rxDriver.journals())
    .mapConcat(identity)
    // One substream per journal collection, so each collection is queried independently.
    .splitWhen(_ => true)
    .flatMapConcat(buildFindEventsByTagsQuery(_, offset, tags))
    // Bounded merge: at most `amountOfCores` collections are read in parallel, instead of
    // the unbounded mergeSubstreams used previously.
    .mergeSubstreamsWithParallelism(amountOfCores)
    .via(document2Envelope(serializableMethod))
}

Expand All @@ -110,7 +110,6 @@ trait EventsQueries

private def buildFindEventsByTagsQuery(coll: collection.BSONCollection, offset: Offset, tags: Seq[String]) = {
def query(field: String) = BSONDocument(field -> BSONDocument("$in" -> tags))

coll
.aggregateWith[BSONDocument]()(framework =>
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.nullvector.query
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.persistence.query.{EventEnvelope, NoOffset, Offset}
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import org.nullvector.PersistInMemory.EventWithOffset
import org.nullvector.{PersistInMemory, ReactiveMongoEventSerializer}
Expand All @@ -12,6 +13,7 @@ import scala.concurrent.{ExecutionContextExecutor, Future}

class FromMemoryReadJournal(actorSystem: ActorSystem[_]) extends ReactiveMongoScalaReadJournal {
private implicit val ec: ExecutionContextExecutor = actorSystem.executionContext
private implicit val mat: Materializer = Materializer.matFromSystem(actorSystem)
private val memory: PersistInMemory = PersistInMemory(actorSystem)
private val serializer: ReactiveMongoEventSerializer = ReactiveMongoEventSerializer(actorSystem)
val defaultRefreshInterval: FiniteDuration = 500.millis
Expand All @@ -28,7 +30,7 @@ class FromMemoryReadJournal(actorSystem: ActorSystem[_]) extends ReactiveMongoSc
.fromGraph(
new PullerGraph[EventEnvelope, Offset](offset, defaultRefreshInterval, _.offset, greaterOffsetOf, offset => currentEventsByTag(tag, offset))
)
.flatMapConcat(identity)
.mapConcat(identity)

// Single-tag query: delegates to the multi-tag variant with a one-element tag list.
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
currentEventsByTags(Seq(tag), offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ case class ObjectIdOffset(id: BSONObjectID) extends Offset with Ordered[ObjectId

override val toString: String = s"Offset(${id.stringify})"

override def compare(that: ObjectIdOffset): Int = BigInt(id.byteArray).compare(BigInt(that.id.byteArray))
// Compares the string forms of the two ObjectIds. Assuming stringify yields a
// fixed-length hex representation (TODO confirm against BSONObjectID docs),
// lexicographic order matches the unsigned byte order of the ids — unlike the
// previous BigInt(id.byteArray) comparison, which was signed and could invert
// the ordering when the leading byte was >= 0x80.
override def compare(that: ObjectIdOffset): Int = id.stringify.compare(that.id.stringify)
}


Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait PersistenceIdsQueries
greaterOffsetOf,
o => currentPersistenceIds(o)
))
.flatMapConcat(identity)
.mapConcat(identity)
.map(_.persistenceId)
}

Expand Down
82 changes: 49 additions & 33 deletions core/src/main/scala/org/nullvector/query/PullerGraph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,70 @@ package org.nullvector.query
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.{Attributes, Materializer, Outlet, SourceShape}
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

/** Source stage that repeatedly runs `query` from the latest known offset and emits
  * each non-empty batch of results downstream as one Seq[Element].
  *
  * Polling is demand-driven: a query is only scheduled when downstream pulls
  * (onPull -> scheduleNext), so batches can never accumulate without demand —
  * this replaces the previous design (periodic preStart timer + pushed inner
  * Sources consumed via flatMapConcat/watchTermination) that leaked memory when
  * the query never terminated.
  *
  * @param initialOffset   offset of the first query.
  * @param refreshInterval delay between a (possibly empty) query run and the next one;
  *                        can be overridden per-stream via the RefreshInterval attribute.
  * @param offsetOf        extracts the offset carried by an element.
  * @param greaterOf       picks the greater of two offsets.
  * @param query           runs one finite query starting at the given offset.
  */
class PullerGraph[Element, Offset](
    initialOffset: Offset,
    refreshInterval: FiniteDuration,
    offsetOf: Element => Offset,
    greaterOf: (Offset, Offset) => Offset,
    query: Offset => Source[Element, NotUsed],
)(implicit ec: ExecutionContext, mat: Materializer) extends GraphStage[SourceShape[Seq[Element]]] {

  private val outlet: Outlet[Seq[Element]] = Outlet[Seq[Element]]("PullerGraph.OUT")

  override def shape: SourceShape[Seq[Element]] = SourceShape.of(outlet)

  override def createLogic(attributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {

    private val effectiveRefreshInterval: FiniteDuration =
      attributes.get[RefreshInterval].fold(refreshInterval)(_.interval)

    var currentOffset: Offset = initialOffset

    // The query materializes on `mat`'s threads, so every mutation of stage state
    // and every push must go through an AsyncCallback to stay single-threaded.
    private val updateCurrentOffset = createAsyncCallback[Offset](offset => currentOffset = offset)
    private val failAsync = createAsyncCallback[Throwable](throwable => failStage(throwable))
    private val pushElements = createAsyncCallback[Seq[Element]](elements => push(outlet, elements))

    private val timerName = "timer"

    setHandler(outlet, new OutHandler {
      // Demand-driven: only schedule the next query run when downstream asks for more.
      override def onPull() = scheduleNext()

      override def onDownstreamFinish(cause: Throwable) = cancelTimer(timerName)
    })

    // Fires only after onPull scheduled it, so pushing here is safe without an
    // isAvailable check (there is exactly one outstanding pull per run).
    override protected def onTimer(timerKey: Any) = {
      query(currentOffset)
        .runFold(new Accumulator(currentOffset))((acc, element) => acc.update(element))
        .flatMap(_.pushOrScheduleNext())
        .recover { case throwable: Throwable => failAsync.invoke(throwable) }
    }

    // Idempotent: does nothing while a poll is already pending.
    private def scheduleNext() = {
      if (!isTimerActive(timerName)) {
        scheduleOnce(timerName, effectiveRefreshInterval)
      }
    }

    /** Accumulates one query run: buffers the elements and tracks the greatest offset seen. */
    class Accumulator(private var latestOffset: Offset, private val elements: mutable.Buffer[Element] = mutable.Buffer.empty) {

      def update(anElement: Element): Accumulator = {
        latestOffset = greaterOf(latestOffset, offsetOf(anElement))
        elements.append(anElement)
        this
      }

      /** Non-empty run: commit the new offset, then push the batch (answering the
        * pending pull). Empty run: keep the offset and schedule the next poll. */
      def pushOrScheduleNext(): Future[Unit] = {
        if (elements.nonEmpty) {
          for {
            _ <- updateCurrentOffset.invokeWithFeedback(latestOffset)
            _ <- pushElements.invokeWithFeedback(elements.toSeq)
          } yield ()
        }
        else Future.successful(scheduleNext())
      }
    }
  }
}
Loading

0 comments on commit ec126ac

Please sign in to comment.