From 29d37506ee41c236286c64358fc337e8dbd1a27d Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 20 Aug 2024 11:32:47 +0200 Subject: [PATCH 01/30] Defining and implementing PL/PG function --- ...1.9.5__get_partitioning_measures_by_id.sql | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql diff --git a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql new file mode 100644 index 000000000..af6e5e5fa --- /dev/null +++ b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql @@ -0,0 +1,68 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id( + IN i_partitioning_id BIGINT, + OUT status INTEGER, + OUT status_text TEXT, + OUT id_measure_definition BIGINT, + OUT measure_name TEXT, + OUT measured_columns TEXT[] +) RETURNS SETOF record AS +$$ +------------------------------------------------------------------------------- +-- +-- Function: runs.get_partitioning_measures_by_id(1) +-- Returns measures for the given partitioning id +-- +-- Parameters: +-- i_partitioning_id - partitioning id we are asking the measures for +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- id_measure_definition - ID of the measure definition +-- measure_name - Name of the measure +-- measured_columns - Array of columns associated with the measure +-- +-- Status codes: +-- 11 - OK +-- 41 - Measures not found // Todo discus with the team +-- +------------------------------------------------------------------------------- +BEGIN + + RETURN QUERY + SELECT MD.id_measure_definition, MD.measure_name, MD.measured_columns + FROM runs.measure_definitions AS MD + WHERE MD.fk_partitioning = i_partitioning_id; + + IF FOUND THEN + status := 11; + status_text := 'Measures found'; + ELSE + status := 41; + status_text := 'Measures not found'; + END IF; + + RETURN NEXT; + RETURN; +END; +$$ +LANGUAGE plpgsql volatile SECURITY DEFINER; + +ALTER FUNCTION runs.get_partitioning_measures_by_id(BIGINT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures_by_id(BIGINT) TO atum_user; From d51000c09685622b9285762bc8005756f78cba7f Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 20 Aug 2024 12:03:39 +0200 Subject: [PATCH 02/30] Defining measure dto --- .../co/absa/atum/model/dto/MeasureWithIdDTO.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala new file mode 100644 index 000000000..a0fd75e42 --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala @@ -0,0 +1,15 @@ +package za.co.absa.atum.model.dto + +import io.circe._ +import io.circe.generic.semiauto._ + +case class MeasureWithIdDTO ( + id: String, + measureName: String, + measuredColumns: Seq[String] +) + +object MeasureWithIdDTO { + implicit val decodeMeasureWithIdDTO: Decoder[MeasureWithIdDTO] = deriveDecoder[MeasureWithIdDTO] + implicit val encoderMeasureWithIdDTO: Encoder[MeasureWithIdDTO] = deriveEncoder[MeasureWithIdDTO] +} From b1bf7c4e567dce75e71a45802c0427deb3b4dd46 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 26 Aug 2024 09:18:36 +0200 Subject: [PATCH 03/30] Removing measure ID --- .../postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql | 4 +--- .../scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql index af6e5e5fa..6a786d52d 100644 --- a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql +++ b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql @@ -18,7 +18,6 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id( IN i_partitioning_id BIGINT, OUT status INTEGER, OUT status_text TEXT, - OUT id_measure_definition BIGINT, OUT measure_name TEXT, OUT measured_columns TEXT[] ) RETURNS SETOF record AS @@ -34,7 +33,6 @@ $$ -- Returns: -- status - Status code -- status_text - Status message --- id_measure_definition - ID of the measure definition -- measure_name - Name of the measure -- measured_columns - Array of columns associated with the measure -- @@ -46,7 +44,7 @@ $$ BEGIN RETURN QUERY - SELECT MD.id_measure_definition, MD.measure_name, MD.measured_columns + SELECT MD.measure_name, MD.measured_columns FROM runs.measure_definitions AS MD WHERE MD.fk_partitioning = i_partitioning_id; diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala index a0fd75e42..5386ee58d 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala @@ -4,7 +4,6 @@ import io.circe._ import io.circe.generic.semiauto._ case class MeasureWithIdDTO ( - id: String, measureName: String, measuredColumns: Seq[String] ) From 70fbe30ffb126e6a13d77424c38306fd8431c32b Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 26 Aug 2024 14:51:52 +0200 Subject: [PATCH 04/30] defining get class to call db-function --- .../functions/GetPartitioningMeasuresV2.scala | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala new file mode 100644 index 000000000..5325908b7 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import zio.Task + + +class GetPartitioningMeasuresV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[Long, CheckpointFromDB, Task](input => + Seq(fr"${input}") + ) + +} From 17633539854f9a96cc9ed27babc3e02eae045f39 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 27 Aug 2024 11:49:12 +0200 Subject: [PATCH 05/30] Implement db call class --- .../scala/za/co/absa/atum/server/Main.scala | 1 + .../functions/GetPartitioningMeasuresV2.scala | 24 +++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 32ddbf835..360ec7758 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -59,6 +59,7 @@ object Main extends ZIOAppDefault with Server { WriteCheckpointV2.layer, GetPartitioningCheckpointV2.layer, GetFlowCheckpoints.layer, + GetPartitioningMeasuresV2.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala index 5325908b7..43424b1e0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala @@ -17,16 +17,32 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.implicits.toSqlInterpolator -import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus -import zio.Task +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio._ +import io.circe.syntax._ +import za.co.absa.atum.server.model.MeasureFromDB class GetPartitioningMeasuresV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[Long, CheckpointFromDB, Task](input => - Seq(fr"${input}") + extends DoobieMultipleResultFunctionWithAggStatus[Long, MeasureFromDB, Task](values => + Seq(fr"${values}") ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns") +} + +object GetPartitioningMeasuresV2 { + val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasuresV2] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetPartitioningMeasuresV2()(Runs, dbProvider.dbEngine) + } } From a4341813ab06a7735386274339df5db25d36c3be Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 27 Aug 2024 11:49:42 +0200 Subject: [PATCH 06/30] removing unused import --- .../api/database/runs/functions/GetPartitioningMeasuresV2.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala index 43424b1e0..23fdfd2d3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala @@ -25,7 +25,6 @@ import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWith import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio._ -import io.circe.syntax._ import za.co.absa.atum.server.model.MeasureFromDB From b8dfddf4d8fd9abb700dee8650fb0e086da39bc0 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 08:14:52 +0200 Subject: [PATCH 07/30] adding integration tests --- ...V1.9.3__get_partitioning_checkpoint_v2.sql | 108 ------------------ ...1.9.5__get_partitioning_measures_by_id.sql | 51 --------- ...titioningMeasuresByIdIntegrationTest.scala | 64 +++++++++++ .../scala/za/co/absa/atum/server/Main.scala | 1 - .../functions/GetPartitioningMeasuresV2.scala | 1 + ...artitioningMeasuresV2IntegrationTest.scala | 32 ++++++ 6 files changed, 97 insertions(+), 160 deletions(-) delete mode 100644 database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala diff --git a/database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql b/database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql deleted file mode 100644 index 4adcd809b..000000000 --- a/database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoint_v2( - IN i_partitioning_id BIGINT, - IN i_checkpoint_id UUID, - OUT status INTEGER, - OUT status_text TEXT, - OUT id_checkpoint UUID, - OUT checkpoint_name TEXT, - OUT author TEXT, - OUT measured_by_atum_agent BOOLEAN, - OUT measure_name TEXT, - OUT measured_columns TEXT[], - OUT measurement_value JSONB, - OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE -) - RETURNS SETOF record AS -$$ - ------------------------------------------------------------------------------- --- --- Function: runs.get_partitioning_checkpoint_v2(BIGINT, UUID) --- Retrieves a single checkpoint (measures and their measurement details) related to a --- given partitioning and checkpoint ID. --- --- Parameters: --- i_partitioning_id - ID of the partitioning --- i_checkpoint_id - ID of the checkpoint --- --- Returns: --- status - Status code --- status_text - Status message --- id_checkpoint - ID of the checkpoint --- checkpoint_name - Name of the checkpoint --- author - Author of the checkpoint --- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent --- measure_name - Name of the measure --- measure_columns - Columns of the measure --- measurement_value - Value of the measurement --- checkpoint_start_time - Time of the checkpoint --- checkpoint_end_time - End time of the checkpoint computation --- --- Status codes: --- 11 - OK --- 41 - Partitioning not found --- 42 - Checkpoint not found --- -------------------------------------------------------------------------------- -BEGIN - PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; - IF NOT FOUND THEN - status := 41; - status_text := 'Partitioning not found'; - RETURN NEXT; - RETURN; - END IF; - - RETURN QUERY - SELECT - 11 AS status, - 'Ok' AS status_text, - C.id_checkpoint, - C.checkpoint_name, - C.created_by AS author, - C.measured_by_atum_agent, - md.measure_name, - md.measured_columns, - M.measurement_value, - C.process_start_time AS checkpoint_start_time, - C.process_end_time AS checkpoint_end_time - FROM - runs.checkpoints C - JOIN - runs.measurements M ON C.id_checkpoint = M.fk_checkpoint - JOIN - runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition - WHERE - C.fk_partitioning = i_partitioning_id - AND - C.id_checkpoint = i_checkpoint_id; - - IF NOT FOUND THEN - status := 42; - status_text := 'Checkpoint not found'; - RETURN NEXT; - RETURN; - END IF; -END; -$$ - -LANGUAGE plpgsql VOLATILE SECURITY DEFINER; - -ALTER FUNCTION runs.get_partitioning_checkpoint_v2(BIGINT, UUID) OWNER TO atum_owner; - -GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoint_v2(BIGINT, UUID) TO atum_owner; diff --git a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql index 6a786d52d..3d4af8d6a 100644 --- a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql +++ b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql @@ -13,54 +13,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id( - IN i_partitioning_id BIGINT, - OUT status INTEGER, - OUT status_text TEXT, - OUT measure_name TEXT, - OUT measured_columns TEXT[] -) RETURNS SETOF record AS -$$ -------------------------------------------------------------------------------- --- --- Function: runs.get_partitioning_measures_by_id(1) --- Returns measures for the given partitioning id --- --- Parameters: --- i_partitioning_id - partitioning id we are asking the measures for --- --- Returns: --- status - Status code --- status_text - Status message --- measure_name - Name of the measure --- measured_columns - Array of columns associated with the measure --- --- Status codes: --- 11 - OK --- 41 - Measures not found // Todo discus with the team --- -------------------------------------------------------------------------------- -BEGIN - - RETURN QUERY - SELECT MD.measure_name, MD.measured_columns - FROM runs.measure_definitions AS MD - WHERE MD.fk_partitioning = i_partitioning_id; - - IF FOUND THEN - status := 11; - status_text := 'Measures found'; - ELSE - status := 41; - status_text := 'Measures not found'; - END IF; - - RETURN NEXT; - RETURN; -END; -$$ -LANGUAGE plpgsql volatile SECURITY DEFINER; - -ALTER FUNCTION runs.get_partitioning_measures_by_id(BIGINT) OWNER TO atum_owner; -GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures_by_id(BIGINT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala new file mode 100644 index 000000000..0caca71d5 --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala @@ -0,0 +1,64 @@ +package za.co.absa.atum.database.runs + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString +import za.co.absa.balta.classes.setter.CustomDBType + +class GetPartitioningMeasuresByIdIntegrationTest extends DBTestSuite { + private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id" + + test("Get partitioning measures by id should return partitioning measures for partitioning with measures") { + val partitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValues": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", "Thomas") + ) + + val fkPartitioning: Long = table("runs.partitionings") + .fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.measure_definitions").insert( + add("fk_partitioning", fkPartitioning) + .add("created_by", "Thomas") + .add("measure_name", "measure1") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + ) + + table("runs.measure_definitions").insert( + add("fk_partitioning", fkPartitioning) + .add("created_by", "Thomas") + .add("measure_name", "measure2") + .add("measured_columns", CustomDBType("""{"col2"}""", "TEXT[]")) + ) + + function(fncGetPartitioningMeasuresById) + .setParam("i_partitioning", fkPartitioning) + .execute { queryResult => + val results = queryResult.next() + assert(results.getInt("status").contains(11)) + assert(results.getString("status_text").contains("OK")) + assert(results.getString("measure_name").contains("measure1")) + assert(results.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + + val results2 = queryResult.next() + assert(results2.getInt("status").contains(11)) + assert(results2.getString("status_text").contains("OK")) + assert(results2.getString("measure_name").contains("measure2")) + assert(results2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + } + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 360ec7758..32ddbf835 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -59,7 +59,6 @@ object Main extends ZIOAppDefault with Server { WriteCheckpointV2.layer, GetPartitioningCheckpointV2.layer, GetFlowCheckpoints.layer, - GetPartitioningMeasuresV2.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala index 23fdfd2d3..c9a1cd8f2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala @@ -27,6 +27,7 @@ import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio._ import za.co.absa.atum.server.model.MeasureFromDB +import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get class GetPartitioningMeasuresV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) extends DoobieMultipleResultFunctionWithAggStatus[Long, MeasureFromDB, Task](values => diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala new file mode 100644 index 000000000..dc07c21f4 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala @@ -0,0 +1,32 @@ +package za.co.absa.atum.server.api.database.runs.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import zio.interop.catz.asyncInstance +import za.co.absa.db.fadb.status.FunctionStatus +import zio.test.{Spec, TestEnvironment, assertTrue} +import zio.{Scope, ZIO} + +object GetPartitioningMeasuresV2IntegrationTest extends ConfigProviderTest { + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + suite("GetPartitioningMeasuresSuite")( + test("Returns expected sequence of Measures with existing partitioning") { + val partitioningID: Long = 1L + + for { + getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresV2] + result <- getPartitioningMeasuresV2(partitioningID) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "No measures found")))) + } + ).provide( + GetPartitioningMeasuresV2.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + } + +} From a95ed412dffe8c45c88823b30fb0d409c6a4b9ef Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 09:26:12 +0200 Subject: [PATCH 08/30] restoring db-function --- ...1.9.5__get_partitioning_measures_by_id.sql | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql index 3d4af8d6a..222a2f6f3 100644 --- a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql +++ b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql @@ -13,3 +13,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id( + IN i_partitioning_id BIGINT, + OUT status INTEGER, + OUT status_text TEXT, + OUT measure_name TEXT, + OUT measured_columns TEXT[] +) RETURNS SETOF record AS +$$ +------------------------------------------------------------------------------- +-- +-- Function: runs.get_partitioning_measures_by_id(1) +-- Returns measures for the given partitioning id +-- +-- Parameters: +-- i_partitioning_id - partitioning id we are asking the measures for +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- measure_name - Name of the measure +-- measured_columns - Array of columns associated with the measure +-- +-- Status codes: +-- 11 - OK +-- 41 - Partitioning not found +-- 42 - Measures not found +-- +------------------------------------------------------------------------------- +BEGIN + + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + + IF NOT FOUND THEN + status := 41; + status_text := 'Partitioning not found'; + RETURN NEXT; + RETURN; + END IF; + + RETURN QUERY + SELECT MD.measure_name, MD.measured_columns + FROM runs.measure_definitions AS MD + WHERE MD.fk_partitioning = i_partitioning_id; + + IF FOUND THEN + status := 11; + status_text := 'Measures found'; + ELSE + status := 42; + status_text := 'Measures not found'; + END IF; + + RETURN NEXT; + RETURN; + +END; +$$ +LANGUAGE plpgsql volatile SECURITY DEFINER; + +ALTER FUNCTION runs.get_partitioning_measures_by_id(BIGINT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures_by_id(BIGINT) TO atum_user; From 857dd021f596e45aa5ce3c1c133f37b51f4107b3 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 16:10:31 +0200 Subject: [PATCH 09/30] Adding test and fixing the build --- ...titioningMeasuresByIdIntegrationTest.scala | 2 +- .../scala/za/co/absa/atum/server/Main.scala | 1 + .../functions/GetPartitioningMeasuresV2.scala | 4 +-- .../repository/PartitioningRepository.scala | 2 ++ .../PartitioningRepositoryImpl.scala | 34 +++++++++---------- ...artitioningMeasuresV2IntegrationTest.scala | 2 +- .../PartitioningRepositoryUnitTests.scala | 11 +++++- 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala index 0caca71d5..46904311b 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala @@ -46,7 +46,7 @@ class GetPartitioningMeasuresByIdIntegrationTest extends DBTestSuite { ) function(fncGetPartitioningMeasuresById) - .setParam("i_partitioning", fkPartitioning) + .setParam("i_partitioning", fkPartitioning: Long) .execute { queryResult => val results = queryResult.next() assert(results.getInt("status").contains(11)) diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 32ddbf835..b6b06f348 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -52,6 +52,7 @@ object Main extends ZIOAppDefault with Server { FlowRepositoryImpl.layer, CreatePartitioningIfNotExists.layer, GetPartitioningMeasures.layer, + GetPartitioningMeasuresV2.layer, GetPartitioningAdditionalData.layer, CreateOrUpdateAdditionalData.layer, GetPartitioningCheckpoints.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala index c9a1cd8f2..0bedfceb4 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala @@ -32,9 +32,7 @@ import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get class GetPartitioningMeasuresV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) extends DoobieMultipleResultFunctionWithAggStatus[Long, MeasureFromDB, Task](values => Seq(fr"${values}") - ) - with StandardStatusHandling - with ByFirstErrorStatusAggregator { + ) with StandardStatusHandling with ByFirstErrorStatusAggregator { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns") } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 52e7c6c02..95750353d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -40,4 +40,6 @@ trait PartitioningRepository { def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[DatabaseError, Unit] def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] + + def getPartitioningMeasuresV2(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 68ff69276..a0f10a6d3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -16,22 +16,9 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.{ - InitialAdditionalDataDTO, - AdditionalDataSubmitDTO, - CheckpointQueryDTO, - MeasureDTO, - PartitioningDTO, - PartitioningSubmitDTO -} +import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointQueryDTO, InitialAdditionalDataDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO} import za.co.absa.atum.server.model.MeasureFromDB -import za.co.absa.atum.server.api.database.runs.functions.{ - CreateOrUpdateAdditionalData, - CreatePartitioningIfNotExists, - GetPartitioningAdditionalData, - GetPartitioningCheckpoints, - GetPartitioningMeasures -} +import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, GetPartitioningMeasures, GetPartitioningMeasuresV2} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB import zio._ @@ -43,7 +30,8 @@ class PartitioningRepositoryImpl( getPartitioningMeasuresFn: GetPartitioningMeasures, getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, - getPartitioningCheckpointsFn: GetPartitioningCheckpoints + getPartitioningCheckpointsFn: GetPartitioningCheckpoints, + getPartitioningMeasuresByIdFn: GetPartitioningMeasuresV2 ) extends PartitioningRepository with BaseRepository { @@ -83,6 +71,13 @@ class PartitioningRepositoryImpl( ) } + override def getPartitioningMeasuresV2(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] = { + dbMultipleResultCallWithAggregatedStatus(getPartitioningMeasuresByIdFn(partitioningId), "getPartitioningMeasures") + .map(_.map { case MeasureFromDB(measureName, measuredColumns) => + MeasureDTO(measureName.get, measuredColumns.get) + }) + } + } object PartitioningRepositoryImpl { @@ -91,7 +86,8 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasures with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData - with GetPartitioningCheckpoints, + with GetPartitioningCheckpoints + with GetPartitioningMeasuresV2, PartitioningRepository ] = ZLayer { for { @@ -100,12 +96,14 @@ object PartitioningRepositoryImpl { getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData] getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] + getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresV2] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, getPartitioningMeasures, getPartitioningAdditionalData, createOrUpdateAdditionalData, - getPartitioningCheckpoints + getPartitioningCheckpoints, + getPartitioningMeasuresV2 ) } } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala index dc07c21f4..cfae9fef3 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala @@ -20,7 +20,7 @@ object GetPartitioningMeasuresV2IntegrationTest extends ConfigProviderTest { for { getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresV2] result <- getPartitioningMeasuresV2(partitioningID) - } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "No measures found")))) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( GetPartitioningMeasuresV2.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index c1174e3a2..b08638f2e 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -87,6 +87,14 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) + private val getPartitioningMeasuresV2Mock = mock(classOf[GetPartitioningMeasuresV2]) + + // when(getPartitioningMeasuresV2Mock.apply(2L)) + // .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + + private val getPartitioningMeasuresV2MockLayer = ZLayer.succeed(getPartitioningMeasuresV2Mock) + + override def spec: Spec[TestEnvironment with Scope, Any] = { suite("PartitioningRepositorySuite")( @@ -181,7 +189,8 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMeasuresMockLayer, getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, - getPartitioningCheckpointsMockLayer + getPartitioningCheckpointsMockLayer, + getPartitioningMeasuresV2MockLayer ) } From 880b4353adcc04972ca1b1bbb50ae2811ea5c8b6 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 16:48:59 +0200 Subject: [PATCH 10/30] Adding tests layer and updating sql function --- ...1.9.5__get_partitioning_measures_by_id.sql | 2 +- ...titioningMeasuresByIdIntegrationTest.scala | 62 ++++++++++++++----- .../scala/za/co/absa/atum/server/Main.scala | 2 +- ...cala => GetPartitioningMeasuresById.scala} | 8 +-- .../PartitioningRepositoryImpl.scala | 8 +-- ...itioningMeasuresByIdIntegrationTest.scala} | 6 +- .../PartitioningRepositoryUnitTests.scala | 2 +- 7 files changed, 61 insertions(+), 29 deletions(-) rename server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/{GetPartitioningMeasuresV2.scala => GetPartitioningMeasuresById.scala} (88%) rename server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/{GetPartitioningMeasuresV2IntegrationTest.scala => GetPartitioningMeasuresByIdIntegrationTest.scala} (88%) diff --git a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql index 222a2f6f3..66f503266 100644 --- a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql +++ b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql @@ -60,7 +60,7 @@ BEGIN IF FOUND THEN status := 11; - status_text := 'Measures found'; + status_text := 'OK'; ELSE status := 42; status_text := 'Measures not found'; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala index 46904311b..7e145b13a 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala @@ -7,21 +7,22 @@ import za.co.absa.balta.classes.setter.CustomDBType class GetPartitioningMeasuresByIdIntegrationTest extends DBTestSuite { private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id" + private val partitioning: JsonBString = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValues": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + test("Get partitioning measures by id should return partitioning measures for partitioning with measures") { - val partitioning = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["key1", "key3", "key2", "key4"], - | "keysToValues": { - | "key1": "valueX", - | "key2": "valueY", - | "key3": "valueZ", - | "key4": "valueA" - | } - |} - |""".stripMargin - ) table("runs.partitionings").insert( add("partitioning", partitioning) @@ -46,7 +47,7 @@ class GetPartitioningMeasuresByIdIntegrationTest extends DBTestSuite { ) function(fncGetPartitioningMeasuresById) - .setParam("i_partitioning", fkPartitioning: Long) + .setParam("i_partitioning", fkPartitioning) .execute { queryResult => val results = queryResult.next() assert(results.getInt("status").contains(11)) @@ -61,4 +62,35 @@ class GetPartitioningMeasuresByIdIntegrationTest extends DBTestSuite { assert(results2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) } } + + test("Get partitioning measures by id should return error for partitioning without measures") { + + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", "Thomas") + ) + + val fkPartitioning: Long = table("runs.partitionings") + .fieldValue("partitioning", partitioning, "id_partitioning").get.get + + function(fncGetPartitioningMeasuresById) + .setParam("i_partitioning", fkPartitioning) + .execute { queryResult => + val results = queryResult.next() + assert(results.getInt("status").contains(42)) + assert(results.getString("status_text").contains("Measures not found")) + } + } + + test("Get partitioning measures by id should return an error for non-existing partitioning") { + + function(fncGetPartitioningMeasuresById) + .setParam("i_partitioning", 999) + .execute { queryResult => + val results = queryResult.next() + assert(results.getInt("status").contains(41)) + assert(results.getString("status_text").contains("Partitioning not found")) + } + } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index b6b06f348..a61d38fe5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -52,7 +52,7 @@ object Main extends ZIOAppDefault with Server { FlowRepositoryImpl.layer, CreatePartitioningIfNotExists.layer, GetPartitioningMeasures.layer, - GetPartitioningMeasuresV2.layer, + GetPartitioningMeasuresById.layer, GetPartitioningAdditionalData.layer, CreateOrUpdateAdditionalData.layer, GetPartitioningCheckpoints.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresById.scala similarity index 88% rename from server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala rename to server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresById.scala index 0bedfceb4..5fc9b198b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresById.scala @@ -29,7 +29,7 @@ import za.co.absa.atum.server.model.MeasureFromDB import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get -class GetPartitioningMeasuresV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) +class GetPartitioningMeasuresById(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) extends DoobieMultipleResultFunctionWithAggStatus[Long, MeasureFromDB, Task](values => Seq(fr"${values}") ) with StandardStatusHandling with ByFirstErrorStatusAggregator { @@ -37,10 +37,10 @@ class GetPartitioningMeasuresV2(implicit schema: DBSchema, dbEngine: DoobieEngin override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns") } -object GetPartitioningMeasuresV2 { - val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasuresV2] = ZLayer { +object GetPartitioningMeasuresById { + val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasuresById] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] - } yield new GetPartitioningMeasuresV2()(Runs, dbProvider.dbEngine) + } yield new GetPartitioningMeasuresById()(Runs, dbProvider.dbEngine) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index a0f10a6d3..c1ce92eb4 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointQueryDTO, InitialAdditionalDataDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO} import za.co.absa.atum.server.model.MeasureFromDB -import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, GetPartitioningMeasures, GetPartitioningMeasuresV2} +import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, GetPartitioningMeasures, GetPartitioningMeasuresById} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB import zio._ @@ -31,7 +31,7 @@ class PartitioningRepositoryImpl( getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, getPartitioningCheckpointsFn: GetPartitioningCheckpoints, - getPartitioningMeasuresByIdFn: GetPartitioningMeasuresV2 + getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById ) extends PartitioningRepository with BaseRepository { @@ -87,7 +87,7 @@ object PartitioningRepositoryImpl { with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData with GetPartitioningCheckpoints - with GetPartitioningMeasuresV2, + with GetPartitioningMeasuresById, PartitioningRepository ] = ZLayer { for { @@ -96,7 +96,7 @@ object PartitioningRepositoryImpl { getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData] getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] - getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresV2] + getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, getPartitioningMeasures, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdIntegrationTest.scala similarity index 88% rename from server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala rename to server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdIntegrationTest.scala index cfae9fef3..dddea4925 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdIntegrationTest.scala @@ -9,7 +9,7 @@ import za.co.absa.db.fadb.status.FunctionStatus import zio.test.{Spec, TestEnvironment, assertTrue} import zio.{Scope, ZIO} -object GetPartitioningMeasuresV2IntegrationTest extends ConfigProviderTest { +object GetPartitioningMeasuresByIdIntegrationTest extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -18,12 +18,12 @@ object GetPartitioningMeasuresV2IntegrationTest extends ConfigProviderTest { val partitioningID: Long = 1L for { - getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresV2] + getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] result <- getPartitioningMeasuresV2(partitioningID) } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( - GetPartitioningMeasuresV2.layer, + GetPartitioningMeasuresById.layer, PostgresDatabaseProvider.layer, TestTransactorProvider.layerWithRollback ) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index b08638f2e..f9148f229 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -87,7 +87,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) - private val getPartitioningMeasuresV2Mock = mock(classOf[GetPartitioningMeasuresV2]) + private val getPartitioningMeasuresV2Mock = mock(classOf[GetPartitioningMeasuresById]) // when(getPartitioningMeasuresV2Mock.apply(2L)) // .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) From 10987911c266c84e1f8ecf944d5c657af49f5fd4 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 17:22:01 +0200 Subject: [PATCH 11/30] Implementing service functionality --- .../server/api/repository/PartitioningRepository.scala | 2 +- .../api/repository/PartitioningRepositoryImpl.scala | 2 +- .../atum/server/api/service/PartitioningService.scala | 2 ++ .../atum/server/api/service/PartitioningServiceImpl.scala | 8 ++++++++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 95750353d..2bd636f6e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -41,5 +41,5 @@ trait PartitioningRepository { def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] - def getPartitioningMeasuresV2(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] + def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index c1ce92eb4..82b0c4c3d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -71,7 +71,7 @@ class PartitioningRepositoryImpl( ) } - override def getPartitioningMeasuresV2(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] = { + override def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] = { dbMultipleResultCallWithAggregatedStatus(getPartitioningMeasuresByIdFn(partitioningId), "getPartitioningMeasures") .map(_.map { case MeasureFromDB(measureName, measuredColumns) => MeasureDTO(measureName.get, measuredColumns.get) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 2dce04b51..56cb5a5cd 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -40,4 +40,6 @@ trait PartitioningService { def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[ServiceError, Unit] def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] + + def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index eebb7a2ba..447edf43c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -73,6 +73,14 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) } yield checkpointDTOs } + + override def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] = { + repositoryCall( + partitioningRepository.getPartitioningMeasuresById(partitioningId), + "getPartitioningMeasuresById" + ) + } + } object PartitioningServiceImpl { From 32862a0dcd3c2402fff1f980a50114cead5bac57 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 17:33:28 +0200 Subject: [PATCH 12/30] Implementing Controller functionality --- .../api/controller/PartitioningController.scala | 4 ++++ .../api/controller/PartitioningControllerImpl.scala | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 2dfc53016..ca46b0670 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -45,4 +45,8 @@ trait PartitioningController { def getPartitioningCheckpointsV2( checkpointQueryDTO: CheckpointQueryDTO ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] + + def getPartitioningMeasuresV2( + partitioningId: Long + ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 6fe9efff9..c6204e4d0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -77,6 +77,17 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } + override def getPartitioningMeasuresV2( + partitioningId: Long + ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] = { + mapToMultiSuccessResponse( + serviceCall[Seq[MeasureDTO], Seq[MeasureDTO]]( + partitioningService.getPartitioningMeasuresById(partitioningId), + identity + ) + ) + } + } object PartitioningControllerImpl { From 8457553bd32b32309a03cc20cba68670decd9569 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 17:36:34 +0200 Subject: [PATCH 13/30] fixing return type --- .../server/api/controller/PartitioningController.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index ca46b0670..11edeb7d5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -16,13 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.{ - AdditionalDataSubmitDTO, - AtumContextDTO, - CheckpointDTO, - CheckpointQueryDTO, - PartitioningSubmitDTO -} +import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, CheckpointQueryDTO, MeasureDTO, PartitioningSubmitDTO} import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import zio.IO @@ -48,5 +42,5 @@ trait PartitioningController { def getPartitioningMeasuresV2( partitioningId: Long - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] + ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] } From 6a691b41a1812c71840be3b34e74f91b004fd7c5 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 18:53:24 +0200 Subject: [PATCH 14/30] fixing merge conflicts --- .../server/api/repository/PartitioningRepositoryImpl.scala | 4 +--- .../api/repository/PartitioningRepositoryUnitTests.scala | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index c109f81e1..ace5d5ba0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -29,8 +29,7 @@ class PartitioningRepositoryImpl( getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, getPartitioningCheckpointsFn: GetPartitioningCheckpoints, - getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2 - getPartitioningCheckpointsFn: GetPartitioningCheckpoints, + getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2, getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById ) extends PartitioningRepository with BaseRepository { @@ -117,7 +116,6 @@ object PartitioningRepositoryImpl { createOrUpdateAdditionalData, getPartitioningCheckpoints, getPartitioningAdditionalDataV2, - getPartitioningCheckpoints, getPartitioningMeasuresV2 ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 94de480b4..0438dc178 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -226,7 +226,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { createOrUpdateAdditionalDataMockLayer, getPartitioningCheckpointsMockLayer, getPartitioningAdditionalDataV2MockLayer, - getPartitioningCheckpointsMockLayer, getPartitioningMeasuresV2MockLayer ) From 74a8c45b2f8c9dc01d30b79d2ebf61b8c80f63b6 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 19:13:29 +0200 Subject: [PATCH 15/30] Adding the endpoint --- .../za/co/absa/atum/server/api/http/BaseEndpoints.scala | 3 ++- .../scala/za/co/absa/atum/server/api/http/Endpoints.scala | 8 ++++++++ .../scala/za/co/absa/atum/server/api/http/Routes.scala | 4 +++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index 15919c240..c15d03578 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -71,7 +71,8 @@ trait BaseEndpoints { oneOf[ErrorResponse]( badRequestOneOfVariant, generalErrorOneOfVariant, - internalServerErrorOneOfVariant + internalServerErrorOneOfVariant, + notFoundErrorOneOfVariant ) ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 0ec3ab8f5..072dd5f1e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -114,6 +114,14 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) } + protected val getPartitioningMeasuresV2 + : PublicEndpoint[Long, ErrorResponse, MultiSuccessResponse[MeasureDTO], Any] = { + apiV2.get + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Measures) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[MultiSuccessResponse[MeasureDTO]]) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index c0b64d061..c770380e1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -78,6 +78,7 @@ trait Routes extends Endpoints with ServerOptions { ), createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), + createServerEndpoint(getPartitioningMeasuresV2, PartitioningController.getPartitioningMeasuresV2), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes @@ -95,7 +96,8 @@ trait Routes extends Endpoints with ServerOptions { createOrUpdateAdditionalDataEndpointV2, getPartitioningCheckpointsEndpointV2, getPartitioningCheckpointEndpointV2, - getFlowCheckpointsEndpointV2 + getFlowCheckpointsEndpointV2, + getPartitioningMeasuresV2 ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) From 415739f41e80136a4f8eceab0ba2860b76c42d75 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 19:35:17 +0200 Subject: [PATCH 16/30] Adding repository unit test --- .../PartitioningRepositoryUnitTests.scala | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 0438dc178..0ff57376b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -103,8 +103,13 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresV2Mock = mock(classOf[GetPartitioningMeasuresById]) - // when(getPartitioningMeasuresV2Mock.apply(2L)) - // .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + when(getPartitioningMeasuresV2Mock.apply(1L)).thenReturn( + ZIO.right(Seq(Row(FunctionStatus(0, "success"), measureFromDB1), Row(FunctionStatus(0, "success"), measureFromDB2)))) + when(getPartitioningMeasuresV2Mock.apply(2L)) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + when(getPartitioningMeasuresV2Mock.apply(3L)) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(42, "Measures not found")))) + when(getPartitioningMeasuresV2Mock.apply(4L)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) private val getPartitioningMeasuresV2MockLayer = ZLayer.succeed(getPartitioningMeasuresV2Mock) @@ -217,6 +222,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundDatabaseError] ) } + ), + suite("GetPartitioningMeasuresByIdSuite")( + test("Returns expected Seq") { + for { + result <- PartitioningRepository.getPartitioningMeasuresById(1L) + } yield assertTrue(result == Seq(measureDTO1, measureDTO2)) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getPartitioningMeasuresById(2L).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getPartitioningMeasuresById(3L).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected DatabaseError") { + assertZIO(PartitioningRepository.getPartitioningMeasuresById(4L).exit)( + failsWithA[GeneralDatabaseError] + ) + } ) ).provide( PartitioningRepositoryImpl.layer, From 78132bb00d7cc2d3576447af531bd5a5c85ddcc2 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 28 Aug 2024 19:57:41 +0200 Subject: [PATCH 17/30] Adding service unit test --- .../PartitioningServiceUnitTests.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index f87fa1aa3..88e2c22a2 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -62,6 +62,13 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(2L)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getPartitioningMeasuresById(1L)) + .thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) + when(partitioningRepositoryMock.getPartitioningMeasuresById(2L)) + .thenReturn(ZIO.fail(NotFoundDatabaseError("boom!"))) + when(partitioningRepositoryMock.getPartitioningMeasuresById(3L)) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -156,6 +163,23 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[ServiceError] ) } + ), + suite("GetPartitioningMeasuresByIdSuite")( + test("Returns expected Right with Seq[MeasureDTO]") { + for { + result <- PartitioningService.getPartitioningMeasuresById(1L) + } yield assertTrue(result == Seq(measureDTO1, measureDTO2)) + }, + test("Returns expected ServiceError") { + assertZIO(PartitioningService.getPartitioningMeasuresById(2L).exit)( + failsWithA[NotFoundServiceError] + ) + }, + test("Returns expected ServiceError") { + assertZIO(PartitioningService.getPartitioningMeasuresById(3L).exit)( + failsWithA[GeneralServiceError] + ) + } ) ).provide( PartitioningServiceImpl.layer, From 6a6786dbc77d52ddbbba27b31ce929758fb16204 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Fri, 30 Aug 2024 11:28:46 +0200 Subject: [PATCH 18/30] Fixing merge conflicts --- .../atum/server/api/repository/PartitioningRepositoryImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index aabbd89c8..556621d5b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB} +import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB, PartitioningFromDB} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import zio._ From a6f627cf413b137bb6c741355bf3f57e09b07635 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Fri, 30 Aug 2024 11:43:27 +0200 Subject: [PATCH 19/30] Fixing fileName conflicts --- ...scala => GetPartitioningMeasuresByIdV2IntegrationTest.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename database/src/test/scala/za/co/absa/atum/database/runs/{GetPartitioningMeasuresByIdIntegrationTest.scala => GetPartitioningMeasuresByIdV2IntegrationTest.scala} (97%) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTest.scala similarity index 97% rename from database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala rename to database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTest.scala index 7e145b13a..bedc70edd 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdIntegrationTest.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTest.scala @@ -4,7 +4,7 @@ import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString import za.co.absa.balta.classes.setter.CustomDBType -class GetPartitioningMeasuresByIdIntegrationTest extends DBTestSuite { +class GetPartitioningMeasuresByIdV2IntegrationTest extends DBTestSuite { private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id" private val partitioning: JsonBString = JsonBString( From 2df573aca443d8c69e74b1aff0e552ba68c48b46 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Fri, 30 Aug 2024 11:46:31 +0200 Subject: [PATCH 20/30] removing unused file --- .../co/absa/atum/model/dto/MeasureWithIdDTO.scala | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala deleted file mode 100644 index 5386ee58d..000000000 --- a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureWithIdDTO.scala +++ /dev/null @@ -1,14 +0,0 @@ -package za.co.absa.atum.model.dto - -import io.circe._ -import io.circe.generic.semiauto._ - -case class MeasureWithIdDTO ( - measureName: String, - measuredColumns: Seq[String] -) - -object MeasureWithIdDTO { - implicit val decodeMeasureWithIdDTO: Decoder[MeasureWithIdDTO] = deriveDecoder[MeasureWithIdDTO] - implicit val encoderMeasureWithIdDTO: Encoder[MeasureWithIdDTO] = deriveEncoder[MeasureWithIdDTO] -} From 02dbe1771dac0a3c96bd7b7baa499b07f9c2ce3c Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 09:54:42 +0200 Subject: [PATCH 21/30] Changing file name --- ...est.scala => GetPartitioningMeasuresV2IntegrationTest.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/{GetPartitioningMeasuresByIdIntegrationTest.scala => GetPartitioningMeasuresV2IntegrationTest.scala} (93%) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdIntegrationTest.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala similarity index 93% rename from server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdIntegrationTest.scala rename to server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala index dddea4925..344d6d1c1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdIntegrationTest.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala @@ -9,7 +9,7 @@ import za.co.absa.db.fadb.status.FunctionStatus import zio.test.{Spec, TestEnvironment, assertTrue} import zio.{Scope, ZIO} -object GetPartitioningMeasuresByIdIntegrationTest extends ConfigProviderTest { +object GetPartitioningMeasuresV2IntegrationTest extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { From 760eac313542d62c8417508f55f93770fe00b5c4 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 10:58:14 +0200 Subject: [PATCH 22/30] fixing file format --- ...cala => GetPartitioningMeasuresByIdV2IntegrationTests.scala} | 2 +- ...cala => GetPartitioningMeasuresByIdV2IntegrationTests.scala} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename database/src/test/scala/za/co/absa/atum/database/runs/{GetPartitioningMeasuresByIdV2IntegrationTest.scala => GetPartitioningMeasuresByIdV2IntegrationTests.scala} (97%) rename server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/{GetPartitioningMeasuresV2IntegrationTest.scala => GetPartitioningMeasuresByIdV2IntegrationTests.scala} (93%) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTest.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala similarity index 97% rename from database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTest.scala rename to database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala index bedc70edd..affa46fe0 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTest.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala @@ -4,7 +4,7 @@ import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString import za.co.absa.balta.classes.setter.CustomDBType -class GetPartitioningMeasuresByIdV2IntegrationTest extends DBTestSuite { +class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite { private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id" private val partitioning: JsonBString = JsonBString( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdV2IntegrationTests.scala similarity index 93% rename from server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala rename to server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdV2IntegrationTests.scala index 344d6d1c1..b6f415332 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresV2IntegrationTest.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresByIdV2IntegrationTests.scala @@ -9,7 +9,7 @@ import za.co.absa.db.fadb.status.FunctionStatus import zio.test.{Spec, TestEnvironment, assertTrue} import zio.{Scope, ZIO} -object GetPartitioningMeasuresV2IntegrationTest extends ConfigProviderTest { +object GetPartitioningMeasuresByIdV2IntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { From 448cb33dcc9b03e202c7cb62c20b9f57a221bb6e Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 13:41:48 +0200 Subject: [PATCH 23/30] Removing outdated tests --- ...itioningCheckpointV2IntegrationTests.scala | 167 ------------------ 1 file changed, 167 deletions(-) delete mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala deleted file mode 100644 index 123dff16d..000000000 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.database.runs - -import za.co.absa.balta.DBTestSuite -import za.co.absa.balta.classes.JsonBString -import za.co.absa.balta.classes.setter.CustomDBType - -import java.time.OffsetDateTime -import java.util.UUID - -class GetPartitioningCheckpointV2IntegrationTests extends DBTestSuite { - - private val fncGetPartitioningCheckpointV2 = "runs.get_partitioning_checkpoint_v2" - - case class MeasuredDetails( - measureName: String, - measureColumns: Seq[String], - measurementValue: JsonBString - ) - - private val partitioning1 = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["keyX", "keyY", "keyZ"], - | "keysToValues": { - | "keyX": "value1", - | "keyZ": "value3", - | "keyY": "value2" - | } - |} - |""".stripMargin - ) - - private val partitioning2 = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["key1", "key3", "key2", "key4"], - | "keysToValues": { - | "key1": "valueX", - | "key2": "valueY", - | "key3": "valueZ", - | "key4": "valueA" - | } - |} - |""".stripMargin - ) - - private val measurement1 = JsonBString("""1""".stripMargin) - - private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]") - - test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") { - - val uuid = UUID.randomUUID - val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val id_measure_definition: Long = 1 - - table("runs.partitionings").insert( - add("partitioning", partitioning1) - .add("created_by", "Daniel") - ) - - val fkPartitioning1: Long = table("runs.partitionings") - .fieldValue("partitioning", partitioning1, "id_partitioning") - .get - .get - - table("runs.checkpoints").insert( - add("id_checkpoint", uuid) - .add("fk_partitioning", fkPartitioning1) - .add("checkpoint_name", "checkpoint_1") - .add("process_start_time", startTime) - .add("process_end_time", endTime) - .add("measured_by_atum_agent", true) - .add("created_by", "Daniel") - ) - - table("runs.measure_definitions").insert( - add("id_measure_definition", id_measure_definition) - .add("fk_partitioning", fkPartitioning1) - .add("created_by", "Daniel") - .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) - ) - - table("runs.measurements").insert( - add("fk_checkpoint", uuid) - .add("fk_measure_definition", id_measure_definition) - .add("measurement_value", measurement1) - ) - - function(fncGetPartitioningCheckpointV2) - .setParam("i_partitioning_id", fkPartitioning1) - .setParam("i_checkpoint_id", uuid) - .execute { queryResult => - assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) - assert(results.getString("measure_name").contains("measure_1")) - assert(!queryResult.hasNext) - } - } - - test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") { - - table("runs.partitionings").insert( - add("partitioning", partitioning2) - .add("created_by", "Daniel") - ) - - val fkPartitioning2: Long = table("runs.partitionings") - .fieldValue("partitioning", partitioning2, "id_partitioning") - .get - .get - - function(fncGetPartitioningCheckpointV2) - .setParam("i_partitioning_id", fkPartitioning2) - .setParam("i_checkpoint_id", UUID.randomUUID()) - .execute { queryResult => - assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(42)) - assert(results.getString("status_text").contains("Checkpoint not found")) - } - - } - - test("Get partitioning checkpoints no checkpoints non-existent partitionings") { - - function(fncGetPartitioningCheckpointV2) - .setParam("i_partitioning_id", 0L) - .setParam("i_checkpoint_id", UUID.randomUUID()) - .execute { queryResult => - assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(41)) - assert(results.getString("status_text").contains("Partitioning not found")) - } - - } - -} From bcfbe5b022e27ac4c3792e0d818785eb59458727 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 15:01:47 +0200 Subject: [PATCH 24/30] Fixing the function and the test cases --- .../V1.9.5__get_partitioning_measures_by_id.sql | 13 ++----------- ...PartitioningMeasuresByIdV2IntegrationTests.scala | 11 +++++------ 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql index 66f503266..ec5f9b53e 100644 --- a/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql +++ b/database/src/main/postgres/runs/V1.9.5__get_partitioning_measures_by_id.sql @@ -14,6 +14,7 @@ * limitations under the License. */ +-- Function: runs.get_partitioning_measures_by_id(Long) CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id( IN i_partitioning_id BIGINT, OUT status INTEGER, @@ -39,7 +40,6 @@ $$ -- Status codes: -- 11 - OK -- 41 - Partitioning not found --- 42 - Measures not found -- ------------------------------------------------------------------------------- BEGIN @@ -54,19 +54,10 @@ BEGIN END IF; RETURN QUERY - SELECT MD.measure_name, MD.measured_columns + SELECT 11, 'OK', MD.measure_name, MD.measured_columns FROM runs.measure_definitions AS MD WHERE MD.fk_partitioning = i_partitioning_id; - IF FOUND THEN - status := 11; - status_text := 'OK'; - ELSE - status := 42; - status_text := 'Measures not found'; - END IF; - - RETURN NEXT; RETURN; END; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala index affa46fe0..fc05d65dc 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala @@ -47,7 +47,7 @@ class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite { ) function(fncGetPartitioningMeasuresById) - .setParam("i_partitioning", fkPartitioning) + .setParam("i_partitioning_id", fkPartitioning) .execute { queryResult => val results = queryResult.next() assert(results.getInt("status").contains(11)) @@ -74,18 +74,17 @@ class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite { .fieldValue("partitioning", partitioning, "id_partitioning").get.get function(fncGetPartitioningMeasuresById) - .setParam("i_partitioning", fkPartitioning) + .setParam(fkPartitioning) .execute { queryResult => - val results = queryResult.next() - assert(results.getInt("status").contains(42)) - assert(results.getString("status_text").contains("Measures not found")) + val results = queryResult.hasNext + assert(!results) } } test("Get partitioning measures by id should return an error for non-existing partitioning") { function(fncGetPartitioningMeasuresById) - .setParam("i_partitioning", 999) + .setParam(999) .execute { queryResult => val results = queryResult.next() assert(results.getInt("status").contains(41)) From fdcdb7e8e1e0381d0c13aa1b9b2df232e680f14e Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 17:11:12 +0200 Subject: [PATCH 25/30] Restoring files --- ...V1.9.3__get_partitioning_checkpoint_v2.sql | 108 +++++++++++ ...itioningCheckpointV2IntegrationTests.scala | 167 ++++++++++++++++++ 2 files changed, 275 insertions(+) create mode 100644 database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala diff --git a/database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql b/database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql new file mode 100644 index 000000000..4adcd809b --- /dev/null +++ b/database/src/main/postgres/runs/V1.9.3__get_partitioning_checkpoint_v2.sql @@ -0,0 +1,108 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoint_v2( + IN i_partitioning_id BIGINT, + IN i_checkpoint_id UUID, + OUT status INTEGER, + OUT status_text TEXT, + OUT id_checkpoint UUID, + OUT checkpoint_name TEXT, + OUT author TEXT, + OUT measured_by_atum_agent BOOLEAN, + OUT measure_name TEXT, + OUT measured_columns TEXT[], + OUT measurement_value JSONB, + OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE +) + RETURNS SETOF record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.get_partitioning_checkpoint_v2(BIGINT, UUID) +-- Retrieves a single checkpoint (measures and their measurement details) related to a +-- given partitioning and checkpoint ID. +-- +-- Parameters: +-- i_partitioning_id - ID of the partitioning +-- i_checkpoint_id - ID of the checkpoint +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- id_checkpoint - ID of the checkpoint +-- checkpoint_name - Name of the checkpoint +-- author - Author of the checkpoint +-- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent +-- measure_name - Name of the measure +-- measure_columns - Columns of the measure +-- measurement_value - Value of the measurement +-- checkpoint_start_time - Time of the checkpoint +-- checkpoint_end_time - End time of the checkpoint computation +-- +-- Status codes: +-- 11 - OK +-- 41 - Partitioning not found +-- 42 - Checkpoint not found +-- +------------------------------------------------------------------------------- +BEGIN + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Partitioning not found'; + RETURN NEXT; + RETURN; + END IF; + + RETURN QUERY + SELECT + 11 AS status, + 'Ok' AS status_text, + C.id_checkpoint, + C.checkpoint_name, + C.created_by AS author, + C.measured_by_atum_agent, + md.measure_name, + md.measured_columns, + M.measurement_value, + C.process_start_time AS checkpoint_start_time, + C.process_end_time AS checkpoint_end_time + FROM + runs.checkpoints C + JOIN + runs.measurements M ON C.id_checkpoint = M.fk_checkpoint + JOIN + runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition + WHERE + C.fk_partitioning = i_partitioning_id + AND + C.id_checkpoint = i_checkpoint_id; + + IF NOT FOUND THEN + status := 42; + status_text := 'Checkpoint not found'; + RETURN NEXT; + RETURN; + END IF; +END; +$$ + +LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.get_partitioning_checkpoint_v2(BIGINT, UUID) OWNER TO atum_owner; + +GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoint_v2(BIGINT, UUID) TO atum_owner; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala new file mode 100644 index 000000000..123dff16d --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointV2IntegrationTests.scala @@ -0,0 +1,167 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString +import za.co.absa.balta.classes.setter.CustomDBType + +import java.time.OffsetDateTime +import java.util.UUID + +class GetPartitioningCheckpointV2IntegrationTests extends DBTestSuite { + + private val fncGetPartitioningCheckpointV2 = "runs.get_partitioning_checkpoint_v2" + + case class MeasuredDetails( + measureName: String, + measureColumns: Seq[String], + measurementValue: JsonBString + ) + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyX", "keyY", "keyZ"], + | "keysToValues": { + | "keyX": "value1", + | "keyZ": "value3", + | "keyY": "value2" + | } + |} + |""".stripMargin + ) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValues": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + + private val measurement1 = JsonBString("""1""".stripMargin) + + private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]") + + test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") { + + val uuid = UUID.randomUUID + val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") + + val id_measure_definition: Long = 1 + + table("runs.partitionings").insert( + add("partitioning", partitioning1) + .add("created_by", "Daniel") + ) + + val fkPartitioning1: Long = table("runs.partitionings") + .fieldValue("partitioning", partitioning1, "id_partitioning") + .get + .get + + table("runs.checkpoints").insert( + add("id_checkpoint", uuid) + .add("fk_partitioning", fkPartitioning1) + .add("checkpoint_name", "checkpoint_1") + .add("process_start_time", startTime) + .add("process_end_time", endTime) + .add("measured_by_atum_agent", true) + .add("created_by", "Daniel") + ) + + table("runs.measure_definitions").insert( + add("id_measure_definition", id_measure_definition) + .add("fk_partitioning", fkPartitioning1) + .add("created_by", "Daniel") + .add("measure_name", "measure_1") + .add("measured_columns", measured_columns) + ) + + table("runs.measurements").insert( + add("fk_checkpoint", uuid) + .add("fk_measure_definition", id_measure_definition) + .add("measurement_value", measurement1) + ) + + function(fncGetPartitioningCheckpointV2) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoint_id", uuid) + .execute { queryResult => + assert(queryResult.hasNext) + val results = queryResult.next() + assert(results.getInt("status").contains(11)) + assert(results.getString("status_text").contains("Ok")) + assert(results.getString("checkpoint_name").contains("checkpoint_1")) + assert(results.getUUID("id_checkpoint").contains(uuid)) + assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) + assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(results.getJsonB("measurement_value").contains(measurement1)) + assert(results.getString("measure_name").contains("measure_1")) + assert(!queryResult.hasNext) + } + } + + test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") { + + table("runs.partitionings").insert( + add("partitioning", partitioning2) + .add("created_by", "Daniel") + ) + + val fkPartitioning2: Long = table("runs.partitionings") + .fieldValue("partitioning", partitioning2, "id_partitioning") + .get + .get + + function(fncGetPartitioningCheckpointV2) + .setParam("i_partitioning_id", fkPartitioning2) + .setParam("i_checkpoint_id", UUID.randomUUID()) + .execute { queryResult => + assert(queryResult.hasNext) + val results = queryResult.next() + assert(results.getInt("status").contains(42)) + assert(results.getString("status_text").contains("Checkpoint not found")) + } + + } + + test("Get partitioning checkpoints no checkpoints non-existent partitionings") { + + function(fncGetPartitioningCheckpointV2) + .setParam("i_partitioning_id", 0L) + .setParam("i_checkpoint_id", UUID.randomUUID()) + .execute { queryResult => + assert(queryResult.hasNext) + val results = queryResult.next() + assert(results.getInt("status").contains(41)) + assert(results.getString("status_text").contains("Partitioning not found")) + } + + } + +} From 94611e10d8b3b2ad859dda6d1d8d337efed775f4 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 20:08:06 +0200 Subject: [PATCH 26/30] implementing endpoint test-cases --- .../absa/atum/server/api/http/Endpoints.scala | 3 +- .../co/absa/atum/server/api/http/Routes.scala | 4 +- ...titioningMeasuresV2EndpointUnitTests.scala | 91 +++++++++++++++++++ 3 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 52fa0d844..3953147ba 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -125,12 +125,13 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } - protected val getPartitioningMeasuresV2 + protected val getPartitioningMeasuresEndpointV2 : PublicEndpoint[Long, ErrorResponse, MultiSuccessResponse[MeasureDTO], Any] = { apiV2.get .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Measures) .out(statusCode(StatusCode.Ok)) .out(jsonBody[MultiSuccessResponse[MeasureDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) } protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 1b8107da9..1a0ec8579 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -79,7 +79,7 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), - createServerEndpoint(getPartitioningMeasuresV2, PartitioningController.getPartitioningMeasuresV2), + createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes @@ -98,7 +98,7 @@ trait Routes extends Endpoints with ServerOptions { getPartitioningCheckpointsEndpointV2, getPartitioningCheckpointEndpointV2, getFlowCheckpointsEndpointV2, - getPartitioningMeasuresV2 + getPartitioningMeasuresEndpointV2 ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala new file mode 100644 index 000000000..5ae8affd2 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.testing.SttpBackendStub +import sttp.client3._ +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import sttp.client3.circe._ +import sttp.model.StatusCode +import za.co.absa.atum.model.dto.MeasureDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import za.co.absa.atum.server.model.{GeneralErrorResponse, NotFoundErrorResponse} +import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse +import zio._ +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertTrue} + +object GetPartitioningMeasuresV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.getPartitioningMeasuresV2(1L)) + .thenReturn(ZIO.succeed(MultiSuccessResponse(Seq(measureDTO1, measureDTO2), uuid1))) + when(partitioningControllerMock.getPartitioningMeasuresV2(2L)) + .thenReturn(ZIO.fail(GeneralErrorResponse("error"))) + when(partitioningControllerMock.getPartitioningMeasuresV2(3L)) + .thenReturn(ZIO.fail(NotFoundErrorResponse("boom!"))) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val getPartitioningMeasuresServerEndpoint = + getPartitioningMeasuresEndpointV2.zServerLogic(PartitioningController.getPartitioningMeasuresV2) + + def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) + .whenServerEndpoint(getPartitioningMeasuresServerEndpoint) + .thenRunLogic() + .backend() + + def createBasicRequest(id: Long): RequestT[Identity, Either[ResponseException[String, io.circe.Error], MultiSuccessResponse[MeasureDTO]], Any] = { + basicRequest + .get(uri"https://test.com/api/v2/partitioningId/$id/measures") + .response(asJson[MultiSuccessResponse[MeasureDTO]]) + } + + suite("GetPartitioningMeasuresV2EndpointSuite")( + test("Returns expected MeasureDTO") { + for { + response <- createBasicRequest(1L).send(backendStub) + body <- ZIO.fromEither(response.body) + statusCode = response.code + } yield { + assertTrue(body.data == MultiSuccessResponse(Seq(measureDTO1, measureDTO2), uuid1).data, statusCode == StatusCode.Ok) + } + }, + test("Returns expected general error") { + for { + response <- createBasicRequest(2L).send(backendStub) + statusCode = response.code + } yield { + assertTrue(statusCode == StatusCode.InternalServerError) + } + }, + test("Returns expected not found error") { + for { + response <- createBasicRequest(3L).send(backendStub) + statusCode = response.code + } yield { + assertTrue(statusCode == StatusCode.NotFound) + } + } + ) + }.provide(partitioningControllerMockLayer) +} + From 8f5bdc1238a01cc8aecfaadcbe98eed8f15117bc Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 2 Sep 2024 20:31:44 +0200 Subject: [PATCH 27/30] Fixing endpoint tests --- .../http/GetPartitioningMeasuresV2EndpointUnitTests.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala index 5ae8affd2..0cc6b6d4c 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningMeasuresV2EndpointUnitTests.scala @@ -45,7 +45,9 @@ object GetPartitioningMeasuresV2EndpointUnitTests extends ZIOSpecDefault with En private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) private val getPartitioningMeasuresServerEndpoint = - getPartitioningMeasuresEndpointV2.zServerLogic(PartitioningController.getPartitioningMeasuresV2) + getPartitioningMeasuresEndpointV2.zServerLogic({partitioningId: Long => + PartitioningController.getPartitioningMeasuresV2(partitioningId) + }) def spec: Spec[TestEnvironment with Scope, Any] = { val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) @@ -55,7 +57,7 @@ object GetPartitioningMeasuresV2EndpointUnitTests extends ZIOSpecDefault with En def createBasicRequest(id: Long): RequestT[Identity, Either[ResponseException[String, io.circe.Error], MultiSuccessResponse[MeasureDTO]], Any] = { basicRequest - .get(uri"https://test.com/api/v2/partitioningId/$id/measures") + .get(uri"https://test.com/api/v2/partitionings/$id/measures") .response(asJson[MultiSuccessResponse[MeasureDTO]]) } @@ -74,7 +76,7 @@ object GetPartitioningMeasuresV2EndpointUnitTests extends ZIOSpecDefault with En response <- createBasicRequest(2L).send(backendStub) statusCode = response.code } yield { - assertTrue(statusCode == StatusCode.InternalServerError) + assertTrue(statusCode == StatusCode.BadRequest) } }, test("Returns expected not found error") { From bcce813c533817be421b3b9eb85964d3b08fab25 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Wed, 4 Sep 2024 09:55:41 +0200 Subject: [PATCH 28/30] Addressing GitHub comments --- .../scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala | 3 +-- .../main/scala/za/co/absa/atum/server/api/http/Routes.scala | 2 +- .../api/repository/PartitioningRepositoryUnitTests.scala | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index c15d03578..15919c240 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -71,8 +71,7 @@ trait BaseEndpoints { oneOf[ErrorResponse]( badRequestOneOfVariant, generalErrorOneOfVariant, - internalServerErrorOneOfVariant, - notFoundErrorOneOfVariant + internalServerErrorOneOfVariant ) ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index cc4ee8eea..01e1ed6ec 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -25,7 +25,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointDTO, CheckpointV2DTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO} import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.api.http.ApiPaths.V2Paths diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 0c78717e6..4e2bec8e1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -271,7 +271,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundDatabaseError] ) }, - test("Returns expected DatabaseError") { + test("Returns expected GeneralDatabaseError") { assertZIO(PartitioningRepository.getPartitioningMeasuresById(4L).exit)( failsWithA[GeneralDatabaseError] ) From 9556fd1cfc38a34c1ac46e45a728bde843412c6f Mon Sep 17 00:00:00 2001 From: TebaleloS <107194332+TebaleloS@users.noreply.github.com> Date: Wed, 4 Sep 2024 13:12:10 +0200 Subject: [PATCH 29/30] Update database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com> --- .../runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala index fc05d65dc..9ba39c454 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala @@ -76,8 +76,7 @@ class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite { function(fncGetPartitioningMeasuresById) .setParam(fkPartitioning) .execute { queryResult => - val results = queryResult.hasNext - assert(!results) + assert(!queryResult.hasNext) } } From d4a49fa180f01ea4b991f7fa0721770ba4d5978c Mon Sep 17 00:00:00 2001 From: TebaleloS <107194332+TebaleloS@users.noreply.github.com> Date: Wed, 4 Sep 2024 13:12:19 +0200 Subject: [PATCH 30/30] Update database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com> --- .../runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala index 9ba39c454..15e4fa75a 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala @@ -88,6 +88,7 @@ class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite { val results = queryResult.next() assert(results.getInt("status").contains(41)) assert(results.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) // checking no more records are returned. } }