From 34ceb75375f22457d5bf9594bf783ec7a61e8671 Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Mon, 22 Jan 2024 21:57:29 +0800 Subject: [PATCH 1/4] Adding ready probe, edge case for base/avalanche --- cmd/start.go | 8 +++++--- pkg/timelock/health.go | 39 ++++++++++++++++++++++++++----------- pkg/timelock/health_test.go | 12 ++++++------ pkg/timelock/timelock.go | 31 ++++++++++++++++++++++++++--- 4 files changed, 67 insertions(+), 23 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index b3f5014..8cd80ad 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -27,9 +27,11 @@ func startCommand() *cobra.Command { if err != nil { logs.Fatal().Msgf("error initializing configuration: %s", err.Error()) } - // Set healthStatus to error on startup. - // Will be set to "ok" once the rpc connection and subscription is successful. - timelock.SetHealthStatus(timelock.HealthStatusOK) + // Set healthStatus to OK on startup. + // Set readyStatus to Error on startup. + // Will be set to OK once the rpc connection and subscription is successful. + timelock.SetLiveStatus(timelock.HealthStatusOK) + timelock.SetReadyStatus(timelock.HealthStatusError) startCmd.Flags().StringVarP(&nodeURL, "node-url", "n", timelockConf.NodeURL, "RPC Endpoint for the target blockchain") startCmd.Flags().StringVarP(&timelockAddress, "timelock-address", "a", timelockConf.TimelockAddress, "Address of the target Timelock contract") diff --git a/pkg/timelock/health.go b/pkg/timelock/health.go index acc1525..de4e065 100644 --- a/pkg/timelock/health.go +++ b/pkg/timelock/health.go @@ -15,21 +15,37 @@ const ( HealthStatusError ) -var healthStatus atomic.Value +var liveStatus atomic.Value +var readyStatus atomic.Value -// SetHealthStatus sets the health status. -func SetHealthStatus(status HealthStatus) { - healthStatus.Store(status) +func SetLiveStatus(status HealthStatus) { + liveStatus.Store(status) } -// GetHealthStatus gets the current health status. -func GetHealthStatus() HealthStatus { - return healthStatus.Load().(HealthStatus) +func GetLiveStatus() HealthStatus { + return liveStatus.Load().(HealthStatus) } -// Respond to liveness probe based on health status. -func healthHandler(w http.ResponseWriter, r *http.Request) { - status := GetHealthStatus() +func SetReadyStatus(status HealthStatus) { + readyStatus.Store(status) +} + +func GetReadyStatus() HealthStatus { + return readyStatus.Load().(HealthStatus) +} + +func liveHandler(w http.ResponseWriter, r *http.Request) { + status := GetLiveStatus() + respond(status, w, r) +} + +// Respond to readiness probe based on ready status. +func readyHandler(w http.ResponseWriter, r *http.Request) { + status := GetReadyStatus() + respond(status, w, r) +} + +func respond(status HealthStatus, w http.ResponseWriter, r *http.Request) { var err error if status == HealthStatusOK { _, err = w.Write([]byte("OK")) @@ -46,7 +62,8 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { // Starts a http server, serving the healthz endpoint. func StartHTTPHealthServer() { mux := http.NewServeMux() - mux.HandleFunc("/healthz", healthHandler) + mux.HandleFunc("/healthz", liveHandler) + mux.HandleFunc("/ready", readyHandler) server := &http.Server{ Addr: ":8080", diff --git a/pkg/timelock/health_test.go b/pkg/timelock/health_test.go index 4b8d4f0..453bb6f 100644 --- a/pkg/timelock/health_test.go +++ b/pkg/timelock/health_test.go @@ -8,15 +8,15 @@ import ( func TestHealthHandler(t *testing.T) { // Initialize healthStatus to HealthStatusOK - SetHealthStatus(HealthStatusOK) + SetLiveStatus(HealthStatusOK) // Create a request to the healthz endpoint req := httptest.NewRequest("GET", "http://example.com/healthz", nil) // Create a ResponseRecorder to capture the response rr := httptest.NewRecorder() - // Call the healthHandler function - healthHandler(rr, req) + // Call the liveHandler function + liveHandler(rr, req) // Check the response status code if status := rr.Code; status != http.StatusOK { @@ -30,15 +30,15 @@ func TestHealthHandler(t *testing.T) { } // Change healthStatus to HealthStatusError - SetHealthStatus(HealthStatusError) + SetLiveStatus(HealthStatusError) // Create a new request to the healthz endpoint req = httptest.NewRequest("GET", "http://example.com/healthz", nil) // Create a new ResponseRecorder rr = httptest.NewRecorder() - // Call the healthHandler function again - healthHandler(rr, req) + // Call the liveHandler function again + liveHandler(rr, req) // Check the response status code if status := rr.Code; status != http.StatusInternalServerError { diff --git a/pkg/timelock/timelock.go b/pkg/timelock/timelock.go index 019bd07..3243608 100644 --- a/pkg/timelock/timelock.go +++ b/pkg/timelock/timelock.go @@ -11,6 +11,7 @@ import ( "sync" "syscall" "time" + "strings" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" @@ -38,6 +39,7 @@ type Worker struct { ABI *abi.ABI address []common.Address fromBlock *big.Int + useBlockOffset bool pollPeriod int64 logger *zerolog.Logger privateKey *ecdsa.PrivateKey @@ -73,6 +75,11 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st return nil, fmt.Errorf("from block can't be a negative number (minimum value 0): got %d", pollPeriod) } + useBlockOffset := false + if strings.Contains(nodeURL, "avalanche-testnet") || strings.Contains(nodeURL, "base-testnet") { + useBlockOffset = true + } + if _, err := crypto.HexToECDSA(privateKey); err != nil { return nil, fmt.Errorf("the provided private key is not valid: got %s", privateKey) } @@ -119,6 +126,7 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st pollPeriod: pollPeriod, logger: logger, privateKey: privateKeyECDSA, + useBlockOffset: useBlockOffset, scheduler: *newScheduler(defaultSchedulerDelay), } @@ -148,12 +156,28 @@ func (tw *Worker) Listen(ctx context.Context) error { } }() + if tw.useBlockOffset { + tw.logger.Info().Msgf("RPC takes time to respond, will set closer blockOffset") + latestBlockNumber, err := tw.ethClient.BlockNumber(ctx) + if err != nil { + return err + } + tw.logger.Info().Msgf("Queried latest block number %v", latestBlockNumber) + if latestBlockNumber >= 10000 { + tw.fromBlock = new(big.Int).SetUint64(latestBlockNumber - 10000) + } else { + tw.fromBlock = new(big.Int).SetUint64(latestBlockNumber) + } + tw.logger.Info().Msgf("Setting from-block to %v", tw.fromBlock.String()) + } + // FilterQuery to be feed to the subscription and FilterLogs. query := ethereum.FilterQuery{ Addresses: tw.address, FromBlock: tw.fromBlock, } + tw.logger.Info().Msgf("Starting subscription") // Create the new subscription with the predefined query. sub, err := tw.ethClient.SubscribeFilterLogs(ctx, query, logCh) if err != nil { @@ -173,7 +197,8 @@ func (tw *Worker) Listen(ctx context.Context) error { }() // Setting healthStatus here because we want to make sure subscription is up. - SetHealthStatus(HealthStatusOK) + tw.logger.Info().Msgf("Initial subscription complete") + SetReadyStatus(HealthStatusOK) // This is the goroutine watching over the subscription. // We want wg.Done() to cancel the whole execution, so don't add more than 1 to wg. @@ -240,13 +265,13 @@ func (tw *Worker) Listen(ctx context.Context) error { if err != nil { tw.logger.Info().Msgf("subscription: %s", err.Error()) loop = false - SetHealthStatus(HealthStatusError) + SetReadyStatus(HealthStatusError) } case signal := <-stopCh: tw.logger.Info().Msgf("received OS signal %s", signal) loop = false - SetHealthStatus(HealthStatusError) + SetReadyStatus(HealthStatusError) } } wg.Done() From f58a6adbf174c9b51114de999cb86fee6d33854b Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Mon, 22 Jan 2024 22:01:25 +0800 Subject: [PATCH 2/4] Comment changes --- cmd/start.go | 2 +- pkg/timelock/health_test.go | 4 ++-- pkg/timelock/timelock.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 8cd80ad..6ea5991 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -27,7 +27,7 @@ func startCommand() *cobra.Command { if err != nil { logs.Fatal().Msgf("error initializing configuration: %s", err.Error()) } - // Set healthStatus to OK on startup. + // Set liveStatus to OK on startup. // Set readyStatus to Error on startup. // Will be set to OK once the rpc connection and subscription is successful. timelock.SetLiveStatus(timelock.HealthStatusOK) diff --git a/pkg/timelock/health_test.go b/pkg/timelock/health_test.go index 453bb6f..d62f9b9 100644 --- a/pkg/timelock/health_test.go +++ b/pkg/timelock/health_test.go @@ -7,7 +7,7 @@ import ( ) func TestHealthHandler(t *testing.T) { - // Initialize healthStatus to HealthStatusOK + // Initialize liveStatus to HealthStatusOK SetLiveStatus(HealthStatusOK) // Create a request to the healthz endpoint @@ -29,7 +29,7 @@ func TestHealthHandler(t *testing.T) { t.Errorf("Handler returned unexpected body: got %v want %v", body, expectedBody) } - // Change healthStatus to HealthStatusError + // Change liveStatus to HealthStatusError SetLiveStatus(HealthStatusError) // Create a new request to the healthz endpoint diff --git a/pkg/timelock/timelock.go b/pkg/timelock/timelock.go index 3243608..efe1e5a 100644 --- a/pkg/timelock/timelock.go +++ b/pkg/timelock/timelock.go @@ -8,10 +8,10 @@ import ( "net/url" "os" "os/signal" + "strings" "sync" "syscall" "time" - "strings" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" @@ -76,7 +76,7 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st } useBlockOffset := false - if strings.Contains(nodeURL, "avalanche-testnet") || strings.Contains(nodeURL, "base-testnet") { + if strings.Contains(nodeURL, "avalanche-testnet") || strings.Contains(nodeURL, "base-testnet") { useBlockOffset = true } @@ -196,7 +196,7 @@ func (tw *Worker) Listen(ctx context.Context) error { } }() - // Setting healthStatus here because we want to make sure subscription is up. + // Setting readyStatus here because we want to make sure subscription is up. tw.logger.Info().Msgf("Initial subscription complete") SetReadyStatus(HealthStatusOK) From c3dc08dd2aeda05413fa04a1368e0bbd5854a696 Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Mon, 22 Jan 2024 22:17:13 +0800 Subject: [PATCH 3/4] formatting --- pkg/timelock/health.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/timelock/health.go b/pkg/timelock/health.go index de4e065..35c2725 100644 --- a/pkg/timelock/health.go +++ b/pkg/timelock/health.go @@ -36,16 +36,16 @@ func GetReadyStatus() HealthStatus { func liveHandler(w http.ResponseWriter, r *http.Request) { status := GetLiveStatus() - respond(status, w, r) + respond(status, w) } // Respond to readiness probe based on ready status. func readyHandler(w http.ResponseWriter, r *http.Request) { status := GetReadyStatus() - respond(status, w, r) + respond(status, w) } -func respond(status HealthStatus, w http.ResponseWriter, r *http.Request) { +func respond(status HealthStatus, w http.ResponseWriter) { var err error if status == HealthStatusOK { _, err = w.Write([]byte("OK")) From 30a32fa7a9f8ac029a8cdd1112c816d2e245d890 Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Tue, 23 Jan 2024 21:45:11 +0800 Subject: [PATCH 4/4] Removing block offset --- pkg/timelock/timelock.go | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/pkg/timelock/timelock.go b/pkg/timelock/timelock.go index efe1e5a..9ff8d17 100644 --- a/pkg/timelock/timelock.go +++ b/pkg/timelock/timelock.go @@ -8,7 +8,6 @@ import ( "net/url" "os" "os/signal" - "strings" "sync" "syscall" "time" @@ -39,7 +38,6 @@ type Worker struct { ABI *abi.ABI address []common.Address fromBlock *big.Int - useBlockOffset bool pollPeriod int64 logger *zerolog.Logger privateKey *ecdsa.PrivateKey @@ -75,11 +73,6 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st return nil, fmt.Errorf("from block can't be a negative number (minimum value 0): got %d", pollPeriod) } - useBlockOffset := false - if strings.Contains(nodeURL, "avalanche-testnet") || strings.Contains(nodeURL, "base-testnet") { - useBlockOffset = true - } - if _, err := crypto.HexToECDSA(privateKey); err != nil { return nil, fmt.Errorf("the provided private key is not valid: got %s", privateKey) } @@ -126,7 +119,6 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st pollPeriod: pollPeriod, logger: logger, privateKey: privateKeyECDSA, - useBlockOffset: useBlockOffset, scheduler: *newScheduler(defaultSchedulerDelay), } @@ -156,21 +148,6 @@ func (tw *Worker) Listen(ctx context.Context) error { } }() - if tw.useBlockOffset { - tw.logger.Info().Msgf("RPC takes time to respond, will set closer blockOffset") - latestBlockNumber, err := tw.ethClient.BlockNumber(ctx) - if err != nil { - return err - } - tw.logger.Info().Msgf("Queried latest block number %v", latestBlockNumber) - if latestBlockNumber >= 10000 { - tw.fromBlock = new(big.Int).SetUint64(latestBlockNumber - 10000) - } else { - tw.fromBlock = new(big.Int).SetUint64(latestBlockNumber) - } - tw.logger.Info().Msgf("Setting from-block to %v", tw.fromBlock.String()) - } - // FilterQuery to be feed to the subscription and FilterLogs. query := ethereum.FilterQuery{ Addresses: tw.address,