[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662
Conversation
connector_state_converter = CustomOutputFormatConcurrentStreamStateConverter(
    datetime_format=declarative_cursor_attributes.datetime_format,
    is_sequential_state=False,
By disabling is_sequential_state, we will automatically move connectors from their sequential state to per-partition concurrent state. This is the ideal end goal, but if we were to revert the connector to a previous version that doesn't use the concurrent CDK, the state message would not be compatible. To de-risk the early release a little, it feels like we should set this to true so that we accept sequential state (i.e. {"updated_at": "2024-12-12"}) and also emit it back to the platform as such.
do you think we could add a mechanism to handle this situation by using the lowest value to set the sequential state if we revert the changes?
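A minimal sketch of the rollback mechanism suggested here, assuming per-partition state shaped as a list of slices. The field names and state shapes are illustrative, not the CDK's actual format:

```python
# Hypothetical state shapes for illustration only.
sequential_state = {"updated_at": "2024-12-12"}
concurrent_state = {
    "state_type": "date-range",
    "slices": [
        {"start": "2024-06-01", "end": "2024-12-12"},
        {"start": "2024-01-01", "end": "2024-06-01"},
    ],
}

def to_sequential(state: dict, cursor_field: str = "updated_at") -> dict:
    """Collapse per-partition state to a single sequential cursor by taking
    the lowest slice start, so a reverted connector re-reads any gaps."""
    if "slices" not in state:
        return state  # already sequential
    lowest = min(s["start"] for s in state["slices"])
    return {cursor_field: lowest}
```

Taking the lowest start is conservative (the reverted connector may re-read already-synced windows), but it never skips data.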
@@ -32,3 +33,6 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
        The error object will be cast to string to display the problem to the user.
        """
        return self.connection_checker.check_connection(self, logger, config)

+    def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        return self.streams(config=config)
For non-concurrent low-code sources, these are equivalent, but we override this implementation in ConcurrentDeclarativeSource to create a single list of both concurrent and synchronous streams so that we properly generate catalogs and other artifacts.
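A sketch of the relationship described, with placeholder stream-name lists standing in for real Stream objects (class internals are illustrative, not the PR's actual implementation):

```python
from typing import Any, List, Mapping

class DeclarativeSourceSketch:
    """Base behavior: all_streams is equivalent to streams."""

    def streams(self, config: Mapping[str, Any]) -> List[str]:
        return ["sync_stream_a", "sync_stream_b"]  # placeholder stream names

    def all_streams(self, config: Mapping[str, Any]) -> List[str]:
        return self.streams(config=config)

class ConcurrentDeclarativeSourceSketch(DeclarativeSourceSketch):
    """Override: one combined list so catalog generation sees every stream."""

    _concurrent_streams = ["concurrent_stream_c"]

    def all_streams(self, config: Mapping[str, Any]) -> List[str]:
        return self.streams(config=config) + self._concurrent_streams
```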
cursor_field=[declarative_cursor_attributes.cursor_field]
if declarative_cursor_attributes.cursor_field is not None
else None,
Suggested change:

cursor_field=(
    [declarative_cursor_attributes.cursor_field] if declarative_cursor_attributes.cursor_field is not None else None
),
    catalog: ConfiguredAirbyteCatalog,
    concurrent_stream_names: set[str],
) -> ConfiguredAirbyteCatalog:
    return ConfiguredAirbyteCatalog(streams=[stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names])
Suggested change:

catalog.streams = [stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names]
return catalog
Got distracted, just posting this so comments don't get lost, will read thoroughly later
@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-        streams = source.streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
+        streams = source.all_streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
Teach me your ways: what is the difference between streams and all_streams? (ignore if it's answered in this PR, still reading through it)
good question, I mention it in a comment: https://github.com/airbytehq/airbyte/pull/46662/files#r1792816384

But the reason we can't just rewrite the streams() method is that within the existing Python CDK core.py, when processing synchronous streams, we invoke the streams() method, and in that context we don't want to return the concurrent streams, which aren't compatible in that area of code.
As we discussed earlier, it's preferable to use the streams method and condition its behavior accordingly. This approach adds some complexity, but the tradeoff is simpler modifications later: once the core is able to handle concurrent streams, we'll get a stream generation interface for free.
yep this is addressed in my latest commit using the optional param include_concurrent_streams
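A hedged sketch of that optional-parameter approach (stream names and internals are illustrative placeholders, not the actual CDK implementation):

```python
from typing import Any, List, Mapping

class SourceSketch:
    """A single streams() entry point, conditioned on an optional flag."""

    _synchronous_streams = ["sync_a"]
    _concurrent_streams = ["concurrent_b"]

    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[str]:
        if include_concurrent_streams:
            return self._synchronous_streams + self._concurrent_streams
        # Existing CDK call sites use the default and keep seeing only
        # synchronous streams, preserving backward compatibility.
        return self._synchronous_streams
```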
@@ -67,6 +67,7 @@ def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """

+@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@brianjlai probably better to call out that it should not be used at all, if we're ripping out availability strategies over mid-long term?
ah yeah i think i carried that over from a merge of serhii's work which deprecated availability strategy in concurrent. I'll update this to say do not use
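For illustration, a minimal warn-on-instantiation decorator in the spirit of the deprecated decorator shown in the diff. This is a hand-rolled stand-in written for this sketch, not the CDK's or typing_extensions' implementation:

```python
import warnings
from functools import wraps

class ExperimentalClassWarning(DeprecationWarning):
    pass

def deprecated(message: str, category: type = DeprecationWarning):
    """Stand-in decorator: emit a warning when the decorated class is instantiated."""
    def decorate(cls):
        original_init = cls.__init__

        @wraps(original_init)
        def new_init(self, *args, **kwargs):
            warnings.warn(message, category=category, stacklevel=2)
            original_init(self, *args, **kwargs)

        cls.__init__ = new_init
        return cls
    return decorate

@deprecated("Do not use. Availability strategies are slated for removal.", category=ExperimentalClassWarning)
class AvailabilityStrategySketch:
    pass
```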
Overall, the implementation looks great; nice work! However, we still need to make a few updates. After that, I can approve.
# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
    return list(obj.timetuple())[0:6]
Should we set 6 as a variable to avoid using magic numbers?
yes, thank you for calling this out. I put this in as a placeholder while I was working through getting this tested the first time around, and it needs to be reinvestigated/fixed
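A sketch of the named-constant fix suggested above (the constant name is illustrative):

```python
from datetime import datetime

# year, month, day, hour, minute, second -- sub-second precision is
# dropped, which is the precision concern flagged in the code comment.
DATETIME_FIELD_COUNT = 6

def serialize_datetime(obj: datetime) -> list:
    """Named-constant version of list(obj.timetuple())[0:6]."""
    return list(obj.timetuple())[:DATETIME_FIELD_COUNT]
```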
    concurrency_level = concurrency_level_component.get_concurrency_level()
    initial_number_of_partitions_to_generate = concurrency_level // 2
else:
    concurrency_level = 1
I think it would be better to move this value from the code into a class variable, to improve readability
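One possible shape for that suggestion, using hypothetical names for the constants:

```python
class ConcurrencyDefaultsSketch:
    """Class-level constants replacing the inline literals."""

    DEFAULT_CONCURRENCY_LEVEL = 1
    # Generate half as many initial partitions as the concurrency level.
    INITIAL_PARTITION_DIVISOR = 2

    @classmethod
    def initial_partitions(cls, concurrency_level: int) -> int:
        return concurrency_level // cls.INITIAL_PARTITION_DIVISOR
```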
def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
    return self._synchronous_streams + self._concurrent_streams  # type: ignore # Although AbstractStream doesn't inherit Stream, they were designed to fit the same interface when called from streams()

def _separate_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
Do you think the name _group_streams would be more informative?
i'm fine with _group_streams
for slice_start, slice_end in self._cursor.generate_slices():
    stream_slice = StreamSlice(partition={}, cursor_slice={"start": slice_start, "end": slice_end})

    start_boundary = self._slice_boundary_fields[self._START_BOUNDARY] if self._slice_boundary_fields else "start"
For the current implementation of the datetime cursor, self._slice_boundary_fields never has a None value
from source_amplitude import SourceAmplitude


def _get_source(args: List[str]):
I think we need to clearly mention in the PR description that this will be a breaking change, and add info about it to the CDK migration file.
yep that is the plan, I wrote up a migration guide that will be included in the next commit I push, explaining what needs to change in run.py and source.py.
A couple minor comments. I'll defer to Serhii for approval.
Also can you run regression tests w/ one/two of the test connectors?
state_manager = ConnectorStateManager(state=self._state)  # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

self.logger.info(f"what is config: {config}")
Is this a personal debugging log?
it is. i'm removing this thank you
    declarative_cursor.get_partition_field_end().eval(config=config),
)

interpolated_state_date = declarative_cursor.get_start_datetime()
nit: interpolated_state_date - typo?
good eyes!
end_boundary = self._slice_boundary_fields[self._END_BOUNDARY] if self._slice_boundary_fields else "end"

wam = list(self._cursor.generate_slices())
for slice_start, slice_end in wam:
nit: It seems like it would be more memory efficient to directly iterate over the generated slices, is there a specific reason for saving to a list?
correct. I had originally added this for debugging to see the entire set of slices easier, but you are correct this should be iterable
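The agreed fix can be sketched as follows; generate_slices here is a stand-in for the cursor's generator, not the CDK method itself:

```python
from typing import Iterable, Tuple

def generate_slices() -> Iterable[Tuple[str, str]]:
    # Stand-in for self._cursor.generate_slices()
    yield ("2024-01-01", "2024-02-01")
    yield ("2024-02-01", "2024-03-01")

def build_stream_slices() -> Iterable[dict]:
    # Iterating the generator directly keeps memory bounded even for very
    # large date ranges; no intermediate list is materialized.
    for slice_start, slice_end in generate_slices():
        yield {"partition": {}, "cursor_slice": {"start": slice_start, "end": slice_end}}
```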
# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
    return list(obj.timetuple())[0:6]
I don't follow this -- Is there somewhere we would be serializing a datetime object?
i'll double check, but I think we needed this serializer deeper in our code, potentially in how we emit state back out. I'll reconfirm this as I work through @lazebnyi's comment above.
I'm also curious about this. From my understanding, this would mean that CursorPartitionGenerator would create slices with datetime within them but this is not what I see.
@lazebnyi @pnilan @maxi297 to close the loop on this one: I think what originally happened was that I wrote this in when I was first testing, because we were getting the datetime object from the ConcurrentCursor and it would fail trying to serialize it.
However, later, after cleaning up the code and fixing edge cases, I addressed the serialization by converting the datetime into the correct output string format with the correct precision in the generate() function here: https://github.com/airbytehq/airbyte/pull/46662/files#diff-93127bface0b323fe43b21cdb8fb14493dd465995b085a4f81647f3697930bddR396-R399. And since this is now already a string, we don't need to convert it again.
I'll get rid of this code, as it's not actually used anymore and we're applying the correct precision based on the cursor definition.
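A minimal sketch of the resolution described: format the datetime into the cursor's output string format at slice-generation time, so slices carry plain strings and no custom datetime JSON serializer is needed downstream (the format string below is an assumed example):

```python
from datetime import datetime

def format_cursor_value(value: datetime, datetime_format: str = "%Y-%m-%dT%H:%M:%S") -> str:
    """Render a cursor boundary as a string up front, applying the
    precision implied by the cursor's configured datetime format."""
    return value.strftime(datetime_format)
```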
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/9710
What
Adds the new ConcurrentDeclarativeSource class, which serves as the way we adapt the existing YamlDeclarativeSource used by all low-code connectors into being runnable within the Concurrent CDK framework. This PR combines all the previous units of work so that low-code streams are translated into concurrent DefaultStream instances.

Another big aspect of this review is how we gate the streams that will run concurrently.
How
The overall design is predicated on introducing a new class, ConcurrentDeclarativeSource, which behaves as a kind of adapter between the existing entrypoint.py that all syncs are triggered from and the ConcurrentSource, which is responsible for running certain streams using the Concurrent CDK engine.

The ConcurrentDeclarativeSource inherits from ManifestDeclarativeSource so that we can reuse the logic to parse a manifest into low-code runtime components, and inspect those components to decide whether each stream can be run concurrently or synchronously.

The last big part of the code puts into place the logic to transform a low-code stream's DatetimeBasedCursor into a ConcurrentCursor. We need to do this because the interfaces for a low-code cursor and a concurrent cursor differ in a few specific ways, and trying to make them both fit the same interface created a frankenstein class that proved to be even more unwieldy. In prior PRs (see 45413), it was determined that there was feature parity, so now we perform the transformation and supply the result to the concurrent engine to handle date window partitioning and state management.

Something else important to note is that there are some specific cases where an incremental stream cannot be run as a concurrent source. Since we introduced the language, we've allowed stream_state to be a valid interpolation value for various components. However, because partitions can be run in any order and complete at any time, the stream_state
managed by the ConcurrentCursor is no longer a thread-safe value (versus when it was managed sequentially). I inspected the schema and our repo for its usage. For streams using stream_state in an unsafe way, we make them synchronous streams, but we should fix those connectors to use the thread-safe stream_interval and ultimately get rid of the extra code later.

Short term: how we enable this
I've included two examples of how connectors can uptake concurrent processing. They are the same and will be deleted before merging.
The two things that need to be changed are:
run.py - Our previous design for connectors did not take any arguments passed to the connector from the platform. This is a significant limitation because the concurrent framework is entirely based around instantiating things like cursors up front, before performing a read. I haven't found a great way to avoid changing this, as it is a limitation of the Concurrent CDK.
source.py - Once run.py is updated to pass in the various operation arguments like state, config, and catalog, we need to pass them to the ConcurrentDeclarativeSource constructor.
Review guide
concurrent_declarative_source.py
datetime_based_cursor.py
adapter.py
datetime_stream_state_converter.py
yaml_declarative_source.py
source-sentry or source-amplitude
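As a rough, self-contained illustration of the run.py change described in the short-term section above, here is one way operation arguments could be extracted up front and handed to the source constructor. The helper names, flag parsing, and constructor signature are assumptions for this sketch, not the CDK's actual entrypoint code:

```python
from typing import List, Optional

def _value_after(args: List[str], flag: str) -> Optional[str]:
    """Return the value following a CLI flag like `--config config.json`, if present."""
    return args[args.index(flag) + 1] if flag in args else None

class ConcurrentDeclarativeSourceSketch:
    def __init__(self, catalog: Optional[str], config: Optional[str], state: Optional[str]):
        self.catalog, self.config, self.state = catalog, config, state

def build_source(args: List[str]) -> ConcurrentDeclarativeSourceSketch:
    # Operation arguments are parsed before the read begins, so cursors
    # and other components can be instantiated up front.
    return ConcurrentDeclarativeSourceSketch(
        catalog=_value_after(args, "--catalog"),
        config=_value_after(args, "--config"),
        state=_value_after(args, "--state"),
    )
```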
User Impact
This is considered a breaking CDK change because connectors will need to follow the included migration guide to update a connector's run.py and source.py files.
filesCan this PR be safely reverted and rolled back?
Yes, because this isn't released yet. However, this does pose a risk once we move a connector to concurrent: once we start emitting the new state format, it is much harder to go backward, since the reverted connector cannot process concurrent state. See my comment in the code for more.