Skip to content

Commit

Permalink
Merge branch 'master' into feature/268-get-partitionings
Browse files Browse the repository at this point in the history
# Conflicts:
#	server/src/main/scala/za/co/absa/atum/server/Main.scala
#	server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala
#	server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala
#	server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala
#	server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala
#	server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala
#	server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala
#	server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala
#	server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala
#	server/src/test/scala/za/co/absa/atum/server/api/TestData.scala
#	server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala
#	server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala
#	server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala
  • Loading branch information
salamonpavel committed Sep 27, 2024
2 parents 7fb7475 + db4a867 commit aa7e55e
Show file tree
Hide file tree
Showing 35 changed files with 1,458 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ CREATE OR REPLACE FUNCTION flows._add_to_parent_flows(
IN i_fk_partitioning BIGINT,
IN i_by_user TEXT,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_flow BIGINT
OUT status_text TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION flows.get_flow_partitionings(
IN i_flow_id BIGINT,
IN i_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
OUT status INTEGER,
OUT status_text TEXT,
OUT id BIGINT,
OUT partitioning JSONB,
OUT author TEXT,
OUT has_more BOOLEAN
) RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: flows.get_flow_partitionings(3)
-- Retrieves all partitionings associated with the input flow.
--
-- Note: partitionings will be retrieved in ordered fashion, by created_at column from runs.partitionings table
--
-- Parameters:
-- i_flow_id - flow id to use for identifying the partitionings that will be retrieved
-- i_limit - (optional) maximum number of partitionings to return, default is 5
-- i_offset - (optional) offset to use for pagination, default is 0
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id - ID of retrieved partitioning
-- partitioning - Partitioning value
-- author - Author of the partitioning
-- has_more - Flag indicating if there are more partitionings available
--
-- Status codes:
-- 11 - OK
-- 41 - Flow not found
--
-------------------------------------------------------------------------------
$$
DECLARE
_has_more BOOLEAN;
BEGIN
PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Flow not found';
RETURN NEXT;
RETURN;
END IF;

IF i_limit IS NOT NULL THEN
SELECT count(*) > i_limit
FROM flows.partitioning_to_flow PTF
WHERE PTF.fk_flow = i_flow_id
LIMIT i_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;


RETURN QUERY
SELECT
11 AS status,
'OK' AS status_text,
P.id_partitioning,
P.partitioning,
P.created_by,
_has_more
FROM
runs.partitionings P INNER JOIN
flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning
WHERE
PF.fk_flow = i_flow_id
ORDER BY
P.id_partitioning,
P.created_at DESC
LIMIT i_limit OFFSET i_offset;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner;
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* limitations under the License.
*/

-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
IN i_partitioning JSONB,
IN i_limit INT DEFAULT 5,
IN i_partitioning_id BIGINT,
IN i_checkpoints_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
Expand All @@ -28,86 +28,106 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE,
OUT has_more BOOLEAN
)
RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
-- Retrieves all checkpoints (measures and their measurement details) related to a
-- Function: runs.get_partitioning_checkpoints(4)
-- Retrieves checkpoints (measures and their measurement details) related to a
-- given partitioning (and checkpoint name, if specified).
--
-- Parameters:
-- i_partitioning - partitioning of requested checkpoints
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoints_limit - (optional) maximum number of checkpoints to return
-- i_offset - (optional) offset of the first checkpoint to return
-- i_checkpoint_name - (optional) name of the checkpoint

-- Note: i_checkpoints_limit and i_offset are used for pagination purposes;
-- checkpoints are ordered by process_start_time in descending order
-- and then by id_checkpoint in ascending order
--
-- Returns:
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
-- status - Status code
-- status_text - Status message
-- id_checkpoint - ID of the checkpoint
-- checkpoint_name - Name of the checkpoint
-- author - Author of the checkpoint
-- measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by ATUM agent
-- measure_name - Name of the measure
-- measure_columns - Columns of the measure
-- measurement_value - Value of the measurement
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
-- has_more - Flag indicating whether there are more checkpoints available
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
$$
DECLARE
_fk_partitioning BIGINT;
_has_more BOOLEAN;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

IF i_checkpoints_limit IS NOT NULL THEN
SELECT count(*) > i_checkpoints_limit
FROM runs.checkpoints C
WHERE C.fk_partitioning = i_partitioning_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
LIMIT i_checkpoints_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;

RETURN QUERY
WITH limited_checkpoints AS (
SELECT C.id_checkpoint,
C.checkpoint_name,
C.created_by,
C.measured_by_atum_agent,
C.process_start_time,
C.process_end_time
FROM runs.checkpoints C
WHERE C.fk_partitioning = i_partitioning_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY C.id_checkpoint, C.process_start_time
LIMIT i_checkpoints_limit OFFSET i_offset
)
SELECT
11 AS status,
'Ok' AS status_text,
C.id_checkpoint,
C.checkpoint_name,
C.created_by AS author,
C.measured_by_atum_agent,
LC.id_checkpoint,
LC.checkpoint_name,
LC.created_by AS author,
LC.measured_by_atum_agent,
md.measure_name,
md.measured_columns,
M.measurement_value,
C.process_start_time AS checkpoint_start_time,
C.process_end_time AS checkpoint_end_time
LC.process_start_time AS checkpoint_start_time,
LC.process_end_time AS checkpoint_end_time,
_has_more AS has_more
FROM
runs.checkpoints C
JOIN
runs.measurements M ON C.id_checkpoint = M.fk_checkpoint
JOIN
limited_checkpoints LC
INNER JOIN
runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint
INNER JOIN
runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
WHERE
C.fk_partitioning = _fk_partitioning
AND
(i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY
C.process_start_time,
C.id_checkpoint
LIMIT nullif(i_limit, 0);

LC.id_checkpoint, LC.process_start_time;
END;
$$

LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner;

GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner;

ALTER FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner;
Loading

0 comments on commit aa7e55e

Please sign in to comment.