
Commit

Merge remote-tracking branch 'origin/master' into feature/234-get-main-flow-of-a-partitioning

# Conflicts:
#	server/src/main/scala/za/co/absa/atum/server/Main.scala
#	server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala
#	server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala
#	server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala
#	server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala
#	server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala
#	server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala
#	server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala
#	server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala
#	server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala
#	server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala
#	server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala
lsulak committed Sep 27, 2024
2 parents db3f72f + db4a867 commit a9734a2
Showing 36 changed files with 1,454 additions and 351 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/jacoco_report.yml
@@ -63,7 +63,7 @@ jobs:
- name: Add coverage to PR (model)
if: steps.jacocorun.outcome == 'success'
id: jacoco-model
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
@@ -74,7 +74,7 @@ jobs:
- name: Add coverage to PR (agent)
if: steps.jacocorun.outcome == 'success'
id: jacoco-agent
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/agent/target/spark3-jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
@@ -85,7 +85,7 @@ jobs:
- name: Add coverage to PR (reader)
if: steps.jacocorun.outcome == 'success'
id: jacoco-reader
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/reader/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
@@ -96,7 +96,7 @@ jobs:
- name: Add coverage to PR (server)
if: steps.jacocorun.outcome == 'success'
id: jacoco-server
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/server/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
@@ -19,8 +19,7 @@ CREATE OR REPLACE FUNCTION flows._add_to_parent_flows(
IN i_fk_partitioning BIGINT,
IN i_by_user TEXT,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_flow BIGINT
OUT status_text TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
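Only the signature change is visible in this hunk: the id_flow output parameter is dropped, so callers now read just status and status_text. A minimal call sketch against the new signature (the partitioning id 123 and the user name are placeholders, and the function body is not shown in this diff):

-- Hypothetical invocation of the updated function; input values are illustrative.
SELECT status, status_text
FROM flows._add_to_parent_flows(123, 'atum_user');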
@@ -0,0 +1,97 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION flows.get_flow_partitionings(
IN i_flow_id BIGINT,
IN i_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
OUT status INTEGER,
OUT status_text TEXT,
OUT id BIGINT,
OUT partitioning JSONB,
OUT author TEXT,
OUT has_more BOOLEAN
) RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: flows.get_flow_partitionings(3)
-- Retrieves all partitionings associated with the input flow.
--
-- Note: partitionings will be retrieved in ordered fashion, by created_at column from runs.partitionings table
--
-- Parameters:
-- i_flow_id - flow id to use for identifying the partitionings that will be retrieved
-- i_limit - (optional) maximum number of partitionings to return, default is 5
-- i_offset - (optional) offset to use for pagination, default is 0
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id - ID of retrieved partitioning
-- partitioning - Partitioning value
-- author - Author of the partitioning
-- has_more - Flag indicating if there are more partitionings available
--
-- Status codes:
-- 11 - OK
-- 41 - Flow not found
--
-------------------------------------------------------------------------------
$$
DECLARE
_has_more BOOLEAN;
BEGIN
PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Flow not found';
RETURN NEXT;
RETURN;
END IF;

IF i_limit IS NOT NULL THEN
SELECT count(*) > i_limit
FROM flows.partitioning_to_flow PTF
WHERE PTF.fk_flow = i_flow_id
LIMIT i_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;


RETURN QUERY
SELECT
11 AS status,
'OK' AS status_text,
P.id_partitioning,
P.partitioning,
P.created_by,
_has_more
FROM
runs.partitionings P INNER JOIN
flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning
WHERE
PF.fk_flow = i_flow_id
ORDER BY
P.id_partitioning,
P.created_at DESC
LIMIT i_limit OFFSET i_offset;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner;
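A hedged usage sketch for the new function, assuming an existing flow with id 42 and a page size of 10 (both values are illustrative); the has_more column signals whether another page should be requested:

-- First page of partitionings for flow 42 (placeholder id).
SELECT id, partitioning, author, has_more
FROM flows.get_flow_partitionings(42, 10, 0)
WHERE status = 11;

-- Second page: keep the limit and advance the offset by the page size.
SELECT id, partitioning, author, has_more
FROM flows.get_flow_partitionings(42, 10, 10)
WHERE status = 11;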
@@ -13,10 +13,10 @@
* limitations under the License.
*/

-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
IN i_partitioning JSONB,
IN i_limit INT DEFAULT 5,
IN i_partitioning_id BIGINT,
IN i_checkpoints_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
@@ -28,86 +28,106 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE,
OUT has_more BOOLEAN
)
RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
-- Retrieves all checkpoints (measures and their measurement details) related to a
-- Function: runs.get_partitioning_checkpoints(4)
-- Retrieves checkpoints (measures and their measurement details) related to a
-- given partitioning (and checkpoint name, if specified).
--
-- Parameters:
-- i_partitioning - partitioning of requested checkpoints
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoints_limit - (optional) maximum number of checkpoints to return
-- i_offset - (optional) offset of the first checkpoint to return
-- i_checkpoint_name - (optional) name of the checkpoint

-- Note: i_checkpoints_limit and i_offset are used for pagination purposes;
-- checkpoints are ordered by process_start_time in descending order
-- and then by id_checkpoint in ascending order
--
-- Returns:
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
-- status - Status code
-- status_text - Status message
-- id_checkpoint - ID of the checkpoint
-- checkpoint_name - Name of the checkpoint
-- author - Author of the checkpoint
-- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent
-- measure_name - Name of the measure
-- measure_columns - Columns of the measure
-- measurement_value - Value of the measurement
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
-- has_more - Flag indicating whether there are more checkpoints available
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
$$
DECLARE
_fk_partitioning BIGINT;
_has_more BOOLEAN;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

IF i_checkpoints_limit IS NOT NULL THEN
SELECT count(*) > i_checkpoints_limit
FROM runs.checkpoints C
WHERE C.fk_partitioning = i_partitioning_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
LIMIT i_checkpoints_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;

RETURN QUERY
WITH limited_checkpoints AS (
SELECT C.id_checkpoint,
C.checkpoint_name,
C.created_by,
C.measured_by_atum_agent,
C.process_start_time,
C.process_end_time
FROM runs.checkpoints C
WHERE C.fk_partitioning = i_partitioning_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY C.id_checkpoint, C.process_start_time
LIMIT i_checkpoints_limit OFFSET i_offset
)
SELECT
11 AS status,
'Ok' AS status_text,
C.id_checkpoint,
C.checkpoint_name,
C.created_by AS author,
C.measured_by_atum_agent,
LC.id_checkpoint,
LC.checkpoint_name,
LC.created_by AS author,
LC.measured_by_atum_agent,
md.measure_name,
md.measured_columns,
M.measurement_value,
C.process_start_time AS checkpoint_start_time,
C.process_end_time AS checkpoint_end_time
LC.process_start_time AS checkpoint_start_time,
LC.process_end_time AS checkpoint_end_time,
_has_more AS has_more
FROM
runs.checkpoints C
JOIN
runs.measurements M ON C.id_checkpoint = M.fk_checkpoint
JOIN
limited_checkpoints LC
INNER JOIN
runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint
INNER JOIN
runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
WHERE
C.fk_partitioning = _fk_partitioning
AND
(i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY
C.process_start_time,
C.id_checkpoint
LIMIT nullif(i_limit, 0);

LC.id_checkpoint, LC.process_start_time;
END;
$$

LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner;

GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner;

ALTER FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner;
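The reworked function takes a partitioning id instead of a JSONB partitioning and adds limit/offset pagination plus a has_more flag. A call sketch under assumed values (partitioning id 123 and checkpoint name 'end-of-job' are placeholders); note that each checkpoint yields one row per measurement:

-- Page through checkpoints of partitioning 123 (placeholder id), 5 checkpoints at a time.
SELECT id_checkpoint, checkpoint_name, measure_name, measurement_value, has_more
FROM runs.get_partitioning_checkpoints(123, 5, 0, NULL)
WHERE status = 11;

-- Same partitioning, restricted to a named checkpoint (placeholder name), skipping the first 5.
SELECT id_checkpoint, checkpoint_name, measure_name, measurement_value, has_more
FROM runs.get_partitioning_checkpoints(123, 5, 5, 'end-of-job');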