-
Notifications
You must be signed in to change notification settings - Fork 5
/
main.go
116 lines (90 loc) · 2.7 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"net/http"
"os"
"strconv"
"github.com/film42/pgreba/config"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
)
type HealthCheckWebService struct {
healthChecker *HealthChecker
}
func (hc *HealthCheckWebService) apiGetIsPrimary(w http.ResponseWriter, r *http.Request) {
nodeInfo, err := hc.healthChecker.dataSource.GetNodeInfo()
if err != nil {
// Return a 500. Something bad happened.
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !nodeInfo.IsPrimary() {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(nodeInfo)
}
func (hc *HealthCheckWebService) apiGetIsReplica(w http.ResponseWriter, r *http.Request) {
nodeInfo, err := hc.healthChecker.dataSource.GetNodeInfo()
if err != nil {
// Return a 500. Something bad happened.
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// if replica OR byte lag exceeds max_allowable_byte_lag then return 500
if !nodeInfo.IsReplica() || maxAllowableByteLagExceeded(r, nodeInfo) {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(nodeInfo)
}
func maxAllowableByteLagExceeded(r *http.Request, nodeInfo *NodeInfo) bool {
maxAllowableByteLagString := r.URL.Query().Get("max_allowable_byte_lag")
// If byte lag was not specified, assume the replica is up to date.
if len(maxAllowableByteLagString) == 0 {
return false
}
maxAllowableByteLag, err := strconv.ParseInt(maxAllowableByteLagString, 10, 64)
if err != nil {
panic(err)
}
return nodeInfo.ByteLag > maxAllowableByteLag
}
func main() {
versionPtr := flag.Bool("version", false, "Print the teecp version and exit.")
flag.Parse()
if *versionPtr {
fmt.Println("1.3.2")
return
}
if len(os.Args) < 2 {
panic(errors.New("Please provide a path to config yml."))
}
pathToConfig := os.Args[1]
cfg, err := config.ParseConfig(pathToConfig)
if err != nil {
panic(err)
}
ds := NewPgReplicationDataSource(cfg)
defer ds.Close()
// Wrap the data source in a caching layer to prevent
// many concurrent health-checks from bogging things down.
ds = NewCachedDataSource(ds)
// Fake data source
fds := new(fakeDataSource)
fds = fds
hc := NewHealthChecker(ds)
hcs := &HealthCheckWebService{healthChecker: hc}
router := mux.NewRouter()
router.Use(func(next http.Handler) http.Handler {
return handlers.LoggingHandler(log.Writer(), next)
})
router.HandleFunc("/", hcs.apiGetIsPrimary).Methods("GET")
router.HandleFunc("/primary", hcs.apiGetIsPrimary).Methods("GET")
// For replicas
router.HandleFunc("/replica", hcs.apiGetIsReplica).Methods("GET")
log.Println("Listening on :8000")
http.ListenAndServe(":8000", router)
}