
Fix child event streams
RushiT0122 committed Apr 3, 2024
1 parent 4e07ab3 commit 527fd3a
Showing 1 changed file with 50 additions and 43 deletions.
93 changes: 50 additions & 43 deletions tap_pendo/streams.py
@@ -22,6 +22,7 @@
 from tap_pendo import utils as tap_pendo_utils
 from tap_pendo.exception import QueryExceededMemoryLimitException, Server42xRateLimitError

+
 KEY_PROPERTIES = ['id']
 US_BASE_URL = "https://app.pendo.io"
 EU_BASE_URL = "https://app.eu.pendo.io"
@@ -807,49 +808,51 @@ def get_first_parameter_value(self, body):
     def get_last_parameter_value(self, body):
         return body['request']['pipeline'][0]['source']['timeSeries'].get('last', 'now()')

-    def get_record_count(self, first, last):
+    def get_record_count(self, first, last, key_id):
         # Get the number of records to be fetched using the current filter
-        body = self.get_body(None, self.period, first, last)
+        body = self.get_body(key_id, self.period, first, last)
         filter_index = self.get_pipeline_key_index(body, "filter")
         body["request"]["pipeline"][filter_index]["filter"] = body["request"]["pipeline"][filter_index]["filter"].replace(
             ">=", ">")
         body["request"]["pipeline"].pop(self.get_pipeline_key_index(body, 'limit'))
         body["request"]["pipeline"].append({"count": None})
         return self.request(self.name, json=body)["results"][0]["count"]
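Before splitting a range, get_record_count rewrites the regular request body into a count probe: the filter's ">=" is relaxed to ">" (presumably so the boundary record is not counted twice across windows), the limit stage is dropped, and a bare count stage is appended. A minimal sketch of that transformation on a plain dict; the condensed body shape below is illustrative, not the tap's full aggregation request:

def to_count_body(body):
    # Rewrite a fetch body into a count-only probe, as the stream does.
    pipeline = body["request"]["pipeline"]
    # Tighten the lower bound so the boundary record is not counted twice.
    for stage in pipeline:
        if "filter" in stage:
            stage["filter"] = stage["filter"].replace(">=", ">")
    # Drop the row limit and ask the aggregation for a bare count instead.
    pipeline[:] = [stage for stage in pipeline if "limit" not in stage]
    pipeline.append({"count": None})
    return body

body = {"request": {"pipeline": [
    {"source": {"events": None}},
    {"filter": "browserTime >= 1700000000000"},  # illustrative filter
    {"limit": 100000},
]}}
print(to_count_body(body)["request"]["pipeline"][-1])  # {'count': None}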

-    def create_date_windows(self, from_date, to_date):
+    def create_date_windows(self, from_date, to_date, key_id=None):
         max_record_count = 5 * API_RECORD_LIMIT
         mid_date = int(from_date + (to_date - from_date) / 2)
-        first_record_count = self.get_record_count(from_date, mid_date)
-        last_record_count = self.get_record_count(mid_date + 1, to_date)
+        first_record_count = self.get_record_count(from_date, mid_date, key_id)
+        last_record_count = self.get_record_count(mid_date + 1, to_date, key_id)
         date_windows = list()
         if first_record_count:
             if first_record_count <= max_record_count:
                 date_windows.append((from_date, mid_date, first_record_count))
             else:
-                date_windows += self.create_date_windows(from_date + 1, mid_date)
+                date_windows += self.create_date_windows(from_date + 1, mid_date, key_id)

         if last_record_count:
             if last_record_count <= max_record_count:
                 date_windows.append((mid_date, to_date, last_record_count))
             else:
-                date_windows += self.create_date_windows(mid_date + 1, to_date)
+                date_windows += self.create_date_windows(mid_date + 1, to_date, key_id)

         # merge adjacent windows if their combined count is less than the max record limit
-        merged_date_windows = []
-        current_start, current_end, record_count = date_windows[0]
-
-        for start, end, value in date_windows[1:]:
-            if record_count + value < max_record_count:
-                current_end = end
-                record_count += value
-            else:
-                merged_date_windows.append((current_start, current_end, record_count))
-                current_start, current_end, record_count = start, end, value
-
-        merged_date_windows.append((current_start, current_end, record_count))
-
-        return merged_date_windows
+        if len(date_windows) > 1:
+            merged_date_windows = []
+            current_start, current_end, record_count = date_windows[0]
+
+            for start, end, value in date_windows[1:]:
+                if record_count + value < max_record_count:
+                    current_end = end
+                    record_count += value
+                else:
+                    merged_date_windows.append((current_start, current_end, record_count))
+                    current_start, current_end, record_count = start, end, value
+
+            merged_date_windows.append((current_start, current_end, record_count))
+            date_windows = merged_date_windows
+
+        return date_windows

     def remove_duplicate_records(self, new_records, last_processed):
         if not last_processed:
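The create_date_windows change threads key_id through so child streams count only their own records, and the new guard skips the merge pass when there is only a single window. The underlying strategy: halve the range, count each half, recurse on halves holding more than five times the API record limit, then merge adjacent windows whose combined counts still fit. A standalone sketch of that strategy; count_records, fake_count, and the constants below are illustrative stand-ins for the tap's Pendo count request, not its actual code:

API_RECORD_LIMIT = 100000  # illustrative constant, not the tap's actual value

def create_date_windows(count_records, from_date, to_date):
    # Split [from_date, to_date] until each window holds at most 5x the API limit.
    max_record_count = 5 * API_RECORD_LIMIT
    mid_date = int(from_date + (to_date - from_date) / 2)
    windows = []
    for start, end in ((from_date, mid_date), (mid_date + 1, to_date)):
        count = count_records(start, end)
        if not count:
            continue  # skip empty halves entirely
        if count <= max_record_count:
            windows.append((start, end, count))
        else:
            windows += create_date_windows(count_records, start, end)

    # Merge adjacent windows whose combined count still fits under the
    # limit, so as few requests as possible are issued.
    if len(windows) > 1:
        merged = []
        cur_start, cur_end, cur_count = windows[0]
        for start, end, count in windows[1:]:
            if cur_count + count < max_record_count:
                cur_end, cur_count = end, cur_count + count
            else:
                merged.append((cur_start, cur_end, cur_count))
                cur_start, cur_end, cur_count = start, end, count
        merged.append((cur_start, cur_end, cur_count))
        windows = merged
    return windows

# Example: a 40-day range (in ms) where each day holds ~30k records
# splits into windows of under 500k records each.
DAY_MS = 24 * 60 * 60 * 1000

def fake_count(start, end):
    return ((end - start) // DAY_MS) * 30000

print(create_date_windows(fake_count, 0, 40 * DAY_MS))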
@@ -884,28 +887,32 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
         end_time = int(datetime.now().timestamp() * 1000)

         if self.date_windows is None:
-            self.date_windows = self.create_date_windows(start_time, end_time)
+            self.date_windows = self.create_date_windows(start_time, end_time, key_id)
             LOGGER.info("Date windows identified:")
             for window in self.date_windows:
                 LOGGER.info(f"\t{window}")

-        start_time, end_time, record_count = self.date_windows.pop(0)
-        body = self.get_body(key_id, period, start_time, end_time)
-        body = self.set_time_series_first(body, [], start_time, end_time)
-        body = self.set_request_body_filters(body, start_time, [])
-
-        stream_records = self.request(self.name, json=body).get('results') or []
-
-        if len(stream_records) >= self.record_limit:
-            # If the record limit has been reached, there may be more records left to extract,
-            # so the next extraction should start from the replication value of the last record.
-            # But we must remove the overlapping records from the first sync.
-            stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
-
-            start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
-            self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
-
-        return (self.stream, stream_records), len(self.date_windows) > 0
+        if len(self.date_windows):
+            start_time, end_time, record_count = self.date_windows.pop(0)
+            body = self.get_body(key_id, period, start_time, end_time)
+            body = self.set_time_series_first(body, [], start_time, end_time)
+            body = self.set_request_body_filters(body, start_time, [])
+
+            stream_records = self.request(self.name, json=body).get('results') or []
+            if len(stream_records) >= self.record_limit:
+                # If the record limit has been reached, there may be more records left to extract,
+                # so the next extraction should start from the replication value of the last record.
+                # But we must remove the overlapping records from the first sync.
+                stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
+                start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
+                self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
+
+            return (self.stream, stream_records), len(self.date_windows) > 0
+
+        self.date_windows = None
+        return (self.stream, []), False

 class Accounts(Stream):
     name = "accounts"
@@ -986,8 +993,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")

-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id}})
         return body

@@ -1084,8 +1091,8 @@ def __init__(self, config):
         self.period = config.get('period')
         self.replication_key = 'browser_time'

-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"pollEvents": {"appId": self.config["app_ids"]}})
         return body

@@ -1099,8 +1106,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")

-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id}})
         return body

@@ -1113,8 +1120,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.replication_key = 'browser_time'

-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}})
         return body

@@ -1210,8 +1217,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")

-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id}})
         return body
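Each of the five overrides above follows one pattern: accept the new optional last bound, forward it to the parent's get_body so date-windowed requests carry both ends of the time series, then tag the pipeline source with the stream-specific selector. A condensed sketch of the pattern; EventsBaseSketch, FeatureEventsSketch, and the minimal body shape are illustrative, not the tap's exact classes:

class EventsBaseSketch:
    # Condensed stand-in for the tap's events base class; illustrative only.
    def get_body(self, key_id, period, first, last=None):
        return {"request": {"pipeline": [{"source": {
            "timeSeries": {"period": period, "first": first,
                           "last": last if last is not None else "now()"}
        }}]}}

class FeatureEventsSketch(EventsBaseSketch):
    def get_body(self, key_id, period, first, last=None):
        # Forward both window bounds to the parent, then select this
        # stream's events in the pipeline source.
        body = super().get_body(key_id, period, first, last)
        body['request']['pipeline'][0]['source'].update(
            {"featureEvents": {"featureId": key_id}})
        return body

body = FeatureEventsSketch().get_body("feat-1", "dayRange", 1700000000000, 1700600000000)
print(body["request"]["pipeline"][0]["source"])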

