Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feature/77-scala212-support' int…
Browse files Browse the repository at this point in the history
…o feature/77-scala212-support
  • Loading branch information
lsulak committed Oct 26, 2023
2 parents 566eedb + 89f95ba commit 6926783
Show file tree
Hide file tree
Showing 36 changed files with 1,255 additions and 138 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel
1 change: 0 additions & 1 deletion .github/codeowners

This file was deleted.

4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ _productMT
_nestedStructs
myTestCheckpoints

**/main/resources/application.properties
**/main/resources/application.conf
**/main/resources/application.yaml
**/main/resources/application.yml

*.config
*.keytab
Expand Down
7 changes: 3 additions & 4 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ runner.dialect = scala212

maxColumn = 120

align.preset = some
align.preset = none
align.multiline = false

indent.main = 2
Expand All @@ -16,6 +16,5 @@ docstrings.style = AsteriskSpace
docstrings.wrap = no
docstrings.removeEmpty = true

align.openParenDefnSite = true
align.openParenCallSite = true

align.openParenDefnSite = false
align.openParenCallSite = false
18 changes: 18 additions & 0 deletions agent/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# dispatcher to be used (http or console)
atum.dispatcher.type="http"

# The REST API URI of the atum server
#atum.dispatcher.http.url=
75 changes: 46 additions & 29 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,60 +15,77 @@
*/

package za.co.absa.atum.agent

import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.MeasureResult
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.agent.model.Checkpoint
import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}

/**
* Place holder for the agent that communicate with the API.
* Placeholder for the agent that communicates with the API.
*/
class AtumAgent private() {
class AtumAgent private[agent] () {

val config: Config = ConfigFactory.load()

// Dispatcher chosen from the `atum.dispatcher.type` configuration value.
// "http" builds an HttpDispatcher from the `atum.dispatcher.http` config subtree,
// "console" builds a ConsoleDispatcher; any other value is rejected immediately.
private val dispatcher = config.getString("atum.dispatcher.type") match {
  case "http" => new HttpDispatcher(config.getConfig("atum.dispatcher.http"))
  case "console" => new ConsoleDispatcher
  // The offending value is wrapped in exactly one pair of single quotes
  // (the previous message emitted a stray trailing quote).
  case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt'")
}

/**
* Sends a single `MeasureResult` to the AtumService API along with an extra data from a given `AtumContext`.
* @param checkpointKey
* @param atumContext
* @param measureResult
* Sends `CheckpointDTO` to the AtumService API
* @param checkpoint
*/
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit =
println(
s"Enqueued measurement: ${Seq(checkpointKey, atumContext, measureResult).mkString(" || ")}"
)
def saveCheckpoint(checkpoint: CheckpointDTO): Unit =
  // The DTO is handed to the configured dispatcher unchanged.
  dispatcher.saveCheckpoint(checkpoint)

/**
* Sends a single `MeasureResult` to the AtumService API. It doesn't involve AtumContext.
* Sends `Checkpoint` to the AtumService API
*
* @param checkpointKey
* @param measureResult
* @param checkpoint
*/
def measurePublish(checkpointKey: String, measureResult: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey || $measureResult")
def saveCheckpoint(checkpoint: Checkpoint): Unit = {
  // Convert the agent-side model to its DTO form before handing it to the dispatcher.
  val checkpointDTO = checkpoint.toCheckpointDTO
  dispatcher.saveCheckpoint(checkpointDTO)
}

/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
* @param atumPartitions
* @return
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
contexts.getOrElse(atumPartitions, new AtumContext(atumPartitions, this))
val partitioningDTO = PartitioningDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None)
val atumContextDTO = dispatcher.getOrCreateAtumContext(partitioningDTO)
lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
getExistingOrNewContext(atumPartitions, atumContext)
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit atumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = atumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, atumContext.copy(atumPartitions = newPartitions, parentAgent = this))
def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions

val newPartitionsDTO = AtumPartitions.toSeqPartitionDTO(newPartitions)
val parentPartitionsDTO = Some(AtumPartitions.toSeqPartitionDTO(parentAtumContext.atumPartitions))
val partitioningDTO = PartitioningDTO(newPartitionsDTO, parentPartitionsDTO)

val atumContextDTO = dispatcher.getOrCreateAtumContext(partitioningDTO)
lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
getExistingOrNewContext(newPartitions, atumContext)
}

