
Clean-up code is not executed in some cases with streaming responses if client cancels the request #412

Open
vrslev opened this issue Oct 4, 2024 · 11 comments



vrslev commented Oct 4, 2024

This is a continuation of the discussion from #286 (comment).

Hello! I'm experiencing an issue when running Litestar with Granian that I don't have when using Uvicorn. I don't think I provided a sufficient illustration in the previous discussion.

Here’s a new one:

import asyncio
import math
import typing

import litestar
from litestar.response import ServerSentEvent


async def iter_sse_data() -> typing.AsyncIterable[str]:
    yield "hi"
    try:
        print("about to sleep indefinitely")
        await asyncio.sleep(math.inf)
    finally:
        print("cleaned up")


@litestar.get("/")
async def root() -> ServerSentEvent:
    return ServerSentEvent(iter_sse_data())


app = litestar.Litestar([root])

  1. Run the app with Granian.
  2. Run curl http://localhost:8000/, wait for data: hi, and press Ctrl+C.
  3. See that the logs do not contain cleaned up.

Run the app with Uvicorn instead, and you'll see that cleaned up is printed.


vrslev commented Oct 4, 2024

I'm happy to file an issue in the Litestar repository, but it seems weird to me that the Uvicorn behavior is different.


gi0baro commented Oct 10, 2024

Hi @vrslev,
I probably need to clarify the underlying Granian behaviour before everything else, as there is a major difference compared to Uvicorn, which is indeed the difference you experience when comparing the two:

Granian is not able to notify the application when the connection gets closed by the client. As explained in #286, this is due to the design of the Hyper crate, which Granian uses to handle the HTTP protocol. Uvicorn, on the other hand, relies on asyncio protocols, which include this capability.
This means Granian will notify the application about the closed connection only on a receive or send operation. You can think of this difference in the same terms as eager vs. lazy code.
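That lazy behaviour can be pictured with a plain-socket sketch (illustrative only, not Granian internals): absent an out-of-band disconnect signal, a closed peer only becomes visible at the moment a send actually hits the socket.

```python
import socket

# Simulate a connected client/server pair; the "client" (rsock)
# disconnects without the "server" (wsock) being told about it.
rsock, wsock = socket.socketpair()
rsock.close()

try:
    # With no eager notification, the closed peer only becomes
    # observable when we try to send to it.
    wsock.send(b"data: hi\n\n")
    result = "send succeeded"
except (BrokenPipeError, ConnectionResetError):
    result = "disconnect observed on send"
finally:
    wsock.close()

print(result)
```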

Now, getting back to the various MREs you provided, we can divide those into two main categories:

  • cases in which you keep receiving/sending data
  • cases in which you block indefinitely waiting for a connection close

Now, while the former should work as expected in Granian (the only difference compared to Uvicorn is that the application's cancellation is delayed to the time between two send operations), the latter cannot be implemented at all in Granian (which is what the #286 OP is about).

From my perspective, given that all of your examples are based on SSE, you should be able to do cleanup operations with no issues, as long as you don't block the send iterator and use a separate asyncio task to achieve the result.
Conditions in which the async iterator responsible for sending data blocks indefinitely are borderline cases for SSE, and should probably be implemented differently (websockets sound like a better option to me).
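A minimal pure-asyncio sketch of that non-blocking pattern (the queue is a hypothetical stand-in for the real event source): cap every wait so a send happens at least every few seconds, because the periodic send is the operation at which a lazy server can observe the closed connection and cancel the iterator.

```python
import asyncio
import typing


async def iter_sse_data(queue: "asyncio.Queue[str]") -> typing.AsyncIterable[str]:
    # Never wait indefinitely: bound each wait with a timeout so the
    # iterator yields (and the server sends) at regular intervals.
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=5)
        except asyncio.TimeoutError:
            # The payload matters less than the fact that a send occurs.
            yield "ping"
        else:
            yield event
```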

Which is also why #286 is related, but not representative of your case: the whole point of that issue is to stop running expensive operations if the client disconnects, not to perform blocking operations before/during/after the request.

To summarise: while #286 might be resolved in the future (though again, that really depends on the underlying libraries), it is quite hard for me to imagine Granian ever supporting blocking streams in an async protocol. To be even clearer: even if #286 is fixed, that won't change the fact that blocking an iterator forever will block the whole app forever.


vrslev commented Oct 10, 2024

Hi @gi0baro! Thanks for the thorough clarification!

My initial MRE contained a Redis BLPOP call that blocks indefinitely:

import contextlib
import typing

import litestar
import redis.asyncio
from litestar.response import ServerSentEvent


@contextlib.asynccontextmanager
async def create_redis_client() -> typing.AsyncGenerator[redis.asyncio.Redis, None]:
    redis_client = redis.asyncio.Redis(host="localhost", port=6379)
    try:
        await redis_client.initialize()
        print("initialized redis client")
        yield redis_client
    finally:
        await redis_client.aclose()
        print("closed redis client")


redis_list_key = "whatever"


async def iter_sse_data() -> typing.AsyncIterable[str]:
    async with create_redis_client() as redis_client:
        while True:
            try:
                # BLPOP blocks the redis client until an item in the list
                # is available, i.e. you can't do anything else with the
                # client while waiting here.
                _list_key, event_content = await redis_client.blpop(redis_list_key)
            except BaseException as exception:
                print("caught an exception from blpop:", exception)
                raise exception

            yield event_content


@litestar.get("/sse")
async def root() -> ServerSentEvent:
    return ServerSentEvent(iter_sse_data())


app = litestar.Litestar([root])

Removing the blocking operation and running the clean-up step in a separate asyncio task does indeed make it work as expected:

import asyncio
import contextlib
import typing

import litestar
import redis.asyncio
from litestar.response import ServerSentEvent


@contextlib.asynccontextmanager
async def create_redis_client() -> typing.AsyncGenerator[redis.asyncio.Redis, None]:
    redis_client = redis.asyncio.Redis(host="localhost", port=6379)
    try:
        await redis_client.initialize()
        print("initialized redis client")
        yield redis_client
    finally:

        async def close_redis_client() -> None:
            await redis_client.aclose()
            print("closed redis client")

        asyncio.create_task(close_redis_client())


async def iter_sse_data() -> typing.AsyncIterable[typing.Optional[str]]:
    async with create_redis_client() as redis_client:
        while True:
            if blpop_result := await redis_client.blpop("whatever", timeout=5):
                _list_key, event_content = blpop_result
                yield event_content
            else:
                yield None


@litestar.get("/")
async def root() -> ServerSentEvent:
    return ServerSentEvent(iter_sse_data())


app = litestar.Litestar([root])


gi0baro commented Oct 10, 2024

@vrslev out of curiosity, any specific reason for using a single list key with blpop instead of pub/sub?


vrslev commented Oct 10, 2024

We needed exactly-once delivery with persistence


vrslev commented Oct 10, 2024

Also, it's one-to-one, so there's no need for anything more sophisticated.


vrslev commented Oct 10, 2024

As I remember, sub in Redis is a blocking operation as well.


gi0baro commented Oct 10, 2024

As I remember, sub in Redis is a blocking operation as well.

It's an async iterable, so in this specific context, yes. The main difference is that it doesn't lock the entire Redis client, so you won't need to initialize/destroy the client for every request.


vrslev commented Oct 11, 2024

Hi again! As I see it, there are two issues:

Litestar immediately cancels execution of the async iterator when the client disconnects

To my perspective, given all of your examples are based on SSE, you should be able to do cleanup operations with no issues, as soon as you don't block the send iterator and use a separated asyncio task to achieve the result.

In Uvicorn's case this can be mitigated with anyio.CancelScope(shield=True), and in Granian's case by running the clean-up in a separate task (asyncio.create_task()).
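A minimal pure-asyncio sketch of the shielding idea (using asyncio.shield rather than anyio's scope, with a no-op stand-in for the real close call): the cancellation that tears down the stream is prevented from also aborting the clean-up.

```python
import asyncio
import math

cleanup_log: list = []


async def close_client() -> None:
    # Stand-in for e.g. redis_client.aclose().
    await asyncio.sleep(0)
    cleanup_log.append("closed")


async def stream_forever() -> None:
    try:
        await asyncio.sleep(math.inf)
    finally:
        # Shield the clean-up so the cancellation that interrupted
        # the stream does not also abort the close operation mid-way.
        await asyncio.shield(close_client())
```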

I'm not sure who is responsible for this issue, Litestar or Granian, since the behavior differs between Uvicorn and Granian. I posted an issue for Litestar before starting the discussion in #286: litestar-org/litestar#3772. I now think it should be updated, though I'm not sure how exactly.

Support for blocking streams in an async protocol

it is quite hard for me to imagine granian would ever support blocking streams in an async protocol

This one is about blocking the async iterator indefinitely. The MRE in the first comment of this issue is sufficient, though the title should probably be updated.

As I understand it, this may end up as a "wontfix" issue, but it definitely belongs to Granian.


gi0baro commented Oct 15, 2024

I'd say litestar-org/litestar#3772 should be addressed on their side (based on their own judgement) independently of Granian, also because I can't see any way for Granian to fix that on its side. On the point that the solution appears to differ from Uvicorn: I'd say the separate task should work on Uvicorn as well, but I need to check the code more carefully to be sure.

On the other aspect, I think it can also be seen as an indirect consequence of #286, meaning that having #286 fixed would at least give options to manually handle a blocking case using cancellation as a trigger. So if we agree that this is not related to SSE/iterators per se, I'm willing to merge the two issues.


vrslev commented Oct 16, 2024

Understood 👍
