Skip to content

Commit

Permalink
- fix minor pagination issue
Browse files Browse the repository at this point in the history
- update integration tests
  • Loading branch information
RushiT0122 committed Nov 9, 2023
1 parent e716b27 commit bde2cc7
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 2 deletions.
3 changes: 2 additions & 1 deletion tap_pendo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
sub_stream.name, record[parent.key_properties[0]])

# After processing for all parent ids we can remove our resumption state
state.get('bookmarks').get(sub_stream.name).pop('last_processed')
if 'last_processed' in state.get('bookmarks').get(sub_stream.name):
state.get('bookmarks').get(sub_stream.name).pop('last_processed')

self.update_child_stream_bookmarks(state=state,
sub_stream=sub_stream,
Expand Down
2 changes: 1 addition & 1 deletion tests/tap_tester/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase):
# After 180 days visitor_history data cannot be synced,
# so we need to manually create test data for the visitors and visitor_history streams.
# Once created, we should update this start date to optimise the execution time
START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z"
START_DATE_VISTOR_HISTORY = "2023-11-05T00:00:00Z"

@staticmethod
def name():
Expand Down
83 changes: 83 additions & 0 deletions tests/tap_tester/test_visitors_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
from base import TestPendoBase


class PendoAllFieldsTest(TestPendoBase):
    """Canary test verifying pagination for the `visitors` stream, both on its
    own (named + anonymous visitors) and together with its child stream
    `visitor_history`.
    """

    # Defaults; overwritten per-scenario by run_pagination_test().
    start_date = "2019-09-10T00:00:00Z"
    record_limit = 50
    include_anonymous_visitors = False

    def name(self):
        # NOTE(review): previously returned "pendo_all_fields_test" — a
        # copy/paste leftover that collided with the all-fields test's
        # identifier. Use a unique name for this pagination test.
        return "pendo_visitors_pagination_test"

    def get_properties(self, original: bool = True):
        """Configuration properties required for the tap."""
        return {
            # To reduce the execution time to test this stream taking recently start_date
            "start_date": self.start_date,
            "lookback_window": "1",
            "period": "dayRange",
            "record_limit": self.record_limit,
            "include_anonymous_visitors": self.include_anonymous_visitors
        }

    def test_run(self):
        # Verify the visitors pagination logic works as expected for named and
        # anonymous visitors.
        # Note: there are 21000+ named and anonymous visitors.
        self.run_pagination_test(expected_streams={"visitors"},
                                 start_date="2019-09-10T00:00:00Z",
                                 record_limit=10000,
                                 include_anonymous_visitors="true")

        # Verify that, with visitors pagination, we are still able to sync the
        # child stream records (visitor_history).
        # Note: there are only 58 named visitors, and only recently updated
        # visitors will be synced.
        self.run_pagination_test(expected_streams={"visitors", "visitor_history"},
                                 start_date=self.START_DATE_VISTOR_HISTORY,
                                 record_limit=50,
                                 include_anonymous_visitors="false")

    def run_pagination_test(self, expected_streams, start_date, record_limit, include_anonymous_visitors):
        """Run check mode, select the expected streams, sync, and verify that
        every expected stream (and no other) replicated at least one record.

        Args:
            expected_streams: set of tap_stream_ids that must be replicated.
            start_date: start_date tap property for this scenario.
            record_limit: page size used by the tap's pagination.
            include_anonymous_visitors: "true"/"false" tap property value.
        """
        self.streams_to_test = expected_streams
        self.start_date = start_date
        self.record_limit = record_limit
        self.include_anonymous_visitors = include_anonymous_visitors

        conn_id = connections.ensure_connection(self)

        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Table and field selection: restrict the sync to the streams under test.
        test_catalogs = [catalog for catalog in found_catalogs
                         if catalog.get('tap_stream_id') in expected_streams]

        self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs)

        # (Removed dead all-fields metadata collection: the per-stream field-set
        # dict was computed via menagerie.get_annotated_schema but never
        # asserted against — a leftover from the all-fields test.)

        record_count_by_stream = self.run_and_verify_sync(conn_id)

        synced_records = runner.get_records_from_target_output()

        # Verify no unexpected streams were replicated.
        synced_stream_names = set(synced_records.keys())
        self.assertSetEqual(expected_streams, synced_stream_names)

        # Verify pagination produced at least one record for every stream.
        for stream in expected_streams:
            with self.subTest(stream=stream):
                self.assertGreaterEqual(record_count_by_stream.get(stream), 1)

0 comments on commit bde2cc7

Please sign in to comment.