Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nagworld9 committed Sep 19, 2024
1 parent 979d617 commit efd2fb4
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 46 deletions.
58 changes: 33 additions & 25 deletions azurelinuxagent/common/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,19 @@ def is_period_elapsed(self, delta, h):
(self.periodic_events[h] + delta) <= datetime.now()

def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0,
version=str(CURRENT_VERSION), message="", log_event=True, force=False, immediate_flush=False):
version=str(CURRENT_VERSION), message="", log_event=True, force=False):
h = hash(name + op + ustr(is_success) + message)

if force or self.is_period_elapsed(delta, h):
self.add_event(name, op=op, is_success=is_success, duration=duration,
version=version, message=message, log_event=log_event, immediate_flush=immediate_flush)
version=version, message=message, log_event=log_event)
self.periodic_events[h] = datetime.now()

def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, immediate_flush=False):
message="", log_event=True, flush=False):
"""
:param flush: Flush the event immediately to the wire server
"""

if (not is_success) and log_event:
_log_event(name, op, message, duration, is_success=is_success)
Expand All @@ -509,9 +512,9 @@ def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, durati
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
self.add_common_event_parameters(event, datetime.utcnow())

self.flush_or_save_event(event, message, immediate_flush)
self.report_or_save_event(event, message, flush)

def add_log_event(self, level, message, immediate_flush=False):
def add_log_event(self, level, message):
event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.EventName, WALAEventOperation.Log))
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.CapabilityUsed, logger.LogLevel.STRINGS[level]))
Expand All @@ -520,9 +523,9 @@ def add_log_event(self, level, message, immediate_flush=False):
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
self.add_common_event_parameters(event, datetime.utcnow())

self.flush_or_save_event(event, message, immediate_flush)
self.report_or_save_event(event, message)

