Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fallback to polling when RPC fails #226

Merged
merged 2 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions relayer/chains/evm/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (p *Provider) startFromHeight(ctx context.Context, lastSavedHeight uint64)
}

// Subscribe listens to new blocks and sends them to the channel
func (p *Provider) Subscribe(ctx context.Context, ticker *time.Ticker, blockInfoChan chan *relayertypes.BlockInfo) {
func (p *Provider) Subscribe(ctx context.Context, ticker *time.Ticker, blockInfoChan chan *relayertypes.BlockInfo) error {
ch := make(chan ethTypes.Log)
sub, err := p.client.Subscribe(ctx, ethereum.FilterQuery{
Addresses: p.blockReq.Addresses,
Expand All @@ -254,18 +254,18 @@ func (p *Provider) Subscribe(ctx context.Context, ticker *time.Ticker, blockInfo
if err != nil {
p.log.Error("failed to subscribe", zap.Error(err))
ticker.Reset(time.Second * 3)
return
return err
}
defer sub.Unsubscribe()
p.log.Info("Subscribed to new blocks", zap.Uint64("from", p.blockReq.FromBlock.Uint64()), zap.Any("address", p.blockReq.Addresses))
for {
select {
case <-ctx.Done():
return
return nil
case err := <-sub.Err():
p.log.Error("subscription error", zap.Error(err))
ticker.Reset(time.Second * 3)
return
return p.Listener(ctx, p.GetLastSavedBlockHeight(), blockInfoChan)
case log := <-ch:
message, err := p.getRelayMessageFromLog(log)
if err != nil {
Expand Down
29 changes: 20 additions & 9 deletions relayer/chains/evm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,16 @@ type Config struct {
}

type Provider struct {
client IClient
log *zap.Logger
cfg *Config
StartHeight uint64
blockReq ethereum.FilterQuery
wallet *keystore.Key
kms kms.KMS
contracts map[string]providerTypes.EventMap
NonceTracker types.NonceTrackerI
client IClient
log *zap.Logger
cfg *Config
StartHeight uint64
blockReq ethereum.FilterQuery
wallet *keystore.Key
kms kms.KMS
contracts map[string]providerTypes.EventMap
NonceTracker types.NonceTrackerI
LastSavedHeightFunc func() uint64
}

func (p *Config) NewProvider(ctx context.Context, log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) {
Expand Down Expand Up @@ -439,3 +440,13 @@ func (p *Provider) EstimateGas(ctx context.Context, message *providerTypes.Messa
}
return p.client.EstimateGas(ctx, msg)
}

// SetLastSavedBlockHeightFunc sets the function to save the last saved block height
func (p *Provider) SetLastSavedHeightFunc(f func() uint64) {
p.LastSavedHeightFunc = f
}

// GetLastSavedBlockHeight returns the last saved block height
func (p *Provider) GetLastSavedBlockHeight() uint64 {
return p.LastSavedHeightFunc()
}
6 changes: 3 additions & 3 deletions relayer/chains/icon/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, incomin
Height: types.NewHexInt(processedheight),
EventFilter: p.GetMonitorEventFilters(),
Logs: types.NewHexInt(1),
ProgressInterval: types.NewHexInt(15),
ProgressInterval: types.NewHexInt(25),
}

for {
Expand All @@ -76,7 +76,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, incomin
return err
}
if height > 0 {
eventReq.Height = types.NewHexInt(height + 1)
eventReq.Height = types.NewHexInt(height)
}
return nil
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, incomin
if errors.Is(err, context.Canceled) {
return
}
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 3)
reconnect()
p.log.Warn("error occured during monitor event", zap.Error(err))
}
Expand Down
23 changes: 17 additions & 6 deletions relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ func (c *Config) Enabled() bool {
}

type Provider struct {
log *zap.Logger
cfg *Config
wallet module.Wallet
client *Client
kms kms.KMS
contracts map[string]providerTypes.EventMap
log *zap.Logger
cfg *Config
wallet module.Wallet
client *Client
kms kms.KMS
contracts map[string]providerTypes.EventMap
LastSavedHeightFunc func() uint64
}

func (p *Provider) NID() string {
Expand Down Expand Up @@ -258,3 +259,13 @@ func (p *Provider) ExecuteRollback(ctx context.Context, sn uint64) error {
}
return nil
}

// SetLastSavedBlockHeightFunc sets the function to save the last saved block height
func (p *Provider) SetLastSavedHeightFunc(f func() uint64) {
p.LastSavedHeightFunc = f
}

// GetLastSavedBlockHeight returns the last saved block height
func (p *Provider) GetLastSavedBlockHeight() uint64 {
return p.LastSavedHeightFunc()
}
46 changes: 27 additions & 19 deletions relayer/chains/wasm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
var _ provider.ChainProvider = (*Provider)(nil)

type Provider struct {
logger *zap.Logger
cfg *Config
client IClient
kms kms.KMS
wallet sdkTypes.AccountI
contracts map[string]relayTypes.EventMap
eventList []sdkTypes.Event
logger *zap.Logger
cfg *Config
client IClient
kms kms.KMS
wallet sdkTypes.AccountI
contracts map[string]relayTypes.EventMap
eventList []sdkTypes.Event
LastSavedHeightFunc func() uint64
}

func (p *Provider) QueryLatestHeight(ctx context.Context) (uint64, error) {
Expand Down Expand Up @@ -119,16 +120,13 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn
return ctx.Err()
case <-subscribeStarter.C:
subscribeStarter.Stop()
go p.SubscribeMessageEvents(ctx, blockInfoChan, &types.SubscribeOpts{
Address: p.cfg.Contracts[relayTypes.ConnectionContract],
Method: EventTypeWasmMessage,
Height: latestHeight,
})
go p.SubscribeMessageEvents(ctx, blockInfoChan, &types.SubscribeOpts{
Address: p.cfg.Contracts[relayTypes.XcallContract],
Method: EventTypeWasmCallMessage,
Height: latestHeight,
})
for _, event := range p.contracts {
go p.SubscribeMessageEvents(ctx, blockInfoChan, &types.SubscribeOpts{
Address: event.Address,
Method: event.GetWasmMsgType(),
Height: latestHeight,
})
}
default:
if startHeight < latestHeight {
p.logger.Debug("Query started", zap.Uint64("from-height", startHeight), zap.Uint64("to-height", latestHeight))
Expand Down Expand Up @@ -781,9 +779,19 @@ func (p *Provider) SubscribeMessageEvents(ctx context.Context, blockInfoChan cha
time.Sleep(time.Second * 1)
continue
}
p.logger.Debug("http client reconnected")
return p.SubscribeMessageEvents(ctx, blockInfoChan, opts)
p.logger.Info("http client reconnected")
return p.Listener(ctx, p.GetLastSavedHeight(), blockInfoChan)
}
}
}
}

// SetLastSavedHeightFunc sets the function to save the last saved height
func (p *Provider) SetLastSavedHeightFunc(f func() uint64) {
p.LastSavedHeightFunc = f
}

// GetLastSavedHeight returns the last saved height
func (p *Provider) GetLastSavedHeight() uint64 {
return p.LastSavedHeightFunc()
}
1 change: 1 addition & 0 deletions relayer/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ChainProvider interface {
Route(ctx context.Context, message *types.Message, callback types.TxResponseFunc) error
ShouldReceiveMessage(ctx context.Context, message *types.Message) (bool, error)
ShouldSendMessage(ctx context.Context, message *types.Message) (bool, error)
SetLastSavedHeightFunc(func() uint64)
MessageReceived(ctx context.Context, key *types.MessageKey) (bool, error)
SetAdmin(context.Context, string) error

Expand Down
4 changes: 3 additions & 1 deletion relayer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func NewRelayer(log *zap.Logger, db store.Store, chains map[string]*Chain, fresh
chainRuntime.LastSavedHeight = lastSavedHeight
}
chainRuntimes[chain.NID()] = chainRuntime

chainRuntime.Provider.SetLastSavedHeightFunc(func() uint64 {
return chainRuntime.LastSavedHeight
})
}

return &Relayer{
Expand Down
10 changes: 9 additions & 1 deletion relayer/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
XcallContract = "xcall"
ConnectionContract = "connection"
SupportedContracts = []string{XcallContract, ConnectionContract}
RetryInterval = 5 * time.Second
RetryInterval = 3 * time.Second
)

type BlockInfo struct {
Expand Down Expand Up @@ -54,6 +54,14 @@ func (c ContractConfigMap) Validate() error {
return nil
}

// GetWasmMsgType returns the wasm message type
func (m *EventMap) GetWasmMsgType() string {
for wasmType := range m.SigType {
return wasmType
}
return ""
}

func (m *Message) MessageKey() *MessageKey {
return NewMessageKey(m.Sn, m.Src, m.Dst, m.EventType)
}
Expand Down
Loading