-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#25: create class for partitions #45
Changes from 18 commits
51fa957
2918fd0
ce4c6e8
507b03b
f15d728
ddce673
8130c4a
b2cdd2f
48dd958
c9e43cc
bbfe10b
f6df867
47d92c8
0f3099d
5a2b61b
29f8300
5b9b422
a074610
7faadcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,56 +17,87 @@ | |
package za.co.absa.atum.agent | ||
|
||
import org.apache.spark.sql.DataFrame | ||
import za.co.absa.atum.agent.model.{MeasureResult, Measurement} | ||
import za.co.absa.atum.agent.model.{MeasureResult, Measure} | ||
import AtumContext.AtumPartitions | ||
|
||
import scala.collection.immutable.ListMap | ||
|
||
/** | ||
* AtumContext: This class provides the methods to measure Spark `Dataframe`. Also allows to add/edit/remove measures. | ||
* @param measurements: A sequences of measurements. | ||
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures. | ||
* @param atumPartitions | ||
* @param parentAgent | ||
* @param measures | ||
*/ | ||
class AtumContext private[agent]( | ||
val atumPartitions: AtumPartitions, | ||
val parentAgent: AtumAgent, | ||
private var measures: Set[Measure] = Set.empty) { | ||
|
||
def currentMeasures: Set[Measure] = measures | ||
|
||
def subPartitionContext(subPartitions: AtumPartitions): AtumContext = { | ||
parentAgent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this) | ||
} | ||
|
||
case class AtumContext(measurements: Set[Measurement] = Set()) { | ||
def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame) = { | ||
??? //TODO #26 | ||
} | ||
|
||
def withMeasuresReplaced( | ||
byMeasure: Measurement | ||
): AtumContext = | ||
this.copy(measurements = Set(byMeasure)) | ||
def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[Measure]) = { | ||
??? //TODO #55 | ||
} | ||
|
||
def withMeasuresReplaced( | ||
byMeasures: Iterable[Measurement] | ||
): AtumContext = | ||
this.copy(measurements = byMeasures.toSet) | ||
def addAdditionalData(key: String, value: String) = { | ||
??? //TODO #60 | ||
} | ||
|
||
def withMeasuresAdded( | ||
measure: Measurement | ||
): AtumContext = | ||
this.copy(measurements = measurements + measure) | ||
def addMeasure(newMeasure: Measure): AtumContext = { | ||
measures = measures + newMeasure | ||
this | ||
} | ||
|
||
def withMeasuresAdded( | ||
measures: Iterable[Measurement] | ||
): AtumContext = | ||
this.copy(measurements = measurements ++ measures) | ||
def addMeasures(newMeasures: Set[Measure]): AtumContext = { | ||
measures = measures ++ newMeasures | ||
this | ||
} | ||
|
||
def withMeasureRemoved(measurement: Measurement): AtumContext = | ||
this.copy(measurements = measurements.filterNot(_ == measurement)) | ||
def removeMeasure(measureToRemove: Measure): AtumContext = { | ||
measures = measures - measureToRemove | ||
this | ||
} | ||
|
||
private[agent] def copy( | ||
atumPartitions: AtumPartitions = this.atumPartitions, | ||
parentAgent: AtumAgent = this.parentAgent, | ||
measures: Set[Measure] = this.measures | ||
): AtumContext = { | ||
new AtumContext(atumPartitions, parentAgent, measures) | ||
} | ||
} | ||
|
||
object AtumContext { | ||
type AtumPartitions = ListMap[String, String] | ||
|
||
object AtumPartitions { | ||
def apply(elems: (String, String)): AtumPartitions = { | ||
ListMap(elems) | ||
} | ||
} | ||
|
||
implicit class DatasetWrapper(df: DataFrame) { | ||
|
||
/** | ||
* Executes the measure directly with not AtumContext. | ||
* Executes the measure directly (without AtumContext). | ||
* @param measure the measure to be calculated | ||
* @return | ||
*/ | ||
def executeMeasure(checkpointName: String, measure: Measurement): DataFrame = { | ||
|
||
def executeMeasure(checkpointName: String, measure: Measure): DataFrame = { | ||
val result = MeasureResult(measure, measure.function(df)) | ||
AtumAgent.measurePublish(checkpointName, result) | ||
df | ||
} | ||
|
||
def executeMeasures(checkpointName: String, measures: Iterable[Measurement]): DataFrame = { | ||
def executeMeasures(checkpointName: String, measures: Iterable[Measure]): DataFrame = { | ||
measures.foreach(m => executeMeasure(checkpointName, m)) | ||
df | ||
} | ||
|
@@ -78,11 +109,11 @@ object AtumContext { | |
* @return | ||
*/ | ||
def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = { | ||
atumContext.measurements.foreach { measure => | ||
atumContext.measures.foreach { measure => | ||
val result = MeasureResult(measure, measure.function(df)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here you can call There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As it will (probably) go away, can this stay? 😉 |
||
AtumAgent.publish(checkpointName, atumContext, result) | ||
|
||
executeMeasures(checkpointName, atumContext.measurements) | ||
executeMeasures(checkpointName, atumContext.measures) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This line can be removed; it's basically doing the same as the whole foreach. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will be removed, but as said, this needs a reword. |
||
} | ||
|
||
df | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that execute should only execute and publish should only publish. The two can then be put together in some other operation.
I definitely wouldn't expect 'executeMeasure' to also send data to the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these could and should eventually be completely removed. There's no scenario where only particular measures are recorded for a checkpoint.