diff --git a/cmd/peer/main.go b/cmd/peer/main.go index bd8975e852b..ff62bde286c 100644 --- a/cmd/peer/main.go +++ b/cmd/peer/main.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package main import ( + "github.com/hyperledger/fabric/internal/peer/reconciler" _ "net/http/pprof" "os" "strings" @@ -50,6 +51,7 @@ func main() { mainCmd.AddCommand(channel.Cmd(nil)) mainCmd.AddCommand(lifecycle.Cmd(cryptoProvider)) mainCmd.AddCommand(snapshot.Cmd(cryptoProvider)) + mainCmd.AddCommand(reconciler.Cmd(nil)) // On failure Cobra prints the usage message and error string, so we only // need to exit with a non-0 status diff --git a/common/p2pmessage/metrics.go b/common/p2pmessage/metrics.go new file mode 100644 index 00000000000..cd8452edb4e --- /dev/null +++ b/common/p2pmessage/metrics.go @@ -0,0 +1,59 @@ +package p2pmessage + +import ( + "github.com/hyperledger/fabric/common/metrics" +) + +var ( + streamsOpened = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "streams_opened", + Help: "The number of GRPC streams that have been opened for the p2p_message service.", + } + streamsClosed = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "streams_closed", + Help: "The number of GRPC streams that have been closed for the p2p_message service.", + } + + requestsReceived = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "requests_received", + Help: "The number of p2p_message requests that have been received.", + LabelNames: []string{"channel", "filtered", "data_type"}, + StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}", + } + requestsCompleted = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "requests_completed", + Help: "The number of p2p_message requests that have been completed.", + LabelNames: []string{"channel", "filtered", "data_type", "success"}, + StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}.%{success}", + } + + blocksReconciled = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "blocks_reconciled", + Help: "The number of blocks sent by the p2p_message service.", + LabelNames: []string{"channel", "filtered", "data_type"}, + StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}", + } +) + +type Metrics struct { + StreamsOpened metrics.Counter + StreamsClosed metrics.Counter + RequestsReceived metrics.Counter + RequestsCompleted metrics.Counter + BlocksReconciled metrics.Counter +} + +func NewMetrics(p metrics.Provider) *Metrics { + return &Metrics{ + StreamsOpened: p.NewCounter(streamsOpened), + StreamsClosed: p.NewCounter(streamsClosed), + RequestsReceived: p.NewCounter(requestsReceived), + RequestsCompleted: p.NewCounter(requestsCompleted), + BlocksReconciled: p.NewCounter(blocksReconciled), + } +} diff --git a/common/p2pmessage/p2pmessage.go b/common/p2pmessage/p2pmessage.go new file mode 100644 index 00000000000..d20c50ae920 --- /dev/null +++ b/common/p2pmessage/p2pmessage.go @@ -0,0 +1,146 @@ +package p2pmessage + +import ( + "context" + "fmt" + "github.com/hyperledger/fabric/common/crypto" + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger/blockledger" + "github.com/hyperledger/fabric/common/policies" + "github.com/hyperledger/fabric/common/util" + gossipprivdata "github.com/hyperledger/fabric/gossip/privdata" + "github.com/hyperledger/fabric/internal/peer/protos" + "time" +) + +var logger = flogging.MustGetLogger("common.p2pmessage") + +//go:generate counterfeiter -o mock/chain_manager.go -fake-name ChainManager . ChainManager + +// ChainManager provides a way for the Handler to look up the Chain. +type ChainManager interface { + GetChain(chainID string) Chain +} + +//go:generate counterfeiter -o mock/chain.go -fake-name Chain . Chain + +// Chain encapsulates chain operations and data. +type Chain interface { + // Sequence returns the current config sequence number, can be used to detect config changes + Sequence() uint64 + + // PolicyManager returns the current policy manager as specified by the chain configuration + PolicyManager() policies.Manager + + // Reader returns the chain Reader for the chain + Reader() blockledger.Reader + + // Errored returns a channel which closes when the backing consenter has errored + Errored() <-chan struct{} +} + +////go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker +// +//// PolicyChecker checks the envelope against the policy logic supplied by the +//// function. +//type PolicyChecker interface { +// CheckPolicy(envelope *cb.Envelope, channelID string) error +//} +// +//// The PolicyCheckerFunc is an adapter that allows the use of an ordinary +//// function as a PolicyChecker. +//type PolicyCheckerFunc func(envelope *cb.Envelope, channelID string) error +// +//// CheckPolicy calls pcf(envelope, channelID) +//func (pcf PolicyCheckerFunc) CheckPolicy(envelope *cb.Envelope, channelID string) error { +// return pcf(envelope, channelID) +//} + +////go:generate counterfeiter -o mock/inspector.go -fake-name Inspector . Inspector +// +//// Inspector verifies an appropriate binding between the message and the context. +//type Inspector interface { +// Inspect(context.Context, proto.Message) error +//} +// +//// The InspectorFunc is an adapter that allows the use of an ordinary +//// function as an Inspector. +//type InspectorFunc func(context.Context, proto.Message) error +// +//// Inspect calls inspector(ctx, p) +//func (inspector InspectorFunc) Inspect(ctx context.Context, p proto.Message) error { +// return inspector(ctx, p) +//} + +// Handler handles server requests. +type Handler struct { + ExpirationCheckFunc func(identityBytes []byte) time.Time + ChainManager ChainManager + TimeWindow time.Duration + //BindingInspector Inspector + Metrics *Metrics +} + +// Server is a polymorphic structure to support generalization of this handler +// to be able to deliver different type of responses. +type Server struct { + Receiver +} + +// Receiver is used to receive enveloped seek requests. +type Receiver interface { + SendReconcileRequest() +} + +// NewHandler creates an implementation of the Handler interface. +func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics, expirationCheckDisabled bool) *Handler { + expirationCheck := crypto.ExpiresAt + if expirationCheckDisabled { + expirationCheck = noExpiration + } + return &Handler{ + ChainManager: cm, + TimeWindow: timeWindow, + //BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)), + Metrics: metrics, + ExpirationCheckFunc: expirationCheck, + } +} + +// Handle receives incoming deliver requests. +func (h *Handler) Handle(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) { + addr := util.ExtractRemoteAddress(ctx) + logger.Debugf("Starting new p2p loop for %s", addr) + h.Metrics.StreamsOpened.Add(1) + defer h.Metrics.StreamsClosed.Add(1) + + reconcileResponse := &protos.ReconcileResponse{ + Success: false, + } + + // Calling Reconciler Service + // reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService() + reconciler := gossipprivdata.GetOnDemandReconcilerService(request.ChannelId) + fmt.Println(reconciler) + + if reconciler == nil { + reconcileResponse.Message = "no reconciler found for channel " + request.ChannelId + + return reconcileResponse, fmt.Errorf("no reconciler found for channel " + request.ChannelId) + } + response, err := reconciler.Reconcile(request.BlockNumber) + return &response, err +} + +// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header. +//func ExtractChannelHeaderCertHash(msg proto.Message) []byte { +// chdr, isChannelHeader := msg.(*cb.ChannelHeader) +// if !isChannelHeader || chdr == nil { +// return nil +// } +// return chdr.TlsCertHash +//} + +func noExpiration(_ []byte) time.Time { + return time.Time{} +} diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index f354a449bd7..3ebe8b92505 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -464,8 +464,8 @@ func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.Tx return nil } -//recommitLostBlocks retrieves blocks in specified range and commit the write set to either -//state DB or history DB or both +// recommitLostBlocks retrieves blocks in specified range and commit the write set to either +// state DB or history DB or both func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error { logger.Infof("Recommitting lost blocks - firstBlockNum=%d, lastBlockNum=%d, recoverables=%#v", firstBlockNum, lastBlockNum, recoverables) var err error @@ -762,6 +762,23 @@ func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledge return l.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock) } +// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the +// specified blocks which miss at least a private data of a eligible collection. +func (l *kvLedger) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) { + // the missing pvtData info in the pvtdataStore could belong to a block which is yet + // to be processed and committed to the blockStore and stateDB (such a scenario is possible + // after a peer rollback). In such cases, we cannot return missing pvtData info. Otherwise, + // we would end up in an inconsistent state database. + if l.isPvtstoreAheadOfBlkstore.Load().(bool) { + return nil, nil + } + // it is safe to not acquire a read lock on l.blockAPIsRWLock. Without a lock, the value of + // lastCommittedBlock can change due to a new block commit. As a result, we may not + // be able to fetch the missing data info of truly the most recent blocks. This + // decision was made to ensure that the regular block commit rate is not affected. + return l.pvtdataStore.GetMissingPvtDataInfoForSpecificBlock(blockNumber) +} + func (l *kvLedger) addBlockCommitHash(block *common.Block, updateBatchBytes []byte) { var valueBytes []byte @@ -812,9 +829,12 @@ func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilte // DoesPvtDataInfoExist returns true when // (1) the ledger has pvtdata associated with the given block number (or) // (2) a few or all pvtdata associated with the given block number is missing but the -// missing info is recorded in the ledger (or) +// +// missing info is recorded in the ledger (or) +// // (3) the block is committed but it does not contain even a single -// transaction with pvtData. +// +// transaction with pvtData. func (l *kvLedger) DoesPvtDataInfoExist(blockNum uint64) (bool, error) { pvtStoreHt, err := l.pvtdataStore.LastCommittedBlockHeight() if err != nil { diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index 38ad285d5d5..9abde93c197 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -536,6 +536,7 @@ type ConfigHistoryRetriever interface { // MissingPvtDataTracker allows getting information about the private data that is not missing on the peer type MissingPvtDataTracker interface { GetMissingPvtDataInfoForMostRecentBlocks(maxBlocks int) (MissingPvtDataInfo, error) + GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (MissingPvtDataInfo, error) } // MissingPvtDataInfo is a map of block number to MissingBlockPvtdataInfo diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 6a2975dd6d5..fe370c0b730 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package pvtdatastorage import ( + "fmt" "sync" "sync/atomic" "time" @@ -476,6 +477,41 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock) } +// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the +// specified block which miss at least a private data of a eligible collection. +func (s *Store) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) { + // we assume that this function would be called by the gossip only after processing the + // last retrieved missing pvtdata info and committing the same. + if blockNumber < 1 { + return nil, fmt.Errorf("invalid block number [%d]", blockNumber) + } + + lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock) + if blockNumber > lastCommittedBlock { + return nil, fmt.Errorf("block [%d] is not committed yet to the ledger", blockNumber) + } + + logger.Debug("fetching missing pvtdata entries from the de-prioritized list") + dePrioratizedQue, err := s.getMissingDataFromSpecificBlock(elgDeprioritizedMissingDataGroup, blockNumber) + if err != nil { + logger.Debugf("Error in fetching missing pvtdata entries from the de-prioritized list: %s", err) + return nil, err + } + logger.Debug("fetching missing pvtdata entries from the prioritized list") + prioritizedQue, err := s.getMissingDataFromSpecificBlock(elgPrioritizedMissingDataGroup, blockNumber) + if err != nil { + logger.Debugf("Error in fetching missing pvtdata entries from the prioritized list: %s", err) + return nil, err + } + + for k, v := range dePrioratizedQue { + prioritizedQue[k] = v // This will overwrite map1's entry if the key exists + } + + return prioritizedQue, nil + +} + func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDataInfo, error) { missingPvtDataInfo := make(ledger.MissingPvtDataInfo) numberOfBlockProcessed := 0 @@ -550,6 +586,54 @@ func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDat return missingPvtDataInfo, nil } +func (s *Store) getMissingDataFromSpecificBlock(group []byte, blockNumber uint64) (ledger.MissingPvtDataInfo, error) { + missingPvtDataInfo := make(ledger.MissingPvtDataInfo) + + startKey, endKey := createRangeScanKeysForElgMissingData(blockNumber, group) + dbItr, err := s.db.GetIterator(startKey, endKey) + if err != nil { + return nil, err + } + defer dbItr.Release() + + for dbItr.Next() { + missingDataKeyBytes := dbItr.Key() + missingDataKey := decodeElgMissingDataKey(missingDataKeyBytes) + + // check whether the entry is expired. If so, move to the next item. + // As we may use the old lastCommittedBlock value, there is a possibility that + // this missing data is actually expired but we may get the stale information. + // Though it may leads to extra work of pulling the expired data, it will not + // affect the correctness. Further, as we try to fetch the most recent missing + // data (less possibility of expiring now), such scenario would be rare. In the + // best case, we can load the latest lastCommittedBlock value here atomically to + // make this scenario very rare. + expired, err := isExpired(missingDataKey.nsCollBlk, s.btlPolicy, blockNumber) + if err != nil { + return nil, err + } + if expired { + continue + } + + valueBytes := dbItr.Value() + bitmap, err := decodeMissingDataValue(valueBytes) + if err != nil { + return nil, err + } + + // for each transaction which misses private data, make an entry in missingBlockPvtDataInfo + for index, isSet := bitmap.NextSet(0); isSet; index, isSet = bitmap.NextSet(index + 1) { + txNum := uint64(index) + missingPvtDataInfo.Add(missingDataKey.blkNum, txNum, missingDataKey.ns, missingDataKey.coll) + } + + break + } + + return missingPvtDataInfo, nil +} + // FetchBootKVHashes returns the KVHashes from the data that was loaded from a snapshot at the time of // bootstrapping. This funciton returns an error if the supplied blkNum is greater than the last block // number in the booting snapshot diff --git a/core/peer/p2pmessage.go b/core/peer/p2pmessage.go new file mode 100644 index 00000000000..6d23026b08c --- /dev/null +++ b/core/peer/p2pmessage.go @@ -0,0 +1,23 @@ +package peer + +import ( + "context" + "github.com/hyperledger/fabric/common/p2pmessage" + "github.com/hyperledger/fabric/internal/peer/protos" +) + +// P2pMessageServer holds the dependencies necessary to create a deliver server +type P2pMessageServer struct { + DeliverHandler *p2pmessage.Handler + PolicyCheckerProvider PolicyCheckerProvider + CollectionPolicyChecker CollectionPolicyChecker + IdentityDeserializerMgr IdentityDeserializerManager +} + +// Deliver sends a stream of blocks to a client after commitment +func (s *P2pMessageServer) SendReconcileRequest(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) { + logger.Debugf("Starting new Deliver handler") + defer dumpStacktraceOnPanic() + + return s.DeliverHandler.Handle(ctx, request) +} diff --git a/core/peer/peer.go b/core/peer/peer.go index 64f1c4493d1..48c9e7afce2 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -8,6 +8,7 @@ package peer import ( "fmt" + "github.com/hyperledger/fabric/common/p2pmessage" "sync" "github.com/hyperledger/fabric-protos-go/common" @@ -118,6 +119,21 @@ func (p *Peer) updateTrustedRoots(cm channelconfig.Resources) { } } +// +// P2P Message service support structs for the peer +// + +type P2PMessageChainManager struct { + Peer *Peer +} + +func (p P2PMessageChainManager) GetChain(chainID string) p2pmessage.Chain { + if channel := p.Peer.Channel(chainID); channel != nil { + return channel + } + return nil +} + // // Deliver service support structs for the peer // diff --git a/gossip/privdata/reconcile.go b/gossip/privdata/reconcile.go index 145e88c245d..bcda187bdae 100644 --- a/gossip/privdata/reconcile.go +++ b/gossip/privdata/reconcile.go @@ -9,6 +9,7 @@ package privdata import ( "encoding/hex" "fmt" + "github.com/hyperledger/fabric/internal/peer/protos" "math" "sync" "time" @@ -53,6 +54,9 @@ type PvtDataReconciler interface { Start() // Stop function stops reconciler Stop() + + //Reconcile performs on demand reconcilation a block or transaction + Reconcile(uint65 uint64) (protos.ReconcileResponse, error) } type Reconciler struct { @@ -68,6 +72,25 @@ type Reconciler struct { committer.Committer } +var ReconcilerServiceRegistry = make(map[string]PvtDataReconciler) + +// SetOnDemandReconcilerService sets a reconciler service by name +func SetOnDemandReconcilerService(name string, reconciler PvtDataReconciler) { + ReconcilerServiceRegistry[name] = reconciler +} + +func GetOnDemandReconcilerService(name string) PvtDataReconciler { + + if len(ReconcilerServiceRegistry) == 0 { + return nil + } + if ReconcilerServiceRegistry[name] == nil { + return nil + } + + return ReconcilerServiceRegistry[name] +} + // NoOpReconciler non functional reconciler to be used // in case reconciliation has been disabled type NoOpReconciler struct { @@ -82,6 +105,11 @@ func (*NoOpReconciler) Stop() { // do nothing } +func (*NoOpReconciler) Reconcile(num uint64) (protos.ReconcileResponse, error) { + logger.Debug("Private data reconciliation has been disabled") + return protos.ReconcileResponse{Success: false, Message: "got nil as MissingPvtDataTracker, exiting..."}, nil +} + // NewReconciler creates a new instance of reconciler func NewReconciler(channel string, metrics *metrics.PrivdataMetrics, c committer.Committer, fetcher ReconciliationFetcher, config *PrivdataConfig) *Reconciler { @@ -111,6 +139,10 @@ func (r *Reconciler) Start() { }) } +func (r *Reconciler) Reconcile(num uint64) (protos.ReconcileResponse, error) { + return r.reconcileSpecific(num) +} + func (r *Reconciler) run() { for { select { @@ -126,6 +158,60 @@ func (r *Reconciler) run() { } } +// ReconcileSpecific initiates a reconcilation for specific block and returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error +func (r *Reconciler) reconcileSpecific(num uint64) (protos.ReconcileResponse, error) { + missingPvtDataTracker, err := r.GetMissingPvtDataTracker() + if err != nil { + r.logger.Error("reconciliation error when trying to get missingPvtDataTracker:", err) + return protos.ReconcileResponse{Success: false, Message: err.Error()}, err + } + if missingPvtDataTracker == nil { + r.logger.Error("got nil as MissingPvtDataTracker, exiting...") + return protos.ReconcileResponse{Success: false, Message: "got nil as MissingPvtDataTracker, exiting..."}, nil + } + totalReconciled := 0 + + defer r.reportReconciliationDuration(time.Now()) + + // for { + missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForSpecificBlock(num) + if err != nil { + r.logger.Errorf("reconciliation error when trying to get missing pvt data info for the block [%d] blocks: error %v", num, err.Error()) + return protos.ReconcileResponse{Success: false, Message: err.Error()}, err + } + // if missingPvtDataInfo is nil, len will return 0 + if len(missingPvtDataInfo) == 0 { + r.logger.Debug("Reconciliation cycle finished successfully. no items to reconcile") + return protos.ReconcileResponse{Success: true, Message: fmt.Sprintf("Reconciliation cycle finished successfully. nothing to reconcile for blocks range [%d]", num)}, nil + } + + r.logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...") + + dig2collectionCfg, _, _ := r.getDig2CollectionConfig(missingPvtDataInfo) + fetchedData, err := r.FetchReconciledItems(dig2collectionCfg) + if err != nil { + r.logger.Error("reconciliation error when trying to fetch missing items from different peers:", err) + return protos.ReconcileResponse{Success: false, Message: err.Error()}, err + } + + pvtDataToCommit := r.preparePvtDataToCommit(fetchedData.AvailableElements) + unreconciled := constructUnreconciledMissingData(dig2collectionCfg, fetchedData.AvailableElements) + pvtdataHashMismatch, err := r.CommitPvtDataOfOldBlocks(pvtDataToCommit, unreconciled) + if err != nil { + return protos.ReconcileResponse{Success: false, Message: "failed to commit private data"}, err + } + r.logMismatched(pvtdataHashMismatch) + //if minB < minBlock { + // minBlock = minB + //} + //if maxB > maxBlock { + // maxBlock = maxB + //} + totalReconciled += len(fetchedData.AvailableElements) + // } + return protos.ReconcileResponse{Success: true, Message: fmt.Sprintf("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d]", totalReconciled, num)}, nil +} + // returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error func (r *Reconciler) reconcile() error { missingPvtDataTracker, err := r.GetMissingPvtDataTracker() diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index d900d79e2a4..bb375690518 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -366,6 +366,10 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order reconciler = &gossipprivdata.NoOpReconciler{} } + // reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService() + // reconcilerServiceRegistry.SetOnDemandReconcilerService(channelID, reconciler) + gossipprivdata.SetOnDemandReconcilerService(channelID, reconciler) + pushAckTimeout := g.serviceConfig.PvtDataPushAckTimeout g.privateHandlers[channelID] = privateHandler{ support: support, diff --git a/internal/peer/common/deliverclient.go b/internal/peer/common/deliverclient.go index 7ebd63afcfe..0cb4bf95e4a 100644 --- a/internal/peer/common/deliverclient.go +++ b/internal/peer/common/deliverclient.go @@ -12,6 +12,7 @@ import ( pb "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/util" + "github.com/hyperledger/fabric/internal/peer/protos" "github.com/hyperledger/fabric/internal/pkg/identity" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" @@ -157,6 +158,14 @@ func seekHelper( return env } +type ordererP2PDeliverService struct { + protos.ReconcileServiceClient +} + +type peerP2PDeliverService struct { + protos.ReconcileServiceClient +} + type ordererDeliverService struct { ab.AtomicBroadcast_DeliverClient } diff --git a/internal/peer/common/ordererclient.go b/internal/peer/common/ordererclient.go index e7de91f57e3..277117c4893 100644 --- a/internal/peer/common/ordererclient.go +++ b/internal/peer/common/ordererclient.go @@ -8,8 +8,8 @@ package common import ( "context" "crypto/tls" - ab "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/internal/peer/protos" "github.com/hyperledger/fabric/internal/pkg/comm" "github.com/pkg/errors" ) @@ -66,3 +66,12 @@ func (oc *OrdererClient) Deliver() (ab.AtomicBroadcast_DeliverClient, error) { func (oc *OrdererClient) Certificate() tls.Certificate { return oc.CommonClient.Certificate() } + +func (oc *OrdererClient) P2PMessage() (protos.ReconcileServiceClient, error) { + conn, err := oc.CommonClient.NewConnection(oc.Address, comm.ServerNameOverride(oc.sn)) + if err != nil { + return nil, errors.WithMessagef(err, "orderer client failed to connect to %s", oc.Address) + } + // TODO: check to see if we should actually handle error before returning + return protos.NewReconcileServiceClient(conn), nil +} diff --git a/internal/peer/common/p2pmessage.go b/internal/peer/common/p2pmessage.go new file mode 100644 index 00000000000..8793176e607 --- /dev/null +++ b/internal/peer/common/p2pmessage.go @@ -0,0 +1,126 @@ +package common + +import ( + "context" + "fmt" + "github.com/hyperledger/fabric/common/util" + "github.com/hyperledger/fabric/internal/peer/protos" + "github.com/hyperledger/fabric/internal/pkg/identity" + "github.com/pkg/errors" +) + +// P2PMessageClient holds the necessary information to connect a client +// to an orderer/peer deliver service +type P2PMessageClient struct { + Signer identity.SignerSerializer + Service protos.ReconcileServiceClient + ChannelID string + TLSCertHash []byte +} + +func (p *P2PMessageClient) reconcileSpecified(blockNumber uint64) (*protos.ReconcileResponse, error) { + reconcileRequest := &protos.ReconcileRequest{ + BlockNumber: blockNumber, + ChannelId: p.ChannelID, + } + //env := reconHelper(p.ChannelID, seekPosition, p.TLSCertHash, p.Signer) + fmt.Println(reconcileRequest) + return p.Service.SendReconcileRequest(context.Background(), reconcileRequest) +} + +func (p *P2PMessageClient) ReconcileSpecifiedBlock(num uint64) (*protos.ReconcileResponse, error) { + logger.Debugf("Reconciling block %d", num) + response, err := p.reconcileSpecified(num) + if err != nil { + return nil, errors.WithMessage(err, "error getting specified block") + } + + return response, nil +} + +func (p *P2PMessageClient) Close() error { + logger.Debugf("Reconciliation client close") + return nil +} + +// NewP2PMessageClientForOrderer creates a new DeliverClient from an OrdererClient +func NewP2PMessageClientForOrderer(channelID string, signer identity.SignerSerializer) (*P2PMessageClient, error) { + oc, err := NewOrdererClientFromEnv() + if err != nil { + return nil, errors.WithMessage(err, "failed to create deliver client for orderer") + } + + dc, err := oc.P2PMessage() + if err != nil { + return nil, errors.WithMessage(err, "failed to create deliver client for orderer") + } + // check for client certificate and create hash if present + var tlsCertHash []byte + if len(oc.Certificate().Certificate) > 0 { + tlsCertHash = util.ComputeSHA256(oc.Certificate().Certificate[0]) + } + ds := &ordererP2PDeliverService{dc} + o := &P2PMessageClient{ + Signer: signer, + Service: ds, + ChannelID: channelID, + TLSCertHash: tlsCertHash, + } + return o, nil +} + +// NewP2PMessageClientForPeer creates a new DeliverClient from a PeerClient +func NewP2PMessageClientForPeer(channelID string, signer identity.SignerSerializer) (*P2PMessageClient, error) { + var tlsCertHash []byte + pc, err := NewPeerClientFromEnv() + if err != nil { + return nil, errors.WithMessage(err, "failed to create deliver client for peer") + } + + d, err := pc.P2PMessage() + if err != nil { + return nil, errors.WithMessage(err, "failed to create deliver client for peer") + } + + // check for client certificate and create hash if present + if len(pc.Certificate().Certificate) > 0 { + tlsCertHash = util.ComputeSHA256(pc.Certificate().Certificate[0]) + } + ds := &peerP2PDeliverService{d} + p := &P2PMessageClient{ + Signer: signer, + Service: ds, + ChannelID: channelID, + TLSCertHash: tlsCertHash, + } + return p, nil +} + +//func reconHelper( +// channelID string, +// position *ab.SeekPosition, +// tlsCertHash []byte, +// signer identity.SignerSerializer, +//) *cb.Envelope { +// seekInfo := &ab.SeekInfo{ +// Start: position, +// Stop: position, +// Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, +// } +// +// env, err := protoutil.CreateSignedEnvelopeWithTLSBinding( +// cb.HeaderType_DELIVER_SEEK_INFO, +// channelID, +// signer, +// seekInfo, +// int32(0), +// uint64(0), +// tlsCertHash, +// ) +// if err != nil { +// logger.Errorf("Error signing envelope: %s", err) +// return nil +// } +// +// return env +//} diff --git a/internal/peer/common/peerclient.go b/internal/peer/common/peerclient.go index 9c6d5548159..c7a129fae8a 100644 --- a/internal/peer/common/peerclient.go +++ b/internal/peer/common/peerclient.go @@ -9,6 +9,7 @@ package common import ( "context" "crypto/tls" + "github.com/hyperledger/fabric/internal/peer/protos" "io/ioutil" "time" @@ -123,6 +124,15 @@ func (pc *PeerClient) PeerDeliver() (pb.DeliverClient, error) { return pb.NewDeliverClient(conn), nil } +// P2PMessage returns a client for the Deliver service +func (pc *PeerClient) P2PMessage() (protos.ReconcileServiceClient, error) { + conn, err := pc.CommonClient.NewConnection(pc.Address, comm.ServerNameOverride(pc.sn)) + if err != nil { + return nil, errors.WithMessagef(err, "deliver client failed to connect to %s", pc.Address) + } + return protos.NewReconcileServiceClient(conn), nil +} + // Certificate returns the TLS client certificate (if available) func (pc *PeerClient) Certificate() tls.Certificate { return pc.CommonClient.Certificate() diff --git a/internal/peer/node/start.go b/internal/peer/node/start.go index ea1dfa76f78..71a0bf58422 100644 --- a/internal/peer/node/start.go +++ b/internal/peer/node/start.go @@ -9,6 +9,8 @@ package node import ( "context" "fmt" + "github.com/hyperledger/fabric/common/p2pmessage" + "github.com/hyperledger/fabric/internal/peer/protos" "io" "io/ioutil" "net" @@ -490,19 +492,33 @@ func serve(args []string) error { } } - metrics := deliver.NewMetrics(metricsProvider) + deliverMetrics := deliver.NewMetrics(metricsProvider) abServer := &peer.DeliverServer{ DeliverHandler: deliver.NewHandler( &peer.DeliverChainManager{Peer: peerInstance}, coreConfig.AuthenticationTimeWindow, mutualTLS, - metrics, + deliverMetrics, false, ), PolicyCheckerProvider: policyCheckerProvider, } pb.RegisterDeliverServer(peerServer.Server(), abServer) + // Block & Txn Reconcile Implementation + p2pMessageMetrics := p2pmessage.NewMetrics(metricsProvider) + p2pMessageServer := &peer.P2pMessageServer{ + DeliverHandler: p2pmessage.NewHandler( + &peer.P2PMessageChainManager{Peer: peerInstance}, + coreConfig.AuthenticationTimeWindow, + mutualTLS, + p2pMessageMetrics, + false, + ), + PolicyCheckerProvider: policyCheckerProvider, + } + protos.RegisterReconcileServiceServer(peerServer.Server(), p2pMessageServer) + // Create a self-signed CA for chaincode service ca, err := tlsgen.NewCA() if err != nil { diff --git a/internal/peer/protos/reconciler.pb.go b/internal/peer/protos/reconciler.pb.go new file mode 100644 index 00000000000..6966fd02a38 --- /dev/null +++ b/internal/peer/protos/reconciler.pb.go @@ -0,0 +1,227 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: reconciler.proto + +package protos + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// ReconcileRequest represents a request to reconcile a block. +type ReconcileRequest struct { + ChannelId string `protobuf:"bytes,1,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"` + BlockNumber uint64 `protobuf:"varint,2,opt,name=block_number,json=blockNumber,proto3" json:"block_number,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReconcileRequest) Reset() { *m = ReconcileRequest{} } +func (m *ReconcileRequest) String() string { return proto.CompactTextString(m) } +func (*ReconcileRequest) ProtoMessage() {} +func (*ReconcileRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_eb40bcc605516477, []int{0} +} + +func (m *ReconcileRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReconcileRequest.Unmarshal(m, b) +} +func (m *ReconcileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReconcileRequest.Marshal(b, m, deterministic) +} +func (m *ReconcileRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReconcileRequest.Merge(m, src) +} +func (m *ReconcileRequest) XXX_Size() int { + return xxx_messageInfo_ReconcileRequest.Size(m) +} +func (m *ReconcileRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReconcileRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReconcileRequest proto.InternalMessageInfo + +func (m *ReconcileRequest) GetChannelId() string { + if m != nil { + return m.ChannelId + } + return "" +} + +func (m *ReconcileRequest) GetBlockNumber() uint64 { + if m != nil { + return m.BlockNumber + } + return 0 +} + +// ReconcileResponse represents the response from a reconciliation request. +type ReconcileResponse struct { + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReconcileResponse) Reset() { *m = ReconcileResponse{} } +func (m *ReconcileResponse) String() string { return proto.CompactTextString(m) } +func (*ReconcileResponse) ProtoMessage() {} +func (*ReconcileResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_eb40bcc605516477, []int{1} +} + +func (m *ReconcileResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReconcileResponse.Unmarshal(m, b) +} +func (m *ReconcileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReconcileResponse.Marshal(b, m, deterministic) +} +func (m *ReconcileResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReconcileResponse.Merge(m, src) +} +func (m *ReconcileResponse) XXX_Size() int { + return xxx_messageInfo_ReconcileResponse.Size(m) +} +func (m *ReconcileResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReconcileResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ReconcileResponse proto.InternalMessageInfo + +func (m *ReconcileResponse) GetSuccess() bool { + if m != nil { + return m.Success + } + return false +} + +func (m *ReconcileResponse) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + +func init() { + proto.RegisterType((*ReconcileRequest)(nil), "protos.ReconcileRequest") + proto.RegisterType((*ReconcileResponse)(nil), "protos.ReconcileResponse") +} + +func init() { proto.RegisterFile("reconciler.proto", fileDescriptor_eb40bcc605516477) } + +var fileDescriptor_eb40bcc605516477 = []byte{ + // 197 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xc1, 0xca, 0x82, 0x40, + 0x14, 0x85, 0xf1, 0xe7, 0xa7, 0xf2, 0xd6, 0xc2, 0x86, 0x16, 0x53, 0x10, 0x98, 0x2b, 0x57, 0x2e, + 0xea, 0x21, 0x22, 0x82, 0x16, 0x63, 0x7b, 0xd1, 0xf1, 0x52, 0x92, 0xce, 0xd8, 0x5c, 0xed, 0xf9, + 0xa3, 0x31, 0x41, 0xaa, 0xd5, 0x70, 0xce, 0x07, 0x87, 0x6f, 0x2e, 0x78, 0x06, 0xa5, 0x56, 0xb2, + 0x28, 0xd1, 0x44, 0xb5, 0xd1, 0x8d, 0x66, 0x23, 0xfb, 0x50, 0x70, 0x06, 0x4f, 0xf4, 0x4c, 0xe0, + 0xbd, 0x45, 0x6a, 0xd8, 0x1a, 0x40, 0x5e, 0x53, 0xa5, 0xb0, 0x4c, 0x8a, 0x9c, 0x3b, 0xbe, 0x13, + 0xba, 0xc2, 0x7d, 0x37, 0x87, 0x9c, 0x6d, 0x60, 0x96, 0x95, 0x5a, 0xde, 0x12, 0xd5, 0x56, 0x19, + 0x1a, 0xfe, 0xe7, 0x3b, 0xe1, 0xbf, 0x98, 0xda, 0xee, 0x64, 0xab, 0x60, 0x0f, 0xf3, 0xc1, 0x2a, + 0xd5, 0x5a, 0x11, 0x32, 0x0e, 0x63, 0x6a, 0xa5, 0x44, 0x22, 0xbb, 0x39, 0x11, 0x7d, 0x7c, 0x91, + 0x0a, 0x89, 0xd2, 0x0b, 0xda, 0x31, 0x57, 0xf4, 0x71, 0x9b, 0x0c, 0xf4, 0x62, 0x34, 0x8f, 0x42, + 0x22, 0x3b, 0xc2, 0x22, 0x46, 0x95, 0x7f, 0x69, 0xf3, 0xee, 0x6b, 0x14, 0x7d, 0x92, 0xd5, 0xf2, + 0x07, 0xe9, 0xa4, 0xb2, 0xee, 0x0e, 0xbb, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6a, 0xe0, 0xc9, + 0x79, 0x22, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// ReconcileServiceClient is the client API for ReconcileService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ReconcileServiceClient interface { + // RPC method to send a reconcile request + SendReconcileRequest(ctx context.Context, in *ReconcileRequest, opts ...grpc.CallOption) (*ReconcileResponse, error) +} + +type reconcileServiceClient struct { + cc *grpc.ClientConn +} + +func NewReconcileServiceClient(cc *grpc.ClientConn) ReconcileServiceClient { + return &reconcileServiceClient{cc} +} + +func (c *reconcileServiceClient) SendReconcileRequest(ctx context.Context, in *ReconcileRequest, opts ...grpc.CallOption) (*ReconcileResponse, error) { + out := new(ReconcileResponse) + err := c.cc.Invoke(ctx, "/protos.ReconcileService/SendReconcileRequest", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ReconcileServiceServer is the server API for ReconcileService service. +type ReconcileServiceServer interface { + // RPC method to send a reconcile request + SendReconcileRequest(context.Context, *ReconcileRequest) (*ReconcileResponse, error) +} + +// UnimplementedReconcileServiceServer can be embedded to have forward compatible implementations. +type UnimplementedReconcileServiceServer struct { +} + +func (*UnimplementedReconcileServiceServer) SendReconcileRequest(ctx context.Context, req *ReconcileRequest) (*ReconcileResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SendReconcileRequest not implemented") +} + +func RegisterReconcileServiceServer(s *grpc.Server, srv ReconcileServiceServer) { + s.RegisterService(&_ReconcileService_serviceDesc, srv) +} + +func _ReconcileService_SendReconcileRequest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReconcileRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ReconcileServiceServer).SendReconcileRequest(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protos.ReconcileService/SendReconcileRequest", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ReconcileServiceServer).SendReconcileRequest(ctx, req.(*ReconcileRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _ReconcileService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "protos.ReconcileService", + HandlerType: (*ReconcileServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SendReconcileRequest", + Handler: _ReconcileService_SendReconcileRequest_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "reconciler.proto", +} diff --git a/internal/peer/protos/reconciler.proto b/internal/peer/protos/reconciler.proto new file mode 100644 index 00000000000..934c9c17080 --- /dev/null +++ b/internal/peer/protos/reconciler.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package protos; + +// ReconcileRequest represents a request to reconcile a block. +message ReconcileRequest { + string channel_id = 1; // The name of the channel. + uint64 block_number = 2; // The block number to reconcile. +} + +// ReconcileResponse represents the response from a reconciliation request. +message ReconcileResponse { + bool success = 1; // Indicates if the reconciliation was successful. + string message = 2; // A message providing additional information. +} + +// Define the service for reconciliation +service ReconcileService { + // RPC method to send a reconcile request + rpc SendReconcileRequest(ReconcileRequest) returns (ReconcileResponse); +} diff --git a/internal/peer/reconciler/block.go b/internal/peer/reconciler/block.go new file mode 100644 index 00000000000..419a6201d42 --- /dev/null +++ b/internal/peer/reconciler/block.go @@ -0,0 +1,80 @@ +package reconciler + +import ( + "fmt" + "github.com/hyperledger/fabric/internal/peer/common" + "github.com/spf13/cobra" + "strconv" +) + +func reconcileBlockCmd(cf *ReconcileCmdFactory) *cobra.Command { + reconcileBlockCmd := &cobra.Command{ + Use: "block", + Short: "reconcile on a block of a specified channel.", + Long: "get blockchain information of a specified channel. Requires '-c'.", + RunE: func(cmd *cobra.Command, args []string) error { + return reconcileBlock(cmd, args, cf) + }, + } + flagList := []string{ + "channelID", + } + attachFlags(reconcileBlockCmd, flagList) + + return reconcileBlockCmd +} + +func reconcileBlock(cmd *cobra.Command, args []string, rf *ReconcileCmdFactory) error { + + logger.Debugf("Received the arguments for reconcilation %d", args) + + if len(args) == 0 { + return fmt.Errorf("reconcile target required, block number") + } + if len(args) > 1 { + return fmt.Errorf("trailing args detected") + } + + blockNumber, err2 := strconv.Atoi(args[0]) + if err2 != nil { + return fmt.Errorf("fetch target illegal: %s", args[0]) + } + + if blockNumber < 0 { + return fmt.Errorf("block number must be above 0") + } + + // the global chainID filled by the "-c" command + if channelID == common.UndefinedParamValue { + return fmt.Errorf("must supply channel ID") + } + + // Parsing of the command line is done so silence cmd usage + cmd.SilenceUsage = true + + var err error + + if rf == nil { + rf, err = InitCmdFactory(EndorserNotRequired, PeerDeliverRequired, OrdererNotRequired) + if err != nil { + return err + } + } + + response, err := rf.DeliverClient.ReconcileSpecifiedBlock(uint64(blockNumber)) + + if err != nil { + fmt.Printf("Failed to reconcile block %d: %v\n", blockNumber, err.Error()) + } else { + fmt.Println(response) + } + + // fmt.Printf("Received the arguments for reconcilation %v on channel %s\n", args, channelID) + + err = rf.DeliverClient.Close() + if err != nil { + return err + } + + return nil +} diff --git a/internal/peer/reconciler/reconciler.go b/internal/peer/reconciler/reconciler.go new file mode 100644 index 00000000000..4b75f712e63 --- /dev/null +++ b/internal/peer/reconciler/reconciler.go @@ -0,0 +1,137 @@ +package reconciler + +import ( + pb "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/internal/peer/common" + "github.com/hyperledger/fabric/internal/peer/protos" + "github.com/hyperledger/fabric/msp" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "strings" + "time" +) + +var logger = flogging.MustGetLogger("reconcileCmd") + +var ( + channelID string + timeout time.Duration +) + +const ( + EndorserNotRequired bool = false + OrdererNotRequired bool = false + PeerDeliverRequired bool = true +) + +// Cmd returns the cobra command for Node +func Cmd(cf *ReconcileCmdFactory) *cobra.Command { + reconcileCmd.AddCommand(reconcileBlockCmd(cf)) + return reconcileCmd +} + +// ReconcileCmdFactory holds the clients used by ReconcileCmdFactory +type ReconcileCmdFactory struct { + EndorserClient pb.EndorserClient + Signer msp.SigningIdentity + BroadcastClient common.BroadcastClient + DeliverClient reconClientIntf + BroadcastFactory BroadcastClientFactory +} + +var reconcileCmd = &cobra.Command{ + Use: "reconcile", + Short: "Reconcile block on channel: block|txn", + Long: "Reconcile a specific block or transaction on channel: block|txn.", + PersistentPreRun: func(cmd *cobra.Command, args []string) { + common.InitCmd(cmd, args) + //common.SetOrdererEnv(cmd, args) + }, +} + +var flags *pflag.FlagSet + +func init() { + resetFlags() +} + +// Explicitly define a method to facilitate tests +func resetFlags() { + flags = &pflag.FlagSet{} + + flags.StringVarP(&channelID, "channelID", "c", common.UndefinedParamValue, "Channel ID is mandatory, as the block can be reconciled with in a channel") + flags.DurationVarP(&timeout, "timeout", "t", 10*time.Second, "Reconcile response timeout") +} + +func attachFlags(cmd *cobra.Command, names []string) { + cmdFlags := cmd.Flags() + for _, name := range names { + if flag := flags.Lookup(name); flag != nil { + cmdFlags.AddFlag(flag) + } else { + logger.Fatalf("Could not find flag '%s' to attach to commond '%s'", name, cmd.Name()) + } + } +} + +type BroadcastClientFactory func() (common.BroadcastClient, error) + +type reconClientIntf interface { + ReconcileSpecifiedBlock(num uint64) (*protos.ReconcileResponse, error) + Close() error +} + +// InitCmdFactory init the ChannelCmdFactory with clients to endorser and orderer according to params +func InitCmdFactory(isEndorserRequired, isPeerDeliverRequired, isOrdererRequired bool) (*ReconcileCmdFactory, error) { + if isPeerDeliverRequired && isOrdererRequired { + // this is likely a bug during development caused by adding a new cmd + return nil, errors.New("ERROR - only a single deliver source is currently supported") + } + + var err error + cf := &ReconcileCmdFactory{} + + cf.Signer, err = common.GetDefaultSignerFnc() + if err != nil { + return nil, errors.WithMessage(err, "error getting default signer") + } + + cf.BroadcastFactory = func() (common.BroadcastClient, error) { + return common.GetBroadcastClientFnc() + } + + // for join and list, we need the endorser as well + if isEndorserRequired { + // creating an EndorserClient with these empty parameters will create a + // connection using the values of "peer.address" and + // "peer.tls.rootcert.file" + cf.EndorserClient, err = common.GetEndorserClientFnc(common.UndefinedParamValue, common.UndefinedParamValue) + if err != nil { + return nil, errors.WithMessage(err, "error getting endorser client for channel") + } + } + + // for fetching blocks from a peer + if isPeerDeliverRequired { + cf.DeliverClient, err = common.NewP2PMessageClientForPeer(channelID, cf.Signer) + if err != nil { + return nil, errors.WithMessage(err, "error getting deliver client for channel") + } + } + + // for create and fetch, we need the orderer as well + if isOrdererRequired { + if len(strings.Split(common.OrderingEndpoint, ":")) != 2 { + return nil, errors.Errorf("ordering service endpoint %s is not valid or missing", common.OrderingEndpoint) + } + cf.DeliverClient, err = common.NewP2PMessageClientForOrderer(channelID, cf.Signer) + if err != nil { + return nil, err + } + } + + logger.Infof("Endorser and orderer connections initialized") + return cf, nil +}