From b8a94a78781f1ef9c6126a9d0ea5ff0faca42483 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Tue, 2 Jul 2024 12:25:15 +0700 Subject: [PATCH 1/3] Correctly handle error for team.storeBulk `handleErrorWith` should act in storeBulk instead of harmless logging --- modules/ingestor/src/main/scala/ingestor.team.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index 410d942e..618ca0cf 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -59,9 +59,11 @@ object TeamIngestor: private def storeBulk(docs: List[Document]): IO[Unit] = val sources = docs.toSources info"Received ${docs.size} teams to index" *> - elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams" + elastic + .storeBulk(index, sources) .handleErrorWith: e => Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}") + *> info"Indexed ${sources.size} teams" private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.value, time) From 2e4d745690e6a7e81571f8cd56d39f59ad517b68 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Tue, 2 Jul 2024 12:32:07 +0700 Subject: [PATCH 2/3] Correct index for deleteMany logging --- modules/ingestor/src/main/scala/package.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala index 191f5fb5..347b3744 100644 --- a/modules/ingestor/src/main/scala/package.scala +++ b/modules/ingestor/src/main/scala/package.scala @@ -44,22 +44,22 @@ def range(field: String)(since: Instant, until: Option[Instant]): Filter = until.fold(gtes)(until => gtes.and(Filter.lt(field, until))) extension (elastic: ESClient[IO]) + @scala.annotation.targetName("deleteManyWithIds") def deleteMany(index: Index, ids: List[Id])(using Logger[IO]): IO[Unit] = elastic .deleteMany(index, ids) .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s")) .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete ${index.value}s: ${ids.map(_.value).mkString(", ")}") + Logger[IO].error(e)(s"Failed to delete ${index.value}: ${ids.map(_.value).mkString(", ")}") .whenA(ids.nonEmpty) @scala.annotation.targetName("deleteManyWithDocs") def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] = - info"Received ${events.size} forum posts to delete" *> - deleteMany(index, events.flatMap(_.id).map(Id.apply)) - .whenA(events.nonEmpty) + info"Received ${events.size} ${index.value} to delete" *> + deleteMany(index, events.flatMap(_.id).map(Id.apply)).whenA(events.nonEmpty) @scala.annotation.targetName("deleteManyWithChanges") def deleteMany(index: Index, events: List[ChangeStreamDocument[Document]])(using Logger[IO]): IO[Unit] = - info"Received ${events.size} forum posts to delete" *> + info"Received ${events.size} ${index.value} to delete" *> deleteMany(index, events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty) From b8e41390a9a6828a3c14dac95f4b62510b639646 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Tue, 2 Jul 2024 12:34:18 +0700 Subject: [PATCH 3/3] Dont call elastic search if there no team to index --- modules/ingestor/src/main/scala/ingestor.team.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index 618ca0cf..d0959b04 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -63,6 +63,7 @@ object TeamIngestor: .storeBulk(index, sources) .handleErrorWith: e => Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}") + .whenA(sources.nonEmpty) *> info"Indexed ${sources.size} teams" private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =