diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 3195008f..c8e3ed7a 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -7,6 +7,7 @@ import ( "io" "net" "sort" + "sync" "time" "github.com/Rican7/retry" @@ -14,6 +15,7 @@ import ( "github.com/Rican7/retry/strategy" "github.com/canonical/go-dqlite/logging" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" ) // DialFunc is a function that can be used to establish a network connection. @@ -126,60 +128,101 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P return servers[i].Role < servers[j].Role }) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sem := semaphore.NewWeighted(10) + + protocolChan := make(chan *Protocol) + + wg := &sync.WaitGroup{} + wg.Add(len(servers)) + + go func() { + wg.Wait() + close(protocolChan) + }() + // Make an attempt for each address until we find the leader. for _, server := range servers { - log := func(l logging.Level, format string, a ...interface{}) { - format = fmt.Sprintf("server %s: ", server.Address) + format - log(l, format, a...) - } + go func(server NodeInfo, pc chan<- *Protocol) { + defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() + if err := sem.Acquire(ctx, 1); err != nil { + return + } + defer sem.Release(1) - protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log) - if err != nil { - // This server is unavailable, try with the next target. - log(logging.Warn, err.Error()) - continue - } - if protocol != nil { - // We found the leader + if ctx.Err() != nil { + return + } + + log := func(l logging.Level, format string, a ...interface{}) { + format = fmt.Sprintf("server %s: ", server.Address) + format + log(l, format, a...) + } + + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + + protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log) + if err != nil { + // This server is unavailable, try with the next target. + log(logging.Warn, err.Error()) + return + } + if protocol != nil { + // We found the leader + log(logging.Debug, "connected") + pc <- protocol + return + } + if leader == "" { + // This server does not know who the current leader is, + // try with the next target. + log(logging.Warn, "no known leader") + return + } + + // If we get here, it means this server reported that another + // server is the leader, let's close the connection to this + // server and try with the suggested one. + log(logging.Debug, "connect to reported leader %s", leader) + + ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + + protocol, _, err = c.connectAttemptOne(ctx, leader, log) + if err != nil { + // The leader reported by the previous server is + // unavailable, try with the next target. + log(logging.Warn, "reported leader unavailable err=%v", err) + return + } + if protocol == nil { + // The leader reported by the target server does not consider itself + // the leader, try with the next target. + log(logging.Warn, "reported leader server is not the leader") + return + } log(logging.Debug, "connected") - return protocol, nil - } - if leader == "" { - // This server does not know who the current leader is, - // try with the next target. - log(logging.Warn, "no known leader") - continue - } + pc <- protocol + }(server, protocolChan) + } - // If we get here, it means this server reported that another - // server is the leader, let's close the connection to this - // server and try with the suggested one. - log(logging.Debug, "connect to reported leader %s", leader) + // Read from protocol chan, cancel context + protocol, ok := <-protocolChan + if !ok { + return nil, ErrNoAvailableLeader + } - ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() + cancel() - protocol, _, err = c.connectAttemptOne(ctx, leader, log) - if err != nil { - // The leader reported by the previous server is - // unavailable, try with the next target. - log(logging.Warn, "reported leader unavailable err=%v", err) - continue - } - if protocol == nil { - // The leader reported by the target server does not consider itself - // the leader, try with the next target. - log(logging.Warn, "reported leader server is not the leader") - continue - } - log(logging.Debug, "connected") - return protocol, nil + for extra := range protocolChan { + extra.Close() } - return nil, ErrNoAvailableLeader + return protocol, nil } // Perform the initial handshake using the given protocol version.