[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662

Status: Open. Wants to merge 52 commits into base: master from brian/concurrent_declarative_source. Showing changes from 17 of 52 commits.

Commits:
1130a74
initial work to create the ConcurrentDeclarativeSource that can run i…
brianjlai Sep 23, 2024
54a1f85
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Sep 26, 2024
0bd2deb
adding more tests and fixing bugs for only syncing streams in catalog…
brianjlai Oct 1, 2024
2ad65d9
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 2, 2024
8861054
fix a few more merge conflict errors
brianjlai Oct 2, 2024
aa55ab7
Fix tests and add cursor granularity to the cursor partition generator
brianjlai Oct 3, 2024
4288b8d
integrate YamlDeclarativeSource with ConcurrentDeclarativeSource and …
brianjlai Oct 9, 2024
1650e30
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 9, 2024
98c42a7
rebase, formatting, fix tests, add new test cases for concurrency level
brianjlai Oct 9, 2024
0f79069
forgot to remove change to test
brianjlai Oct 9, 2024
5992e19
fix mypy errors and a few others bugs and testing
brianjlai Oct 12, 2024
6a160c0
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 12, 2024
b09311f
add logic to skip streams using non-thread safe stream_state, pr feed…
brianjlai Oct 16, 2024
0180e11
fix formatting and mypy checks
brianjlai Oct 16, 2024
0c0c019
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 16, 2024
61dfb9b
fix more mypy
brianjlai Oct 16, 2024
5dd1121
mypy
brianjlai Oct 16, 2024
7bd3056
pr feedback and updates to source-sentry for testing
brianjlai Oct 17, 2024
bef1c03
sentry lockfile
brianjlai Oct 17, 2024
c2e3bdb
update base image
brianjlai Oct 17, 2024
9116da6
bump amplitude dependencies and versions for testing
brianjlai Oct 17, 2024
80186ca
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 17, 2024
1242296
add logging for incremental streams that are not thread safe
brianjlai Oct 17, 2024
b74b942
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 17, 2024
99bee1e
remove amplitude version bump
brianjlai Oct 17, 2024
15cb6dd
get rid of stream_state interpolation in sentry
brianjlai Oct 18, 2024
9b5eb62
whatever
brianjlai Oct 19, 2024
b82c21c
parse DatetimeBasedCursorModel to ConcurrentCursor, bugfixes, pr feed…
brianjlai Oct 19, 2024
6a91848
formatting + mypy
brianjlai Oct 19, 2024
15127a7
fix mypy by replacing empty tuple() with None to make it truly optional
brianjlai Oct 19, 2024
977c525
remove local cdk from sentry
brianjlai Oct 19, 2024
4e38c4e
update lockfile
brianjlai Oct 19, 2024
f7f3e9d
swapped updating lockfiles
brianjlai Oct 19, 2024
7ab1bf1
reducing granularity of events stream cursor partitions which only s…
brianjlai Oct 21, 2024
69aa511
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 22, 2024
626e78d
add more complete testing of the datetimebasedcursor model to concurr…
brianjlai Oct 22, 2024
e5efec4
add concurrency to chargebee and fix usage of epoch timestamps
brianjlai Oct 23, 2024
b571301
extra space
brianjlai Oct 23, 2024
c131645
Merge branch 'master' into brian/concurrent_declarative_source
brianjlai Oct 23, 2024
10431bc
fix mypy errors
brianjlai Oct 23, 2024
3fa228f
formatting
brianjlai Oct 23, 2024
96cf12b
fix lockfile and remove jira
brianjlai Oct 23, 2024
2ca0f74
fix spec command
brianjlai Oct 24, 2024
55b53df
Maxi297/fix streams interface (#46995)
maxi297 Oct 25, 2024
13b9ccb
add is_resumable to concurrent DefaultStream
brianjlai Oct 25, 2024
d13a1e6
remove lockfile changes to individual connectors
brianjlai Oct 25, 2024
c9e1441
update sentry lockfile
brianjlai Oct 25, 2024
32941cc
fix tests and increase retry attempts
brianjlai Oct 25, 2024
651be92
reduce chargebee concurrency to 1 for testing
brianjlai Oct 26, 2024
f21b209
test keeping the cursor on the declarative stream
brianjlai Oct 26, 2024
57127bc
remove cursor on the declarative stream
brianjlai Oct 26, 2024
5054469
remove chargebee concurrency for testing
brianjlai Oct 26, 2024
4 changes: 3 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -56,9 +56,11 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
"""

@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec.
:param include_concurrent_streams: Concurrent sources can be made up of streams that can be run concurrently and
ones that must be run synchronously. By default, for backwards compatibility this is disabled.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""
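To illustrate the new flag, here is a minimal sketch (not from this PR) of how a concurrent-capable source might honor it; `ExampleSource` and its `_build_*_streams` helpers are hypothetical:

```python3
from typing import Any, List, Mapping

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


class ExampleSource(AbstractSource):
    # check_connection and the two hypothetical stream builders are assumed
    # to be implemented elsewhere; this only sketches the flag's semantics.

    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
        synchronous = self._build_synchronous_streams(config)  # hypothetical helper
        if not include_concurrent_streams:
            # Backwards-compatible default: return only synchronous streams.
            return synchronous
        return synchronous + self._build_concurrent_streams(config)  # hypothetical helper
```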
@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
streams = source.streams(config) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
streams = source.streams(config=config, include_concurrent_streams=True) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
Comment by maxi297 (Contributor), Oct 16, 2024:
nit: Regarding the # type: ignore that was already there before your changes: should we just make it part of the interface then? I'm not sure why ConnectionChecker.check does not take only DeclarativeSource. It seems like even the typing for Source is too broad, since streams is defined at the AbstractStream level. It feels like a typing issue worth updating, right?

stream_name_to_stream = {s.name: s for s in streams}
if len(streams) == 0:
return False, f"No streams to connect to from source {source}"

Large diffs are not rendered by default.

@@ -378,3 +378,18 @@ def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
# Check if the new runtime lookback window is greater than the current config lookback
if parse_duration(runtime_lookback_window) > config_lookback:
self._lookback_window = InterpolatedString.create(runtime_lookback_window, parameters={})

def get_start_datetime(self) -> MinMaxDatetime:
return self._start_datetime

def get_end_datetime(self) -> Optional[MinMaxDatetime]:
return self._end_datetime

def get_step(self) -> Union[timedelta, Duration]:
return self._step

def get_partition_field_start(self) -> InterpolatedString:
return self._partition_field_start

def get_partition_field_end(self) -> InterpolatedString:
return self._partition_field_end
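These getters expose the declarative cursor's configuration so other components, such as the concurrent cursor translation added in this PR, can read it. A minimal sketch of consuming them, assuming a `DatetimeBasedCursor` built from a manifest:

```python3
from typing import Any, Mapping

from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor


def summarize_cursor(cursor: DatetimeBasedCursor, config: Mapping[str, Any]) -> Mapping[str, Any]:
    # Pull out the pieces a concurrent cursor or partition generator would need.
    return {
        "start": cursor.get_start_datetime(),
        "end": cursor.get_end_datetime(),
        "step": cursor.get_step(),
        "partition_field_start": cursor.get_partition_field_start().eval(config),
        "partition_field_end": cursor.get_partition_field_end().eval(config),
    }
```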
@@ -44,7 +44,7 @@ def eval(self, config: Config, **kwargs: Any) -> Any:
return self.string
if self._is_plain_string is None:
# Let's check whether output from evaluation is the same as input.
# This indicates occurence of a plain string, not a template and we can skip Jinja in subsequent runs.
# This indicates occurrence of a plain string, not a template and we can skip Jinja in subsequent runs.
evaluated = self._interpolation.eval(self.string, config, self.default, parameters=self._parameters, **kwargs)
self._is_plain_string = self.string == evaluated
return evaluated
@@ -8,7 +8,7 @@
import re
from copy import deepcopy
from importlib import metadata
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple, Union

import yaml
from airbyte_cdk.models import (
@@ -88,7 +88,7 @@ def connection_checker(self) -> ConnectionChecker:
else:
raise ValueError(f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}")

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
stream_configs = self._stream_configs(self._source_config)

@@ -159,7 +159,7 @@ def read(
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
self._configure_logger_level(logger)
yield from super().read(logger, config, catalog, state)
@@ -11,7 +11,9 @@
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_input_provider import InterpolatedRequestInputProvider
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from deprecated import deprecated

RequestInput = Union[str, Mapping[str, str]]
ValidRequestTypes = (str, list)
@@ -109,3 +111,34 @@ def get_request_body_json(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._body_json_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

@deprecated("This class is temporary and used to incrementally deliver low-code to concurrent", category=ExperimentalClassWarning)
def request_options_contain_stream_state(self) -> bool:
"""
Temporary helper method used as we move low-code streams to the concurrent framework. This method determines whether
the InterpolatedRequestOptionsProvider has a dependency on a non-thread-safe interpolation context such as
stream_state.
"""

return (
self._check_if_interpolation_uses_stream_state(self.request_parameters)
or self._check_if_interpolation_uses_stream_state(self.request_headers)
or self._check_if_interpolation_uses_stream_state(self.request_body_data)
or self._check_if_interpolation_uses_stream_state(self.request_body_json)
)

@staticmethod
def _check_if_interpolation_uses_stream_state(request_input: Optional[Union[RequestInput, NestedMapping]]) -> bool:
if not request_input:
return False
elif isinstance(request_input, str):
return "stream_state" in request_input
else:
for key, val in request_input.items():
# Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case
# of a NestedMapping where the value is a string.
# Note: Doesn't account for nested mappings for request_body_json, but I don't see stream_state used in that way
# in our code
if "stream_state" in key or (isinstance(val, str) and "stream_state" in val):
return True
return False
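A hedged usage sketch of the helper above, assuming the dataclass keyword arguments shown here; the `updated_since` parameter is illustrative:

```python3
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
    InterpolatedRequestOptionsProvider,
)

provider = InterpolatedRequestOptionsProvider(
    config={},
    request_parameters={"updated_since": "{{ stream_state['updated_at'] }}"},
    parameters={},
)

# The interpolation references stream_state, so a stream built on this provider
# would stay on the synchronous path for now.
assert provider.request_options_contain_stream_state()
```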
@@ -3,23 +3,37 @@
#

import pkgutil
from typing import Any
from typing import Any, List, Mapping, Optional

import yaml
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
from airbyte_cdk.sources.types import ConnectionDefinition


class YamlDeclarativeSource(ManifestDeclarativeSource):
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
"""Declarative source defined by a yaml file"""

def __init__(self, path_to_yaml: str, debug: bool = False) -> None:
def __init__(
self,
path_to_yaml: str,
debug: bool = False,
catalog: Optional[ConfiguredAirbyteCatalog] = None,
config: Optional[Mapping[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
"""
self._path_to_yaml = path_to_yaml
source_config = self._read_and_parse_yaml_file(path_to_yaml)
super().__init__(source_config, debug)

super().__init__(
catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
config=config or {},
state=state or [],
source_config=source_config,
)

def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
package = self.__class__.__module__.split(".")[0]
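A hedged sketch of the two construction styles the new signature supports, assuming it runs inside a connector package that bundles `manifest.yaml`; the config values are illustrative:

```python3
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

# Legacy construction still works; catalog, config, and state fall back to empty values.
legacy_source = YamlDeclarativeSource(path_to_yaml="manifest.yaml")

# Concurrency-aware construction threads the runtime inputs through to
# ConcurrentDeclarativeSource.
concurrent_source = YamlDeclarativeSource(
    path_to_yaml="manifest.yaml",
    catalog=ConfiguredAirbyteCatalog(streams=[]),
    config={"start_date": "2024-01-01"},
    state=[],
)
```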
@@ -5,6 +5,7 @@
import copy
import json
import logging
from datetime import datetime
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

@@ -24,6 +25,7 @@
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import DateTimeStreamStateConverter
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
@@ -203,6 +205,11 @@ class SliceEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if hasattr(obj, "__json_serializable__"):
return obj.__json_serializable__()

# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
return list(obj.timetuple())[0:6]
Comment by Collaborator:
Should we set 6 as a variable to avoid using magic numbers?

Reply by brianjlai (Contributor, Author):
Yes, thank you for calling this out. I had put this in as a placeholder as I was working through getting this tested the first time around, and it needs to be reinvestigated/fixed.

Reply by Contributor:
I don't follow this -- Is there somewhere we would be serializing a datetime object?

Reply by brianjlai (Contributor, Author):
I'll double check, but I think we needed this serializer deeper in our code, potentially in how we emit state back out. I'll reconfirm this as I work through @lazebnyi's comment above.

Reply by Contributor:
I'm also curious about this. From my understanding, this would mean that CursorPartitionGenerator would create slices with datetime within them but this is not what I see.

Reply by brianjlai (Contributor, Author), Oct 16, 2024:
@lazebnyi @pnilan @maxi297 to close the loop on this one: I think what originally happened was that I wrote this in when I was first testing, because we were getting the datetime object from the ConcurrentCursor and it would fail trying to serialize it.

However, later, after cleaning up the code and fixing edge cases, I addressed the serialization by converting the datetime into the correct output string format with the correct precision in the generate() function here: https://github.com/airbytehq/airbyte/pull/46662/files#diff-93127bface0b323fe43b21cdb8fb14493dd465995b085a4f81647f3697930bddR396-R399. And since this was already a string, we don't need to convert it again.

I'll get rid of this code as it's not actually used anymore and we're applying the correct precision based on the cursor definition.


# Let the base class default method raise the TypeError
return super().default(obj)

@@ -341,12 +348,17 @@ class CursorPartitionGenerator(PartitionGenerator):
across partitions. Each partition represents a subset of the stream's data and is determined by the cursor's state.
"""

_START_BOUNDARY = 0
_END_BOUNDARY = 1

def __init__(
self,
stream: Stream,
message_repository: MessageRepository,
cursor: Cursor,
connector_state_converter: DateTimeStreamStateConverter,
cursor_field: Optional[List[str]],
slice_boundary_fields: Optional[Tuple[str, str]],
):
"""
Initialize the CursorPartitionGenerator with a stream, sync mode, and cursor.
@@ -362,6 +374,8 @@ def __init__(
self._cursor = cursor
self._cursor_field = cursor_field
self._state = self._cursor.state
self._slice_boundary_fields = slice_boundary_fields
self._connector_state_converter = connector_state_converter

def generate(self) -> Iterable[Partition]:
"""
@@ -372,8 +386,18 @@

:return: An iterable of StreamPartition objects.
"""

start_boundary = self._slice_boundary_fields[self._START_BOUNDARY] if self._slice_boundary_fields else "start"
Comment by Collaborator:
For the current implementation of the datetime cursor, self._slice_boundary_fields never has a None value.

end_boundary = self._slice_boundary_fields[self._END_BOUNDARY] if self._slice_boundary_fields else "end"

for slice_start, slice_end in self._cursor.generate_slices():
stream_slice = StreamSlice(partition={}, cursor_slice={"start": slice_start, "end": slice_end})
stream_slice = StreamSlice(
partition={},
cursor_slice={
start_boundary: self._connector_state_converter.output_format(slice_start),
end_boundary: self._connector_state_converter.output_format(slice_end),
},
)

yield StreamPartition(
self._stream,
@@ -386,7 +410,7 @@
)


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@deprecated("Availability strategy has been soft deprecated. Do not use. Class is subject to removal", category=ExperimentalClassWarning)
class AvailabilityStrategyFacade(AvailabilityStrategy):
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
self._abstract_availability_strategy = abstract_availability_strategy
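To illustrate the formatting change in `generate()`, here is a small standalone sketch (not from this PR); the `start_time`/`end_time` keys stand in for the cursor's configured slice boundary fields:

```python3
from datetime import datetime, timezone

from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    IsoMillisConcurrentStreamStateConverter,
)

converter = IsoMillisConcurrentStreamStateConverter()
slice_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
slice_end = datetime(2024, 1, 31, tzinfo=timezone.utc)

# Mirrors what CursorPartitionGenerator.generate() now places in cursor_slice:
cursor_slice = {
    "start_time": converter.output_format(slice_start),  # e.g. "2024-01-01T00:00:00.000Z"
    "end_time": converter.output_format(slice_end),
}
print(cursor_slice)
```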
@@ -67,6 +67,7 @@ def check_availability(self, logger: logging.Logger) -> StreamAvailability:
"""


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Comment by Contributor:
@brianjlai probably better to call out that it should not be used at all, if we're ripping out availability strategies over mid-long term?

Reply by brianjlai (Contributor, Author):
Ah yeah, I think I carried that over from a merge of Serhii's work, which deprecated the availability strategy in concurrent. I'll update this to say "do not use".

class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
"""
An availability strategy that always indicates a stream is available.
@@ -321,7 +321,10 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType)

lower = max(lower, self._start) if self._start else lower
if not self._slice_range or lower + self._slice_range >= upper:
yield lower, upper
if self._cursor_granularity:
yield lower, upper - self._cursor_granularity
else:
yield lower, upper
else:
stop_processing = False
current_lower_boundary = lower
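A minimal illustration of the granularity adjustment above, using plain datetimes rather than the ConcurrentCursor itself: stepping the upper bound back by one granularity unit keeps adjacent slices from overlapping.

```python3
from datetime import datetime, timedelta, timezone

lower = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
upper = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
cursor_granularity = timedelta(seconds=1)

# Without granularity the slice is [lower, upper]; with it, the slice ends one
# unit earlier so the next slice can start at `upper` without double-reading.
print(lower, upper - cursor_granularity)  # ends at 2024-01-01 09:59:59+00:00
```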
@@ -157,3 +157,17 @@ def parse_timestamp(self, timestamp: str) -> datetime:
if not isinstance(dt_object, DateTime):
raise ValueError(f"DateTime object was expected but got {type(dt_object)} from pendulum.parse({timestamp})")
return dt_object # type: ignore # we are manually type checking because pendulum.parse may return different types


class CustomOutputFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
"""
Datetime State converter that emits state according to the supplied datetime format. The converter supports reading
incoming state in any valid datetime format via Pendulum.
"""

def __init__(self, datetime_format: str, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None):
super().__init__(is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity)
self._datetime_format = datetime_format

def output_format(self, timestamp: datetime) -> str:
return timestamp.strftime(self._datetime_format)
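A hedged usage sketch for the new converter, using the class defined above; the format string and granularity are illustrative:

```python3
from datetime import datetime, timedelta, timezone

converter = CustomOutputFormatConcurrentStreamStateConverter(
    datetime_format="%Y-%m-%dT%H:%M:%SZ",
    cursor_granularity=timedelta(seconds=1),
)
print(converter.output_format(datetime(2024, 1, 1, tzinfo=timezone.utc)))
# -> 2024-01-01T00:00:00Z
```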
89 changes: 89 additions & 0 deletions airbyte-cdk/python/cdk-migrations.md
@@ -1,5 +1,94 @@
# CDK Migration Guide

## Upgrading to 6.x.x
Comment by Contributor:
Can we validate with @bnchrch what is the impact on the manifest-only sources and what is the migration path for them?

Reply by brianjlai (Contributor, Author), Oct 16, 2024:
Yep, I'll add a note in the guide. It sounds like source-declarative-manifest is pinned to an explicit version, so we can migrate on our own time. We basically need to modify the respective run.py and source.py files the way we do for individual connectors and then bump the version. That in turn is actually the easiest way to release this to the most connectors, since no manifest-only connectors define their own run.py.

And as for the connector builder, that one actually doesn't use YamlDeclarativeSource; it uses SourceDeclarativeManifest, which doesn't contain breaking changes. I think it's up for debate whether that should switch over, but for now it would be non-breaking there.


Version 6.x.x of the CDK introduces concurrent processing of low-code incremental streams. This is a breaking change because non-manifest-only connectors must update their self-managed `run.py` and `source.py` files. This section clarifies how to upgrade a low-code connector to use the Concurrent CDK to sync incremental streams.

> [!NOTE]
> This version introduces parallel processing of incremental streams only.
> It does not include parallel processing of substreams that rely on a parent stream.
> It also does not include parallel processing of full-refresh streams.

Low-code incremental streams that match any of the following criteria are not supported by the concurrent framework as of this version:
- Uses a custom implementation of the `DatetimeBasedCursor` component
- The `DatetimeBasedCursor` defines a `step`, which partitions a stream's requests into time intervals, AND an
`AddFields` / `HttpRequester` / `RecordFilter` component relies on interpolation of the `stream_state` value. See below
for the complete list.

In order to enable concurrency for a low-code connector, the following changes must be made:
- In the connector's `source.py`, change the method signature to accept catalog, config, and state parameters, and change the invocation of `super()` to pass in those new parameters:

```python3
class SourceName(YamlDeclarativeSource):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})
```
- In the connector's `run.py`, update it to pass those variables through to the source:

```python3
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceName(
SourceName.read_catalog(catalog_path) if catalog_path else None,
SourceName.read_config(config_path) if config_path else None,
SourceName.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=int(datetime.now().timestamp() * 1000),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
),
)
)
).decode()
)
return None


def run():
_args = sys.argv[1:]
source = _get_source(_args)
if source:
launch(source, _args)
```

- Add the `ConcurrencyLevel` component to the connector's `manifest.yaml` file:

```yaml
concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config['num_workers'] or 10 }}"
max_concurrency: 20
```

### Connectors that have streams that cannot be processed concurrently

Connectors with streams that use `stream_state` during interpolation must be run synchronously until they are fixed or updated:
- Http Requester
  - `source-insightly`: Uses a DatetimeBasedCursor with a step interval, and the HttpRequester has request_parameters relying on `stream_state`. This should be replaced by `stream_interval`
  - `source-intercom`: Uses a custom `incremental_sync` component, and `stream_state` is used as part of the HttpRequester request_body_json. However, because this is processed on a single slice, `stream_interval` can be used
- Record Filter
  - `source-chargebee`: Uses a custom `incremental_sync` component and `stream_state` in the RecordFilter condition. However, because this is processed on a single slice, `stream_interval` can be used
  - `source-intercom`: Uses a custom `incremental_sync` component, and `stream_state` is used as part of the RecordFilter condition. However, because this is processed on a single slice, `stream_interval` can be used
  - `source-railz`: Uses a custom `incremental_sync` component, and `stream_state` is used as part of the RecordFilter condition. This also uses multiple one-month time intervals and is not currently compatible with concurrent processing
  - `source-tiktok-marketing`: Contains a DatetimeBasedCursor with a step interval and relies on a CustomRecordFilter with a condition that uses `stream_state`. This should be replaced by `stream_interval`
- `AddFields`: No connectors use `stream_state` when performing an additive transformation for a record

To enable concurrency on these streams, `stream_state` should be removed from the interpolated value and replaced
by a thread-safe interpolation context such as `stream_interval` or `stream_partition`, as sketched below.
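A hedged manifest sketch of that replacement in a `RecordFilter` condition; the record field and interval keys are illustrative:

```yaml
record_filter:
  type: RecordFilter
  # Before (not thread safe):
  # condition: "{{ record['updated_at'] >= stream_state['updated_at'] }}"
  # After (thread safe):
  condition: "{{ record['updated_at'] >= stream_interval['start_time'] }}"
```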

## Upgrading to 5.0.0

Version 5.0.0 of the CDK updates the `airbyte_cdk.models` dependency to replace Pydantic v2 models with Python `dataclasses`. It also
Expand Down