forked from ctiom/kitchen
-
Notifications
You must be signed in to change notification settings - Fork 10
/
manager.go
154 lines (143 loc) · 4.04 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package kitchen
import (
	"context"
	"fmt"
	"sync"

	"github.com/go-preform/kitchen/delivery"
)
// A Manager is a struct for managing menus for scaling.
//
// It collects menu initializers (AddMenu), instantiates them in Init, and
// hands per-menu order handlers to a delivery.ILogistic server.
type Manager struct {
// menus maps a menu name to its instance; populated by initMenus.
menus map[string]IMenu
// menuById holds menus at the index of their initializer (the same index
// passed to setManager). Entries for menus filtered out by serveMenuNames
// during initMenus are left nil.
menuById []IMenu
// menuInitializers are the menu factories registered through AddMenu.
menuInitializers []func() IMenu
// serveMenuNames is the current selection of menus to serve.
// getOrderHandlers treats nil as "serve every menu".
serveMenuNames []string
// lock is held while serveMenuNames is read or written in
// SelectServeMenus, DisableMenu and getOrderHandlers.
lock sync.Mutex
// server is created in Init via delivery.NewServer; it is nil before that.
server delivery.ILogistic
// localHostUrl and localRepPort come from NewDeliveryManager and are
// forwarded to delivery.NewServer.
localHostUrl string
localRepPort uint16
// hostUrl and repPort are set by SetMainKitchen and are forwarded to
// delivery.NewServer (and to SwitchLeader when the server already exists).
hostUrl string
repPort uint16
}
// NewDeliveryManager builds a Manager that only has its local endpoint set.
//
// localHostUrl is the local host url, typically the one exposed by
// docker/kubernetes.
// localRepPort is the local port the manager listens on and exports for
// foreign calls.
func NewDeliveryManager(localHostUrl string, localRepPort uint16) IManager {
	mgr := new(Manager)
	mgr.localHostUrl = localHostUrl
	mgr.localRepPort = localRepPort
	return mgr
}
// SelectServeMenus replaces the selection of menus to serve.
// Passing no names leaves the selection nil, which means all menus are served.
// If the server already exists, its per-menu order handlers are refreshed
// from the new selection; otherwise the selection is only recorded.
// It is meant to be called after Init.
func (m *Manager) SelectServeMenus(menuNamesNilIsAll ...string) IManager {
	m.lock.Lock()
	m.serveMenuNames = menuNamesNilIsAll
	m.lock.Unlock()

	// Nothing to refresh until Init has created the server.
	if m.server == nil {
		return m
	}
	m.server.SetOrderHandlerPerMenu(m.getOrderHandlers())
	return m
}
// DisableMenu stops serving the menu called name and returns the manager.
//
// If no explicit selection exists yet (serveMenuNames is nil, i.e. "serve
// all"), the selection is first materialized from every initialized menu.
// Every occurrence of name is then removed. The resulting selection is always
// non-nil, so disabling the last menu serves nothing rather than falling back
// to "serve all". If the server already exists, its per-menu order handlers
// are refreshed afterwards.
// It is meant to be called after Init.
func (m *Manager) DisableMenu(name string) IManager {
	m.lock.Lock()
	current := m.serveMenuNames
	if current == nil {
		current = make([]string, 0, len(m.menuById))
		for _, menu := range m.menuById {
			// Slots of menus that were filtered out at init time are nil.
			if menu == nil {
				continue
			}
			current = append(current, menu.Name())
		}
	}
	// Filter into a fresh slice instead of deleting in place while ranging:
	// in-place deletion skips elements, can slice out of range when a name
	// appears twice, and would write through to a slice still owned by the
	// caller of SelectServeMenus.
	kept := make([]string, 0, len(current))
	for _, n := range current {
		if n != name {
			kept = append(kept, n)
		}
	}
	m.serveMenuNames = kept
	// Release before getOrderHandlers, which takes the same lock.
	m.lock.Unlock()

	if m.server != nil {
		m.server.SetOrderHandlerPerMenu(m.getOrderHandlers())
	}
	return m
}
// Init instantiates the registered menus, builds the per-menu order handlers,
// creates the delivery server and initializes it with ctx.
//
// When a selection of menu names is set, only those menus get a handler;
// otherwise every menu does. A selected name that does not match any
// registered menu yields an error (before the server is created) instead of
// a nil-interface panic.
//
// It returns the manager on success, or nil and the error on failure.
func (m *Manager) Init(ctx context.Context) (IManager, error) {
	m.initMenus()

	handlers := make([]func(context.Context, *delivery.Order), len(m.menuById))
	if len(m.serveMenuNames) > 0 {
		for _, menuName := range m.serveMenuNames {
			menu, ok := m.menus[menuName]
			if !ok || menu == nil {
				return nil, fmt.Errorf("kitchen: selected menu %q is not registered", menuName)
			}
			id := menu.ID()
			handlers[id] = m.menuById[id].orderDish
		}
	} else {
		for _, menu := range m.menuById {
			handlers[menu.ID()] = menu.orderDish
		}
	}

	m.server = delivery.NewServer(m.localHostUrl, m.localRepPort, m.hostUrl, m.repPort)
	m.server.SetOrderHandlerPerMenu(handlers)
	if err := m.server.Init(ctx); err != nil {
		return nil, err
	}
	return m, nil
}
// getOrderHandlers builds the handler table given to the delivery server:
// one slot per entry of menuById, indexed by menu ID, holding that menu's
// orderDish method. Slots of menus that are not served stay nil.
//
// A nil selection means every initialized menu is served; otherwise only the
// selected names are. Names without a registered menu and menus that were not
// instantiated are skipped rather than dereferenced.
//
// It takes m.lock, so callers must not hold it.
func (m *Manager) getOrderHandlers() []func(context.Context, *delivery.Order) {
	m.lock.Lock()
	// Deferred so the lock is released even if a menu method panics.
	defer m.lock.Unlock()

	handlers := make([]func(context.Context, *delivery.Order), len(m.menuById))
	if m.serveMenuNames == nil {
		for _, menu := range m.menuById {
			// Slots of menus that were filtered out at init time are nil.
			if menu == nil {
				continue
			}
			handlers[menu.ID()] = menu.orderDish
		}
		return handlers
	}
	for _, menuName := range m.serveMenuNames {
		menu, ok := m.menus[menuName]
		if !ok || menu == nil {
			continue
		}
		id := menu.ID()
		target := m.menuById[id]
		if target == nil {
			continue
		}
		handlers[id] = target.orderDish
	}
	return handlers
}
// initMenus runs every registered initializer and rebuilds the menus and
// menuById lookups from scratch.
//
// Each menu is bound to the manager with its initializer index as ID. With a
// non-empty selection, only menus whose name is selected are stored (the
// other menuById slots stay nil); with no selection, every menu is stored.
func (m *Manager) initMenus() {
	m.menus = make(map[string]IMenu)
	m.menuById = make([]IMenu, len(m.menuInitializers))

	for idx, build := range m.menuInitializers {
		menu := build()
		menu.setManager(m, uint32(idx))

		// No selection: keep every menu.
		if len(m.serveMenuNames) == 0 {
			m.menus[menu.Name()] = menu
			m.menuById[idx] = menu
			continue
		}

		// Selection present: keep the menu only if its name is listed.
		for _, wanted := range m.serveMenuNames {
			if menu.Name() != wanted {
				continue
			}
			m.menus[wanted] = menu
			m.menuById[idx] = menu
			break
		}
	}
}
// AddMenu registers a menu with the manager and returns the manager.
// menuInitializer is a function that returns the menu; it is stored here and
// invoked later by initMenus.
// TODO: menus should be disposed when disabled.
func (m *Manager) AddMenu(menuInitializer func() IMenu) IManager {
	registered := append(m.menuInitializers, menuInitializer)
	m.menuInitializers = registered
	return m
}
// SetMainKitchen records url and port as the main host for the manager.
// If the server already exists, it is also told to switch leader to that
// address; otherwise the values are only stored for later use.
func (m *Manager) SetMainKitchen(url string, port uint16) IManager {
	m.hostUrl, m.repPort = url, port

	if m.server == nil {
		return m
	}
	m.server.SwitchLeader(url, port)
	return m
}
// order asks the delivery server for the function that places an order for
// dish. The dish is addressed by its menu's ID and its own ID, both converted
// to uint16. The server's result and error are returned unchanged.
func (m *Manager) order(dish IDish) (func(ctx context.Context, input []byte) (output []byte, err error), error) {
	menuID := uint16(dish.Menu().ID())
	dishID := uint16(dish.Id())
	return m.server.Order(menuID, dishID)
}