Skip to content

Commit

Permalink
Report busy job metrics for correct queues
Browse files Browse the repository at this point in the history
  • Loading branch information
karls committed Nov 30, 2023
1 parent 969e173 commit 7e0675e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 8 deletions.
14 changes: 7 additions & 7 deletions judoscale/celery/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ def collect(self) -> List[Metric]:
if not self.should_collect:
return metrics

if self.adapter_config["TRACK_BUSY_JOBS"] and (
workers_tasks := self.inspect.active()
):
if self.adapter_config["TRACK_BUSY_JOBS"]:
busy_counts = defaultdict(lambda: 0)
for active_tasks in workers_tasks.values():
for task in active_tasks:
busy_counts[task["delivery_info"]["routing_key"]] += 1
if workers_tasks := self.inspect.active():
for active_tasks in workers_tasks.values():
for task in active_tasks:
busy_counts[task["delivery_info"]["routing_key"]] += 1

for queue, count in busy_counts.items():
for queue in self.queues:
count = busy_counts[queue]
metrics.append(Metric.for_busy_queue(queue_name=queue, busy_jobs=count))

logger.debug(f"Collecting metrics for queues {list(self.queues)}")
Expand Down
52 changes: 51 additions & 1 deletion tests/test_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ def test_collect_with_busy_jobs(self, worker_1, celery, monkeypatch):
celery.connection_for_read().channel().client.lindex.return_value = bytes(
json.dumps({"properties": {"published_at": now - 60}}), "utf-8"
)
celery.connection_for_read().channel().client.llen.return_value = 1

worker_1["CELERY"] = {"TRACK_BUSY_JOBS": True}
collector = CeleryMetricsCollector(worker_1, celery)
Expand All @@ -240,6 +239,57 @@ def test_collect_with_busy_jobs(self, worker_1, celery, monkeypatch):
assert metrics[1].queue_name == "foo"
assert metrics[1].value == approx(60000, abs=100)

def test_collect_with_busy_jobs_and_user_defined_queues(
self, worker_1, celery, monkeypatch
):
now = time.time()

inspect = Mock()
inspect.active.return_value = {
"some_worker": [{"name": "a_task", "delivery_info": {"routing_key": "foo"}}]
}

monkeypatch.setattr(celery.control, "inspect", lambda: inspect)
celery.connection_for_read().channel().client.scan_iter.return_value = [
b"foo",
]

def mock_lindex(queue, _):
return {
"bar": {},
"foo": bytes(
json.dumps({"properties": {"published_at": now - 60}}), "utf-8"
),
}[queue]

monkeypatch.setattr(
celery.connection_for_read().channel().client, "lindex", mock_lindex
)

worker_1["CELERY"] = {"QUEUES": ["foo", "bar"], "TRACK_BUSY_JOBS": True}
collector = CeleryMetricsCollector(worker_1, celery)
metrics = collector.collect()

assert len(metrics) == 4
metrics = sorted(metrics, key=lambda m: m.queue_name)
metrics = sorted(metrics, key=lambda m: m.measurement)

assert metrics[0].measurement == "busy"
assert metrics[0].queue_name == "bar"
assert metrics[0].value == 0

assert metrics[1].measurement == "busy"
assert metrics[1].queue_name == "foo"
assert metrics[1].value == 1

assert metrics[2].measurement == "queue_time"
assert metrics[2].queue_name == "bar"
assert metrics[2].value == 0

assert metrics[3].measurement == "queue_time"
assert metrics[3].queue_name == "foo"
assert metrics[3].value == approx(60000, abs=100)


class TestRQMetricsCollector:
def test_adapter_config(self, render_worker):
Expand Down

0 comments on commit 7e0675e

Please sign in to comment.