Skip to content

Commit

Permalink
feat(events): run listeners concurrently (#2083)
Browse files Browse the repository at this point in the history
* feat(events): run listeners concurrently

* fix: flaky tests
  • Loading branch information
provinzkraut authored Jul 28, 2023
1 parent 7d48a88 commit dddd5de
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
9 changes: 6 additions & 3 deletions litestar/events/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import AsyncExitStack
from functools import partial
from typing import TYPE_CHECKING, Any, Sequence

if sys.version_info < (3, 9):
Expand Down Expand Up @@ -78,15 +79,17 @@ def __init__(self, listeners: Sequence[EventListener]) -> None:

@staticmethod
async def _worker(receive_stream: MemoryObjectReceiveStream) -> None:
"""Worker that runs in a separate task and continuously pulls events from asyncio queue.
"""Run items from ``receive_stream`` in a task group.
Returns:
None
"""
async with receive_stream:
async with receive_stream, anyio.create_task_group() as task_group:
async for item in receive_stream:
fn, args, kwargs = item
await fn(*args, **kwargs)
if kwargs:
fn = partial(fn, **kwargs)
task_group.start_soon(fn, *args)

async def __aenter__(self) -> SimpleEventEmitter:
self._exit_stack = AsyncExitStack()
Expand Down
31 changes: 15 additions & 16 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from time import sleep
from typing import Any
from unittest.mock import MagicMock

Expand Down Expand Up @@ -37,17 +36,19 @@ async def listener_fn(*args: Any, **kwargs: Any) -> None:
return listener_fn


@pytest.mark.parametrize("listener", [lazy_fixture("sync_listener"), lazy_fixture("async_listener")])
def test_event_listener(mock: MagicMock, listener: EventListener, anyio_backend: AnyIOBackend) -> None:
@pytest.mark.parametrize("event_listener", [lazy_fixture("sync_listener"), lazy_fixture("async_listener")])
def test_event_listener(mock: MagicMock, event_listener: EventListener, anyio_backend: AnyIOBackend) -> None:
@get("/")
def route_handler(request: Request[Any, Any, Any]) -> None:
request.app.emit("test_event", "positional", keyword="keyword-value")

with create_test_client(route_handlers=[route_handler], listeners=[listener], backend=anyio_backend) as client:
with create_test_client(
route_handlers=[route_handler], listeners=[event_listener], backend=anyio_backend
) as client:
response = client.get("/")
assert response.status_code == HTTP_200_OK
sleep(0.01)
mock.assert_called_with("positional", keyword="keyword-value")

mock.assert_called_with("positional", keyword="keyword-value")


async def test_shutdown_awaits_pending(async_listener: EventListener, mock: MagicMock) -> None:
Expand All @@ -69,9 +70,9 @@ def route_handler(request: Request[Any, Any, Any]) -> None:
route_handlers=[route_handler], listeners=[async_listener, sync_listener], backend=anyio_backend
) as client:
response = client.get("/")
sleep(0.01)
assert response.status_code == HTTP_200_OK
assert mock.call_count == 2

assert mock.call_count == 2


def test_multiple_event_ids(mock: MagicMock, anyio_backend: AnyIOBackend) -> None:
Expand All @@ -84,14 +85,12 @@ def route_handler(request: Request[Any, Any, Any], event_id: int) -> None:
request.app.emit(f"test_event_{event_id}")

with create_test_client(route_handlers=[route_handler], listeners=[event_handler], backend=anyio_backend) as client:
response = client.get("/1")
sleep(0.01)
assert response.status_code == HTTP_200_OK
assert mock.call_count == 1
response = client.get("/2")
sleep(0.01)
assert response.status_code == HTTP_200_OK
assert mock.call_count == 2
response_1 = client.get("/1")
response_2 = client.get("/2")
assert response_1.status_code == HTTP_200_OK
assert response_2.status_code == HTTP_200_OK

assert mock.call_count == 2


async def test_raises_when_decorator_called_without_callable() -> None:
Expand Down

0 comments on commit dddd5de

Please sign in to comment.