Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pagination visitors stream #129

Merged
merged 11 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions tap_pendo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
sub_stream.name, record[parent.key_properties[0]])

# After processing for all parent ids we can remove our resumption state
state.get('bookmarks').get(sub_stream.name).pop('last_processed')
if 'last_processed' in state.get('bookmarks').get(sub_stream.name):
state.get('bookmarks').get(sub_stream.name).pop('last_processed')

self.update_child_stream_bookmarks(state=state,
sub_stream=sub_stream,
Expand Down Expand Up @@ -729,7 +730,17 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs):
# Request retry
yield from self.request(endpoint, params, count, **kwargs)

def get_record_count(self):
    """Return the total number of records matching the stream's current filter.

    Base implementation returns 0, i.e. "no pagination needed". Streams that
    support pagination (e.g. Visitors) override this with a real count request;
    `sync` compares the result against `self.record_limit` to decide whether to
    loop for more pages.
    """
    return 0

def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
loop_for_records = False
# Get total number of record count.
if self.name in ["visitors"]:
# If the number of records fetched equals the record limit, assume there are more
# records to be synced and save the last filter value; otherwise assume all records were extracted
loop_for_records = self.get_record_count() >= self.record_limit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly suggest not following this approach as it creates an inverted dependency: this function belongs to the generic implementation of the stream class, yet this condition refers to the name of a child class.

I would suggest that you modify this method in the Visitors class below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or you can convert this into a generic implementation


stream_response = self.request(self.name, json=self.get_body()) or []

# Get and initialize sub-stream for the current stream
Expand All @@ -746,7 +757,7 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
# which flush out during sync_substream call above
stream_response = self.request(self.name, json=self.get_body()) or []

return (self.stream, stream_response), False
return (self.stream, stream_response), loop_for_records

class EventsBase(Stream):
DATE_WINDOW_SIZE = 1
Expand Down Expand Up @@ -1263,6 +1274,7 @@ class Visitors(LazyAggregationStream):
replication_method = "INCREMENTAL"
replication_key = "lastupdated"
key_properties = ["visitor_id"]
last_processed = None

def get_body(self, key_id=None, period=None, first=None):
include_anonymous_visitors = self.config.get('include_anonymous_visitors') or DEFAULT_INCLUDE_ANONYMOUS_VISITORS
Expand All @@ -1280,14 +1292,32 @@ def get_body(self, key_id=None, period=None, first=None):
"identified": not anons
}
}
}, {
"sort": ["visitorId"]
}, {
"filter": self.set_filter_value()
}, {
"limit": self.record_limit
}],
"requestId": "all-visitors",
"sort": [
"visitorId"
]
"requestId": "all-visitors"
}
}

def get_record_count(self):
    """Return the number of visitor records matching the current filter.

    Appends a `count` stage to the aggregation pipeline built by `get_body`
    (which already contains the filter/sort/limit stages), so the count
    reflects at most `record_limit` records — `sync` compares it against
    `record_limit` to decide whether another page must be fetched.

    Returns 0 when the API yields no result rows instead of raising
    IndexError (other call sites guard responses with `or []` the same way).
    """
    body = self.get_body()
    body["request"]["pipeline"].append({"count": None})
    results = list(self.request(self.name, json=body))
    if not results:
        # Empty response: nothing to count, so no further pages either.
        return 0
    return results[0]["count"]

def set_filter_value(self):
    """Return the pipeline filter expression used for visitor pagination.

    Resumes after the last processed visitor id when one has been saved;
    otherwise starts from the beginning (every visitor id sorts after "").
    """
    last = self.last_processed
    if not last:
        # No resumption point yet: match all visitor ids.
        return 'visitorId>""'
    return f'visitorId>"{last["visitor_id"]}"'

def transform(self, record):
# Transform data of accounts into one level dictionary with following transformation
record['lastupdated'] = record.get('metadata').get('auto').get(
Expand Down
3 changes: 3 additions & 0 deletions tap_pendo/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def sync_stream(state, start_date, instance):
LOGGER.info('Replication Value NULL for tap_stream_id: %s', stream.tap_stream_id)
counter.increment()

# preserve the last processed record which will be useful if stream supports pagination
instance.last_processed = record

# Update bookmark and write state for the stream with new_bookmark
instance.update_bookmark(state, instance.name, strftime(new_bookmark),
instance.replication_key)
Expand Down
2 changes: 1 addition & 1 deletion tests/tap_tester/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase):
# After 180 days visitor_history data cannot be synced
# so we need to manually create test data for the visitors and visitor_history streams
# Once created, we should update this start date to optimise the execution time
START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z"
START_DATE_VISTOR_HISTORY = "2023-11-05T00:00:00Z"

