Skip to content

Commit

Permalink
Merge pull request #141 from hyperledger/receipt-query
Browse files Browse the repository at this point in the history
Update TranscationReceipt to include decoding of events
  • Loading branch information
Chengxuan authored Jun 14, 2024
2 parents 3f1f078 + 6fe5ef7 commit 655c326
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 135 deletions.
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|blockQueueLength|Internal queue length for notifying the confirmations manager of new blocks|`int`|`50`
|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`false`
|notificationQueueLength|Internal queue length for notifying the confirmations manager of new transactions/events|`int`|`50`
|receiptWorkers|Number of workers to use to query in parallel for receipts|`int`|`10`
|required|Number of confirmations required to consider a transaction/event final|`int`|`20`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/hashicorp/golang-lru v1.0.2
github.com/hyperledger/firefly-common v1.4.8
github.com/hyperledger/firefly-signer v1.1.13
github.com/hyperledger/firefly-transaction-manager v1.3.14
github.com/hyperledger/firefly-transaction-manager v1.3.15
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/hyperledger/firefly-common v1.4.8 h1:0o1Qp1c5YzQo8nbnX+gAo9SVd2tR4Z9U
github.com/hyperledger/firefly-common v1.4.8/go.mod h1:dXewcVMFNON2SvQ1UPvu64OWUt77+M3p8qy61lT1kE4=
github.com/hyperledger/firefly-signer v1.1.13 h1:eiHjc6HPRG8AzXUCUgm51qqX1I9BokiuiiqJ89XwK4M=
github.com/hyperledger/firefly-signer v1.1.13/go.mod h1:pK6kivzBFSue3zpJSQpH67VasnLLbwBJOBUNv0zHbRA=
github.com/hyperledger/firefly-transaction-manager v1.3.14 h1:qK5wFQhEkZosPd/rvlcVHiSc+5ZCl+LEgpKk2a+9wKw=
github.com/hyperledger/firefly-transaction-manager v1.3.14/go.mod h1:N3BoHh8+dWG710oQKuNiXmJNEOBBeLTsQ8GpZ41vhog=
github.com/hyperledger/firefly-transaction-manager v1.3.15 h1:IyWIId+uytqjIRMxROk5OqOcdHMzJFGFKpQQybiISOU=
github.com/hyperledger/firefly-transaction-manager v1.3.15/go.mod h1:N3BoHh8+dWG710oQKuNiXmJNEOBBeLTsQ8GpZ41vhog=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
35 changes: 35 additions & 0 deletions internal/ethereum/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,41 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibility

}

func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *testing.T) {

_, c, mRPC, done := newTestConnector(t)
bl := c.blockListener
bl.blockPollingInterval = 1 * time.Microsecond
bl.hederaCompatibilityMode = true

block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef")

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(1000)
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*[]ethtypes.HexBytes0xPrefix)
*hbh = []ethtypes.HexBytes0xPrefix{
block1003Hash,
}
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
go done() // Close after we've processed the log
})

bl.checkStartedLocked()

c.WaitClosed()

mRPC.AssertExpectations(t)

}

func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode(t *testing.T) {

_, c, mRPC, done := newTestConnector(t)
Expand Down
144 changes: 144 additions & 0 deletions internal/ethereum/event_enricher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright © 2024 Kaleido, Inl.c.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethereum

import (
"bytes"
"context"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

type eventEnricher struct {
connector *ethConnector
extractSigner bool
}

func (ee *eventEnricher) filterEnrichEthLog(ctx context.Context, f *eventFilter, methods []*abi.Entry, ethLog *logJSONRPC) (_ *ffcapi.Event, matched bool, decoded bool, err error) {

// Check the block for this event is at our high water mark, as we might have rewound for other listeners
blockNumber := ethLog.BlockNumber.BigInt().Int64()
transactionIndex := ethLog.TransactionIndex.BigInt().Int64()
logIndex := ethLog.LogIndex.BigInt().Int64()
protoID := getEventProtoID(blockNumber, transactionIndex, logIndex)

// Apply a post-filter check to the event
topicMatches := len(ethLog.Topics) > 0 && bytes.Equal(ethLog.Topics[0], f.Topic0)
addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:])
if !topicMatches || !addrMatches {
log.L(ctx).Debugf("skipping event '%s' topicMatches=%t addrMatches=%t", protoID, topicMatches, addrMatches)
return nil, matched, decoded, nil
}
matched = true

log.L(ctx).Infof("detected event '%s'", protoID)
data, decoded := ee.decodeLogData(ctx, f.Event, ethLog.Topics, ethLog.Data)

info := eventInfo{
logJSONRPC: *ethLog,
}

var timestamp *fftypes.FFTime
if ee.connector.eventBlockTimestamps {
bi, err := ee.connector.blockListener.getBlockInfoByHash(ctx, ethLog.BlockHash.String())
if bi == nil || err != nil {
log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err)
return nil, matched, decoded, err // This is an error condition, rather than just something we cannot enrich
}
timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64())
}

