Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
110805: sql: limit statistics discard log message r=j82w a=j82w

Problem:
The discard log message occurs for every transaction end after the limit is hit. This causes the log to be flooded with discard messages. This is not useful for users and can cause issues with telemetry pipelines.

Solution:
The discard message will only be logged once per minute. The log rate is controlled by a cluster setting. This allows the message to be set to a very large interval if this expected behavior for a cluster.

Refactored:
The SQLStats creates and hold the reference to the counts. Then each container which is per an app name is passed the counts by reference. It's not obvious that the counts are shared between the containers. The code was refactored to make a single object to hold the counts and pass all the related content together. This makes the code easier to read and expand in the future if other values need to be added.

Fixes: cockroachdb#110454

Release note (sql change): The discard log message is now limited to once per minute by default. The message was also changed to have both the number of transactions and the number of statements that were discarded.

110947: server: fix sync on setting overrides for secondary tenants r=yuzefovich a=knz

Improves/completes cockroachdb#110789
Prerequisite for tests in cockroachdb#110758.
Fixes cockroachdb#110560.
Epic: CRDB-6671

The previous patch in this area was merely restarting the rangefeed but did not actually wait for the initial update event to be received.

This patch fixes it.

Release note: None

Co-authored-by: j82w <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
3 people committed Sep 20, 2023
3 parents bcac3fe + 91af030 + 10e1ea9 commit db64500
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 115 deletions.
22 changes: 18 additions & 4 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type SettingsWatcher struct {
// inside secondary tenants. It will be uninitialized in a system
// tenant.
storageClusterVersion clusterversion.ClusterVersion

// Used by TestingRestart.
updateWait chan struct{}
}

// testingWatcherKnobs allows the client to inject testing knobs into
Expand All @@ -85,7 +88,7 @@ func New(
stopper *stop.Stopper,
storage Storage, // optional
) *SettingsWatcher {
return &SettingsWatcher{
s := &SettingsWatcher{
clock: clock,
codec: codec,
settings: settingsToUpdate,
Expand All @@ -94,6 +97,8 @@ func New(
dec: MakeRowDecoder(codec),
storage: storage,
}
s.mu.updateWait = make(chan struct{})
return s
}

// NewWithOverrides constructs a new SettingsWatcher which allows external
Expand Down Expand Up @@ -141,6 +146,9 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
initialScan.done = true
close(initialScan.ch)
}
// Used by TestingRestart().
close(s.mu.updateWait)
s.mu.updateWait = make(chan struct{})
}

s.mu.values = make(map[settings.InternalKey]settingsValue)
Expand Down Expand Up @@ -248,9 +256,15 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
}
}

func (w *SettingsWatcher) TestingRestart() {
if w.rfc != nil {
w.rfc.TestingRestart()
// TestingRestart restarts the rangefeeds and waits for the initial
// update after the rangefeed update to be processed.
func (s *SettingsWatcher) TestingRestart() {
if s.rfc != nil {
s.mu.Lock()
waitCh := s.mu.updateWait
s.mu.Unlock()
s.rfc.TestingRestart()
<-waitCh
}
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -64,6 +65,11 @@ type Watcher struct {
// rfc provides access to the underlying rangefeedcache.Watcher for
// testing.
rfc *rangefeedcache.Watcher
mu struct {
syncutil.Mutex
// Used by TestingRestart.
updateWait chan struct{}
}
}

// New constructs a new Watcher.
Expand All @@ -78,6 +84,7 @@ func New(
dec: MakeRowDecoder(),
}
w.store.Init()
w.mu.updateWait = make(chan struct{})
return w
}

Expand Down Expand Up @@ -162,6 +169,11 @@ func (w *Watcher) startRangeFeed(
initialScan.done = true
close(initialScan.ch)
}
// Used by TestingRestart().
w.mu.Lock()
defer w.mu.Unlock()
close(w.mu.updateWait)
w.mu.updateWait = make(chan struct{})
}
}

Expand Down Expand Up @@ -220,9 +232,15 @@ func (w *Watcher) WaitForStart(ctx context.Context) error {
}
}

// TestingRestart restarts the rangefeeds and waits for the initial
// update after the rangefeed update to be processed.
func (w *Watcher) TestingRestart() {
if w.rfc != nil {
w.mu.Lock()
waitCh := w.mu.updateWait
w.mu.Unlock()
w.rfc.TestingRestart()
<-waitCh
}
}

Expand Down
63 changes: 63 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gosql "database/sql"
"fmt"
"math"
"regexp"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -251,6 +252,68 @@ func TestSQLStatsInitialDelay(t *testing.T) {
"expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt)
}

func TestSQLStatsLogDiscardMessage(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
fakeTime := stubTime{
aggInterval: time.Hour,
}
fakeTime.setTime(timeutil.Now())

var params base.TestServerArgs
params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
StubTimeNow: fakeTime.Now,
}
srv, conn, _ := serverutils.StartServer(t, params)
defer srv.Stopper().Stop(ctx)

sqlConn := sqlutils.MakeSQLRunner(conn)

sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
sqlConn.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.metrics.max_mem_stmt_fingerprints=%d", 8))

for i := 0; i < 20; i++ {
appName := fmt.Sprintf("logDiscardTestApp%d", i)
sqlConn.Exec(t, "SET application_name = $1", appName)
sqlConn.Exec(t, "SELECT 1")
}

