diff --git a/fivetran_provider/hooks/fivetran.py b/fivetran_provider/hooks/fivetran.py index 2f493c0..41b2285 100644 --- a/fivetran_provider/hooks/fivetran.py +++ b/fivetran_provider/hooks/fivetran.py @@ -35,6 +35,9 @@ class FivetranHook(BaseHook): api_protocol = "https" api_host = "api.fivetran.com" api_path_connectors = "v1/connectors/" + api_metadata_path_connectors = "v1/metadata/connectors/" + api_path_destinations = "v1/destinations/" + api_path_groups = "v1/groups/" @staticmethod def get_ui_field_behaviour() -> Dict: @@ -149,7 +152,7 @@ def _connector_ui_url_logs(self, service_name, schema_name): def _connector_ui_url_setup(self, service_name, schema_name): return self._connector_ui_url(service_name, schema_name) + "/setup" - def get_connector(self, connector_id): + def get_connector(self, connector_id) -> dict: """ Fetches the detail of a connector. :param connector_id: Fivetran connector_id, found in connector settings @@ -164,6 +167,75 @@ def get_connector(self, connector_id): resp = self._do_api_call(("GET", endpoint)) return resp["data"] + def get_connector_schemas(self, connector_id) -> dict: + """ + Fetches schema information of the connector. + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :return: schema details + :rtype: Dict + """ + if connector_id == "": + raise ValueError("No value specified for connector_id") + endpoint = self.api_path_connectors + connector_id + "/schemas" + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_metadata(self, connector_id, metadata) -> dict: + """ + Fetches metadata for a given metadata string and connector. + + The Fivetran metadata API is currently in beta and available to + all Fivetran users on the enterprise plan and above. + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param metadata: The string to return the type of metadata from the API + :type metadata: str + :return: table or column metadata details + :rtype: Dict + """ + metadata_values = ("tables", "columns") + if connector_id == "": + raise ValueError("No value specified for connector_id") + if metadata not in metadata_values: + raise ValueError( + f"Got {metadata} for param 'metadata', expected one" + f" of: {metadata_values}" + ) + endpoint = self.api_metadata_path_connectors + connector_id + "/" + metadata + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_destinations(self, group_id) -> dict: + """ + Fetches destination information for the given group. + :param group_id: The Fivetran group ID, returned by a connector API call. + :type group_id: str + :return: destination details + :rtype: Dict + """ + if group_id == "": + raise ValueError("No value specified for group_id") + endpoint = self.api_path_destinations + group_id + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_groups(self, group_id) -> dict: + """ + Fetches destination information for the given group. + :param group_id: The Fivetran group ID, returned by a connector API call. + :type group_id: str + :return: group details + :rtype: Dict + """ + if group_id == "": + raise ValueError("No value specified for connector_id") + endpoint = self.api_path_groups + group_id + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + def check_connector(self, connector_id): """ Ensures connector configuration has been completed successfully and is in diff --git a/tests/hooks/test_hooks.py b/tests/hooks/test_hooks.py index 06ff5ba..476b9dc 100644 --- a/tests/hooks/test_hooks.py +++ b/tests/hooks/test_hooks.py @@ -33,7 +33,9 @@ "connected_by": "mournful_shalt", "created_at": "2021-03-05T22:58:56.238875Z", "succeeded_at": "2021-03-23T20:55:12.670390Z", - "failed_at": 'null', + "failed_at": "null", + "paused": False, + "pause_after_trial": False, "sync_frequency": 360, "schedule_type": "manual", "status": { @@ -42,7 +44,7 @@ "update_state": "on_schedule", "is_historical_sync": False, "tasks": [], - "warnings": [] + "warnings": [], }, "config": { "latest_version": "1", @@ -50,49 +52,204 @@ "named_range": "fivetran_test_range", "authorization_method": "User OAuth", "service_version": "1", - "last_synced_changes__utc_": "2021-03-23 20:54" - } - } + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, +} + +MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "enable_new_by_default": True, + "schema_change_handling": "ALLOW_ALL", + "schemas": { + "google_sheets.fivetran_google_sheets_spotify": { + "name_in_destination": "google_sheets.fivetran_google_sheets_spotify", + "enabled": True, + "tables": { + "table_1": { + "name_in_destination": "table_1", + "enabled": True, + "sync_mode": "SOFT_DELETE", + "enabled_patch_settings": {"allowed": True}, + "columns": { + "column_1": { + "name_in_destination": "column_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason_code": "SYSTEM_COLUMN", + "reason": "The column does not support exclusion as it is a Primary Key", + }, + }, + }, + } + }, + } + }, + }, +} + +MOCK_FIVETRAN_METADATA_TABLES_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "items": [ + { + "id": "NjgyMDM0OQ", + "parent_id": "ZGVtbw", + "name_in_source": "subscription_periods", + "name_in_destination": "subscription_periods", + } + ] + }, +} + +MOCK_FIVETRAN_METADATA_COLUMNS_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "items": [ + { + "id": "MjE0NDM2ODE2", + "parent_id": "NjgyMDM0OQ", + "name_in_source": "_file", + "name_in_destination": "_file", + "type_in_source": "String", + "type_in_destination": "VARCHAR(256)", + "is_primary_key": True, + "is_foreign_key": False, + }, + ] + }, +} + +MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "rarer_gradient", + "group_id": "rarer_gradient", + "service": "google_sheets", + "region": "GCP_US_EAST4", + "time_zone_offset": "-8", + "setup_status": "connected", + "config": {"schema": "google_sheets.fivetran_google_sheets_spotify"}, + }, +} + +MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "rarer_gradient", + "name": "GoogleSheets", + "created_at": "2022-12-12T17:14:33.790844Z", + }, } # Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict('os.environ', AIRFLOW_CONN_CONN_FIVETRAN='http://API_KEY:API_SECRET@') +@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") class TestFivetranHook(unittest.TestCase): - """ - Test functions for Fivetran Hook. + """ + Test functions for Fivetran Hook. Mocks responses from Fivetran API. """ @requests_mock.mock() def test_get_connector(self, m): - - m.get('https://api.fivetran.com/v1/connectors/interchangeable_revenge', - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD) - + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) hook = FivetranHook( - fivetran_conn_id='conn_fivetran', + fivetran_conn_id="conn_fivetran", ) + result = hook.get_connector(connector_id="interchangeable_revenge") + assert result["status"]["setup_state"] == "connected" - result = hook.get_connector(connector_id='interchangeable_revenge') - - assert result['status']['setup_state'] == 'connected' + @requests_mock.mock() + def test_get_connector_schemas(self, m): + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/schemas", + json=MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_connector_schemas(connector_id="interchangeable_revenge") + assert result["schemas"]["google_sheets.fivetran_google_sheets_spotify"][ + "enabled" + ] @requests_mock.mock() - def test_start_fivetran_sync(self, m): + def test_get_metadata_tables(self, m): + m.get( + "https://api.fivetran.com/v1/metadata/connectors/interchangeable_revenge/tables", + json=MOCK_FIVETRAN_METADATA_TABLES_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_metadata( + connector_id="interchangeable_revenge", metadata="tables" + ) + assert result["items"][0]["id"] == "NjgyMDM0OQ" - m.post('https://api.fivetran.com/v1/connectors/interchangeable_revenge/force', - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD) + @requests_mock.mock() + def test_get_metadata_columns(self, m): + m.get( + "https://api.fivetran.com/v1/metadata/connectors/interchangeable_revenge/columns", + json=MOCK_FIVETRAN_METADATA_COLUMNS_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_metadata( + connector_id="interchangeable_revenge", metadata="columns" + ) + assert result["items"][0]["id"] == "MjE0NDM2ODE2" + @requests_mock.mock() + def test_get_destinations(self, m): + m.get( + "https://api.fivetran.com/v1/destinations/rarer_gradient", + json=MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD, + ) hook = FivetranHook( - fivetran_conn_id='conn_fivetran', + fivetran_conn_id="conn_fivetran", ) + result = hook.get_destinations(group_id="rarer_gradient") + assert result["service"] == "google_sheets" - result = hook.start_fivetran_sync(connector_id='interchangeable_revenge') + @requests_mock.mock() + def test_get_groups(self, m): + m.get( + "https://api.fivetran.com/v1/groups/rarer_gradient", + json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_groups(group_id="rarer_gradient") + assert result["id"] == "rarer_gradient" + assert result["name"] == "GoogleSheets" - assert result['code'] == 'Success' + @requests_mock.mock() + def test_start_fivetran_sync(self, m): + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + m.post( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.start_fivetran_sync(connector_id="interchangeable_revenge") + assert result["code"] == "Success" -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/operators/test_operators.py b/tests/operators/test_operators.py index fce67c8..2bf14e5 100644 --- a/tests/operators/test_operators.py +++ b/tests/operators/test_operators.py @@ -33,7 +33,7 @@ "connected_by": "mournful_shalt", "created_at": "2021-03-05T22:58:56.238875Z", "succeeded_at": "2021-03-23T20:55:12.670390Z", - "failed_at": 'null', + "failed_at": "null", "sync_frequency": 360, "schedule_type": "manual", "status": { @@ -42,7 +42,7 @@ "update_state": "on_schedule", "is_historical_sync": False, "tasks": [], - "warnings": [] + "warnings": [], }, "config": { "latest_version": "1", @@ -50,17 +50,17 @@ "named_range": "fivetran_test_range", "authorization_method": "User OAuth", "service_version": "1", - "last_synced_changes__utc_": "2021-03-23 20:54" - } - } + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, } # Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict('os.environ', AIRFLOW_CONN_CONN_FIVETRAN='http://API_KEY:API_SECRET@') +@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") class TestFivetranOperator(unittest.TestCase): - """ - Test functions for Fivetran Operator. + """ + Test functions for Fivetran Operator. Mocks responses from Fivetran API. """ @@ -68,24 +68,30 @@ class TestFivetranOperator(unittest.TestCase): @requests_mock.mock() def test_fivetran_operator(self, m): - m.get('https://api.fivetran.com/v1/connectors/interchangeable_revenge', - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD) - m.patch('https://api.fivetran.com/v1/connectors/interchangeable_revenge', - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD) - m.post('https://api.fivetran.com/v1/connectors/interchangeable_revenge/force', - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD) + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + m.patch( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + m.post( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) operator = FivetranOperator( - task_id='fivetran-task', - fivetran_conn_id='conn_fivetran', - connector_id='interchangeable_revenge', + task_id="fivetran-task", + fivetran_conn_id="conn_fivetran", + connector_id="interchangeable_revenge", ) result = operator.execute({}) log.info(result) - assert result['code'] == 'Success' + assert result["code"] == "Success" -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/sensors/test_sensors.py b/tests/sensors/test_sensors.py index 748c066..26270ea 100644 --- a/tests/sensors/test_sensors.py +++ b/tests/sensors/test_sensors.py @@ -36,7 +36,7 @@ "connected_by": "mournful_shalt", "created_at": "2021-03-05T22:58:56.238875Z", "succeeded_at": "2021-03-23T20:55:12.670390Z", - "failed_at": 'null', + "failed_at": "null", "sync_frequency": 360, "schedule_type": "manual", "status": { @@ -45,7 +45,7 @@ "update_state": "on_schedule", "is_historical_sync": False, "tasks": [], - "warnings": [] + "warnings": [], }, "config": { "latest_version": "1", @@ -53,34 +53,34 @@ "named_range": "fivetran_test_range", "authorization_method": "User OAuth", "service_version": "1", - "last_synced_changes__utc_": "2021-03-23 20:54" - } - } + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, } # Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict('os.environ', AIRFLOW_CONN_CONN_FIVETRAN='http://API_KEY:API_SECRET@') +@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") class TestFivetranSensor(unittest.TestCase): - """ - Test functions for Fivetran Operator. + """ + Test functions for Fivetran Operator. Mocks responses from Fivetran API. """ - @mock.patch.object(FivetranSensor, 'poke', 'returned_sync_status') + @mock.patch.object(FivetranSensor, "poke", "returned_sync_status") @requests_mock.mock() def test_del(self, m): sensor = FivetranSensor( - task_id='my_fivetran_sensor', - fivetran_conn_id='conn_fivetran', - connector_id='interchangeable_revenge' + task_id="my_fivetran_sensor", + fivetran_conn_id="conn_fivetran", + connector_id="interchangeable_revenge", ) log.info(sensor.poke) - assert sensor.poke == 'returned_sync_status' + assert sensor.poke == "returned_sync_status" -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()