Skip to content

Commit

Permalink
chore: Add log sampling (#709)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Warehime committed Aug 27, 2024
1 parent f174ff2 commit 7391885
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
3 changes: 2 additions & 1 deletion oracle/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (o *OracleImpl) createPriceProvider(ctx context.Context, cfg config.Provide
// Add the provider to the oracle.
o.priceProviders[provider.Name()] = state

// Add the provider name to the message here since we want these to ignore log sampling limits
o.logger.Info(
"created price provider state",
fmt.Sprintf("created %s provider state", provider.Name()),
zap.String("provider", provider.Name()),
zap.Int("num_tickers", len(provider.GetIDs())),
)
Expand Down
10 changes: 6 additions & 4 deletions oracle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ func (o *OracleImpl) UpdateProviderState(providerTickers []types.ProviderTicker,
}()
}

// Update the provider's state.
o.logger.Info("updated provider state", zap.String("provider_state", provider.Name()))
// Ignore sampling limits for provider update logs via injecting provider name in message
o.logger.Info(
fmt.Sprintf("updated %s provider state", provider.Name()),
zap.String("provider", provider.Name()),
zap.Int("num_tickers", len(provider.GetIDs())),
)
return state, nil
}

Expand Down Expand Up @@ -99,8 +103,6 @@ func (o *OracleImpl) fetchAllPrices() {

// update the last sync time
o.metrics.AddTick()

o.logger.Info("oracle updated prices", zap.Time("last_sync", o.lastPriceSync), zap.Int("num_prices", len(o.aggregator.GetPrices())))
}

func (o *OracleImpl) fetchPrices(provider *types.PriceProvider) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/log/zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package log
import (
"fmt"
"os"
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -27,6 +29,8 @@ type Config struct {
MaxAge int
// Compress determines if the rotated log files should be compressed.
Compress bool
// LogSamplePeriod is the duration in which we de-dupe identical log messages.
LogSamplePeriod time.Duration
}

// NewDefaultConfig creates a default configuration for the logger.
Expand All @@ -40,6 +44,7 @@ func NewDefaultConfig() Config {
MaxBackups: 1,
MaxAge: 3, // 3 days
Compress: false,
LogSamplePeriod: 10 * time.Second,
}
}

Expand Down Expand Up @@ -93,6 +98,10 @@ func NewLogger(config Config) *zap.Logger {
} else {
core = stdCore
}
if strings.ToUpper(config.StdOutLogLevel) != zap.DebugLevel.CapitalString() && strings.ToUpper(config.FileOutLogLevel) != zap.DebugLevel.CapitalString() {
// If we're not in debug log level anywhere, filter any logs which have non-unique messages within a 10-second period
core = zapcore.NewSamplerWithOptions(core, config.LogSamplePeriod, 1, 0)
}

return zap.New(
core,
Expand Down
11 changes: 8 additions & 3 deletions pkg/math/oracle/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"math/big"
"sync"

"go.uber.org/zap"

"github.com/skip-mev/connect/v2/oracle"
oraclemetrics "github.com/skip-mev/connect/v2/oracle/metrics"
"github.com/skip-mev/connect/v2/oracle/types"
"github.com/skip-mev/connect/v2/pkg/math"
mmtypes "github.com/skip-mev/connect/v2/x/marketmap/types"
"go.uber.org/zap"
)

var _ oracle.PriceAggregator = &IndexPriceAggregator{}
Expand Down Expand Up @@ -77,6 +76,8 @@ func (m *IndexPriceAggregator) AggregatePrices() {
indexPrices := make(types.Prices)
scaledPrices := make(types.Prices)

var missingPrices []string

for ticker, market := range m.cfg.Markets {
if !market.Ticker.Enabled {
m.logger.Debug("skipping disabled market", zap.Any("market", market))
Expand All @@ -92,7 +93,8 @@ func (m *IndexPriceAggregator) AggregatePrices() {

// We need to have at least the minimum number of providers to calculate the median.
if len(convertedPrices) < int(target.MinProviderCount) { //nolint:gosec
m.logger.Error(
missingPrices = append(missingPrices, ticker)
m.logger.Debug(
"insufficient amount of converted prices",
zap.String("target_ticker", ticker),
zap.Int("num_converted_prices", len(convertedPrices)),
Expand Down Expand Up @@ -127,6 +129,9 @@ func (m *IndexPriceAggregator) AggregatePrices() {
// Update the aggregated data. These prices are going to be used as the index prices the
// next time we calculate prices.
m.logger.Debug("calculated median prices for price feeds", zap.Int("num_prices", len(indexPrices)))
if len(missingPrices) > 0 {
m.logger.Info("failed to calculate prices for price feeds", zap.Strings("missing_prices", missingPrices))
}
m.indexPrices = indexPrices
m.scaledPrices = scaledPrices
}
Expand Down

0 comments on commit 7391885

Please sign in to comment.