diff --git a/changelog.md b/changelog.md
index c186dd575..bd5730fea 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,5 +1,34 @@
 # Change Log
 
+## v0.1.19
+
+### Added
+- Added support for populating JIRA fields via fields in the match
+- Added support for using a TLS certificate file for SMTP connections
+- Allow a custom suffix for non-analyzed Elasticsearch fields, like ".raw" or ".keyword"
+- Added match_time to ElastAlert alert documents in Elasticsearch
+
+### Fixed
+- Fixed an error in the documentation for rule importing
+- Prevent enhancements from re-running on retried alerts
+- Fixed a bug when using custom timestamp formats and the new term rule
+- Lowered jira_bump_after_inactivity default to 0 days
+
+## v0.1.18
+
+### Added
+- Added a new alerter "post" based on "simple" which POSTs JSON to HTTP endpoints
+- Added an option jira_bump_after_inactivity to prevent ElastAlert from commenting on active JIRA tickets
+
+### Removed
+- Removed "simple" alerter, replaced by "post"
+
+## v0.1.17
+
+### Added
+- Added a --patience flag to allow ElastAlert to wait for Elasticsearch to become available
+- Allow custom PagerDuty alert titles via alert_subject
+
 ## v0.1.16
 
 ### Fixed
diff --git a/docs/source/elastalert.rst b/docs/source/elastalert.rst
index 0321ca58d..1a9631eb3 100755
--- a/docs/source/elastalert.rst
+++ b/docs/source/elastalert.rst
@@ -179,6 +179,9 @@ The environment variable ``AWS_DEFAULT_PROFILE`` will override this field.
 ``replace_dots_in_field_names``: If ``True``, ElastAlert replaces any dots in field names with an underscore before writing documents to Elasticsearch.
 The default value is ``False``. Elasticsearch 2.0 - 2.3 does not support dots in field names.
 
+``string_multi_field_name``: If set, the suffix to use for the subfield for string multi-fields in Elasticsearch.
+The default value is ``.raw`` for Elasticsearch 2 and ``.keyword`` for Elasticsearch 5.
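+
+For example, to pin the suffix explicitly in the global ``config.yaml`` (the value shown is just one common choice)::
+
+    string_multi_field_name: .raw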
+
 .. _runningelastalert:
 
 Running ElastAlert
diff --git a/docs/source/ruletypes.rst b/docs/source/ruletypes.rst
index 146dc3388..f9713828b 100644
--- a/docs/source/ruletypes.rst
+++ b/docs/source/ruletypes.rst
@@ -221,7 +221,8 @@ import
 
 ``import``: If specified includes all the settings from this yaml file. This allows common config options to be shared. Note that imported files that aren't
 complete rules should not have a ``.yml`` or ``.yaml`` suffix so that ElastAlert doesn't treat them as rules. Filters in imported files are merged (ANDed)
-with any filters in the rule. (Optional, string, no default)
+with any filters in the rule. You can only have one import per rule, though the imported file can import another file, recursively. The filename
+can be an absolute path or relative to the rules directory. (Optional, string, no default)
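+
+For example, a rule could pull shared options from a suffix-less common file (the filename is illustrative)::
+
+    import: /etc/elastalert/rules/common_options_base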
 
 use_ssl
 ^^^^^^^
@@ -1081,7 +1082,7 @@ or
 
     - email
     - jira
 
-E-mail subject or JIRA issue summary can also be customized by adding an ``alert_subject`` that contains a custom summary.
+E-mail subjects, JIRA issue summaries, and PagerDuty alerts can also be customized by adding an ``alert_subject`` that contains a custom summary.
 It can be further formatted using standard Python formatting syntax::
 
     alert_subject: "Issue {0} occurred at {1}"
@@ -1228,6 +1229,10 @@ STARTTLS.
 
 ``smtp_auth_file``: The path to a file which contains SMTP authentication credentials. It should be YAML formatted and contain two fields,
 ``user`` and ``password``. If this is not present, no authentication will be attempted.
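+
+For reference, a minimal ``smtp_auth_file`` might look like the following (credentials are placeholders)::
+
+    user: alerter@example.com
+    password: hunter2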
+
+``smtp_cert_file``: Connect to the SMTP host using the given path to a TLS certificate file. Defaults to ``None``.
+
+``smtp_key_file``: Connect to the SMTP host using the given path to a TLS key file. Defaults to ``None``.
+
 ``email_reply_to``: This sets the Reply-To header in the email. By default, the from address is ElastAlert@ and the domain will be set
 by the smtp server.
@@ -1310,6 +1315,9 @@ Example usage::
     jira_bump_in_statuses:
       - Open
 
+``jira_bump_after_inactivity``: If this is set, ElastAlert will only comment on tickets that have been inactive for at least this many days.
+It only applies if ``jira_bump_tickets`` is true. Default is 0 days.
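+
+For example, to stop ElastAlert from commenting on tickets that were updated within the last three days (the number of days is illustrative)::
+
+    jira_bump_tickets: true
+    jira_bump_after_inactivity: 3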
+
 Arbitrary Jira fields:
 
 ElastAlert supports setting any arbitrary JIRA field that your jira issue supports. For example, if you had a custom field, called "Affected User", you
 can set it by providing that field name in ``snake_case`` prefixed with ``jira_``. These fields can contain primitive strings or arrays of strings.
 Note that when you create a custom field in your JIRA server, internally, the field is represented as ``customfield_1111``. In elastalert, you may refer
 to either the public facing name OR the internal representation.
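+
+For example, the hypothetical "Affected User" field above could be populated from the first match at alert time by prefixing the value with ``#`` followed by the name of a field in the match (the field name is illustrative)::
+
+    jira_affected_user: "#user.name"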
@@ -1480,12 +1488,16 @@ The alerter requires the following option:
 
 Optional:
 
-``pagerduty_incident_key``: If not set pagerduty will trigger a new incident for each alert sent. If set to a unique string per rule pagerduty will identify the incident that this event should be applied.
+``alert_subject``: If set, this will be used as the Incident description within PagerDuty. If not set, ElastAlert will default to using the rule name of the alert for the incident.
+
+``alert_subject_args``: If set, and ``alert_subject`` is a formattable string, ElastAlert will format the alert subject based on the provided array of fields from the rule or match.
+
+``pagerduty_incident_key``: If not set, PagerDuty will trigger a new incident for each alert sent. If set to a unique string per rule, PagerDuty will identify the incident to which this event should be applied.
 If there's no open (i.e. unresolved) incident with this key, a new one will be created. If there's already an open incident with a matching key, this event
 will be appended to that incident's log.
 
 ``pagerduty_incident_key_args``: If set, and ``pagerduty_incident_key`` is a formattable string, Elastalert will format the incident key based on the provided array of fields from the rule or match.
 
-``pagerduty_proxy``: By default ElastAlert will not use a network proxy to send notifications to Pagerduty. Set this option using ``hostname:port`` if you need to use a proxy.
+``pagerduty_proxy``: By default ElastAlert will not use a network proxy to send notifications to PagerDuty. Set this option using ``hostname:port`` if you need to use a proxy.
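+
+For example, a hypothetical rule combining these options (field names and values are illustrative)::
+
+    alert_subject: "Disk usage warning on {0}"
+    alert_subject_args:
+      - hostname
+    pagerduty_incident_key: "disk usage on {0}"
+    pagerduty_incident_key_args:
+      - hostname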
""" + def __init__(self, rule): - super(SimplePostAlerter, self).__init__(rule) - simple_webhook_url = self.rule.get('simple_webhook_url') - if isinstance(simple_webhook_url, basestring): - simple_webhook_url = [simple_webhook_url] - self.simple_webhook_url = simple_webhook_url - self.simple_proxy = self.rule.get('simple_proxy') + super(HTTPPostAlerter, self).__init__(rule) + post_url = self.rule.get('http_post_url') + if isinstance(post_url, basestring): + post_url = [post_url] + self.post_url = post_url + self.post_proxy = self.rule.get('http_post_proxy') + self.post_payload = self.rule.get('http_post_payload', {}) + self.post_static_payload = self.rule.get('http_post_static_payload', {}) + self.post_all_values = self.rule.get('http_post_all_values', not self.post_payload) def alert(self, matches): - payload = { - 'rule': self.rule['name'], - 'matches': matches - } - headers = { - "Content-Type": "application/json", - "Accept": "application/json;charset=utf-8" - } - proxies = {'https': self.simple_proxy} if self.simple_proxy else None - for url in self.simple_webhook_url: - try: - response = requests.post(url, data=json.dumps(payload, cls=DateTimeEncoder), headers=headers, proxies=proxies) - response.raise_for_status() - except RequestException as e: - raise EAException("Error posting simple alert: %s" % e) - elastalert_logger.info("Simple alert sent") + """ Each match will trigger a POST to the specified endpoint(s). """ + for match in matches: + payload = match if self.post_all_values else {} + payload.update(self.post_static_payload) + for post_key, es_key in self.post_payload.items(): + payload[post_key] = lookup_es_key(match, es_key) + headers = { + "Content-Type": "application/json", + "Accept": "application/json;charset=utf-8" + } + proxies = {'https': self.post_proxy} if self.post_proxy else None + for url in self.post_url: + try: + response = requests.post(url, data=json.dumps(payload, cls=DateTimeEncoder), + headers=headers, proxies=proxies) + response.raise_for_status() + except RequestException as e: + raise EAException("Error posting HTTP Post alert: %s" % e) + elastalert_logger.info("HTTP Post alert sent.") def get_info(self): - return {'type': 'simple', - 'simple_webhook_url': self.simple_webhook_url} + return {'type': 'http_post', + 'http_post_webhook_url': self.post_url} diff --git a/elastalert/config.py b/elastalert/config.py index 0c6336642..137b3fc59 100644 --- a/elastalert/config.py +++ b/elastalert/config.py @@ -72,7 +72,7 @@ 'telegram': alerts.TelegramAlerter, 'gitter': alerts.GitterAlerter, 'servicenow': alerts.ServiceNowAlerter, - 'simple': alerts.SimplePostAlerter + 'post': alerts.HTTPPostAlerter } # A partial ordering of alert types. Relative order will be preserved in the resulting alerts list # For example, jira goes before email so the ticket # will be added to the resulting email. @@ -146,6 +146,7 @@ def load_options(rule, conf, filename, args=None): :param rule: A dictionary of parsed YAML from a rule config file. :param conf: The global configuration dictionary, used for populating defaults. 
""" + adjust_deprecated_values(rule) try: rule_schema.validate(rule) @@ -466,3 +467,14 @@ def get_rule_hashes(conf, use_rule=None): with open(rule_file) as fh: rule_mod_times[rule_file] = hashlib.sha1(fh.read()).digest() return rule_mod_times + + +def adjust_deprecated_values(rule): + # From rename of simple HTTP alerter + if rule.get('type') == 'simple': + rule['type'] = 'post' + if 'simple_proxy' in rule: + rule['http_post_proxy'] = rule['simple_proxy'] + if 'simple_webhook_url' in rule: + rule['http_post_url'] = rule['simple_webhook_url'] + logging.warning('"simple" alerter has been renamed "post" and comptability may be removed in a future release.') diff --git a/elastalert/create_index.py b/elastalert/create_index.py index 84dc18002..44275e9a3 100644 --- a/elastalert/create_index.py +++ b/elastalert/create_index.py @@ -111,6 +111,7 @@ def main(): es_mapping = {'elastalert': {'properties': {'rule_name': {'index': 'not_analyzed', 'type': 'string'}, '@timestamp': {'format': 'dateOptionalTime', 'type': 'date'}, 'alert_time': {'format': 'dateOptionalTime', 'type': 'date'}, + 'match_time': {'format': 'dateOptionalTime', 'type': 'date'}, 'match_body': {'enabled': False, 'type': 'object'}, 'aggregate_id': {'index': 'not_analyzed', 'type': 'string'}}}} past_mapping = {'past_elastalert': {'properties': {'rule_name': {'index': 'not_analyzed', 'type': 'string'}, diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index f73482dff..47f6b9d19 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import argparse import copy import datetime import json @@ -7,13 +8,13 @@ import signal import sys import time +import timeit import traceback from email.mime.text import MIMEText from smtplib import SMTP from smtplib import SMTPException from socket import error -import argparse import dateutil.tz import kibana import yaml @@ -22,6 +23,7 @@ from config import load_configuration from config import load_rules from croniter import croniter +from elasticsearch.exceptions import ConnectionError from elasticsearch.exceptions import ElasticsearchException from elasticsearch.exceptions import TransportError from enhancements import DropMatchException @@ -35,6 +37,8 @@ from util import elasticsearch_client from util import format_index from util import lookup_es_key +from util import parse_deadline +from util import parse_duration from util import pretty_ts from util import replace_dots_in_field_names from util import seconds @@ -75,6 +79,11 @@ def parse_args(self, args): 'Use "NOW" to start from current time. (Default: present)') parser.add_argument('--end', dest='end', help='YYYY-MM-DDTHH:MM:SS Query to this timestamp. (Default: present)') parser.add_argument('--verbose', action='store_true', dest='verbose', help='Increase verbosity without suppressing alerts') + parser.add_argument('--patience', action='store', dest='timeout', + type=parse_duration, + default=datetime.timedelta(), + help='Maximum time to wait for ElasticSearch to become responsive. Usage: ' + '--patience =. e.g. 
+    # From rename of simple HTTP alerter
+    if rule.get('type') == 'simple':
+        rule['type'] = 'post'
+        if 'simple_proxy' in rule:
+            rule['http_post_proxy'] = rule['simple_proxy']
+        if 'simple_webhook_url' in rule:
+            rule['http_post_url'] = rule['simple_webhook_url']
+        logging.warning('"simple" alerter has been renamed "post" and compatibility may be removed in a future release.')
diff --git a/elastalert/create_index.py b/elastalert/create_index.py
index 84dc18002..44275e9a3 100644
--- a/elastalert/create_index.py
+++ b/elastalert/create_index.py
@@ -111,6 +111,7 @@ def main():
     es_mapping = {'elastalert': {'properties': {'rule_name': {'index': 'not_analyzed', 'type': 'string'},
                                                 '@timestamp': {'format': 'dateOptionalTime', 'type': 'date'},
                                                 'alert_time': {'format': 'dateOptionalTime', 'type': 'date'},
+                                                'match_time': {'format': 'dateOptionalTime', 'type': 'date'},
                                                 'match_body': {'enabled': False, 'type': 'object'},
                                                 'aggregate_id': {'index': 'not_analyzed', 'type': 'string'}}}}
     past_mapping = {'past_elastalert': {'properties': {'rule_name': {'index': 'not_analyzed', 'type': 'string'},
diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py
index f73482dff..47f6b9d19 100755
--- a/elastalert/elastalert.py
+++ b/elastalert/elastalert.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import argparse
 import copy
 import datetime
 import json
@@ -7,13 +8,13 @@
 import signal
 import sys
 import time
+import timeit
 import traceback
 from email.mime.text import MIMEText
 from smtplib import SMTP
 from smtplib import SMTPException
 from socket import error
 
-import argparse
 import dateutil.tz
 import kibana
 import yaml
@@ -22,6 +23,7 @@
 from config import load_configuration
 from config import load_rules
 from croniter import croniter
+from elasticsearch.exceptions import ConnectionError
 from elasticsearch.exceptions import ElasticsearchException
 from elasticsearch.exceptions import TransportError
 from enhancements import DropMatchException
@@ -35,6 +37,8 @@
 from util import elasticsearch_client
 from util import format_index
 from util import lookup_es_key
+from util import parse_deadline
+from util import parse_duration
 from util import pretty_ts
 from util import replace_dots_in_field_names
 from util import seconds
@@ -75,6 +79,11 @@ def parse_args(self, args):
                             'Use "NOW" to start from current time. (Default: present)')
         parser.add_argument('--end', dest='end', help='YYYY-MM-DDTHH:MM:SS Query to this timestamp. (Default: present)')
         parser.add_argument('--verbose', action='store_true', dest='verbose', help='Increase verbosity without suppressing alerts')
+        parser.add_argument('--patience', action='store', dest='timeout',
+                            type=parse_duration,
+                            default=datetime.timedelta(),
+                            help='Maximum time to wait for ElasticSearch to become responsive. Usage: '
+                                 '--patience <unit>=<num>. e.g. --patience minutes=5')
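+        # e.g. "elastalert --patience minutes=5" polls Elasticsearch for up to five minutes at startup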
         parser.add_argument(
             '--pin_rules',
             action='store_true',
@@ -133,9 +142,10 @@ def __init__(self, args):
         self.starttime = self.args.start
         self.disabled_rules = []
         self.replace_dots_in_field_names = self.conf.get('replace_dots_in_field_names', False)
+        self.string_multi_field_name = self.conf.get('string_multi_field_name', False)
 
         self.writeback_es = elasticsearch_client(self.conf)
-        self.es_version = self.get_version()
+        self._es_version = None
 
         remove = []
         for rule in self.rules:
@@ -150,6 +160,12 @@ def get_version(self):
         info = self.writeback_es.info()
         return info['version']['number']
 
+    @property
+    def es_version(self):
+        if self._es_version is None:
+            self._es_version = self.get_version()
+        return self._es_version
+
     def is_five(self):
         return self.es_version.startswith('5')
 
@@ -841,11 +857,16 @@ def init_rule(self, new_rule, new=True):
 
         # Change top_count_keys to .raw
         if 'top_count_keys' in new_rule and new_rule.get('raw_count_keys', True):
-            keys = new_rule.get('top_count_keys')
-            if self.is_five():
-                new_rule['top_count_keys'] = [key + '.keyword' if not key.endswith('.keyword') else key for key in keys]
+            if self.string_multi_field_name:
+                string_multi_field_name = self.string_multi_field_name
+            elif self.is_five():
+                string_multi_field_name = '.keyword'
             else:
-                new_rule['top_count_keys'] = [key + '.raw' if not key.endswith('.raw') else key for key in keys]
+                string_multi_field_name = '.raw'
+
+            for i, key in enumerate(new_rule['top_count_keys']):
+                if not key.endswith(string_multi_field_name):
+                    new_rule['top_count_keys'][i] += string_multi_field_name
 
         if 'download_dashboard' in new_rule['filter']:
             # Download filters from Kibana and set the rules filters to them
@@ -973,6 +994,7 @@ def start(self):
             except (TypeError, ValueError):
                 self.handle_error("%s is not a valid ISO8601 timestamp (YYYY-MM-DDTHH:MM:SS+XX:00)" % (self.starttime))
                 exit(1)
+        self.wait_until_responsive(timeout=self.args.timeout)
        self.running = True
         elastalert_logger.info("Starting up")
         while self.running:
@@ -994,6 +1016,40 @@ def start(self):
                 sleep_duration = total_seconds(next_run - datetime.datetime.utcnow())
                 self.sleep_for(sleep_duration)
 
+    def wait_until_responsive(self, timeout, clock=timeit.default_timer):
+        """Wait until ElasticSearch becomes responsive (or too much time passes)."""
+
+        # Elapsed time is a floating point number of seconds.
+        timeout = timeout.total_seconds()
+
+        # Don't poll unless we're asked to.
+        if timeout <= 0.0:
+            return
+
+        # Periodically poll ElasticSearch. Keep going until ElasticSearch is
+        # responsive *and* the writeback index exists.
+        ref = clock()
+        while (clock() - ref) < timeout:
+            try:
+                if self.writeback_es.indices.exists(self.writeback_index):
+                    return
+            except ConnectionError:
+                pass
+            time.sleep(1.0)
+
+        if self.writeback_es.ping():
+            logging.error(
+                'Writeback index "%s" does not exist, did you run `elastalert-create-index`?',
+                self.writeback_index,
+            )
+        else:
+            logging.error(
+                'Could not reach ElasticSearch at "%s:%d".',
+                self.conf['es_host'],
+                self.conf['es_port'],
+            )
+        exit(1)
+
     def run_all_rules(self):
         """ Run each rule one time """
         self.send_pending_alerts()
@@ -1175,14 +1231,14 @@ def filters_from_kibana(self, rule, db_name):
             return None
         return filters
 
-    def alert(self, matches, rule, alert_time=None):
+    def alert(self, matches, rule, alert_time=None, retried=False):
         """ Wraps alerting, Kibana linking and enhancements in an exception handler """
         try:
-            return self.send_alert(matches, rule, alert_time=alert_time)
+            return self.send_alert(matches, rule, alert_time=alert_time, retried=retried)
         except Exception as e:
             self.handle_uncaught_exception(e, rule)
 
-    def send_alert(self, matches, rule, alert_time=None):
+    def send_alert(self, matches, rule, alert_time=None, retried=False):
         """ Send out an alert.
 
         :param matches: A list of matches.
@@ -1225,7 +1281,10 @@ def send_alert(self, matches, rule, alert_time=None):
             if kb_link:
                 matches[0]['kibana_link'] = kb_link
 
-        if not rule.get('run_enhancements_first'):
+        # Enhancements were already run at match time if
+        # run_enhancements_first is set or
+        # retried==True, which means this is a retry of a failed alert
+        if not rule.get('run_enhancements_first') and not retried:
             for enhancement in rule['match_enhancements']:
                 valid_matches = []
                 for match in matches:
@@ -1275,12 +1334,19 @@ def send_alert(self, matches, rule, alert_time=None):
             agg_id = res['_id']
 
     def get_alert_body(self, match, rule, alert_sent, alert_time, alert_exception=None):
-        body = {'match_body': match}
-        body['rule_name'] = rule['name']
+        body = {
+            'match_body': match,
+            'rule_name': rule['name'],
+            'alert_info': rule['alert'][0].get_info(),
+            'alert_sent': alert_sent,
+            'alert_time': alert_time
+        }
+
+        match_time = lookup_es_key(match, rule['timestamp_field'])
+        if match_time is not None:
+            body['match_time'] = match_time
+
         # TODO record info about multiple alerts
-        body['alert_info'] = rule['alert'][0].get_info()
-        body['alert_sent'] = alert_sent
-        body['alert_time'] = alert_time
 
         # If the alert failed to send, record the exception
         if not alert_sent:
@@ -1373,7 +1439,11 @@ def send_pending_alerts(self):
                     matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
                     self.alert(matches, rule, alert_time=alert_time)
                 else:
-                    self.alert([match_body], rule, alert_time=alert_time)
+                    # If this rule isn't using aggregation, this must be a retry of a failed alert
+                    retried = False
+                    if 'aggregation' not in rule:
+                        retried = True
+                    self.alert([match_body], rule, alert_time=alert_time, retried=retried)
 
             if rule['current_aggregate_id']:
                 for qk, agg_id in rule['current_aggregate_id'].iteritems():
@@ -1543,10 +1613,7 @@ def silence(self, silence_cache_key=None):
                 silence_cache_key = self.rules[0]['name'] + "._silence"
 
         try:
-            unit, num = self.args.silence.split('=')
-            silence_time = datetime.timedelta(**{unit: int(num)})
-            # Double conversion to add tzinfo
-            silence_ts = ts_to_dt(dt_to_ts(silence_time + datetime.datetime.utcnow()))
+            silence_ts = parse_deadline(self.args.silence)
         except (ValueError, TypeError):
             logging.error('%s is not a valid time period' % (self.args.silence))
             exit(1)
diff --git a/elastalert/ruletypes.py b/elastalert/ruletypes.py
index 172806de5..880dda2d8 100644
--- a/elastalert/ruletypes.py
+++ b/elastalert/ruletypes.py
@@ -604,7 +604,7 @@ def get_all_terms(self, args):
             tmp_start = start
             tmp_end = min(start + step, end)
 
-            time_filter = {self.rules['timestamp_field']: {'lt': dt_to_ts(tmp_end), 'gte': dt_to_ts(tmp_start)}}
+            time_filter = {self.rules['timestamp_field']: {'lt': self.rules['dt_to_ts'](tmp_end), 'gte': self.rules['dt_to_ts'](tmp_start)}}
             query_template['filter'] = {'bool': {'must': [{'range': time_filter}]}}
             query = {'aggs': {'filtered': query_template}}
             # For composite keys, we will need to perform sub-aggregations
@@ -650,7 +650,8 @@ def get_all_terms(self, args):
                     break
                 tmp_start = tmp_end
                 tmp_end = min(tmp_start + step, end)
-                time_filter[self.rules['timestamp_field']] = {'lt': dt_to_ts(tmp_end), 'gte': dt_to_ts(tmp_start)}
+                time_filter[self.rules['timestamp_field']] = {'lt': self.rules['dt_to_ts'](tmp_end),
+                                                              'gte': self.rules['dt_to_ts'](tmp_start)}
 
         for key, values in self.seen_values.iteritems():
             if not values:
diff --git a/elastalert/util.py b/elastalert/util.py
index 8c5abe585..b609dd1c9 100644
--- a/elastalert/util.py
+++ b/elastalert/util.py
@@ -237,7 +237,7 @@ def dt_to_unix(dt):
 
 
 def dt_to_unixms(dt):
-    return dt_to_unix(dt) * 1000
+    return int(dt_to_unix(dt) * 1000)
 
 
 def cronite_datetime_to_timestamp(self, d):
@@ -342,3 +342,15 @@ def build_es_conn_config(conf):
         parsed_conf['es_url_prefix'] = conf['es_url_prefix']
 
     return parsed_conf
+
+
+def parse_duration(value):
+    """Convert ``unit=num`` spec into a ``timedelta`` object."""
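+    # e.g. parse_duration('hours=2') == datetime.timedelta(hours=2)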
+    unit, num = value.split('=')
+    return datetime.timedelta(**{unit: int(num)})
+
+
+def parse_deadline(value):
+    """Convert ``unit=num`` spec into a ``datetime`` object."""
+    duration = parse_duration(value)
+    return ts_now() + duration
diff --git a/setup.py b/setup.py
index 94856e590..e06eeffa2 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@
 base_dir = os.path.dirname(__file__)
 setup(
     name='elastalert',
-    version='0.1.16',
+    version='0.1.19',
     description='Runs custom filters on Elasticsearch and alerts on matches',
     author='Quentin Long',
     author_email='qlo@yelp.com',
diff --git a/tests/alerts_test.py b/tests/alerts_test.py
index ae3e8e012..e8d49e491 100644
--- a/tests/alerts_test.py
+++ b/tests/alerts_test.py
@@ -12,15 +12,16 @@
 from elastalert.alerts import BasicMatchString
 from elastalert.alerts import CommandAlerter
 from elastalert.alerts import EmailAlerter
+from elastalert.alerts import HTTPPostAlerter
 from elastalert.alerts import JiraAlerter
 from elastalert.alerts import JiraFormattedMatchString
 from elastalert.alerts import MsTeamsAlerter
 from elastalert.alerts import PagerDutyAlerter
-from elastalert.alerts import SimplePostAlerter
 from elastalert.alerts import SlackAlerter
 from elastalert.config import load_modules
 from elastalert.opsgenie import OpsGenieAlerter
 from elastalert.util import ts_add
+from elastalert.util import ts_now
 
 
 class mock_rule:
@@ -100,7 +101,7 @@ def test_email():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
                     mock.call().sendmail(mock.ANY, ['testing@test.test', 'test@test.test'], mock.ANY),
                     mock.call().close()]
         assert mock_smtp.mock_calls == expected
@@ -165,7 +166,7 @@ def test_email_with_unicode_strings():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
                     mock.call().sendmail(mock.ANY, [u'testing@test.test'], mock.ANY),
                     mock.call().close()]
         assert mock_smtp.mock_calls == expected
@@ -192,7 +193,29 @@ def test_email_with_auth():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
+                    mock.call().login('someone', 'hunter2'),
+                    mock.call().sendmail(mock.ANY, ['testing@test.test', 'test@test.test'], mock.ANY),
+                    mock.call().close()]
+        assert mock_smtp.mock_calls == expected
+
+
+def test_email_with_cert_key():
+    rule = {'name': 'test alert', 'email': ['testing@test.test', 'test@test.test'], 'from_addr': 'testfrom@test.test',
+            'type': mock_rule(), 'timestamp_field': '@timestamp', 'email_reply_to': 'test@example.com',
+            'alert_subject': 'Test alert for {0}', 'alert_subject_args': ['test_term'], 'smtp_auth_file': 'file.txt',
+            'smtp_cert_file': 'dummy/cert.crt', 'smtp_key_file': 'dummy/client.key'}
+    with mock.patch('elastalert.alerts.SMTP') as mock_smtp:
+        with mock.patch('elastalert.alerts.yaml_loader') as mock_open:
+            mock_open.return_value = {'user': 'someone', 'password': 'hunter2'}
+            mock_smtp.return_value = mock.Mock()
+            alert = EmailAlerter(rule)
+
+        alert.alert([{'test_term': 'test_value'}])
+        expected = [mock.call('localhost'),
+                    mock.call().ehlo(),
+                    mock.call().has_extn('STARTTLS'),
+                    mock.call().starttls(certfile='dummy/cert.crt', keyfile='dummy/client.key'),
                     mock.call().login('someone', 'hunter2'),
                     mock.call().sendmail(mock.ANY, ['testing@test.test', 'test@test.test'], mock.ANY),
                     mock.call().close()]
         assert mock_smtp.mock_calls == expected
@@ -211,7 +234,7 @@ def test_email_with_cc():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
                     mock.call().sendmail(mock.ANY, ['testing@test.test', 'test@test.test', 'tester@testing.testing'], mock.ANY),
                     mock.call().close()]
         assert mock_smtp.mock_calls == expected
@@ -236,7 +259,7 @@ def test_email_with_bcc():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
                     mock.call().sendmail(mock.ANY, ['testing@test.test', 'test@test.test', 'tester@testing.testing'], mock.ANY),
                     mock.call().close()]
         assert mock_smtp.mock_calls == expected
@@ -261,7 +284,7 @@ def test_email_with_cc_and_bcc():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
                     mock.call().sendmail(
                         mock.ANY,
                         [
@@ -305,7 +328,7 @@ def test_email_with_args():
         expected = [mock.call('localhost'),
                     mock.call().ehlo(),
                     mock.call().has_extn('STARTTLS'),
-                    mock.call().starttls(),
+                    mock.call().starttls(certfile=None, keyfile=None),
                     mock.call().sendmail(mock.ANY, ['testing@test.test', 'test@test.test'], mock.ANY),
                     mock.call().close()]
         assert mock_smtp.mock_calls == expected
@@ -495,6 +518,79 @@ def test_jira():
 
     assert mock_jira.mock_calls == expected
 
+    # Only bump after 3d of inactivity
+    rule['jira_bump_after_inactivity'] = 3
+    mock_issue = mock.Mock()
+
+    # Check ticket is bumped if it was updated 4 days ago
+    mock_issue.fields.updated = str(ts_now() - datetime.timedelta(days=4))
+    with nested(
+            mock.patch('elastalert.alerts.JIRA'),
+            mock.patch('elastalert.alerts.yaml_loader')
+    ) as (mock_jira, mock_open):
+        mock_open.return_value = {'user': 'jirauser', 'password': 'jirapassword'}
+        mock_jira.return_value = mock.Mock()
+        mock_jira.return_value.search_issues.return_value = [mock_issue]
+        mock_jira.return_value.priorities.return_value = [mock_priority]
+        mock_jira.return_value.fields.return_value = []
+
+        alert = JiraAlerter(rule)
+        alert.alert([{'test_term': 'test_value', '@timestamp': '2014-10-31T00:00:00'}])
+        # Check add_comment is called
+        assert len(mock_jira.mock_calls) == 5
+        assert '().add_comment' == mock_jira.mock_calls[4][0]
+
+    # Check ticket is not bumped if it was updated just now
+    mock_issue.fields.updated = str(ts_now())
+    with nested(
+            mock.patch('elastalert.alerts.JIRA'),
+            mock.patch('elastalert.alerts.yaml_loader')
+    ) as (mock_jira, mock_open):
+        mock_open.return_value = {'user': 'jirauser', 'password': 'jirapassword'}
+        mock_jira.return_value = mock.Mock()
+        mock_jira.return_value.search_issues.return_value = [mock_issue]
+        mock_jira.return_value.priorities.return_value = [mock_priority]
+        mock_jira.return_value.fields.return_value = []
+
+        alert = JiraAlerter(rule)
+        alert.alert([{'test_term': 'test_value', '@timestamp': '2014-10-31T00:00:00'}])
+        # Only 4 calls for mock_jira since add_comment is not called
+        assert len(mock_jira.mock_calls) == 4
+
+    # Test match resolved values
+    rule = {
+        'name': 'test alert',
+        'jira_account_file': 'jirafile',
+        'type': mock_rule(),
+        'owner': 'the_owner',
+        'jira_project': 'testproject',
+        'jira_issuetype': 'testtype',
+        'jira_server': 'jiraserver',
+        'jira_label': 'testlabel',
+        'jira_component': 'testcomponent',
+        'jira_description': "DESC",
+        'jira_watchers': ['testwatcher1', 'testwatcher2'],
+        'timestamp_field': '@timestamp',
+        'jira_affected_user': "#gmail.the_user"
+    }
+    mock_issue = mock.Mock()
+    mock_issue.fields.updated = str(ts_now() - datetime.timedelta(days=4))
+    mock_fields = [
+        {'name': 'affected user', 'id': 'affected_user_id', 'schema': {'type': 'string'}}
+    ]
+    with nested(
+            mock.patch('elastalert.alerts.JIRA'),
+            mock.patch('elastalert.alerts.yaml_loader')
+    ) as (mock_jira, mock_open):
+        mock_open.return_value = {'user': 'jirauser', 'password': 'jirapassword'}
+        mock_jira.return_value = mock.Mock()
+        mock_jira.return_value.search_issues.return_value = [mock_issue]
+        mock_jira.return_value.fields.return_value = mock_fields
+        mock_jira.return_value.priorities.return_value = [mock_priority]
+        alert = JiraAlerter(rule)
+        alert.alert([{'gmail.the_user': 'jdoe', '@timestamp': '2014-10-31T00:00:00'}])
+        assert mock_jira.mock_calls[4][2]['affected_user_id'] == "jdoe"
+
 
 def test_jira_arbitrary_field_support():
     description_txt = "Description stuff goes here like a runbook link."
@@ -951,16 +1047,17 @@ def test_slack_uses_custom_slack_channel():
         assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
 
 
-def test_simple_alerter():
+def test_http_alerter_with_payload():
     rule = {
-        'name': 'Test Simple Rule',
+        'name': 'Test HTTP Post Alerter With Payload',
         'type': 'any',
-        'simple_webhook_url': 'http://test.webhook.url',
-        'alert_subject': 'Cool subject',
+        'http_post_url': 'http://test.webhook.url',
+        'http_post_payload': {'posted_name': 'somefield'},
+        'http_post_static_payload': {'name': 'somestaticname'},
         'alert': []
     }
     load_modules(rule)
-    alert = SimplePostAlerter(rule)
+    alert = HTTPPostAlerter(rule)
     match = {
         '@timestamp': '2017-01-01T00:00:00',
         'somefield': 'foobarbaz'
@@ -968,11 +1065,74 @@ def test_simple_alerter():
     with mock.patch('requests.post') as mock_post_request:
         alert.alert([match])
     expected_data = {
-        'rule': rule['name'],
-        'matches': [match]
+        'posted_name': 'foobarbaz',
+        'name': 'somestaticname'
     }
     mock_post_request.assert_called_once_with(
-        rule['simple_webhook_url'],
+        rule['http_post_url'],
         data=mock.ANY,
         headers={'Content-Type': 'application/json', 'Accept': 'application/json;charset=utf-8'},
         proxies=None
     )
     assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
+
+
+def test_http_alerter_with_payload_all_values():
+    rule = {
+        'name': 'Test HTTP Post Alerter With Payload',
+        'type': 'any',
+        'http_post_url': 'http://test.webhook.url',
+        'http_post_payload': {'posted_name': 'somefield'},
+        'http_post_static_payload': {'name': 'somestaticname'},
+        'http_post_all_values': True,
+        'alert': []
+    }
+    load_modules(rule)
+    alert = HTTPPostAlerter(rule)
+    match = {
+        '@timestamp': '2017-01-01T00:00:00',
+        'somefield': 'foobarbaz'
+    }
+    with mock.patch('requests.post') as mock_post_request:
+        alert.alert([match])
+    expected_data = {
+        'posted_name': 'foobarbaz',
+        'name': 'somestaticname',
+        '@timestamp': '2017-01-01T00:00:00',
+        'somefield': 'foobarbaz'
+    }
+    mock_post_request.assert_called_once_with(
+        rule['http_post_url'],
+        data=mock.ANY,
+        headers={'Content-Type': 'application/json', 'Accept': 'application/json;charset=utf-8'},
+        proxies=None
+    )
+    assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
+
+
+def test_http_alerter_without_payload():
+    rule = {
+        'name': 'Test HTTP Post Alerter Without Payload',
+        'type': 'any',
+        'http_post_url': 'http://test.webhook.url',
+        'http_post_static_payload': {'name': 'somestaticname'},
+        'alert': []
+    }
+    load_modules(rule)
+    alert = HTTPPostAlerter(rule)
+    match = {
+        '@timestamp': '2017-01-01T00:00:00',
+        'somefield': 'foobarbaz'
+    }
+    with mock.patch('requests.post') as mock_post_request:
+        alert.alert([match])
+    expected_data = {
+        '@timestamp': '2017-01-01T00:00:00',
+        'somefield': 'foobarbaz',
+        'name': 'somestaticname'
+    }
+    mock_post_request.assert_called_once_with(
+        rule['http_post_url'],
+        data=mock.ANY,
+        headers={'Content-Type': 'application/json', 'Accept': 'application/json;charset=utf-8'},
+        proxies=None
+    )
+    assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
@@ -1073,6 +1233,74 @@ def test_pagerduty_alerter_custom_incident_key_with_args():
     assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
 
 
+def test_pagerduty_alerter_custom_alert_subject():
+    rule = {
+        'name': 'Test PD Rule',
+        'type': 'any',
+        'alert_subject': 'Hungry kittens',
+        'pagerduty_service_key': 'magicalbadgers',
+        'pagerduty_client_name': 'ponies inc.',
+        'pagerduty_incident_key': 'custom {0}',
+        'pagerduty_incident_key_args': ['somefield'],
+        'alert': []
+    }
+    load_modules(rule)
+    alert = PagerDutyAlerter(rule)
+    match = {
+        '@timestamp': '2017-01-01T00:00:00',
+        'somefield': 'foobarbaz'
+    }
+    with mock.patch('requests.post') as mock_post_request:
+        alert.alert([match])
+    expected_data = {
+        'client': 'ponies inc.',
+        'description': 'Hungry kittens',
+        'details': {
+            'information': 'Test PD Rule\n\n@timestamp: 2017-01-01T00:00:00\nsomefield: foobarbaz\n'
+        },
+        'event_type': 'trigger',
+        'incident_key': 'custom foobarbaz',
+        'service_key': 'magicalbadgers',
+    }
+    mock_post_request.assert_called_once_with(alert.url, data=mock.ANY, headers={'content-type': 'application/json'}, proxies=None)
+    assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
+
+
+def test_pagerduty_alerter_custom_alert_subject_with_args():
+    rule = {
+        'name': 'Test PD Rule',
+        'type': 'any',
+        'alert_subject': '{0} kittens',
+        'alert_subject_args': ['somefield'],
+        'pagerduty_service_key': 'magicalbadgers',
+        'pagerduty_client_name': 'ponies inc.',
+        'pagerduty_incident_key': 'custom {0}',
+        'pagerduty_incident_key_args': ['someotherfield'],
+        'alert': []
+    }
+    load_modules(rule)
+    alert = PagerDutyAlerter(rule)
+    match = {
+        '@timestamp': '2017-01-01T00:00:00',
+        'somefield': 'Stinky',
+        'someotherfield': 'foobarbaz'
+    }
+    with mock.patch('requests.post') as mock_post_request:
+        alert.alert([match])
+    expected_data = {
+        'client': 'ponies inc.',
+        'description': 'Stinky kittens',
+        'details': {
+            'information': 'Test PD Rule\n\n@timestamp: 2017-01-01T00:00:00\nsomefield: Stinky\nsomeotherfield: foobarbaz\n'
+        },
+        'event_type': 'trigger',
+        'incident_key': 'custom foobarbaz',
+        'service_key': 'magicalbadgers',
+    }
+    mock_post_request.assert_called_once_with(alert.url, data=mock.ANY, headers={'content-type': 'application/json'}, proxies=None)
+    assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data'])
+
+
 def test_alert_text_kw(ea):
     rule = ea.rules[0].copy()
     rule['alert_text'] = '{field} at {time}'
diff --git a/tests/base_test.py b/tests/base_test.py
index 69326e450..f97a548b6 100644
--- a/tests/base_test.py
+++ b/tests/base_test.py
@@ -8,6 +8,7 @@
 import elasticsearch
 import mock
 import pytest
+from elasticsearch.exceptions import ConnectionError
 from elasticsearch.exceptions import ElasticsearchException
 
 from elastalert.enhancements import BaseEnhancement
@@ -251,6 +252,29 @@ def test_match_with_module(ea):
     mod.process.assert_called_with({'@timestamp': END, 'num_hits': 0, 'num_matches': 1})
 
 
+def test_match_with_module_from_pending(ea):
+    mod = BaseEnhancement(ea.rules[0])
+    mod.process = mock.Mock()
+    ea.rules[0]['match_enhancements'] = [mod]
+    ea.rules[0].pop('aggregation')
+    pending_alert = {'match_body': {'foo': 'bar'}, 'rule_name': ea.rules[0]['name'],
+                     'alert_time': START_TIMESTAMP, '@timestamp': START_TIMESTAMP}
+    # First call, return the pending alert, second, no associated aggregated alerts
+    ea.writeback_es.search.side_effect = [{'hits': {'hits': [{'_id': 'ABCD', '_source': pending_alert}]}},
+                                          {'hits': {'hits': []}}]
+    ea.send_pending_alerts()
+    assert mod.process.call_count == 0
+
+    # If aggregation is set, enhancement IS called
+    pending_alert = {'match_body': {'foo': 'bar'}, 'rule_name': ea.rules[0]['name'],
+                     'alert_time': START_TIMESTAMP, '@timestamp': START_TIMESTAMP}
+    ea.writeback_es.search.side_effect = [{'hits': {'hits': [{'_id': 'ABCD', '_source': pending_alert}]}},
+                                          {'hits': {'hits': []}}]
+    ea.rules[0]['aggregation'] = datetime.timedelta(minutes=10)
+    ea.send_pending_alerts()
+    assert mod.process.call_count == 1
+
+
 def test_match_with_module_with_agg(ea):
     mod = BaseEnhancement(ea.rules[0])
     mod.process = mock.Mock()
@@ -745,6 +769,7 @@ def test_get_starttime(ea):
     endtime = '2015-01-01T00:00:00Z'
     mock_es = mock.Mock()
     mock_es.search.return_value = {'hits': {'hits': [{'_source': {'endtime': endtime}}]}}
+    mock_es.info.return_value = {'version': {'number': '2.0'}}
     ea.writeback_es = mock_es
 
     # 4 days old, will return endtime
@@ -990,6 +1015,82 @@ def test_exponential_realert(ea):
     assert exponent == next_res.next()
 
 
+def test_wait_until_responsive(ea):
+    """Unblock as soon as ElasticSearch becomes responsive."""
+
+    # Takes a while before becoming responsive.
+    ea.writeback_es.indices.exists.side_effect = [
+        ConnectionError(),  # ES is not yet responsive.
+        False,              # index does not yet exist.
+        True,
+    ]
+
+    clock = mock.MagicMock()
+    clock.side_effect = [0.0, 1.0, 2.0, 3.0, 4.0]
+    timeout = datetime.timedelta(seconds=3.5)
+    with mock.patch('time.sleep') as sleep:
+        ea.wait_until_responsive(timeout=timeout, clock=clock)
+
+    # Slept between each poll, as little as we can.
+    assert sleep.mock_calls == [
+        mock.call(1.0),
+        mock.call(1.0),
+    ]
+
+
+def test_wait_until_responsive_timeout_es_not_available(ea, capsys):
+    """Bail out if ElasticSearch doesn't (quickly) become responsive."""
+
+    # Never becomes responsive :-)
+    ea.writeback_es.ping.return_value = False
+    ea.writeback_es.indices.exists.return_value = False
+
+    clock = mock.MagicMock()
+    clock.side_effect = [0.0, 1.0, 2.0, 3.0]
+    timeout = datetime.timedelta(seconds=2.5)
+    with mock.patch('time.sleep') as sleep:
+        with pytest.raises(SystemExit) as exc:
+            ea.wait_until_responsive(timeout=timeout, clock=clock)
+        assert exc.value.code == 1
+
+    # Ensure we get useful diagnostics.
+    output, errors = capsys.readouterr()
+    assert 'Could not reach ElasticSearch at "es:14900".' in errors
+
+    # Slept until we passed the deadline.
+    assert sleep.mock_calls == [
+        mock.call(1.0),
+        mock.call(1.0),
+    ]
+
+
+def test_wait_until_responsive_timeout_index_does_not_exist(ea, capsys):
+    """Bail out if the writeback index doesn't (quickly) become available."""
+
+    # Never becomes responsive :-)
+    ea.writeback_es.ping.return_value = True
+    ea.writeback_es.indices.exists.return_value = False
+
+    clock = mock.MagicMock()
+    clock.side_effect = [0.0, 1.0, 2.0, 3.0]
+    timeout = datetime.timedelta(seconds=2.5)
+    with mock.patch('time.sleep') as sleep:
+        with pytest.raises(SystemExit) as exc:
+            ea.wait_until_responsive(timeout=timeout, clock=clock)
+        assert exc.value.code == 1
+
+    # Ensure we get useful diagnostics.
+    output, errors = capsys.readouterr()
+    assert 'Writeback index "wb" does not exist, did you run `elastalert-create-index`?' in errors
+
+    # Slept until we passed the deadline.
+    assert sleep.mock_calls == [
+        mock.call(1.0),
+        mock.call(1.0),
+    ]
+
+
 def test_stop(ea):
     """ The purpose of this test is to make sure that calling ElastAlerter.stop() will break it
     out of a ElastAlerter.start() loop. This method exists to provide a mechanism for running
diff --git a/tests/conftest.py b/tests/conftest.py
index efdc39e18..ca50a101a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import datetime
+import logging
 import mock
 import os
 import pytest
@@ -14,6 +15,28 @@
 mock_info = {'status': 200, 'name': 'foo', 'version': {'number': '2.0'}}
 
 
+@pytest.fixture(scope='function', autouse=True)
+def reset_loggers():
+    """Prevent logging handlers from capturing temporary file handles.
+
+    For example, a test that uses the `capsys` fixture and calls
+    `logging.exception()` will initialize logging with a default handler that
+    captures `sys.stderr`. When the test ends, the file handles will be closed
+    and `sys.stderr` will be returned to its original handle, but the logging
+    will have a dangling reference to the temporary handle used in the `capsys`
+    fixture.
+
+    """
+    logger = logging.getLogger()
+    for handler in list(logger.handlers):
+        logger.removeHandler(handler)
+
+
+class mock_es_indices_client(object):
+    def __init__(self):
+        self.exists = mock.Mock(return_value=True)
+
+
 class mock_es_client(object):
     def __init__(self, host='es', port=14900):
         self.host = host
@@ -24,6 +47,8 @@ def __init__(self, host='es', port=14900):
         self.index = mock.Mock()
         self.delete = mock.Mock()
         self.info = mock.Mock(return_value=mock_info)
+        self.ping = mock.Mock(return_value=True)
+        self.indices = mock_es_indices_client()
 
 
 class mock_ruletype(object):
diff --git a/tests/rules_test.py b/tests/rules_test.py
index bbb94ae3c..341628101 100644
--- a/tests/rules_test.py
+++ b/tests/rules_test.py
@@ -18,6 +18,7 @@
 from elastalert.ruletypes import PercentageMatchRule
 from elastalert.ruletypes import SpikeRule
 from elastalert.ruletypes import WhitelistRule
+from elastalert.util import dt_to_ts
 from elastalert.util import EAException
 from elastalert.util import ts_now
 from elastalert.util import ts_to_dt
@@ -497,7 +498,8 @@ def test_change():
 
 def test_new_term():
     rules = {'fields': ['a', 'b'],
              'timestamp_field': '@timestamp',
-             'es_host': 'example.com', 'es_port': 10, 'index': 'logstash'}
+             'es_host': 'example.com', 'es_port': 10, 'index': 'logstash',
+             'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts}
     mock_res = {'aggregations': {'filtered': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1},
                                                                      {'key': 'key2', 'doc_count': 5}]}}}}
@@ -567,7 +569,8 @@ def test_new_term_nested_field():
     rules = {'fields': ['a', 'b.c'],
              'timestamp_field': '@timestamp',
-             'es_host': 'example.com', 'es_port': 10, 'index': 'logstash'}
+             'es_host': 'example.com', 'es_port': 10, 'index': 'logstash',
+             'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts}
     mock_res = {'aggregations': {'filtered': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1},
                                                                      {'key': 'key2', 'doc_count': 5}]}}}}
     with mock.patch('elastalert.ruletypes.elasticsearch_client') as mock_es:
@@ -590,7 +593,8 @@ def test_new_term_with_terms():
     rules = {'fields': ['a'],
              'timestamp_field': '@timestamp',
              'es_host': 'example.com', 'es_port': 10, 'index': 'logstash', 'query_key': 'a',
-             'window_step_size': {'days': 2}}
+             'window_step_size': {'days': 2},
+             'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts}
     mock_res = {'aggregations': {'filtered': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1},
                                                                      {'key': 'key2', 'doc_count': 5}]}}}}
@@ -626,7 +630,8 @@ def test_new_term_with_composite_fields():
     rules = {'fields': [['a', 'b', 'c'], ['d', 'e.f']],
              'timestamp_field': '@timestamp',
-             'es_host': 'example.com', 'es_port': 10, 'index': 'logstash'}
+             'es_host': 'example.com', 'es_port': 10, 'index': 'logstash',
+             'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts}
 
     mock_res = {
         'aggregations': {
diff --git a/tests/util_test.py b/tests/util_test.py
index 1a8b5cb11..498704321 100644
--- a/tests/util_test.py
+++ b/tests/util_test.py
@@ -1,5 +1,44 @@
 # -*- coding: utf-8 -*-
 from elastalert.util import lookup_es_key, set_es_key, add_raw_postfix, replace_dots_in_field_names
+from elastalert.util import (
+    parse_deadline,
+    parse_duration,
+)
+import mock
+import pytest
+from datetime import (
+    datetime,
+    timedelta,
+)
+from dateutil.parser import parse as dt
+
+
+@pytest.mark.parametrize('spec, expected_delta', [
+    ('hours=2', timedelta(hours=2)),
+    ('minutes=30', timedelta(minutes=30)),
+    ('seconds=45', timedelta(seconds=45)),
+])
+def test_parse_duration(spec, expected_delta):
+    """``unit=num`` specs can be translated into ``timedelta`` instances."""
+    assert parse_duration(spec) == expected_delta
+
+
+@pytest.mark.parametrize('spec, expected_deadline', [
+    ('hours=2', dt('2017-07-07T12:00:00.000Z')),
+    ('minutes=30', dt('2017-07-07T10:30:00.000Z')),
+    ('seconds=45', dt('2017-07-07T10:00:45.000Z')),
+])
+def test_parse_deadline(spec, expected_deadline):
+    """``unit=num`` specs can be translated into ``datetime`` instances."""
+
+    # Note: Can't mock ``utcnow`` directly because ``datetime`` is a built-in.
+    class MockDatetime(datetime):
+        @staticmethod
+        def utcnow():
+            return dt('2017-07-07T10:00:00.000Z')
+
+    with mock.patch('datetime.datetime', MockDatetime):
+        assert parse_deadline(spec) == expected_deadline
 
 
 def test_setting_keys(ea):