Skip to content

Commit

Permalink
update abnormal test
Browse files Browse the repository at this point in the history
  • Loading branch information
midavadim committed May 7, 2024
1 parent e510be1 commit 92655a8
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"type": "STREAM",
"stream": {
"stream_state": {
"36152117": { "date": "2030-01-01" },
"41833532": { "date": "2030-01-01" },
"36152117": { "date": "2030-01-01" }
"41833755": { "date": "2030-01-01" },
"41833700": { "date": "2030-01-01" }
},
"stream_descriptor": { "name": "funnels" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,28 +253,6 @@ def stream_slices(self) -> Iterable[StreamSlice]:
yield from []


class FunnelsLegacyToPerPartitionStateMigration(LegacyToPerPartitionStateMigration):
"""
Gor error when use custom StateMigration:
custom_component_class(**kwargs):
TypeError: LegacyToPerPartitionStateMigration.__init__() missing 2 required positional arguments: 'partition_router', 'cursor'
"""

partition_router: SubstreamPartitionRouter = None
cursor: DatetimeBasedCursor = None
config: Mapping[str, Any]
parameters: Mapping[str, Any]

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
state = super().migrate(stream_state)
for partition_state in state.get("states", []):
# add empty parent_slice attr to partition
if "parent_slice" not in partition_state.get("partition", {}):
partition_state["partition"]["parent_slice"] = {}
return state


@dataclass
class EngagePaginationStrategy(PageIncrement):
"""
Expand Down

0 comments on commit 92655a8

Please sign in to comment.