Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] authored and sunank200 committed May 30, 2023
1 parent 1a0fdf6 commit 8a27b4a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 28 deletions.
23 changes: 6 additions & 17 deletions fivetran_provider_async/hooks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import asyncio
import time
from typing import Any, Dict, cast

import aiohttp
import pendulum
import time
from aiohttp import ClientResponseError
from airflow.exceptions import AirflowException
from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -130,16 +130,9 @@ async def get_sync_status_async(self, connector_id, previous_completed_at, resch

# if sync in resheduled start, wait for time recommended by Fivetran
# or manually specified, then restart sync
if (
sync_state == "rescheduled"
and connector_details["schedule_type"] == "manual"
):
self.log.info(
f'Connector is in "rescheduled" state and needs to be manually restarted'
)
self.pause_and_restart(
connector_details["status"]["rescheduled_for"], reschedule_time
)
if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual":
self.log.info(f'Connector is in "rescheduled" state and needs to be manually restarted')
self.pause_and_restart(connector_details["status"]["rescheduled_for"], reschedule_time)
return False

# Check if sync started by airflow has finished
Expand Down Expand Up @@ -174,7 +167,7 @@ def pause_and_restart(self, connector_id, reschedule_for, reschedule_time):
time.sleep(reschedule_time)
else:
wait_time = (
self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC")
self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC")
).seconds
self.log.info(f'Starting connector again in "{wait_time}" seconds')
time.sleep(wait_time)
Expand All @@ -190,11 +183,7 @@ def _parse_timestamp(self, api_time):
:type api_time: str
:rtype: Pendulum.DateTime
"""
return (
pendulum.parse(api_time)
if api_time is not None
else pendulum.from_timestamp(-1)
)
return pendulum.parse(api_time) if api_time is not None else pendulum.from_timestamp(-1)

async def get_last_sync_async(self, connector_id, xcom=""):
"""
Expand Down
20 changes: 9 additions & 11 deletions fivetran_provider_async/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ class FivetranTrigger(BaseTrigger):
"""

def __init__(
self,
task_id: str,
connector_id: str,
fivetran_conn_id: str,
previous_completed_at: pendulum.DateTime | None = None,
xcom: str = "",
poke_interval: float = 4.0,
reschedule_time: int = 0,
self,
task_id: str,
connector_id: str,
fivetran_conn_id: str,
previous_completed_at: pendulum.DateTime | None = None,
xcom: str = "",
poke_interval: float = 4.0,
reschedule_time: int = 0,
):
super().__init__()
self.task_id = task_id
Expand Down Expand Up @@ -69,9 +69,7 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
self.previous_completed_at = await hook.get_last_sync_async(self.connector_id, self.xcom)
while True:
res = await hook.get_sync_status_async(
self.connector_id,
self.previous_completed_at,
self.reschedule_time
self.connector_id, self.previous_completed_at, self.reschedule_time
)
if res == "success":
self.previous_completed_at = await hook.get_last_sync_async(self.connector_id)
Expand Down

0 comments on commit 8a27b4a

Please sign in to comment.