Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get partitioning measures: 229 #249

Merged
merged 35 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
29d3750
Defining and implementing PL/PG function
TebaleloS Aug 20, 2024
d51000c
Defining measure dto
TebaleloS Aug 20, 2024
b1bf7c4
Removing measure ID
TebaleloS Aug 26, 2024
b58c814
Merge branch 'master' into feature/229-get-partitioning-measures
TebaleloS Aug 26, 2024
70fbe30
defining get class to call db-function
TebaleloS Aug 26, 2024
1763353
Implement db call class
TebaleloS Aug 27, 2024
a434181
removing unused import
TebaleloS Aug 27, 2024
b8dfddf
adding integration tests
TebaleloS Aug 28, 2024
a95ed41
restoring db-function
TebaleloS Aug 28, 2024
857dd02
Adding test and fixing the build
TebaleloS Aug 28, 2024
880b435
Adding tests layer and updating sql function
TebaleloS Aug 28, 2024
1098791
Implementing service functionality
TebaleloS Aug 28, 2024
32862a0
Implementing Controller functionality
TebaleloS Aug 28, 2024
8457553
fixing return type
TebaleloS Aug 28, 2024
5791887
Merge branch 'master' into feature/229-get-partitioning-measures
TebaleloS Aug 28, 2024
6a691b4
fixing merge conflicts
TebaleloS Aug 28, 2024
74a8c45
Adding the endpoint
TebaleloS Aug 28, 2024
415739f
Adding repository unit test
TebaleloS Aug 28, 2024
78132bb
Adding service unit test
TebaleloS Aug 28, 2024
6ba1114
Merge branch 'master' into feature/229-get-partitioning-measures
TebaleloS Aug 30, 2024
6a6786d
Fixing merge conflicts
TebaleloS Aug 30, 2024
a6f627c
Fixing fileName conflicts
TebaleloS Aug 30, 2024
2df573a
removing unused file
TebaleloS Aug 30, 2024
02dbe17
Changing file name
TebaleloS Sep 2, 2024
760eac3
fixing file format
TebaleloS Sep 2, 2024
448cb33
Removing outdated tests
TebaleloS Sep 2, 2024
bcfbe5b
Fixing the function and the test cases
TebaleloS Sep 2, 2024
6f4ba3c
Merge branch 'master' into feature/229-get-partitioning-measures
TebaleloS Sep 2, 2024
fdcdb7e
Restoring files
TebaleloS Sep 2, 2024
94611e1
implementing endpoint test-cases
TebaleloS Sep 2, 2024
8f5bdc1
Fixing endpoint tests
TebaleloS Sep 2, 2024
18faca9
Merge branch 'master' into feature/229-get-partitioning-measures
TebaleloS Sep 4, 2024
bcce813
Addressing GitHub comments
TebaleloS Sep 4, 2024
9556fd1
Update database/src/test/scala/za/co/absa/atum/database/runs/GetParti…
TebaleloS Sep 4, 2024
d4a49fa
Update database/src/test/scala/za/co/absa/atum/database/runs/GetParti…
TebaleloS Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Function: runs.get_partitioning_measures_by_id(Long)
CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id(
IN i_partitioning_id BIGINT,
OUT status INTEGER,
OUT status_text TEXT,
OUT measure_name TEXT,
OUT measured_columns TEXT[]
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_measures_by_id(1)
-- Returns measures for the given partitioning id
--
-- Parameters:
-- i_partitioning_id - partitioning id we are asking the measures for
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- measure_name - Name of the measure
-- measured_columns - Array of columns associated with the measure
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN

PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;

IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

RETURN QUERY
SELECT 11, 'OK', MD.measure_name, MD.measured_columns
FROM runs.measure_definitions AS MD
WHERE MD.fk_partitioning = i_partitioning_id;

RETURN;

END;
$$
LANGUAGE plpgsql volatile SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_measures_by_id(BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures_by_id(BIGINT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString
import za.co.absa.balta.classes.setter.CustomDBType

class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite {
private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id"

private val partitioning: JsonBString = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

test("Get partitioning measures by id should return partitioning measures for partitioning with measures") {

table("runs.partitionings").insert(
add("partitioning", partitioning)
.add("created_by", "Thomas")
)

val fkPartitioning: Long = table("runs.partitionings")
.fieldValue("partitioning", partitioning, "id_partitioning").get.get

table("runs.measure_definitions").insert(
add("fk_partitioning", fkPartitioning)
.add("created_by", "Thomas")
.add("measure_name", "measure1")
.add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]"))
)

table("runs.measure_definitions").insert(
add("fk_partitioning", fkPartitioning)
.add("created_by", "Thomas")
.add("measure_name", "measure2")
.add("measured_columns", CustomDBType("""{"col2"}""", "TEXT[]"))
)

function(fncGetPartitioningMeasuresById)
.setParam("i_partitioning_id", fkPartitioning)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(11))
assert(results.getString("status_text").contains("OK"))
assert(results.getString("measure_name").contains("measure1"))
assert(results.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1")))

val results2 = queryResult.next()
assert(results2.getInt("status").contains(11))
assert(results2.getString("status_text").contains("OK"))
assert(results2.getString("measure_name").contains("measure2"))
assert(results2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2")))
}
}

test("Get partitioning measures by id should return error for partitioning without measures") {

table("runs.partitionings").insert(
add("partitioning", partitioning)
.add("created_by", "Thomas")
)

val fkPartitioning: Long = table("runs.partitionings")
.fieldValue("partitioning", partitioning, "id_partitioning").get.get

function(fncGetPartitioningMeasuresById)
.setParam(fkPartitioning)
.execute { queryResult =>
assert(!queryResult.hasNext)
}
}

test("Get partitioning measures by id should return an error for non-existing partitioning") {

function(fncGetPartitioningMeasuresById)
.setParam(999)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(41))
assert(results.getString("status_text").contains("Partitioning not found"))
TebaleloS marked this conversation as resolved.
Show resolved Hide resolved
assert(!queryResult.hasNext) // checking no more records are returned.
}
}

}
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ object Main extends ZIOAppDefault with Server {
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningMeasuresById.layer,
GetPartitioningAdditionalData.layer,
GetPartitioningAdditionalDataV2.layer,
CreateOrUpdateAdditionalData.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ trait PartitioningController {

def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]


def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
)
}