@staticmethod
def name():
Expand Down
83 changes: 83 additions & 0 deletions tests/tap_tester/test_visitors_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
from base import TestPendoBase


class PendoAllFieldsTest(TestPendoBase):
    """Canary test verifying the pagination implementation for the Visitors stream."""

    start_date = "2019-09-10T00:00:00Z"
    record_limit = 50
    include_anonymous_visitors = False

    def name(self):
        return "pendo_all_fields_test"

    def get_properties(self, original: bool = True):
        """Configuration properties required for the tap."""
        return {
            # A recent start_date keeps the execution time of this test down
            "start_date": self.start_date,
            "lookback_window": "1",
            "period": "dayRange",
            "record_limit": self.record_limit,
            "include_anonymous_visitors": self.include_anonymous_visitors
        }

    def test_run(self):
        # Verify that the visitors pagination logic works as expected for named
        # and anonymous visitors.
        # Note: there are 21000+ named and anonymous visitors
        self.run_pagination_test(expected_streams={"visitors"},
                                 start_date="2019-09-10T00:00:00Z",
                                 record_limit=10000,
                                 include_anonymous_visitors="true")

        # Verify that with visitors pagination we can still sync child stream
        # records, i.e. visitor_history.
        # Note: there are only 58 named and anonymous visitors, but only
        # recently updated visitors will be synced
        self.run_pagination_test(expected_streams={"visitors", "visitor_history"},
                                 start_date=self.START_DATE_VISTOR_HISTORY,
                                 record_limit=50,
                                 include_anonymous_visitors="false")

    def run_pagination_test(self, expected_streams, start_date, record_limit, include_anonymous_visitors):
        """Run a sync with the given pagination settings and verify that every
        expected stream — and only those — replicated at least one record.
        """
        self.streams_to_test = expected_streams
        self.start_date = start_date
        self.record_limit = record_limit
        self.include_anonymous_visitors = include_anonymous_visitors

        conn_id = connections.ensure_connection(self)

        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Table and field selection: restrict the sync to the streams under test
        test_catalogs = [catalog for catalog in found_catalogs
                         if catalog.get('tap_stream_id') in expected_streams]

        self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs)

        record_count_by_stream = self.run_and_verify_sync(conn_id)

        synced_records = runner.get_records_from_target_output()

        # Verify no unexpected streams were replicated
        synced_stream_names = set(synced_records.keys())
        self.assertSetEqual(expected_streams, synced_stream_names)

        # Each expected stream must have replicated at least one record,
        # proving pagination did not drop or truncate the stream
        for stream in expected_streams:
            with self.subTest(stream=stream):
                self.assertGreaterEqual(record_count_by_stream.get(stream), 1)
10 changes: 8 additions & 2 deletions tests/unittests/test_lazy_aggregation_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from unittest import mock
from tap_pendo.streams import Visitors

# from debugpy import listen, wait_for_client
shantanu73 marked this conversation as resolved.
Show resolved Hide resolved
# listen(8000)
# wait_for_client()

class Mockresponse:
def __init__(self, resp, status_code, headers=None, raise_error=False):
self.status_code = status_code
Expand Down Expand Up @@ -39,7 +43,8 @@ class TestLazyAggregationSync(unittest.TestCase):
@mock.patch("requests.Session.send")
@mock.patch("tap_pendo.streams.Stream.is_selected")
@mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream)
def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
@mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100)
def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request):
'''
Verify that if a sub-stream is present, all data is still returned for the super stream
and sync_substream is called
Expand All @@ -60,7 +65,8 @@ def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_select
@mock.patch("requests.Session.send")
@mock.patch("tap_pendo.streams.Stream.is_selected")
@mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream)
def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
@mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100)
def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request):
'''
Verify that if sub stream is not selected then also all data should be return for super stream
and sync_substream should not be called
Expand Down