-
-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improvements to the hybrid_setup example #400
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #400 +/- ##
=======================================
Coverage 24.02% 24.02%
=======================================
Files 41 41
Lines 1286 1286
Branches 254 254
=======================================
Hits 309 309
Misses 949 949
Partials 28 28 ☔ View full report in Codecov by Sentry. |
It appears there’s a potential real bug with Celery using a failover broker where the worker spits out ---------------------------------------------------------------------------- gevent_worker_container: awesome_gagarin -----------------------------------------------------------------------------
Worker init handler called!
-------------- celery_test_worker@be92e51fcaac v5.5.0b3 (immunity)
--- ***** -----
-- ******* ---- Linux-6.10.4-linuxkit-aarch64-with-glibc2.36 2024-09-09 10:21:23
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_test_app:0xffffb3e11090
- ** ---------- .> transport: amqp://guest:**@7c93855c3ebd:5672//
- ** ---------- .> results: redis://6fad1e8a1d98/0
- *** --- * --- .> concurrency: 42 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. pytest_celery.vendors.worker.tasks.add
. pytest_celery.vendors.worker.tasks.add_replaced
. pytest_celery.vendors.worker.tasks.fail
. pytest_celery.vendors.worker.tasks.identity
. pytest_celery.vendors.worker.tasks.noop
. pytest_celery.vendors.worker.tasks.ping
. pytest_celery.vendors.worker.tasks.sleep
. pytest_celery.vendors.worker.tasks.xsum
. tests.vendors.workers.tasks.identity
. tests.vendors.workers.tasks.job
. tests.vendors.workers.tasks.noop
[2024-09-09 10:21:23,734: INFO/MainProcess] Connected to amqp://guest:**@7c93855c3ebd:5672//
[2024-09-09 10:21:23,736: INFO/MainProcess] mingle: searching for neighbors
[2024-09-09 10:21:24,755: INFO/MainProcess] mingle: all alone
[2024-09-09 10:21:24,765: INFO/MainProcess] pidbox: Connected to amqp://guest:**@7c93855c3ebd:5672//.
[2024-09-09 10:21:24,770: INFO/MainProcess] celery_test_worker@be92e51fcaac ready.
[2024-09-09 10:21:26,019: INFO/MainProcess] sync with celery_test_worker@3b2b8e57edab
[2024-09-09 10:21:27,670: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 759, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.11/site-packages/celery/worker/loops.py", line 130, in synloop
connection.drain_events(timeout=2.0)
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 171, in drain_events
return connection.drain_events(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
frame = self.transport.read_frame()
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
frame_header = read(7, True)
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
s = recv(n - len(rbuf))
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 660, in recv
return self._sock.recv(*args)
^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer
[2024-09-09 10:21:27,672: WARNING/MainProcess] /usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2024-09-09 10:21:27,672: INFO/MainProcess] Temporarily reducing the prefetch count to 4 to avoid over-fetching since 0 tasks are currently being processed.
The prefetch count will be gradually restored to 0 as the tasks complete processing.
[2024-09-09 10:21:27,672: WARNING/MainProcess] Traceback (most recent call last):
[2024-09-09 10:21:27,672: WARNING/MainProcess] File "src/gevent/_waiter.py", line 122, in gevent._gevent_c_waiter.Waiter.switch
[2024-09-09 10:21:27,672: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/pidbox.py", line 118, in loop
connection.drain_events(timeout=1.0)
[2024-09-09 10:21:27,672: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,672: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 171, in drain_events
return connection.drain_events(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,672: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,672: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
frame = self.transport.read_frame()
^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
frame_header = read(7, True)
^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
s = recv(n - len(rbuf))
^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 660, in recv
return self._sock.recv(*args)
^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess] ConnectionResetError: [Errno 104] Connection reset by peer
[2024-09-09 10:21:27,673: WARNING/MainProcess] 2024-09-09T10:21:27Z
[2024-09-09 10:21:27,673: WARNING/MainProcess]
[2024-09-09 10:21:27,673: WARNING/MainProcess] <built-in method switch of gevent._gevent_c_greenlet_primitives.TrackedRawGreenlet object at 0xffffb2c92000> failed with ConnectionResetError
[2024-09-09 10:21:27,674: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@7c93855c3ebd:5672//: [Errno 111] Connection refused.
Will retry using next failover.
[2024-09-09 10:21:27,677: INFO/MainProcess] Connected to amqp://guest:**@c3e0fa11984c:5672//
[2024-09-09 10:21:27,678: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@7c93855c3ebd:5672//: [Errno 111] Connection refused.
Will retry using next failover.
[2024-09-09 10:21:27,683: INFO/MainProcess] mingle: searching for neighbors
[2024-09-09 10:21:28,708: INFO/MainProcess] mingle: all alone
[2024-09-09 10:21:28,720: INFO/MainProcess] Task tests.vendors.workers.tasks.job[b50e6d29-75f1-4130-8e30-bdac7f6a4ab1] received
[2024-09-09 10:21:28,728: INFO/MainProcess] Task tests.vendors.workers.tasks.identity[2f2315d4-930b-4cb1-96fc-0ceee484f503] received
[2024-09-09 10:21:28,732: INFO/MainProcess] Task tests.vendors.workers.tasks.identity[f9818c20-a181-4386-aaeb-8d6a4c72fe75] received
[2024-09-09 10:21:28,735: WARNING/MainProcess] Traceback (most recent call last):
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 158, in __call__
retval = fun(*final_args, **final_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 709, in _restore_prefetch_count_after_connection_restart
self.qos.set(self.qos.value)
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/kombu/common.py", line 439, in set
self.callback(prefetch_count=new_value)
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/tasks.py", line 56, in set_prefetch_count
return c.task_consumer.qos(
^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/kombu/messaging.py", line 585, in qos
return self.channel.basic_qos(prefetch_size,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/channel.py", line 1894, in basic_qos
return self.send_method(
^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 79, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 99, in wait
self.connection.drain_events(timeout=timeout)
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
frame = self.transport.read_frame()
^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
frame_header = read(7, True)
^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
s = recv(n - len(rbuf))
^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 666, in recv
self._wait(self._read_event)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
[2024-09-09 10:21:28,736: WARNING/MainProcess] gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffffb2338040>>
[2024-09-09 10:21:28,736: WARNING/MainProcess]
During handling of the above exception, another exception occurred:
[2024-09-09 10:21:28,736: WARNING/MainProcess] Traceback (most recent call last):
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/greenlet.py", line 908, in gevent._gevent_cgreenlet.Greenlet.run
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/concurrency/gevent.py", line 23, in apply_target
return base.apply_target(target, args, kwargs, callback, accept_callback,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/concurrency/base.py", line 28, in apply_target
accept_callback(pid or getpid(), monotonic())
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/request.py", line 514, in on_accepted
self.acknowledge()
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/request.py", line 649, in acknowledge
self._on_ack(logger, self._connection_errors)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 168, in __call__
svpending(*ca, **ck)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 161, in __call__
return self.throw()
^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 217, in throw
self.throw1(exc)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 210, in throw1
self.on_error(*self.args + (exc,), **self.kwargs)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 709, in _restore_prefetch_count_after_connection_restart
self.qos.set(self.qos.value)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/kombu/common.py", line 439, in set
self.callback(prefetch_count=new_value)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/tasks.py", line 56, in set_prefetch_count
return c.task_consumer.qos(
^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/kombu/messaging.py", line 585, in qos
return self.channel.basic_qos(prefetch_size,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/channel.py", line 1894, in basic_qos
return self.send_method(
^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 79, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 99, in wait
self.connection.drain_events(timeout=timeout)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
frame = self.transport.read_frame()
^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
frame_header = read(7, True)
^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
s = recv(n - len(rbuf))
^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 666, in recv
self._wait(self._read_event)
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess] File "src/gevent/_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
[2024-09-09 10:21:28,736: WARNING/MainProcess] gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffffb2338040>>
[2024-09-09 10:21:28,736: WARNING/MainProcess] 2024-09-09T10:21:28Z
[2024-09-09 10:21:28,736: WARNING/MainProcess]
[2024-09-09 10:21:28,737: WARNING/MainProcess] <Greenlet at 0xffffb2307a60: apply_target(<function TaskPool._make_killable_target.<locals>., ('tests.vendors.workers.tasks.identity', 'f9818c20, {}, <bound method create_request_cls.<locals>.Request., <bound method Request.on_accepted of <Request: tes, <function TaskPool.__init__.<locals>.<lambda> at 0, timeout=None, timeout_callback=<bound method Request.on_timeout of <Request: test)> failed with ConcurrentObjectUseError
[2024-09-09 10:21:28,738: INFO/MainProcess] Task tests.vendors.workers.tasks.identity[2f2315d4-930b-4cb1-96fc-0ceee484f503] succeeded in 0.009394500000013295s: 'Hello, '
[2024-09-09 10:21:28,741: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@7c93855c3ebd:5672//: [Errno -2] Name or service not known.
Will retry using next failover.
[2024-09-09 10:21:28,743: INFO/MainProcess] pidbox: Connected to amqp://guest:**@c3e0fa11984c:5672//.
[2024-09-09 10:21:31,716: INFO/MainProcess] sync with celery_test_worker@3b2b8e57edab |
To also reduce false negative CI failures