Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Generic subscription options while adding contract listener #1396

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.

Check failure on line 1 in internal/blockchain/ethereum/ethereum.go

View workflow job for this annotation

GitHub Actions / build

Expected:2024, Actual: 2023 Kaleido, Inc. (goheader)
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -888,10 +888,12 @@

subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
firstEvent := string(core.SubOptsFirstEventNewest)
var subscriptionOptions *fftypes.JSONAny
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
subscriptionOptions = listener.Options.SubscriptionOptions
}
result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi)
result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi, subscriptionOptions)
if err != nil {
return err
}
Expand Down
52 changes: 52 additions & 0 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,58 @@ func TestAddSubscription(t *testing.T) {
assert.NoError(t, err)
}

func TestAddSubscriptionGenericOptionsPassed(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()
e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListener{
Location: fftypes.JSONAnyPtr(fftypes.JSONObject{
"address": "0x123",
}.String()),
Event: &core.FFISerializedEvent{
FFIEventDefinition: fftypes.FFIEventDefinition{
Name: "Changed",
Params: fftypes.FFIParams{
{
Name: "value",
Schema: fftypes.JSONAnyPtr(`{"type": "string", "details": {"type": "string"}}`),
},
},
},
},
Options: &core.ContractListenerOptions{
FirstEvent: string(core.SubOptsFirstEventOldest),
SubscriptionOptions: fftypes.JSONAnyPtr(`{ "genericOption": "generic" }`),
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
func(req *http.Request) (*http.Response, error) {

var reqBody subscription
if err := json.NewDecoder(req.Body).Decode(&reqBody); err != nil {
return httpmock.NewStringResponse(400, ""), err
}

expectedOptions := sub.Options.SubscriptionOptions.JSONObject()
actualOptions := reqBody.Options.JSONObject()

assert.Equal(t, expectedOptions, actualOptions)

return httpmock.NewJsonResponse(200, &subscription{})
})

err := e.AddContractListener(context.Background(), sub)

assert.NoError(t, err)
}

func TestAddSubscriptionWithoutLocation(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
Expand Down
6 changes: 4 additions & 2 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type subscription struct {
EthCompatAddress string `json:"address,omitempty"`
EthCompatEvent *abi.Entry `json:"event,omitempty"`
Filters []fftypes.JSONAny `json:"filters"`
Options *fftypes.JSONAny `json:"options"`
subscriptionCheckpoint
}

Expand Down Expand Up @@ -182,7 +183,7 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
return sub.Name, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) {
func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, options *fftypes.JSONAny) (*subscription, error) {
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
switch firstEvent {
case string(core.SubOptsFirstEventOldest):
Expand All @@ -195,6 +196,7 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati
Stream: stream,
FromBlock: firstEvent,
EthCompatEvent: abi,
Options: options,
}

if location != nil {
Expand Down Expand Up @@ -267,7 +269,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace
name = v1Name
}
location := &Location{Address: instancePath}
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, nil); err != nil {
return nil, err
}
log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID)
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/contract_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.

Check failure on line 1 in pkg/core/contract_listener.go

View workflow job for this annotation

GitHub Actions / build

Expected:2023, Actual: 2022 Kaleido, Inc. (goheader)
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -44,7 +44,8 @@
Status interface{} `ffstruct:"ContractListenerWithStatus" json:"status,omitempty" ffexcludeinput:"true"`
}
type ContractListenerOptions struct {
FirstEvent string `ffstruct:"ContractListenerOptions" json:"firstEvent,omitempty"`
FirstEvent string `ffstruct:"ContractListenerOptions" json:"firstEvent,omitempty"`
SubscriptionOptions *fftypes.JSONAny `ffstruct:"ContractListenerOptions" json:"subscriptionOptions,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling with what this field should be named. For context, in the older ETHConnect codebase we used to call these "subscriptions". In the newer FFTM API (which EVMConnect is based on), we now refer to these as "listeners" to help distinguish them from FireFly Subscriptions (which are a higher level object). So this field actually ends up maping to Listener.Options
https://github.com/hyperledger/firefly-transaction-manager/blob/fe5ace887c9f541c3053fd31d937d46ce6adc3c9/pkg/apitypes/api_types.go#L212

But calling this field ContractListenerOptions.ListenerOptions seems a bit redundant 🤔

@awrichar or @peterbroadhurst Do you have any suggestions here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nguyer - sorry I missed this before.

I think the consistent pattern here with other things, would be that the options are passed through to EVMConnect. If you look at invoke APIs, or the tokens - that's what we do I believe.

}

type ListenerStatusError struct {
Expand Down
Loading