private def getContextOrElse(atumPartitions: AtumPartitions, creationMethod: =>AtumContext): AtumContext = {
synchronized{
contexts.getOrElse(atumPartitions, {
val result = creationMethod
contexts = contexts + (atumPartitions -> result)
result
})
/**
 * Returns the context already cached for `atumPartitions`; when there is none, caches the
 * supplied candidate under that key and returns it. The lookup and the update happen inside
 * one `synchronized` block.
 *
 * @param atumPartitions key under which the context is cached
 * @param newAtumContext by-name candidate; evaluated only when no context is cached for the key,
 *                       and then exactly once
 * @return the cached context if present, otherwise the newly cached candidate
 */
private def getExistingOrNewContext(atumPartitions: AtumPartitions, newAtumContext: => AtumContext): AtumContext = {
  synchronized {
    contexts.getOrElse(
      atumPartitions, {
        // Force the by-name argument once: referencing it twice would evaluate it twice, so the
        // instance stored in the cache could differ from the one handed back to the caller.
        val createdContext = newAtumContext
        contexts = contexts + (atumPartitions -> createdContext)
        createdContext
      }
    )
  }
}


private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

}
Expand Down
97 changes: 56 additions & 41 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,53 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.{MeasureResult, Measure}
import AtumContext.AtumPartitions
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasuresMapper}
import za.co.absa.atum.model.dto.{AtumContextDTO, PartitionDTO}

import java.time.OffsetDateTime
import scala.collection.immutable.ListMap

/**
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
* @param atumPartitions
* @param parentAgent
* @param measures
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
* @param atumPartitions
* @param agent
* @param measures
*/
class AtumContext private[agent](
val atumPartitions: AtumPartitions,
val parentAgent: AtumAgent,
private var measures: Set[Measure] = Set.empty) {
class AtumContext private[agent] (
val atumPartitions: AtumPartitions,
val agent: AtumAgent,
private var measures: Set[Measure] = Set.empty
) {

def currentMeasures: Set[Measure] = measures

def subPartitionContext(subPartitions: AtumPartitions): AtumContext = {
parentAgent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame) = {
??? //TODO #26
def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame) = {
??? // TODO #26
}

def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[Measure]) = {
??? //TODO #55
/**
 * Assembles a `Checkpoint` from measurements supplied by the caller.
 * The current time is read once and used both as the process start and as the process end.
 *
 * @param checkpointName name given to the checkpoint
 * @param author author recorded on the checkpoint
 * @param measurements measurements attached to the checkpoint as provided
 * @return a `Checkpoint` carrying this context's partitions
 */
def createCheckpointOnProvidedData(
  checkpointName: String,
  author: String,
  measurements: Seq[Measurement]
): Checkpoint = {
  // Single clock read so that start and end carry the identical timestamp.
  val capturedAt = OffsetDateTime.now()
  val finishedAt = Some(capturedAt)

  Checkpoint(
    name = checkpointName,
    author = author,
    atumPartitions = this.atumPartitions,
    processStartTime = capturedAt,
    processEndTime = finishedAt,
    measurements = measurements
  )
}

def addAdditionalData(key: String, value: String) = {
??? //TODO #60
??? // TODO #60
}

def addMeasure(newMeasure: Measure): AtumContext = {
Expand All @@ -67,11 +82,11 @@ class AtumContext private[agent](
}

private[agent] def copy(
atumPartitions: AtumPartitions = this.atumPartitions,
parentAgent: AtumAgent = this.parentAgent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, parentAgent, measures)
atumPartitions: AtumPartitions = this.atumPartitions,
agent: AtumAgent = this.agent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, agent, measures)
}
}

