Skip to content

Commit

Permalink
Merge pull request #73 from kaleido-io/more-endpoints
Browse files Browse the repository at this point in the history
Added query endpoints, refactored query handling into syncdispatcher
  • Loading branch information
peterbroadhurst authored Jan 21, 2022
2 parents afde10c + 4d675ce commit 9b005b6
Show file tree
Hide file tree
Showing 34 changed files with 3,041 additions and 376 deletions.
53 changes: 43 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
# firefly-fabconnect

A reliable REST and websocket API to interact with a Fabric network and stream events.

## Architecture

### High Level Components

![high level architecture](/images/arch-1.jpg)

### Objects and Flows

![objects and flows architecture](/images/arch-2.png)
![kafkal handler architecture](/images/arch-3.png)


The component provides 3 high level sets of API endpoints:

- Client MSPs (aka the wallet): registering and enrolling identities to be used for signing transactions
- Transactions: submit transactions and query for transaction result/receipts
- Events: subscribe to events with regex based filter and stream to the client app via websocket

## Getting Started

After checking out the repo, simply run `make` to build and test.

To launch, first prepare the 2 configurations files:

- sample main config file:

```json
Expand Down Expand Up @@ -54,12 +60,21 @@ To launch, first prepare the 2 configurations files:
- the standard Fabric common connection profile (CCP) file that describes the target Fabric network, at the location specified in the main config file above under `rpc.configPath`. For details on the CCP file, see [Fabric SDK documentation](https://hyperledger.github.io/fabric-sdk-node/release-1.4/tutorial-network-config.html). Note that the CCP file must contain the `client` section, which is required for the fabconnect to act as a client to Fabric networks.

Use the following command to launch the connector:

```
./fabconnect -f "/Users/me/Documents/ff-test/config.json"
```

### API Specification

The API spec can be accessed at the endpoint `/api`, which is presented in the Swagger UI.

![swagger ui](/images/swagger-ui.png)

### Hierarchical Configurations

Every configuration parameter can be specified in one of the following ways:

- configuration file that is specified with the `-f` command line parameter. this is overriden by...
- environment variables that follows the naming convention:
- given a configuration property in the configuration JSON "prop1.prop2"
Expand All @@ -71,15 +86,19 @@ Every configuration parameter can be specified in one of the following ways:
- the command line parameter should be `--prop1-prop2` or a shorthand variation

### Support for both Static and Dynamic Network Topology

There is support for using a full connection profile that describes the entire network, without relying on the peer's discovery service to discover the list of peers to send transaction proposals to. A sample connection profile can be seen in the folder [test/fixture/ccp.yml](/test/fixture/ccp.yml). This mode will be running if both `rpc.useGatewayClient` and `rpc.useGatewayServer` are missing or set to `false`.

There is also support for using the dynamic gateway client by relying on the peer's discovery service with a minimal connection profile. A sample connection profile can be seen in the folder [test/fixture/ccp-short.yml](/test/fixture/ccp-short.yml). This mode will be running if `rpc.useGatewayClient` is set to `true`.

Support for server-based gateway support, available in Fabric 2.4, is coming soon.

### Structured Data Support for Transaction Input with Schema Validation

When calling the `POST /transactions` endpoint, input data can be provided in any of the following formats:

- in the "traditional" array of strings corresponding to the target function's list of input parameters:

```json
POST http://localhost:3000/transactions?fly-sync=true&fly-signer=user001&fly-channel=default-channel&fly-chaincode=asset_transfer
{
Expand All @@ -90,7 +109,9 @@ POST http://localhost:3000/transactions?fly-sync=true&fly-signer=user001&fly-cha
"args": ["asset204", "red", "10", "Tom", "123000"]
}
```

- provide a `payloadSchema` property in the input payload `headers`, using [JSON Schema](https://json-schema.org/) to define the list of parameters. The root type must be an `array`, with `prefixItems` to define the sequence of parameters:

```json
POST http://localhost:3000/transactions?fly-sync=true&fly-signer=user001&fly-channel=default-channel&fly-chaincode=asset_transfer
{
Expand Down Expand Up @@ -121,7 +142,9 @@ POST http://localhost:3000/transactions?fly-sync=true&fly-signer=user001&fly-cha
}
}
```

- when using `payloadSchema`, complex parameter structures are supported. Suppose the `CreateAsset` function has the following signature:

```golang
type Asset struct {
ID string `json:"ID"`
Expand All @@ -141,7 +164,9 @@ func (s *SmartContract) CreateAsset(ctx contractapi.TransactionContextInterface,
// implementation...
}
```

Note that the `appraisal` parameter is a complex type, the transaction input data can be specified as follows:

```json
POST http://localhost:3000/transactions?fly-sync=true&fly-signer=user001&fly-channel=default-channel&fly-chaincode=asset_transfer
{
Expand Down Expand Up @@ -185,9 +210,11 @@ POST http://localhost:3000/transactions?fly-sync=true&fly-signer=user001&fly-cha
```

### JSON Data Support in Events

If a chaincode publishes events with string or JSON data, fabconnect can be instructed to decode them from the byte array before sending the event to the listening client application. The decoding instructions can be provided during subscription.

For example, the following chaincode publishes an event containing a JSON structure in the payload:

```golang
asset := Asset{
ID: id,
Expand All @@ -202,22 +229,25 @@ For example, the following chaincode publishes an event containing a JSON struct
assetJSON, _ := json.Marshal(asset)
ctx.GetStub().SetEvent("AssetCreated", assetJSON)
```

An event subscription can be created as follows which contains instructions to decode the payload bytes:

```json
{
"stream": "es-31e85b01-6440-4cc3-63e9-2aafc0d06466",
"channel": "default-channel",
"name": "sub-1",
"signer": "user001",
"fromBlock": "100",
"filter": {
"chaincodeId": "assettransfercomplex"
},
"payloadType": "stringifiedJSON"
"stream": "es-31e85b01-6440-4cc3-63e9-2aafc0d06466",
"channel": "default-channel",
"name": "sub-1",
"signer": "user001",
"fromBlock": "100",
"filter": {
"chaincodeId": "assettransfercomplex"
},
"payloadType": "stringifiedJSON"
}
```

Notice the `payloadType` property, which instructs fabconnect to decode the payload bytes into a JSON structure. As a result the client will receive the event JSON as follows:

```json
[
{
Expand All @@ -243,11 +273,13 @@ Notice the `payloadType` property, which instructs fabconnect to decode the payl
Besides `stringifiedJSON`, `string` is also supported as the payload type which represents UTF-8 encoded strings.

### Fixes Needed for multiple subscriptions under the same event stream

The current `fabric-sdk-go` uses an internal cache for event services, which builds keys only using the channel ID. This means if there are multiple subscriptions targeting the same channel, but specify different `fromBlock` parameters, only the first instance will be effective. All subsequent subscriptions will share the same event service, rendering their own `fromBlock` configuration ineffective.

A fix has been provided for this in the forked repository [https://github.com/kaleido-io/fabric-sdk-go](https://github.com/kaleido-io/fabric-sdk-go).

Follow these simple steps to integrate this fix (until it's contributed back to the official repo):

- clone the repository https://github.com/kaleido-io/fabric-sdk-go and place it peer to the `firefly-fabconnect` folder:
```
workspace-root
Expand All @@ -262,4 +294,5 @@ Follow these simple steps to integrate this fix (until it's contributed back to
- rebuild with `make`

### License

This project is licensed under the Apache 2 License - see the [`LICENSE`](LICENSE) file for details.
Binary file added images/swagger-ui.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 4 additions & 3 deletions internal/events/submanager_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Kaleido
// Copyright 2021 Kaleido

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@ import (

"github.com/hyperledger/firefly-fabconnect/internal/events/api"
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
"github.com/hyperledger/firefly-fabconnect/internal/fabric/test"
"github.com/hyperledger/firefly-fabconnect/internal/kvstore"
"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestActionChildCleanup(t *testing.T) {
dir := tempdir(t)
defer cleanup(t, dir)
sm := newTestSubscriptionManager()
sm.rpc = mockRPCClient("")
sm.rpc = test.MockRPCClient("")
sm.db = kvstore.NewLDBKeyValueStore(path.Join(dir, "db"))
_ = sm.db.Init()
defer sm.db.Close()
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestStreamAndSubscriptionErrors(t *testing.T) {
dir := tempdir(t)
defer cleanup(t, dir)
sm := newTestSubscriptionManager()
sm.rpc = mockRPCClient("")
sm.rpc = test.MockRPCClient("")
sm.db = kvstore.NewLDBKeyValueStore(path.Join(dir, "db"))
_ = sm.db.Init()
defer sm.db.Close()
Expand Down
13 changes: 10 additions & 3 deletions internal/events/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) {
blockNumber := strconv.FormatUint(evt.BlockNumber, 10)
if ts, ok := s.ep.stream.blockTimestampCache.Get(blockNumber); ok {
// we found the timestamp for the block in our local cache, assert it's type and return, no need to query the chain
evt.Timestamp = ts.(int64)
timestamps := ts.([]int64)
evt.Timestamp = timestamps[evt.TransactionIndex]
return
}
// we didn't find the timestamp in our cache, query the node for the block header where we can find the timestamp
Expand All @@ -165,8 +166,14 @@ func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) {
evt.Timestamp = 0 // set to 0, we were not able to retrieve the timestamp.
return
}
evt.Timestamp = block.Timestamp
s.ep.stream.blockTimestampCache.Add(blockNumber, evt.Timestamp)
// blocks in Fabric does not have a timestamp. instead only transactions have their own timestamps
// so each entry in the cache is a slice of (tx timestamp)
timestamps := make([]int64, len(block.Transactions))
for idx, tx := range block.Transactions {
timestamps[idx] = tx.Timestamp
}
s.ep.stream.blockTimestampCache.Add(blockNumber, timestamps)
evt.Timestamp = timestamps[evt.TransactionIndex]
}

func (s *subscription) unsubscribe(deleting bool) {
Expand Down
3 changes: 2 additions & 1 deletion internal/events/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"fmt"
"testing"

"github.com/hyperledger/firefly-fabconnect/internal/fabric/test"
"github.com/stretchr/testify/assert"
)

func TestCreateWebhookSub(t *testing.T) {
assert := assert.New(t)

rpc := mockRPCClient("")
rpc := test.MockRPCClient("")
m := &mockSubMgr{}
m.stream = newTestStream(m)

Expand Down
70 changes: 4 additions & 66 deletions internal/events/test_helper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Kaleido
// Copyright 2022 Kaleido

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,16 +22,10 @@ import (
"os"
"testing"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/firefly-fabconnect/internal/conf"
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
"github.com/hyperledger/firefly-fabconnect/internal/fabric/test"
"github.com/hyperledger/firefly-fabconnect/internal/kvstore"
mockfabric "github.com/hyperledger/firefly-fabconnect/mocks/fabric/client"
mockkvstore "github.com/hyperledger/firefly-fabconnect/mocks/kvstore"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -115,7 +109,7 @@ func newTestStream(submgr subscriptionManager) *eventStream {

func newTestSubscriptionManager() *subscriptionMGR {
smconf := &conf.EventstreamConf{}
rpc := mockRPCClient("")
rpc := test.MockRPCClient("")
sm := NewSubscriptionManager(smconf, rpc, newMockWebSocket()).(*subscriptionMGR)
sm.db = &mockkvstore.KVStore{}
sm.config.WebhooksAllowPrivateIPs = true
Expand Down Expand Up @@ -180,57 +174,8 @@ func testEvent(subID string) *eventData {
}
}

func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient {
rpc := &mockfabric.RPCClient{}
blockEventChan := make(chan *fab.BlockEvent)
ccEventChan := make(chan *fab.CCEvent)
var roBlockEventChan <-chan *fab.BlockEvent = blockEventChan
var roCCEventChan <-chan *fab.CCEvent = ccEventChan
res := &fab.BlockchainInfoResponse{
BCI: &common.BlockchainInfo{
Height: 10,
},
}
rawBlock := &utils.RawBlock{
Header: &common.BlockHeader{
Number: uint64(20),
},
}
block := &utils.Block{
Number: uint64(20),
Timestamp: int64(1000000),
}
rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil)
rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil)
rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil)
rpc.On("Unregister", mock.Anything).Return()

go func() {
if fromBlock == "0" {
blockEventChan <- &fab.BlockEvent{
Block: constructBlock(1),
}
}
blockEventChan <- &fab.BlockEvent{
Block: constructBlock(11),
}
ccEventChan <- &fab.CCEvent{
BlockNumber: uint64(10),
TxID: "3144a3ad43dcc11374832bbb71561320de81fd80d69cc8e26a9ea7d3240a5e84",
ChaincodeID: "asset_transfer",
}
if len(withReset) > 0 {
blockEventChan <- &fab.BlockEvent{
Block: constructBlock(11),
}
}
}()

return rpc
}

func setupTestSubscription(sm *subscriptionMGR, stream *eventStream, subscriptionName, fromBlock string, withReset ...bool) *eventsapi.SubscriptionInfo {
rpc := mockRPCClient(fromBlock, withReset...)
rpc := test.MockRPCClient(fromBlock, withReset...)
sm.rpc = rpc
spec := &eventsapi.SubscriptionInfo{
Name: subscriptionName,
Expand All @@ -243,10 +188,3 @@ func setupTestSubscription(sm *subscriptionMGR, stream *eventStream, subscriptio

return spec
}

func constructBlock(number uint64) *common.Block {
mockTx := eventmocks.NewTransactionWithCCEvent("testTxID", peer.TxValidationCode_VALID, "testChaincodeID", "testCCEventName", []byte("testPayload"))
mockBlock := eventmocks.NewBlock("testChannelID", mockTx)
mockBlock.Header.Number = number
return mockBlock
}
4 changes: 1 addition & 3 deletions internal/fabric/client/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package client

import (
//nolint

"github.com/hyperledger/fabric-sdk-go/pkg/client/ledger"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core"
Expand Down Expand Up @@ -90,7 +88,7 @@ func (l *ledgerClientWrapper) queryTransaction(channelId, signer, txId string) (
}

ret := make(map[string]interface{})
ret["tx"] = tx
ret["transaction"] = tx
ret["raw"] = envelope
return ret, nil
}
Expand Down
Loading

0 comments on commit 9b005b6

Please sign in to comment.