Skip to content

Commit

Permalink
perf(airbyte-cdk): performance enhancement (#42441)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 authored Jul 29, 2024
1 parent 5a4d887 commit f5bea19
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 126 deletions.
19 changes: 11 additions & 8 deletions airbyte-cdk/python/airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
)

import time
from copy import copy
from typing import Any, List, MutableMapping

from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, AirbyteMessage, OrchestratorType, Type


class ObservedDict(dict):
def __init__(self, non_observed_mapping: MutableMapping, observer: ConfigObserver, update_on_unchanged_value=True) -> None:
non_observed_mapping = non_observed_mapping.copy()
class ObservedDict(dict): # type: ignore # disallow_any_generics is set to True, and dict is equivalent to dict[Any]
def __init__(
self, non_observed_mapping: MutableMapping[Any, Any], observer: ConfigObserver, update_on_unchanged_value: bool = True
) -> None:
non_observed_mapping = copy(non_observed_mapping)
self.observer = observer
self.update_on_unchanged_value = update_on_unchanged_value
for item, value in non_observed_mapping.items():
Expand All @@ -29,7 +32,7 @@ def __init__(self, non_observed_mapping: MutableMapping, observer: ConfigObserve
value[i] = ObservedDict(sub_value, observer)
super().__init__(non_observed_mapping)

def __setitem__(self, item: Any, value: Any):
def __setitem__(self, item: Any, value: Any) -> None:
"""Override dict.__setitem__ by:
1. Observing the new value if it is a dict
2. Call observer update if the new value is different from the previous one
Expand Down Expand Up @@ -58,7 +61,7 @@ def update(self) -> None:
emit_configuration_as_airbyte_control_message(self.config)


def observe_connector_config(non_observed_connector_config: MutableMapping[str, Any]):
def observe_connector_config(non_observed_connector_config: MutableMapping[str, Any]) -> ObservedDict:
if isinstance(non_observed_connector_config, ObservedDict):
raise ValueError("This connector configuration is already observed")
connector_config_observer = ConfigObserver()
Expand All @@ -67,16 +70,16 @@ def observe_connector_config(non_observed_connector_config: MutableMapping[str,
return observed_connector_config


def emit_configuration_as_airbyte_control_message(config: MutableMapping):
def emit_configuration_as_airbyte_control_message(config: MutableMapping[str, Any]) -> None:
"""
WARNING: deprecated - emit_configuration_as_airbyte_control_message is being deprecated in favor of the MessageRepository mechanism.
See the airbyte_cdk.sources.message package
"""
airbyte_message = create_connector_config_control_message(config)
print(airbyte_message.json(exclude_unset=True))
print(airbyte_message.model_dump_json(exclude_unset=True))


def create_connector_config_control_message(config):
def create_connector_config_control_message(config: MutableMapping[str, Any]) -> AirbyteMessage:
control_message = AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=time.time() * 1000,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ def handle_request(args: List[str]) -> AirbyteMessage:
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message=f"Error handling request: {str(exc)}")
m = error.as_airbyte_message()
print(error.as_airbyte_message().json(exclude_unset=True))
print(error.as_airbyte_message().model_dump_json(exclude_unset=True))
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ def run(self, args: List[str]) -> None:
parsed_args = self.parse_args(args)
output_messages = self.run_cmd(parsed_args)
for message in output_messages:
print(message.json(exclude_unset=True))
print(message.model_dump_json(exclude_unset=True))
26 changes: 13 additions & 13 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,18 @@ def read(

@staticmethod
def handle_record_counts(message: AirbyteMessage, stream_message_count: DefaultDict[HashableStreamDescriptor, float]) -> AirbyteMessage:
if message.type == Type.RECORD:
stream_message_count[message_utils.get_stream_descriptor(message)] += 1.0

elif message.type == Type.STATE:
stream_descriptor = message_utils.get_stream_descriptor(message)

# Set record count from the counter onto the state message
message.state.sourceStats = message.state.sourceStats or AirbyteStateStats()
message.state.sourceStats.recordCount = stream_message_count.get(stream_descriptor, 0.0)

# Reset the counter
stream_message_count[stream_descriptor] = 0.0
match message.type:
case Type.RECORD:
stream_message_count[HashableStreamDescriptor(name=message.record.stream, namespace=message.record.namespace)] += 1.0
case Type.STATE:
stream_descriptor = message_utils.get_stream_descriptor(message)

# Set record count from the counter onto the state message
message.state.sourceStats = message.state.sourceStats or AirbyteStateStats()
message.state.sourceStats.recordCount = stream_message_count.get(stream_descriptor, 0.0)

# Reset the counter
stream_message_count[stream_descriptor] = 0.0
return message

@staticmethod
Expand All @@ -200,7 +200,7 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,

@staticmethod
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> Any:
return airbyte_message.json(exclude_unset=True)
return airbyte_message.model_dump_json(exclude_unset=True)

@classmethod
def extract_state(cls, args: List[str]) -> Optional[Any]:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def format(self, record: logging.LogRecord) -> str:
message = super().format(record)
message = filter_secrets(message)
log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message))
return log_message.json(exclude_unset=True) # type: ignore
return log_message.model_dump_json(exclude_unset=True) # type: ignore

@staticmethod
def extract_extra_args_from_record(record: logging.LogRecord) -> Mapping[str, Any]:
Expand Down
9 changes: 5 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,11 @@ def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage]
"""
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
"""
if isinstance(record_data_or_message, AirbyteMessage):
return record_data_or_message
else:
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())
match record_data_or_message:
case AirbyteMessage():
return record_data_or_message
case _:
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())

@property
def message_repository(self) -> Union[None, MessageRepository]:
Expand Down
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import lru_cache
from functools import cached_property, lru_cache
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
Expand Down Expand Up @@ -125,7 +125,7 @@ def logger(self) -> logging.Logger:

has_multiple_slices = False

@property
@cached_property
def name(self) -> str:
"""
:return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
Expand Down Expand Up @@ -396,7 +396,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
"""
return None

@deprecated(version="0.1.49", reason="You should use explicit state property instead, see IncrementalMixin docs.")
@deprecated(version="0.1.49", reason="You should use explicit state property instead, see IncrementalMixin docs.", action="ignore")
def get_updated_state(
self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ def _send(
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
# TODO: Update to handle with message repository when concurrent message repository is ready
reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
message = stream_status_as_airbyte_message(StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons).json(
exclude_unset=True
)
message = stream_status_as_airbyte_message(
StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
).model_dump_json(exclude_unset=True)

# Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
# wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ def coerce_catalog_as_full_refresh(catalog: AirbyteCatalog) -> AirbyteCatalog:
stream.default_cursor_field = None

# remove nulls
return AirbyteCatalog.parse_raw(coerced_catalog.json(exclude_unset=True, exclude_none=True))
return AirbyteCatalog.parse_raw(coerced_catalog.model_dump_json(exclude_unset=True, exclude_none=True))
41 changes: 21 additions & 20 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from typing import Any, Mapping
import time
from collections.abc import Mapping as ABCMapping
from typing import Any, Mapping, Optional

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, AirbyteTraceMessage
from airbyte_cdk.models import Type as MessageType
Expand All @@ -15,24 +15,25 @@ def stream_data_to_airbyte_message(
stream_name: str,
data_or_message: StreamData,
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
schema: Mapping[str, Any] = None,
schema: Optional[Mapping[str, Any]] = None,
) -> AirbyteMessage:
if schema is None:
schema = {}

if isinstance(data_or_message, Mapping):
data = dict(data_or_message)
now_millis = int(datetime.datetime.now().timestamp() * 1000)
# Transform object fields according to config. Most likely you will
# need it to normalize values against json schema. By default no action
# taken unless configured. See
# docs/connector-development/cdk-python/schemas.md for details.
transformer.transform(data, schema) # type: ignore
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
return AirbyteMessage(type=MessageType.RECORD, record=message)
elif isinstance(data_or_message, AirbyteTraceMessage):
return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)
elif isinstance(data_or_message, AirbyteLogMessage):
return AirbyteMessage(type=MessageType.LOG, log=data_or_message)
else:
raise ValueError(f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}")
match data_or_message:
case ABCMapping():
data = dict(data_or_message)
now_millis = time.time_ns() // 1_000_000
# Transform object fields according to config. Most likely you will
# need it to normalize values against json schema. By default no action
# taken unless configured. See
# docs/connector-development/cdk-python/schemas.md for details.
transformer.transform(data, schema) # type: ignore
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
return AirbyteMessage(type=MessageType.RECORD, record=message)
case AirbyteTraceMessage():
return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)
case AirbyteLogMessage():
return AirbyteMessage(type=MessageType.LOG, log=data_or_message)
case _:
raise ValueError(f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}")
6 changes: 4 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def read(
with tempfile.TemporaryDirectory() as tmp_directory:
tmp_directory_path = Path(tmp_directory)
config_file = make_file(tmp_directory_path / "config.json", config)
catalog_file = make_file(tmp_directory_path / "catalog.json", catalog.json())
catalog_file = make_file(tmp_directory_path / "catalog.json", catalog.model_dump_json())
args = [
"read",
"--config",
Expand All @@ -191,7 +191,9 @@ def read(
args.extend(
[
"--state",
make_file(tmp_directory_path / "state.json", f"[{','.join([stream_state.json() for stream_state in state])}]"),
make_file(
tmp_directory_path / "state.json", f"[{','.join([stream_state.model_dump_json() for stream_state in state])}]"
),
]
)

Expand Down
21 changes: 11 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/utils/message_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@


def get_stream_descriptor(message: AirbyteMessage) -> HashableStreamDescriptor:
if message.type == Type.RECORD:
return HashableStreamDescriptor(name=message.record.stream, namespace=message.record.namespace)
elif message.type == Type.STATE:
if not message.state.stream or not message.state.stream.stream_descriptor:
raise ValueError("State message was not in per-stream state format, which is required for record counts.")
return HashableStreamDescriptor(
name=message.state.stream.stream_descriptor.name, namespace=message.state.stream.stream_descriptor.namespace
)
else:
raise NotImplementedError(f"get_stream_descriptor is not implemented for message type '{message.type}'.")
match message.type:
case Type.RECORD:
return HashableStreamDescriptor(name=message.record.stream, namespace=message.record.namespace)
case Type.STATE:
if not message.state.stream or not message.state.stream.stream_descriptor:
raise ValueError("State message was not in per-stream state format, which is required for record counts.")
return HashableStreamDescriptor(
name=message.state.stream.stream_descriptor.name, namespace=message.state.stream.stream_descriptor.namespace
)
case _:
raise NotImplementedError(f"get_stream_descriptor is not implemented for message type '{message.type}'.")
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def emit_message(self) -> None:
Prints the exception as an AirbyteTraceMessage.
Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
"""
message = self.as_airbyte_message().json(exclude_unset=True)
message = self.as_airbyte_message().model_dump_json(exclude_unset=True)
filtered_message = filter_secrets(message)
print(filtered_message)

Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/python/cdk-migrations.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CDK Migration Guide

## Upgrading to 4.0.0

Updated the codebase to utilize new Python syntax features. As a result, support for Python 3.9 has been dropped. The minimum required Python version is now 3.10.

## Upgrading to 3.0.0
Version 3.0.0 of the CDK updates the `HTTPStream` class by reusing the `HTTPClient` under the hood.

Expand Down
Loading

0 comments on commit f5bea19

Please sign in to comment.