-
Notifications
You must be signed in to change notification settings - Fork 0
/
proposer.go
141 lines (129 loc) · 3.45 KB
/
proposer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package paxos
import (
"github.com/sirupsen/logrus"
)
// Proposer holds the state of a single Paxos proposer.
type Proposer struct {
	// id is this proposer's identifier and seq is its round counter; the two
	// are combined into proposalNumber by getProposerNumber. seq is advanced
	// by prepare.
	id, seq, proposalNumber int
	// proposalValue is the value carried in outgoing prepare and propose
	// messages; receivePromise may replace it.
	proposalValue string
	// acceptors maps an acceptor ID to the latest message recorded from it.
	acceptors map[int]messageData
	// node is used to send and receive messages.
	node PaxosNode
}
// NewProposer builds a Proposer with the given id, initial proposal value and
// node. Every ID in acceptors is registered with an empty message record, so
// no acceptor counts as having promised anything yet. The sequence counter
// starts at zero.
func NewProposer(id int, value string, node PaxosNode, acceptors ...int) *Proposer {
	known := make(map[int]messageData, len(acceptors))
	for i := range acceptors {
		known[acceptors[i]] = messageData{}
	}

	return &Proposer{
		id:            id,
		seq:           0,
		proposalValue: value,
		acceptors:     known,
		node:          node,
	}
}
// majority returns the smallest number of acceptors that is strictly more
// than half of the registered acceptors.
func (p *Proposer) majority() int {
	total := len(p.acceptors)
	return total/2 + 1
}
// getProposerNumber recomputes the current proposal number from seq and id,
// stores it in p.proposalNumber and returns it. seq occupies the bits above
// the low four; id is OR-ed into the result.
// NOTE(review): ids above 15 overlap the seq bits — confirm ids stay in 0..15.
func (p *Proposer) getProposerNumber() int {
	number := (p.seq << 4) | p.id
	p.proposalNumber = number
	return number
}
// getPromiseCount returns how many acceptors have a recorded message whose
// number equals the proposer's current proposal number. One Info log entry
// is written per registered acceptor while counting.
func (p *Proposer) getPromiseCount() int {
	matched := 0
	for _, recorded := range p.acceptors {
		current := p.getProposerNumber()
		received := recorded.getMessageNumber()

		logrus.WithFields(logrus.Fields{
			"Proposer ID":             p.id,
			"Acceptor Count":          len(p.acceptors),
			"Current Proposal Number": current,
			"Message Sequence Number": received,
		}).Info("Proposer information")

		// Only records for the current proposal number count.
		if received != current {
			continue
		}
		matched++
	}
	return matched
}
// reachedMajority reports whether the number of promises recorded for the
// current proposal number has reached the quorum size.
//
// majority() already returns the quorum size (more than half of the
// acceptors), so reaching it is sufficient. Requiring strictly more than
// majority() would demand one extra promise and could never succeed with one
// or two acceptors.
func (p *Proposer) reachedMajority() bool {
	return p.getPromiseCount() >= p.majority()
}
// prepare starts a new round: it increments seq and builds one PrepareMessage
// per acceptor, each carrying the new proposal number and the current
// proposal value. Iteration stops once the number of built messages exceeds
// majority(), so at most majority()+1 messages are returned. Which acceptors
// are chosen depends on map iteration order. The messages are returned, not
// sent.
func (p *Proposer) prepare() []messageData {
	p.seq++

	var outgoing []messageData
	for acceptorID := range p.acceptors {
		outgoing = append(outgoing, messageData{
			messageSender:    p.id,
			messageRecipient: acceptorID,
			messageCategory:  PrepareMessage,
			messageNumber:    p.getProposerNumber(),
			value:            p.proposalValue,
		})
		if len(outgoing) > p.majority() {
			break
		}
	}
	return outgoing
}
// propose builds one ProposeMessage for each acceptor whose recorded message
// number equals the current proposal number. Iteration stops once the number
// of built messages exceeds majority(), so at most majority()+1 messages are
// returned. The messages are returned, not sent.
func (p *Proposer) propose() []messageData {
	var outgoing []messageData
	for acceptorID, recorded := range p.acceptors {
		current := p.getProposerNumber()
		if recorded.getMessageNumber() == current {
			outgoing = append(outgoing, messageData{
				messageSender:    p.id,
				messageRecipient: acceptorID,
				messageCategory:  ProposeMessage,
				messageNumber:    current,
				value:            p.proposalValue,
			})
		}
		if len(outgoing) > p.majority() {
			break
		}
	}
	return outgoing
}
// receivePromise records a promise from an acceptor. The message replaces the
// sender's stored record only when its number is higher than the one already
// stored. If that number is also higher than the proposer's current proposal
// number, the proposer takes over the message's number and value.
func (p *Proposer) receivePromise(promiseMessage messageData) {
	sender := promiseMessage.messageSender
	incoming := promiseMessage.getMessageNumber()

	previous := p.acceptors[sender]
	if previous.getMessageNumber() >= incoming {
		// Nothing newer than what is already on record for this acceptor.
		return
	}
	p.acceptors[sender] = promiseMessage

	if incoming <= p.getProposerNumber() {
		return
	}
	p.proposalNumber = incoming
	p.proposalValue = promiseMessage.getProposalValue()
}
// Run drives the proposer through both phases. It repeats prepare rounds
// until reachedMajority reports true, then sends the propose messages built
// by propose.
//
// Each round sends the prepare messages once and then keeps receiving replies
// for that round. prepare() bumps the proposal number, so starting a new
// round after every single reply would make the promises already recorded
// stop matching the current number, and the count could never build up.
func (p *Proposer) Run() {
	for !p.reachedMajority() {
		for _, message := range p.prepare() {
			p.node.send(message)
		}
		p.collectPromises()
	}

	// Majority has been reached; send the proposal to the acceptors.
	for _, message := range p.propose() {
		p.node.send(message)
	}
}

// collectPromises receives replies for the current prepare round and records
// each AckMessage via receivePromise. It returns once reachedMajority reports
// true or the node's receive returns nil, in which case the caller starts a
// new round. Any message that is not an AckMessage is reported through
// logrus.Fatal, as before.
func (p *Proposer) collectPromises() {
	for !p.reachedMajority() {
		msg := p.node.receive()
		if msg == nil {
			return
		}
		msg.printMessage("Proposer received message")
		if msg.messageCategory != AckMessage {
			logrus.Fatal("Unsupported Message format")
			// Only reached if logrus is configured not to exit; skip the message.
			continue
		}
		logrus.Infof("Ack message received from %d", msg.messageSender)
		p.receivePromise(*msg)
	}
}