Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#496 Fix handling of retrospectively updated jobs. #497

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ name: ScalaCI

on:
push:
branches: [ main ]
branches:
- "main"
- "support/*"
paths:
- "pramen/**"
- ".github/workflows/scala.yml"
pull_request:
branches: [ main ]
branches:
- "main"
- "support/*"
paths:
- "pramen/**"
- ".github/workflows/scala.yml"
Expand Down Expand Up @@ -42,6 +46,16 @@ jobs:
distribution: temurin
java-version: 8
cache: sbt
- name: Install sbt
run: |
sudo apt-get update
sudo apt-get install apt-transport-https curl gnupg -yqq
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update
sudo apt-get install sbt
- name: Build and run unit tests
working-directory: ./pramen
run: sbt ++${{matrix.scala}} test -DSPARK_VERSION=${{matrix.spark}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.TimeUtils
import za.co.absa.pramen.core.utils.{Emoji, TimeUtils}

import java.time.{Instant, LocalDate}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -114,15 +114,40 @@ abstract class JobBase(operationDef: OperationDef,
}

protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = {
if (bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
} else {
log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.")
None
bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match {
case Some(chunk) =>
val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished)
if (outOfDateTables.nonEmpty) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}")
val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}"
Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning)))
} else {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
}
case None =>
log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.")
None
}
}

private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = {
operationDef.dependencies
.filter(d => !d.isOptional && !d.isPassive)
.flatMap(_.tables)
.distinct
.filter { table =>
bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match {
case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds =>
log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " +
s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .")
true
case _ =>
false
}
}
}

protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = {
val evaluator = new DateExprEvaluator
evaluator.setValue("infoDate", infoDate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ object ScheduleStrategyUtils {
dependency.tables.foldLeft(false)((acc, table) => {
bookkeeper.getLatestDataChunk(table, dateFrom, dateTo) match {
case Some(dependencyUpdated) =>
val isUpdatedRetrospectively = dependencyUpdated.jobFinished > lastUpdated.jobFinished
val isUpdatedRetrospectively = dependencyUpdated.jobFinished >= lastUpdated.jobFinished
if (isUpdatedRetrospectively) {
log.warn(s"Input table '$table' has updated retrospectively${renderPeriod(Option(dateFrom), Option(dateTo))}. " +
s"Adding '$outputTable' to rerun for $infoDate.")
Expand Down
Loading