Skip to content

Commit

Permalink
#84 DTOs integrated into atum-agent
Browse files Browse the repository at this point in the history
  • Loading branch information
salamonpavel committed Oct 12, 2023
1 parent 5b4bbc0 commit 8d517a0
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 107 deletions.
30 changes: 10 additions & 20 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.MeasureResult
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.model.dto.CheckpointDTO

/**
 * Placeholder for the agent that communicates with the API.
Expand All @@ -34,22 +34,12 @@ class AtumAgent private() {
}

/**
 * Sends a single `MeasureResult` to the AtumService API along with extra data from a given `AtumContext`.
* @param checkpointKey
* @param atumContext
* @param measureResult
*/
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit =
dispatcher.publish(checkpointKey, atumContext, measureResult)

/**
* Sends a single `MeasureResult` to the AtumService API. It doesn't involve AtumContext.
*
* @param checkpointKey
* @param measureResult
* Sends `CheckpointDTO` to the AtumService API
* @param checkpoint
*/
def measurePublish(checkpointKey: String, measureResult: MeasureResult): Unit =
dispatcher.publish(checkpointKey, measureResult)
def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
 * Provides an AtumContext given an `AtumPartitions` instance. Retrieves the data from the AtumService API.
Expand All @@ -60,9 +50,9 @@ class AtumAgent private() {
contexts.getOrElse(atumPartitions, new AtumContext(atumPartitions, this))
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit atumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = atumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, atumContext.copy(atumPartitions = newPartitions, parentAgent = this))
def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, parentAtumContext.copy(atumPartitions = newPartitions, parentAgent = this))
}

private def getContextOrElse(atumPartitions: AtumPartitions, creationMethod: =>AtumContext): AtumContext = {
Expand Down
63 changes: 22 additions & 41 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,39 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.{MeasureResult, Measure}
import za.co.absa.atum.agent.model.Measure
import AtumContext.AtumPartitions

import scala.collection.immutable.ListMap

/**
 * This class provides the methods to measure a Spark `DataFrame`. It also allows adding and removing measures.
* @param atumPartitions
* @param parentAgent
* @param measures
 * This class provides the methods to measure a Spark `DataFrame`. It also allows adding and removing measures.
* @param atumPartitions
* @param agent
* @param measures
*/
class AtumContext private[agent](
val atumPartitions: AtumPartitions,
val parentAgent: AtumAgent,
private var measures: Set[Measure] = Set.empty) {
class AtumContext private[agent] (
val atumPartitions: AtumPartitions,
val agent: AtumAgent,
private var measures: Set[Measure] = Set.empty
) {

def currentMeasures: Set[Measure] = measures

def subPartitionContext(subPartitions: AtumPartitions): AtumContext = {
parentAgent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame) = {
??? //TODO #26
def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame) = {
??? // TODO #26
}

def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[Measure]) = {
??? //TODO #55
??? // TODO #55
}

def addAdditionalData(key: String, value: String) = {
??? //TODO #60
??? // TODO #60
}

def addMeasure(newMeasure: Measure): AtumContext = {
Expand All @@ -67,11 +68,11 @@ class AtumContext private[agent](
}

private[agent] def copy(
atumPartitions: AtumPartitions = this.atumPartitions,
parentAgent: AtumAgent = this.parentAgent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, parentAgent, measures)
atumPartitions: AtumPartitions = this.atumPartitions,
agent: AtumAgent = this.agent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, agent, measures)
}
}

