-
Notifications
You must be signed in to change notification settings - Fork 0
/
collection.go
123 lines (104 loc) · 2.51 KB
/
collection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import "bytes"
// Collection is a named set of key/value items whose nodes are read and
// written through a dal.
type Collection struct {
	// name identifies the collection.
	name []byte
	// root is the page number of the collection's root node. Put treats the
	// value 0 as "no root has been assigned yet".
	root pgnum

	// dal is the layer used to load, create and persist nodes
	// (getNode, newNode and writeNode are called on it).
	dal *dal
}
// newCollection builds a Collection with the given name and root page number.
// The dal field is left at its zero value (nil); this function does not set it.
func newCollection(name []byte, root pgnum) *Collection {
	c := new(Collection)
	c.name = name
	c.root = root
	return c
}
// Find returns the item stored under key, or (nil, nil) when the key is not
// present in the collection.
//
// A collection whose root is 0 has no root node yet (Put treats that value as
// "no root assigned"), so it is reported as empty without asking the dal for
// page 0. Otherwise the root node is loaded and the search is delegated to
// findKey(key, true); an index of -1 from that call means the key was not
// found. Errors from loading the root or from findKey are returned unchanged.
func (c *Collection) Find(key []byte) (*Item, error) {
	// Nothing has been inserted yet: there is no root node to search.
	if c.root == 0 {
		return nil, nil
	}

	root, err := c.dal.getNode(c.root)
	if err != nil {
		return nil, err
	}

	index, containingNode, _, err := root.findKey(key, true)
	if err != nil {
		return nil, err
	}
	// -1 is findKey's "not found" marker; a miss is not an error.
	if index == -1 {
		return nil, nil
	}
	return containingNode.items[index], nil
}
// Put stores value under key, replacing the existing item when the key is
// already present.
//
// When the collection has no root yet (c.root == 0), a new node holding just
// this item is written and becomes the root. Otherwise the item is placed in
// the node selected by findKey, that node is written back, and the nodes on
// the path to the root are checked with isOverPopulated and split by their
// parent when needed. If the root itself is over-populated, a new root is
// created above it, written, and recorded in c.root.
//
// Any error returned by the dal or by findKey is returned unchanged.
func (c *Collection) Put(key []byte, value []byte) error {
	item := newItem(key, value)

	// No root assigned to the collection yet: the first item becomes the root.
	if c.root == 0 {
		root, err := c.dal.writeNode(c.dal.newNode([]*Item{item}, []pgnum{}))
		if err != nil {
			return err
		}
		c.root = root.pageNum
		return nil
	}

	root, err := c.dal.getNode(c.root)
	if err != nil {
		return err
	}

	// Find the node where the insertion should happen. Find shows that
	// findKey(key, true) reports a missing key as index -1, which cannot be
	// used as an insertion position, so false is passed here instead.
	// NOTE(review): assumes findKey(key, false) returns the position the key
	// should occupy when it is absent - confirm against findKey.
	insertionIndex, nodeToInsertIn, ancestorsIndexes, err := root.findKey(key, false)
	if err != nil {
		return err
	}

	// Overwrite only when the slot really holds this key. The bounds check
	// keeps an index that is negative or one past the last item from
	// panicking on the comparison.
	if insertionIndex >= 0 && insertionIndex < len(nodeToInsertIn.items) &&
		bytes.Equal(nodeToInsertIn.items[insertionIndex].key, key) {
		nodeToInsertIn.items[insertionIndex] = item
	} else {
		// Add item to the node at the position findKey selected.
		nodeToInsertIn.addItem(item, insertionIndex)
	}
	if _, err = c.dal.writeNode(nodeToInsertIn); err != nil {
		return err
	}

	ancestors, err := c.getNodes(ancestorsIndexes)
	if err != nil {
		return err
	}

	// Rebalance the nodes all the way up. Start from one node before the last
	// and go all the way up. The root has no parent, so it is handled below.
	for level := len(ancestors) - 2; level >= 0; level-- {
		parent := ancestors[level]
		node := ancestors[level+1]
		nodeIndex := ancestorsIndexes[level+1]
		if node.isOverPopulated() {
			parent.split(node, nodeIndex)
		}
	}

	// Handle root: grow the tree by one level when the root is over-populated.
	rootNode := ancestors[0]
	if rootNode.isOverPopulated() {
		newRoot := c.dal.newNode([]*Item{}, []pgnum{rootNode.pageNum})
		newRoot.split(rootNode, 0)

		// Commit the newly created root.
		newRoot, err = c.dal.writeNode(newRoot)
		if err != nil {
			return err
		}
		c.root = newRoot.pageNum
	}
	return nil
}
// getNodes loads the chain of nodes described by indexes, starting at the
// collection's root.
//
// The first entry of indexes is not used for a lookup: the root is always the
// first node returned. Each later entry selects, by position in childNodes,
// the child of the previously loaded node to load next. The returned slice
// holds the root followed by one node per remaining entry. The first error
// from the dal aborts the walk and is returned with a nil slice.
func (c *Collection) getNodes(indexes []int) ([]*Node, error) {
	current, err := c.dal.getNode(c.root)
	if err != nil {
		return nil, err
	}

	path := []*Node{current}
	for pos, childIdx := range indexes {
		if pos == 0 {
			// The root was loaded above; skip its entry.
			continue
		}
		current, err = c.dal.getNode(current.childNodes[childIdx])
		if err != nil {
			return nil, err
		}
		path = append(path, current)
	}
	return path, nil
}