-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
broadcast.go
120 lines (94 loc) · 2.14 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright 2020 Clivern. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package cluster
import (
"encoding/json"
"fmt"
"github.com/hashicorp/memberlist"
)
// Broadcast is a single queued payload together with an optional
// completion signal.
type Broadcast struct {
	// msg is the raw payload handed back by Message.
	msg []byte

	// notify, when non-nil, is closed by Finished.
	notify chan<- struct{}
}
// Invalidates reports whether this broadcast supersedes other.
// It always returns false; other is never inspected.
func (b *Broadcast) Invalidates(other memberlist.Broadcast) bool {
	// No broadcast is ever treated as replacing another one.
	return false
}
// Message returns the payload carried by this broadcast. The slice is
// returned as stored, without copying.
func (b *Broadcast) Message() []byte {
	payload := b.msg
	return payload
}
// Finished closes the notify channel when one was supplied; with a nil
// channel it does nothing.
func (b *Broadcast) Finished() {
	if b.notify == nil {
		return
	}
	close(b.notify)
}
// Delegate holds the shared state bytes, the outgoing broadcast queue and
// the owning cluster.
type Delegate struct {
	// State is the most recently stored state payload. It is written by
	// NotifyMsg and MergeRemoteState and returned by LocalState.
	State []byte

	// Broadcasts is the queue of pending outgoing broadcasts. It is
	// created by SetCluster.
	Broadcasts *memberlist.TransmitLimitedQueue

	// Cluster is the cluster this delegate is attached to. It is assigned
	// by SetCluster.
	Cluster *Cluster
}
// NotifyMsg logs a received user-data message to stdout and stores a copy
// of it as the delegate's State.
//
// The memberlist Delegate contract states that the byte slice passed to
// NotifyMsg may be modified after the call returns, so the payload is
// copied instead of being retained directly.
//
// NOTE(review): State is assigned here and read in LocalState with no
// synchronization visible in this file — confirm whether access needs to
// be guarded.
func (d *Delegate) NotifyMsg(msg []byte) {
	fmt.Printf(" === Received Broadcast of Remote State %s === \n", string(msg))

	// msg[:0:0] has zero capacity, so append always allocates fresh backing
	// storage for a non-empty msg, while a nil msg still yields a nil State.
	d.State = append(msg[:0:0], msg...)
}
// NodeMeta returns an empty, non-nil byte slice. The limit argument is
// ignored because no metadata is ever produced.
func (d *Delegate) NodeMeta(limit int) []byte {
	return make([]byte, 0)
}
// LocalState prints a trace line to stdout and returns the currently
// stored State slice as-is (no copy is made). The join flag is ignored.
func (d *Delegate) LocalState(join bool) []byte {
	fmt.Println(" === Sharing Remote State for push/pull sync === ")

	state := d.State
	return state
}
// GetBroadcasts returns the pending broadcast payloads from the queue,
// forwarding overhead and limit unchanged to the queue's GetBroadcasts.
//
// The queue is only created by SetCluster. If this method is invoked
// before then, there is nothing to send, so nil is returned instead of
// dereferencing a nil queue.
func (d *Delegate) GetBroadcasts(overhead, limit int) [][]byte {
	if d.Broadcasts == nil {
		return nil
	}
	return d.Broadcasts.GetBroadcasts(overhead, limit)
}
// MergeRemoteState prints the received buffer to stdout and replaces the
// delegate's State with it. The incoming buffer overwrites the previous
// state wholesale; the join flag is ignored.
func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
	// %s formats a []byte as its string contents, so no conversion is needed.
	fmt.Printf(" === Merging Remote State %s for push/pull sync === \n", buf)

	d.State = buf
}
// SetCluster attaches the delegate to cluster and installs a fresh
// broadcast queue with a retransmit multiplier of 3.
func (d *Delegate) SetCluster(cluster *Cluster) {
	queue := new(memberlist.TransmitLimitedQueue)
	queue.RetransmitMult = 3
	// The callback reads d.Cluster each time it is invoked, so it reports
	// the member count of whichever cluster is attached at that moment.
	queue.NumNodes = func() int {
		return d.Cluster.Memlist.NumMembers()
	}

	d.Cluster = cluster
	d.Broadcasts = queue
}
// UpdateState enqueues data on the broadcast queue. The broadcast carries
// no notify channel. This method does not itself modify d.State.
func (d *Delegate) UpdateState(data []byte) {
	// notify is left at its zero value (nil).
	update := &Broadcast{msg: data}
	d.Broadcasts.QueueBroadcast(update)
}
// Message is a key/value pair with a JSON representation of the form
// {"key": "...", "value": "..."}.
type Message struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

// Bytes returns the JSON encoding of the message. If encoding fails, an
// empty (non-nil) byte slice is returned and the error is discarded.
func (m *Message) Bytes() []byte {
	if encoded, err := json.Marshal(m); err == nil {
		return encoded
	}
	return []byte("")
}

// Load decodes JSON data into the message, returning any decoding error
// unchanged.
func (m *Message) Load(data []byte) error {
	return json.Unmarshal(data, m)
}