Skip to content

Commit

Permalink
perf(airbyte-cdk): performance enhancement:: add PrintBuffer (#42568)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 authored Aug 2, 2024
1 parent c906540 commit 90f7e40
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 6 deletions.
11 changes: 6 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.connector_state_manager import HashableStreamDescriptor
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
from airbyte_cdk.utils import is_cloud_environment, message_utils
from airbyte_cdk.utils import PrintBuffer, is_cloud_environment, message_utils
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -232,10 +232,11 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
def launch(source: Source, args: List[str]) -> None:
source_entrypoint = AirbyteEntrypoint(source)
parsed_args = source_entrypoint.parse_args(args)
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
print(f"{message}\n", end="", flush=True)
with PrintBuffer():
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
print(f"{message}\n", end="", flush=True)


def _init_internal_request_filter() -> None:
Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
from .is_cloud_environment import is_cloud_environment
from .schema_inferrer import SchemaInferrer
from .traced_exception import AirbyteTracedException
from .print_buffer import PrintBuffer

__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment"]
__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment", "PrintBuffer"]
70 changes: 70 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/print_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import sys
import time
from io import StringIO
from threading import RLock
from types import TracebackType
from typing import Optional


class PrintBuffer:
"""
A class to buffer print statements and flush them at a specified interval.
The PrintBuffer class is designed to capture and buffer output that would
normally be printed to the standard output (stdout). This can be useful for
scenarios where you want to minimize the number of I/O operations by grouping
multiple print statements together and flushing them as a single operation.
Attributes:
buffer (StringIO): A buffer to store the messages before flushing.
flush_interval (float): The time interval (in seconds) after which the buffer is flushed.
last_flush_time (float): The last time the buffer was flushed.
lock (RLock): A reentrant lock to ensure thread-safe operations.
Methods:
write(message: str) -> None:
Writes a message to the buffer and flushes if the interval has passed.
flush() -> None:
Flushes the buffer content to the standard output.
__enter__() -> "PrintBuffer":
Enters the runtime context related to this object, redirecting stdout and stderr.
__exit__(exc_type, exc_val, exc_tb) -> None:
Exits the runtime context and restores the original stdout and stderr.
"""

def __init__(self, flush_interval: float = 0.1):
self.buffer = StringIO()
self.flush_interval = flush_interval
self.last_flush_time = time.monotonic()
self.lock = RLock()

def write(self, message: str) -> None:
with self.lock:
self.buffer.write(message)
current_time = time.monotonic()
if (current_time - self.last_flush_time) >= self.flush_interval:
self.flush()
self.last_flush_time = current_time

def flush(self) -> None:
with self.lock:
combined_message = self.buffer.getvalue()
sys.__stdout__.write(combined_message) # type: ignore[union-attr]
self.buffer = StringIO()

def __enter__(self) -> "PrintBuffer":
self.old_stdout, self.old_stderr = sys.stdout, sys.stderr
# Used to disable buffering during the pytest session, because it is not compatible with capsys
if "pytest" not in str(type(sys.stdout)).lower():
sys.stdout = self
sys.stderr = self
return self

def __exit__(self, exc_type: Optional[BaseException], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]) -> None:
self.flush()
sys.stdout, sys.stderr = self.old_stdout, self.old_stderr

0 comments on commit 90f7e40

Please sign in to comment.