Skip to content

Commit

Permalink
fix: Improve automatic queue finding for Celery (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
karls authored Jun 3, 2024
1 parent 20ab90a commit 8662e5b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
17 changes: 11 additions & 6 deletions judoscale/celery/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,17 @@ def __init__(self, config: Config, broker: Celery):
logger.debug(f"Redis is at {self.redis.connection_pool}")

system_queues = {"unacked", "unacked_index"}
user_queues = {
q.decode("utf-8") if isinstance(q, bytes) else q
for q in self.redis.scan_iter(match="[^_]*", _type="list")
}
logger.debug(f"Found initial queues: {list(user_queues)}")
self._celery_queues = user_queues - system_queues
for q in self.redis.scan_iter(_type="list"):
queue_name = q.decode() if isinstance(q, bytes) else q
if (
queue_name in system_queues
or queue_name.startswith("_kombu")
or queue_name.endswith("celery.pidbox")
):
continue
self._celery_queues.add(queue_name)

logger.debug(f"Found initial queues: {list(self._celery_queues)}")
self.task_sent_handler.start()

@property
Expand Down
11 changes: 11 additions & 0 deletions tests/test_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ def test_queues_empty(self, heroku_worker_1, celery):
collector = CeleryMetricsCollector(heroku_worker_1, celery)
assert collector.queues == set()

def test_removing_various_celery_queues(self, heroku_worker_1, celery):
celery.connection_for_read().channel().client.scan_iter.return_value = [
b"unacked",
b"unacked_index",
b"_kombu.binding.celeryev",
b"e752fa70-f772-3c04-b05d-79b2a79ce766.reply.celery.pidbox",
b"user_queue",
]
collector = CeleryMetricsCollector(heroku_worker_1, celery)
assert collector.queues == {"user_queue"}

def test_collect_empty_queue(self, worker_1, celery):
celery.connection_for_read().channel().client.scan_iter.return_value = [b"foo"]
celery.connection_for_read().channel().client.lindex.return_value = None
Expand Down

0 comments on commit 8662e5b

Please sign in to comment.