⚡️ Speed up method Stream._get_checkpoint_reader by 134% in PR #45701 (lazebnyi/source-stripe-salesforce-add-convert-to-concurrent-stream) #45720

Closed
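For orientation, a speedup figure like the one in the title is typically derived from two timings of the same call on identical inputs. A minimal sketch of that arithmetic (purely illustrative stand-in callables; this is not the PR's actual benchmark harness, and the 134% figure assumes the usual (t_before - t_after) / t_after convention, i.e. roughly a 2.34x ratio):

```python
import timeit

# Hypothetical before/after callables standing in for the old and new code
# paths; chosen only to mirror the len(...) > 0 vs. truthiness change below.
def before() -> list:
    return [x for x in range(10_000) if len([x]) > 0]  # explicit length check

def after() -> list:
    return [x for x in range(10_000) if [x]]           # plain truthiness check

t_before = timeit.timeit(before, number=200)
t_after = timeit.timeit(after, number=200)
# A "speed up by N%" headline usually means (t_before / t_after - 1) * 100:
print(f"speedup: {(t_before / t_after - 1) * 100:.0f}%")
```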
@@ -22,6 +22,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
 
         :param stream_state: The state of the stream as returned by get_stream_state
         """
+        pass
 
     def observe(self, stream_slice: StreamSlice, record: Record) -> None:
         """
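For context on this hunk: a docstring-only method body is already valid Python, so the added `pass` is behavior-neutral and only makes the intended no-op explicit. A minimal sketch under that assumption (the class name here is hypothetical, not the CDK's):

```python
# Hypothetical stand-in class, only to show that `pass` after a docstring
# is a no-op: the docstring alone already forms a valid method body.
class CursorSketch:
    def set_initial_state(self, stream_state: dict) -> None:
        """Default no-op; subclasses override this to seed their state."""
        pass  # explicit no-op, behaviorally identical to omitting it

CursorSketch().set_initial_state({"updated_at": "2024-01-01"})  # does nothing, by design
```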
airbyte-cdk/python/airbyte_cdk/sources/streams/core.py (58 changes: 13 additions & 45 deletions)
@@ -30,6 +30,7 @@
 from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from deprecated import deprecated
+from airbyte_cdk.sources.streams.checkpoint.cursor import Cursor
 
 # A stream's read method can return one of the following types:
 # Mapping[str, Any]: The content of an AirbyteRecordMessage
@@ -435,25 +436,13 @@ def _get_checkpoint_reader(
         sync_mode: SyncMode,
         stream_state: MutableMapping[str, Any],
     ) -> CheckpointReader:
-        mappings_or_slices = self.stream_slices(
-            cursor_field=cursor_field,
-            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
-            stream_state=stream_state,
-        )
-
-        # Because of poor foresight, we wrote the default Stream.stream_slices() method to return [None] which is confusing and
-        # has now normalized this behavior for connector developers. Now some connectors return [None]. This is objectively
-        # misleading and a more ideal interface is [{}] to indicate we still want to iterate over one slice, but with no
-        # specific slice values. None is bad, and now I feel bad that I have to write this hack.
+        mappings_or_slices = self.stream_slices(cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state)
         if mappings_or_slices == [None]:
             mappings_or_slices = [{}]
 
         slices_iterable_copy, iterable_for_detecting_format = itertools.tee(mappings_or_slices, 2)
         stream_classification = self._classify_stream(mappings_or_slices=iterable_for_detecting_format)
 
-        # Streams that override has_multiple_slices are explicitly indicating that they will iterate over
-        # multiple partitions. Inspecting slices to automatically apply the correct cursor is only needed as
-        # a backup. So if this value was already assigned to True by the stream, we don't need to reassign it
         self.has_multiple_slices = self.has_multiple_slices or stream_classification.has_multiple_slices
 
         cursor = self.get_cursor()
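The `itertools.tee` call above is what lets `_get_checkpoint_reader` both inspect the slices for classification and still hand an unconsumed iterable to the checkpoint reader, since a generator can only be walked once. A standalone sketch of the pattern (function and variable names are illustrative):

```python
import itertools
from typing import Any, Iterable, Iterator, Mapping, Optional, Tuple

def peek_first(slices: Iterable[Mapping[str, Any]]) -> Tuple[Optional[Mapping[str, Any]], Iterator[Mapping[str, Any]]]:
    """Return the first slice (or None) plus an iterator that still yields every slice."""
    for_reader, for_peeking = itertools.tee(slices, 2)
    first = next(for_peeking, None)  # consuming the copy leaves for_reader untouched
    return first, for_reader

# Generators are single-pass, so without tee the classification step
# would swallow slices the checkpoint reader later needs.
slice_gen = ({"parent_id": i} for i in range(3))
first, remaining = peek_first(slice_gen)
print(first)            # {'parent_id': 0}
print(list(remaining))  # [{'parent_id': 0}, {'parent_id': 1}, {'parent_id': 2}]
```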
@@ -464,29 +453,25 @@
 
         if cursor and stream_classification.is_legacy_format:
             return LegacyCursorBasedCheckpointReader(stream_slices=slices_iterable_copy, cursor=cursor, read_state_from_cursor=True)
-        elif cursor:
+        if cursor:
             return CursorBasedCheckpointReader(
                 stream_slices=slices_iterable_copy,
                 cursor=cursor,
                 read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
             )
-        elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
-            # Resumable full refresh readers rely on the stream state dynamically being updated during pagination and does
-            # not iterate over a static set of slices.
+        if checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
             return ResumableFullRefreshCheckpointReader(stream_state=stream_state)
-        elif checkpoint_mode == CheckpointMode.INCREMENTAL:
+        if checkpoint_mode == CheckpointMode.INCREMENTAL:
             return IncrementalCheckpointReader(stream_slices=slices_iterable_copy, stream_state=stream_state)
-        else:
-            return FullRefreshCheckpointReader(stream_slices=slices_iterable_copy)
+        return FullRefreshCheckpointReader(stream_slices=slices_iterable_copy)
 
     @property
     def _checkpoint_mode(self) -> CheckpointMode:
-        if self.is_resumable and len(self._wrapped_cursor_field()) > 0:
+        if self.is_resumable and self._wrapped_cursor_field():
             return CheckpointMode.INCREMENTAL
-        elif self.is_resumable:
+        if self.is_resumable:
             return CheckpointMode.RESUMABLE_FULL_REFRESH
-        else:
-            return CheckpointMode.FULL_REFRESH
+        return CheckpointMode.FULL_REFRESH
 
     @staticmethod
     def _classify_stream(mappings_or_slices: Iterator[Optional[Union[Mapping[str, Any], StreamSlice]]]) -> StreamClassification:
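Both rewrites in this hunk are the same mechanical transformation: because every branch ends in `return`, the `elif`/`else` keywords carry no meaning and can become guard clauses, and `len(self._wrapped_cursor_field()) > 0` reduces to the list's truthiness. A self-contained sketch of the `_checkpoint_mode` logic (the enum below is a stand-in, not the CDK's `CheckpointMode`):

```python
from enum import Enum
from typing import List

class CheckpointModeSketch(Enum):
    # Stand-in for airbyte_cdk's CheckpointMode, for illustration only.
    INCREMENTAL = "incremental"
    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
    FULL_REFRESH = "full_refresh"

def checkpoint_mode(is_resumable: bool, cursor_field: List[str]) -> CheckpointModeSketch:
    # Guard clauses: each branch returns, so elif/else would be redundant.
    if is_resumable and cursor_field:  # truthiness replaces len(...) > 0
        return CheckpointModeSketch.INCREMENTAL
    if is_resumable:
        return CheckpointModeSketch.RESUMABLE_FULL_REFRESH
    return CheckpointModeSketch.FULL_REFRESH

assert checkpoint_mode(True, ["updated_at"]) is CheckpointModeSketch.INCREMENTAL
assert checkpoint_mode(True, []) is CheckpointModeSketch.RESUMABLE_FULL_REFRESH
assert checkpoint_mode(False, []) is CheckpointModeSketch.FULL_REFRESH
```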
@@ -504,37 +489,20 @@ def _classify_stream(mappings_or_slices: Iterator[Optional[Union[Mapping[str, Any], StreamSlice]]]) -> StreamClassification:
         """
         if not mappings_or_slices:
             raise ValueError("A stream should always have at least one slice")
 
         try:
             next_slice = next(mappings_or_slices)
-            if isinstance(next_slice, StreamSlice) and next_slice == StreamSlice(partition={}, cursor_slice={}):
-                is_legacy_format = False
-                slice_has_value = False
-            elif next_slice == {}:
-                is_legacy_format = True
-                slice_has_value = False
-            elif isinstance(next_slice, StreamSlice):
-                is_legacy_format = False
-                slice_has_value = True
-            else:
-                is_legacy_format = True
-                slice_has_value = True
         except StopIteration:
             # If the stream has no slices, the format ultimately does not matter since no data will get synced. This is technically
             # a valid case because it is up to the stream to define its slicing behavior
             return StreamClassification(is_legacy_format=False, has_multiple_slices=False)
 
-        if slice_has_value:
-            # If the first slice contained a partition value from the result of stream_slices(), this is a substream that might
-            # have multiple parent records to iterate over
-            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=slice_has_value)
+        is_legacy_format = not isinstance(next_slice, StreamSlice)
+        slice_has_value = bool(next_slice and (next_slice != StreamSlice(partition={}, cursor_slice={}) and next_slice != {}))
 
         try:
             # If stream_slices() returns multiple slices, this is also a substream that can potentially generate empty slices
             next(mappings_or_slices)
             return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=True)
         except StopIteration:
-            # If the result of stream_slices() only returns a single empty stream slice, then we know this is a regular stream
-            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=False)
+            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=slice_has_value)
 
     def log_stream_sync_configuration(self) -> None:
         """