From 43ec036ffa57bbb148dd4b5e42b38e99a69bdcfb Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Thu, 26 Sep 2024 16:30:52 +0200 Subject: [PATCH 1/2] Feature/230 get partitioning checkpoints (#265) get partitioning checkpoints --- .github/workflows/jacoco_report.yml | 8 +- .../V1.8.3__get_partitioning_checkpoints.sql | 100 +++--- ...titioningCheckpointsIntegrationTests.scala | 291 +++++++++++------- .../api/controller/BaseController.scala | 12 + .../api/controller/CheckpointController.scala | 9 +- .../controller/CheckpointControllerImpl.scala | 19 +- .../controller/PartitioningController.scala | 5 - .../PartitioningControllerImpl.scala | 10 - .../GetPartitioningCheckpoints.scala | 30 +- .../absa/atum/server/api/http/Endpoints.scala | 17 +- .../co/absa/atum/server/api/http/Routes.scala | 13 +- .../api/repository/CheckpointRepository.scala | 7 + .../repository/CheckpointRepositoryImpl.scala | 56 +++- .../repository/PartitioningRepository.scala | 3 - .../PartitioningRepositoryImpl.scala | 13 - .../api/service/CheckpointService.scala | 7 + .../api/service/CheckpointServiceImpl.scala | 12 + .../api/service/PartitioningService.scala | 2 - .../api/service/PartitioningServiceImpl.scala | 19 -- .../server/model/CheckpointItemFromDB.scala | 21 +- .../za/co/absa/atum/server/api/TestData.scala | 3 +- .../CheckpointControllerUnitTests.scala | 69 ++++- .../PartitioningControllerUnitTests.scala | 25 -- ...titioningCheckpointsIntegrationTests.scala | 18 +- ...itioningCheckpointsEndpointUnitTests.scala | 143 +++++++++ .../CheckpointRepositoryUnitTests.scala | 37 ++- .../PartitioningRepositoryUnitTests.scala | 28 -- .../service/CheckpointServiceUnitTests.scala | 20 ++ .../PartitioningServiceUnitTests.scala | 19 -- 29 files changed, 680 insertions(+), 336 deletions(-) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala diff --git a/.github/workflows/jacoco_report.yml b/.github/workflows/jacoco_report.yml index 
e726ee807..ff49512e5 100644 --- a/.github/workflows/jacoco_report.yml +++ b/.github/workflows/jacoco_report.yml @@ -63,7 +63,7 @@ jobs: - name: Add coverage to PR (model) if: steps.jacocorun.outcome == 'success' id: jacoco-model - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -74,7 +74,7 @@ jobs: - name: Add coverage to PR (agent) if: steps.jacocorun.outcome == 'success' id: jacoco-agent - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/agent/target/spark3-jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -85,7 +85,7 @@ jobs: - name: Add coverage to PR (reader) if: steps.jacocorun.outcome == 'success' id: jacoco-reader - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/reader/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -96,7 +96,7 @@ jobs: - name: Add coverage to PR (server) if: steps.jacocorun.outcome == 'success' id: jacoco-server - uses: madrapps/jacoco-report@v1.6.1 + uses: madrapps/jacoco-report@v1.7.1 with: paths: ${{ github.workspace }}/server/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml token: ${{ secrets.GITHUB_TOKEN }} diff --git a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql index 565d582a6..ed2f466d0 100644 --- a/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql +++ b/database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoints.sql @@ -13,10 +13,10 @@ * limitations under the License. 
*/ --- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT) CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( - IN i_partitioning JSONB, - IN i_limit INT DEFAULT 5, + IN i_partitioning_id BIGINT, + IN i_checkpoints_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, IN i_checkpoint_name TEXT DEFAULT NULL, OUT status INTEGER, OUT status_text TEXT, @@ -28,86 +28,106 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints( OUT measured_columns TEXT[], OUT measurement_value JSONB, OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT has_more BOOLEAN ) - RETURNS SETOF record AS -$$ - ------------------------------------------------------------------------------- +RETURNS SETOF record AS +------------------------------------------------------------------------------- -- --- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT) --- Retrieves all checkpoints (measures and their measurement details) related to a +-- Function: runs.get_partitioning_checkpoints(4) +-- Retrieves checkpoints (measures and their measurement details) related to a -- given partitioning (and checkpoint name, if specified). -- -- Parameters: -- i_partitioning - partitioning of requested checkpoints --- i_limit - (optional) maximum number of checkpoint's measurements to return --- if 0 specified, all data will be returned, i.e. 
no limit will be applied +-- i_checkpoints_limit - (optional) maximum number of checkpoints to return +-- i_offset - (optional) offset of the first checkpoint to return +-- i_checkpoint_name - (optional) name of the checkpoint + +-- Note: i_checkpoints_limit and i_offset are used for pagination purposes; +-- checkpoints are ordered by id_checkpoint in ascending order +-- and then by process_start_time in ascending order -- -- Returns: --- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name -- status - Status code -- status_text - Status message -- id_checkpoint - ID of the checkpoint -- checkpoint_name - Name of the checkpoint -- author - Author of the checkpoint --- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent -- measure_name - Name of the measure -- measure_columns - Columns of the measure -- measurement_value - Value of the measurement -- checkpoint_start_time - Time of the checkpoint -- checkpoint_end_time - End time of the checkpoint computation +-- has_more - Flag indicating whether there are more checkpoints available -- -- Status codes: -- 11 - OK -- 41 - Partitioning not found -- ------------------------------------------------------------------------------- +$$ DECLARE - _fk_partitioning BIGINT; + _has_more BOOLEAN; BEGIN - _fk_partitioning = runs._get_id_partitioning(i_partitioning); - - IF _fk_partitioning IS NULL THEN + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + IF NOT FOUND THEN status := 41; status_text := 'Partitioning not found'; RETURN NEXT; RETURN; END IF; + IF i_checkpoints_limit IS NOT NULL THEN + SELECT count(*) > i_checkpoints_limit + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + LIMIT i_checkpoints_limit + 1 OFFSET i_offset + 
INTO _has_more; + ELSE + _has_more := false; + END IF; + RETURN QUERY + WITH limited_checkpoints AS ( + SELECT C.id_checkpoint, + C.checkpoint_name, + C.created_by, + C.measured_by_atum_agent, + C.process_start_time, + C.process_end_time + FROM runs.checkpoints C + WHERE C.fk_partitioning = i_partitioning_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.id_checkpoint, C.process_start_time + LIMIT i_checkpoints_limit OFFSET i_offset + ) SELECT 11 AS status, 'Ok' AS status_text, - C.id_checkpoint, - C.checkpoint_name, - C.created_by AS author, - C.measured_by_atum_agent, + LC.id_checkpoint, + LC.checkpoint_name, + LC.created_by AS author, + LC.measured_by_atum_agent, md.measure_name, md.measured_columns, M.measurement_value, - C.process_start_time AS checkpoint_start_time, - C.process_end_time AS checkpoint_end_time + LC.process_start_time AS checkpoint_start_time, + LC.process_end_time AS checkpoint_end_time, + _has_more AS has_more FROM - runs.checkpoints C - JOIN - runs.measurements M ON C.id_checkpoint = M.fk_checkpoint - JOIN + limited_checkpoints LC + INNER JOIN + runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint + INNER JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition - WHERE - C.fk_partitioning = _fk_partitioning - AND - (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) ORDER BY - C.process_start_time, - C.id_checkpoint - LIMIT nullif(i_limit, 0); - + LC.id_checkpoint, LC.process_start_time; END; $$ - LANGUAGE plpgsql VOLATILE SECURITY DEFINER; -ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner; - -GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner; - +ALTER FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git 
a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala index 719391c3c..21d623126 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpointsIntegrationTests.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package za.co.absa.atum.database.runs import za.co.absa.balta.DBTestSuite @@ -7,7 +23,7 @@ import za.co.absa.balta.classes.setter.CustomDBType import java.time.OffsetDateTime import java.util.UUID -class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ +class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite { private val fncGetPartitioningCheckpoints = "runs.get_partitioning_checkpoints" @@ -31,37 +47,26 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ |""".stripMargin ) - private val partitioning2 = JsonBString( - """ - |{ - | "version": 1, - | "keys": ["key1", "key3", "key2", "key4"], - | "keysToValues": { - | "key1": "valueX", - | "key2": "valueY", - | "key3": "valueZ", - | "key4": "valueA" - | } - |} - |""".stripMargin - ) - - private val i_limit = 10 + private val i_checkpoints_limit = 1 + private val i_offset = 0 private val i_checkpoint_name = "checkpoint_1" private val measurement1 = JsonBString("""1""".stripMargin) private val measurement2 = JsonBString("""2""".stripMargin) - private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]") + private val measured_columns1 = CustomDBType("""{"col1"}""", "TEXT[]") + private val measured_columns2 = CustomDBType("""{"col2"}""", "TEXT[]") - test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") { + private val id_measure_definition1: Long = 1 + private val id_measure_definition2: Long = 2 - val uuid = UUID.randomUUID - val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val id_measure_definition: Long = 1 + private val uuid1 = UUID.fromString("d56fa5e2-79af-4a08-8b0c-6f83ff12cb2c") + private val uuid2 = UUID.fromString("6e42d61e-5cfa-45c1-9d0d-e1f3120107da") + private val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + private val startTime2 = OffsetDateTime.parse("1993-08-03T10:00:00Z") + private val endTime = 
OffsetDateTime.parse("2022-11-05T08:00:00Z") + test("Returns expected results when there are two measurements for one checkpoint") { table("runs.partitionings").insert( add("partitioning", partitioning1) .add("created_by", "Daniel") @@ -71,77 +76,85 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .fieldValue("partitioning", partitioning1, "id_partitioning").get.get table("runs.checkpoints").insert( - add("id_checkpoint", uuid) + add("id_checkpoint", uuid1) .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_1") - .add("process_start_time", startTime) + .add("process_start_time", startTime1) .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) table("runs.measure_definitions").insert( - add("id_measure_definition", id_measure_definition) + add("id_measure_definition", id_measure_definition1) .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns1) + ) + + table("runs.measure_definitions").insert( + add("id_measure_definition", id_measure_definition2) + .add("fk_partitioning", fkPartitioning1) + .add("created_by", "Daniel") + .add("measure_name", "measure_2") + .add("measured_columns", measured_columns2) ) table("runs.measurements").insert( - add("fk_checkpoint", uuid) - .add("fk_measure_definition", id_measure_definition) + add("fk_checkpoint", uuid1) + .add("fk_measure_definition", id_measure_definition1) .add("measurement_value", measurement1) ) + table("runs.measurements").insert( + add("fk_checkpoint", uuid1) + .add("fk_measure_definition", id_measure_definition2) + .add("measurement_value", measurement2) + ) + function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", i_checkpoints_limit) 
+ .setParam("i_offset", i_offset) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) - assert(results.getString("measure_name").contains("measure_1")) - assert(!queryResult.hasNext) - } + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) - function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) - .execute { queryResult => assert(queryResult.hasNext) - val results = queryResult.next() - assert(results.getInt("status").contains(11)) - assert(results.getString("status_text").contains("Ok")) - assert(results.getString("checkpoint_name").contains("checkpoint_1")) - 
assert(results.getUUID("id_checkpoint").contains(uuid)) - assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime)) - assert(results.getJsonB("measurement_value").contains(measurement1)) + val result2 = queryResult.next() + assert(result2.getInt("status").contains(11)) + assert(result2.getString("status_text").contains("Ok")) + assert(result2.getUUID("id_checkpoint").contains(uuid1)) + assert(result2.getString("checkpoint_name").contains("checkpoint_1")) + assert(result2.getString("author").contains("Daniel")) + assert(result2.getBoolean("measured_by_atum_agent").contains(true)) + assert(result2.getString("measure_name").contains("measure_2")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result2.getJsonB("measurement_value").contains(measurement2)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result2.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) } } - test("Get partitioning checkpoints returns multiple checkpoints for partitioning with multiple checkpoints") { - - val uuid1 = UUID.randomUUID - val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val endTime1 = OffsetDateTime.parse("2022-11-05T08:00:00Z") - - val uuid2 = UUID.randomUUID - val startTime2 = OffsetDateTime.parse("1995-05-15T12:00:00Z") - val endTime2 = OffsetDateTime.parse("2025-07-20T15:00:00Z") - - val id_measure_definition1: Long = 1 - val id_measure_definition2: Long = 2 - + test( + "Returns expected results when there is one measurement for each of two checkpoints and different filtration is applied" + ) { table("runs.partitionings").insert( add("partitioning", partitioning1) .add("created_by", "Daniel") @@ -155,7 +168,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ 
.add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_1") .add("process_start_time", startTime1) - .add("process_end_time", endTime1) + .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) @@ -165,7 +178,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("checkpoint_name", "checkpoint_2") .add("process_start_time", startTime2) - .add("process_end_time", endTime2) + .add("process_end_time", endTime) .add("measured_by_atum_agent", true) .add("created_by", "Daniel") ) @@ -175,7 +188,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_1") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns1) ) table("runs.measure_definitions").insert( @@ -183,7 +196,7 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ .add("fk_partitioning", fkPartitioning1) .add("created_by", "Daniel") .add("measure_name", "measure_2") - .add("measured_columns", measured_columns) + .add("measured_columns", measured_columns2) ) table("runs.measurements").insert( @@ -199,53 +212,123 @@ class GetPartitioningCheckpointsIntegrationTests extends DBTestSuite{ ) function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning1) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", i_offset) .execute { queryResult => assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid2)) + assert(result1.getString("checkpoint_name").contains("checkpoint_2")) + assert(result1.getString("author").contains("Daniel")) + 
assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_2")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result1.getJsonB("measurement_value").contains(measurement2)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) + + val result2 = queryResult.next() + assert(result2.getInt("status").contains(11)) + assert(result2.getString("status_text").contains("Ok")) + assert(result2.getUUID("id_checkpoint").contains(uuid1)) + assert(result2.getString("checkpoint_name").contains("checkpoint_1")) + assert(result2.getString("author").contains("Daniel")) + assert(result2.getBoolean("measured_by_atum_agent").contains(true)) + assert(result2.getString("measure_name").contains("measure_1")) + assert(result2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result2.getJsonB("measurement_value").contains(measurement1)) + assert(result2.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result2.getBoolean("has_more").contains(false)) + assert(!queryResult.hasNext) + } - // Check the first result - val results1 = queryResult.next() - assert(results1.getInt("status").contains(11)) - assert(results1.getString("status_text").contains("Ok")) - assert(results1.getString("checkpoint_name").contains("checkpoint_1")) - assert(results1.getUUID("id_checkpoint").contains(uuid1)) - assert(results1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) - assert(results1.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) - assert(results1.getJsonB("measurement_value").contains(measurement1)) - assert(results1.getString("measure_name").contains("measure_1")) - - 
// Check the second result + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", i_offset) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => assert(queryResult.hasNext) - val results2 = queryResult.next() - assert(results2.getInt("status").contains(11)) - assert(results2.getString("status_text").contains("Ok")) - assert(results2.getString("checkpoint_name").contains("checkpoint_2")) - assert(results2.getUUID("id_checkpoint").contains(uuid2)) - assert(results2.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) - assert(results2.getOffsetDateTime("checkpoint_end_time").contains(endTime2)) - assert(results2.getJsonB("measurement_value").contains(measurement2)) - assert(results2.getString("measure_name").contains("measure_2")) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid1)) + assert(result1.getString("checkpoint_name").contains("checkpoint_1")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_1")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + assert(result1.getJsonB("measurement_value").contains(measurement1)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(false)) assert(!queryResult.hasNext) } - } - - test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") { - table("runs.partitionings").insert( - add("partitioning", partitioning2) - .add("created_by", "Daniel") - ) + 
function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", 1) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(!queryResult.hasNext) + } function(fncGetPartitioningCheckpoints) - .setParam("i_partitioning", partitioning2) - .setParam("i_limit", i_limit) + .setParam("i_partitioning_id", 0L) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", 1) .setParam("i_checkpoint_name", i_checkpoint_name) .execute { queryResult => + assert(queryResult.hasNext) + val result = queryResult.next() + assert(result.getInt("status").contains(41)) + assert(result.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) + } + + function(fncGetPartitioningCheckpoints) + .setParam("i_partitioning_id", fkPartitioning1) + .setParam("i_checkpoints_limit", 1) + .setParam("i_offset", i_offset) + .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(11)) + assert(result1.getString("status_text").contains("Ok")) + assert(result1.getUUID("id_checkpoint").contains(uuid2)) + assert(result1.getString("checkpoint_name").contains("checkpoint_2")) + assert(result1.getString("author").contains("Daniel")) + assert(result1.getBoolean("measured_by_atum_agent").contains(true)) + assert(result1.getString("measure_name").contains("measure_2")) + assert(result1.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + assert(result1.getJsonB("measurement_value").contains(measurement2)) + assert(result1.getOffsetDateTime("checkpoint_start_time").contains(startTime2)) + assert(result1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(result1.getBoolean("has_more").contains(true)) assert(!queryResult.hasNext) } + } + test("Returns expected status when partitioning not found"){ + function(fncGetPartitioningCheckpoints) + 
.setParam("i_partitioning_id", 1) + .setParam("i_checkpoints_limit", i_checkpoints_limit) + .setParam("i_offset", i_offset) + .setParam("i_checkpoint_name", i_checkpoint_name) + .execute { queryResult => + assert(queryResult.hasNext) + val result1 = queryResult.next() + assert(result1.getInt("status").contains(41)) + assert(result1.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) + } } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 5e88951ae..fd0e9720f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.http.ApiPaths +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse._ import za.co.absa.atum.server.model._ import zio._ @@ -54,6 +55,17 @@ trait BaseController { effect.map(MultiSuccessResponse(_)) } + protected def mapToPaginatedResponse[A]( + limit: Int, + offset: Long, + effect: IO[ErrorResponse, PaginatedResult[A]] + ): IO[ErrorResponse, PaginatedResponse[A]] = { + effect.map { + case ResultHasMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = true)) + case ResultNoMore(data) => PaginatedResponse(data, Pagination(limit, offset, hasMore = false)) + } + } + // Root-anchored URL path // https://stackoverflow.com/questions/2005079/absolute-vs-relative-urls/78439286#78439286 protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = { diff --git 
a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index e72ee1ae3..84280e465 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -39,4 +39,11 @@ trait CheckpointController { checkpointId: UUID ): IO[ErrorResponse, SingleSuccessResponse[CheckpointV2DTO]] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] = None, + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index 9cbd50391..c2221eec8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -19,8 +19,8 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService -import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult} 
+import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import java.util.UUID @@ -63,6 +63,21 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che ) ) } + + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] = None + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[CheckpointV2DTO], PaginatedResult[CheckpointV2DTO]]( + checkpointService.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + ) + ) + } } object CheckpointControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index ac8d057f1..db212a249 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -41,13 +41,8 @@ trait PartitioningController { additionalDataPatchDTO: AdditionalDataPatchDTO ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] - def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] - def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] - def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index cdb457e8c..6bac1662a 100644 --- 
a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -50,16 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) atumContextDTOEffect } - override def getPartitioningCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { - mapToMultiSuccessResponse( - serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( - partitioningService.getPartitioningCheckpoints(checkpointQueryDTO) - ) - ) - } - override def getPartitioningAdditionalDataV2( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala index 4901b867a..a76ed0ad1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala @@ -17,31 +17,31 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.implicits.toSqlInterpolator -import za.co.absa.atum.model.dto.CheckpointQueryDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs -import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} +import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB} import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import zio._ -import io.circe.syntax.EncoderOps import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import 
doobie.postgres.implicits._ -import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut} -import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstRowStatusAggregator import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values => + extends DoobieMultipleResultFunctionWithAggStatus[GetPartitioningCheckpointsArgs, Option[CheckpointItemFromDB], Task](args => Seq( - fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", - fr"${values.limit}", - fr"${values.checkpointName}" + fr"${args.partitioningId}", + fr"${args.limit}", + fr"${args.offset}", + fr"${args.checkpointName}" ) ) with StandardStatusHandling - with ByFirstErrorStatusAggregator { + with ByFirstRowStatusAggregator { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", @@ -52,11 +52,19 @@ class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngi "measured_columns", "measurement_value", "checkpoint_start_time", - "checkpoint_end_time" + "checkpoint_end_time", + "has_more" ) } object GetPartitioningCheckpoints { + case class GetPartitioningCheckpointsArgs( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ) + val layer: URLayer[PostgresDatabaseProvider, GetPartitioningCheckpoints] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala 
b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index d8c3d729f..46f66d095 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -23,8 +23,8 @@ import sttp.tapir.json.circe.jsonBody import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} -import sttp.tapir.{PublicEndpoint, endpoint} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} +import sttp.tapir.{PublicEndpoint, Validator, endpoint} import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} import java.util.UUID @@ -108,12 +108,17 @@ trait Endpoints extends BaseEndpoints { } protected val getPartitioningCheckpointsEndpointV2 - : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { + : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[ + CheckpointV2DTO + ], Any] = { apiV2.get - .in(GetPartitioningCheckpoints) - .in(jsonBody[CheckpointQueryDTO]) + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Checkpoints) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) - .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) + .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) } protected val getFlowCheckpointsEndpointV2 diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 
ea8d7d87b..452ff87ea 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -29,7 +29,7 @@ import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -80,7 +80,16 @@ trait Routes extends Endpoints with ServerOptions { CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointId) } ), - createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), + createServerEndpoint[ + (Long, Option[Int], Option[Long], Option[String]), + ErrorResponse, + PaginatedResponse[CheckpointV2DTO] + ]( + getPartitioningCheckpointsEndpointV2, + { case (partitioningId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) + } + ), createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index 771b68efa..5f48c96b3 100644 --- 
a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.PaginatedResult import zio._ import zio.macros.accessible @@ -28,4 +29,10 @@ trait CheckpointRepository { def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] def writeCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[DatabaseError, Unit] def getCheckpointV2(partitioningId: Long, checkpointId: UUID): IO[DatabaseError, CheckpointV2DTO] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index f4ee58629..9f7608232 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -19,11 +19,13 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs -import 
za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpointV2, WriteCheckpoint} +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV2} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError -import za.co.absa.atum.server.model.CheckpointItemFromDB +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult} import zio._ import zio.interop.catz.asyncInstance @@ -32,7 +34,8 @@ import java.util.UUID class CheckpointRepositoryImpl( writeCheckpointFn: WriteCheckpoint, writeCheckpointV2Fn: WriteCheckpointV2, - getCheckpointV2Fn: GetPartitioningCheckpointV2 + getCheckpointV2Fn: GetPartitioningCheckpointV2, + getPartitioningCheckpoints: GetPartitioningCheckpoints ) extends CheckpointRepository with BaseRepository { @@ -60,14 +63,47 @@ class CheckpointRepositoryImpl( } } + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getPartitioningCheckpoints(GetPartitioningCheckpointsArgs(partitioningId, limit, offset, checkpointName)), + "getPartitioningCheckpoints" + ) + .map(_.flatten) + .flatMap { checkpointItems => + ZIO + .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + checkpoints => + if (checkpointItems.nonEmpty && checkpointItems.head.hasMore) ResultHasMore(checkpoints) + else ResultNoMore(checkpoints) + ) + } + } + } object CheckpointRepositoryImpl { - val layer: URLayer[WriteCheckpoint with WriteCheckpointV2 with GetPartitioningCheckpointV2, CheckpointRepository] = ZLayer { - for { - writeCheckpoint <- ZIO.service[WriteCheckpoint] - 
writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] - getCheckpointV2 <- ZIO.service[GetPartitioningCheckpointV2] - } yield new CheckpointRepositoryImpl(writeCheckpoint, writeCheckpointV2, getCheckpointV2) - } + val layer: URLayer[ + WriteCheckpoint with WriteCheckpointV2 with GetPartitioningCheckpointV2 with GetPartitioningCheckpoints, + CheckpointRepository + ] = + ZLayer { + for { + writeCheckpoint <- ZIO.service[WriteCheckpoint] + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + getCheckpointV2 <- ZIO.service[GetPartitioningCheckpointV2] + getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] + } yield new CheckpointRepositoryImpl( + writeCheckpoint, + writeCheckpointV2, + getCheckpointV2, + getPartitioningCheckpoints + ) + } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index b35f908da..88ca0dbbb 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,7 +18,6 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB import zio.IO import zio.macros.accessible @@ -43,8 +42,6 @@ trait PartitioningRepository { additionalData: AdditionalDataPatchDTO ): IO[DatabaseError, AdditionalDataDTO] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] - def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 83fa2d92f..c6363589b 100644 
--- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -31,7 +31,6 @@ class PartitioningRepositoryImpl( getPartitioningMeasuresFn: GetPartitioningMeasures, getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, - getPartitioningCheckpointsFn: GetPartitioningCheckpoints, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2, getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById @@ -82,15 +81,6 @@ class PartitioningRepositoryImpl( ).map(_.map { case AdditionalDataFromDB(adName, adValue) => adName.get -> adValue }.toMap) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbMultipleResultCallWithAggregatedStatus( - getPartitioningCheckpointsFn(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[DatabaseError, AdditionalDataDTO] = { dbMultipleResultCallWithAggregatedStatus( getPartitioningAdditionalDataV2Fn(partitioningId), @@ -127,7 +117,6 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasures with GetPartitioningAdditionalData with CreateOrUpdateAdditionalData - with GetPartitioningCheckpoints with GetPartitioningAdditionalDataV2 with GetPartitioningById with GetPartitioningMeasuresById, @@ -139,7 +128,6 @@ object PartitioningRepositoryImpl { getPartitioningMeasures <- ZIO.service[GetPartitioningMeasures] getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData] - getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- 
ZIO.service[GetPartitioningAdditionalDataV2] getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] @@ -149,7 +137,6 @@ object PartitioningRepositoryImpl { getPartitioningMeasures, getPartitioningAdditionalData, createOrUpdateAdditionalData, - getPartitioningCheckpoints, getPartitioningById, getPartitioningAdditionalDataV2, getPartitioningMeasuresV2 diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index 62cf7a231..d45c09e59 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -28,4 +29,10 @@ trait CheckpointService { def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] def getCheckpointV2(partitioningId: Long, checkpointId: UUID): IO[ServiceError, CheckpointV2DTO] def saveCheckpointV2(partitioningId: Long, checkpointV2DTO: CheckpointV2DTO): IO[ServiceError, Unit] + def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index 1da3a19f8..c0844f063 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -19,6 +19,7 @@ package 
za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.CheckpointRepository +import za.co.absa.atum.server.model.PaginatedResult import zio._ import java.util.UUID @@ -46,6 +47,17 @@ class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends ) } + override def getPartitioningCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] = { + repositoryCall( + checkpointRepository.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName), + "getPartitioningCheckpoints" + ) + } } object CheckpointServiceImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index b47bd9652..7779a710d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -40,8 +40,6 @@ trait PartitioningService { additionalData: AdditionalDataPatchDTO ): IO[ServiceError, AdditionalDataDTO] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] - def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 6fcd7c966..2525568f9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ 
b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -18,9 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.atum.server.model.CheckpointFromDB import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -59,23 +57,6 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } - override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ServiceError, Seq[CheckpointDTO]] = { - for { - checkpointsFromDB <- repositoryCall( - partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), - "getPartitioningCheckpoints" - ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => - ZIO - .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => GeneralServiceError(error.getMessage)) - } - } yield checkpointDTOs - - } - override def getPartitioningAdditionalDataV2(partitioningId: Long): IO[ServiceError, AdditionalDataDTO] = { repositoryCall( partitioningRepository.getPartitioningAdditionalDataV2(partitioningId), diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala index 035e55fc9..3d8ca67c9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala @@ -31,7 +31,8 @@ case class CheckpointItemFromDB( measuredColumns: Seq[String], measurementValue: Json, // JSON representation of `MeasurementDTO` checkpointStartTime: ZonedDateTime, - checkpointEndTime: Option[ZonedDateTime] + checkpointEndTime: 
Option[ZonedDateTime], + hasMore: Boolean ) object CheckpointItemFromDB { @@ -71,4 +72,22 @@ object CheckpointItemFromDB { } } + def groupAndConvertItemsToCheckpointV2DTOs( + checkpointItems: Seq[CheckpointItemFromDB] + ): Either[DecodingFailure, Seq[CheckpointV2DTO]] = { + val groupedItems = checkpointItems.groupBy(_.idCheckpoint) + val orderedIds = checkpointItems.map(_.idCheckpoint).distinct + + val result = orderedIds.map { id => + CheckpointItemFromDB.fromItemsToCheckpointV2DTO(groupedItems(id)) + } + + val errors = result.collect { case Left(err) => err } + if (errors.nonEmpty) { + Left(errors.head) + } else { + Right(result.collect { case Right(dto) => dto }) + } + } + } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 61d0df6b0..aca4c0784 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -353,7 +353,8 @@ trait TestData { measuredColumns = checkpointV2DTO1.measurements.head.measure.measuredColumns, measurementValue = checkpointV2DTO1.measurements.head.result.asJson, checkpointStartTime = checkpointV2DTO1.processStartTime, - checkpointEndTime = checkpointV2DTO1.processEndTime + checkpointEndTime = checkpointV2DTO1.processEndTime, + hasMore = true ) protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala index 52d13e6c3..b4e8d33fa 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala @@ -22,8 +22,14 @@ import za.co.absa.atum.server.ConfigProviderTest import 
za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.CheckpointService +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse -import za.co.absa.atum.server.model.{ConflictErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse} +import za.co.absa.atum.server.model.{ + ConflictErrorResponse, + InternalServerErrorResponse, + NotFoundErrorResponse, + Pagination +} import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -38,23 +44,31 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { when(checkpointServiceMock.saveCheckpoint(checkpointDTO3)) .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) - private val partitioningId = 1L + private val partitioningId1 = 1L + private val partitioningId2 = 2L - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO1)).thenReturn(ZIO.unit) - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO2)) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO1)).thenReturn(ZIO.unit) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO2)) .thenReturn(ZIO.fail(GeneralServiceError("error in data"))) - when(checkpointServiceMock.saveCheckpointV2(partitioningId, checkpointV2DTO3)) + when(checkpointServiceMock.saveCheckpointV2(partitioningId1, checkpointV2DTO3)) .thenReturn(ZIO.fail(ConflictServiceError("boom!"))) when(checkpointServiceMock.saveCheckpointV2(0L, checkpointV2DTO3)) .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO1.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO1.id)) .thenReturn(ZIO.succeed(checkpointV2DTO1)) - when(checkpointServiceMock.getCheckpointV2(partitioningId, 
checkpointV2DTO2.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO2.id)) .thenReturn(ZIO.fail(NotFoundServiceError("not found"))) - when(checkpointServiceMock.getCheckpointV2(partitioningId, checkpointV2DTO3.id)) + when(checkpointServiceMock.getCheckpointV2(partitioningId1, checkpointV2DTO3.id)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(checkpointServiceMock.getPartitioningCheckpoints(partitioningId1, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(checkpointServiceMock.getPartitioningCheckpoints(partitioningId2, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointV2DTO1)))) + when(checkpointServiceMock.getPartitioningCheckpoints(0L, Some(10), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) + private val checkpointServiceMockLayer = ZLayer.succeed(checkpointServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -80,11 +94,11 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { suite("PostCheckpointV2Suite")( test("Returns expected CheckpointDTO") { for { - result <- CheckpointController.postCheckpointV2(partitioningId, checkpointV2DTO1) + result <- CheckpointController.postCheckpointV2(partitioningId1, checkpointV2DTO1) } yield assertTrue( result._1.isInstanceOf[SingleSuccessResponse[CheckpointV2DTO]] && result._1.data == checkpointV2DTO1 - && result._2 == s"/api/v2/partitionings/$partitioningId/checkpoints/${checkpointV2DTO1.id}" + && result._2 == s"/api/v2/partitionings/$partitioningId1/checkpoints/${checkpointV2DTO1.id}" ) }, test("Returns expected InternalServerErrorResponse") { @@ -106,19 +120,48 @@ object CheckpointControllerUnitTests extends ConfigProviderTest with TestData { suite("GetPartitioningCheckpointV2Suite")( test("Returns expected CheckpointDTO") { for { - result <- CheckpointController.getPartitioningCheckpointV2(partitioningId, 
checkpointV2DTO1.id) + result <- CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO1.id) } yield assertTrue(result.data == checkpointV2DTO1) }, test("Returns expected NotFoundErrorResponse") { - assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO2.id).exit)( + assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO2.id).exit)( failsWithA[NotFoundErrorResponse] ) }, test("Returns expected InternalServerErrorResponse") { - assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId, checkpointV2DTO3.id).exit)( + assertZIO(CheckpointController.getPartitioningCheckpointV2(partitioningId1, checkpointV2DTO3.id).exit)( failsWithA[InternalServerErrorResponse] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is more data available") { + for { + result <- CheckpointController.getPartitioningCheckpoints( + partitioningId1, + limit = Some(10), + offset = Some(0) + ) + } yield assertTrue( + result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(10, 0, hasMore = true) + ) + }, + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is no more data available") { + for { + result <- CheckpointController.getPartitioningCheckpoints( + partitioningId2, + limit = Some(10), + offset = Some(0) + ) + } yield assertTrue( + result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(10, 0, hasMore = false) + ) + }, + test("Returns expected NotFoundErrorResponse when service returns NotFoundServiceError") { + assertZIO(CheckpointController.getPartitioningCheckpoints(0L, Some(10), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } ) ).provide( CheckpointControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala 
b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 2473bf490..0e249faee 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -17,7 +17,6 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError.{ConflictServiceError, GeneralServiceError, NotFoundServiceError} import za.co.absa.atum.server.api.service.PartitioningService @@ -55,13 +54,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.patchAdditionalData(2L, additionalDataPatchDTO1)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.succeed(Seq(checkpointDTO1, checkpointDTO2))) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.succeed(Seq.empty)) - when(partitioningServiceMock.getPartitioningCheckpoints(checkpointQueryDTO3)) - .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(partitioningServiceMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningServiceMock.getPartitioningAdditionalDataV2(2L)) @@ -132,23 +124,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Seq[MeasureDTO]") { - for { - result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO1) - } yield assertTrue(result.data == Seq(checkpointDTO1, checkpointDTO2)) - }, - test("Returns expected empty sequence") { - for { - result <- 
PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO2) - } yield assertTrue(result.data == Seq.empty[CheckpointDTO]) - }, - test("Returns expected InternalServerErrorResponse") { - assertZIO(PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO3).exit)( - failsWithA[InternalServerErrorResponse] - ) - } - ), suite("GetPartitioningSuite")( test("Returns expected PartitioningWithIdDTO") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala index ab38e39ca..0c8e12cf9 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala @@ -17,9 +17,9 @@ package za.co.absa.atum.server.api.database.runs.functions import za.co.absa.atum.server.ConfigProviderTest -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.db.fadb.exceptions.DataNotFoundException import za.co.absa.db.fadb.status.FunctionStatus import zio.interop.catz.asyncInstance @@ -29,23 +29,11 @@ import zio.test._ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { - - val partitioningDTO1: PartitioningDTO = Seq( - PartitionDTO("stringA", "stringA"), - PartitionDTO("stringB", "stringB") - ) - suite("GetPartitioningCheckpointsIntegrationTests")( - test("Returns expected sequence of Checkpoints with 
existing partitioning") { - val partitioningQueryDTO: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO1, - limit = Some(10), - checkpointName = Some("checkpointName") - ) - + test("Returns expected sequence of Checkpoints with non-existing partitioning id") { for { getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] - result <- getPartitioningCheckpoints(partitioningQueryDTO) + result <- getPartitioningCheckpoints(GetPartitioningCheckpointsArgs(0L, None, None, None)) } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala new file mode 100644 index 000000000..dd9fc89c7 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetPartitioningCheckpointsEndpointUnitTests.scala @@ -0,0 +1,143 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.{UriContext, basicRequest} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.CheckpointController +import za.co.absa.atum.server.model.{NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.{Scope, ZIO, ZLayer} +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} + +import java.util.UUID + +object GetPartitioningCheckpointsEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val checkpointControllerMock = mock(classOf[CheckpointController]) + + private val uuid = UUID.randomUUID() + + when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(10), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(10, 0, hasMore = true), uuid))) + when(checkpointControllerMock.getPartitioningCheckpoints(1L, Some(20), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid))) + when(checkpointControllerMock.getPartitioningCheckpoints(2L, Some(10), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundErrorResponse("partitioning not found"))) + + private val checkpointControllerMockLayer = ZLayer.succeed(checkpointControllerMock) + + private val getPartitioningCheckpointServerEndpointV2 = getPartitioningCheckpointsEndpointV2 + .zServerLogic({ + case (partitioningId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + CheckpointController.getPartitioningCheckpoints(partitioningId, limit, 
offset, checkpointName) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[CheckpointController])) + .whenServerEndpoint(getPartitioningCheckpointServerEndpointV2) + .thenRunLogic() + .backend() + + suite("GetPartitioningCheckpointsEndpointSuite")( + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with more data available") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=10&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(10, 0, hasMore = true), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with no more data available") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=20&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(20, 0, hasMore = false), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns expected 404 when checkpoint data for a given ID doesn't exist") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/2/checkpoints?limit=10&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns expected 400 when limit is out of range") { + val request = basicRequest + 
.get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=1001&offset=0") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + }, + test("Returns expected 400 when offset is negative") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1/checkpoints?limit=10&offset=-1") + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + } + ) + }.provide( + checkpointControllerMockLayer + ) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index 797c4d82b..9f5568979 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -17,19 +17,21 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args +import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import 
za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException} import za.co.absa.db.fadb.status.FunctionStatus import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ - import za.co.absa.db.fadb.status.Row object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -37,6 +39,7 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) private val getCheckpointMockV2: GetPartitioningCheckpointV2 = mock(classOf[GetPartitioningCheckpointV2]) private val writeCheckpointV2Mock: WriteCheckpointV2 = mock(classOf[WriteCheckpointV2]) + private val getPartitioningCheckpointsMock: GetPartitioningCheckpoints = mock(classOf[GetPartitioningCheckpoints]) when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(writeCheckpointMock.apply(checkpointDTO2)) @@ -61,9 +64,17 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { when(getCheckpointMockV2.apply(GetPartitioningCheckpointV2Args(partitioningId, checkpointV2DTO3.id))) .thenReturn(ZIO.fail(new Exception("boom!"))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(0L, None, None, None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "ok"), Some(checkpointItemFromDB1))))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(1L, None, None, None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "ok"), None)))) + when(getPartitioningCheckpointsMock.apply(GetPartitioningCheckpointsArgs(3L, None, None, None))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + private val writeCheckpointMockLayer = ZLayer.succeed(writeCheckpointMock) private val getCheckpointV2MockLayer = ZLayer.succeed(getCheckpointMockV2) private val writeCheckpointV2MockLayer = 
ZLayer.succeed(writeCheckpointV2Mock) + private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -121,12 +132,34 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralDatabaseError] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Seq") { + for { + result <- CheckpointRepository.getPartitioningCheckpoints(0L, None, None, None) + } yield assertTrue( + result.isInstanceOf[ResultHasMore[CheckpointV2DTO]] && result.data == Seq(checkpointV2DTO1) + ) + }, + test("Returns expected Seq.empty") { + for { + result <- CheckpointRepository.getPartitioningCheckpoints(1L, None, None, None) + } yield assertTrue( + result.isInstanceOf[ResultNoMore[CheckpointV2DTO]] && result.data == Seq.empty[CheckpointV2DTO] + ) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(CheckpointRepository.getPartitioningCheckpoints(3L, None, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) + } ) ).provide( CheckpointRepositoryImpl.layer, writeCheckpointMockLayer, writeCheckpointV2MockLayer, - getCheckpointV2MockLayer + getCheckpointV2MockLayer, + getPartitioningCheckpointsMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 8f3f3dc49..a56dd9604 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -100,16 +100,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) - // Get Partitioning Checkpoints Mocks - private val 
getPartitioningCheckpointsMock = mock(classOf[GetPartitioningCheckpoints]) - - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)) - .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1)))) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.right(Seq.empty)) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - - private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) - // Get Partitioning By Id Mocks private val getPartitioningByIdMock = mock(classOf[GetPartitioningById]) @@ -244,23 +234,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Seq") { - for { - result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue(result == Seq(checkpointFromDB1)) - }, - test("Returns expected DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( - failsWithA[DatabaseError] - ) - }, - test("Returns expected Seq.empty") { - for { - result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO3) - } yield assertTrue(result.isEmpty) - } - ), suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected AdditionalDataDTO instance") { for { @@ -330,7 +303,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMeasuresMockLayer, getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, - getPartitioningCheckpointsMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer, getPartitioningMeasuresV2MockLayer diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala index 
69b844bfc..3a88de682 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala @@ -21,6 +21,7 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.CheckpointRepository +import za.co.absa.atum.server.model.PaginatedResult.ResultHasMore import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -49,6 +50,11 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { when(checkpointRepositoryMock.getCheckpointV2(partitioningId, checkpointV2DTO2.id)) .thenReturn(ZIO.fail(NotFoundDatabaseError("not found"))) + when(checkpointRepositoryMock.getPartitioningCheckpoints(1L, None, None, None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(checkpointRepositoryMock.getPartitioningCheckpoints(0L, None, None, None)) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + private val checkpointRepositoryMockLayer = ZLayer.succeed(checkpointRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -104,6 +110,20 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundServiceError] ) } + ), + suite("GetPartitioningCheckpointsSuite")( + test("Returns expected Right with Seq[CheckpointDTO]") { + for { + result <- CheckpointService.getPartitioningCheckpoints(1L, None, None, None) + } yield assertTrue { + result == ResultHasMore(Seq(checkpointV2DTO1)) + } + }, + test("Returns expected NotFoundServiceError") { + assertZIO(CheckpointService.getPartitioningCheckpoints(0L, None, None, None).exit)( + failsWithA[NotFoundServiceError] + ) + } ) ).provide( CheckpointServiceImpl.layer, diff --git 
a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index ffbe160f7..6776dde90 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -61,11 +61,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningAdditionalData(partitioningDTO2)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2))) - when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(1L)) .thenReturn(ZIO.succeed(additionalDataDTO1)) when(partitioningRepositoryMock.getPartitioningAdditionalDataV2(2L)) @@ -176,20 +171,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( - test("Returns expected Right with Seq[CheckpointDTO]") { - for { - result <- PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue { - result == Seq(checkpointDTO1, checkpointDTO2.copy(partitioning = checkpointDTO1.partitioning)) - } - }, - test("Returns expected ServiceError") { - assertZIO(PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( - failsWithA[ServiceError] - ) - } - ), suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected Right with AdditionalDataDTO") { for { From db4a867104dcc592acc2cc0e2cc51b3dd26ebd18 Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Thu, 26 Sep 2024 16:46:19 +0200 Subject: [PATCH 2/2] Feature/235 get flow 
partitionings (#267) get flow partitionings --- .../flows/V1.3.5___add_to_parent_flows.sql | 3 +- .../flows/V1.9.8__get_flow_partitionings.sql | 97 +++++++++ ...GetFlowPartitioningsIntegrationTests.scala | 193 ++++++++++++++++++ .../scala/za/co/absa/atum/server/Main.scala | 3 +- .../controller/PartitioningController.scala | 8 +- .../PartitioningControllerImpl.scala | 18 +- .../functions/GetFlowPartitionings.scala | 81 ++++++++ .../absa/atum/server/api/http/Endpoints.scala | 13 ++ .../co/absa/atum/server/api/http/Routes.scala | 15 +- .../repository/PartitioningRepository.scala | 8 +- .../PartitioningRepositoryImpl.scala | 37 +++- .../api/service/PartitioningService.scala | 7 + .../api/service/PartitioningServiceImpl.scala | 11 + .../za/co/absa/atum/server/api/TestData.scala | 15 ++ .../PartitioningControllerUnitTests.scala | 42 +++- ...GetFlowPartitioningsIntegrationTests.scala | 46 +++++ ...FlowPartitioningsV2EndpointUnitTests.scala | 118 +++++++++++ .../PartitioningRepositoryUnitTests.scala | 44 +++- .../PartitioningServiceUnitTests.scala | 41 +++- 19 files changed, 778 insertions(+), 22 deletions(-) create mode 100644 database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala diff --git a/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql b/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql index a117a323a..d424ebf9f 100644 --- a/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql +++ 
b/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql @@ -19,8 +19,7 @@ CREATE OR REPLACE FUNCTION flows._add_to_parent_flows( IN i_fk_partitioning BIGINT, IN i_by_user TEXT, OUT status INTEGER, - OUT status_text TEXT, - OUT id_flow BIGINT + OUT status_text TEXT ) RETURNS record AS $$ ------------------------------------------------------------------------------- diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql new file mode 100644 index 000000000..6ef71fd94 --- /dev/null +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -0,0 +1,97 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION flows.get_flow_partitionings( + IN i_flow_id BIGINT, + IN i_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + OUT status INTEGER, + OUT status_text TEXT, + OUT id BIGINT, + OUT partitioning JSONB, + OUT author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +------------------------------------------------------------------------------- +-- +-- Function: flows.get_flow_partitionings(3) +-- Retrieves all partitionings associated with the input flow. 
+-- +-- Note: partitionings will be retrieved in ordered fashion, by created_at column from runs.partitionings table +-- +-- Parameters: +-- i_flow_id - flow id to use for identifying the partitionings that will be retrieved +-- i_limit - (optional) maximum number of partitionings to return, default is 5 +-- i_offset - (optional) offset to use for pagination, default is 0 +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- id - ID of retrieved partitioning +-- partitioning - Partitioning value +-- author - Author of the partitioning +-- has_more - Flag indicating if there are more partitionings available +-- +-- Status codes: +-- 11 - OK +-- 41 - Flow not found +-- +------------------------------------------------------------------------------- +$$ +DECLARE + _has_more BOOLEAN; +BEGIN + PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Flow not found'; + RETURN NEXT; + RETURN; + END IF; + + IF i_limit IS NOT NULL THEN + SELECT count(*) > i_limit + FROM flows.partitioning_to_flow PTF + WHERE PTF.fk_flow = i_flow_id + LIMIT i_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + + + RETURN QUERY + SELECT + 11 AS status, + 'OK' AS status_text, + P.id_partitioning, + P.partitioning, + P.created_by, + _has_more + FROM + runs.partitionings P INNER JOIN + flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning + WHERE + PF.fk_flow = i_flow_id + ORDER BY + P.id_partitioning, + P.created_at DESC + LIMIT i_limit OFFSET i_offset; +END; +$$ +LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner; diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala new file mode 100644 index 000000000..904cd42b9 --- /dev/null +++ 
b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala @@ -0,0 +1,193 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.flows + +import io.circe.Json +import io.circe.parser.parse +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +class GetFlowPartitioningsIntegrationTests extends DBTestSuite { + + private val getFlowPartitioningsFn = "flows.get_flow_partitionings" + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioningsTable = "runs.partitionings" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValues": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val partitioning1Parent = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB"], + | "keysToValues": { + | "keyA": "valueA", + | "keyB": "valueB" + | } + |} + |""".stripMargin + ) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValues": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfParentPartitioning1: Long = _ + var 
flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns partitioning(s) for a given flow") { + table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning1Parent).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Joseph")) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId1Parent: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1Parent, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1Parent) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfParentPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1Parent) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfPartitioning1) + .setParam("i_limit", 1) + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get 
== 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson == returnedPartitioningJson) + assert(!result1.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfParentPartitioning1) + .setParam("i_limit", 1) // limit is set to 1, so only one partitioning should be returned and more data available + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson1 == returnedPartitioningJson1) + assert(result1.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfParentPartitioning1) + .setParam("i_limit", 2) // limit is set to 2, so both partitionings should be returned and no more data available + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson1 == returnedPartitioningJson1) + assert(!result1.getBoolean("has_more").get) + assert(queryResult.hasNext) + assert(queryResult.hasNext) + val result2 = queryResult.next() + assert(result2.getLong("id").get == partId1Parent) + 
val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning1Parent) + val returnedPartitioningJson2 = parseJsonBStringOrThrow(result2.getJsonB("partitioning").get) + assert(expectedPartitioningJson2 == returnedPartitioningJson2) + assert(!result2.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + } + + test("Fails for non-existent flow"){ + function(getFlowPartitioningsFn) + .setParam("i_flow_id", 999999) + .setParam("i_limit", 1) + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 41) + assert(result1.getString("status_text").get == "Flow not found") + assert(!queryResult.hasNext) + } + } + + private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = { + parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json")) + } + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index cd60a204d..be3d7bc5a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server import za.co.absa.atum.server.api.controller._ -import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints +import za.co.absa.atum.server.api.database.flows.functions.{GetFlowCheckpoints, GetFlowPartitionings} import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.http.Server @@ -63,6 +63,7 @@ object Main extends ZIOAppDefault with Server { GetPartitioningCheckpointV2.layer, GetFlowCheckpoints.layer, GetPartitioningById.layer, + GetFlowPartitionings.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala 
b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index db212a249..605e06140 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -46,4 +46,10 @@ trait PartitioningController { def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 6bac1662a..c450405e8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -20,8 +20,8 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse, PaginatedResult} +import 
za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import zio._ class PartitioningControllerImpl(partitioningService: PartitioningService) @@ -105,6 +105,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]]( + partitioningService.getFlowPartitionings(flowId, limit, offset) + ) + ) + } + } object PartitioningControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala new file mode 100644 index 000000000..be56853dc --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.server.api.database.flows.functions + +import doobie.implicits.toSqlInterpolator +import io.circe.{DecodingFailure, Json} +import za.co.absa.atum.model.dto.{PartitioningDTO, PartitioningWithIdDTO} +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.Flows +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet + +import scala.annotation.tailrec + +class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, Option[ + GetFlowPartitioningsResult + ], Task](args => + Seq( + fr"${args.flowId}", + fr"${args.limit}", + fr"${args.offset}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("id", "partitioning", "author", "has_more") +} + +object GetFlowPartitionings { + case class GetFlowPartitioningsArgs(flowId: Long, limit: Option[Int], offset: Option[Long]) + case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean) + + object GetFlowPartitioningsResult { + + @tailrec def resultsToPartitioningWithIdDTOs( + results: Seq[GetFlowPartitioningsResult], + acc: Seq[PartitioningWithIdDTO] + ): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = { + if (results.isEmpty) Right(acc) + else { + val head = results.head + val tail = results.tail + val decodingResult = 
head.partitioningJson.as[PartitioningDTO] + decodingResult match { + case Left(decodingFailure) => Left(decodingFailure) + case Right(partitioningDTO) => + resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.id, partitioningDTO, head.author)) + } + } + } + + } + + val layer: URLayer[PostgresDatabaseProvider, GetFlowPartitionings] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetFlowPartitionings()(Flows, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 46f66d095..0f4febcc5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -148,6 +148,19 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } + protected val getFlowPartitioningsEndpointV2 + : PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[ + PartitioningWithIdDTO + ], Any] = { + apiV2.get + .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Partitionings) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 452ff87ea..a2456c697 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ 
-24,7 +24,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} @@ -93,6 +93,16 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), + createServerEndpoint[ + (Long, Option[Int], Option[Long]), + ErrorResponse, + PaginatedResponse[PartitioningWithIdDTO] + ]( + getFlowPartitioningsEndpointV2, + { case (flowId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getFlowPartitionings(flowId, limit, offset) + } + ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes @@ -111,7 +121,8 @@ trait Routes extends Endpoints with ServerOptions { getPartitioningCheckpointsEndpointV2, getPartitioningCheckpointEndpointV2, getFlowCheckpointsEndpointV2, - getPartitioningMeasuresEndpointV2 + getPartitioningMeasuresEndpointV2, + getFlowPartitioningsEndpointV2 ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) diff 
--git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 88ca0dbbb..54ca4f700 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -44,6 +45,11 @@ trait PartitioningRepository { def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] - def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index c6363589b..37e6f81ef 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -17,6 +17,11 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.{ + GetFlowPartitioningsArgs, + GetFlowPartitioningsResult +} import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import 
za.co.absa.atum.server.api.exception.DatabaseError @@ -24,6 +29,7 @@ import za.co.absa.atum.server.model._ import zio._ import zio.interop.catz.asyncInstance import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} class PartitioningRepositoryImpl( createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists, @@ -33,7 +39,8 @@ class PartitioningRepositoryImpl( createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2, - getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById + getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById, + getFlowPartitioningsFn: GetFlowPartitionings ) extends PartitioningRepository with BaseRepository { @@ -108,6 +115,27 @@ class PartitioningRepositoryImpl( }) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getFlowPartitioningsFn(GetFlowPartitioningsArgs(flowId, limit, offset)), + "getFlowPartitionings" + ).map(_.flatten) + .flatMap { partitioningResults => + ZIO + .fromEither(GetFlowPartitioningsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + partitionings => { + if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings) + else ResultNoMore(partitionings) + } + ) + } + } } object PartitioningRepositoryImpl { @@ -119,7 +147,8 @@ object PartitioningRepositoryImpl { with CreateOrUpdateAdditionalData with GetPartitioningAdditionalDataV2 with GetPartitioningById - with GetPartitioningMeasuresById, + with GetPartitioningMeasuresById + with GetFlowPartitionings, PartitioningRepository ] = ZLayer { for { @@ -131,6 +160,7 @@ object 
PartitioningRepositoryImpl { getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2] getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] + getFlowPartitionings <- ZIO.service[GetFlowPartitionings] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, createPartitioning, @@ -139,7 +169,8 @@ object PartitioningRepositoryImpl { createOrUpdateAdditionalData, getPartitioningById, getPartitioningAdditionalDataV2, - getPartitioningMeasuresV2 + getPartitioningMeasuresV2, + getFlowPartitionings ) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 7779a710d..a9d03b798 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -43,4 +44,10 @@ trait PartitioningService { def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 2525568f9..6d678e89d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ 
b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.PartitioningRepository +import za.co.absa.atum.server.model.PaginatedResult import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -85,6 +86,16 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] = { + repositoryCall( + partitioningRepository.getFlowPartitionings(flowId, limit, offset), + "getFlowPartitionings" + ) + } } object PartitioningServiceImpl { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index aca4c0784..db8c3c390 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -25,6 +25,7 @@ import java.util.UUID import MeasureResultDTO.TypedValue import io.circe.syntax.EncoderOps import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult trait TestData { @@ -75,6 +76,20 @@ trait TestData { author = "author" ) + protected val getFlowPartitioningsResult1: GetFlowPartitioningsResult = GetFlowPartitioningsResult( + id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + + protected val getFlowPartitioningsResult2: GetFlowPartitioningsResult = GetFlowPartitioningsResult( + id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = true + ) + // Partitioning with ID DTO protected val partitioningWithIdDTO1: 
PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 0e249faee..315c21e76 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -18,10 +18,11 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.ServiceError.{ConflictServiceError, GeneralServiceError, NotFoundServiceError} +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.{ConflictErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model._ import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -68,6 +69,15 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioning(99L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getFlowPartitionings(1L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getFlowPartitionings(2L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getFlowPartitionings(3L, Some(1), Some(0))) + 
.thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getFlowPartitionings(4L, Some(1), Some(0))) + .thenReturn(ZIO.fail(NotFoundServiceError("Flow not found"))) + private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -161,6 +171,32 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundErrorResponse] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with more data available") { + for { + result <- PartitioningController.getFlowPartitionings(1L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = true), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with no more data available") { + for { + result <- PartitioningController.getFlowPartitionings(2L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = false), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected InternalServerErrorResponse when service call fails with GeneralServiceError") { + assertZIO(PartitioningController.getFlowPartitionings(3L, Some(1), Some(0)).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + test("Returns expected NotFoundErrorResponse when service call fails with NotFoundServiceError") { + assertZIO(PartitioningController.getFlowPartitionings(4L, Some(1), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } ) ).provide( PartitioningControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala 
b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala new file mode 100644 index 000000000..3760c62b4 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.{Scope, ZIO} +import zio.test.{Spec, TestEnvironment, assertTrue} +import zio.interop.catz.asyncInstance + +object GetFlowPartitioningsIntegrationTests extends ConfigProviderTest { + + override def spec: Spec[Unit with TestEnvironment with Scope, Any] = { + suite("GetFlowPartitioningsIntegrationTests")( + test("Returns expected DataNotFoundException when flow not found") { + for { + getFlowPartitionings <- ZIO.service[GetFlowPartitionings] + result <- getFlowPartitionings(GetFlowPartitioningsArgs(0L, None, None)) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, 
"Flow not found")))) + } + ) + }.provide( + GetFlowPartitionings.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala new file mode 100644 index 000000000..06616eedd --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{UriContext, basicRequest} +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.{Scope, ZIO, ZLayer} + +object GetFlowPartitioningsV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.getFlowPartitionings(1L, Some(1), Some(0))) + .thenReturn( + ZIO.succeed( + PaginatedResponse(Seq.empty, Pagination(1, 0, hasMore = true), uuid1) + ) + ) + when(partitioningControllerMock.getFlowPartitionings(2L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + NotFoundErrorResponse("flow not found") + ) + ) + when(partitioningControllerMock.getFlowPartitionings(3L, None, None)) + .thenReturn( + ZIO.fail( + InternalServerErrorResponse("internal server error") + ) + ) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val getFlowPartitioningsServerEndpoint = + getFlowPartitioningsEndpointV2.zServerLogic({ case (flowId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getFlowPartitionings(flowId, limit, offset) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new 
RIOMonadError[PartitioningController])) + .whenServerEndpoint(getFlowPartitioningsServerEndpoint) + .thenRunLogic() + .backend() + + suite("GetFlowPartitioningsV2EndpointSuite")( + test("Returns an expected PaginatedResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/1/partitionings?limit=1&offset=0") + .response(asJson[PaginatedResponse[PartitioningWithIdDTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq.empty[PartitioningWithIdDTO], Pagination(1, 0, hasMore = true), uuid1)), + StatusCode.Ok + ) + ) + }, + test("Returns a NotFoundErrorResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/2/partitionings?limit=1&offset=0") + .response(asJson[NotFoundErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns an InternalServerErrorResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/3/partitionings") + .response(asJson[InternalServerErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + } + ) + + }.provide(partitioningControllerMockLayer) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index a56dd9604..23e147aaa 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -19,17 +19,20 @@ package za.co.absa.atum.server.api.repository import 
org.mockito.Mockito.{mock, when} import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, PartitioningWithIdDTO} import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException, ErrorInDataException} import za.co.absa.db.fadb.status.{FunctionStatus, Row} import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB} +import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, PaginatedResult} object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -138,6 +141,20 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresV2MockLayer = ZLayer.succeed(getPartitioningMeasuresV2Mock) + private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings]) + + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(1L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(2L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult2))))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(0L, None, None))) 
+ .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(3L, Some(10), Some(0))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + + private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock) + + override def spec: Spec[TestEnvironment with Scope, Any] = { suite("PartitioningRepositorySuite")( @@ -295,6 +312,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralDatabaseError] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getFlowPartitionings(1L, Some(10), Some(0)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getFlowPartitionings(2L, Some(10), Some(0)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getFlowPartitionings(0L, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getFlowPartitionings(3L, Some(10), Some(0)).exit)( + failsWithA[GeneralDatabaseError] + ) + } ) ).provide( PartitioningRepositoryImpl.layer, @@ -305,7 +344,8 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { createOrUpdateAdditionalDataMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer, - getPartitioningMeasuresV2MockLayer + getPartitioningMeasuresV2MockLayer, + getFlowPartitioningsMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index 
6776dde90..3e647d5a1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -23,6 +23,7 @@ import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -79,6 +80,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningMeasuresById(3L)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getFlowPartitionings(1L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getFlowPartitionings(2L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getFlowPartitionings(3L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getFlowPartitionings(4L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Flow not found"))) + private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -218,11 +228,32 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralServiceError] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getFlowPartitionings(1L, Some(1), Some(1L)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + 
test("Returns expected Right with ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getFlowPartitionings(2L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected GeneralServiceError when database error occurs") { + assertZIO(PartitioningService.getFlowPartitionings(3L, Some(1), Some(1L)).exit)( + failsWithA[GeneralServiceError] + ) + }, + test("Returns expected NotFoundServiceError when flow doesn't exist") { + assertZIO(PartitioningService.getFlowPartitionings(4L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundServiceError] + ) + } ) - ).provide( - PartitioningServiceImpl.layer, - partitioningRepositoryMockLayer ) - - } + }.provide( + PartitioningServiceImpl.layer, + partitioningRepositoryMockLayer + ) }