Skip to content

Commit

Permalink
Resolving issue caused by nested uses of the BusClient
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcharnock committed Aug 23, 2023
1 parent c8162d6 commit 0bb70f3
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 22 deletions.
14 changes: 11 additions & 3 deletions lightbus/client/subclients/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ async def fire_event(
raise InvalidEventArguments(
"Unexpected argument supplied when firing event {}.{}. Attempted to fire event with"
" {} arguments: {}. Unexpected argument(s): {}".format(
api_name, name, len(kwargs), sorted(kwargs.keys()), sorted(extra_arguments),
api_name,
name,
len(kwargs),
sorted(kwargs.keys()),
sorted(extra_arguments),
)
)

Expand Down Expand Up @@ -166,7 +170,6 @@ def get_event_listener(self, api_name: str, listener_name: str):
async def _on_message(
self, event_message: EventMessage, listener: Callable, options: dict, on_error: OnError
):

# TODO: Check events match those requested
logger.info(
L(
Expand Down Expand Up @@ -194,7 +197,12 @@ async def _on_message(
# put any errors into Lightbus' error queue, and therefore
# cause a shutdown
await queue_exception_checker(
run_user_provided_callable(listener, args=[event_message], kwargs=parameters),
run_user_provided_callable(
listener,
args=[event_message],
kwargs=parameters,
type_name="listener",
),
self.error_queue,
help=(
f"An error occurred while {listener} was handling an event. Lightbus will now"
Expand Down
6 changes: 4 additions & 2 deletions lightbus/client/subclients/rpc_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async def consume_rpcs(self, apis: List[Api] = None):
async def call_rpc_remote(
self, api_name: str, name: str, kwargs: dict = frozendict(), options: dict = frozendict()
):
""" Perform an RPC call
"""Perform an RPC call
Call an RPC and return the result.
"""
Expand Down Expand Up @@ -176,7 +176,9 @@ async def _call_rpc_local(self, api_name: str, name: str, kwargs: dict = frozend
method = getattr(api, name)
if self.config.api(api_name).cast_values:
kwargs = cast_to_signature(kwargs, method)
result = await run_user_provided_callable(method, args=[], kwargs=kwargs)
result = await run_user_provided_callable(
method, args=[], kwargs=kwargs, type_name="rpc"
)
except (asyncio.CancelledError, SuddenDeathException):
raise
except Exception as e:
Expand Down
3 changes: 0 additions & 3 deletions lightbus/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ def _handle(self, args, config, plugin_registry: PluginRegistry):
bus: BusPath
bus_module, bus = command_utilities.import_bus(args)

if isinstance(bus.client, ThreadLocalClientProxy):
bus.client.disable_proxy()

# Convert only & skip into a list of features to enable
if args.only or args.skip:
if args.only:
Expand Down
12 changes: 8 additions & 4 deletions lightbus/creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def create(
flask: bool = False,
**kwargs,
) -> BusPath:
logger.debug("Creating BusClient")
client_proxy = ThreadLocalClientProxy(
partial(
create_client,
Expand Down Expand Up @@ -289,6 +290,12 @@ def proxied_client(self):
if not self.enabled:
return self.main_client

thread_name = threading.current_thread().name
if thread_name.startswith("hook_"):
# Hooks also get direct access to the main client so they can
# setup listeners
return self.main_client

if not hasattr(self.local, "client"):
logger.debug(f"Creating new client for thread {threading.current_thread().name}")
self.local.client = self._create_client()
Expand All @@ -297,10 +304,7 @@ def proxied_client(self):
def disable_proxy(self):
"""Disable the proxying and always use self.main_client
This is mainly useful within the lightbus worker (i.e. `lightbus run`).
In this case lightbus is in control of the process so can make sane choices
with regards its use of threads, rather than being at the mercy of some
other system (i.e. gunicorn, django's development server, etc)
This is useful in tests.
"""
self.enabled = False

Expand Down
10 changes: 8 additions & 2 deletions lightbus/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ async def execute(self, name, **kwargs):
for callback in self._hook_callbacks[key]:
await queue_exception_checker(
run_user_provided_callable(
callback, args=[], kwargs=dict(**self.extra_parameters, **kwargs)
callback,
args=[],
kwargs=dict(**self.extra_parameters, **kwargs),
type_name="hook",
),
self.error_queue,
)
Expand All @@ -43,7 +46,10 @@ async def execute(self, name, **kwargs):
key = CallbackKey(name, run_before_plugins=False)
for callback in self._hook_callbacks[key]:
await run_user_provided_callable(
callback, args=[], kwargs=dict(**self.extra_parameters, **kwargs)
callback,
args=[],
kwargs=dict(**self.extra_parameters, **kwargs),
type_name="hook",
)

def register_callback(self, name, fn, before_plugins=False):
Expand Down
10 changes: 4 additions & 6 deletions lightbus/utilities/async_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def cancel_and_log_exceptions(*tasks):
)


async def run_user_provided_callable(callable_, args, kwargs):
async def run_user_provided_callable(callable_, args, kwargs, type_name):
"""Run user provided code
If the callable is blocking (i.e. a regular function) it will be
Expand All @@ -131,9 +131,7 @@ async def run_user_provided_callable(callable_, args, kwargs):
exception = e
else:
try:
thread_pool_executor = ThreadPoolExecutor(
thread_name_prefix="user_provided_callable_tpe"
)
thread_pool_executor = ThreadPoolExecutor(thread_name_prefix=f"{type_name}_user_tpe")
future = asyncio.get_event_loop().run_in_executor(
executor=thread_pool_executor, func=lambda: callable_(*args, **kwargs)
)
Expand All @@ -159,7 +157,7 @@ async def call_every(*, callback, timedelta: datetime.timedelta, also_run_immedi
while True:
start_time = time()
if not first_run or also_run_immediately:
await run_user_provided_callable(callback, args=[], kwargs={})
await run_user_provided_callable(callback, args=[], kwargs={}, type_name="background")
total_execution_time = time() - start_time
sleep_time = max(0.0, timedelta.total_seconds() - total_execution_time)
await asyncio.sleep(sleep_time)
Expand All @@ -173,7 +171,7 @@ async def call_on_schedule(callback, schedule: "Job", also_run_immediately: bool

if not first_run or also_run_immediately:
schedule.last_run = datetime.datetime.now()
await run_user_provided_callable(callback, args=[], kwargs={})
await run_user_provided_callable(callback, args=[], kwargs={}, type_name="background")

td = schedule.next_run - datetime.datetime.now()
await asyncio.sleep(td.total_seconds())
Expand Down
1 change: 1 addition & 0 deletions lightbus_examples/ex01_quickstart/another_service/bus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# File: ./another_service/bus.py
import lightbus
from lightbus.utilities.async_tools import block

bus = lightbus.create()

Expand Down
4 changes: 2 additions & 2 deletions tests/utilities/test_async_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def call_me(a, b):
nonlocal called
called = True

await run_user_provided_callable(call_me, args=[1], kwargs={"b": 2})
await run_user_provided_callable(call_me, args=[1], kwargs={"b": 2}, type_name="test")
assert called


Expand All @@ -203,5 +203,5 @@ async def call_me(a, b):
nonlocal called
called = True

await run_user_provided_callable(call_me, args=[1], kwargs={"b": 2})
await run_user_provided_callable(call_me, args=[1], kwargs={"b": 2}, type_name="test")
assert called

0 comments on commit 0bb70f3

Please sign in to comment.