Expand All @@ -82,38 +97,38 @@ object AtumContext {
def apply(elems: (String, String)): AtumPartitions = {
ListMap(elems)
}
}

implicit class DatasetWrapper(df: DataFrame) {
def apply(elems: Seq[(String, String)]): AtumPartitions = {
ListMap(elems:_*)
}

/**
* Executes the measure directly (without AtumContext).
* @param measure the measure to be calculated
* @return
*/
def executeMeasure(checkpointName: String, measure: Measure): DataFrame = {
val result = MeasureResult(measure, measure.function(df))
AtumAgent.measurePublish(checkpointName, result)
df
private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): Seq[PartitionDTO] = {
atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
}

def executeMeasures(checkpointName: String, measures: Iterable[Measure]): DataFrame = {
measures.foreach(m => executeMeasure(checkpointName, m))
df
private[agent] def fromPartitioning(partitioning: Seq[PartitionDTO]): AtumPartitions = {
AtumPartitions(partitioning.map(partition => partition.key -> partition.value))
}
}

/**
 * Builds an `AtumContext` from its DTO form.
 *
 * @param atumContextDTO DTO providing the partitioning and the measures
 * @param agent agent the resulting context is bound to
 * @return a new `AtumContext` with partitions and measures converted from the DTO
 */
private[agent] def fromDTO(atumContextDTO: AtumContextDTO, agent: AtumAgent): AtumContext = {
  val partitions = AtumPartitions.fromPartitioning(atumContextDTO.partitioning)
  val mappedMeasures = MeasuresMapper.mapToMeasures(atumContextDTO.measures)
  new AtumContext(partitions, agent, mappedMeasures)
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Set a point in the pipeline to execute calculation.
* @param checkpointName The key assigned to this checkpoint
* @param atumContext Contains the calculations to be done and publish the result
* @return
*/
def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = {
atumContext.measures.foreach { measure =>
val result = MeasureResult(measure, measure.function(df))
AtumAgent.publish(checkpointName, atumContext, result)
}

def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
// todo: implement checkpoint creation
df
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.dispatcher

import org.apache.spark.internal.Logging
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}

/**
 * Dispatcher useful for development, testing and debugging.
 * Every request is written to standard output via `println`.
 */
class ConsoleDispatcher extends Dispatcher with Logging {

  logInfo("using console dispatcher")

  /**
   * Prints the requested partitioning and answers with a DTO built from it.
   *
   * @param partitioning partitioning the context is requested for
   * @return an `AtumContextDTO` constructed with only `partitioning` set explicitly
   */
  override def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO = {
    val message = s"Fetching AtumContext using ConsoleDispatcher with partitioning $partitioning"
    println(message)
    AtumContextDTO(partitioning = partitioning.partitioning)
  }

  /**
   * Prints the checkpoint.
   *
   * @param checkpoint checkpoint to print
   */
  override def saveCheckpoint(checkpoint: CheckpointDTO): Unit =
    println(s"Saving checkpoint to server. $checkpoint")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.dispatcher

import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}

/**
 * Abstraction over the channel through which the agent requests context data and hands over
 * checkpoints.
 */
trait Dispatcher {
/**
 * Returns the `AtumContextDTO` for the given partitioning.
 * NOTE(review): the name suggests a missing context gets created; the exact behaviour is
 * defined by each implementation - confirm there.
 *
 * @param partitioning partitioning identifying the requested context
 * @return the context data in DTO form
 */
def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO

/**
 * Hands the checkpoint over to the implementation for saving.
 *
 * @param checkpoint checkpoint in DTO form
 */
def saveCheckpoint(checkpoint: CheckpointDTO): Unit
}
Loading

0 comments on commit 6926783

Please sign in to comment.