Skip to content

Commit

Permalink
Merge pull request #5083 from oasisprotocol/ptrus/feature/p2p-mgr-upd…
Browse files Browse the repository at this point in the history
…ates

go/p2p/PeerManager: support subscribing to peer updates
  • Loading branch information
ptrus authored Dec 5, 2022
2 parents 8a08747 + 076b4d8 commit 7f38a06
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .changelog/5083.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/p2p/PeerManager: enable subscribing to peer updates

Adds `WatchUpdates` method to the `PeerManager` which allows subscribing to
peer updates (peers being added or removed).
15 changes: 12 additions & 3 deletions go/p2p/rpc/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/oasisprotocol/oasis-core/go/common/pubsub"
)

var errUnsupported = fmt.Errorf("unsupported: p2p is disabled")

type nopPeerManager struct{}

// Implements PeerManager.
Expand Down Expand Up @@ -35,6 +39,11 @@ func (*nopPeerManager) RecordSuccess(peerID peer.ID, latency time.Duration) {
func (*nopPeerManager) RemovePeer(peerID peer.ID) {
}

// Implements PeersUpdates.
func (*nopPeerManager) WatchUpdates() (<-chan *PeerUpdate, pubsub.ClosableSubscription, error) {
return nil, nil, errUnsupported
}

type nopClient struct{}

// Implements Client.
Expand All @@ -45,7 +54,7 @@ func (c *nopClient) Call(
body, rsp interface{},
opts ...CallOption,
) (PeerFeedback, error) {
return nil, fmt.Errorf("unsupported: p2p is disabled")
return nil, errUnsupported
}

// Implements Client.
Expand All @@ -56,7 +65,7 @@ func (c *nopClient) CallOne(
body, rsp interface{},
opts ...CallOption,
) (PeerFeedback, error) {
return nil, fmt.Errorf("unsupported: p2p is disabled")
return nil, errUnsupported
}

// Implements Client.
Expand All @@ -67,7 +76,7 @@ func (c *nopClient) CallMulti(
body, rspTyp interface{},
opts ...CallMultiOption,
) ([]interface{}, []PeerFeedback, error) {
return nil, nil, fmt.Errorf("unsupported: p2p is disabled")
return nil, nil, errUnsupported
}

// Implements Client.
Expand Down
55 changes: 47 additions & 8 deletions go/p2p/rpc/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
)

