Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up leader search #320

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,13 @@ func TestHandover_GracefulShutdown(t *testing.T) {
defer cleanup()

addr := fmt.Sprintf("127.0.0.1:900%d", i+1)
log := func(l client.LogLevel, format string, a ...interface{}) {
format = fmt.Sprintf("%s - %d: %s: %s", time.Now().Format("15:04:01.000"), i, l.String(), format)
t.Logf(format, a...)
}
options := []app.Option{
app.WithAddress(addr),
app.WithLogFunc(log),
}
if i > 0 {
options = append(options, app.WithCluster([]string{"127.0.0.1:9001"}))
Expand Down Expand Up @@ -1292,8 +1297,8 @@ func Test_TxRowsAffected(t *testing.T) {
CREATE TABLE test (
id TEXT PRIMARY KEY,
value INT
);`);
require.NoError(t, err);
);`)
require.NoError(t, err)

// Insert watermark
err = tx(context.Background(), db, func(ctx context.Context, tx *sql.Tx) error {
Expand Down
35 changes: 22 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type DialFunc = protocol.DialFunc

// Client speaks the dqlite wire protocol.
type Client struct {
protocol *protocol.Protocol
session *protocol.Session
}

// Option that can be used to tweak client parameters.
Expand Down Expand Up @@ -64,17 +64,26 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error
return nil, errors.Wrap(err, "failed to establish network connection")
}

protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne)
proto, err := protocol.Handshake(ctx, conn, protocol.VersionOne)
if err != nil {
conn.Close()
return nil, err
}

client := &Client{protocol: protocol}
sess := &protocol.Session{Protocol: proto, Address: address}
client := &Client{session: sess}

return client, nil
}

func (c *Client) call(ctx context.Context, request *protocol.Message, response *protocol.Message) error {
if err := c.session.Protocol.Call(ctx, request, response); err != nil {
c.session.Bad()
return err
}
return nil
}

// Leader returns information about the current leader, if any.
func (c *Client) Leader(ctx context.Context) (*NodeInfo, error) {
request := protocol.Message{}
Expand All @@ -84,7 +93,7 @@ func (c *Client) Leader(ctx context.Context) (*NodeInfo, error) {

protocol.EncodeLeader(&request)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return nil, errors.Wrap(err, "failed to send Leader request")
}

Expand All @@ -107,7 +116,7 @@ func (c *Client) Cluster(ctx context.Context) ([]NodeInfo, error) {

protocol.EncodeCluster(&request, protocol.ClusterFormatV1)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return nil, errors.Wrap(err, "failed to send Cluster request")
}

Expand Down Expand Up @@ -137,7 +146,7 @@ func (c *Client) Dump(ctx context.Context, dbname string) ([]File, error) {

protocol.EncodeDump(&request, dbname)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return nil, errors.Wrap(err, "failed to send dump request")
}

Expand Down Expand Up @@ -174,7 +183,7 @@ func (c *Client) Add(ctx context.Context, node NodeInfo) error {

protocol.EncodeAdd(&request, node.ID, node.Address)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return err
}

Expand Down Expand Up @@ -210,7 +219,7 @@ func (c *Client) Assign(ctx context.Context, id uint64, role NodeRole) error {

protocol.EncodeAssign(&request, id, uint64(role))

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return err
}

Expand All @@ -233,7 +242,7 @@ func (c *Client) Transfer(ctx context.Context, id uint64) error {

protocol.EncodeTransfer(&request, id)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return err
}

Expand All @@ -253,7 +262,7 @@ func (c *Client) Remove(ctx context.Context, id uint64) error {

protocol.EncodeRemove(&request, id)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return err
}

Expand All @@ -279,7 +288,7 @@ func (c *Client) Describe(ctx context.Context) (*NodeMetadata, error) {

protocol.EncodeDescribe(&request, protocol.RequestDescribeFormatV0)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return nil, err
}

Expand All @@ -305,7 +314,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error {

protocol.EncodeWeight(&request, weight)

if err := c.protocol.Call(ctx, &request, &response); err != nil {
if err := c.call(ctx, &request, &response); err != nil {
return err
}

Expand All @@ -318,7 +327,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error {

// Close the client.
func (c *Client) Close() error {
return c.protocol.Close()
return c.session.Close()
}

// Create a client options object with sane defaults.
Expand Down
9 changes: 0 additions & 9 deletions client/client_export_test.go

This file was deleted.

44 changes: 0 additions & 44 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

dqlite "github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/internal/protocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -33,49 +32,6 @@ func TestClient_Leader(t *testing.T) {
assert.Equal(t, leader.Address, "@1001")
}

func TestClient_Dump(t *testing.T) {
node, cleanup := newNode(t)
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

client, err := client.New(ctx, node.BindAddress())
require.NoError(t, err)
defer client.Close()

// Open a database and create a test table.
request := protocol.Message{}
request.Init(4096)

response := protocol.Message{}
response.Init(4096)

protocol.EncodeOpen(&request, "test.db", 0, "volatile")

p := client.Protocol()
err = p.Call(ctx, &request, &response)
require.NoError(t, err)

db, err := protocol.DecodeDb(&response)
require.NoError(t, err)

protocol.EncodeExecSQLV0(&request, uint64(db), "CREATE TABLE foo (n INT)", nil)

err = p.Call(ctx, &request, &response)
require.NoError(t, err)

files, err := client.Dump(ctx, "test.db")
require.NoError(t, err)

require.Len(t, files, 2)
assert.Equal(t, "test.db", files[0].Name)
assert.Equal(t, 4096, len(files[0].Data))

assert.Equal(t, "test.db-wal", files[1].Name)
assert.Equal(t, 8272, len(files[1].Data))
}

func TestClient_Cluster(t *testing.T) {
node, cleanup := newNode(t)
defer cleanup()
Expand Down
6 changes: 4 additions & 2 deletions client/database_store.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !nosqlite3
// +build !nosqlite3

package client
Expand All @@ -8,8 +9,9 @@ import (
"fmt"
"strings"

"github.com/pkg/errors"
"github.com/canonical/go-dqlite/internal/protocol"
_ "github.com/mattn/go-sqlite3" // Go SQLite bindings
"github.com/pkg/errors"
)

// Option that can be used to tweak node store parameters.
Expand All @@ -21,6 +23,7 @@ type nodeStoreOptions struct {

// DatabaseNodeStore persists a list addresses of dqlite nodes in a SQL table.
type DatabaseNodeStore struct {
protocol.Compass
db *sql.DB // Database handle to use.
schema string // Name of the schema holding the servers table.
table string // Name of the servers table.
Expand Down Expand Up @@ -154,4 +157,3 @@ func (d *DatabaseNodeStore) Set(ctx context.Context, servers []NodeInfo) error {

return nil
}

5 changes: 3 additions & 2 deletions client/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien
config := protocol.Config{
Dial: o.DialFunc,
ConcurrentLeaderConns: o.ConcurrentLeaderConns,
PermitShared: true,
}
connector := protocol.NewConnector(0, store, config, o.LogFunc)
protocol, err := connector.Connect(ctx)
sess, err := connector.Connect(ctx)
if err != nil {
return nil, err
}

client := &Client{protocol: protocol}
client := &Client{sess}

return client, nil
}
1 change: 1 addition & 0 deletions client/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var NewInmemNodeStore = protocol.NewInmemNodeStore

// Persists a list addresses of dqlite nodes in a YAML file.
type YamlNodeStore struct {
protocol.Compass
path string
servers []NodeInfo
mu sync.RWMutex
Expand Down
3 changes: 2 additions & 1 deletion cmd/dqlite-demo/dqlite-demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os/signal"
"path/filepath"
"strings"
"time"

"github.com/canonical/go-dqlite/app"
"github.com/canonical/go-dqlite/client"
Expand Down Expand Up @@ -50,7 +51,7 @@ Complete documentation is available at https://github.com/canonical/go-dqlite`,
}

options := []app.Option{app.WithAddress(db), app.WithCluster(*join), app.WithLogFunc(logFunc),
app.WithDiskMode(diskMode)}
app.WithDiskMode(diskMode), app.WithRolesAdjustmentFrequency(5 * time.Second)}

// Set TLS options
if (crt != "" && key == "") || (key != "" && crt == "") {
Expand Down
6 changes: 3 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func WithTracing(level client.LogLevel) Option {
}
}

// NewDriver creates a new dqlite driver, which also implements the
// New creates a new dqlite driver, which also implements the
// driver.Driver interface.
func New(store client.NodeStore, options ...Option) (*Driver, error) {
o := defaultOptions()
Expand Down Expand Up @@ -274,11 +274,11 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
tracing: c.driver.tracing,
}

var err error
conn.protocol, err = connector.Connect(ctx)
sess, err := connector.Connect(ctx)
if err != nil {
return nil, driverError(conn.log, errors.Wrap(err, "failed to create dqlite connection"))
}
conn.protocol = sess.Protocol

conn.request.Init(4096)
conn.response.Init(4096)
Expand Down
37 changes: 34 additions & 3 deletions driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"strings"
"testing"
"time"

dqlite "github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
Expand Down Expand Up @@ -619,7 +620,7 @@ func Test_DescribeLastEntry(t *testing.T) {
dir, dirCleanup := newDir(t)
defer dirCleanup()
_, cleanup := newNode(t, dir)
store := newStore(t, "@1")
store := newStore(t, bindAddress)
log := logging.Test(t)
drv, err := dqlitedriver.New(store, dqlitedriver.WithLogFunc(log))
require.NoError(t, err)
Expand Down Expand Up @@ -648,13 +649,43 @@ func Test_DescribeLastEntry(t *testing.T) {
assert.Equal(t, info.Term, uint64(1))
}

func Test_Dump(t *testing.T) {
drv, cleanup := newDriver(t)
defer cleanup()

conn, err := drv.Open("test.db")
require.NoError(t, err)

_, err = conn.(driver.ExecerContext).ExecContext(context.Background(), `CREATE TABLE foo (n INT)`, nil)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

client, err := client.New(ctx, bindAddress)
require.NoError(t, err)
defer client.Close()

files, err := client.Dump(ctx, "test.db")
require.NoError(t, err)

require.Len(t, files, 2)
assert.Equal(t, "test.db", files[0].Name)
assert.Equal(t, 4096, len(files[0].Data))

assert.Equal(t, "test.db-wal", files[1].Name)
assert.Equal(t, 8272, len(files[1].Data))
}

const bindAddress = "@1"

func newDriver(t *testing.T) (*dqlitedriver.Driver, func()) {
t.Helper()

dir, dirCleanup := newDir(t)
_, nodeCleanup := newNode(t, dir)

store := newStore(t, "@1")
store := newStore(t, bindAddress)

log := logging.Test(t)

Expand Down Expand Up @@ -683,7 +714,7 @@ func newStore(t *testing.T, address string) client.NodeStore {
func newNode(t *testing.T, dir string) (*dqlite.Node, func()) {
t.Helper()

server, err := dqlite.New(uint64(1), "@1", dir, dqlite.WithBindAddress("@1"))
server, err := dqlite.New(uint64(1), bindAddress, dir, dqlite.WithBindAddress(bindAddress))
require.NoError(t, err)

err = server.Start()
Expand Down
1 change: 1 addition & 0 deletions internal/protocol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type Config struct {
BackoffCap time.Duration // Maximum connection retry backoff value,
RetryLimit uint // Maximum number of retries, or 0 for unlimited.
ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership.
PermitShared bool
}
Loading
Loading