From 2af418c164340351ea2a3dec276f2cb5ac88899e Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Fri, 5 Jul 2024 22:52:30 +0200 Subject: [PATCH 01/10] Use custom temp dir for gsfa --- cmd-x-index-gsfa.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd-x-index-gsfa.go b/cmd-x-index-gsfa.go index eb1ef7b3..33bcddc8 100644 --- a/cmd-x-index-gsfa.go +++ b/cmd-x-index-gsfa.go @@ -70,8 +70,8 @@ func newCmd_Index_gsfa() *cli.Command { }, &cli.StringFlag{ Name: "tmp-dir", - Usage: "temporary directory to use for storing intermediate files", - Value: os.TempDir(), + Usage: "temporary directory to use for storing intermediate files; WILL BE DELETED", + Value: filepath.Join(os.TempDir(), fmt.Sprintf("yellowstone-faithful-gsfa-%d", time.Now().UnixNano())), }, }, Action: func(c *cli.Context) error { @@ -137,6 +137,9 @@ func newCmd_Index_gsfa() *cli.Command { return fmt.Errorf("failed to add network to sig_exists index metadata: %w", err) } tmpDir := c.String("tmp-dir") + if err := os.MkdirAll(tmpDir, 0o755); err != nil { + return fmt.Errorf("failed to create tmp dir: %w", err) + } indexW, err := gsfa.NewGsfaWriter( gsfaIndexDir, meta, From 2763835bc7721e1ac60d3e71799d4303584a5699 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Fri, 5 Jul 2024 23:06:52 +0200 Subject: [PATCH 02/10] Flush based on performance --- gsfa/gsfa-write.go | 47 +++++++++++++++++----------- gsfa/pop-rank.go | 72 +++++++++++++++++++++++++++++++++++++++++++ gsfa/pop-rank_test.go | 55 +++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 gsfa/pop-rank.go create mode 100644 gsfa/pop-rank_test.go diff --git a/gsfa/gsfa-write.go b/gsfa/gsfa-write.go index 640fa64f..13222122 100644 --- a/gsfa/gsfa-write.go +++ b/gsfa/gsfa-write.go @@ -23,6 +23,7 @@ import ( type GsfaWriter struct { mu sync.Mutex indexRootDir string + popRank *rollingRankOfTopPerformers // top pubkeys by flush count offsets *hashmap.Map[solana.PublicKey, [2]uint64] ll *linkedlog.LinkedLog man *manifest.Manifest @@ -61,6 +62,7 @@ func NewGsfaWriter( ctx, cancel := context.WithCancel(context.Background()) index := &GsfaWriter{ fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable + popRank: newRollingRankOfTopPerformers(10_000), offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)), accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)), ctx: ctx, @@ -120,6 +122,9 @@ func (a *GsfaWriter) fullBufferWriter() { has := tmpBuf.Has(buffer.Key) if len(tmpBuf) == howManyBuffersToFlushConcurrently || has { for _, buf := range tmpBuf { + if len(buf.Values) == 0 { + continue + } // Write the buffer to the linked log. klog.V(5).Infof("Flushing %d transactions for key %s", len(buf.Values), buf.Key) if err := a.flushKVs(buf); err != nil { @@ -131,7 +136,7 @@ func (a *GsfaWriter) fullBufferWriter() { tmpBuf = append(tmpBuf, buffer) } case <-time.After(1 * time.Second): - klog.Infof("Read %d buffers from channel", numReadFromChan) + klog.V(5).Infof("Read %d buffers from channel", numReadFromChan) } } } @@ -153,27 +158,32 @@ func (a *GsfaWriter) Push( } publicKeys = publicKeys.Dedupe() publicKeys.Sort() - if slot%1000 == 0 { - if a.accum.Len() > 130_000 { - // flush all - klog.Infof("Flushing all %d keys", a.accum.Len()) + if slot%500 == 0 && a.accum.Len() > 100_000 { + // flush all + klog.V(4).Infof("Flushing all %d keys", a.accum.Len()) - var keys solana.PublicKeySlice = a.accum.Keys() - keys.Sort() + var keys solana.PublicKeySlice = a.accum.Keys() + keys.Sort() - for iii := range keys { - key := keys[iii] - values, _ := a.accum.Get(key) + a.popRank.purge() - if len(values) < 100 && len(values) > 0 { - if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{ - Key: key, - Values: values, - }); err != nil { - return err - } - a.accum.Delete(key) + for iii := range keys { + key := keys[iii] + values, _ := a.accum.Get(key) + // The objective is to have as big of a batch for each key as possible (max is 1000). + // So we optimize for delaying the flush for the most popular keys (popular=has been flushed a lot of times). + // And we flush the less popular keys, periodically if they haven't seen much activity. + + // if this key has less than 100 values and is not in the top list of keys by flush count, then + // it's very likely that this key isn't going to get a lot of values soon + if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) { + if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{ + Key: key, + Values: values, + }); err != nil { + return err } + a.accum.Delete(key) } } } @@ -186,6 +196,7 @@ func (a *GsfaWriter) Push( } else { current = append(current, oas) if len(current) >= itemsPerBatch { + a.popRank.Incr(publicKey, 1) a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{ Key: publicKey, Values: clone(current), diff --git a/gsfa/pop-rank.go b/gsfa/pop-rank.go new file mode 100644 index 00000000..92362b80 --- /dev/null +++ b/gsfa/pop-rank.go @@ -0,0 +1,72 @@ +package gsfa + +import ( + "slices" + "sort" + + "github.com/gagliardetto/solana-go" + "github.com/tidwall/hashmap" +) + +type rollingRankOfTopPerformers struct { + rankListSize int + maxValue int + minValue int + set hashmap.Map[solana.PublicKey, int] +} + +func newRollingRankOfTopPerformers(rankListSize int) *rollingRankOfTopPerformers { + return &rollingRankOfTopPerformers{ + rankListSize: rankListSize, + } +} + +func (r *rollingRankOfTopPerformers) Incr(key solana.PublicKey, delta int) int { + value, ok := r.set.Get(key) + if !ok { + value = 0 + } + value = value + delta + r.set.Set(key, value) + if value > r.maxValue { + r.maxValue = value + } + if value < r.minValue { + r.minValue = value + } + return value +} + +func (r *rollingRankOfTopPerformers) Get(key solana.PublicKey) (int, bool) { + value, ok := r.set.Get(key) + return value, ok +} + +// purge will remove all keys by the lowest values until the rankListSize is reached. +// keys with equivalent values are kept. +func (r *rollingRankOfTopPerformers) purge() { + values := r.set.Values() + sort.Ints(values) + values = slices.Compact(values) + if len(values) <= r.rankListSize { + return + } + + // remove the lowest values + for _, value := range values[:len(values)-r.rankListSize] { + for _, key := range r.set.Keys() { + if v, _ := r.set.Get(key); v == value { + r.set.Delete(key) + } + } + } + + // update the min and max values + r.minValue = values[len(values)-r.rankListSize] + r.maxValue = values[len(values)-1] +} + +func (r *rollingRankOfTopPerformers) has(key solana.PublicKey) bool { + _, ok := r.set.Get(key) + return ok +} diff --git a/gsfa/pop-rank_test.go b/gsfa/pop-rank_test.go new file mode 100644 index 00000000..f90c49e4 --- /dev/null +++ b/gsfa/pop-rank_test.go @@ -0,0 +1,55 @@ +package gsfa + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/require" +) + +func TestPopRank(t *testing.T) { + // Test the rollingRankOfTopPerformers type: + { + // Create a new rollingRankOfTopPerformers: + r := newRollingRankOfTopPerformers(5) + if r == nil { + t.Fatal("expected non-nil rollingRankOfTopPerformers") + } + // Test the Incr method: + { + key := solana.SysVarRentPubkey + delta := 1 + value := r.Incr(key, delta) + require.Equal(t, 1, value) + } + // Test the purge method: + { + r.purge() + // the value should still be 1 + value, ok := r.Get(solana.SysVarRentPubkey) + require.True(t, ok) + require.Equal(t, 1, value) + } + { + // now add a few more values: + r.Incr(solana.SysVarClockPubkey, 6) + r.Incr(solana.SysVarEpochSchedulePubkey, 5) + r.Incr(solana.SysVarFeesPubkey, 4) + r.Incr(solana.SysVarInstructionsPubkey, 3) + r.Incr(solana.SysVarRewardsPubkey, 2) + + // there should be 6 values now + require.Equal(t, 6, r.set.Len()) + + // purge should remove the lowest values + r.purge() + + // there should be 5 values now (equivalent values are kept) + require.Equal(t, 5, r.set.Len()) + + // the lowest value should be 2 + require.Equal(t, 2, r.minValue) + require.Equal(t, 6, r.maxValue) + } + } +} From 9b1b4b7a6165a31747d0aebc79875b118b4d211c Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Fri, 5 Jul 2024 23:14:19 +0200 Subject: [PATCH 03/10] Fix tmpDir --- cmd-x-index-gsfa.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd-x-index-gsfa.go b/cmd-x-index-gsfa.go index 33bcddc8..ffde6021 100644 --- a/cmd-x-index-gsfa.go +++ b/cmd-x-index-gsfa.go @@ -71,7 +71,7 @@ func newCmd_Index_gsfa() *cli.Command { &cli.StringFlag{ Name: "tmp-dir", Usage: "temporary directory to use for storing intermediate files; WILL BE DELETED", - Value: filepath.Join(os.TempDir(), fmt.Sprintf("yellowstone-faithful-gsfa-%d", time.Now().UnixNano())), + Value: os.TempDir(), }, }, Action: func(c *cli.Context) error { @@ -137,6 +137,7 @@ func newCmd_Index_gsfa() *cli.Command { return fmt.Errorf("failed to add network to sig_exists index metadata: %w", err) } tmpDir := c.String("tmp-dir") + tmpDir = filepath.Join(tmpDir, fmt.Sprintf("yellowstone-faithful-gsfa-%d", time.Now().UnixNano())) if err := os.MkdirAll(tmpDir, 0o755); err != nil { return fmt.Errorf("failed to create tmp dir: %w", err) } From f327a7393ae4998a816693327693335dbb0426cc Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Fri, 5 Jul 2024 23:19:18 +0200 Subject: [PATCH 04/10] Max is 1000 --- gsfa/gsfa-write.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gsfa/gsfa-write.go b/gsfa/gsfa-write.go index 13222122..136e5563 100644 --- a/gsfa/gsfa-write.go +++ b/gsfa/gsfa-write.go @@ -190,7 +190,7 @@ func (a *GsfaWriter) Push( for _, publicKey := range publicKeys { current, ok := a.accum.Get(publicKey) if !ok { - current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0) + current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch) current = append(current, oas) a.accum.Set(publicKey, current) } else { From a67493cad54cfe3e2228ab4b91294284d57072eb Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Thu, 11 Jul 2024 22:21:30 +0200 Subject: [PATCH 05/10] Remove /health and /metrics req logging; closes #127 --- multiepoch.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/multiepoch.go b/multiepoch.go index 1a3a27a7..0ec03d3d 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -276,11 +276,15 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } klog.Infof("Will proxy unhandled RPC methods to %q", addr) } + metricsHandler := fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()) return func(reqCtx *fasthttp.RequestCtx) { startedAt := time.Now() reqID := randomRequestID() var method string = "" defer func() { + if method == "/metrics" || method == "/health" { + return + } klog.V(2).Infof("[%s] request %q took %s", reqID, sanitizeMethod(method), time.Since(startedAt)) metrics_statusCode.WithLabelValues(fmt.Sprint(reqCtx.Response.StatusCode())).Inc() metrics_responseTimeHistogram.WithLabelValues(sanitizeMethod(method)).Observe(time.Since(startedAt).Seconds()) @@ -289,8 +293,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx // handle the /metrics endpoint if string(reqCtx.Path()) == "/metrics" { method = "/metrics" - handler := fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()) - handler(reqCtx) + metricsHandler(reqCtx) return } { From 20e5219f90e1bc79ae3096119275cb92a0d1cc5a Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Thu, 11 Jul 2024 22:26:55 +0200 Subject: [PATCH 06/10] Move metrics to metrics package --- cmd-rpc.go | 11 ++-- metrics.go | 147 --------------------------------------------- metrics/metrics.go | 71 ++++++++++++++++++++++ multiepoch.go | 75 ++++++++++++++++++++--- 4 files changed, 144 insertions(+), 160 deletions(-) delete mode 100644 metrics.go create mode 100644 metrics/metrics.go diff --git a/cmd-rpc.go b/cmd-rpc.go index 56488562..36fa8129 100644 --- a/cmd-rpc.go +++ b/cmd-rpc.go @@ -15,6 +15,7 @@ import ( "github.com/allegro/bigcache/v3" "github.com/fsnotify/fsnotify" hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache" + "github.com/rpcpool/yellowstone-faithful/metrics" splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher" "github.com/ryanuber/go-glob" "github.com/urfave/cli/v2" @@ -202,13 +203,13 @@ func newCmd_rpc() *cli.Command { return nil }() if err != nil { - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0) klog.Error(err) numFailed.Add(1) // NOTE: DO NOT return the error here, as we want to continue loading other epochs return nil } - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1) numSucceeded.Add(1) return nil }) @@ -275,7 +276,7 @@ func newCmd_rpc() *cli.Command { return } klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt)) - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) } case fsnotify.Create: { @@ -298,7 +299,7 @@ func newCmd_rpc() *cli.Command { return } klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt)) - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) } case fsnotify.Remove: { @@ -310,7 +311,7 @@ func newCmd_rpc() *cli.Command { klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error()) } klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt)) - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0) } case fsnotify.Rename: klog.V(3).Infof("File %q was renamed; do nothing", event.Name) diff --git a/metrics.go b/metrics.go deleted file mode 100644 index 2194023c..00000000 --- a/metrics.go +++ /dev/null @@ -1,147 +0,0 @@ -package main - -import ( - "runtime/debug" - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -// - RPC requests by method (counter) -// - Epochs available epoch_available{epoch="200"} = 1 -// - status_code -// - miner ids -// - source type (ipfs/bitwarden/s3/etc) -// - response time histogram - -func init() { - prometheus.MustRegister(metrics_RpcRequestByMethod) - prometheus.MustRegister(metrics_epochsAvailable) - prometheus.MustRegister(metrics_statusCode) - prometheus.MustRegister(metrics_methodToCode) - prometheus.MustRegister(metrics_methodToSuccessOrFailure) - prometheus.MustRegister(metrics_methodToNumProxied) - prometheus.MustRegister(metrics_responseTimeHistogram) -} - -var metrics_RpcRequestByMethod = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "rpc_requests_by_method", - Help: "RPC requests by method", - }, - []string{"method"}, -) - -var metrics_epochsAvailable = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "epoch_available", - Help: "Epochs available", - }, - []string{"epoch"}, -) - -var metrics_statusCode = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "status_code", - Help: "Status code", - }, - []string{"code"}, -) - -var metrics_methodToCode = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "method_to_code", - Help: "Method to code", - }, - []string{"method", "code"}, -) - -var metrics_methodToSuccessOrFailure = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "method_to_success_or_failure", - Help: "Method to success or failure", - }, - []string{"method", "status"}, -) - -var metrics_methodToNumProxied = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "method_to_num_proxied", - Help: "Method to num proxied", - }, - []string{"method"}, -) - -var metrics_responseTimeHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "response_time_histogram", - Help: "Response time histogram", - }, - []string{"method"}, -) - -// - Version information of this binary -var metrics_version = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "version", - Help: "Version information of this binary", - }, - []string{"started_at", "tag", "commit", "compiler", "goarch", "goos", "goamd64", "vcs", "vcs_revision", "vcs_time", "vcs_modified"}, -) - -func init() { - // Add an entry to the metric with the version information. - labeledValues := map[string]string{ - "started_at": StartedAt.Format(time.RFC3339), - "tag": GitTag, - "commit": GitCommit, - "compiler": "", - "goarch": "", - "goos": "", - "goamd64": "", - "vcs": "", - "vcs_revision": "", - "vcs_time": "", - "vcs_modified": "", - } - if info, ok := debug.ReadBuildInfo(); ok { - for _, setting := range info.Settings { - if isAnyOf(setting.Key, - "-compiler", - "GOARCH", - "GOOS", - "GOAMD64", - "vcs", - "vcs.revision", - "vcs.time", - "vcs.modified", - ) { - switch setting.Key { - case "-compiler": - labeledValues["compiler"] = setting.Value - case "GOARCH": - labeledValues["goarch"] = setting.Value - case "GOOS": - labeledValues["goos"] = setting.Value - case "GOAMD64": - labeledValues["goamd64"] = setting.Value - case "vcs": - labeledValues["vcs"] = setting.Value - case "vcs.revision": - labeledValues["vcs_revision"] = setting.Value - case "vcs.time": - labeledValues["vcs_time"] = setting.Value - case "vcs.modified": - labeledValues["vcs_modified"] = setting.Value - } - } - } - } - metrics_version.With(labeledValues).Set(1) -} - -var StartedAt = time.Now() - -func GetUptime() time.Duration { - return time.Since(StartedAt) -} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..ebd90388 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,71 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var RpcRequestByMethod = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "rpc_requests_by_method", + Help: "RPC requests by method", + }, + []string{"method"}, +) + +var EpochsAvailable = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "epoch_available", + Help: "Epochs available", + }, + []string{"epoch"}, +) + +var StatusCode = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "status_code", + Help: "Status code", + }, + []string{"code"}, +) + +var MethodToCode = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "method_to_code", + Help: "Method to code", + }, + []string{"method", "code"}, +) + +var MethodToSuccessOrFailure = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "method_to_success_or_failure", + Help: "Method to success or failure", + }, + []string{"method", "status"}, +) + +var MethodToNumProxied = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "method_to_num_proxied", + Help: "Method to num proxied", + }, + []string{"method"}, +) + +var ResponseTimeHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "response_time_histogram", + Help: "Response time histogram", + }, + []string{"method"}, +) + +// - Version information of this binary +var Version = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "version", + Help: "Version information of this binary", + }, + []string{"started_at", "tag", "commit", "compiler", "goarch", "goos", "goamd64", "vcs", "vcs_revision", "vcs_time", "vcs_modified"}, +) diff --git a/multiepoch.go b/multiepoch.go index 0ec03d3d..8c904fcb 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "runtime/debug" "sort" "strings" "sync" @@ -15,6 +16,7 @@ import ( "github.com/libp2p/go-reuseport" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" + "github.com/rpcpool/yellowstone-faithful/metrics" old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc" "github.com/sourcegraph/jsonrpc2" "github.com/valyala/fasthttp" @@ -286,8 +288,8 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx return } klog.V(2).Infof("[%s] request %q took %s", reqID, sanitizeMethod(method), time.Since(startedAt)) - metrics_statusCode.WithLabelValues(fmt.Sprint(reqCtx.Response.StatusCode())).Inc() - metrics_responseTimeHistogram.WithLabelValues(sanitizeMethod(method)).Observe(time.Since(startedAt).Seconds()) + metrics.StatusCode.WithLabelValues(fmt.Sprint(reqCtx.Response.StatusCode())).Inc() + metrics.ResponseTimeHistogram.WithLabelValues(sanitizeMethod(method)).Observe(time.Since(startedAt).Seconds()) }() { // handle the /metrics endpoint @@ -353,9 +355,9 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx return } method = rpcRequest.Method - metrics_RpcRequestByMethod.WithLabelValues(sanitizeMethod(method)).Inc() + metrics.RpcRequestByMethod.WithLabelValues(sanitizeMethod(method)).Inc() defer func() { - metrics_methodToCode.WithLabelValues(sanitizeMethod(method), fmt.Sprint(reqCtx.Response.StatusCode())).Inc() + metrics.MethodToCode.WithLabelValues(sanitizeMethod(method), fmt.Sprint(reqCtx.Response.StatusCode())).Inc() }() klog.V(2).Infof("[%s] method=%q", reqID, sanitizeMethod(method)) @@ -373,7 +375,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx body, reqID, ) - metrics_methodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() + metrics.MethodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() return } @@ -406,7 +408,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx klog.Errorf("[%s] failed to handle %q: %v", reqID, sanitizeMethod(method), err) } if errorResp != nil { - metrics_methodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "failure").Inc() + metrics.MethodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "failure").Inc() if proxy != nil && lsConf.ProxyConfig.ProxyFailedRequests { klog.Warningf("[%s] Failed local method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) // proxy the request to the target @@ -419,7 +421,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx body, reqID, ) - metrics_methodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() + metrics.MethodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() return } else { if errors.Is(err, ErrNotFound) { @@ -439,7 +441,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } return } - metrics_methodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "success").Inc() + metrics.MethodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "success").Inc() } } @@ -542,3 +544,60 @@ func (ser *MultiEpoch) handleRequest(ctx context.Context, conn *requestContext, }, fmt.Errorf("method not found") } } + +func init() { + // Add an entry to the metric with the version information. + labeledValues := map[string]string{ + "started_at": StartedAt.Format(time.RFC3339), + "tag": GitTag, + "commit": GitCommit, + "compiler": "", + "goarch": "", + "goos": "", + "goamd64": "", + "vcs": "", + "vcs_revision": "", + "vcs_time": "", + "vcs_modified": "", + } + if info, ok := debug.ReadBuildInfo(); ok { + for _, setting := range info.Settings { + if isAnyOf(setting.Key, + "-compiler", + "GOARCH", + "GOOS", + "GOAMD64", + "vcs", + "vcs.revision", + "vcs.time", + "vcs.modified", + ) { + switch setting.Key { + case "-compiler": + labeledValues["compiler"] = setting.Value + case "GOARCH": + labeledValues["goarch"] = setting.Value + case "GOOS": + labeledValues["goos"] = setting.Value + case "GOAMD64": + labeledValues["goamd64"] = setting.Value + case "vcs": + labeledValues["vcs"] = setting.Value + case "vcs.revision": + labeledValues["vcs_revision"] = setting.Value + case "vcs.time": + labeledValues["vcs_time"] = setting.Value + case "vcs.modified": + labeledValues["vcs_modified"] = setting.Value + } + } + } + } + metrics.Version.With(labeledValues).Set(1) +} + +var StartedAt = time.Now() + +func GetUptime() time.Duration { + return time.Since(StartedAt) +} From fc8ae64f9a2802c062e6510a3cbdb16fee3653a2 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Thu, 11 Jul 2024 22:46:34 +0200 Subject: [PATCH 07/10] Prometheus for index and car lookups; closes #126 --- http-range.go | 14 ++++++++++++++ metrics/metrics.go | 18 ++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/http-range.go b/http-range.go index 2b948b8d..8c47df4a 100644 --- a/http-range.go +++ b/http-range.go @@ -2,9 +2,11 @@ package main import ( "io" + "path/filepath" "strings" "time" + "github.com/rpcpool/yellowstone-faithful/metrics" "k8s.io/klog/v2" ) @@ -43,6 +45,15 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { // if has suffix .index, then it's an index file if strings.HasSuffix(r.name, ".index") { prefix = icon + azureBG("[READ-INDEX]") + // get the index name, which is the part before the .index suffix, after the last . + indexName := strings.TrimSuffix(r.name, ".index") + // split the index name by . and get the last part + byDot := strings.Split(indexName, ".") + if len(byDot) > 0 { + indexName = byDot[len(byDot)-1] + } + // TODO: distinguish between remote and local index reads + metrics.IndexLookups.WithLabelValues(indexName).Observe(float64(took.Seconds())) } // if has suffix .car, then it's a car file if strings.HasSuffix(r.name, ".car") || r.isSplitCar { @@ -51,6 +62,9 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { } else { prefix = icon + purpleBG("[READ-CAR]") } + carName := filepath.Base(r.name) + // TODO: distinguish between remote and local index reads + metrics.CarLookups.WithLabelValues(carName).Observe(float64(took.Seconds())) } klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took) } diff --git a/metrics/metrics.go b/metrics/metrics.go index ebd90388..b19eb233 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -69,3 +69,21 @@ var Version = promauto.NewGaugeVec( }, []string{"started_at", "tag", "commit", "compiler", "goarch", "goos", "goamd64", "vcs", "vcs_revision", "vcs_time", "vcs_modified"}, ) + +var IndexLookups = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "index_lookups", + Help: "Index lookups", + Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), + }, + []string{"index_type"}, +) + +var CarLookups = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "car_lookups", + Help: "Car lookups", + Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), + }, + []string{"car"}, +) From b44d0152541e4b1f6280c857f547cff2fb36eb14 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Thu, 11 Jul 2024 22:54:37 +0200 Subject: [PATCH 08/10] Cleanup metrics; closes #128 --- http-range.go | 4 ++-- metrics/metrics.go | 29 +++++++++++++++-------------- multiepoch.go | 5 +++-- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/http-range.go b/http-range.go index 8c47df4a..8eebcf2b 100644 --- a/http-range.go +++ b/http-range.go @@ -53,7 +53,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { indexName = byDot[len(byDot)-1] } // TODO: distinguish between remote and local index reads - metrics.IndexLookups.WithLabelValues(indexName).Observe(float64(took.Seconds())) + metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds())) } // if has suffix .car, then it's a car file if strings.HasSuffix(r.name, ".car") || r.isSplitCar { @@ -64,7 +64,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { } carName := filepath.Base(r.name) // TODO: distinguish between remote and local index reads - metrics.CarLookups.WithLabelValues(carName).Observe(float64(took.Seconds())) + metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds())) } klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took) } diff --git a/metrics/metrics.go b/metrics/metrics.go index b19eb233..c0e6a293 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -53,14 +53,6 @@ var MethodToNumProxied = promauto.NewCounterVec( []string{"method"}, ) -var ResponseTimeHistogram = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "response_time_histogram", - Help: "Response time histogram", - }, - []string{"method"}, -) - // - Version information of this binary var Version = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -70,20 +62,29 @@ var Version = promauto.NewGaugeVec( []string{"started_at", "tag", "commit", "compiler", "goarch", "goos", "goamd64", "vcs", "vcs_revision", "vcs_time", "vcs_modified"}, ) -var IndexLookups = promauto.NewHistogramVec( +var IndexLookupHistogram = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Name: "index_lookups", - Help: "Index lookups", + Name: "index_lookup_latency_histogram", + Help: "Index lookup latency", Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), }, []string{"index_type"}, ) -var CarLookups = promauto.NewHistogramVec( +var CarLookupHistogram = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Name: "car_lookups", - Help: "Car lookups", + Name: "car_lookup_latency_histogram", + Help: "Car lookup latency", Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), }, []string{"car"}, ) + +var RpcResponseLatencyHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "rpc_response_latency_histogram", + Help: "RPC response latency histogram", + Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), + }, + []string{"rpc_method"}, +) diff --git a/multiepoch.go b/multiepoch.go index 8c904fcb..344a92da 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -287,9 +287,10 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx if method == "/metrics" || method == "/health" { return } - klog.V(2).Infof("[%s] request %q took %s", reqID, sanitizeMethod(method), time.Since(startedAt)) + took := time.Since(startedAt) + klog.V(2).Infof("[%s] request %q took %s", reqID, sanitizeMethod(method), took) metrics.StatusCode.WithLabelValues(fmt.Sprint(reqCtx.Response.StatusCode())).Inc() - metrics.ResponseTimeHistogram.WithLabelValues(sanitizeMethod(method)).Observe(time.Since(startedAt).Seconds()) + metrics.RpcResponseLatencyHistogram.WithLabelValues(sanitizeMethod(method)).Observe(took.Seconds()) }() { // handle the /metrics endpoint From c9569a8127907bffd27ce7d942990bf5b000304c Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Thu, 11 Jul 2024 23:39:27 +0200 Subject: [PATCH 09/10] gsfa: include pubkeys from address lookup tables --- cmd-x-index-gsfa.go | 77 ++++++++++++++++++++++++++++++++++++++++----- request-response.go | 10 +++--- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/cmd-x-index-gsfa.go b/cmd-x-index-gsfa.go index ffde6021..26f0556a 100644 --- a/cmd-x-index-gsfa.go +++ b/cmd-x-index-gsfa.go @@ -21,6 +21,8 @@ import ( "github.com/rpcpool/yellowstone-faithful/indexmeta" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" + solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers" + "github.com/rpcpool/yellowstone-faithful/third_party/solana_proto/confirmed_block" "github.com/urfave/cli/v2" "k8s.io/klog/v2" ) @@ -222,12 +224,17 @@ func newCmd_Index_gsfa() *cli.Command { for ii := range transactions { txWithInfo := transactions[ii] numProcessedTransactions.Add(1) + accountKeys := txWithInfo.Transaction.Message.AccountKeys + if txWithInfo.Metadata != nil { + accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedReadonlyAddresses)...) + accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedWritableAddresses)...) + } err = indexW.Push( txWithInfo.Offset, txWithInfo.Length, txWithInfo.Slot, txWithInfo.Blocktime, - txWithInfo.Transaction.Message.AccountKeys, + accountKeys, ) if err != nil { klog.Exitf("Error while pushing to gsfa index: %s", err) @@ -274,9 +281,14 @@ func objectsToTransactions( objects []accum.ObjectWithMetadata, ) ([]*TransactionWithSlot, error) { transactions := make([]*TransactionWithSlot, 0, len(objects)) + dataBlocks := make([]accum.ObjectWithMetadata, 0) for _, object := range objects { // check if the object is a transaction: kind := iplddecoders.Kind(object.ObjectData[1]) + if kind == iplddecoders.KindDataFrame { + dataBlocks = append(dataBlocks, object) + continue + } if kind != iplddecoders.KindTransaction { continue } @@ -284,17 +296,65 @@ func objectsToTransactions( if err != nil { return nil, fmt.Errorf("error while decoding transaction from nodex %s: %w", object.Cid, err) } + tws := &TransactionWithSlot{ + Offset: object.Offset, + Length: object.SectionLength, + Slot: uint64(decoded.Slot), + Blocktime: uint64(block.Meta.Blocktime), + } + if total, ok := decoded.Metadata.GetTotal(); !ok || total == 1 { + completeBuffer := decoded.Metadata.Bytes() + if ha, ok := decoded.Metadata.GetHash(); ok { + err := ipldbindcode.VerifyHash(completeBuffer, ha) + if err != nil { + return nil, fmt.Errorf("failed to verify metadata hash: %w", err) + } + } + if len(completeBuffer) > 0 { + uncompressedMeta, err := decompressZstd(completeBuffer) + if err != nil { + return nil, fmt.Errorf("failed to decompress metadata: %w", err) + } + status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta) + if err == nil { + tws.Metadata = status + } + } + } else { + metaBuffer, err := loadDataFromDataFrames(&decoded.Metadata, func(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error) { + for _, dataBlock := range dataBlocks { + if dataBlock.Cid == wantedCid { + df, err := iplddecoders.DecodeDataFrame(dataBlock.ObjectData) + if err != nil { + return nil, err + } + return df, nil + } + } + return nil, fmt.Errorf("dataframe not found") + }) + if err != nil { + return nil, fmt.Errorf("failed to load metadata: %w", err) + } + // reset dataBlocks: + dataBlocks = dataBlocks[:0] + if len(metaBuffer) > 0 { + uncompressedMeta, err := decompressZstd(metaBuffer) + if err != nil { + return nil, fmt.Errorf("failed to decompress metadata: %w", err) + } + status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta) + if err == nil { + tws.Metadata = status + } + } + } tx, err := decoded.GetSolanaTransaction() if err != nil { return nil, fmt.Errorf("error while getting solana transaction from object %s: %w", object.Cid, err) } - transactions = append(transactions, &TransactionWithSlot{ - Offset: object.Offset, - Length: object.SectionLength, - Slot: uint64(decoded.Slot), - Blocktime: uint64(block.Meta.Blocktime), - Transaction: *tx, - }) + tws.Transaction = *tx + transactions = append(transactions, tws) } return transactions, nil } @@ -315,4 +375,5 @@ type TransactionWithSlot struct { Slot uint64 Blocktime uint64 Transaction solana.Transaction + Metadata *confirmed_block.TransactionStatusMeta } diff --git a/request-response.go b/request-response.go index 97c5e823..b592658f 100644 --- a/request-response.go +++ b/request-response.go @@ -410,10 +410,10 @@ func compiledInstructionsToJsonParsed( case *confirmed_block.TransactionStatusMeta: return &txstatus.LoadedAddresses{ Writable: func() []solana.PublicKey { - return byteSlicesToKeySlices(vv.LoadedWritableAddresses) + return byteSlicesToKeySlice(vv.LoadedWritableAddresses) }(), Readonly: func() []solana.PublicKey { - return byteSlicesToKeySlices(vv.LoadedReadonlyAddresses) + return byteSlicesToKeySlice(vv.LoadedReadonlyAddresses) }(), } default: @@ -471,8 +471,8 @@ func encodeTransactionResponseBasedOnWantedEncoding( if ok { { tables := map[solana.PublicKey]solana.PublicKeySlice{} - writable := byteSlicesToKeySlices(unwrappedMeta.LoadedWritableAddresses) - readonly := byteSlicesToKeySlices(unwrappedMeta.LoadedReadonlyAddresses) + writable := byteSlicesToKeySlice(unwrappedMeta.LoadedWritableAddresses) + readonly := byteSlicesToKeySlice(unwrappedMeta.LoadedReadonlyAddresses) for _, addr := range tx.Message.AddressTableLookups { numTakeWritable := len(addr.WritableIndexes) numTakeReadonly := len(addr.ReadonlyIndexes) @@ -609,7 +609,7 @@ func byeSliceToUint16Slice(in []byte) []uint16 { return out } -func byteSlicesToKeySlices(keys [][]byte) []solana.PublicKey { +func byteSlicesToKeySlice(keys [][]byte) []solana.PublicKey { var out []solana.PublicKey for _, key := range keys { var k solana.PublicKey From 3e4e454e6418156319fa60972204aa39d9000674 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 31 Jul 2024 15:28:59 +0200 Subject: [PATCH 10/10] Miner info: use exponential retry --- split-car-fetcher/miner-info.go | 36 +++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/split-car-fetcher/miner-info.go b/split-car-fetcher/miner-info.go index b9606f9c..a051a9b8 100644 --- a/split-car-fetcher/miner-info.go +++ b/split-car-fetcher/miner-info.go @@ -32,7 +32,8 @@ func NewMinerInfo( ) *MinerInfoCache { minerInfoCache := ttlcache.New[string, *MinerInfo]( ttlcache.WithTTL[string, *MinerInfo](cacheTTL), - ttlcache.WithDisableTouchOnHit[string, *MinerInfo]()) + ttlcache.WithDisableTouchOnHit[string, *MinerInfo](), + ) return &MinerInfoCache{ lotusClient: lotusClient, @@ -47,7 +48,15 @@ func (d *MinerInfoCache) GetProviderInfo(ctx context.Context, provider address.A return file.Value(), nil } - minerInfo, err := (&MinerInfoFetcher{Client: d.lotusClient}).GetProviderInfo(ctx, provider.String()) + ctx, cancel := context.WithTimeout(ctx, d.requestTimeout) + defer cancel() + minerInfo, err := retryExponentialBackoff(ctx, + func() (*MinerInfo, error) { + return (&MinerInfoFetcher{Client: d.lotusClient}).GetProviderInfo(ctx, provider.String()) + }, + time.Second*2, + 5, + ) if err != nil { return nil, err } @@ -59,6 +68,29 @@ type MinerInfoFetcher struct { Client jsonrpc.RPCClient } +func retryExponentialBackoff[T any]( + ctx context.Context, + fn func() (T, error), + startingBackoff time.Duration, + maxRetries int, +) (T, error) { + var err error + var out T + for i := 0; i < maxRetries; i++ { + out, err = fn() + if err == nil { + return out, nil + } + select { + case <-ctx.Done(): + return out, fmt.Errorf("context done: %w; last error: %s", ctx.Err(), err) + case <-time.After(startingBackoff): + startingBackoff *= 2 + } + } + return out, err +} + func (m *MinerInfoFetcher) GetProviderInfo(ctx context.Context, provider string) (*MinerInfo, error) { minerInfo := new(MinerInfo) err := m.Client.CallFor(ctx, minerInfo, "Filecoin.StateMinerInfo", provider, nil)