def add_metric(self, category, counter, instance, value, log_event=False, immediate_flush=False):
def add_metric(self, category, counter, instance, value, log_event=False):
"""
Create and save an event which contains a telemetry event.
Expand All @@ -544,15 +547,16 @@ def add_metric(self, category, counter, instance, value, log_event=False, immedi
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
self.add_common_event_parameters(event, datetime.utcnow())

self.flush_or_save_event(event, message, immediate_flush)
self.report_or_save_event(event, message)

def flush_or_save_event(self, event, message, immediate_flush):
def report_or_save_event(self, event, message, flush=False):
"""
Flush the event to wireserver if immediate_flush to set to true, else
Flush the event to wireserver if flush to set to true, else
save it disk if we fail to send or not required to flush immediately.
TODO: pickup as many events as possible and send them in one go.
"""
report_success = False
if immediate_flush and self.protocol is not None:
if flush and self.protocol is not None:
report_success = self.protocol.report_event([event])
if not report_success:
logger.error("Failed to send event: '{0}' directly to Wireserver. So, agent will save it to disk for periodic flush.", message)
Expand Down Expand Up @@ -644,32 +648,34 @@ def elapsed_milliseconds(utc_start):
(d.microseconds / 1000.0))


def report_event(op, is_success=True, message='', log_event=True, immediate_flush=False):
def report_event(op, is_success=True, message='', log_event=True, flush=False):
"""
:param flush: if true, flush the event immediately to the wire server
"""
add_event(AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op,
log_event=log_event, immediate_flush=immediate_flush)
log_event=log_event, flush=flush)


def report_periodic(delta, op, is_success=True, message='', immediate_flush=False):
def report_periodic(delta, op, is_success=True, message=''):
add_periodic(delta, AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op, immediate_flush=immediate_flush)
op=op)


def report_metric(category, counter, instance, value, log_event=False, immediate_flush=False, reporter=__event_logger__):
def report_metric(category, counter, instance, value, log_event=False, reporter=__event_logger__):
"""
Send a telemetry event reporting a single instance of a performance counter.
:param str category: The category of the metric (cpu, memory, etc)
:param str counter: The name of the metric ("%idle", etc)
:param str instance: For instanced metrics, the identifier of the instance. E.g. a disk drive name, a cpu core#
:param value: The value of the metric
:param bool log_event: If True, log the metric in the agent log as well
:param bool immediate_flush: If True, flush the event to wireserver immediately
:param EventLogger reporter: The EventLogger instance to which metric events should be sent
"""
if reporter.event_dir is None:
Expand All @@ -678,7 +684,7 @@ def report_metric(category, counter, instance, value, log_event=False, immediate
_log_event(AGENT_NAME, "METRIC", message, 0)
return
try:
reporter.add_metric(category, counter, instance, float(value), log_event, immediate_flush)
reporter.add_metric(category, counter, instance, float(value), log_event)
except ValueError:
logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - "
"{0}/{1} [{2}] = {3}".format(category, counter, instance, value))
Expand All @@ -691,7 +697,10 @@ def initialize_event_logger_vminfo_common_parameters_and_protocol(protocol, repo


def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, immediate_flush=False, reporter=__event_logger__):
message="", log_event=True, flush=False, reporter=__event_logger__):
"""
:param flush: if true, flush the event immediately to the wire server
"""
if reporter.event_dir is None:
logger.warn("Cannot add event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
Expand All @@ -701,16 +710,15 @@ def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, d
mark_event_status(name, version, op, is_success)
reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message,
log_event=log_event, immediate_flush=immediate_flush)
log_event=log_event, flush=flush)


def add_log_event(level, message, forced=False, immediate_flush=False, reporter=__event_logger__):
def add_log_event(level, message, forced=False, reporter=__event_logger__):
"""
:param level: LoggerLevel of the log event
:param message: Message
:param forced: Force write the event even if send_logs_to_telemetry() is disabled
(NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
:param immediate_flush: Flush the event immediately
:param reporter: The EventLogger instance to which metric events should be sent
:return:
"""
Expand All @@ -721,18 +729,18 @@ def add_log_event(level, message, forced=False, immediate_flush=False, reporter=
return

if level >= logger.LogLevel.WARNING:
reporter.add_log_event(level, message, immediate_flush)
reporter.add_log_event(level, message)


def add_periodic(delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, force=False, immediate_flush=False, reporter=__event_logger__):
message="", log_event=True, force=False, reporter=__event_logger__):
if reporter.event_dir is None:
logger.warn("Cannot add periodic event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
return

reporter.add_periodic(delta, name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message, log_event=log_event, force=force, immediate_flush=immediate_flush)
message=message, log_event=log_event, force=force)


def mark_event_status(name, version, op, status):
Expand Down
20 changes: 12 additions & 8 deletions azurelinuxagent/common/protocol/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@

_DOWNLOAD_TIMEOUT = timedelta(minutes=5)

# telemetrydata api max calls per 15 secs
# telemetrydata api max calls 15 per 15 secs
# Considered conservative approach and set the value to 12
TELEMETRY_MAX_CALLS_PER_INTERVAL = 12
# telemetrydata api interval 15 secs
TELEMETRY_INTERVAL = 15 # 15 seconds

# The maximum number of times to retry sending telemetry data
TELEMETRY_MAX_RETRIES = 3
TELEMETRY_DELAY = 1 # 1 second
TELEMETRY_RETRY_DELAY = 1 # 1 second


class UploadError(HttpError):
Expand Down Expand Up @@ -1043,7 +1044,7 @@ def report_health(self, status, substatus, description):
u",{0}: {1}").format(resp.status,
resp.read()))

def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
def _send_encoded_event(self, provider_id, event_str, encoding='utf8'):
"""
Construct the encoded event and url for telemetry endpoint call
Before calling telemetry endpoint, ensure calls are under throttling limits and checks if the number of telemetry endpoint calls 12 in the last 15 seconds
Expand Down Expand Up @@ -1099,7 +1100,7 @@ def can_make_wireserver_call():
def report_event(self, events_iterator):
"""
Report events to the wire server. The events are grouped and sent out in batches well with in the api body limit.
Note: Max body size is 64kb and throttling limit is 15 calls in 15 seconds
When talking to wire server, make sure to take account of telemetry endpoint limits: Max body size is 64kb and throttling limit is 15 calls in 15 seconds
"""
buf = {}
debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_REPORT)
Expand All @@ -1111,18 +1112,21 @@ def _send_event(provider_id, debug_info):
try:
while True:
try:
self.send_encoded_event(provider_id, buf[provider_id])
self._send_encoded_event(provider_id, buf[provider_id])
break
except Exception:
except Exception as ex:
# we should not retry on unicode errors as it's permanent error
if isinstance(ex, UnicodeError):
raise
error_count += 1
if error_count >= TELEMETRY_MAX_RETRIES:
raise
time.sleep(TELEMETRY_DELAY)
time.sleep(TELEMETRY_RETRY_DELAY)
except UnicodeError as uni_error:
debug_info.update_unicode_error(uni_error)
except Exception as error:
debug_info.update_op_error(error)

