From 55d14950eb4e750b9b848e25b995f9654f751381 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Wed, 18 Sep 2024 18:14:36 -0400 Subject: [PATCH 01/23] Remember the address of the last known leader Signed-off-by: Cole Miller --- client/database_store.go | 6 +- client/store.go | 1 + cmd/dqlite-demo/dqlite-demo.go | 3 +- driver/driver.go | 2 +- internal/protocol/connector.go | 88 ++++++++++++++++++----------- internal/protocol/connector_test.go | 17 +++++- internal/protocol/store.go | 34 +++++++++++ 7 files changed, 112 insertions(+), 39 deletions(-) diff --git a/client/database_store.go b/client/database_store.go index 665bb02e..3a6f43d4 100644 --- a/client/database_store.go +++ b/client/database_store.go @@ -1,3 +1,4 @@ +//go:build !nosqlite3 // +build !nosqlite3 package client @@ -8,8 +9,9 @@ import ( "fmt" "strings" - "github.com/pkg/errors" + "github.com/canonical/go-dqlite/internal/protocol" _ "github.com/mattn/go-sqlite3" // Go SQLite bindings + "github.com/pkg/errors" ) // Option that can be used to tweak node store parameters. @@ -21,6 +23,7 @@ type nodeStoreOptions struct { // DatabaseNodeStore persists a list addresses of dqlite nodes in a SQL table. type DatabaseNodeStore struct { + protocol.Compass db *sql.DB // Database handle to use. schema string // Name of the schema holding the servers table. table string // Name of the servers table. @@ -154,4 +157,3 @@ func (d *DatabaseNodeStore) Set(ctx context.Context, servers []NodeInfo) error { return nil } - diff --git a/client/store.go b/client/store.go index 6e12646d..d8428b22 100644 --- a/client/store.go +++ b/client/store.go @@ -30,6 +30,7 @@ var NewInmemNodeStore = protocol.NewInmemNodeStore // Persists a list addresses of dqlite nodes in a YAML file. type YamlNodeStore struct { + protocol.Compass path string servers []NodeInfo mu sync.RWMutex diff --git a/cmd/dqlite-demo/dqlite-demo.go b/cmd/dqlite-demo/dqlite-demo.go index 0b9dae40..b3bad60b 100644 --- a/cmd/dqlite-demo/dqlite-demo.go +++ b/cmd/dqlite-demo/dqlite-demo.go @@ -13,6 +13,7 @@ import ( "os/signal" "path/filepath" "strings" + "time" "github.com/canonical/go-dqlite/app" "github.com/canonical/go-dqlite/client" @@ -50,7 +51,7 @@ Complete documentation is available at https://github.com/canonical/go-dqlite`, } options := []app.Option{app.WithAddress(db), app.WithCluster(*join), app.WithLogFunc(logFunc), - app.WithDiskMode(diskMode)} + app.WithDiskMode(diskMode), app.WithRolesAdjustmentFrequency(5 * time.Second)} // Set TLS options if (crt != "" && key == "") || (key != "" && crt == "") { diff --git a/driver/driver.go b/driver/driver.go index 9431c92d..cdb5784e 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -189,7 +189,7 @@ func WithTracing(level client.LogLevel) Option { } } -// NewDriver creates a new dqlite driver, which also implements the +// New creates a new dqlite driver, which also implements the // driver.Driver interface. func New(store client.NodeStore, options ...Option) (*Driver, error) { o := defaultOptions() diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index d1181393..1fd95257 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -27,12 +27,18 @@ type DialFunc func(context.Context, string) (net.Conn, error) // Connector is in charge of creating a dqlite SQL client connected to the // current leader of a cluster. type Connector struct { - id uint64 // Conn ID to use when registering against the server. - store NodeStore // Used to get and update current cluster servers. 
+ id uint64 // Conn ID to use when registering against the server. + store NodeStoreLeaderTracker config Config // Connection parameters. log logging.Func // Logging function. } +type nonTracking struct{ NodeStore } + +func (nt *nonTracking) Guess() string { return "" } +func (nt *nonTracking) Point(string) {} +func (nt *nonTracking) Shake() {} + // NewConnector returns a new connector that can be used by a dqlite driver to // create new clients connected to a leader dqlite server. func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector { @@ -60,9 +66,14 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * config.ConcurrentLeaderConns = MaxConcurrentLeaderConns } + nslt, ok := store.(NodeStoreLeaderTracker) + if !ok { + nslt = &nonTracking{store} + } + connector := &Connector{ id: id, - store: store, + store: nslt, config: config, log: log, } @@ -122,9 +133,17 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { return protocol, nil } -// Make a single attempt to establish a connection to the leader server trying -// all addresses available in the store. func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*Protocol, error) { + if addr := c.store.Guess(); addr != "" { + // TODO In the event of failure, we could still use the second + // return value to guide the next stage of the search. + if p, _, _ := c.connectAttemptOne(ctx, ctx, addr, log); p != nil { + log(logging.Debug, "server %s: connected on fast path", addr) + return p, nil + } + c.store.Shake() + } + servers, err := c.store.Get(ctx) if err != nil { return nil, errors.Wrap(err, "get servers") @@ -146,19 +165,23 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P sem := semaphore.NewWeighted(c.config.ConcurrentLeaderConns) - protocolCh := make(chan *Protocol) + type pair struct { + protocol *Protocol + address string + } + leaderCh := make(chan pair) wg := &sync.WaitGroup{} wg.Add(len(servers)) go func() { wg.Wait() - close(protocolCh) + close(leaderCh) }() // Make an attempt for each address until we find the leader. for _, server := range servers { - go func(server NodeInfo, pc chan<- *Protocol) { + go func(server NodeInfo, pc chan<- pair) { defer wg.Done() if err := sem.Acquire(ctx, 1); err != nil { @@ -170,72 +193,61 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P return } - log := func(l logging.Level, format string, a ...interface{}) { - format = fmt.Sprintf("server %s: ", server.Address) + format - log(l, format, a...) - } - - ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() - protocol, leader, err := c.connectAttemptOne(origCtx, ctx, server.Address, log) if err != nil { // This server is unavailable, try with the next target. - log(logging.Warn, err.Error()) + log(logging.Warn, "server %s: %s", server.Address, err.Error()) return } if protocol != nil { // We found the leader - log(logging.Debug, "connected") - pc <- protocol + pc <- pair{protocol, server.Address} return } if leader == "" { // This server does not know who the current leader is, // try with the next target. - log(logging.Warn, "no known leader") + log(logging.Warn, "server %s: no known leader", server.Address) return } // If we get here, it means this server reported that another // server is the leader, let's close the connection to this // server and try with the suggested one. 
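+			// Note that we follow at most one redirect per target
+			// server: if the reported leader turns out not to be
+			// the leader either, we give up on this branch.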
- log(logging.Debug, "connect to reported leader %s", leader) - - ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() + log(logging.Debug, "server %s: connect to reported leader %s", server.Address, leader) protocol, _, err = c.connectAttemptOne(origCtx, ctx, leader, log) if err != nil { // The leader reported by the previous server is // unavailable, try with the next target. - log(logging.Warn, "reported leader unavailable err=%v", err) + log(logging.Warn, "server %s: reported leader unavailable err=%v", leader, err) return } if protocol == nil { // The leader reported by the target server does not consider itself // the leader, try with the next target. - log(logging.Warn, "reported leader server is not the leader") + log(logging.Warn, "server %s: reported leader server is not the leader", leader) return } - log(logging.Debug, "connected") - pc <- protocol - }(server, protocolCh) + pc <- pair{protocol, leader} + }(server, leaderCh) } // Read from protocol chan, cancel context - protocol, ok := <-protocolCh + leader, ok := <-leaderCh if !ok { return nil, ErrNoAvailableLeader } + log(logging.Debug, "server %s: connected on fallback path", leader.address) + c.store.Point(leader.address) cancel() - for extra := range protocolCh { - extra.Close() + for extra := range leaderCh { + extra.protocol.Close() } - return protocol, nil + return leader.protocol, nil } // Perform the initial handshake using the given protocol version. @@ -278,7 +290,15 @@ func (c *Connector) connectAttemptOne( address string, log logging.Func, ) (*Protocol, string, error) { - dialCtx, cancel := context.WithTimeout(dialCtx, c.config.DialTimeout) + log = func(l logging.Level, format string, a ...interface{}) { + format = fmt.Sprintf("server %s: ", address) + format + log(l, format, a...) + } + + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + + dialCtx, cancel = context.WithTimeout(dialCtx, c.config.DialTimeout) defer cancel() // Establish the connection. diff --git a/internal/protocol/connector_test.go b/internal/protocol/connector_test.go index 2c24e5de..1223bd69 100644 --- a/internal/protocol/connector_test.go +++ b/internal/protocol/connector_test.go @@ -35,7 +35,22 @@ func TestConnector_Success(t *testing.T) { assert.NoError(t, client.Close()) check([]string{ - "DEBUG: attempt 1: server @test-0: connected", + "DEBUG: attempt 1: server @test-0: connected on fallback path", + }) + + log, check = newLogFunc(t) + connector = protocol.NewConnector(0, store, protocol.Config{}, log) + + ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + client, err = connector.Connect(ctx) + require.NoError(t, err) + + assert.NoError(t, client.Close()) + + check([]string{ + "DEBUG: attempt 1: server @test-0: connected on fast path", }) } diff --git a/internal/protocol/store.go b/internal/protocol/store.go index 5930e5c5..60d4ad30 100644 --- a/internal/protocol/store.go +++ b/internal/protocol/store.go @@ -46,6 +46,7 @@ type NodeStore interface { // InmemNodeStore keeps the list of servers in memory. 
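+// It embeds Compass, so it also satisfies NodeStoreLeaderTracker and can
+// remember the address of the last known leader.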
type InmemNodeStore struct { + Compass mu sync.RWMutex servers []NodeInfo } @@ -74,3 +75,36 @@ func (i *InmemNodeStore) Set(ctx context.Context, servers []NodeInfo) error { i.servers = servers return nil } + +type NodeStoreLeaderTracker interface { + NodeStore + Guess() string + Point(string) + Shake() +} + +type Compass struct { + mu sync.RWMutex + lastKnownLeader string +} + +func (co *Compass) Guess() string { + co.mu.RLock() + defer co.mu.RUnlock() + + return co.lastKnownLeader +} + +func (co *Compass) Point(address string) { + co.mu.Lock() + defer co.mu.Unlock() + + co.lastKnownLeader = address +} + +func (co *Compass) Shake() { + co.mu.Lock() + defer co.mu.Unlock() + + co.lastKnownLeader = "" +} From 3bd3a699372796e0d1333932cd8a3a76457530d9 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 23 Sep 2024 23:25:11 -0400 Subject: [PATCH 02/23] Introduce a reusable leader connection Signed-off-by: Cole Miller --- app/app_test.go | 9 ++- client/client.go | 35 +++++---- client/client_export_test.go | 9 --- client/client_test.go | 44 ------------ client/leader.go | 5 +- driver/driver.go | 4 +- driver/driver_test.go | 37 +++++++++- internal/protocol/config.go | 1 + internal/protocol/connector.go | 106 +++++++++++++++++----------- internal/protocol/connector_test.go | 40 +++++++++++ internal/protocol/protocol_test.go | 2 +- internal/protocol/store.go | 100 ++++++++++++++++++++++++-- 12 files changed, 266 insertions(+), 126 deletions(-) delete mode 100644 client/client_export_test.go diff --git a/app/app_test.go b/app/app_test.go index d773b866..b6ee60c2 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -401,8 +401,13 @@ func TestHandover_GracefulShutdown(t *testing.T) { defer cleanup() addr := fmt.Sprintf("127.0.0.1:900%d", i+1) + log := func(l client.LogLevel, format string, a ...interface{}) { + format = fmt.Sprintf("%s - %d: %s: %s", time.Now().Format("15:04:01.000"), i, l.String(), format) + t.Logf(format, a...) + } options := []app.Option{ app.WithAddress(addr), + app.WithLogFunc(log), } if i > 0 { options = append(options, app.WithCluster([]string{"127.0.0.1:9001"})) @@ -1292,8 +1297,8 @@ func Test_TxRowsAffected(t *testing.T) { CREATE TABLE test ( id TEXT PRIMARY KEY, value INT -);`); - require.NoError(t, err); +);`) + require.NoError(t, err) // Insert watermark err = tx(context.Background(), db, func(ctx context.Context, tx *sql.Tx) error { diff --git a/client/client.go b/client/client.go index 76987b23..0da67d29 100644 --- a/client/client.go +++ b/client/client.go @@ -12,7 +12,7 @@ type DialFunc = protocol.DialFunc // Client speaks the dqlite wire protocol. type Client struct { - protocol *protocol.Protocol + session *protocol.Session } // Option that can be used to tweak client parameters. 
@@ -64,17 +64,26 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error return nil, errors.Wrap(err, "failed to establish network connection") } - protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne) + proto, err := protocol.Handshake(ctx, conn, protocol.VersionOne) if err != nil { conn.Close() return nil, err } - client := &Client{protocol: protocol} + sess := &protocol.Session{Protocol: proto, Address: address} + client := &Client{session: sess} return client, nil } +func (c *Client) call(ctx context.Context, request *protocol.Message, response *protocol.Message) error { + if err := c.session.Protocol.Call(ctx, request, response); err != nil { + c.session.Bad() + return err + } + return nil +} + // Leader returns information about the current leader, if any. func (c *Client) Leader(ctx context.Context) (*NodeInfo, error) { request := protocol.Message{} @@ -84,7 +93,7 @@ func (c *Client) Leader(ctx context.Context) (*NodeInfo, error) { protocol.EncodeLeader(&request) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send Leader request") } @@ -107,7 +116,7 @@ func (c *Client) Cluster(ctx context.Context) ([]NodeInfo, error) { protocol.EncodeCluster(&request, protocol.ClusterFormatV1) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send Cluster request") } @@ -137,7 +146,7 @@ func (c *Client) Dump(ctx context.Context, dbname string) ([]File, error) { protocol.EncodeDump(&request, dbname) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send dump request") } @@ -174,7 +183,7 @@ func (c *Client) Add(ctx context.Context, node NodeInfo) error { protocol.EncodeAdd(&request, node.ID, node.Address) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return err } @@ -210,7 +219,7 @@ func (c *Client) Assign(ctx context.Context, id uint64, role NodeRole) error { protocol.EncodeAssign(&request, id, uint64(role)) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return err } @@ -233,7 +242,7 @@ func (c *Client) Transfer(ctx context.Context, id uint64) error { protocol.EncodeTransfer(&request, id) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return err } @@ -253,7 +262,7 @@ func (c *Client) Remove(ctx context.Context, id uint64) error { protocol.EncodeRemove(&request, id) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return err } @@ -279,7 +288,7 @@ func (c *Client) Describe(ctx context.Context) (*NodeMetadata, error) { protocol.EncodeDescribe(&request, protocol.RequestDescribeFormatV0) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return nil, err } @@ -305,7 +314,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error { protocol.EncodeWeight(&request, weight) - if err := c.protocol.Call(ctx, &request, &response); err != nil { + if err := c.call(ctx, &request, &response); err != nil { return err } @@ -318,7 
+327,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error { // Close the client. func (c *Client) Close() error { - return c.protocol.Close() + return c.session.Close() } // Create a client options object with sane defaults. diff --git a/client/client_export_test.go b/client/client_export_test.go deleted file mode 100644 index 5fa73b48..00000000 --- a/client/client_export_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package client - -import ( - "github.com/canonical/go-dqlite/internal/protocol" -) - -func (c *Client) Protocol() *protocol.Protocol { - return c.protocol -} diff --git a/client/client_test.go b/client/client_test.go index 27ac2e45..f2090970 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -10,7 +10,6 @@ import ( dqlite "github.com/canonical/go-dqlite" "github.com/canonical/go-dqlite/client" - "github.com/canonical/go-dqlite/internal/protocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -33,49 +32,6 @@ func TestClient_Leader(t *testing.T) { assert.Equal(t, leader.Address, "@1001") } -func TestClient_Dump(t *testing.T) { - node, cleanup := newNode(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - client, err := client.New(ctx, node.BindAddress()) - require.NoError(t, err) - defer client.Close() - - // Open a database and create a test table. - request := protocol.Message{} - request.Init(4096) - - response := protocol.Message{} - response.Init(4096) - - protocol.EncodeOpen(&request, "test.db", 0, "volatile") - - p := client.Protocol() - err = p.Call(ctx, &request, &response) - require.NoError(t, err) - - db, err := protocol.DecodeDb(&response) - require.NoError(t, err) - - protocol.EncodeExecSQLV0(&request, uint64(db), "CREATE TABLE foo (n INT)", nil) - - err = p.Call(ctx, &request, &response) - require.NoError(t, err) - - files, err := client.Dump(ctx, "test.db") - require.NoError(t, err) - - require.Len(t, files, 2) - assert.Equal(t, "test.db", files[0].Name) - assert.Equal(t, 4096, len(files[0].Data)) - - assert.Equal(t, "test.db-wal", files[1].Name) - assert.Equal(t, 8272, len(files[1].Data)) -} - func TestClient_Cluster(t *testing.T) { node, cleanup := newNode(t) defer cleanup() diff --git a/client/leader.go b/client/leader.go index d98ce2bb..8e357730 100644 --- a/client/leader.go +++ b/client/leader.go @@ -22,14 +22,15 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien config := protocol.Config{ Dial: o.DialFunc, ConcurrentLeaderConns: o.ConcurrentLeaderConns, + PermitShared: true, } connector := protocol.NewConnector(0, store, config, o.LogFunc) - protocol, err := connector.Connect(ctx) + sess, err := connector.Connect(ctx) if err != nil { return nil, err } - client := &Client{protocol: protocol} + client := &Client{sess} return client, nil } diff --git a/driver/driver.go b/driver/driver.go index cdb5784e..5b8f736b 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -274,11 +274,11 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) { tracing: c.driver.tracing, } - var err error - conn.protocol, err = connector.Connect(ctx) + sess, err := connector.Connect(ctx) if err != nil { return nil, driverError(conn.log, errors.Wrap(err, "failed to create dqlite connection")) } + conn.protocol = sess.Protocol conn.request.Init(4096) conn.response.Init(4096) diff --git a/driver/driver_test.go b/driver/driver_test.go index 5004a13a..7cd6f469 100644 --- a/driver/driver_test.go +++ 
b/driver/driver_test.go @@ -22,6 +22,7 @@ import ( "os" "strings" "testing" + "time" dqlite "github.com/canonical/go-dqlite" "github.com/canonical/go-dqlite/client" @@ -619,7 +620,7 @@ func Test_DescribeLastEntry(t *testing.T) { dir, dirCleanup := newDir(t) defer dirCleanup() _, cleanup := newNode(t, dir) - store := newStore(t, "@1") + store := newStore(t, bindAddress) log := logging.Test(t) drv, err := dqlitedriver.New(store, dqlitedriver.WithLogFunc(log)) require.NoError(t, err) @@ -648,13 +649,43 @@ func Test_DescribeLastEntry(t *testing.T) { assert.Equal(t, info.Term, uint64(1)) } +func Test_Dump(t *testing.T) { + drv, cleanup := newDriver(t) + defer cleanup() + + conn, err := drv.Open("test.db") + require.NoError(t, err) + + _, err = conn.(driver.ExecerContext).ExecContext(context.Background(), `CREATE TABLE foo (n INT)`, nil) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + client, err := client.New(ctx, bindAddress) + require.NoError(t, err) + defer client.Close() + + files, err := client.Dump(ctx, "test.db") + require.NoError(t, err) + + require.Len(t, files, 2) + assert.Equal(t, "test.db", files[0].Name) + assert.Equal(t, 4096, len(files[0].Data)) + + assert.Equal(t, "test.db-wal", files[1].Name) + assert.Equal(t, 8272, len(files[1].Data)) +} + +const bindAddress = "@1" + func newDriver(t *testing.T) (*dqlitedriver.Driver, func()) { t.Helper() dir, dirCleanup := newDir(t) _, nodeCleanup := newNode(t, dir) - store := newStore(t, "@1") + store := newStore(t, bindAddress) log := logging.Test(t) @@ -683,7 +714,7 @@ func newStore(t *testing.T, address string) client.NodeStore { func newNode(t *testing.T, dir string) (*dqlite.Node, func()) { t.Helper() - server, err := dqlite.New(uint64(1), "@1", dir, dqlite.WithBindAddress("@1")) + server, err := dqlite.New(uint64(1), bindAddress, dir, dqlite.WithBindAddress(bindAddress)) require.NoError(t, err) err = server.Start() diff --git a/internal/protocol/config.go b/internal/protocol/config.go index 0555b4ac..6ebddd62 100644 --- a/internal/protocol/config.go +++ b/internal/protocol/config.go @@ -13,4 +13,5 @@ type Config struct { BackoffCap time.Duration // Maximum connection retry backoff value, RetryLimit uint // Maximum number of retries, or 0 for unlimited. ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership. + PermitShared bool } diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 1fd95257..163a31ab 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -35,9 +35,11 @@ type Connector struct { type nonTracking struct{ NodeStore } -func (nt *nonTracking) Guess() string { return "" } -func (nt *nonTracking) Point(string) {} -func (nt *nonTracking) Shake() {} +func (nt *nonTracking) Guess() string { return "" } +func (nt *nonTracking) Point(string) {} +func (nt *nonTracking) Shake() {} +func (nt *nonTracking) Lease() *Session { return nil } +func (nt *nonTracking) Unlease(*Session) error { return nil } // NewConnector returns a new connector that can be used by a dqlite driver to // create new clients connected to a leader dqlite server. @@ -84,8 +86,22 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * // Connect finds the leader server and returns a connection to it. // // If the connector is stopped before a leader is found, nil is returned. 
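+//
+// The connection is wrapped in a Session, which records the address of the
+// server it points at so that the connection can later be returned to the
+// leader tracker for reuse.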
-func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { - var protocol *Protocol +func (c *Connector) Connect(ctx context.Context) (*Session, error) { + var protocol *Session + + if c.config.PermitShared { + sess := c.store.Lease() + if sess != nil { + leader, err := askLeader(ctx, sess.Protocol) + if err == nil && sess.Address == leader { + c.log(logging.Debug, "reusing shared connection to %s", sess.Address) + return sess, nil + } + c.log(logging.Debug, "discarding shared connection to %s", sess.Address) + sess.Bad() + sess.Close() + } + } strategies := makeRetryStrategies(c.config.BackoffFactor, c.config.BackoffCap, c.config.RetryLimit) @@ -133,13 +149,13 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { return protocol, nil } -func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*Protocol, error) { +func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*Session, error) { if addr := c.store.Guess(); addr != "" { // TODO In the event of failure, we could still use the second // return value to guide the next stage of the search. if p, _, _ := c.connectAttemptOne(ctx, ctx, addr, log); p != nil { log(logging.Debug, "server %s: connected on fast path", addr) - return p, nil + return &Session{Protocol: p, Address: addr}, nil } c.store.Shake() } @@ -165,11 +181,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P sem := semaphore.NewWeighted(c.config.ConcurrentLeaderConns) - type pair struct { - protocol *Protocol - address string - } - leaderCh := make(chan pair) + leaderCh := make(chan *Session) wg := &sync.WaitGroup{} wg.Add(len(servers)) @@ -181,7 +193,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P // Make an attempt for each address until we find the leader. for _, server := range servers { - go func(server NodeInfo, pc chan<- pair) { + go func(server NodeInfo, pc chan<- *Session) { defer wg.Done() if err := sem.Acquire(ctx, 1); err != nil { @@ -201,7 +213,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P } if protocol != nil { // We found the leader - pc <- pair{protocol, server.Address} + pc <- &Session{Protocol: protocol, Address: server.Address} return } if leader == "" { @@ -229,7 +241,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P log(logging.Warn, "server %s: reported leader server is not the leader", leader) return } - pc <- pair{protocol, leader} + pc <- &Session{Protocol: protocol, Address: leader} }(server, leaderCh) } @@ -238,16 +250,17 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P if !ok { return nil, ErrNoAvailableLeader } - log(logging.Debug, "server %s: connected on fallback path", leader.address) - c.store.Point(leader.address) + log(logging.Debug, "server %s: connected on fallback path", leader.Address) + c.store.Point(leader.Address) + leader.Tracker = c.store cancel() for extra := range leaderCh { - extra.protocol.Close() + extra.Close() } - return leader.protocol, nil + return leader, nil } // Perform the initial handshake using the given protocol version. @@ -319,32 +332,11 @@ func (c *Connector) connectAttemptOne( return nil, "", err } - // Send the initial Leader request. 
- request := Message{} - request.Init(16) - response := Message{} - response.Init(512) - - EncodeLeader(&request) - - if err := protocol.Call(ctx, &request, &response); err != nil { - protocol.Close() - cause := errors.Cause(err) - // Best-effort detection of a pre-1.0 dqlite node: when sent - // version 1 it should close the connection immediately. - if err, ok := cause.(*net.OpError); ok && !err.Timeout() || cause == io.EOF { - return nil, "", errBadProtocol - } - - return nil, "", err - } - - _, leader, err := DecodeNodeCompat(protocol, &response) + leader, err := askLeader(ctx, protocol) if err != nil { protocol.Close() return nil, "", err } - switch leader { case "": // Currently this server does not know about any leader. @@ -352,8 +344,10 @@ func (c *Connector) connectAttemptOne( return nil, "", nil case address: // This server is the leader, register ourselves and return. - request.reset() - response.reset() + request := Message{} + request.Init(16) + response := Message{} + response.Init(512) EncodeClient(&request, c.id) @@ -380,6 +374,32 @@ func (c *Connector) connectAttemptOne( } } +func askLeader(ctx context.Context, protocol *Protocol) (string, error) { + request := Message{} + request.Init(16) + response := Message{} + response.Init(512) + + EncodeLeader(&request) + + if err := protocol.Call(ctx, &request, &response); err != nil { + cause := errors.Cause(err) + // Best-effort detection of a pre-1.0 dqlite node: when sent + // version 1 it should close the connection immediately. + if err, ok := cause.(*net.OpError); ok && !err.Timeout() || cause == io.EOF { + return "", errBadProtocol + } + + return "", err + } + + _, leader, err := DecodeNodeCompat(protocol, &response) + if err != nil { + return "", err + } + return leader, nil +} + // Return a retry strategy with exponential backoff, capped at the given amount // of time and possibly with a maximum number of retries. func makeRetryStrategies(factor, cap time.Duration, limit uint) []strategy.Strategy { diff --git a/internal/protocol/connector_test.go b/internal/protocol/connector_test.go index 1223bd69..828552d7 100644 --- a/internal/protocol/connector_test.go +++ b/internal/protocol/connector_test.go @@ -54,6 +54,46 @@ func TestConnector_Success(t *testing.T) { }) } +// Open a connection with PermitShared set and then close it. Then, +// do the same thing again and verify that original connection is re-used. +func TestConnector_PermitShared(t *testing.T) { + address, cleanup := newNode(t, 0) + defer cleanup() + + store := newStore(t, []string{address}) + + log, check := newLogFunc(t) + connector := protocol.NewConnector(0, store, protocol.Config{}, log) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + client, err := connector.Connect(ctx) + require.NoError(t, err) + + assert.NoError(t, client.Close()) + + check([]string{ + "DEBUG: attempt 1: server @test-0: connected on fallback path", + }) + + log, check = newLogFunc(t) + config := protocol.Config{PermitShared: true} + connector = protocol.NewConnector(0, store, config, log) + + ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + client, err = connector.Connect(ctx) + require.NoError(t, err) + + assert.NoError(t, client.Close()) + + check([]string{ + "DEBUG: reusing shared connection to @test-0", + }) +} + // The network connection can't be established within the specified number of // attempts. 
func TestConnector_LimitRetries(t *testing.T) {
diff --git a/internal/protocol/protocol_test.go b/internal/protocol/protocol_test.go
index df5d07e2..8f1f334e 100644
--- a/internal/protocol/protocol_test.go
+++ b/internal/protocol/protocol_test.go
@@ -165,7 +165,7 @@ func newProtocol(t *testing.T) (*protocol.Protocol, func()) {
 		serverCleanup()
 	}
 
-	return client, cleanup
+	return client.Protocol, cleanup
 }
 
 // Perform a client call.
diff --git a/internal/protocol/store.go b/internal/protocol/store.go
index 60d4ad30..288ee2ef 100644
--- a/internal/protocol/store.go
+++ b/internal/protocol/store.go
@@ -76,35 +76,121 @@ func (i *InmemNodeStore) Set(ctx context.Context, servers []NodeInfo) error {
 	return nil
 }
 
-type NodeStoreLeaderTracker interface {
-	NodeStore
+// Session is a connection to a dqlite server with some attached metadata.
+//
+// The additional metadata is used to reuse the connection when possible.
+type Session struct {
+	Protocol *Protocol
+	// The address of the server this session is connected to.
+	Address string
+	// Tracker points back to the LeaderTracker from which this session was leased,
+	// if any.
+	Tracker LeaderTracker
+}
+
+// Bad marks the session as bad, so that it won't be reused.
+func (sess *Session) Bad() {
+	sess.Protocol.mu.Lock()
+	defer sess.Protocol.mu.Unlock()
+
+	sess.Tracker = nil
+}
+
+// Close returns the session to its parent tracker if appropriate,
+// or closes the underlying connection otherwise.
+func (sess *Session) Close() error {
+	if tr := sess.Tracker; tr != nil {
+		return tr.Unlease(sess)
+	}
+	return sess.Protocol.Close()
+}
+
+// A LeaderTracker stores the address of the last known cluster leader,
+// and possibly a reusable connection to it.
+type LeaderTracker interface {
+	// Guess returns the address of the last known leader, or the empty string if none has been recorded.
 	Guess() string
+	// Point records the address of the current leader.
 	Point(string)
+	// Shake unsets the recorded leader address.
 	Shake()
+
+	// Lease returns an existing session against a node that was once the leader,
+	// or nil if no existing session is available.
+	//
+	// The caller should not assume that the session's connection is still valid,
+	// that the remote node is still the leader, or that any particular operations
+	// have previously been performed on the session.
+	// When closed, the session will be returned to this tracker, unless
+	// another session has taken its place in the tracker's session slot
+	// or the session was marked as bad.
+	Lease() *Session
+	// Unlease passes ownership of a session to the tracker.
+	//
+	// The session need not have been obtained from a call to Lease.
+	// It will be made available for reuse by future calls to Lease.
+	Unlease(*Session) error
+}
+
+// A NodeStoreLeaderTracker is a node store that also tracks the current leader.
+type NodeStoreLeaderTracker interface {
+	NodeStore
+	LeaderTracker
 }
 
+// Compass can be used to embed LeaderTracker functionality in another type.
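+//
+// For instance, the node stores in the client package embed it this way:
+//
+//	type YamlNodeStore struct {
+//		protocol.Compass
+//		// ...
+//	}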
type Compass struct { - mu sync.RWMutex - lastKnownLeader string + mu sync.RWMutex + lastKnownLeaderAddr string + + session *Session } func (co *Compass) Guess() string { co.mu.RLock() defer co.mu.RUnlock() - return co.lastKnownLeader + return co.lastKnownLeaderAddr } func (co *Compass) Point(address string) { co.mu.Lock() defer co.mu.Unlock() - co.lastKnownLeader = address + co.lastKnownLeaderAddr = address } func (co *Compass) Shake() { co.mu.Lock() defer co.mu.Unlock() - co.lastKnownLeader = "" + co.lastKnownLeaderAddr = "" +} + +func (co *Compass) Lease() (sess *Session) { + co.mu.Lock() + defer co.mu.Unlock() + + if sess, co.session = co.session, nil; sess != nil { + sess.Tracker = co + } + return +} + +func (co *Compass) Unlease(sess *Session) error { + co.mu.Lock() + + if co.session == nil { + co.session = sess + co.mu.Unlock() + return nil + } else { + // Another call to Unlease has already filled the tracker's + // session slot, so just close this session. (Don't call + // sess.Close, as that would lead to recursion.) Also, unlock + // the mutex before closing the session, just so we know + // that it is never locked for longer than a single assignment. + co.mu.Unlock() + return sess.Protocol.Close() + } } From 0aa753ba9aace9e601cc0c2083b918b00f90802f Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 16:40:34 -0400 Subject: [PATCH 03/23] Always try to connect at least once This makes it easier to inject and test connection attempt failures. Signed-off-by: Cole Miller --- internal/protocol/connector.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 163a31ab..a1dd96bc 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -113,11 +113,13 @@ func (c *Connector) Connect(ctx context.Context) (*Session, error) { c.log(l, format, a...) } - select { - case <-ctx.Done(): - // Stop retrying - return nil - default: + if attempt > 1 { + select { + case <-ctx.Done(): + // Stop retrying + return nil + default: + } } var err error From 9db8e48010365fb0f13153e50c4170779c6fb6f1 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 16:43:16 -0400 Subject: [PATCH 04/23] Move leader tracking information to Protocol Signed-off-by: Cole Miller --- client/client.go | 37 +++--- client/leader.go | 4 +- driver/driver.go | 4 +- internal/protocol/connector.go | 179 +++++++++++++---------------- internal/protocol/protocol.go | 123 +++++++------------- internal/protocol/protocol_test.go | 6 +- internal/protocol/store.go | 91 +++------------ 7 files changed, 157 insertions(+), 287 deletions(-) diff --git a/client/client.go b/client/client.go index 0da67d29..43cfa37a 100644 --- a/client/client.go +++ b/client/client.go @@ -12,7 +12,7 @@ type DialFunc = protocol.DialFunc // Client speaks the dqlite wire protocol. type Client struct { - session *protocol.Session + proto *protocol.Protocol } // Option that can be used to tweak client parameters. 
@@ -64,24 +64,13 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error return nil, errors.Wrap(err, "failed to establish network connection") } - proto, err := protocol.Handshake(ctx, conn, protocol.VersionOne) + proto, err := protocol.Handshake(ctx, conn, protocol.VersionOne, address) if err != nil { conn.Close() return nil, err } - sess := &protocol.Session{Protocol: proto, Address: address} - client := &Client{session: sess} - - return client, nil -} - -func (c *Client) call(ctx context.Context, request *protocol.Message, response *protocol.Message) error { - if err := c.session.Protocol.Call(ctx, request, response); err != nil { - c.session.Bad() - return err - } - return nil + return &Client{proto}, nil } // Leader returns information about the current leader, if any. @@ -93,7 +82,7 @@ func (c *Client) Leader(ctx context.Context) (*NodeInfo, error) { protocol.EncodeLeader(&request) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send Leader request") } @@ -116,7 +105,7 @@ func (c *Client) Cluster(ctx context.Context) ([]NodeInfo, error) { protocol.EncodeCluster(&request, protocol.ClusterFormatV1) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send Cluster request") } @@ -146,7 +135,7 @@ func (c *Client) Dump(ctx context.Context, dbname string) ([]File, error) { protocol.EncodeDump(&request, dbname) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send dump request") } @@ -183,7 +172,7 @@ func (c *Client) Add(ctx context.Context, node NodeInfo) error { protocol.EncodeAdd(&request, node.ID, node.Address) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return err } @@ -219,7 +208,7 @@ func (c *Client) Assign(ctx context.Context, id uint64, role NodeRole) error { protocol.EncodeAssign(&request, id, uint64(role)) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return err } @@ -242,7 +231,7 @@ func (c *Client) Transfer(ctx context.Context, id uint64) error { protocol.EncodeTransfer(&request, id) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return err } @@ -262,7 +251,7 @@ func (c *Client) Remove(ctx context.Context, id uint64) error { protocol.EncodeRemove(&request, id) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return err } @@ -288,7 +277,7 @@ func (c *Client) Describe(ctx context.Context) (*NodeMetadata, error) { protocol.EncodeDescribe(&request, protocol.RequestDescribeFormatV0) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return nil, err } @@ -314,7 +303,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error { protocol.EncodeWeight(&request, weight) - if err := c.call(ctx, &request, &response); err != nil { + if err := c.proto.Call(ctx, &request, &response); err != nil { return err } @@ -327,7 +316,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error { // Close the client. 
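+// If the underlying protocol is linked to a leader tracker, this hands the
+// connection back to the tracker for reuse (when the tracker accepts it)
+// instead of closing the network connection.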
func (c *Client) Close() error { - return c.session.Close() + return c.proto.Close() } // Create a client options object with sane defaults. diff --git a/client/leader.go b/client/leader.go index 8e357730..1b159df7 100644 --- a/client/leader.go +++ b/client/leader.go @@ -25,12 +25,12 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien PermitShared: true, } connector := protocol.NewConnector(0, store, config, o.LogFunc) - sess, err := connector.Connect(ctx) + proto, err := connector.Connect(ctx) if err != nil { return nil, err } - client := &Client{sess} + client := &Client{proto} return client, nil } diff --git a/driver/driver.go b/driver/driver.go index 5b8f736b..c6ed49ec 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -274,11 +274,11 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) { tracing: c.driver.tracing, } - sess, err := connector.Connect(ctx) + proto, err := connector.Connect(ctx) if err != nil { return nil, driverError(conn.log, errors.Wrap(err, "failed to create dqlite connection")) } - conn.protocol = sess.Protocol + conn.protocol = proto conn.request.Init(4096) conn.response.Init(4096) diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index a1dd96bc..00b991e2 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -33,13 +33,19 @@ type Connector struct { log logging.Func // Logging function. } +// nonTracking extends any NodeStore with no-op leader tracking. +// +// This is used as a fallback when the NodeStore used by the connector doesn't +// implement NodeStoreLeaderTracker. This can only be the case for a custom NodeStore +// provided by downstream. In this case, the connector will behave as it did before +// the LeaderTracker optimizations were introduced. type nonTracking struct{ NodeStore } -func (nt *nonTracking) Guess() string { return "" } -func (nt *nonTracking) Point(string) {} -func (nt *nonTracking) Shake() {} -func (nt *nonTracking) Lease() *Session { return nil } -func (nt *nonTracking) Unlease(*Session) error { return nil } +func (nt *nonTracking) GetLeaderAddr() string { return "" } +func (nt *nonTracking) SetLeaderAddr(string) {} +func (nt *nonTracking) UnsetLeaderAddr() {} +func (nt *nonTracking) TakeSharedProtocol() *Protocol { return nil } +func (nt *nonTracking) DonateSharedProtocol(*Protocol) bool { return false } // NewConnector returns a new connector that can be used by a dqlite driver to // create new clients connected to a leader dqlite server. @@ -73,40 +79,29 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * nslt = &nonTracking{store} } - connector := &Connector{ - id: id, - store: nslt, - config: config, - log: log, - } - - return connector + return &Connector{id: id, store: nslt, config: config, log: log} } -// Connect finds the leader server and returns a connection to it. +// Connect returns a connection to the cluster leader. // -// If the connector is stopped before a leader is found, nil is returned. -func (c *Connector) Connect(ctx context.Context) (*Session, error) { - var protocol *Session - +// If the connector was configured with PermitShared, and a reusable connection +// is available from the leader tracker, that connection is returned. Otherwise, +// a new connection is opened. 
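+//
+// When PermitShared is set, the returned protocol is linked back to the
+// leader tracker, so closing it donates the connection for future reuse
+// rather than tearing it down.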
+func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { if c.config.PermitShared { - sess := c.store.Lease() - if sess != nil { - leader, err := askLeader(ctx, sess.Protocol) - if err == nil && sess.Address == leader { - c.log(logging.Debug, "reusing shared connection to %s", sess.Address) - return sess, nil + if sharedProto := c.store.TakeSharedProtocol(); sharedProto != nil { + if leaderAddr, err := askLeader(ctx, sharedProto); err == nil && sharedProto.addr == leaderAddr { + c.log(logging.Debug, "reusing shared connection to %s", sharedProto.addr) + c.store.SetLeaderAddr(leaderAddr) + return sharedProto, nil } - c.log(logging.Debug, "discarding shared connection to %s", sess.Address) - sess.Bad() - sess.Close() + c.log(logging.Debug, "discarding shared connection to %s", sharedProto.addr) + sharedProto.Bad() + sharedProto.Close() } } - strategies := makeRetryStrategies(c.config.BackoffFactor, c.config.BackoffCap, c.config.RetryLimit) - - // The retry strategy should be configured to retry indefinitely, until - // the given context is done. + var proto *Protocol err := retry.Retry(func(attempt uint) error { log := func(l logging.Level, format string, a ...interface{}) { format = fmt.Sprintf("attempt %d: ", attempt) + format @@ -123,51 +118,54 @@ func (c *Connector) Connect(ctx context.Context) (*Session, error) { } var err error - protocol, err = c.connectAttemptAll(ctx, log) - if err != nil { - return err - } - - return nil - }, strategies...) + proto, err = c.connectAttemptAll(ctx, log) + return err + }, c.config.RetryStrategies()...) - if err != nil { - // We exhausted the number of retries allowed by the configured - // strategy. - return nil, ErrNoAvailableLeader - } - - if ctx.Err() != nil { + if err != nil || ctx.Err() != nil { return nil, ErrNoAvailableLeader } // At this point we should have a connected protocol object, since the // retry loop didn't hit any error and the given context hasn't // expired. - if protocol == nil { + if proto == nil { panic("no protocol object") } - return protocol, nil + c.store.SetLeaderAddr(proto.addr) + if c.config.PermitShared { + proto.tracker = c.store + } + + return proto, nil } -func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*Session, error) { - if addr := c.store.Guess(); addr != "" { +// connectAttemptAll tries to establish a new connection to the cluster leader. +// +// First, if the address of the last known leader has been recorded, try +// to connect to that server and confirm its leadership. This is a fast path +// for stable clusters that avoids opening lots of connections. If that fails, +// fall back to probing all servers in parallel, checking whether each +// is the leader itself or knows who the leader is. +func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*Protocol, error) { + if addr := c.store.GetLeaderAddr(); addr != "" { // TODO In the event of failure, we could still use the second // return value to guide the next stage of the search. - if p, _, _ := c.connectAttemptOne(ctx, ctx, addr, log); p != nil { + if proto, _, _ := c.connectAttemptOne(ctx, ctx, addr, log); proto != nil { log(logging.Debug, "server %s: connected on fast path", addr) - return &Session{Protocol: p, Address: addr}, nil + return proto, nil } - c.store.Shake() + c.store.UnsetLeaderAddr() } servers, err := c.store.Get(ctx) if err != nil { return nil, errors.Wrap(err, "get servers") } - - // Sort servers by Role, from low to high. + // Probe voters before standbys before spares. 
Only voters can potentially + // be the leader, and standbys are more likely to know who the leader is + // than spares since they participate more in the cluster. sort.Slice(servers, func(i, j int) bool { return servers[i].Role < servers[j].Role }) @@ -181,21 +179,16 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*S ctx, cancel := context.WithCancel(ctx) defer cancel() + leaderCh := make(chan *Protocol) sem := semaphore.NewWeighted(c.config.ConcurrentLeaderConns) - - leaderCh := make(chan *Session) - wg := &sync.WaitGroup{} wg.Add(len(servers)) - go func() { wg.Wait() close(leaderCh) }() - - // Make an attempt for each address until we find the leader. for _, server := range servers { - go func(server NodeInfo, pc chan<- *Session) { + go func(server NodeInfo) { defer wg.Done() if err := sem.Acquire(ctx, 1); err != nil { @@ -203,70 +196,46 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*S } defer sem.Release(1) - if ctx.Err() != nil { - return - } - - protocol, leader, err := c.connectAttemptOne(origCtx, ctx, server.Address, log) + proto, leader, err := c.connectAttemptOne(origCtx, ctx, server.Address, log) if err != nil { - // This server is unavailable, try with the next target. - log(logging.Warn, "server %s: %s", server.Address, err.Error()) + log(logging.Warn, "server %s: %v", server.Address, err) return - } - if protocol != nil { - // We found the leader - pc <- &Session{Protocol: protocol, Address: server.Address} + } else if proto != nil { + leaderCh <- proto return - } - if leader == "" { - // This server does not know who the current leader is, - // try with the next target. + } else if leader == "" { log(logging.Warn, "server %s: no known leader", server.Address) return } - // If we get here, it means this server reported that another - // server is the leader, let's close the connection to this - // server and try with the suggested one. + // Try the server that the original server thinks is the leader. log(logging.Debug, "server %s: connect to reported leader %s", server.Address, leader) - - protocol, _, err = c.connectAttemptOne(origCtx, ctx, leader, log) + proto, _, err = c.connectAttemptOne(origCtx, ctx, leader, log) if err != nil { - // The leader reported by the previous server is - // unavailable, try with the next target. - log(logging.Warn, "server %s: reported leader unavailable err=%v", leader, err) + log(logging.Warn, "server %s: %v", leader, err) return - } - if protocol == nil { - // The leader reported by the target server does not consider itself - // the leader, try with the next target. + } else if proto == nil { log(logging.Warn, "server %s: reported leader server is not the leader", leader) return } - pc <- &Session{Protocol: protocol, Address: leader} - }(server, leaderCh) + leaderCh <- proto + }(server) } - // Read from protocol chan, cancel context leader, ok := <-leaderCh + cancel() if !ok { return nil, ErrNoAvailableLeader } - log(logging.Debug, "server %s: connected on fallback path", leader.Address) - c.store.Point(leader.Address) - leader.Tracker = c.store - - cancel() - + log(logging.Debug, "server %s: connected on fallback path", leader.addr) for extra := range leaderCh { extra.Close() } - return leader, nil } // Perform the initial handshake using the given protocol version. 
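+// The server address is recorded on the returned Protocol so that leader
+// tracking can tell which server a connection points at.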
-func Handshake(ctx context.Context, conn net.Conn, version uint64) (*Protocol, error) { +func Handshake(ctx context.Context, conn net.Conn, version uint64, addr string) (*Protocol, error) { // Latest protocol version. protocol := make([]byte, 8) binary.LittleEndian.PutUint64(protocol, version) @@ -286,7 +255,7 @@ func Handshake(ctx context.Context, conn net.Conn, version uint64) (*Protocol, e return nil, errors.Wrap(io.ErrShortWrite, "short handshake write") } - return newProtocol(version, conn), nil + return &Protocol{conn: conn, version: version, addr: addr}, nil } // Connect to the given dqlite server and check if it's the leader. @@ -310,6 +279,10 @@ func (c *Connector) connectAttemptOne( log(l, format, a...) } + if ctx.Err() != nil { + return nil, "", ctx.Err() + } + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) defer cancel() @@ -323,11 +296,11 @@ func (c *Connector) connectAttemptOne( } version := VersionOne - protocol, err := Handshake(ctx, conn, version) + protocol, err := Handshake(ctx, conn, version, address) if err == errBadProtocol { log(logging.Warn, "unsupported protocol %d, attempt with legacy", version) version = VersionLegacy - protocol, err = Handshake(ctx, conn, version) + protocol, err = Handshake(ctx, conn, version, address) } if err != nil { conn.Close() @@ -376,6 +349,8 @@ func (c *Connector) connectAttemptOne( } } +// TODO move client logic including Leader method to Protocol, +// and get rid of this. func askLeader(ctx context.Context, protocol *Protocol) (string, error) { request := Message{} request.Init(16) diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 8a48ffcf..4a3c0021 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -13,21 +13,12 @@ import ( // Protocol sends and receive the dqlite message on the wire. type Protocol struct { - version uint64 // Protocol version - conn net.Conn // Underlying network connection. - closeCh chan struct{} // Stops the heartbeat when the connection gets closed - mu sync.Mutex // Serialize requests - netErr error // A network error occurred -} - -func newProtocol(version uint64, conn net.Conn) *Protocol { - protocol := &Protocol{ - version: version, - conn: conn, - closeCh: make(chan struct{}), - } - - return protocol + version uint64 // Protocol version + conn net.Conn // Underlying network connection. + mu sync.Mutex // Serialize requests + netErr error // A network error occurred + addr string + tracker LeaderTracker } // Call invokes a dqlite RPC, sending a request message and receiving a @@ -38,14 +29,15 @@ func (p *Protocol) Call(ctx context.Context, request, response *Message) (err er p.mu.Lock() defer p.mu.Unlock() - if p.netErr != nil { - return p.netErr + if err = p.netErr; err != nil { + return } defer func() { if err == nil { return } + p.Bad() switch errors.Cause(err).(type) { case *net.OpError: p.netErr = err @@ -75,13 +67,16 @@ func (p *Protocol) Call(ctx context.Context, request, response *Message) (err er } // More is used when a request maps to multiple responses. -func (p *Protocol) More(ctx context.Context, response *Message) error { - return p.recv(response) +func (p *Protocol) More(ctx context.Context, response *Message) (err error) { + if err = p.recv(response); err != nil { + p.Bad() + } + return } // Interrupt sends an interrupt request and awaits for the server's empty // response. 
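+// If the exchange fails, the protocol is marked bad so that it won't be
+// donated back to a leader tracker for reuse.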
-func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Message) error { +func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Message) (err error) { // We need to take a lock since the dqlite server currently does not // support concurrent requests. p.mu.Lock() @@ -95,12 +90,18 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me EncodeInterrupt(request, 0) - if err := p.send(request); err != nil { + defer func() { + if err != nil { + p.Bad() + } + }() + + if err = p.send(request); err != nil { return errors.Wrap(err, "failed to send interrupt request") } for { - if err := p.recv(response); err != nil { + if err = p.recv(response); err != nil { return errors.Wrap(err, "failed to receive response") } @@ -114,10 +115,25 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me return nil } -// Close the client connection. +// Bad prevents a protocol from being reused when it is released. +// +// There is no need to call Bad after a method of Protocol returns an error. +// Only call Bad when the protocol is deemed unsuitable for reuse for some +// higher-level reason. +func (p *Protocol) Bad() { + p.tracker = nil +} + +// Close releases a protocol. +// +// If the protocol was associated with a LeaderTracker, it will be made +// available for reuse by that tracker. Otherwise, the underlying connection +// will be closed. func (p *Protocol) Close() error { - close(p.closeCh) - return p.conn.Close() + if tr := p.tracker; tr == nil || !tr.DonateSharedProtocol(p) { + return p.conn.Close() + } + return nil } func (p *Protocol) send(req *Message) error { @@ -237,63 +253,6 @@ func (p *Protocol) recvFill(buf []byte) (int, error) { return -1, io.ErrNoProgress } -/* -func (p *Protocol) heartbeat() { - request := Message{} - request.Init(16) - response := Message{} - response.Init(512) - - for { - delay := c.heartbeatTimeout / 3 - - //c.logger.Debug("sending heartbeat", zap.Duration("delay", delay)) - time.Sleep(delay) - - // Check if we've been closed. - select { - case <-c.closeCh: - return - default: - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - - EncodeHeartbeat(&request, uint64(time.Now().Unix())) - - err := c.Call(ctx, &request, &response) - - // We bail out upon failures. - // - // TODO: make the client survive temporary disconnections. - if err != nil { - cancel() - //c.logger.Error("heartbeat failed", zap.Error(err)) - return - } - - //addresses, err := DecodeNodes(&response) - _, err = DecodeNodes(&response) - if err != nil { - cancel() - //c.logger.Error("invalid heartbeat response", zap.Error(err)) - return - } - - // if err := c.store.Set(ctx, addresses); err != nil { - // cancel() - // c.logger.Error("failed to update servers", zap.Error(err)) - // return - // } - - cancel() - - request.Reset() - response.Reset() - } -} -*/ - // DecodeNodeCompat handles also pre-1.0 legacy server messages. 
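+// The version negotiated during the handshake determines how the response
+// is decoded.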
func DecodeNodeCompat(protocol *Protocol, response *Message) (uint64, string, error) { if protocol.version == VersionLegacy { diff --git a/internal/protocol/protocol_test.go b/internal/protocol/protocol_test.go index 8f1f334e..6510b552 100644 --- a/internal/protocol/protocol_test.go +++ b/internal/protocol/protocol_test.go @@ -156,16 +156,16 @@ func newProtocol(t *testing.T) (*protocol.Protocol, func()) { ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) defer cancel() - client, err := connector.Connect(ctx) + proto, err := connector.Connect(ctx) require.NoError(t, err) cleanup := func() { - client.Close() + proto.Close() serverCleanup() } - return client.Protocol, cleanup + return proto, cleanup } // Perform a client call. diff --git a/internal/protocol/store.go b/internal/protocol/store.go index 288ee2ef..fd1d0e42 100644 --- a/internal/protocol/store.go +++ b/internal/protocol/store.go @@ -76,60 +76,15 @@ func (i *InmemNodeStore) Set(ctx context.Context, servers []NodeInfo) error { return nil } -// Session is a connection to a dqlite server with some attached metadata. -// -// The additional metadata is used to reuse the connection when possible. -type Session struct { - Protocol *Protocol - // The address of the server this session is connected to. - Address string - // Tracker points back to the LeaderTracker from which this session was leased, - // if any. - Tracker LeaderTracker -} - -// Bad marks the session as bad, so that it won't be reused. -func (sess *Session) Bad() { - sess.Protocol.mu.Lock() - defer sess.Protocol.mu.Unlock() - - sess.Tracker = nil -} - -// Close returns the session to its parent tracker if appropriate, -// or closes the underlying connection otherwise. -func (sess *Session) Close() error { - if tr := sess.Tracker; tr != nil { - return tr.Unlease(sess) - } - return sess.Protocol.Close() -} - // A LeaderTracker stores the address of the last known cluster leader, // and possibly a reusable connection to it. type LeaderTracker interface { - // Guess returns the address of the last known leader, or nil if none has been recorded. - Guess() string - // Point records the address of the current leader. - Point(string) - // Shake unsets the recorded leader address. - Shake() - - // Lease returns an existing session against a node that was once the leader, - // or nil if no existing session is available. - // - // The caller should not assume that the session's connection is still valid, - // that the remote node is still the leader, or that any particular operations - // have previously been performed on the session. - // When closed, the session will be returned to this tracker, unless - // another session has taken its place in the tracker's session slot - // or the session was marked as bad. - Lease() *Session - // Unlease passes ownership of a session to the tracker. - // - // The session need not have been obtained from a call to Lease. - // It will be made available for reuse by future calls to Lease. - Unlease(*Session) error + GetLeaderAddr() string + SetLeaderAddr(string) + UnsetLeaderAddr() + + TakeSharedProtocol() *Protocol + DonateSharedProtocol(*Protocol) (accepted bool) } // A NodeStoreLeaderTracker is a node store that also tracks the current leader. @@ -138,59 +93,51 @@ type NodeStoreLeaderTracker interface { LeaderTracker } -// Compass can be used to embed LeaderTracker functionality in another type. +// Compass implements LeaderTracker and is intended for embedding into a NodeStore. 
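+//
+// It holds at most one shared Protocol: TakeSharedProtocol empties the slot,
+// and DonateSharedProtocol refills it only if it is still empty.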
type Compass struct { mu sync.RWMutex lastKnownLeaderAddr string - session *Session + proto *Protocol } -func (co *Compass) Guess() string { +func (co *Compass) GetLeaderAddr() string { co.mu.RLock() defer co.mu.RUnlock() return co.lastKnownLeaderAddr } -func (co *Compass) Point(address string) { +func (co *Compass) SetLeaderAddr(address string) { co.mu.Lock() defer co.mu.Unlock() co.lastKnownLeaderAddr = address } -func (co *Compass) Shake() { +func (co *Compass) UnsetLeaderAddr() { co.mu.Lock() defer co.mu.Unlock() co.lastKnownLeaderAddr = "" } -func (co *Compass) Lease() (sess *Session) { +func (co *Compass) TakeSharedProtocol() (proto *Protocol) { co.mu.Lock() defer co.mu.Unlock() - if sess, co.session = co.session, nil; sess != nil { - sess.Tracker = co + if proto, co.proto = co.proto, nil; proto != nil { + proto.tracker = co } return } -func (co *Compass) Unlease(sess *Session) error { +func (co *Compass) DonateSharedProtocol(proto *Protocol) (accepted bool) { co.mu.Lock() + defer co.mu.Unlock() - if co.session == nil { - co.session = sess - co.mu.Unlock() - return nil - } else { - // Another call to Unlease has already filled the tracker's - // session slot, so just close this session. (Don't call - // sess.Close, as that would lead to recursion.) Also, unlock - // the mutex before closing the session, just so we know - // that it is never locked for longer than a single assignment. - co.mu.Unlock() - return sess.Protocol.Close() + if accepted = co.proto == nil; accepted { + co.proto = proto } + return } From 87297e865f512702f3222195d2109f6146f518f9 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 16:43:55 -0400 Subject: [PATCH 05/23] Refactor generation of retry strategies Signed-off-by: Cole Miller --- internal/protocol/config.go | 31 +++++++++++++++++++++++++++++-- internal/protocol/connector.go | 32 -------------------------------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/internal/protocol/config.go b/internal/protocol/config.go index 6ebddd62..f02a06f2 100644 --- a/internal/protocol/config.go +++ b/internal/protocol/config.go @@ -2,6 +2,9 @@ package protocol import ( "time" + + "github.com/Rican7/retry/backoff" + "github.com/Rican7/retry/strategy" ) // Config holds various configuration parameters for a dqlite client. @@ -9,9 +12,33 @@ type Config struct { Dial DialFunc // Network dialer. DialTimeout time.Duration // Timeout for establishing a network connection . AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership. + RetryLimit uint // Maximum number of retries, or 0 for unlimited. BackoffFactor time.Duration // Exponential backoff factor for retries. BackoffCap time.Duration // Maximum connection retry backoff value, - RetryLimit uint // Maximum number of retries, or 0 for unlimited. ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership. - PermitShared bool + PermitShared bool // Whether it's okay to return a reused connection. +} + +// RetryStrategies returns a configuration for the retry package based on a Config. 
+func (config Config) RetryStrategies() (strategies []strategy.Strategy) { + limit, factor, cap := config.RetryLimit, config.BackoffFactor, config.BackoffCap + // Fix for change in behavior: https://github.com/Rican7/retry/pull/12 + if limit++; limit > 1 { + strategies = append(strategies, strategy.Limit(limit)) + } + backoffFunc := backoff.BinaryExponential(factor) + strategies = append(strategies, + func(attempt uint) bool { + if attempt > 0 { + duration := backoffFunc(attempt) + // Duration might be negative in case of integer overflow. + if !(0 < duration && duration <= cap) { + duration = cap + } + time.Sleep(duration) + } + return true + }, + ) + return } diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 00b991e2..28f206d3 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -11,8 +11,6 @@ import ( "time" "github.com/Rican7/retry" - "github.com/Rican7/retry/backoff" - "github.com/Rican7/retry/strategy" "github.com/canonical/go-dqlite/logging" "github.com/pkg/errors" "golang.org/x/sync/semaphore" @@ -377,34 +375,4 @@ func askLeader(ctx context.Context, protocol *Protocol) (string, error) { return leader, nil } -// Return a retry strategy with exponential backoff, capped at the given amount -// of time and possibly with a maximum number of retries. -func makeRetryStrategies(factor, cap time.Duration, limit uint) []strategy.Strategy { - limit += 1 // Fix for change in behavior: https://github.com/Rican7/retry/pull/12 - backoff := backoff.BinaryExponential(factor) - - strategies := []strategy.Strategy{} - - if limit > 1 { - strategies = append(strategies, strategy.Limit(limit)) - } - - strategies = append(strategies, - func(attempt uint) bool { - if attempt > 0 { - duration := backoff(attempt) - // Duration might be negative in case of integer overflow. - if duration > cap || duration <= 0 { - duration = cap - } - time.Sleep(duration) - } - - return true - }, - ) - - return strategies -} - var errBadProtocol = fmt.Errorf("bad protocol") From 1b43389f30fdf507de8004ef31e7e66c55865b56 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 16:49:43 -0400 Subject: [PATCH 06/23] More comprehensive/principled leader tracker testing Signed-off-by: Cole Miller --- internal/protocol/connector_test.go | 142 +++++++++++++++++++--------- 1 file changed, 95 insertions(+), 47 deletions(-) diff --git a/internal/protocol/connector_test.go b/internal/protocol/connector_test.go index 828552d7..784dc4bc 100644 --- a/internal/protocol/connector_test.go +++ b/internal/protocol/connector_test.go @@ -37,61 +37,109 @@ func TestConnector_Success(t *testing.T) { check([]string{ "DEBUG: attempt 1: server @test-0: connected on fallback path", }) +} - log, check = newLogFunc(t) - connector = protocol.NewConnector(0, store, protocol.Config{}, log) - - ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - client, err = connector.Connect(ctx) - require.NoError(t, err) - - assert.NoError(t, client.Close()) +// Check the interaction of Connector.Connect with a leader tracker. +// +// The leader tracker potentially stores two pieces of data, an address and a shared connection. +// This gives us four states: INIT (have neither address nor connection), HAVE_ADDR, HAVE_CONN, and HAVE_BOTH. +// Transitions between these states are triggered by Connector.Connect and Protocol.Close. 
+// This test methodically triggers all the possible transitions and checks that they have +// the intended externally-observable effects. +func TestConnector_LeaderTracker(t *testing.T) { + // options is a configuration for calling Connector.Connect + // in order to trigger a specific state transition. + type options struct { + config protocol.Config + injectFailure bool + returnProto bool + expectedLog []string + } - check([]string{ - "DEBUG: attempt 1: server @test-0: connected on fast path", - }) -} + injectFailure := func(o *options) { + o.injectFailure = true + o.expectedLog = append(o.expectedLog, "WARN: attempt 1: server @test-0: context deadline exceeded") + } + permitShared := func(o *options) { + o.config.PermitShared = true + } + returnProto := func(o *options) { + o.returnProto = true + } + expectDiscard := func(o *options) { + o.expectedLog = append(o.expectedLog, "DEBUG: discarding shared connection to @test-0") + } + expectFallback := func(o *options) { + o.expectedLog = append(o.expectedLog, "DEBUG: attempt 1: server @test-0: connected on fallback path") + } + expectFast := func(o *options) { + o.expectedLog = append(o.expectedLog, "DEBUG: attempt 1: server @test-0: connected on fast path") + } + expectShared := func(o *options) { + o.expectedLog = append(o.expectedLog, "DEBUG: reusing shared connection to @test-0") + } -// Open a connection with PermitShared set and then close it. Then, -// do the same thing again and verify that original connection is re-used. -func TestConnector_PermitShared(t *testing.T) { address, cleanup := newNode(t, 0) defer cleanup() - store := newStore(t, []string{address}) - log, check := newLogFunc(t) - connector := protocol.NewConnector(0, store, protocol.Config{}, log) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - client, err := connector.Connect(ctx) - require.NoError(t, err) - - assert.NoError(t, client.Close()) - - check([]string{ - "DEBUG: attempt 1: server @test-0: connected on fallback path", - }) - - log, check = newLogFunc(t) - config := protocol.Config{PermitShared: true} - connector = protocol.NewConnector(0, store, config, log) - - ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - client, err = connector.Connect(ctx) - require.NoError(t, err) - - assert.NoError(t, client.Close()) + check := func(opts ...func(*options)) *protocol.Protocol { + o := &options{config: protocol.Config{RetryLimit: 1}} + for _, opt := range opts { + opt(o) + } + log, checkLog := newLogFunc(t) + connector := protocol.NewConnector(0, store, o.config, log) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + if o.injectFailure { + ctx, cancel = context.WithDeadline(ctx, time.Unix(1, 0)) + defer cancel() + } + proto, err := connector.Connect(ctx) + if o.injectFailure { + require.Equal(t, protocol.ErrNoAvailableLeader, err) + } else { + require.NoError(t, err) + } + checkLog(o.expectedLog) + if o.returnProto { + return proto + } else if err == nil { + assert.NoError(t, proto.Close()) + } + return nil + } - check([]string{ - "DEBUG: reusing shared connection to @test-0", - }) + // INIT -> INIT + check(injectFailure) + // INIT -> HAVE_ADDR + check(expectFallback) + // HAVE_ADDR -> HAVE_ADDR + proto := check(permitShared, expectFast, returnProto) + // We need an extra protocol to trigger INIT->HAVE_CONN later. + // Grab one here where it doesn't cause a state transition. 
+ protoForLater := check(permitShared, expectFast, returnProto) + // HAVE_ADDR -> HAVE_BOTH + assert.NoError(t, proto.Close()) + // HAVE_BOTH -> HAVE_BOTH + check(expectFast) + // HAVE_BOTH -> HAVE_CONN + check(injectFailure) + // HAVE_CONN -> HAVE_CONN + check(injectFailure) + // HAVE_CONN -> INIT + check(permitShared, expectDiscard, injectFailure) + // INIT -> HAVE_CONN + assert.NoError(t, protoForLater.Close()) + // HAVE_CONN -> HAVE_BOTH + check(expectFallback) + // HAVE_BOTH -> HAVE_ADDR + proto = check(permitShared, expectShared, returnProto) + proto.Bad() + assert.NoError(t, proto.Close()) + // HAVE_ADDR -> INIT + check(injectFailure) } // The network connection can't be established within the specified number of From 33bf955f82eef9c9c086f24f638e4c57c16c403b Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 17:34:00 -0400 Subject: [PATCH 07/23] Client-level test of connection reuse Signed-off-by: Cole Miller --- client/leader_test.go | 87 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 14 deletions(-) diff --git a/client/leader_test.go b/client/leader_test.go index f1739728..586a97e9 100644 --- a/client/leader_test.go +++ b/client/leader_test.go @@ -3,6 +3,7 @@ package client_test import ( "context" "fmt" + "reflect" "testing" "time" @@ -11,16 +12,81 @@ import ( "github.com/stretchr/testify/require" ) +// client.FindLeader recycles leader connections intelligently. +func TestReuse(t *testing.T) { + infos, cleanup := setup(t) + defer cleanup() + + store := client.NewInmemNodeStore() + store.Set(context.Background(), infos[:1]) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cli, err := client.FindLeader(ctx, store) + require.NoError(t, err) + firstProto := reflect.ValueOf(cli).Elem().FieldByName("proto").Pointer() + require.NoError(t, cli.Close()) + + cli, err = client.FindLeader(ctx, store) + require.NoError(t, err) + secondProto := reflect.ValueOf(cli).Elem().FieldByName("proto").Pointer() + + // The first leader connection was returned to the leader tracker + // when we closed the client, and is returned by the second call + // to FindLeader. + require.Equal(t, firstProto, secondProto) + + // The reused connection is good to go. + err = cli.Add(ctx, infos[1]) + require.NoError(t, err) + + // Trigger an unsuccessful Protocol operation that will cause the + // client's connection to be marked as unsuitable for reuse. + shortCtx, cancel := context.WithDeadline(context.Background(), time.Unix(1, 0)) + cancel() + _, err = cli.Cluster(shortCtx) + require.Error(t, err) + require.NoError(t, cli.Close()) + + cli, err = client.FindLeader(ctx, store) + require.NoError(t, err) + thirdProto := reflect.ValueOf(cli).Elem().FieldByName("proto").Pointer() + require.NoError(t, cli.Close()) + + // The previous connection was not reused. 
+ require.NotEqual(t, secondProto, thirdProto) +} + func TestMembership(t *testing.T) { + infos, cleanup := setup(t) + defer cleanup() + + store := client.NewInmemNodeStore() + store.Set(context.Background(), infos[:1]) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + client, err := client.FindLeader(ctx, store) + require.NoError(t, err) + defer client.Close() + + err = client.Add(ctx, infos[1]) + require.NoError(t, err) +} + +func setup(t *testing.T) ([]client.NodeInfo, func()) { n := 3 nodes := make([]*dqlite.Node, n) infos := make([]client.NodeInfo, n) + var cleanups []func() for i := range nodes { id := uint64(i + 1) address := fmt.Sprintf("@test-%d", id) dir, cleanup := newDir(t) - defer cleanup() + cleanups = append(cleanups, cleanup) node, err := dqlite.New(id, address, dir, dqlite.WithBindAddress(address)) require.NoError(t, err) nodes[i] = node @@ -28,19 +94,12 @@ func TestMembership(t *testing.T) { infos[i].Address = address err = node.Start() require.NoError(t, err) - defer node.Close() + cleanups = append(cleanups, func() { node.Close() }) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - store := client.NewInmemNodeStore() - store.Set(context.Background(), []client.NodeInfo{infos[0]}) - - client, err := client.FindLeader(ctx, store) - require.NoError(t, err) - defer client.Close() - - err = client.Add(ctx, infos[1]) - require.NoError(t, err) + return infos, func() { + for i := len(cleanups) - 1; i >= 0; i-- { + cleanups[i]() + } + } } From 4d3510c45c4a88997f0b6264b73528d7ab4b3583 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 17:44:07 -0400 Subject: [PATCH 08/23] Names, docs Signed-off-by: Cole Miller --- client/database_store.go | 2 +- client/store.go | 2 +- internal/protocol/store.go | 61 ++++++++++++++++++++++---------------- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/client/database_store.go b/client/database_store.go index 3a6f43d4..23029c17 100644 --- a/client/database_store.go +++ b/client/database_store.go @@ -23,7 +23,7 @@ type nodeStoreOptions struct { // DatabaseNodeStore persists a list addresses of dqlite nodes in a SQL table. type DatabaseNodeStore struct { - protocol.Compass + protocol.LT db *sql.DB // Database handle to use. schema string // Name of the schema holding the servers table. table string // Name of the servers table. diff --git a/client/store.go b/client/store.go index d8428b22..87c21343 100644 --- a/client/store.go +++ b/client/store.go @@ -30,7 +30,7 @@ var NewInmemNodeStore = protocol.NewInmemNodeStore // Persists a list addresses of dqlite nodes in a YAML file. type YamlNodeStore struct { - protocol.Compass + protocol.LT path string servers []NodeInfo mu sync.RWMutex diff --git a/internal/protocol/store.go b/internal/protocol/store.go index fd1d0e42..ceaf1224 100644 --- a/internal/protocol/store.go +++ b/internal/protocol/store.go @@ -46,7 +46,7 @@ type NodeStore interface { // InmemNodeStore keeps the list of servers in memory. type InmemNodeStore struct { - Compass + LT mu sync.RWMutex servers []NodeInfo } @@ -83,7 +83,18 @@ type LeaderTracker interface { SetLeaderAddr(string) UnsetLeaderAddr() + // TakeSharedProtocol transfers the reusable connection to the caller, + // if one is stored. TakeSharedProtocol() *Protocol + // DonateSharedProtocol possibly stores the given connection for reuse. + // + // The tracker has only one slot for a reusable connection. 
If the slot
+	// is already occupied, DonateSharedProtocol does nothing and returns
+	// false. Otherwise, it adopts the connection and returns true.
+	//
+	// This is called automatically by Protocol.Close for a connection that
+	// was obtained from TakeSharedProtocol. It's okay to call it with a
+	// connection that didn't come from TakeSharedProtocol, though.
 	DonateSharedProtocol(*Protocol) (accepted bool)
 }
 
@@ -93,59 +104,51 @@ type NodeStoreLeaderTracker interface {
 	LeaderTracker
 }
 
-// Compass implements LeaderTracker and is intended for embedding into a NodeStore.
-type Compass struct {
+// LT implements LeaderTracker and is intended for embedding into a NodeStore.
+type LT struct {
 	mu sync.RWMutex
 	lastKnownLeaderAddr string
 
 	proto *Protocol
 }
 
-func (co *Compass) GetLeaderAddr() string {
-	co.mu.RLock()
-	defer co.mu.RUnlock()
+func (lt *LT) GetLeaderAddr() string {
+	lt.mu.RLock()
+	defer lt.mu.RUnlock()
 
-	return co.lastKnownLeaderAddr
+	return lt.lastKnownLeaderAddr
 }
 
-func (co *Compass) SetLeaderAddr(address string) {
-	co.mu.Lock()
-	defer co.mu.Unlock()
+func (lt *LT) SetLeaderAddr(address string) {
+	lt.mu.Lock()
+	defer lt.mu.Unlock()
 
-	co.lastKnownLeaderAddr = address
+	lt.lastKnownLeaderAddr = address
 }
 
-func (co *Compass) UnsetLeaderAddr() {
-	co.mu.Lock()
-	defer co.mu.Unlock()
+func (lt *LT) UnsetLeaderAddr() {
+	lt.mu.Lock()
+	defer lt.mu.Unlock()
 
-	co.lastKnownLeaderAddr = ""
+	lt.lastKnownLeaderAddr = ""
 }
 
-func (co *Compass) TakeSharedProtocol() (proto *Protocol) {
-	co.mu.Lock()
-	defer co.mu.Unlock()
+func (lt *LT) TakeSharedProtocol() (proto *Protocol) {
+	lt.mu.Lock()
+	defer lt.mu.Unlock()
 
-	if proto, co.proto = co.proto, nil; proto != nil {
-		proto.tracker = co
+	if proto, lt.proto = lt.proto, nil; proto != nil {
+		proto.tracker = lt
 	}
 	return
 }
 
-func (co *Compass) DonateSharedProtocol(proto *Protocol) (accepted bool) {
-	co.mu.Lock()
-	defer co.mu.Unlock()
+func (lt *LT) DonateSharedProtocol(proto *Protocol) (accepted bool) {
+	lt.mu.Lock()
+	defer lt.mu.Unlock()
 
-	if accepted = co.proto == nil; accepted {
-		co.proto = proto
+	if accepted = lt.proto == nil; accepted {
+		lt.proto = proto
 	}
 	return
 }

From 021b0b1d0bb4f34ff482ca037bfc7ebb91e2ba6 Mon Sep 17 00:00:00 2001
From: Cole Miller
Date: Mon, 30 Sep 2024 18:24:44 -0400
Subject: [PATCH 09/23] Don't rename client field

Signed-off-by: Cole Miller
---
 client/client.go | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/client.go b/client/client.go
index 43cfa37a..9652f70d 100644
--- a/client/client.go
+++ b/client/client.go
@@ -12,7 +12,7 @@ type DialFunc = protocol.DialFunc
 
 // Client speaks the dqlite wire protocol.
 type Client struct {
-	proto *protocol.Protocol
+	protocol *protocol.Protocol
 }
 
 // Option that can be used to tweak client parameters.
@@ -82,7 +82,7 @@ func (c *Client) Leader(ctx context.Context) (*NodeInfo, error) { protocol.EncodeLeader(&request) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send Leader request") } @@ -105,7 +105,7 @@ func (c *Client) Cluster(ctx context.Context) ([]NodeInfo, error) { protocol.EncodeCluster(&request, protocol.ClusterFormatV1) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send Cluster request") } @@ -135,7 +135,7 @@ func (c *Client) Dump(ctx context.Context, dbname string) ([]File, error) { protocol.EncodeDump(&request, dbname) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return nil, errors.Wrap(err, "failed to send dump request") } @@ -172,7 +172,7 @@ func (c *Client) Add(ctx context.Context, node NodeInfo) error { protocol.EncodeAdd(&request, node.ID, node.Address) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return err } @@ -208,7 +208,7 @@ func (c *Client) Assign(ctx context.Context, id uint64, role NodeRole) error { protocol.EncodeAssign(&request, id, uint64(role)) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return err } @@ -231,7 +231,7 @@ func (c *Client) Transfer(ctx context.Context, id uint64) error { protocol.EncodeTransfer(&request, id) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return err } @@ -251,7 +251,7 @@ func (c *Client) Remove(ctx context.Context, id uint64) error { protocol.EncodeRemove(&request, id) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return err } @@ -277,7 +277,7 @@ func (c *Client) Describe(ctx context.Context) (*NodeMetadata, error) { protocol.EncodeDescribe(&request, protocol.RequestDescribeFormatV0) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return nil, err } @@ -303,7 +303,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error { protocol.EncodeWeight(&request, weight) - if err := c.proto.Call(ctx, &request, &response); err != nil { + if err := c.protocol.Call(ctx, &request, &response); err != nil { return err } @@ -316,7 +316,7 @@ func (c *Client) Weight(ctx context.Context, weight uint64) error { // Close the client. func (c *Client) Close() error { - return c.proto.Close() + return c.protocol.Close() } // Create a client options object with sane defaults. 
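
The take/donate scheme above reduces to a single-slot exchange guarded by a mutex. As a reference point, here is a minimal standalone sketch of the same discipline; it is illustrative only, with tracker and conn as stand-in names and conn playing the role of *Protocol.

    package main

    import (
        "fmt"
        "sync"
    )

    // conn stands in for *protocol.Protocol.
    type conn struct{ addr string }

    // tracker keeps at most one idle connection for reuse.
    type tracker struct {
        mu   sync.Mutex
        slot *conn
    }

    // take empties the slot and hands its contents to the caller,
    // like TakeSharedProtocol.
    func (t *tracker) take() *conn {
        t.mu.Lock()
        defer t.mu.Unlock()
        c := t.slot
        t.slot = nil
        return c
    }

    // donate fills the slot if it is free and reports whether it did,
    // like DonateSharedProtocol; on false the caller must close the
    // connection itself.
    func (t *tracker) donate(c *conn) bool {
        t.mu.Lock()
        defer t.mu.Unlock()
        if t.slot != nil {
            return false
        }
        t.slot = c
        return true
    }

    func main() {
        t := &tracker{}
        first := &conn{addr: "@test-0"}
        fmt.Println(t.donate(first))       // true: the slot was empty
        fmt.Println(t.donate(&conn{"@x"})) // false: the slot is occupied
        fmt.Println(t.take() == first)     // true: the same connection comes back
        fmt.Println(t.take())              // <nil>: the slot is empty again
    }

Because ownership moves whole, a connection is at any moment either parked in the slot or held by exactly one caller; when donate reports false, the caller closes the connection itself, which is the same fallback Protocol.Close takes.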
From 69140bc3e70c05ec95a317c57d0b86c1582df204 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 18:29:16 -0400 Subject: [PATCH 10/23] Clean up change to dqlite-demo.go Signed-off-by: Cole Miller --- cmd/dqlite-demo/dqlite-demo.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/dqlite-demo/dqlite-demo.go b/cmd/dqlite-demo/dqlite-demo.go index b3bad60b..ea73a21a 100644 --- a/cmd/dqlite-demo/dqlite-demo.go +++ b/cmd/dqlite-demo/dqlite-demo.go @@ -50,8 +50,12 @@ Complete documentation is available at https://github.com/canonical/go-dqlite`, log.Printf(fmt.Sprintf("%s: %s: %s\n", api, l.String(), format), a...) } - options := []app.Option{app.WithAddress(db), app.WithCluster(*join), app.WithLogFunc(logFunc), - app.WithDiskMode(diskMode), app.WithRolesAdjustmentFrequency(5 * time.Second)} + options := []app.Option{ + app.WithAddress(db), + app.WithCluster(*join), + app.WithLogFunc(logFunc), + app.WithDiskMode(diskMode), + } // Set TLS options if (crt != "" && key == "") || (key != "" && crt == "") { From ca72e1a23737a7532469e9812048983c85fdcce4 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 18:41:43 -0400 Subject: [PATCH 11/23] Fixup Signed-off-by: Cole Miller --- client/leader_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/leader_test.go b/client/leader_test.go index 586a97e9..7221733a 100644 --- a/client/leader_test.go +++ b/client/leader_test.go @@ -25,12 +25,12 @@ func TestReuse(t *testing.T) { cli, err := client.FindLeader(ctx, store) require.NoError(t, err) - firstProto := reflect.ValueOf(cli).Elem().FieldByName("proto").Pointer() + firstProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() require.NoError(t, cli.Close()) cli, err = client.FindLeader(ctx, store) require.NoError(t, err) - secondProto := reflect.ValueOf(cli).Elem().FieldByName("proto").Pointer() + secondProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() // The first leader connection was returned to the leader tracker // when we closed the client, and is returned by the second call @@ -51,7 +51,7 @@ func TestReuse(t *testing.T) { cli, err = client.FindLeader(ctx, store) require.NoError(t, err) - thirdProto := reflect.ValueOf(cli).Elem().FieldByName("proto").Pointer() + thirdProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() require.NoError(t, cli.Close()) // The previous connection was not reused. 
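
The retry schedule produced by Config.RetryStrategies in patch 05 is easier to reason about in isolation. The sketch below reproduces just its clamping rule; delay is a stand-in helper, and the factor << attempt growth assumes backoff.BinaryExponential doubles the wait per attempt (the exact exponent offset is the library's detail; the clamp is the point).

    package main

    import (
        "fmt"
        "time"
    )

    // delay mirrors the clamping in RetryStrategies: grow the wait
    // binary-exponentially, never sleep longer than cap, and treat an
    // overflowed (non-positive) duration as cap as well.
    func delay(attempt uint, factor, cap time.Duration) time.Duration {
        d := factor << attempt // stand-in for backoff.BinaryExponential(factor)(attempt)
        if !(0 < d && d <= cap) {
            d = cap
        }
        return d
    }

    func main() {
        factor, cap := 50*time.Millisecond, time.Second
        for attempt := uint(1); attempt <= 70; attempt += 23 {
            // Attempt 70 overflows int64 and is clamped to cap.
            fmt.Println(attempt, delay(attempt, factor, cap))
        }
    }

With RetryLimit left at 0, RetryStrategies appends no strategy.Limit at all, so attempts continue indefinitely while the sleep saturates at BackoffCap.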
From cf404cc6e06e877214b76f368997f85b57134389 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 30 Sep 2024 19:33:06 -0400 Subject: [PATCH 12/23] Fixup Signed-off-by: Cole Miller --- cmd/dqlite-demo/dqlite-demo.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/dqlite-demo/dqlite-demo.go b/cmd/dqlite-demo/dqlite-demo.go index ea73a21a..70fde471 100644 --- a/cmd/dqlite-demo/dqlite-demo.go +++ b/cmd/dqlite-demo/dqlite-demo.go @@ -13,7 +13,6 @@ import ( "os/signal" "path/filepath" "strings" - "time" "github.com/canonical/go-dqlite/app" "github.com/canonical/go-dqlite/client" From a914e16051b441b5f0a76504783881ef79e5ec1f Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 1 Oct 2024 12:00:09 -0400 Subject: [PATCH 13/23] Clean up diff a bit more Signed-off-by: Cole Miller --- client/client.go | 4 ++-- client/leader.go | 7 ++----- internal/protocol/connector.go | 26 +++++++++++++------------- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index 9652f70d..2ed71fd4 100644 --- a/client/client.go +++ b/client/client.go @@ -64,13 +64,13 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error return nil, errors.Wrap(err, "failed to establish network connection") } - proto, err := protocol.Handshake(ctx, conn, protocol.VersionOne, address) + protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne, address) if err != nil { conn.Close() return nil, err } - return &Client{proto}, nil + return &Client{protocol}, nil } // Leader returns information about the current leader, if any. diff --git a/client/leader.go b/client/leader.go index 1b159df7..1051ba95 100644 --- a/client/leader.go +++ b/client/leader.go @@ -25,12 +25,9 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien PermitShared: true, } connector := protocol.NewConnector(0, store, config, o.LogFunc) - proto, err := connector.Connect(ctx) + protocol, err := connector.Connect(ctx) if err != nil { return nil, err } - - client := &Client{proto} - - return client, nil + return &Client{protocol}, nil } diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 28f206d3..d82cb01a 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -77,7 +77,7 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * nslt = &nonTracking{store} } - return &Connector{id: id, store: nslt, config: config, log: log} + return &Connector{id, nslt, config, log} } // Connect returns a connection to the cluster leader. @@ -99,7 +99,7 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { } } - var proto *Protocol + var protocol *Protocol err := retry.Retry(func(attempt uint) error { log := func(l logging.Level, format string, a ...interface{}) { format = fmt.Sprintf("attempt %d: ", attempt) + format @@ -116,7 +116,7 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { } var err error - proto, err = c.connectAttemptAll(ctx, log) + protocol, err = c.connectAttemptAll(ctx, log) return err }, c.config.RetryStrategies()...) @@ -127,16 +127,16 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { // At this point we should have a connected protocol object, since the // retry loop didn't hit any error and the given context hasn't // expired. 
- if proto == nil { + if protocol == nil { panic("no protocol object") } - c.store.SetLeaderAddr(proto.addr) + c.store.SetLeaderAddr(protocol.addr) if c.config.PermitShared { - proto.tracker = c.store + protocol.tracker = c.store } - return proto, nil + return protocol, nil } // connectAttemptAll tries to establish a new connection to the cluster leader. @@ -194,12 +194,12 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P } defer sem.Release(1) - proto, leader, err := c.connectAttemptOne(origCtx, ctx, server.Address, log) + protocol, leader, err := c.connectAttemptOne(origCtx, ctx, server.Address, log) if err != nil { log(logging.Warn, "server %s: %v", server.Address, err) return - } else if proto != nil { - leaderCh <- proto + } else if protocol != nil { + leaderCh <- protocol return } else if leader == "" { log(logging.Warn, "server %s: no known leader", server.Address) @@ -208,15 +208,15 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P // Try the server that the original server thinks is the leader. log(logging.Debug, "server %s: connect to reported leader %s", server.Address, leader) - proto, _, err = c.connectAttemptOne(origCtx, ctx, leader, log) + protocol, _, err = c.connectAttemptOne(origCtx, ctx, leader, log) if err != nil { log(logging.Warn, "server %s: %v", leader, err) return - } else if proto == nil { + } else if protocol == nil { log(logging.Warn, "server %s: reported leader server is not the leader", leader) return } - leaderCh <- proto + leaderCh <- protocol }(server) } From 71628faf428b1f3db124d7029d00b64e7bd83235 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 19:28:35 -0400 Subject: [PATCH 14/23] Move leader tracking to protocol.Connector Signed-off-by: Cole Miller --- client/database_store.go | 2 +- client/leader.go | 17 +++-- client/store.go | 2 +- driver/driver.go | 24 +++---- internal/protocol/config.go | 1 - internal/protocol/connector.go | 118 +++++++++++++++++++++------------ internal/protocol/protocol.go | 6 +- internal/protocol/store.go | 79 +--------------------- 8 files changed, 107 insertions(+), 142 deletions(-) diff --git a/client/database_store.go b/client/database_store.go index 23029c17..9c1f2a01 100644 --- a/client/database_store.go +++ b/client/database_store.go @@ -23,7 +23,7 @@ type nodeStoreOptions struct { // DatabaseNodeStore persists a list addresses of dqlite nodes in a SQL table. type DatabaseNodeStore struct { - protocol.LT + protocol.LeaderTracker db *sql.DB // Database handle to use. schema string // Name of the schema holding the servers table. table string // Name of the servers table. diff --git a/client/leader.go b/client/leader.go index 1051ba95..b64ef70a 100644 --- a/client/leader.go +++ b/client/leader.go @@ -13,6 +13,12 @@ import ( // function will keep retrying (with a capped exponential backoff) until the // given context is canceled. 
func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Client, error) { + return NewLeaderConnector(store, options...).Find(ctx) +} + +type LeaderConnector protocol.Connector + +func NewLeaderConnector(store NodeStore, options ...Option) *LeaderConnector { o := defaultOptions() for _, option := range options { @@ -22,12 +28,15 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien config := protocol.Config{ Dial: o.DialFunc, ConcurrentLeaderConns: o.ConcurrentLeaderConns, - PermitShared: true, } - connector := protocol.NewConnector(0, store, config, o.LogFunc) - protocol, err := connector.Connect(ctx) + pc := protocol.NewConnector(0, store, config, o.LogFunc) + return (*LeaderConnector)(pc) +} + +func (lc *LeaderConnector) Find(ctx context.Context) (*Client, error) { + proto, err := (*protocol.Connector)(lc).ConnectPermitShared(ctx) if err != nil { return nil, err } - return &Client{protocol}, nil + return &Client{proto}, nil } diff --git a/client/store.go b/client/store.go index 87c21343..a83976be 100644 --- a/client/store.go +++ b/client/store.go @@ -30,7 +30,7 @@ var NewInmemNodeStore = protocol.NewInmemNodeStore // Persists a list addresses of dqlite nodes in a YAML file. type YamlNodeStore struct { - protocol.LT + protocol.LeaderTracker path string servers []NodeInfo mu sync.RWMutex diff --git a/driver/driver.go b/driver/driver.go index c6ed49ec..f4f1f17a 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -247,8 +247,9 @@ func defaultOptions() *options { // A Connector represents a driver in a fixed configuration and can create any // number of equivalent Conns for use by multiple goroutines. type Connector struct { - uri string - driver *Driver + uri string + driver *Driver + protocol *protocol.Connector } // Connect returns a connection to the database. @@ -263,18 +264,13 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) { defer cancel() } - // TODO: generate a client ID. - config := c.driver.clientConfig - config.ConcurrentLeaderConns = *c.driver.concurrentLeaderConns - connector := protocol.NewConnector(0, c.driver.store, config, c.driver.log) - conn := &Conn{ log: c.driver.log, contextTimeout: c.driver.contextTimeout, tracing: c.driver.tracing, } - proto, err := connector.Connect(ctx) + proto, err := c.protocol.Connect(ctx) if err != nil { return nil, driverError(conn.log, errors.Wrap(err, "failed to create dqlite connection")) } @@ -304,12 +300,16 @@ func (c *Connector) Driver() driver.Driver { return c.driver } -// OpenConnector must parse the name in the same format that Driver.Open -// parses the name parameter. +// OpenConnector creates a reusable Connector for a specific database. func (d *Driver) OpenConnector(name string) (driver.Connector, error) { + // TODO: generate a client ID. + config := d.clientConfig + config.ConcurrentLeaderConns = *d.concurrentLeaderConns + pc := protocol.NewConnector(0, d.store, config, d.log) connector := &Connector{ - uri: name, - driver: d, + uri: name, + driver: d, + protocol: pc, } return connector, nil } diff --git a/internal/protocol/config.go b/internal/protocol/config.go index f02a06f2..d18bed1e 100644 --- a/internal/protocol/config.go +++ b/internal/protocol/config.go @@ -16,7 +16,6 @@ type Config struct { BackoffFactor time.Duration // Exponential backoff factor for retries. 
BackoffCap time.Duration // Maximum connection retry backoff value, ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership. - PermitShared bool // Whether it's okay to return a reused connection. } // RetryStrategies returns a configuration for the retry package based on a Config. diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index d82cb01a..315611a1 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -22,29 +22,64 @@ const MaxConcurrentLeaderConns int64 = 10 // DialFunc is a function that can be used to establish a network connection. type DialFunc func(context.Context, string) (net.Conn, error) +type LeaderTracker struct { + mu sync.RWMutex + lastKnownLeaderAddr string + + proto *Protocol +} + +func (lt *LeaderTracker) GetLeaderAddr() string { + lt.mu.RLock() + defer lt.mu.RUnlock() + + return lt.lastKnownLeaderAddr +} + +func (lt *LeaderTracker) SetLeaderAddr(address string) { + lt.mu.Lock() + defer lt.mu.Unlock() + + lt.lastKnownLeaderAddr = address +} + +func (lt *LeaderTracker) UnsetLeaderAddr() { + lt.mu.Lock() + defer lt.mu.Unlock() + + lt.lastKnownLeaderAddr = "" +} + +func (lt *LeaderTracker) TakeSharedProtocol() (proto *Protocol) { + lt.mu.Lock() + defer lt.mu.Unlock() + + if proto, lt.proto = lt.proto, nil; proto != nil { + proto.lt = lt + } + return +} + +func (lt *LeaderTracker) DonateSharedProtocol(proto *Protocol) (accepted bool) { + lt.mu.Lock() + defer lt.mu.Unlock() + + if accepted = lt.proto == nil; accepted { + lt.proto = proto + } + return +} + // Connector is in charge of creating a dqlite SQL client connected to the // current leader of a cluster. type Connector struct { id uint64 // Conn ID to use when registering against the server. - store NodeStoreLeaderTracker + store NodeStore + lt *LeaderTracker config Config // Connection parameters. log logging.Func // Logging function. } -// nonTracking extends any NodeStore with no-op leader tracking. -// -// This is used as a fallback when the NodeStore used by the connector doesn't -// implement NodeStoreLeaderTracker. This can only be the case for a custom NodeStore -// provided by downstream. In this case, the connector will behave as it did before -// the LeaderTracker optimizations were introduced. -type nonTracking struct{ NodeStore } - -func (nt *nonTracking) GetLeaderAddr() string { return "" } -func (nt *nonTracking) SetLeaderAddr(string) {} -func (nt *nonTracking) UnsetLeaderAddr() {} -func (nt *nonTracking) TakeSharedProtocol() *Protocol { return nil } -func (nt *nonTracking) DonateSharedProtocol(*Protocol) bool { return false } - // NewConnector returns a new connector that can be used by a dqlite driver to // create new clients connected to a leader dqlite server. func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector { @@ -72,33 +107,35 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * config.ConcurrentLeaderConns = MaxConcurrentLeaderConns } - nslt, ok := store.(NodeStoreLeaderTracker) - if !ok { - nslt = &nonTracking{store} - } + lt := &LeaderTracker{} - return &Connector{id, nslt, config, log} + return &Connector{id, store, lt, config, log} } -// Connect returns a connection to the cluster leader. -// -// If the connector was configured with PermitShared, and a reusable connection -// is available from the leader tracker, that connection is returned. Otherwise, -// a new connection is opened. 
-func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
-	if c.config.PermitShared {
-		if sharedProto := c.store.TakeSharedProtocol(); sharedProto != nil {
-			if leaderAddr, err := askLeader(ctx, sharedProto); err == nil && sharedProto.addr == leaderAddr {
-				c.log(logging.Debug, "reusing shared connection to %s", sharedProto.addr)
-				c.store.SetLeaderAddr(leaderAddr)
-				return sharedProto, nil
-			}
-			c.log(logging.Debug, "discarding shared connection to %s", sharedProto.addr)
-			sharedProto.Bad()
-			sharedProto.Close()
+// ConnectPermitShared returns a connection to the cluster leader, possibly
+// recycled from the LeaderTracker.
+func (c *Connector) ConnectPermitShared(ctx context.Context) (*Protocol, error) {
+	if sharedProto := c.lt.TakeSharedProtocol(); sharedProto != nil {
+		if leaderAddr, err := askLeader(ctx, sharedProto); err == nil && sharedProto.addr == leaderAddr {
+			c.log(logging.Debug, "reusing shared connection to %s", sharedProto.addr)
+			c.lt.SetLeaderAddr(leaderAddr)
+			return sharedProto, nil
 		}
+		c.log(logging.Debug, "discarding shared connection to %s", sharedProto.addr)
+		sharedProto.Bad()
+		sharedProto.Close()
+	}
+
+	protocol, err := c.Connect(ctx)
+	if err != nil {
+		return nil, err
 	}
+	protocol.lt = c.lt
+	return protocol, nil
+}
 
+// Connect returns a new connection to the cluster leader.
+func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
 	var protocol *Protocol
 	err := retry.Retry(func(attempt uint) error {
 		log := func(l logging.Level, format string, a ...interface{}) {
@@ -131,10 +168,7 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
 		panic("no protocol object")
 	}
 
-	c.store.SetLeaderAddr(protocol.addr)
-	if c.config.PermitShared {
-		protocol.tracker = c.store
-	}
+	c.lt.SetLeaderAddr(protocol.addr)
 	return protocol, nil
 }
 
@@ -147,14 +181,14 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
 // fall back to probing all servers in parallel, checking whether each
 // is the leader itself or knows who the leader is.
 func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*Protocol, error) {
-	if addr := c.store.GetLeaderAddr(); addr != "" {
+	if addr := c.lt.GetLeaderAddr(); addr != "" {
 		// TODO In the event of failure, we could still use the second
 		// return value to guide the next stage of the search.
 		if proto, _, _ := c.connectAttemptOne(ctx, ctx, addr, log); proto != nil {
 			log(logging.Debug, "server %s: connected on fast path", addr)
 			return proto, nil
 		}
-		c.store.UnsetLeaderAddr()
+		c.lt.UnsetLeaderAddr()
 	}
 
 	servers, err := c.store.Get(ctx)
diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go
index 4a3c0021..7b402c31 100644
--- a/internal/protocol/protocol.go
+++ b/internal/protocol/protocol.go
@@ -18,7 +18,7 @@ type Protocol struct {
 	mu sync.Mutex // Serialize requests
 	netErr error // A network error occurred
 	addr string
-	tracker LeaderTracker
+	lt *LeaderTracker
 }
 
 // Call invokes a dqlite RPC, sending a request message and receiving a
@@ -121,7 +121,7 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me
 // Only call Bad when the protocol is deemed unsuitable for reuse for some
 // higher-level reason.
 func (p *Protocol) Bad() {
-	p.tracker = nil
+	p.lt = nil
 }
 
 // Close releases a protocol.
@@ -130,7 +130,7 @@ func (p *Protocol) Bad() {
 // available for reuse by that tracker. Otherwise, the underlying connection
 // will be closed. 
func (p *Protocol) Close() error { - if tr := p.tracker; tr == nil || !tr.DonateSharedProtocol(p) { + if tr := p.lt; tr == nil || !tr.DonateSharedProtocol(p) { return p.conn.Close() } return nil diff --git a/internal/protocol/store.go b/internal/protocol/store.go index ceaf1224..95e65af4 100644 --- a/internal/protocol/store.go +++ b/internal/protocol/store.go @@ -46,7 +46,7 @@ type NodeStore interface { // InmemNodeStore keeps the list of servers in memory. type InmemNodeStore struct { - LT + LeaderTracker mu sync.RWMutex servers []NodeInfo } @@ -75,80 +75,3 @@ func (i *InmemNodeStore) Set(ctx context.Context, servers []NodeInfo) error { i.servers = servers return nil } - -// A LeaderTracker stores the address of the last known cluster leader, -// and possibly a reusable connection to it. -type LeaderTracker interface { - GetLeaderAddr() string - SetLeaderAddr(string) - UnsetLeaderAddr() - - // TakeSharedProtocol transfers the reusable connection to the caller, - // if one is stored. - TakeSharedProtocol() *Protocol - // DonateSharedProtocol possibly stores the given connection for reuse. - // - // The tracker has only one slot for a reusable connection. If the slot - // is already occupied, DonateSharedProtocol does nothing and returns - // false. Otherwise, it adopts the connection and returns true. - // - // This is called automatically by Protocol.Close for a connection that - // was obtained from TakeSharedProtocol. It's okay to call it with a - // connection that didn't come from TakeSharedProtocol, though. - DonateSharedProtocol(*Protocol) (accepted bool) -} - -// A NodeStoreLeaderTracker is a node store that also tracks the current leader. -type NodeStoreLeaderTracker interface { - NodeStore - LeaderTracker -} - -// LT implements LeaderTracker and is intended for embedding into a NodeStore. -type LT struct { - mu sync.RWMutex - lastKnownLeaderAddr string - - proto *Protocol -} - -func (lt *LT) GetLeaderAddr() string { - lt.mu.RLock() - defer lt.mu.RUnlock() - - return lt.lastKnownLeaderAddr -} - -func (lt *LT) SetLeaderAddr(address string) { - lt.mu.Lock() - defer lt.mu.Unlock() - - lt.lastKnownLeaderAddr = address -} - -func (lt *LT) UnsetLeaderAddr() { - lt.mu.Lock() - defer lt.mu.Unlock() - - lt.lastKnownLeaderAddr = "" -} - -func (lt *LT) TakeSharedProtocol() (proto *Protocol) { - lt.mu.Lock() - defer lt.mu.Unlock() - - if proto, lt.proto = lt.proto, nil; proto != nil { - proto.tracker = lt - } - return -} - -func (lt *LT) DonateSharedProtocol(proto *Protocol) (accepted bool) { - lt.mu.Lock() - defer lt.mu.Unlock() - - if accepted = lt.proto == nil; accepted { - lt.proto = proto - } - return -} From ff43520f299554d8c774df056a361ae2efb160ce Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 19:41:22 -0400 Subject: [PATCH 15/23] Fix up tests Signed-off-by: Cole Miller --- client/leader_test.go | 12 +++++++----- internal/protocol/connector_test.go | 20 +++++++++++++------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/client/leader_test.go b/client/leader_test.go index 7221733a..ddd89a90 100644 --- a/client/leader_test.go +++ b/client/leader_test.go @@ -12,8 +12,8 @@ import ( "github.com/stretchr/testify/require" ) -// client.FindLeader recycles leader connections intelligently. -func TestReuse(t *testing.T) { +// LeaderConnector recycles connections intelligently. 
+func TestLeaderConnector(t *testing.T) { infos, cleanup := setup(t) defer cleanup() @@ -23,12 +23,14 @@ func TestReuse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - cli, err := client.FindLeader(ctx, store) + connector := client.NewLeaderConnector(store) + + cli, err := connector.Find(ctx) require.NoError(t, err) firstProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() require.NoError(t, cli.Close()) - cli, err = client.FindLeader(ctx, store) + cli, err = connector.Find(ctx) require.NoError(t, err) secondProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() @@ -49,7 +51,7 @@ func TestReuse(t *testing.T) { require.Error(t, err) require.NoError(t, cli.Close()) - cli, err = client.FindLeader(ctx, store) + cli, err = connector.Find(ctx) require.NoError(t, err) thirdProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() require.NoError(t, cli.Close()) diff --git a/internal/protocol/connector_test.go b/internal/protocol/connector_test.go index 784dc4bc..adf06baf 100644 --- a/internal/protocol/connector_test.go +++ b/internal/protocol/connector_test.go @@ -50,10 +50,10 @@ func TestConnector_LeaderTracker(t *testing.T) { // options is a configuration for calling Connector.Connect // in order to trigger a specific state transition. type options struct { - config protocol.Config injectFailure bool returnProto bool expectedLog []string + permitShared bool } injectFailure := func(o *options) { @@ -61,7 +61,7 @@ func TestConnector_LeaderTracker(t *testing.T) { o.expectedLog = append(o.expectedLog, "WARN: attempt 1: server @test-0: context deadline exceeded") } permitShared := func(o *options) { - o.config.PermitShared = true + o.permitShared = true } returnProto := func(o *options) { o.returnProto = true @@ -82,21 +82,26 @@ func TestConnector_LeaderTracker(t *testing.T) { address, cleanup := newNode(t, 0) defer cleanup() store := newStore(t, []string{address}) - + log, checkLog := newLogFunc(t) + connector := protocol.NewConnector(0, store, protocol.Config{RetryLimit: 1}, log) check := func(opts ...func(*options)) *protocol.Protocol { - o := &options{config: protocol.Config{RetryLimit: 1}} + o := &options{} for _, opt := range opts { opt(o) } - log, checkLog := newLogFunc(t) - connector := protocol.NewConnector(0, store, o.config, log) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() if o.injectFailure { ctx, cancel = context.WithDeadline(ctx, time.Unix(1, 0)) defer cancel() } - proto, err := connector.Connect(ctx) + var proto *protocol.Protocol + var err error + if o.permitShared { + proto, err = connector.ConnectPermitShared(ctx) + } else { + proto, err = connector.Connect(ctx) + } if o.injectFailure { require.Equal(t, protocol.ErrNoAvailableLeader, err) } else { @@ -407,6 +412,7 @@ func newLogFunc(t *testing.T) (logging.Func, func([]string)) { } check := func(expected []string) { assert.Equal(t, expected, messages) + messages = messages[:0] } return log, check } From 0da97bb0c1932f33858745b1f26105c13cf77d0d Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 20:16:39 -0400 Subject: [PATCH 16/23] Take advantage of leader tracking for App Signed-off-by: Cole Miller --- app/app.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/app/app.go b/app/app.go index 3c148277..f9aeffbd 100644 --- a/app/app.go +++ b/app/app.go @@ -40,6 +40,7 @@ type App struct { tls *tlsSetup dialFunc client.DialFunc 
store client.NodeStore + lc *client.LeaderConnector driver *driver.Driver driverName string log client.LogFunc @@ -239,6 +240,13 @@ func New(dir string, options ...Option) (app *App, err error) { nodeBindAddress = nodeBindAddress[1:] } + lc := client.NewLeaderConnector( + store, + client.WithDialFunc(driverDial), + client.WithLogFunc(o.Log), + client.WithConcurrentLeaderConns(*o.ConcurrentLeaderConns), + ) + app = &App{ id: info.ID, address: info.Address, @@ -247,6 +255,7 @@ func New(dir string, options ...Option) (app *App, err error) { nodeBindAddress: nodeBindAddress, store: store, dialFunc: driverDial, + lc: lc, driver: driver, driverName: driverName, log: o.Log, @@ -325,7 +334,7 @@ func (a *App) Handover(ctx context.Context) error { ctx, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() - cli, err := a.Leader(ctx) + cli, err := a.FindLeader(ctx) if err != nil { return fmt.Errorf("find leader: %w", err) } @@ -379,7 +388,7 @@ func (a *App) Handover(ctx context.Context) error { return fmt.Errorf("transfer leadership: %w", err) } } - cli, err = a.Leader(ctx) + cli, err = a.FindLeader(ctx) if err != nil { return fmt.Errorf("find new leader: %w", err) } @@ -493,6 +502,10 @@ func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Cli return client.FindLeader(ctx, a.store, allOptions...) } +func (a *App) FindLeader(ctx context.Context) (*client.Client, error) { + return a.lc.Find(ctx) +} + // Client returns a client connected to the local node. func (a *App) Client(ctx context.Context) (*client.Client, error) { return client.New(ctx, a.nodeBindAddress) @@ -545,7 +558,7 @@ func (a *App) run(ctx context.Context, options *options, join bool) { } return case <-time.After(delay): - cli, err := a.Leader(ctx) + cli, err := a.FindLeader(ctx) if err != nil { continue } @@ -739,7 +752,11 @@ func (a *App) makeRolesChanges(nodes []client.NodeInfo) RolesChanges { // Return the options to use for client.FindLeader() or client.New() func (a *App) clientOptions() []client.Option { - return []client.Option{client.WithDialFunc(a.dialFunc), client.WithLogFunc(a.log), client.WithConcurrentLeaderConns(*a.options.ConcurrentLeaderConns)} + return []client.Option{ + client.WithDialFunc(a.dialFunc), + client.WithLogFunc(a.log), + client.WithConcurrentLeaderConns(*a.options.ConcurrentLeaderConns), + } } func (a *App) debug(format string, args ...interface{}) { From 30922adac52b15cc4a06e9d1e5c5784cdf6a55b1 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 20:21:21 -0400 Subject: [PATCH 17/23] Docs Signed-off-by: Cole Miller --- app/app.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/app/app.go b/app/app.go index f9aeffbd..880f7545 100644 --- a/app/app.go +++ b/app/app.go @@ -494,7 +494,9 @@ func (a *App) Open(ctx context.Context, database string) (*sql.DB, error) { return db, nil } -// Leader returns a client connected to the current cluster leader, if any. +// Leader returns a client connected to the cluster leader. +// +// Prefer to use FindLeader instead unless you need to pass custom options. func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Client, error) { allOptions := a.clientOptions() allOptions = append(allOptions, options...) @@ -502,6 +504,10 @@ func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Cli return client.FindLeader(ctx, a.store, allOptions...) } +// FindLeader returns a client connected to the cluster leader. 
+//
+// Compared to Leader, this method avoids opening extra connections in many
+// cases, but doesn't accept custom options.
 func (a *App) FindLeader(ctx context.Context) (*client.Client, error) {
 	return a.lc.Find(ctx)
 }

From b3c99baba5fd587c9f4a2af548d61ff02675b908 Mon Sep 17 00:00:00 2001
From: Cole Miller
Date: Thu, 10 Oct 2024 20:24:55 -0400
Subject: [PATCH 18/23] Drop embedded LeaderTracker

Signed-off-by: Cole Miller
---
 client/database_store.go   | 2 --
 client/store.go            | 1 -
 internal/protocol/store.go | 1 -
 3 files changed, 4 deletions(-)

diff --git a/client/database_store.go b/client/database_store.go
index 9c1f2a01..17ef7eb9 100644
--- a/client/database_store.go
+++ b/client/database_store.go
@@ -9,7 +9,6 @@ import (
 	"fmt"
 	"strings"
 
-	"github.com/canonical/go-dqlite/internal/protocol"
 	_ "github.com/mattn/go-sqlite3" // Go SQLite bindings
 	"github.com/pkg/errors"
 )
@@ -23,7 +22,6 @@ type nodeStoreOptions struct {
 
 // DatabaseNodeStore persists a list addresses of dqlite nodes in a SQL table.
 type DatabaseNodeStore struct {
-	protocol.LeaderTracker
 	db *sql.DB // Database handle to use.
 	schema string // Name of the schema holding the servers table.
 	table string // Name of the servers table.
diff --git a/client/store.go b/client/store.go
index a83976be..6e12646d 100644
--- a/client/store.go
+++ b/client/store.go
@@ -30,7 +30,6 @@ var NewInmemNodeStore = protocol.NewInmemNodeStore
 
 // Persists a list addresses of dqlite nodes in a YAML file.
 type YamlNodeStore struct {
-	protocol.LeaderTracker
 	path string
 	servers []NodeInfo
 	mu sync.RWMutex
diff --git a/internal/protocol/store.go b/internal/protocol/store.go
index 95e65af4..5930e5c5 100644
--- a/internal/protocol/store.go
+++ b/internal/protocol/store.go
@@ -46,7 +46,6 @@ type NodeStore interface {
 
 // InmemNodeStore keeps the list of servers in memory.
 type InmemNodeStore struct {
-	LeaderTracker
 	mu      sync.RWMutex
 	servers []NodeInfo
 }

From 42dfb375071d3c9b0d69782f3bfcc8496e64f43a Mon Sep 17 00:00:00 2001
From: Cole Miller
Date: Fri, 11 Oct 2024 13:42:56 -0400
Subject: [PATCH 19/23] Fixes

First, avoid infinite recursion when logging in connectAttemptOne.

Second, fix the tests by logging the error from Semaphore.Acquire. It
seems that before we bumped the version of golang.org/x/sync/semaphore,
this type didn't honor the context deadline, which is why the tests were
fine for me locally without merging in the go.mod updates that are on
master.

Signed-off-by: Cole Miller
---
 internal/protocol/connector.go | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go
index 315611a1..bcb02eeb 100644
--- a/internal/protocol/connector.go
+++ b/internal/protocol/connector.go
@@ -224,6 +224,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
 			defer wg.Done()
 
 			if err := sem.Acquire(ctx, 1); err != nil {
+				log(logging.Warn, "server %s: %v", server.Address, err)
 				return
 			}
 			defer sem.Release(1)
@@ -304,11 +305,11 @@ func (c *Connector) connectAttemptOne(
 	dialCtx context.Context,
 	ctx context.Context,
 	address string,
-	log logging.Func,
+	origLog logging.Func,
 ) (*Protocol, string, error) {
-	log = func(l logging.Level, format string, a ...interface{}) {
+	log := func(l logging.Level, format string, a ...interface{}) {
 		format = fmt.Sprintf("server %s: ", address) + format
-		log(l, format, a...)
+		origLog(l, format, a...) 
} if ctx.Err() != nil { From 162c3e5af516e051247f83e50e772b33f8331a0f Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Wed, 16 Oct 2024 13:41:04 -0400 Subject: [PATCH 20/23] Unified client connector Signed-off-by: Cole Miller --- app/app.go | 5 +- client/client.go | 41 ++++++++++ client/leader.go | 29 +------ client/leader_test.go | 10 +-- driver/driver.go | 3 +- internal/protocol/config.go | 3 +- internal/protocol/connector.go | 112 +++++++++++++++++++--------- internal/protocol/connector_test.go | 58 ++++++-------- internal/protocol/protocol_test.go | 2 +- 9 files changed, 152 insertions(+), 111 deletions(-) diff --git a/app/app.go b/app/app.go index 880f7545..acd0288e 100644 --- a/app/app.go +++ b/app/app.go @@ -40,7 +40,7 @@ type App struct { tls *tlsSetup dialFunc client.DialFunc store client.NodeStore - lc *client.LeaderConnector + lc *client.Connector driver *driver.Driver driverName string log client.LogFunc @@ -245,6 +245,7 @@ func New(dir string, options ...Option) (app *App, err error) { client.WithDialFunc(driverDial), client.WithLogFunc(o.Log), client.WithConcurrentLeaderConns(*o.ConcurrentLeaderConns), + client.WithPermitShared(true), ) app = &App{ @@ -509,7 +510,7 @@ func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Cli // Compared to Leader, this method avoids opening extra connections int many // cases, but doesn't accept custom options. func (a *App) FindLeader(ctx context.Context) (*client.Client, error) { - return a.lc.Find(ctx) + return a.lc.Connect(ctx) } // Client returns a client connected to the local node. diff --git a/client/client.go b/client/client.go index 2ed71fd4..51d2a0ff 100644 --- a/client/client.go +++ b/client/client.go @@ -22,6 +22,7 @@ type options struct { DialFunc DialFunc LogFunc LogFunc ConcurrentLeaderConns int64 + PermitShared bool } // WithDialFunc sets a custom dial function for creating the client network @@ -50,6 +51,12 @@ func WithConcurrentLeaderConns(maxConns int64) Option { } } +func WithPermitShared(permit bool) Option { + return func(o *options) { + o.PermitShared = permit + } +} + // New creates a new client connected to the dqlite node with the given // address. 
func New(ctx context.Context, address string, options ...Option) (*Client, error) { @@ -327,3 +334,37 @@ func defaultOptions() *options { ConcurrentLeaderConns: protocol.MaxConcurrentLeaderConns, } } + +type Connector protocol.Connector + +func NewLeaderConnector(store NodeStore, options ...Option) *Connector { + opts := defaultOptions() + for _, o := range options { + o(opts) + } + config := protocol.Config{ + Dial: opts.DialFunc, + ConcurrentLeaderConns: opts.ConcurrentLeaderConns, + PermitShared: opts.PermitShared, + } + inner := protocol.NewLeaderConnector(store, config, opts.LogFunc) + return (*Connector)(inner) +} + +func NewDirectConnector(id uint64, address string, options ...Option) *Connector { + opts := defaultOptions() + for _, o := range options { + o(opts) + } + config := protocol.Config{Dial: opts.DialFunc} + inner := protocol.NewDirectConnector(id, address, config, opts.LogFunc) + return (*Connector)(inner) +} + +func (connector *Connector) Connect(ctx context.Context) (*Client, error) { + protocol, err := (*protocol.Connector)(connector).Connect(ctx) + if err != nil { + return nil, err + } + return &Client{protocol}, nil +} diff --git a/client/leader.go b/client/leader.go index b64ef70a..71c433d5 100644 --- a/client/leader.go +++ b/client/leader.go @@ -2,8 +2,6 @@ package client import ( "context" - - "github.com/canonical/go-dqlite/internal/protocol" ) // FindLeader returns a Client connected to the current cluster leader. @@ -13,30 +11,5 @@ import ( // function will keep retrying (with a capped exponential backoff) until the // given context is canceled. func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Client, error) { - return NewLeaderConnector(store, options...).Find(ctx) -} - -type LeaderConnector protocol.Connector - -func NewLeaderConnector(store NodeStore, options ...Option) *LeaderConnector { - o := defaultOptions() - - for _, option := range options { - option(o) - } - - config := protocol.Config{ - Dial: o.DialFunc, - ConcurrentLeaderConns: o.ConcurrentLeaderConns, - } - pc := protocol.NewConnector(0, store, config, o.LogFunc) - return (*LeaderConnector)(pc) -} - -func (lc *LeaderConnector) Find(ctx context.Context) (*Client, error) { - proto, err := (*protocol.Connector)(lc).ConnectPermitShared(ctx) - if err != nil { - return nil, err - } - return &Client{proto}, nil + return NewLeaderConnector(store, options...).Connect(ctx) } diff --git a/client/leader_test.go b/client/leader_test.go index ddd89a90..e4c3fa2b 100644 --- a/client/leader_test.go +++ b/client/leader_test.go @@ -23,20 +23,20 @@ func TestLeaderConnector(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - connector := client.NewLeaderConnector(store) + connector := client.NewLeaderConnector(store, client.WithPermitShared(true)) - cli, err := connector.Find(ctx) + cli, err := connector.Connect(ctx) require.NoError(t, err) firstProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() require.NoError(t, cli.Close()) - cli, err = connector.Find(ctx) + cli, err = connector.Connect(ctx) require.NoError(t, err) secondProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() // The first leader connection was returned to the leader tracker // when we closed the client, and is returned by the second call - // to FindLeader. + // to Connect. require.Equal(t, firstProto, secondProto) // The reused connection is good to go. 
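
[The unified Connector is driven the way this test demonstrates. As a
rough usage sketch, assuming a single-node cluster at the placeholder
address 127.0.0.1:9001, and ignoring Set's error for brevity:

    package main

    import (
    	"context"
    	"log"
    	"time"

    	"github.com/canonical/go-dqlite/client"
    )

    func main() {
    	// Seed a node store with one known cluster member.
    	store := client.NewInmemNodeStore()
    	store.Set(context.Background(), []client.NodeInfo{{Address: "127.0.0.1:9001"}})

    	// One Connector can mint any number of Clients with the same
    	// parameters, remembering the last known leader between calls.
    	connector := client.NewLeaderConnector(store)

    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()

    	cli, err := connector.Connect(ctx)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer cli.Close()

    	// Any ordinary client call works on the returned Client.
    	servers, err := cli.Cluster(ctx)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for _, srv := range servers {
    		log.Printf("node %d at %s", srv.ID, srv.Address)
    	}
    }

Converting between client.Connector and the internal protocol.Connector,
as client.go does above, keeps the exported surface thin while reusing
the internal retry machinery.]
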
@@ -51,7 +51,7 @@ func TestLeaderConnector(t *testing.T) { require.Error(t, err) require.NoError(t, cli.Close()) - cli, err = connector.Find(ctx) + cli, err = connector.Connect(ctx) require.NoError(t, err) thirdProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() require.NoError(t, cli.Close()) diff --git a/driver/driver.go b/driver/driver.go index f4f1f17a..a260104b 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -302,10 +302,9 @@ func (c *Connector) Driver() driver.Driver { // OpenConnector creates a reusable Connector for a specific database. func (d *Driver) OpenConnector(name string) (driver.Connector, error) { - // TODO: generate a client ID. config := d.clientConfig config.ConcurrentLeaderConns = *d.concurrentLeaderConns - pc := protocol.NewConnector(0, d.store, config, d.log) + pc := protocol.NewLeaderConnector(d.store, config, d.log) connector := &Connector{ uri: name, driver: d, diff --git a/internal/protocol/config.go b/internal/protocol/config.go index d18bed1e..f2514fe8 100644 --- a/internal/protocol/config.go +++ b/internal/protocol/config.go @@ -12,10 +12,11 @@ type Config struct { Dial DialFunc // Network dialer. DialTimeout time.Duration // Timeout for establishing a network connection . AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership. - RetryLimit uint // Maximum number of retries, or 0 for unlimited. BackoffFactor time.Duration // Exponential backoff factor for retries. BackoffCap time.Duration // Maximum connection retry backoff value, + RetryLimit uint // Maximum number of retries, or 0 for unlimited. ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership. + PermitShared bool } // RetryStrategies returns a configuration for the retry package based on a Config. diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index bcb02eeb..7aee263b 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -70,72 +70,110 @@ func (lt *LeaderTracker) DonateSharedProtocol(proto *Protocol) (accepted bool) { return } -// Connector is in charge of creating a dqlite SQL client connected to the -// current leader of a cluster. type Connector struct { - id uint64 // Conn ID to use when registering against the server. - store NodeStore - lt *LeaderTracker - config Config // Connection parameters. - log logging.Func // Logging function. + clientID uint64 // Conn ID to use when registering against the server. + store NodeStore + nodeID uint64 + nodeAddress string + lt *LeaderTracker + config Config // Connection parameters. + log logging.Func // Logging function. } // NewConnector returns a new connector that can be used by a dqlite driver to // create new clients connected to a leader dqlite server. 
-func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector { +func NewLeaderConnector(store NodeStore, config Config, log logging.Func) *Connector { if config.Dial == nil { config.Dial = Dial } - if config.DialTimeout == 0 { config.DialTimeout = 5 * time.Second } - if config.AttemptTimeout == 0 { config.AttemptTimeout = 15 * time.Second } - if config.BackoffFactor == 0 { config.BackoffFactor = 100 * time.Millisecond } - if config.BackoffCap == 0 { config.BackoffCap = time.Second } - if config.ConcurrentLeaderConns == 0 { config.ConcurrentLeaderConns = MaxConcurrentLeaderConns } - lt := &LeaderTracker{} - - return &Connector{id, store, lt, config, log} + return &Connector{ + store: store, + lt: &LeaderTracker{}, + config: config, + log: log, + } } -// Connect returns a connection to the cluster leader, possibly recycled from -// the LeaderTracker. -func (c *Connector) ConnectPermitShared(ctx context.Context) (*Protocol, error) { - if sharedProto := c.lt.TakeSharedProtocol(); sharedProto != nil { - if leaderAddr, err := askLeader(ctx, sharedProto); err == nil && sharedProto.addr == leaderAddr { - c.log(logging.Debug, "reusing shared connection to %s", sharedProto.addr) - c.lt.SetLeaderAddr(leaderAddr) - return sharedProto, nil - } - c.log(logging.Debug, "discarding shared connection to %s", sharedProto.addr) - sharedProto.Bad() - sharedProto.Close() +func NewDirectConnector(id uint64, address string, config Config, log logging.Func) *Connector { + if config.Dial == nil { + config.Dial = Dial + } + if config.DialTimeout == 0 { + config.DialTimeout = 5 * time.Second + } + if config.AttemptTimeout == 0 { + config.AttemptTimeout = 15 * time.Second + } + if config.BackoffFactor == 0 { + config.BackoffFactor = 100 * time.Millisecond + } + if config.BackoffCap == 0 { + config.BackoffCap = time.Second + } + if config.ConcurrentLeaderConns == 0 { + config.ConcurrentLeaderConns = MaxConcurrentLeaderConns } - protocol, err := c.Connect(ctx) - if err != nil { - return nil, err + return &Connector{ + nodeID: id, + nodeAddress: address, + lt: &LeaderTracker{}, + config: config, + log: log, } - protocol.lt = c.lt - return protocol, nil } -// Connect returns a new connection to the cluster leader. 
func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { + if c.nodeID != 0 { + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + conn, err := c.config.Dial(ctx, c.nodeAddress) + if err != nil { + return nil, errors.Wrap(err, "dial") + } + version := VersionOne + protocol, err := Handshake(ctx, conn, version, c.nodeAddress) + if err == errBadProtocol { + c.log(logging.Warn, "unsupported protocol %d, attempt with legacy", version) + version = VersionLegacy + protocol, err = Handshake(ctx, conn, version, c.nodeAddress) + } + if err != nil { + conn.Close() + return nil, errors.Wrap(err, "handshake") + } + return protocol, nil + } + + if c.config.PermitShared { + if sharedProto := c.lt.TakeSharedProtocol(); sharedProto != nil { + if leaderAddr, err := askLeader(ctx, sharedProto); err == nil && sharedProto.addr == leaderAddr { + c.log(logging.Debug, "reusing shared connection to %s", sharedProto.addr) + c.lt.SetLeaderAddr(leaderAddr) + return sharedProto, nil + } + c.log(logging.Debug, "discarding shared connection to %s", sharedProto.addr) + sharedProto.Bad() + sharedProto.Close() + } + } + var protocol *Protocol err := retry.Retry(func(attempt uint) error { log := func(l logging.Level, format string, a ...interface{}) { @@ -169,7 +207,9 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { } c.lt.SetLeaderAddr(protocol.addr) - + if c.config.PermitShared { + protocol.lt = c.lt + } return protocol, nil } @@ -357,7 +397,7 @@ func (c *Connector) connectAttemptOne( response := Message{} response.Init(512) - EncodeClient(&request, c.id) + EncodeClient(&request, c.clientID) if err := protocol.Call(ctx, &request, &response); err != nil { protocol.Close() diff --git a/internal/protocol/connector_test.go b/internal/protocol/connector_test.go index adf06baf..fc46a8e4 100644 --- a/internal/protocol/connector_test.go +++ b/internal/protocol/connector_test.go @@ -24,7 +24,7 @@ func TestConnector_Success(t *testing.T) { store := newStore(t, []string{address}) log, check := newLogFunc(t) - connector := protocol.NewConnector(0, store, protocol.Config{}, log) + connector := protocol.NewLeaderConnector(store, protocol.Config{}, log) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() @@ -53,16 +53,12 @@ func TestConnector_LeaderTracker(t *testing.T) { injectFailure bool returnProto bool expectedLog []string - permitShared bool } injectFailure := func(o *options) { o.injectFailure = true o.expectedLog = append(o.expectedLog, "WARN: attempt 1: server @test-0: context deadline exceeded") } - permitShared := func(o *options) { - o.permitShared = true - } returnProto := func(o *options) { o.returnProto = true } @@ -83,7 +79,7 @@ func TestConnector_LeaderTracker(t *testing.T) { defer cleanup() store := newStore(t, []string{address}) log, checkLog := newLogFunc(t) - connector := protocol.NewConnector(0, store, protocol.Config{RetryLimit: 1}, log) + connector := protocol.NewLeaderConnector(store, protocol.Config{RetryLimit: 1, PermitShared: true}, log) check := func(opts ...func(*options)) *protocol.Protocol { o := &options{} for _, opt := range opts { @@ -95,13 +91,7 @@ func TestConnector_LeaderTracker(t *testing.T) { ctx, cancel = context.WithDeadline(ctx, time.Unix(1, 0)) defer cancel() } - var proto *protocol.Protocol - var err error - if o.permitShared { - proto, err = connector.ConnectPermitShared(ctx) - } else { - proto, err = connector.Connect(ctx) - } + proto, err := connector.Connect(ctx) if 
o.injectFailure { require.Equal(t, protocol.ErrNoAvailableLeader, err) } else { @@ -119,32 +109,28 @@ func TestConnector_LeaderTracker(t *testing.T) { // INIT -> INIT check(injectFailure) // INIT -> HAVE_ADDR - check(expectFallback) + proto := check(expectFallback, returnProto) + proto.Bad() + assert.NoError(t, proto.Close()) // HAVE_ADDR -> HAVE_ADDR - proto := check(permitShared, expectFast, returnProto) + proto = check(expectFast, returnProto) // We need an extra protocol to trigger INIT->HAVE_CONN later. // Grab one here where it doesn't cause a state transition. - protoForLater := check(permitShared, expectFast, returnProto) + protoForLater := check(expectFast, returnProto) // HAVE_ADDR -> HAVE_BOTH assert.NoError(t, proto.Close()) - // HAVE_BOTH -> HAVE_BOTH - check(expectFast) - // HAVE_BOTH -> HAVE_CONN - check(injectFailure) - // HAVE_CONN -> HAVE_CONN - check(injectFailure) - // HAVE_CONN -> INIT - check(permitShared, expectDiscard, injectFailure) - // INIT -> HAVE_CONN - assert.NoError(t, protoForLater.Close()) - // HAVE_CONN -> HAVE_BOTH - check(expectFallback) + // HAVE_BOTH -> HAVE_ADDR -> HAVE_BOTH + check(expectShared) // HAVE_BOTH -> HAVE_ADDR - proto = check(permitShared, expectShared, returnProto) - proto.Bad() - assert.NoError(t, proto.Close()) + check(expectDiscard, injectFailure) // HAVE_ADDR -> INIT check(injectFailure) + // INIT -> HAVE_CONN + assert.NoError(t, protoForLater.Close()) + // HAVE_CONN -> HAVE_CONN + check(expectShared) + // HAVE_CONN -> INIT + check(expectDiscard, injectFailure) } // The network connection can't be established within the specified number of @@ -155,7 +141,7 @@ func TestConnector_LimitRetries(t *testing.T) { RetryLimit: 2, } log, check := newLogFunc(t) - connector := protocol.NewConnector(0, store, config, log) + connector := protocol.NewLeaderConnector(store, config, log) _, err := connector.Connect(context.Background()) assert.Equal(t, protocol.ErrNoAvailableLeader, err) @@ -175,7 +161,7 @@ func TestConnector_DialTimeout(t *testing.T) { DialTimeout: 50 * time.Millisecond, RetryLimit: 1, } - connector := protocol.NewConnector(0, store, config, log) + connector := protocol.NewLeaderConnector(store, config, log) _, err := connector.Connect(context.Background()) assert.Equal(t, protocol.ErrNoAvailableLeader, err) @@ -190,7 +176,7 @@ func TestConnector_DialTimeout(t *testing.T) { func TestConnector_EmptyNodeStore(t *testing.T) { store := newStore(t, []string{}) log, check := newLogFunc(t) - connector := protocol.NewConnector(0, store, protocol.Config{}, log) + connector := protocol.NewLeaderConnector(store, protocol.Config{}, log) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) defer cancel() @@ -206,7 +192,7 @@ func TestConnector_ContextCanceled(t *testing.T) { store := newStore(t, []string{"1.2.3.4:666"}) log, check := newLogFunc(t) - connector := protocol.NewConnector(0, store, protocol.Config{}, log) + connector := protocol.NewLeaderConnector(store, protocol.Config{}, log) ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond) defer cancel() @@ -230,7 +216,7 @@ func TestConnector_AttemptTimeout(t *testing.T) { AttemptTimeout: 100 * time.Millisecond, RetryLimit: 1, } - connector := protocol.NewConnector(0, store, config, logging.Test(t)) + connector := protocol.NewLeaderConnector(store, config, logging.Test(t)) var conn net.Conn go func() { conn, err = listener.Accept() diff --git a/internal/protocol/protocol_test.go b/internal/protocol/protocol_test.go index 6510b552..bc7a97f7 100644 
--- a/internal/protocol/protocol_test.go +++ b/internal/protocol/protocol_test.go @@ -151,7 +151,7 @@ func newProtocol(t *testing.T) (*protocol.Protocol, func()) { AttemptTimeout: 100 * time.Millisecond, BackoffFactor: time.Millisecond, } - connector := protocol.NewConnector(0, store, config, logging.Test(t)) + connector := protocol.NewLeaderConnector(store, config, logging.Test(t)) ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) defer cancel() From b09c87bfe03c3f12fb26250d0272f899fcf322fb Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 18 Oct 2024 01:04:13 -0400 Subject: [PATCH 21/23] Don't expose shared leader connections for now Signed-off-by: Cole Miller --- app/app.go | 1 - client/client.go | 6 ------ client/leader_test.go | 48 ------------------------------------------- 3 files changed, 55 deletions(-) diff --git a/app/app.go b/app/app.go index acd0288e..238324f8 100644 --- a/app/app.go +++ b/app/app.go @@ -245,7 +245,6 @@ func New(dir string, options ...Option) (app *App, err error) { client.WithDialFunc(driverDial), client.WithLogFunc(o.Log), client.WithConcurrentLeaderConns(*o.ConcurrentLeaderConns), - client.WithPermitShared(true), ) app = &App{ diff --git a/client/client.go b/client/client.go index 51d2a0ff..82f3e888 100644 --- a/client/client.go +++ b/client/client.go @@ -51,12 +51,6 @@ func WithConcurrentLeaderConns(maxConns int64) Option { } } -func WithPermitShared(permit bool) Option { - return func(o *options) { - o.PermitShared = permit - } -} - // New creates a new client connected to the dqlite node with the given // address. func New(ctx context.Context, address string, options ...Option) (*Client, error) { diff --git a/client/leader_test.go b/client/leader_test.go index e4c3fa2b..57f9fe40 100644 --- a/client/leader_test.go +++ b/client/leader_test.go @@ -12,54 +12,6 @@ import ( "github.com/stretchr/testify/require" ) -// LeaderConnector recycles connections intelligently. -func TestLeaderConnector(t *testing.T) { - infos, cleanup := setup(t) - defer cleanup() - - store := client.NewInmemNodeStore() - store.Set(context.Background(), infos[:1]) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - connector := client.NewLeaderConnector(store, client.WithPermitShared(true)) - - cli, err := connector.Connect(ctx) - require.NoError(t, err) - firstProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() - require.NoError(t, cli.Close()) - - cli, err = connector.Connect(ctx) - require.NoError(t, err) - secondProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() - - // The first leader connection was returned to the leader tracker - // when we closed the client, and is returned by the second call - // to Connect. - require.Equal(t, firstProto, secondProto) - - // The reused connection is good to go. - err = cli.Add(ctx, infos[1]) - require.NoError(t, err) - - // Trigger an unsuccessful Protocol operation that will cause the - // client's connection to be marked as unsuitable for reuse. - shortCtx, cancel := context.WithDeadline(context.Background(), time.Unix(1, 0)) - cancel() - _, err = cli.Cluster(shortCtx) - require.Error(t, err) - require.NoError(t, cli.Close()) - - cli, err = connector.Connect(ctx) - require.NoError(t, err) - thirdProto := reflect.ValueOf(cli).Elem().FieldByName("protocol").Pointer() - require.NoError(t, cli.Close()) - - // The previous connection was not reused. 
- require.NotEqual(t, secondProto, thirdProto) -} - func TestMembership(t *testing.T) { infos, cleanup := setup(t) defer cleanup() From 58fc6e538fc58f1d7b46ddcc81a2e09fde9fc638 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 18 Oct 2024 01:22:15 -0400 Subject: [PATCH 22/23] Document client connector Signed-off-by: Cole Miller --- client/client.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/client/client.go b/client/client.go index 82f3e888..5145ad62 100644 --- a/client/client.go +++ b/client/client.go @@ -329,8 +329,16 @@ func defaultOptions() *options { } } +// Connector is a reusable configuration for creating new Clients. +// +// In some cases, Connector.Connect can take advantage of state stored in the +// Connector to be more efficient than New or FindLeader, so prefer to use a +// Connector whenever several Clients need to be created with the same +// parameters. type Connector protocol.Connector +// NewLeaderConnector creates a Connector that will yield Clients connected to +// the cluster leader. func NewLeaderConnector(store NodeStore, options ...Option) *Connector { opts := defaultOptions() for _, o := range options { @@ -345,6 +353,8 @@ func NewLeaderConnector(store NodeStore, options ...Option) *Connector { return (*Connector)(inner) } +// NewDirectConnector creates a Connector that will yield Clients connected to +// the node with the given ID and address. func NewDirectConnector(id uint64, address string, options ...Option) *Connector { opts := defaultOptions() for _, o := range options { @@ -355,6 +365,7 @@ func NewDirectConnector(id uint64, address string, options ...Option) *Connector return (*Connector)(inner) } +// Connect opens a Client based on the Connector's configuration. func (connector *Connector) Connect(ctx context.Context) (*Client, error) { protocol, err := (*protocol.Connector)(connector).Connect(ctx) if err != nil { From cf3c2486b8d3d0f630113774abe57ec2c5a4d532 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 18 Oct 2024 01:29:04 -0400 Subject: [PATCH 23/23] Some more docs Signed-off-by: Cole Miller --- client/database_store.go | 1 - client/leader_test.go | 1 - internal/protocol/connector.go | 9 +++++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/client/database_store.go b/client/database_store.go index 17ef7eb9..c1f250b0 100644 --- a/client/database_store.go +++ b/client/database_store.go @@ -1,4 +1,3 @@ -//go:build !nosqlite3 // +build !nosqlite3 package client diff --git a/client/leader_test.go b/client/leader_test.go index 57f9fe40..e0902453 100644 --- a/client/leader_test.go +++ b/client/leader_test.go @@ -3,7 +3,6 @@ package client_test import ( "context" "fmt" - "reflect" "testing" "time" diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 7aee263b..c4481dec 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -22,6 +22,8 @@ const MaxConcurrentLeaderConns int64 = 10 // DialFunc is a function that can be used to establish a network connection. type DialFunc func(context.Context, string) (net.Conn, error) +// LeaderTracker remembers the address of the cluster leader, and possibly +// holds a reusable connection to it. type LeaderTracker struct { mu sync.RWMutex lastKnownLeaderAddr string @@ -80,8 +82,8 @@ type Connector struct { log logging.Func // Logging function. } -// NewConnector returns a new connector that can be used by a dqlite driver to -// create new clients connected to a leader dqlite server. 
+// NewLeaderConnector returns a Connector that will connect to the current
+// cluster leader.
 func NewLeaderConnector(store NodeStore, config Config, log logging.Func) *Connector {
 if config.Dial == nil {
 config.Dial = Dial
@@ -110,6 +112,8 @@ func NewLeaderConnector(store NodeStore, config Config, log logging.Func) *Conne
 }
 }

+// NewDirectConnector returns a Connector that will connect to the node with
+// the given ID and address.
 func NewDirectConnector(id uint64, address string, config Config, log logging.Func) *Connector {
 if config.Dial == nil {
 config.Dial = Dial
@@ -139,6 +143,7 @@ func NewDirectConnector(id uint64, address string, config Config, log logging.Fu
 }
 }

+// Connect opens a new Protocol based on the Connector's configuration.
 func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
 if c.nodeID != 0 {
 ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
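
[As a companion to the docs above, a rough sketch of the direct variant;
the node ID (1) and address are placeholders for a real node's identity:

    package main

    import (
    	"context"
    	"log"
    	"time"

    	"github.com/canonical/go-dqlite/client"
    )

    func main() {
    	// Bypass leader discovery and dial one specific node.
    	connector := client.NewDirectConnector(1, "127.0.0.1:9001")

    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()

    	cli, err := connector.Connect(ctx)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer cli.Close()

    	log.Println("connected to node 1")
    }

Unlike the leader path, this variant performs no retries or leader
probing: Connect dials the named address once, within AttemptTimeout,
and returns an error if the handshake fails (after falling back to the
legacy protocol version once).]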