From 78cace37dc8eabb51490cc1ea8e8da4aa0fbd621 Mon Sep 17 00:00:00 2001 From: dup05 Date: Thu, 14 Mar 2024 12:14:04 +0530 Subject: [PATCH] Minor refactoring changes --- .github/workflows/dlp-pipelines.yml | 4 +-- .github/workflows/scripts/fetchJobMetrics.py | 32 +++++++------------ .../workflows/scripts/publishTestReport.py | 2 -- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/.github/workflows/dlp-pipelines.yml b/.github/workflows/dlp-pipelines.yml index e646f1ec..9036aaeb 100644 --- a/.github/workflows/dlp-pipelines.yml +++ b/.github/workflows/dlp-pipelines.yml @@ -1,8 +1,8 @@ name: Test INSPECT, DEID & REID on: -# pull_request: -# types: [ opened, reopened, synchronize ] + pull_request: + types: [ opened, reopened, synchronize ] schedule: - cron: '30 9 * * *' workflow_dispatch: diff --git a/.github/workflows/scripts/fetchJobMetrics.py b/.github/workflows/scripts/fetchJobMetrics.py index b168bce9..ffc6a4d4 100644 --- a/.github/workflows/scripts/fetchJobMetrics.py +++ b/.github/workflows/scripts/fetchJobMetrics.py @@ -12,7 +12,8 @@ TABLE_NAME = "load_test_metrics" BQ_DATASET_ID = "load_test_report" -class LoadTest: + +class MetricsUtil: def __init__(self, test_id, project_id, job_id, test_details, test_name): self.test_uuid = test_id self.project_id = project_id @@ -24,7 +25,7 @@ def __init__(self, test_id, project_id, job_id, test_details, test_name): def set_job_details(self): client = dataflow_v1beta3.JobsV1Beta3Client() request = dataflow_v1beta3.GetJobRequest(project_id=self.project_id, - job_id=self.job_id) + job_id=self.job_id) response = client.get_job(request=request) return response @@ -43,7 +44,6 @@ def get_job_metrics(self): metrics_map = {} for metric in job_metrics.metrics: - # print(metric.name.name) if metric.name.name in VALID_METRICS: metric_name = metric.name.name if "tentative" not in metric.name.context: @@ -75,22 +75,20 @@ def get_elapsed_time(self): # Initialize request argument(s) request = monitoring_v3.ListTimeSeriesRequest( name=project_name, - filter="metric.type=\"dataflow.googleapis.com/job/elapsed_time\" AND metric.labels.job_id=\"{}\"".format(self.job_id), + filter="metric.type=\"dataflow.googleapis.com/job/elapsed_time\" AND metric.labels.job_id=\"{}\"".format( + self.job_id), view="FULL", interval=interval - # Add aggregation parametes - # https://github.com/GoogleCloudPlatform/dataflow-metrics-exporter/blob/main/src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MonitoringClient.java#L140 ) page_result = client.list_time_series(request=request) elapsed_time = 0 for response in page_result: for point in response.points: - # print(point.value.int64_value) elapsed_time = max(elapsed_time, point.value.int64_value) return elapsed_time - def get_job_success_status(self,job_metrics): + def get_job_success_status(self, job_metrics): if job_metrics["numberOfRowsRead"] == job_metrics["numberOfRowDeidentified"]: return "SUCCESS" elif job_metrics["numberOfRowDeidentified"] == 0: @@ -98,8 +96,7 @@ def get_job_success_status(self,job_metrics): else: return "PARTIAL_SUCCESS" - - def write_data_to_bigquery(self,row): + def write_data_to_bigquery(self, row): client = bigquery.Client() table_ref = client.dataset(BQ_DATASET_ID, project=project_id).table(TABLE_NAME) table = client.get_table(table_ref) @@ -114,14 +111,8 @@ def get_job_type(self): return "STREAMING" return "NONE" - def get_monitoring_dashboard(self): - # client = monitoring_dashboard_v1.DashboardsServiceClient() - # request = monitoring_dashboard_v1.GetDashboardRequest( - # name="projects/175500764928/dashboards/f291443e-c268-4b79-8875-b3258a66bea4", - # ) response = client.get_dashboard(request=request) - # print(response) - print(type(self.dataflow_job_details.create_time),self.dataflow_job_details.create_time) + print(type(self.dataflow_job_details.create_time), self.dataflow_job_details.create_time) print(self.dataflow_job_details.create_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) print("Current time", datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')) start_time = self.dataflow_job_details.create_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') @@ -134,7 +125,6 @@ def get_monitoring_dashboard(self): print(monitoring_dashboard_url) return monitoring_dashboard_url - def prepare_metrics_data(self, metrics): test_run_data = { @@ -163,13 +153,13 @@ def prepare_metrics_data(self, metrics): test_name = sys.argv[4] test_details = json.loads(sys.argv[5]) - test_job_object = LoadTest( + metrics_util = MetricsUtil( test_id, project_id, job_id, test_details, test_name ) - job_metrics = test_job_object.get_job_metrics() + job_metrics = metrics_util.get_job_metrics() - test_job_object.write_data_to_bigquery(test_job_object.prepare_metrics_data(job_metrics)) + metrics_util.write_data_to_bigquery(metrics_util.prepare_metrics_data(job_metrics)) except Exception as e: print(e) diff --git a/.github/workflows/scripts/publishTestReport.py b/.github/workflows/scripts/publishTestReport.py index 87571dcd..2fa9c66e 100644 --- a/.github/workflows/scripts/publishTestReport.py +++ b/.github/workflows/scripts/publishTestReport.py @@ -17,8 +17,6 @@ def get_job_details(job_details): def fetch_test_details(project_id, test_id): client = bigquery.Client() - table_ref = client.dataset(BQ_DATASET_ID, project=project_id).table(TABLE_NAME) - table = client.get_table(table_ref) query = """ SELECT * FROM `{0}.{1}.{2}`