Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source-stripe): add convert to concurrent stream #45701

Draft
wants to merge 43 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
228cae3
Add _convert_to_concurrent_stream to ConcurrentSourceAdapter
lazebnyi Aug 31, 2024
abf61bb
Fix format and mypy
lazebnyi Aug 31, 2024
09bd428
fix formatting
lazebnyi Aug 31, 2024
a50ce52
Merge branch 'master' into lazebnyi/cuncurrent-cdk-cursor-if-incremen…
lazebnyi Aug 31, 2024
437222f
Update _convert_to_concurrent_stream to use stream interface
lazebnyi Sep 10, 2024
860929d
Unify cursor initialization
lazebnyi Sep 10, 2024
39d3431
Merge branch 'lazebnyi/cuncurrent-cdk-cursor-if-incremental-implement…
lazebnyi Sep 10, 2024
9c579fc
Merge branch 'master' into lazebnyi/cuncurrent-cdk-cursor-if-incremen…
lazebnyi Sep 10, 2024
0c59b61
Fix typing
lazebnyi Sep 10, 2024
0200554
Add unittests
lazebnyi Sep 11, 2024
9c7edc7
Merge branch 'master' into lazebnyi/cuncurrent-cdk-cursor-if-incremen…
lazebnyi Sep 11, 2024
2f09e87
Update convert_to_concurrent_stream to public method and add cursor f…
lazebnyi Sep 12, 2024
e5592c5
Merge branch 'lazebnyi/cuncurrent-cdk-cursor-if-incremental-implement…
lazebnyi Sep 12, 2024
ab42093
Merge branch 'master' into lazebnyi/cuncurrent-cdk-cursor-if-incremen…
lazebnyi Sep 12, 2024
585c4b6
Fix formatting
lazebnyi Sep 12, 2024
a96de4b
Merge branch 'lazebnyi/cuncurrent-cdk-cursor-if-incremental-implement…
lazebnyi Sep 12, 2024
1e18669
Depricate old state format annotation and remove unnessesary vars
lazebnyi Sep 17, 2024
7c75335
Depricate old state format annotation
lazebnyi Sep 17, 2024
51808a1
Add dependency to dev CDK version
lazebnyi Sep 17, 2024
1b1d50c
Merge branch 'master' into lazebnyi/cuncurrent-cdk-cursor-if-incremen…
lazebnyi Sep 17, 2024
c0fd261
Update connectors versions
lazebnyi Sep 17, 2024
cdafd19
Update connectors versions
lazebnyi Sep 17, 2024
fdef3e7
Remove unused imports
lazebnyi Sep 18, 2024
1c88a20
Merge branch 'master' into lazebnyi/source-stripe-salesforce-add-conv…
lazebnyi Sep 20, 2024
90b454b
Rollback salesforce
lazebnyi Sep 20, 2024
aea8795
Filter streams by slice boundary
lazebnyi Sep 20, 2024
89faaaf
Update poetry lock
lazebnyi Sep 20, 2024
725b557
Merge branch 'master' into lazebnyi/source-stripe-salesforce-add-conv…
lazebnyi Sep 21, 2024
183b9f2
Update unittests
lazebnyi Sep 21, 2024
c822545
Merge branch 'lazebnyi/source-stripe-salesforce-add-convert-to-concur…
lazebnyi Sep 21, 2024
35cb07c
Fix state definition
lazebnyi Sep 21, 2024
f790888
Merge branch 'master' into lazebnyi/source-stripe-salesforce-add-conv…
lazebnyi Sep 21, 2024
0686421
Rollback stripe changes
lazebnyi Sep 21, 2024
0bd0c8b
Rollback typo
lazebnyi Sep 21, 2024
1b73037
Update tests
lazebnyi Sep 21, 2024
9f36bf0
Update test with mock state manager
lazebnyi Sep 21, 2024
545af70
Add stripe changes
lazebnyi Sep 21, 2024
8f88b73
Merge branch 'master' into lazebnyi/source-stripe-salesforce-add-conv…
lazebnyi Oct 1, 2024
387f0b2
Merge master to branch
lazebnyi Oct 2, 2024
b171792
Merge branch 'lazebnyi/source-stripe-salesforce-add-convert-to-concur…
lazebnyi Oct 2, 2024
9122d39
Fix formatting
lazebnyi Oct 2, 2024
2e09f5f
Bump CDK to latest
lazebnyi Oct 2, 2024
ae88251
Add note to changelog
lazebnyi Oct 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.6.0
dockerImageTag: 5.7.0
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
erdUrl: https://dbdocs.io/airbyteio/source-stripe?view=relationships
Expand Down
618 changes: 327 additions & 291 deletions airbyte-integrations/connectors/source-stripe/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.6.0"
version = "5.7.0"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,54 +538,30 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
]