if len(methods) > 0 || ee.extractSigner {
txInfo, err := ee.connector.getTransactionInfo(ctx, ethLog.TransactionHash)
if txInfo == nil || err != nil {
if txInfo == nil {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': transaction hash not found", ethLog.TransactionHash)
} else {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': %v", ethLog.TransactionHash, err)
}
return nil, matched, decoded, err // This is an error condition, rather than just something we cannot enrich
}
if ee.extractSigner {
info.InputSigner = txInfo.From
}
if len(methods) > 0 {
ee.matchMethod(ctx, methods, txInfo, &info)
}
}

signature := f.Signature
return &ffcapi.Event{
ID: ffcapi.EventID{
Signature: signature,
BlockHash: ethLog.BlockHash.String(),
TransactionHash: ethLog.TransactionHash.String(),
BlockNumber: fftypes.FFuint64(blockNumber),
TransactionIndex: fftypes.FFuint64(transactionIndex),
LogIndex: fftypes.FFuint64(logIndex),
Timestamp: timestamp,
},
Info: &info,
Data: data,
}, matched, decoded, nil
}

func (ee *eventEnricher) decodeLogData(ctx context.Context, event *abi.Entry, topics []ethtypes.HexBytes0xPrefix, data ethtypes.HexBytes0xPrefix) (*fftypes.JSONAny, bool) {
var b []byte
v, err := event.DecodeEventDataCtx(ctx, topics, data)
if err == nil {
b, err = ee.connector.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Errorf("Failed to process event log: %s", err)
return nil, false
}
return fftypes.JSONAnyPtrBytes(b), true
}

func (ee *eventEnricher) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo *txInfoJSONRPC, info *eventInfo) {
if len(txInfo.Input) < 4 {
log.L(ctx).Debugf("No function selector available for TX '%s'", txInfo.Hash)
return
}
functionID := txInfo.Input[0:4]
var method *abi.Entry
for _, m := range methods {
if bytes.Equal(m.FunctionSelectorBytes(), functionID) {
method = m
break
}
}
if method == nil {
log.L(ctx).Debugf("Function selector '%s' for TX '%s' does not match any of the supplied methods", functionID.String(), txInfo.Hash)
return
}
info.InputMethod = method.String()
v, err := method.DecodeCallDataCtx(ctx, txInfo.Input)
var b []byte
if err == nil {
b, err = ee.connector.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Warnf("Failed to decode input for TX '%s' using '%s'", txInfo.Hash, info.InputMethod)
return
}
info.InputArgs = fftypes.JSONAnyPtrBytes(b)
}
111 changes: 8 additions & 103 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package ethereum

import (
"bytes"
"context"
"encoding/json"
"math/big"
Expand Down Expand Up @@ -59,6 +58,7 @@ type listener struct {
id *fftypes.UUID
c *ethConnector
es *eventStream
ee *eventEnricher
hwmMux sync.Mutex // Protects checkpoint of an individual listener. May hold ES lock when taking this, must NOT attempt to obtain ES lock while holding this
hwmBlock int64
config listenerConfig
Expand Down Expand Up @@ -239,124 +239,29 @@ func (l *listener) listenerCatchupLoop() {
}
}

func (l *listener) decodeLogData(ctx context.Context, event *abi.Entry, topics []ethtypes.HexBytes0xPrefix, data ethtypes.HexBytes0xPrefix) *fftypes.JSONAny {
var b []byte
v, err := event.DecodeEventDataCtx(ctx, topics, data)
if err == nil {
b, err = l.c.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Errorf("Failed to process event log: %s", err)
return nil
}
return fftypes.JSONAnyPtrBytes(b)
}

func (l *listener) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo *txInfoJSONRPC, info *eventInfo) {
if len(txInfo.Input) < 4 {
log.L(ctx).Debugf("No function selector available for TX '%s'", txInfo.Hash)
return
}
functionID := txInfo.Input[0:4]
var method *abi.Entry
for _, m := range methods {
if bytes.Equal(m.FunctionSelectorBytes(), functionID) {
method = m
break
}
}
if method == nil {
log.L(ctx).Debugf("Function selector '%s' for TX '%s' does not match any of the supplied methods", functionID.String(), txInfo.Hash)
return
}
info.InputMethod = method.String()
v, err := method.DecodeCallDataCtx(ctx, txInfo.Input)
var b []byte
if err == nil {
b, err = l.c.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Warnf("Failed to decode input for TX '%s' using '%s'", txInfo.Hash, info.InputMethod)
return
}
info.InputArgs = fftypes.JSONAnyPtrBytes(b)
}

func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool, error) {
func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, methods []*abi.Entry, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool, error) {

// Check the block for this event is at our high water mark, as we might have rewound for other listeners
blockNumber := ethLog.BlockNumber.BigInt().Int64()
transactionIndex := ethLog.TransactionIndex.BigInt().Int64()
logIndex := ethLog.LogIndex.BigInt().Int64()
protoID := getEventProtoID(blockNumber, transactionIndex, logIndex)
if blockNumber < l.hwmBlock {
log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, protoID, l.hwmBlock)
log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, getEventProtoID(blockNumber, transactionIndex, logIndex), l.hwmBlock)
return nil, false, nil
}

// Apply a post-filter check to the event
topicMatches := len(ethLog.Topics) > 0 && bytes.Equal(ethLog.Topics[0], f.Topic0)
addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:])
if !topicMatches || !addrMatches {
log.L(ctx).Debugf("Listener %s skipping event '%s' topicMatches=%t addrMatches=%t", l.id, protoID, topicMatches, addrMatches)
return nil, false, nil
e, matched, _, err := l.ee.filterEnrichEthLog(ctx, f, methods, ethLog)
if !matched || err != nil {
return nil, false, err
}

log.L(ctx).Infof("Listener %s detected event '%s'", l.id, protoID)
data := l.decodeLogData(ctx, f.Event, ethLog.Topics, ethLog.Data)

info := eventInfo{
logJSONRPC: *ethLog,
}

var timestamp *fftypes.FFTime
if l.c.eventBlockTimestamps {
bi, err := l.c.blockListener.getBlockInfoByHash(ctx, ethLog.BlockHash.String())
if bi == nil || err != nil {
log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err)
return nil, false, err // This is an error condition, rather than just something we cannot enrich
}
timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64())
}

if len(l.config.options.Methods) > 0 || l.config.options.Signer {
txInfo, err := l.c.getTransactionInfo(ctx, ethLog.TransactionHash)
if txInfo == nil || err != nil {
if txInfo == nil {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': transaction hash not found", ethLog.TransactionHash)
} else {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': %v", ethLog.TransactionHash, err)
}
return nil, false, err // This is an error condition, rather than just something we cannot enrich
}
if l.config.options.Signer {
info.InputSigner = txInfo.From
}
if len(l.config.options.Methods) > 0 {
l.matchMethod(ctx, l.config.options.Methods, txInfo, &info)
}
}

signature := f.Signature
e.ID.ListenerID = l.id
return &ffcapi.ListenerEvent{
Checkpoint: &listenerCheckpoint{
Block: blockNumber,
TransactionIndex: transactionIndex,
LogIndex: logIndex,
},
Event: &ffcapi.Event{
ID: ffcapi.EventID{
ListenerID: l.id,
Signature: signature,
BlockHash: ethLog.BlockHash.String(),
TransactionHash: ethLog.TransactionHash.String(),
BlockNumber: fftypes.FFuint64(blockNumber),
TransactionIndex: fftypes.FFuint64(transactionIndex),
LogIndex: fftypes.FFuint64(logIndex),
Timestamp: timestamp,
},
Info: &info,
Data: data,
},
Event: e,
}, true, nil
}
Loading

0 comments on commit 655c326

Please sign in to comment.