Cannot get actors to work with async / await #8940

Open
Joehunk opened this issue Nov 16, 2024 · 2 comments


@Joehunk

Joehunk commented Nov 16, 2024

I have been searching for an actor framework in Python with good asyncio integration and appear to have found it! However, running the example from the docs on using actors with async/await raises an error. The complete code I am trying to run (copied from the examples) follows:

import asyncio
from dask.distributed import Client

class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

    def add(self, x):
        self.n += x
        return self.n
  

async def test_async(client: Client) -> None:
    counter = await client.submit(Counter, actor=True)

    await counter.increment()
    n = await counter.n
    print(f'n = {n}')


if __name__ == '__main__':
    client = Client()
    asyncio.run(test_async(client))

Here is the full text of the error:

Traceback (most recent call last):
  File "D:\Users\charl\Documents\2_dask\dask_test.py", line 30, in <module>
    asyncio.run(test_async(client))
  File "C:\Users\charl\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\charl\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\charl\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 684, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "D:\Users\charl\Documents\2_dask\dask_test.py", line 21, in test_async
    counter = await client.submit(Counter, actor=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\charl\Documents\2_dask\.venv\Lib\site-packages\distributed\client.py", line 620, in __await__
    return self.result().__await__()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\charl\Documents\2_dask\.venv\Lib\site-packages\distributed\actor.py", line 165, in __getattr__
    attr = getattr(self._cls, key)
           ^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: type object 'Counter' has no attribute '__await__'. Did you mean: '__init__'?
2024-11-16 13:03:27,968 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:62438' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'Counter-23599479-2f43-4126-92ac-58b1f172a47e'} (stimulus_id='handle-worker-cleanup-1731783807.9682791')

Also, as a side note, the docs example that uses async in an actor method appears to have a typo: it should be class Waiter, not def Waiter.
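For reference, a corrected version of that docs example would presumably read along the lines of the sketch below. The class body is reconstructed from memory of the docs, so the method names are assumptions, and the `demo` driver is purely illustrative: it exercises the class with plain asyncio rather than as a Dask actor.

```python
import asyncio


class Waiter:  # "class", not "def"
    """Assumed shape of the docs example: an actor with async methods."""

    def __init__(self):
        self.event = asyncio.Event()

    async def set(self):
        self.event.set()

    async def wait(self):
        await self.event.wait()


async def demo():
    # Hypothetical driver, not part of the docs: runs Waiter locally
    # on a single event loop to show the async methods cooperate.
    waiter = Waiter()
    task = asyncio.create_task(waiter.wait())
    await asyncio.sleep(0)            # let the task start and block on the event
    pending_before = not task.done()
    await waiter.set()
    await task
    return pending_before, task.done()


if __name__ == '__main__':
    print(asyncio.run(demo()))
```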

@mrocklin
Member

Dask clients create their own event loop when invoked normally (most users don't understand async, so we do some magic here). You're creating a client synchronously and then trying to use it within a different event loop, which confuses things. The client switches to async mode automatically if the code runs within the client's own event loop (there should be some attribute on the Client for this, if memory serves), or you can create the client within your async code. Here is a version of your code that works:

import asyncio
from dask.distributed import Client

class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

    def add(self, x):
        self.n += x
        return self.n


async def test_async() -> None:
    async with Client(asynchronous=True) as client:
        counter = await client.submit(Counter, actor=True)

        await counter.increment()
        n = await counter.n
        print(f'n = {n}')


if __name__ == '__main__':
    asyncio.run(test_async())
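To illustrate why the original version fails, the traceback's `return self.result().__await__()` line can be mimicked with a toy model. This is a minimal sketch, not the distributed implementation: `ToyFuture` and its `asynchronous` flag are hypothetical stand-ins, and the assumption is that a synchronous client's `result()` hands back the finished object itself rather than something awaitable.

```python
import asyncio


class Counter:
    def __init__(self):
        self.n = 0


class ToyFuture:
    """Hypothetical stand-in for a client future (not Dask code)."""

    def __init__(self, value, asynchronous):
        self._value = value
        self._asynchronous = asynchronous

    async def _result(self):
        return self._value

    def result(self):
        if self._asynchronous:
            return self._result()  # a coroutine, which has __await__
        return self._value         # the plain object, which does not

    def __await__(self):
        # Same shape as the line shown in the traceback.
        return self.result().__await__()


async def main():
    ok = await ToyFuture(Counter(), asynchronous=True)
    error = None
    try:
        await ToyFuture(Counter(), asynchronous=False)
    except AttributeError as exc:
        error = str(exc)
    return type(ok).__name__, error


if __name__ == '__main__':
    print(asyncio.run(main()))
```

In this model the sync-mode future fails with an AttributeError about a missing `__await__`, which matches the shape of the reported error; creating the client with `asynchronous=True` inside the coroutine avoids that path.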

jacobtomlinson transferred this issue from dask/community Nov 18, 2024
@jacobtomlinson
Member

Transferring this issue to dask/distributed.
