July2024/improve gsfa perf #124

Open · wants to merge 10 commits into main
11 changes: 6 additions & 5 deletions cmd-rpc.go
@@ -15,6 +15,7 @@ import (
"github.com/allegro/bigcache/v3"
"github.com/fsnotify/fsnotify"
hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache"
"github.com/rpcpool/yellowstone-faithful/metrics"
splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
"github.com/ryanuber/go-glob"
"github.com/urfave/cli/v2"
@@ -202,13 +203,13 @@ func newCmd_rpc() *cli.Command {
return nil
}()
if err != nil {
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0)
klog.Error(err)
numFailed.Add(1)
// NOTE: DO NOT return the error here, as we want to continue loading other epochs
return nil
}
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1)
numSucceeded.Add(1)
return nil
})
@@ -275,7 +276,7 @@ func newCmd_rpc() *cli.Command {
return
}
klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt))
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
}
case fsnotify.Create:
{
@@ -298,7 +299,7 @@ func newCmd_rpc() *cli.Command {
return
}
klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt))
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
}
case fsnotify.Remove:
{
@@ -310,7 +311,7 @@ func newCmd_rpc() *cli.Command {
klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error())
}
klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt))
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0)
}
case fsnotify.Rename:
klog.V(3).Infof("File %q was renamed; do nothing", event.Name)
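Reviewer note: the change in this file swaps the file-local metrics_epochsAvailable gauge for one exported by the shared metrics package. A minimal sketch of what that package plausibly exports, assuming the usual client_golang GaugeVec pattern (the metric name, help text, and label are illustrative, not taken from this diff):

	package metrics

	import (
		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/promauto"
	)

	// EpochsAvailable is assumed to be a gauge vector keyed by epoch number:
	// set to 1 when an epoch is loaded and queryable, 0 when it failed to
	// load or was removed.
	var EpochsAvailable = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "epochs_available", // illustrative name
		Help: "Whether a given epoch is currently available over RPC.",
	}, []string{"epoch"})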
83 changes: 74 additions & 9 deletions cmd-x-index-gsfa.go
@@ -21,6 +21,8 @@ import (
"github.com/rpcpool/yellowstone-faithful/indexmeta"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
+ solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
+ "github.com/rpcpool/yellowstone-faithful/third_party/solana_proto/confirmed_block"
"github.com/urfave/cli/v2"
"k8s.io/klog/v2"
)
@@ -70,7 +72,7 @@ func newCmd_Index_gsfa() *cli.Command {
},
&cli.StringFlag{
Name: "tmp-dir",
Usage: "temporary directory to use for storing intermediate files",
Usage: "temporary directory to use for storing intermediate files; WILL BE DELETED",
Value: os.TempDir(),
},
},
@@ -137,6 +139,10 @@ func newCmd_Index_gsfa() *cli.Command {
return fmt.Errorf("failed to add network to sig_exists index metadata: %w", err)
}
tmpDir := c.String("tmp-dir")
+ tmpDir = filepath.Join(tmpDir, fmt.Sprintf("yellowstone-faithful-gsfa-%d", time.Now().UnixNano()))
+ if err := os.MkdirAll(tmpDir, 0o755); err != nil {
+ return fmt.Errorf("failed to create tmp dir: %w", err)
+ }
indexW, err := gsfa.NewGsfaWriter(
gsfaIndexDir,
meta,
@@ -218,12 +224,17 @@ func newCmd_Index_gsfa() *cli.Command {
for ii := range transactions {
txWithInfo := transactions[ii]
numProcessedTransactions.Add(1)
+ accountKeys := txWithInfo.Transaction.Message.AccountKeys
+ if txWithInfo.Metadata != nil {
+ accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedReadonlyAddresses)...)
+ accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedWritableAddresses)...)
+ }
err = indexW.Push(
txWithInfo.Offset,
txWithInfo.Length,
txWithInfo.Slot,
txWithInfo.Blocktime,
- txWithInfo.Transaction.Message.AccountKeys,
+ accountKeys,
)
if err != nil {
klog.Exitf("Error while pushing to gsfa index: %s", err)
@@ -270,27 +281,80 @@ func objectsToTransactions(
objects []accum.ObjectWithMetadata,
) ([]*TransactionWithSlot, error) {
transactions := make([]*TransactionWithSlot, 0, len(objects))
+ dataBlocks := make([]accum.ObjectWithMetadata, 0)
for _, object := range objects {
// check if the object is a transaction:
kind := iplddecoders.Kind(object.ObjectData[1])
+ if kind == iplddecoders.KindDataFrame {
+ dataBlocks = append(dataBlocks, object)
+ continue
+ }
if kind != iplddecoders.KindTransaction {
continue
}
decoded, err := iplddecoders.DecodeTransaction(object.ObjectData)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex %s: %w", object.Cid, err)
}
+ tws := &TransactionWithSlot{
+ Offset: object.Offset,
+ Length: object.SectionLength,
+ Slot: uint64(decoded.Slot),
+ Blocktime: uint64(block.Meta.Blocktime),
+ }
+ if total, ok := decoded.Metadata.GetTotal(); !ok || total == 1 {
+ completeBuffer := decoded.Metadata.Bytes()
+ if ha, ok := decoded.Metadata.GetHash(); ok {
+ err := ipldbindcode.VerifyHash(completeBuffer, ha)
+ if err != nil {
+ return nil, fmt.Errorf("failed to verify metadata hash: %w", err)
+ }
+ }
+ if len(completeBuffer) > 0 {
+ uncompressedMeta, err := decompressZstd(completeBuffer)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decompress metadata: %w", err)
+ }
+ status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta)
+ if err == nil {
+ tws.Metadata = status
+ }
+ }
+ } else {
+ metaBuffer, err := loadDataFromDataFrames(&decoded.Metadata, func(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error) {
+ for _, dataBlock := range dataBlocks {
+ if dataBlock.Cid == wantedCid {
+ df, err := iplddecoders.DecodeDataFrame(dataBlock.ObjectData)
+ if err != nil {
+ return nil, err
+ }
+ return df, nil
+ }
+ }
+ return nil, fmt.Errorf("dataframe not found")
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to load metadata: %w", err)
+ }
+ // reset dataBlocks:
+ dataBlocks = dataBlocks[:0]
+ if len(metaBuffer) > 0 {
+ uncompressedMeta, err := decompressZstd(metaBuffer)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decompress metadata: %w", err)
+ }
+ status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta)
+ if err == nil {
+ tws.Metadata = status
+ }
+ }
+ }
tx, err := decoded.GetSolanaTransaction()
if err != nil {
return nil, fmt.Errorf("error while getting solana transaction from object %s: %w", object.Cid, err)
}
- transactions = append(transactions, &TransactionWithSlot{
- Offset: object.Offset,
- Length: object.SectionLength,
- Slot: uint64(decoded.Slot),
- Blocktime: uint64(block.Meta.Blocktime),
- Transaction: *tx,
- })
+ tws.Transaction = *tx
+ transactions = append(transactions, tws)
}
return transactions, nil
}
@@ -311,4 +375,5 @@ type TransactionWithSlot struct {
Slot uint64
Blocktime uint64
Transaction solana.Transaction
+ Metadata *confirmed_block.TransactionStatusMeta
}
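Reviewer note: two helpers the new indexing code leans on are not shown in this diff: byteSlicesToKeySlice (used above to fold addresses resolved from address lookup tables into the per-transaction key set) and decompressZstd (used to unpack transaction status metadata before parsing). A sketch of plausible implementations, assuming the obvious semantics; the real ones live elsewhere in the repo and may differ:

	package main

	import (
		"github.com/gagliardetto/solana-go"
		"github.com/klauspost/compress/zstd"
	)

	// byteSlicesToKeySlice converts raw 32-byte account addresses (as found in
	// Metadata.LoadedReadonlyAddresses / LoadedWritableAddresses) into pubkeys.
	func byteSlicesToKeySlice(keys [][]byte) []solana.PublicKey {
		out := make([]solana.PublicKey, 0, len(keys))
		for _, key := range keys {
			var pk solana.PublicKey
			copy(pk[:], key)
			out = append(out, pk)
		}
		return out
	}

	// decompressZstd is assumed to be a one-shot zstd decompression of the
	// (possibly reassembled multi-dataframe) metadata buffer.
	func decompressZstd(data []byte) ([]byte, error) {
		dec, err := zstd.NewReader(nil)
		if err != nil {
			return nil, err
		}
		defer dec.Close()
		return dec.DecodeAll(data, nil)
	}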
49 changes: 30 additions & 19 deletions gsfa/gsfa-write.go
@@ -23,6 +23,7 @@ import (
type GsfaWriter struct {
mu sync.Mutex
indexRootDir string
+ popRank *rollingRankOfTopPerformers // top pubkeys by flush count
offsets *hashmap.Map[solana.PublicKey, [2]uint64]
ll *linkedlog.LinkedLog
man *manifest.Manifest
@@ -61,6 +62,7 @@ func NewGsfaWriter(
ctx, cancel := context.WithCancel(context.Background())
index := &GsfaWriter{
fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
+ popRank: newRollingRankOfTopPerformers(10_000),
offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
ctx: ctx,
@@ -120,6 +122,9 @@ func (a *GsfaWriter) fullBufferWriter() {
has := tmpBuf.Has(buffer.Key)
if len(tmpBuf) == howManyBuffersToFlushConcurrently || has {
for _, buf := range tmpBuf {
+ if len(buf.Values) == 0 {
+ continue
+ }
// Write the buffer to the linked log.
klog.V(5).Infof("Flushing %d transactions for key %s", len(buf.Values), buf.Key)
if err := a.flushKVs(buf); err != nil {
@@ -131,7 +136,7 @@
tmpBuf = append(tmpBuf, buffer)
}
case <-time.After(1 * time.Second):
klog.Infof("Read %d buffers from channel", numReadFromChan)
klog.V(5).Infof("Read %d buffers from channel", numReadFromChan)
}
}
}
Expand All @@ -153,39 +158,45 @@ func (a *GsfaWriter) Push(
}
publicKeys = publicKeys.Dedupe()
publicKeys.Sort()
- if slot%1000 == 0 {
- if a.accum.Len() > 130_000 {
- // flush all
- klog.Infof("Flushing all %d keys", a.accum.Len())
+ if slot%500 == 0 && a.accum.Len() > 100_000 {
+ // flush all
+ klog.V(4).Infof("Flushing all %d keys", a.accum.Len())

- var keys solana.PublicKeySlice = a.accum.Keys()
- keys.Sort()
+ var keys solana.PublicKeySlice = a.accum.Keys()
+ keys.Sort()

- for iii := range keys {
- key := keys[iii]
- values, _ := a.accum.Get(key)
+ a.popRank.purge()

- if len(values) < 100 && len(values) > 0 {
- if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
- Key: key,
- Values: values,
- }); err != nil {
- return err
- }
- a.accum.Delete(key)
+ for iii := range keys {
+ key := keys[iii]
+ values, _ := a.accum.Get(key)
+ // The objective is to have as big of a batch for each key as possible (max is 1000).
+ // So we optimize for delaying the flush for the most popular keys (popular=has been flushed a lot of times).
+ // And we flush the less popular keys, periodically if they haven't seen much activity.
+
+ // if this key has less than 100 values and is not in the top list of keys by flush count, then
+ // it's very likely that this key isn't going to get a lot of values soon
+ if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) {
+ if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+ Key: key,
+ Values: values,
+ }); err != nil {
+ return err
+ }
+ a.accum.Delete(key)
+ }
}
}
for _, publicKey := range publicKeys {
current, ok := a.accum.Get(publicKey)
if !ok {
- current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0)
+ current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
current = append(current, oas)
a.accum.Set(publicKey, current)
} else {
current = append(current, oas)
if len(current) >= itemsPerBatch {
+ a.popRank.Incr(publicKey, 1)
a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: publicKey,
Values: clone(current),
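Reviewer note: taken together, the writer now has two flush paths: a hot path in Push, where a key whose buffer reaches itemsPerBatch is handed to fullBufferWriterChan and credited in popRank, and a periodic sweep (every 500th slot once more than 100_000 keys are buffered) that evicts small buffers belonging to unpopular keys. A condensed sketch of the sweep as I read the diff; the method name is invented for illustration:

	// sweepColdKeys mirrors the logic added to Push above (sketch only).
	func (a *GsfaWriter) sweepColdKeys() error {
		a.popRank.purge() // keep only the most frequently flushed keys

		var keys solana.PublicKeySlice = a.accum.Keys()
		keys.Sort()

		for _, key := range keys {
			values, _ := a.accum.Get(key)
			// Keys that never fill a batch and are not in the popularity top
			// list get flushed early and evicted; hot keys keep accumulating
			// toward a full batch of itemsPerBatch entries.
			if len(values) > 0 && len(values) < 100 && !a.popRank.has(key) {
				if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
					Key:    key,
					Values: values,
				}); err != nil {
					return err
				}
				a.accum.Delete(key)
			}
		}
		return nil
	}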
72 changes: 72 additions & 0 deletions gsfa/pop-rank.go
@@ -0,0 +1,72 @@
package gsfa

import (
"slices"
"sort"

"github.com/gagliardetto/solana-go"
"github.com/tidwall/hashmap"
)

type rollingRankOfTopPerformers struct {
rankListSize int
maxValue int
minValue int
set hashmap.Map[solana.PublicKey, int]
}

func newRollingRankOfTopPerformers(rankListSize int) *rollingRankOfTopPerformers {
return &rollingRankOfTopPerformers{
rankListSize: rankListSize,
}
}

func (r *rollingRankOfTopPerformers) Incr(key solana.PublicKey, delta int) int {
value, ok := r.set.Get(key)
if !ok {
value = 0
}
value = value + delta
r.set.Set(key, value)
if value > r.maxValue {
r.maxValue = value
}
if value < r.minValue {
r.minValue = value
}
return value
}

func (r *rollingRankOfTopPerformers) Get(key solana.PublicKey) (int, bool) {
value, ok := r.set.Get(key)
return value, ok
}

// purge will remove all keys by the lowest values until the rankListSize is reached.
// keys with equivalent values are kept.
func (r *rollingRankOfTopPerformers) purge() {
values := r.set.Values()
sort.Ints(values)
values = slices.Compact(values)
if len(values) <= r.rankListSize {
return
}

// remove the lowest values
for _, value := range values[:len(values)-r.rankListSize] {
for _, key := range r.set.Keys() {
if v, _ := r.set.Get(key); v == value {
r.set.Delete(key)
}
}
}

// update the min and max values
r.minValue = values[len(values)-r.rankListSize]
r.maxValue = values[len(values)-1]
}

func (r *rollingRankOfTopPerformers) has(key solana.PublicKey) bool {
_, ok := r.set.Get(key)
return ok
}
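Reviewer note: purge ranks by distinct count values rather than by key count, so every key whose flush count ties with one of the top rankListSize values survives, and slightly more than rankListSize keys can remain. A small illustration of the intended behavior (hypothetical keys, not from the PR):

	func exampleRankUsage() {
		r := newRollingRankOfTopPerformers(2) // keep the top 2 distinct counts

		a := solana.NewWallet().PublicKey()
		b := solana.NewWallet().PublicKey()
		c := solana.NewWallet().PublicKey()

		r.Incr(a, 5) // count 5
		r.Incr(b, 3) // count 3
		r.Incr(c, 1) // count 1

		r.purge() // distinct counts are [1 3 5]; the lowest (1) is dropped

		_ = r.has(a) // true
		_ = r.has(b) // true
		_ = r.has(c) // false: evicted as a low performer
	}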