-
Notifications
You must be signed in to change notification settings - Fork 14
/
pool.go
162 lines (141 loc) · 4.19 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package iocontrol
import (
"io"
"sync"
"time"
)
// WriterPool creates instances of iocontrol.ThrottlerWriter that are
// managed such that they collectively do not exceed a certain rate.
//
// The default value of WriterPool is not to be used, create instances
// with `NewWriterPool`.
type WriterPool struct {
mu sync.Mutex
maxRate int
maxBurst time.Duration
givenOut map[ThrottlerWriter]struct{}
}
// NewWriterPool creates a pool that ensures the writers it wraps will
// respect an overall maxRate, with maxBurst resolution. The semantics
// of the wrapped writers are the same as those of using a plain
// ThrottledWriter.
func NewWriterPool(maxRate int, maxBurst time.Duration) *WriterPool {
return &WriterPool{
maxRate: maxRate,
maxBurst: maxBurst,
givenOut: make(map[ThrottlerWriter]struct{}),
}
}
// Get a throttled writer that wraps w.
func (pool *WriterPool) Get(w io.Writer) (writer io.Writer, release func()) {
// don't export a ThrottlerWriter to prevent users changing the rate
// and expecting their change to be respected, since we might modify
// the rate under their feet
// make the initial rate be 0, the actual rate is
// set in the call to `setSharedRates`.
wr := ThrottledWriter(w, 0, pool.maxBurst)
pool.mu.Lock()
pool.givenOut[wr] = struct{}{}
pool.setSharedRates()
pool.mu.Unlock()
return wr, func() {
pool.mu.Lock()
delete(pool.givenOut, wr)
pool.setSharedRates()
pool.mu.Unlock()
}
}
// SetRate of the pool, updating each given out writer to respect the
// newly set rate. Returns the old rate.
func (pool *WriterPool) SetRate(rate int) int {
pool.mu.Lock()
old := pool.maxRate
pool.maxRate = rate
pool.setSharedRates()
pool.mu.Unlock()
return old
}
// Len is the number of currently given out throttled writers.
func (pool *WriterPool) Len() int {
pool.mu.Lock()
l := len(pool.givenOut)
pool.mu.Unlock()
return l
}
// must be called with a lock held on `pool.mu`
func (pool *WriterPool) setSharedRates() {
if len(pool.givenOut) == 0 {
return
}
perSecPerWriter := pool.maxRate / len(pool.givenOut)
for writer := range pool.givenOut {
writer.SetRate(perSecPerWriter)
}
}
// ReaderPool creates instances of iocontrol.ThrottlerReader that are
// managed such that they collectively do not exceed a certain rate.
//
// The default value of ReaderPool is not to be used, create instances
// with `NewReaderPool`.
type ReaderPool struct {
mu sync.Mutex
maxRate int
maxBurst time.Duration
givenOut map[ThrottlerReader]struct{}
}
// NewReaderPool creates a pool that ensures the writers it wraps will
// respect an overall maxRate, with maxBurst resolution. The semantics
// of the wrapped writers are the same as those of using a plain
// ThrottledReader.
func NewReaderPool(maxRate int, maxBurst time.Duration) *ReaderPool {
return &ReaderPool{
maxRate: maxRate,
maxBurst: maxBurst,
givenOut: make(map[ThrottlerReader]struct{}),
}
}
// Get a throttled reader that wraps r.
func (pool *ReaderPool) Get(r io.Reader) (reader io.Reader, release func()) {
// don't export a ThrottlerReader to prevent users changing the rate
// and expecting their change to be respected, since we might modify
// the rate under their feet
// make the initial rate be 0, the actual rate is
// set in the call to `setSharedRates`.
rd := ThrottledReader(r, 0, pool.maxBurst)
pool.mu.Lock()
pool.givenOut[rd] = struct{}{}
pool.setSharedRates()
pool.mu.Unlock()
return rd, func() {
pool.mu.Lock()
delete(pool.givenOut, rd)
pool.setSharedRates()
pool.mu.Unlock()
}
}
// SetRate of the pool, updating each given out reader to respect the
// newly set rate. Returns the old rate.
func (pool *ReaderPool) SetRate(rate int) int {
pool.mu.Lock()
old := pool.maxRate
pool.maxRate = rate
pool.setSharedRates()
pool.mu.Unlock()
return old
}
// Len is the number of currently given out throttled readers.
func (pool *ReaderPool) Len() int {
pool.mu.Lock()
defer pool.mu.Unlock()
return len(pool.givenOut)
}
// must be called with a lock held on `pool.mu`
func (pool *ReaderPool) setSharedRates() {
if len(pool.givenOut) == 0 {
return
}
perSecPerReader := pool.maxRate / len(pool.givenOut)
for reader := range pool.givenOut {
reader.SetRate(perSecPerReader)
}
}