Skip to content

Commit

Permalink
Minor refactoring changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dup05 committed Mar 14, 2024
1 parent 0c38e9e commit 78cace3
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 25 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dlp-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: Test INSPECT, DEID & REID

on:
# pull_request:
# types: [ opened, reopened, synchronize ]
pull_request:
types: [ opened, reopened, synchronize ]
schedule:
- cron: '30 9 * * *'
workflow_dispatch:
Expand Down
32 changes: 11 additions & 21 deletions .github/workflows/scripts/fetchJobMetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
TABLE_NAME = "load_test_metrics"
BQ_DATASET_ID = "load_test_report"

class LoadTest:

class MetricsUtil:
def __init__(self, test_id, project_id, job_id, test_details, test_name):
self.test_uuid = test_id
self.project_id = project_id
Expand All @@ -24,7 +25,7 @@ def __init__(self, test_id, project_id, job_id, test_details, test_name):
def set_job_details(self):
    """Fetch the full Dataflow job resource for this job.

    Uses the Dataflow Jobs v1beta3 API with the project and job ids held
    on the instance. Callers read fields such as ``create_time`` off the
    returned response (see ``get_monitoring_dashboard``).

    Returns:
        The ``GetJob`` API response object for ``self.job_id``.
    """
    client = dataflow_v1beta3.JobsV1Beta3Client()
    # Single request built from instance state; no retries or error
    # handling here — API errors propagate to the caller.
    request = dataflow_v1beta3.GetJobRequest(project_id=self.project_id,
                                             job_id=self.job_id)
    response = client.get_job(request=request)
    return response

Expand All @@ -43,7 +44,6 @@ def get_job_metrics(self):
metrics_map = {}

for metric in job_metrics.metrics:
# print(metric.name.name)
if metric.name.name in VALID_METRICS:
metric_name = metric.name.name
if "tentative" not in metric.name.context:
Expand Down Expand Up @@ -75,31 +75,28 @@ def get_elapsed_time(self):
# Initialize request argument(s)
request = monitoring_v3.ListTimeSeriesRequest(
name=project_name,
filter="metric.type=\"dataflow.googleapis.com/job/elapsed_time\" AND metric.labels.job_id=\"{}\"".format(self.job_id),
filter="metric.type=\"dataflow.googleapis.com/job/elapsed_time\" AND metric.labels.job_id=\"{}\"".format(
self.job_id),
view="FULL",
interval=interval
# Add aggregation parameters
# https://github.com/GoogleCloudPlatform/dataflow-metrics-exporter/blob/main/src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MonitoringClient.java#L140
)
page_result = client.list_time_series(request=request)
elapsed_time = 0
for response in page_result:
for point in response.points:
# print(point.value.int64_value)
elapsed_time = max(elapsed_time, point.value.int64_value)

return elapsed_time

def get_job_success_status(self, job_metrics):
    """Classify a de-identification run from its row-count metrics.

    Args:
        job_metrics: mapping with integer counters
            ``"numberOfRowsRead"`` and ``"numberOfRowDeidentified"``.

    Returns:
        ``"SUCCESS"`` when every row read was de-identified,
        ``"FAILURE"`` when no rows were de-identified, otherwise
        ``"PARTIAL_SUCCESS"``.

    Note: when both counters are 0 the equality branch wins and the run
    is reported as SUCCESS — presumably intentional for empty inputs;
    TODO(review) confirm.
    """
    if job_metrics["numberOfRowsRead"] == job_metrics["numberOfRowDeidentified"]:
        return "SUCCESS"
    elif job_metrics["numberOfRowDeidentified"] == 0:
        return "FAILURE"
    else:
        return "PARTIAL_SUCCESS"


def write_data_to_bigquery(self,row):
def write_data_to_bigquery(self, row):
client = bigquery.Client()
table_ref = client.dataset(BQ_DATASET_ID, project=project_id).table(TABLE_NAME)
table = client.get_table(table_ref)
Expand All @@ -114,14 +111,8 @@ def get_job_type(self):
return "STREAMING"
return "NONE"


def get_monitoring_dashboard(self):
# client = monitoring_dashboard_v1.DashboardsServiceClient()
# request = monitoring_dashboard_v1.GetDashboardRequest(
# name="projects/175500764928/dashboards/f291443e-c268-4b79-8875-b3258a66bea4",
# ) response = client.get_dashboard(request=request)
# print(response)
print(type(self.dataflow_job_details.create_time),self.dataflow_job_details.create_time)
print(type(self.dataflow_job_details.create_time), self.dataflow_job_details.create_time)
print(self.dataflow_job_details.create_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
print("Current time", datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
start_time = self.dataflow_job_details.create_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
Expand All @@ -134,7 +125,6 @@ def get_monitoring_dashboard(self):
print(monitoring_dashboard_url)
return monitoring_dashboard_url


def prepare_metrics_data(self, metrics):

test_run_data = {
Expand Down Expand Up @@ -163,13 +153,13 @@ def prepare_metrics_data(self, metrics):
test_name = sys.argv[4]
test_details = json.loads(sys.argv[5])

test_job_object = LoadTest(
metrics_util = MetricsUtil(
test_id, project_id, job_id, test_details, test_name
)

job_metrics = test_job_object.get_job_metrics()
job_metrics = metrics_util.get_job_metrics()

test_job_object.write_data_to_bigquery(test_job_object.prepare_metrics_data(job_metrics))
metrics_util.write_data_to_bigquery(metrics_util.prepare_metrics_data(job_metrics))

except Exception as e:
print(e)
2 changes: 0 additions & 2 deletions .github/workflows/scripts/publishTestReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ def get_job_details(job_details):

def fetch_test_details(project_id, test_id):
client = bigquery.Client()
table_ref = client.dataset(BQ_DATASET_ID, project=project_id).table(TABLE_NAME)
table = client.get_table(table_ref)
query = """
SELECT *
FROM `{0}.{1}.{2}`
Expand Down

0 comments on commit 78cace3

Please sign in to comment.