# [Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662
base: master
```diff
@@ -5,6 +5,7 @@
 import copy
 import json
 import logging
+from datetime import datetime
 from functools import lru_cache
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

@@ -24,6 +25,7 @@
 from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
+from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import DateTimeStreamStateConverter
 from airbyte_cdk.sources.streams.core import StreamData
 from airbyte_cdk.sources.types import StreamSlice
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig

@@ -203,6 +205,11 @@ class SliceEncoder(json.JSONEncoder):
     def default(self, obj: Any) -> Any:
         if hasattr(obj, "__json_serializable__"):
             return obj.__json_serializable__()

+        # This needs to be revisited as we can't lose precision
+        if isinstance(obj, datetime):
+            return list(obj.timetuple())[0:6]
+
         # Let the base class default method raise the TypeError
         return super().default(obj)
```

**Review thread on `return list(obj.timetuple())[0:6]`:**

> **Reviewer:** Should we set 6 as a variable to avoid using magic numbers?
>
> **Author:** Yes, thank you for calling this out. I had put this in as a placeholder as I was working through getting this tested the first time around, and it needs to be reinvestigated/fixed.
>
> **Reviewer:** I don't follow this -- is there somewhere we would be serializing a datetime object?
>
> **Author:** I'll double check, but I think we needed this serializer deeper in our code, potentially in how we emit state back out. I'll reconfirm this as I work through @lazebnyi's comment above.
>
> **Reviewer:** I'm also curious about this. From my understanding, this would mean that `CursorPartitionGenerator` would create slices with datetimes within them, but this is not what I see.
>
> **Author:** @lazebnyi @pnilan @maxi297 to close the loop on this one: I think what originally happened was that I wrote this in when I was first testing, because we were getting the datetime object from the `ConcurrentCursor` and it would fail trying to serialize it. Later, after cleaning up the code and fixing edge cases, I addressed the serialization by converting the datetime into the correct output string format with the correct precision in the `generate()` function (https://github.com/airbytehq/airbyte/pull/46662/files#diff-93127bface0b323fe43b21cdb8fb14493dd465995b085a4f81647f3697930bddR396-R399). Since the value is now already a string, we don't need to convert it again. I'll get rid of this code as it's not actually used anymore, and we're applying the correct precision based on the cursor definition.
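To make the precision concern in the thread above concrete, here is a small, self-contained sketch (not part of the PR) showing why `timetuple()[0:6]` drops sub-second precision:

```python3
import json
from datetime import datetime, timezone


class SliceEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            # timetuple() only carries whole seconds (tm_year .. tm_sec),
            # so any microsecond component is silently discarded.
            return list(obj.timetuple())[0:6]
        return super().default(obj)


ts = datetime(2024, 1, 1, 12, 30, 45, 123456, tzinfo=timezone.utc)
print(json.dumps({"start": ts}, cls=SliceEncoder))
# -> {"start": [2024, 1, 1, 12, 30, 45]}  (the .123456 fraction is gone)
```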
```diff
@@ -341,12 +348,17 @@ class CursorPartitionGenerator(PartitionGenerator):
     across partitions. Each partition represents a subset of the stream's data and is determined by the cursor's state.
     """

+    _START_BOUNDARY = 0
+    _END_BOUNDARY = 1
+
     def __init__(
         self,
         stream: Stream,
         message_repository: MessageRepository,
         cursor: Cursor,
+        connector_state_converter: DateTimeStreamStateConverter,
         cursor_field: Optional[List[str]],
+        slice_boundary_fields: Optional[Tuple[str, str]],
     ):
         """
         Initialize the CursorPartitionGenerator with a stream, sync mode, and cursor.

@@ -362,6 +374,8 @@ def __init__(
         self._cursor = cursor
         self._cursor_field = cursor_field
         self._state = self._cursor.state
+        self._slice_boundary_fields = slice_boundary_fields
+        self._connector_state_converter = connector_state_converter

     def generate(self) -> Iterable[Partition]:
         """
```
```diff
@@ -372,8 +386,18 @@ def generate(self) -> Iterable[Partition]:

         :return: An iterable of StreamPartition objects.
         """
+        start_boundary = self._slice_boundary_fields[self._START_BOUNDARY] if self._slice_boundary_fields else "start"
+        end_boundary = self._slice_boundary_fields[self._END_BOUNDARY] if self._slice_boundary_fields else "end"
+
         for slice_start, slice_end in self._cursor.generate_slices():
-            stream_slice = StreamSlice(partition={}, cursor_slice={"start": slice_start, "end": slice_end})
+            stream_slice = StreamSlice(
+                partition={},
+                cursor_slice={
+                    start_boundary: self._connector_state_converter.output_format(slice_start),
+                    end_boundary: self._connector_state_converter.output_format(slice_end),
+                },
+            )

             yield StreamPartition(
                 self._stream,

@@ -386,7 +410,7 @@ def generate(self) -> Iterable[Partition]:
             )


-@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
+@deprecated("Availability strategy has been soft deprecated. Do not use. Class is subject to removal", category=ExperimentalClassWarning)
 class AvailabilityStrategyFacade(AvailabilityStrategy):
     def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
         self._abstract_availability_strategy = abstract_availability_strategy
```

**Review comment on the `start_boundary` line (truncated in the source):**

> **Reviewer:** For current implementation of datetime cursor
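To make the boundary-field behavior above concrete, here is a minimal, runnable sketch. The converter class and the `("since", "until")` field names are stand-ins invented for illustration, not the CDK's actual classes; it only mirrors how `slice_boundary_fields` and the converter's `output_format` shape the emitted `cursor_slice`:

```python3
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple


class FakeStateConverter:
    """Stand-in for the CDK's DateTimeStreamStateConverter; the real converter
    derives its output format from the cursor definition."""

    def output_format(self, value: datetime) -> str:
        return value.strftime("%Y-%m-%dT%H:%M:%SZ")


def build_cursor_slice(
    slice_start: datetime,
    slice_end: datetime,
    slice_boundary_fields: Optional[Tuple[str, str]],
    converter: FakeStateConverter,
) -> Dict[str, str]:
    # Mirrors CursorPartitionGenerator.generate(): fall back to generic
    # "start"/"end" keys when no boundary fields are configured.
    start_boundary = slice_boundary_fields[0] if slice_boundary_fields else "start"
    end_boundary = slice_boundary_fields[1] if slice_boundary_fields else "end"
    return {
        start_boundary: converter.output_format(slice_start),
        end_boundary: converter.output_format(slice_end),
    }


converter = FakeStateConverter()
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 31, tzinfo=timezone.utc)
print(build_cursor_slice(start, end, ("since", "until"), converter))
# -> {'since': '2024-01-01T00:00:00Z', 'until': '2024-01-31T00:00:00Z'}
print(build_cursor_slice(start, end, None, converter))
# -> {'start': '2024-01-01T00:00:00Z', 'end': '2024-01-31T00:00:00Z'}
```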
---
```diff
@@ -67,6 +67,7 @@ def check_availability(self, logger: logging.Logger) -> StreamAvailability:
     """


 @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
     """
     An availability strategy that always indicates a stream is available.
```

**Review thread on the `@deprecated` annotation:**

> **Reviewer:** @brianjlai probably better to call out that it should not be used at all, if we're ripping out availability strategies over the mid-to-long term?
>
> **Author:** Ah yeah, I think I carried that over from a merge of Serhii's work, which deprecated availability strategy in concurrent. I'll update this to say "do not use".
---
# CDK Migration Guide | ||

## Upgrading to 6.x.x

**Review thread on this section (the reply is truncated in the source):**

> **Reviewer:** Can we validate with @bnchrch what the impact is on manifest-only sources and what the migration path is for them?
>
> **Author:** Yep, I'll add a note in the guide. But it sounds like... and as for the connector builder, that one actually doesn't use
Version 6.x.x of the CDK introduces concurrent processing of low-code incremental streams. This is a breaking change because non-manifest-only connectors must update their self-managed `run.py` and `source.py` files. This section clarifies how to upgrade a low-code connector to use the Concurrent CDK to sync incremental streams.

> [!NOTE]
> This version introduces parallel processing of incremental streams only.
> It does not include parallel processing of substreams that rely on a parent stream,
> nor does it include processing of full-refresh streams in parallel.

Low-code incremental streams that match any of the following criteria are not supported by concurrent processing as of this version:
- Uses a custom implementation of the `DatetimeBasedCursor` component
- The `DatetimeBasedCursor` defines a `step`, which partitions the stream's requests into time intervals, AND an `AddFields` / `HttpRequester` / `RecordFilter` relies on interpolation of the `stream_state` value (see the sketch after this list, and the complete list of affected connectors below)
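For illustration, here is a hedged sketch of the second case. The field values are hypothetical and not taken from any specific connector; such a cursor is only a problem when the stream also interpolates `stream_state` elsewhere:

```yaml
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: updated_at
  datetime_format: "%Y-%m-%dT%H:%M:%SZ"
  start_datetime: "{{ config['start_date'] }}"
  # `step` splits the request window into 30-day intervals; combined with
  # `stream_state` interpolation elsewhere, this is not yet concurrency-safe.
  step: P30D
  cursor_granularity: PT1S
```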

In order to enable concurrency for a low-code connector, the following changes must be made:

- In the connector's `source.py`, change the `__init__` method signature to accept `catalog`, `config`, and `state` parameters, and change the invocation of `super()` to pass in those new parameters:
```python3
class SourceName(YamlDeclarativeSource):
    def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
        super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})
```

- In the connector's `run.py`, update `_get_source()` to extract the catalog, config, and state from the command-line arguments and pass them to the source:
```python3
def _get_source(args: List[str]):
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
    config_path = AirbyteEntrypoint.extract_config(args)
    state_path = AirbyteEntrypoint.extract_state(args)
    try:
        return SourceName(
            SourceName.read_catalog(catalog_path) if catalog_path else None,
            SourceName.read_config(config_path) if config_path else None,
            SourceName.read_state(state_path) if state_path else None,
        )
    except Exception as error:
        print(
            orjson.dumps(
                AirbyteMessageSerializer.dump(
                    AirbyteMessage(
                        type=Type.TRACE,
                        trace=AirbyteTraceMessage(
                            type=TraceType.ERROR,
                            emitted_at=int(datetime.now().timestamp() * 1000),
                            error=AirbyteErrorTraceMessage(
                                message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
                                stack_trace=traceback.format_exc(),
                            ),
                        ),
                    )
                )
            ).decode()
        )
        return None


def run():
    _args = sys.argv[1:]
    source = _get_source(_args)
    if source:
        launch(source, _args)
```

- Add the `ConcurrencyLevel` component to the connector's `manifest.yaml` file:
```yaml
concurrency_level:
  type: ConcurrencyLevel
  default_concurrency: "{{ config['num_workers'] or 10 }}"
  max_concurrency: 20
```
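With this example, `default_concurrency` is interpolated from the user's config: a hypothetical `num_workers` value of 15 would run the sync with 15 concurrent workers, and the `or 10` fallback applies when the option is absent, while `max_concurrency: 20` caps whatever the interpolation yields. (The `num_workers` option itself is an assumption of this illustration; a connector must expose such an option in its spec for the interpolation to find it.)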

### Connectors that have streams that cannot be processed concurrently

Connectors with streams that use `stream_state` during interpolation must be run synchronously until they are fixed or updated:

- HttpRequester
  - `source-insightly`: Uses a `DatetimeBasedCursor` with a step interval, and the `HttpRequester` has `request_parameters` relying on `stream_state`. This should be replaced by `stream_interval`
  - `source-intercom`: Uses a custom `incremental_sync` component, and `stream_state` is used as part of the `HttpRequester` `request_body_json`. However, because this is processed on a single slice, `stream_interval` can be used
- RecordFilter
  - `source-chargebee`: Uses a custom `incremental_sync` component and `stream_state` in the `RecordFilter` condition. However, because this is processed on a single slice, `stream_interval` can be used
  - `source-intercom`: Uses a custom `incremental_sync` component, and `stream_state` is used as part of the `RecordFilter` condition. However, because this is processed on a single slice, `stream_interval` can be used
  - `source-railz`: Uses a custom `incremental_sync` component, and `stream_state` is used as part of the `RecordFilter` condition. This also uses multiple one-month time intervals and is not currently compatible with concurrent processing
  - `source-tiktok-marketing`: Contains a `DatetimeBasedCursor` with a step interval and relies on a `CustomRecordFilter` with a condition relying on `stream_state`. This should be replaced by `stream_interval`
- `AddFields`: No connectors use `stream_state` when performing an additive transformation for a record

To enable concurrency on these streams, `stream_state` should be removed from the interpolated value and replaced by a thread-safe interpolation context such as `stream_interval` or `stream_partition`, as sketched below.
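As a hedged illustration (the field names here are hypothetical and not drawn from any of the connectors above), a `RecordFilter` condition interpolating `stream_state`:

```yaml
record_filter:
  type: RecordFilter
  # Not concurrency-safe: stream_state is shared across concurrently
  # processed slices.
  condition: "{{ record['updated_at'] >= stream_state['updated_at'] }}"
```

could be rewritten against the boundaries of the slice being processed (assuming the cursor's default `start_time` partition field):

```yaml
record_filter:
  type: RecordFilter
  # Concurrency-safe: stream_interval is scoped to the current slice.
  condition: "{{ record['updated_at'] >= stream_interval['start_time'] }}"
```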

## Upgrading to 5.0.0

Version 5.0.0 of the CDK updates the `airbyte_cdk.models` dependency to replace Pydantic v2 models with Python `dataclasses`. It also

---

**Review thread:**

> **Reviewer:** nit: Regarding the `# type: ignore` that was already there before your changes: should we just make it part of the interface then? I'm not sure why `ConnectionChecker.check` does not only take `DeclarativeSource` then. It seems like even the typing for `Source` is too broad, as `streams` is defined on the `AbstractStream` level. It feels like a typing issue worth updating, right?
>
> **Reviewer:** Proposed temporary fix: https://github.com/airbytehq/airbyte/pull/46995/files#r1806474429
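A minimal sketch of the narrowing this comment suggests. The exact signature is an assumption based on the discussion, not the CDK's published interface, and `DeclarativeSource` here is a local stand-in:

```python3
import logging
from abc import ABC, abstractmethod
from typing import Any, Mapping, Tuple


class DeclarativeSource:
    """Stand-in for airbyte_cdk's DeclarativeSource."""


class ConnectionChecker(ABC):
    @abstractmethod
    def check_connection(
        self, source: DeclarativeSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """Accepting DeclarativeSource (rather than a broader Source type) would
        let implementations use declarative-only attributes without `# type: ignore`."""
```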