Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #67 from denimalpaca/add_api_calls_to_hook
Browse files Browse the repository at this point in the history
Add API Calls to Hook
  • Loading branch information
PubChimps committed Jan 12, 2023
2 parents 59f046e + 7a03f43 commit eb0f919
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 57 deletions.
74 changes: 73 additions & 1 deletion fivetran_provider/hooks/fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class FivetranHook(BaseHook):
api_protocol = "https"
api_host = "api.fivetran.com"
api_path_connectors = "v1/connectors/"
api_metadata_path_connectors = "v1/metadata/connectors/"
api_path_destinations = "v1/destinations/"
api_path_groups = "v1/groups/"

@staticmethod
def get_ui_field_behaviour() -> Dict:
Expand Down Expand Up @@ -149,7 +152,7 @@ def _connector_ui_url_logs(self, service_name, schema_name):
def _connector_ui_url_setup(self, service_name, schema_name):
return self._connector_ui_url(service_name, schema_name) + "/setup"

def get_connector(self, connector_id):
def get_connector(self, connector_id) -> dict:
"""
Fetches the detail of a connector.
:param connector_id: Fivetran connector_id, found in connector settings
Expand All @@ -164,6 +167,75 @@ def get_connector(self, connector_id):
resp = self._do_api_call(("GET", endpoint))
return resp["data"]

def get_connector_schemas(self, connector_id) -> dict:
"""
Fetches schema information of the connector.
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:return: schema details
:rtype: Dict
"""
if connector_id == "":
raise ValueError("No value specified for connector_id")
endpoint = self.api_path_connectors + connector_id + "/schemas"
resp = self._do_api_call(("GET", endpoint))
return resp["data"]

def get_metadata(self, connector_id, metadata) -> dict:
"""
Fetches metadata for a given metadata string and connector.
The Fivetran metadata API is currently in beta and available to
all Fivetran users on the enterprise plan and above.
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:param metadata: The string to return the type of metadata from the API
:type metadata: str
:return: table or column metadata details
:rtype: Dict
"""
metadata_values = ("tables", "columns")
if connector_id == "":
raise ValueError("No value specified for connector_id")
if metadata not in metadata_values:
raise ValueError(
f"Got {metadata} for param 'metadata', expected one"
f" of: {metadata_values}"
)
endpoint = self.api_metadata_path_connectors + connector_id + "/" + metadata
resp = self._do_api_call(("GET", endpoint))
return resp["data"]

def get_destinations(self, group_id) -> dict:
"""
Fetches destination information for the given group.
:param group_id: The Fivetran group ID, returned by a connector API call.
:type group_id: str
:return: destination details
:rtype: Dict
"""
if group_id == "":
raise ValueError("No value specified for group_id")
endpoint = self.api_path_destinations + group_id
resp = self._do_api_call(("GET", endpoint))
return resp["data"]

def get_groups(self, group_id) -> dict:
"""
Fetches destination information for the given group.
:param group_id: The Fivetran group ID, returned by a connector API call.
:type group_id: str
:return: group details
:rtype: Dict
"""
if group_id == "":
raise ValueError("No value specified for connector_id")
endpoint = self.api_path_groups + group_id
resp = self._do_api_call(("GET", endpoint))
return resp["data"]

def check_connector(self, connector_id):
"""
Ensures connector configuration has been completed successfully and is in
Expand Down
203 changes: 180 additions & 23 deletions tests/hooks/test_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
"connected_by": "mournful_shalt",
"created_at": "2021-03-05T22:58:56.238875Z",
"succeeded_at": "2021-03-23T20:55:12.670390Z",
"failed_at": 'null',
"failed_at": "null",
"paused": False,
"pause_after_trial": False,
"sync_frequency": 360,
"schedule_type": "manual",
"status": {
Expand All @@ -42,57 +44,212 @@
"update_state": "on_schedule",
"is_historical_sync": False,
"tasks": [],
"warnings": []
"warnings": [],
},
"config": {
"latest_version": "1",
"sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...",
"named_range": "fivetran_test_range",
"authorization_method": "User OAuth",
"service_version": "1",
"last_synced_changes__utc_": "2021-03-23 20:54"
}
}
"last_synced_changes__utc_": "2021-03-23 20:54",
},
},
}

MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"enable_new_by_default": True,
"schema_change_handling": "ALLOW_ALL",
"schemas": {
"google_sheets.fivetran_google_sheets_spotify": {
"name_in_destination": "google_sheets.fivetran_google_sheets_spotify",
"enabled": True,
"tables": {
"table_1": {
"name_in_destination": "table_1",
"enabled": True,
"sync_mode": "SOFT_DELETE",
"enabled_patch_settings": {"allowed": True},
"columns": {
"column_1": {
"name_in_destination": "column_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason_code": "SYSTEM_COLUMN",
"reason": "The column does not support exclusion as it is a Primary Key",
},
},
},
}
},
}
},
},
}

