diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 5669e0a5e54a..6792a7d8530a 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1477,7 +1477,10 @@
APPLICATION | logical_replication.replicated_time_seconds | The replicated time of the logical replication stream in seconds since the unix epoch. | Seconds | GAUGE | SECONDS | AVG | NONE |
APPLICATION | logical_replication.retry_queue_bytes | The replicated time of the logical replication stream in seconds since the unix epoch. | Bytes | GAUGE | BYTES | AVG | NONE |
APPLICATION | logical_replication.retry_queue_events | The replicated time of the logical replication stream in seconds since the unix epoch. | Events | GAUGE | COUNT | AVG | NONE |
+APPLICATION | obs.tablemetadata.update_job.duration | Time spent running the update table metadata job. | Duration | HISTOGRAM | NANOSECONDS | AVG | NONE |
+APPLICATION | obs.tablemetadata.update_job.errors | The total number of errors that have been emitted from the update table metadata job. | Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | obs.tablemetadata.update_job.runs | The total number of runs of the update table metadata job. | Executions | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | obs.tablemetadata.update_job.table_updates | The total number of rows that have been updated in system.table_metadata | Rows Updated | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | physical_replication.admit_latency | Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | physical_replication.commit_latency | Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | physical_replication.cutover_progress | The number of ranges left to revert in order to complete an inflight cutover | Ranges | GAUGE | COUNT | AVG | NONE |
diff --git a/pkg/cli/clisqlexec/BUILD.bazel b/pkg/cli/clisqlexec/BUILD.bazel
index 2892e84f7c91..359969b019c5 100644
--- a/pkg/cli/clisqlexec/BUILD.bazel
+++ b/pkg/cli/clisqlexec/BUILD.bazel
@@ -39,6 +39,7 @@ go_test(
name = "clisqlexec_test",
srcs = [
"format_html_test.go",
+ "format_sql_test.go",
"format_table_test.go",
"format_value_test.go",
"main_test.go",
diff --git a/pkg/cli/clisqlexec/format_sql.go b/pkg/cli/clisqlexec/format_sql.go
index 891938d80d5f..e4c5d41022d3 100644
--- a/pkg/cli/clisqlexec/format_sql.go
+++ b/pkg/cli/clisqlexec/format_sql.go
@@ -43,7 +43,7 @@ func (p *sqlReporter) iter(w, _ io.Writer, _ int, row []string) error {
fmt.Fprint(w, "INSERT INTO results VALUES (")
for i, r := range row {
var buf bytes.Buffer
- lexbase.EncodeSQLStringWithFlags(&buf, r, lexbase.EncNoDoubleEscapeQuotes)
+ lexbase.EncodeSQLStringWithFlags(&buf, r, lexbase.EncNoFlags)
fmt.Fprint(w, buf.String())
if i < len(row)-1 {
fmt.Fprint(w, ", ")
diff --git a/pkg/cli/clisqlexec/format_sql_test.go b/pkg/cli/clisqlexec/format_sql_test.go
new file mode 100644
index 000000000000..df5c0f67ffdd
--- /dev/null
+++ b/pkg/cli/clisqlexec/format_sql_test.go
@@ -0,0 +1,71 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package clisqlexec_test
+
+import "github.com/cockroachdb/cockroach/pkg/cli"
+
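+// Example_json_sql_format verifies that JSON values containing quotes and
+// escape sequences are rendered correctly by the SQL output format
+// (--format=sql).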
+func Example_json_sql_format() {
+ c := cli.NewCLITest(cli.TestCLIParams{})
+ defer c.Cleanup()
+
+ testData := []string{
+ `e'{"a": "bc"}'`,
+ `e'{"a": "b\u0099c"}'`,
+ `e'{"a": "b\\"c"}'`,
+ `'"there are \"quotes\" in this json string"'`,
+ `'""'`,
+ `'{}'`,
+ }
+
+ for _, s := range testData {
+ query := `SELECT ` + s + `::json`
+ c.RunWithArgs([]string{"sql", "--format=sql", "-e", query})
+ }
+
+ // Output:
+ // sql --format=sql -e SELECT e'{"a": "bc"}'::json
+ // CREATE TABLE results (
+ // jsonb STRING
+ // );
+ //
+ // INSERT INTO results VALUES ('{"a": "bc"}');
+ // -- 1 row
+ // sql --format=sql -e SELECT e'{"a": "b\u0099c"}'::json
+ // CREATE TABLE results (
+ // jsonb STRING
+ // );
+ //
+ // INSERT INTO results VALUES (e'{"a": "b\\u0099c"}');
+ // -- 1 row
+ // sql --format=sql -e SELECT e'{"a": "b\\"c"}'::json
+ // CREATE TABLE results (
+ // jsonb STRING
+ // );
+ //
+ // INSERT INTO results VALUES (e'{"a": "b\\"c"}');
+ // -- 1 row
+ // sql --format=sql -e SELECT '"there are \"quotes\" in this json string"'::json
+ // CREATE TABLE results (
+ // jsonb STRING
+ // );
+ //
+ // INSERT INTO results VALUES (e'"there are \\"quotes\\" in this json string"');
+ // -- 1 row
+ // sql --format=sql -e SELECT '""'::json
+ // CREATE TABLE results (
+ // jsonb STRING
+ // );
+ //
+ // INSERT INTO results VALUES ('""');
+ // -- 1 row
+ // sql --format=sql -e SELECT '{}'::json
+ // CREATE TABLE results (
+ // jsonb STRING
+ // );
+ //
+ // INSERT INTO results VALUES ('{}');
+ // -- 1 row
+}
diff --git a/pkg/cli/clisqlexec/format_value.go b/pkg/cli/clisqlexec/format_value.go
index 91c89622e879..a93b79d64a41 100644
--- a/pkg/cli/clisqlexec/format_value.go
+++ b/pkg/cli/clisqlexec/format_value.go
@@ -11,6 +11,8 @@ import (
"strings"
"unicode"
"unicode/utf8"
+
+ "github.com/cockroachdb/cockroach/pkg/sql/lexbase"
)
func isNotPrintableASCII(r rune) bool { return r < 0x20 || r > 0x7e || r == '"' || r == '\\' }
@@ -40,10 +42,11 @@ func FormatVal(val driver.Value, showPrintableUnicode bool, showNewLinesAndTabs
return t
}
}
- s := fmt.Sprintf("%+q", t)
+ s := lexbase.EscapeSQLString(t)
+ // The result from EscapeSQLString is an escape-quoted string, like e'...'.
// Strip the start and final quotes. The surrounding display
// format (e.g. CSV/TSV) will add its own quotes.
- return s[1 : len(s)-1]
+ return s[2 : len(s)-1]
}
// Fallback to printing the value as-is.
diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go
index 2f103cb5d4c2..c1c661236c92 100644
--- a/pkg/server/api_v2_databases_metadata_test.go
+++ b/pkg/server/api_v2_databases_metadata_test.go
@@ -748,7 +748,6 @@ func TestTriggerMetadataUpdateJob(t *testing.T) {
defer close(jobReadyChan)
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
-
Knobs: base.TestingKnobs{
TableMetadata: &tablemetadatacache_util.TestingKnobs{
OnJobReady: func() {
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index f09088c1507a..2d27243774f6 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -1146,11 +1146,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
if tableStatsKnobs := cfg.TestingKnobs.TableStatsKnobs; tableStatsKnobs != nil {
tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs)
}
-
if tableMetadataKnobs := cfg.TestingKnobs.TableMetadata; tableMetadataKnobs != nil {
execCfg.TableMetadataKnobs = tableMetadataKnobs.(*tablemetadatacacheutil.TestingKnobs)
-
}
+
// Set up internal memory metrics for use by internal SQL executors.
// Don't add them to the registry now because it will be added as part of pgServer metrics.
sqlMemMetrics := sql.MakeMemMetrics("sql", cfg.HistogramWindowInterval())
diff --git a/pkg/sql/lexbase/encode.go b/pkg/sql/lexbase/encode.go
index 7a19d82827a8..748e3fac042d 100644
--- a/pkg/sql/lexbase/encode.go
+++ b/pkg/sql/lexbase/encode.go
@@ -42,10 +42,6 @@ const (
// without wrapping quotes.
EncBareIdentifiers
- // EncNoDoubleEscapeQuotes indicates that backslashes will not be
- // escaped when they are used as escape quotes.
- EncNoDoubleEscapeQuotes
-
// EncFirstFreeFlagBit needs to remain unused; it is used as base
// bit offset for tree.FmtFlags.
EncFirstFreeFlagBit
@@ -144,7 +140,6 @@ func EncodeSQLStringWithFlags(buf *bytes.Buffer, in string, flags EncodeFlags) {
start := 0
escapedString := false
bareStrings := flags.HasFlags(EncBareStrings)
- noDoubleEscapeQuotes := flags.HasFlags(EncNoDoubleEscapeQuotes)
// Loop through each unicode code point.
for i, r := range in {
if i < start {
@@ -165,9 +160,6 @@ func EncodeSQLStringWithFlags(buf *bytes.Buffer, in string, flags EncodeFlags) {
buf.WriteString("e'") // begin e'xxx' string
escapedString = true
}
- if noDoubleEscapeQuotes && i+1 < len(in) && in[i:i+2] == "\\\"" {
- continue
- }
buf.WriteString(in[start:i])
ln := utf8.RuneLen(r)
diff --git a/pkg/sql/lexbase/encode_test.go b/pkg/sql/lexbase/encode_test.go
index a6870a65666b..a8fbfd3f7544 100644
--- a/pkg/sql/lexbase/encode_test.go
+++ b/pkg/sql/lexbase/encode_test.go
@@ -112,28 +112,6 @@ func testEncodeString(t *testing.T, input []byte, encode func(*bytes.Buffer, str
return stmt
}
-func TestEncodeSQLStringWithNoDoubleEscapeQuotes(t *testing.T) {
- testCases := []struct {
- input string
- output string
- }{
- // (GH issue #107518)
- {`\"`, `e'\"'`},
- {`{"a": "b\u0099c"}`, `e'{"a": "b\\u0099c"}'`},
- {`{\"a\": \"b\u0099c\"}`, `e'{\"a\": \"b\\u0099c\"}'`},
- }
-
- for _, tc := range testCases {
- var buf bytes.Buffer
- lexbase.EncodeSQLStringWithFlags(&buf, tc.input, lexbase.EncNoDoubleEscapeQuotes)
- out := buf.String()
-
- if out != tc.output {
- t.Errorf("`%s`: expected `%s`, got `%s`", tc.input, tc.output, out)
- }
- }
-}
-
func BenchmarkEncodeSQLString(b *testing.B) {
str := strings.Repeat("foo", 10000)
for i := 0; i < b.N; i++ {
diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel
index a19ae735faff..50c4a992871a 100644
--- a/pkg/sql/tablemetadatacache/BUILD.bazel
+++ b/pkg/sql/tablemetadatacache/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache",
visibility = ["//visibility:public"],
deps = [
+ "//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
@@ -20,6 +21,7 @@ go_library(
"//pkg/sql/isql",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
+ "//pkg/sql/tablemetadatacache/util",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
@@ -48,9 +50,12 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
+ "//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
+ "//pkg/sql",
"//pkg/sql/isql",
+ "//pkg/sql/tablemetadatacache/util",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
@@ -59,8 +64,6 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
- "//pkg/util/syncutil",
- "//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater.go b/pkg/sql/tablemetadatacache/table_metadata_updater.go
index 2c86793309cc..c7dd261c854f 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_updater.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_updater.go
@@ -10,22 +10,27 @@ import (
"context"
gojson "encoding/json"
"fmt"
+ "math"
"time"
+ "github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
const pruneBatchSize = 512
// tableMetadataUpdater encapsulates the logic for updating the table metadata cache.
type tableMetadataUpdater struct {
- ie isql.Executor
- // upsertQuery is the query used to upsert table metadata rows,
- // it is reused for each batch to avoid allocations between batches.
- upsertQuery *tableMetadataBatchUpsertQuery
+ ie isql.Executor
+ metrics *TableMetadataUpdateJobMetrics
+ updateProgress func(ctx context.Context, progress float32)
+ testKnobs *tablemetadatacacheutil.TestingKnobs
}
// tableMetadataDetails contains additional details for a table_metadata row that doesn't
@@ -40,16 +45,52 @@ type tableMetadataDetails struct {
StatsLastUpdated *time.Time `json:"stats_last_updated"`
}
+var _ tablemetadatacacheutil.ITableMetadataUpdater = &tableMetadataUpdater{}
+
// newTableMetadataUpdater creates a new tableMetadataUpdater.
-func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater {
- return &tableMetadataUpdater{ie: ie, upsertQuery: newTableMetadataBatchUpsertQuery(tableBatchSize)}
+var newTableMetadataUpdater = func(
+ job *jobs.Job,
+ metrics *TableMetadataUpdateJobMetrics,
+ ie isql.Executor,
+ testKnobs *tablemetadatacacheutil.TestingKnobs) *tableMetadataUpdater {
+ return &tableMetadataUpdater{
+ ie: ie,
+ metrics: metrics,
+ updateProgress: func(ctx context.Context, progress float32) {
+ updateProgress(ctx, job, progress)
+ },
+ testKnobs: testKnobs,
+ }
+}
+
+// RunUpdater implements the tablemetadatacacheutil.ITableMetadataUpdater interface.
+func (u *tableMetadataUpdater) RunUpdater(ctx context.Context) error {
+ u.metrics.NumRuns.Inc(1)
+ sw := timeutil.NewStopWatch()
+ sw.Start()
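+ // Prune stale rows first; a prune failure is logged but does not abort the
+ // update.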
+ if _, err := u.pruneCache(ctx); err != nil {
+ log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
+ }
+ rowsUpdated, err := u.updateCache(ctx)
+ sw.Stop()
+ u.metrics.Duration.RecordValue(sw.Elapsed().Nanoseconds())
+ u.metrics.UpdatedTables.Inc(int64(rowsUpdated))
+ return err
}
// updateCache performs a full update of the system.table_metadata, returning
// the number of rows updated and the last error encountered.
func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, err error) {
+ // upsertQuery is the query used to upsert table metadata rows. It is
+ // reused for each batch to avoid allocations between batches.
+ upsert := newTableMetadataBatchUpsertQuery(tableBatchSize)
it := newTableMetadataBatchIterator(u.ie)
-
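+ // Estimate the total number of rows to update so that progress can be
+ // reported as a fraction of the work completed.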
+ estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
+ if err != nil {
+ log.Errorf(ctx, "failed to get estimated row count. err=%s", err.Error())
+ }
+ estimatedBatches := int(math.Ceil(float64(estimatedRowsToUpdate) / float64(tableBatchSize)))
+ batchNum := 0
for {
more, err := it.fetchNextBatch(ctx, tableBatchSize)
if err != nil {
@@ -57,6 +98,7 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er
// https://github.com/cockroachdb/cockroach/issues/130040. For now,
// we can't continue because the page key is in an invalid state.
log.Errorf(ctx, "failed to fetch next batch: %s", err.Error())
+ u.metrics.Errors.Inc(1)
return updated, err
}
@@ -64,14 +106,22 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er
// No more results.
break
}
-
- count, err := u.upsertBatch(ctx, it.batchRows)
+ batchNum++
+ count, err := u.upsertBatch(ctx, it.batchRows, upsert)
if err != nil {
// If an upsert fails, let's just continue to the next batch for now.
log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error())
+ u.metrics.Errors.Inc(1)
continue
}
+
updated += count
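+ // Report progress periodically as the fraction of estimated rows processed.
+ // The final estimated batch reports .99; the fraction is set to 1.0 when the
+ // run is marked as completed by the resumer.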
+ if batchNum == estimatedBatches {
+ u.updateProgress(ctx, .99)
+ } else if batchNum%batchesPerProgressUpdate == 0 && estimatedBatches > 0 {
+ progress := float32(updated) / float32(estimatedRowsToUpdate)
+ u.updateProgress(ctx, progress)
+ }
}
return updated, err
@@ -116,13 +166,12 @@ RETURNING table_id`, pruneBatchSize)
// upsertBatch upserts the given batch of table metadata rows returning
// the number of rows upserted and any error that occurred.
func (u *tableMetadataUpdater) upsertBatch(
- ctx context.Context, batch []tableMetadataIterRow,
+ ctx context.Context, batch []tableMetadataIterRow, upsertQuery *tableMetadataBatchUpsertQuery,
) (int, error) {
- u.upsertQuery.resetForBatch()
-
+ upsertQuery.resetForBatch()
upsertBatchSize := 0
for _, row := range batch {
- if err := u.upsertQuery.addRow(ctx, &row); err != nil {
+ if err := upsertQuery.addRow(ctx, &row); err != nil {
log.Errorf(ctx, "failed to add row to upsert batch: %s", err.Error())
continue
}
@@ -138,11 +187,27 @@ func (u *tableMetadataUpdater) upsertBatch(
"batch-upsert-table-metadata",
nil, // txn
sessiondata.NodeUserWithBulkLowPriSessionDataOverride,
- u.upsertQuery.getQuery(),
- u.upsertQuery.args...,
+ upsertQuery.getQuery(),
+ upsertQuery.args...,
)
}
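+// getRowsToUpdateCount returns an estimate of the number of tables that
+// updateCache will process, derived from the table entries in system.namespace.
+// The estimate is only used for progress reporting.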
+func (u *tableMetadataUpdater) getRowsToUpdateCount(ctx context.Context) (int, error) {
+ query := fmt.Sprintf(`
+SELECT count(*)::INT
+FROM system.namespace t
+JOIN system.namespace d ON t."parentID" = d.id
+%s
+WHERE d."parentID" = 0 and t."parentSchemaID" != 0
+`, u.testKnobs.GetAOSTClause())
+ row, err := u.ie.QueryRow(ctx, "get-table-metadata-row-count", nil, query)
+ if err != nil {
+ return 0, err
+ }
+
+ return int(tree.MustBeDInt(row[0])), nil
+}
+
type tableMetadataBatchUpsertQuery struct {
stmt bytes.Buffer
args []interface{}
diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
index ce1aeca05b8b..eaac9b8d2c43 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
@@ -8,17 +8,24 @@ package tablemetadatacache
import (
"context"
"fmt"
+ "math"
"testing"
"github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+ "github.com/cockroachdb/cockroach/pkg/security/username"
+ "github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
+ tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
+ "github.com/stretchr/testify/require"
)
// TestDataDrivenTableMetadataCacheUpdater tests the operations performed by
@@ -29,12 +36,32 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
- s := serverutils.StartServerOnly(t, base.TestServerArgs{})
+ knobs := tablemetadatacacheutil.CreateTestingKnobs()
+ s := serverutils.StartServerOnly(t, base.TestServerArgs{
+ Knobs: base.TestingKnobs{
+ TableMetadata: knobs,
+ },
+ })
defer s.Stopper().Stop(ctx)
queryConn := s.ApplicationLayer().SQLConn(t)
s.ApplicationLayer().DB()
+ jobRegistry := s.JobRegistry().(*jobs.Registry)
+ jobId := jobRegistry.MakeJobID()
+ // Create a new job so that we don't have to wait for the existing one to be claimed.
+ jr := jobs.Record{
+ JobID: jobId,
+ Description: jobspb.TypeUpdateTableMetadataCache.String(),
+ Details: jobspb.UpdateTableMetadataCacheDetails{},
+ Progress: jobspb.UpdateTableMetadataCacheProgress{},
+ CreatedBy: &jobs.CreatedByInfo{Name: username.NodeUser, ID: username.NodeUserID},
+ Username: username.NodeUserName(),
+ NonCancelable: true,
+ }
+ job, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobId, nil)
+ metrics := newTableMetadataUpdateJobMetrics().(TableMetadataUpdateJobMetrics)
+ require.NoError(t, err)
datadriven.Walk(t, datapathutils.TestDataPath(t, ""), func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
@@ -49,14 +76,14 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
}
return res
case "update-cache":
- updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
+ updater := newTableMetadataUpdater(job, &metrics, s.InternalExecutor().(isql.Executor), knobs)
updated, err := updater.updateCache(ctx)
if err != nil {
return err.Error()
}
return fmt.Sprintf("updated %d table(s)", updated)
case "prune-cache":
- updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
+ updater := newTableMetadataUpdater(job, &metrics, s.InternalExecutor().(isql.Executor), knobs)
pruned, err := updater.pruneCache(ctx)
if err != nil {
return err.Error()
@@ -79,3 +106,59 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
})
}
+
+func TestTableMetadataUpdateJobProgressAndMetrics(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+
+ var currentProgress float32
+ // Server setup.
+ knobs := tablemetadatacacheutil.CreateTestingKnobs()
+ s := serverutils.StartServerOnly(t, base.TestServerArgs{
+ Knobs: base.TestingKnobs{
+ TableMetadata: knobs,
+ },
+ })
+ defer s.Stopper().Stop(ctx)
+ conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
+ metrics := newTableMetadataUpdateJobMetrics().(TableMetadataUpdateJobMetrics)
+ count := 0
+ updater := tableMetadataUpdater{
+ ie: s.ExecutorConfig().(sql.ExecutorConfig).InternalDB.Executor(),
+ metrics: &metrics,
+ updateProgress: func(ctx context.Context, progress float32) {
+ require.Greater(t, progress, currentProgress, "progress should be greater than current progress")
+ currentProgress = progress
+ count++
+ },
+ testKnobs: knobs,
+ }
+ require.NoError(t, updater.RunUpdater(ctx))
+ updatedTables := metrics.UpdatedTables.Count()
+ require.Greater(t, updatedTables, int64(0))
+ // Since there are only system tables in the cluster, the threshold to call updateProgress isn't met,
+ // so updateProgress will only be called once.
+ require.Equal(t, 1, count)
+ require.Equal(t, int64(0), metrics.Errors.Count())
+ require.Equal(t, float32(.99), currentProgress)
+ require.Equal(t, int64(1), metrics.NumRuns.Count())
+ require.Greater(t, metrics.Duration.CumulativeSnapshot().Mean(), float64(0))
+ count = 0
+ currentProgress = 0
+
+ // generate 500 random tables
+ conn.Exec(t,
+ `SELECT crdb_internal.generate_test_objects('{"names": "random_table_", "counts": [500], "randomize_columns": true}'::JSONB)`)
+ require.NoError(t, updater.RunUpdater(ctx))
+ // The updated tables metric doesn't reset between runs, so the second run should increase it by
+ // updatedTables + 500, since 500 random tables were just generated.
+ expectedTablesUpdated := (updatedTables * 2) + 500
+ require.Equal(t, expectedTablesUpdated, metrics.UpdatedTables.Count())
+ estimatedBatches := int(math.Ceil(float64(expectedTablesUpdated) / float64(tableBatchSize)))
+ estimatedProgressUpdates := estimatedBatches / batchesPerProgressUpdate
+ require.GreaterOrEqual(t, count, estimatedProgressUpdates)
+ require.Equal(t, int64(0), metrics.Errors.Count())
+ require.Equal(t, float32(.99), currentProgress)
+ require.Equal(t, int64(2), metrics.NumRuns.Count())
+}
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
index 7f6a3a029f39..5211199e2179 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -9,11 +9,13 @@ import (
"context"
"fmt"
+ "github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
+ tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -21,28 +23,43 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go"
)
-// updateJobExecFn specifies the function that is run on each iteration of the
-// table metadata update job. It can be overriden in tests.
-var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache
+const (
+ // batchesPerProgressUpdate is used to determine how many batches
+ // should be processed before updating the job progress.
+ batchesPerProgressUpdate = 10
+)
type tableMetadataUpdateJobResumer struct {
- job *jobs.Job
+ job *jobs.Job
+ metrics *TableMetadataUpdateJobMetrics
}
var _ jobs.Resumer = (*tableMetadataUpdateJobResumer)(nil)
// Resume is part of the jobs.Resumer interface.
func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error {
+ // This job is a forever running background job, and it is always safe to
+ // terminate the SQL pod whenever the job is running, so mark it as idle.
j.job.MarkIdle(true)
execCtx := execCtxI.(sql.JobExecContext)
metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
+ j.metrics = &metrics
+ testKnobs := execCtx.ExecCfg().TableMetadataKnobs
+ var updater tablemetadatacacheutil.ITableMetadataUpdater
var onJobStartKnob, onJobCompleteKnob, onJobReady func()
- if execCtx.ExecCfg().TableMetadataKnobs != nil {
- onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart
- onJobCompleteKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobComplete
- onJobReady = execCtx.ExecCfg().TableMetadataKnobs.OnJobReady
+ if testKnobs != nil {
+ onJobStartKnob = testKnobs.OnJobStart
+ onJobCompleteKnob = testKnobs.OnJobComplete
+ onJobReady = testKnobs.OnJobReady
+ if testKnobs.TableMetadataUpdater != nil {
+ updater = testKnobs.TableMetadataUpdater
+ }
+ }
+
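+ // Fall back to the production updater when no test override is provided via
+ // the testing knobs.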
+ if updater == nil {
+ updater = newTableMetadataUpdater(j.job, &metrics, execCtx.ExecCfg().InternalDB.Executor(), testKnobs)
}
// We must reset the job's num runs to 0 so that it doesn't get
// delayed by the job system's exponential backoff strategy.
@@ -98,11 +115,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
if onJobStartKnob != nil {
onJobStartKnob()
}
- // Run table metadata update job.
- metrics.NumRuns.Inc(1)
+
j.markAsRunning(ctx)
- if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
+ err := updater.RunUpdater(ctx)
+ if err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
+ j.metrics.Errors.Inc(1)
}
j.markAsCompleted(ctx)
if onJobCompleteKnob != nil {
@@ -111,6 +129,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
}
}
+func updateProgress(ctx context.Context, job *jobs.Job, progress float32) {
+ if err := job.NoTxn().FractionProgressed(ctx, jobs.FractionUpdater(progress)); err != nil {
+ log.Errorf(ctx, "Error updating table metadata log progress. error: %s", err.Error())
+ }
+}
+
// markAsRunning updates the last_start_time and status fields in the job's progress
// details and writes the job progress as a JSON string to the running status.
func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
@@ -121,6 +145,9 @@ func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
progress.RunningStatus = fmt.Sprintf("Job started at %s", now)
details.LastStartTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING
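+ // Reset the progress fraction at the start of each run; it is advanced as
+ // batches are processed.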
+ progress.Progress = &jobspb.Progress_FractionCompleted{
+ FractionCompleted: 0,
+ }
ju.UpdateProgress(progress)
return nil
}); err != nil {
@@ -138,6 +165,9 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
progress.RunningStatus = fmt.Sprintf("Job completed at %s", now)
details.LastCompletedTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING
+ progress.Progress = &jobspb.Progress_FractionCompleted{
+ FractionCompleted: 1.0,
+ }
ju.UpdateProgress(progress)
return nil
}); err != nil {
@@ -145,20 +175,6 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
}
}
-// updateTableMetadataCache performs a full update of system.table_metadata by collecting
-// metadata from the system.namespace, system.descriptor tables and table span stats RPC.
-func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error {
- updater := newTableMetadataUpdater(ie)
- if _, err := updater.pruneCache(ctx); err != nil {
- log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
- }
-
- // We'll use the updated ret val in a follow-up to update metrics and
- // fractional job progress.
- _, err := updater.updateCache(ctx)
- return err
-}
-
// OnFailOrCancel implements jobs.Resumer.
func (j *tableMetadataUpdateJobResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, jobErr error,
@@ -180,7 +196,10 @@ func (j *tableMetadataUpdateJobResumer) CollectProfile(
}
type TableMetadataUpdateJobMetrics struct {
- NumRuns *metric.Counter
+ NumRuns *metric.Counter
+ UpdatedTables *metric.Counter
+ Errors *metric.Counter
+ Duration metric.IHistogram
}
func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
@@ -194,6 +213,30 @@ func newTableMetadataUpdateJobMetrics() metric.Struct {
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}),
+ UpdatedTables: metric.NewCounter(metric.Metadata{
+ Name: "obs.tablemetadata.update_job.table_updates",
+ Help: "The total number of rows that have been updated in system.table_metadata",
+ Measurement: "Rows Updated",
+ Unit: metric.Unit_COUNT,
+ MetricType: io_prometheus_client.MetricType_COUNTER,
+ }),
+ Errors: metric.NewCounter(metric.Metadata{
+ Name: "obs.tablemetadata.update_job.errors",
+ Help: "The total number of errors that have been emitted from the update table metadata job.",
+ Measurement: "Errors",
+ Unit: metric.Unit_COUNT,
+ MetricType: io_prometheus_client.MetricType_COUNTER,
+ }),
+ Duration: metric.NewHistogram(metric.HistogramOptions{
+ Metadata: metric.Metadata{
+ Name: "obs.tablemetadata.update_job.duration",
+ Help: "Time spent running the update table metadata job.",
+ Measurement: "Duration",
+ Unit: metric.Unit_NANOSECONDS},
+ Duration: base.DefaultHistogramWindowInterval(),
+ BucketConfig: metric.IOLatencyBuckets,
+ Mode: metric.HistogramModePrometheus,
+ }),
}
}
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
index 4e7baac47a40..fdfb76483270 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -17,15 +17,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
- "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
- "github.com/cockroachdb/cockroach/pkg/util/syncutil"
- "github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)
@@ -97,30 +95,8 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
ctx := context.Background()
- // We'll mock the job execution function to track when the job is run and
- // to avoid running the actual job which could take longer - we don't care
- // about the actual update logic in this test.
- var mockCalls []time.Time
- mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
- restoreUpdate := testutils.TestingHook(&updateJobExecFn,
- func(ctx context.Context, ie isql.Executor) error {
- mockMutex.Lock()
- defer mockMutex.Unlock()
- mockCalls = append(mockCalls, timeutil.Now())
- select {
- case jobRunCh <- struct{}{}:
- default:
- }
- return nil
- })
- defer restoreUpdate()
-
- getMockCallCount := func() int {
- mockMutex.RLock()
- defer mockMutex.RUnlock()
- return len(mockCalls)
- }
+ updater := mockUpdater{jobRunCh: jobRunCh}
waitForJobRuns := func(count int, timeout time.Duration) error {
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
@@ -136,7 +112,13 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
}
// Server setup.
- s := serverutils.StartServerOnly(t, base.TestServerArgs{})
+ s := serverutils.StartServerOnly(t, base.TestServerArgs{
+ Knobs: base.TestingKnobs{
+ TableMetadata: &tablemetadatacacheutil.TestingKnobs{
+ TableMetadataUpdater: &updater,
+ },
+ },
+ })
defer s.Stopper().Stop(ctx)
conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
@@ -144,9 +126,9 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
// Wait for the job to be claimed by a node.
testutils.SucceedsSoon(t, func() error {
row := conn.Query(t, `
- SELECT claim_instance_id, status FROM system.jobs
+ SELECT claim_instance_id, status FROM system.jobs
WHERE id = $1 AND claim_instance_id IS NOT NULL
- AND status = 'running'`,
+ AND status = 'running'`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("no node has claimed the job")
@@ -154,25 +136,22 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
return nil
})
- require.Zero(t, getMockCallCount(), "Job should not run automatically by default")
+ require.Zero(t, len(updater.mockCalls), "Job should not run automatically by default")
t.Run("AutomaticUpdatesEnabled", func(t *testing.T) {
conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`)
DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
- err := waitForJobRuns(3, 10*time.Second)
+ err := waitForJobRuns(3, 30*time.Second)
require.NoError(t, err, "Job should have run at least 3 times")
- mockCallsCount := getMockCallCount()
- require.GreaterOrEqual(t, mockCallsCount, 3, "Job should have run at least 3 times")
+ require.GreaterOrEqual(t, len(updater.mockCalls), 3, "Job should have run at least 3 times")
conn.Exec(t, `RESET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled`)
// We'll wait for one more signal in case the job was running when the setting was disabled.
// Ignore the error since it could timeout or be successful.
_ = waitForJobRuns(1, 200*time.Millisecond)
// Verify time between calls.
- mockMutex.RLock()
- defer mockMutex.RUnlock()
- for i := 1; i < len(mockCalls); i++ {
- timeBetweenCalls := mockCalls[i].Sub(mockCalls[i-1])
+ for i := 1; i < len(updater.mockCalls); i++ {
+ timeBetweenCalls := updater.mockCalls[i].Sub(updater.mockCalls[i-1])
require.GreaterOrEqual(t, timeBetweenCalls, 50*time.Millisecond,
"Time between calls %d and %d should be at least 50ms", i-1, i)
}
@@ -181,9 +160,22 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
t.Run("AutomaticUpdatesDisabled", func(t *testing.T) {
conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`)
DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
- initialCount := getMockCallCount()
+ initialCount := len(updater.mockCalls)
err := waitForJobRuns(1, 200*time.Millisecond)
require.Error(t, err, "Job should not run after being disabled")
- require.Equal(t, initialCount, getMockCallCount(), "Job count should not increase after being disabled")
+ require.Equal(t, initialCount, len(updater.mockCalls), "Job count should not increase after being disabled")
})
}
+
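+// mockUpdater is a testing ITableMetadataUpdater that records each RunUpdater
+// call and signals jobRunCh so the test can observe job runs.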
+type mockUpdater struct {
+ mockCalls []time.Time
+ jobRunCh chan struct{}
+}
+
+func (t *mockUpdater) RunUpdater(ctx context.Context) error {
+ t.mockCalls = append(t.mockCalls, time.Now())
+ t.jobRunCh <- struct{}{}
+ return nil
+}
+
+var _ tablemetadatacacheutil.ITableMetadataUpdater = &mockUpdater{}
diff --git a/pkg/sql/tablemetadatacache/util/BUILD.bazel b/pkg/sql/tablemetadatacache/util/BUILD.bazel
index e1c5e7377bde..48839f177cc1 100644
--- a/pkg/sql/tablemetadatacache/util/BUILD.bazel
+++ b/pkg/sql/tablemetadatacache/util/BUILD.bazel
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "util",
- srcs = ["test_utils.go"],
+ srcs = [
+ "test_utils.go",
+ "util.go",
+ ],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util",
visibility = ["//visibility:public"],
)
diff --git a/pkg/sql/tablemetadatacache/util/test_utils.go b/pkg/sql/tablemetadatacache/util/test_utils.go
index 9eaefd13c10e..11947b3927f1 100644
--- a/pkg/sql/tablemetadatacache/util/test_utils.go
+++ b/pkg/sql/tablemetadatacache/util/test_utils.go
@@ -13,7 +13,46 @@ type TestingKnobs struct {
OnJobStart func()
// onJobComplete is called when the job completes
OnJobComplete func()
+ // aostClause overrides the AS OF SYSTEM TIME clause in queries used by the
+ // table metadata update job.
+ aostClause string
+ // TableMetadataUpdater overrides the ITableMetadataUpdater used in
+ // tableMetadataUpdateJobResumer.Resume
+ TableMetadataUpdater ITableMetadataUpdater
}
// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
func (*TestingKnobs) ModuleTestingKnobs() {}
+
+// GetAOSTClause returns the appropriate AS OF SYSTEM TIME clause to be
+// used when reading from tables during the table metadata update job run.
+func (knobs *TestingKnobs) GetAOSTClause() string {
+ if knobs != nil {
+ return knobs.aostClause
+ }
+
+ return "AS OF SYSTEM TIME follower_read_timestamp()"
+}
+
+// CreateTestingKnobs creates TestingKnobs for use in unit tests.
+//
+// Note: The table metadata update job uses follower read (AS OF SYSTEM TIME
+// follower_read_timestamp()) to ensure that contention between reads and writes
+// is minimized.
+//
+// However, in a new cluster in unit tests, system tables are created using the
+// migration framework. The migration framework goes through a list of
+// registered migrations and creates the relevant system tables. By using
+// follower reads, we shift the transaction read timestamp far enough into the
+// past that, in unit tests, the read timestamp may be chosen to be before the
+// creation of those tables. This can cause 'descriptor not found' errors when
+// accessing the system tables.
+//
+// Additionally, we don't want to completely remove the AOST clause in the unit
+// test. Therefore, `AS OF SYSTEM TIME '-1us'` is a compromise used to get
+// around the 'descriptor not found' error.
+func CreateTestingKnobs() *TestingKnobs {
+ return &TestingKnobs{
+ aostClause: "AS OF SYSTEM TIME '-1us'",
+ }
+}
diff --git a/pkg/sql/tablemetadatacache/util/util.go b/pkg/sql/tablemetadatacache/util/util.go
new file mode 100644
index 000000000000..8866e728d588
--- /dev/null
+++ b/pkg/sql/tablemetadatacache/util/util.go
@@ -0,0 +1,16 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package tablemetadatacacheutil
+
+import "context"
+
+// ITableMetadataUpdater is an interface that exposes a RunUpdater method
+// to be used by tableMetadataUpdateJobResumer. This interface primarily
+// serves to facilitate testing of tableMetadataUpdater and
+// tableMetadataUpdateJobResumer.
+type ITableMetadataUpdater interface {
+ RunUpdater(ctx context.Context) error
+}