Add pagination visitors stream #129

Merged
11 commits merged on Nov 14, 2023

Changes from all commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

+## 1.1.0
+* Add custom pagination support for the Visitors stream [#129](https://github.com/singer-io/tap-pendo/pull/129)
+
## 1.0.1
* Fix infinite loop issue [#126](https://github.com/singer-io/tap-pendo/pull/126)

2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@

setup(
name="tap-pendo",
-version="1.0.1",
+version="1.1.0",
description="Singer.io tap for extracting data",
author="Stitch",
url="https://github.com/singer-io/tap-pendo",
44 changes: 38 additions & 6 deletions tap_pendo/streams.py
@@ -141,6 +141,7 @@ class Stream():
period = None
request_retry_count = 1
last_processed = None
+last_synced_record = {}

# initialize the endpoint attribute, which can be overridden by child streams based on
# the different parameters used by the stream.
@@ -507,7 +508,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
sub_stream.name, record[parent.key_properties[0]])

# After processing for all parent ids we can remove our resumption state
-state.get('bookmarks').get(sub_stream.name).pop('last_processed')
+if 'last_processed' in state.get('bookmarks').get(sub_stream.name):
+    state.get('bookmarks').get(sub_stream.name).pop('last_processed')

self.update_child_stream_bookmarks(state=state,
sub_stream=sub_stream,
@@ -729,7 +731,14 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs):
# Request retry
yield from self.request(endpoint, params, count, **kwargs)

+def get_record_count(self):
+    return 0
+
+def is_loop_required(self):
+    return False
+
def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
+loop_for_records = self.is_loop_required()
stream_response = self.request(self.name, json=self.get_body()) or []

# Get and initialize sub-stream for the current stream
@@ -746,7 +755,7 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
# which flush out during sync_substream call above
stream_response = self.request(self.name, json=self.get_body()) or []

-return (self.stream, stream_response), False
+return (self.stream, stream_response), loop_for_records

class EventsBase(Stream):
DATE_WINDOW_SIZE = 1
@@ -1280,14 +1289,37 @@ def get_body(self, key_id=None, period=None, first=None):
"identified": not anons
}
}
-            }],
-            "requestId": "all-visitors",
-            "sort": [
-                "visitorId"
-            ]
+            }, {
+                "sort": ["visitorId"]
+            }, {
+                "filter": self.set_filter_value()
+            }, {
+                "limit": self.record_limit
+            }],
+            "requestId": "all-visitors"
}
}

+def get_record_count(self):
+    # Get the number of records that would be fetched with the current filter
+    body = self.get_body()
+    body["request"]["pipeline"].append({"count": None})
+    return list(self.request(self.name, json=body))[0]["count"]
+
+def is_loop_required(self):
+    # If the record count reaches the record limit, assume there are more
+    # records to be synced and keep the last filter value; otherwise assume
+    # all records have been extracted
+    return self.get_record_count() >= self.record_limit
+
+def set_filter_value(self):
+    # Build the value of the filter stage in the request body, resuming
+    # after the last synced visitorId
+    if self.last_synced_record:
+        filter_value = f'visitorId>"{self.last_synced_record["visitor_id"]}"'
+    else:
+        filter_value = 'visitorId>""'
+
+    return filter_value

def transform(self, record):
    # Transform visitor data into a one-level dictionary with the following transformation
    record['lastupdated'] = record.get('metadata').get('auto').get(
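Taken together, get_body(), get_record_count(), is_loop_required(), and set_filter_value() implement keyset pagination over visitorId. Below is a minimal standalone sketch of that loop, not part of the diff: fetch_page is a hypothetical stand-in for Stream.request against the Pendo aggregation endpoint, and the page-length check stands in for the separate count request that get_record_count() actually issues.

RECORD_LIMIT = 10000  # corresponds to the tap's record_limit setting

def build_pipeline(last_visitor_id):
    # Mirrors Visitors.get_body(): sort by visitorId, filter past the cursor,
    # then cap the page size. (The real pipeline begins with a
    # {"source": {"visitors": ...}} stage, omitted here.)
    return [
        {"sort": ["visitorId"]},
        {"filter": f'visitorId>"{last_visitor_id}"'},
        {"limit": RECORD_LIMIT},
    ]

def paginate(fetch_page):
    last_visitor_id = ""  # matches set_filter_value()'s 'visitorId>""' default
    while True:
        records = list(fetch_page(build_pipeline(last_visitor_id)))
        yield from records
        if len(records) < RECORD_LIMIT:  # short page: nothing left to fetch
            break
        # A full page means more may remain; resume after the last visitorId,
        # just as sync.py preserves instance.last_synced_record as the cursor.
        last_visitor_id = records[-1]["visitorId"]

Because the filter is a strict greater-than on a sorted unique key, each page resumes exactly where the previous one ended, and an interrupted sync can restart from the last written record instead of the first page.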
3 changes: 3 additions & 0 deletions tap_pendo/sync.py
@@ -59,6 +59,9 @@ def sync_stream(state, start_date, instance):
LOGGER.info('Replication Value NULL for tap_stream_id: %s', stream.tap_stream_id)
counter.increment()

+# preserve the last processed record; used as the pagination cursor when the stream supports pagination
+instance.last_synced_record = record
+
# Update bookmark and write state for the stream with new_bookmark
instance.update_bookmark(state, instance.name, strftime(new_bookmark),
instance.replication_key)
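The loop_for_records flag returned by Stream.sync() implies a paging loop around it inside sync_stream(). Here is a hedged sketch of that shape; the while-loop structure and the singer.write_record call are assumptions about the surrounding code, not part of this diff.

import singer

def sync_stream_pages(state, start_date, instance):
    # Hypothetical driver: keep re-requesting pages while the stream's
    # is_loop_required() (surfaced through sync()'s return value) reports
    # that the last page was full.
    loop_for_records = True
    while loop_for_records:
        (stream, records), loop_for_records = instance.sync(state, start_date)
        for record in records:
            singer.write_record(instance.name, instance.transform(record))
            # Cursor consumed by Visitors.set_filter_value() on the next page
            instance.last_synced_record = record

For non-paginated streams the base Stream.is_loop_required() returns False, so this loop body runs exactly once and their behavior is unchanged.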
2 changes: 1 addition & 1 deletion tests/tap_tester/base.py
@@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase):
# After 180 days, visitor_history data cannot be synced,
# so we need to manually create test data for the visitors and visitor_history streams.
# Once created, we should update this start date to optimise the execution time.
-START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z"
+START_DATE_VISTOR_HISTORY = "2023-11-06T00:00:00Z"

@staticmethod
def name():
10 changes: 5 additions & 5 deletions tests/tap_tester/test_start_date.py
@@ -31,11 +31,11 @@ def test_run(self):

# These streams have implementations similar to guides and guide_events, so this test is removed to limit the execution time
# self.run_test("2020-09-01T00:00:00Z", "2021-03-01T00:00:00Z", {"features", "feature_events", "pages", "page_events", "events", "track_types", "track_events"})

# Visitor history can be retrieved only for the last 180 days, so to reduce execution time the first start date is set within that window
self.run_test(
-start_date_1="2022-06-25T00:00:00Z",
-start_date_2="2022-07-20T00:00:00Z",
+start_date_1=self.START_DATE_VISTOR_HISTORY,
+start_date_2="2023-11-09T00:00:00Z",
streams={"visitors", "visitor_history"})

def expected_metadata(self):
@@ -72,7 +72,7 @@ def run_test(self, start_date_1, start_date_2, streams):
self.start_date_1 = start_date_1
self.start_date_2 = start_date_2
self.streams = streams

self.start_date = self.start_date_1

expected_streams = streams
Expand Down Expand Up @@ -100,7 +100,7 @@ def run_test(self, start_date_1, start_date_2, streams):
##########################################################################
# Update START DATE Between Syncs
##########################################################################

LOGGER.info("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(
self.start_date, self.start_date_2))
self.start_date = self.start_date_2
84 changes: 84 additions & 0 deletions tests/tap_tester/test_visitors_pagination.py
@@ -0,0 +1,84 @@
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
from base import TestPendoBase


class PendoVisitorsPaginationTest(TestPendoBase):
    start_date = "2019-09-10T00:00:00Z"
    record_limit = 50
    include_anonymous_visitors = False

    def name(self):
        return "pendo_visitors_pagination_test"

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
return {
# To reduce the execution time of this test, take a recent start_date
"start_date": self.start_date,
"lookback_window": "1",
"period": "dayRange",
"record_limit": self.record_limit,
"include_anonymous_visitors": self.include_anonymous_visitors
}

def test_run(self):
# Verify that the visitors pagination logic works as expected for named and anonymous visitors
# without impacting replication of the other streams
# Note: there are 21000+ named and anonymous visitors
self.run_pagination_test(expected_streams={"accounts", "features", "feature_events", "visitors"},
start_date="2019-09-10T00:00:00Z",
record_limit=10000,
include_anonymous_visitors="true")

# Verify that with visitors pagination we can still sync child stream records, i.e. visitor_history
# Note: there are only 58 named and anonymous visitors, and only recently updated visitors will be synced
self.run_pagination_test(expected_streams={"visitors", "visitor_history"},
start_date=self.START_DATE_VISTOR_HISTORY,
record_limit=50,
include_anonymous_visitors="false")


def run_pagination_test(self, expected_streams, start_date, record_limit, include_anonymous_visitors):
"""
This is a canary test to verify pagination implementation for the Visitors stream.
"""
self.streams_to_test = expected_streams
self.start_date = start_date
self.record_limit = record_limit
self.include_anonymous_visitors = include_anonymous_visitors

conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in expected_streams]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_all_fields)

# Grab metadata after performing table-and-field selection to set expectations
# used for asserting all fields are replicated
stream_to_all_catalog_fields = dict()
for catalog in test_catalogs_all_fields:
stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
fields_from_field_level_md = [md_entry['breadcrumb'][1]
for md_entry in catalog_entry['metadata']
if md_entry['breadcrumb'] != []]
stream_to_all_catalog_fields[stream_name] = set(
fields_from_field_level_md)

record_count_by_stream = self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())

self.assertSetEqual(expected_streams, synced_stream_names)
for stream in expected_streams:
with self.subTest(stream=stream):
self.assertGreaterEqual(record_count_by_stream.get(stream), 1)
6 changes: 4 additions & 2 deletions tests/unittests/test_lazy_aggregation_sync.py
@@ -39,7 +39,8 @@ class TestLazyAggregationSync(unittest.TestCase):
@mock.patch("requests.Session.send")
@mock.patch("tap_pendo.streams.Stream.is_selected")
@mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream)
-def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
+@mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100)
+def test_lazzy_aggregation_with_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request):
'''
Verify that if a sub-stream is present, all data is still returned for the super stream
and sync_substream is called
@@ -60,7 +61,8 @@ def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
@mock.patch("requests.Session.send")
@mock.patch("tap_pendo.streams.Stream.is_selected")
@mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream)
-def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
+@mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100)
+def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request):
'''
Verify that if the sub-stream is not selected, all data is still returned for the super stream
and sync_substream is not called