Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FivetranOperatorAsync hook and trigger to handle reschedule changes #25

Merged
merged 12 commits into from
Jun 7, 2023
56 changes: 55 additions & 1 deletion fivetran_provider_async/hooks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import time
from typing import Any, Dict, cast

import aiohttp
import pendulum
from aiohttp import ClientResponseError
from airflow.exceptions import AirflowException
from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -44,6 +46,7 @@ async def _do_api_call_async(self, endpoint_info, json=None):
request_func = session.get
elif method == "POST":
request_func = session.post
headers.update({"Content-Type": "application/json;version=2"})
elif method == "PATCH":
request_func = session.patch
headers.update({"Content-Type": "application/json;version=2"})
Expand Down Expand Up @@ -92,7 +95,7 @@ async def get_connector_async(self, connector_id):
resp = await self._do_api_call_async(("GET", endpoint))
return resp["data"]

async def get_sync_status_async(self, connector_id, previous_completed_at):
async def get_sync_status_async(self, connector_id, previous_completed_at, reschedule_time=0):
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
"""
For sensor, return True if connector's 'succeeded_at' field has updated.

Expand All @@ -102,6 +105,9 @@ async def get_sync_status_async(self, connector_id, previous_completed_at):
:param previous_completed_at: The last time the connector ran, collected on Sensor
initialization.
:type previous_completed_at: pendulum.datetime.DateTime
:param reschedule_time: Optional, if connector is in reset state
number of seconds to wait before restarting, else Fivetran suggestion used
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
:type reschedule_time: int
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
"""
connector_details = await self.get_connector_async(connector_id)
succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
Expand All @@ -122,6 +128,13 @@ async def get_sync_status_async(self, connector_id, previous_completed_at):
sync_state = connector_details["status"]["sync_state"]
self.log.info('Connector "%s": sync_state = "%s"', connector_id, sync_state)

# if sync in resheduled start, wait for time recommended by Fivetran
# or manually specified, then restart sync
if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual":
self.log.info('Connector is in "rescheduled" state and needs to be manually restarted')
self.pause_and_restart(connector_details["status"]["rescheduled_for"], reschedule_time)
return False

# Check if sync started by airflow has finished
# indicated by new 'succeeded_at' timestamp
if current_completed_at > previous_completed_at:
Expand All @@ -134,6 +147,47 @@ async def get_sync_status_async(self, connector_id, previous_completed_at):
job_status = "pending"
return job_status

def pause_and_restart(self, connector_id, reschedule_for, reschedule_time):
"""
While a connector is syncing, if it falls into a reschedule state,
wait for a time either specified by the user of recommended by Fivetran,
Then restart a sync

:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:param reschedule_for: From connector details, if schedule_type is manual,
then the connector expects triggering the event at the designated UTC time
:type reschedule_for: str
:param reschedule_time: Optional, if connector is in reset state
number of seconds to wait before restarting, else Fivetran suggestion used
:type reschedule_time: int
"""
if reschedule_time:
log_statement = f'Starting connector again in "{reschedule_time}" seconds'
self.log.info(log_statement)
time.sleep(reschedule_time)
else:
wait_time = (
self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC")
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
).seconds
log_statement = f'Starting connector again in "{wait_time}" seconds'
self.log.info(log_statement)
time.sleep(wait_time)
sunank200 marked this conversation as resolved.
Show resolved Hide resolved

self.log.info("Restarting connector now")
return self.start_fivetran_sync(connector_id)

def _parse_timestamp(self, api_time):
"""
Returns either the pendulum-parsed actual timestamp or a very out-of-date timestamp if not set.

:param api_time: timestamp format as returned by the Fivetran API.
:type api_time: str
:rtype: Pendulum.DateTime
"""
return pendulum.parse(api_time) if api_time is not None else pendulum.from_timestamp(-1)

async def get_last_sync_async(self, connector_id, xcom=""):
"""
Get the last time Fivetran connector completed a sync.
Expand Down
8 changes: 7 additions & 1 deletion fivetran_provider_async/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class FivetranTrigger(BaseTrigger):
:param xcom: If used, FivetranSensorAsync receives timestamp of previously
completed sync
:param poke_interval: polling period in seconds to check for the status
:param reschedule_time: Optional, if connector is in reset state
number of seconds to wait before restarting, else Fivetran suggestion used
"""

def __init__(
Expand All @@ -31,6 +33,7 @@ def __init__(
previous_completed_at: pendulum.DateTime | None = None,
xcom: str = "",
poke_interval: float = 4.0,
reschedule_time: int = 0,
):
super().__init__()
self.task_id = task_id
Expand All @@ -39,6 +42,7 @@ def __init__(
self.previous_completed_at = previous_completed_at
self.xcom = xcom
self.poke_interval = poke_interval
self.reschedule_time = reschedule_time

def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""Serializes FivetranTrigger arguments and classpath."""
Expand All @@ -64,7 +68,9 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
if self.previous_completed_at is None:
self.previous_completed_at = await hook.get_last_sync_async(self.connector_id, self.xcom)
while True:
res = await hook.get_sync_status_async(self.connector_id, self.previous_completed_at)
res = await hook.get_sync_status_async(
self.connector_id, self.previous_completed_at, self.reschedule_time
)
if res == "success":
self.previous_completed_at = await hook.get_last_sync_async(self.connector_id)
msg = "Fivetran connector %s finished syncing at %s" % (
Expand Down
19 changes: 18 additions & 1 deletion tests/hooks/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ async def test_fivetran_hook_get_sync_status_async(
hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran")
mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS
result = await hook.get_sync_status_async(
connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at
connector_id="interchangeable_revenge",
previous_completed_at=mock_previous_completed_at,
reschedule_time=60,
)
assert result == expected_result

Expand All @@ -74,6 +76,21 @@ async def test_fivetran_hook_get_sync_status_async_exception(mock_api_call_async
assert "Fivetran sync for connector interchangeable_revenge failed" in str(exc.value)


@pytest.mark.asyncio
@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync")
@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async")
async def test_fivetran_hook_pause_and_restart(mock_api_call_async_response, mock_start_fivetran_sync):
"""Tests that pause_and_restart method for manual mode with reschedule time set."""
hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran")
mock_start_fivetran_sync.return_value = True
mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS

result = hook.pause_and_restart(
connector_id="interchangeable_revenge", reschedule_for="manual", reschedule_time=60
)
assert result is True


@pytest.mark.asyncio
@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async")
async def test_fivetran_hook_get_last_sync_async_no_xcom(mock_api_call_async_response):
Expand Down