Skip to content

Commit

Permalink
fix: Missing notification of bgtask cancellation or failure when shut…
Browse files Browse the repository at this point in the history
…ting down the server (#2579)

Co-authored-by: Joongi Kim <joongi@lablup.com>
Backported-from: main (24.12)
Backported-to: 23.09
Backport-of: 2579
  • Loading branch information
jopemachine and achimnol committed Oct 24, 2024
1 parent 204e0b1 commit 16e514a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions changes/2579.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix missing notification of cancellation or failure of background tasks when shutting down the server
41 changes: 41 additions & 0 deletions src/ai/backend/common/bgtask.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,47 @@ async def push_bgtask_events(
A aiohttp-based server-sent events (SSE) responder that pushes the bgtask updates
to the clients.
"""
async with sse_response(request) as resp:
try:
async for event, extra_data in self.poll_bgtask_event(task_id):
body: dict[str, Any] = {
"task_id": str(event.task_id),
"message": event.message,
}
match event:
case BgtaskUpdatedEvent():
body["current_progress"] = event.current_progress
body["total_progress"] = event.total_progress
await resp.send(json.dumps(body), event=event.name, retry=5)
case BgtaskDoneEvent():
if extra_data:
body.update(extra_data)
await resp.send(
json.dumps(body), event="bgtask_" + extra_data["status"]
)
else:
await resp.send("{}", event="bgtask_done")
await resp.send("{}", event="server_close")
case BgtaskCancelledEvent():
await resp.send(json.dumps(body), event="bgtask_cancelled")
await resp.send("{}", event="server_close")
case BgtaskFailedEvent():
await resp.send(json.dumps(body), event="bgtask_failed")
await resp.send("{}", event="server_close")
except:
log.exception("")
raise
finally:
return resp

async def poll_bgtask_event(
self,
task_id: uuid.UUID,
) -> AsyncIterator[tuple[BgtaskEvents, dict]]:
"""
RHS of return tuple will be filled with extra informations when needed
(e.g. progress information of task when callee is trying to poll information of already completed one)
"""
tracker_key = f"bgtask.{task_id}"
redis_producer = self.event_producer.redis_client
task_info = await redis_helper.execute(
Expand Down

0 comments on commit 16e514a

Please sign in to comment.