Skip to content

Commit

Permalink
#25: just making it more English and consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
lsulak committed Aug 22, 2023
1 parent 0f3099d commit 5a2b61b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 23 deletions.
17 changes: 9 additions & 8 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,27 @@ import za.co.absa.atum.agent.model.{AtumPartitions, MeasureResult}
object AtumAgent {

/**
* Sends the `MeasureResult` and extra data from a given `AtumContext` to the AtumService API.
* Sends a single `MeasureResult` to the AtumService API along with an extra data from a given `AtumContext`.
* @param checkpointKey
* @param atumContext
* @param measureResult
*/
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit = println(
Seq(checkpointKey, atumContext, measureResult).mkString(" || ")
)
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit =
println(
s"Enqueued measurement: ${Seq(checkpointKey, atumContext, measureResult).mkString(" || ")}"
)

/**
* Sends a single `MeasureResult` to the AtumService API. With not AtumContext involve.
* Sends a single `MeasureResult` to the AtumService API. It doesn't involve AtumContext.
*
* @param checkpointKey
* @param measureResult
*/
def measurePublish(checkpointKey: String, measure: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey, " + (measure))
def measurePublish(checkpointKey: String, measureResult: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey || $measureResult")

/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from Atum Service API.
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
* @param atumPartitions
* @return
*/
Expand Down
21 changes: 6 additions & 15 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,18 @@ import za.co.absa.atum.agent.model.{AtumPartitions, MeasureResult, Measurement}
* @param measurements: A sequences of measurements.
*/

case class AtumContext(measurements: Set[Measurement] = Set(), atumPartitions: AtumPartitions = AtumPartitions()) {
case class AtumContext(measurements: Set[Measurement] = Set.empty, atumPartitions: AtumPartitions = AtumPartitions()) {

def withMeasuresReplaced(
byMeasure: Measurement
): AtumContext =
def withMeasuresReplaced(byMeasure: Measurement): AtumContext =
this.copy(measurements = Set(byMeasure))

def withMeasuresReplaced(
byMeasures: Iterable[Measurement]
): AtumContext =
def withMeasuresReplaced(byMeasures: Iterable[Measurement]): AtumContext =
this.copy(measurements = byMeasures.toSet)

def withMeasuresAdded(
measure: Measurement
): AtumContext =
def withMeasuresAdded(measure: Measurement): AtumContext =
this.copy(measurements = measurements + measure)

def withMeasuresAdded(
measures: Iterable[Measurement]
): AtumContext =
def withMeasuresAdded(measures: Iterable[Measurement]): AtumContext =
this.copy(measurements = measurements ++ measures)

def withMeasureRemoved(measurement: Measurement): AtumContext =
Expand All @@ -55,12 +47,11 @@ object AtumContext {
implicit class DatasetWrapper(df: DataFrame) {

/**
* Executes the measure directly with not AtumContext.
* Executes the measure directly (without AtumContext).
* @param measure the measure to be calculated
* @return
*/
def executeMeasure(checkpointName: String, measure: Measurement): DataFrame = {

val result = MeasureResult(measure, measure.function(df))
AtumAgent.measurePublish(checkpointName, result)
df
Expand Down

0 comments on commit 5a2b61b

Please sign in to comment.