-
Notifications
You must be signed in to change notification settings - Fork 10
/
main.go
133 lines (118 loc) · 3.63 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"fmt"
gklog "github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/stitchfix/flotilla-os/clients/cluster"
"github.com/stitchfix/flotilla-os/clients/logs"
"github.com/stitchfix/flotilla-os/clients/metrics"
"github.com/stitchfix/flotilla-os/clients/middleware"
"github.com/stitchfix/flotilla-os/config"
"github.com/stitchfix/flotilla-os/execution/engine"
"github.com/stitchfix/flotilla-os/flotilla"
flotillaLog "github.com/stitchfix/flotilla-os/log"
"github.com/stitchfix/flotilla-os/queue"
"github.com/stitchfix/flotilla-os/state"
"log"
"os"
)
// main is the flotilla-os entry point. It expects the configuration
// directory as the first command-line argument, builds every dependency the
// application needs (logger, config, metrics client, state manager, EKS
// cluster/logs clients, queue managers, execution engines and middleware
// client), hands them to flotilla.NewApp and then runs the app.
//
// Any initialization failure is printed (with the wrapped error's "%+v"
// detail) and, except where noted below, terminates the process with exit
// status 1.
func main() {
	args := os.Args
	if len(args) < 2 {
		fmt.Println("Usage: flotilla-os <conf_dir>")
		os.Exit(1)
	}

	//
	// Use go-kit for structured logging
	//
	l := gklog.NewLogfmtLogger(gklog.NewSyncWriter(os.Stderr))
	l = gklog.With(l, "ts", gklog.DefaultTimestampUTC)
	eventSinks := []flotillaLog.EventSink{flotillaLog.NewLocalEventSink()}
	logger := flotillaLog.NewLogger(l, eventSinks)

	//
	// Wrap viper for configuration
	//
	confDir := args[1]
	c, err := config.NewConfig(&confDir)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize config"))
		os.Exit(1)
	}

	//
	// Instantiate metrics client.
	//
	if err = metrics.InstantiateClient(c); err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize metrics client"))
		os.Exit(1)
	}

	//
	// Get state manager for reading and writing
	// state about definitions and runs
	//
	stateManager, err := state.NewStateManager(c, logger)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize state manager"))
		os.Exit(1)
	}

	//
	// Get cluster client for validating definitions
	// against execution clusters
	//
	// A failure here is reported but deliberately does not stop startup
	// (the exit is disabled pending the TODO); whatever value was returned
	// is still passed to flotilla.NewApp below.
	//
	eksClusterClient, err := cluster.NewClusterClient(c, state.EKSEngine)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize EKS cluster client"))
		//TODO
		//os.Exit(1)
	}

	// Same best-effort handling as the cluster client: report, keep going.
	eksLogsClient, err := logs.NewLogsClient(c, logger, state.EKSEngine)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize EKS logs client"))
		//TODO
		//os.Exit(1)
	}

	//
	// Get queue manager for queuing runs
	//
	eksQueueManager, err := queue.NewQueueManager(c, state.EKSEngine)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize eks queue manager"))
		os.Exit(1)
	}

	emrQueueManager, err := queue.NewQueueManager(c, state.EKSSparkEngine)
	if err != nil {
		// Name the EMR queue manager so this failure can be told apart
		// from the EKS one above.
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize emr queue manager"))
		os.Exit(1)
	}

	//
	// Get execution engine for interacting with backend
	// execution management framework (eg. EKS)
	//
	eksExecutionEngine, err := engine.NewExecutionEngine(c, eksQueueManager, state.EKSEngine, logger)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize EKS execution engine"))
		os.Exit(1)
	}

	// NOTE(review): the EMR engine is built with eksQueueManager rather than
	// emrQueueManager. This may be intentional or a copy-paste slip — confirm
	// against engine.NewExecutionEngine before changing it.
	emrExecutionEngine, err := engine.NewExecutionEngine(c, eksQueueManager, state.EKSSparkEngine, logger)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize EMR execution engine"))
		os.Exit(1)
	}

	middlewareClient, err := middleware.NewClient()
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize middleware client"))
		os.Exit(1)
	}

	//
	// Assemble the application from its dependencies and run it
	//
	app, err := flotilla.NewApp(c, logger, eksLogsClient, eksExecutionEngine, stateManager, eksClusterClient, eksQueueManager, emrExecutionEngine, emrQueueManager, middlewareClient)
	if err != nil {
		fmt.Printf("%+v\n", errors.Wrap(err, "unable to initialize app"))
		os.Exit(1)
	}

	// log.Fatal logs whatever app.Run returns and exits with status 1.
	log.Fatal(app.Run())
}