-
Notifications
You must be signed in to change notification settings - Fork 0
/
VolcanoPlanner.scala
144 lines (133 loc) · 5.4 KB
/
VolcanoPlanner.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package core.planner.volcano
import core.execution.Operator
import core.planner.Planner
import core.planner.volcano.logicalplan.LogicalPlan
import core.planner.volcano.memo.{Group, GroupImplementation}
import core.planner.volcano.rules.{ImplementationRule, TransformationRule}
import core.ql.Statement
class VolcanoPlanner extends Planner[VolcanoPlannerContext] {
// round
private val initialRound = 0
// multi-stage transformation
private val transformationRules: Seq[Seq[TransformationRule]] = TransformationRule.ruleBatches
// combined implementation rule, from all implementation rules
private val combinedImplementationRule: ImplementationRule = ImplementationRule.combined
override def getPlan(expr: Statement)(implicit ctx: VolcanoPlannerContext): Operator = {
initialize(expr)
explore()
implement()
ctx.rootGroup.implementation match {
case Some(implementation) => implementation.physicalPlan.operator()
case None => throw new Exception("No implementation found, something went wrong!")
}
}
/**
* Initialization phase
*/
def initialize(query: Statement)(implicit ctx: VolcanoPlannerContext): Unit = {
ctx.query = query
ctx.rootPlan = LogicalPlan.toPlan(ctx.query)
ctx.rootGroup = ctx.memo.getOrCreateGroup(ctx.rootPlan)
// assuming this is first the exploration round,
// by marking the initialRound(0) as explored,
// it will be easier to visualize the different between rounds (added nodes, add connections)
ctx.memo.groups.values.foreach(_.explorationMark.markExplored(initialRound))
ctx.memo.groupExpressions.values.foreach(_.explorationMark.markExplored(initialRound))
}
/**
* Exploration phase
*/
def explore()(implicit ctx: VolcanoPlannerContext): Unit = {
for (r <- transformationRules.indices) {
exploreGroup(ctx.rootGroup, transformationRules(r), r + 1)
}
}
//noinspection DuplicatedCode
private def exploreGroup(
group: Group,
rules: Seq[TransformationRule],
round: Int
)(implicit ctx: VolcanoPlannerContext): Unit = {
while (!group.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
// explore all child groups
group.equivalents.foreach { equivalent =>
if (!equivalent.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
equivalent.children.foreach { child =>
exploreGroup(child, rules, round)
if (equivalent.explorationMark.isExplored(round) && child.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
} else {
equivalent.explorationMark.markUnexplored(round)
}
}
}
// fire transformation rules to explore all the possible transformations
rules.foreach { rule =>
if (!equivalent.appliedTransformations.contains(rule) && rule.`match`(equivalent)) {
val transformed = rule.transform(equivalent)
if (!group.equivalents.contains(transformed)) {
group.equivalents += transformed
transformed.explorationMark.markUnexplored(round)
group.explorationMark.markUnexplored(round)
}
}
}
if (group.explorationMark.isExplored(round) && equivalent.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
} else {
group.explorationMark.markUnexplored(round)
}
}
}
}
/**
* Implementation phase
*/
def implement()(implicit ctx: VolcanoPlannerContext): Unit = {
ctx.rootGroup.implementation = Option(implementGroup(ctx.rootGroup, combinedImplementationRule))
}
private def implementGroup(group: Group, combinedRule: ImplementationRule)(
implicit ctx: VolcanoPlannerContext
): GroupImplementation = {
group.implementation match {
case Some(implementation) => implementation
case None =>
var bestImplementation = Option.empty[GroupImplementation]
group.equivalents.foreach { equivalent =>
val physicalPlanBuilders = combinedRule.physicalPlanBuilders(equivalent)
val childPhysicalPlans = equivalent.children.map { child =>
val childImplementation = implementGroup(child, combinedRule)
child.implementation = Option(childImplementation)
childImplementation.physicalPlan
}
// calculate the implementation, and update the best cost for group
physicalPlanBuilders.flatMap(_.build(childPhysicalPlans)).foreach { physicalPlan =>
val cost = physicalPlan.cost()
bestImplementation match {
case Some(currentBest) =>
if (ctx.costModel.isBetter(currentBest.cost, cost)) {
bestImplementation = Option(
GroupImplementation(
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
case None =>
bestImplementation = Option(
GroupImplementation(
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
}
}
bestImplementation.get
}
}
}