Skip to content

Commit

Permalink
✨ Add iter_pages to turn operation methods to async iterators.
Browse files Browse the repository at this point in the history
  • Loading branch information
rafalkrupinski committed Oct 20, 2024
1 parent bf58da5 commit 0970940
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and the format of this file is based on [Keep a Changelog](https://keepachangelo

### Added
- Accept session_factory in `ClientBase.__init__`.
- Helper function to iterate over pages.

### Fixed
- Handling collections in request bodies.
Expand Down
2 changes: 2 additions & 0 deletions src/lapidary/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'delete',
'get',
'head',
'iter_pages',
'patch',
'post',
'put',
Expand All @@ -38,4 +39,5 @@
from .model.error import HttpErrorResponse, LapidaryError, LapidaryResponseError, UnexpectedResponse
from .model.param_serialization import Form, FormExplode, SimpleMultimap, SimpleString
from .operation import delete, get, head, patch, post, put, trace
from .paging import iter_pages
from .types_ import ClientArgs, NamedAuth, SecurityRequirements, SessionFactory
58 changes: 58 additions & 0 deletions src/lapidary/runtime/paging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from collections.abc import AsyncIterable, Awaitable, Callable
from typing import Optional, TypeVar

from typing_extensions import ParamSpec, Unpack

P = ParamSpec('P')
R = TypeVar('R')
C = TypeVar('C')


def iter_pages(
fn: Callable[P, Awaitable[R]],
cursor_param_name: str,
get_cursor: Callable[[R], Optional[C]],
) -> Callable[P, AsyncIterable[R]]:
"""
Create a function that returns an async iterator over pages from the async operation function :param:`fn`.
The returned function can be called with the same parameters as :param:`fn` (except for the cursor parameter),
and returns an async iterator that yields results from :param:`fn`, handling pagination automatically.
The function :param:`fn` will be called initially without the cursor parameter and then called with the cursor parameter
as long as :param:`get_cursor` can extract a cursor from the result.
**Example:**
.. code:: python
async for page in iter_pages(client.fn, 'cursor', extractor_fn)(parameter=value):
# Process page
Typically, an API will use the same paging pattern for all operations supporting it, so it's a good idea to write a shortcut function:
.. code:: python
from lapidary.runtime import iter_pages as _iter_pages
def iter_pages[P, R](fn: Callable[P, Awaitable[R]]) -> Callable[P, AsyncIterable[R]]:
return _iter_pages(fn, 'cursor', lambda result: ...)
:param fn: An async function that retrieves a page of data.
:param cursor_param_name: The name of the cursor parameter in :param:`fn`.
:param get_cursor: A function that extracts a cursor value from the result of :param:`fn`. Return `None` to end the iteration.
"""

async def wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterable[R]:
result = await fn(*args, **kwargs) # type: ignore[call-arg]
yield result
cursor = get_cursor(result)

while cursor:
kwargs[cursor_param_name] = cursor
result = await fn(*args, **kwargs) # type: ignore[call-arg]
yield result

cursor = get_cursor(result)

return wrapper

0 comments on commit 0970940

Please sign in to comment.