From 20b63ec1500afa9b17214fcd249737b57bda4e53 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Thu, 24 Oct 2024 17:31:14 +0900 Subject: [PATCH] fix: Keep polling when GPFS job is running (#2961) Backported-from: main (24.12) Backported-to: 24.03 Backport-of: 2961 --- changes/2961.fix.md | 1 + src/ai/backend/storage/gpfs/gpfs_client.py | 34 ++++++++++++++-------- 2 files changed, 23 insertions(+), 12 deletions(-) create mode 100644 changes/2961.fix.md diff --git a/changes/2961.fix.md b/changes/2961.fix.md new file mode 100644 index 0000000000..5dde2ce6ec --- /dev/null +++ b/changes/2961.fix.md @@ -0,0 +1 @@ +Let GPFS client keep polling when GPFS job is running diff --git a/src/ai/backend/storage/gpfs/gpfs_client.py b/src/ai/backend/storage/gpfs/gpfs_client.py index 3fe642b83c..1342ab2abe 100644 --- a/src/ai/backend/storage/gpfs/gpfs_client.py +++ b/src/ai/backend/storage/gpfs/gpfs_client.py @@ -8,7 +8,13 @@ import aiohttp from aiohttp import BasicAuth, web -from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_fixed +from tenacity import ( + AsyncRetrying, + TryAgain, + retry_if_exception_type, + stop_after_attempt, + wait_fixed, +) from ai.backend.common.logging import BraceStyleAdapter from ai.backend.common.types import BinarySize @@ -128,23 +134,27 @@ async def _build_request( except web.HTTPUnauthorized: raise GPFSUnauthorizedError - async def _wait_for_job_done(self, jobs: List[GPFSJob]) -> None: + async def _wait_for_job_done(self, jobs: list[GPFSJob]) -> None: for job_to_wait in jobs: async for attempt in AsyncRetrying( wait=wait_fixed(0.5), - stop=stop_after_attempt(100), - retry=retry_if_exception_type(web.HTTPNotFound), + stop=stop_after_attempt(120), + retry=retry_if_exception_type(TryAgain) | retry_if_exception_type(web.HTTPNotFound), ): with attempt: job = await self.get_job(job_to_wait.jobId) - if job.status == GPFSJobStatus.COMPLETED: - return - elif job.status == GPFSJobStatus.FAILED: - raise GPFSJobFailedError( - job.result.to_json() if job.result is not None else "" - ) - elif job.status == GPFSJobStatus.CANCELLED: - raise GPFSJobCancelledError + match job.status: + case GPFSJobStatus.RUNNING | GPFSJobStatus.CANCELLING: + raise TryAgain + case GPFSJobStatus.COMPLETED: + return + case GPFSJobStatus.FAILED: + log.error(f"Failed to run GPFS job. (e:{str(jobs)})") + raise GPFSJobFailedError( + job.result.to_json() if job.result is not None else "" + ) + case GPFSJobStatus.CANCELLED: + raise GPFSJobCancelledError @contextlib.asynccontextmanager async def _build_session(self) -> AsyncIterator[aiohttp.ClientSession]: