diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index e28b72418ae1..c5c51702617b 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -60,6 +60,9 @@ type SettingsWatcher struct { // inside secondary tenants. It will be uninitialized in a system // tenant. storageClusterVersion clusterversion.ClusterVersion + + // Used by TestingRestart. + updateWait chan struct{} } // testingWatcherKnobs allows the client to inject testing knobs into @@ -85,7 +88,7 @@ func New( stopper *stop.Stopper, storage Storage, // optional ) *SettingsWatcher { - return &SettingsWatcher{ + s := &SettingsWatcher{ clock: clock, codec: codec, settings: settingsToUpdate, @@ -94,6 +97,8 @@ func New( dec: MakeRowDecoder(codec), storage: storage, } + s.mu.updateWait = make(chan struct{}) + return s } // NewWithOverrides constructs a new SettingsWatcher which allows external @@ -141,6 +146,9 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { initialScan.done = true close(initialScan.ch) } + // Used by TestingRestart(). + close(s.mu.updateWait) + s.mu.updateWait = make(chan struct{}) } s.mu.values = make(map[settings.InternalKey]settingsValue) @@ -248,9 +256,15 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } -func (w *SettingsWatcher) TestingRestart() { - if w.rfc != nil { - w.rfc.TestingRestart() +// TestingRestart restarts the rangefeeds and waits for the initial +// update after the rangefeed update to be processed. +func (s *SettingsWatcher) TestingRestart() { + if s.rfc != nil { + s.mu.Lock() + waitCh := s.mu.updateWait + s.mu.Unlock() + s.rfc.TestingRestart() + <-waitCh } } diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index 9560f4c5b58e..83aadc5effa6 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -64,6 +65,11 @@ type Watcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. rfc *rangefeedcache.Watcher + mu struct { + syncutil.Mutex + // Used by TestingRestart. + updateWait chan struct{} + } } // New constructs a new Watcher. @@ -78,6 +84,7 @@ func New( dec: MakeRowDecoder(), } w.store.Init() + w.mu.updateWait = make(chan struct{}) return w } @@ -162,6 +169,11 @@ func (w *Watcher) startRangeFeed( initialScan.done = true close(initialScan.ch) } + // Used by TestingRestart(). + w.mu.Lock() + defer w.mu.Unlock() + close(w.mu.updateWait) + w.mu.updateWait = make(chan struct{}) } } @@ -220,9 +232,15 @@ func (w *Watcher) WaitForStart(ctx context.Context) error { } } +// TestingRestart restarts the rangefeeds and waits for the initial +// update after the rangefeed update to be processed. func (w *Watcher) TestingRestart() { if w.rfc != nil { + w.mu.Lock() + waitCh := w.mu.updateWait + w.mu.Unlock() w.rfc.TestingRestart() + <-waitCh } } diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go index d723445a25a5..d72d87d81647 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -15,6 +15,7 @@ import ( gosql "database/sql" "fmt" "math" + "regexp" "strconv" "testing" "time" @@ -251,6 +252,68 @@ func TestSQLStatsInitialDelay(t *testing.T) { "expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt) } +func TestSQLStatsLogDiscardMessage(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + fakeTime := stubTime{ + aggInterval: time.Hour, + } + fakeTime.setTime(timeutil.Now()) + + var params base.TestServerArgs + params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{ + StubTimeNow: fakeTime.Now, + } + srv, conn, _ := serverutils.StartServer(t, params) + defer srv.Stopper().Stop(ctx) + + sqlConn := sqlutils.MakeSQLRunner(conn) + + sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'") + sqlConn.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.metrics.max_mem_stmt_fingerprints=%d", 8)) + + for i := 0; i < 20; i++ { + appName := fmt.Sprintf("logDiscardTestApp%d", i) + sqlConn.Exec(t, "SET application_name = $1", appName) + sqlConn.Exec(t, "SELECT 1") + } + + log.FlushFiles() + + entries, err := log.FetchEntriesFromFiles( + 0, + math.MaxInt64, + 10000, + regexp.MustCompile(`statistics discarded due to memory limit. transaction discard count:`), + log.WithFlattenedSensitiveData, + ) + require.NoError(t, err) + require.Equal(t, 1, len(entries), "there should only be 1 log for the initial execution because the test should take less than 1 minute to execute the 20 commands. cnt: %v", entries) + + // lower the time frame to verify log still occurs after the initial one + sqlConn.Exec(t, "SET CLUSTER SETTING sql.metrics.discarded_stats_log.interval='0.00001ms'") + + for i := 0; i < 20; i++ { + appName := fmt.Sprintf("logDiscardTestApp2%d", i) + sqlConn.Exec(t, "SET application_name = $1", appName) + sqlConn.Exec(t, "SELECT 1") + } + + log.FlushFiles() + + entries, err = log.FetchEntriesFromFiles( + 0, + math.MaxInt64, + 10000, + regexp.MustCompile(`statistics discarded due to memory limit. transaction discard count:`), + log.WithFlattenedSensitiveData, + ) + require.NoError(t, err) + require.GreaterOrEqual(t, len(entries), 1, "there should only be 1 log for the initial execution because the test should take less than 1 minute to execute the 20 commands. cnt: %v", entries) +} + func TestSQLStatsMinimumFlushInterval(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go index 1456ba560116..210643ff4da4 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats.go @@ -13,7 +13,6 @@ package sslocal import ( "context" "math" - "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/settings" @@ -31,14 +30,6 @@ import ( type SQLStats struct { st *cluster.Settings - // uniqueStmtFingerprintLimit is the limit on number of unique statement - // fingerprints we can store in memory. - uniqueStmtFingerprintLimit *settings.IntSetting - - // uniqueTxnFingerprintLimit is the limit on number of unique transaction - // fingerprints we can store in memory. - uniqueTxnFingerprintLimit *settings.IntSetting - mu struct { syncutil.Mutex @@ -50,15 +41,8 @@ type SQLStats struct { apps map[string]*ssmemstorage.Container } - atomic struct { - // uniqueStmtFingerprintCount is the number of unique statement fingerprints - // we are storing in memory. - uniqueStmtFingerprintCount int64 - - // uniqueTxnFingerprintCount is the number of unique transaction fingerprints - // we are storing in memory. - uniqueTxnFingerprintCount int64 - } + // Server level counter + atomic *ssmemstorage.SQLStatsAtomicCounters // flushTarget is a Sink that, when the SQLStats resets at the end of its // reset interval, the SQLStats will dump all of the stats into if it is not @@ -93,14 +77,16 @@ func newSQLStats( st, ) s := &SQLStats{ - st: st, - uniqueStmtFingerprintLimit: uniqueStmtFingerprintLimit, - uniqueTxnFingerprintLimit: uniqueTxnFingerprintLimit, - flushTarget: flushTarget, - knobs: knobs, - insights: insightsWriter, - latencyInformation: latencyInformation, + st: st, + flushTarget: flushTarget, + knobs: knobs, + insights: insightsWriter, + latencyInformation: latencyInformation, } + s.atomic = ssmemstorage.NewSQLStatsAtomicCounters( + st, + uniqueStmtFingerprintLimit, + uniqueTxnFingerprintLimit) s.mu.apps = make(map[string]*ssmemstorage.Container) s.mu.mon = monitor s.mu.mon.StartNoReserved(context.Background(), parentMon) @@ -110,7 +96,7 @@ func newSQLStats( // GetTotalFingerprintCount returns total number of unique statement and // transaction fingerprints stored in the current SQLStats. func (s *SQLStats) GetTotalFingerprintCount() int64 { - return atomic.LoadInt64(&s.atomic.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.atomic.uniqueTxnFingerprintCount) + return s.atomic.GetTotalFingerprintCount() } // GetTotalFingerprintBytes returns the total amount of bytes currently @@ -130,10 +116,7 @@ func (s *SQLStats) getStatsForApplication(appName string) *ssmemstorage.Containe } a := ssmemstorage.New( s.st, - s.uniqueStmtFingerprintLimit, - s.uniqueTxnFingerprintLimit, - &s.atomic.uniqueStmtFingerprintCount, - &s.atomic.uniqueTxnFingerprintCount, + s.atomic, s.mu.mon, appName, s.knobs, diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index ea0f0fbbbba4..d9535c90f0e2 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -107,10 +107,7 @@ func (s *SQLStats) GetApplicationStats(appName string, internal bool) sqlstats.A } a := ssmemstorage.New( s.st, - s.uniqueStmtFingerprintLimit, - s.uniqueTxnFingerprintLimit, - &s.atomic.uniqueStmtFingerprintCount, - &s.atomic.uniqueTxnFingerprintCount, + s.atomic, s.mu.mon, appName, s.knobs, diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index 34e5db02d6ed..1121d064970d 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" - "github.com/cockroachdb/cockroach/pkg/util/log" ) // StatsCollector is used to collect statement and transaction statistics @@ -106,8 +105,9 @@ func (s *StatsCollector) EndTransaction( s.ApplicationStats, ) + // Avoid taking locks if no stats are discarded. if discardedStats > 0 { - log.Warningf(ctx, "%d statement statistics discarded due to memory limit", discardedStats) + s.flushTarget.MaybeLogDiscardMessage(ctx) } s.ApplicationStats.Free(ctx) diff --git a/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel b/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel index 8b19a01d995b..c1f64bf249b4 100644 --- a/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel +++ b/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "ssmemstorage", srcs = [ + "ss_mem_counter.go", "ss_mem_iterator.go", "ss_mem_storage.go", "ss_mem_writer.go", diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_counter.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_counter.go new file mode 100644 index 000000000000..5d0c7a4cb75f --- /dev/null +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_counter.go @@ -0,0 +1,177 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package sqlstats is a subsystem that is responsible for tracking the +// statistics of statements and transactions. + +package ssmemstorage + +import ( + "context" + "sync/atomic" + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +type SQLStatsAtomicCounters struct { + st *cluster.Settings + + // uniqueStmtFingerprintLimit is the limit on number of unique statement + // fingerprints we can store in memory. + UniqueStmtFingerprintLimit *settings.IntSetting + + // uniqueTxnFingerprintLimit is the limit on number of unique transaction + // fingerprints we can store in memory. + UniqueTxnFingerprintLimit *settings.IntSetting + + // uniqueStmtFingerprintCount is the number of unique statement fingerprints + // we are storing in memory. + uniqueStmtFingerprintCount int64 + + // uniqueTxnFingerprintCount is the number of unique transaction fingerprints + // we are storing in memory. + uniqueTxnFingerprintCount int64 + + // discardUniqueStmtFingerprintCount is the number of unique statement + // fingerprints that are discard because of memory limitations. + discardUniqueStmtFingerprintCount int64 + + // discardUniqueTxnFingerprintCount is the number of unique transaction + // fingerprints that are discard because of memory limitations. + discardUniqueTxnFingerprintCount int64 + + mu struct { + syncutil.Mutex + + // lastDiscardLogMessageSent is the last time a log message was sent for + // statistics being discarded because of memory pressure. + lastDiscardLogMessageSent time.Time + } +} + +// DiscardedStatsLogInterval specifies the interval between log emissions for discarded +// statement and transaction statistics due to reaching the SQL statistics memory limit. +var DiscardedStatsLogInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "sql.metrics.discarded_stats_log.interval", + "interval between log emissions for discarded statistics due to SQL statistics memory limit", + 1*time.Minute, + settings.NonNegativeDuration, + settings.WithVisibility(settings.Reserved)) + +func NewSQLStatsAtomicCounters( + st *cluster.Settings, + uniqueStmtFingerprintLimit *settings.IntSetting, + uniqueTxnFingerprintLimit *settings.IntSetting, +) *SQLStatsAtomicCounters { + return &SQLStatsAtomicCounters{ + st: st, + UniqueStmtFingerprintLimit: uniqueStmtFingerprintLimit, + UniqueTxnFingerprintLimit: uniqueTxnFingerprintLimit, + } +} + +// maybeLogDiscardMessage logs a warning if statement or transaction +// fingerprints were discarded because of memory limits and enough time passed +// since the last time the warning was logged. This is necessary to avoid +// flooding the log with warnings once the limit is hit. +func (s *SQLStatsAtomicCounters) maybeLogDiscardMessage(ctx context.Context) { + discardSmtCnt := atomic.LoadInt64(&s.discardUniqueStmtFingerprintCount) + discardTxnCnt := atomic.LoadInt64(&s.discardUniqueTxnFingerprintCount) + if discardSmtCnt == 0 && discardTxnCnt == 0 { + return + } + + // Get the config values before the lock to reduce time in the lock. + discardLogInterval := DiscardedStatsLogInterval.Get(&s.st.SV) + stmtLimit := s.UniqueStmtFingerprintLimit.Get(&s.st.SV) + txnLimit := s.UniqueTxnFingerprintLimit.Get(&s.st.SV) + s.mu.Lock() + defer s.mu.Unlock() + timeNow := timeutil.Now() + + // Not enough time has passed since the last log message was sent. + if timeNow.Sub(s.mu.lastDiscardLogMessageSent) < discardLogInterval { + return + } + + // The discard counts might be slightly off because it's possible that the + // count changed after the initial load and before the log message is sent. + // The count being slightly off won't impact users looking at the message. It + // also avoids holding a lock on the counts which would block requests until + // the log is sent. + log.Warningf(ctx, "statistics discarded due to memory limit. transaction discard count: %d with limit: %d, statement discard count: %d with limit: %d, logged at interval: %s, last logged: %s", + discardTxnCnt, stmtLimit, discardSmtCnt, txnLimit, discardLogInterval, s.mu.lastDiscardLogMessageSent) + s.mu.lastDiscardLogMessageSent = timeNow + + // Reset the discard count back to 0 since the value was logged + atomic.StoreInt64(&s.discardUniqueStmtFingerprintCount, int64(0)) + atomic.StoreInt64(&s.discardUniqueTxnFingerprintCount, int64(0)) +} + +// tryAddStmtFingerprint attempts to add 1 to the server level count for +// statement level fingerprints and returns false if it is being throttled. +func (s *SQLStatsAtomicCounters) tryAddStmtFingerprint() (ok bool) { + limit := s.UniqueStmtFingerprintLimit.Get(&s.st.SV) + + // We check if we have reached the limit of unique fingerprints we can + // store. + incrementedFingerprintCount := + atomic.AddInt64(&s.uniqueStmtFingerprintCount, int64(1) /* delts */) + + // Abort if we have exceeded limit of unique statement fingerprints. + if incrementedFingerprintCount < limit { + return true + } + + atomic.AddInt64(&s.discardUniqueStmtFingerprintCount, int64(1)) + atomic.AddInt64(&s.uniqueStmtFingerprintCount, -int64(1) /* delts */) + return false +} + +// tryAddTxnFingerprint attempts to add 1 to the server level count for +// transaction level fingerprints and returns false if it is being throttled. +func (s *SQLStatsAtomicCounters) tryAddTxnFingerprint() (ok bool) { + limit := s.UniqueTxnFingerprintLimit.Get(&s.st.SV) + + // We check if we have reached the limit of unique fingerprints we can + // store. + incrementedFingerprintCount := + atomic.AddInt64(&s.uniqueTxnFingerprintCount, int64(1) /* delts */) + + if incrementedFingerprintCount < limit { + return true + } + + atomic.AddInt64(&s.discardUniqueTxnFingerprintCount, int64(1)) + atomic.AddInt64(&s.uniqueTxnFingerprintCount, -int64(1) /* delts */) + return false +} + +// freeByCnt decrements the statement and transaction count by the value +// passed in. This is used in scenarios where an entire container which is +// per an app name is being cleaned up. +func (s *SQLStatsAtomicCounters) freeByCnt( + uniqueStmtFingerprintCount, uniqueTxnFingerprintCount int64, +) { + atomic.AddInt64(&s.uniqueStmtFingerprintCount, uniqueStmtFingerprintCount) + atomic.AddInt64(&s.uniqueTxnFingerprintCount, uniqueTxnFingerprintCount) +} + +// GetTotalFingerprintCount returns total number of unique statement and +// transaction fingerprints stored in the current SQLStats. +func (s *SQLStatsAtomicCounters) GetTotalFingerprintCount() int64 { + return atomic.LoadInt64(&s.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.uniqueTxnFingerprintCount) +} diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 6b3325ce3808..8980a7d5d274 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -18,12 +18,10 @@ import ( "context" "encoding/json" "fmt" - "sync/atomic" "time" "unsafe" "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" @@ -76,23 +74,8 @@ type Container struct { st *cluster.Settings appName string - // uniqueStmtFingerprintLimit is the limit on number of unique statement - // fingerprints we can store in memory. - uniqueStmtFingerprintLimit *settings.IntSetting - - // uniqueTxnFingerprintLimit is the limit on number of unique transaction - // fingerprints we can store in memory. - uniqueTxnFingerprintLimit *settings.IntSetting - - atomic struct { - // uniqueStmtFingerprintCount is the number of unique statement fingerprints - // we are storing in memory. - uniqueStmtFingerprintCount *int64 - - // uniqueTxnFingerprintCount is the number of unique transaction fingerprints - // we are storing in memory. - uniqueTxnFingerprintCount *int64 - } + // uniqueServerCount is a server level counter of all the unique fingerprints + uniqueServerCount *SQLStatsAtomicCounters mu struct { syncutil.RWMutex @@ -127,10 +110,7 @@ var _ sqlstats.ApplicationStats = &Container{} // New returns a new instance of Container. func New( st *cluster.Settings, - uniqueStmtFingerprintLimit *settings.IntSetting, - uniqueTxnFingerprintLimit *settings.IntSetting, - uniqueStmtFingerprintCount *int64, - uniqueTxnFingerprintCount *int64, + uniqueServerCount *SQLStatsAtomicCounters, mon *mon.BytesMonitor, appName string, knobs *sqlstats.TestingKnobs, @@ -138,14 +118,13 @@ func New( latencyInformation insights.LatencyInformation, ) *Container { s := &Container{ - st: st, - appName: appName, - uniqueStmtFingerprintLimit: uniqueStmtFingerprintLimit, - uniqueTxnFingerprintLimit: uniqueTxnFingerprintLimit, - mon: mon, - knobs: knobs, - insights: insightsWriter, - latencyInformation: latencyInformation, + st: st, + appName: appName, + mon: mon, + knobs: knobs, + insights: insightsWriter, + latencyInformation: latencyInformation, + uniqueServerCount: uniqueServerCount, } if mon != nil { @@ -156,9 +135,6 @@ func New( s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats) s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time) - s.atomic.uniqueStmtFingerprintCount = uniqueStmtFingerprintCount - s.atomic.uniqueTxnFingerprintCount = uniqueTxnFingerprintCount - return s } @@ -244,10 +220,7 @@ func NewTempContainerFromExistingStmtStats( container = New( nil, /* st */ - nil, /* uniqueStmtFingerprintLimit */ - nil, /* uniqueTxnFingerprintLimit */ - nil, /* uniqueStmtFingerprintCount */ - nil, /* uniqueTxnFingerprintCount */ + nil, /* uniqueServerCount */ nil, /* mon */ appName, nil, /* knobs */ @@ -298,6 +271,10 @@ func NewTempContainerFromExistingStmtStats( return container, nil /* remaining */, nil /* err */ } +func (s *Container) MaybeLogDiscardMessage(ctx context.Context) { + s.uniqueServerCount.maybeLogDiscardMessage(ctx) +} + // NewTempContainerFromExistingTxnStats creates a new Container by ingesting a slice // of CollectedTransactionStatistics sorted by .StatsData.App field. // It consumes the first chunk of the slice where all entries in the chunk @@ -318,10 +295,7 @@ func NewTempContainerFromExistingTxnStats( container = New( nil, /* st */ - nil, /* uniqueStmtFingerprintLimit */ - nil, /* uniqueTxnFingerprintLimit */ - nil, /* uniqueStmtFingerprintCount */ - nil, /* uniqueTxnFingerprintCount */ + nil, /* uniqueServerCount */ nil, /* mon */ appName, nil, /* knobs */ @@ -350,20 +324,13 @@ func NewTempContainerFromExistingTxnStats( // NewApplicationStatsWithInheritedOptions implements the // sqlstats.ApplicationStats interface. func (s *Container) NewApplicationStatsWithInheritedOptions() sqlstats.ApplicationStats { - var ( - uniqueStmtFingerprintCount int64 - uniqueTxnFingerprintCount int64 - ) s.mu.Lock() defer s.mu.Unlock() return New( s.st, - sqlstats.MaxSQLStatsStmtFingerprintsPerExplicitTxn, // There is no need to constraint txn fingerprint limit since in temporary // container, there will never be more than one transaction fingerprint. - nil, // uniqueTxnFingerprintLimit - &uniqueStmtFingerprintCount, - &uniqueTxnFingerprintCount, + nil, // uniqueServerCount s.mon, s.appName, s.knobs, @@ -558,18 +525,8 @@ func (s *Container) getStatsForStmtWithKeyLocked( if !ok && createIfNonexistent { // If the uniqueStmtFingerprintCount is nil, then we don't check for // fingerprint limit. - if s.atomic.uniqueStmtFingerprintCount != nil { - // We check if we have reached the limit of unique fingerprints we can - // store. - limit := s.uniqueStmtFingerprintLimit.Get(&s.st.SV) - incrementedFingerprintCount := - atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, int64(1) /* delts */) - - // Abort if we have exceeded limit of unique statement fingerprints. - if incrementedFingerprintCount > limit { - atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, -int64(1) /* delts */) - return stats, false /* created */, true /* throttled */ - } + if s.uniqueServerCount != nil && !s.uniqueServerCount.tryAddStmtFingerprint() { + return stats, false /* created */, true /* throttled */ } stats = &stmtStats{} stats.ID = stmtFingerprintID @@ -603,17 +560,8 @@ func (s *Container) getStatsForTxnWithKeyLocked( if !ok && createIfNonexistent { // If the uniqueTxnFingerprintCount is nil, then we don't check for // fingerprint limit. - if s.atomic.uniqueTxnFingerprintCount != nil { - limit := s.uniqueTxnFingerprintLimit.Get(&s.st.SV) - incrementedFingerprintCount := - atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(1) /* delts */) - - // If we have exceeded limit of fingerprint count, decrement the counter - // and abort. - if incrementedFingerprintCount > limit { - atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, -int64(1) /* delts */) - return nil /* stats */, false /* created */, true /* throttled */ - } + if s.uniqueServerCount != nil && !s.uniqueServerCount.tryAddTxnFingerprint() { + return nil /* stats */, false /* created */, true /* throttled */ } stats = &txnStats{} stats.statementFingerprintIDs = stmtFingerprintIDs @@ -672,8 +620,9 @@ func (s *Container) Free(ctx context.Context) { } func (s *Container) freeLocked(ctx context.Context) { - atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, int64(-len(s.mu.stmts))) - atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(-len(s.mu.txns))) + if s.uniqueServerCount != nil { + s.uniqueServerCount.freeByCnt(int64(len(s.mu.stmts)), int64(len(s.mu.txns))) + } s.mu.acc.Clear(ctx) } diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index f375b3335ac1..527cb2b4351a 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -94,6 +94,10 @@ type ApplicationStats interface { other ApplicationStats, ) uint64 + // MaybeLogDiscardMessage is used to possibly log a message when statistics + // are being discarded because of memory limits. + MaybeLogDiscardMessage(ctx context.Context) + // NewApplicationStatsWithInheritedOptions returns a new ApplicationStats // interface that inherits all memory limits of the existing NewApplicationStatsWithInheritedOptions() ApplicationStats