Skip to content

Commit

Permalink
fix: fix data race in test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
robotomize committed May 18, 2021
1 parent cd215a9 commit f3fcbe9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 53 deletions.
50 changes: 29 additions & 21 deletions internal/dispatcher/db_tx_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ package dispatcher
import (
"context"
"fmt"
"sod/internal/database"
"sod/internal/logging"
metricDb "sod/internal/metric/database"
"sod/internal/metric/model"
"sync"
"time"

"github.com/go-sod/sod/internal/database"
"github.com/go-sod/sod/internal/logging"
metricDb "github.com/go-sod/sod/internal/metric/database"
"github.com/go-sod/sod/internal/metric/model"
)

func newDbTxExecutor(db *database.DB, opts dbTxExecutorOptions, shutdownCh chan<- error) *dbTxExecutor {
return &dbTxExecutor{metricDb: metricDb.New(db), opts: opts, shutdownCh: shutdownCh}
func newDBTxExecutor(db *database.DB, opts dbTxExecutorOptions, shutdownCh chan<- error) *dbTxExecutor {
return &dbTxExecutor{metricDB: metricDb.New(db), opts: opts, shutdownCh: shutdownCh}
}

// dbTxExecutorOptions Returns the structure with configuration options
type dbTxExecutorOptions struct {
dbFlushSize int
dbFlushTime time.Duration
deps pullDependencies
flushSize int
flushTime time.Duration
deps pullDependencies
}

// A structure that represents the database transaction execution service.
Expand All @@ -28,7 +29,7 @@ type dbTxExecutor struct {
mtx sync.RWMutex

opts dbTxExecutorOptions
metricDb *metricDb.DB
metricDB *metricDb.DB
// Buffer that accumulates metric data for adding
buf []model.Metric
shutdownCh chan<- error
Expand All @@ -38,32 +39,32 @@ type dbTxExecutor struct {
func (tx *dbTxExecutor) shutdown() error {
tx.mtx.Lock()
if err := tx.opts.deps.appendMetricsFn(context.Background(), tx.buf); err != nil {
return fmt.Errorf("txExecutor: append many operation failed: %v", err)
return fmt.Errorf("txExecutor: write many operation failed: %w", err)
}
tx.buf = tx.buf[:0]
tx.mtx.Unlock()
return nil
}

// This is the main method for adding data. It adds data to the buffer.
// If the buffer is full, it calls the bulkAppend method
func (tx *dbTxExecutor) append(ctx context.Context, data model.Metric) {
// If the buffer is full, it calls the flush method
func (tx *dbTxExecutor) write(ctx context.Context, data model.Metric) {
tx.mtx.Lock()
if tx.buf == nil {
tx.buf = []model.Metric{}
if len(tx.buf) == 0 {
tx.buf = make([]model.Metric, 0)
}

tx.buf = append(tx.buf, data)
bufLen := len(tx.buf)
tx.mtx.Unlock()

if bufLen >= tx.opts.dbFlushSize {
go tx.bulkAppend(ctx)
if bufLen >= tx.opts.flushSize {
go tx.flush(ctx)
}
}

// Bulk adds data to persistent storage and clears the buffer
func (tx *dbTxExecutor) bulkAppend(ctx context.Context) {
func (tx *dbTxExecutor) flush(ctx context.Context) {
logger := logging.FromContext(ctx)

tx.mtx.Lock()
Expand All @@ -73,20 +74,27 @@ func (tx *dbTxExecutor) bulkAppend(ctx context.Context) {
tx.mtx.Unlock()
// call appendMetricsFn
if err := tx.opts.deps.appendMetricsFn(context.Background(), tmpBuf); err != nil {
logger.Errorf("txExecutor: append many operation failed: %v", err)
logger.Errorf("txExecutor: flush operation failed: %v", err)
}
}

func (tx *dbTxExecutor) len() int {
tx.mtx.RLock()
defer tx.mtx.RUnlock()

return len(tx.buf)
}

// Every n seconds, data from the buffer must be inserted into the database
func (tx *dbTxExecutor) flusher(ctx context.Context) {
defer func() {
tx.shutdownCh <- tx.shutdown()
}()
ticker := time.NewTicker(tx.opts.dbFlushTime)
ticker := time.NewTicker(tx.opts.flushTime)
for {
select {
case <-ticker.C:
tx.bulkAppend(ctx)
tx.flush(ctx)
case <-ctx.Done():
return
}
Expand Down
90 changes: 58 additions & 32 deletions internal/dispatcher/db_tx_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package dispatcher
import (
"context"
"errors"
"sod/internal/geom"
"sod/internal/metric/model"
"sync/atomic"
"sync"
"testing"
"time"

"github.com/go-sod/sod/internal/geom"
"github.com/go-sod/sod/internal/metric/model"
)

func TestDbxExecutorFlusher(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shutdownCh chan error
Expand All @@ -37,17 +39,22 @@ func TestDbxExecutorFlusher(t *testing.T) {
}

for _, test := range tests {
test := test

t.Run(test.name, func(t *testing.T) {
length := 0
bit := int64(0)
t.Parallel()
appendedCnt := 0
mtx := sync.RWMutex{}
txExecutor := &dbTxExecutor{
opts: dbTxExecutorOptions{
dbFlushTime: 1 * time.Second,
flushTime: 500 * time.Millisecond,
deps: pullDependencies{
appendMetricsFn: func(ctx context.Context, metrics []model.Metric) error {
if atomic.LoadInt64(&bit) == 0 {
length = len(metrics)
atomic.StoreInt64(&bit, 1)
mtx.Lock()
defer mtx.Unlock()

if len(metrics) > 0 {
appendedCnt = len(metrics)
}

return nil
Expand All @@ -62,18 +69,20 @@ func TestDbxExecutorFlusher(t *testing.T) {
txExecutor.buf = test.batch
go txExecutor.flusher(ctx)

time.Sleep(test.waitingTime * 2)
cancel()
time.Sleep(test.waitingTime)

if length != test.expectedLen {
cancel()
mtx.RLock()
if appendedCnt != test.expectedLen {
t.Errorf(
"calling the flusher method, the length of the inserted data got: %v, expected: %v",
length,
appendedCnt,
test.expectedLen,
)
}
mtx.RUnlock()

if len(txExecutor.buf) != test.expectedBufLen {
if txExecutor.len() != test.expectedBufLen {
t.Errorf(
"calling the shutdown method, the length of buffer got: %v, expected: %v",
len(txExecutor.buf),
Expand All @@ -85,6 +94,7 @@ func TestDbxExecutorFlusher(t *testing.T) {
}

func TestDbTxExecutorAppend(t *testing.T) {
t.Parallel()
tests := []struct {
name string
items []model.Metric
Expand Down Expand Up @@ -120,20 +130,28 @@ func TestDbTxExecutorAppend(t *testing.T) {
}

for _, test := range tests {
test := test

t.Run(test.name, func(t *testing.T) {
txExecutor := &dbTxExecutor{opts: dbTxExecutorOptions{deps: pullDependencies{
appendMetricsFn: func(ctx context.Context, metrics []model.Metric) error {
return nil
t.Parallel()
txExecutor := &dbTxExecutor{
opts: dbTxExecutorOptions{
flushSize: 10,
deps: pullDependencies{
appendMetricsFn: func(ctx context.Context, metrics []model.Metric) error {
return nil
},
},
},
}}}
}

for _, item := range test.items {
txExecutor.append(context.Background(), item)
txExecutor.write(context.Background(), item)
}

if len(txExecutor.buf) != test.expectedLen {
if txExecutor.len() != test.expectedLen {
t.Errorf(
"calling the append method, the length of the inserted data got: %v, expected: %v",
"calling the write method, the length of the inserted data got: %v, expected: %v",
len(txExecutor.buf),
test.expectedLen,
)
Expand All @@ -142,7 +160,8 @@ func TestDbTxExecutorAppend(t *testing.T) {
}
}

func TestDbTxExecutorBulkAppend(t *testing.T) {
func TestDbTxExecutorFlush(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shutdownCh chan error
Expand Down Expand Up @@ -174,30 +193,34 @@ func TestDbTxExecutorBulkAppend(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
length := 0
txExecutor := &dbTxExecutor{
buf: test.buf,
opts: dbTxExecutorOptions{deps: pullDependencies{
appendMetricsFn: func(ctx context.Context, metrics []model.Metric) error {
length = len(metrics)
return nil
buf: test.buf[:],
opts: dbTxExecutorOptions{
deps: pullDependencies{
appendMetricsFn: func(ctx context.Context, metrics []model.Metric) error {
length = len(metrics)
return nil
},
},
}}}
},
}

txExecutor.bulkAppend(context.Background())
txExecutor.flush(context.Background())

if length != test.expectedLen {
t.Errorf(
"calling the bulkAppend method, the length of the inserted data got: %v, expected: %v",
"calling the flush method, the length of the inserted data got: %v, expected: %v",
length,
test.expectedLen,
)
}

if len(txExecutor.buf) != test.expectedBufLen {
t.Errorf(
"calling the bulkAppend method, the length of buffer got: %v, expected: %v",
"calling the flush method, the length of buffer got: %v, expected: %v",
len(txExecutor.buf),
test.expectedBufLen,
)
Expand All @@ -207,6 +230,7 @@ func TestDbTxExecutorBulkAppend(t *testing.T) {
}

func TestDbTxExecutorShutdown(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shutdownCh chan error
Expand Down Expand Up @@ -238,7 +262,9 @@ func TestDbTxExecutorShutdown(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
length := 0
txExecutor := &dbTxExecutor{opts: dbTxExecutorOptions{deps: pullDependencies{
appendMetricsFn: func(ctx context.Context, metrics []model.Metric) error {
Expand Down

0 comments on commit f3fcbe9

Please sign in to comment.