diff --git a/db/migrations/postgres/000056_drop_transactions_columns.down.sql b/db/migrations/postgres/000056_refactor_transactions_columns.down.sql similarity index 81% rename from db/migrations/postgres/000056_drop_transactions_columns.down.sql rename to db/migrations/postgres/000056_refactor_transactions_columns.down.sql index c9208a5f8..fa6aa34f0 100644 --- a/db/migrations/postgres/000056_drop_transactions_columns.down.sql +++ b/db/migrations/postgres/000056_refactor_transactions_columns.down.sql @@ -7,4 +7,7 @@ ALTER TABLE transactions ADD COLUMN info BYTEA; CREATE INDEX transactions_protocol_id ON transactions(protocol_id); CREATE INDEX transactions_ref ON transactions(ref); + +DROP INDEX transactions_blockchain_ids; +ALTER TABLE transactions DROP COLUMN blockchain_ids; COMMIT; diff --git a/db/migrations/postgres/000056_drop_transactions_columns.up.sql b/db/migrations/postgres/000056_refactor_transactions_columns.up.sql similarity index 68% rename from db/migrations/postgres/000056_drop_transactions_columns.up.sql rename to db/migrations/postgres/000056_refactor_transactions_columns.up.sql index 22c8ee5b4..6b04cf7ec 100644 --- a/db/migrations/postgres/000056_drop_transactions_columns.up.sql +++ b/db/migrations/postgres/000056_refactor_transactions_columns.up.sql @@ -7,4 +7,7 @@ ALTER TABLE transactions DROP COLUMN signer; ALTER TABLE transactions DROP COLUMN hash; ALTER TABLE transactions DROP COLUMN protocol_id; ALTER TABLE transactions DROP COLUMN info; + +ALTER TABLE transactions ADD COLUMN blockchain_ids VARCHAR(1024); +CREATE INDEX transactions_blockchain_ids ON transactions(blockchain_ids); COMMIT; diff --git a/db/migrations/sqlite/000056_drop_transactions_columns.down.sql b/db/migrations/sqlite/000056_refactor_transactions_columns.down.sql similarity index 81% rename from db/migrations/sqlite/000056_drop_transactions_columns.down.sql rename to db/migrations/sqlite/000056_refactor_transactions_columns.down.sql index 9712c8584..b795e4c4e 100644 --- a/db/migrations/sqlite/000056_drop_transactions_columns.down.sql +++ b/db/migrations/sqlite/000056_refactor_transactions_columns.down.sql @@ -6,3 +6,6 @@ ALTER TABLE transactions ADD COLUMN info BYTEA; CREATE INDEX transactions_protocol_id ON transactions(protocol_id); CREATE INDEX transactions_ref ON transactions(ref); + +DROP INDEX transactions_blockchain_ids; +ALTER TABLE transactions DROP COLUMN blockchain_ids; diff --git a/db/migrations/sqlite/000056_drop_transactions_columns.up.sql b/db/migrations/sqlite/000056_refactor_transactions_columns.up.sql similarity index 67% rename from db/migrations/sqlite/000056_drop_transactions_columns.up.sql rename to db/migrations/sqlite/000056_refactor_transactions_columns.up.sql index ef85b5eca..57d264399 100644 --- a/db/migrations/sqlite/000056_drop_transactions_columns.up.sql +++ b/db/migrations/sqlite/000056_refactor_transactions_columns.up.sql @@ -6,3 +6,6 @@ ALTER TABLE transactions DROP COLUMN signer; ALTER TABLE transactions DROP COLUMN hash; ALTER TABLE transactions DROP COLUMN protocol_id; ALTER TABLE transactions DROP COLUMN info; + +ALTER TABLE transactions ADD COLUMN blockchain_ids VARCHAR(1024); +CREATE INDEX transactions_blockchain_ids ON transactions(blockchain_ids); diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 84d27cb11..5fcc64e0e 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -5078,6 +5078,10 @@ paths: application/json: schema: properties: + blockchainIds: + items: + type: string + type: array created: {} id: {} namespace: @@ -9837,6 +9841,11 @@ paths: schema: default: 120s type: string + - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' + in: query + name: blockchainids + schema: + type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' in: query name: created @@ -9902,6 +9911,10 @@ paths: application/json: schema: properties: + blockchainIds: + items: + type: string + type: array created: {} id: {} namespace: @@ -9945,6 +9958,11 @@ paths: schema: default: 120s type: string + - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' + in: query + name: blockchainids + schema: + type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' in: query name: created @@ -10010,6 +10028,10 @@ paths: application/json: schema: properties: + blockchainIds: + items: + type: string + type: array created: {} id: {} namespace: diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index 181c8cf52..8d62b160d 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -294,7 +294,7 @@ func TestCalcPinsFail(t *testing.T) { Messages: []*fftypes.Message{ {Header: fftypes.MessageHeader{ Group: gid, - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }}, }, }, diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 8e8f3e1a8..a68709941 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -188,8 +188,10 @@ func ethHexFormatB32(b *fftypes.Bytes32) string { func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { sBlockNumber := msgJSON.GetString("blockNumber") - sTransactionIndex := msgJSON.GetString("transactionIndex") sTransactionHash := msgJSON.GetString("transactionHash") + blockNumber := msgJSON.GetInt64("blockNumber") + txIndex := msgJSON.GetInt64("transactionIndex") + logIndex := msgJSON.GetInt64("logIndex") dataJSON := msgJSON.GetObject("data") authorAddress := dataJSON.GetString("author") ns := dataJSON.GetString("namespace") @@ -205,7 +207,6 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON } if sBlockNumber == "" || - sTransactionIndex == "" || sTransactionHash == "" || authorAddress == "" || sUUIDs == "" || @@ -257,12 +258,13 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON BatchPayloadRef: sPayloadRef, Contexts: contexts, Event: blockchain.Event{ - Source: e.Name(), - Name: "BatchPin", - ProtocolID: sTransactionHash, - Output: dataJSON, - Info: msgJSON, - Timestamp: timestamp, + BlockchainTXID: sTransactionHash, + Source: e.Name(), + Name: "BatchPin", + ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex), + Output: dataJSON, + Info: msgJSON, + Timestamp: timestamp, }, } @@ -272,6 +274,9 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { sTransactionHash := msgJSON.GetString("transactionHash") + blockNumber := msgJSON.GetInt64("blockNumber") + txIndex := msgJSON.GetInt64("transactionIndex") + logIndex := msgJSON.GetInt64("logIndex") sub := msgJSON.GetString("subId") signature := msgJSON.GetString("signature") dataJSON := msgJSON.GetObject("data") @@ -284,19 +289,20 @@ func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSON } delete(msgJSON, "data") - event := &blockchain.ContractEvent{ + event := &blockchain.EventWithSubscription{ Subscription: sub, Event: blockchain.Event{ - Source: e.Name(), - Name: name, - ProtocolID: sTransactionHash, - Output: dataJSON, - Info: msgJSON, - Timestamp: timestamp, + BlockchainTXID: sTransactionHash, + Source: e.Name(), + Name: name, + ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex), + Output: dataJSON, + Info: msgJSON, + Timestamp: timestamp, }, } - return e.callbacks.ContractEvent(event) + return e.callbacks.BlockchainEvent(event) } func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) error { @@ -321,7 +327,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) updateType = fftypes.OpStatusFailed } l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message) - return e.callbacks.BlockchainOpUpdate(operationID, updateType, message, reply) + return e.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply) } func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{}) error { diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index b623d6a11..8ce0b6219 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -983,6 +983,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { em.On("BlockchainOpUpdate", operationID, fftypes.OpStatusSucceeded, + "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8", "", mock.Anything).Return(nil) @@ -1022,6 +1023,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { txsu := em.On("BlockchainOpUpdate", operationID, fftypes.OpStatusFailed, + "", "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument", mock.Anything).Return(fmt.Errorf("Shutdown")) done := make(chan struct{}) @@ -1292,7 +1294,11 @@ func TestHandleMessageContractEvent(t *testing.T) { ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5", } - em.On("ContractEvent", mock.Anything).Return(nil) + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { + assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID) + assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID) + return true + })).Return(nil) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -1300,7 +1306,7 @@ func TestHandleMessageContractEvent(t *testing.T) { err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - ev := em.Calls[0].Arguments[0].(*blockchain.ContractEvent) + ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription) assert.Equal(t, "sub2", ev.Subscription) assert.Equal(t, "Changed", ev.Event.Name) @@ -1351,7 +1357,7 @@ func TestHandleMessageContractEventNoTimestamp(t *testing.T) { ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5", } - em.On("ContractEvent", mock.Anything).Return(nil) + em.On("BlockchainEvent", mock.Anything).Return(nil) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -1387,7 +1393,7 @@ func TestHandleMessageContractEventError(t *testing.T) { ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5", } - em.On("ContractEvent", mock.Anything).Return(fmt.Errorf("pop")) + em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) diff --git a/internal/blockchain/ethereum/ffi_param_validator_test.go b/internal/blockchain/ethereum/ffi_param_validator_test.go index 0352760cc..ab8298e2f 100644 --- a/internal/blockchain/ethereum/ffi_param_validator_test.go +++ b/internal/blockchain/ethereum/ffi_param_validator_test.go @@ -108,7 +108,7 @@ func TestSchemaTypeInvalidFFIType(t *testing.T) { func TestSchemaTypeMissing(t *testing.T) { _, err := NewTestSchema(`{}`) - assert.Regexp(t, "missing properties: 'type'", err) + assert.Regexp(t, "missing properties", err) } func TestSchemaDetailsTypeMissing(t *testing.T) { diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index fb2c246f9..4e6a978e3 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -265,6 +265,9 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb } sTransactionHash := msgJSON.GetString("transactionId") + blockNumber := msgJSON.GetInt64("blockNumber") + transactionIndex := msgJSON.GetInt64("transactionIndex") + eventIndex := msgJSON.GetInt64("eventIndex") signer := payload.GetString("signer") ns := payload.GetString("namespace") sUUIDs := payload.GetString("uuids") @@ -315,12 +318,13 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb BatchPayloadRef: sPayloadRef, Contexts: contexts, Event: blockchain.Event{ - Source: f.Name(), - Name: "BatchPin", - ProtocolID: sTransactionHash, - Output: *payload, - Info: msgJSON, - Timestamp: fftypes.UnixTime(timestamp), + BlockchainTXID: sTransactionHash, + Source: f.Name(), + Name: "BatchPin", + ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, eventIndex), + Output: *payload, + Info: msgJSON, + Timestamp: fftypes.UnixTime(timestamp), }, } @@ -337,6 +341,9 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb delete(msgJSON, "payload") sTransactionHash := msgJSON.GetString("transactionId") + blockNumber := msgJSON.GetInt64("blockNumber") + transactionIndex := msgJSON.GetInt64("transactionIndex") + eventIndex := msgJSON.GetInt64("eventIndex") sub := msgJSON.GetString("subId") name := msgJSON.GetString("eventName") sTimestamp := msgJSON.GetString("timestamp") @@ -346,19 +353,20 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb // Continue with zero timestamp } - event := &blockchain.ContractEvent{ + event := &blockchain.EventWithSubscription{ Subscription: sub, Event: blockchain.Event{ - Source: f.Name(), - Name: name, - ProtocolID: sTransactionHash, - Output: *payload, - Info: msgJSON, - Timestamp: fftypes.UnixTime(timestamp), + BlockchainTXID: sTransactionHash, + Source: f.Name(), + Name: name, + ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, eventIndex), + Output: *payload, + Info: msgJSON, + Timestamp: fftypes.UnixTime(timestamp), }, } - return f.callbacks.ContractEvent(event) + return f.callbacks.BlockchainEvent(event) } func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) error { @@ -367,7 +375,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) er headers := reply.GetObject("headers") requestID := headers.GetString("requestId") replyType := headers.GetString("type") - txHash := reply.GetString("transactionHash") + txHash := reply.GetString("transactionId") message := reply.GetString("errorMessage") if requestID == "" || replyType == "" { l.Errorf("Reply cannot be processed: %+v", reply) @@ -383,7 +391,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) er updateType = fftypes.OpStatusFailed } l.Infof("Fabconnect '%s' reply tx=%s (request=%s) %s", replyType, txHash, requestID, message) - return f.callbacks.BlockchainOpUpdate(operationID, updateType, message, reply) + return f.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply) } func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) error { diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 0fc4f468e..7717ab2bf 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -6,7 +6,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -570,20 +570,24 @@ func TestHandleMessageBatchPinOK(t *testing.T) { data := []byte(` [ { - "chaincodeId": "firefly", - "blockNumber": 91, - "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", - "eventName": "BatchPin", - "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMxNjY3LCJuYW5vcyI6NzkxNDk5MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiUW1mNDEyalFaaXVWVXRkZ25CMzZGWEZYN3hnNVY2S0ViU0o0ZHBRdWhrTHlmRCIsImNvbnRleHRzIjpbIjB4NjhlNGRhNzlmODA1YmNhNWI5MTJiY2RhOWM2M2QwM2U2ZTg2NzEwOGRhYmI5Yjk0NDEwOWFlYTU0MWVmNTIyYSIsIjB4MTliODIwOTNkZTVjZTkyYTAxZTMzMzA0OGU4NzdlMjM3NDM1NGJmODQ2ZGQwMzQ4NjRlZjZmZmJkNjQzODc3MSJdfQ==", - "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMxNjY3LCJuYW5vcyI6NzkxNDk5MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiUW1mNDEyalFaaXVWVXRkZ25CMzZGWEZYN3hnNVY2S0ViU0o0ZHBRdWhrTHlmRCIsImNvbnRleHRzIjpbIjB4NjhlNGRhNzlmODA1YmNhNWI5MTJiY2RhOWM2M2QwM2U2ZTg2NzEwOGRhYmI5Yjk0NDEwOWFlYTU0MWVmNTIyYSIsIjB4MTliODIwOTNkZTVjZTkyYTAxZTMzMzA0OGU4NzdlMjM3NDM1NGJmODQ2ZGQwMzQ4NjRlZjZmZmJkNjQzODc3MSJdfQ==", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" }, { - "chaincodeId": "firefly", - "blockNumber": 77, - "transactionId": "a488800a70c8f765871611168d422fb29cc37da2d0a196a3200c8068ba1706fd", - "eventName": "BatchPin", - "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMxNjY3LCJuYW5vcyI6NzkxNDk5MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiUW1mNDEyalFaaXVWVXRkZ25CMzZGWEZYN3hnNVY2S0ViU0o0ZHBRdWhrTHlmRCIsImNvbnRleHRzIjpbIjB4NjhlNGRhNzlmODA1YmNhNWI5MTJiY2RhOWM2M2QwM2U2ZTg2NzEwOGRhYmI5Yjk0NDEwOWFlYTU0MWVmNTIyYSIsIjB4MTliODIwOTNkZTVjZTkyYTAxZTMzMzA0OGU4NzdlMjM3NDM1NGJmODQ2ZGQwMzQ4NjRlZjZmZmJkNjQzODc3MSJdfQ==", - "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + "chaincodeId": "firefly", + "blockNumber": 77, + "transactionIndex": 0, + "eventIndex": 0, + "transactionId": "a488800a70c8f765871611168d422fb29cc37da2d0a196a3200c8068ba1706fd", + "eventName": "BatchPin", + "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMxNjY3LCJuYW5vcyI6NzkxNDk5MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiUW1mNDEyalFaaXVWVXRkZ25CMzZGWEZYN3hnNVY2S0ViU0o0ZHBRdWhrTHlmRCIsImNvbnRleHRzIjpbIjB4NjhlNGRhNzlmODA1YmNhNWI5MTJiY2RhOWM2M2QwM2U2ZTg2NzEwOGRhYmI5Yjk0NDEwOWFlYTU0MWVmNTIyYSIsIjB4MTliODIwOTNkZTVjZTkyYTAxZTMzMzA0OGU4NzdlMjM3NDM1NGJmODQ2ZGQwMzQ4NjRlZjZmZmJkNjQzODc3MSJdfQ==", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" } ]`) @@ -595,7 +599,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { ID: "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e", } - em.On("BatchPinComplete", mock.Anything, "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", mock.Anything, mock.Anything).Return(nil) + em.On("BatchPinComplete", mock.Anything, "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server").Return(nil) var events []interface{} err := json.Unmarshal(data, &events) @@ -609,7 +613,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { assert.Equal(t, "847d3bfd-0742-49ef-b65d-3fed15f5b0a6", b.BatchID.String()) assert.Equal(t, "d71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", b.BatchHash.String()) assert.Equal(t, "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", b.BatchPayloadRef) - assert.Equal(t, "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", em.Calls[0].Arguments[1]) + assert.Equal(t, "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", em.Calls[1].Arguments[1]) assert.Len(t, b.Contexts, 2) assert.Equal(t, "68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", b.Contexts[0].String()) assert.Equal(t, "19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771", b.Contexts[1].String()) @@ -622,12 +626,12 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { data := []byte(` [ { - "chaincodeId": "firefly", - "blockNumber": 91, - "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", - "eventName": "BatchPin", - "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMyMDQwLCJuYW5vcyI6MjI5MjM1MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiIiwiY29udGV4dHMiOlsiMHg2OGU0ZGE3OWY4MDViY2E1YjkxMmJjZGE5YzYzZDAzZTZlODY3MTA4ZGFiYjliOTQ0MTA5YWVhNTQxZWY1MjJhIiwiMHgxOWI4MjA5M2RlNWNlOTJhMDFlMzMzMDQ4ZTg3N2UyMzc0MzU0YmY4NDZkZDAzNDg2NGVmNmZmYmQ2NDM4NzcxIl19", - "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "eventName": "BatchPin", + "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMyMDQwLCJuYW5vcyI6MjI5MjM1MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiIiwiY29udGV4dHMiOlsiMHg2OGU0ZGE3OWY4MDViY2E1YjkxMmJjZGE5YzYzZDAzZTZlODY3MTA4ZGFiYjliOTQ0MTA5YWVhNTQxZWY1MjJhIiwiMHgxOWI4MjA5M2RlNWNlOTJhMDFlMzMzMDQ4ZTg3N2UyMzc0MzU0YmY4NDZkZDAzNDg2NGVmNmZmYmQ2NDM4NzcxIl19", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" } ]`) @@ -666,12 +670,12 @@ func TestHandleMessageBatchPinExit(t *testing.T) { data := []byte(` [ { - "chaincodeId": "firefly", - "blockNumber": 91, - "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", - "eventName": "BatchPin", - "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMyMDQwLCJuYW5vcyI6MjI5MjM1MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiIiwiY29udGV4dHMiOlsiMHg2OGU0ZGE3OWY4MDViY2E1YjkxMmJjZGE5YzYzZDAzZTZlODY3MTA4ZGFiYjliOTQ0MTA5YWVhNTQxZWY1MjJhIiwiMHgxOWI4MjA5M2RlNWNlOTJhMDFlMzMzMDQ4ZTg3N2UyMzc0MzU0YmY4NDZkZDAzNDg2NGVmNmZmYmQ2NDM4NzcxIl19", - "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "eventName": "BatchPin", + "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMyMDQwLCJuYW5vcyI6MjI5MjM1MDAwfSwibmFtZXNwYWNlIjoibnMxIiwidXVpZHMiOiIweGUxOWFmOGIzOTA2MDQwNTE4MTJkNzU5N2QxOWFkZmI5ODQ3ZDNiZmQwNzQyNDllZmI2NWQzZmVkMTVmNWIwYTYiLCJiYXRjaEhhc2giOiIweGQ3MWViMTM4ZDc0YzIyOWEzODhlYjBlMWFiYzAzZjRjN2NiYjIxZDRmYzRiODM5ZmJmMGVjNzNlNDI2M2Y2YmUiLCJwYXlsb2FkUmVmIjoiIiwiY29udGV4dHMiOlsiMHg2OGU0ZGE3OWY4MDViY2E1YjkxMmJjZGE5YzYzZDAzZTZlODY3MTA4ZGFiYjliOTQ0MTA5YWVhNTQxZWY1MjJhIiwiMHgxOWI4MjA5M2RlNWNlOTJhMDFlMzMzMDQ4ZTg3N2UyMzc0MzU0YmY4NDZkZDAzNDg2NGVmNmZmYmQ2NDM4NzcxIl19", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" } ]`) @@ -697,11 +701,11 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { data := []byte(` [ { - "chaincodeId": "firefly", - "blockNumber": 91, - "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", - "eventName": "BatchPin", - "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "eventName": "BatchPin", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" } ]`) @@ -723,11 +727,11 @@ func TestHandleMessageUnknownEventName(t *testing.T) { data := []byte(` [ { - "chaincodeId": "firefly", - "blockNumber": 91, - "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", - "eventName": "UnknownEvent", - "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "eventName": "UnknownEvent", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" } ]`) @@ -904,6 +908,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { txsu := em.On("BlockchainOpUpdate", operationID, fftypes.OpStatusFailed, + "", "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument", mock.Anything).Return(fmt.Errorf("Shutdown")) done := make(chan struct{}) @@ -932,20 +937,22 @@ func TestHandleReceiptTXSuccess(t *testing.T) { operationID := fftypes.NewUUID() data := []byte(`{ "_id": "748e7587-9e72-4244-7351-808f69b88291", - "headers": { - "id": "0ef91fb6-09c5-4ca2-721c-74b4869097c2", - "requestId": "` + operationID.String() + `", - "requestOffset": "", - "timeElapsed": 0.475721, - "timeReceived": "2021-08-27T03:04:34.199742Z", - "type": "TransactionSuccess" - }, - "receivedAt": 1630033474675 + "headers": { + "id": "0ef91fb6-09c5-4ca2-721c-74b4869097c2", + "requestId": "` + operationID.String() + `", + "requestOffset": "", + "timeElapsed": 0.475721, + "timeReceived": "2021-08-27T03:04:34.199742Z", + "type": "TransactionSuccess" + }, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "receivedAt": 1630033474675 }`) em.On("BlockchainOpUpdate", operationID, fftypes.OpStatusSucceeded, + "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", "", mock.Anything).Return(nil) @@ -987,15 +994,15 @@ func TestHandleReceiptBadRequestID(t *testing.T) { var reply fftypes.JSONObject data := []byte(`{ "_id": "748e7587-9e72-4244-7351-808f69b88291", - "headers": { - "id": "0ef91fb6-09c5-4ca2-721c-74b4869097c2", - "requestId": "bad-UUID", - "requestOffset": "", - "timeElapsed": 0.475721, - "timeReceived": "2021-08-27T03:04:34.199742Z", - "type": "TransactionSuccess" - }, - "receivedAt": 1630033474675 + "headers": { + "id": "0ef91fb6-09c5-4ca2-721c-74b4869097c2", + "requestId": "bad-UUID", + "requestOffset": "", + "timeElapsed": 0.475721, + "timeReceived": "2021-08-27T03:04:34.199742Z", + "type": "TransactionSuccess" + }, + "receivedAt": 1630033474675 }`) err := json.Unmarshal(data, &reply) @@ -1018,20 +1025,22 @@ func TestHandleReceiptFailedTx(t *testing.T) { operationID := fftypes.NewUUID() data := []byte(`{ "_id": "748e7587-9e72-4244-7351-808f69b88291", - "headers": { - "id": "0ef91fb6-09c5-4ca2-721c-74b4869097c2", - "requestId": "` + operationID.String() + `", - "requestOffset": "", - "timeElapsed": 0.475721, - "timeReceived": "2021-08-27T03:04:34.199742Z", - "type": "TransactionFailure" - }, - "receivedAt": 1630033474675 + "headers": { + "id": "0ef91fb6-09c5-4ca2-721c-74b4869097c2", + "requestId": "` + operationID.String() + `", + "requestOffset": "", + "timeElapsed": 0.475721, + "timeReceived": "2021-08-27T03:04:34.199742Z", + "type": "TransactionFailure" + }, + "receivedAt": 1630033474675, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" }`) em.On("BlockchainOpUpdate", operationID, fftypes.OpStatusFailed, + "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", "", mock.Anything).Return(nil) @@ -1191,6 +1200,8 @@ func TestHandleMessageContractEvent(t *testing.T) { "chaincodeId": "basic", "blockNumber": 10, "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "transactionIndex": 20, + "eventIndex": 30, "eventName": "AssetCreated", "payload": "eyJBcHByYWlzZWRWYWx1ZSI6MTAsIkNvbG9yIjoicmVkIiwiSUQiOiIxMjM0IiwiT3duZXIiOiJtZSIsIlNpemUiOjN9", "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" @@ -1205,7 +1216,11 @@ func TestHandleMessageContractEvent(t *testing.T) { ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5", } - em.On("ContractEvent", mock.Anything).Return(nil) + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { + assert.Equal(t, "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", e.BlockchainTXID) + assert.Equal(t, "000000000010/000020/000030", e.Event.ProtocolID) + return true + })).Return(nil) var events []interface{} err := json.Unmarshal(data, &events) @@ -1213,7 +1228,7 @@ func TestHandleMessageContractEvent(t *testing.T) { err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - ev := em.Calls[0].Arguments[0].(*blockchain.ContractEvent) + ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription) assert.Equal(t, "sb-cb37cc07-e873-4f58-44ab-55add6bba320", ev.Subscription) assert.Equal(t, "AssetCreated", ev.Event.Name) @@ -1227,11 +1242,13 @@ func TestHandleMessageContractEvent(t *testing.T) { assert.Equal(t, outputs, ev.Event.Output) info := fftypes.JSONObject{ - "blockNumber": float64(10), - "chaincodeId": "basic", - "eventName": "AssetCreated", - "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320", - "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "blockNumber": float64(10), + "chaincodeId": "basic", + "eventName": "AssetCreated", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320", + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "transactionIndex": float64(20), + "eventIndex": float64(30), } assert.Equal(t, info, ev.Event.Info) @@ -1289,7 +1306,7 @@ func TestHandleMessageContractEventError(t *testing.T) { ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5", } - em.On("ContractEvent", mock.Anything).Return(fmt.Errorf("pop")) + em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) var events []interface{} err := json.Unmarshal(data, &events) diff --git a/internal/broadcast/definition.go b/internal/broadcast/definition.go index 8bf600111..17f218eb3 100644 --- a/internal/broadcast/definition.go +++ b/internal/broadcast/definition.go @@ -76,7 +76,7 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns st Namespace: ns, Type: fftypes.MessageTypeDefinition, Identity: *signingIdentity, - Topics: fftypes.FFNameArray{def.Topic()}, + Topics: fftypes.FFStringArray{def.Topic()}, Tag: string(tag), TxType: fftypes.TransactionTypeBatchPin, }, diff --git a/internal/database/sqlcommon/transaction_sql.go b/internal/database/sqlcommon/transaction_sql.go index c9aa443f8..883cc73e8 100644 --- a/internal/database/sqlcommon/transaction_sql.go +++ b/internal/database/sqlcommon/transaction_sql.go @@ -34,9 +34,11 @@ var ( "namespace", "created", "status", + "blockchain_ids", } transactionFilterFieldMap = map[string]string{ - "type": "ttype", + "type": "ttype", + "blockchainids": "blockchain_ids", } ) @@ -49,7 +51,7 @@ func (s *SQLCommon) UpsertTransaction(ctx context.Context, transaction *fftypes. // Do a select within the transaction to determine if the UUID already exists transactionRows, _, err := s.queryTx(ctx, tx, - sq.Select("seq"). + sq.Select("blockchain_ids"). From("transactions"). Where(sq.Eq{"id": transaction.ID}), ) @@ -57,15 +59,19 @@ func (s *SQLCommon) UpsertTransaction(ctx context.Context, transaction *fftypes. return err } existing := transactionRows.Next() - transactionRows.Close() if existing { + var existingBlockchainIDs fftypes.FFStringArray + _ = transactionRows.Scan(&existingBlockchainIDs) + transaction.BlockchainIDs = transaction.BlockchainIDs.MergeLower(existingBlockchainIDs) + transactionRows.Close() // Update the transaction if _, err = s.updateTx(ctx, tx, sq.Update("transactions"). Set("ttype", string(transaction.Type)). Set("namespace", transaction.Namespace). Set("status", transaction.Status). + Set("blockchain_ids", transaction.BlockchainIDs). Where(sq.Eq{"id": transaction.ID}), func() { s.callbacks.UUIDCollectionNSEvent(database.CollectionTransactions, fftypes.ChangeEventTypeUpdated, transaction.Namespace, transaction.ID) @@ -74,6 +80,7 @@ func (s *SQLCommon) UpsertTransaction(ctx context.Context, transaction *fftypes. return err } } else { + transactionRows.Close() // Insert a transaction transaction.Created = fftypes.Now() if _, err = s.insertTx(ctx, tx, @@ -85,6 +92,7 @@ func (s *SQLCommon) UpsertTransaction(ctx context.Context, transaction *fftypes. transaction.Namespace, transaction.Created, transaction.Status, + transaction.BlockchainIDs, ), func() { s.callbacks.UUIDCollectionNSEvent(database.CollectionTransactions, fftypes.ChangeEventTypeCreated, transaction.Namespace, transaction.ID) @@ -105,6 +113,7 @@ func (s *SQLCommon) transactionResult(ctx context.Context, row *sql.Rows) (*ffty &transaction.Namespace, &transaction.Created, &transaction.Status, + &transaction.BlockchainIDs, ) if err != nil { return nil, i18n.WrapError(ctx, err, i18n.MsgDBReadErr, "transactions") diff --git a/internal/database/sqlcommon/transaction_sql_test.go b/internal/database/sqlcommon/transaction_sql_test.go index dd6babf83..cd829d11c 100644 --- a/internal/database/sqlcommon/transaction_sql_test.go +++ b/internal/database/sqlcommon/transaction_sql_test.go @@ -38,10 +38,11 @@ func TestTransactionE2EWithDB(t *testing.T) { // Create a new transaction entry transactionID := fftypes.NewUUID() transaction := &fftypes.Transaction{ - ID: transactionID, - Type: fftypes.TransactionTypeBatchPin, - Namespace: "ns1", - Status: fftypes.OpStatusPending, + ID: transactionID, + Type: fftypes.TransactionTypeBatchPin, + Namespace: "ns1", + Status: fftypes.OpStatusPending, + BlockchainIDs: fftypes.FFStringArray{"tx1"}, } s.callbacks.On("UUIDCollectionNSEvent", database.CollectionTransactions, fftypes.ChangeEventTypeCreated, "ns1", transactionID, mock.Anything).Return() @@ -60,15 +61,19 @@ func TestTransactionE2EWithDB(t *testing.T) { // Update the transaction transactionUpdated := &fftypes.Transaction{ - ID: transactionID, - Type: fftypes.TransactionTypeBatchPin, - Namespace: "ns1", - Created: transaction.Created, - Status: fftypes.OpStatusFailed, + ID: transactionID, + Type: fftypes.TransactionTypeBatchPin, + Namespace: "ns1", + Created: transaction.Created, + Status: fftypes.OpStatusFailed, + BlockchainIDs: fftypes.FFStringArray{"tx2", "tx3"}, // additive } err = s.UpsertTransaction(context.Background(), transactionUpdated) assert.NoError(t, err) + // Expect merged + transactionUpdated.BlockchainIDs = fftypes.FFStringArray{"tx1", "tx2", "tx3"} + // Check we get the exact same message back - note the removal of one of the transaction elements transactionRead, err = s.GetTransactionByID(ctx, transactionID) assert.NoError(t, err) diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index a4cfbd66f..623b75a1a 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -439,7 +439,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { Messages: []*fftypes.Message{ {Header: fftypes.MessageHeader{ ID: nil, /* missing */ - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }}, }, }, @@ -466,7 +466,7 @@ func TestProcessSkipDupMsg(t *testing.T) { Messages: []*fftypes.Message{ {Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), - Topics: fftypes.FFNameArray{"topic1", "topic2"}, + Topics: fftypes.FFStringArray{"topic1", "topic2"}, }}, }, }, @@ -497,7 +497,7 @@ func TestProcessMsgFailGetPins(t *testing.T) { Messages: []*fftypes.Message{ {Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }}, }, }, @@ -528,9 +528,9 @@ func TestProcessMsgFailBadPin(t *testing.T) { Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }, - Pins: fftypes.FFNameArray{"!Wrong"}, + Pins: fftypes.FFStringArray{"!Wrong"}, }) assert.NoError(t, err) @@ -547,9 +547,9 @@ func TestProcessMsgFailGetNextPins(t *testing.T) { Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }, - Pins: fftypes.FFNameArray{fftypes.NewRandB32().String()}, + Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, }) assert.EqualError(t, err, "pop") @@ -567,9 +567,9 @@ func TestProcessMsgFailDispatch(t *testing.T) { err := ag.processMessage(ag.ctx, &fftypes.Batch{}, false, 12345, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }, - Pins: fftypes.FFNameArray{fftypes.NewRandB32().String()}, + Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, }) assert.EqualError(t, err, "pop") @@ -595,9 +595,9 @@ func TestProcessMsgFailPinUpdate(t *testing.T) { Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, }, - Pins: fftypes.FFNameArray{pin.String()}, + Pins: fftypes.FFStringArray{pin.String()}, }) assert.EqualError(t, err, "pop") diff --git a/internal/events/batch_pin_complete.go b/internal/events/batch_pin_complete.go index 5adc35896..ba6ecbe11 100644 --- a/internal/events/batch_pin_complete.go +++ b/internal/events/batch_pin_complete.go @@ -73,10 +73,11 @@ func (em *eventManager) handlePrivatePinComplete(batchPin *blockchain.BatchPin) func (em *eventManager) persistBatchTransaction(ctx context.Context, batchPin *blockchain.BatchPin) error { return em.database.UpsertTransaction(ctx, &fftypes.Transaction{ - ID: batchPin.TransactionID, - Namespace: batchPin.Namespace, - Type: fftypes.TransactionTypeBatchPin, - Status: fftypes.OpStatusSucceeded, + ID: batchPin.TransactionID, + Namespace: batchPin.Namespace, + Type: fftypes.TransactionTypeBatchPin, + Status: fftypes.OpStatusSucceeded, + BlockchainIDs: fftypes.NewFFStringArray(batchPin.Event.BlockchainTXID), }) } diff --git a/internal/events/batch_pin_complete_test.go b/internal/events/batch_pin_complete_test.go index ef3f526d1..c503c6401 100644 --- a/internal/events/batch_pin_complete_test.go +++ b/internal/events/batch_pin_complete_test.go @@ -150,7 +150,7 @@ func TestBatchPinCompleteOkPrivate(t *testing.T) { mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(nil) mbi := &blockchainmocks.Plugin{} - err = em.BatchPinComplete(mbi, batch, "0x12345") + err = em.BatchPinComplete(mbi, batch, "0xffffeeee") assert.NoError(t, err) // Call through to persistBatch - the hash of our batch will be invalid, @@ -171,6 +171,9 @@ func TestSequencedBroadcastRetrieveIPFSFail(t *testing.T) { BatchID: fftypes.NewUUID(), BatchPayloadRef: "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", Contexts: []*fftypes.Bytes32{fftypes.NewRandB32()}, + Event: blockchain.Event{ + BlockchainTXID: "0x12345", + }, } cancel() // to avoid retry @@ -178,7 +181,7 @@ func TestSequencedBroadcastRetrieveIPFSFail(t *testing.T) { mpi.On("RetrieveData", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) mbi := &blockchainmocks.Plugin{} - err := em.BatchPinComplete(mbi, batch, "0x12345") + err := em.BatchPinComplete(mbi, batch, "0xffffeeee") mpi.AssertExpectations(t) assert.Regexp(t, "FF10158", err) } @@ -200,7 +203,7 @@ func TestBatchPinCompleteBadData(t *testing.T) { mpi.On("RetrieveData", mock.Anything, mock.Anything).Return(batchReadCloser, nil) mbi := &blockchainmocks.Plugin{} - err := em.BatchPinComplete(mbi, batch, "0x12345") + err := em.BatchPinComplete(mbi, batch, "0xffffeeee") assert.NoError(t, err) // We do not return a blocking error in the case of bad data stored in IPFS } @@ -222,6 +225,9 @@ func TestBatchPinCompleteBadNamespace(t *testing.T) { batch := &blockchain.BatchPin{ Namespace: "!bad", TransactionID: fftypes.NewUUID(), + Event: blockchain.Event{ + BlockchainTXID: "0x12345", + }, } mbi := &blockchainmocks.Plugin{} diff --git a/internal/events/contract_event.go b/internal/events/blockchain_event.go similarity index 96% rename from internal/events/contract_event.go rename to internal/events/blockchain_event.go index d8e9a7424..c30b5cbf1 100644 --- a/internal/events/contract_event.go +++ b/internal/events/blockchain_event.go @@ -53,7 +53,7 @@ func (em *eventManager) persistBlockchainEvent(ctx context.Context, chainEvent * return nil } -func (em *eventManager) ContractEvent(event *blockchain.ContractEvent) error { +func (em *eventManager) BlockchainEvent(event *blockchain.EventWithSubscription) error { return em.retry.Do(em.ctx, "persist contract event", func(attempt int) (bool, error) { err := em.database.RunAsGroup(em.ctx, func(ctx context.Context) error { // TODO: should cache this lookup for efficiency diff --git a/internal/events/contract_event_test.go b/internal/events/blockchain_event_test.go similarity index 90% rename from internal/events/contract_event_test.go rename to internal/events/blockchain_event_test.go index 4cd1672e1..808432889 100644 --- a/internal/events/contract_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -31,10 +31,11 @@ func TestContractEventWithRetries(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - ev := &blockchain.ContractEvent{ + ev := &blockchain.EventWithSubscription{ Subscription: "sb-1", Event: blockchain.Event{ - Name: "Changed", + BlockchainTXID: "0xabcd1234", + Name: "Changed", Output: fftypes.JSONObject{ "value": "1", }, @@ -62,7 +63,7 @@ func TestContractEventWithRetries(t *testing.T) { return e.Type == fftypes.EventTypeBlockchainEvent && e.Reference != nil && e.Reference == eventID })).Return(nil).Once() - err := em.ContractEvent(ev) + err := em.BlockchainEvent(ev) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -72,10 +73,11 @@ func TestContractEventUnknownSubscription(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - ev := &blockchain.ContractEvent{ + ev := &blockchain.EventWithSubscription{ Subscription: "sb-1", Event: blockchain.Event{ - Name: "Changed", + BlockchainTXID: "0xabcd1234", + Name: "Changed", Output: fftypes.JSONObject{ "value": "1", }, @@ -88,7 +90,7 @@ func TestContractEventUnknownSubscription(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdi.On("GetContractSubscriptionByProtocolID", mock.Anything, "sb-1").Return(nil, nil) - err := em.ContractEvent(ev) + err := em.BlockchainEvent(ev) assert.NoError(t, err) mdi.AssertExpectations(t) diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index 3feb780b1..d07bdc493 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -427,7 +427,7 @@ func TestFilterEventsMatch(t *testing.T) { }, Message: &fftypes.Message{ Header: fftypes.MessageHeader{ - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, Tag: "tag1", Group: nil, Identity: fftypes.Identity{ @@ -444,7 +444,7 @@ func TestFilterEventsMatch(t *testing.T) { }, Message: &fftypes.Message{ Header: fftypes.MessageHeader{ - Topics: fftypes.FFNameArray{"topic1"}, + Topics: fftypes.FFStringArray{"topic1"}, Tag: "tag2", Group: gid1, Identity: fftypes.Identity{ @@ -461,7 +461,7 @@ func TestFilterEventsMatch(t *testing.T) { }, Message: &fftypes.Message{ Header: fftypes.MessageHeader{ - Topics: fftypes.FFNameArray{"topic2"}, + Topics: fftypes.FFStringArray{"topic2"}, Tag: "tag1", Group: nil, Identity: fftypes.Identity{ diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 529e5ca8c..ebf092611 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -56,9 +56,9 @@ type EventManager interface { WaitStop() // Bound blockchain callbacks - OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState blockchain.TransactionStatus, errorMessage string, opOutput fftypes.JSONObject) error + OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error BatchPinComplete(bi blockchain.Plugin, batch *blockchain.BatchPin, signingIdentity string) error - ContractEvent(event *blockchain.ContractEvent) error + BlockchainEvent(event *blockchain.EventWithSubscription) error // Bound dataexchange callbacks TransferResult(dx dataexchange.Plugin, trackingID string, status fftypes.OpStatus, update fftypes.TransportStatusUpdate) error diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index c6bac4129..e54fc8055 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -17,37 +17,56 @@ package events import ( + "context" + "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) -func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - op, err := em.database.GetOperationByID(em.ctx, operationID) +func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error { + op, err := em.database.GetOperationByID(ctx, operationID) if err != nil || op == nil { log.L(em.ctx).Warnf("Operation update '%s' ignored, as it was not submitted by this node", operationID) return nil } - update := database.OperationQueryFactory.NewUpdate(em.ctx). + update := database.OperationQueryFactory.NewUpdate(ctx). Set("status", txState). Set("error", errorMessage). Set("output", opOutput) - if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil { + if err := em.database.UpdateOperation(ctx, op.ID, update); err != nil { return err } + setTXFailed := false + // Special handling for OpTypeTokenTransfer, which writes an event when it fails if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { - txUpdate := database.TransactionQueryFactory.NewUpdate(em.ctx).Set("status", txState) - if err := em.database.UpdateTransaction(em.ctx, op.Transaction, txUpdate); err != nil { + setTXFailed = true + event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) + if err := em.database.InsertEvent(ctx, event); err != nil { return err } + } - event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) - if err := em.database.InsertEvent(em.ctx, event); err != nil { - return err + tx, err := em.database.GetTransactionByID(ctx, op.Transaction) + if tx != nil { + if setTXFailed { + tx.Status = fftypes.OpStatusFailed } + tx.BlockchainIDs = tx.BlockchainIDs.AppendLowerUnique(blockchainTXID) + err = em.database.UpsertTransaction(ctx, tx) } + if err != nil { + return err + } + return nil } + +func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error { + return em.database.RunAsGroup(em.ctx, func(ctx context.Context) error { + return em.operationUpdateCtx(ctx, operationID, txState, blockchainTXID, errorMessage, opOutput) + }) +} diff --git a/internal/events/operation_update_test.go b/internal/events/operation_update_test.go index 887bc2f78..876dd550c 100644 --- a/internal/events/operation_update_test.go +++ b/internal/events/operation_update_test.go @@ -17,6 +17,7 @@ package events import ( + "context" "fmt" "testing" @@ -34,11 +35,17 @@ func TestOperationUpdateSuccess(t *testing.T) { mbi := &blockchainmocks.Plugin{} opID := fftypes.NewUUID() + txid := fftypes.NewUUID() + mdi.On("RunAsGroup", em.ctx, mock.Anything).Run(func(args mock.Arguments) { + args[1].(func(ctx context.Context) error)(em.ctx) + }).Return(nil) mdi.On("GetOperationByID", em.ctx, opID).Return(&fftypes.Operation{ID: opID}, nil) mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", em.ctx, mock.Anything).Return(&fftypes.Transaction{ID: txid}, nil) + mdi.On("UpsertTransaction", em.ctx, mock.Anything).Return(nil) info := fftypes.JSONObject{"some": "info"} - err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + err := em.OperationUpdate(mdi, opID, fftypes.OpStatusFailed, "", "some error", info) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -55,7 +62,7 @@ func TestOperationUpdateNotFound(t *testing.T) { mdi.On("GetOperationByID", em.ctx, opID).Return(nil, fmt.Errorf("pop")) info := fftypes.JSONObject{"some": "info"} - err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + err := em.operationUpdateCtx(em.ctx, opID, fftypes.OpStatusFailed, "", "some error", info) assert.NoError(t, err) // swallowed after logging mdi.AssertExpectations(t) @@ -73,7 +80,7 @@ func TestOperationUpdateError(t *testing.T) { mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(fmt.Errorf("pop")) info := fftypes.JSONObject{"some": "info"} - err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + err := em.operationUpdateCtx(em.ctx, opID, fftypes.OpStatusFailed, "", "some error", info) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -94,13 +101,14 @@ func TestOperationUpdateTransferFail(t *testing.T) { mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil) mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil) - mdi.On("UpdateTransaction", em.ctx, op.Transaction, mock.Anything).Return(nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" })).Return(nil) + mdi.On("GetTransactionByID", em.ctx, mock.Anything).Return(&fftypes.Transaction{ID: fftypes.NewUUID()}, nil) + mdi.On("UpsertTransaction", em.ctx, mock.Anything).Return(nil) info := fftypes.JSONObject{"some": "info"} - err := em.OperationUpdate(mbi, op.ID, fftypes.OpStatusFailed, "some error", info) + err := em.operationUpdateCtx(em.ctx, op.ID, fftypes.OpStatusFailed, "", "some error", info) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -121,10 +129,12 @@ func TestOperationUpdateTransferTransactionFail(t *testing.T) { mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil) mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil) - mdi.On("UpdateTransaction", em.ctx, op.Transaction, mock.Anything).Return(fmt.Errorf("pop")) + mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", em.ctx, mock.Anything).Return(&fftypes.Transaction{ID: fftypes.NewUUID()}, nil) + mdi.On("UpsertTransaction", em.ctx, mock.Anything).Return(fmt.Errorf("pop")) info := fftypes.JSONObject{"some": "info"} - err := em.OperationUpdate(mbi, op.ID, fftypes.OpStatusFailed, "some error", info) + err := em.operationUpdateCtx(em.ctx, op.ID, fftypes.OpStatusFailed, "", "some error", info) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -145,13 +155,12 @@ func TestOperationUpdateTransferEventFail(t *testing.T) { mdi.On("GetOperationByID", em.ctx, op.ID).Return(op, nil) mdi.On("UpdateOperation", em.ctx, op.ID, mock.Anything).Return(nil) - mdi.On("UpdateTransaction", em.ctx, op.Transaction, mock.Anything).Return(nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" })).Return(fmt.Errorf("pop")) info := fftypes.JSONObject{"some": "info"} - err := em.OperationUpdate(mbi, op.ID, fftypes.OpStatusFailed, "some error", info) + err := em.operationUpdateCtx(em.ctx, op.ID, fftypes.OpStatusFailed, "", "some error", info) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index 61ad3cc67..8e8dfa550 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -41,21 +41,22 @@ func addPoolDetailsFromPlugin(ffPool *fftypes.TokenPool, pluginPool *tokens.Toke } } -func poolTransaction(pool *fftypes.TokenPool, status fftypes.OpStatus) *fftypes.Transaction { +func poolTransaction(pool *fftypes.TokenPool, status fftypes.OpStatus, blockchainTXID string) *fftypes.Transaction { return &fftypes.Transaction{ - ID: pool.TX.ID, - Status: status, - Namespace: pool.Namespace, - Type: pool.TX.Type, + ID: pool.TX.ID, + Status: status, + Namespace: pool.Namespace, + Type: pool.TX.Type, + BlockchainIDs: fftypes.NewFFStringArray(blockchainTXID), } } -func (em *eventManager) confirmPool(ctx context.Context, pool *fftypes.TokenPool, ev *blockchain.Event) error { +func (em *eventManager) confirmPool(ctx context.Context, pool *fftypes.TokenPool, ev *blockchain.Event, blockchainTXID string) error { chainEvent := buildBlockchainEvent(pool.Namespace, nil, ev, &pool.TX) if err := em.persistBlockchainEvent(ctx, chainEvent); err != nil { return err } - tx := poolTransaction(pool, fftypes.OpStatusSucceeded) + tx := poolTransaction(pool, fftypes.OpStatusSucceeded, blockchainTXID) if err := em.database.UpsertTransaction(ctx, tx); err != nil { return err } @@ -154,7 +155,7 @@ func (em *eventManager) TokenPoolCreated(ti tokens.Plugin, pool *tokens.TokenPoo } else if msg != nil { batchID = msg.BatchID // trigger rewind after completion of database transaction } - return em.confirmPool(ctx, existingPool, &pool.Event) + return em.confirmPool(ctx, existingPool, &pool.Event, pool.Event.BlockchainTXID) } // See if this pool was submitted locally and needs to be announced diff --git a/internal/events/token_pool_created_test.go b/internal/events/token_pool_created_test.go index 952141df9..895a9653d 100644 --- a/internal/events/token_pool_created_test.go +++ b/internal/events/token_pool_created_test.go @@ -47,8 +47,9 @@ func TestTokenPoolCreatedIgnore(t *testing.T) { TransactionID: txID, Connector: "erc1155", Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } @@ -74,8 +75,9 @@ func TestTokenPoolCreatedIgnoreNoTX(t *testing.T) { TransactionID: nil, Connector: "erc1155", Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } @@ -98,9 +100,10 @@ func TestTokenPoolCreatedConfirm(t *testing.T) { Connector: "erc1155", TransactionID: txID, Event: blockchain.Event{ - Name: "TokenPool", - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + Name: "TokenPool", + ProtocolID: "tx1", + Info: info, }, } storedPool := &fftypes.TokenPool{ @@ -157,8 +160,9 @@ func TestTokenPoolCreatedAlreadyConfirmed(t *testing.T) { Connector: "erc1155", TransactionID: txID, Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } storedPool := &fftypes.TokenPool{ @@ -196,8 +200,9 @@ func TestTokenPoolCreatedMigrate(t *testing.T) { Connector: "magic-tokens", TransactionID: txID, Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } storedPool := &fftypes.TokenPool{ @@ -267,7 +272,7 @@ func TestConfirmPoolBlockchainEventFail(t *testing.T) { return e.Name == event.Name })).Return(fmt.Errorf("pop")) - err := em.confirmPool(em.ctx, storedPool, event) + err := em.confirmPool(em.ctx, storedPool, event, "0xffffeeee") assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -303,7 +308,7 @@ func TestConfirmPoolTxFail(t *testing.T) { return tx.Type == fftypes.TransactionTypeTokenPool })).Return(fmt.Errorf("pop")) - err := em.confirmPool(em.ctx, storedPool, event) + err := em.confirmPool(em.ctx, storedPool, event, "0xffffeeee") assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -340,7 +345,7 @@ func TestConfirmPoolUpsertFail(t *testing.T) { })).Return(nil) mdi.On("UpsertTokenPool", em.ctx, storedPool).Return(fmt.Errorf("pop")) - err := em.confirmPool(em.ctx, storedPool, event) + err := em.confirmPool(em.ctx, storedPool, event, "0xffffeeee") assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -373,8 +378,9 @@ func TestTokenPoolCreatedAnnounce(t *testing.T) { TransactionID: txID, Connector: "erc1155", Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } @@ -419,8 +425,9 @@ func TestTokenPoolCreatedAnnounceBadOpInputID(t *testing.T) { TransactionID: txID, Connector: "erc1155", Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } @@ -457,8 +464,9 @@ func TestTokenPoolCreatedAnnounceBadOpInputNS(t *testing.T) { TransactionID: txID, Connector: "erc1155", Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "tx1", + Info: info, }, } diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index fc126c20c..1a4e06645 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -79,10 +79,11 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke } tx := &fftypes.Transaction{ - ID: transfer.TX.ID, - Status: fftypes.OpStatusSucceeded, - Namespace: transfer.Namespace, - Type: transfer.TX.Type, + ID: transfer.TX.ID, + Status: fftypes.OpStatusSucceeded, + Namespace: transfer.Namespace, + Type: transfer.TX.Type, + BlockchainIDs: fftypes.NewFFStringArray(transfer.Event.BlockchainTXID), } if err := em.database.UpsertTransaction(ctx, tx); err != nil { return false, err diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index 9e27f87a2..4054dccd8 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -48,9 +48,10 @@ func newTransfer() *tokens.TokenTransfer { }, }, Event: blockchain.Event{ - Name: "Transfer", - ProtocolID: "tx1", - Info: fftypes.JSONObject{"some": "info"}, + BlockchainTXID: "0xffffeeee", + Name: "Transfer", + ProtocolID: "0000/0000/0000", + Info: fftypes.JSONObject{"some": "info"}, }, } } @@ -348,8 +349,9 @@ func TestTokensTransferredWithMessageReceived(t *testing.T) { Amount: *fftypes.NewFFBigInt(1), }, Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "0000/0000/0000", + Info: info, }, } pool := &fftypes.TokenPool{ @@ -406,8 +408,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { Amount: *fftypes.NewFFBigInt(1), }, Event: blockchain.Event{ - ProtocolID: "tx1", - Info: info, + BlockchainTXID: "0xffffeeee", + ProtocolID: "0000/0000/0000", + Info: info, }, } pool := &fftypes.TokenPool{ diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index eaccc0304..80a6dfcca 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -252,4 +252,5 @@ var ( MsgFFISchemaParseFail = ffm("FF10332", "Failed to parse schema for param '%s'", 400) MsgFFISchemaCompileFail = ffm("FF10333", "Failed compile schema for param '%s'", 400) MsgPluginInitializationFailed = ffm("FF10334", "Plugin initialization error", 500) + MsgSafeCharsOnly = ffm("FF10335", "Field '%s' must include only alphanumerics (a-zA-Z0-9), dot (.), dash (-) and underscore (_)", 400) ) diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 855bf0f76..d08cb054b 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -30,12 +30,12 @@ type boundCallbacks struct { ei events.EventManager } -func (bc *boundCallbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState blockchain.TransactionStatus, errorMessage string, opOutput fftypes.JSONObject) error { - return bc.ei.OperationUpdate(bc.bi, operationID, txState, errorMessage, opOutput) +func (bc *boundCallbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error { + return bc.ei.OperationUpdate(bc.bi, operationID, txState, blockchainTXID, errorMessage, opOutput) } -func (bc *boundCallbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - return bc.ei.OperationUpdate(plugin, operationID, txState, errorMessage, opOutput) +func (bc *boundCallbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error { + return bc.ei.OperationUpdate(plugin, operationID, txState, blockchainTXID, errorMessage, opOutput) } func (bc *boundCallbacks) BatchPinComplete(batch *blockchain.BatchPin, signingIdentity string) error { @@ -62,6 +62,6 @@ func (bc *boundCallbacks) TokensTransferred(plugin tokens.Plugin, transfer *toke return bc.ei.TokensTransferred(plugin, transfer) } -func (bc *boundCallbacks) ContractEvent(event *blockchain.ContractEvent) error { - return bc.ei.ContractEvent(event) +func (bc *boundCallbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { + return bc.ei.BlockchainEvent(event) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index e85963476..fd530631c 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -49,12 +49,12 @@ func TestBoundCallbacks(t *testing.T) { err := bc.BatchPinComplete(batch, "0x12345") assert.EqualError(t, err, "pop") - mei.On("OperationUpdate", mbi, opID, fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop")) - err = bc.BlockchainOpUpdate(opID, fftypes.OpStatusFailed, "error info", info) + mei.On("OperationUpdate", mbi, opID, fftypes.OpStatusFailed, "0xffffeeee", "error info", info).Return(fmt.Errorf("pop")) + err = bc.BlockchainOpUpdate(opID, fftypes.OpStatusFailed, "0xffffeeee", "error info", info) assert.EqualError(t, err, "pop") - mei.On("OperationUpdate", mti, opID, fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop")) - err = bc.TokenOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info) + mei.On("OperationUpdate", mti, opID, fftypes.OpStatusFailed, "0xffffeeee", "error info", info).Return(fmt.Errorf("pop")) + err = bc.TokenOpUpdate(mti, opID, fftypes.OpStatusFailed, "0xffffeeee", "error info", info) assert.EqualError(t, err, "pop") mei.On("TransferResult", mdx, "tracking12345", fftypes.OpStatusFailed, mock.Anything).Return(fmt.Errorf("pop")) @@ -79,7 +79,7 @@ func TestBoundCallbacks(t *testing.T) { err = bc.TokensTransferred(mti, transfer) assert.EqualError(t, err, "pop") - mei.On("ContractEvent", mock.AnythingOfType("*blockchain.ContractEvent")).Return(fmt.Errorf("pop")) - err = bc.ContractEvent(&blockchain.ContractEvent{}) + mei.On("BlockchainEvent", mock.AnythingOfType("*blockchain.EventWithSubscription")).Return(fmt.Errorf("pop")) + err = bc.BlockchainEvent(&blockchain.EventWithSubscription{}) assert.EqualError(t, err, "pop") } diff --git a/internal/privatemessaging/groupmanager.go b/internal/privatemessaging/groupmanager.go index c9d2be997..32ed6a41a 100644 --- a/internal/privatemessaging/groupmanager.go +++ b/internal/privatemessaging/groupmanager.go @@ -116,7 +116,7 @@ func (gm *groupManager) groupInit(ctx context.Context, signer *fftypes.Identity, Type: fftypes.MessageTypeGroupInit, Identity: *signer, Tag: string(fftypes.SystemTagDefineGroup), - Topics: fftypes.FFNameArray{group.Topic()}, + Topics: fftypes.FFStringArray{group.Topic()}, TxType: fftypes.TransactionTypeBatchPin, }, Data: fftypes.DataRefs{ diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 222bfffec..0ee1242dc 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -155,6 +155,7 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) requestID := data.GetString("id") success := data.GetBool("success") message := data.GetString("message") + transactionHash := data.GetString("transactionHash") if requestID == "" { l.Errorf("Reply cannot be processed - missing fields: %+v", data) return nil // Swallow this and move on @@ -169,10 +170,11 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) replyType = fftypes.OpStatusFailed } l.Infof("Tokens '%s' reply: request=%s message=%s", replyType, requestID, message) - return ft.callbacks.TokenOpUpdate(ft, opID, replyType, message, data) + return ft.callbacks.TokenOpUpdate(ft, opID, replyType, transactionHash, message, data) } func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { + eventProtocolID := data.GetString("id") tokenType := data.GetString("type") protocolID := data.GetString("poolId") standard := data.GetString("standard") // this is optional @@ -212,12 +214,13 @@ func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSON Connector: ft.configuredName, Standard: standard, Event: blockchain.Event{ - Source: ft.Name() + ":" + ft.configuredName, - Name: "TokenPool", - ProtocolID: txHash, - Output: rawOutput, - Info: tx, - Timestamp: timestamp, + BlockchainTXID: txHash, + Source: ft.Name() + ":" + ft.configuredName, + Name: "TokenPool", + ProtocolID: eventProtocolID, + Output: rawOutput, + Info: tx, + Timestamp: timestamp, }, } @@ -226,7 +229,7 @@ func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSON } func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t fftypes.TokenTransferType, data fftypes.JSONObject) (err error) { - protocolID := data.GetString("id") + eventProtocolID := data.GetString("id") poolProtocolID := data.GetString("poolId") operatorAddress := data.GetString("operator") fromAddress := data.GetString("from") @@ -254,7 +257,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t fftypes.TokenTran eventName = "Transfer" } - if protocolID == "" || + if eventProtocolID == "" || poolProtocolID == "" || operatorAddress == "" || value == "" || @@ -291,7 +294,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t fftypes.TokenTran From: fromAddress, To: toAddress, Amount: amount, - ProtocolID: protocolID, + ProtocolID: eventProtocolID, Key: operatorAddress, Message: transferData.Message, MessageHash: transferData.MessageHash, @@ -301,12 +304,13 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t fftypes.TokenTran }, }, Event: blockchain.Event{ - Source: ft.Name() + ":" + ft.configuredName, - Name: eventName, - ProtocolID: txHash, - Output: rawOutput, - Info: tx, - Timestamp: timestamp, + BlockchainTXID: txHash, + Source: ft.Name() + ":" + ft.configuredName, + Name: eventName, + ProtocolID: eventProtocolID, + Output: rawOutput, + Info: tx, + Timestamp: timestamp, }, } diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 82a5e858a..4651fe363 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -433,24 +433,26 @@ func TestEvents(t *testing.T) { }.String() // receipt: success - mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() + mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusSucceeded, "0xffffeeee", "", mock.Anything).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "4", "event": "receipt", "data": fftypes.JSONObject{ - "id": opID.String(), - "success": true, + "id": opID.String(), + "success": true, + "transactionHash": "0xffffeeee", }, }.String() // receipt: failure - mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() + mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusFailed, "0xffffeeee", "", mock.Anything).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "5", "event": "receipt", "data": fftypes.JSONObject{ - "id": opID.String(), - "success": false, + "id": opID.String(), + "success": false, + "transactionHash": "0xffffeeee", }, }.String() @@ -464,18 +466,19 @@ func TestEvents(t *testing.T) { // token-pool: invalid uuid (success) mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { - return p.ProtocolID == "F1" && p.Type == fftypes.TokenTypeFungible && p.Key == "0x0" && p.TransactionID == nil && p.Event.ProtocolID == "abc" + return p.ProtocolID == "F1" && p.Type == fftypes.TokenTypeFungible && p.Key == "0x0" && p.TransactionID == nil && p.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "7", "event": "token-pool", "data": fftypes.JSONObject{ + "id": "000000000010/000020/000030/000040", "type": "fungible", "poolId": "F1", "operator": "0x0", "data": fftypes.JSONObject{"tx": "bad"}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -484,18 +487,19 @@ func TestEvents(t *testing.T) { // token-pool: success mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { - return p.ProtocolID == "F1" && p.Type == fftypes.TokenTypeFungible && p.Key == "0x0" && txID.Equals(p.TransactionID) && p.Event.ProtocolID == "abc" + return p.ProtocolID == "F1" && p.Type == fftypes.TokenTypeFungible && p.Key == "0x0" && txID.Equals(p.TransactionID) && p.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "8", "event": "token-pool", "data": fftypes.JSONObject{ + "id": "000000000010/000020/000030/000040", "type": "fungible", "poolId": "F1", "operator": "0x0", "data": fftypes.JSONObject{"tx": txID.String()}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -523,7 +527,7 @@ func TestEvents(t *testing.T) { "amount": "bad", "data": fftypes.JSONObject{"tx": txID.String()}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -532,20 +536,20 @@ func TestEvents(t *testing.T) { // token-mint: success mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *tokens.TokenTransfer) bool { - return t.Amount.Int().Int64() == 2 && t.To == "0x0" && t.TokenIndex == "" && *t.TX.ID == *txID && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "abc" + return t.Amount.Int().Int64() == 2 && t.To == "0x0" && t.TokenIndex == "" && *t.TX.ID == *txID && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "11", "event": "token-mint", "data": fftypes.JSONObject{ - "id": "1.0.0", + "id": "000000000010/000020/000030/000040", "poolId": "F1", "operator": "0x0", "to": "0x0", "amount": "2", "data": fftypes.JSONObject{"tx": txID.String()}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -554,13 +558,13 @@ func TestEvents(t *testing.T) { // token-mint: invalid uuid (success) mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *tokens.TokenTransfer) bool { - return t.Amount.Int().Int64() == 1 && t.To == "0x0" && t.TokenIndex == "1" && t.PoolProtocolID == "N1" && t.Event.ProtocolID == "abc" + return t.Amount.Int().Int64() == 1 && t.To == "0x0" && t.TokenIndex == "1" && t.PoolProtocolID == "N1" && t.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "12", "event": "token-mint", "data": fftypes.JSONObject{ - "id": "1.0.0", + "id": "000000000010/000020/000030/000040", "poolId": "N1", "tokenIndex": "1", "operator": "0x0", @@ -568,7 +572,7 @@ func TestEvents(t *testing.T) { "amount": "1", "data": fftypes.JSONObject{"tx": "bad"}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -580,7 +584,7 @@ func TestEvents(t *testing.T) { "id": "13", "event": "token-transfer", "data": fftypes.JSONObject{ - "id": "1.0.0", + "id": "000000000010/000020/000030/000040", "poolId": "F1", "tokenIndex": "0", "operator": "0x0", @@ -588,7 +592,7 @@ func TestEvents(t *testing.T) { "amount": "2", "data": fftypes.JSONObject{"tx": txID.String()}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -597,13 +601,13 @@ func TestEvents(t *testing.T) { // token-transfer: bad message hash (success) mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *tokens.TokenTransfer) bool { - return t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.To == "0x1" && t.TokenIndex == "" && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "abc" + return t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.To == "0x1" && t.TokenIndex == "" && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "14", "event": "token-transfer", "data": fftypes.JSONObject{ - "id": "1.0.0", + "id": "000000000010/000020/000030/000040", "poolId": "F1", "operator": "0x0", "from": "0x0", @@ -611,7 +615,7 @@ func TestEvents(t *testing.T) { "amount": "2", "data": fftypes.JSONObject{"tx": txID.String(), "messageHash": "bad"}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -621,13 +625,13 @@ func TestEvents(t *testing.T) { // token-transfer: success messageID := fftypes.NewUUID() mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *tokens.TokenTransfer) bool { - return t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.To == "0x1" && t.TokenIndex == "" && messageID.Equals(t.Message) && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "abc" + return t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.To == "0x1" && t.TokenIndex == "" && messageID.Equals(t.Message) && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "15", "event": "token-transfer", "data": fftypes.JSONObject{ - "id": "1.0.0", + "id": "000000000010/000020/000030/000040", "poolId": "F1", "operator": "0x0", "from": "0x0", @@ -635,7 +639,7 @@ func TestEvents(t *testing.T) { "amount": "2", "data": fftypes.JSONObject{"tx": txID.String(), "message": messageID.String()}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() @@ -644,13 +648,13 @@ func TestEvents(t *testing.T) { // token-burn: success mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *tokens.TokenTransfer) bool { - return t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.TokenIndex == "0" && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "abc" + return t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.TokenIndex == "0" && t.PoolProtocolID == "F1" && t.Event.ProtocolID == "000000000010/000020/000030/000040" })).Return(nil).Once() fromServer <- fftypes.JSONObject{ "id": "16", "event": "token-burn", "data": fftypes.JSONObject{ - "id": "1.0.0", + "id": "000000000010/000020/000030/000040", "poolId": "F1", "tokenIndex": "0", "operator": "0x0", @@ -658,7 +662,7 @@ func TestEvents(t *testing.T) { "amount": "2", "data": fftypes.JSONObject{"tx": txID.String()}.String(), "transaction": fftypes.JSONObject{ - "transactionHash": "abc", + "transactionHash": "0xffffeeee", }, }, }.String() diff --git a/mocks/blockchainmocks/callbacks.go b/mocks/blockchainmocks/callbacks.go index e44d1ee52..9017ab624 100644 --- a/mocks/blockchainmocks/callbacks.go +++ b/mocks/blockchainmocks/callbacks.go @@ -28,13 +28,13 @@ func (_m *Callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingIdentit return r0 } -// BlockchainOpUpdate provides a mock function with given fields: operationID, txState, errorMessage, opOutput -func (_m *Callbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - ret := _m.Called(operationID, txState, errorMessage, opOutput) +// BlockchainEvent provides a mock function with given fields: event +func (_m *Callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { + ret := _m.Called(event) var r0 error - if rf, ok := ret.Get(0).(func(*fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { - r0 = rf(operationID, txState, errorMessage, opOutput) + if rf, ok := ret.Get(0).(func(*blockchain.EventWithSubscription) error); ok { + r0 = rf(event) } else { r0 = ret.Error(0) } @@ -42,13 +42,13 @@ func (_m *Callbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState fftyp return r0 } -// ContractEvent provides a mock function with given fields: event -func (_m *Callbacks) ContractEvent(event *blockchain.ContractEvent) error { - ret := _m.Called(event) +// BlockchainOpUpdate provides a mock function with given fields: operationID, txState, blockchainTXID, errorMessage, opOutput +func (_m *Callbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID string, errorMessage string, opOutput fftypes.JSONObject) error { + ret := _m.Called(operationID, txState, blockchainTXID, errorMessage, opOutput) var r0 error - if rf, ok := ret.Get(0).(func(*blockchain.ContractEvent) error); ok { - r0 = rf(event) + if rf, ok := ret.Get(0).(func(*fftypes.UUID, fftypes.OpStatus, string, string, fftypes.JSONObject) error); ok { + r0 = rf(operationID, txState, blockchainTXID, errorMessage, opOutput) } else { r0 = ret.Error(0) } diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index df3d55a76..541d100ea 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -65,6 +65,20 @@ func (_m *EventManager) BatchPinComplete(bi blockchain.Plugin, batch *blockchain return r0 } +// BlockchainEvent provides a mock function with given fields: event +func (_m *EventManager) BlockchainEvent(event *blockchain.EventWithSubscription) error { + ret := _m.Called(event) + + var r0 error + if rf, ok := ret.Get(0).(func(*blockchain.EventWithSubscription) error); ok { + r0 = rf(event) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ChangeEvents provides a mock function with given fields: func (_m *EventManager) ChangeEvents() chan<- *fftypes.ChangeEvent { ret := _m.Called() @@ -81,20 +95,6 @@ func (_m *EventManager) ChangeEvents() chan<- *fftypes.ChangeEvent { return r0 } -// ContractEvent provides a mock function with given fields: event -func (_m *EventManager) ContractEvent(event *blockchain.ContractEvent) error { - ret := _m.Called(event) - - var r0 error - if rf, ok := ret.Get(0).(func(*blockchain.ContractEvent) error); ok { - r0 = rf(event) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // CreateUpdateDurableSubscription provides a mock function with given fields: ctx, subDef, mustNew func (_m *EventManager) CreateUpdateDurableSubscription(ctx context.Context, subDef *fftypes.Subscription, mustNew bool) error { ret := _m.Called(ctx, subDef, mustNew) @@ -208,13 +208,13 @@ func (_m *EventManager) NewSubscriptions() chan<- *fftypes.UUID { return r0 } -// OperationUpdate provides a mock function with given fields: plugin, operationID, txState, errorMessage, opOutput -func (_m *EventManager) OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - ret := _m.Called(plugin, operationID, txState, errorMessage, opOutput) +// OperationUpdate provides a mock function with given fields: plugin, operationID, txState, blockchainTXID, errorMessage, opOutput +func (_m *EventManager) OperationUpdate(plugin fftypes.Named, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID string, errorMessage string, opOutput fftypes.JSONObject) error { + ret := _m.Called(plugin, operationID, txState, blockchainTXID, errorMessage, opOutput) var r0 error - if rf, ok := ret.Get(0).(func(fftypes.Named, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, operationID, txState, errorMessage, opOutput) + if rf, ok := ret.Get(0).(func(fftypes.Named, *fftypes.UUID, fftypes.OpStatus, string, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, operationID, txState, blockchainTXID, errorMessage, opOutput) } else { r0 = ret.Error(0) } diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index c631b8357..895ea0a86 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -14,13 +14,13 @@ type Callbacks struct { mock.Mock } -// TokenOpUpdate provides a mock function with given fields: plugin, operationID, txState, errorMessage, opOutput -func (_m *Callbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - ret := _m.Called(plugin, operationID, txState, errorMessage, opOutput) +// TokenOpUpdate provides a mock function with given fields: plugin, operationID, txState, blockchainTXID, errorMessage, opOutput +func (_m *Callbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID string, errorMessage string, opOutput fftypes.JSONObject) error { + ret := _m.Called(plugin, operationID, txState, blockchainTXID, errorMessage, opOutput) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, operationID, txState, errorMessage, opOutput) + if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.UUID, fftypes.OpStatus, string, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, operationID, txState, blockchainTXID, errorMessage, opOutput) } else { r0 = ret.Error(0) } diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 08ea9e743..fb283a013 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -78,7 +78,7 @@ type Callbacks interface { // Only the party submitting the transaction will see this data. // // Error should will only be returned in shutdown scenarios - BlockchainOpUpdate(operationID *fftypes.UUID, txState TransactionStatus, errorMessage string, opOutput fftypes.JSONObject) error + BlockchainOpUpdate(operationID *fftypes.UUID, txState TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error // BatchPinComplete notifies on the arrival of a sequenced batch of messages, which might have been // submitted by us, or by any other authorized party in the network. @@ -86,8 +86,8 @@ type Callbacks interface { // Error should will only be returned in shutdown scenarios BatchPinComplete(batch *BatchPin, signingIdentity string) error - // ContractEvent notifies on the arrival of any event from a user-created subscription - ContractEvent(event *ContractEvent) error + // BlockchainEvent notifies on the arrival of any event from a user-created subscription. + BlockchainEvent(event *EventWithSubscription) error } // Capabilities the supported featureset of the blockchain @@ -161,9 +161,13 @@ type Event struct { // Timestamp is the time the event was emitted from the blockchain Timestamp *fftypes.FFTime + + // We capture the blockchain TXID as in the case + // of a FireFly transaction we want to reflect that blockchain TX back onto the FireFly TX object + BlockchainTXID string } -type ContractEvent struct { +type EventWithSubscription struct { Event // Subscription is the ID assigned to a custom contract subscription by the connector diff --git a/pkg/database/filter_test.go b/pkg/database/filter_test.go index f19e2c445..850bf021b 100644 --- a/pkg/database/filter_test.go +++ b/pkg/database/filter_test.go @@ -230,7 +230,7 @@ func TestBuildMessageJSONConvert(t *testing.T) { assert.Equal(t, `( output == null ) && ( output == '{}' ) && ( output == '{}' ) && ( output == '{"some":"value"}' )`, f.String()) } -func TestBuildFFNameArrayConvert(t *testing.T) { +func TestBuildFFStringArrayConvert(t *testing.T) { fb := MessageQueryFactory.NewFilter(context.Background()) f, err := fb.And( fb.Eq("topics", nil), @@ -339,6 +339,6 @@ func TestStringsForTypes(t *testing.T) { now := fftypes.Now() assert.Equal(t, now.String(), (&timeField{t: now}).String()) assert.Equal(t, `{"some":"value"}`, (&jsonField{b: []byte(`{"some":"value"}`)}).String()) - assert.Equal(t, "t1,t2", (&ffNameArrayField{na: fftypes.FFNameArray{"t1", "t2"}}).String()) + assert.Equal(t, "t1,t2", (&ffNameArrayField{na: fftypes.FFStringArray{"t1", "t2"}}).String()) assert.Equal(t, "true", (&boolField{b: true}).String()) } diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 51162e0bf..b61f494ed 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -660,12 +660,12 @@ var MessageQueryFactory = &queryFields{ "type": &StringField{}, "author": &StringField{}, "key": &StringField{}, - "topics": &FFNameArrayField{}, + "topics": &FFStringArrayField{}, "tag": &StringField{}, "group": &Bytes32Field{}, "created": &TimeField{}, "hash": &Bytes32Field{}, - "pins": &FFNameArrayField{}, + "pins": &FFStringArrayField{}, "state": &StringField{}, "confirmed": &TimeField{}, "sequence": &Int64Field{}, @@ -692,11 +692,12 @@ var BatchQueryFactory = &queryFields{ // TransactionQueryFactory filter fields for transactions var TransactionQueryFactory = &queryFields{ - "id": &UUIDField{}, - "type": &StringField{}, - "status": &StringField{}, - "created": &TimeField{}, - "namespace": &StringField{}, + "id": &UUIDField{}, + "type": &StringField{}, + "status": &StringField{}, + "created": &TimeField{}, + "namespace": &StringField{}, + "blockchainids": &FFStringArrayField{}, } // DataQueryFactory filter fields for data diff --git a/pkg/database/query_fields.go b/pkg/database/query_fields.go index 96c768a52..d28c0eb2e 100644 --- a/pkg/database/query_fields.go +++ b/pkg/database/query_fields.go @@ -297,17 +297,17 @@ func (f *JSONField) getSerialization() FieldSerialization { return &jsonField{} func (f *JSONField) filterAsString() bool { return true } func (f *JSONField) description() string { return "JSON-blob" } -type FFNameArrayField struct{} -type ffNameArrayField struct{ na fftypes.FFNameArray } +type FFStringArrayField struct{} +type ffNameArrayField struct{ na fftypes.FFStringArray } func (f *ffNameArrayField) Scan(src interface{}) (err error) { return f.na.Scan(src) } -func (f *ffNameArrayField) Value() (driver.Value, error) { return f.na.String(), nil } -func (f *ffNameArrayField) String() string { return f.na.String() } -func (f *FFNameArrayField) getSerialization() FieldSerialization { return &ffNameArrayField{} } -func (f *FFNameArrayField) filterAsString() bool { return true } -func (f *FFNameArrayField) description() string { return "String-array" } +func (f *ffNameArrayField) Value() (driver.Value, error) { return f.na.String(), nil } +func (f *ffNameArrayField) String() string { return f.na.String() } +func (f *FFStringArrayField) getSerialization() FieldSerialization { return &ffNameArrayField{} } +func (f *FFStringArrayField) filterAsString() bool { return true } +func (f *FFStringArrayField) description() string { return "String-array" } type BoolField struct{} type boolField struct{ b bool } diff --git a/pkg/database/query_fields_test.go b/pkg/database/query_fields_test.go index 82e551009..6b656bab5 100644 --- a/pkg/database/query_fields_test.go +++ b/pkg/database/query_fields_test.go @@ -195,9 +195,9 @@ func TestBoolField(t *testing.T) { } -func TestFFNameArrayField(t *testing.T) { +func TestFFStringArrayField(t *testing.T) { - fd := &FFNameArrayField{} + fd := &FFStringArrayField{} assert.NotEmpty(t, fd.description()) f := ffNameArrayField{} diff --git a/pkg/fftypes/jsonobject.go b/pkg/fftypes/jsonobject.go index 16ef5f147..df5f1d444 100644 --- a/pkg/fftypes/jsonobject.go +++ b/pkg/fftypes/jsonobject.go @@ -21,6 +21,7 @@ import ( "crypto/sha256" "database/sql/driver" "encoding/json" + "math/big" "strconv" "strings" @@ -57,6 +58,20 @@ func (jd JSONObject) GetString(key string) string { return s } +func (jd JSONObject) GetInteger(key string) *big.Int { + s := jd.GetString(key) + i, ok := big.NewInt(0).SetString(s, 0) + if !ok { + log.L(context.Background()).Errorf("Invalid int value '%+v' for key '%s'", s, key) + return big.NewInt(0) + } + return i +} + +func (jd JSONObject) GetInt64(key string) int64 { + return jd.GetInteger(key).Int64() +} + func (jd JSONObject) GetBool(key string) bool { vInterface := jd[key] switch vt := vInterface.(type) { diff --git a/pkg/fftypes/jsonobject_test.go b/pkg/fftypes/jsonobject_test.go index 13a99c284..983427566 100644 --- a/pkg/fftypes/jsonobject_test.go +++ b/pkg/fftypes/jsonobject_test.go @@ -155,7 +155,9 @@ func TestJSONNestedSafeGet(t *testing.T) { } ], "string_array": ["str1","str2"], - "wrong": null + "wrong": null, + "int1": "0xfeedbeef", + "int2": "12345" } `), &jd) assert.NoError(t, err) @@ -181,6 +183,10 @@ func TestJSONNestedSafeGet(t *testing.T) { GetString("some"), ) + assert.Equal(t, int64(0xfeedbeef), jd.GetInt64("int1")) + assert.Equal(t, int64(12345), jd.GetInt64("int2")) + assert.Equal(t, int64(0), jd.GetInt64("wrong")) + sa, ok := jd.GetStringArrayOk("wrong") assert.False(t, ok) assert.Empty(t, sa) diff --git a/pkg/fftypes/message.go b/pkg/fftypes/message.go index 515009df5..faf75d293 100644 --- a/pkg/fftypes/message.go +++ b/pkg/fftypes/message.go @@ -71,12 +71,12 @@ type MessageHeader struct { Type MessageType `json:"type" ffenum:"messagetype"` TxType TransactionType `json:"txtype,omitempty"` Identity - Created *FFTime `json:"created,omitempty"` - Namespace string `json:"namespace,omitempty"` - Group *Bytes32 `json:"group,omitempty"` - Topics FFNameArray `json:"topics,omitempty"` - Tag string `json:"tag,omitempty"` - DataHash *Bytes32 `json:"datahash,omitempty"` + Created *FFTime `json:"created,omitempty"` + Namespace string `json:"namespace,omitempty"` + Group *Bytes32 `json:"group,omitempty"` + Topics FFStringArray `json:"topics,omitempty"` + Tag string `json:"tag,omitempty"` + DataHash *Bytes32 `json:"datahash,omitempty"` } // Message is the envelope by which coordinated data exchange can happen between parties in the network @@ -89,7 +89,7 @@ type Message struct { State MessageState `json:"state,omitempty" ffenum:"messagestate"` Confirmed *FFTime `json:"confirmed,omitempty"` Data DataRefs `json:"data"` - Pins FFNameArray `json:"pins,omitempty"` + Pins FFStringArray `json:"pins,omitempty"` Sequence int64 `json:"-"` // Local database sequence used internally for batch assembly } @@ -168,7 +168,7 @@ func (m *Message) Seal(ctx context.Context) (err error) { if len(m.Header.Topics) == 0 { m.Header.Topics = []string{DefaultTopic} } - if err := m.Header.Topics.Validate(ctx, "header.topics"); err != nil { + if err := m.Header.Topics.Validate(ctx, "header.topics", true); err != nil { return err } if m.Header.Tag != "" { @@ -210,7 +210,7 @@ func (m *Message) DupDataCheck(ctx context.Context) (err error) { } func (m *Message) Verify(ctx context.Context) error { - if err := m.Header.Topics.Validate(ctx, "header.topics"); err != nil { + if err := m.Header.Topics.Validate(ctx, "header.topics", true); err != nil { return err } if m.Header.Tag != "" { diff --git a/pkg/fftypes/namearray.go b/pkg/fftypes/namearray.go deleted file mode 100644 index 2bda91ace..000000000 --- a/pkg/fftypes/namearray.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright © 2022 Kaleido, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package fftypes - -import ( - "context" - "database/sql/driver" - "fmt" - "strings" - - "github.com/hyperledger/firefly/internal/i18n" -) - -// FFNameArray is an array of strings, each conforming to the requirements of a FireFly name -type FFNameArray []string - -// Because each FFName has a max length of 64, 15 names (plus comma delimeters) is a safe max -// to pack into a string column of length 1024 -const FFNameArrayMax = 15 - -func (na FFNameArray) Value() (driver.Value, error) { - if na == nil { - return "", nil - } - return strings.Join([]string(na), ","), nil -} - -func (na *FFNameArray) Scan(src interface{}) error { - switch st := src.(type) { - case string: - if st == "" { - *na = []string{} - return nil - } - *na = strings.Split(st, ",") - return nil - case []byte: - if len(st) == 0 { - *na = []string{} - return nil - } - *na = strings.Split(string(st), ",") - return nil - case FFNameArray: - *na = st - return nil - case nil: - *na = []string{} - return nil - default: - return i18n.NewError(context.Background(), i18n.MsgScanFailed, src, na) - } -} - -func (na FFNameArray) String() string { - if na == nil { - return "" - } - return strings.Join([]string(na), ",") -} - -func (na FFNameArray) Validate(ctx context.Context, fieldName string) error { - dupCheck := make(map[string]bool) - for i, n := range na { - if dupCheck[n] { - return i18n.NewError(ctx, i18n.MsgDuplicateArrayEntry, fieldName, i, n) - } - dupCheck[n] = true - if err := ValidateFFNameField(ctx, n, fmt.Sprintf("%s[%d]", fieldName, i)); err != nil { - return err - } - } - if len(na) > FFNameArrayMax { - return i18n.NewError(ctx, i18n.MsgTooManyItems, fieldName, FFNameArrayMax, len(na)) - } - return nil -} diff --git a/pkg/fftypes/stringarray.go b/pkg/fftypes/stringarray.go new file mode 100644 index 000000000..144e7aeb9 --- /dev/null +++ b/pkg/fftypes/stringarray.go @@ -0,0 +1,138 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftypes + +import ( + "context" + "database/sql/driver" + "fmt" + "sort" + "strings" + + "github.com/hyperledger/firefly/internal/i18n" +) + +// FFStringArray is an array of strings, each conforming to the requirements of a FireFly name +type FFStringArray []string + +// Because each FFName has a max length of 64, 15 names (plus comma delimeters) is a safe max +// to pack into a string column of length 1024 +const FFStringNameItemsMax = 15 + +// FFStringArrayStandardMax is the standard length we set as a VARCHAR max in tables for a string array +const FFStringArrayStandardMax = 1024 + +func NewFFStringArray(initialContent ...string) FFStringArray { + sa := make(FFStringArray, 0, len(initialContent)) + for _, s := range initialContent { + if s != "" { + sa = append(sa, s) + } + } + return sa +} + +func (sa FFStringArray) Value() (driver.Value, error) { + if sa == nil { + return "", nil + } + return strings.Join([]string(sa), ","), nil +} + +func (sa *FFStringArray) Scan(src interface{}) error { + switch st := src.(type) { + case string: + if st == "" { + *sa = []string{} + return nil + } + *sa = strings.Split(st, ",") + return nil + case []byte: + if len(st) == 0 { + *sa = []string{} + return nil + } + *sa = strings.Split(string(st), ",") + return nil + case FFStringArray: + *sa = st + return nil + case nil: + *sa = []string{} + return nil + default: + return i18n.NewError(context.Background(), i18n.MsgScanFailed, src, sa) + } +} + +func (sa FFStringArray) String() string { + if sa == nil { + return "" + } + return strings.Join([]string(sa), ",") +} + +func (sa FFStringArray) Validate(ctx context.Context, fieldName string, isName bool) error { + var totalLength int + dupCheck := make(map[string]bool) + for i, n := range sa { + if dupCheck[n] { + return i18n.NewError(ctx, i18n.MsgDuplicateArrayEntry, fieldName, i, n) + } + dupCheck[n] = true + totalLength += len(n) + if isName { + if err := ValidateFFNameField(ctx, n, fmt.Sprintf("%s[%d]", fieldName, i)); err != nil { + return err + } + } else { + if err := ValidateSafeCharsOnly(ctx, n, fmt.Sprintf("%s[%d]", fieldName, i)); err != nil { + return err + } + } + } + if isName && len(sa) > FFStringNameItemsMax { + return i18n.NewError(ctx, i18n.MsgTooManyItems, fieldName, FFStringNameItemsMax, len(sa)) + } + if totalLength > FFStringArrayStandardMax { + return i18n.NewError(ctx, i18n.MsgFieldTooLong, fieldName, FFStringArrayStandardMax) + } + return nil +} + +func (sa FFStringArray) AppendLowerUnique(s string) FFStringArray { + for _, existing := range sa { + if s == "" || strings.EqualFold(s, existing) { + return sa + } + } + return append(sa, strings.ToLower(s)) +} + +// MergeLower returns a new array with a unique set of sorted lower case strings +func (sa FFStringArray) MergeLower(osa FFStringArray) FFStringArray { + res := make(FFStringArray, 0, len(sa)+len(osa)) + for _, s := range sa { + res = res.AppendLowerUnique(s) + } + for _, s := range osa { + res = res.AppendLowerUnique(s) + } + sort.Strings(res) + return res +} diff --git a/pkg/fftypes/namearray_test.go b/pkg/fftypes/stringarray_test.go similarity index 50% rename from pkg/fftypes/namearray_test.go rename to pkg/fftypes/stringarray_test.go index dba3101c4..459d895db 100644 --- a/pkg/fftypes/namearray_test.go +++ b/pkg/fftypes/stringarray_test.go @@ -19,40 +19,57 @@ package fftypes import ( "context" "fmt" + "strings" "testing" "github.com/stretchr/testify/assert" ) -func TestFFNameArrayVerifyTooLong(t *testing.T) { - na := make(FFNameArray, 16) +func TestFFStringArrayVerifyTooLong(t *testing.T) { + na := make(FFStringArray, 16) for i := 0; i < 16; i++ { na[i] = fmt.Sprintf("item_%d", i) } - err := na.Validate(context.Background(), "field1") + err := na.Validate(context.Background(), "field1", true) assert.Regexp(t, `FF10227.*field1`, err) } -func TestFFNameArrayVerifyDuplicate(t *testing.T) { - na := FFNameArray{"value1", "value2", "value1"} - err := na.Validate(context.Background(), "field1") +func TestFFStringArrayVerifyDuplicate(t *testing.T) { + na := FFStringArray{"value1", "value2", "value1"} + err := na.Validate(context.Background(), "field1", true) assert.Regexp(t, `FF10228.*field1`, err) } -func TestFFNameArrayVerifyBadName(t *testing.T) { - na := FFNameArray{"!valid"} - err := na.Validate(context.Background(), "field1") +func TestFFStringArrayVerifyBadName(t *testing.T) { + na := FFStringArray{"!valid"} + err := na.Validate(context.Background(), "field1", true) assert.Regexp(t, `FF10131.*field1\[0\]`, err) } -func TestFFNameArrayScanValue(t *testing.T) { +func TestFFStringArrayVerifyBadNonName(t *testing.T) { + na := FFStringArray{"!valid"} + err := na.Validate(context.Background(), "field1", false) + assert.Regexp(t, `FF10335.*field1\[0\]`, err) +} + +func TestFFStringArrayVerifyTooLongTotal(t *testing.T) { + longstr := strings.Builder{} + for i := 0; i < (FFStringArrayStandardMax + 1); i++ { + longstr.WriteRune('a') + } + na := FFStringArray{longstr.String()} + err := na.Validate(context.Background(), "field1", false) + assert.Regexp(t, `FF10188.*field1`, err) +} + +func TestFFStringArrayScanValue(t *testing.T) { - na1 := FFNameArray{"name1", "name2"} + na1 := FFStringArray{"name1", "name2"} v, err := na1.Value() assert.NoError(t, err) assert.Equal(t, "name1,name2", v) - var na2 FFNameArray + var na2 FFStringArray assert.Equal(t, "", na2.String()) v, err = na2.Value() assert.Equal(t, "", v) @@ -60,12 +77,12 @@ func TestFFNameArrayScanValue(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "name1,name2", na2.String()) - var na3 FFNameArray + var na3 FFStringArray err = na3.Scan([]byte("name1,name2")) assert.NoError(t, err) assert.Equal(t, "name1,name2", na3.String()) - var na4 FFNameArray + var na4 FFStringArray err = na4.Scan([]byte(nil)) assert.NoError(t, err) assert.Equal(t, "", na4.String()) @@ -76,18 +93,26 @@ func TestFFNameArrayScanValue(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "", v) - var na5 FFNameArray + var na5 FFStringArray err = na5.Scan("") assert.NoError(t, err) - assert.Equal(t, FFNameArray{}, na5) + assert.Equal(t, FFStringArray{}, na5) assert.Equal(t, "", na5.String()) - var na6 FFNameArray + var na6 FFStringArray err = na6.Scan(42) assert.Regexp(t, "FF10125", err) - var na7 FFNameArray - err = na7.Scan(FFNameArray{"test1", "test2"}) - assert.Equal(t, FFNameArray{"test1", "test2"}, na7) + var na7 FFStringArray + err = na7.Scan(FFStringArray{"test1", "test2"}) + assert.Equal(t, FFStringArray{"test1", "test2"}, na7) + +} + +func TestFFStringArrayMergeFold(t *testing.T) { + + sa := NewFFStringArray("name2", "NAME1") + assert.Equal(t, FFStringArray{"name1", "name2", "name3"}, sa.MergeLower(FFStringArray{"name3"})) + assert.Equal(t, FFStringArray{"name1", "name2", "name3", "name4"}, sa.MergeLower(FFStringArray{"NAME4", "NAME3", "name1", "name2"})) } diff --git a/pkg/fftypes/transaction.go b/pkg/fftypes/transaction.go index a9d539e9e..97d7ff153 100644 --- a/pkg/fftypes/transaction.go +++ b/pkg/fftypes/transaction.go @@ -41,9 +41,10 @@ type TransactionRef struct { // node, with the correlation information to look them up on the underlying // ledger technology type Transaction struct { - ID *UUID `json:"id,omitempty"` - Namespace string `json:"namespace,omitempty"` - Type TransactionType `json:"type" ffenum:"txtype"` - Created *FFTime `json:"created"` - Status OpStatus `json:"status"` + ID *UUID `json:"id,omitempty"` + Namespace string `json:"namespace,omitempty"` + Type TransactionType `json:"type" ffenum:"txtype"` + Created *FFTime `json:"created"` + Status OpStatus `json:"status"` + BlockchainIDs FFStringArray `json:"blockchainIds,omitempty"` } diff --git a/pkg/fftypes/validations.go b/pkg/fftypes/validations.go index 378840327..028be1289 100644 --- a/pkg/fftypes/validations.go +++ b/pkg/fftypes/validations.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,9 +24,17 @@ import ( ) var ( - ffNameValidator = regexp.MustCompile(`^[0-9a-zA-Z]([0-9a-zA-Z._-]{0,62}[0-9a-zA-Z])?$`) + ffNameValidator = regexp.MustCompile(`^[0-9a-zA-Z]([0-9a-zA-Z._-]{0,62}[0-9a-zA-Z])?$`) + ffSafeCharsValidator = regexp.MustCompile(`^[0-9a-zA-Z._-]*$`) ) +func ValidateSafeCharsOnly(ctx context.Context, str string, fieldName string) error { + if !ffSafeCharsValidator.MatchString(str) { + return i18n.NewError(ctx, i18n.MsgSafeCharsOnly, fieldName) + } + return nil +} + func ValidateFFNameField(ctx context.Context, str string, fieldName string) error { if !ffNameValidator.MatchString(str) { return i18n.NewError(ctx, i18n.MsgInvalidName, fieldName) diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 62434ea39..787f410dc 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -70,7 +70,7 @@ type Callbacks interface { // Only the party submitting the transaction will see this data. // // Error should will only be returned in shutdown scenarios - TokenOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error + TokenOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) error // TokenPoolCreated notifies on the creation of a new token pool, which might have been // submitted by us, or by any other authorized party in the network. diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 70fffa463..284800d4a 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -93,7 +93,7 @@ func validateReceivedMessages(ts *testState, client *resty.Client, msgType fftyp assert.Equal(ts.t, count, len(messages)) assert.Equal(ts.t, txtype, (messages)[idx].Header.TxType) assert.Equal(ts.t, "default", (messages)[idx].Header.Namespace) - assert.Equal(ts.t, fftypes.FFNameArray{"default"}, (messages)[idx].Header.Topics) + assert.Equal(ts.t, fftypes.FFStringArray{"default"}, (messages)[idx].Header.Topics) data := GetData(ts.t, client, ts.startTime, 200) var msgData *fftypes.Data