Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get flow checkpoints endpoint v2: 233 #264

Merged
merged 52 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
4cef220
implementing db function
TebaleloS Sep 4, 2024
4efee52
Refactoring newly added code
TebaleloS Sep 5, 2024
a152006
Adding test cases
TebaleloS Sep 17, 2024
c8dc197
Adding test more cases
TebaleloS Sep 17, 2024
7bfc99e
Finishing the detFlowITest
TebaleloS Sep 17, 2024
4e243bb
Finishing the detFlowITest
TebaleloS Sep 17, 2024
03383a7
change limit
TebaleloS Sep 17, 2024
045c639
Saving changes
TebaleloS Sep 17, 2024
bcb0729
Implementing endpoint
TebaleloS Sep 18, 2024
86f6b99
Adding implicits import and adding the route
TebaleloS Sep 19, 2024
ac06e3d
Merge branch 'master' into feature/#233-getFlowCheckpointsEndpointV2
TebaleloS Sep 19, 2024
3e12bc0
fixing route
TebaleloS Sep 20, 2024
6c11ae8
Fixing file name
TebaleloS Sep 20, 2024
6d970a3
Addressing git comment
TebaleloS Sep 20, 2024
f470f30
Fixing FileNames
TebaleloS Sep 20, 2024
b5eedd9
Merge branch 'master' into feature/#233-getFlowCheckpointsEndpointV2
TebaleloS Sep 20, 2024
35e3434
Fixing testCases
TebaleloS Sep 20, 2024
78f5df8
Fixing some repo and controller unit tests
TebaleloS Sep 20, 2024
ae211a3
Adding endpoint to the list
TebaleloS Sep 20, 2024
5cc90b5
Fix endpoint test
TebaleloS Sep 23, 2024
11f8382
Adding license
TebaleloS Sep 25, 2024
a02929b
Fixing Endpoints Test
TebaleloS Sep 25, 2024
c5ec653
Removing old version code
TebaleloS Sep 25, 2024
a9a341e
Fixing filename check
TebaleloS Sep 25, 2024
c40bb00
Resolving flyway conflicts
TebaleloS Sep 26, 2024
45c5613
Fixing Integr-Test case
TebaleloS Sep 26, 2024
a698af6
Fixing Integr-Test case
TebaleloS Sep 26, 2024
03898a0
Fixing Integr-Test case
TebaleloS Sep 26, 2024
1432927
Applying GH comments in Endpoints
TebaleloS Sep 27, 2024
ba9a42f
Removed implicits declarations from GetFlowCheckpointsV2
TebaleloS Sep 27, 2024
038edc9
Removed unnecessary default on parameters
TebaleloS Sep 27, 2024
4e7092b
Fixed operation definition
TebaleloS Sep 27, 2024
c9af133
Fixing parameter to flowID
TebaleloS Sep 27, 2024
5f52ef5
Fixing parameter to flowID
TebaleloS Sep 27, 2024
9a662b7
Adding comment to the function
TebaleloS Sep 30, 2024
bd74ffa
Re-implement based on agreed logic.
TebaleloS Sep 30, 2024
ea26ced
Merge branch 'master' into feature/#233-getFlowCheckpointsEndpointV2
TebaleloS Sep 30, 2024
a4b1302
Refactoring some class names and merge resolving conflicts
TebaleloS Sep 30, 2024
ae861a5
Fixing checkpoint tests
TebaleloS Sep 30, 2024
b90d844
Fixing some tests
TebaleloS Sep 30, 2024
924dcd6
Renaming sql function
TebaleloS Sep 30, 2024
c776515
Renaming sql function
TebaleloS Sep 30, 2024
6a5d27e
Fixed order
TebaleloS Sep 30, 2024
9ca353a
Fixing status text
TebaleloS Sep 30, 2024
1cd9084
Merge branch 'master' into feature/#233-getFlowCheckpointsEndpointV2
TebaleloS Sep 30, 2024
a0a2e9d
Fixing format
TebaleloS Sep 30, 2024
e5c3710
Fixing test cases
TebaleloS Oct 2, 2024
1fdfa51
Merge branch 'master' into feature/#233-getFlowCheckpointsEndpointV2
TebaleloS Oct 2, 2024
8966e5f
Fixing parameter order
TebaleloS Oct 2, 2024
98075cf
Update database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoin…
TebaleloS Oct 3, 2024
a489ea4
Update database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoin…
TebaleloS Oct 3, 2024
ef4fb90
Applying GitHub comments
TebaleloS Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 78 additions & 47 deletions database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/

CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_flow_id BIGINT,
IN i_checkpoints_limit INT DEFAULT 5,
IN i_offset BIGINT DEFAULT 0,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
Expand All @@ -28,12 +29,13 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE,
OUT has_more BOOLEAN
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
--
-- Function: flows.get_flow_checkpoints(3)
-- Function: flows.get_flow_checkpoints(4)
-- Retrieves all checkpoints (measures and their measurement details) related to a primary flow
-- associated with the input partitioning.
--
Expand All @@ -46,13 +48,13 @@ $$
-- Parameters:
-- i_partitioning_of_flow - partitioning to use for identifying the flow associate with checkpoints
-- that will be retrieved
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoints_limit - (optional) maximum number of checkpoint to return, returns all of them if NULL
-- i_offset - (optional) offset for checkpoints pagination
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
--
-- Note: checkpoint name uniqueness is not enforced by the data model, so there can be multiple different
-- checkpoints with the same name in the DB, i.e. multiple checkpoints can be retrieved even when
-- specifying `i_checkpoint_name` parameter
-- Note: i_checkpoint_limit and i_offset are used for pagination purposes;
-- checkpoints are ordered by process_start_time in descending order
-- and then by id_checkpoint in ascending order
--
-- Returns:
-- status - Status code
Expand All @@ -65,54 +67,83 @@ $$
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
-- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

-- 11 - OK
-- 42 - Flow not found
---------------------------------------------------------------------------------------------------
DECLARE
_fk_partitioning BIGINT;
_fk_flow BIGINT;
_has_more BOOLEAN;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning_of_flow);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'Partitioning not found';
-- Check if the flow exists by querying the partitioning_to_flow table.
-- Rationale:
-- This table is preferred over the flows table because:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love the comment 👍

-- 1. Every flow has at least one record in partitioning_to_flow.
-- 2. This table is used in subsequent queries, providing a caching advantage.
-- 3. Improves performance by reducing the need to query the flows table directly.
PERFORM 1 FROM flows.partitioning_to_flow WHERE fk_flow = i_flow_id;
IF NOT FOUND THEN
status := 42;
status_text := 'Flow not found';
RETURN NEXT;
RETURN;
END IF;

SELECT id_flow
FROM flows.flows
WHERE fk_primary_partitioning = _fk_partitioning
INTO _fk_flow;
-- Determine if there are more checkpoints than the limit
IF i_checkpoints_limit IS NOT NULL THEN
SELECT count(*) > i_checkpoints_limit
FROM runs.checkpoints C
JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning
WHERE PF.fk_flow = i_flow_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
LIMIT i_checkpoints_limit + 1 OFFSET i_offset
INTO _has_more;
ELSE
_has_more := false;
END IF;

-- Retrieve the checkpoints and their associated measurements
RETURN QUERY
SELECT 11 AS status, 'OK' AS status_text,
CP.id_checkpoint, CP.checkpoint_name,
CP.created_by AS author, CP.measured_by_atum_agent,
MD.measure_name, MD.measured_columns,
M.measurement_value,
CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time
FROM flows.partitioning_to_flow AS PF
JOIN runs.checkpoints AS CP
ON PF.fk_partitioning = CP.fk_partitioning
JOIN runs.measurements AS M
ON CP.id_checkpoint = M.fk_checkpoint
JOIN runs.measure_definitions AS MD
ON M.fk_measure_definition = MD.id_measure_definition
WHERE PF.fk_flow = _fk_flow
AND (i_checkpoint_name IS NULL OR CP.checkpoint_name = i_checkpoint_name)
ORDER BY CP.process_start_time,
CP.id_checkpoint
LIMIT nullif(i_limit, 0); -- NULL means no limit will be applied, it's same as LIMIT ALL

WITH limited_checkpoints AS (
SELECT C.id_checkpoint,
C.checkpoint_name,
C.created_by,
C.measured_by_atum_agent,
C.process_start_time,
C.process_end_time
FROM runs.checkpoints C
JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning
WHERE PF.fk_flow = i_flow_id
AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
ORDER BY C.id_checkpoint, C.process_start_time
LIMIT i_checkpoints_limit OFFSET i_offset
)
SELECT
11 AS status,
'OK' AS status_text,
LC.id_checkpoint,
LC.checkpoint_name,
LC.created_by AS author,
LC.measured_by_atum_agent,
MD.measure_name,
MD.measured_columns,
M.measurement_value,
LC.process_start_time AS checkpoint_start_time,
LC.process_end_time AS checkpoint_end_time,
_has_more AS has_more
FROM
limited_checkpoints LC
INNER JOIN
runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint
INNER JOIN
runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
ORDER BY
LC.id_checkpoint, LC.process_start_time;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(JSONB, INT, TEXT) TO atum_owner;
GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner;
Loading
Loading