diff --git a/.gitignore b/.gitignore index 3f5684b..9192559 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ PyFlume.egg-info .vscode/* .tox .tox/* -*pyc \ No newline at end of file +*pyc +__pycache__ \ No newline at end of file diff --git a/pyflume/__init__.py b/pyflume/__init__.py index 1c241a5..861cec2 100644 --- a/pyflume/__init__.py +++ b/pyflume/__init__.py @@ -1,497 +1,7 @@ """Authenticates to Flume API, returns a list of devices and allows you to pull the latest sensor results over a period of time.""" -from datetime import datetime, timedelta, timezone -import json -import logging - -import jwt # install pyjwt - -try: - from zoneinfo import ZoneInfo # noqa: WPS433 -except ImportError: # Python < 3.9 - from backports.zoneinfo import ZoneInfo # noqa: WPS433,WPS440 - -from ratelimit import limits, sleep_and_retry -from requests import Session - -from pyflume.format_time import ( - format_start_month, - format_start_today, - format_start_week, - format_time, -) - -API_LIMIT = 60 - -CONST_OPERATION = 'SUM' -CONST_UNIT_OF_MEASUREMENT = 'GALLONS' - -DEFAULT_TIMEOUT = 30 - -API_BASE_URL = 'https://api.flumetech.com' -URL_OAUTH_TOKEN = '{0}{1}'.format(API_BASE_URL, '/oauth/token') # noqa: S105 -API_QUERY_URL = '{0}{1}'.format(API_BASE_URL, '/users/{user_id}/devices/{device_id}/query') -API_DEVICES_URL = '{0}{1}'.format(API_BASE_URL, '/users/{user_id}/devices') -API_NOTIFICATIONS_URL = '{0}{1}'.format(API_BASE_URL, '/users/{user_id}/notifications') - -LOGGER = logging.getLogger(__name__) - - -def _generate_api_query_payload(scan_interval, device_tz): - datetime_localtime = datetime.now(timezone.utc).astimezone(ZoneInfo(device_tz)) - - queries = [ - { - 'request_id': 'current_interval', - 'bucket': 'MIN', - 'since_datetime': format_time( - (datetime_localtime - scan_interval).replace(second=0), - ), - 'until_datetime': format_time(datetime_localtime.replace(second=0)), - 'operation': CONST_OPERATION, - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - { - 'request_id': 'today', - 'bucket': 'DAY', - 'since_datetime': format_start_today(datetime_localtime), - 'until_datetime': format_time(datetime_localtime), - 'operation': CONST_OPERATION, - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - { - 'request_id': 'week_to_date', - 'bucket': 'DAY', - 'since_datetime': format_start_week(datetime_localtime), - 'until_datetime': format_time(datetime_localtime), - 'operation': CONST_OPERATION, - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - { - 'request_id': 'month_to_date', - 'bucket': 'MON', - 'since_datetime': format_start_month(datetime_localtime), - 'until_datetime': format_time(datetime_localtime), - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - { - 'request_id': 'last_60_min', - 'bucket': 'MIN', - 'since_datetime': format_time(datetime_localtime - timedelta(minutes=60)), - 'until_datetime': format_time(datetime_localtime), - 'operation': CONST_OPERATION, - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - { - 'request_id': 'last_24_hrs', - 'bucket': 'HR', - 'since_datetime': format_time(datetime_localtime - timedelta(hours=24)), - 'until_datetime': format_time(datetime_localtime), - 'operation': CONST_OPERATION, - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - { - 'request_id': 'last_30_days', - 'bucket': 'DAY', - 'since_datetime': format_time(datetime_localtime - timedelta(days=30)), # noqa: WPS432 - 'until_datetime': format_time(datetime_localtime), - 'operation': CONST_OPERATION, - 'units': CONST_UNIT_OF_MEASUREMENT, - }, - ] - return {'queries': queries} - - -class FlumeResponseError(Exception): - """ - Exception raised for errors in the Flume response. - - Attributes: - message -- explanation of the error - """ - - -def _response_error(message, response): - """Define a function to handle response errors from the Flume API. - - Args: - message (string): Message received as error - response (string): Response received as error - - Raises: - FlumeResponseError: Exception raised when the status code is not 200. - """ - # If the response code is 200 (OK), no error has occurred, so return immediately - if response.status_code == 200: # noqa: WPS432 - return - - # If the response code is 400 (Bad Request), retrieve the detailed error message - if response.status_code == 400: # noqa: WPS432 - error_message = json.loads(response.text)['detailed'][0] - else: - # For other error codes, retrieve the general error message - error_message = json.loads(response.text)['message'] - - # Raise a custom exception with a formatted message containing the error details - raise FlumeResponseError( - 'Message:{0}.\nResponse code returned:{1}.\nError message returned:{2}.'.format(message, response.status_code, error_message), - ) - - -class FlumeAuth(object): # noqa: WPS214 - """Interact with API Authentication.""" - - def __init__( # noqa: WPS211 - self, - username, - password, - client_id, - client_secret, - flume_token=None, - http_session=None, - timeout=DEFAULT_TIMEOUT, - ): - """ - - Initialize the data object. - - Args: - username: Username to authenticate. - password: Password to authenticate. - client_id: API client id. - client_secret: API client secret. - flume_token: Pass flume token to variable. - http_session: Requests Session() - timeout: Requests timeout for throttling. - - """ - - self._creds = { - 'client_id': client_id, - 'client_secret': client_secret, - 'username': username, - 'password': password, - } - - if http_session is None: - self._http_session = Session() - else: - self._http_session = http_session - - self._timeout = timeout - self._token = None - self._decoded_token = None - self.user_id = None - self.authorization_header = None - - self._load_token(flume_token) - self._verify_token() - - @property - def token(self): - """ - Return authorization token for session. - - Returns: - Returns the current JWT token. - - """ - return self._token - - def refresh_token(self): - """Refresh authorization token for session.""" - - payload = { - 'grant_type': 'refresh_token', - 'refresh_token': self._token['refresh_token'], - 'client_id': self._creds['client_id'], - 'client_secret': self._creds['client_secret'], - } - - self._load_token(self._request_token(payload)) - - def retrieve_token(self): - """Return authorization token for session.""" - - payload = dict({'grant_type': 'password'}, **self._creds) - self._load_token(self._request_token(payload)) - - def _load_token(self, token): - """ - Update _token, decode token, user_id and auth header. - - Args: - token: Authentication bearer token to be decoded. - - """ - jwt_options = {'verify_signature': False} - self._token = token - try: - self._decoded_token = jwt.decode(self._token['access_token'], options=jwt_options) - except jwt.exceptions.DecodeError: - LOGGER.debug('Poorly formatted Access Token, fetching token using _creds') - self.retrieve_token() - except TypeError: - LOGGER.debug('Token TypeError, fetching token using _creds') - self.retrieve_token() - - self.user_id = self._decoded_token['user_id'] - - self.authorization_header = { - 'authorization': 'Bearer {0}'.format(self._token.get('access_token')), - } - - def _request_token(self, payload): - """ - - Request Authorization Payload. - - Args: - payload: Request payload to get token request. - - Returns: - Return response Authentication Bearer token from request. - - """ - - headers = {'content-type': 'application/json'} - response = self._http_session.request( - 'POST', - URL_OAUTH_TOKEN, - json=payload, - headers=headers, - timeout=self._timeout, - ) - - LOGGER.debug('Token Payload: %s', payload) # noqa: WPS323 - LOGGER.debug('Token Response: %s', response.text) # noqa: WPS323 - - # Check for response errors. - _response_error("Can't get token for user {0}".format(self._creds.get('username')), response) - - return json.loads(response.text)['data'][0] - - def _verify_token(self): - """Check to see if token is expiring in 12 hours.""" - token_expiration = datetime.fromtimestamp(self._decoded_token['exp']) - time_difference = datetime.now() + timedelta(hours=12) # noqa: WPS432 - LOGGER.debug('Token expiration time: %s', token_expiration) # noqa: WPS323 - LOGGER.debug('Token comparison time: %s', time_difference) # noqa: WPS323 - - if token_expiration <= time_difference: - self.refresh_token() - - -class FlumeDeviceList(object): - """Get Flume Device List from API.""" - - def __init__( - self, - flume_auth, - http_session=None, - timeout=DEFAULT_TIMEOUT, - ): - """ - - Initialize the data object. - - Args: - flume_auth: Authentication object. - http_session: Requests Session() - timeout: Requests timeout for throttling. - - """ - self._timeout = timeout - self._flume_auth = flume_auth - - if http_session is None: - self._http_session = Session() - else: - self._http_session = http_session - - self.device_list = self.get_devices() - - def get_devices(self): - """ - Return all available devices from Flume API. - - Returns: - Json device list. - - """ - - url = API_DEVICES_URL.format(user_id=self._flume_auth.user_id) - query_string = {'user': 'true', 'location': 'true'} - - response = self._http_session.request( - 'GET', - url, - headers=self._flume_auth.authorization_header, - params=query_string, - timeout=self._timeout, - ) - - LOGGER.debug('get_devices Response: %s', response.text) # noqa: WPS323 - - # Check for response errors. - _response_error('Impossible to retreive devices', response) - - return response.json()['data'] - - -class FlumeNotificationList(object): - """Get Flume Notifications list from API.""" - - def __init__( - self, - flume_auth, - http_session=None, - timeout=DEFAULT_TIMEOUT, - read='false', - ): - """ - - Initialize the data object. - - Args: - flume_auth: Authentication object. - http_session: Requests Session() - timeout: Requests timeout for throttling. - read: state of notification list, have they been read, not read. - - """ - self._timeout = timeout - self._flume_auth = flume_auth - self._read = read - - if http_session is None: - self._http_session = Session() - else: - self._http_session = http_session - - self.notification_list = self.get_notifications() - - def get_notifications(self): - """ - Return all notifications from devices owned by the user. - - Returns: - Returns JSON list of notifications. - - """ - - url = API_NOTIFICATIONS_URL.format(user_id=self._flume_auth.user_id) - - query_string = { - 'limit': '50', - 'offset': '0', - 'sort_direction': 'ASC', - 'read': self._read, - } - - response = self._http_session.request( - 'GET', - url, - headers=self._flume_auth.authorization_header, - params=query_string, - timeout=self._timeout, - ) - - LOGGER.debug('get_notifications Response: %s', response.text) # noqa: WPS323 - - # Check for response errors. - _response_error('Impossible to retrieve notifications', response) - return response.json()['data'] - - -class FlumeData(object): - """Get the latest data and update the states.""" - - def __init__( # noqa: WPS211 - self, - flume_auth, - device_id, - device_tz, - scan_interval, - update_on_init=True, - http_session=None, - timeout=DEFAULT_TIMEOUT, - query_payload=None, - ): - """ - - Initialize the data object. - - Args: - flume_auth: Authentication object. - device_id: flume device id. - device_tz: timezone of device - scan_interval: duration of scan, ex: 60 minutes. - update_on_init: update on initialization. - http_session: Requests Session() - timeout: Requests timeout for throttling. - query_payload: Specific query_payload to request for device. - - """ - self._timeout = timeout - self._flume_auth = flume_auth - self._scan_interval = scan_interval - self.device_id = device_id - self.device_tz = device_tz - self.values = {} # noqa: WPS110 - if query_payload is None: - self.query_payload = _generate_api_query_payload( - self._scan_interval, device_tz, - ) - else: - self.query_payload = query_payload - if http_session is None: - self._http_session = Session() - else: - self._http_session = http_session - self._query_keys = [query['request_id'] for query in self.query_payload['queries']] - if update_on_init: - self.update() - - @sleep_and_retry - @limits(calls=2, period=API_LIMIT) - def update(self): - """ - Return updated value for session. - - Returns: - Returns status of update - - """ - return self.update_force() - - def update_force(self): - """Return updated value for session without auto retry or limits.""" - self.query_payload = _generate_api_query_payload( - self._scan_interval, self.device_tz, - ) - - url = API_QUERY_URL.format( - user_id=self._flume_auth.user_id, device_id=self.device_id, - ) - response = self._http_session.post( - url, - json=self.query_payload, - headers=self._flume_auth.authorization_header, - timeout=self._timeout, - ) - - LOGGER.debug('Update URL: %s', url) # noqa: WPS323 - LOGGER.debug('Update query_payload: %s', self.query_payload) # noqa: WPS323 - LOGGER.debug('Update Response: %s', response.text) # noqa: WPS323 - - # Check for response errors. - _response_error( - "Can't update flume data for user id {0}".format(self._flume_auth.user_id), response, - ) - - responses = response.json()['data'][0] - - self.values = { # noqa: WPS110 - k: responses[k][0]['value'] if len(responses[k]) == 1 else None # noqa: WPS221,WPS111 - for k in self._query_keys # noqa: WPS111 - } +from .auth import FlumeAuth # noqa: WPS300, F401 +from .data import FlumeData # noqa: WPS300, F401 +from .devices import FlumeDeviceList # noqa: WPS300, F401 +from .notifications import FlumeNotificationList # noqa: WPS300, F401 +from .usage import FlumeUsageAlertList # noqa: WPS300, F401 +from .leak import FlumeLeakList # noqa: WPS300, F401 diff --git a/pyflume/auth.py b/pyflume/auth.py new file mode 100644 index 0000000..299504f --- /dev/null +++ b/pyflume/auth.py @@ -0,0 +1,162 @@ +"""Authenticates to Flume API.""" +from datetime import datetime, timedelta +import json + +import jwt # install pyjwt +from requests import Session + +from .constants import DEFAULT_TIMEOUT, URL_OAUTH_TOKEN # noqa: WPS300 +from .utils import configure_logger, flume_response_error # noqa: WPS300 + +# Configure logging +LOGGER = configure_logger(__name__) + + +class FlumeAuth(object): # noqa: WPS214 + """Interact with API Authentication.""" + + def __init__( # noqa: WPS211 + self, + username, + password, + client_id, + client_secret, + flume_token=None, + http_session=None, + timeout=DEFAULT_TIMEOUT, + ): + """ + + Initialize the data object. + + Args: + username: Username to authenticate. + password: Password to authenticate. + client_id: API client id. + client_secret: API client secret. + flume_token: Pass flume token to variable. + http_session: Requests Session() + timeout: Requests timeout for throttling. + + """ + + self._creds = { + "client_id": client_id, + "client_secret": client_secret, + "username": username, + "password": password, + } + + if http_session is None: + self._http_session = Session() + else: + self._http_session = http_session + + self._timeout = timeout + self._token = None + self._decoded_token = None + self.user_id = None + self.authorization_header = None + + self._load_token(flume_token) + self._verify_token() + + @property + def token(self): + """ + Return authorization token for session. + + Returns: + Returns the current JWT token. + + """ + return self._token + + def refresh_token(self): + """Refresh authorization token for session.""" + + payload = { + "grant_type": "refresh_token", + "refresh_token": self._token["refresh_token"], + "client_id": self._creds["client_id"], + "client_secret": self._creds["client_secret"], + } + + self._load_token(self._request_token(payload)) + + def retrieve_token(self): + """Return authorization token for session.""" + + payload = dict({"grant_type": "password"}, **self._creds) + self._load_token(self._request_token(payload)) + + def _load_token(self, token): + """ + Update _token, decode token, user_id and auth header. + + Args: + token: Authentication bearer token to be decoded. + + """ + jwt_options = {"verify_signature": False} + self._token = token + try: + self._decoded_token = jwt.decode( + self._token["access_token"], + options=jwt_options, + ) + except jwt.exceptions.DecodeError: + LOGGER.debug("Poorly formatted Access Token, fetching token using _creds") + self.retrieve_token() + except TypeError: + LOGGER.debug("Token TypeError, fetching token using _creds") + self.retrieve_token() + + self.user_id = self._decoded_token["user_id"] + + self.authorization_header = { + "authorization": "Bearer {0}".format(self._token.get("access_token")), + } + + def _request_token(self, payload): + """ + + Request Authorization Payload. + + Args: + payload: Request payload to get token request. + + Returns: + Return response Authentication Bearer token from request. + + """ + + headers = {"content-type": "application/json"} + response = self._http_session.request( + "POST", + URL_OAUTH_TOKEN, + json=payload, + headers=headers, + timeout=self._timeout, + ) + + LOGGER.debug("Token Payload: %s", payload) # noqa: WPS323 + LOGGER.debug("Token Response: %s", response.text) # noqa: WPS323 + + # Check for response errors. + flume_response_error( + "Can't get token for user {0}".format(self._creds.get("username")), + response, + ) + + return json.loads(response.text)["data"][0] + + def _verify_token(self): + """Check to see if token is expiring in 12 hours.""" + token_expiration = datetime.fromtimestamp(self._decoded_token["exp"]) + time_difference = datetime.now() + timedelta(hours=12) # noqa: WPS432 + LOGGER.debug("Token expiration time: %s", token_expiration) # noqa: WPS323 + LOGGER.debug("Token comparison time: %s", time_difference) # noqa: WPS323 + + if token_expiration <= time_difference: + self.refresh_token() diff --git a/pyflume/constants.py b/pyflume/constants.py new file mode 100644 index 0000000..18a9211 --- /dev/null +++ b/pyflume/constants.py @@ -0,0 +1,21 @@ +"""Constants to support PyFlume.""" +# Time-related constants +API_LIMIT = 60 +DEFAULT_TIMEOUT = 30 + +# Operation constants +CONST_OPERATION = "SUM" +CONST_UNIT_OF_MEASUREMENT = "GALLONS" + +# Base URL +API_BASE_URL = "https://api.flumetech.com" + +# Endpoints +URL_OAUTH_TOKEN = f"{API_BASE_URL}/oauth/token" +API_QUERY_URL = f"{API_BASE_URL}/users/{{user_id}}/devices/{{device_id}}/query" +API_DEVICES_URL = f"{API_BASE_URL}/users/{{user_id}}/devices" +API_NOTIFICATIONS_URL = f"{API_BASE_URL}/users/{{user_id}}/notifications" +API_LEAK_URL = ( + f"{API_BASE_URL}/users/{{user_id}}/devices/{{device_id}}/leaks/active" +) +API_USAGE_URL = f"{API_BASE_URL}/users/{{user_id}}/usage-alerts" diff --git a/pyflume/data.py b/pyflume/data.py new file mode 100644 index 0000000..cec3827 --- /dev/null +++ b/pyflume/data.py @@ -0,0 +1,115 @@ +"""Retrieve data from Flume API.""" +from ratelimit import limits, sleep_and_retry +from requests import Session + +from .constants import API_LIMIT, API_QUERY_URL, DEFAULT_TIMEOUT # noqa: WPS300 +from .utils import ( # noqa: WPS300 + configure_logger, + flume_response_error, + generate_api_query_payload, +) + +# Configure logging +LOGGER = configure_logger(__name__) + + +class FlumeData(object): + """Get the latest data and update the states.""" + + def __init__( # noqa: WPS211 + self, + flume_auth, + device_id, + device_tz, + scan_interval, + update_on_init=True, + http_session=None, + timeout=DEFAULT_TIMEOUT, + query_payload=None, + ): + """ + + Initialize the data object. + + Args: + flume_auth: Authentication object. + device_id: flume device id. + device_tz: timezone of device + scan_interval: duration of scan, ex: 60 minutes. + update_on_init: update on initialization. + http_session: Requests Session() + timeout: Requests timeout for throttling. + query_payload: Specific query_payload to request for device. + + """ + self._timeout = timeout + self._flume_auth = flume_auth + self._scan_interval = scan_interval + self.device_id = device_id + self.device_tz = device_tz + self.values = {} # noqa: WPS110 + if query_payload is None: + self.query_payload = generate_api_query_payload( + self._scan_interval, + device_tz, + ) + else: + self.query_payload = query_payload + if http_session is None: + self._http_session = Session() + else: + self._http_session = http_session + self._query_keys = [ + query["request_id"] for query in self.query_payload["queries"] + ] + if update_on_init: + self.update() + + @sleep_and_retry + @limits(calls=2, period=API_LIMIT) + def update(self): + """ + Return updated value for session. + + Returns: + Returns status of update + + """ + return self.update_force() + + def update_force(self): + """Return updated value for session without auto retry or limits.""" + self.query_payload = generate_api_query_payload( + self._scan_interval, + self.device_tz, + ) + + url = API_QUERY_URL.format( + user_id=self._flume_auth.user_id, + device_id=self.device_id, + ) + response = self._http_session.post( + url, + json=self.query_payload, + headers=self._flume_auth.authorization_header, + timeout=self._timeout, + ) + + LOGGER.debug("Update URL: %s", url) # noqa: WPS323 + LOGGER.debug("Update query_payload: %s", self.query_payload) # noqa: WPS323 + LOGGER.debug("Update Response: %s", response.text) # noqa: WPS323 + + # Check for response errors. + flume_response_error( + "Can't update flume data for user id {0}".format(self._flume_auth.user_id), + response, + ) + + responses = response.json()["data"][0] + + self.values = { # noqa: WPS110 + k: responses[k][0]["value"] + if len(responses[k]) == 1 + else None # noqa: WPS221,WPS111 + for k in self._query_keys # noqa: WPS111 + } diff --git a/pyflume/devices.py b/pyflume/devices.py new file mode 100644 index 0000000..23dbd60 --- /dev/null +++ b/pyflume/devices.py @@ -0,0 +1,65 @@ +"""Retrieve Devices from Flume API.""" +from requests import Session + +from .constants import API_DEVICES_URL, DEFAULT_TIMEOUT # noqa: WPS300 +from .utils import configure_logger, flume_response_error # noqa: WPS300 + +# Configure logging +LOGGER = configure_logger(__name__) + + +class FlumeDeviceList(object): + """Get Flume Device List from API.""" + + def __init__( + self, + flume_auth, + http_session=None, + timeout=DEFAULT_TIMEOUT, + ): + """ + + Initialize the data object. + + Args: + flume_auth: Authentication object. + http_session: Requests Session() + timeout: Requests timeout for throttling. + + """ + self._timeout = timeout + self._flume_auth = flume_auth + + if http_session is None: + self._http_session = Session() + else: + self._http_session = http_session + + self.device_list = self.get_devices() + + def get_devices(self): + """ + Return all available devices from Flume API. + + Returns: + Json device list. + + """ + + url = API_DEVICES_URL.format(user_id=self._flume_auth.user_id) + query_string = {"user": "true", "location": "true"} + + response = self._http_session.request( + "GET", + url, + headers=self._flume_auth.authorization_header, + params=query_string, + timeout=self._timeout, + ) + + LOGGER.debug("get_devices Response: %s", response.text) # noqa: WPS323 + + # Check for response errors. + flume_response_error("Impossible to retreive devices", response) + + return response.json()["data"] diff --git a/pyflume/format_time.py b/pyflume/format_time.py deleted file mode 100644 index 587ff0c..0000000 --- a/pyflume/format_time.py +++ /dev/null @@ -1,69 +0,0 @@ -"""All functions for formatting time.""" - -from datetime import datetime, timedelta - - -def format_time(time): - """ - Format time based on strftime. - - Args: - time: Expected time as datetime.datetime class - - Returns: - Formatted time. - - """ - return time.replace(second=0).strftime('%Y-%m-%d %H:%M:%S') # noqa: WPS323 - - -def format_start_today(time): - """ - Format time starting at 00:00:00 provided datetime. - - Args: - time: Expected time as datetime.datetime class - - Returns: - Formatted time. - - """ - return format_time(datetime.combine(time, datetime.min.time())) - - -def format_start_month(time): - """ - Format time starting at the first of the month for provided datetime. - - Args: - time: Expected time as datetime.datetime class - - Returns: - Formatted time. - - """ - return format_time( - datetime.combine( - time.replace(day=1), - datetime.min.time(), - ), - ) - - -def format_start_week(time): - """ - Format time starting at the start of week for provided datetime. - - Args: - time: Expected time as datetime.datetime class - - Returns: - Formatted time. - - """ - return format_time( - datetime.combine( - time - timedelta(days=time.weekday()), - datetime.min.time(), - ), - ) diff --git a/pyflume/leak.py b/pyflume/leak.py new file mode 100644 index 0000000..523d66d --- /dev/null +++ b/pyflume/leak.py @@ -0,0 +1,77 @@ +"""Retrieve leak notifications from Flume API.""" +from requests import Session + +from .constants import API_LEAK_URL, DEFAULT_TIMEOUT # noqa: WPS300 +from .utils import configure_logger, flume_response_error # noqa: WPS300 + +# Configure logging +LOGGER = configure_logger(__name__) + + +class FlumeLeakList(object): + """Get Flume Flume Leak Notifications from API.""" + + def __init__( # noqa: WPS211 + self, + flume_auth, + device_id, + http_session=None, + timeout=DEFAULT_TIMEOUT, + read="false", + ): + """ + + Initialize the data object. + + Args: + flume_auth: Authentication object. + device_id: The Device ID to query. + http_session: Requests Session() + timeout: Requests timeout for throttling. + read: state of leak notification list, have they been read, not read. + + """ + self._timeout = timeout + self._flume_auth = flume_auth + self._read = read + self.device_id = device_id + + if http_session is None: + self._http_session = Session() + else: + self._http_session = http_session + + self.leak_alert_list = self.get_leak_alerts() + + def get_leak_alerts(self): + """Return all leak alerts from devices owned by the user. + + Returns: + Returns JSON list of leak notifications. + """ + + url = API_LEAK_URL.format( + user_id=self._flume_auth.user_id, + device_id=self.device_id, + ) + + query_string = { + "limit": "50", + "offset": "0", + "sort_direction": "ASC", + "read": self._read, + } + + response = self._http_session.request( + "GET", + url, + headers=self._flume_auth.authorization_header, + params=query_string, + timeout=self._timeout, + ) + + LOGGER.debug(f"get_leak_alerts Response: {response.text}") + + # Check for response errors. + flume_response_error("Impossible to retrieve leak alerts", response) + return response.json()["data"] diff --git a/pyflume/notifications.py b/pyflume/notifications.py new file mode 100644 index 0000000..0841c54 --- /dev/null +++ b/pyflume/notifications.py @@ -0,0 +1,66 @@ +"""Retrieve notifications from Flume API.""" +from typing import Any, Dict, Optional + +from requests import Session + +from .constants import API_NOTIFICATIONS_URL, DEFAULT_TIMEOUT # noqa: WPS300 +from .utils import configure_logger, flume_response_error # noqa: WPS300 + +# Configure logging +LOGGER = configure_logger(__name__) + + +class FlumeNotificationList(object): + """Get Flume Notifications list from API.""" + + def __init__( + self, + flume_auth, + http_session: Optional[Session] = None, + timeout: int = DEFAULT_TIMEOUT, + read: str = "false", + ) -> None: + """ + Initialize the FlumeNotificationList object. + + Args: + flume_auth: Authentication object. + http_session: Optional Requests Session(). + timeout: Requests timeout for throttling, default DEFAULT_TIMEOUT. + read: state of notification list, default "false". + """ + self._timeout = timeout + self._flume_auth = flume_auth + self._read = read + self._http_session = http_session or Session() + self.notification_list = self.get_notifications() + + def get_notifications(self) -> Dict[str, Any]: + """Return all notifications from devices owned by the user. + + Returns: + Dict[str, Any]: Notification JSON message from API. + """ + + url = API_NOTIFICATIONS_URL.format(user_id=self._flume_auth.user_id) + + query_string = { + "limit": "50", + "offset": "0", + "sort_direction": "ASC", + "read": self._read, + } + + response = self._http_session.request( + "GET", + url, + headers=self._flume_auth.authorization_header, + params=query_string, + timeout=self._timeout, + ) + + LOGGER.debug(f"get_notifications Response: {response.text}") + + # Check for response errors. + flume_response_error("Impossible to retrieve notifications", response) + return response.json()["data"] diff --git a/pyflume/usage.py b/pyflume/usage.py new file mode 100644 index 0000000..42cdffa --- /dev/null +++ b/pyflume/usage.py @@ -0,0 +1,71 @@ +"""Retrieve usage alert notifications from Flume API.""" +from requests import Session + +from .constants import API_USAGE_URL, DEFAULT_TIMEOUT # noqa: WPS300 +from .utils import configure_logger, flume_response_error # noqa: WPS300 + +# Configure logging +LOGGER = configure_logger(__name__) + + +class FlumeUsageAlertList(object): + """Get Flume Usage Alert list from API.""" + + def __init__( + self, + flume_auth, + http_session=None, + timeout=DEFAULT_TIMEOUT, + read="false", + ): + """ + + Initialize the data object. + + Args: + flume_auth: Authentication object. + http_session: Requests Session() + timeout: Requests timeout for throttling. + read: state of usage alert list, have they been read, not read. + + """ + self._timeout = timeout + self._flume_auth = flume_auth + self._read = read + + if http_session is None: + self._http_session = Session() + else: + self._http_session = http_session + + self.usage_alert_list = self.get_usage_alerts() + + def get_usage_alerts(self): + """Return all usage alerts from devices owned by teh user. + + Returns: + Returns JSON list of usage alerts. + """ + + url = API_USAGE_URL.format(user_id=self._flume_auth.user_id) + + query_string = { + "limit": "50", + "offset": "0", + "sort_direction": "ASC", + "read": self._read, + } + + response = self._http_session.request( + "GET", + url, + headers=self._flume_auth.authorization_header, + params=query_string, + timeout=self._timeout, + ) + + LOGGER.debug(f"get_usage_alerts Response: {response.text}") + + # Check for response errors. + flume_response_error("Impossible to retrieve usage alert", response) + return response.json()["data"] diff --git a/pyflume/utils.py b/pyflume/utils.py new file mode 100644 index 0000000..5122594 --- /dev/null +++ b/pyflume/utils.py @@ -0,0 +1,215 @@ +"""All functions to support Flume App.""" +from datetime import datetime, timedelta, timezone +import json +import logging + +from .constants import CONST_OPERATION, CONST_UNIT_OF_MEASUREMENT # noqa: WPS300 + +try: + from zoneinfo import ZoneInfo # noqa: WPS433 +except ImportError: # Python < 3.9 + from backports.zoneinfo import ZoneInfo # noqa: WPS433,WPS440 + + +def configure_logger(name): + """Configure and return a custom logger for the given name. + + Args: + name (string): Name of logger + + Returns: + object: Logger Handler + """ + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + + logger_handler = logging.StreamHandler() + formatter = logging.Formatter( + "{asctime} - {name} - {levelname} - {message}", + style="{", + ) + logger_handler.setFormatter(formatter) + + logger.addHandler(logger_handler) + + return logger + + +def format_time(time): + """ + Format time based on strftime. + + Args: + time: Expected time as datetime.datetime class + + Returns: + Formatted time. + + """ + return time.replace(second=0).strftime("%Y-%m-%d %H:%M:%S") # noqa: WPS323 + + +def format_start_today(time): + """ + Format time starting at 00:00:00 provided datetime. + + Args: + time: Expected time as datetime.datetime class + + Returns: + Formatted time. + + """ + return format_time(datetime.combine(time, datetime.min.time())) + + +def format_start_month(time): + """ + Format time starting at the first of the month for provided datetime. + + Args: + time: Expected time as datetime.datetime class + + Returns: + Formatted time. + + """ + return format_time( + datetime.combine( + time.replace(day=1), + datetime.min.time(), + ), + ) + + +def format_start_week(time): + """ + Format time starting at the start of week for provided datetime. + + Args: + time: Expected time as datetime.datetime class + + Returns: + Formatted time. + + """ + return format_time( + datetime.combine( + time - timedelta(days=time.weekday()), + datetime.min.time(), + ), + ) + + +class FlumeResponseError(Exception): + """ + Exception raised for errors in the Flume response. + + Attributes: + message -- explanation of the error + """ + + +def flume_response_error(message, response): + """Define a function to handle response errors from the Flume API. + + Args: + message (string): Message received as error + response (string): Response received as error + + Raises: + FlumeResponseError: Exception raised when the status code is not 200. + """ + # If the response code is 200 (OK), no error has occurred, so return immediately + if response.status_code == 200: # noqa: WPS432 + return + + # If the response code is 400 (Bad Request), retrieve the detailed error message + if response.status_code == 400: # noqa: WPS432 + error_message = json.loads(response.text)["detailed"][0] + else: + # For other error codes, retrieve the general error message + error_message = json.loads(response.text)["message"] + + # Raise a custom exception with a formatted message containing the error details + raise FlumeResponseError( + "Message:{0}.\nResponse code returned:{1}.\nError message returned:{2}.".format( + message, response.status_code, error_message, + ), + ) + + +def generate_api_query_payload(scan_interval, device_tz): + """Generate API Query payload to support getting data from Flume API. + + Args: + scan_interval (_type_): Interval to scan. + device_tz (_type_): Time Zone of Flume device. + + Returns: + JSON: API Query to retrieve API details. + """ + datetime_localtime = datetime.now(timezone.utc).astimezone(ZoneInfo(device_tz)) + + queries = [ + { + "request_id": "current_interval", + "bucket": "MIN", + "since_datetime": format_time( + (datetime_localtime - scan_interval).replace(second=0), + ), + "until_datetime": format_time(datetime_localtime.replace(second=0)), + "operation": CONST_OPERATION, + "units": CONST_UNIT_OF_MEASUREMENT, + }, + { + "request_id": "today", + "bucket": "DAY", + "since_datetime": format_start_today(datetime_localtime), + "until_datetime": format_time(datetime_localtime), + "operation": CONST_OPERATION, + "units": CONST_UNIT_OF_MEASUREMENT, + }, + { + "request_id": "week_to_date", + "bucket": "DAY", + "since_datetime": format_start_week(datetime_localtime), + "until_datetime": format_time(datetime_localtime), + "operation": CONST_OPERATION, + "units": CONST_UNIT_OF_MEASUREMENT, + }, + { + "request_id": "month_to_date", + "bucket": "MON", + "since_datetime": format_start_month(datetime_localtime), + "until_datetime": format_time(datetime_localtime), + "units": CONST_UNIT_OF_MEASUREMENT, + }, + { + "request_id": "last_60_min", + "bucket": "MIN", + "since_datetime": format_time(datetime_localtime - timedelta(minutes=60)), + "until_datetime": format_time(datetime_localtime), + "operation": CONST_OPERATION, + "units": CONST_UNIT_OF_MEASUREMENT, + }, + { + "request_id": "last_24_hrs", + "bucket": "HR", + "since_datetime": format_time(datetime_localtime - timedelta(hours=24)), + "until_datetime": format_time(datetime_localtime), + "operation": CONST_OPERATION, + "units": CONST_UNIT_OF_MEASUREMENT, + }, + { + "request_id": "last_30_days", + "bucket": "DAY", + "since_datetime": format_time( + datetime_localtime - timedelta(days=30), # noqa: WPS432 + ), + "until_datetime": format_time(datetime_localtime), + "operation": CONST_OPERATION, + "units": CONST_UNIT_OF_MEASUREMENT, + }, + ] + return {"queries": queries} diff --git a/setup.cfg b/setup.cfg index bcdebfa..a626631 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,6 +5,7 @@ testpaths = tests exclude = .venv,.git,.tox,docs,venv,bin,lib,deps,build doctests = True # To work with Black +max-module-members = 10 max-line-length = 88 # E501: line too long # W503: Line break occurred before a binary operator @@ -24,7 +25,11 @@ ignore = WPS402, WPS226, WPS204, - WPS412 + WPS412, + Q000, + WPS305, + I001, + WPS100 [isort] # https://github.com/timothycrosley/isort diff --git a/setup.py b/setup.py index 0ede951..ba4389e 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setuptools.setup( name='PyFlume', - version='0.7.2', + version='0.8.0', author='ChrisMandich', author_email='Chris@Mandich.net', description='Package to integrate with Flume Sensor', diff --git a/tests/fixtures/leak.json b/tests/fixtures/leak.json new file mode 100644 index 0000000..e33ea2a --- /dev/null +++ b/tests/fixtures/leak.json @@ -0,0 +1,15 @@ +{ + "success": true, + "code": 602, + "message": "Request OK", + "http_code": 200, + "http_message": "OK", + "detailed": null, + "data": [ + { + "active": true + } + ], + "count": 1, + "pagination": null +} \ No newline at end of file diff --git a/tests/fixtures/usage.json b/tests/fixtures/usage.json new file mode 100644 index 0000000..177b867 --- /dev/null +++ b/tests/fixtures/usage.json @@ -0,0 +1,715 @@ +{ + "success": true, + "code": 602, + "message": "Request OK", + "http_code": 200, + "http_message": "OK", + "detailed": null, + "data": [ + { + "id": 5243137, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-22T12:41:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-22 05:42:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-22 05:27:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5255122, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-23T11:14:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-23 04:15:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-23 04:00:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5257651, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-23T13:30:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-23 06:31:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-23 06:16:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5270444, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-24T12:41:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-24 05:42:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-24 05:27:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5274373, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-24T18:17:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-24 11:18:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-24 11:03:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5283712, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-25T12:40:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-25 05:41:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-25 05:26:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5296972, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-26T13:09:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-26 06:10:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-26 05:55:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5310367, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-27T12:41:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-27 05:42:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-27 05:27:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5323808, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-28T13:07:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-28 06:08:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-28 05:53:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5351044, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-06-29T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-06-29 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-06-29 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5359491, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-01T11:14:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-01 04:15:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-01 04:00:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5361864, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-01T13:29:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-01 06:30:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-01 06:15:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5373614, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-02T12:43:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-02 05:44:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-02 05:29:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5384498, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-03T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-03 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-03 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5411226, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-05T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-05 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-05 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5413585, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-05T13:24:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-05 06:25:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-05 06:10:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5426430, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-06T12:45:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-06 05:46:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-06 05:31:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5437976, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-07T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-07 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-07 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5440080, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-07T13:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-07 06:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-07 05:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5453334, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-08T12:46:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-08 05:47:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-08 05:32:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5465242, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-09T11:14:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-09 04:15:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-09 04:00:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5467031, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-09T13:14:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-09 06:15:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-09 06:00:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5480078, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-10T13:11:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-10 06:12:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-10 05:57:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5492257, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-11T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-11 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-11 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5494981, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-11T13:22:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-11 06:23:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-11 06:08:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5521358, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-13T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-13 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-13 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5537687, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-14T12:49:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-14 05:50:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-14 05:35:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5549658, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-15T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-15 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-15 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5551885, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-15T13:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-15 06:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-15 05:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5564244, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-16T12:50:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-16 05:51:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-16 05:36:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5575597, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-17T11:12:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-17 04:13:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-17 03:58:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5591234, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-18T12:51:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-18 05:52:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-18 05:37:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5603583, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-19T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-19 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-19 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5606381, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-19T13:29:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-19 06:30:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-19 06:15:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5620785, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-20T12:52:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-20 05:53:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-20 05:38:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5634037, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-21T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-21 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-21 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5636547, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-21T13:20:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-21 06:21:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-21 06:06:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5651243, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-22T12:54:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-22 05:55:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-22 05:40:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5663989, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-23T11:12:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-23 04:13:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-23 03:58:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5679244, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-24T12:54:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-24 05:55:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-24 05:40:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5691434, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-25T11:13:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-25 04:14:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-25 03:59:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5720925, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-27T11:12:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-27 04:13:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-27 03:58:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5723700, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-27T13:27:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-27 06:28:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-27 06:13:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5737526, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-28T12:58:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-28 05:59:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-28 05:44:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5749779, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-29T11:12:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-29 04:13:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-29 03:58:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5752512, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-29T13:27:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-29 06:28:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-29 06:13:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5765668, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-30T12:59:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-30 06:00:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-30 05:45:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5777345, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-31T11:12:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-31 04:13:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-31 03:58:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5779249, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-07-31T13:18:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-07-31 06:19:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-07-31 06:04:00" + }, + "event_rule_name": "High Flow Alert" + }, + { + "id": 5793493, + "device_id": "6248148189204194987", + "triggered_datetime": "2022-08-01T13:00:00.000Z", + "flume_leak": false, + "query": { + "request_id": "SYSTEM_TRIGGERED_USAGE_ALERT", + "until_datetime": "2022-08-01 06:01:00", + "tz": "America/Los_Angeles", + "bucket": "MIN", + "since_datetime": "2022-08-01 05:46:00" + }, + "event_rule_name": "High Flow Alert" + } + ], + "count": 122, + "pagination": { + "next": "/users/1111/usage-alerts?offset=50&limit=50", + "prev": null + } +} \ No newline at end of file diff --git a/tests/test_init.py b/tests/test_init.py index d85eb9f..b5ee309 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -13,25 +13,25 @@ # Local application/library-specific imports import pyflume -CONST_TOKEN_FILE = 'token.json' # noqa: S105 -CONST_HTTP_METHOD_POST = 'post' -CONST_USERNAME = 'username' # noqa: S105 -CONST_PASSWORD = 'password' # noqa: S105 -CONST_CLIENT_ID = 'client_id' # noqa: S105 -CONST_CLIENT_SECRET = 'client_secret' # noqa: S105 -CONST_USER_ID = 'user_id' -CONST_FLUME_TOKEN = MappingProxyType({ - 'token_type' : 'bearer', - 'expires_in' : 604800, - 'refresh_token' : 'fdb8fdbecf1d03ce5e6125c067733c0d51de209c', - 'access_token' : 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoidXNlcl9pZCIsImV4cCI6Mjk5OTk5OTk5OTcsIngiOiJ5eiJ9.utb2yzcMImBFhDx_mssC_HU0mbfo0D_-VAQOetw5_h0', -}) +CONST_TOKEN_FILE = "token.json" # noqa: S105 +CONST_HTTP_METHOD_POST = "post" +CONST_USERNAME = "username" # noqa: S105 +CONST_PASSWORD = "password" # noqa: S105 +CONST_CLIENT_ID = "client_id" # noqa: S105 +CONST_CLIENT_SECRET = "client_secret" # noqa: S105 +CONST_USER_ID = "user_id" +CONST_FLUME_TOKEN = MappingProxyType( + { + "token_type": "bearer", + "expires_in": 604800, + "refresh_token": "fdb8fdbecf1d03ce5e6125c067733c0d51de209c", + "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoidXNlcl9pZCIsImV4cCI6Mjk5OTk5OTk5OTcsIngiOiJ5eiJ9.utb2yzcMImBFhDx_mssC_HU0mbfo0D_-VAQOetw5_h0", + }, +) def load_fixture(filename): - """ - - Load a fixture. + """Load a fixture. Args: filename: File to load fixture. @@ -40,7 +40,7 @@ def load_fixture(filename): File fixture contents. """ - path = os.path.join(os.path.dirname(__file__), 'fixtures', filename) + path = os.path.join(os.path.dirname(__file__), "fixtures", filename) with open(path) as fptr: return fptr.read() @@ -61,9 +61,18 @@ def test_init(self, mock): mock: Requests mock. """ - mock.register_uri(CONST_HTTP_METHOD_POST, pyflume.URL_OAUTH_TOKEN, text=load_fixture(CONST_TOKEN_FILE)) + mock.register_uri( + CONST_HTTP_METHOD_POST, + pyflume.constants.URL_OAUTH_TOKEN, + text=load_fixture(CONST_TOKEN_FILE), + ) auth = pyflume.FlumeAuth( - CONST_USERNAME, CONST_PASSWORD, CONST_CLIENT_ID, CONST_CLIENT_SECRET, CONST_FLUME_TOKEN, http_session=Session(), + CONST_USERNAME, + CONST_PASSWORD, + CONST_CLIENT_ID, + CONST_CLIENT_SECRET, + CONST_FLUME_TOKEN, + http_session=Session(), ) assert auth.user_id == CONST_USER_ID # noqa: S101 @@ -73,22 +82,28 @@ class TestFlumeDeviceList(unittest.TestCase): @requests_mock.Mocker() def test_init(self, mock): - """ - - Test initialization for Flume Device List. + """Test initialization for Flume Device List. Args: mock: Requests mock. """ - mock.register_uri(CONST_HTTP_METHOD_POST, pyflume.URL_OAUTH_TOKEN, text=load_fixture(CONST_TOKEN_FILE)) mock.register_uri( - 'get', - pyflume.API_DEVICES_URL.format(user_id=CONST_USER_ID), - text=load_fixture('devices.json'), + CONST_HTTP_METHOD_POST, + pyflume.constants.URL_OAUTH_TOKEN, + text=load_fixture(CONST_TOKEN_FILE), + ) + mock.register_uri( + "get", + pyflume.constants.API_DEVICES_URL.format(user_id=CONST_USER_ID), + text=load_fixture("devices.json"), ) flume_auth = pyflume.FlumeAuth( - CONST_USERNAME, CONST_PASSWORD, CONST_CLIENT_ID, CONST_CLIENT_SECRET, CONST_FLUME_TOKEN, + CONST_USERNAME, + CONST_PASSWORD, + CONST_CLIENT_ID, + CONST_CLIENT_SECRET, + CONST_FLUME_TOKEN, ) flume_devices = pyflume.FlumeDeviceList(flume_auth) @@ -110,14 +125,22 @@ def test_init(self, mock): mock: Requests mock. """ - mock.register_uri(CONST_HTTP_METHOD_POST, pyflume.URL_OAUTH_TOKEN, text=load_fixture(CONST_TOKEN_FILE)) mock.register_uri( - 'get', - pyflume.API_NOTIFICATIONS_URL.format(user_id=CONST_USER_ID), - text=load_fixture('notification.json'), + CONST_HTTP_METHOD_POST, + pyflume.constants.URL_OAUTH_TOKEN, + text=load_fixture(CONST_TOKEN_FILE), + ) + mock.register_uri( + "get", + pyflume.constants.API_NOTIFICATIONS_URL.format(user_id=CONST_USER_ID), + text=load_fixture("notification.json"), ) flume_auth = pyflume.FlumeAuth( - CONST_USERNAME, CONST_PASSWORD, CONST_CLIENT_ID, CONST_CLIENT_SECRET, CONST_FLUME_TOKEN, + CONST_USERNAME, + CONST_PASSWORD, + CONST_CLIENT_ID, + CONST_CLIENT_SECRET, + CONST_FLUME_TOKEN, ) flume_notifications = pyflume.FlumeNotificationList(flume_auth) @@ -126,34 +149,119 @@ def test_init(self, mock): assert notifications[0][CONST_USER_ID] == 1111 # noqa: S101,WPS432 -class TestFlumeData(unittest.TestCase): - """Test Flume Data Test.""" +class TestFlumeUsageAlerts(unittest.TestCase): + """Test Flume Usage Alerts Test.""" + + @requests_mock.Mocker() + def test_init(self, mock): + """ + + Test initialization for Flume Usage Alerts List. + + Args: + mock: Requests mock. + + """ + mock.register_uri( + CONST_HTTP_METHOD_POST, + pyflume.constants.URL_OAUTH_TOKEN, + text=load_fixture(CONST_TOKEN_FILE), + ) + mock.register_uri( + "get", + pyflume.constants.API_USAGE_URL.format(user_id=CONST_USER_ID), + text=load_fixture("usage.json"), + ) + flume_auth = pyflume.FlumeAuth( + CONST_USERNAME, + CONST_PASSWORD, + CONST_CLIENT_ID, + CONST_CLIENT_SECRET, + CONST_FLUME_TOKEN, + ) + + flume_alerts = pyflume.FlumeUsageAlertList(flume_auth) + alerts = flume_alerts.get_usage_alerts() + assert len(alerts) == 50 # noqa: S101, WPS432 + assert alerts[0]["device_id"] == "6248148189204194987" # noqa: S101 + assert alerts[0]["event_rule_name"] == "High Flow Alert" # noqa: S101 + + +class TestFlumeLeakList(unittest.TestCase): + """Test Flume Leak List Test.""" @requests_mock.Mocker() def test_init(self, mock): """ - Test initialization for Flume Data. + Test initialization for Flume Usage Leak List. Args: mock: Requests mock. """ - mock.register_uri(CONST_HTTP_METHOD_POST, pyflume.URL_OAUTH_TOKEN, text=load_fixture(CONST_TOKEN_FILE)) mock.register_uri( CONST_HTTP_METHOD_POST, - pyflume.API_QUERY_URL.format(user_id=CONST_USER_ID, device_id='device_id'), - text=load_fixture('query.json'), + pyflume.constants.URL_OAUTH_TOKEN, + text=load_fixture(CONST_TOKEN_FILE), + ) + mock.register_uri( + "get", + pyflume.constants.API_LEAK_URL.format( + user_id=CONST_USER_ID, + device_id="6248148189204194987", + ), + text=load_fixture("leak.json"), + ) + flume_auth = pyflume.FlumeAuth( + CONST_USERNAME, + CONST_PASSWORD, + CONST_CLIENT_ID, + CONST_CLIENT_SECRET, + CONST_FLUME_TOKEN, + ) + + flume_leaks = pyflume.FlumeLeakList(flume_auth, "6248148189204194987") + alerts = flume_leaks.get_leak_alerts() + assert len(alerts) == 1 # noqa: S101 + assert alerts[0]["active"] # noqa: S101 + + +class TestFlumeData(unittest.TestCase): + """Test Flume Data Test.""" + + @requests_mock.Mocker() + def test_init(self, mock): + """Test initialization for Flume Data. + + Args: + mock: Requests mock. + """ + mock.register_uri( + CONST_HTTP_METHOD_POST, + pyflume.constants.URL_OAUTH_TOKEN, + text=load_fixture(CONST_TOKEN_FILE), + ) + mock.register_uri( + CONST_HTTP_METHOD_POST, + pyflume.constants.API_QUERY_URL.format( + user_id=CONST_USER_ID, device_id="device_id", + ), + text=load_fixture("query.json"), ) flume_auth = pyflume.FlumeAuth( - CONST_USERNAME, CONST_PASSWORD, CONST_CLIENT_ID, CONST_CLIENT_SECRET, CONST_FLUME_TOKEN, + CONST_USERNAME, + CONST_PASSWORD, + CONST_CLIENT_ID, + CONST_CLIENT_SECRET, + CONST_FLUME_TOKEN, ) flume = pyflume.FlumeData( flume_auth, - 'device_id', - 'America/Los_Angeles', + "device_id", + "America/Los_Angeles", SCAN_INTERVAL, http_session=Session(), update_on_init=False, @@ -162,11 +270,11 @@ def test_init(self, mock): flume.update() assert flume.values == { # noqa: S101 - 'current_interval': 14.38855184, - 'today': 56.6763912, - 'week_to_date': 1406.07065872, - 'month_to_date': 56.6763912, - 'last_60_min': 14.38855184, - 'last_24_hrs': 258.9557672, - 'last_30_days': 5433.56753264, + "current_interval": 14.38855184, + "today": 56.6763912, + "week_to_date": 1406.07065872, + "month_to_date": 56.6763912, + "last_60_min": 14.38855184, + "last_24_hrs": 258.9557672, + "last_30_days": 5433.56753264, } diff --git a/tox.ini b/tox.ini index b71cf5d..d1a0aca 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,6 @@ skip_missing_interpreters = True deps = pyjwt pytest - pytz ratelimit requests requests_mock