#
# Group events by providerId
for event in events_iterator:
try:
Expand Down
9 changes: 6 additions & 3 deletions azurelinuxagent/ga/collect_telemetry_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def _read_event_file(event_file_path):
:return: Event data in list or string format.
"""
# Retry for reading the event file in case file is modified while reading
# We except FileNotFoundError and ValueError to handle the case where the file is deleted or modified while reading
error_count = 0
while True:
try:
Expand All @@ -270,7 +271,7 @@ def _read_event_file(event_file_path):

# Parse the string and get the list of events
return json.loads(event_data)
except Exception:
except (ValueError, FileNotFoundError):
error_count += 1
if error_count >= NUM_OF_EVENT_FILE_RETRIES:
raise
Expand Down Expand Up @@ -478,7 +479,8 @@ def process_events(self):
# Todo: We should delete files after ensuring that we sent the data to Wireserver successfully
# from our end rather than deleting first and sending later. This is to ensure the data reliability
# of the agent telemetry pipeline.
os.remove(event_file_path)
if os.path.exists(event_file_path):
os.remove(event_file_path)
except ServiceStoppedError as stopped_error:
logger.error(
"Unable to enqueue events as service stopped: {0}, skipping events collection".format(
Expand All @@ -498,13 +500,14 @@ def _read_and_parse_event_file(event_file_path):
:return: TelemetryEvent object.
"""
# Retry for reading the event file in case file is modified while reading
# We except FileNotFoundError and ValueError to handle the case where the file is deleted or modified while reading
error_count = 0
while True:
try:
with open(event_file_path, "rb") as event_fd:
event_data = event_fd.read().decode("utf-8")
return parse_event(event_data)
except Exception:
except (ValueError, FileNotFoundError):
error_count += 1
if error_count >= NUM_OF_EVENT_FILE_RETRIES:
raise
Expand Down
10 changes: 5 additions & 5 deletions tests/common/protocol/test_wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def test_send_encoded_event(self, mock_http_request, *args):

event_str = u'a test string'
client = WireProtocol(WIRESERVER_URL).client
client.send_encoded_event("foo", event_str.encode('utf-8'))
client._send_encoded_event("foo", event_str.encode('utf-8'))

first_call = mock_http_request.call_args_list[0]
args, kwargs = first_call
Expand All @@ -433,7 +433,7 @@ def test_send_encoded_event(self, mock_http_request, *args):
# the body is encoded, decode and check for equality
self.assertIn(event_str, body_received.decode('utf-8'))

@patch("azurelinuxagent.common.protocol.wire.WireClient.send_encoded_event")
@patch("azurelinuxagent.common.protocol.wire.WireClient._send_encoded_event")
def test_report_event_small_event(self, patch_send_event, *args): # pylint: disable=unused-argument
event_list = []
client = WireProtocol(WIRESERVER_URL).client
Expand All @@ -455,7 +455,7 @@ def test_report_event_small_event(self, patch_send_event, *args): # pylint: dis
# It merges the messages into one message
self.assertEqual(patch_send_event.call_count, 1)

@patch("azurelinuxagent.common.protocol.wire.WireClient.send_encoded_event")
@patch("azurelinuxagent.common.protocol.wire.WireClient._send_encoded_event")
def test_report_event_multiple_events_to_fill_buffer(self, patch_send_event, *args): # pylint: disable=unused-argument
event_list = []
client = WireProtocol(WIRESERVER_URL).client
Expand All @@ -469,7 +469,7 @@ def test_report_event_multiple_events_to_fill_buffer(self, patch_send_event, *ar
# It merges the messages into one message
self.assertEqual(patch_send_event.call_count, 2)

@patch("azurelinuxagent.common.protocol.wire.WireClient.send_encoded_event")
@patch("azurelinuxagent.common.protocol.wire.WireClient._send_encoded_event")
def test_report_event_large_event(self, patch_send_event, *args): # pylint: disable=unused-argument
event_list = []
event_str = random_generator(2 ** 18)
Expand Down Expand Up @@ -522,7 +522,7 @@ def test_report_event_should_only_attempt_max_retries_if_fails_to_send(self, moc
event_list.append(get_event(message=event_str))
client = WireProtocol(WIRESERVER_URL).client
with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_MAX_RETRIES", 5):
with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_DELAY", 0.1):
with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_RETRY_DELAY", 0.1):
client.report_event(self._get_telemetry_events_generator(event_list))
self.assertEqual(mock_http_request.call_count, 5)

Expand Down
Loading

0 comments on commit efd2fb4

Please sign in to comment.