Skip to content

Commit

Permalink
Add reschedule_time=0
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed May 30, 2023
1 parent 8a27b4a commit cc49fec
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions fivetran_provider_async/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def get_connector_async(self, connector_id):
resp = await self._do_api_call_async(("GET", endpoint))
return resp["data"]

async def get_sync_status_async(self, connector_id, previous_completed_at, reschedule_time):
async def get_sync_status_async(self, connector_id, previous_completed_at, reschedule_time=0):
"""
For sensor, return True if connector's 'succeeded_at' field has updated.
Expand Down Expand Up @@ -131,7 +131,7 @@ async def get_sync_status_async(self, connector_id, previous_completed_at, resch
# if sync in resheduled start, wait for time recommended by Fivetran
# or manually specified, then restart sync
if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual":
self.log.info(f'Connector is in "rescheduled" state and needs to be manually restarted')
self.log.info('Connector is in "rescheduled" state and needs to be manually restarted')
self.pause_and_restart(connector_details["status"]["rescheduled_for"], reschedule_time)
return False

Expand All @@ -152,6 +152,7 @@ def pause_and_restart(self, connector_id, reschedule_for, reschedule_time):
While a connector is syncing, if it falls into a reschedule state,
wait for a time either specified by the user of recommended by Fivetran,
Then restart a sync
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
Expand All @@ -163,22 +164,24 @@ def pause_and_restart(self, connector_id, reschedule_for, reschedule_time):
:type reschedule_time: int
"""
if reschedule_time:
self.log.info(f'Starting connector again in "{reschedule_time}" seconds')
log_statement = f'Starting connector again in "{reschedule_time}" seconds'
self.log.info(log_statement)
time.sleep(reschedule_time)
else:
wait_time = (
self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC")
).seconds
self.log.info(f'Starting connector again in "{wait_time}" seconds')
log_statement = f'Starting connector again in "{wait_time}" seconds'
self.log.info(log_statement)
time.sleep(wait_time)

self.log.info("Restarting connector now")
return self.start_fivetran_sync(connector_id)

def _parse_timestamp(self, api_time):
"""
Returns either the pendulum-parsed actual timestamp or
a very out-of-date timestamp if not set
Returns either the pendulum-parsed actual timestamp or a very out-of-date timestamp if not set.
:param api_time: timestamp format as returned by the Fivetran API.
:type api_time: str
:rtype: Pendulum.DateTime
Expand Down

0 comments on commit cc49fec

Please sign in to comment.