Skip to content

Commit

Permalink
get flow checkpoints endpoint v2: 233 (#264)
Browse files Browse the repository at this point in the history
* implementing db function
* Adding implicits import and adding the route

---------

Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com>
  • Loading branch information
TebaleloS and benedeki authored Oct 3, 2024
1 parent 0b97d6f commit 59d8e4d
Show file tree
Hide file tree
Showing 22 changed files with 682 additions and 231 deletions.
125 changes: 78 additions & 47 deletions database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/

CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_flow_id BIGINT,
IN i_checkpoints_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
Expand All @@ -28,12 +29,13 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE,
OUT has_more BOOLEAN
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
--
-- Function: flows.get_flow_checkpoints(3)
-- Function: flows.get_flow_checkpoints(4)
-- Retrieves all checkpoints (measures and their measurement details) related to a primary flow
-- associated with the input partitioning.
--
Expand All @@ -46,13 +48,13 @@ $$
-- Parameters:
-- i_partitioning_of_flow - partitioning to use for identifying the flow associate with checkpoints
-- that will be retrieved
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoints_limit - (optional) maximum number of checkpoint to return, returns all of them if NULL
-- i_offset - (optional) offset for checkpoints pagination
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
--
-- Note: checkpoint name uniqueness is not enforced by the data model, so there can be multiple different
-- checkpoints with the same name in the DB, i.e. multiple checkpoints can be retrieved even when
-- specifying `i_checkpoint_name` parameter
-- Note: i_checkpoint_limit and i_offset are used for pagination purposes;
-- checkpoints are ordered by process_start_time in descending order
-- and then by id_checkpoint in ascending order
--
-- Returns:
-- status - Status code
Expand All @@ -65,54 +67,83 @@ $$
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
-- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

-- 11 - OK
-- 42 - Flow not found
---------------------------------------------------------------------------------------------------
DECLARE
_fk_partitioning BIGINT;
_fk_flow BIGINT;
_has_more BOOLEAN;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning_of_flow);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'Partitioning not found';
-- Check if the flow exists by querying the partitioning_to_flow table.
-- Rationale:
-- This table is preferred over the flows table because:
-- 1. Every flow has at least one record in partitioning_to_flow.
-- 2. This table is used in subsequent queries, providing a caching advantage.
-- 3. Improves performance by reducing the need to query the flows table directly.
PERFORM 1 FROM flows.partitioning_to_flow WHERE fk_flow = i_flow_id;
IF NOT FOUND THEN
status := 42;
status_text := 'Flow not found';
RETURN NEXT;
RETURN;
END IF;

SELECT id_flow
FROM flows.flows
WHERE fk_primary_partitioning = _fk_partitioning
INTO _fk_flow;
-- Determine if there are more checkpoints than the limit
IF i_checkpoints_limit IS NOT NULL THEN
SELECT count(*) > i_checkpoints_limit
FROM runs.checkpoints C
JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning
WHERE PF.fk_flow = i_flow_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
LIMIT i_checkpoints_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;

-- Retrieve the checkpoints and their associated measurements
RETURN QUERY
SELECT 11 AS status, 'OK' AS status_text,
CP.id_checkpoint, CP.checkpoint_name,
CP.created_by AS author, CP.measured_by_atum_agent,
MD.measure_name, MD.measured_columns,
M.measurement_value,
CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time
FROM flows.partitioning_to_flow AS PF
JOIN runs.checkpoints AS CP
ON PF.fk_partitioning = CP.fk_partitioning
JOIN runs.measurements AS M
ON CP.id_checkpoint = M.fk_checkpoint
JOIN runs.measure_definitions AS MD
ON M.fk_measure_definition = MD.id_measure_definition
WHERE PF.fk_flow = _fk_flow
AND (i_checkpoint_name IS NULL OR CP.checkpoint_name = i_checkpoint_name)
ORDER BY CP.process_start_time,
CP.id_checkpoint
LIMIT nullif(i_limit, 0); -- NULL means no limit will be applied, it's same as LIMIT ALL

WITH limited_checkpoints AS (
SELECT C.id_checkpoint,
C.checkpoint_name,
C.created_by,
C.measured_by_atum_agent,
C.process_start_time,
C.process_end_time
FROM runs.checkpoints C
JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning
WHERE PF.fk_flow = i_flow_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY C.id_checkpoint, C.process_start_time
LIMIT i_checkpoints_limit OFFSET i_offset
)
SELECT
11 AS status,
'OK' AS status_text,
LC.id_checkpoint,
LC.checkpoint_name,
LC.created_by AS author,
LC.measured_by_atum_agent,
MD.measure_name,
MD.measured_columns,
M.measurement_value,
LC.process_start_time AS checkpoint_start_time,
LC.process_end_time AS checkpoint_end_time,
_has_more AS has_more
FROM
limited_checkpoints LC
INNER JOIN
runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint
INNER JOIN
runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
ORDER BY
LC.id_checkpoint, LC.process_start_time;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(JSONB, INT, TEXT) TO atum_owner;
GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner;
Loading

0 comments on commit 59d8e4d

Please sign in to comment.