log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`statistics discarded due to memory limit. transaction discard count:`),
log.WithFlattenedSensitiveData,
)
require.NoError(t, err)
require.Equal(t, 1, len(entries), "there should only be 1 log for the initial execution because the test should take less than 1 minute to execute the 20 commands. cnt: %v", entries)

// lower the time frame to verify log still occurs after the initial one
sqlConn.Exec(t, "SET CLUSTER SETTING sql.metrics.discarded_stats_log.interval='0.00001ms'")

for i := 0; i < 20; i++ {
appName := fmt.Sprintf("logDiscardTestApp2%d", i)
sqlConn.Exec(t, "SET application_name = $1", appName)
sqlConn.Exec(t, "SELECT 1")
}

log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`statistics discarded due to memory limit. transaction discard count:`),
log.WithFlattenedSensitiveData,
)
require.NoError(t, err)
require.GreaterOrEqual(t, len(entries), 1, "there should only be 1 log for the initial execution because the test should take less than 1 minute to execute the 20 commands. cnt: %v", entries)
}

func TestSQLStatsMinimumFlushInterval(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
43 changes: 13 additions & 30 deletions pkg/sql/sqlstats/sslocal/sql_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package sslocal
import (
"context"
"math"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
Expand All @@ -31,14 +30,6 @@ import (
type SQLStats struct {
st *cluster.Settings

// uniqueStmtFingerprintLimit is the limit on number of unique statement
// fingerprints we can store in memory.
uniqueStmtFingerprintLimit *settings.IntSetting

// uniqueTxnFingerprintLimit is the limit on number of unique transaction
// fingerprints we can store in memory.
uniqueTxnFingerprintLimit *settings.IntSetting

mu struct {
syncutil.Mutex

Expand All @@ -50,15 +41,8 @@ type SQLStats struct {
apps map[string]*ssmemstorage.Container
}

atomic struct {
// uniqueStmtFingerprintCount is the number of unique statement fingerprints
// we are storing in memory.
uniqueStmtFingerprintCount int64

// uniqueTxnFingerprintCount is the number of unique transaction fingerprints
// we are storing in memory.
uniqueTxnFingerprintCount int64
}
// Server level counter
atomic *ssmemstorage.SQLStatsAtomicCounters

// flushTarget is a Sink that, when the SQLStats resets at the end of its
// reset interval, the SQLStats will dump all of the stats into if it is not
Expand Down Expand Up @@ -93,14 +77,16 @@ func newSQLStats(
st,
)
s := &SQLStats{
st: st,
uniqueStmtFingerprintLimit: uniqueStmtFingerprintLimit,
uniqueTxnFingerprintLimit: uniqueTxnFingerprintLimit,
flushTarget: flushTarget,
knobs: knobs,
insights: insightsWriter,
latencyInformation: latencyInformation,
st: st,
flushTarget: flushTarget,
knobs: knobs,
insights: insightsWriter,
latencyInformation: latencyInformation,
}
s.atomic = ssmemstorage.NewSQLStatsAtomicCounters(
st,
uniqueStmtFingerprintLimit,
uniqueTxnFingerprintLimit)
s.mu.apps = make(map[string]*ssmemstorage.Container)
s.mu.mon = monitor
s.mu.mon.StartNoReserved(context.Background(), parentMon)
Expand All @@ -110,7 +96,7 @@ func newSQLStats(
// GetTotalFingerprintCount returns total number of unique statement and
// transaction fingerprints stored in the current SQLStats.
func (s *SQLStats) GetTotalFingerprintCount() int64 {
return atomic.LoadInt64(&s.atomic.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.atomic.uniqueTxnFingerprintCount)
return s.atomic.GetTotalFingerprintCount()
}

// GetTotalFingerprintBytes returns the total amount of bytes currently
Expand All @@ -130,10 +116,7 @@ func (s *SQLStats) getStatsForApplication(appName string) *ssmemstorage.Containe
}
a := ssmemstorage.New(
s.st,
s.uniqueStmtFingerprintLimit,
s.uniqueTxnFingerprintLimit,
&s.atomic.uniqueStmtFingerprintCount,
&s.atomic.uniqueTxnFingerprintCount,
s.atomic,
s.mu.mon,
appName,
s.knobs,
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/sqlstats/sslocal/sslocal_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func (s *SQLStats) GetApplicationStats(appName string, internal bool) sqlstats.A
}
a := ssmemstorage.New(
s.st,
s.uniqueStmtFingerprintLimit,
s.uniqueTxnFingerprintLimit,
&s.atomic.uniqueStmtFingerprintCount,
&s.atomic.uniqueTxnFingerprintCount,
s.atomic,
s.mu.mon,
appName,
s.knobs,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// StatsCollector is used to collect statement and transaction statistics
Expand Down Expand Up @@ -106,8 +105,9 @@ func (s *StatsCollector) EndTransaction(
s.ApplicationStats,
)

// Avoid taking locks if no stats are discarded.
if discardedStats > 0 {
log.Warningf(ctx, "%d statement statistics discarded due to memory limit", discardedStats)
s.flushTarget.MaybeLogDiscardMessage(ctx)
}

s.ApplicationStats.Free(ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/ssmemstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "ssmemstorage",
srcs = [
"ss_mem_counter.go",
"ss_mem_iterator.go",
"ss_mem_storage.go",
"ss_mem_writer.go",
Expand Down
Loading

0 comments on commit db64500

Please sign in to comment.