const (
Expand Down Expand Up @@ -115,6 +116,26 @@ type PeerManager interface {
// GetBestPeers returns a set of peers sorted by the probability that they will be able to
// answer our requests the fastest with some randomization.
GetBestPeers(opts ...BestPeersOption) []core.PeerID

// WatchUpdates returns a channel that produces a stream of messages on peer updates.
WatchUpdates() (<-chan *PeerUpdate, pubsub.ClosableSubscription, error)
}

// PeerUpdate is a peer update event.
type PeerUpdate struct {
ID core.PeerID

PeerAdded *PeerAdded
PeerRemoved *PeerRemoved
}

// PeerAdded is an event emitted when a new peer is added.
type PeerAdded struct{}

// PeerRemoved is an event emitted when a peer is removed.
type PeerRemoved struct {
// BadPeer indicates that the peer was removed due to being recorded as a bad peer.
BadPeer bool
}

type peerStats struct {
Expand Down Expand Up @@ -150,8 +171,9 @@ type peerManager struct {
host core.Host
protocolID protocol.ID

peers map[core.PeerID]*peerStats
ignoredPeers map[core.PeerID]bool
peerUpdatesNotifier *pubsub.Broker
peers map[core.PeerID]*peerStats
ignoredPeers map[core.PeerID]bool

stickyPeer core.PeerID

Expand Down Expand Up @@ -180,6 +202,7 @@ func (mgr *peerManager) AddPeer(peerID core.PeerID) {
"peer_id", peerID,
"num_peers", len(mgr.peers),
)
mgr.peerUpdatesNotifier.Broadcast(&PeerUpdate{ID: peerID, PeerAdded: &PeerAdded{}})
}

func (mgr *peerManager) RemovePeer(peerID core.PeerID) {
Expand All @@ -196,6 +219,7 @@ func (mgr *peerManager) RemovePeer(peerID core.PeerID) {
"peer_id", peerID,
"num_peers", len(mgr.peers),
)
mgr.peerUpdatesNotifier.Broadcast(&PeerUpdate{ID: peerID, PeerRemoved: &PeerRemoved{}})
}

func (mgr *peerManager) RecordSuccess(peerID core.PeerID, latency time.Duration) {
Expand Down Expand Up @@ -244,8 +268,14 @@ func (mgr *peerManager) RecordBadPeer(peerID core.PeerID) {

mgr.p2p.BlockPeer(peerID)
mgr.ignoredPeers[peerID] = true

if _, exists := mgr.peers[peerID]; !exists {
return
}

delete(mgr.peers, peerID)
mgr.unstickPeerLocked(peerID)
mgr.peerUpdatesNotifier.Broadcast(&PeerUpdate{ID: peerID, PeerRemoved: &PeerRemoved{BadPeer: true}})
}

func (mgr *peerManager) unstickPeerLocked(peerID core.PeerID) {
Expand All @@ -258,6 +288,14 @@ func (mgr *peerManager) unstickPeerLocked(peerID core.PeerID) {
}
}

func (mgr *peerManager) WatchUpdates() (<-chan *PeerUpdate, pubsub.ClosableSubscription, error) {
typedCh := make(chan *PeerUpdate)
sub := mgr.peerUpdatesNotifier.Subscribe()
sub.Unwrap(typedCh)

return typedCh, sub, nil
}

func (mgr *peerManager) GetBestPeers(opts ...BestPeersOption) []core.PeerID {
mgr.Lock()
defer mgr.Unlock()
Expand Down Expand Up @@ -414,12 +452,13 @@ func NewPeerManager(p2p P2P, protocolID protocol.ID, opts ...PeerManagerOption)
}

mgr := &peerManager{
p2p: p2p,
host: p2p.Host(),
protocolID: protocolID,
peers: make(map[core.PeerID]*peerStats),
ignoredPeers: make(map[core.PeerID]bool),
opts: &pmo,
p2p: p2p,
host: p2p.Host(),
protocolID: protocolID,
peerUpdatesNotifier: pubsub.NewBroker(false),
peers: make(map[core.PeerID]*peerStats),
ignoredPeers: make(map[core.PeerID]bool),
opts: &pmo,
logger: logging.GetLogger("p2p/rpc/peermgr").With(
"protocol_id", protocolID,
),
Expand Down
93 changes: 93 additions & 0 deletions go/p2p/rpc/peermgmt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package rpc

import (
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

type testP2P struct {
host host.Host
}

// BlockPeer implements P2P.
func (*testP2P) BlockPeer(peerID peer.ID) {
}

// Host implements P2P.
func (t *testP2P) Host() host.Host {
return t.host
}

// RegisterProtocol implements P2P.
func (*testP2P) RegisterProtocol(p protocol.ID, min int, total int) {
}

func TestWatchUpdates(t *testing.T) {
require := require.New(t)

// Prepare a p2p host.
listenAddr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
require.NoError(err, "NewMultiaddr failed")
host, err := libp2p.New(
libp2p.ListenAddrs(listenAddr),
)
require.NoError(err, "libp2p.New failed")
defer host.Close()

peerMgr := NewPeerManager(&testP2P{host}, testProtocol)

ch, sub, err := peerMgr.WatchUpdates()
require.NoError(err, "WatchUpdates")
defer sub.Close()

// No events expected.
select {
case ev := <-ch:
t.Fatalf("received unexpected event: %+v", ev)
case <-time.After(100 * time.Millisecond):
}

peer1, peer2, peer3 := core.PeerID("peer-1"), core.PeerID("peer-2"), core.PeerID("peer-3")

// Add/remove peers.
peerMgr.AddPeer(peer1)
peerMgr.AddPeer(peer2)
peerMgr.RecordBadPeer(peer1)
peerMgr.RecordBadPeer(peer1)
peerMgr.AddPeer(peer3)
peerMgr.AddPeer(peer3)
peerMgr.RemovePeer(peer2)
peerMgr.RemovePeer(peer2)

// Ensure expected events are received.
expectedEvents := []*PeerUpdate{
{ID: peer1, PeerAdded: &PeerAdded{}},
{ID: peer2, PeerAdded: &PeerAdded{}},
{ID: peer1, PeerRemoved: &PeerRemoved{BadPeer: true}},
{ID: peer3, PeerAdded: &PeerAdded{}},
{ID: peer2, PeerRemoved: &PeerRemoved{}},
}
for _, next := range expectedEvents {
select {
case ev := <-ch:
require.Equal(next, ev, "should receive expected event")
case <-time.After(2 * time.Second):
t.Fatalf("failed to receive expected event: %+v", next)
}
}

// No more events expected.
select {
case ev := <-ch:
t.Fatalf("received unexpected event: %+v", ev)
case <-time.After(100 * time.Millisecond):
}
}

0 comments on commit 7f38a06

Please sign in to comment.