-
Notifications
You must be signed in to change notification settings - Fork 2
/
rebaser.go
105 lines (92 loc) · 2.27 KB
/
rebaser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package distil
import (
"math"
"github.com/SoftwareDefinedBuildings/btrdb-go"
)
// Rebaser specifies an input data preprocessor. It may do anything, but is
// typically used for rebasing input streams (removing duplicates and
// padding missing values).
type Rebaser interface {
// Process is handed the input channel together with start and end, and
// returns the channel the caller should read preprocessed values from.
// The implementations in this file either ignore start and end or use
// them as the bounds of the output window.
Process(start, end int64, input chan btrdb.StandardValue) chan btrdb.StandardValue
}
// RebasePassthrough returns a Rebaser that does not modify input data: its
// Process method hands the input channel straight back to the caller.
func RebasePassthrough() Rebaser {
	return new(noRebase)
}
// noRebase is the pass-through Rebaser. It carries no state.
type noRebase struct{}

// Process ignores start and end and returns the very channel it was given,
// so the consumer reads exactly what the producer wrote.
func (*noRebase) Process(start, end int64, input chan btrdb.StandardValue) chan btrdb.StandardValue {
	passthrough := input
	return passthrough
}
// padSnapRebaser is the Rebaser returned by RebasePadSnap. Its Process method
// snaps timestamps onto a regular grid and pads missing grid points with NaN.
type padSnapRebaser struct {
// freq is the number of grid points per second; Process derives the grid
// period as 1e9 / freq.
freq int64
}
// RebasePadSnap builds a Rebaser whose Process method aligns timestamps to a
// grid of freq points per second, drops values that land on a grid point
// that has already been passed, and fills empty grid points with NaN.
func RebasePadSnap(freq int64) Rebaser {
	rb := new(padSnapRebaser)
	rb.freq = freq
	return rb
}
// Process snaps every input timestamp onto a grid of rb.freq points per
// second and returns a channel that carries exactly one value per grid point
// in the half-open window [start, end). start and end are themselves snapped
// onto the grid before use. Timestamps are treated as nanoseconds: the grid
// period is 1e9 / rb.freq.
//
// For each grid point, in order:
//   - the first input value that snaps onto it is forwarded (with its Time
//     replaced by the snapped time);
//   - later values that snap onto it, or onto any earlier grid point, are
//     dropped;
//   - if no input value snaps onto it, a value with Value = NaN is emitted.
//
// The returned channel is closed once the window has been covered. After
// that the goroutine keeps reading input until it is closed, so the producer
// is never left blocked on a send. If the snapped window is empty
// (start >= end) nothing is emitted.
//
// rb.freq must be positive and no larger than 1e9; a value of 0, or one
// greater than 1e9 (which makes the period 0), causes an integer
// divide-by-zero panic.
func (rb *padSnapRebaser) Process(start, end int64, input chan btrdb.StandardValue) chan btrdb.StandardValue {
	const nanosPerSec = int64(1000000000)

	rv := make(chan btrdb.StandardValue, 1000)
	period := nanosPerSec / rb.freq
	offset := period / 2

	// snap rounds t to the nearest grid point. The grid restarts at every
	// whole second, which keeps it stable when freq does not divide 1e9
	// evenly; a value that rounds up past the last cycle of its second is
	// carried into cycle 0 of the next second.
	snap := func(t int64) int64 {
		subsec := t % nanosPerSec
		sec := t - subsec
		cycle := (subsec + offset) / period
		if cycle >= rb.freq {
			sec += nanosPerSec
			cycle -= rb.freq
		}
		return sec + cycle*period
	}

	start = snap(start)
	end = snap(end)

	go func() {
		// Deferred calls run last-in first-out: rv is closed first, then the
		// remaining input is drained so the producer can finish.
		defer func() {
			for range input {
			}
		}()
		defer close(rv)

		expected := start
		if expected >= end {
			// Empty window: emit nothing rather than one stray value at a
			// time that is not before end.
			return
		}

		for v := range input {
			v.Time = snap(v.Time)

			// Duplicate of, or older than, a grid point already handled.
			if v.Time < expected {
				continue
			}

			// Pad the gap between the expected grid point and this value.
			for v.Time > expected {
				rv <- btrdb.StandardValue{Time: expected, Value: math.NaN()}
				expected = snap(expected + period)
				if expected >= end {
					return
				}
			}

			// Normally always true here; the check guards against the grid
			// having stepped past v.Time.
			if v.Time == expected {
				rv <- v
				expected = snap(expected + period)
				if expected >= end {
					return
				}
			}
		}

		// Input ran out before the window did: pad the remainder.
		for expected < end {
			rv <- btrdb.StandardValue{Time: expected, Value: math.NaN()}
			expected = snap(expected + period)
		}
	}()

	return rv
}