#25: create class for partitions #45

Merged
merged 19 commits into from
Sep 18, 2023
7 changes: 4 additions & 3 deletions README.md
@@ -1,14 +1,15 @@
# Atum


## Modules

### Agent `agent/`
This module is intended to replace the current [Atum](https://github.com/AbsaOSS/atum) repository. Provides libraries for establishing and pushing them to the API located in `server/`.
This module is intended to replace the current [Atum](https://github.com/AbsaOSS/atum) repository.
It provides libraries for establishing measures and pushing them to the API located in `server/`.
See `agent/README.md`.

### Server `server/`
An API under construction that communicates with AtumAgent and with the persisting storage. It also provides measure configuration to the `AtumAgent`.
An API under construction that communicates with the Agent and with the persistent storage.
It also provides measure configuration to the `AtumAgent`.
See `server/README.md`.

## How to generate Code coverage report
32 changes: 18 additions & 14 deletions agent/README.md
@@ -1,32 +1,36 @@
# Atum Agent


`Atum Agent` module has two main features
`AtumAgent`: Retrieves the configurations and reports the measures.
`AtumContext`: Provides a library for calculating control measures over a `Spark` `Dataframe`.

`Atum Agent` module has two main parts:
* `AtumAgent`: Retrieves the configurations and reports the measures.
* `AtumContext`: Provides a library for calculating control measures over a `Spark` `Dataframe`.


## Usage

Include `AtumAgent` as an implicit in the scope for use by the `AtumContext`.
Create multiple `AtumContext` with different control measures to be applied

### Option 1
```scala
import za.co.absa.atum.agent.AtumContext.DatasetWrapper
import za.co.absa.atum.agent.model._
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
```

### Option 2
Use `AtumPartitions` to get an `AtumContext` from the service using the `AtumAgent`.
```scala
implicit val agent: AtumAgent = new AgentImpl
val atumContext1 = AtumAgent.createAtumContext(atumPartition)
```

Create multiple `AtumContext` with different control measures
#### AtumPartitions
A list of key-value pairs that maintains the order of arrival of the items. The `AtumService`
is able to deliver the correct `AtumContext` according to the `AtumPartitions` we give it.
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))
val atumPartitions = AtumPartitions().withPartitions(ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female" ))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
val subPartition = atumPartitions.addPartition("otherKey", "otherValue")
```

Control measures can also be overwritten, added or removed.
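
The Scala sources in this PR declare `AtumPartitions` as a type alias for `ListMap[String, String]`. The sketch below redeclares that alias locally and uses only standard `ListMap` operations to illustrate the ordering guarantee the README describes; the object name `AtumPartitionsSketch` is an assumption made for illustration, and the helper methods used in the README snippet are deliberately not relied on.

```scala
import scala.collection.immutable.ListMap

object AtumPartitionsSketch {
  // Local copy of the alias declared in AtumContext.scala in this PR.
  type AtumPartitions = ListMap[String, String]

  // Keys are kept in the order in which they were inserted.
  val atumPartitions: AtumPartitions =
    ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female")

  // Adding a new key appends it after the existing ones;
  // the original (immutable) value is left unchanged.
  val subPartitions: AtumPartitions = atumPartitions + ("otherKey" -> "otherValue")

  def main(args: Array[String]): Unit = {
    println(atumPartitions.keys.mkString(", "))
    println(subPartitions.keys.mkString(", "))
  }
}
```

Because the map is immutable, a sub-partition can be derived from a parent partition without affecting lookups that still use the parent as a key.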
55 changes: 49 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
@@ -16,18 +16,61 @@

package za.co.absa.atum.agent

import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.MeasureResult

/**
* Placeholder for the agent that communicates with the API.
*/
object AtumAgent {
class AtumAgent private() {

def measurePublish(checkpointKey: String, measure: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey, " + (measure))
/**
* Sends a single `MeasureResult` to the AtumService API along with extra data from a given `AtumContext`.
* @param checkpointKey
* @param atumContext
* @param measureResult
*/
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit =
println(
s"Enqueued measurement: ${Seq(checkpointKey, atumContext, measureResult).mkString(" || ")}"
)

def publish(checkpointKey: String, context: AtumContext, measureResult: MeasureResult): Unit = println(
Seq(checkpointKey, context, measureResult).mkString(" || ")
)
/**
* Sends a single `MeasureResult` to the AtumService API. It doesn't involve AtumContext.
*
* @param checkpointKey
* @param measureResult
*/
def measurePublish(checkpointKey: String, measureResult: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey || $measureResult")

/**
* Provides an AtumContext given an `AtumPartitions` instance. Retrieves the data from AtumService API.
* @param atumPartitions
* @return
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
contexts.getOrElse(atumPartitions, new AtumContext(atumPartitions, this))
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit atumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = atumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, atumContext.copy(atumPartitions = newPartitions, parentAgent = this))
}

private def getContextOrElse(atumPartitions: AtumPartitions, creationMethod: =>AtumContext): AtumContext = {
synchronized{
contexts.getOrElse(atumPartitions, {
val result = creationMethod
contexts = contexts + (atumPartitions -> result)
result
})
}
}


private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

}

object AtumAgent extends AtumAgent
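
The private `getContextOrElse` helper above is a synchronized get-or-create cache: the by-name `creationMethod` runs only on a miss, and the result is stored before it is returned. As shown in this diff, `getOrCreateAtumContext` reads `contexts` directly instead, so a context it creates is not stored. A minimal sketch of the caching pattern follows, using hypothetical stand-in types (`Partitions`, `Context`, `ContextCache`) rather than the real agent classes.

```scala
object ContextCacheSketch {
  // Hypothetical stand-ins for AtumPartitions and AtumContext.
  type Partitions = Map[String, String]
  final class Context(val partitions: Partitions)

  final class ContextCache {
    private[this] var contexts: Map[Partitions, Context] = Map.empty

    // `create` is a by-name parameter, so it is evaluated only on a cache miss.
    // The lock makes the check-then-insert sequence atomic.
    def getOrCreate(partitions: Partitions)(create: => Context): Context = synchronized {
      contexts.getOrElse(partitions, {
        val created = create
        contexts = contexts + (partitions -> created)
        created
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val cache = new ContextCache
    val key: Partitions = Map("country" -> "SA")
    val first = cache.getOrCreate(key)(new Context(key))
    val second = cache.getOrCreate(key)(new Context(key))
    // Both calls return the same stored instance.
    println(first eq second)
  }
}
```

Routing every lookup through one such helper would give both public methods the same "create once, reuse afterwards" behaviour.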
87 changes: 59 additions & 28 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -17,56 +17,87 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.{MeasureResult, Measurement}
import za.co.absa.atum.agent.model.{MeasureResult, Measure}
import AtumContext.AtumPartitions

import scala.collection.immutable.ListMap

/**
* AtumContext: This class provides the methods to measure Spark `Dataframe`. Also allows to add/edit/remove measures.
* @param measurements: A sequences of measurements.
* This class provides methods to measure a Spark `DataFrame`. It also allows adding and removing measures.
* @param atumPartitions
* @param parentAgent
* @param measures
*/
class AtumContext private[agent](
val atumPartitions: AtumPartitions,
val parentAgent: AtumAgent,
private var measures: Set[Measure] = Set.empty) {

def currentMeasures: Set[Measure] = measures

def subPartitionContext(subPartitions: AtumPartitions): AtumContext = {
parentAgent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

case class AtumContext(measurements: Set[Measurement] = Set()) {
def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame) = {
??? //TODO #26
}

def withMeasuresReplaced(
byMeasure: Measurement
): AtumContext =
this.copy(measurements = Set(byMeasure))
def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[Measure]) = {
??? //TODO #55
}

def withMeasuresReplaced(
byMeasures: Iterable[Measurement]
): AtumContext =
this.copy(measurements = byMeasures.toSet)
def addAdditionalData(key: String, value: String) = {
??? //TODO #60
}

def withMeasuresAdded(
measure: Measurement
): AtumContext =
this.copy(measurements = measurements + measure)
def addMeasure(newMeasure: Measure): AtumContext = {
measures = measures + newMeasure
this
}

def withMeasuresAdded(
measures: Iterable[Measurement]
): AtumContext =
this.copy(measurements = measurements ++ measures)
def addMeasures(newMeasures: Set[Measure]): AtumContext = {
measures = measures ++ newMeasures
this
}

def withMeasureRemoved(measurement: Measurement): AtumContext =
this.copy(measurements = measurements.filterNot(_ == measurement))
def removeMeasure(measureToRemove: Measure): AtumContext = {
measures = measures - measureToRemove
this
}

private[agent] def copy(
atumPartitions: AtumPartitions = this.atumPartitions,
parentAgent: AtumAgent = this.parentAgent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, parentAgent, measures)
}
}
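
One consequence of this change is worth spelling out: the removed `withMeasures...` methods returned a copy, whereas `addMeasure`, `addMeasures` and `removeMeasure` mutate the context and return `this`. The sketch below illustrates the difference with a hypothetical stand-in class (`Context`, with measures reduced to plain strings); it is not the real `AtumContext`.

```scala
object FluentMutationSketch {
  // Hypothetical stand-in for AtumContext; measures are reduced to names.
  final class Context(private var measures: Set[String] = Set.empty) {
    def currentMeasures: Set[String] = measures

    // Mutates this instance and returns it, like addMeasure in the diff.
    def addMeasure(measure: String): Context = {
      measures = measures + measure
      this
    }
  }

  def main(args: Array[String]): Unit = {
    val withRecordCount = new Context().addMeasure("recordCount")
    val withSalarySum = withRecordCount.addMeasure("absSumOfSalary")

    // Both names refer to the same object, so both see both measures.
    println(withRecordCount eq withSalarySum)
    println(withRecordCount.currentMeasures.size)
  }
}
```

Code written in the style of the README's Option 1, which binds each intermediate result to its own `val`, would therefore end up with aliases of one context rather than two independent contexts.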

object AtumContext {
type AtumPartitions = ListMap[String, String]

object AtumPartitions {
def apply(elems: (String, String)): AtumPartitions = {
ListMap(elems)
}
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Executes the measure directly with not AtumContext.
* Executes the measure directly (without AtumContext).
* @param measure the measure to be calculated
* @return
*/
def executeMeasure(checkpointName: String, measure: Measurement): DataFrame = {

def executeMeasure(checkpointName: String, measure: Measure): DataFrame = {
Collaborator:
I think execute should only execute and publish should only publish; the two can be put together in some other operation.

I definitely wouldn't expect `executeMeasure` to also send data to the server.

Contributor:
I think these could and should eventually be removed completely. There's no scenario where only particular measures are recorded for a checkpoint.

val result = MeasureResult(measure, measure.function(df))
AtumAgent.measurePublish(checkpointName, result)
df
}

def executeMeasures(checkpointName: String, measures: Iterable[Measurement]): DataFrame = {
def executeMeasures(checkpointName: String, measures: Iterable[Measure]): DataFrame = {
measures.foreach(m => executeMeasure(checkpointName, m))
df
}
Expand All @@ -78,11 +109,11 @@ object AtumContext {
* @return
*/
def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = {
atumContext.measurements.foreach { measure =>
atumContext.measures.foreach { measure =>
val result = MeasureResult(measure, measure.function(df))
Collaborator:
Here you can call the `executeMeasure` function.

Contributor:
As it will (probably) go away, can this stay? 😉

AtumAgent.publish(checkpointName, atumContext, result)

executeMeasures(checkpointName, atumContext.measurements)
executeMeasures(checkpointName, atumContext.measures)
Collaborator:
This line can be removed; it does basically the same as the whole foreach.

Contributor:
Will remove, but as said, this needs rework: creating the checkpoint and sending it should be one "atomic" step.

}

df
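
The review comments above ask for computing a measure and publishing it to be separate concerns, combined only in a checkpoint-level operation. A minimal sketch of that split follows. It assumes simplified stand-ins so that it runs without Spark: a dataset is a `Seq[Long]`, a `Measure` is a named function, and `publish` only formats what would be sent. None of these names are the agent's actual API.

```scala
object CheckpointSketch {
  // Hypothetical stand-ins for the agent's Measure and MeasureResult.
  final case class Measure(name: String, function: Seq[Long] => String)
  final case class MeasureResult(measure: Measure, result: String)

  // Pure computation: nothing is sent anywhere.
  def executeMeasure(data: Seq[Long], measure: Measure): MeasureResult =
    MeasureResult(measure, measure.function(data))

  def executeMeasures(data: Seq[Long], measures: Iterable[Measure]): Seq[MeasureResult] =
    measures.map(executeMeasure(data, _)).toSeq

  // Publishing is its own step; here it only builds the message.
  def publish(checkpointName: String, results: Seq[MeasureResult]): String =
    results
      .map(r => s"${r.measure.name}=${r.result}")
      .mkString(s"$checkpointName: ", ", ", "")

  // The combined operation computes all results first, then publishes once.
  def createCheckpoint(checkpointName: String, data: Seq[Long], measures: Iterable[Measure]): String =
    publish(checkpointName, executeMeasures(data, measures))

  def main(args: Array[String]): Unit = {
    val measures = Seq(
      Measure("recordCount", _.size.toString),
      Measure("sum", _.sum.toString)
    )
    println(createCheckpoint("checkpoint1", Seq(1L, 2L, 3L), measures))
  }
}
```

With this shape each measure is computed exactly once per checkpoint, which also avoids the duplicated work the reviewers point out in the `foreach` plus `executeMeasures` combination.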
@@ -18,26 +18,26 @@ package za.co.absa.atum.agent.model

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DecimalType, LongType, StringType}
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.atum.agent.core.MeasurementProcessor
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

/**
* Type of different measures to be applied to the columns.
*/
trait Measurement extends MeasurementProcessor {
trait Measure extends MeasurementProcessor {

val controlCol: String
}

object Measurement {
object Measure {

private val valueColumnName: String = "value"

case class RecordCount(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => ds.select(col(controlCol)).count().toString
@@ -46,14 +46,14 @@ object Measurement {

case class DistinctRecordCount(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => ds.select(col(controlCol)).distinct().count().toString
}
case class SumOfValuesOfColumn(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(col(valueColumnName))
@@ -64,7 +64,7 @@ object Measurement {

case class AbsSumOfValuesOfColumn(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(abs(col(valueColumnName)))
@@ -74,7 +74,7 @@ object Measurement {

case class SumOfHashesOfColumn(
controlCol: String
) extends Measurement {
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {

@@ -16,4 +16,4 @@

package za.co.absa.atum.agent.model

case class MeasureResult(measurement: Measurement, result: String)
case class MeasureResult(measurement: Measure, result: String)
lsulak marked this conversation as resolved.

This file was deleted.
