Skip to content

Commit

Permalink
Feature: build asset map
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Sep 10, 2024
1 parent 2fcd418 commit 2f1e41a
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 31 deletions.
6 changes: 3 additions & 3 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func New(ctx context.Context, cfg config.Config, stopperModule modules.Module) (
return Indexer{}, errors.Wrap(err, "while creating rollback module")
}

p, err := createParser(r)
p, err := createParser(r, &api)
if err != nil {
return Indexer{}, errors.Wrap(err, "while creating parser module")
}
Expand Down Expand Up @@ -150,8 +150,8 @@ func createRollback(receiverModule modules.Module, pg postgres.Storage, api node
return &rollbackModule, nil
}

func createParser(receiverModule modules.Module) (*parser.Module, error) {
parserModule := parser.NewModule()
func createParser(receiverModule modules.Module, api node.Api) (*parser.Module, error) {
parserModule := parser.NewModule(api)

if err := parserModule.AttachTo(receiverModule, receiver.BlocksOutput, parser.InputName); err != nil {
return nil, errors.Wrap(err, "while attaching parser to receiver")
Expand Down
2 changes: 1 addition & 1 deletion pkg/indexer/parser/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (p *Module) listen(ctx context.Context) {
continue
}

if err := p.parse(block); err != nil {
if err := p.parse(ctx, block); err != nil {
p.Log.Err(err).
Uint64("height", uint64(block.Height)).
Msg("block parsing error")
Expand Down
5 changes: 3 additions & 2 deletions pkg/indexer/parser/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package parser

import (
"context"
"encoding/hex"
"strings"
"time"
Expand All @@ -16,7 +17,7 @@ import (
"github.com/shopspring/decimal"
)

func (p *Module) parse(b types.BlockData) error {
func (p *Module) parse(ctx context.Context, b types.BlockData) error {
start := time.Now()
p.Log.Info().
Int64("height", b.Block.Height).
Expand All @@ -30,7 +31,7 @@ func (p *Module) parse(b types.BlockData) error {
decodeCtx := decode.NewContext()
decodeCtx.Proposer = proposer

txs, err := parseTxs(b, &decodeCtx)
txs, err := parseTxs(ctx, b, &decodeCtx, p.api)
if err != nil {
return errors.Wrapf(err, "while parsing block on level=%d", b.Height)
}
Expand Down
55 changes: 47 additions & 8 deletions pkg/indexer/parser/parseEvents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@
package parser

import (
"context"
"strings"
"time"

astria "buf.build/gen/go/astria/protocol-apis/protocolbuffers/go/astria/protocol/asset/v1alpha1"
"github.com/celenium-io/astria-indexer/internal/currency"
"github.com/celenium-io/astria-indexer/internal/storage"
"github.com/celenium-io/astria-indexer/pkg/indexer/decode"
"github.com/celenium-io/astria-indexer/pkg/node"
"github.com/celenium-io/astria-indexer/pkg/types"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"google.golang.org/protobuf/proto"
)

func parseEvents(events []types.Event, ctx *decode.Context) error {
func parseEvents(ctx context.Context, events []types.Event, decodeCtx *decode.Context, api node.Api) error {
for i := range events {
var err error
switch events[i].Type {
case "tx.fees":
err = parseTxFees(events[i].Attributes, ctx)
err = parseTxFees(ctx, events[i].Attributes, decodeCtx, api)
default:
continue
}
Expand All @@ -29,19 +36,51 @@ func parseEvents(events []types.Event, ctx *decode.Context) error {
return nil
}

func parseTxFees(attrs []types.EventAttribute, ctx *decode.Context) error {
var (
assets = map[string]string{
"704031c868fd3d3c84a1cfa8cb45deba4ea746b44697f7f4a6ed1b8f6c239b82": string(currency.Nria),
}
)

func getAsset(ctx context.Context, api node.Api, val string) (string, error) {
if !strings.HasPrefix(val, "ibc") {
return val, nil
}
parts := strings.Split(val, "/")
hash := parts[len(parts)-1]
if asset, ok := assets[hash]; ok {
return asset, nil
}

timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

metadata, err := api.GetAssetInfo(timeoutCtx, hash)
if err != nil {
return "", errors.Wrap(err, val)
}

var response astria.DenomResponse
if err := proto.Unmarshal(metadata.Response.Value, &response); err != nil {
return "", errors.Wrap(err, val)
}
assets[hash] = response.GetDenom()
return response.GetDenom(), nil
}

func parseTxFees(ctx context.Context, attrs []types.EventAttribute, decodeCtx *decode.Context, api node.Api) error {
var (
fee = new(storage.Fee)
err error
)
for i := range attrs {
switch attrs[i].Key {
case "asset":
fee.Asset = attrs[i].Value
// TODO: think about general logic with IBC channels
if fee.Asset == "ibc/704031c868fd3d3c84a1cfa8cb45deba4ea746b44697f7f4a6ed1b8f6c239b82" {
fee.Asset = string(currency.Nria)
asset, err := getAsset(ctx, api, attrs[i].Value)
if err != nil {
return err
}
fee.Asset = asset
case "feeAmount":
fee.Amount, err = decimal.NewFromString(attrs[i].Value)
if err != nil {
Expand All @@ -53,6 +92,6 @@ func parseTxFees(attrs []types.EventAttribute, ctx *decode.Context) error {
}
}

ctx.AddFee(fee)
decodeCtx.AddFee(fee)
return nil
}
15 changes: 9 additions & 6 deletions pkg/indexer/parser/parseTxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package parser

import (
"context"

"github.com/celenium-io/astria-indexer/internal/storage"
storageTypes "github.com/celenium-io/astria-indexer/internal/storage/types"
"github.com/celenium-io/astria-indexer/pkg/indexer/decode"
"github.com/celenium-io/astria-indexer/pkg/node"
"github.com/celenium-io/astria-indexer/pkg/types"
"github.com/pkg/errors"
)

func parseTxs(b types.BlockData, ctx *decode.Context) ([]*storage.Tx, error) {
func parseTxs(ctx context.Context, b types.BlockData, decodeCtx *decode.Context, api node.Api) ([]*storage.Tx, error) {
count := len(b.Block.Txs)
index := 0
if count == 0 {
Expand All @@ -21,24 +24,24 @@ func parseTxs(b types.BlockData, ctx *decode.Context) ([]*storage.Tx, error) {
if len(b.Block.Txs) >= 2 && len(b.Block.Txs[0]) == 32 && len(b.Block.Txs[1]) == 32 {
count -= 2
index = 2
ctx.BytesInBlock += 64
decodeCtx.BytesInBlock += 64
}

txs := make([]*storage.Tx, count)

for i := index; i < len(b.TxsResults); i++ {
if err := parseEvents(b.TxsResults[i].Events, ctx); err != nil {
if err := parseEvents(ctx, b.TxsResults[i].Events, decodeCtx, api); err != nil {
return nil, errors.Wrap(err, "parse events")
}

t, err := parseTx(b, i, b.TxsResults[i], ctx)
t, err := parseTx(b, i, b.TxsResults[i], decodeCtx)
if err != nil {
return nil, err
}
txs[i-index] = &t

ctx.GasWanted += b.TxsResults[i].GasWanted
ctx.GasUsed += b.TxsResults[i].GasUsed
decodeCtx.GasWanted += b.TxsResults[i].GasWanted
decodeCtx.GasUsed += b.TxsResults[i].GasUsed
}

return txs, nil
Expand Down
26 changes: 21 additions & 5 deletions pkg/indexer/parser/parseTxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@
package parser

import (
"context"
"testing"

storageTypes "github.com/celenium-io/astria-indexer/internal/storage/types"
testsuite "github.com/celenium-io/astria-indexer/internal/test_suite"
"github.com/celenium-io/astria-indexer/pkg/indexer/decode"
"github.com/celenium-io/astria-indexer/pkg/node/mock"
"github.com/celenium-io/astria-indexer/pkg/types"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)

func TestParseTxs_EmptyTxsResults(t *testing.T) {
block, _ := testsuite.EmptyBlock()

ctx := decode.NewContext()
resultTxs, err := parseTxs(block, &ctx)
decodeCtx := decode.NewContext()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
api := mock.NewMockApi(ctrl)
resultTxs, err := parseTxs(context.Background(), block, &decodeCtx, api)

assert.NoError(t, err)
assert.Empty(t, resultTxs)
Expand Down Expand Up @@ -52,7 +58,11 @@ func TestParseTxs_SuccessTx(t *testing.T) {
}
block, now := testsuite.CreateTestBlock(txRes, true)
ctx := decode.NewContext()
resultTxs, err := parseTxs(block, &ctx)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
api := mock.NewMockApi(ctrl)

resultTxs, err := parseTxs(context.Background(), block, &ctx, api)

assert.NoError(t, err)
assert.Len(t, resultTxs, 1)
Expand All @@ -79,7 +89,10 @@ func TestParseTxs_FailedTx(t *testing.T) {
}
block, now := testsuite.CreateTestBlock(txRes, true)
ctx := decode.NewContext()
resultTxs, err := parseTxs(block, &ctx)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
api := mock.NewMockApi(ctrl)
resultTxs, err := parseTxs(context.Background(), block, &ctx, api)

assert.NoError(t, err)
assert.Len(t, resultTxs, 1)
Expand All @@ -106,7 +119,10 @@ func TestParseTxs_FailedTxWithNonstandardErrorCode(t *testing.T) {
}
block, now := testsuite.CreateTestBlock(txRes, true)
ctx := decode.NewContext()
resultTxs, err := parseTxs(block, &ctx)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
api := mock.NewMockApi(ctrl)
resultTxs, err := parseTxs(context.Background(), block, &ctx, api)

assert.NoError(t, err)
assert.Len(t, resultTxs, 1)
Expand Down
6 changes: 5 additions & 1 deletion pkg/indexer/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package parser
import (
"context"

"github.com/celenium-io/astria-indexer/pkg/node"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
)

type Module struct {
modules.BaseModule

api node.Api
}

var _ modules.Module = (*Module)(nil)
Expand All @@ -21,9 +24,10 @@ const (
StopOutput = "stop"
)

func NewModule() Module {
func NewModule(api node.Api) Module {
m := Module{
BaseModule: modules.New("parser"),
api: api,
}
m.CreateInput(InputName)
m.CreateOutput(OutputName)
Expand Down
23 changes: 18 additions & 5 deletions pkg/indexer/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@ import (
"time"

"github.com/celenium-io/astria-indexer/internal/storage"
"github.com/celenium-io/astria-indexer/pkg/node/mock"
"github.com/celenium-io/astria-indexer/pkg/types"
cometTypes "github.com/cometbft/cometbft/types"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)

var testTime = time.Now()

func createModules(t *testing.T) (modules.BaseModule, string, Module) {
func createModules(t *testing.T, ctrl *gomock.Controller) (modules.BaseModule, string, Module) {
writerModule := modules.New("writer-module")
outputName := "write"
writerModule.CreateOutput(outputName)
parserModule := NewModule()

api := mock.NewMockApi(ctrl)
parserModule := NewModule(api)

err := parserModule.AttachTo(&writerModule, outputName, InputName)
assert.NoError(t, err)
Expand Down Expand Up @@ -134,7 +138,10 @@ func getBlock() types.BlockData {
}

func TestParserModule_Success(t *testing.T) {
writerModule, outputName, parserModule := createModules(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

writerModule, outputName, parserModule := createModules(t, ctrl)

readerModule := modules.New("reader-module")
readerInputName := "read"
Expand Down Expand Up @@ -169,7 +176,10 @@ func TestParserModule_Success(t *testing.T) {
}

func TestModule_OnClosedChannel(t *testing.T) {
_, _, parserModule := createModules(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

_, _, parserModule := createModules(t, ctrl)

stopperModule := modules.New("stopper-module")
stopInputName := "stop-signal"
Expand Down Expand Up @@ -198,7 +208,10 @@ func TestModule_OnClosedChannel(t *testing.T) {
}

func TestModule_OnParseError(t *testing.T) {
writerModule, writerOutputName, parserModule := createModules(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

writerModule, writerOutputName, parserModule := createModules(t, ctrl)

stopperModule := modules.New("stopper-module")
stopInputName := "stop-signal"
Expand Down
1 change: 1 addition & 0 deletions pkg/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type Api interface {
Genesis(ctx context.Context) (types.Genesis, error)
BlockData(ctx context.Context, level pkgTypes.Level) (pkgTypes.BlockData, error)
BlockDataGet(ctx context.Context, level pkgTypes.Level) (pkgTypes.BlockData, error)
GetAssetInfo(ctx context.Context, asset string) (types.DenomMetadataResponse, error)
}
Loading

0 comments on commit 2f1e41a

Please sign in to comment.