Skip to content

Commit

Permalink
[RFR for API Sources] fix issue where full refresh state is passed by…
Browse files Browse the repository at this point in the history
… regression tester (#42986)
  • Loading branch information
brianjlai authored Aug 3, 2024
1 parent 8b46896 commit a43b44d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 72 deletions.
7 changes: 7 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ def _read_stream(
stream_name = configured_stream.stream.name
stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)

# This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard
# the state because the terminal state for a full refresh sync is not compatible with substream resumable full
# refresh state. This is only required when running live traffic regression testing since the platform normally
# handles whether to pass state
if stream_state == {"__ab_no_cursor_state_message": True}:
stream_state = {}

if "state" in dir(stream_instance):
stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance)
logger.info(f"Setting state of {self.name} stream to {stream_state}")
Expand Down
72 changes: 0 additions & 72 deletions airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,78 +545,6 @@ def test_valid_full_refresh_read_with_slices(mocker):
assert messages == expected


# Delete this test as it's no longer relevant since we honor incoming state
def test_full_refresh_does_not_use_incoming_state(mocker):
    """Placeholder for a retired test; the body is intentionally a no-op.

    The test originally checked that a full refresh sync does not use an
    incoming state message from the platform. Per the notes below, that
    filtering logic was removed, so the former body is kept only as
    commented-out reference and nothing is asserted.
    """
    pass
    # We actually removed this filtering logic and now rely on the platform to dictate whether to pass state to the
    # connector, so in reality we can probably get rid of this test entirely.
    # slices = [{"1": "1"}, {"2": "2"}]
    # When attempting to sync a slice, just output that slice as a record

    # s1 = MockStream(
    #     [({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
    #     name="s1",
    # )
    # s2 = MockStream(
    #     [({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
    #     name="s2",
    # )
    #
    # def stream_slices_side_effect(stream_state: Mapping[str, Any], **kwargs) -> List[Mapping[str, Any]]:
    #     if stream_state:
    #         return slices[1:]
    #     else:
    #         return slices
    #
    # mocker.patch.object(MockStream, "get_json_schema", return_value={})
    # mocker.patch.object(MockStream, "stream_slices", side_effect=stream_slices_side_effect)
    #
    # state = [
    #     AirbyteStateMessage(
    #         type=AirbyteStateType.STREAM,
    #         stream=AirbyteStreamState(
    #             stream_descriptor=StreamDescriptor(name="s1"),
    #             stream_state=AirbyteStateBlob.parse_obj({"created_at": "2024-01-31"}),
    #         ),
    #     ),
    #     AirbyteStateMessage(
    #         type=AirbyteStateType.STREAM,
    #         stream=AirbyteStreamState(
    #             stream_descriptor=StreamDescriptor(name="s2"),
    #             stream_state=AirbyteStateBlob.parse_obj({"__ab_no_cursor_state_message": True}),
    #         ),
    #     ),
    # ]
    #
    # src = MockSource(streams=[s1, s2])
    # catalog = ConfiguredAirbyteCatalog(
    #     streams=[
    #         _configured_stream(s1, SyncMode.full_refresh),
    #         _configured_stream(s2, SyncMode.full_refresh),
    #     ]
    # )
    #
    # expected = _fix_emitted_at(
    #     [
    #         _as_stream_status("s1", AirbyteStreamStatus.STARTED),
    #         _as_stream_status("s1", AirbyteStreamStatus.RUNNING),
    #         *_as_records("s1", slices),
    #         _as_state("s1", {"__ab_no_cursor_state_message": True}),
    #         _as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
    #         _as_stream_status("s2", AirbyteStreamStatus.STARTED),
    #         _as_stream_status("s2", AirbyteStreamStatus.RUNNING),
    #         *_as_records("s2", slices),
    #         _as_state("s2", {"__ab_no_cursor_state_message": True}),
    #         _as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
    #     ]
    # )
    #
    # messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state)))
    #
    # assert messages == expected


@pytest.mark.parametrize(
"slices",
[[{"1": "1"}, {"2": "2"}], [{"date": datetime.date(year=2023, month=1, day=1)}, {"date": datetime.date(year=2023, month=1, day=1)}]],
Expand Down

0 comments on commit a43b44d

Please sign in to comment.