Expand All @@ -86,34 +87,14 @@ object AtumContext {

implicit class DatasetWrapper(df: DataFrame) {

/**
* Executes the measure directly (without AtumContext).
* @param measure the measure to be calculated
* @return
*/
def executeMeasure(checkpointName: String, measure: Measure): DataFrame = {
val result = MeasureResult(measure, measure.function(df))
AtumAgent.measurePublish(checkpointName, result)
df
}

def executeMeasures(checkpointName: String, measures: Iterable[Measure]): DataFrame = {
measures.foreach(m => executeMeasure(checkpointName, m))
df
}

/**
* Set a point in the pipeline to execute calculation.
* @param checkpointName The key assigned to this checkpoint
* @param atumContext Contains the calculations to be done and publish the result
* @return
*/
def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = {
atumContext.measures.foreach { measure =>
val result = MeasureResult(measure, measure.function(df))
AtumAgent.publish(checkpointName, atumContext, result)
}

def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
// todo: implement checkpoint creation
df
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.agent.dispatcher

import org.apache.spark.internal.Logging
import za.co.absa.atum.agent.AtumContext
import za.co.absa.atum.agent.model.MeasureResult
import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO}

/**
 * Dispatcher useful for development, testing, and debugging.
Expand All @@ -27,10 +27,14 @@ class ConsoleDispatcher extends Dispatcher with Logging {

logInfo("using console dispatcher")

override def publish(checkpointKey: String, measureResult: MeasureResult): Unit =
println(s"Publishing $checkpointKey $measureResult")

override def publish(checkpointKey: String, context: AtumContext, measureResult: MeasureResult): Unit =
println(s"Publishing $checkpointKey $context $measureResult")

override def fetchAtumContext(
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
): Option[AtumContextDTO] = {
println(s"Fetching AtumContext using ConsoleDispatcher with $partitioning and $parentPartitioning")
None
}
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package za.co.absa.atum.agent.dispatcher

import za.co.absa.atum.agent.AtumContext
import za.co.absa.atum.agent.model.MeasureResult
import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO}

trait Dispatcher {
def publish(checkpointKey: String, measureResult: MeasureResult): Unit
def fetchAtumContext(partitioning: Partitioning, parentPartitioning: Option[Partitioning]): Option[AtumContextDTO]

def publish(checkpointKey: String, context: AtumContext, measureResult: MeasureResult): Unit
def saveCheckpoint(checkpoint: CheckpointDTO): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import org.apache.spark.internal.Logging
import za.co.absa.atum.agent.model.MeasureResult
import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.agent.AtumContext
import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO}

class HttpDispatcher(config: Config) extends Dispatcher with Logging {

Expand All @@ -31,16 +31,16 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
logInfo("using http dispatcher")
logInfo(s"serverUri $serverUri")

override def publish(checkpointKey: String, measureResult: MeasureResult): Unit = {
basicRequest
.body(s"$checkpointKey $measureResult")
.post(serverUri)
.send(backend)
override def fetchAtumContext(
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
): Option[AtumContextDTO] = {
???
}

override def publish(checkpointKey: String, context: AtumContext, measureResult: MeasureResult): Unit = {
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
basicRequest
.body(s"$checkpointKey $context $measureResult")
.body(s"$checkpoint")
.post(serverUri)
.send(backend)
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.atum.agent.model

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.{AtumAgent, AtumContext}
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
import za.co.absa.atum.agent.model.Measure._
import za.co.absa.spark.commons.test.SparkTestBase
Expand Down Expand Up @@ -48,21 +48,21 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self =>
.format("csv")
.option("header", "true")
.load("agent/src/test/resources/random-dataset/persons.csv")
.createCheckpoint("name1")(atumContextInstanceWithRecordCount)
.createCheckpoint("name2")(atumContextWithNameHashSum)
.createCheckpoint("name1", "author")(atumContextInstanceWithRecordCount)
.createCheckpoint("name2", "author")(atumContextWithNameHashSum)

val dsEnrichment = spark.read
.format("csv")
.option("header", "true")
.load("agent/src/test/resources/random-dataset/persons-enriched.csv")
.createCheckpoint("name3")(
.createCheckpoint("name3", "author")(
atumContextWithSalaryAbsMeasure
.removeMeasure(salaryAbsSum)
)

val dfFull = dfPersons
.join(dsEnrichment, Seq("id"))
.createCheckpoint("other different name")(atumContextWithSalaryAbsMeasure)
.createCheckpoint("other different name", "author")(atumContextWithSalaryAbsMeasure)

val dfExtraPersonWithNegativeSalary = spark
.createDataFrame(
Expand All @@ -74,7 +74,7 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self =>

val dfExtraPerson = dfExtraPersonWithNegativeSalary.union(dfPersons)

dfExtraPerson.createCheckpoint("a checkpoint name")(
dfExtraPerson.createCheckpoint("a checkpoint name", "author")(
atumContextWithSalaryAbsMeasure
.removeMeasure(measureIds)
.removeMeasure(salaryAbsSum)
Expand Down

0 comments on commit 8d517a0

Please sign in to comment.