Skip to content

Commit

Permalink
#421 Rename metastore reader interface for the future implementation …
Browse files Browse the repository at this point in the history
…of incremental transformations and sinks.
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 64e47b1 commit 6c108ed
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ trait MetastoreReader {
infoDateTo: Option[LocalDate] = None): DataFrame

/**
* Reads the 'current batch' of the table.
* Reads the 'current batch' of the table to be processed incrementally.
*
* For incremental processing this method returns the current chunk being processed.
* It may include multiple chunks from non-processed data if transformer has failed previously.
*
* For non-incremental processing the call to this method is equivalent to:
* {{{
Expand All @@ -55,15 +56,15 @@ trait MetastoreReader {
*
* which returns all data for the current information date being processed.
*
* This method is the method to use for transformers that
* This method is the method to use for transformers that would use 'incremental' schedule.
*
* In order to read a table it is not sufficient the table to be registered in the metastore. It also
* should be defined as input tables of the job. Otherwise, a runtime exception will be thrown.
*
* @param tableName The name of the table to read.
* @return The dataframe containing data from the table.
*/
def getCurrentBatch(tableName: String): DataFrame
def getTableIncremental(tableName: String): DataFrame

/**
* Reads the latest partition of a given table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait Metastore {

def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame

def getCurrentBatch(tableName: String, infoDate: LocalDate): DataFrame
def getTableIncremental(tableName: String, infoDate: LocalDate): DataFrame

def getLatest(tableName: String, until: Option[LocalDate]): DataFrame

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo)
}

override def getCurrentBatch(tableName: String, infoDate: LocalDate): DataFrame = {
override def getTableIncremental(tableName: String, infoDate: LocalDate): DataFrame = {
val mt = getTableDef(tableName)

val df = MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(Option(infoDate), Option(infoDate))
Expand Down Expand Up @@ -211,10 +211,10 @@ class MetastoreImpl(appConfig: Config,
metastore.getTable(tableName, from, to)
}

override def getCurrentBatch(tableName: String): DataFrame = {
override def getTableIncremental(tableName: String): DataFrame = {
validateTable(tableName)
if (isIncremental)
metastore.getCurrentBatch(tableName, infoDate)
metastore.getTableIncremental(tableName, infoDate)
else
metastore.getTable(tableName, Option(infoDate), Option(infoDate))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
}

val updatedDf = metastore.getCurrentBatch(outputTable.name, infoDate)
val updatedDf = metastore.getTableIncremental(outputTable.name, infoDate)

if (updatedDf.isEmpty) {
if (isRerun) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class IdentityTransformer extends Transformer {

val tableName = options.getOrElse(INPUT_TABLE_KEY, options(INPUT_TABLE_LEGACY_KEY))

val df = metastore.getCurrentBatch(tableName)
val df = metastore.getTableIncremental(tableName)

if (emptyAllowed || !df.isEmpty) {
Reason.Ready
Expand All @@ -46,7 +46,7 @@ class IdentityTransformer extends Transformer {
options: Map[String, String]): DataFrame = {
val tableName = options.getOrElse(INPUT_TABLE_KEY, options(INPUT_TABLE_LEGACY_KEY))

metastore.getCurrentBatch(tableName)
metastore.getTableIncremental(tableName)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MetastoreReaderMock(tables: Seq[(String, DataFrame)], infoDate: LocalDate)
}
}

override def getCurrentBatch(tableName: String): DataFrame = getTable(tableName, Option(infoDate), Option(infoDate))
override def getTableIncremental(tableName: String): DataFrame = getTable(tableName, Option(infoDate), Option(infoDate))

override def getLatest(tableName: String, until: Option[LocalDate]): DataFrame = {
tables.find(_._1 == tableName) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
tableDf
}

override def getCurrentBatch(tableName: String, infoDate: LocalDate): DataFrame = {
override def getTableIncremental(tableName: String, infoDate: LocalDate): DataFrame = {
getTable(tableName, Option(infoDate), Option(infoDate))
}

Expand Down Expand Up @@ -116,7 +116,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
metastore.getTable(tableName, from, to)
}

override def getCurrentBatch(tableName: String): DataFrame = {
override def getTableIncremental(tableName: String): DataFrame = {
getTable(tableName, Option(infoDate), Option(infoDate))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC
whenMock(metastoreReadeMock.getTable("table1", Some(infoDateWithData), Some(infoDateWithData))).thenReturn(exampleDf)
whenMock(metastoreReadeMock.getTable("table1", Some(infoDateWithEmptyDf), Some(infoDateWithEmptyDf))).thenReturn(emptyDf)
if (isEmptyToday)
whenMock(metastoreReadeMock.getCurrentBatch("table1")).thenReturn(emptyDf)
whenMock(metastoreReadeMock.getTableIncremental("table1")).thenReturn(emptyDf)
else
whenMock(metastoreReadeMock.getCurrentBatch("table1")).thenReturn(exampleDf)
whenMock(metastoreReadeMock.getTableIncremental("table1")).thenReturn(exampleDf)

(new IdentityTransformer(), metastoreReadeMock)
}
Expand Down

0 comments on commit 6c108ed

Please sign in to comment.