
[bug] subscribe channel does not receive message, even after a barrier among publisher and subscriber #4713

Open
youkaichao opened this issue Jul 24, 2024 · 4 comments

Comments

@youkaichao

Issue description

Hi team, thanks for the great project! I'm using zmq to broadcast messages in the vLLM project, and I've run into a bug that I think might be related to zmq.

Environment

  • libzmq version (commit hash if unreleased): pyzmq 26.0.3
  • OS: linux

Minimal test code / Steps to reproduce the issue

```python
# test.py
from zmq import PUB, REP, REQ, SUB, SUBSCRIBE, Context, LAST_ENDPOINT  # type: ignore
import torch

torch.distributed.init_process_group(backend="gloo")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
context = Context()

if rank == 0:
    local_socket = context.socket(PUB)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    local_sync_socket = context.socket(REP)
    local_sync_socket.bind("tcp://*:*")
    local_sync_socket_port = local_sync_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    torch.distributed.broadcast_object_list([local_socket_port, local_sync_socket_port], src=0)

    # local readers
    for i in range(world_size - 1):
        recv = local_sync_socket.recv()
        assert recv == b"READY"
        local_sync_socket.send(b"READY")

    local_socket.send(b"READY")
    local_socket.send(b"data")

else:
    data = [None, None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, local_sync_socket_port = data

    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")

    local_sync_socket = context.socket(REQ)
    local_sync_socket.connect(f"tcp://localhost:{local_sync_socket_port}")

    local_sync_socket.send(b"READY")
    assert local_sync_socket.recv() == b"READY"

    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"
```

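Run the code about 100 times:

```bash
success_count=0
for ((i=1; i<=100; i++)); do
  # ((++success_count)) evaluates to the new value (>= 1), so the && chain continues to the echo
  torchrun --nproc-per-node=8 test.py && ((++success_count)) && echo "Success count: $success_count"
done
```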

Roughly once every 20 to 50 runs, it hangs. Even though I put a barrier between the publisher and all subscribers, some subscribers still never get the message, so they wait forever at `assert local_socket.recv() == b"READY"`.

I'm following https://zguide.zeromq.org/docs/chapter2/#Handling-Multiple-Sockets to add a synchronization point before publishing anything. It works most of the time, but occasionally it fails: the publisher sends the message before all subscribers are ready to receive it.
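For context, the zguide's "Node Coordination" section (also in Chapter 2) warns that a REQ/REP handshake does not prove the SUB connection has finished: over any transport other than inproc, outbound connects can complete in any order, and a PUB socket drops messages for a peer whose subscription has not reached it yet. The guide suggests a more robust pattern: the publisher keeps sending "Hello" messages, each subscriber confirms over REQ/REP once it has actually received one, and real data is sent only after all confirmations. Below is a minimal, socket-agnostic sketch of that publisher loop; `announce_until_ready`, `send_hello`, and `poll_ready` are hypothetical names, and how they are wired to pyzmq sockets is an assumption described in the docstring.

```python
from typing import Callable


def announce_until_ready(
    send_hello: Callable[[], None],
    poll_ready: Callable[[int], bool],
    expected: int,
    timeout_ms: int = 100,
    max_rounds: int = 10_000,
) -> int:
    """Publish HELLO repeatedly until `expected` subscribers have confirmed.

    Hypothetical helper. With pyzmq, send_hello could wrap pub.send(b"HELLO"),
    and poll_ready could poll the REP socket for up to timeout_ms and, if a
    request is waiting, recv() it, send() a reply, and return True.
    Returns the number of HELLO messages published.
    """
    confirmed = 0
    rounds = 0
    while confirmed < expected:
        if rounds >= max_rounds:
            raise TimeoutError(f"only {confirmed}/{expected} subscribers confirmed")
        send_hello()  # keep announcing so late joiners eventually see a HELLO
        rounds += 1
        if poll_ready(timeout_ms):  # one subscriber confirmed over REQ/REP
            confirmed += 1
    return rounds
```

On the subscriber side, each rank would discard HELLO messages, send a single READY request after the first HELLO it receives, and treat the first non-HELLO message as real data. Because messages on a single PUB-to-SUB connection stay in order, any HELLOs still queued for a subscriber arrive before READY.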

I found that adding `time.sleep(1)` before `local_socket.send(b"READY")` helps, but that's not an elegant solution.

Is there any way to check whether the publisher has enough subscribers, or whether a subscriber is connected to the publisher and ready to receive messages?

Thanks again for the great project, and I look forward to a solution!

bluca transferred this issue from zeromq/libzmq on Jul 24, 2024
@jamesdillonharvey
Contributor

"Are there any methods to check, i.e. if the publisher gets enough subscribers, or the subscriber can check if it is connected to the publisher and is ready to receive the message?"

You can use XPUB rather than PUB on your local_socket, then poll the local_socket for incoming subscription messages. Each recv on local_socket will contain a subscription message that starts with 1 followed by the subscription string. You can then count the number of subscribers connected and send once you reach the required number.
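A minimal sketch of that counting loop, written against any `recv()` callable so it can be exercised without sockets (with pyzmq you would pass the XPUB socket's `recv`). `wait_for_subscribers` is a hypothetical name. It assumes each subscriber subscribes exactly once and it ignores unsubscription messages. Note that by default an XPUB socket forwards only new subscriptions upstream, so when every rank subscribes to the same empty topic the socket also needs `XPUB_VERBOSE` set; otherwise only the first subscription would be seen.

```python
from typing import Callable


def wait_for_subscribers(recv: Callable[[], bytes], expected: int) -> int:
    """Block until `expected` subscription messages have arrived.

    recv is assumed to behave like an XPUB socket's recv(): each subscription
    message is a 0x01 byte followed by the topic, and 0x00 marks an
    unsubscription (ignored here). Returns the number of messages consumed.
    """
    subscribed = 0
    consumed = 0
    while subscribed < expected:
        msg = recv()
        consumed += 1
        if msg[:1] == b"\x01":  # subscription (any topic, including "")
            subscribed += 1
    return consumed
```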

@youkaichao
Author

Thanks for your quick response!

Using XPUB seems to work. Is the subscription message guaranteed to be b'\x01'? That's what I get when running the code, but I can't find any documentation about it.
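The libzmq `zmq_socket` reference describes the XPUB framing: a subscription message is a byte 1 (subscribe) or byte 0 (unsubscribe) followed by the subscription topic. With `setsockopt_string(SUBSCRIBE, "")` the topic is empty, so the whole message is b'\x01'. A small sketch of a decoder, assuming that framing; `decode_subscription` is a hypothetical helper, not a pyzmq API:

```python
from typing import Tuple


def decode_subscription(msg: bytes) -> Tuple[bool, bytes]:
    """Split an XPUB subscription message into (is_subscribe, topic).

    Assumes the libzmq framing: first byte 1 = subscribe, 0 = unsubscribe,
    remaining bytes = topic prefix.
    """
    if not msg or msg[0] not in (0, 1):
        raise ValueError(f"not a subscription message: {msg!r}")
    return msg[0] == 1, msg[1:]
```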

@youkaichao
Author

Code using XPUB (so I no longer need an additional sync socket):

```python
from zmq import SUB, SUBSCRIBE, XPUB, XPUB_VERBOSE, Context, LAST_ENDPOINT  # type: ignore
import torch

torch.distributed.init_process_group(backend="gloo")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
context = Context()

if rank == 0:
    local_socket = context.socket(XPUB)
    # pass every subscription message up to the application, not only new topics;
    # otherwise only the first of several identical "" subscriptions is received
    local_socket.setsockopt(XPUB_VERBOSE, True)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    torch.distributed.broadcast_object_list([local_socket_port], src=0)

    # local readers: wait for one subscription message (b"\x01") per subscriber
    for _ in range(world_size - 1):
        local_socket.recv()

    local_socket.send(b"READY")
    local_socket.send(b"data")

else:
    data = [None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, = data

    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")

    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"
```

@youkaichao
Author

Just to confirm, @jamesdillonharvey: is this a bug? When I use local sync sockets to make sure all subscribers have joined, some subscribers are still not ready to receive the message.
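This appears consistent with the "slow joiner" behaviour the zguide describes rather than a defect: the REQ/REP barrier runs over a separate TCP connection, so completing it says nothing about whether a SUB socket's subscription has already reached the PUB socket, and a PUB socket does not replay earlier messages to subscribers that arrive late. The toy model below (plain Python for illustration, not how libzmq is implemented; `ToyPub` and `simulate_late_subscription` are hypothetical) shows the race in the original test: both ranks passed the barrier, but rank2's subscription landed after both sends, so it waits forever.

```python
from typing import Dict, Iterable, List, Set


class ToyPub:
    """Toy model of PUB fan-out for illustration only (not libzmq internals).

    A message goes only to peers whose subscription has already reached the
    publisher; later subscribers never see earlier messages.
    """

    def __init__(self, peers: Iterable[str]) -> None:
        self.inboxes: Dict[str, List[bytes]] = {p: [] for p in peers}
        self.subscribed: Set[str] = set()

    def subscription_arrives(self, peer: str) -> None:
        self.subscribed.add(peer)

    def send(self, msg: bytes) -> None:
        for peer in self.subscribed:
            self.inboxes[peer].append(msg)


def simulate_late_subscription() -> Dict[str, List[bytes]]:
    # Assume both ranks have already completed the REQ/REP barrier here.
    pub = ToyPub(["rank1", "rank2"])
    pub.subscription_arrives("rank1")  # rank1's SUBSCRIBE got there in time
    pub.send(b"READY")
    pub.send(b"data")
    pub.subscription_arrives("rank2")  # rank2's SUBSCRIBE arrives too late
    return pub.inboxes
```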
