Skip to content

Commit

Permalink
#25: create class for partitions (#45)
Browse files Browse the repository at this point in the history
* * It's now possible to create an `AtumContext` of sub-partitions
* `AtumAgent` now a class and companion object of the class to allow referencing
* `AtumContext` now behaves somewhat like a singleton for given `AtumPartitions` - as a result it's mutable
* Rename `Measurement` to `Measure`
* Created simplified `AtumPartitions`

---------

Co-authored-by: Hector Perez <hector.perez@absa.africa>
Co-authored-by: Ladislav Sulak <laco.sulak@gmail.com>
Co-authored-by: David Benedeki <benedeki@volny.cz>
  • Loading branch information
4 people authored Sep 18, 2023
1 parent 1bcaa3d commit aec3c76
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 136 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# Atum


## Modules

### Agent `agent/`
This module is intended to replace the current [Atum](https://github.com/AbsaOSS/atum) repository. Provides libraries for establishing and pushing them to the API located in `server/`.
This module is intended to replace the current [Atum](https://github.com/AbsaOSS/atum) repository.
It provides libraries for establishing and pushing them to the API located in `server/`.
See `agent/README.md`.

### Server `server/`
An API under construction that communicates with AtumAgent and with the persisting storage. It also provides measure configuration to the `AtumAgent`.
An API under construction that communicates with the Agent and with the persistent storage.
It also provides measure configuration to the `AtumAgent`.
See `server/README.md`.

## How to generate Code coverage report
Expand Down
32 changes: 18 additions & 14 deletions agent/README.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
# Atum Agent


`Atum Agent` module has two main features
`AtumAgent`: Retrieves the configurations and reports the measures.
`AtumContext`: Provides a library for calculating control measures over a `Spark` `Dataframe`.

`Atum Agent` module has two main parts:
* `AtumAgent`: Retrieves the configurations and reports the measures.
* `AtumContext`: Provides a library for calculating control measures over a `Spark` `Dataframe`.


## Usage

Include `AtumAgent` as an implicit in the scope for use by the `AtumContext`.
Create multiple `AtumContext` with different control measures to be applied

### Option 1
```scala
import za.co.absa.atum.agent.AtumContext.DatasetWrapper
import za.co.absa.atum.agent.model._
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
```

### Option 2
Use `AtumPartitions` to get an `AtumContext` from the service using the `AtumAgent`.
```scala
implicit val agent: AtumAgent = new AgentImpl
val atumContext1 = AtumAgent.createAtumContext(atumPartition)
```

Create multiple `AtumContext` with different control measures
#### AtumPartitions
A list of key values that maintains the order of arrival of the items, the `AtumService`
is able to deliver the correct `AtumContext` according to the `AtumPartitions` we give it.
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))
val atumPartitions = AtumPartitions().withPartitions(ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female" ))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
val subPartition = atumPartitions.addPartition("otherKey", "otherValue")
```

Control measures can also be overwritten, added or removed.
Expand Down
55 changes: 49 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,61 @@

package za.co.absa.atum.agent

import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.MeasureResult

/**
* Place holder for the agent that communicate with the API.
*/
object AtumAgent {
class AtumAgent private() {

def measurePublish(checkpointKey: String, measure: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey, " + (measure))
/**
* Sends a single `MeasureResult` to the AtumService API along with an extra data from a given `AtumContext`.
* @param checkpointKey
* @param atumContext
* @param measureResult
*/
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit =
println(
s"Enqueued measurement: ${Seq(checkpointKey, atumContext, measureResult).mkString(" || ")}"
)

def publish(checkpointKey: String, context: AtumContext, measureResult: MeasureResult): Unit = println(
Seq(checkpointKey, context, measureResult).mkString(" || ")
)
/**
* Sends a single `MeasureResult` to the AtumService API. It doesn't involve AtumContext.
*
* @param checkpointKey
* @param measureResult
*/
def measurePublish(checkpointKey: String, measureResult: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey || $measureResult")

/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
* @param atumPartitions
* @return
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
contexts.getOrElse(atumPartitions, new AtumContext(atumPartitions, this))
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit atumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = atumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, atumContext.copy(atumPartitions = newPartitions, parentAgent = this))
}

private def getContextOrElse(atumPartitions: AtumPartitions, creationMethod: =>AtumContext): AtumContext = {
synchronized{
contexts.getOrElse(atumPartitions, {
val result = creationMethod
contexts = contexts + (atumPartitions -> result)
result
})
}
}


private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

}

object AtumAgent extends AtumAgent
87 changes: 58 additions & 29 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,87 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.{MeasureResult, Measurement}
import za.co.absa.atum.agent.model.{MeasureResult, Measure}
import AtumContext.AtumPartitions

import scala.collection.immutable.ListMap

/**
* AtumContext: This class provides the methods to measure Spark `Dataframe`. Also allows to add/edit/remove measures.
* @param measurements: A sequences of measurements.
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
* @param atumPartitions
* @param parentAgent
* @param measures
*/
class AtumContext private[agent](
val atumPartitions: AtumPartitions,
val parentAgent: AtumAgent,
private var measures: Set[Measure] = Set.empty) {

case class AtumContext(measurements: Set[Measurement] = Set()) {
def currentMeasures: Set[Measure] = measures

def withMeasuresReplaced(
byMeasure: Measurement
): AtumContext =
this.copy(measurements = Set(byMeasure))
def subPartitionContext(subPartitions: AtumPartitions): AtumContext = {
parentAgent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def withMeasuresReplaced(
byMeasures: Iterable[Measurement]
): AtumContext =
this.copy(measurements = byMeasures.toSet)
def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame) = {
??? //TODO #26
}

def withMeasuresAdded(
measure: Measurement
): AtumContext =
this.copy(measurements = measurements + measure)
def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[Measure]) = {
??? //TODO #55
}

def withMeasuresAdded(
measures: Iterable[Measurement]
): AtumContext =
this.copy(measurements = measurements ++ measures)
def addAdditionalData(key: String, value: String) = {
??? //TODO #60
}

def withMeasureRemoved(measurement: Measurement): AtumContext =
this.copy(measurements = measurements.filterNot(_ == measurement))
def addMeasure(newMeasure: Measure): AtumContext = {
measures = measures + newMeasure
this
}

def addMeasures(newMeasures: Set[Measure]): AtumContext = {
measures = measures ++ newMeasures
this
}

def removeMeasure(measureToRemove: Measure): AtumContext = {
measures = measures - measureToRemove
this
}

private[agent] def copy(
atumPartitions: AtumPartitions = this.atumPartitions,
parentAgent: AtumAgent = this.parentAgent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, parentAgent, measures)
}
}

object AtumContext {
type AtumPartitions = ListMap[String, String]

object AtumPartitions {
def apply(elems: (String, String)): AtumPartitions = {
ListMap(elems)
}
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Executes the measure directly with not AtumContext.
* Executes the measure directly (without AtumContext).
* @param measure the measure to be calculated
* @return
*/
def executeMeasure(checkpointName: String, measure: Measurement): DataFrame = {

def executeMeasure(checkpointName: String, measure: Measure): DataFrame = {
val result = MeasureResult(measure, measure.function(df))
AtumAgent.measurePublish(checkpointName, result)
df
}

def executeMeasures(checkpointName: String, measures: Iterable[Measurement]): DataFrame = {
def executeMeasures(checkpointName: String, measures: Iterable[Measure]): DataFrame = {
measures.foreach(m => executeMeasure(checkpointName, m))
df
}
Expand All @@ -78,11 +109,9 @@ object AtumContext {
* @return
*/
def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = {
atumContext.measurements.foreach { measure =>
atumContext.measures.foreach { measure =>
val result = MeasureResult(measure, measure.function(df))
AtumAgent.publish(checkpointName, atumContext, result)

executeMeasures(checkpointName, atumContext.measurements)
}

df
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,26 @@ package za.co.absa.atum.agent.model

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DecimalType, LongType, StringType}
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.atum.agent.core.MeasurementProcessor
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

/**
* Type of different measures to be applied to the columns.
*/
trait Measurement extends MeasurementProcessor {
trait Measure extends MeasurementProcessor {

val controlCol: String
}

object Measurement {
object Measure {

private val valueColumnName: String = "value"

case class RecordCount(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => ds.select(col(controlCol)).count().toString
Expand All @@ -46,14 +46,14 @@ object Measurement {

case class DistinctRecordCount(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => ds.select(col(controlCol)).distinct().count().toString
}
case class SumOfValuesOfColumn(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(col(valueColumnName))
Expand All @@ -64,7 +64,7 @@ object Measurement {

case class AbsSumOfValuesOfColumn(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(abs(col(valueColumnName)))
Expand All @@ -74,7 +74,7 @@ object Measurement {

case class SumOfHashesOfColumn(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

package za.co.absa.atum.agent.model

case class MeasureResult(measurement: Measurement, result: String)
case class MeasureResult(measurement: Measure, result: String)
53 changes: 0 additions & 53 deletions agent/src/test/scala/za/co/absa/atum/agent/AtumContextSpec.scala

This file was deleted.

Loading

0 comments on commit aec3c76

Please sign in to comment.