Skip to content

Commit

Permalink
[source-klaviyo] - Update to CDK v3.9.0 (#42121)
Browse files Browse the repository at this point in the history
  • Loading branch information
pnilan authored Jul 24, 2024
1 parent 8bf87e8 commit 38fe6ae
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,6 @@
"destination_sync_mode": "append",
"primary_key": [["id"]]
},
{
"stream": {
"name": "lists_detailed",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": ["updated"],
"destination_sync_mode": "append",
"primary_key": [["id"]]
},
{
"stream": {
"name": "flows",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916
dockerImageTag: 2.7.8
dockerImageTag: 2.8.0
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
16 changes: 8 additions & 8 deletions airbyte-integrations/connectors/source-klaviyo/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-klaviyo/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.7.8"
version = "2.8.0"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_klaviyo"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte_cdk = "^2"
airbyte_cdk = "^3"

[tool.poetry.scripts]
source-klaviyo = "source_klaviyo.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from requests import HTTPError, codes


class KlaviyoAvailabilityStrategy(HttpAvailabilityStrategy):
def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional[Source], error: HTTPError
) -> Dict[int, str]:
reasons_for_codes: Dict[int, str] = super().reasons_for_unavailable_status_codes(stream, logger, source, error)
reasons_for_codes: Dict[int, str] = {}
for status_code, error_resolution in DEFAULT_ERROR_MAPPING.items():
reasons_for_codes[status_code] = error_resolution.error_message
reasons_for_codes[codes.UNAUTHORIZED] = (
"This is most likely due to insufficient permissions on the credentials in use. "
f"Try to create and use an API key with read permission for the '{stream.name}' stream granted"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ def update_target_records_with_included(
yield target_record

def extract_records_by_path(self, response: requests.Response, field_paths: list = None) -> Iterable[Mapping[str, Any]]:
response_body = self.decoder.decode(response)
try:
response_body = response.json()
except Exception as e:
raise Exception(f"Failed to parse response body as JSON: {e}")

# Extract data from the response body based on the provided field paths
if not field_paths:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Optional, Union

import requests
from airbyte_cdk.sources.streams.http.error_handlers import DefaultBackoffStrategy
from source_klaviyo.exceptions import KlaviyoBackoffError


class KlaviyoBackoffStrategy(DefaultBackoffStrategy):
def __init__(self, max_time: int, name: str) -> None:

self._max_time = max_time
self._name = name

def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs
) -> Optional[float]:

if isinstance(response_or_exception, requests.Response):
if response_or_exception.status_code == 429:
retry_after = response_or_exception.headers.get("Retry-After")
retry_after = float(retry_after) if retry_after else None
if retry_after and retry_after >= self._max_time:
raise KlaviyoBackoffError(
f"Stream {self._name} has reached rate limit with 'Retry-After' of {retry_after} seconds, exit from stream."
)
return float(retry_after) if retry_after else None
return None
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@ definitions:
authenticator: "#/definitions/authenticator"
http_method: GET
error_handler:
type: CompositeErrorHandler
error_handlers:
- type: DefaultErrorHandler
backoff_strategies:
- type: WaitTimeFromHeader
header: "Retry-After"
response_filters:
- type: HttpResponseFilter
action: RETRY
http_codes: [429]
- type: DefaultErrorHandler # adding this DefaultErrorHandler for 5XX error codes
- type: DefaultErrorHandler
response_filters:
- type: HttpResponseFilter
action: FAIL
http_codes: [401, 403]
error_message: Please provide a valid API key and make sure it has permissions to read specified streams.
type: DefaultErrorHandler
backoff_strategies:
- type: WaitTimeFromHeader
header: "Retry-After"
response_filters:
- type: HttpResponseFilter
action: RATE_LIMITED
http_codes: [429]
- type: HttpResponseFilter
action: FAIL
http_codes: [401, 403]
failure_type: config_error
error_message: Please provide a valid API key and make sure it has permissions to read specified streams.
request_headers:
Accept: "application/json"
Revision: "2023-10-15"
Expand Down Expand Up @@ -242,7 +238,7 @@ definitions:
type: RecordSelector
extractor:
type: CustomRecordExtractor
class_name: source_klaviyo.components.inclouded_fields_extractor.KlaviyoIncludedFieldExtractor
class_name: source_klaviyo.components.included_fields_extractor.KlaviyoIncludedFieldExtractor
field_path: ["data"]
requester:
$ref: "#/definitions/requester"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import CheckpointMixin, StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, HttpStatusErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, FailureType, ResponseAction
from requests import Response
from source_klaviyo.components.klaviyo_backoff_strategy import KlaviyoBackoffStrategy

from .availability_strategy import KlaviyoAvailabilityStrategy
from .exceptions import KlaviyoBackoffError
Expand Down Expand Up @@ -111,15 +115,11 @@ def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], lat
current_stream_state[self.cursor_field] = latest_cursor.isoformat()
return current_stream_state

def backoff_time(self, response: Response) -> Optional[float]:
if response.status_code == 429:
retry_after = response.headers.get("Retry-After")
retry_after = float(retry_after) if retry_after else None
if retry_after and retry_after >= self.max_time:
raise KlaviyoBackoffError(
f"Stream {self.name} has reached rate limit with 'Retry-After' of {retry_after} seconds, exit from stream."
)
return retry_after
def get_backoff_strategy(self) -> BackoffStrategy:
return KlaviyoBackoffStrategy(max_time=self.max_time, name=self.name)

def get_error_handler(self) -> ErrorHandler:
return HttpStatusErrorHandler(logger=self.logger, error_mapping=DEFAULT_ERROR_MAPPING, max_retries=self.max_retries)

def read_records(
self,
Expand Down Expand Up @@ -233,8 +233,6 @@ def path(self, **kwargs) -> str:


class CampaignsDetailed(Campaigns):
raise_on_http_errors = False

def parse_response(self, response: Response, **kwargs: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
for record in super().parse_response(response, **kwargs):
yield self._transform_record(record)
Expand All @@ -246,24 +244,32 @@ def _transform_record(self, record: Mapping[str, Any]) -> Mapping[str, Any]:

def _set_recipient_count(self, record: Mapping[str, Any]) -> None:
campaign_id = record["id"]
recipient_count_request = self._create_prepared_request(
path=f"{self.url_base}campaign-recipient-estimations/{campaign_id}",
_, recipient_count_response = self._http_client.send_request(
url=f"{self.url_base}campaign-recipient-estimations/{campaign_id}",
request_kwargs={},
headers=self.request_headers(),
http_method="GET",
)
recipient_count_response = self._send_request(recipient_count_request, {})
record["estimated_recipient_count"] = (
recipient_count_response.json().get("data", {}).get("attributes", {}).get("estimated_recipient_count", 0)
)

def _set_campaign_message(self, record: Mapping[str, Any]) -> None:
message_id = record.get("attributes", {}).get("message")
if message_id:
campaign_message_request = self._create_prepared_request(
path=f"{self.url_base}campaign-messages/{message_id}", headers=self.request_headers()
_, campaign_message_response = self._http_client.send_request(
url=f"{self.url_base}campaign-messages/{message_id}", request_kwargs={}, headers=self.request_headers(), http_method="GET"
)
campaign_message_response = self._send_request(campaign_message_request, {})
record["campaign_message"] = campaign_message_response.json().get("data")

def get_error_handler(self) -> ErrorHandler:

error_mapping = DEFAULT_ERROR_MAPPING | {
404: ErrorResolution(ResponseAction.IGNORE, FailureType.config_error, "Resource not found. Ignoring.")
}

return HttpStatusErrorHandler(logger=self.logger, error_mapping=error_mapping, max_retries=self.max_retries)


class Flows(IncrementalKlaviyoStreamWithArchivedRecords):
"""Docs: https://developers.klaviyo.com/en/reference/get_flows"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest
from requests.models import Response
from source_klaviyo.components.inclouded_fields_extractor import KlaviyoIncludedFieldExtractor
from source_klaviyo.components.included_fields_extractor import KlaviyoIncludedFieldExtractor


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
400,
False,
(
"Unable to connect to stream metrics - "
"Bad request. Please check your request parameters."
),
),
(
403,
False,
(
"Unable to connect to stream metrics - Please provide a valid API key and "
"make sure it has permissions to read specified streams."
"Please provide a valid API key and make sure it has permissions to read specified streams."
),
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from typing import Any, List, Mapping, Optional
from unittest import mock
from unittest.mock import patch

import pendulum
import pytest
Expand Down Expand Up @@ -138,19 +139,19 @@ def test_availability_strategy(self):
)
def test_backoff_time(self, status_code, retry_after, expected_time):
stream = SomeStream(api_key=API_KEY)
response_mock = mock.MagicMock()
response_mock = mock.MagicMock(spec=requests.Response)
response_mock.status_code = status_code
response_mock.headers = {"Retry-After": retry_after}
assert stream.backoff_time(response_mock) == expected_time
assert stream.get_backoff_strategy().backoff_time(response_mock) == expected_time

def test_backoff_time_large_retry_after(self):
stream = SomeStream(api_key=API_KEY)
response_mock = mock.MagicMock()
response_mock = mock.MagicMock(spec=requests.Response)
response_mock.status_code = 429
retry_after = stream.max_time + 5
response_mock.headers = {"Retry-After": retry_after}
with pytest.raises(KlaviyoBackoffError) as e:
stream.backoff_time(response_mock)
stream.get_backoff_strategy().backoff_time(response_mock)
error_message = (
f"Stream some_stream has reached rate limit with 'Retry-After' of {float(retry_after)} seconds, "
"exit from stream."
Expand Down Expand Up @@ -549,13 +550,12 @@ def test_set_recipient_count_not_found(self, requests_mock):
campaign_id = "1"
record = {"id": campaign_id, "attributes": {"name": "Campaign"}}

requests_mock.register_uri(
"GET",
f"https://a.klaviyo.com/api/campaign-recipient-estimations/{campaign_id}",
status_code=404,
json={},
)
stream._set_recipient_count(record)
mocked_response = mock.MagicMock(spec=requests.Response)
mocked_response.ok = False
mocked_response.status_code = 404
mocked_response.json.return_value = {}
with patch.object(stream._http_client, "send_request", return_value=(mock.MagicMock(spec=requests.PreparedRequest), mocked_response)):
stream._set_recipient_count(record)
assert record["estimated_recipient_count"] == 0

def test_set_campaign_message(self, requests_mock):
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Stream `Events Detailed` contains field `name` for `metric` relationship - addit

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|
| 2.8.0 | 2024-07-19 | [XXXXX](https://github.com/airbytehq/airbyte/pull/XXXXX) | Migrate to CDK v3.9.0 |
| 2.7.8 | 2024-07-20 | [42185](https://github.com/airbytehq/airbyte/pull/42185) | Update dependencies |
| 2.7.7 | 2024-07-08 | [40608](https://github.com/airbytehq/airbyte/pull/40608) | Update the `events_detailed` stream to improve efficiency using the events API |
| 2.7.6 | 2024-07-13 | [41903](https://github.com/airbytehq/airbyte/pull/41903) | Update dependencies |
Expand Down

0 comments on commit 38fe6ae

Please sign in to comment.