This repository has been archived by the owner on Sep 15, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 143
/
kad_rpc.go
261 lines (225 loc) · 7.15 KB
/
kad_rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
// Copyright (C) 2013-2018, The MetaCurrency Project (Eric Harris-Braun, Arthur Brock, et. al.)
// Use of this source code is governed by GPLv3 found in the LICENSE file
//
// This code is adapted from the libp2p project, specifically:
// https://github.com/libp2p/go-libp2p-kad-dht/routing.go
// The ipfs use of kademlia is substantially different than that needed by holochain so we remove
// parts we don't need and add others, also we have do our message wire-formats and encoding
// differently, so our RPC handlers are need to be different.
package holochain
import (
"context"
"errors"
"fmt"
. "github.com/holochain/holochain-proto/hash"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
_ "sync"
_ "time"
)
var ErrDHTUnexpectedTypeInBody error = errors.New("unexpected type in message body")
type FindNodeReq struct {
H Hash
}
// an encodable version of pstore.PeerInfo which gob doesn't like
// also libp2p encodes other stuff like connection type into this
// which we may have to do too.
type PeerInfo struct {
ID []byte // byte version peer.ID
Addrs [][]byte // byte version of multiaddrs
}
type CloserPeersResp struct {
CloserPeers []PeerInfo // note this is not a pstore.PeerInfo which can't be serialized by gob.
}
// The number of closer peers to send on requests.
var CloserPeerCount = KValue
// FindLocal looks for a peer with a given ID connected to this node and returns its peer info
func (node *Node) FindLocal(id peer.ID) pstore.PeerInfo {
p := node.routingTable.Find(id)
if p != "" {
return node.peerstore.PeerInfo(p)
}
return pstore.PeerInfo{}
}
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is and respond with
// any closer peers if not.
func (node *Node) findPeerSingle(ctx context.Context, p peer.ID, hash Hash) (closerPeers []*pstore.PeerInfo, err error) {
node.log.Logf("Sending FIND_NODE_REQUEST to %v for hash: %v\n", p, hash)
pmes := node.NewMessage(FIND_NODE_REQUEST, FindNodeReq{H: hash})
var resp Message
resp, err = node.Send(ctx, KademliaProtocol, p, pmes)
if err != nil {
return
}
response, ok := resp.Body.(CloserPeersResp)
if !ok {
err = ErrDHTUnexpectedTypeInBody
return
}
closerPeers = peerInfos2Pis(response.CloserPeers)
return
}
// nearestPeersToHash returns the routing tables closest peers to a given hash
func (node *Node) nearestPeersToHash(hash *Hash, count int) []peer.ID {
// fmt.Printf("%v NearestPeers to %s: ", node.HashAddr.Pretty()[2:4], hash.String())
closer := node.routingTable.NearestPeers(*hash, count)
return closer
}
// betterPeersForHash returns nearestPeersToHash, but iff closer than self.
func (node *Node) betterPeersForHash(hash *Hash, p peer.ID, excludeSelf bool, count int) []peer.ID {
closer := node.nearestPeersToHash(hash, count)
// no node? nil
if closer == nil {
node.log.Logf("no closer peers to send to %v", p)
return nil
}
if excludeSelf {
two := []peer.ID{node.HashAddr, closer[0]}
two = SortClosestPeers(two, *hash)
if two[0] == node.HashAddr {
return nil
}
}
var filtered []peer.ID
for _, clp := range closer {
// == to self? thats bad
if clp == node.HashAddr {
Debug("attempted to return self! this shouldn't happen...")
return nil
}
// Dont send a peer back themselves
if clp == p {
continue
}
filtered = append(filtered, clp)
}
// ok seems like closer nodes
return filtered
}
// FindPeer searches for a peer with given ID.
// it is also an implementation the FindPeer() method of the RoutedHost interface in go-libp2p/p2p/host/routed
// and makes the Node object the "Router"
func (node *Node) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
// Check if were already connected to them
if pi := node.FindLocal(id); pi.ID != "" {
return pi, nil
}
hashID := HashFromPeerID(id)
peers := node.routingTable.NearestPeers(hashID, AlphaValue)
if len(peers) == 0 {
return pstore.PeerInfo{}, ErrEmptyRoutingTable
}
// Sanity...
for _, p := range peers {
if p == id {
node.log.Logf("found target peer in list of closest peers...")
return node.peerstore.PeerInfo(p), nil
}
}
// setup the Query
query := node.newQuery(hashID, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
/* notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})*/
closerPeers, err := node.findPeerSingle(ctx, p, hashID)
if err != nil {
return nil, err
}
// see if we got the peer here
for _, npi := range closerPeers {
if npi.ID == id {
return &dhtQueryResult{
peer: npi,
success: true,
}, nil
}
}
/* notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: clpeerInfos,
})*/
return &dhtQueryResult{closerPeers: closerPeers}, nil
})
// run it!
result, err := query.Run(ctx, peers)
if err != nil {
return pstore.PeerInfo{}, err
}
node.log.Logf("FindPeer %v %v", id, result.success)
if result.peer.ID == "" {
return pstore.PeerInfo{}, ErrHashNotFound
}
return *result.peer, nil
}
// KademliaReceiver implements the handler for the kademlia RPC protocol messages
func KademliaReceiver(h *Holochain, m *Message) (response interface{}, err error) {
dht := h.dht
node := h.node
switch m.Type {
case FIND_NODE_REQUEST:
dht.dlog.Logf("KademliaReceiver got: %v", m)
switch t := m.Body.(type) {
case FindNodeReq:
p := m.From
var closest []peer.ID
resp := CloserPeersResp{}
// if looking for self... special case where we send it on CloserPeers.
x := HashFromPeerID(node.HashAddr)
if x.Equal(t.H) {
closest = []peer.ID{node.HashAddr}
} else {
closest = node.betterPeersForHash(&t.H, p, false, CloserPeerCount)
}
if closest == nil {
dht.dlog.Logf("could not find any peers")
return &resp, nil
}
resp.CloserPeers = node.peers2PeerInfos(closest)
response = &resp
default:
err = ErrDHTUnexpectedTypeInBody
}
default:
err = fmt.Errorf("message type %d not in holochain-kademlia protocol", int(m.Type))
}
return
}
// PI2PeerInfos convert the closest PeerInfos to a serializable type and gets their addrs from the peerstore
func (node *Node) peers2PeerInfos(peers []peer.ID) []PeerInfo {
var pis []PeerInfo
infos := pstore.PeerInfos(node.peerstore, peers)
for _, pi := range infos {
if len(pi.Addrs) > 0 {
addrs := make([][]byte, len(pi.Addrs))
for i, a := range pi.Addrs {
addrs[i] = a.Bytes()
}
pis = append(pis, PeerInfo{ID: []byte(pi.ID), Addrs: addrs})
}
}
return pis
}
// PeerInfos2Pis convert a list of PeerInfo structs to a list of pstore.PeerInfo
func peerInfos2Pis(peerInfos []PeerInfo) []*pstore.PeerInfo {
pis := make([]*pstore.PeerInfo, 0, len(peerInfos))
for _, pi := range peerInfos {
peerInfo := pstore.PeerInfo{ID: peer.ID(pi.ID)}
if len(pi.Addrs) > 0 {
maddrs := make([]ma.Multiaddr, 0, len(pi.Addrs))
for _, addr := range pi.Addrs {
maddr, err := ma.NewMultiaddrBytes(addr)
if err != nil {
Infof("error decoding Multiaddr for peer: %s", peerInfo.ID)
continue
}
maddrs = append(maddrs, maddr)
}
peerInfo.Addrs = maddrs
}
pis = append(pis, &peerInfo)
}
return pis
}