-
Notifications
You must be signed in to change notification settings - Fork 5
/
chann.go
268 lines (247 loc) · 7.44 KB
/
chann.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright 2021 The golang.design Initiative Authors.
// All rights reserved. Use of this source code is governed
// by a MIT license that can be found in the LICENSE file.
//
// Written by Changkun Ou <changkun.de>
// Package chann providesa a unified channel package.
//
// The package is compatible with existing buffered and unbuffered
// channels. For example, in Go, to create a buffered or unbuffered
// channel, one uses built-in function `make` to create a channel:
//
// ch := make(chan int) // unbuffered channel
// ch := make(chan int, 42) // or buffered channel
//
// However, all these channels have a finite capacity for caching, and
// it is impossible to create a channel with unlimited capacity, namely,
// an unbounded channel.
//
// This package provides the ability to create all possible types of
// channels. To create an unbuffered or a buffered channel:
//
// ch := chann.New[int](chann.Cap(0)) // unbuffered channel
// ch := chann.New[int](chann.Cap(42)) // or buffered channel
//
// More importantly, when the capacity of the channel is unspecified,
// or provided as negative values, the created channel is an unbounded
// channel:
//
// ch := chann.New[int]() // unbounded channel
// ch := chann.New[int](chann.Cap(-42)) // or unbounded channel
//
// Furthermore, all channels provides methods to send (In()),
// receive (Out()), and close (Close()).
//
// Note that to close a channel, must use Close() method instead of the
// language built-in method
// Two additional methods: ApproxLen and Cap returns the current status
// of the channel: an approximation of the current length of the channel,
// as well as the current capacity of the channel.
//
// See https://golang.design/research/ultimate-channel to understand
// the motivation of providing this package and the possible use cases
// with this package.
package chann // import "golang.design/x/chann"
import (
"sync/atomic"
)
// Opt represents an option to configure the created channel. The current possible
// option is Cap.
type Opt func(*config)
// Cap is the option to configure the capacity of a creating buffer.
// if the provided number is 0, Cap configures the creating buffer to a
// unbuffered channel; if the provided number is a positive integer, then
// Cap configures the creating buffer to a buffered channel with the given
// number of capacity for caching. If n is a negative integer, then it
// configures the creating channel to become an unbounded channel.
func Cap(n int) Opt {
return func(s *config) {
switch {
case n == 0:
s.cap = int64(0)
s.typ = unbuffered
case n > 0:
s.cap = int64(n)
s.typ = buffered
default:
s.cap = int64(-1)
s.typ = unbounded
}
}
}
// Chann is a generic channel abstraction that can be either buffered,
// unbuffered, or unbounded. To create a new channel, use New to allocate
// one, and use Cap to configure the capacity of the channel.
type Chann[T any] struct {
in, out chan T
close chan struct{}
cfg *config
q []T
}
// New returns a Chann that may be a buffered, an unbuffered or an
// unbounded channel. To configure the type of the channel, use Cap.
//
// By default, or without specification, the function returns an unbounded
// channel with unlimited capacity.
//
// ch := chann.New[float64]()
// // or
// ch := chann.New[float64](chann.Cap(-1))
//
// If the chann.Cap specified a non-negative integer, the returned channel
// is either unbuffered (0) or buffered (positive).
//
// An unbounded channel is not a buffered channel with infinite capacity,
// and they have different memory model semantics in terms of receiving
// a value: The recipient of a buffered channel is immediately available
// after a send is complete. However, the recipient of an unbounded channel
// may be available within a bounded time frame after a send is complete.
//
// Note that although the input arguments are specified as variadic parameter
// list, however, the function panics if there is more than one option is
// provided.
func New[T any](opts ...Opt) *Chann[T] {
cfg := &config{
cap: -1, len: 0,
typ: unbounded,
}
if len(opts) > 1 {
panic("chann: too many arguments")
}
for _, o := range opts {
o(cfg)
}
ch := &Chann[T]{cfg: cfg, close: make(chan struct{})}
switch ch.cfg.typ {
case unbuffered:
ch.in = make(chan T)
ch.out = ch.in
case buffered:
ch.in = make(chan T, ch.cfg.cap)
ch.out = ch.in
case unbounded:
ch.in = make(chan T, 16)
ch.out = make(chan T, 16)
go ch.unboundedProcessing()
}
return ch
}
// In returns the send channel of the given Chann, which can be used to
// send values to the channel. If one closes the channel using close(),
// it will result in a runtime panic. Instead, use Close() method.
func (ch *Chann[T]) In() chan<- T { return ch.in }
// Out returns the receive channel of the given Chann, which can be used
// to receive values from the channel.
func (ch *Chann[T]) Out() <-chan T { return ch.out }
// Close closes the channel gracefully.
func (ch *Chann[T]) Close() {
switch ch.cfg.typ {
case buffered, unbuffered:
close(ch.in)
close(ch.close)
default:
ch.close <- struct{}{}
}
}
// unboundedProcessing is a processing loop that implements unbounded
// channel semantics.
func (ch *Chann[T]) unboundedProcessing() {
var nilT T
ch.q = make([]T, 0, 1<<10)
for {
select {
case e, ok := <-ch.in:
if !ok {
panic("chann: send-only channel ch.In() closed unexpectedly")
}
atomic.AddInt64(&ch.cfg.len, 1)
ch.q = append(ch.q, e)
case <-ch.close:
ch.unboundedTerminate()
return
}
for len(ch.q) > 0 {
select {
case ch.out <- ch.q[0]:
atomic.AddInt64(&ch.cfg.len, -1)
ch.q[0] = nilT
ch.q = ch.q[1:]
case e, ok := <-ch.in:
if !ok {
panic("chann: send-only channel ch.In() closed unexpectedly")
}
atomic.AddInt64(&ch.cfg.len, 1)
ch.q = append(ch.q, e)
case <-ch.close:
ch.unboundedTerminate()
return
}
}
if cap(ch.q) < 1<<5 {
ch.q = make([]T, 0, 1<<10)
}
}
}
// unboundedTerminate terminates the unbounde channel's processing loop
// and make sure all unprocessed elements either be consumed if there is
// a pending receiver.
func (ch *Chann[T]) unboundedTerminate() {
var nilT T
close(ch.in)
for e := range ch.in {
ch.q = append(ch.q, e)
}
for len(ch.q) > 0 {
select {
case ch.out <- ch.q[0]:
// The default branch exists because we need guarantee
// the loop can terminate. If there is a receiver, the
// first case will ways be selected. See #3.
default:
}
ch.q[0] = nilT // de-reference earlier to help GC
ch.q = ch.q[1:]
}
close(ch.out)
close(ch.close)
}
// isClose reports the close status of a channel.
func (ch *Chann[T]) isClosed() bool {
select {
case <-ch.close:
return true
default:
return false
}
}
// Len returns an approximation of the length of the channel.
//
// Note that in a concurrent scenario, the returned length of a channel
// may never be accurate. Hence the function is named with an Approx prefix.
func (ch *Chann[T]) Len() int {
switch ch.cfg.typ {
case buffered, unbuffered:
return len(ch.in)
default:
return int(atomic.LoadInt64(&ch.cfg.len)) + len(ch.in) + len(ch.out)
}
}
// Cap returns the capacity of the channel.
func (ch *Chann[T]) Cap() int {
switch ch.cfg.typ {
case buffered, unbuffered:
return cap(ch.in)
default:
return int(atomic.LoadInt64(&ch.cfg.cap)) + cap(ch.in) + cap(ch.out)
}
}
type chanType int
const (
unbuffered chanType = iota
buffered
unbounded
)
type config struct {
typ chanType
len, cap int64
}