Skip to content

Commit

Permalink
#280: agent and server compatibility in AD
Browse files Browse the repository at this point in the history
  • Loading branch information
lsulak committed Oct 1, 2024
1 parent a0e7045 commit 6f10138
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 28 deletions.
8 changes: 4 additions & 4 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ trait AtumAgent {

/**
* Sends `AdditionalDataPatchDTO` to the AtumService API
* @param partitioningId partitioning ID for which the additional data is to be saved.
* @param atumPartitions: Partitioning for which the additional data is to be saved.
* @param additionalDataPatchDTO the data to be saved or updated if already existing.
*/
private[agent] def updateAdditionalData(
partitioningId: Long,
atumPartitions: AtumPartitions,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
dispatcher.updateAdditionalData(partitioningId, additionalDataPatchDTO)
dispatcher.updateAdditionalData(AtumPartitions.toSeqPartitionDTO(atumPartitions), additionalDataPatchDTO)
}

/**
Expand Down Expand Up @@ -126,7 +126,7 @@ object AtumAgent extends AtumAgent {
case "http" => new HttpDispatcher(config)
case "console" => new ConsoleDispatcher(config)
case "capture" => new CapturingDispatcher(config)
case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt''")
case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt'")
}
}

Expand Down
7 changes: 2 additions & 5 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ import scala.collection.immutable.ListMap

/**
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
* @param atumPartitioningId: Atum partitioning ID associated with a given Atum Context.
* @param atumPartitions: Atum partitions associated with a given Atum Context.
* @param agent: Reference to an Atum Agent object that will be used within the current Atum Contedt.
* @param measures: Variable set of measures associated with a given partitions / context.
* @param additionalData: Additional metadata or tags, associated with a given context.
*/
class AtumContext private[agent] (
val atumPartitioningId: Long,
val atumPartitions: AtumPartitions,
val agent: AtumAgent,
private var measures: Set[AtumMeasure] = Set.empty,
Expand Down Expand Up @@ -151,7 +149,7 @@ class AtumContext private[agent] (
agent.currentUser,
newAdditionalDataToAdd,
)
agent.updateAdditionalData(this.atumPartitioningId, currAdditionalDataSubmit)
agent.updateAdditionalData(this.atumPartitions, currAdditionalDataSubmit)

this.additionalData ++= newAdditionalDataToAdd.map{case (k,v) => (k, Some(v))}

Expand Down Expand Up @@ -203,7 +201,7 @@ class AtumContext private[agent] (
measures: Set[AtumMeasure] = measures,
additionalData: InitialAdditionalDataDTO = additionalData
): AtumContext = {
new AtumContext(1L, atumPartitions, agent, measures, additionalData)
new AtumContext(atumPartitions, agent, measures, additionalData)
}
}

Expand Down Expand Up @@ -237,7 +235,6 @@ object AtumContext {

private[agent] def fromDTO(atumContextDTO: AtumContextDTO, agent: AtumAgent): AtumContext = {
new AtumContext(
1L, // TODO
AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
agent,
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) {

/**
* This method is used to save the additional data to the server.
* @param partitioningId partitioning ID for which the additional data is to be saved.
* @param partitioning partitioning for which the additional data is to be saved.
* @param additionalDataPatchDTO the data to be saved or updated if already existing.
*/
override protected[agent] def updateAdditionalData(
partitioningId: Long,
partitioning: PartitioningDTO,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
val result = AdditionalDataDTO(
Expand All @@ -129,7 +129,7 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) {
}
)

captureFunctionCall((partitioningId, additionalDataPatchDTO), result)
captureFunctionCall((partitioning, additionalDataPatchDTO), result)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import za.co.absa.atum.model.dto.{
AdditionalDataPatchDTO,
AtumContextDTO,
CheckpointDTO,
PartitioningDTO,
PartitioningSubmitDTO
}

/**
* dispatcher useful for development, testing and debugging
*/
class ConsoleDispatcher(config: Config) extends Dispatcher(config: Config) with Logging {
class ConsoleDispatcher(config: Config) extends Dispatcher(config) with Logging {

logInfo("using console dispatcher")

Expand All @@ -44,10 +45,10 @@ class ConsoleDispatcher(config: Config) extends Dispatcher(config: Config) with
}

override protected[agent] def updateAdditionalData(
partitioningId: Long,
partitioning: PartitioningDTO,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
println(s"Saving the additional data to server for partitioning ID: '$partitioningId'. $additionalDataPatchDTO")
println(s"Saving the additional data to server for partitioning: '$partitioning': $additionalDataPatchDTO")

AdditionalDataDTO(
additionalDataPatchDTO.data.map { case (key, value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.atum.model.dto.{
AdditionalDataPatchDTO,
AtumContextDTO,
CheckpointDTO,
PartitioningDTO,
PartitioningSubmitDTO
}

Expand All @@ -31,6 +32,7 @@ import za.co.absa.atum.model.dto.{
*/
abstract class Dispatcher(config: Config) {


/**
* This method is used to ensure the server knows the given partitioning.
* As a response the `AtumContext` is fetched from the server.
Expand All @@ -47,11 +49,11 @@ abstract class Dispatcher(config: Config) {

/**
* This method is used to save the additional data to the server.
* @param partitioningId partitioning ID for which the additional data is to be saved.
* @param partitioning partitioning for which the additional data is to be saved.
* @param additionalDataPatchDTO the data to be saved or updated if already existing.
*/
protected[agent] def updateAdditionalData(
partitioningId: Long,
partitioning: PartitioningDTO,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import io.circe.syntax.EncoderOps
import org.apache.spark.internal.Logging
import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.dto.{
AdditionalDataDTO,
AdditionalDataPatchDTO,
AtumContextDTO,
CheckpointDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.model.dto
import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._

class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Logging {
import java.util.Base64

class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
import HttpDispatcher._

val serverUrl: String = config.getString(UrlKey)
Expand All @@ -42,6 +40,8 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log

private val createPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/createPartitioning")
private val createCheckpointEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/createCheckpoint")

private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath")
private def createAdditionalDataEndpoint(partitioningId: Long): Uri =
Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath/$partitioningId/additional-data")

Expand All @@ -54,6 +54,24 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
logInfo("using http dispatcher")
logInfo(s"serverUrl $serverUrl")

/**
* This method is used to get the partitioning ID from the server.
* @param partitioning: Partitioning to obtain ID for.
* @return Long ID of the partitioning.
*/

private[dispatcher] def getPartitioningId(partitioning: PartitioningDTO): Long = {
val encodedPartitioning = Base64.getUrlEncoder.encodeToString(
partitioning.asJson(dto.encodePartitioningDTO).noSpaces.getBytes("UTF-8")
)

val request = commonAtumRequest.get(getPartitioningIdEndpoint.addParam("partitioning", encodedPartitioning))

val response = backend.send(request)

handleResponseBody(response).as[PartitioningWithIdDTO].id
}

override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
val request = commonAtumRequest
.post(createPartitioningEndpoint)
Expand All @@ -75,9 +93,11 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
}

override protected[agent] def updateAdditionalData(
partitioningId: Long,
partitioning: PartitioningDTO,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
val partitioningId = getPartitioningId(partitioning)

val request = commonAtumRequest
.patch(createAdditionalDataEndpoint(partitioningId))
.body(additionalDataPatchDTO.asJsonString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ trait Routes extends Endpoints with ServerOptions {
// postCheckpointEndpointV2,
createPartitioningEndpointV1,
// postPartitioningEndpointV2,
// patchPartitioningAdditionalDataEndpointV2,
patchPartitioningAdditionalDataEndpointV2,
// getPartitioningCheckpointsEndpointV2,
// getPartitioningCheckpointEndpointV2,
// getPartitioningMeasuresEndpointV2,
// getPartitioningEndpointV2,
getPartitioningEndpointV2
// getPartitioningMeasuresEndpointV2,
// getFlowPartitioningsEndpointV2,
// getPartitioningMainFlowEndpointV2
Expand Down

0 comments on commit 6f10138

Please sign in to comment.