Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to the hybrid_setup example #400

Closed
wants to merge 3 commits into from

Conversation

Nusnus
Copy link
Member

@Nusnus Nusnus commented Sep 9, 2024

To also reduce false negative CI failures

@Nusnus Nusnus added the examples Pytest Celery examples label Sep 9, 2024
@Nusnus Nusnus self-assigned this Sep 9, 2024
Copy link

codecov bot commented Sep 9, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 24.02%. Comparing base (4775c23) to head (b57db12).
Report is 33 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main     #400   +/-   ##
=======================================
  Coverage   24.02%   24.02%           
=======================================
  Files          41       41           
  Lines        1286     1286           
  Branches      254      254           
=======================================
  Hits          309      309           
  Misses        949      949           
  Partials       28       28           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Nusnus
Copy link
Member Author

Nusnus commented Sep 9, 2024

It appears there’s a potential real bug with Celery using a failover broker where the worker spits out gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffffb2338040>> after the first broker dies and the failover broker connects.

---------------------------------------------------------------------------- gevent_worker_container: awesome_gagarin -----------------------------------------------------------------------------
Worker init handler called!

 -------------- celery_test_worker@be92e51fcaac v5.5.0b3 (immunity)
--- ***** -----
-- ******* ---- Linux-6.10.4-linuxkit-aarch64-with-glibc2.36 2024-09-09 10:21:23
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_test_app:0xffffb3e11090
- ** ---------- .> transport:   amqp://guest:**@7c93855c3ebd:5672//
- ** ---------- .> results:     redis://6fad1e8a1d98/0
- *** --- * --- .> concurrency: 42 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . pytest_celery.vendors.worker.tasks.add
  . pytest_celery.vendors.worker.tasks.add_replaced
  . pytest_celery.vendors.worker.tasks.fail
  . pytest_celery.vendors.worker.tasks.identity
  . pytest_celery.vendors.worker.tasks.noop
  . pytest_celery.vendors.worker.tasks.ping
  . pytest_celery.vendors.worker.tasks.sleep
  . pytest_celery.vendors.worker.tasks.xsum
  . tests.vendors.workers.tasks.identity
  . tests.vendors.workers.tasks.job
  . tests.vendors.workers.tasks.noop

