I have been searching for an actor framework in Python with good asyncio integration and appear to have found it!! However, running the examples from the docs on using actors with async/await yields an error. The complete code I am trying to run (copied from the examples) follows:
import asyncio

from dask.distributed import Client


class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

    def add(self, x):
        self.n += x
        return self.n


async def test_async(client: Client) -> None:
    counter = await client.submit(Counter, actor=True)
    await counter.increment()
    n = await counter.n
    print(f'n = {n}')


if __name__ == '__main__':
    client = Client()
    asyncio.run(test_async(client))
Here is the full text of the error:
Traceback (most recent call last):
  File "D:\Users\charl\Documents\2_dask\dask_test.py", line 30, in <module>
    asyncio.run(test_async(client))
  File "C:\Users\charl\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\charl\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\charl\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 684, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "D:\Users\charl\Documents\2_dask\dask_test.py", line 21, in test_async
    counter = await client.submit(Counter, actor=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\charl\Documents\2_dask\.venv\Lib\site-packages\distributed\client.py", line 620, in __await__
    return self.result().__await__()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\charl\Documents\2_dask\.venv\Lib\site-packages\distributed\actor.py", line 165, in __getattr__
    attr = getattr(self._cls, key)
           ^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: type object 'Counter' has no attribute '__await__'. Did you mean: '__init__'?
2024-11-16 13:03:27,968 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:62438' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'Counter-23599479-2f43-4126-92ac-58b1f172a47e'} (stimulus_id='handle-worker-cleanup-1731783807.9682791')
Dask clients create their own event loop if invoked normally (most users don't understand async, so we do some magic here). You're creating a client in a synchronous way and then trying to use it within a different event loop, which is confusing things. The client will switch to async mode automatically if the code is run within its own event loop (there should be some attribute on the Client for this if memory serves) or you can make a client within your async code. Here is a version of your code that does work:
import asyncio

from dask.distributed import Client


class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

    def add(self, x):
        self.n += x
        return self.n


async def test_async() -> None:
    async with Client(asynchronous=True) as client:
        counter = await client.submit(Counter, actor=True)
        await counter.increment()
        n = await counter.n
        print(f'n = {n}')


if __name__ == '__main__':
    asyncio.run(test_async())
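For comparison, here is a rough sketch of the other direction: keeping the client synchronous and not involving asyncio at all. It assumes the blocking `.result()` calls shown in the Dask actor docs, and that `client.asynchronous` is the attribute mentioned above. The helper name `run_counter_sync` and the `processes=False` argument are choices made only for this illustration (the latter keeps everything in one process), not part of the original report.

```python
from dask.distributed import Client


class Counter:
    """A simple class to manage an incrementing counter."""
    n = 0  # class-level default, kept from the original example

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n


def run_counter_sync() -> int:
    # Hypothetical helper for this sketch. processes=False is an assumption
    # made to keep the scheduler and worker inside the current process.
    with Client(processes=False) as client:
        # A client created this way is in synchronous mode.
        assert not client.asynchronous
        # .result() blocks until the actor exists and returns its proxy.
        counter = client.submit(Counter, actor=True).result()
        # Method calls return a future-like object; .result() blocks on it.
        counter.increment().result()
        # Attribute access on a synchronous actor returns the value directly.
        return counter.n


if __name__ == "__main__":
    print(f"n = {run_counter_sync()}")
```

The point is to pick one mode and stay in it: either create the client inside the coroutine with `asynchronous=True` and `await` everything, or create it normally and use blocking calls.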
Also, as a side note, the text of the docs example that uses async in an actor function appears to have a typo: it should be "class Waiter", not "def Waiter".