Skip to content

Commit

Permalink
Merge pull request #187 from coinbase/patrick/unbuffered-reconciler
Browse files Browse the repository at this point in the history
[reconciler] Use buffered channel
  • Loading branch information
patrick-ogrady authored Oct 15, 2020
2 parents 0230083 + 6675e31 commit 978c228
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 55 deletions.
11 changes: 1 addition & 10 deletions reconciler/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package reconciler
import (
"fmt"

"github.com/coinbase/rosetta-sdk-go/parser"
"github.com/coinbase/rosetta-sdk-go/types"
)

Expand Down Expand Up @@ -70,17 +69,9 @@ func WithSeenAccounts(seen []*AccountCurrency) Option {
}
}

// WithLookupBalanceByBlock sets lookupBlockByBalance
// and instantiates the correct changeQueue.
// WithLookupBalanceByBlock sets lookupBlockByBalance.
func WithLookupBalanceByBlock(lookup bool) Option {
return func(r *Reconciler) {
// When lookupBalanceByBlock is disabled, we must check
// balance changes asynchronously. Using a buffered
// channel allows us to add balance changes without blocking.
if !lookup {
r.changeQueue = make(chan *parser.BalanceChange, backlogThreshold)
}

// We don't do anything if lookup == true because the default
// is already to create a non-buffered channel.
r.lookupBalanceByBlock = lookup
Expand Down
92 changes: 58 additions & 34 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ type Reconciler struct {
// exemptions allows for certain reconciliation failures
// to be skipped.
exemptions []*types.BalanceExemption

// LastIndexChecked is the last block index reconciled actively.
lastIndexMutex sync.Mutex
lastIndexChecked int64
}

// New creates a new Reconciler.
Expand All @@ -215,19 +219,17 @@ func New(
options ...Option,
) *Reconciler {
r := &Reconciler{
helper: helper,
handler: handler,
inactiveFrequency: defaultInactiveFrequency,
activeConcurrency: defaultReconcilerConcurrency,
inactiveConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: map[string]struct{}{},
inactiveQueue: []*InactiveEntry{},

// When lookupBalanceByBlock is enabled, we check
// balance changes synchronously.
helper: helper,
handler: handler,
inactiveFrequency: defaultInactiveFrequency,
activeConcurrency: defaultReconcilerConcurrency,
inactiveConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: map[string]struct{}{},
inactiveQueue: []*InactiveEntry{},
lookupBalanceByBlock: defaultLookupBalanceByBlock,
changeQueue: make(chan *parser.BalanceChange),
changeQueue: make(chan *parser.BalanceChange, backlogThreshold),
lastIndexChecked: -1,
}

for _, opt := range options {
Expand Down Expand Up @@ -284,36 +286,42 @@ func (r *Reconciler) QueueChanges(
return err
}

if !r.lookupBalanceByBlock {
// All changes will have the same block. Continue
// if we are too far behind to start reconciling.
//
// Note: we don't return here so that we can ensure
// all seen accounts are added to the inactiveAccountQueue.
if block.Index < r.highWaterMark {
continue
}
// All changes will have the same block. Continue
// if we are too far behind to start reconciling.
if block.Index < r.highWaterMark {
continue
}

select {
case r.changeQueue <- change:
default:
if r.debugLogging {
log.Println("skipping active enqueue because backlog")
}
}
} else {
// Block until all checked for a block or context is Done
select {
case r.changeQueue <- change:
case <-ctx.Done():
return ctx.Err()
select {
case r.changeQueue <- change:
default:
if r.debugLogging {
log.Printf(
"skipping active enqueue because backlog has %d items",
backlogThreshold,
)
}
}
}

return nil
}

// QueueSize is a helper that returns the total
// number of items currently enqueued for active
// reconciliation.
func (r *Reconciler) QueueSize() int {
return len(r.changeQueue)
}

// LastIndexReconciled is the last block index
// reconciled. This is used to ensure all the
// enqueued accounts for a particular block have
// been reconciled.
func (r *Reconciler) LastIndexReconciled() int64 {
return r.lastIndexChecked
}

// CompareBalance checks to see if the computed balance of an account
// is equal to the live balance of an account. This function ensures
// balance is checked correctly in the case of orphaned blocks.
Expand Down Expand Up @@ -680,6 +688,22 @@ func (r *Reconciler) reconcileActiveAccounts(
if err != nil {
return err
}

// Update the lastIndexChecked value if the block
// index is greater. We don't acquire the lock
// to make this check to improve performance.
if balanceChange.Block.Index > r.lastIndexChecked {
r.lastIndexMutex.Lock()

// In the time since making the check and acquiring
// the lock, the lastIndexChecked could've increased
// so we check it again.
if balanceChange.Block.Index > r.lastIndexChecked {
r.lastIndexChecked = balanceChange.Block.Index
}

r.lastIndexMutex.Unlock()
}
}
}
}
Expand Down
27 changes: 16 additions & 11 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,8 @@ func TestReconcile_SuccessOnlyActive(t *testing.T) {
false,
)

assert.Equal(t, int64(-1), r.LastIndexReconciled())

go func() {
err := r.Reconcile(ctx)
assert.Contains(t, context.Canceled.Error(), err.Error())
Expand All @@ -856,6 +858,7 @@ func TestReconcile_SuccessOnlyActive(t *testing.T) {
time.Sleep(1 * time.Second)
cancel()

assert.Equal(t, block2.Index, r.LastIndexReconciled())
mockHelper.AssertExpectations(t)
mockHandler.AssertExpectations(t)
})
Expand Down Expand Up @@ -952,11 +955,6 @@ func TestReconcile_HighWaterMark(t *testing.T) {
false,
)

go func() {
err := r.Reconcile(ctx)
assert.Contains(t, context.Canceled.Error(), err.Error())
}()

err := r.QueueChanges(ctx, block, []*parser.BalanceChange{
{
Account: accountCurrency.Account,
Expand All @@ -974,6 +972,12 @@ func TestReconcile_HighWaterMark(t *testing.T) {
},
})
assert.NoError(t, err)
assert.Equal(t, r.QueueSize(), 4) // includes interesting accounts

go func() {
err := r.Reconcile(ctx)
assert.Contains(t, context.Canceled.Error(), err.Error())
}()

time.Sleep(1 * time.Second)
cancel()
Expand Down Expand Up @@ -1095,12 +1099,6 @@ func TestReconcile_FailureOnlyActive(t *testing.T) {
false,
)

go func() {
err := r.Reconcile(ctx)
assert.Error(t, err)
assert.Contains(t, "reconciliation failed", err.Error())
}()

err := r.QueueChanges(ctx, block, []*parser.BalanceChange{
{
Account: accountCurrency.Account,
Expand All @@ -1110,6 +1108,13 @@ func TestReconcile_FailureOnlyActive(t *testing.T) {
},
})
assert.NoError(t, err)
assert.Equal(t, r.QueueSize(), 1)

go func() {
err := r.Reconcile(ctx)
assert.Error(t, err)
assert.Contains(t, "reconciliation failed", err.Error())
}()

time.Sleep(1 * time.Second)

Expand Down

0 comments on commit 978c228

Please sign in to comment.