Skip to content

Commit

Permalink
Merge pull request #30 from smartcontractkit/block-number
Browse files Browse the repository at this point in the history
Adding ready probe, edge case for base/avalanche
  • Loading branch information
yashnevatia authored Jan 24, 2024
2 parents 3162087 + 30a32fa commit ee8610c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 26 deletions.
8 changes: 5 additions & 3 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ func startCommand() *cobra.Command {
if err != nil {
logs.Fatal().Msgf("error initializing configuration: %s", err.Error())
}
// Set healthStatus to error on startup.
// Will be set to "ok" once the rpc connection and subscription is successful.
timelock.SetHealthStatus(timelock.HealthStatusOK)
// Set liveStatus to OK on startup.
// Set readyStatus to Error on startup.
// Will be set to OK once the rpc connection and subscription is successful.
timelock.SetLiveStatus(timelock.HealthStatusOK)
timelock.SetReadyStatus(timelock.HealthStatusError)

startCmd.Flags().StringVarP(&nodeURL, "node-url", "n", timelockConf.NodeURL, "RPC Endpoint for the target blockchain")
startCmd.Flags().StringVarP(&timelockAddress, "timelock-address", "a", timelockConf.TimelockAddress, "Address of the target Timelock contract")
Expand Down
39 changes: 28 additions & 11 deletions pkg/timelock/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,37 @@ const (
HealthStatusError
)

var healthStatus atomic.Value
var liveStatus atomic.Value
var readyStatus atomic.Value

// SetHealthStatus sets the health status.
func SetHealthStatus(status HealthStatus) {
healthStatus.Store(status)
func SetLiveStatus(status HealthStatus) {
liveStatus.Store(status)
}

// GetHealthStatus gets the current health status.
func GetHealthStatus() HealthStatus {
return healthStatus.Load().(HealthStatus)
func GetLiveStatus() HealthStatus {
return liveStatus.Load().(HealthStatus)
}

// Respond to liveness probe based on health status.
func healthHandler(w http.ResponseWriter, r *http.Request) {
status := GetHealthStatus()
func SetReadyStatus(status HealthStatus) {
readyStatus.Store(status)
}

func GetReadyStatus() HealthStatus {
return readyStatus.Load().(HealthStatus)
}

func liveHandler(w http.ResponseWriter, r *http.Request) {
status := GetLiveStatus()
respond(status, w)
}

// Respond to readiness probe based on ready status.
func readyHandler(w http.ResponseWriter, r *http.Request) {
status := GetReadyStatus()
respond(status, w)
}

func respond(status HealthStatus, w http.ResponseWriter) {
var err error
if status == HealthStatusOK {
_, err = w.Write([]byte("OK"))
Expand All @@ -46,7 +62,8 @@ func healthHandler(w http.ResponseWriter, r *http.Request) {
// Starts a http server, serving the healthz endpoint.
func StartHTTPHealthServer() {
mux := http.NewServeMux()
mux.HandleFunc("/healthz", healthHandler)
mux.HandleFunc("/healthz", liveHandler)
mux.HandleFunc("/ready", readyHandler)

server := &http.Server{
Addr: ":8080",
Expand Down
16 changes: 8 additions & 8 deletions pkg/timelock/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
)

func TestHealthHandler(t *testing.T) {
// Initialize healthStatus to HealthStatusOK
SetHealthStatus(HealthStatusOK)
// Initialize liveStatus to HealthStatusOK
SetLiveStatus(HealthStatusOK)

// Create a request to the healthz endpoint
req := httptest.NewRequest("GET", "http://example.com/healthz", nil)
// Create a ResponseRecorder to capture the response
rr := httptest.NewRecorder()

// Call the healthHandler function
healthHandler(rr, req)
// Call the liveHandler function
liveHandler(rr, req)

// Check the response status code
if status := rr.Code; status != http.StatusOK {
Expand All @@ -29,16 +29,16 @@ func TestHealthHandler(t *testing.T) {
t.Errorf("Handler returned unexpected body: got %v want %v", body, expectedBody)
}

// Change healthStatus to HealthStatusError
SetHealthStatus(HealthStatusError)
// Change liveStatus to HealthStatusError
SetLiveStatus(HealthStatusError)

// Create a new request to the healthz endpoint
req = httptest.NewRequest("GET", "http://example.com/healthz", nil)
// Create a new ResponseRecorder
rr = httptest.NewRecorder()

// Call the healthHandler function again
healthHandler(rr, req)
// Call the liveHandler function again
liveHandler(rr, req)

// Check the response status code
if status := rr.Code; status != http.StatusInternalServerError {
Expand Down
10 changes: 6 additions & 4 deletions pkg/timelock/timelock.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (tw *Worker) Listen(ctx context.Context) error {
FromBlock: tw.fromBlock,
}

tw.logger.Info().Msgf("Starting subscription")
// Create the new subscription with the predefined query.
sub, err := tw.ethClient.SubscribeFilterLogs(ctx, query, logCh)
if err != nil {
Expand All @@ -172,8 +173,9 @@ func (tw *Worker) Listen(ctx context.Context) error {
}
}()

// Setting healthStatus here because we want to make sure subscription is up.
SetHealthStatus(HealthStatusOK)
// Setting readyStatus here because we want to make sure subscription is up.
tw.logger.Info().Msgf("Initial subscription complete")
SetReadyStatus(HealthStatusOK)

// This is the goroutine watching over the subscription.
// We want wg.Done() to cancel the whole execution, so don't add more than 1 to wg.
Expand Down Expand Up @@ -240,13 +242,13 @@ func (tw *Worker) Listen(ctx context.Context) error {
if err != nil {
tw.logger.Info().Msgf("subscription: %s", err.Error())
loop = false
SetHealthStatus(HealthStatusError)
SetReadyStatus(HealthStatusError)
}

case signal := <-stopCh:
tw.logger.Info().Msgf("received OS signal %s", signal)
loop = false
SetHealthStatus(HealthStatusError)
SetReadyStatus(HealthStatusError)
}
}
wg.Done()
Expand Down

0 comments on commit ee8610c

Please sign in to comment.