Skip to content

Commit

Permalink
Some fixes to reduce memory usage (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored Jan 13, 2024
1 parent 0567590 commit 158bdf9
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 160 deletions.
4 changes: 2 additions & 2 deletions cmd/api/cache/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (r *ResponseRecorder) Result() *CacheEntry {

func copyHeaders(src, dst http.Header) {
for k, v := range src {
for _, v := range v {
dst.Set(k, v)
for _, val := range v {
dst.Set(k, val)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions internal/storage/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"time"

pkgTypes "github.com/celenium-io/celestia-indexer/pkg/types"
"github.com/goccy/go-json"
jsoniter "github.com/json-iterator/go"

"github.com/celenium-io/celestia-indexer/internal/storage/types"
"github.com/dipdup-net/indexer-sdk/pkg/storage"
"github.com/uptrace/bun"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

//go:generate mockgen -source=$GOFILE -destination=mock/$GOFILE -package=mock -typed
type IEvent interface {
storage.Table[*Event]
Expand Down Expand Up @@ -50,14 +52,12 @@ func (e Event) Columns() []string {

func (e Event) Flat() []any {
data := []any{
e.Height, e.Time, e.Position, e.Type, e.TxId,
e.Height, e.Time, e.Position, e.Type, e.TxId, nil,
}
if len(data) > 0 {
raw, err := json.MarshalWithOption(e.Data, json.UnorderedMap(), json.DisableNormalizeUTF8())
if len(e.Data) > 0 {
raw, err := json.MarshalToString(e.Data)
if err == nil {
data = append(data, string(raw))
} else {
data = append(data, nil)
data[5] = raw
}
}
return data
Expand Down
7 changes: 2 additions & 5 deletions internal/storage/postgres/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,8 @@ func (tx Transaction) SaveEvents(ctx context.Context, events ...models.Event) er
case len(events) == 0:
return nil
case len(events) < 20:
data := make([]any, len(events))
for i := range events {
data[i] = &events[i]
}
return tx.BulkSave(ctx, data)
_, err := tx.Tx().NewInsert().Model(&events).Exec(ctx)
return err
default:
copiable := make([]storage.Copiable, len(events))
for i := range events {
Expand Down
1 change: 0 additions & 1 deletion pkg/indexer/decode/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func Message(
case *cosmosBankTypes.MsgSend:
d.Msg.Type, d.Msg.Addresses, err = handle.MsgSend(height, typedMsg)
case *cosmosBankTypes.MsgMultiSend:
log.Warn().Msg("MsgMultiSend detected")
d.Msg.Type, d.Msg.Addresses, err = handle.MsgMultiSend(height, typedMsg)

// vesting module
Expand Down
5 changes: 1 addition & 4 deletions pkg/indexer/decode/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ func TestDecodeTx_TxWithMemo(t *testing.T) {
Info: "",
GasWanted: 200000,
GasUsed: 170049,
Events: []nodeTypes.Event{{Type: "coin_received", Attributes: []nodeTypes.EventAttribute{
{Key: "receiver", Value: "celestia1h2kqw44hdq5dwlcvsw8f2l49lkehtf9wp95kth"},
{Key: "amount", Value: "1562utia"},
}}},
Events: []nodeTypes.Event{},
Codespace: "",
}
txData := []byte{10, 252, 1, 10, 225, 1, 10, 42, 47, 99, 111, 115, 109, 111, 115, 46, 115, 116, 97, 107, 105, 110, 103, 46, 118, 49, 98, 101, 116, 97, 49, 46, 77, 115, 103, 66, 101, 103, 105, 110, 82, 101, 100, 101, 108, 101, 103, 97, 116, 101, 18, 178, 1, 10, 47, 99, 101, 108, 101, 115, 116, 105, 97, 49, 100, 97, 118, 122, 52, 48, 107, 97, 116, 57, 51, 116, 52, 57, 108, 106, 114, 107, 109, 107, 108, 53, 117, 113, 104, 113, 113, 52, 53, 101, 48, 116, 101, 100, 103, 102, 56, 97, 18, 54, 99, 101, 108, 101, 115, 116, 105, 97, 118, 97, 108, 111, 112, 101, 114, 49, 114, 102, 108, 117, 116, 107, 51, 101, 117, 119, 56, 100, 99, 119, 97, 101, 104, 120, 119, 117, 103, 99, 109, 57, 112, 101, 119, 107, 100, 110, 53, 54, 120, 106, 108, 104, 50, 54, 26, 54, 99, 101, 108, 101, 115, 116, 105, 97, 118, 97, 108, 111, 112, 101, 114, 49, 100, 97, 118, 122, 52, 48, 107, 97, 116, 57, 51, 116, 52, 57, 108, 106, 114, 107, 109, 107, 108, 53, 117, 113, 104, 113, 113, 52, 53, 101, 48, 116, 117, 106, 50, 115, 51, 109, 34, 15, 10, 4, 117, 116, 105, 97, 18, 7, 49, 48, 48, 48, 48, 48, 48, 18, 22, 116, 101, 115, 116, 32, 117, 105, 32, 114, 101, 100, 101, 108, 101, 103, 97, 116, 101, 32, 116, 120, 32, 18, 103, 10, 80, 10, 70, 10, 31, 47, 99, 111, 115, 109, 111, 115, 46, 99, 114, 121, 112, 116, 111, 46, 115, 101, 99, 112, 50, 53, 54, 107, 49, 46, 80, 117, 98, 75, 101, 121, 18, 35, 10, 33, 2, 205, 82, 66, 173, 172, 164, 110, 151, 162, 183, 151, 111, 80, 96, 191, 38, 188, 141, 208, 175, 86, 52, 254, 146, 134, 204, 43, 40, 79, 127, 106, 1, 18, 4, 10, 2, 8, 127, 24, 39, 18, 19, 10, 13, 10, 4, 117, 116, 105, 97, 18, 5, 55, 50, 52, 51, 49, 16, 185, 215, 17, 26, 64, 98, 225, 18, 145, 187, 225, 213, 198, 229, 6, 6, 240, 177, 0, 28, 112, 160, 126, 193, 177, 221, 161, 96, 79, 5, 192, 224, 168, 253, 161, 12, 33, 9, 118, 215, 22, 219, 239, 73, 133, 79, 37, 218, 83, 238, 115, 44, 232, 16, 163, 242, 174, 100, 175, 162, 213, 142, 194, 58, 69, 84, 81, 3, 70}
Expand Down
14 changes: 7 additions & 7 deletions pkg/indexer/parser/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ func (p *Module) parse(ctx context.Context, b types.BlockData) error {
block.Events = parseEvents(b, b.ResultBlockResults.BeginBlockEvents)
allEvents = append(allEvents, block.Events...)

for _, tx := range txs {
block.Stats.Fee = block.Stats.Fee.Add(tx.Fee)
block.MessageTypes.Set(tx.MessageTypes.Bits)
block.Stats.BlobsSize += tx.BlobsSize
block.Stats.GasLimit += tx.GasWanted
block.Stats.GasUsed += tx.GasUsed
allEvents = append(allEvents, tx.Events...)
for i := range txs {
block.Stats.Fee = block.Stats.Fee.Add(txs[i].Fee)
block.MessageTypes.Set(txs[i].MessageTypes.Bits)
block.Stats.BlobsSize += txs[i].BlobsSize
block.Stats.GasLimit += txs[i].GasWanted
block.Stats.GasUsed += txs[i].GasUsed
allEvents = append(allEvents, txs[i].Events...)
}

endEvents := parseEvents(b, b.ResultBlockResults.EndBlockEvents)
Expand Down
39 changes: 11 additions & 28 deletions pkg/indexer/parser/parseEvents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package parser

import (
"encoding/base64"
"github.com/rs/zerolog/log"

"github.com/celenium-io/celestia-indexer/internal/storage"
Expand All @@ -15,44 +14,28 @@ import (
func parseEvents(b types.BlockData, events []types.Event) []storage.Event {
result := make([]storage.Event, len(events))

for i, eN := range events {
eS := parseEvent(b, eN, i)
result[i] = eS
for i := range events {
parseEvent(b, events[i], i, &result[i])
}

return result
}

func parseEvent(b types.BlockData, eN types.Event, index int) storage.Event {
func parseEvent(b types.BlockData, eN types.Event, index int, resultEvent *storage.Event) {
eventType, err := storageTypes.ParseEventType(eN.Type)
if err != nil {
log.Err(err).Msgf("got type %v", eN.Type)
eventType = storageTypes.EventTypeUnknown
}

event := storage.Event{
Height: b.Height,
Time: b.Block.Time,
Position: int64(index),
Type: eventType,
Data: make(map[string]any),
}

for _, attr := range eN.Attributes {
key := decodeEventAttribute(attr.Key)
value := decodeEventAttribute(attr.Value)
event.Data[key] = value
}
resultEvent.Height = b.Height
resultEvent.Time = b.Block.Time
resultEvent.Position = int64(index)
resultEvent.Type = eventType
resultEvent.Data = make(map[string]any, len(eN.Attributes))

return event
}

var b64 = base64.StdEncoding

func decodeEventAttribute(data string) string {
dst, err := b64.DecodeString(data)
if err != nil {
return data
for i := range eN.Attributes {
key := string(eN.Attributes[i].Key)
resultEvent.Data[key] = string(eN.Attributes[i].Value)
}
return string(dst)
}
112 changes: 58 additions & 54 deletions pkg/indexer/parser/parseEvents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package parser

import (
"encoding/json"
"testing"
"time"

"github.com/celenium-io/celestia-indexer/internal/storage"
storageTypes "github.com/celenium-io/celestia-indexer/internal/storage/types"
testsuite "github.com/celenium-io/celestia-indexer/internal/test_suite"
"github.com/celenium-io/celestia-indexer/pkg/types"
Expand All @@ -26,23 +29,24 @@ func TestParseEvents_EmptyEventsResults(t *testing.T) {
}

func TestParseEvents_SuccessTx(t *testing.T) {
events := []types.Event{
{
Type: "coin_spent",
Attributes: []types.EventAttribute{
{
Key: "c3BlbmRlcg==",
Value: "Y2VsZXN0aWExdjY5bnB6NncwN3h0NGhkdWU5eGR3a3V4eHZ2ZDZlYTl5MjZlcXI=",
Index: true,
},
{
Key: "YW1vdW50",
Value: "NzAwMDB1dGlh",
Index: true,
},
raw := `[{
"type": "coin_spent",
"attributes": [
{
"key": "c3BlbmRlcg==",
"value": "Y2VsZXN0aWExcDMzMHN0YXB1c3lrZnNzNDdxcmhxbHVram5jdmd5emY2Z2R1ZnM=",
"index": true
},
},
}
{
"key": "YW1vdW50",
"value": "NDA0OTR1dGlh",
"index": true
}
]
}]`
var events []types.Event
err := json.Unmarshal([]byte(raw), &events)
require.NoError(t, err)

txRes := types.ResponseDeliverTx{
Code: 0,
Expand All @@ -68,48 +72,48 @@ func TestParseEvents_SuccessTx(t *testing.T) {
assert.Nil(t, e.TxId)

attrs := map[string]any{
"spender": "celestia1v69npz6w07xt4hdue9xdwkuxxvvd6ea9y26eqr",
"amount": "70000utia",
"spender": "celestia1p330stapusykfss47qrhqlukjncvgyzf6gdufs",
"amount": "40494utia",
}
assert.Equal(t, attrs, e.Data)
}

func Test_decodeEventAttribute(t *testing.T) {
tests := []struct {
name string
data string
want string
}{
{
name: "test 1",
data: "Y2VsZXN0aWExczQ1NXJoenh3Yzh3YzlrcXBoeHV0NzUyNHVtMDY3YzhwZGNjamo=",
want: "celestia1s455rhzxwc8wc9kqphxut7524um067c8pdccjj",
}, {
name: "test 2",
data: "c3BlbmRlcg==",
want: "spender",
}, {
name: "test 3",
data: "YW1vdW50",
want: "amount",
}, {
name: "test 4",
data: "NzAwMDB1dGlh",
want: "70000utia",
}, {
name: "test 5",
data: "bW9kdWxl",
want: "module",
}, {
name: "test 6",
data: "cmVjZWl2ZXI=",
want: "receiver",
func BenchmarkParseEvent(b *testing.B) {
block := types.BlockData{
ResultBlock: types.ResultBlock{
Block: &types.Block{
Header: types.Header{
Time: time.Now(),
},
},
},
ResultBlockResults: types.ResultBlockResults{
Height: 100,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := decodeEventAttribute(tt.data)
require.Equal(t, tt.want, got)
})
}
raw := `{
"type": "coin_spent",
"attributes": [
{
"key": "c3BlbmRlcg==",
"value": "Y2VsZXN0aWExcDMzMHN0YXB1c3lrZnNzNDdxcmhxbHVram5jdmd5emY2Z2R1ZnM=",
"index": true
},
{
"key": "YW1vdW50",
"value": "NDA0OTR1dGlh",
"index": true
}
]
}`
var event types.Event
err := json.Unmarshal([]byte(raw), &event)
require.NoError(b, err)

resultEvent := storage.Event{}
b.Run("parse event", func(b *testing.B) {
for i := 0; i < b.N; i++ {
parseEvent(block, event, 10, &resultEvent)
}
})
}
Loading

0 comments on commit 158bdf9

Please sign in to comment.