From abea1135a80721a8ef3ca57e4045cbca8087c9a6 Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Wed, 8 Nov 2023 10:31:00 +0000 Subject: [PATCH 01/11] add custom pagination support for Visitors stream --- tap_pendo/streams.py | 29 ++++++++++++++++++++++++++++- tap_pendo/sync.py | 3 +++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index 8ba4575..fd71ec5 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -729,7 +729,17 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs): # Request retry yield from self.request(endpoint, params, count, **kwargs) + def get_record_count(self): + return 0 + def sync(self, state, start_date=None, key_id=None, parent_last_updated=None): + loop_for_records = False + # Get total number of record count. + if self.name in ["visitors"]: + # If number of records equal to record then assume there are more records to be synced + # and save the last filter value. Otherwise assume we have extracted all records + loop_for_records = self.get_record_count() >= self.record_limit + stream_response = self.request(self.name, json=self.get_body()) or [] # Get and intialize sub-stream for the current stream @@ -746,7 +756,7 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None): # which flush out during sync_substream call above stream_response = self.request(self.name, json=self.get_body()) or [] - return (self.stream, stream_response), False + return (self.stream, stream_response), loop_for_records class EventsBase(Stream): DATE_WINDOW_SIZE = 1 @@ -1263,6 +1273,7 @@ class Visitors(LazyAggregationStream): replication_method = "INCREMENTAL" replication_key = "lastupdated" key_properties = ["visitor_id"] + last_processed = None def get_body(self, key_id=None, period=None, first=None): include_anonymous_visitors = self.config.get('include_anonymous_visitors') or DEFAULT_INCLUDE_ANONYMOUS_VISITORS @@ -1280,6 +1291,12 @@ def get_body(self, 
key_id=None, period=None, first=None): "identified": not anons } } + }, { + "sort": ["metadata.auto.idhash"] + }, { + "filter": f"metadata.auto.idhash>{self.set_filter_value()}" + }, { + "limit": self.record_limit }], "requestId": "all-visitors", "sort": [ @@ -1288,6 +1305,16 @@ def get_body(self, key_id=None, period=None, first=None): } } + def get_record_count(self): + # Get number of records to be fetched using current filter + body = self.get_body() + body["request"]["pipeline"].append({"count": None}) + return list(self.request(self.name, json=body))[0]["count"] + + def set_filter_value(self): + # Set the value of filter parameter in request body + return self.last_processed["metadata_auto"]["idhash"] if self.last_processed else 1 + def transform(self, record): # Transform data of accounts into one level dictionary with following transformation record['lastupdated'] = record.get('metadata').get('auto').get( diff --git a/tap_pendo/sync.py b/tap_pendo/sync.py index 934a407..93d5d51 100644 --- a/tap_pendo/sync.py +++ b/tap_pendo/sync.py @@ -59,6 +59,9 @@ def sync_stream(state, start_date, instance): LOGGER.info('Replication Value NULL for tap_stream_id: %s', stream.tap_stream_id) counter.increment() + # preserve the last processed record which will be useful if stream supports pagination + instance.last_processed = record + # Update bookmark and write state for the stream with new_bookmark instance.update_bookmark(state, instance.name, strftime(new_bookmark), instance.replication_key) From 45b8b2e7e947848289d79f4150c9bfd1824dce2d Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Wed, 8 Nov 2023 10:51:31 +0000 Subject: [PATCH 02/11] update unittests --- tests/unittests/test_lazy_aggregation_sync.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/unittests/test_lazy_aggregation_sync.py b/tests/unittests/test_lazy_aggregation_sync.py index 2df5546..7d853d8 100644 --- a/tests/unittests/test_lazy_aggregation_sync.py +++ 
b/tests/unittests/test_lazy_aggregation_sync.py @@ -3,6 +3,10 @@ from unittest import mock from tap_pendo.streams import Visitors +# from debugpy import listen, wait_for_client +# listen(8000) +# wait_for_client() + class Mockresponse: def __init__(self, resp, status_code, headers=None, raise_error=False): self.status_code = status_code @@ -39,7 +43,8 @@ class TestLazyAggregationSync(unittest.TestCase): @mock.patch("requests.Session.send") @mock.patch("tap_pendo.streams.Stream.is_selected") @mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream) - def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_selected, mocked_request): + @mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100) + def test_lazzy_aggregation_with_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request): ''' Verify that if sub stream is present then also all data should be return for super stream and sync_substream should be called @@ -60,7 +65,8 @@ def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_select @mock.patch("requests.Session.send") @mock.patch("tap_pendo.streams.Stream.is_selected") @mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream) - def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request): + @mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100) + def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request): ''' Verify that if sub stream is not selected then also all data should be return for super stream and sync_substream should not be called From e716b2732d6e4fc2bd1d7e674fb103a1a650a617 Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Wed, 8 Nov 2023 12:12:28 +0000 Subject: [PATCH 03/11] fix the visitors filter value --- tap_pendo/streams.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7
deletions(-) diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index fd71ec5..203b2fc 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -1292,16 +1292,13 @@ def get_body(self, key_id=None, period=None, first=None): } } }, { - "sort": ["metadata.auto.idhash"] + "sort": ["visitorId"] }, { - "filter": f"metadata.auto.idhash>{self.set_filter_value()}" + "filter": self.set_filter_value() }, { "limit": self.record_limit }], - "requestId": "all-visitors", - "sort": [ - "visitorId" - ] + "requestId": "all-visitors" } } @@ -1313,7 +1310,12 @@ def get_record_count(self): def set_filter_value(self): # Set the value of filter parameter in request body - return self.last_processed["metadata_auto"]["idhash"] if self.last_processed else 1 + if self.last_processed: + filter_value = f'visitorId>"{self.last_processed["visitor_id"]}"' + else: + filter_value = 'visitorId>\"\"' + + return filter_value def transform(self, record): # Transform data of accounts into one level dictionary with following transformation From bde2cc7c4a126c1d4c6f0b69b8023d9bdc90e665 Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Thu, 9 Nov 2023 08:58:46 +0000 Subject: [PATCH 04/11] - fix minor pagination issue - update integration tests --- tap_pendo/streams.py | 3 +- tests/tap_tester/base.py | 2 +- tests/tap_tester/test_visitors_pagination.py | 83 ++++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 tests/tap_tester/test_visitors_pagination.py diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index 203b2fc..0cee53b 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -507,7 +507,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response): sub_stream.name, record[parent.key_properties[0]]) # After processing for all parent ids we can remove our resumption state - state.get('bookmarks').get(sub_stream.name).pop('last_processed') + if 'last_processed' in state.get('bookmarks').get(sub_stream.name): + 
state.get('bookmarks').get(sub_stream.name).pop('last_processed') self.update_child_stream_bookmarks(state=state, sub_stream=sub_stream, diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py index c2b58d9..ee2e983 100644 --- a/tests/tap_tester/base.py +++ b/tests/tap_tester/base.py @@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase): # After 180 days visitor_history data cannot be synced # so We need to manually create test data for visitors and vistor_history streams # Once created, we should update this start date to optimise the execution time - START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z" + START_DATE_VISTOR_HISTORY = "2023-11-05T00:00:00Z" @staticmethod def name(): diff --git a/tests/tap_tester/test_visitors_pagination.py b/tests/tap_tester/test_visitors_pagination.py new file mode 100644 index 0000000..14472f3 --- /dev/null +++ b/tests/tap_tester/test_visitors_pagination.py @@ -0,0 +1,83 @@ +import tap_tester.connections as connections +import tap_tester.menagerie as menagerie +import tap_tester.runner as runner +from base import TestPendoBase + + +class PendoAllFieldsTest(TestPendoBase): + start_date = "2019-09-10T00:00:00Z" + record_limit = 50 + include_anonymous_visitors = False + def name(self): + return "pendo_all_fields_test" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return { + # To reduce the execution time to test this stream taking recently start_date + "start_date": self.start_date, + "lookback_window": "1", + "period": "dayRange", + "record_limit": self.record_limit, + "include_anonymous_visitors": self.include_anonymous_visitors + } + + def test_run(self): + # Verify that visitors pagination logic work as expected named and anonymous visitors + # Note: there are 21000+ named and anonymous visitors + self.run_pagination_test(expected_streams= {"visitors"}, + start_date="2019-09-10T00:00:00Z", + record_limit=10000, + include_anonymous_visitors="true") + + # 
Verify with visitors pagination, we are able to sync child stream records i.e. visitor_history + # Note: there are only 58 named and anonymous visitors but only recently updated visitors will be synced + self.run_pagination_test(expected_streams={"visitors", "visitor_history"}, + start_date=self.START_DATE_VISTOR_HISTORY, + record_limit=50, + include_anonymous_visitors="false") + + + def run_pagination_test(self, expected_streams, start_date, record_limit, include_anonymous_visitors): + """ + This is a canary test to verify pagination implementation for the Visitors stream. + """ + self.streams_to_test = expected_streams + self.start_date = start_date + self.record_limit = record_limit + self.include_anonymous_visitors = include_anonymous_visitors + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields) + + # Grab metadata after performing table-and-field selection to set expectations + # used for asserting all fields are replicated + stream_to_all_catalog_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog['stream_id'], catalog['stream_name'] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [md_entry['breadcrumb'][1] + for md_entry in catalog_entry['metadata'] + if md_entry['breadcrumb'] != []] + stream_to_all_catalog_fields[stream_name] = set( + fields_from_field_level_md) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + + self.assertSetEqual(expected_streams, synced_stream_names) + for stream in 
expected_streams: + with self.subTest(stream=stream): + self.assertGreaterEqual(record_count_by_stream.get(stream), 1) From 39235a18ba79df4f9899ce3343f2dcd2d3e04909 Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Thu, 9 Nov 2023 11:34:08 +0000 Subject: [PATCH 05/11] fix test name and start date --- tests/tap_tester/base.py | 2 +- tests/tap_tester/test_visitors_pagination.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py index ee2e983..d499aea 100644 --- a/tests/tap_tester/base.py +++ b/tests/tap_tester/base.py @@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase): # After 180 days visitor_history data cannot be synced # so We need to manually create test data for visitors and vistor_history streams # Once created, we should update this start date to optimise the execution time - START_DATE_VISTOR_HISTORY = "2023-11-05T00:00:00Z" + START_DATE_VISTOR_HISTORY = "2023-06-01T00:00:00Z" @staticmethod def name(): diff --git a/tests/tap_tester/test_visitors_pagination.py b/tests/tap_tester/test_visitors_pagination.py index 14472f3..cd59df9 100644 --- a/tests/tap_tester/test_visitors_pagination.py +++ b/tests/tap_tester/test_visitors_pagination.py @@ -9,7 +9,7 @@ class PendoAllFieldsTest(TestPendoBase): record_limit = 50 include_anonymous_visitors = False def name(self): - return "pendo_all_fields_test" + return "pendo_visitors_pagination_test" def get_properties(self, original: bool = True): """Configuration properties required for the tap.""" From adc1e55f2134b685519b6fcf4e211b8c6f2c2c55 Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Thu, 9 Nov 2023 15:25:50 +0000 Subject: [PATCH 06/11] fix the review --- tap_pendo/streams.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index 0cee53b..85d9703 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -733,14 +733,11 @@ def send_request_get_results(self, req, 
endpoint, params, count, **kwargs): def get_record_count(self): return 0 - def sync(self, state, start_date=None, key_id=None, parent_last_updated=None): - loop_for_records = False - # Get total number of record count. - if self.name in ["visitors"]: - # If number of records equal to record then assume there are more records to be synced - # and save the last filter value. Otherwise assume we have extracted all records - loop_for_records = self.get_record_count() >= self.record_limit + def is_loop_required(self): + return False + def sync(self, state, start_date=None, key_id=None, parent_last_updated=None): + loop_for_records = self.is_loop_required() stream_response = self.request(self.name, json=self.get_body()) or [] # Get and intialize sub-stream for the current stream @@ -1309,6 +1306,11 @@ def get_record_count(self): body["request"]["pipeline"].append({"count": None}) return list(self.request(self.name, json=body))[0]["count"] + def is_loop_required(self): + # If number of records equal to record then assume there are more records to be synced + # and save the last filter value. 
Otherwise assume we have extracted all records + return self.get_record_count() >= self.record_limit + def set_filter_value(self): # Set the value of filter parameter in request body if self.last_processed: From 2559c58909a569dab7a65a8b7653c598682ba57f Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Fri, 10 Nov 2023 12:53:17 +0000 Subject: [PATCH 07/11] fix regression --- tap_pendo/streams.py | 7 ++++--- tap_pendo/sync.py | 2 +- tests/tap_tester/base.py | 2 +- tests/tap_tester/test_visitors_pagination.py | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index 85d9703..744f3ee 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -141,6 +141,7 @@ class Stream(): period = None request_retry_count = 1 last_processed = None + last_synced_record = None # initialized the endpoint attribute which can be overriden by child streams based on # the different parameters used by the stream. @@ -1271,7 +1272,7 @@ class Visitors(LazyAggregationStream): replication_method = "INCREMENTAL" replication_key = "lastupdated" key_properties = ["visitor_id"] - last_processed = None + last_synced_record = None def get_body(self, key_id=None, period=None, first=None): include_anonymous_visitors = self.config.get('include_anonymous_visitors') or DEFAULT_INCLUDE_ANONYMOUS_VISITORS @@ -1313,8 +1314,8 @@ def is_loop_required(self): def set_filter_value(self): # Set the value of filter parameter in request body - if self.last_processed: - filter_value = f'visitorId>"{self.last_processed["visitor_id"]}"' + if self.last_synced_record: + filter_value = f'visitorId>"{self.last_synced_record["visitor_id"]}"' else: filter_value = 'visitorId>\"\"' diff --git a/tap_pendo/sync.py b/tap_pendo/sync.py index 93d5d51..c27b1a0 100644 --- a/tap_pendo/sync.py +++ b/tap_pendo/sync.py @@ -60,7 +60,7 @@ def sync_stream(state, start_date, instance): counter.increment() # preserve the last processed record which will be useful if stream 
supports pagination - instance.last_processed = record + instance.last_synced_record = record # Update bookmark and write state for the stream with new_bookmark instance.update_bookmark(state, instance.name, strftime(new_bookmark), diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py index d499aea..280892f 100644 --- a/tests/tap_tester/base.py +++ b/tests/tap_tester/base.py @@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase): # After 180 days visitor_history data cannot be synced # so We need to manually create test data for visitors and vistor_history streams # Once created, we should update this start date to optimise the execution time - START_DATE_VISTOR_HISTORY = "2023-06-01T00:00:00Z" + START_DATE_VISTOR_HISTORY = "2023-11-06T00:00:00Z" @staticmethod def name(): diff --git a/tests/tap_tester/test_visitors_pagination.py b/tests/tap_tester/test_visitors_pagination.py index cd59df9..77e0a74 100644 --- a/tests/tap_tester/test_visitors_pagination.py +++ b/tests/tap_tester/test_visitors_pagination.py @@ -24,8 +24,9 @@ def get_properties(self, original: bool = True): def test_run(self): # Verify that visitors pagination logic work as expected named and anonymous visitors + # without impacting other stream replication # Note: there are 21000+ named and anonymous visitors - self.run_pagination_test(expected_streams= {"visitors"}, + self.run_pagination_test(expected_streams= {"accounts", "features", "feature_events", "visitors"}, start_date="2019-09-10T00:00:00Z", record_limit=10000, include_anonymous_visitors="true") From dd54ff0fe204282fb839448e53ca2e4b8d62ac6c Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Fri, 10 Nov 2023 13:06:29 +0000 Subject: [PATCH 08/11] fix lint --- tap_pendo/streams.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index 744f3ee..48d7eb2 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -141,7 +141,7 @@ class Stream(): period = None 
request_retry_count = 1 last_processed = None - last_synced_record = None + last_synced_record = {} # initialized the endpoint attribute which can be overriden by child streams based on # the different parameters used by the stream. @@ -1272,7 +1272,6 @@ class Visitors(LazyAggregationStream): replication_method = "INCREMENTAL" replication_key = "lastupdated" key_properties = ["visitor_id"] - last_synced_record = None def get_body(self, key_id=None, period=None, first=None): include_anonymous_visitors = self.config.get('include_anonymous_visitors') or DEFAULT_INCLUDE_ANONYMOUS_VISITORS From 8a2a4e6bb065602271157ef8f632a9520a06ada4 Mon Sep 17 00:00:00 2001 From: RushiT0122 Date: Fri, 10 Nov 2023 15:45:26 +0000 Subject: [PATCH 09/11] update start date to reduce the execution time and minor refactoring --- tests/tap_tester/test_start_date.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/tap_tester/test_start_date.py b/tests/tap_tester/test_start_date.py index a09db54..09cb44e 100644 --- a/tests/tap_tester/test_start_date.py +++ b/tests/tap_tester/test_start_date.py @@ -31,11 +31,11 @@ def test_run(self): # All these streams have similar implementation like guides and guide_events so removing this test to limit the execution time # self.run_test("2020-09-01T00:00:00Z", "2021-03-01T00:00:00Z", {"features", "feature_events", "pages", "page_events", "events", "track_types", "track_events"}) - + # Visitors history can be retrieved only for 180 days so to reduce execution time setting first start time older than 180 days back self.run_test( - start_date_1="2022-06-25T00:00:00Z", - start_date_2="2022-07-20T00:00:00Z", + start_date_1=self.START_DATE_VISTOR_HISTORY, + start_date_2="2023-11-09T00:00:00Z", streams={"visitors", "visitor_history"}) def expected_metadata(self): @@ -72,7 +72,7 @@ def run_test(self, start_date_1, start_date_2, streams): self.start_date_1 = start_date_1 self.start_date_2 = start_date_2 self.streams = streams - +
self.start_date = self.start_date_1 expected_streams = streams @@ -100,7 +100,7 @@ def run_test(self, start_date_1, start_date_2, streams): ########################################################################## # Update START DATE Between Syncs ########################################################################## - + LOGGER.info("REPLICATION START DATE CHANGE: {} ===>>> {} ".format( self.start_date, self.start_date_2)) self.start_date = self.start_date_2 From ed72fba9a6ae45b0b70f905cf03f96f4c50fce92 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Tue, 14 Nov 2023 14:08:44 +0000 Subject: [PATCH 10/11] setup and changelog update --- CHANGELOG.md | 3 +++ setup.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef28894..ac3b8f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.1.0 + * Add the custom pagination support for the Visitors stream [#129](https://github.com/singer-io/tap-pendo/pull/129) + ## 1.0.1 * Fix infinite loop issue [#126](https://github.com/singer-io/tap-pendo/pull/126) diff --git a/setup.py b/setup.py index 648c67e..ef4025a 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="tap-pendo", - version="1.0.1", + version="1.1.0", description="Singer.io tap for extracting data", author="Stitch", url="https://github.com/singer-io/tap-pendo", From e927287d3f6cca66a8ddfe7f201348dbd7d1af88 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Tue, 14 Nov 2023 14:42:37 +0000 Subject: [PATCH 11/11] remove unused debug statement --- tests/unittests/test_lazy_aggregation_sync.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/unittests/test_lazy_aggregation_sync.py b/tests/unittests/test_lazy_aggregation_sync.py index 7d853d8..9daaf78 100644 --- a/tests/unittests/test_lazy_aggregation_sync.py +++ b/tests/unittests/test_lazy_aggregation_sync.py @@ -3,10 +3,6 @@ from unittest import mock from tap_pendo.streams import Visitors -# from debugpy
import listen, wait_for_client -# listen(8000) -# wait_for_client() - class Mockresponse: def __init__(self, resp, status_code, headers=None, raise_error=False): self.status_code = status_code