Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up leader search #320

Merged
merged 23 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type App struct {
tls *tlsSetup
dialFunc client.DialFunc
store client.NodeStore
lc *client.Connector
driver *driver.Driver
driverName string
log client.LogFunc
Expand Down Expand Up @@ -239,6 +240,13 @@ func New(dir string, options ...Option) (app *App, err error) {
nodeBindAddress = nodeBindAddress[1:]
}

lc := client.NewLeaderConnector(
store,
client.WithDialFunc(driverDial),
client.WithLogFunc(o.Log),
client.WithConcurrentLeaderConns(*o.ConcurrentLeaderConns),
)

app = &App{
id: info.ID,
address: info.Address,
Expand All @@ -247,6 +255,7 @@ func New(dir string, options ...Option) (app *App, err error) {
nodeBindAddress: nodeBindAddress,
store: store,
dialFunc: driverDial,
lc: lc,
driver: driver,
driverName: driverName,
log: o.Log,
Expand Down Expand Up @@ -325,7 +334,7 @@ func (a *App) Handover(ctx context.Context) error {
ctx, cancel = context.WithTimeout(ctx, time.Minute)
defer cancel()

cli, err := a.Leader(ctx)
cli, err := a.FindLeader(ctx)
if err != nil {
return fmt.Errorf("find leader: %w", err)
}
Expand Down Expand Up @@ -379,7 +388,7 @@ func (a *App) Handover(ctx context.Context) error {
return fmt.Errorf("transfer leadership: %w", err)
}
}
cli, err = a.Leader(ctx)
cli, err = a.FindLeader(ctx)
if err != nil {
return fmt.Errorf("find new leader: %w", err)
}
Expand Down Expand Up @@ -485,14 +494,24 @@ func (a *App) Open(ctx context.Context, database string) (*sql.DB, error) {
return db, nil
}

// Leader returns a client connected to the current cluster leader, if any.
// Leader returns a client connected to the cluster leader.
//
// Prefer to use FindLeader instead unless you need to pass custom options.
func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Client, error) {
allOptions := a.clientOptions()
allOptions = append(allOptions, options...)

return client.FindLeader(ctx, a.store, allOptions...)
}

// FindLeader returns a client connected to the cluster leader.
//
// Compared to Leader, this method avoids opening extra connections int many
// cases, but doesn't accept custom options.
func (a *App) FindLeader(ctx context.Context) (*client.Client, error) {
return a.lc.Connect(ctx)
}

// Client returns a client connected to the local node.
func (a *App) Client(ctx context.Context) (*client.Client, error) {
return client.New(ctx, a.nodeBindAddress)
Expand Down Expand Up @@ -545,7 +564,7 @@ func (a *App) run(ctx context.Context, options *options, join bool) {
}
return
case <-time.After(delay):
cli, err := a.Leader(ctx)
cli, err := a.FindLeader(ctx)
if err != nil {
continue
}
Expand Down Expand Up @@ -739,7 +758,11 @@ func (a *App) makeRolesChanges(nodes []client.NodeInfo) RolesChanges {

// Return the options to use for client.FindLeader() or client.New()
func (a *App) clientOptions() []client.Option {
return []client.Option{client.WithDialFunc(a.dialFunc), client.WithLogFunc(a.log), client.WithConcurrentLeaderConns(*a.options.ConcurrentLeaderConns)}
return []client.Option{
client.WithDialFunc(a.dialFunc),
client.WithLogFunc(a.log),
client.WithConcurrentLeaderConns(*a.options.ConcurrentLeaderConns),
}
}

func (a *App) debug(format string, args ...interface{}) {
Expand Down
9 changes: 7 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,13 @@ func TestHandover_GracefulShutdown(t *testing.T) {
defer cleanup()

addr := fmt.Sprintf("127.0.0.1:900%d", i+1)
log := func(l client.LogLevel, format string, a ...interface{}) {
format = fmt.Sprintf("%s - %d: %s: %s", time.Now().Format("15:04:01.000"), i, l.String(), format)
t.Logf(format, a...)
}
options := []app.Option{
app.WithAddress(addr),
app.WithLogFunc(log),
}
if i > 0 {
options = append(options, app.WithCluster([]string{"127.0.0.1:9001"}))
Expand Down Expand Up @@ -1292,8 +1297,8 @@ func Test_TxRowsAffected(t *testing.T) {
CREATE TABLE test (
id TEXT PRIMARY KEY,
value INT
);`);
require.NoError(t, err);
);`)
require.NoError(t, err)

// Insert watermark
err = tx(context.Background(), db, func(ctx context.Context, tx *sql.Tx) error {
Expand Down
52 changes: 48 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type options struct {
DialFunc DialFunc
LogFunc LogFunc
ConcurrentLeaderConns int64
PermitShared bool
}

// WithDialFunc sets a custom dial function for creating the client network
Expand Down Expand Up @@ -64,15 +65,13 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error
return nil, errors.Wrap(err, "failed to establish network connection")
}

protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne)
protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne, address)
if err != nil {
conn.Close()
return nil, err
}

client := &Client{protocol: protocol}

return client, nil
return &Client{protocol}, nil
}

// Leader returns information about the current leader, if any.
Expand Down Expand Up @@ -329,3 +328,48 @@ func defaultOptions() *options {
ConcurrentLeaderConns: protocol.MaxConcurrentLeaderConns,
}
}

// Connector is a reusable configuration for creating new Clients.
//
// In some cases, Connector.Connect can take advantage of state stored in the
// Connector to be more efficient than New or FindLeader, so prefer to use a
// Connector whenever several Clients need to be created with the same
// parameters.
type Connector protocol.Connector

// NewLeaderConnector creates a Connector that will yield Clients connected to
// the cluster leader.
func NewLeaderConnector(store NodeStore, options ...Option) *Connector {
opts := defaultOptions()
for _, o := range options {
o(opts)
}
config := protocol.Config{
Dial: opts.DialFunc,
ConcurrentLeaderConns: opts.ConcurrentLeaderConns,
PermitShared: opts.PermitShared,
}
inner := protocol.NewLeaderConnector(store, config, opts.LogFunc)
return (*Connector)(inner)
}

// NewDirectConnector creates a Connector that will yield Clients connected to
// the node with the given ID and address.
func NewDirectConnector(id uint64, address string, options ...Option) *Connector {
opts := defaultOptions()
for _, o := range options {
o(opts)
}
config := protocol.Config{Dial: opts.DialFunc}
inner := protocol.NewDirectConnector(id, address, config, opts.LogFunc)
return (*Connector)(inner)
}

// Connect opens a Client based on the Connector's configuration.
func (connector *Connector) Connect(ctx context.Context) (*Client, error) {
protocol, err := (*protocol.Connector)(connector).Connect(ctx)
if err != nil {
return nil, err
}
return &Client{protocol}, nil
}
9 changes: 0 additions & 9 deletions client/client_export_test.go

This file was deleted.

44 changes: 0 additions & 44 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

dqlite "github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/internal/protocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -33,49 +32,6 @@ func TestClient_Leader(t *testing.T) {
assert.Equal(t, leader.Address, "@1001")
}

func TestClient_Dump(t *testing.T) {
node, cleanup := newNode(t)
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

client, err := client.New(ctx, node.BindAddress())
require.NoError(t, err)
defer client.Close()

// Open a database and create a test table.
request := protocol.Message{}
request.Init(4096)

response := protocol.Message{}
response.Init(4096)

protocol.EncodeOpen(&request, "test.db", 0, "volatile")

p := client.Protocol()
err = p.Call(ctx, &request, &response)
require.NoError(t, err)

db, err := protocol.DecodeDb(&response)
require.NoError(t, err)

protocol.EncodeExecSQLV0(&request, uint64(db), "CREATE TABLE foo (n INT)", nil)

err = p.Call(ctx, &request, &response)
require.NoError(t, err)

files, err := client.Dump(ctx, "test.db")
require.NoError(t, err)

require.Len(t, files, 2)
assert.Equal(t, "test.db", files[0].Name)
assert.Equal(t, 4096, len(files[0].Data))

assert.Equal(t, "test.db-wal", files[1].Name)
assert.Equal(t, 8272, len(files[1].Data))
}

func TestClient_Cluster(t *testing.T) {
node, cleanup := newNode(t)
defer cleanup()
Expand Down
3 changes: 1 addition & 2 deletions client/database_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"fmt"
"strings"

"github.com/pkg/errors"
_ "github.com/mattn/go-sqlite3" // Go SQLite bindings
"github.com/pkg/errors"
)

// Option that can be used to tweak node store parameters.
Expand Down Expand Up @@ -154,4 +154,3 @@ func (d *DatabaseNodeStore) Set(ctx context.Context, servers []NodeInfo) error {

return nil
}

22 changes: 1 addition & 21 deletions client/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package client

import (
"context"

"github.com/canonical/go-dqlite/internal/protocol"
)

// FindLeader returns a Client connected to the current cluster leader.
Expand All @@ -13,23 +11,5 @@ import (
// function will keep retrying (with a capped exponential backoff) until the
// given context is canceled.
func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Client, error) {
o := defaultOptions()

for _, option := range options {
option(o)
}

config := protocol.Config{
Dial: o.DialFunc,
ConcurrentLeaderConns: o.ConcurrentLeaderConns,
}
connector := protocol.NewConnector(0, store, config, o.LogFunc)
protocol, err := connector.Connect(ctx)
if err != nil {
return nil, err
}

client := &Client{protocol: protocol}

return client, nil
return NewLeaderConnector(store, options...).Connect(ctx)
}
40 changes: 26 additions & 14 deletions client/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,47 @@ import (
)

func TestMembership(t *testing.T) {
infos, cleanup := setup(t)
defer cleanup()

store := client.NewInmemNodeStore()
store.Set(context.Background(), infos[:1])

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

client, err := client.FindLeader(ctx, store)
require.NoError(t, err)
defer client.Close()

err = client.Add(ctx, infos[1])
require.NoError(t, err)
}

func setup(t *testing.T) ([]client.NodeInfo, func()) {
n := 3
nodes := make([]*dqlite.Node, n)
infos := make([]client.NodeInfo, n)
var cleanups []func()

for i := range nodes {
id := uint64(i + 1)
address := fmt.Sprintf("@test-%d", id)
dir, cleanup := newDir(t)
defer cleanup()
cleanups = append(cleanups, cleanup)
node, err := dqlite.New(id, address, dir, dqlite.WithBindAddress(address))
require.NoError(t, err)
nodes[i] = node
infos[i].ID = id
infos[i].Address = address
err = node.Start()
require.NoError(t, err)
defer node.Close()
cleanups = append(cleanups, func() { node.Close() })
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

store := client.NewInmemNodeStore()
store.Set(context.Background(), []client.NodeInfo{infos[0]})

client, err := client.FindLeader(ctx, store)
require.NoError(t, err)
defer client.Close()

err = client.Add(ctx, infos[1])
require.NoError(t, err)
return infos, func() {
for i := len(cleanups) - 1; i >= 0; i-- {
cleanups[i]()
}
}
}
8 changes: 6 additions & 2 deletions cmd/dqlite-demo/dqlite-demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ Complete documentation is available at https://github.com/canonical/go-dqlite`,
log.Printf(fmt.Sprintf("%s: %s: %s\n", api, l.String(), format), a...)
}

options := []app.Option{app.WithAddress(db), app.WithCluster(*join), app.WithLogFunc(logFunc),
app.WithDiskMode(diskMode)}
options := []app.Option{
app.WithAddress(db),
app.WithCluster(*join),
app.WithLogFunc(logFunc),
app.WithDiskMode(diskMode),
}

// Set TLS options
if (crt != "" && key == "") || (key != "" && crt == "") {
Expand Down
Loading
Loading