From fcc362102b572d8cc606502c4c431a3ab99b3c2c Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 4 Oct 2024 01:18:00 -0400 Subject: [PATCH 1/2] cli: fix double-quote escaping for JSON values with format=sql A previous commit (98f9c8afaa603cd8d57340468345e0e3653751a6) made an attempt to fix how JSON values are escaped when they contain invalid UTF8 codes and are displayed in the CLI using the --format=sql flag (see #107518). That commit ended up breaking how JSON values are escaped when they contain double quotes. Luckily it turns out that both problems were actually caused by a long-lived mistake in the `clisqlexec.FormatVal` function. It shouldn't use `fmt.Sprintf("%+q", s)` to escape a string that has invalid characters, as that conflicts with how SQL strings are normally escaped. The proper way is to use `lexbase.EscapeSQLString(s)`. Release note (bug fix): Fixed a bug where the CLI would not correctly escape JSON values that had double-quotes inside of a string when using the --format=sql flag. --- pkg/cli/clisqlexec/BUILD.bazel | 1 + pkg/cli/clisqlexec/format_sql.go | 2 +- pkg/cli/clisqlexec/format_sql_test.go | 71 +++++++++++++++++++++++++++ pkg/cli/clisqlexec/format_value.go | 7 ++- pkg/sql/lexbase/encode.go | 8 --- pkg/sql/lexbase/encode_test.go | 22 --------- 6 files changed, 78 insertions(+), 33 deletions(-) create mode 100644 pkg/cli/clisqlexec/format_sql_test.go diff --git a/pkg/cli/clisqlexec/BUILD.bazel b/pkg/cli/clisqlexec/BUILD.bazel index 2892e84f7c91..359969b019c5 100644 --- a/pkg/cli/clisqlexec/BUILD.bazel +++ b/pkg/cli/clisqlexec/BUILD.bazel @@ -39,6 +39,7 @@ go_test( name = "clisqlexec_test", srcs = [ "format_html_test.go", + "format_sql_test.go", "format_table_test.go", "format_value_test.go", "main_test.go", diff --git a/pkg/cli/clisqlexec/format_sql.go b/pkg/cli/clisqlexec/format_sql.go index 891938d80d5f..e4c5d41022d3 100644 --- a/pkg/cli/clisqlexec/format_sql.go +++ b/pkg/cli/clisqlexec/format_sql.go @@ -43,7 +43,7 @@ func (p *sqlReporter) iter(w, _ io.Writer, _ int, row []string) error { fmt.Fprint(w, "INSERT INTO results VALUES (") for i, r := range row { var buf bytes.Buffer - lexbase.EncodeSQLStringWithFlags(&buf, r, lexbase.EncNoDoubleEscapeQuotes) + lexbase.EncodeSQLStringWithFlags(&buf, r, lexbase.EncNoFlags) fmt.Fprint(w, buf.String()) if i < len(row)-1 { fmt.Fprint(w, ", ") diff --git a/pkg/cli/clisqlexec/format_sql_test.go b/pkg/cli/clisqlexec/format_sql_test.go new file mode 100644 index 000000000000..df5c0f67ffdd --- /dev/null +++ b/pkg/cli/clisqlexec/format_sql_test.go @@ -0,0 +1,71 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package clisqlexec_test + +import "github.com/cockroachdb/cockroach/pkg/cli" + +func Example_json_sql_format() { + c := cli.NewCLITest(cli.TestCLIParams{}) + defer c.Cleanup() + + testData := []string{ + `e'{"a": "bc"}'`, + `e'{"a": "b\u0099c"}'`, + `e'{"a": "b\\"c"}'`, + `'"there are \"quotes\" in this json string"'`, + `'""'`, + `'{}'`, + } + + for _, s := range testData { + query := `SELECT ` + s + `::json` + c.RunWithArgs([]string{"sql", "--format=sql", "-e", query}) + } + + // Output: + // sql --format=sql -e SELECT e'{"a": "bc"}'::json + // CREATE TABLE results ( + // jsonb STRING + // ); + // + // INSERT INTO results VALUES ('{"a": "bc"}'); + // -- 1 row + // sql --format=sql -e SELECT e'{"a": "b\u0099c"}'::json + // CREATE TABLE results ( + // jsonb STRING + // ); + // + // INSERT INTO results VALUES (e'{"a": "b\\u0099c"}'); + // -- 1 row + // sql --format=sql -e SELECT e'{"a": "b\\"c"}'::json + // CREATE TABLE results ( + // jsonb STRING + // ); + // + // INSERT INTO results VALUES (e'{"a": "b\\"c"}'); + // -- 1 row + // sql --format=sql -e SELECT '"there are \"quotes\" in this json string"'::json + // CREATE TABLE results ( + // jsonb STRING + // ); + // + // INSERT INTO results VALUES (e'"there are \\"quotes\\" in this json string"'); + // -- 1 row + // sql --format=sql -e SELECT '""'::json + // CREATE TABLE results ( + // jsonb STRING + // ); + // + // INSERT INTO results VALUES ('""'); + // -- 1 row + // sql --format=sql -e SELECT '{}'::json + // CREATE TABLE results ( + // jsonb STRING + // ); + // + // INSERT INTO results VALUES ('{}'); + // -- 1 row +} diff --git a/pkg/cli/clisqlexec/format_value.go b/pkg/cli/clisqlexec/format_value.go index 91c89622e879..a93b79d64a41 100644 --- a/pkg/cli/clisqlexec/format_value.go +++ b/pkg/cli/clisqlexec/format_value.go @@ -11,6 +11,8 @@ import ( "strings" "unicode" "unicode/utf8" + + "github.com/cockroachdb/cockroach/pkg/sql/lexbase" ) func isNotPrintableASCII(r rune) bool { return r < 0x20 || r > 0x7e || r == '"' || r == '\\' } @@ -40,10 +42,11 @@ func FormatVal(val driver.Value, showPrintableUnicode bool, showNewLinesAndTabs return t } } - s := fmt.Sprintf("%+q", t) + s := lexbase.EscapeSQLString(t) + // The result from EscapeSQLString is an escape-quoted string, like e'...'. // Strip the start and final quotes. The surrounding display // format (e.g. CSV/TSV) will add its own quotes. - return s[1 : len(s)-1] + return s[2 : len(s)-1] } // Fallback to printing the value as-is. diff --git a/pkg/sql/lexbase/encode.go b/pkg/sql/lexbase/encode.go index 7a19d82827a8..748e3fac042d 100644 --- a/pkg/sql/lexbase/encode.go +++ b/pkg/sql/lexbase/encode.go @@ -42,10 +42,6 @@ const ( // without wrapping quotes. EncBareIdentifiers - // EncNoDoubleEscapeQuotes indicates that backslashes will not be - // escaped when they are used as escape quotes. - EncNoDoubleEscapeQuotes - // EncFirstFreeFlagBit needs to remain unused; it is used as base // bit offset for tree.FmtFlags. EncFirstFreeFlagBit @@ -144,7 +140,6 @@ func EncodeSQLStringWithFlags(buf *bytes.Buffer, in string, flags EncodeFlags) { start := 0 escapedString := false bareStrings := flags.HasFlags(EncBareStrings) - noDoubleEscapeQuotes := flags.HasFlags(EncNoDoubleEscapeQuotes) // Loop through each unicode code point. 
for i, r := range in {
 		if i < start {
@@ -165,9 +160,6 @@ func EncodeSQLStringWithFlags(buf *bytes.Buffer, in string, flags EncodeFlags) {
 			buf.WriteString("e'") // begin e'xxx' string
 			escapedString = true
 		}
-		if noDoubleEscapeQuotes && i+1 < len(in) && in[i:i+2] == "\\\"" {
-			continue
-		}
 		buf.WriteString(in[start:i])
 
 		ln := utf8.RuneLen(r)
diff --git a/pkg/sql/lexbase/encode_test.go b/pkg/sql/lexbase/encode_test.go
index a6870a65666b..a8fbfd3f7544 100644
--- a/pkg/sql/lexbase/encode_test.go
+++ b/pkg/sql/lexbase/encode_test.go
@@ -112,28 +112,6 @@ func testEncodeString(t *testing.T, input []byte, encode func(*bytes.Buffer, str
 	return stmt
 }
 
-func TestEncodeSQLStringWithNoDoubleEscapeQuotes(t *testing.T) {
-	testCases := []struct {
-		input  string
-		output string
-	}{
-		// (GH issue #107518)
-		{`\"`, `e'\"'`},
-		{`{"a": "b\u0099c"}`, `e'{"a": "b\\u0099c"}'`},
-		{`{\"a\": \"b\u0099c\"}`, `e'{\"a\": \"b\\u0099c\"}'`},
-	}
-
-	for _, tc := range testCases {
-		var buf bytes.Buffer
-		lexbase.EncodeSQLStringWithFlags(&buf, tc.input, lexbase.EncNoDoubleEscapeQuotes)
-		out := buf.String()
-
-		if out != tc.output {
-			t.Errorf("`%s`: expected `%s`, got `%s`", tc.input, tc.output, out)
-		}
-	}
-}
-
 func BenchmarkEncodeSQLString(b *testing.B) {
 	str := strings.Repeat("foo", 10000)
 	for i := 0; i < b.N; i++ {

From efc635a40dc875404bf2e141340910c7541777b2 Mon Sep 17 00:00:00 2001
From: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com>
Date: Thu, 26 Sep 2024 10:57:53 -0400
Subject: [PATCH 2/2] tablemetadatacache: update progress/metrics of the
 update table metadata job

Updates the job progress value of the update table metadata job as it
processes batches of tables. The progress will not be updated on every
successful batch, but will instead be updated every nth batch, where n
is defined by `batchesPerProgressUpdate`. This is done because each
batch executes relatively quickly, and it is unnecessary to provide
such granular progress updates, each of which results in a write to
the database.
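To make the cadence concrete, the loop behaves roughly like the
following minimal, self-contained sketch (not the job's actual code:
`report` is a hypothetical stand-in for the job's fraction-progressed
update, and the constants simply mirror the ones used in this patch):

    package main

    import (
        "context"
        "fmt"
        "math"
    )

    const (
        tableBatchSize           = 20 // rows fetched/upserted per batch
        batchesPerProgressUpdate = 10 // write progress every nth batch
    )

    // report stands in for writing the job's fraction completed.
    func report(_ context.Context, p float32) { fmt.Printf("progress: %.2f\n", p) }

    func main() {
        ctx := context.Background()
        estimatedRows := 500
        estimatedBatches := int(math.Ceil(float64(estimatedRows) / float64(tableBatchSize)))
        updated := 0
        for batchNum := 1; batchNum <= estimatedBatches; batchNum++ {
            updated += tableBatchSize // stand-in for one successful upsert batch
            if batchNum == estimatedBatches {
                // Hold the final fraction just below 1.0; the job only
                // reports 100% once it is marked as completed.
                report(ctx, 0.99)
            } else if batchNum%batchesPerProgressUpdate == 0 {
                report(ctx, float32(updated)/float32(estimatedRows))
            }
        }
    }

With 500 rows and the constants above, progress is written at batches
10, 20, and 25 (0.40, 0.80, 0.99) rather than on all 25 batches.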
Adds additional metrics to the update table metadata job: * UpdatedTables - The total number of table rows written to system.table_metadata * Errors - The total number of errors emitted from job runs * Duration - The time spent executing the job Part of: #130249 Epic: CRDB-37558 Release note: None --- docs/generated/metrics/metrics.html | 3 + pkg/server/api_v2_databases_metadata_test.go | 1 - pkg/server/server_sql.go | 3 +- pkg/sql/tablemetadatacache/BUILD.bazel | 7 +- .../table_metadata_updater.go | 95 ++++++++++++++++--- .../table_metadata_updater_test.go | 89 ++++++++++++++++- .../update_table_metadata_cache_job.go | 95 ++++++++++++++----- .../update_table_metadata_cache_job_test.go | 70 ++++++-------- pkg/sql/tablemetadatacache/util/BUILD.bazel | 5 +- pkg/sql/tablemetadatacache/util/test_utils.go | 39 ++++++++ pkg/sql/tablemetadatacache/util/util.go | 16 ++++ 11 files changed, 334 insertions(+), 89 deletions(-) create mode 100644 pkg/sql/tablemetadatacache/util/util.go diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 5669e0a5e54a..6792a7d8530a 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -1477,7 +1477,10 @@ APPLICATIONlogical_replication.replicated_time_secondsThe replicated time of the logical replication stream in seconds since the unix epoch.SecondsGAUGESECONDSAVGNONE APPLICATIONlogical_replication.retry_queue_bytesThe replicated time of the logical replication stream in seconds since the unix epoch.BytesGAUGEBYTESAVGNONE APPLICATIONlogical_replication.retry_queue_eventsThe replicated time of the logical replication stream in seconds since the unix epoch.EventsGAUGECOUNTAVGNONE +APPLICATIONobs.tablemetadata.update_job.durationTime spent running the update table metadata job.DurationHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONobs.tablemetadata.update_job.errorsThe total number of errors that have been emitted from the update table metadata job.ErrorsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONobs.tablemetadata.update_job.runsThe total number of runs of the update table metadata job.ExecutionsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +APPLICATIONobs.tablemetadata.update_job.table_updatesThe total number of rows that have been updated in system.table_metadataRows UpdatedCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONphysical_replication.admit_latencyEvent admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processorNanosecondsHISTOGRAMNANOSECONDSAVGNONE APPLICATIONphysical_replication.commit_latencyEvent commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. 
If we batch events, then the difference between the oldest event in the batch and flush is recordedNanosecondsHISTOGRAMNANOSECONDSAVGNONE APPLICATIONphysical_replication.cutover_progressThe number of ranges left to revert in order to complete an inflight cutoverRangesGAUGECOUNTAVGNONE diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go index 2f103cb5d4c2..c1c661236c92 100644 --- a/pkg/server/api_v2_databases_metadata_test.go +++ b/pkg/server/api_v2_databases_metadata_test.go @@ -748,7 +748,6 @@ func TestTriggerMetadataUpdateJob(t *testing.T) { defer close(jobReadyChan) testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ TableMetadata: &tablemetadatacache_util.TestingKnobs{ OnJobReady: func() { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 16c4d934f237..595cf28ad7e0 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1146,11 +1146,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if tableStatsKnobs := cfg.TestingKnobs.TableStatsKnobs; tableStatsKnobs != nil { tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs) } - if tableMetadataKnobs := cfg.TestingKnobs.TableMetadata; tableMetadataKnobs != nil { execCfg.TableMetadataKnobs = tableMetadataKnobs.(*tablemetadatacacheutil.TestingKnobs) - } + // Set up internal memory metrics for use by internal SQL executors. // Don't add them to the registry now because it will be added as part of pgServer metrics. sqlMemMetrics := sql.MakeMemMetrics("sql", cfg.HistogramWindowInterval()) diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel index a19ae735faff..50c4a992871a 100644 --- a/pkg/sql/tablemetadatacache/BUILD.bazel +++ b/pkg/sql/tablemetadatacache/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/roachpb", @@ -20,6 +21,7 @@ go_library( "//pkg/sql/isql", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/tablemetadatacache/util", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/timeutil", @@ -48,9 +50,12 @@ go_test( "//pkg/kv/kvserver", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", "//pkg/server/serverpb", + "//pkg/sql", "//pkg/sql/isql", + "//pkg/sql/tablemetadatacache/util", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", @@ -59,8 +64,6 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/syncutil", - "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater.go b/pkg/sql/tablemetadatacache/table_metadata_updater.go index 2c86793309cc..c7dd261c854f 100644 --- a/pkg/sql/tablemetadatacache/table_metadata_updater.go +++ b/pkg/sql/tablemetadatacache/table_metadata_updater.go @@ -10,22 +10,27 @@ import ( "context" gojson "encoding/json" "fmt" + "math" "time" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + tablemetadatacacheutil 
"github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) const pruneBatchSize = 512 // tableMetadataUpdater encapsulates the logic for updating the table metadata cache. type tableMetadataUpdater struct { - ie isql.Executor - // upsertQuery is the query used to upsert table metadata rows, - // it is reused for each batch to avoid allocations between batches. - upsertQuery *tableMetadataBatchUpsertQuery + ie isql.Executor + metrics *TableMetadataUpdateJobMetrics + updateProgress func(ctx context.Context, progress float32) + testKnobs *tablemetadatacacheutil.TestingKnobs } // tableMetadataDetails contains additional details for a table_metadata row that doesn't @@ -40,16 +45,52 @@ type tableMetadataDetails struct { StatsLastUpdated *time.Time `json:"stats_last_updated"` } +var _ tablemetadatacacheutil.ITableMetadataUpdater = &tableMetadataUpdater{} + // newTableMetadataUpdater creates a new tableMetadataUpdater. -func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater { - return &tableMetadataUpdater{ie: ie, upsertQuery: newTableMetadataBatchUpsertQuery(tableBatchSize)} +var newTableMetadataUpdater = func( + job *jobs.Job, + metrics *TableMetadataUpdateJobMetrics, + ie isql.Executor, + testKnobs *tablemetadatacacheutil.TestingKnobs) *tableMetadataUpdater { + return &tableMetadataUpdater{ + ie: ie, + metrics: metrics, + updateProgress: func(ctx context.Context, progress float32) { + updateProgress(ctx, job, progress) + }, + testKnobs: testKnobs, + } +} + +// RunUpdater implements tablemetadatacacheutil.ITableMetadataUpdater +func (u *tableMetadataUpdater) RunUpdater(ctx context.Context) error { + u.metrics.NumRuns.Inc(1) + sw := timeutil.NewStopWatch() + sw.Start() + if _, err := u.pruneCache(ctx); err != nil { + log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error()) + } + rowsUpdated, err := u.updateCache(ctx) + sw.Stop() + u.metrics.Duration.RecordValue(sw.Elapsed().Nanoseconds()) + u.metrics.UpdatedTables.Inc(int64(rowsUpdated)) + return err } // updateCache performs a full update of the system.table_metadata, returning // the number of rows updated and the last error encountered. func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, err error) { + // upsertQuery is the query used to upsert table metadata rows, + // it is reused for each batch to avoid allocations between batches. + upsert := newTableMetadataBatchUpsertQuery(tableBatchSize) it := newTableMetadataBatchIterator(u.ie) - + estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx) + if err != nil { + log.Errorf(ctx, "failed to get estimated row count. err=%s", err.Error()) + } + estimatedBatches := int(math.Ceil(float64(estimatedRowsToUpdate) / float64(tableBatchSize))) + batchNum := 0 for { more, err := it.fetchNextBatch(ctx, tableBatchSize) if err != nil { @@ -57,6 +98,7 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er // https://github.com/cockroachdb/cockroach/issues/130040. For now, // we can't continue because the page key is in an invalid state. log.Errorf(ctx, "failed to fetch next batch: %s", err.Error()) + u.metrics.Errors.Inc(1) return updated, err } @@ -64,14 +106,22 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er // No more results. 
break } - - count, err := u.upsertBatch(ctx, it.batchRows) + batchNum++ + count, err := u.upsertBatch(ctx, it.batchRows, upsert) if err != nil { // If an upsert fails, let's just continue to the next batch for now. log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error()) + u.metrics.Errors.Inc(1) continue } + updated += count + if batchNum == estimatedBatches { + u.updateProgress(ctx, .99) + } else if batchNum%batchesPerProgressUpdate == 0 && estimatedBatches > 0 { + progress := float32(updated) / float32(estimatedRowsToUpdate) + u.updateProgress(ctx, progress) + } } return updated, err @@ -116,13 +166,12 @@ RETURNING table_id`, pruneBatchSize) // upsertBatch upserts the given batch of table metadata rows returning // the number of rows upserted and any error that occurred. func (u *tableMetadataUpdater) upsertBatch( - ctx context.Context, batch []tableMetadataIterRow, + ctx context.Context, batch []tableMetadataIterRow, upsertQuery *tableMetadataBatchUpsertQuery, ) (int, error) { - u.upsertQuery.resetForBatch() - + upsertQuery.resetForBatch() upsertBatchSize := 0 for _, row := range batch { - if err := u.upsertQuery.addRow(ctx, &row); err != nil { + if err := upsertQuery.addRow(ctx, &row); err != nil { log.Errorf(ctx, "failed to add row to upsert batch: %s", err.Error()) continue } @@ -138,11 +187,27 @@ func (u *tableMetadataUpdater) upsertBatch( "batch-upsert-table-metadata", nil, // txn sessiondata.NodeUserWithBulkLowPriSessionDataOverride, - u.upsertQuery.getQuery(), - u.upsertQuery.args..., + upsertQuery.getQuery(), + upsertQuery.args..., ) } +func (u *tableMetadataUpdater) getRowsToUpdateCount(ctx context.Context) (int, error) { + query := fmt.Sprintf(` +SELECT count(*)::INT +FROM system.namespace t +JOIN system.namespace d ON t."parentID" = d.id +%s +WHERE d."parentID" = 0 and t."parentSchemaID" != 0 +`, u.testKnobs.GetAOSTClause()) + row, err := u.ie.QueryRow(ctx, "get-table-metadata-row-count", nil, query) + if err != nil { + return 0, err + } + + return int(tree.MustBeDInt(row[0])), nil +} + type tableMetadataBatchUpsertQuery struct { stmt bytes.Buffer args []interface{} diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go index ce1aeca05b8b..eaac9b8d2c43 100644 --- a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go +++ b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go @@ -8,17 +8,24 @@ package tablemetadatacache import ( "context" "fmt" + "math" "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/isql" + tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" ) // TestDataDrivenTableMetadataCacheUpdater tests the operations performed by @@ -29,12 +36,32 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) { defer log.Scope(t).Close(t) ctx := 
context.Background()
-	s := serverutils.StartServerOnly(t, base.TestServerArgs{})
+	knobs := tablemetadatacacheutil.CreateTestingKnobs()
+	s := serverutils.StartServerOnly(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			TableMetadata: knobs,
+		},
+	})
 	defer s.Stopper().Stop(ctx)
 
 	queryConn := s.ApplicationLayer().SQLConn(t)
 	s.ApplicationLayer().DB()
 
+	jobRegistry := s.JobRegistry().(*jobs.Registry)
+	jobId := jobRegistry.MakeJobID()
+	// Create a new job so that we don't have to wait for the existing one to be claimed.
+	jr := jobs.Record{
+		JobID:         jobId,
+		Description:   jobspb.TypeUpdateTableMetadataCache.String(),
+		Details:       jobspb.UpdateTableMetadataCacheDetails{},
+		Progress:      jobspb.UpdateTableMetadataCacheProgress{},
+		CreatedBy:     &jobs.CreatedByInfo{Name: username.NodeUser, ID: username.NodeUserID},
+		Username:      username.NodeUserName(),
+		NonCancelable: true,
+	}
+	job, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobId, nil)
+	metrics := newTableMetadataUpdateJobMetrics().(TableMetadataUpdateJobMetrics)
+	require.NoError(t, err)
 	datadriven.Walk(t, datapathutils.TestDataPath(t, ""), func(t *testing.T, path string) {
 		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
 			switch d.Cmd {
@@ -49,14 +76,14 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
 			}
 				return res
 			case "update-cache":
-				updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
+				updater := newTableMetadataUpdater(job, &metrics, s.InternalExecutor().(isql.Executor), knobs)
 				updated, err := updater.updateCache(ctx)
 				if err != nil {
 					return err.Error()
 				}
 				return fmt.Sprintf("updated %d table(s)", updated)
 			case "prune-cache":
-				updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
+				updater := newTableMetadataUpdater(job, &metrics, s.InternalExecutor().(isql.Executor), knobs)
 				pruned, err := updater.pruneCache(ctx)
 				if err != nil {
 					return err.Error()
@@ -79,3 +106,59 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
 		})
 	})
 }
+
+func TestTableMetadataUpdateJobProgressAndMetrics(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	var currentProgress float32
+	// Server setup.
+	knobs := tablemetadatacacheutil.CreateTestingKnobs()
+	s := serverutils.StartServerOnly(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			TableMetadata: knobs,
+		},
+	})
+	defer s.Stopper().Stop(ctx)
+	conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
+	metrics := newTableMetadataUpdateJobMetrics().(TableMetadataUpdateJobMetrics)
+	count := 0
+	updater := tableMetadataUpdater{
+		ie:      s.ExecutorConfig().(sql.ExecutorConfig).InternalDB.Executor(),
+		metrics: &metrics,
+		updateProgress: func(ctx context.Context, progress float32) {
+			require.Greater(t, progress, currentProgress, "progress should be greater than current progress")
+			currentProgress = progress
+			count++
+		},
+		testKnobs: knobs,
+	}
+	require.NoError(t, updater.RunUpdater(ctx))
+	updatedTables := metrics.UpdatedTables.Count()
+	require.Greater(t, updatedTables, int64(0))
+	// Since there are only system tables in the cluster, the threshold to call updateProgress isn't met.
+	// updateProgress will only be called once.
+ require.Equal(t, 1, count) + require.Equal(t, int64(0), metrics.Errors.Count()) + require.Equal(t, float32(.99), currentProgress) + require.Equal(t, int64(1), metrics.NumRuns.Count()) + require.Greater(t, metrics.Duration.CumulativeSnapshot().Mean(), float64(0)) + count = 0 + currentProgress = 0 + + // generate 500 random tables + conn.Exec(t, + `SELECT crdb_internal.generate_test_objects('{"names": "random_table_", "counts": [500], "randomize_columns": true}'::JSONB)`) + require.NoError(t, updater.RunUpdater(ctx)) + // The updated tables metric doesn't reset between runs, so it should increase by updatedTables + 500, because 500 + // random tables were previously generated + expectedTablesUpdated := (updatedTables * 2) + 500 + require.Equal(t, expectedTablesUpdated, metrics.UpdatedTables.Count()) + estimatedBatches := int(math.Ceil(float64(expectedTablesUpdated) / float64(tableBatchSize))) + estimatedProgressUpdates := estimatedBatches / batchesPerProgressUpdate + require.GreaterOrEqual(t, count, estimatedProgressUpdates) + require.Equal(t, int64(0), metrics.Errors.Count()) + require.Equal(t, float32(.99), currentProgress) + require.Equal(t, int64(2), metrics.NumRuns.Count()) +} diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go index 7f6a3a029f39..5211199e2179 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go @@ -9,11 +9,13 @@ import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/isql" + tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -21,28 +23,43 @@ import ( io_prometheus_client "github.com/prometheus/client_model/go" ) -// updateJobExecFn specifies the function that is run on each iteration of the -// table metadata update job. It can be overriden in tests. -var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache +const ( + // batchesPerProgressUpdate is used to determine how many batches + // should be processed before updating the job progress + batchesPerProgressUpdate = 10 +) type tableMetadataUpdateJobResumer struct { - job *jobs.Job + job *jobs.Job + metrics *TableMetadataUpdateJobMetrics } var _ jobs.Resumer = (*tableMetadataUpdateJobResumer)(nil) // Resume is part of the jobs.Resumer interface. func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error { + // This job is a forever running background job, and it is always safe to + // terminate the SQL pod whenever the job is running, so mark it as idle. j.job.MarkIdle(true) execCtx := execCtxI.(sql.JobExecContext) metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct(). 
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
+	j.metrics = &metrics
+	testKnobs := execCtx.ExecCfg().TableMetadataKnobs
+	var updater tablemetadatacacheutil.ITableMetadataUpdater
 	var onJobStartKnob, onJobCompleteKnob, onJobReady func()
-	if execCtx.ExecCfg().TableMetadataKnobs != nil {
-		onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart
-		onJobCompleteKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobComplete
-		onJobReady = execCtx.ExecCfg().TableMetadataKnobs.OnJobReady
+	if testKnobs != nil {
+		onJobStartKnob = testKnobs.OnJobStart
+		onJobCompleteKnob = testKnobs.OnJobComplete
+		onJobReady = testKnobs.OnJobReady
+		if testKnobs.TableMetadataUpdater != nil {
+			updater = testKnobs.TableMetadataUpdater
+		}
+	}
+
+	if updater == nil {
+		updater = newTableMetadataUpdater(j.job, &metrics, execCtx.ExecCfg().InternalDB.Executor(), testKnobs)
 	}
 	// We must reset the job's num runs to 0 so that it doesn't get
 	// delayed by the job system's exponential backoff strategy.
@@ -98,11 +115,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
 		if onJobStartKnob != nil {
 			onJobStartKnob()
 		}
-		// Run table metadata update job.
-		metrics.NumRuns.Inc(1)
+
 		j.markAsRunning(ctx)
-		if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
+		err := updater.RunUpdater(ctx)
+		if err != nil {
 			log.Errorf(ctx, "error running table metadata update job: %s", err)
+			j.metrics.Errors.Inc(1)
 		}
 		j.markAsCompleted(ctx)
 		if onJobCompleteKnob != nil {
@@ -111,6 +129,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
 	}
 }
 
+func updateProgress(ctx context.Context, job *jobs.Job, progress float32) {
+	if err := job.NoTxn().FractionProgressed(ctx, jobs.FractionUpdater(progress)); err != nil {
+		log.Errorf(ctx, "error updating table metadata job progress: %s", err.Error())
+	}
+}
+
 // markAsRunning updates the last_start_time and status fields in the job's progress
 // details and writes the job progress as a JSON string to the running status.
 func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
@@ -121,6 +145,9 @@ func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
 		progress.RunningStatus = fmt.Sprintf("Job started at %s", now)
 		details.LastStartTime = &now
 		details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING
+		progress.Progress = &jobspb.Progress_FractionCompleted{
+			FractionCompleted: 0,
+		}
 		ju.UpdateProgress(progress)
 		return nil
 	}); err != nil {
@@ -138,6 +165,9 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
 		progress.RunningStatus = fmt.Sprintf("Job completed at %s", now)
 		details.LastCompletedTime = &now
 		details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING
+		progress.Progress = &jobspb.Progress_FractionCompleted{
+			FractionCompleted: 1.0,
+		}
 		ju.UpdateProgress(progress)
 		return nil
 	}); err != nil {
@@ -145,20 +175,6 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
 	}
 }
 
-// updateTableMetadataCache performs a full update of system.table_metadata by collecting
-// metadata from the system.namespace, system.descriptor tables and table span stats RPC. 
-func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error { - updater := newTableMetadataUpdater(ie) - if _, err := updater.pruneCache(ctx); err != nil { - log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error()) - } - - // We'll use the updated ret val in a follow-up to update metrics and - // fractional job progress. - _, err := updater.updateCache(ctx) - return err -} - // OnFailOrCancel implements jobs.Resumer. func (j *tableMetadataUpdateJobResumer) OnFailOrCancel( ctx context.Context, execCtx interface{}, jobErr error, @@ -180,7 +196,10 @@ func (j *tableMetadataUpdateJobResumer) CollectProfile( } type TableMetadataUpdateJobMetrics struct { - NumRuns *metric.Counter + NumRuns *metric.Counter + UpdatedTables *metric.Counter + Errors *metric.Counter + Duration metric.IHistogram } func (m TableMetadataUpdateJobMetrics) MetricStruct() {} @@ -194,6 +213,30 @@ func newTableMetadataUpdateJobMetrics() metric.Struct { Unit: metric.Unit_COUNT, MetricType: io_prometheus_client.MetricType_COUNTER, }), + UpdatedTables: metric.NewCounter(metric.Metadata{ + Name: "obs.tablemetadata.update_job.table_updates", + Help: "The total number of rows that have been updated in system.table_metadata", + Measurement: "Rows Updated", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_COUNTER, + }), + Errors: metric.NewCounter(metric.Metadata{ + Name: "obs.tablemetadata.update_job.errors", + Help: "The total number of errors that have been emitted from the update table metadata job.", + Measurement: "Errors", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_COUNTER, + }), + Duration: metric.NewHistogram(metric.HistogramOptions{ + Metadata: metric.Metadata{ + Name: "obs.tablemetadata.update_job.duration", + Help: "Time spent running the update table metadata job.", + Measurement: "Duration", + Unit: metric.Unit_NANOSECONDS}, + Duration: base.DefaultHistogramWindowInterval(), + BucketConfig: metric.IOLatencyBuckets, + Mode: metric.HistogramModePrometheus, + }), } } diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index 4e7baac47a40..fdfb76483270 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -17,15 +17,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/sql/isql" + tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -97,30 +95,8 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { ctx := context.Background() - // We'll mock the job execution function to track when the job is run and - // to avoid running the actual job which could take longer - we don't care - // about the actual update logic in this test. 
- var mockCalls []time.Time - mockMutex := syncutil.RWMutex{} jobRunCh := make(chan struct{}) - restoreUpdate := testutils.TestingHook(&updateJobExecFn, - func(ctx context.Context, ie isql.Executor) error { - mockMutex.Lock() - defer mockMutex.Unlock() - mockCalls = append(mockCalls, timeutil.Now()) - select { - case jobRunCh <- struct{}{}: - default: - } - return nil - }) - defer restoreUpdate() - - getMockCallCount := func() int { - mockMutex.RLock() - defer mockMutex.RUnlock() - return len(mockCalls) - } + updater := mockUpdater{jobRunCh: jobRunCh} waitForJobRuns := func(count int, timeout time.Duration) error { ctxWithCancel, cancel := context.WithTimeout(ctx, timeout) @@ -136,7 +112,13 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { } // Server setup. - s := serverutils.StartServerOnly(t, base.TestServerArgs{}) + s := serverutils.StartServerOnly(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + TableMetadata: &tablemetadatacacheutil.TestingKnobs{ + TableMetadataUpdater: &updater, + }, + }, + }) defer s.Stopper().Stop(ctx) conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t)) @@ -144,9 +126,9 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { // Wait for the job to be claimed by a node. testutils.SucceedsSoon(t, func() error { row := conn.Query(t, ` - SELECT claim_instance_id, status FROM system.jobs + SELECT claim_instance_id, status FROM system.jobs WHERE id = $1 AND claim_instance_id IS NOT NULL - AND status = 'running'`, + AND status = 'running'`, jobs.UpdateTableMetadataCacheJobID) if !row.Next() { return errors.New("no node has claimed the job") @@ -154,25 +136,22 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { return nil }) - require.Zero(t, getMockCallCount(), "Job should not run automatically by default") + require.Zero(t, len(updater.mockCalls), "Job should not run automatically by default") t.Run("AutomaticUpdatesEnabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`) DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) - err := waitForJobRuns(3, 10*time.Second) + err := waitForJobRuns(3, 30*time.Second) require.NoError(t, err, "Job should have run at least 3 times") - mockCallsCount := getMockCallCount() - require.GreaterOrEqual(t, mockCallsCount, 3, "Job should have run at least 3 times") + require.GreaterOrEqual(t, len(updater.mockCalls), 3, "Job should have run at least 3 times") conn.Exec(t, `RESET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled`) // We'll wait for one more signal in case the job was running when the setting was disabled. // Ignore the error since it could timeout or be successful. _ = waitForJobRuns(1, 200*time.Millisecond) // Verify time between calls. 
- mockMutex.RLock() - defer mockMutex.RUnlock() - for i := 1; i < len(mockCalls); i++ { - timeBetweenCalls := mockCalls[i].Sub(mockCalls[i-1]) + for i := 1; i < len(updater.mockCalls); i++ { + timeBetweenCalls := updater.mockCalls[i].Sub(updater.mockCalls[i-1]) require.GreaterOrEqual(t, timeBetweenCalls, 50*time.Millisecond, "Time between calls %d and %d should be at least 50ms", i-1, i) } @@ -181,9 +160,22 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesDisabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`) DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) - initialCount := getMockCallCount() + initialCount := len(updater.mockCalls) err := waitForJobRuns(1, 200*time.Millisecond) require.Error(t, err, "Job should not run after being disabled") - require.Equal(t, initialCount, getMockCallCount(), "Job count should not increase after being disabled") + require.Equal(t, initialCount, len(updater.mockCalls), "Job count should not increase after being disabled") }) } + +type mockUpdater struct { + mockCalls []time.Time + jobRunCh chan struct{} +} + +func (t *mockUpdater) RunUpdater(ctx context.Context) error { + t.mockCalls = append(t.mockCalls, time.Now()) + t.jobRunCh <- struct{}{} + return nil +} + +var _ tablemetadatacacheutil.ITableMetadataUpdater = &mockUpdater{} diff --git a/pkg/sql/tablemetadatacache/util/BUILD.bazel b/pkg/sql/tablemetadatacache/util/BUILD.bazel index e1c5e7377bde..48839f177cc1 100644 --- a/pkg/sql/tablemetadatacache/util/BUILD.bazel +++ b/pkg/sql/tablemetadatacache/util/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "util", - srcs = ["test_utils.go"], + srcs = [ + "test_utils.go", + "util.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util", visibility = ["//visibility:public"], ) diff --git a/pkg/sql/tablemetadatacache/util/test_utils.go b/pkg/sql/tablemetadatacache/util/test_utils.go index 9eaefd13c10e..11947b3927f1 100644 --- a/pkg/sql/tablemetadatacache/util/test_utils.go +++ b/pkg/sql/tablemetadatacache/util/test_utils.go @@ -13,7 +13,46 @@ type TestingKnobs struct { OnJobStart func() // onJobComplete is called when the job completes OnJobComplete func() + // aostClause overrides the AS OF SYSTEM TIME clause in queries used in + // table metadata update job. + aostClause string + // TableMetadataUpdater overrides the ITableMetadataUpdater used in + // tableMetadataUpdateJobResumer.Resume + TableMetadataUpdater ITableMetadataUpdater } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. func (*TestingKnobs) ModuleTestingKnobs() {} + +// GetAOSTClause returns the appropriate AS OF SYSTEM TIME clause to be +// used when reading from tables in the job run +func (knobs *TestingKnobs) GetAOSTClause() string { + if knobs != nil { + return knobs.aostClause + } + + return "AS OF SYSTEM TIME follower_read_timestamp()" +} + +// CreateTestingKnobs creates a testing knob in the unit tests. +// +// Note: The table metadata update job uses follower read (AS OF SYSTEM TIME +// follower_read_timestamp()) to ensure that contention between reads and writes +// is minimized. +// +// However, in a new cluster in unit tests, system tables are created using the +// migration framework. The migration framework goes through a list of +// registered migrations and creates the stats system tables. 
By using follower
+// read, we shift the transaction read timestamp far enough into the past. This
+// means that in unit tests, the read timestamp may be chosen to be before the
+// creation of the stats tables, which can cause a 'descriptor not found' error
+// when accessing the system tables.
+//
+// Additionally, we don't want to completely remove the AOST clause in the unit
+// tests. Therefore, `AS OF SYSTEM TIME '-1us'` is a compromise used to get
+// around the 'descriptor not found' error.
+func CreateTestingKnobs() *TestingKnobs {
+	return &TestingKnobs{
+		aostClause: "AS OF SYSTEM TIME '-1us'",
+	}
+}
diff --git a/pkg/sql/tablemetadatacache/util/util.go b/pkg/sql/tablemetadatacache/util/util.go
new file mode 100644
index 000000000000..8866e728d588
--- /dev/null
+++ b/pkg/sql/tablemetadatacache/util/util.go
@@ -0,0 +1,16 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package tablemetadatacacheutil
+
+import "context"
+
+// ITableMetadataUpdater is an interface that exposes a RunUpdater
+// method to be used by the tableMetadataUpdateJobResumer. It primarily
+// serves as a way to facilitate better testing of tableMetadataUpdater
+// and tableMetadataUpdateJobResumer.
+type ITableMetadataUpdater interface {
+	RunUpdater(ctx context.Context) error
+}
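
For illustration, the nil-receiver check inside GetAOSTClause is what lets
callers use it without first checking whether testing knobs are installed: a
nil *TestingKnobs yields the production follower-read clause. A minimal
standalone sketch of the same pattern (local stand-in names, not this
package's API):

    package main

    import "fmt"

    type knobs struct{ aostClause string }

    // getAOSTClause mirrors the nil-safe receiver pattern above: calling it
    // on a nil *knobs returns the production follower-read clause, while a
    // non-nil receiver returns whatever clause the test installed.
    func (k *knobs) getAOSTClause() string {
        if k != nil {
            return k.aostClause
        }
        return "AS OF SYSTEM TIME follower_read_timestamp()"
    }

    func main() {
        var prod *knobs // production: no testing knobs installed
        test := &knobs{aostClause: "AS OF SYSTEM TIME '-1us'"}
        fmt.Println("SELECT count(*) FROM system.namespace " + prod.getAOSTClause())
        fmt.Println("SELECT count(*) FROM system.namespace " + test.getAOSTClause())
    }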