Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Blockchain Transaction IDs to FireFly Transaction objects and add blockchain IDs to events #448

Merged
merged 8 commits into from
Jan 27, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ ALTER TABLE transactions ADD COLUMN info BYTEA;

CREATE INDEX transactions_protocol_id ON transactions(protocol_id);
CREATE INDEX transactions_ref ON transactions(ref);

DROP INDEX transactions_blockchain_ids;
ALTER TABLE transactions DROP COLUMN blockchain_ids;
COMMIT;
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ ALTER TABLE transactions DROP COLUMN signer;
ALTER TABLE transactions DROP COLUMN hash;
ALTER TABLE transactions DROP COLUMN protocol_id;
ALTER TABLE transactions DROP COLUMN info;

ALTER TABLE transactions ADD COLUMN blockchain_ids VARCHAR(1024);
CREATE INDEX transactions_blockchain_ids ON transactions(blockchain_ids);
COMMIT;
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ ALTER TABLE transactions ADD COLUMN info BYTEA;

CREATE INDEX transactions_protocol_id ON transactions(protocol_id);
CREATE INDEX transactions_ref ON transactions(ref);

DROP INDEX transactions_blockchain_ids;
ALTER TABLE transactions DROP COLUMN blockchain_ids;
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ ALTER TABLE transactions DROP COLUMN signer;
ALTER TABLE transactions DROP COLUMN hash;
ALTER TABLE transactions DROP COLUMN protocol_id;
ALTER TABLE transactions DROP COLUMN info;

ALTER TABLE transactions ADD COLUMN blockchain_ids VARCHAR(1024);
CREATE INDEX transactions_blockchain_ids ON transactions(blockchain_ids);
22 changes: 22 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5078,6 +5078,10 @@ paths:
application/json:
schema:
properties:
blockchainIds:
items:
type: string
type: array
created: {}
id: {}
namespace:
Expand Down Expand Up @@ -9837,6 +9841,11 @@ paths:
schema:
default: 120s
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: blockchainids
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: created
Expand Down Expand Up @@ -9902,6 +9911,10 @@ paths:
application/json:
schema:
properties:
blockchainIds:
items:
type: string
type: array
created: {}
id: {}
namespace:
Expand Down Expand Up @@ -9945,6 +9958,11 @@ paths:
schema:
default: 120s
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: blockchainids
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: created
Expand Down Expand Up @@ -10010,6 +10028,10 @@ paths:
application/json:
schema:
properties:
blockchainIds:
items:
type: string
type: array
created: {}
id: {}
namespace:
Expand Down
2 changes: 1 addition & 1 deletion internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestCalcPinsFail(t *testing.T) {
Messages: []*fftypes.Message{
{Header: fftypes.MessageHeader{
Group: gid,
Topics: fftypes.FFNameArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
}},
},
},
Expand Down
40 changes: 23 additions & 17 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ func ethHexFormatB32(b *fftypes.Bytes32) string {

func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
sBlockNumber := msgJSON.GetString("blockNumber")
sTransactionIndex := msgJSON.GetString("transactionIndex")
sTransactionHash := msgJSON.GetString("transactionHash")
blockNumber := msgJSON.GetInt64("blockNumber")
txIndex := msgJSON.GetInt64("transactionIndex")
logIndex := msgJSON.GetInt64("logIndex")
dataJSON := msgJSON.GetObject("data")
authorAddress := dataJSON.GetString("author")
ns := dataJSON.GetString("namespace")
Expand All @@ -205,7 +207,6 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
}

if sBlockNumber == "" ||
sTransactionIndex == "" ||
awrichar marked this conversation as resolved.
Show resolved Hide resolved
sTransactionHash == "" ||
authorAddress == "" ||
sUUIDs == "" ||
Expand Down Expand Up @@ -257,12 +258,13 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
BatchPayloadRef: sPayloadRef,
Contexts: contexts,
Event: blockchain.Event{
Source: e.Name(),
Name: "BatchPin",
ProtocolID: sTransactionHash,
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
BlockchainTXID: sTransactionHash,
Source: e.Name(),
Name: "BatchPin",
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex),
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
},
}

Expand All @@ -272,6 +274,9 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON

func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
sTransactionHash := msgJSON.GetString("transactionHash")
blockNumber := msgJSON.GetInt64("blockNumber")
txIndex := msgJSON.GetInt64("transactionIndex")
logIndex := msgJSON.GetInt64("logIndex")
sub := msgJSON.GetString("subId")
signature := msgJSON.GetString("signature")
dataJSON := msgJSON.GetObject("data")
Expand All @@ -284,19 +289,20 @@ func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSON
}
delete(msgJSON, "data")

event := &blockchain.ContractEvent{
event := &blockchain.EventWithSubscription{
Subscription: sub,
Event: blockchain.Event{
Source: e.Name(),
Name: name,
ProtocolID: sTransactionHash,
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
BlockchainTXID: sTransactionHash,
Source: e.Name(),
Name: name,
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex),
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
},
}

