diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index 3e2d3ab02ec..0005be6cbd2 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -12,6 +12,7 @@
- **[Support for More Efficient JSON Replication](#efficient-json-replication)**
- **[Support for LAST_INSERT_ID(x)](#last-insert-id)**
- **[Support for Maximum Idle Connections in the Pool](#max-idle-connections)**
+ - **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
@@ -101,6 +102,11 @@ You can control idle connection retention for the query server’s query pool, s
This feature ensures that, during traffic spikes, idle connections are available for faster responses, while minimizing overhead in low-traffic periods by limiting the number of idle connections retained. It helps strike a balance between performance, efficiency, and cost.
+### Stalled Disk Recovery in VTOrc
+VTOrc can now identify and recover from stalled disk errors. VTTablets test whether the disk is writable and send this information in the full status output to VTOrc. If the disk is not writable on the primary tablet, VTOrc will attempt to recover the cluster by promoting a new primary. This is useful in scenarios where the disk is stalled and the primary VTTablet is unable to accept writes because of it.
+
+To opt into this feature, the `--enable-primary-disk-stalled-recovery` flag has to be specified on VTOrc, and the `--disk-write-dir` flag has to be specified on the VTTablets. The `--disk-write-interval` and `--disk-write-timeout` flags can be used to configure the polling interval and timeout respectively.
+
### Support for Filtering Query logs on Error
The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
index 38b0521f142..a5d07f0502a 100644
--- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
+++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
@@ -436,7 +436,7 @@ State: {{.State}}
RowsCompared: {{.RowsCompared}}
HasMismatch: {{.HasMismatch}}
StartedAt: {{.StartedAt}}
-{{if (eq .State "started")}}Progress: {{printf "%.2f" .Progress.Percentage}}%%{{if .Progress.ETA}}, ETA: {{.Progress.ETA}}{{end}}{{end}}
+{{if (eq .State "started")}}Progress: {{printf "%.2f" .Progress.Percentage}}%{{if .Progress.ETA}}, ETA: {{.Progress.ETA}}{{end}}{{end}}
{{if .CompletedAt}}CompletedAt: {{.CompletedAt}}{{end}}
{{range $table := .TableSummaryMap}}
Table {{$table.TableName}}:
diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go
index e27c57f47be..cf713d79579 100644
--- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go
+++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go
@@ -680,6 +680,75 @@ func TestVDiffSharded(t *testing.T) {
}
}
+func TestVDiffTextTemplate(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ env := newTestVDiffEnv(t, ctx, []string{"0"}, []string{"0"}, "", nil)
+ defer env.close()
+
+ now := time.Now()
+ UUID := uuid.New().String()
+ req := &tabletmanagerdatapb.VDiffRequest{
+ Keyspace: env.targetKeyspace,
+ Workflow: env.workflow,
+ Action: string(vdiff.ShowAction),
+ ActionArg: UUID,
+ }
+ starttime := now.UTC().Format(vdiff.TimestampFormat)
+
+ testCases := []struct {
+ id string
+ res *sqltypes.Result
+ report string
+ }{{
+ id: "1",
+ res: sqltypes.MakeTestResult(fields,
+ "started||t1|"+UUID+"|started|300|"+starttime+"|30||0|"+
+ `{"TableName": "t1", "MatchingRows": 30, "ProcessedRows": 30, "MismatchedRows": 0, "ExtraRowsSource": 0, `+
+ `"ExtraRowsTarget": 0}`),
+ report: fmt.Sprintf(`
+VDiff Summary for targetks.vdiffTest (%s)
+State: started
+RowsCompared: 30
+HasMismatch: false
+StartedAt: %s
+Progress: 10.00%%, ETA: %s
+
+Table t1:
+ State: started
+ ProcessedRows: 30
+ MatchingRows: 30
+
+Use "--format=json" for more detailed output.
+
+`, UUID, starttime, starttime),
+ }}
+
+ for _, tc := range testCases {
+ t.Run(tc.id, func(t *testing.T) {
+ res := &tabletmanagerdatapb.VDiffResponse{
+ Id: 1,
+ Output: sqltypes.ResultToProto3(tc.res),
+ }
+ env.tmc.setVDResults(env.tablets[200].tablet, req, res)
+ req := &vtctldatapb.VDiffShowRequest{
+ TargetKeyspace: env.targetKeyspace,
+ Workflow: env.workflow,
+ Arg: UUID,
+ }
+
+ resp, err := env.ws.VDiffShow(context.Background(), req)
+ require.NoError(t, err)
+ vds, err := displayShowSingleSummary(env.out, "text", env.targetKeyspace, env.workflow, UUID, resp, true)
+ require.NoError(t, err)
+ require.Equal(t, vdiff.StartedState, vds)
+
+ require.Equal(t, tc.report, env.getOutput())
+ env.resetOutput()
+ })
+ }
+}
+
func TestGetStructNames(t *testing.T) {
type s struct {
A string
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index b6786b29487..d6ad6ad015a 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -102,6 +102,9 @@ Flags:
--ddl_strategy string Set default strategy for DDL statements. Override with @@ddl_strategy session variable (default "direct")
--default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY)
--degraded_threshold duration replication lag after which a replica is considered degraded (default 30s)
+ --disk-write-dir string if provided, tablet will attempt to write a file to this directory to check if the disk is stalled
+ --disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s)
+ --disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-consolidator Synonym to -enable_consolidator (default true)
--enable-consolidator-replicas Synonym to -enable_consolidator_replicas
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt
index c2799a72dc1..ca8083709e5 100644
--- a/go/flags/endtoend/vtorc.txt
+++ b/go/flags/endtoend/vtorc.txt
@@ -33,6 +33,7 @@ Flags:
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--emit_stats If set, emit stats to push-based monitoring and stats backends
+ --enable-primary-disk-stalled-recovery Whether VTOrc should detect a stalled disk on the primary and failover
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index ab6b32ba4c3..7965e14f88a 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -133,6 +133,9 @@ Flags:
--dba_idle_timeout duration Idle timeout for dba connections (default 1m0s)
--dba_pool_size int Size of the connection pool for dba connections (default 20)
--degraded_threshold duration replication lag after which a replica is considered degraded (default 30s)
+ --disk-write-dir string if provided, tablet will attempt to write a file to this directory to check if the disk is stalled
+ --disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s)
+ --disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-consolidator Synonym to -enable_consolidator (default true)
--enable-consolidator-replicas Synonym to -enable_consolidator_replicas
diff --git a/go/mysql/capabilities/capability.go b/go/mysql/capabilities/capability.go
index 4015059e686..eac25585089 100644
--- a/go/mysql/capabilities/capability.go
+++ b/go/mysql/capabilities/capability.go
@@ -31,25 +31,26 @@ var (
type FlavorCapability int
const (
- NoneFlavorCapability FlavorCapability = iota // default placeholder
- FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
- TransactionalGtidExecutedFlavorCapability //
- InstantDDLFlavorCapability // ALGORITHM=INSTANT general support
- InstantAddLastColumnFlavorCapability //
- InstantAddDropVirtualColumnFlavorCapability //
- InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal.
- InstantChangeColumnDefaultFlavorCapability //
- InstantExpandEnumCapability //
- InstantChangeColumnVisibilityCapability //
- MySQLUpgradeInServerFlavorCapability //
- DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
- DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
- CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
- PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
- InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
- ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
- BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS
- RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys.
+ NoneFlavorCapability FlavorCapability = iota // default placeholder
+ FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
+ TransactionalGtidExecutedFlavorCapability //
+ InstantDDLFlavorCapability // ALGORITHM=INSTANT general support
+ InstantAddLastColumnFlavorCapability //
+ InstantAddDropVirtualColumnFlavorCapability //
+ InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal.
+ InstantChangeColumnDefaultFlavorCapability //
+ InstantExpandEnumCapability //
+ InstantChangeColumnVisibilityCapability //
+ MySQLUpgradeInServerFlavorCapability //
+ DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
+ DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
+ CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
+ PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
+ PerformanceSchemaMetadataLocksTableCapability // supported in MySQL 8.0.2 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html
+ InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
+ ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
+ BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS
+ RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys.
)
type CapableOf func(capability FlavorCapability) (bool, error)
@@ -97,6 +98,8 @@ func MySQLVersionHasCapability(serverVersion string, capability FlavorCapability
return atLeast(8, 0, 0)
case PerformanceSchemaDataLocksTableCapability:
return atLeast(8, 0, 1)
+ case PerformanceSchemaMetadataLocksTableCapability:
+ return atLeast(8, 0, 2)
case MySQLUpgradeInServerFlavorCapability:
return atLeast(8, 0, 16)
case CheckConstraintsCapability:
diff --git a/go/mysql/capabilities/capability_test.go b/go/mysql/capabilities/capability_test.go
index aeb18bed22e..cf5e693840e 100644
--- a/go/mysql/capabilities/capability_test.go
+++ b/go/mysql/capabilities/capability_test.go
@@ -218,6 +218,25 @@ func TestMySQLVersionCapableOf(t *testing.T) {
version: "8.0.20",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: true,
+ }, {
+ version: "5.7.38",
+ capability: PerformanceSchemaMetadataLocksTableCapability,
+ isCapable: false,
+ },
+ {
+ version: "8.0",
+ capability: PerformanceSchemaMetadataLocksTableCapability,
+ isCapable: false,
+ },
+ {
+ version: "8.0.1",
+ capability: PerformanceSchemaMetadataLocksTableCapability,
+ isCapable: false,
+ },
+ {
+ version: "8.0.2",
+ capability: PerformanceSchemaMetadataLocksTableCapability,
+ isCapable: true,
},
{
version: "8.0.29",
diff --git a/go/mysql/collations/colldata/cached_size.go b/go/mysql/collations/colldata/cached_size.go
index b348baaaed8..190e1731651 100644
--- a/go/mysql/collations/colldata/cached_size.go
+++ b/go/mysql/collations/colldata/cached_size.go
@@ -19,6 +19,10 @@ package colldata
import hack "vitess.io/vitess/go/hack"
+type cachedObject interface {
+ CachedSize(alloc bool) int64
+}
+
func (cached *eightbitWildcard) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)
@@ -59,6 +63,10 @@ func (cached *unicodeWildcard) CachedSize(alloc bool) int64 {
if alloc {
size += int64(48)
}
+ // field charset vitess.io/vitess/go/mysql/collations/charset.Charset
+ if cc, ok := cached.charset.(cachedObject); ok {
+ size += cc.CachedSize(true)
+ }
// field pattern []rune
{
size += hack.RuntimeAllocSize(int64(cap(cached.pattern)) * int64(4))
diff --git a/go/mysql/flavor_test.go b/go/mysql/flavor_test.go
index 3d584b8293b..219b9803933 100644
--- a/go/mysql/flavor_test.go
+++ b/go/mysql/flavor_test.go
@@ -102,6 +102,15 @@ func TestServerVersionCapableOf(t *testing.T) {
version: "8.0.20",
capability: capabilities.PerformanceSchemaDataLocksTableCapability,
isCapable: true,
+ }, {
+ version: "5.7.38",
+ capability: capabilities.PerformanceSchemaMetadataLocksTableCapability,
+ isCapable: false,
+ },
+ {
+ version: "8.0.20",
+ capability: capabilities.PerformanceSchemaMetadataLocksTableCapability,
+ isCapable: true,
},
{
// Some ridiculous version
diff --git a/go/sqltypes/cached_size.go b/go/sqltypes/cached_size.go
index 632c8249455..53bc407278d 100644
--- a/go/sqltypes/cached_size.go
+++ b/go/sqltypes/cached_size.go
@@ -37,6 +37,14 @@ func (cached *Result) CachedSize(alloc bool) int64 {
// field Rows []vitess.io/vitess/go/sqltypes.Row
{
size += hack.RuntimeAllocSize(int64(cap(cached.Rows)) * int64(24))
+ for _, elem := range cached.Rows {
+ {
+ size += hack.RuntimeAllocSize(int64(cap(elem)) * int64(32))
+ for _, elem := range elem {
+ size += elem.CachedSize(false)
+ }
+ }
+ }
}
// field SessionStateChanges string
size += hack.RuntimeAllocSize(int64(len(cached.SessionStateChanges)))
diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
index 5f6423b2556..8547431ddd3 100644
--- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
+++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
@@ -714,6 +714,89 @@ func testScheduler(t *testing.T) {
}
})
})
+ t.Run("force_cutover mdl", func(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5)
+ defer cancel()
+
+ t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait
+
+ t.Run("wait for t1 running", func(t *testing.T) {
+ status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
+ fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+ })
+ t.Run("wait for t1 ready to complete", func(t *testing.T) {
+ // Waiting for 'running', above, is not enough. We want to give vreplication a chance to start running, or else
+ // we attempt the cut-over too early. Specifically in this test, we're going to lock rows FOR UPDATE, which,
+ // if vreplication does not get the chance to start, will prevent it from doing anything at all.
+ // ready_to_complete is a great signal for us that vreplication is healthy and up to date.
+ waitForReadyToComplete(t, t1uuid, true)
+ })
+
+ conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true)
+ require.NoError(t, err)
+ defer conn.Close()
+
+ unlockTables := func() error {
+ _, err := conn.ExecuteFetch("unlock tables", 0, false)
+ return err
+ }
+ t.Run("locking table", func(t *testing.T) {
+ _, err := conn.ExecuteFetch("lock tables t1_test write", 0, false)
+ require.NoError(t, err)
+ })
+ defer unlockTables()
+ t.Run("injecting heartbeats asynchronously", func(t *testing.T) {
+ go func() {
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ for {
+ throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil)
+ select {
+ case <-ticker.C:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ })
+ t.Run("check no force_cutover", func(t *testing.T) {
+ rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
+ require.NotNil(t, rs)
+ for _, row := range rs.Named().Rows {
+ forceCutOver := row.AsInt64("force_cutover", 0)
+ assert.Equal(t, int64(0), forceCutOver) // disabled
+ }
+ })
+ t.Run("attempt to complete", func(t *testing.T) {
+ onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
+ })
+ t.Run("cut-over fail due to timeout", func(t *testing.T) {
+ waitForMessage(t, t1uuid, "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded")
+ status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning)
+ fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+ onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
+ })
+ t.Run("force_cutover", func(t *testing.T) {
+ onlineddl.CheckForceMigrationCutOver(t, &vtParams, shards, t1uuid, true)
+ })
+ t.Run("check force_cutover", func(t *testing.T) {
+ rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
+ require.NotNil(t, rs)
+ for _, row := range rs.Named().Rows {
+ forceCutOver := row.AsInt64("force_cutover", 0)
+ assert.Equal(t, int64(1), forceCutOver) // enabled
+ }
+ })
+ t.Run("expect completion", func(t *testing.T) {
+ status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+ fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+ onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
+ })
+ t.Run("expect unlock failure", func(t *testing.T) {
+ err := unlockTables()
+ assert.ErrorContains(t, err, "broken pipe")
+ })
+ })
}
t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
diff --git a/go/tools/sizegen/sizegen.go b/go/tools/sizegen/sizegen.go
index 7ecd50e3d8c..17b155ad3f4 100644
--- a/go/tools/sizegen/sizegen.go
+++ b/go/tools/sizegen/sizegen.go
@@ -163,6 +163,8 @@ func (sizegen *sizegen) generateTyp(tt types.Type) {
sizegen.generateKnownType(tt)
case *types.Alias:
sizegen.generateTyp(types.Unalias(tt))
+ default:
+ panic(fmt.Sprintf("unhandled type: %v (%T)", tt, tt))
}
}
@@ -490,9 +492,11 @@ func (sizegen *sizegen) sizeStmtForType(fieldName *jen.Statement, field types.Ty
// assume that function pointers do not allocate (although they might, if they're closures)
return nil, 0
+ case *types.Alias:
+ return sizegen.sizeStmtForType(fieldName, types.Unalias(node), alloc)
+
default:
- log.Printf("unhandled type: %T", node)
- return nil, 0
+ panic(fmt.Sprintf("unhandled type: %v (%T)", node, node))
}
}
diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go
index c48905be948..cfe1ba2e964 100644
--- a/go/vt/discovery/tablet_picker.go
+++ b/go/vt/discovery/tablet_picker.go
@@ -387,8 +387,13 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err)
return nil
}
- if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
- aliases = append(aliases, si.PrimaryAlias)
+
+ // It is possible that there is a cluster event (ERS/PRS, for example) due to which
+ // there is no primary elected for the shard at the moment.
+ if si.PrimaryAlias != nil {
+ if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
+ aliases = append(aliases, si.PrimaryAlias)
+ }
}
} else {
actualCells := make([]string, 0)
@@ -425,6 +430,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
continue
}
for _, node := range sri.Nodes {
+ if node.TabletAlias == nil {
+ continue
+ }
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
}
diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go
index 76a8828afec..27c4d8bf7b1 100644
--- a/go/vt/discovery/tablet_picker_test.go
+++ b/go/vt/discovery/tablet_picker_test.go
@@ -62,6 +62,29 @@ func TestPickPrimary(t *testing.T) {
assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want)
}
+// TestPickNoPrimary confirms that if the picker was set up only for primary tablets but
+// there is no primary setup for the shard we correctly return an error.
+func TestPickNoPrimary(t *testing.T) {
+ defer utils.EnsureNoLeaks(t)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"})
+ want := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, "cell", true, true)
+ defer deleteTablet(t, te, want)
+ _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
+ si.PrimaryAlias = nil // force a missing primary
+ return nil
+ })
+ require.NoError(t, err)
+
+ tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{})
+ require.NoError(t, err)
+
+ _, err = tp.PickForStreaming(ctx)
+ require.Errorf(t, err, "No healthy serving tablet")
+}
+
func TestPickLocalPreferences(t *testing.T) {
defer utils.EnsureNoLeaks(t)
type tablet struct {
diff --git a/go/vt/proto/query/cached_size.go b/go/vt/proto/query/cached_size.go
index 5b613317294..4436594681a 100644
--- a/go/vt/proto/query/cached_size.go
+++ b/go/vt/proto/query/cached_size.go
@@ -27,6 +27,10 @@ func (cached *BindVariable) CachedSize(alloc bool) int64 {
if alloc {
size += int64(96)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Value []byte
{
size += hack.RuntimeAllocSize(int64(cap(cached.Value)))
@@ -48,6 +52,10 @@ func (cached *Field) CachedSize(alloc bool) int64 {
if alloc {
size += int64(160)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Name string
size += hack.RuntimeAllocSize(int64(len(cached.Name)))
// field Table string
@@ -70,6 +78,10 @@ func (cached *QueryWarning) CachedSize(alloc bool) int64 {
if alloc {
size += int64(64)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Message string
size += hack.RuntimeAllocSize(int64(len(cached.Message)))
return size
@@ -82,6 +94,10 @@ func (cached *Target) CachedSize(alloc bool) int64 {
if alloc {
size += int64(96)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Keyspace string
size += hack.RuntimeAllocSize(int64(len(cached.Keyspace)))
// field Shard string
@@ -98,6 +114,10 @@ func (cached *Value) CachedSize(alloc bool) int64 {
if alloc {
size += int64(80)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Value []byte
{
size += hack.RuntimeAllocSize(int64(cap(cached.Value)))
diff --git a/go/vt/proto/topodata/cached_size.go b/go/vt/proto/topodata/cached_size.go
index 94b7fc6818c..3feead01bae 100644
--- a/go/vt/proto/topodata/cached_size.go
+++ b/go/vt/proto/topodata/cached_size.go
@@ -27,6 +27,10 @@ func (cached *KeyRange) CachedSize(alloc bool) int64 {
if alloc {
size += int64(96)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Start []byte
{
size += hack.RuntimeAllocSize(int64(cap(cached.Start)))
@@ -45,6 +49,10 @@ func (cached *ThrottledAppRule) CachedSize(alloc bool) int64 {
if alloc {
size += int64(80)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
// field Name string
size += hack.RuntimeAllocSize(int64(len(cached.Name)))
// field ExpiresAt *vitess.io/vitess/go/vt/proto/vttime.Time
diff --git a/go/vt/proto/vttime/cached_size.go b/go/vt/proto/vttime/cached_size.go
index 62a6366ba3c..f2b69dbefae 100644
--- a/go/vt/proto/vttime/cached_size.go
+++ b/go/vt/proto/vttime/cached_size.go
@@ -17,6 +17,8 @@ limitations under the License.
package vttime
+import hack "vitess.io/vitess/go/hack"
+
func (cached *Time) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)
@@ -25,5 +27,9 @@ func (cached *Time) CachedSize(alloc bool) int64 {
if alloc {
size += int64(64)
}
+ // field unknownFields google.golang.org/protobuf/runtime/protoimpl.UnknownFields
+ {
+ size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields)))
+ }
return size
}
diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go
index 10ba787a3c1..f2c9ab81d67 100644
--- a/go/vt/topo/tablet.go
+++ b/go/vt/topo/tablet.go
@@ -492,6 +492,9 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.
)
for _, tabletAlias := range tabletAliases {
+ if tabletAlias == nil {
+ return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "nil tablet alias in list")
+ }
wg.Add(1)
go func(tabletAlias *topodatapb.TabletAlias) {
defer wg.Done()
diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go
index cafff5acce8..db367673aeb 100644
--- a/go/vt/vtorc/config/config.go
+++ b/go/vt/vtorc/config/config.go
@@ -174,6 +174,15 @@ var (
Dynamic: true,
},
)
+
+ enablePrimaryDiskStalledRecovery = viperutil.Configure(
+ "enable-primary-disk-stalled-recovery",
+ viperutil.Options[bool]{
+ FlagName: "enable-primary-disk-stalled-recovery",
+ Default: false,
+ Dynamic: true,
+ },
+ )
)
func init() {
@@ -197,6 +206,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.Duration("recovery-poll-duration", recoveryPollDuration.Default(), "Timer duration on which VTOrc polls its database to run a recovery")
fs.Bool("allow-emergency-reparent", ersEnabled.Default(), "Whether VTOrc should be allowed to run emergency reparent operation when it detects a dead primary")
fs.Bool("change-tablets-with-errant-gtid-to-drained", convertTabletsWithErrantGTIDs.Default(), "Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED")
+ fs.Bool("enable-primary-disk-stalled-recovery", enablePrimaryDiskStalledRecovery.Default(), "Whether VTOrc should detect a stalled disk on the primary and failover")
viperutil.BindFlags(fs,
instancePollTime,
@@ -214,6 +224,7 @@ func registerFlags(fs *pflag.FlagSet) {
recoveryPollDuration,
ersEnabled,
convertTabletsWithErrantGTIDs,
+ enablePrimaryDiskStalledRecovery,
)
}
@@ -332,6 +343,11 @@ func SetConvertTabletWithErrantGTIDs(val bool) {
convertTabletsWithErrantGTIDs.Set(val)
}
+// GetStalledDiskPrimaryRecovery reports whether VTOrc is allowed to check for and recover from stalled disk problems.
+func GetStalledDiskPrimaryRecovery() bool {
+ return enablePrimaryDiskStalledRecovery.Get()
+}
+
// MarkConfigurationLoaded is called once configuration has first been loaded.
// Listeners on ConfigurationLoaded will get a notification
func MarkConfigurationLoaded() {
diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go
index 21375fb8eb3..8baa9a12476 100644
--- a/go/vt/vtorc/db/generate_base.go
+++ b/go/vt/vtorc/db/generate_base.go
@@ -105,6 +105,7 @@ CREATE TABLE database_instance (
semi_sync_primary_status TINYint NOT NULL DEFAULT 0,
semi_sync_replica_status TINYint NOT NULL DEFAULT 0,
semi_sync_primary_clients int NOT NULL DEFAULT 0,
+ is_disk_stalled TINYint NOT NULL DEFAULT 0,
PRIMARY KEY (alias)
)`,
`
diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go
index fa2e1a4ec95..6a800e5ee0b 100644
--- a/go/vt/vtorc/inst/analysis.go
+++ b/go/vt/vtorc/inst/analysis.go
@@ -56,6 +56,7 @@ const (
LockedSemiSyncPrimaryHypothesis AnalysisCode = "LockedSemiSyncPrimaryHypothesis"
LockedSemiSyncPrimary AnalysisCode = "LockedSemiSyncPrimary"
ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected"
+ PrimaryDiskStalled AnalysisCode = "PrimaryDiskStalled"
)
type StructureAnalysisCode string
@@ -129,6 +130,7 @@ type ReplicationAnalysis struct {
MaxReplicaGTIDMode string
MaxReplicaGTIDErrant string
IsReadOnly bool
+ IsDiskStalled bool
}
func (replicationAnalysis *ReplicationAnalysis) MarshalJSON() ([]byte, error) {
diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go
index 7837955c541..d487973b0f0 100644
--- a/go/vt/vtorc/inst/analysis_dao.go
+++ b/go/vt/vtorc/inst/analysis_dao.go
@@ -79,7 +79,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
vitess_keyspace.durability_policy AS durability_policy,
vitess_shard.primary_timestamp AS shard_primary_term_timestamp,
primary_instance.read_only AS read_only,
- MIN(primary_instance.gtid_errant) AS gtid_errant,
+ MIN(primary_instance.gtid_errant) AS gtid_errant,
MIN(primary_instance.alias) IS NULL AS is_invalid,
MIN(primary_instance.binary_log_file) AS binary_log_file,
MIN(primary_instance.binary_log_pos) AS binary_log_pos,
@@ -233,7 +233,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
COUNT(
DISTINCT case when replica_instance.log_bin
AND replica_instance.log_replica_updates then replica_instance.major_version else NULL end
- ) AS count_distinct_logging_major_versions
+ ) AS count_distinct_logging_major_versions,
+ primary_instance.is_disk_stalled != 0 AS is_disk_stalled
FROM
vitess_tablet
JOIN vitess_keyspace ON (
@@ -354,6 +355,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.HeartbeatInterval = m.GetFloat64("heartbeat_interval")
a.IsReadOnly = m.GetUint("read_only") == 1
+ a.IsDiskStalled = m.GetBool("is_disk_stalled")
if !a.LastCheckValid {
analysisMessage := fmt.Sprintf("analysis: Alias: %+v, Keyspace: %+v, Shard: %+v, IsPrimary: %+v, LastCheckValid: %+v, LastCheckPartialSuccess: %+v, CountReplicas: %+v, CountValidReplicas: %+v, CountValidReplicatingReplicas: %+v, CountLaggingReplicas: %+v, CountDelayedReplicas: %+v",
@@ -401,6 +403,10 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
} else if isInvalid {
a.Analysis = InvalidReplica
a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown"
+ } else if a.IsClusterPrimary && !a.LastCheckValid && a.IsDiskStalled {
+ a.Analysis = PrimaryDiskStalled
+ a.Description = "Primary has a stalled disk"
+ ca.hasClusterwideAction = true
} else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
a.Analysis = DeadPrimaryWithoutReplicas
a.Description = "Primary cannot be reached by vtorc and has no replica"
diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go
index ae4f7279403..baa1121b776 100644
--- a/go/vt/vtorc/inst/analysis_dao_test.go
+++ b/go/vt/vtorc/inst/analysis_dao_test.go
@@ -34,10 +34,10 @@ var (
// The initialSQL is a set of insert commands copied from a dump of an actual running VTOrc instances. The relevant insert commands are here.
// This is a dump taken from a test running 4 tablets, zone1-101 is the primary, zone1-100 is a replica, zone1-112 is a rdonly and zone2-200 is a cross-cell replica.
initialSQL = []string{
- `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0);`,
- `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0);`,
- `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2);`,
- `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false);`,
+ `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false);`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
@@ -96,6 +96,29 @@ func TestGetReplicationAnalysisDecision(t *testing.T) {
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimaryTabletDeleted,
+ }, {
+ name: "StalledDiskPrimary",
+ info: []*test.InfoForRecoveryAnalysis{{
+ TabletInfo: &topodatapb.Tablet{
+ Alias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100},
+ Hostname: "localhost",
+ Keyspace: "ks",
+ Shard: "0",
+ Type: topodatapb.TabletType_PRIMARY,
+ MysqlHostname: "localhost",
+ MysqlPort: 6709,
+ },
+ DurabilityPolicy: "none",
+ LastCheckValid: 0,
+ CountReplicas: 4,
+ CountValidReplicas: 4,
+ CountValidReplicatingReplicas: 0,
+ IsPrimary: 1,
+ IsStalledDisk: 1,
+ }},
+ keyspaceWanted: "ks",
+ shardWanted: "0",
+ codeWanted: PrimaryDiskStalled,
}, {
name: "DeadPrimary",
info: []*test.InfoForRecoveryAnalysis{{
diff --git a/go/vt/vtorc/inst/instance.go b/go/vt/vtorc/inst/instance.go
index fef1e90acce..b7b097bb14d 100644
--- a/go/vt/vtorc/inst/instance.go
+++ b/go/vt/vtorc/inst/instance.go
@@ -91,6 +91,7 @@ type Instance struct {
IsUpToDate bool
IsRecentlyChecked bool
SecondsSinceLastSeen sql.NullInt64
+ StalledDisk bool
AllowTLS bool
diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go
index 9198514d6ed..f92a15079dd 100644
--- a/go/vt/vtorc/inst/instance_dao.go
+++ b/go/vt/vtorc/inst/instance_dao.go
@@ -175,6 +175,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
var tablet *topodatapb.Tablet
var fs *replicationdatapb.FullStatus
readingStartTime := time.Now()
+ stalledDisk := false
instance := NewInstance()
instanceFound := false
partialSuccess := false
@@ -205,6 +206,9 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
fs, err = fullStatus(tabletAlias)
if err != nil {
+ if config.GetStalledDiskPrimaryRecovery() && strings.Contains(err.Error(), "stalled disk") {
+ stalledDisk = true
+ }
goto Cleanup
}
partialSuccess = true // We at least managed to read something from the server.
@@ -381,9 +385,10 @@ Cleanup:
// Something is wrong, could be network-wise. Record that we
// tried to check the instance. last_attempted_check is also
- // updated on success by writeInstance.
+ // updated on success by writeInstance. If the reason is a
+ // stalled disk, we can record that as well.
latency.Start("backend")
- _ = UpdateInstanceLastChecked(tabletAlias, partialSuccess)
+ _ = UpdateInstanceLastChecked(tabletAlias, partialSuccess, stalledDisk)
latency.Stop("backend")
return nil, err
}
@@ -874,6 +879,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
"semi_sync_primary_clients",
"semi_sync_replica_status",
"last_discovery_latency",
+ "is_disk_stalled",
}
values := make([]string, len(columns))
@@ -953,6 +959,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
args = append(args, instance.SemiSyncPrimaryClients)
args = append(args, instance.SemiSyncReplicaStatus)
args = append(args, instance.LastDiscoveryLatency.Nanoseconds())
+ args = append(args, instance.StalledDisk)
}
sql, err := mkInsert("database_instance", columns, values, len(instances), insertIgnore)
@@ -998,16 +1005,18 @@ func WriteInstance(instance *Instance, instanceWasActuallyFound bool, lastError
// UpdateInstanceLastChecked updates the last_check timestamp in the vtorc backed database
// for a given instance
-func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error {
+func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool, stalledDisk bool) error {
writeFunc := func() error {
_, err := db.ExecVTOrc(`UPDATE database_instance
SET
last_checked = DATETIME('now'),
- last_check_partial_success = ?
+ last_check_partial_success = ?,
+ is_disk_stalled = ?
WHERE
alias = ?
`,
partialSuccess,
+ stalledDisk,
tabletAlias,
)
if err != nil {
diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go
index 1a14041450c..c3b99455741 100644
--- a/go/vt/vtorc/inst/instance_dao_test.go
+++ b/go/vt/vtorc/inst/instance_dao_test.go
@@ -64,13 +64,13 @@ func TestMkInsertSingle(t *testing.T) {
version, major_version, version_comment, binlog_server, read_only, binlog_format,
binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval,
replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant,
- source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen)
+ source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen)
VALUES
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
`
a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT,
FULL, false, false, , 0, , 0, 0, 0,
- false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,`
+ false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,`
sql1, args1, err := mkInsertForInstances(instances[:1], false, true)
require.NoError(t, err)
@@ -87,16 +87,16 @@ func TestMkInsertThree(t *testing.T) {
version, major_version, version_comment, binlog_server, read_only, binlog_format,
binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval,
replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant,
- source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen)
+ source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen)
VALUES
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
`
a3 := `
- zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
- zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
- zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
+ zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
+ zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
+ zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
`
sql3, args3, err := mkInsertForInstances(instances[:3], true, true)
@@ -483,9 +483,9 @@ func TestReadOutdatedInstanceKeys(t *testing.T) {
tabletAliases, err := ReadOutdatedInstanceKeys()
- errInDataCollection := db.QueryVTOrcRowsMap(`select alias,
-last_checked,
-last_attempted_check,
+ errInDataCollection := db.QueryVTOrcRowsMap(`select alias,
+last_checked,
+last_attempted_check,
ROUND((JULIANDAY(DATETIME('now')) - JULIANDAY(last_checked)) * 86400) AS difference,
last_attempted_check <= last_checked as use1,
last_checked < DATETIME('now', '-1500 second') as is_outdated1,
@@ -507,22 +507,32 @@ func TestUpdateInstanceLastChecked(t *testing.T) {
name string
tabletAlias string
partialSuccess bool
+ stalledDisk bool
conditionToCheck string
}{
{
name: "Verify updated last checked",
tabletAlias: "zone1-0000000100",
partialSuccess: false,
- conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false",
+ stalledDisk: false,
+ conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and is_disk_stalled = false",
}, {
name: "Verify partial success",
tabletAlias: "zone1-0000000100",
partialSuccess: true,
- conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = true",
+ stalledDisk: false,
+ conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = true and is_disk_stalled = false",
+ }, {
+ name: "Verify stalled disk",
+ tabletAlias: "zone1-0000000100",
+ partialSuccess: false,
+ stalledDisk: true,
+ conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and is_disk_stalled = true",
}, {
name: "Verify no error on unknown tablet",
tabletAlias: "unknown tablet",
partialSuccess: true,
+ stalledDisk: true,
},
}
@@ -537,7 +547,7 @@ func TestUpdateInstanceLastChecked(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- err := UpdateInstanceLastChecked(tt.tabletAlias, tt.partialSuccess)
+ err := UpdateInstanceLastChecked(tt.tabletAlias, tt.partialSuccess, tt.stalledDisk)
require.NoError(t, err)
if tt.conditionToCheck != "" {
diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go
index 0d0bbff5b53..ab41d1fa988 100644
--- a/go/vt/vtorc/logic/topology_recovery.go
+++ b/go/vt/vtorc/logic/topology_recovery.go
@@ -285,7 +285,7 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl
func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction {
switch analysisCode {
// primary
- case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas:
+ case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.PrimaryDiskStalled:
// If ERS is disabled, we have no way of repairing the cluster.
if !config.ERSEnabled() {
log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode)
diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go
index f7658060b95..ca164d78836 100644
--- a/go/vt/vtorc/logic/topology_recovery_test.go
+++ b/go/vt/vtorc/logic/topology_recovery_test.go
@@ -42,6 +42,11 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) {
prevAnalysisCode: inst.DeadPrimary,
newAnalysisCode: inst.DeadPrimaryAndSomeReplicas,
shouldBeEqual: true,
+ }, {
+ // DeadPrimary and PrimaryDiskStalled have the same recovery
+ prevAnalysisCode: inst.DeadPrimary,
+ newAnalysisCode: inst.PrimaryDiskStalled,
+ shouldBeEqual: true,
}, {
// DeadPrimary and PrimaryTabletDeleted are different recoveries.
prevAnalysisCode: inst.DeadPrimary,
@@ -215,6 +220,16 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) {
ersEnabled: false,
analysisCode: inst.DeadPrimary,
wantRecoveryFunction: noRecoveryFunc,
+ }, {
+ name: "StalledDiskPrimary with ERS enabled",
+ ersEnabled: true,
+ analysisCode: inst.PrimaryDiskStalled,
+ wantRecoveryFunction: recoverDeadPrimaryFunc,
+ }, {
+ name: "StalledDiskPrimary with ERS disabled",
+ ersEnabled: false,
+ analysisCode: inst.PrimaryDiskStalled,
+ wantRecoveryFunction: noRecoveryFunc,
}, {
name: "PrimaryTabletDeleted with ERS enabled",
ersEnabled: true,
diff --git a/go/vt/vtorc/test/recovery_analysis.go b/go/vt/vtorc/test/recovery_analysis.go
index 218a679bdb0..bb6e4132243 100644
--- a/go/vt/vtorc/test/recovery_analysis.go
+++ b/go/vt/vtorc/test/recovery_analysis.go
@@ -80,6 +80,7 @@ type InfoForRecoveryAnalysis struct {
MaxReplicaGTIDMode string
MaxReplicaGTIDErrant string
ReadOnly uint
+ IsStalledDisk uint
}
func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap {
@@ -145,6 +146,7 @@ func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap {
rowMap["semi_sync_replica_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncReplicaEnabled), Valid: true}
res, _ := prototext.Marshal(info.TabletInfo)
rowMap["tablet_info"] = sqlutils.CellData{String: string(res), Valid: true}
+ rowMap["is_disk_stalled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.IsStalledDisk), Valid: true}
return rowMap
}
diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go
index ce81a2dd516..76c7af7fc2e 100644
--- a/go/vt/vttablet/onlineddl/executor.go
+++ b/go/vt/vttablet/onlineddl/executor.go
@@ -838,34 +838,41 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
}
capableOf := mysql.ServerVersionCapableOf(conn.ServerVersion)
- capable, err := capableOf(capabilities.PerformanceSchemaDataLocksTableCapability)
- if err != nil {
- return err
- }
- if capable {
- {
- // Kill connections that have open transactions locking the table. These potentially (probably?) are not
- // actively running a query on our table. They're doing other things while holding locks on our table.
- query, err := sqlparser.ParseAndBind(sqlProcessWithLocksOnTable, sqltypes.StringBindVariable(tableName))
- if err != nil {
- return err
- }
- rs, err := conn.Conn.ExecuteFetch(query, -1, true)
+ terminateTransactions := func(capability capabilities.FlavorCapability, query string, column string, description string) error {
+ capable, err := capableOf(capability)
+ if err != nil {
+ return err
+ }
+ if !capable {
+ return nil
+ }
+ query, err = sqlparser.ParseAndBind(query, sqltypes.StringBindVariable(tableName))
+ if err != nil {
+ return err
+ }
+ rs, err := conn.Conn.ExecuteFetch(query, -1, true)
+ if err != nil {
+ return vterrors.Wrapf(err, "finding transactions locking table `%s` %s", tableName, description)
+ }
+ log.Infof("terminateTransactions: found %v transactions locking table `%s` %s", len(rs.Rows), tableName, description)
+ for _, row := range rs.Named().Rows {
+ threadId := row.AsInt64(column, 0)
+ log.Infof("terminateTransactions: killing connection %v with transaction locking table `%s` %s", threadId, tableName, description)
+ killConnection := fmt.Sprintf("KILL %d", threadId)
+ _, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
- return vterrors.Wrapf(err, "finding transactions locking table")
- }
- log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows))
- for _, row := range rs.Named().Rows {
- threadId := row.AsInt64("trx_mysql_thread_id", 0)
- log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId)
- killConnection := fmt.Sprintf("KILL %d", threadId)
- _, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
- if err != nil {
- log.Errorf("Unable to kill the connection %d: %v", threadId, err)
- }
+ log.Errorf("terminateTransactions: unable to kill the connection %d locking table `%s` %s: %v", threadId, tableName, description, err)
}
}
+ return nil
+ }
+ if err := terminateTransactions(capabilities.PerformanceSchemaDataLocksTableCapability, sqlProcessWithLocksOnTable, "trx_mysql_thread_id", "data"); err != nil {
+ return err
}
+ if err := terminateTransactions(capabilities.PerformanceSchemaMetadataLocksTableCapability, sqlProcessWithMetadataLocksOnTable, "processlist_id", "metadata"); err != nil {
+ return err
+ }
+
return nil
}
diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go
index 6ba0217519c..6c0bff1086f 100644
--- a/go/vt/vttablet/onlineddl/schema.go
+++ b/go/vt/vttablet/onlineddl/schema.go
@@ -567,6 +567,15 @@ const (
where
data_locks.OBJECT_SCHEMA=database() AND data_locks.OBJECT_NAME=%a
`
+ sqlProcessWithMetadataLocksOnTable = `
+ SELECT
+ DISTINCT threads.processlist_id
+ from
+ performance_schema.metadata_locks
+ join performance_schema.threads on (metadata_locks.OWNER_THREAD_ID=threads.THREAD_ID)
+ where
+ metadata_locks.OBJECT_SCHEMA=database() AND metadata_locks.OBJECT_NAME=%a
+ `
)
var (
diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor.go b/go/vt/vttablet/tabletmanager/disk_health_monitor.go
new file mode 100644
index 00000000000..e35bc662a12
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/disk_health_monitor.go
@@ -0,0 +1,131 @@
+package tabletmanager
+
+import (
+ "context"
+ "os"
+ "path"
+ "strconv"
+ "sync"
+ "time"
+)
+
+type DiskHealthMonitor interface {
+ // IsDiskStalled returns true if the disk is stalled or rejecting writes.
+ IsDiskStalled() bool
+}
+
+func newDiskHealthMonitor(ctx context.Context) DiskHealthMonitor {
+ if stalledDiskWriteDir == "" {
+ return newNoopDiskHealthMonitor()
+ }
+
+ return newPollingDiskHealthMonitor(ctx, attemptFileWrite, stalledDiskWriteInterval, stalledDiskWriteTimeout)
+}
+
+type writeFunction func() error
+
+func attemptFileWrite() error {
+ file, err := os.Create(path.Join(stalledDiskWriteDir, ".stalled_disk_check"))
+ if err != nil {
+ return err
+ }
+ _, err = file.WriteString(strconv.FormatInt(time.Now().UnixNano(), 10))
+ if err != nil {
+ return err
+ }
+ err = file.Sync()
+ if err != nil {
+ return err
+ }
+ return file.Close()
+}
+
+type pollingDiskHealthMonitor struct {
+ stalledMutex sync.RWMutex
+ stalled bool
+ writeInProgressMutex sync.RWMutex
+ writeInProgress bool
+ writeFunc writeFunction
+ pollingInterval time.Duration
+ writeTimeout time.Duration
+}
+
+var _ DiskHealthMonitor = &pollingDiskHealthMonitor{}
+
+func newPollingDiskHealthMonitor(ctx context.Context, writeFunc writeFunction, pollingInterval, writeTimeout time.Duration) *pollingDiskHealthMonitor {
+ fs := &pollingDiskHealthMonitor{
+ stalledMutex: sync.RWMutex{},
+ stalled: false,
+ writeInProgressMutex: sync.RWMutex{},
+ writeInProgress: false,
+ writeFunc: writeFunc,
+ pollingInterval: pollingInterval,
+ writeTimeout: writeTimeout,
+ }
+ go fs.poll(ctx)
+ return fs
+}
+
+func (fs *pollingDiskHealthMonitor) poll(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(fs.pollingInterval):
+ if fs.isWriteInProgress() {
+ continue
+ }
+
+ ch := make(chan error, 1)
+ go func() {
+ fs.setIsWriteInProgress(true)
+ err := fs.writeFunc()
+ fs.setIsWriteInProgress(false)
+ ch <- err
+ }()
+
+ select {
+ case <-time.After(fs.writeTimeout):
+ fs.setIsDiskStalled(true)
+ case err := <-ch:
+ fs.setIsDiskStalled(err != nil)
+ }
+ }
+ }
+}
+
+func (fs *pollingDiskHealthMonitor) IsDiskStalled() bool {
+ fs.stalledMutex.RLock()
+ defer fs.stalledMutex.RUnlock()
+ return fs.stalled
+}
+
+func (fs *pollingDiskHealthMonitor) setIsDiskStalled(isStalled bool) {
+ fs.stalledMutex.Lock()
+ defer fs.stalledMutex.Unlock()
+ fs.stalled = isStalled
+}
+
+func (fs *pollingDiskHealthMonitor) isWriteInProgress() bool {
+ fs.writeInProgressMutex.RLock()
+ defer fs.writeInProgressMutex.RUnlock()
+ return fs.writeInProgress
+}
+
+func (fs *pollingDiskHealthMonitor) setIsWriteInProgress(isInProgress bool) {
+ fs.writeInProgressMutex.Lock()
+ defer fs.writeInProgressMutex.Unlock()
+ fs.writeInProgress = isInProgress
+}
+
+type noopDiskHealthMonitor struct{}
+
+var _ DiskHealthMonitor = &noopDiskHealthMonitor{}
+
+func newNoopDiskHealthMonitor() DiskHealthMonitor {
+ return &noopDiskHealthMonitor{}
+}
+
+func (fs *noopDiskHealthMonitor) IsDiskStalled() bool {
+ return false
+}
diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go b/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go
new file mode 100644
index 00000000000..68930f3061d
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go
@@ -0,0 +1,103 @@
+package tabletmanager
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+)
+
+// TestDiskHealthMonitor_noStall verifies that a monitor whose writes always
+// succeed quickly never reports a stalled disk.
+func TestDiskHealthMonitor_noStall(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mockFileWriter := &sequencedMockWriter{}
+	diskHealthMonitor := newPollingDiskHealthMonitor(ctx, mockFileWriter.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond)
+
+	// 300ms at a 50ms polling interval should produce ~5 polls.
+	// NOTE(review): asserting exactly 5 is scheduler-dependent and may be
+	// flaky on loaded CI hosts — consider a range check instead.
+	time.Sleep(300 * time.Millisecond)
+	if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls != 5 {
+		t.Fatalf("expected 5 calls to createFile, got %d", totalCreateCalls)
+	}
+	if isStalled := diskHealthMonitor.IsDiskStalled(); isStalled {
+		t.Fatalf("expected isStalled to be false")
+	}
+}
+
+// TestDiskHealthMonitor_stallAndRecover verifies that a single slow write
+// (300ms against a 25ms timeout) marks the disk stalled, and that the
+// monitor clears the flag once writes start succeeding again.
+func TestDiskHealthMonitor_stallAndRecover(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mockFileWriter := &sequencedMockWriter{sequencedWriteFunctions: []writeFunction{delayedWriteFunction(10*time.Millisecond, nil), delayedWriteFunction(300*time.Millisecond, nil)}}
+	diskHealthMonitor := newPollingDiskHealthMonitor(ctx, mockFileWriter.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond)
+
+	// Only 2 writes fit in the first 300ms: the second (300ms) write blocks
+	// further polls because writes never overlap, and its timeout sets the
+	// stall flag.
+	// NOTE(review): exact-count timing assertions here are scheduler-
+	// dependent and may be flaky on loaded CI hosts.
+	time.Sleep(300 * time.Millisecond)
+	if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls != 2 {
+		t.Fatalf("expected 2 calls to createFile, got %d", totalCreateCalls)
+	}
+	if isStalled := diskHealthMonitor.IsDiskStalled(); !isStalled {
+		t.Fatalf("expected isStalled to be true")
+	}
+
+	// After the slow write drains, fast default writes (10ms) succeed on
+	// the 50ms ticks, so the monitor should report the disk as recovered.
+	time.Sleep(300 * time.Millisecond)
+	if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls < 5 {
+		t.Fatalf("expected at least 5 calls to createFile, got %d", totalCreateCalls)
+	}
+	if isStalled := diskHealthMonitor.IsDiskStalled(); isStalled {
+		t.Fatalf("expected isStalled to be false")
+	}
+}
+
+// TestDiskHealthMonitor_stallDetected verifies that writes which complete
+// quickly but return an error also mark the disk as stalled.
+func TestDiskHealthMonitor_stallDetected(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mockFileWriter := &sequencedMockWriter{defaultWriteFunction: delayedWriteFunction(10*time.Millisecond, errors.New("test error"))}
+	diskHealthMonitor := newPollingDiskHealthMonitor(ctx, mockFileWriter.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond)
+
+	// ~5 polls fit in 300ms at a 50ms interval; every write errors.
+	// NOTE(review): asserting exactly 5 is scheduler-dependent and may be
+	// flaky on loaded CI hosts.
+	time.Sleep(300 * time.Millisecond)
+	if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls != 5 {
+		t.Fatalf("expected 5 calls to createFile, got %d", totalCreateCalls)
+	}
+	if isStalled := diskHealthMonitor.IsDiskStalled(); !isStalled {
+		t.Fatalf("expected isStalled to be true")
+	}
+}
+
+// sequencedMockWriter is a test double for the health-check write: each call
+// to mockWriteFunction runs the next entry of sequencedWriteFunctions, then
+// falls back to defaultWriteFunction (or a fast success) once the sequence
+// is exhausted.
+type sequencedMockWriter struct {
+	defaultWriteFunction    writeFunction   // used after the sequence is exhausted; nil means quick success
+	sequencedWriteFunctions []writeFunction // per-call behaviors, consumed in order
+
+	totalCreateCalls      int          // number of mockWriteFunction invocations so far
+	totalCreateCallsMutex sync.RWMutex // guards totalCreateCalls
+}
+
+// mockWriteFunction runs the next write function in the sequence, falling
+// back to defaultWriteFunction (or a fast, successful write) once the
+// sequence is exhausted. Each call increments the counter reported by
+// getTotalCreateCalls.
+func (smw *sequencedMockWriter) mockWriteFunction() error {
+	// Claim the call index and bump the counter in one critical section;
+	// the separate get-then-increment helpers are not atomic together, so
+	// concurrent callers could otherwise observe the same index.
+	smw.totalCreateCallsMutex.Lock()
+	functionIndex := smw.totalCreateCalls
+	smw.totalCreateCalls += 1
+	smw.totalCreateCallsMutex.Unlock()
+
+	if functionIndex >= len(smw.sequencedWriteFunctions) {
+		if smw.defaultWriteFunction != nil {
+			return smw.defaultWriteFunction()
+		}
+		// Default behavior: a quick, successful write.
+		return delayedWriteFunction(10*time.Millisecond, nil)()
+	}
+
+	return smw.sequencedWriteFunctions[functionIndex]()
+}
+
+// incrementTotalCreateCalls bumps the invocation counter under the lock.
+func (smw *sequencedMockWriter) incrementTotalCreateCalls() {
+	smw.totalCreateCallsMutex.Lock()
+	smw.totalCreateCalls++
+	smw.totalCreateCallsMutex.Unlock()
+}
+
+// getTotalCreateCalls returns how many times mockWriteFunction has run.
+func (smw *sequencedMockWriter) getTotalCreateCalls() int {
+	smw.totalCreateCallsMutex.RLock()
+	calls := smw.totalCreateCalls
+	smw.totalCreateCallsMutex.RUnlock()
+	return calls
+}
+
+// delayedWriteFunction returns a writeFunction that sleeps for delay and
+// then returns err, simulating a slow and/or failing disk write.
+func delayedWriteFunction(delay time.Duration, err error) writeFunction {
+	return func() error {
+		time.Sleep(delay)
+		return err
+	}
+}
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go
index 47794e92b9a..b27b25d87c6 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -18,6 +18,7 @@ package tabletmanager
import (
"context"
+ "errors"
"fmt"
"runtime"
"strings"
@@ -60,6 +61,13 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
+
+ // Return error if the disk is stalled or rejecting writes.
+ // Noop by default, must be enabled with the flag "disk-write-dir".
+ if tm.dhMonitor.IsDiskStalled() {
+ return nil, errors.New("stalled disk")
+ }
+
// Server ID - "select @@global.server_id"
serverID, err := tm.MysqlDaemon.GetServerID(ctx)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go
index fbef04de357..c22ea0a6e51 100644
--- a/go/vt/vttablet/tabletmanager/tm_init.go
+++ b/go/vt/vttablet/tabletmanager/tm_init.go
@@ -95,8 +95,11 @@ var (
skipBuildInfoTags = "/.*/"
initTags flagutil.StringMapValue
- initTimeout = 1 * time.Minute
- mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
+ initTimeout = 1 * time.Minute
+ mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
+ stalledDiskWriteDir = ""
+ stalledDiskWriteTimeout = 30 * time.Second
+ stalledDiskWriteInterval = 5 * time.Second
)
func registerInitFlags(fs *pflag.FlagSet) {
@@ -109,6 +112,9 @@ func registerInitFlags(fs *pflag.FlagSet) {
fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet")
fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.")
fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.")
+ fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled")
+ fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled")
+ fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled")
}
var (
@@ -164,6 +170,7 @@ type TabletManager struct {
VREngine *vreplication.Engine
VDiffEngine *vdiff.Engine
Env *vtenv.Environment
+ dhMonitor DiskHealthMonitor
// tmc is used to run an RPC against other vttablets.
tmc tmclient.TabletManagerClient
@@ -372,6 +379,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl
tm.tmc = tmclient.NewTabletManagerClient()
tm.tmState = newTMState(tm, tablet)
tm.actionSema = semaphore.NewWeighted(1)
+ tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx)
tm._waitForGrantsComplete = make(chan struct{})
tm.baseTabletType = tablet.Type