-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunky.go
executable file
·104 lines (91 loc) · 1.78 KB
/
chunky.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package chunky
import (
"errors"
"reflect"
)
type Builder interface {
Read() (interface{}, bool)
Write(interface{})
Len() int
Schedule()
Factor(int) int
Close() error
}
type Chunk struct {
bufChan reflect.Value
tt reflect.Type
cap int
factor int
fn func(interface{})
close chan int
}
func NewChunk(input interface{}, fn func(interface{}), factor int) Builder {
chunky := &Chunk{}
chunky.bufChan = reflect.ValueOf(input)
chunky.fn = fn
chunky.tt = chunky.bufChan.Type()
chunky.cap = chunky.bufChan.Cap()
chunky.close = make(chan int)
chunky.Factor(factor)
return chunky
}
func (chunk *Chunk) Factor(factor int) int {
if factor > 0 {
chunk.factor = factor
} else {
chunk.factor = 1
}
return chunk.factor
}
func (chunk *Chunk) Len() int {
return chunk.bufChan.Len()
}
func (chunk *Chunk) Read() (interface{}, bool) {
v, ok := chunk.bufChan.Recv()
return v.Interface(), ok
}
func (chunk *Chunk) Write(data interface{}) {
d := reflect.ValueOf(data)
chunk.bufChan.Send(d)
}
func (chunk *Chunk) Close() error {
select {
case chunk.close <- 1:
return nil
default:
return errors.New("Failed to write")
}
}
func (chunk *Chunk) Schedule() {
for {
select {
case <-chunk.close:
return
default:
}
var remainder int
length := chunk.Len()
if length == 0 {
continue
}
factor := chunk.factor
reduce := length / factor
remainder = length - (reduce * factor)
if length < (chunk.cap / factor) {
reduce = length
factor = 1
}
for i := 1; i <= factor; i++ {
if i == factor && remainder > 0 {
reduce = reduce + remainder
}
newVal := reflect.MakeChan(chunk.tt, reduce)
for r := 0; r < reduce; r++ {
if v, ok := chunk.bufChan.Recv(); ok {
newVal.Send(v)
}
}
go chunk.fn(newVal.Interface())
}
}
}