Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Feature/ondemand reconciler for pvtdata in blocks #4997

Open
wants to merge 3 commits into
base: release-2.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package main

import (
"github.com/hyperledger/fabric/internal/peer/reconciler"
_ "net/http/pprof"
"os"
"strings"
Expand Down Expand Up @@ -50,6 +51,7 @@ func main() {
mainCmd.AddCommand(channel.Cmd(nil))
mainCmd.AddCommand(lifecycle.Cmd(cryptoProvider))
mainCmd.AddCommand(snapshot.Cmd(cryptoProvider))
mainCmd.AddCommand(reconciler.Cmd(nil))

// On failure Cobra prints the usage message and error string, so we only
// need to exit with a non-0 status
Expand Down
59 changes: 59 additions & 0 deletions common/p2pmessage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package p2pmessage

import (
"github.com/hyperledger/fabric/common/metrics"
)

var (
streamsOpened = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "streams_opened",
Help: "The number of GRPC streams that have been opened for the p2p_message service.",
}
streamsClosed = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "streams_closed",
Help: "The number of GRPC streams that have been closed for the p2p_message service.",
}

requestsReceived = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "requests_received",
Help: "The number of p2p_message requests that have been received.",
LabelNames: []string{"channel", "filtered", "data_type"},
StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}",
}
requestsCompleted = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "requests_completed",
Help: "The number of p2p_message requests that have been completed.",
LabelNames: []string{"channel", "filtered", "data_type", "success"},
StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}.%{success}",
}

blocksReconciled = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "blocks_reconciled",
Help: "The number of blocks sent by the p2p_message service.",
LabelNames: []string{"channel", "filtered", "data_type"},
StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}",
}
)

type Metrics struct {
StreamsOpened metrics.Counter
StreamsClosed metrics.Counter
RequestsReceived metrics.Counter
RequestsCompleted metrics.Counter
BlocksReconciled metrics.Counter
}

func NewMetrics(p metrics.Provider) *Metrics {
return &Metrics{
StreamsOpened: p.NewCounter(streamsOpened),
StreamsClosed: p.NewCounter(streamsClosed),
RequestsReceived: p.NewCounter(requestsReceived),
RequestsCompleted: p.NewCounter(requestsCompleted),
BlocksReconciled: p.NewCounter(blocksReconciled),
}
}
146 changes: 146 additions & 0 deletions common/p2pmessage/p2pmessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package p2pmessage

import (
"context"
"fmt"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
gossipprivdata "github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/internal/peer/protos"
"time"
)

var logger = flogging.MustGetLogger("common.p2pmessage")

//go:generate counterfeiter -o mock/chain_manager.go -fake-name ChainManager . ChainManager

// ChainManager provides a way for the Handler to look up the Chain.
type ChainManager interface {
GetChain(chainID string) Chain
}

//go:generate counterfeiter -o mock/chain.go -fake-name Chain . Chain

// Chain encapsulates chain operations and data.
type Chain interface {
// Sequence returns the current config sequence number, can be used to detect config changes
Sequence() uint64

// PolicyManager returns the current policy manager as specified by the chain configuration
PolicyManager() policies.Manager

// Reader returns the chain Reader for the chain
Reader() blockledger.Reader

// Errored returns a channel which closes when the backing consenter has errored
Errored() <-chan struct{}
}

////go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker
//
//// PolicyChecker checks the envelope against the policy logic supplied by the
//// function.
//type PolicyChecker interface {
// CheckPolicy(envelope *cb.Envelope, channelID string) error
//}
//
//// The PolicyCheckerFunc is an adapter that allows the use of an ordinary
//// function as a PolicyChecker.
//type PolicyCheckerFunc func(envelope *cb.Envelope, channelID string) error
//
//// CheckPolicy calls pcf(envelope, channelID)
//func (pcf PolicyCheckerFunc) CheckPolicy(envelope *cb.Envelope, channelID string) error {
// return pcf(envelope, channelID)
//}

////go:generate counterfeiter -o mock/inspector.go -fake-name Inspector . Inspector
//
//// Inspector verifies an appropriate binding between the message and the context.
//type Inspector interface {
// Inspect(context.Context, proto.Message) error
//}
//
//// The InspectorFunc is an adapter that allows the use of an ordinary
//// function as an Inspector.
//type InspectorFunc func(context.Context, proto.Message) error
//
//// Inspect calls inspector(ctx, p)
//func (inspector InspectorFunc) Inspect(ctx context.Context, p proto.Message) error {
// return inspector(ctx, p)
//}

