Skip to content

Commit

Permalink
Merge pull request #258 from lichess-org/skip-forum-posts-that-exceed…
Browse files Browse the repository at this point in the history
…-5000-chars

Filter out oversized forum posts in the change stream
  • Loading branch information
lenguyenthanh authored Jul 3, 2024
2 parents 246aea1 + 1a69866 commit 18de0f1
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ object ForumIngestor:
// Index targeted by this ingestor.
private val index = Index.Forum

// Change-stream operation types this ingestor cares about, converted with `getValue`
// so they can be compared against the event's "operationType" field.
private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue)
// Keeps only events whose "operationType" is one of `interestedOperations`.
private val eventFilter = Filter.in("operationType", interestedOperations)

/** Builds an `$expr` filter that keeps only events whose `fullDocument.text` is at most `max`
  * code points long.
  *
  * `$strLenCP` raises an error when its argument is null or refers to a missing field. Events
  * without a `fullDocument.text` (delete events do not carry a full document) would therefore
  * fail the whole `$match` stage. `$ifNull` substitutes an empty string so those events pass the
  * size check instead of erroring.
  *
  * @param max
  *   maximum allowed length of the post text, in code points (inclusive)
  */
private def maxPostSizeFilter(max: Int) =
  Filter.expr(s"{ $$lte: [{ $$strLenCP: { $$ifNull: ['$$fullDocument.text', ''] } }, $max] }")

/** Filter for change-stream events: the operation type must be one of `interestedOperations`
  * and the post text must satisfy the size filter built from `maxPostLength`.
  */
private def eventFilter(maxPostLength: Int) =
  val byOperation = Filter.in("operationType", interestedOperations)
  val bySize      = maxPostSizeFilter(maxPostLength)
  byOperation && bySize

// Post fields read by the ingestor; everything else is projected away.
private val interestedFields = List(_id, F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt)
// Projection that includes only `interestedFields`.
private val postProjection = Projection.include(interestedFields)
Expand All @@ -35,7 +40,8 @@ object ForumIngestor:
List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument." + _)
// Projection applied to change-stream events: includes only `interestedEventFields`.
private val eventProjection = Projection.include(interestedEventFields)

// Change-stream pipeline: match events with `eventFilter`, then project with `eventProjection`.
private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))
/** Change-stream pipeline: a match stage built from `eventFilter(maxPostLength)` combined with a
  * projection stage built from `eventProjection`.
  */
private def aggregate(maxPostLength: Int) =
  val matchStage   = Aggregate.matchBy(eventFilter(maxPostLength))
  val projectStage = Aggregate.project(eventProjection)
  matchStage.combinedWith(projectStage)

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
using Logger[IO]
Expand Down Expand Up @@ -104,7 +110,7 @@ object ForumIngestor:
.map(_.map(doc => (doc.id -> doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)

private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
val builder = posts.watch(aggregate)
val builder = posts.watch(aggregate(config.maxPostLength))
// skip the first event if we're starting from a specific timestamp
// since the event at that timestamp is already indexed
val skip = since.fold(0)(_ => 1)
Expand All @@ -113,7 +119,6 @@ object ForumIngestor:
.batchSize(config.batchSize)
.boundedStream(config.batchSize)
.drop(skip)
.filter(_.validText)
.groupWithin(config.batchSize, config.timeWindows.second)
.evalTap(_.traverse_(x => debug"received $x"))
.map(_.toList)
Expand Down Expand Up @@ -169,9 +174,6 @@ object ForumIngestor:
/** True when the event's operation type is DELETE, or when it carries a full document that
  * reports `isErased`.
  */
private def isDelete: Boolean =
  if event.operationType == DELETE then true
  else event.fullDocument.exists(_.isErased)

// True only when the event carries a full document and that document's `validText` check passes;
// events without a full document yield false.
private def validText: Boolean =
event.fullDocument.exists(_.validText)

object F:
val text = "text"
val topicId = "topicId"
Expand Down

0 comments on commit 18de0f1

Please sign in to comment.