return e.callbacks.ContractEvent(event)
return e.callbacks.BlockchainEvent(event)
}

func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) error {
Expand All @@ -321,7 +327,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject)
updateType = fftypes.OpStatusFailed
}
l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message)
return e.callbacks.BlockchainOpUpdate(operationID, updateType, message, reply)
return e.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply)
}

func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{}) error {
Expand Down
14 changes: 10 additions & 4 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
em.On("BlockchainOpUpdate",
operationID,
fftypes.OpStatusSucceeded,
"0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8",
"",
mock.Anything).Return(nil)

Expand Down Expand Up @@ -1022,6 +1023,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
txsu := em.On("BlockchainOpUpdate",
operationID,
fftypes.OpStatusFailed,
"",
"Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument",
mock.Anything).Return(fmt.Errorf("Shutdown"))
done := make(chan struct{})
Expand Down Expand Up @@ -1292,15 +1294,19 @@ func TestHandleMessageContractEvent(t *testing.T) {
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

em.On("ContractEvent", mock.Anything).Return(nil)
em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool {
assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID)
assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID)
return true
})).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)

ev := em.Calls[0].Arguments[0].(*blockchain.ContractEvent)
ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription)
assert.Equal(t, "sub2", ev.Subscription)
assert.Equal(t, "Changed", ev.Event.Name)

Expand Down Expand Up @@ -1351,7 +1357,7 @@ func TestHandleMessageContractEventNoTimestamp(t *testing.T) {
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

em.On("ContractEvent", mock.Anything).Return(nil)
em.On("BlockchainEvent", mock.Anything).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
Expand Down Expand Up @@ -1387,7 +1393,7 @@ func TestHandleMessageContractEventError(t *testing.T) {
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

em.On("ContractEvent", mock.Anything).Return(fmt.Errorf("pop"))
em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop"))

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
Expand Down
2 changes: 1 addition & 1 deletion internal/blockchain/ethereum/ffi_param_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSchemaTypeInvalidFFIType(t *testing.T) {

func TestSchemaTypeMissing(t *testing.T) {
_, err := NewTestSchema(`{}`)
assert.Regexp(t, "missing properties: 'type'", err)
awrichar marked this conversation as resolved.
Show resolved Hide resolved
assert.Regexp(t, "missing properties", err)
}

func TestSchemaDetailsTypeMissing(t *testing.T) {
Expand Down
40 changes: 24 additions & 16 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb
}

sTransactionHash := msgJSON.GetString("transactionId")
blockNumber := msgJSON.GetInt64("blockNumber")
transactionIndex := msgJSON.GetInt64("transactionIndex")
eventIndex := msgJSON.GetInt64("eventIndex")
signer := payload.GetString("signer")
ns := payload.GetString("namespace")
sUUIDs := payload.GetString("uuids")
Expand Down Expand Up @@ -315,12 +318,13 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb
BatchPayloadRef: sPayloadRef,
Contexts: contexts,
Event: blockchain.Event{
Source: f.Name(),
Name: "BatchPin",
ProtocolID: sTransactionHash,
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
BlockchainTXID: sTransactionHash,
Source: f.Name(),
Name: "BatchPin",
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, eventIndex),
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
},
}

Expand All @@ -337,6 +341,9 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb
delete(msgJSON, "payload")

sTransactionHash := msgJSON.GetString("transactionId")
blockNumber := msgJSON.GetInt64("blockNumber")
transactionIndex := msgJSON.GetInt64("transactionIndex")
eventIndex := msgJSON.GetInt64("eventIndex")
sub := msgJSON.GetString("subId")
name := msgJSON.GetString("eventName")
sTimestamp := msgJSON.GetString("timestamp")
Expand All @@ -346,19 +353,20 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb
// Continue with zero timestamp
}

event := &blockchain.ContractEvent{
event := &blockchain.EventWithSubscription{
Subscription: sub,
Event: blockchain.Event{
Source: f.Name(),
Name: name,
ProtocolID: sTransactionHash,
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
BlockchainTXID: sTransactionHash,
Source: f.Name(),
Name: name,
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, eventIndex),
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
},
}

return f.callbacks.ContractEvent(event)
return f.callbacks.BlockchainEvent(event)
}

func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) error {
Expand All @@ -367,7 +375,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) er
headers := reply.GetObject("headers")
requestID := headers.GetString("requestId")
replyType := headers.GetString("type")
txHash := reply.GetString("transactionHash")
txHash := reply.GetString("transactionId")
message := reply.GetString("errorMessage")
if requestID == "" || replyType == "" {
l.Errorf("Reply cannot be processed: %+v", reply)
Expand All @@ -383,7 +391,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) er
updateType = fftypes.OpStatusFailed
}
l.Infof("Fabconnect '%s' reply tx=%s (request=%s) %s", replyType, txHash, requestID, message)
return f.callbacks.BlockchainOpUpdate(operationID, updateType, message, reply)
return f.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply)
}

func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) error {
Expand Down
Loading