From 527fd3aa08cf6c32cd13cad2a2d00b8f4892ecbb Mon Sep 17 00:00:00 2001
From: RushiT0122
Date: Wed, 3 Apr 2024 16:43:55 +0000
Subject: [PATCH] Fix child event streams

---
 tap_pendo/streams.py | 93 ++++++++++++++++++++++++--------------------
 1 file changed, 50 insertions(+), 43 deletions(-)

diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py
index 1a5d6eb..83e2552 100644
--- a/tap_pendo/streams.py
+++ b/tap_pendo/streams.py
@@ -22,6 +22,7 @@
 from tap_pendo import utils as tap_pendo_utils
 from tap_pendo.exception import QueryExceededMemoryLimitException, Server42xRateLimitError
 
+
 KEY_PROPERTIES = ['id']
 US_BASE_URL = "https://app.pendo.io"
 EU_BASE_URL = "https://app.eu.pendo.io"
@@ -807,9 +808,9 @@ def get_first_parameter_value(self, body):
     def get_last_parameter_value(self, body):
         return body['request']['pipeline'][0]['source']['timeSeries'].get('last', 'now()')
 
-    def get_record_count(self, first, last):
+    def get_record_count(self, first, last, key_id):
         # Get number of records to be fetched using current filter
-        body = self.get_body(None, self.period, first, last)
+        body = self.get_body(key_id, self.period, first, last)
         filter_index = self.get_pipeline_key_index(body, "filter")
         body["request"]["pipeline"][filter_index]["filter"] = body["request"]["pipeline"][filter_index]["filter"].replace(
             ">=", ">")
@@ -817,39 +818,41 @@ def get_record_count(self, first, last):
         body["request"]["pipeline"].append({"count": None})
         return self.request(self.name, json=body)["results"][0]["count"]
 
-    def create_date_windows(self, from_date, to_date):
+    def create_date_windows(self, from_date, to_date, key_id=None):
         max_record_count = 5 * API_RECORD_LIMIT
         mid_date = int(from_date + (to_date - from_date) / 2)
-        first_record_count = self.get_record_count(from_date, mid_date)
-        last_record_count = self.get_record_count(mid_date + 1, to_date)
+        first_record_count = self.get_record_count(from_date, mid_date, key_id)
+        last_record_count = self.get_record_count(mid_date + 1, to_date, key_id)
 
         date_windows = list()
         if first_record_count:
             if first_record_count <= max_record_count:
                 date_windows.append((from_date, mid_date, first_record_count))
             else:
-                date_windows += self.create_date_windows(from_date + 1, mid_date)
+                date_windows += self.create_date_windows(from_date + 1, mid_date, key_id)
         if last_record_count:
             if last_record_count <= max_record_count:
                 date_windows.append((mid_date, to_date, last_record_count))
             else:
-                date_windows += self.create_date_windows(mid_date + 1, to_date)
+                date_windows += self.create_date_windows(mid_date + 1, to_date, key_id)
 
         # merge the adjacent windows if their sum is lesser than max record limit
-        merged_date_windows = []
-        current_start, current_end, record_count = date_windows[0]
-
-        for start, end, value in date_windows[1:]:
-            if record_count + value < max_record_count:
-                current_end = end
-                record_count += value
-            else:
-                merged_date_windows.append((current_start, current_end, record_count))
-                current_start, current_end, record_count = start, end, value
+        if len(date_windows) > 1:
+            merged_date_windows = []
+            current_start, current_end, record_count = date_windows[0]
+
+            for start, end, value in date_windows[1:]:
+                if record_count + value < max_record_count:
+                    current_end = end
+                    record_count += value
+                else:
+                    merged_date_windows.append((current_start, current_end, record_count))
+                    current_start, current_end, record_count = start, end, value
 
-        merged_date_windows.append((current_start, current_end, record_count))
+            merged_date_windows.append((current_start, current_end, record_count))
+            date_windows = merged_date_windows
 
-        return merged_date_windows
+        return date_windows
 
     def remove_duplicate_records(self, new_records, last_processed):
         if not last_processed:
@@ -884,28 +887,32 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
         end_time = int(datetime.now().timestamp() * 1000)
 
         if self.date_windows is None:
-            self.date_windows = self.create_date_windows(start_time, end_time)
+            self.date_windows = self.create_date_windows(start_time, end_time, key_id)
             LOGGER.info("Date windows identified:")
             for window in self.date_windows:
                 LOGGER.info(f"\t{window}")
 
-        start_time, end_time, record_count = self.date_windows.pop(0)
-        body = self.get_body(key_id, period, start_time, end_time)
-        body = self.set_time_series_first(body, [], start_time, end_time)
-        body = self.set_request_body_filters(body, start_time, [])
+        if len(self.date_windows):
+            start_time, end_time, record_count = self.date_windows.pop(0)
+            body = self.get_body(key_id, period, start_time, end_time)
+            body = self.set_time_series_first(body, [], start_time, end_time)
+            body = self.set_request_body_filters(body, start_time, [])
+
+            stream_records = self.request(self.name, json=body).get('results') or []
 
-        stream_records = self.request(self.name, json=body).get('results') or []
+            if len(stream_records) >= self.record_limit:
+                # if record limit has reached then there is a possibility to have more records to extract
+                # so we should set the start next extraction from replication value of last record.
+                # But we must remove overlapping records from first sync
+                stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
 
-        if len(stream_records) >= self.record_limit:
-            # if record limit has reached then there is a possibility to have more records to extract
-            # so we should set the start next extraction from replication value of last record.
-            # But we must remove overlapping records from first sync
-            stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
+                start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
+                self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
 
-            start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
-            self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
+            return (self.stream, stream_records), len(self.date_windows) > 0
 
-        return (self.stream, stream_records), len(self.date_windows) > 0
+        self.date_windows = None
+        return (self.stream, []), False
 
 class Accounts(Stream):
     name = "accounts"
@@ -986,8 +993,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id}})
         return body
 
@@ -1084,8 +1091,8 @@ def __init__(self, config):
         self.period = config.get('period')
         self.replication_key = 'browser_time'
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"pollEvents": {"appId": self.config["app_ids"]}})
         return body
 
@@ -1099,8 +1106,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id}})
         return body
 
@@ -1113,8 +1120,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.replication_key = 'browser_time'
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}})
         return body
 
@@ -1210,8 +1217,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id}})
         return body
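
Note for reviewers: the guard added around the merge loop is the crux of the date-window change. create_date_windows can legitimately produce an empty list (when neither half of the split contains records), and the old code then indexed date_windows[0] unconditionally. Below is a minimal, self-contained sketch of the merge behaviour; merge_windows, the API_RECORD_LIMIT value, and the sample counts are illustrative stand-ins for the inline loop in the patch, not tap-pendo code:

    # Sketch only: mirrors the merge loop in create_date_windows().
    API_RECORD_LIMIT = 100_000               # assumed value, for illustration
    MAX_RECORD_COUNT = 5 * API_RECORD_LIMIT

    def merge_windows(date_windows):
        # Collapse adjacent (start, end, count) windows while the combined
        # record count stays under MAX_RECORD_COUNT.
        if len(date_windows) <= 1:
            # The patch's new guard: zero or one window is returned as-is
            # instead of unconditionally indexing date_windows[0].
            return date_windows
        merged = []
        current_start, current_end, record_count = date_windows[0]
        for start, end, value in date_windows[1:]:
            if record_count + value < MAX_RECORD_COUNT:
                current_end = end        # absorb the neighbouring window
                record_count += value
            else:
                merged.append((current_start, current_end, record_count))
                current_start, current_end, record_count = start, end, value
        merged.append((current_start, current_end, record_count))
        return merged

    # 200k + 250k fits under the 500k limit and merges; adding 400k would
    # not, so the last window stays separate.
    assert merge_windows([(0, 10, 200_000), (11, 20, 250_000), (21, 30, 400_000)]) == \
        [(0, 20, 450_000), (21, 30, 400_000)]
    assert merge_windows([]) == []           # old inline code raised IndexError here

The signature changes are the other half of the fix: get_record_count builds its count query via get_body(key_id, self.period, first, last), so every child override has to accept a fourth "last" argument; the old three-parameter get_body(self, key_id, period, first) overrides could not accept that call. Forwarding the real key_id also means each child event stream sizes its windows from counts filtered to its own feature, guide, track type, or page rather than from an unfiltered query.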