MOCK_FIVETRAN_METADATA_TABLES_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"items": [
{
"id": "NjgyMDM0OQ",
"parent_id": "ZGVtbw",
"name_in_source": "subscription_periods",
"name_in_destination": "subscription_periods",
}
]
},
}

MOCK_FIVETRAN_METADATA_COLUMNS_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"items": [
{
"id": "MjE0NDM2ODE2",
"parent_id": "NjgyMDM0OQ",
"name_in_source": "_file",
"name_in_destination": "_file",
"type_in_source": "String",
"type_in_destination": "VARCHAR(256)",
"is_primary_key": True,
"is_foreign_key": False,
},
]
},
}

MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"id": "rarer_gradient",
"group_id": "rarer_gradient",
"service": "google_sheets",
"region": "GCP_US_EAST4",
"time_zone_offset": "-8",
"setup_status": "connected",
"config": {"schema": "google_sheets.fivetran_google_sheets_spotify"},
},
}

MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"id": "rarer_gradient",
"name": "GoogleSheets",
"created_at": "2022-12-12T17:14:33.790844Z",
},
}


# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`)
@mock.patch.dict('os.environ', AIRFLOW_CONN_CONN_FIVETRAN='http://API_KEY:API_SECRET@')
@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@")
class TestFivetranHook(unittest.TestCase):
"""
Test functions for Fivetran Hook.
"""
Test functions for Fivetran Hook.
Mocks responses from Fivetran API.
"""

@requests_mock.mock()
def test_get_connector(self, m):

m.get('https://api.fivetran.com/v1/connectors/interchangeable_revenge',
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD)

m.get(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge",
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id='conn_fivetran',
fivetran_conn_id="conn_fivetran",
)
result = hook.get_connector(connector_id="interchangeable_revenge")
assert result["status"]["setup_state"] == "connected"

result = hook.get_connector(connector_id='interchangeable_revenge')

assert result['status']['setup_state'] == 'connected'
@requests_mock.mock()
def test_get_connector_schemas(self, m):
m.get(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge/schemas",
json=MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)
result = hook.get_connector_schemas(connector_id="interchangeable_revenge")
assert result["schemas"]["google_sheets.fivetran_google_sheets_spotify"][
"enabled"
]

@requests_mock.mock()
def test_start_fivetran_sync(self, m):
def test_get_metadata_tables(self, m):
m.get(
"https://api.fivetran.com/v1/metadata/connectors/interchangeable_revenge/tables",
json=MOCK_FIVETRAN_METADATA_TABLES_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)
result = hook.get_metadata(
connector_id="interchangeable_revenge", metadata="tables"
)
assert result["items"][0]["id"] == "NjgyMDM0OQ"

m.post('https://api.fivetran.com/v1/connectors/interchangeable_revenge/force',
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD)
@requests_mock.mock()
def test_get_metadata_columns(self, m):
m.get(
"https://api.fivetran.com/v1/metadata/connectors/interchangeable_revenge/columns",
json=MOCK_FIVETRAN_METADATA_COLUMNS_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)
result = hook.get_metadata(
connector_id="interchangeable_revenge", metadata="columns"
)
assert result["items"][0]["id"] == "MjE0NDM2ODE2"

@requests_mock.mock()
def test_get_destinations(self, m):
m.get(
"https://api.fivetran.com/v1/destinations/rarer_gradient",
json=MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id='conn_fivetran',
fivetran_conn_id="conn_fivetran",
)
result = hook.get_destinations(group_id="rarer_gradient")
assert result["service"] == "google_sheets"

result = hook.start_fivetran_sync(connector_id='interchangeable_revenge')
@requests_mock.mock()
def test_get_groups(self, m):
m.get(
"https://api.fivetran.com/v1/groups/rarer_gradient",
json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)
result = hook.get_groups(group_id="rarer_gradient")
assert result["id"] == "rarer_gradient"
assert result["name"] == "GoogleSheets"

assert result['code'] == 'Success'
@requests_mock.mock()
def test_start_fivetran_sync(self, m):
m.get(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge",
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD,
)
m.post(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge/force",
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)
result = hook.start_fivetran_sync(connector_id="interchangeable_revenge")
assert result["code"] == "Success"


if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit eb0f919

Please sign in to comment.