override def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[MeasureDTO], Seq[MeasureDTO]](
partitioningService.getPartitioningMeasuresById(partitioningId)
)
)
}

}

object PartitioningControllerImpl {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import zio._
import za.co.absa.atum.server.model.MeasureFromDB

import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

class GetPartitioningMeasuresById(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[Long, MeasureFromDB, Task](values =>
Seq(fr"${values}")
) with StandardStatusHandling with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns")
}

object GetPartitioningMeasuresById {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasuresById] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningMeasuresById()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ trait Endpoints extends BaseEndpoints {
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val getPartitioningMeasuresEndpointV2
: PublicEndpoint[Long, ErrorResponse, MultiSuccessResponse[MeasureDTO], Any] = {
apiV2.get
.in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Measures)
.out(statusCode(StatusCode.Ok))
.out(jsonBody[MultiSuccessResponse[MeasureDTO]])
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = {
endpoint.get.in(ZioMetrics).out(stringBody)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter
import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.swagger.bundle.SwaggerInterpreter
import sttp.tapir.ztapir._
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO}
import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion}
import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController}
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
Expand Down Expand Up @@ -85,6 +85,7 @@ trait Routes extends Endpoints with ServerOptions {
createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2),
createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2),
createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2),
createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2),
createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit)
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes
Expand All @@ -102,7 +103,8 @@ trait Routes extends Endpoints with ServerOptions {
patchPartitioningAdditionalDataEndpointV2,
getPartitioningCheckpointsEndpointV2,
getPartitioningCheckpointEndpointV2,
getFlowCheckpointsEndpointV2
getFlowCheckpointsEndpointV2,
getPartitioningMeasuresEndpointV2
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None))
.from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ trait PartitioningRepository {

def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO]


def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class PartitioningRepositoryImpl(
createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData,
getPartitioningCheckpointsFn: GetPartitioningCheckpoints,
getPartitioningByIdFn: GetPartitioningById,
getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2
getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2,
getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById
) extends PartitioningRepository
with BaseRepository {

Expand Down Expand Up @@ -98,6 +99,14 @@ class PartitioningRepositoryImpl(
}
}


override def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] = {
dbMultipleResultCallWithAggregatedStatus(getPartitioningMeasuresByIdFn(partitioningId), "getPartitioningMeasures")
.map(_.map { case MeasureFromDB(measureName, measuredColumns) =>
MeasureDTO(measureName.get, measuredColumns.get)
})
}

}

object PartitioningRepositoryImpl {
Expand All @@ -108,8 +117,8 @@ object PartitioningRepositoryImpl {
with CreateOrUpdateAdditionalData
with GetPartitioningCheckpoints
with GetPartitioningAdditionalDataV2
with GetPartitioningCheckpoints
with GetPartitioningById,
with GetPartitioningById
with GetPartitioningMeasuresById,
PartitioningRepository
] = ZLayer {
for {
Expand All @@ -120,14 +129,16 @@ object PartitioningRepositoryImpl {
getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints]
getPartitioningById <- ZIO.service[GetPartitioningById]
getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2]
getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById]
} yield new PartitioningRepositoryImpl(
createPartitioningIfNotExists,
getPartitioningMeasures,
getPartitioningAdditionalData,
createOrUpdateAdditionalData,
getPartitioningCheckpoints,
getPartitioningById,
getPartitioningAdditionalDataV2
getPartitioningAdditionalDataV2,
getPartitioningMeasuresV2
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ trait PartitioningService {

def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO]

def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
override def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] = {
repositoryCall(partitioningRepository.getPartitioning(partitioningId), "getPartitioning")
}

override def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] = {
repositoryCall(
partitioningRepository.getPartitioningMeasuresById(partitioningId),
"getPartitioningMeasuresById"
)
}

}

object PartitioningServiceImpl {
Expand Down
Loading
Loading