diff --git a/azurelinuxagent/common/event.py b/azurelinuxagent/common/event.py index 44f0c694b..358806f9e 100644 --- a/azurelinuxagent/common/event.py +++ b/azurelinuxagent/common/event.py @@ -486,16 +486,19 @@ def is_period_elapsed(self, delta, h): (self.periodic_events[h] + delta) <= datetime.now() def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, - version=str(CURRENT_VERSION), message="", log_event=True, force=False, immediate_flush=False): + version=str(CURRENT_VERSION), message="", log_event=True, force=False): h = hash(name + op + ustr(is_success) + message) if force or self.is_period_elapsed(delta, h): self.add_event(name, op=op, is_success=is_success, duration=duration, - version=version, message=message, log_event=log_event, immediate_flush=immediate_flush) + version=version, message=message, log_event=log_event) self.periodic_events[h] = datetime.now() def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION), - message="", log_event=True, immediate_flush=False): + message="", log_event=True, flush=False): + """ + :param flush: Flush the event immediately to the wire server + """ if (not is_success) and log_event: _log_event(name, op, message, duration, is_success=is_success) @@ -509,9 +512,9 @@ def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, durati event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration))) self.add_common_event_parameters(event, datetime.utcnow()) - self.flush_or_save_event(event, message, immediate_flush) + self.report_or_save_event(event, message, flush) - def add_log_event(self, level, message, immediate_flush=False): + def add_log_event(self, level, message): event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID) event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.EventName, WALAEventOperation.Log)) event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.CapabilityUsed, logger.LogLevel.STRINGS[level])) @@ -520,9 +523,9 @@ def add_log_event(self, level, message, immediate_flush=False): event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, '')) self.add_common_event_parameters(event, datetime.utcnow()) - self.flush_or_save_event(event, message, immediate_flush) + self.report_or_save_event(event, message) - def add_metric(self, category, counter, instance, value, log_event=False, immediate_flush=False): + def add_metric(self, category, counter, instance, value, log_event=False): """ Create and save an event which contains a telemetry event. @@ -544,15 +547,16 @@ def add_metric(self, category, counter, instance, value, log_event=False, immedi event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value))) self.add_common_event_parameters(event, datetime.utcnow()) - self.flush_or_save_event(event, message, immediate_flush) + self.report_or_save_event(event, message) - def flush_or_save_event(self, event, message, immediate_flush): + def report_or_save_event(self, event, message, flush=False): """ - Flush the event to wireserver if immediate_flush to set to true, else + Flush the event to wireserver if flush to set to true, else save it disk if we fail to send or not required to flush immediately. + TODO: pickup as many events as possible and send them in one go. """ report_success = False - if immediate_flush and self.protocol is not None: + if flush and self.protocol is not None: report_success = self.protocol.report_event([event]) if not report_success: logger.error("Failed to send event: '{0}' directly to Wireserver. So, agent will save it to disk for periodic flush.", message) @@ -644,24 +648,27 @@ def elapsed_milliseconds(utc_start): (d.microseconds / 1000.0)) -def report_event(op, is_success=True, message='', log_event=True, immediate_flush=False): +def report_event(op, is_success=True, message='', log_event=True, flush=False): + """ + :param flush: if true, flush the event immediately to the wire server + """ add_event(AGENT_NAME, version=str(CURRENT_VERSION), is_success=is_success, message=message, op=op, - log_event=log_event, immediate_flush=immediate_flush) + log_event=log_event, flush=flush) -def report_periodic(delta, op, is_success=True, message='', immediate_flush=False): +def report_periodic(delta, op, is_success=True, message=''): add_periodic(delta, AGENT_NAME, version=str(CURRENT_VERSION), is_success=is_success, message=message, - op=op, immediate_flush=immediate_flush) + op=op) -def report_metric(category, counter, instance, value, log_event=False, immediate_flush=False, reporter=__event_logger__): +def report_metric(category, counter, instance, value, log_event=False, reporter=__event_logger__): """ Send a telemetry event reporting a single instance of a performance counter. :param str category: The category of the metric (cpu, memory, etc) @@ -669,7 +676,6 @@ def report_metric(category, counter, instance, value, log_event=False, immediate :param str instance: For instanced metrics, the identifier of the instance. E.g. a disk drive name, a cpu core# :param value: The value of the metric :param bool log_event: If True, log the metric in the agent log as well - :param bool immediate_flush: If True, flush the event to wireserver immediately :param EventLogger reporter: The EventLogger instance to which metric events should be sent """ if reporter.event_dir is None: @@ -678,7 +684,7 @@ def report_metric(category, counter, instance, value, log_event=False, immediate _log_event(AGENT_NAME, "METRIC", message, 0) return try: - reporter.add_metric(category, counter, instance, float(value), log_event, immediate_flush) + reporter.add_metric(category, counter, instance, float(value), log_event) except ValueError: logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - " "{0}/{1} [{2}] = {3}".format(category, counter, instance, value)) @@ -691,7 +697,10 @@ def initialize_event_logger_vminfo_common_parameters_and_protocol(protocol, repo def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION), - message="", log_event=True, immediate_flush=False, reporter=__event_logger__): + message="", log_event=True, flush=False, reporter=__event_logger__): + """ + :param flush: if true, flush the event immediately to the wire server + """ if reporter.event_dir is None: logger.warn("Cannot add event -- Event reporter is not initialized.") _log_event(name, op, message, duration, is_success=is_success) @@ -701,16 +710,15 @@ def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, d mark_event_status(name, version, op, is_success) reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version), message=message, - log_event=log_event, immediate_flush=immediate_flush) + log_event=log_event, flush=flush) -def add_log_event(level, message, forced=False, immediate_flush=False, reporter=__event_logger__): +def add_log_event(level, message, forced=False, reporter=__event_logger__): """ :param level: LoggerLevel of the log event :param message: Message :param forced: Force write the event even if send_logs_to_telemetry() is disabled (NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events) - :param immediate_flush: Flush the event immediately :param reporter: The EventLogger instance to which metric events should be sent :return: """ @@ -721,18 +729,18 @@ def add_log_event(level, message, forced=False, immediate_flush=False, reporter= return if level >= logger.LogLevel.WARNING: - reporter.add_log_event(level, message, immediate_flush) + reporter.add_log_event(level, message) def add_periodic(delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION), - message="", log_event=True, force=False, immediate_flush=False, reporter=__event_logger__): + message="", log_event=True, force=False, reporter=__event_logger__): if reporter.event_dir is None: logger.warn("Cannot add periodic event -- Event reporter is not initialized.") _log_event(name, op, message, duration, is_success=is_success) return reporter.add_periodic(delta, name, op=op, is_success=is_success, duration=duration, version=str(version), - message=message, log_event=log_event, force=force, immediate_flush=immediate_flush) + message=message, log_event=log_event, force=force) def mark_event_status(name, version, op, status): diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py index c9400701a..c4300134d 100644 --- a/azurelinuxagent/common/protocol/wire.py +++ b/azurelinuxagent/common/protocol/wire.py @@ -63,14 +63,15 @@ _DOWNLOAD_TIMEOUT = timedelta(minutes=5) -# telemetrydata api max calls per 15 secs +# telemetrydata api max calls 15 per 15 secs # Considered conservative approach and set the value to 12 TELEMETRY_MAX_CALLS_PER_INTERVAL = 12 +# telemetrydata api interval 15 secs TELEMETRY_INTERVAL = 15 # 15 seconds # The maximum number of times to retry sending telemetry data TELEMETRY_MAX_RETRIES = 3 -TELEMETRY_DELAY = 1 # 1 second +TELEMETRY_RETRY_DELAY = 1 # 1 second class UploadError(HttpError): @@ -1043,7 +1044,7 @@ def report_health(self, status, substatus, description): u",{0}: {1}").format(resp.status, resp.read())) - def send_encoded_event(self, provider_id, event_str, encoding='utf8'): + def _send_encoded_event(self, provider_id, event_str, encoding='utf8'): """ Construct the encoded event and url for telemetry endpoint call Before calling telemetry endpoint, ensure calls are under throttling limits and checks if the number of telemetry endpoint calls 12 in the last 15 seconds @@ -1099,7 +1100,7 @@ def can_make_wireserver_call(): def report_event(self, events_iterator): """ Report events to the wire server. The events are grouped and sent out in batches well with in the api body limit. - Note: Max body size is 64kb and throttling limit is 15 calls in 15 seconds + When talking to wire server, make sure to take account of telemetry endpoint limits: Max body size is 64kb and throttling limit is 15 calls in 15 seconds """ buf = {} debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_REPORT) @@ -1111,18 +1112,21 @@ def _send_event(provider_id, debug_info): try: while True: try: - self.send_encoded_event(provider_id, buf[provider_id]) + self._send_encoded_event(provider_id, buf[provider_id]) break - except Exception: + except Exception as ex: + # we should not retry on unicode errors as it's permanent error + if isinstance(ex, UnicodeError): + raise error_count += 1 if error_count >= TELEMETRY_MAX_RETRIES: raise - time.sleep(TELEMETRY_DELAY) + time.sleep(TELEMETRY_RETRY_DELAY) except UnicodeError as uni_error: debug_info.update_unicode_error(uni_error) except Exception as error: debug_info.update_op_error(error) - + # # Group events by providerId for event in events_iterator: try: diff --git a/azurelinuxagent/ga/collect_telemetry_events.py b/azurelinuxagent/ga/collect_telemetry_events.py index 153250bbc..9e667cdef 100644 --- a/azurelinuxagent/ga/collect_telemetry_events.py +++ b/azurelinuxagent/ga/collect_telemetry_events.py @@ -261,6 +261,7 @@ def _read_event_file(event_file_path): :return: Event data in list or string format. """ # Retry for reading the event file in case file is modified while reading + # We except FileNotFoundError and ValueError to handle the case where the file is deleted or modified while reading error_count = 0 while True: try: @@ -270,7 +271,7 @@ def _read_event_file(event_file_path): # Parse the string and get the list of events return json.loads(event_data) - except Exception: + except (ValueError, FileNotFoundError): error_count += 1 if error_count >= NUM_OF_EVENT_FILE_RETRIES: raise @@ -478,7 +479,8 @@ def process_events(self): # Todo: We should delete files after ensuring that we sent the data to Wireserver successfully # from our end rather than deleting first and sending later. This is to ensure the data reliability # of the agent telemetry pipeline. - os.remove(event_file_path) + if os.path.exists(event_file_path): + os.remove(event_file_path) except ServiceStoppedError as stopped_error: logger.error( "Unable to enqueue events as service stopped: {0}, skipping events collection".format( @@ -498,13 +500,14 @@ def _read_and_parse_event_file(event_file_path): :return: TelemetryEvent object. """ # Retry for reading the event file in case file is modified while reading + # We except FileNotFoundError and ValueError to handle the case where the file is deleted or modified while reading error_count = 0 while True: try: with open(event_file_path, "rb") as event_fd: event_data = event_fd.read().decode("utf-8") return parse_event(event_data) - except Exception: + except (ValueError, FileNotFoundError): error_count += 1 if error_count >= NUM_OF_EVENT_FILE_RETRIES: raise diff --git a/tests/common/protocol/test_wire.py b/tests/common/protocol/test_wire.py index 32189301f..a51a21fdc 100644 --- a/tests/common/protocol/test_wire.py +++ b/tests/common/protocol/test_wire.py @@ -421,7 +421,7 @@ def test_send_encoded_event(self, mock_http_request, *args): event_str = u'a test string' client = WireProtocol(WIRESERVER_URL).client - client.send_encoded_event("foo", event_str.encode('utf-8')) + client._send_encoded_event("foo", event_str.encode('utf-8')) first_call = mock_http_request.call_args_list[0] args, kwargs = first_call @@ -433,7 +433,7 @@ def test_send_encoded_event(self, mock_http_request, *args): # the body is encoded, decode and check for equality self.assertIn(event_str, body_received.decode('utf-8')) - @patch("azurelinuxagent.common.protocol.wire.WireClient.send_encoded_event") + @patch("azurelinuxagent.common.protocol.wire.WireClient._send_encoded_event") def test_report_event_small_event(self, patch_send_event, *args): # pylint: disable=unused-argument event_list = [] client = WireProtocol(WIRESERVER_URL).client @@ -455,7 +455,7 @@ def test_report_event_small_event(self, patch_send_event, *args): # pylint: dis # It merges the messages into one message self.assertEqual(patch_send_event.call_count, 1) - @patch("azurelinuxagent.common.protocol.wire.WireClient.send_encoded_event") + @patch("azurelinuxagent.common.protocol.wire.WireClient._send_encoded_event") def test_report_event_multiple_events_to_fill_buffer(self, patch_send_event, *args): # pylint: disable=unused-argument event_list = [] client = WireProtocol(WIRESERVER_URL).client @@ -469,7 +469,7 @@ def test_report_event_multiple_events_to_fill_buffer(self, patch_send_event, *ar # It merges the messages into one message self.assertEqual(patch_send_event.call_count, 2) - @patch("azurelinuxagent.common.protocol.wire.WireClient.send_encoded_event") + @patch("azurelinuxagent.common.protocol.wire.WireClient._send_encoded_event") def test_report_event_large_event(self, patch_send_event, *args): # pylint: disable=unused-argument event_list = [] event_str = random_generator(2 ** 18) @@ -522,7 +522,7 @@ def test_report_event_should_only_attempt_max_retries_if_fails_to_send(self, moc event_list.append(get_event(message=event_str)) client = WireProtocol(WIRESERVER_URL).client with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_MAX_RETRIES", 5): - with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_DELAY", 0.1): + with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_RETRY_DELAY", 0.1): client.report_event(self._get_telemetry_events_generator(event_list)) self.assertEqual(mock_http_request.call_count, 5) diff --git a/tests/common/test_event.py b/tests/common/test_event.py index 2e0b8be25..10dc1afd2 100644 --- a/tests/common/test_event.py +++ b/tests/common/test_event.py @@ -359,7 +359,7 @@ def test_periodic_forwards_args(self, mock_event): event.add_periodic(logger.EVERY_DAY, "FauxEvent", op=WALAEventOperation.Log, is_success=True, duration=0, version=str(CURRENT_VERSION), message="FauxEventMessage", log_event=True, force=False) mock_event.assert_called_once_with("FauxEvent", op=WALAEventOperation.Log, is_success=True, duration=0, - version=str(CURRENT_VERSION), message="FauxEventMessage", log_event=True, immediate_flush=False) + version=str(CURRENT_VERSION), message="FauxEventMessage", log_event=True) @patch("azurelinuxagent.common.event.datetime") @patch('azurelinuxagent.common.event.EventLogger.add_event') @@ -367,14 +367,14 @@ def test_periodic_forwards_args_default_values(self, mock_event, mock_datetime): event.__event_logger__.reset_periodic() event.add_periodic(logger.EVERY_DAY, "FauxEvent", message="FauxEventMessage") mock_event.assert_called_once_with("FauxEvent", op=WALAEventOperation.Unknown, is_success=True, duration=0, - version=str(CURRENT_VERSION), message="FauxEventMessage", log_event=True, immediate_flush=False) + version=str(CURRENT_VERSION), message="FauxEventMessage", log_event=True) @patch("azurelinuxagent.common.event.EventLogger.add_event") def test_add_event_default_variables(self, mock_add_event): add_event('test', message='test event') mock_add_event.assert_called_once_with('test', duration=0, is_success=True, log_event=True, message='test event', op=WALAEventOperation.Unknown, - version=str(CURRENT_VERSION), immediate_flush=False) + version=str(CURRENT_VERSION), flush=False) def test_collect_events_should_delete_event_files(self): add_event(name='Event1', op=TestEvent._Operation) @@ -409,7 +409,7 @@ def http_post_handler(url, body, **__): with mock_wire_protocol(wire_protocol_data.DATA_FILE, http_post_handler=http_post_handler): expected_message = 'test event' - add_event('test', message=expected_message, op=TestEvent._Operation, immediate_flush=True) + add_event('test', message=expected_message, op=TestEvent._Operation, flush=True) event_message = self._get_event_message_from_http_request_body(http_post_handler.request_body) @@ -428,7 +428,7 @@ def http_post_handler(url, **__): with mock_wire_protocol(wire_protocol_data.DATA_FILE, http_post_handler=http_post_handler): with patch("azurelinuxagent.common.protocol.wire.TELEMETRY_MAX_RETRIES", 1): expected_message = 'test event' - add_event('test', message=expected_message, op=TestEvent._Operation, immediate_flush=True) + add_event('test', message=expected_message, op=TestEvent._Operation, flush=True) # In case of failure, the event file should be created self.assertTrue(len(self._collect_event_files()) == 1)