From e69649a5f7db989f3d4866c9f4f7c8175cea06e8 Mon Sep 17 00:00:00 2001 From: PubChimps Date: Mon, 23 Jan 2023 12:52:28 -0800 Subject: [PATCH 1/5] resync on reschedule --- fivetran_provider/hooks/fivetran.py | 41 ++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/fivetran_provider/hooks/fivetran.py b/fivetran_provider/hooks/fivetran.py index 7ca52ff..42a2bde 100644 --- a/fivetran_provider/hooks/fivetran.py +++ b/fivetran_provider/hooks/fivetran.py @@ -356,7 +356,7 @@ def get_last_sync(self, connector_id, xcom=""): last_sync = succeeded_at if succeeded_at > failed_at else failed_at return last_sync - def get_sync_status(self, connector_id, previous_completed_at): + def get_sync_status(self, connector_id, previous_completed_at, reschedule_time=0): """ For sensor, return True if connector's 'succeeded_at' field has updated. :param connector_id: Fivetran connector_id, found in connector settings @@ -365,9 +365,10 @@ def get_sync_status(self, connector_id, previous_completed_at): :param previous_completed_at: The last time the connector ran, collected on Sensor initialization. :type previous_completed_at: pendulum.datetime.DateTime + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int """ - # @todo Need logic here to tell if the sync is not running at all and not - # likely to run in the near future. 
connector_details = self.get_connector(connector_id) succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) failed_at = self._parse_timestamp(connector_details["failed_at"]) @@ -386,7 +387,13 @@ def get_sync_status(self, connector_id, previous_completed_at): sync_state = connector_details["status"]["sync_state"] self.log.info(f'Connector "{connector_id}": sync_state = {sync_state}') - + + #if sync in rescheduled state, wait for time recommended by Fivetran + #or manually specified, then restart sync + if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual": + self.log.info(f'Connector is in "rescheduled" state and needs to be manually restarted') + self.pause_and_restart(connector_id, connector_details["status"]["rescheduled_for"], reschedule_time) + return False # Check if sync started by FivetranOperator has finished # indicated by new 'succeeded_at' timestamp if current_completed_at > previous_completed_at: @@ -399,6 +406,32 @@ def get_sync_status(self, connector_id, previous_completed_at): else: return False + def pause_and_restart(self, connector_id, reschedule_for, reschedule_time): + """ + While a connector is syncing, if it falls into a reschedule state, + wait for a time either specified by the user or recommended by Fivetran, + Then restart a sync + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. 
+ :type connector_id: str + :param reschedule_for: From connector details, if schedule_type is manual, + then the connector expects triggering the event at the designated UTC time + :type reschedule_for: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int + """ + if reschedule_time: + self.log.info(f'Starting connector again in "{reschedule_time}" seconds') + time.sleep(reschedule_time) + else: + wait_time = (self._parse_timestamp(reschedule_for).add(minutes=1)-pendulum.now(tz='UTC')).seconds + self.log.info(f'Starting connector again in "{wait_time}" seconds') + time.sleep(wait_time) + + self.log.info("Restarting connector now") + return self.start_fivetran_sync(connector_id) + def _parse_timestamp(self, api_time): """ Returns either the pendulum-parsed actual timestamp or From dc3ee738c4f61ce2127cb6feb8ea6f582ea4178d Mon Sep 17 00:00:00 2001 From: PubChimps Date: Mon, 23 Jan 2023 12:54:53 -0800 Subject: [PATCH 2/5] black formatting --- fivetran_provider/hooks/fivetran.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/fivetran_provider/hooks/fivetran.py b/fivetran_provider/hooks/fivetran.py index 42a2bde..0b357dc 100644 --- a/fivetran_provider/hooks/fivetran.py +++ b/fivetran_provider/hooks/fivetran.py @@ -387,12 +387,19 @@ def get_sync_status(self, connector_id, previous_completed_at, reschedule_time=0 sync_state = connector_details["status"]["sync_state"] self.log.info(f'Connector "{connector_id}": sync_state = {sync_state}') - - #if sync in rescheduled state, wait for time recommended by Fivetran - #or manually specified, then restart sync - if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual": - self.log.info(f'Connector is in "rescheduled" state and needs to be manually restarted') - self.pause_and_restart(connector_id, connector_details["status"]["rescheduled_for"], 
reschedule_time) + + # if sync in rescheduled state, wait for time recommended by Fivetran + # or manually specified, then restart sync + if ( + sync_state == "rescheduled" + and connector_details["schedule_type"] == "manual" + ): + self.log.info( + f'Connector is in "rescheduled" state and needs to be manually restarted' + ) + self.pause_and_restart( + connector_id, connector_details["status"]["rescheduled_for"], reschedule_time + ) return False # Check if sync started by FivetranOperator has finished # indicated by new 'succeeded_at' timestamp @@ -414,7 +421,7 @@ def pause_and_restart(self, connector_id, reschedule_for, reschedule_time): :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str - :param reschedule_for: From connector details, if schedule_type is manual, + :param reschedule_for: From connector details, if schedule_type is manual, then the connector expects triggering the event at the designated UTC time :type reschedule_for: str :param reschedule_time: Optional, if connector is in reset state @@ -425,10 +432,12 @@ def pause_and_restart(self, connector_id, reschedule_for, reschedule_time): self.log.info(f'Starting connector again in "{reschedule_time}" seconds') time.sleep(reschedule_time) else: - wait_time = (self._parse_timestamp(reschedule_for).add(minutes=1)-pendulum.now(tz='UTC')).seconds + wait_time = ( + self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC") + ).seconds self.log.info(f'Starting connector again in "{wait_time}" seconds') time.sleep(wait_time) - + self.log.info("Restarting connector now") return self.start_fivetran_sync(connector_id) From 47163a68068f10f09bfef8e142e6cf9f70099b9b Mon Sep 17 00:00:00 2001 From: PubChimps Date: Mon, 23 Jan 2023 14:10:28 -0800 Subject: [PATCH 3/5] add reschedule_time to sensor --- fivetran_provider/sensors/fivetran.py | 9 +++++++-- setup.cfg | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git 
a/fivetran_provider/sensors/fivetran.py b/fivetran_provider/sensors/fivetran.py index 50c2485..7ae3f56 100644 --- a/fivetran_provider/sensors/fivetran.py +++ b/fivetran_provider/sensors/fivetran.py @@ -34,9 +34,12 @@ class FivetranSensor(BaseSensorOperator): :type fivetran_retry_limit: Optional[int] :param fivetran_retry_delay: Time to wait before retrying API request :type fivetran_retry_delay: int - :param xcom: If used, FivetranSensor receives timestamp of previously + :param xcom: Optional, if used, FivetranSensor receives timestamp of previously completed sync from FivetranOperator via XCOM :type xcom: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int """ # Define which fields get jinjaified @@ -51,6 +54,7 @@ def __init__( fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, xcom: str = "", + reschedule_time=0, **kwargs: Any ) -> None: super().__init__(**kwargs) @@ -62,6 +66,7 @@ def __init__( self.fivetran_retry_delay = fivetran_retry_delay self.hook = None self.xcom = xcom + self.reschedule_time = reschedule_time def _get_hook(self) -> FivetranHook: if self.hook is None: @@ -78,4 +83,4 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync( self.connector_id, self.xcom ) - return hook.get_sync_status(self.connector_id, self.previous_completed_at) + return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time) diff --git a/setup.cfg b/setup.cfg index 4c98b19..6267f2b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] -name = airflow-provider-fivetran +name = test-airflow-provider-fivetran-thumbtack version = 1.1.4 description = A Fivetran provider for Apache Airflow long_description = file: README.md From ca5efeca552e04c95f4b007ccf8ff574b5ed432a Mon Sep 17 00:00:00 2001 From: PubChimps Date: Mon, 23 Jan 2023 14:13:35 -0800 Subject: [PATCH 4/5] Revert "add 
reschedule_time to sensor" This reverts commit 47163a68068f10f09bfef8e142e6cf9f70099b9b. --- fivetran_provider/sensors/fivetran.py | 9 ++------- setup.cfg | 2 +- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/fivetran_provider/sensors/fivetran.py b/fivetran_provider/sensors/fivetran.py index 7ae3f56..50c2485 100644 --- a/fivetran_provider/sensors/fivetran.py +++ b/fivetran_provider/sensors/fivetran.py @@ -34,12 +34,9 @@ class FivetranSensor(BaseSensorOperator): :type fivetran_retry_limit: Optional[int] :param fivetran_retry_delay: Time to wait before retrying API request :type fivetran_retry_delay: int - :param xcom: Optional, if used, FivetranSensor receives timestamp of previously + :param xcom: If used, FivetranSensor receives timestamp of previously completed sync from FivetranOperator via XCOM :type xcom: str - :param reschedule_time: Optional, if connector is in reset state - number of seconds to wait before restarting, else Fivetran suggestion used - :type reschedule_time: int """ # Define which fields get jinjaified @@ -54,7 +51,6 @@ def __init__( fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, xcom: str = "", - reschedule_time=0, **kwargs: Any ) -> None: super().__init__(**kwargs) @@ -66,7 +62,6 @@ def __init__( self.fivetran_retry_delay = fivetran_retry_delay self.hook = None self.xcom = xcom - self.reschedule_time = reschedule_time def _get_hook(self) -> FivetranHook: if self.hook is None: @@ -83,4 +78,4 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync( self.connector_id, self.xcom ) - return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time) + return hook.get_sync_status(self.connector_id, self.previous_completed_at) diff --git a/setup.cfg b/setup.cfg index 6267f2b..4c98b19 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] -name = test-airflow-provider-fivetran-thumbtack +name = airflow-provider-fivetran version = 1.1.4 description = A Fivetran 
provider for Apache Airflow long_description = file: README.md From ce36f506c84691fa61b34ede1eb92e8f5c0dbf3d Mon Sep 17 00:00:00 2001 From: PubChimps Date: Mon, 23 Jan 2023 14:16:33 -0800 Subject: [PATCH 5/5] add reschedule_time to sensor --- fivetran_provider/sensors/fivetran.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fivetran_provider/sensors/fivetran.py b/fivetran_provider/sensors/fivetran.py index 50c2485..780da71 100644 --- a/fivetran_provider/sensors/fivetran.py +++ b/fivetran_provider/sensors/fivetran.py @@ -37,6 +37,9 @@ class FivetranSensor(BaseSensorOperator): :param xcom: If used, FivetranSensor receives timestamp of previously completed sync from FivetranOperator via XCOM :type xcom: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int """ # Define which fields get jinjaified @@ -51,6 +54,7 @@ def __init__( fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, xcom: str = "", + reschedule_time: int = 0, **kwargs: Any ) -> None: super().__init__(**kwargs) @@ -62,6 +66,7 @@ def __init__( self.fivetran_retry_delay = fivetran_retry_delay self.hook = None self.xcom = xcom + self.reschedule_time = reschedule_time def _get_hook(self) -> FivetranHook: if self.hook is None: @@ -78,4 +83,4 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync( self.connector_id, self.xcom ) - return hook.get_sync_status(self.connector_id, self.previous_completed_at) + return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time)