[2024-09-09 10:21:23,734: INFO/MainProcess] Connected to amqp://guest:**@7c93855c3ebd:5672//
[2024-09-09 10:21:23,736: INFO/MainProcess] mingle: searching for neighbors
[2024-09-09 10:21:24,755: INFO/MainProcess] mingle: all alone
[2024-09-09 10:21:24,765: INFO/MainProcess] pidbox: Connected to amqp://guest:**@7c93855c3ebd:5672//.
[2024-09-09 10:21:24,770: INFO/MainProcess] celery_test_worker@be92e51fcaac ready.
[2024-09-09 10:21:26,019: INFO/MainProcess] sync with celery_test_worker@3b2b8e57edab
[2024-09-09 10:21:27,670: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 759, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.11/site-packages/celery/worker/loops.py", line 130, in synloop
    connection.drain_events(timeout=2.0)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 171, in drain_events
    return connection.drain_events(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
    frame = self.transport.read_frame()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
    frame_header = read(7, True)
                   ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
    s = recv(n - len(rbuf))
        ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 660, in recv
    return self._sock.recv(*args)
           ^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer
[2024-09-09 10:21:27,672: WARNING/MainProcess] /usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.

  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)

[2024-09-09 10:21:27,672: INFO/MainProcess] Temporarily reducing the prefetch count to 4 to avoid over-fetching since 0 tasks are currently being processed.
The prefetch count will be gradually restored to 0 as the tasks complete processing.
[2024-09-09 10:21:27,672: WARNING/MainProcess] Traceback (most recent call last):
[2024-09-09 10:21:27,672: WARNING/MainProcess]   File "src/gevent/_waiter.py", line 122, in gevent._gevent_c_waiter.Waiter.switch
[2024-09-09 10:21:27,672: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/pidbox.py", line 118, in loop
    connection.drain_events(timeout=1.0)
[2024-09-09 10:21:27,672: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,672: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 171, in drain_events
    return connection.drain_events(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,672: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,672: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
    frame = self.transport.read_frame()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
    frame_header = read(7, True)
                   ^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
    s = recv(n - len(rbuf))
        ^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 660, in recv
    return self._sock.recv(*args)
           ^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:27,673: WARNING/MainProcess] ConnectionResetError: [Errno 104] Connection reset by peer
[2024-09-09 10:21:27,673: WARNING/MainProcess] 2024-09-09T10:21:27Z
[2024-09-09 10:21:27,673: WARNING/MainProcess]
[2024-09-09 10:21:27,673: WARNING/MainProcess] <built-in method switch of gevent._gevent_c_greenlet_primitives.TrackedRawGreenlet object at 0xffffb2c92000> failed with ConnectionResetError
[2024-09-09 10:21:27,674: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@7c93855c3ebd:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2024-09-09 10:21:27,677: INFO/MainProcess] Connected to amqp://guest:**@c3e0fa11984c:5672//
[2024-09-09 10:21:27,678: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@7c93855c3ebd:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2024-09-09 10:21:27,683: INFO/MainProcess] mingle: searching for neighbors
[2024-09-09 10:21:28,708: INFO/MainProcess] mingle: all alone
[2024-09-09 10:21:28,720: INFO/MainProcess] Task tests.vendors.workers.tasks.job[b50e6d29-75f1-4130-8e30-bdac7f6a4ab1] received
[2024-09-09 10:21:28,728: INFO/MainProcess] Task tests.vendors.workers.tasks.identity[2f2315d4-930b-4cb1-96fc-0ceee484f503] received
[2024-09-09 10:21:28,732: INFO/MainProcess] Task tests.vendors.workers.tasks.identity[f9818c20-a181-4386-aaeb-8d6a4c72fe75] received
[2024-09-09 10:21:28,735: WARNING/MainProcess] Traceback (most recent call last):
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 158, in __call__
    retval = fun(*final_args, **final_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 709, in _restore_prefetch_count_after_connection_restart
    self.qos.set(self.qos.value)
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/kombu/common.py", line 439, in set
    self.callback(prefetch_count=new_value)
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/tasks.py", line 56, in set_prefetch_count
    return c.task_consumer.qos(
           ^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/kombu/messaging.py", line 585, in qos
    return self.channel.basic_qos(prefetch_size,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/channel.py", line 1894, in basic_qos
    return self.send_method(
           ^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 79, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
    frame = self.transport.read_frame()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
    frame_header = read(7, True)
                   ^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
    s = recv(n - len(rbuf))
        ^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,735: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 666, in recv
    self._wait(self._read_event)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
[2024-09-09 10:21:28,736: WARNING/MainProcess] gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffffb2338040>>
[2024-09-09 10:21:28,736: WARNING/MainProcess]
During handling of the above exception, another exception occurred:
[2024-09-09 10:21:28,736: WARNING/MainProcess] Traceback (most recent call last):
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/greenlet.py", line 908, in gevent._gevent_cgreenlet.Greenlet.run
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/concurrency/gevent.py", line 23, in apply_target
    return base.apply_target(target, args, kwargs, callback, accept_callback,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/concurrency/base.py", line 28, in apply_target
    accept_callback(pid or getpid(), monotonic())
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/request.py", line 514, in on_accepted
    self.acknowledge()
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/request.py", line 649, in acknowledge
    self._on_ack(logger, self._connection_errors)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 168, in __call__
    svpending(*ca, **ck)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 161, in __call__
    return self.throw()
           ^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 217, in throw
    self.throw1(exc)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/vine/promises.py", line 210, in throw1
    self.on_error(*self.args + (exc,), **self.kwargs)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 709, in _restore_prefetch_count_after_connection_restart
    self.qos.set(self.qos.value)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/kombu/common.py", line 439, in set
    self.callback(prefetch_count=new_value)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/tasks.py", line 56, in set_prefetch_count
    return c.task_consumer.qos(
           ^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/kombu/messaging.py", line 585, in qos
    return self.channel.basic_qos(prefetch_size,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/channel.py", line 1894, in basic_qos
    return self.send_method(
           ^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 79, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
    frame = self.transport.read_frame()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
    frame_header = read(7, True)
                   ^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
    s = recv(n - len(rbuf))
        ^^^^^^^^^^^^^^^^^^^
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "/usr/local/lib/python3.11/site-packages/gevent/_socketcommon.py", line 666, in recv
    self._wait(self._read_event)
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
[2024-09-09 10:21:28,736: WARNING/MainProcess]   File "src/gevent/_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
[2024-09-09 10:21:28,736: WARNING/MainProcess] gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffffb2338040>>
[2024-09-09 10:21:28,736: WARNING/MainProcess] 2024-09-09T10:21:28Z
[2024-09-09 10:21:28,736: WARNING/MainProcess]
[2024-09-09 10:21:28,737: WARNING/MainProcess] <Greenlet at 0xffffb2307a60: apply_target(<function TaskPool._make_killable_target.<locals>., ('tests.vendors.workers.tasks.identity', 'f9818c20, {}, <bound method create_request_cls.<locals>.Request., <bound method Request.on_accepted of <Request: tes, <function TaskPool.__init__.<locals>.<lambda> at 0, timeout=None, timeout_callback=<bound method Request.on_timeout of <Request: test)> failed with ConcurrentObjectUseError
[2024-09-09 10:21:28,738: INFO/MainProcess] Task tests.vendors.workers.tasks.identity[2f2315d4-930b-4cb1-96fc-0ceee484f503] succeeded in 0.009394500000013295s: 'Hello, '
[2024-09-09 10:21:28,741: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@7c93855c3ebd:5672//: [Errno -2] Name or service not known.
Will retry using next failover.

[2024-09-09 10:21:28,743: INFO/MainProcess] pidbox: Connected to amqp://guest:**@c3e0fa11984c:5672//.
[2024-09-09 10:21:31,716: INFO/MainProcess] sync with celery_test_worker@3b2b8e57edab

@Nusnus Nusnus closed this Sep 9, 2024
@Nusnus Nusnus deleted the hybrid_setup branch September 9, 2024 10:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
examples Pytest Celery examples
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant