Skip to content

CaliDog/asyncpool

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

AsyncPool

Asyncio coroutine worker pool. No more juggling bounded semaphores and annoying timeouts, and allows you to run through millions of pieces of data efficiently.

Adapted from the awesome worker pool found at https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d

Installation

pip install asyncpool

Example Usage

import asyncpool
import logging
import asyncio

async def example_coro(initial_number, result_queue):
    print("Processing Value! -> {} * 2 = {}".format(initial_number, initial_number * 2))
    await asyncio.sleep(1)
    await result_queue.put(initial_number * 2)

async def result_reader(queue):
    while True:
        value = await queue.get()
        if value is None:
            break
        print("Got value! -> {}".format(value))

async def run():
    result_queue = asyncio.Queue()

    reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)

    # Start a worker pool with 10 coroutines, invokes `example_coro` and waits for it to complete or 5 minutes to pass.
    async with asyncpool.AsyncPool(loop, num_workers=10, name="ExamplePool",
                             logger=logging.getLogger("ExamplePool"),
                             worker_co=example_coro, max_task_time=300,
                             log_every_n=10) as pool:
        for i in range(50):
            await pool.push(i, result_queue)

    await result_queue.put(None)
    await reader_future

loop = asyncio.get_event_loop()

loop.run_until_complete(run())

About

Asyncio coroutine worker pool

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages