Skip to content

Commit

Permalink
Merge branch 'release-19.0' of https://github.com/vitessio/vitess int…
Browse files Browse the repository at this point in the history
…o release-19.0-github

Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Oct 30, 2024
2 parents ce7168c + 5c08da6 commit 1cc01fc
Show file tree
Hide file tree
Showing 31 changed files with 760 additions and 159 deletions.
20 changes: 19 additions & 1 deletion go/mysql/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func TestListenerShutdown(t *testing.T) {

l.Shutdown()

assert.EqualValues(t, 1, connRefuse.Get(), "connRefuse")
waitForConnRefuse(t, 1)

err = conn.Ping()
require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)")
Expand All @@ -1436,6 +1436,24 @@ func TestListenerShutdown(t *testing.T) {
require.Equal(t, "Server shutdown in progress", sqlErr.Message)
}

func waitForConnRefuse(t *testing.T, valWanted int64) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()

for {
select {
case <-ctx.Done():
require.FailNow(t, "connRefuse did not reach %v", valWanted)
case <-tick.C:
if connRefuse.Get() == valWanted {
return
}
}
}
}

func TestParseConnAttrs(t *testing.T) {
expected := map[string]string{
"_client_version": "8.0.11",
Expand Down
16 changes: 16 additions & 0 deletions go/test/endtoend/vtgate/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,22 @@ func TestConsistentLookupUpdate(t *testing.T) {
require.Empty(t, qr.Rows)
}

func TestSelectMultiEqualLookup(t *testing.T) {
conn, closer := start(t)
defer closer()

utils.Exec(t, conn, "insert into t10 (id, sharding_key, col1) values (1, 1, 'bar'), (2, 1, 'bar'), (3, 1, 'bar'), (4, 2, 'bar'), (5, 2, 'bar')")

for _, workload := range []string{"oltp", "olap"} {
t.Run(workload, func(t *testing.T) {
utils.Exec(t, conn, "set workload = "+workload)

utils.AssertMatches(t, conn, "select id from t10 WHERE (col1, id) IN (('bar', 1), ('baz', 2), ('qux', 3), ('barbar', 4))", "[[INT64(1)]]")
utils.AssertMatches(t, conn, "select id from t10 WHERE (col1 = 'bar' AND id = 1) OR (col1 = 'baz' AND id = 2) OR (col1 = 'qux' AND id = 3) OR (col1 = 'barbar' AND id = 4)", "[[INT64(1)]]")
})
}
}

func TestSelectNullLookup(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,4 @@ create table t11
col2 int,
col3 int,
primary key (id)
) Engine = InnoDB;
) Engine = InnoDB;
10 changes: 8 additions & 2 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}),
loadTabletsTrigger: make(chan struct{}, 1),
}
var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
Expand Down Expand Up @@ -531,7 +531,13 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
hc.loadTabletsTrigger <- struct{}{}
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
select {
case hc.loadTabletsTrigger <- struct{}{}:
default:
}
}
}
}
Expand Down
65 changes: 65 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/test/utils"
querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -682,3 +683,67 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
assert.True(t, proto.Equal(tablet1, allTablets[key1]))
assert.True(t, proto.Equal(tablet2, allTablets[key2]))
}

// TestDeadlockBetweenTopologyWatcherAndHealthCheck tests the possibility of a deadlock
// between the topology watcher and the health check.
// The issue https://github.com/vitessio/vitess/issues/16994 has more details on the deadlock.
func TestDeadlockBetweenTopologyWatcherAndHealthCheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// create a new memory topo server and an health check instance.
ts, _ := memorytopo.NewServerAndFactory(ctx, "zone-1")
hc := NewHealthCheck(ctx, time.Hour, time.Hour, ts, "zone-1", "", nil)
defer hc.Close()
defer hc.topoWatchers[0].Stop()

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone-1",
Uid: 100,
},
Type: topodatapb.TabletType_REPLICA,
Hostname: "host1",
PortMap: map[string]int32{
"grpc": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
err := ts.CreateTablet(ctx, tablet1)
// Run the first loadTablets call to ensure the tablet is present in the topology watcher.
hc.topoWatchers[0].loadTablets()
require.NoError(t, err)

// We want to run updateHealth with arguments that always
// make it trigger load Tablets.
th := &TabletHealth{
Tablet: tablet1,
Target: &querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
TabletType: topodatapb.TabletType_REPLICA,
},
}
prevTarget := &querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
TabletType: topodatapb.TabletType_PRIMARY,
}

// If we run the updateHealth function often enough, then we
// will see the deadlock where the topology watcher is trying to replace
// the tablet in the health check, but health check has the mutex acquired
// already because it is calling updateHealth.
// updateHealth itself will be stuck trying to send on the shared channel.
for i := 0; i < 10; i++ {
// Update the port of the tablet so that when update Health asks topo watcher to
// refresh the tablets, it finds an update and tries to replace it.
_, err = ts.UpdateTabletFields(ctx, tablet1.Alias, func(t *topodatapb.Tablet) error {
t.PortMap["testing_port"] = int32(i + 1)
return nil
})
require.NoError(t, err)
hc.updateHealth(th, prevTarget, false, false)
}
}
25 changes: 17 additions & 8 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,6 @@ func (fmd *FakeMysqlDaemon) GetServerUUID(ctx context.Context) (string, error) {
return "000000", nil
}

// CurrentPrimaryPositionLocked is thread-safe.
func (fmd *FakeMysqlDaemon) CurrentPrimaryPositionLocked(pos replication.Position) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.CurrentPrimaryPosition = pos
}

// ReplicationStatus is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) ReplicationStatus() (replication.ReplicationStatus, error) {
if fmd.ReplicationStatusError != nil {
Expand All @@ -316,6 +309,8 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (replication.ReplicationStatus,

// PrimaryStatus is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.PrimaryStatusError != nil {
return replication.PrimaryStatus{}, fmd.PrimaryStatusError
}
Expand Down Expand Up @@ -381,7 +376,21 @@ func (fmd *FakeMysqlDaemon) GetPreviousGTIDs(ctx context.Context, binlog string)

// PrimaryPosition is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) PrimaryPosition() (replication.Position, error) {
return fmd.CurrentPrimaryPosition, nil
return fmd.GetPrimaryPositionLocked(), nil
}

// GetPrimaryPositionLocked gets the primary position while holding the lock.
func (fmd *FakeMysqlDaemon) GetPrimaryPositionLocked() replication.Position {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.CurrentPrimaryPosition
}

// SetPrimaryPositionLocked is thread-safe.
func (fmd *FakeMysqlDaemon) SetPrimaryPositionLocked(pos replication.Position) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.CurrentPrimaryPosition = pos
}

// IsReadOnly is part of the MysqlDaemon interface.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12620,7 +12620,7 @@ func TestTabletExternallyReparented(t *testing.T) {
defer func() {
topofactory.SetError(nil)

ctx, cancel := context.WithTimeout(ctx, time.Millisecond*10)
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

resp, err := vtctld.GetTablets(ctx, &vtctldatapb.GetTabletsRequest{})
Expand Down
35 changes: 12 additions & 23 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,12 @@ func (route *Route) SetTruncateColumnCount(count int) {
func (route *Route) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, route.QueryTimeout)
defer cancelFunc()
qr, err := route.executeInternal(ctx, vcursor, bindVars, wantfields)
rss, bvs, err := route.findRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}
return qr.Truncate(route.TruncateColumnCount), nil

return route.executeShards(ctx, vcursor, bindVars, wantfields, rss, bvs)
}

// addQueryTimeout adds a query timeout to the context it receives and returns the modified context along with the cancel function.
Expand All @@ -159,20 +160,6 @@ const (
IgnoreReserveTxn cxtKey = iota
)

func (route *Route) executeInternal(
ctx context.Context,
vcursor VCursor,
bindVars map[string]*querypb.BindVariable,
wantfields bool,
) (*sqltypes.Result, error) {
rss, bvs, err := route.findRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}

return route.executeShards(ctx, vcursor, bindVars, wantfields, rss, bvs)
}

func (route *Route) executeShards(
ctx context.Context,
vcursor VCursor,
Expand Down Expand Up @@ -228,11 +215,15 @@ func (route *Route) executeShards(
}
}

if len(route.OrderBy) == 0 {
return result, nil
if len(route.OrderBy) != 0 {
var err error
result, err = route.sort(result)
if err != nil {
return nil, err
}
}

return route.sort(result)
return result.Truncate(route.TruncateColumnCount), nil
}

func filterOutNilErrors(errs []error) []error {
Expand Down Expand Up @@ -389,10 +380,8 @@ func (route *Route) sort(in *sqltypes.Result) (*sqltypes.Result, error) {
// the contents of any row.
out := in.ShallowCopy()

if err := route.OrderBy.SortResult(out); err != nil {
return nil, err
}
return out.Truncate(route.TruncateColumnCount), nil
err := route.OrderBy.SortResult(out)
return out, err
}

func (route *Route) description() PrimitiveDescription {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/vindex_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (vr *VindexLookup) generateIds(ctx context.Context, vcursor VCursor, bindVa
switch vr.Opcode {
case Equal, EqualUnique:
return []sqltypes.Value{value.Value(vcursor.ConnCollation())}, nil
case IN:
case IN, MultiEqual:
return value.TupleValues(), nil
}
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "opcode %s not supported for VindexLookup", vr.Opcode.String())
Expand Down
Loading

0 comments on commit 1cc01fc

Please sign in to comment.