From c44e6d54754fcfb16b3617ec054ade5005280c7c Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 14 Oct 2024 09:47:48 +0200 Subject: [PATCH 1/3] #496 Fix handling of retrospectively updated jobs. --- .../absa/pramen/core/pipeline/JobBase.scala | 39 +++++++++++++++---- .../splitter/ScheduleStrategyUtils.scala | 2 +- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index ad14fa81f..14c547a2a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -25,7 +25,7 @@ import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.utils.Emoji._ -import za.co.absa.pramen.core.utils.TimeUtils +import za.co.absa.pramen.core.utils.{Emoji, TimeUtils} import java.time.{Instant, LocalDate} import scala.util.{Failure, Success, Try} @@ -114,15 +114,40 @@ abstract class JobBase(operationDef: OperationDef, } protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = { - if (bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) { - log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") - Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) - } else { - log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") - None + bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match { + case Some(chunk) => + val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished) + if (outOfDateTables.nonEmpty) { + log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}") + val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}" + Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning))) + } else { + log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") + Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) + } + case None => + log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") + None } } + private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = { + operationDef.dependencies + .filter(d => !d.isOptional && !d.isPassive) + .flatMap(_.tables) + .distinct + .filter { table => + bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match { + case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds => + log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " + + s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .") + true + case _ => + false + } + } + } + protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = { val evaluator = new DateExprEvaluator evaluator.setValue("infoDate", infoDate) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala index a62c300dd..070c7d7b5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala @@ -214,7 +214,7 @@ object ScheduleStrategyUtils { dependency.tables.foldLeft(false)((acc, table) => { bookkeeper.getLatestDataChunk(table, dateFrom, dateTo) match { case Some(dependencyUpdated) => - val isUpdatedRetrospectively = dependencyUpdated.jobFinished > lastUpdated.jobFinished + val isUpdatedRetrospectively = dependencyUpdated.jobFinished >= lastUpdated.jobFinished if (isUpdatedRetrospectively) { log.warn(s"Input table '$table' has updated retrospectively${renderPeriod(Option(dateFrom), Option(dateTo))}. " + s"Adding '$outputTable' to rerun for $infoDate.") From 5978140b6c749dd1de734bcf73653c4d4463538b Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 15 Oct 2024 09:15:02 +0200 Subject: [PATCH 2/3] Update CI for support branches. --- .github/workflows/scala.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 1cf8cd1c0..0261a4fda 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -2,12 +2,16 @@ name: ScalaCI on: push: - branches: [ main ] + branches: + - "main" + - "support/*" paths: - "pramen/**" - ".github/workflows/scala.yml" pull_request: - branches: [ main ] + branches: + - "main" + - "support/*" paths: - "pramen/**" - ".github/workflows/scala.yml" From cd000c8df2aee34921b137a8a15b6f40c993bac3 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 15 Oct 2024 09:32:39 +0200 Subject: [PATCH 3/3] Fix sbt availability in CI. --- .github/workflows/scala.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 0261a4fda..c988005b1 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -46,6 +46,16 @@ jobs: distribution: temurin java-version: 8 cache: sbt + - name: Install sbt + run: | + sudo apt-get update + sudo apt-get install apt-transport-https curl gnupg -yqq + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list + echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list + curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import + sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg + sudo apt-get update + sudo apt-get install sbt - name: Build and run unit tests working-directory: ./pramen run: sbt ++${{matrix.scala}} test -DSPARK_VERSION=${{matrix.spark}}