state_manager = ConnectorStateManager(state=self._state)
converter = EpochValueConcurrentStreamStateConverter()
start = datetime.fromtimestamp(self._start_date_to_timestamp(config), timezone.utc)
slice_range = timedelta(days=config["slice_range"])

return [
self._to_concurrent(
self.convert_to_concurrent_stream(
logger,
stream,
datetime.fromtimestamp(self._start_date_to_timestamp(config), timezone.utc),
timedelta(days=config["slice_range"]),
state_manager,
cursor=self.initialize_cursor(
stream,
state_manager,
converter,
self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream)),
start,
converter.get_end_provider(),
slice_range=slice_range,
),
)
if self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
else stream
for stream in streams
]

def _to_concurrent(
self, stream: Stream, fallback_start: datetime, slice_range: timedelta, state_manager: ConnectorStateManager
) -> Stream:
if stream.name in self._streams_configured_as_full_refresh:
return StreamFacade.create_from_stream(
stream,
self,
entrypoint_logger,
self._create_empty_state(),
FinalStateCursor(stream_name=stream.name, stream_namespace=stream.namespace, message_repository=self.message_repository),
)

state = state_manager.get_stream_state(stream.name, stream.namespace)
slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
if slice_boundary_fields:
cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0])
converter = EpochValueConcurrentStreamStateConverter()
cursor = ConcurrentCursor(
stream.name,
stream.namespace,
state_manager.get_stream_state(stream.name, stream.namespace),
self.message_repository,
state_manager,
converter,
cursor_field,
slice_boundary_fields,
fallback_start,
converter.get_end_provider(),
timedelta(seconds=0),
slice_range,
)
return StreamFacade.create_from_stream(stream, self, entrypoint_logger, state, cursor)

return stream

def _create_empty_state(self) -> MutableMapping[str, Any]:
return {}

@staticmethod
def _start_date_to_timestamp(config: Mapping[str, Any]) -> int:
if "start_date" not in config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ def test_rate_limit_max_attempts_exceeded(self, http_mocker: HttpMocker) -> None
FailureType.system_error,
FailureType.config_error,
]
assert "Request rate limit exceeded" in actual_messages.errors[0].trace.error.internal_message

assert "Request rate limit exceeded" in actual_messages.errors[0].trace.error.message

@HttpMocker()
def test_incremental_rate_limit_max_attempts_exceeded(self, http_mocker: HttpMocker) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def test_when_streams_return_full_refresh_as_concurrent():
_NO_STATE,
).streams(_a_valid_config())

# bank_accounts (as it is defined as full_refresh)
# balance_transactions, events, files, file_links and shipping_rates (as it is always concurrent now)
assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 6
assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 5


@pytest.mark.parametrize(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 5.7.0 | 2024-10-10 | [45701](https://github.com/airbytehq/airbyte/pull/45701) | Update source to use `convert_to_concurrent_stream` |
| 5.6.0 | 2024-09-10 | [44891](https://github.com/airbytehq/airbyte/pull/44891) | Update `Payment Methods` stream |
| 5.5.4 | 2024-09-09 | [45348](https://github.com/airbytehq/airbyte/pull/45348) | Remove `stripe` python package |
| 5.5.3 | 2024-09-03 | [45101](https://github.com/airbytehq/airbyte/pull/45101) | Fix regression following pagination issue fix |
Expand Down
Loading