// Handler handles server requests.
type Handler struct {
ExpirationCheckFunc func(identityBytes []byte) time.Time
ChainManager ChainManager
TimeWindow time.Duration
//BindingInspector Inspector
Metrics *Metrics
}

// Server is a polymorphic structure to support generalization of this handler
// to be able to deliver different type of responses.
type Server struct {
Receiver
}

// Receiver is used to receive enveloped seek requests.
type Receiver interface {
SendReconcileRequest()
}

// NewHandler creates an implementation of the Handler interface.
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics, expirationCheckDisabled bool) *Handler {
expirationCheck := crypto.ExpiresAt
if expirationCheckDisabled {
expirationCheck = noExpiration
}
return &Handler{
ChainManager: cm,
TimeWindow: timeWindow,
//BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
Metrics: metrics,
ExpirationCheckFunc: expirationCheck,
}
}

// Handle receives incoming deliver requests.
func (h *Handler) Handle(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) {
addr := util.ExtractRemoteAddress(ctx)
logger.Debugf("Starting new p2p loop for %s", addr)
h.Metrics.StreamsOpened.Add(1)
defer h.Metrics.StreamsClosed.Add(1)

reconcileResponse := &protos.ReconcileResponse{
Success: false,
}

// Calling Reconciler Service
// reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService()
reconciler := gossipprivdata.GetOnDemandReconcilerService(request.ChannelId)
fmt.Println(reconciler)

if reconciler == nil {
reconcileResponse.Message = "no reconciler found for channel " + request.ChannelId

return reconcileResponse, fmt.Errorf("no reconciler found for channel " + request.ChannelId)
}
response, err := reconciler.Reconcile(request.BlockNumber)
return &response, err
}

// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header.
//func ExtractChannelHeaderCertHash(msg proto.Message) []byte {
// chdr, isChannelHeader := msg.(*cb.ChannelHeader)
// if !isChannelHeader || chdr == nil {
// return nil
// }
// return chdr.TlsCertHash
//}

func noExpiration(_ []byte) time.Time {
return time.Time{}
}
28 changes: 24 additions & 4 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,8 @@ func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.Tx
return nil
}

//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
//state DB or history DB or both
// recommitLostBlocks retrieves blocks in specified range and commit the write set to either
// state DB or history DB or both
func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error {
logger.Infof("Recommitting lost blocks - firstBlockNum=%d, lastBlockNum=%d, recoverables=%#v", firstBlockNum, lastBlockNum, recoverables)
var err error
Expand Down Expand Up @@ -762,6 +762,23 @@ func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledge
return l.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock)
}

// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the
// specified blocks which miss at least a private data of a eligible collection.
func (l *kvLedger) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) {
// the missing pvtData info in the pvtdataStore could belong to a block which is yet
// to be processed and committed to the blockStore and stateDB (such a scenario is possible
// after a peer rollback). In such cases, we cannot return missing pvtData info. Otherwise,
// we would end up in an inconsistent state database.
if l.isPvtstoreAheadOfBlkstore.Load().(bool) {
return nil, nil
}
// it is safe to not acquire a read lock on l.blockAPIsRWLock. Without a lock, the value of
// lastCommittedBlock can change due to a new block commit. As a result, we may not
// be able to fetch the missing data info of truly the most recent blocks. This
// decision was made to ensure that the regular block commit rate is not affected.
return l.pvtdataStore.GetMissingPvtDataInfoForSpecificBlock(blockNumber)
}

