Skip to content

Commit

Permalink
Added better logs for metrics step intervals. (#482)
Browse files · Browse the repository at this point in the history
  • Loading branch information
deepgabani8 authored Sep 27, 2024
1 parent 0294c4c commit 571e4c0
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions — weather_mv/loader_pipeline/metrics.py
Original file line number · Diff line number · Diff line change
Expand Up @@ -18,6 +18,7 @@
import dataclasses
import datetime
import inspect
import json
import logging
import time
import typing as t
Expand Down Expand Up @@ -141,11 +142,11 @@ def process(self, element):
try:
if len(element) == 0:
raise ValueError("time_dict not found.")
(_, asset_start_time), time_dict = element
(asset_id, asset_start_time), time_dict = element
if not isinstance(time_dict, OrderedDict):
raise ValueError("time_dict not found.")

uri = time_dict.pop("uri", _)
uri = time_dict.pop("uri")

# Time for a file to get ingested into EE from when it appeared in bucket.
# When the pipeline is in batch mode, it will be from when the file
Expand All @@ -171,13 +172,15 @@ def process(self, element):
time_dict.move_to_end("FileInit", last=False)

# Logging time taken by each step...
for (current_step, current_time), (next_step, next_time) in zip(
time_dict.items(), list(time_dict.items())[1:]
):
step_time = round(next_time - current_time)
logger.info(
f"{uri}: Time from {current_step} -> {next_step}: {step_time} seconds."
)
step_intervals = {
f"{current_step} -> {next_step}": round(next_time - current_time)
for (current_step, current_time), (next_step, next_time)
in zip(time_dict.items(), list(time_dict.items())[1:])
}
logger.info(
f"Step intervals for {uri}:{asset_id} :: {json.dumps(step_intervals, indent=4)}"
)

yield ("custom_metrics", (data_latency_ms / 1000, element_processing_time / 1000))
except Exception as e:
logger.warning(
Expand Down

0 comments on commit 571e4c0

Please sign in to comment.