forked from joncrlsn/dque
-
Notifications
You must be signed in to change notification settings - Fork 0
/
segment_test.go
253 lines (214 loc) · 7.7 KB
/
segment_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
// segment_test.go
package dque
//
// White box testing of the qSegment struct and methods.
//
import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
)
// item1 is the payload type these tests add to and remove from a segment.
type item1 struct {
	Name string
}
// item1Builder allocates a zero-valued item1 and returns a pointer to it.
// The tests pass it as the builder argument when creating a segment or
// loading one from disk.
func item1Builder() interface{} {
	return new(item1)
}
// TestSegment verifies the behavior of one segment: items are added and
// removed, then the segment is reopened from disk and its remaining item is
// removed.
func TestSegment(t *testing.T) {
	testDir := "./TestSegment"
	// Best-effort removal of leftovers from an earlier run; a directory that
	// is still present makes the Mkdir below fail and report the problem.
	os.RemoveAll(testDir)
	if err := os.Mkdir(testDir, 0755); err != nil {
		t.Fatalf("Error creating directory from the TestSegment method: %s\n", err)
	}
	// Cleanup is deferred so the directory is removed even when a failed
	// assertion stops the test early.
	defer func() {
		if err := os.RemoveAll(testDir); err != nil {
			t.Errorf("Error cleaning up directory from the TestSegment method with '%s'\n", err.Error())
		}
	}()

	// Create a new segment of the queue
	seg, err := newQueueSegment(testDir, 1, false, item1Builder)
	if err != nil {
		t.Fatalf("newQueueSegment('%s') failed with '%s'\n", testDir, err.Error())
	}

	//
	// Add some items and remove one
	//
	assert(t, seg.add(&item1{Name: "Number 1"}) == nil, "failed to add item1")
	assert(t, 1 == seg.size(), "Expected size of 1")

	assert(t, seg.add(&item1{Name: "Number 2"}) == nil, "failed to add item2")
	assert(t, 2 == seg.size(), "Expected size of 2")

	if _, err = seg.remove(); err != nil {
		t.Fatalf("Remove() failed with '%s'\n", err.Error())
	}
	assert(t, 1 == seg.size(), "Expected size of 1")
	// The test expects sizeOnDisk to stay at 2 after the remove.
	assert(t, 2 == seg.sizeOnDisk(), "Expected sizeOnDisk of 2")

	assert(t, seg.add(&item1{Name: "item3"}) == nil, "failed to add item3")
	assert(t, 2 == seg.size(), "Expected size of 2")

	if _, err = seg.remove(); err != nil {
		t.Fatalf("Remove() failed with '%s'\n", err.Error())
	}
	assert(t, 1 == seg.size(), "Expected size of 1")

	//
	// Recreate the segment from disk and remove the remaining item
	//
	seg, err = openQueueSegment(testDir, 1, false, item1Builder)
	if err != nil {
		t.Fatalf("openQueueSegment('%s') failed with '%s'\n", testDir, err.Error())
	}
	assert(t, 1 == seg.size(), "Expected size of 1")

	// errEmptySegment is tolerated here; any other error fails the test.
	_, err = seg.remove()
	if err != nil && err != errEmptySegment {
		t.Fatalf("Remove() failed with '%s'\n", err.Error())
	}
	assert(t, 0 == seg.size(), "Expected size of 0")
}
// TestSegment_ErrCorruptedSegment tests error handling for corrupted data.
// It writes a truncated record into a segment file and expects
// openQueueSegment to fail with an error that wraps an ErrCorruptedSegment
// carrying the file path and a descriptive message.
func TestSegment_ErrCorruptedSegment(t *testing.T) {
	testDir := "./TestSegmentError"
	os.RemoveAll(testDir)
	defer os.RemoveAll(testDir)

	if err := os.Mkdir(testDir, 0755); err != nil {
		t.Fatalf("Error creating directory in the TestSegment_ErrCorruptedSegment method: %s\n", err)
	}

	f, err := os.Create((&qSegment{dirPath: testDir}).filePath())
	if err != nil {
		t.Fatal(err)
	}

	// expect an 8 byte object, but only write 7 bytes
	if _, err := f.Write([]byte{0, 0, 0, 8, 1, 2, 3, 4, 5, 6, 7}); err != nil {
		// Release the handle before aborting; the write error is the one reported.
		f.Close()
		t.Fatal(err)
	}
	// The segment is reopened below, so a failed Close must not go unnoticed.
	if err := f.Close(); err != nil {
		t.Fatal(err)
	}

	_, err = openQueueSegment(testDir, 0, false, func() interface{} { return make([]byte, 8) })
	if err == nil {
		t.Fatal("expected ErrCorruptedSegment but got nil")
	}

	// // go >= 1.13:
	// var corruptedError ErrCorruptedSegment
	// if !errors.As(err, &corruptedError) {
	// 	t.Fatalf("expected ErrCorruptedSegment but got %T: %s", err, err)
	// }

	// The ErrCorruptedSegment is expected two wrapping layers below err.
	corruptedError, ok := unwrapError(unwrapError(err)).(ErrCorruptedSegment)
	if !ok {
		t.Fatalf("expected ErrCorruptedSegment but got %T: %s", err, err)
	}
	if corruptedError.Path != "TestSegmentError/0000000000000.dque" {
		t.Fatalf("unexpected file path: %s", corruptedError.Path)
	}
	if corruptedError.Error() != "segment file TestSegmentError/0000000000000.dque is corrupted: error reading gob data from file: unexpected EOF" {
		t.Fatalf("wrong error message: %s", corruptedError.Error())
	}
}
// unwrapError returns the error wrapped by err, as reported by err's
// Unwrap() error method. It returns nil when err is nil or does not provide
// such a method, so a caller that type-asserts the result gets a failed
// assertion instead of a panic.
func unwrapError(err error) error {
	wrapper, ok := err.(interface{ Unwrap() error })
	if !ok {
		return nil
	}
	return wrapper.Unwrap()
}
// TestSegment_openQueueSegment_failIfNew verifies that openQueueSegment, when
// pointed at a freshly created empty directory, returns an error and a nil
// segment.
func TestSegment_openQueueSegment_failIfNew(t *testing.T) {
	testDir := "./TestSegment_Open"
	// Best-effort removal of leftovers from an earlier run; a directory that
	// is still present makes the Mkdir below fail and report the problem.
	os.RemoveAll(testDir)
	if err := os.Mkdir(testDir, 0755); err != nil {
		t.Fatalf("Error creating directory in the TestSegment_openQueueSegment_failIfNew method: %s\n", err)
	}
	// Cleanup is deferred so the directory is removed even when the test
	// stops early on a failure.
	defer func() {
		if err := os.RemoveAll(testDir); err != nil {
			t.Errorf("Error cleaning up directory from the TestSegment_openQueueSegment_failIfNew method with '%s'\n", err.Error())
		}
	}()

	seg, err := openQueueSegment(testDir, 1, false, item1Builder)
	if err == nil {
		t.Fatalf("openQueueSegment('%s') should have failed because it should be new\n", testDir)
	}
	assert(t, seg == nil, "segment after failure must be nil")
}
// TestSegment_Turbo verifies the behavior of the turboOn() and turboOff()
// methods by watching the segment's syncCount: it must grow on add/remove
// while turbo is off, stay put on add while turbo is on, and grow once more
// when turbo is switched off.
func TestSegment_Turbo(t *testing.T) {
	testDir := "./TestSegment"
	// Best-effort removal of leftovers from an earlier run; a directory that
	// is still present makes the Mkdir below fail and report the problem.
	os.RemoveAll(testDir)
	if err := os.Mkdir(testDir, 0755); err != nil {
		t.Fatalf("Error creating directory in the TestSegment_Turbo method: %s\n", err)
	}
	// Cleanup is deferred so the directory is removed even when a failed
	// assertion stops the test early.
	defer func() {
		if err := os.RemoveAll(testDir); err != nil {
			t.Errorf("Error cleaning up directory from the TestSegment_Turbo method with '%s'\n", err.Error())
		}
	}()

	seg, err := newQueueSegment(testDir, 10, false, item1Builder)
	if err != nil {
		t.Fatalf("newQueueSegment('%s') failed with '%s'\n", testDir, err.Error())
	}

	// turbo is off so expect syncCount to change
	assert(t, seg.add(&item1{Name: "Number 1"}) == nil, "failed to add item1")
	assert(t, 1 == seg.size(), "Expected size of 1")
	assert(t, 1 == seg.syncCount, "syncCount must be 1")

	// Turn on turbo and expect sync count to stay the same.
	seg.turboOn()
	assert(t, seg.add(&item1{Name: "Number 2"}) == nil, "failed to add item2")
	assert(t, 2 == seg.size(), "Expected size of 2")
	assert(t, 1 == seg.syncCount, "syncCount must still be 1")

	// Turn off turbo and expect the syncCount to increase when remove is called.
	if err = seg.turboOff(); err != nil {
		t.Fatalf("Unexpected error turning off turbo('%s'): '%s'\n", testDir, err.Error())
	}
	// seg.turboOff() calls seg.turboSync() which increments syncCount
	assert(t, 2 == seg.syncCount, "syncCount must be 2 now")

	if _, err = seg.remove(); err != nil {
		t.Fatalf("Remove() failed with '%s'\n", err.Error())
	}
	// seg.remove() calls seg._sync() which increments syncCount
	assert(t, 3 == seg.syncCount, "syncCount must be 3 now")
}
// ItemByteSlice is a test item whose only field is a byte slice.
type ItemByteSlice struct {
	Item []byte
}
// itemByteSliceBuilder allocates a zero-valued ItemByteSlice and returns a
// pointer to it. The tests pass it as the builder argument when creating a
// segment.
func itemByteSliceBuilder() interface{} {
	return new(ItemByteSlice)
}
// TestSegment_BufferSafe makes sure items aren't broken if they share some
// data structures: two items are added from slices of the same buffer, which
// is overwritten between the adds, and both must come back with their own
// content.
func TestSegment_BufferSafe(t *testing.T) {
	testDir := "./TestSegment_Open"
	// Best-effort removal of leftovers from an earlier run; a directory that
	// is still present makes the Mkdir below fail and report the problem.
	os.RemoveAll(testDir)
	if err := os.Mkdir(testDir, 0755); err != nil {
		t.Fatalf("Error creating directory in the TestSegment_BufferSafe method: %s\n", err)
	}
	// Cleanup is deferred so the directory is removed even when a failed
	// assertion stops the test early.
	defer func() {
		if err := os.RemoveAll(testDir); err != nil {
			t.Errorf("Error cleaning up directory from the TestSegment_BufferSafe method with '%s'\n", err.Error())
		}
	}()

	seg, err := newQueueSegment(testDir, 10, false, itemByteSliceBuilder)
	if err != nil {
		t.Fatalf("newQueueSegment('%s') failed with '%s'\n", testDir, err.Error())
	}

	// Both items are views into buf; the second copy overwrites the bytes the
	// first item was built from.
	buf := make([]byte, 1024)
	n := copy(buf, []byte("foobar"))
	if err := seg.add(&ItemByteSlice{buf[:n]}); err != nil {
		t.Fatalf("add item to seg error: %s", err.Error())
	}
	n = copy(buf, []byte("somewords"))
	if err := seg.add(&ItemByteSlice{buf[:n]}); err != nil {
		t.Fatalf("add item to seg error: %s", err.Error())
	}

	data, err := seg.remove()
	if err != nil {
		t.Fatalf("remove item from seg error: %s", err.Error())
	}
	item := string(data.(*ItemByteSlice).Item)
	assert(t, item == "foobar", "item content not match, found: %s", item)

	data, err = seg.remove()
	if err != nil {
		t.Fatalf("remove item from seg error: %s", err.Error())
	}
	item = string(data.(*ItemByteSlice).Item)
	assert(t, item == "somewords", "item content not match, found: %s", item)
}
// assert stops the test via tb.FailNow when condition is false. Before doing
// so it prints the caller's file name and line number followed by msg,
// formatted with v, wrapped in ANSI color escape sequences. It does nothing
// when condition is true.
func assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
	if condition {
		return
	}

	// Skip one frame so the reported location is the call site of assert.
	_, file, line, _ := runtime.Caller(1)

	// The location arguments come first, followed by the caller's values.
	args := make([]interface{}, 0, len(v)+2)
	args = append(args, filepath.Base(file), line)
	args = append(args, v...)

	fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", args...)
	tb.FailNow()
}