func (l *kvLedger) addBlockCommitHash(block *common.Block, updateBatchBytes []byte) {
var valueBytes []byte

Expand Down Expand Up @@ -812,9 +829,12 @@ func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilte
// DoesPvtDataInfoExist returns true when
// (1) the ledger has pvtdata associated with the given block number (or)
// (2) a few or all pvtdata associated with the given block number is missing but the
// missing info is recorded in the ledger (or)
//
// missing info is recorded in the ledger (or)
//
// (3) the block is committed but it does not contain even a single
// transaction with pvtData.
//
// transaction with pvtData.
func (l *kvLedger) DoesPvtDataInfoExist(blockNum uint64) (bool, error) {
pvtStoreHt, err := l.pvtdataStore.LastCommittedBlockHeight()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ type ConfigHistoryRetriever interface {
// MissingPvtDataTracker allows getting information about the private data that is not missing on the peer
type MissingPvtDataTracker interface {
GetMissingPvtDataInfoForMostRecentBlocks(maxBlocks int) (MissingPvtDataInfo, error)
GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (MissingPvtDataInfo, error)
}

// MissingPvtDataInfo is a map of block number to MissingBlockPvtdataInfo
Expand Down
84 changes: 84 additions & 0 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package pvtdatastorage

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -476,6 +477,41 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M
return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock)
}

// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the
// specified block which miss at least a private data of a eligible collection.
func (s *Store) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) {
// we assume that this function would be called by the gossip only after processing the
// last retrieved missing pvtdata info and committing the same.
if blockNumber < 1 {
return nil, fmt.Errorf("invalid block number [%d]", blockNumber)
}

lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock)
if blockNumber > lastCommittedBlock {
return nil, fmt.Errorf("block [%d] is not committed yet to the ledger", blockNumber)
}

logger.Debug("fetching missing pvtdata entries from the de-prioritized list")
dePrioratizedQue, err := s.getMissingDataFromSpecificBlock(elgDeprioritizedMissingDataGroup, blockNumber)
if err != nil {
logger.Debugf("Error in fetching missing pvtdata entries from the de-prioritized list: %s", err)
return nil, err
}
logger.Debug("fetching missing pvtdata entries from the prioritized list")
prioritizedQue, err := s.getMissingDataFromSpecificBlock(elgPrioritizedMissingDataGroup, blockNumber)
if err != nil {
logger.Debugf("Error in fetching missing pvtdata entries from the prioritized list: %s", err)
return nil, err
}

for k, v := range dePrioratizedQue {
prioritizedQue[k] = v // This will overwrite map1's entry if the key exists
}

return prioritizedQue, nil

}

func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDataInfo, error) {
missingPvtDataInfo := make(ledger.MissingPvtDataInfo)
numberOfBlockProcessed := 0
Expand Down Expand Up @@ -550,6 +586,54 @@ func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDat
return missingPvtDataInfo, nil
}

func (s *Store) getMissingDataFromSpecificBlock(group []byte, blockNumber uint64) (ledger.MissingPvtDataInfo, error) {
missingPvtDataInfo := make(ledger.MissingPvtDataInfo)

startKey, endKey := createRangeScanKeysForElgMissingData(blockNumber, group)
dbItr, err := s.db.GetIterator(startKey, endKey)
if err != nil {
return nil, err
}
defer dbItr.Release()

for dbItr.Next() {
missingDataKeyBytes := dbItr.Key()
missingDataKey := decodeElgMissingDataKey(missingDataKeyBytes)

// check whether the entry is expired. If so, move to the next item.
// As we may use the old lastCommittedBlock value, there is a possibility that
// this missing data is actually expired but we may get the stale information.
// Though it may leads to extra work of pulling the expired data, it will not
// affect the correctness. Further, as we try to fetch the most recent missing
// data (less possibility of expiring now), such scenario would be rare. In the
// best case, we can load the latest lastCommittedBlock value here atomically to
// make this scenario very rare.
expired, err := isExpired(missingDataKey.nsCollBlk, s.btlPolicy, blockNumber)
if err != nil {
return nil, err
}
if expired {
continue
}

valueBytes := dbItr.Value()
bitmap, err := decodeMissingDataValue(valueBytes)
if err != nil {
return nil, err
}

// for each transaction which misses private data, make an entry in missingBlockPvtDataInfo
for index, isSet := bitmap.NextSet(0); isSet; index, isSet = bitmap.NextSet(index + 1) {
txNum := uint64(index)
missingPvtDataInfo.Add(missingDataKey.blkNum, txNum, missingDataKey.ns, missingDataKey.coll)
}

break
}

return missingPvtDataInfo, nil
}

// FetchBootKVHashes returns the KVHashes from the data that was loaded from a snapshot at the time of
// bootstrapping. This funciton returns an error if the supplied blkNum is greater than the last block
// number in the booting snapshot
Expand Down
Loading
Loading