diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 8f5fb43..e6a82df 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -56,7 +56,7 @@ func New(ctx context.Context, cfg config.Config, stopperModule modules.Module) ( return Indexer{}, errors.Wrap(err, "while creating rollback module") } - p, err := createParser(r) + p, err := createParser(r, &api) if err != nil { return Indexer{}, errors.Wrap(err, "while creating parser module") } @@ -150,8 +150,8 @@ func createRollback(receiverModule modules.Module, pg postgres.Storage, api node return &rollbackModule, nil } -func createParser(receiverModule modules.Module) (*parser.Module, error) { - parserModule := parser.NewModule() +func createParser(receiverModule modules.Module, api node.Api) (*parser.Module, error) { + parserModule := parser.NewModule(api) if err := parserModule.AttachTo(receiverModule, receiver.BlocksOutput, parser.InputName); err != nil { return nil, errors.Wrap(err, "while attaching parser to receiver") diff --git a/pkg/indexer/parser/listen.go b/pkg/indexer/parser/listen.go index e8b5b90..1dc8235 100644 --- a/pkg/indexer/parser/listen.go +++ b/pkg/indexer/parser/listen.go @@ -31,7 +31,7 @@ func (p *Module) listen(ctx context.Context) { continue } - if err := p.parse(block); err != nil { + if err := p.parse(ctx, block); err != nil { p.Log.Err(err). Uint64("height", uint64(block.Height)). Msg("block parsing error") diff --git a/pkg/indexer/parser/parse.go b/pkg/indexer/parser/parse.go index 0369fdf..c4fc4d7 100644 --- a/pkg/indexer/parser/parse.go +++ b/pkg/indexer/parser/parse.go @@ -4,6 +4,7 @@ package parser import ( + "context" "encoding/hex" "strings" "time" @@ -16,7 +17,7 @@ import ( "github.com/shopspring/decimal" ) -func (p *Module) parse(b types.BlockData) error { +func (p *Module) parse(ctx context.Context, b types.BlockData) error { start := time.Now() p.Log.Info(). Int64("height", b.Block.Height). @@ -30,7 +31,7 @@ func (p *Module) parse(b types.BlockData) error { decodeCtx := decode.NewContext() decodeCtx.Proposer = proposer - txs, err := parseTxs(b, &decodeCtx) + txs, err := parseTxs(ctx, b, &decodeCtx, p.api) if err != nil { return errors.Wrapf(err, "while parsing block on level=%d", b.Height) } diff --git a/pkg/indexer/parser/parseEvents.go b/pkg/indexer/parser/parseEvents.go index 368e72c..476a89d 100644 --- a/pkg/indexer/parser/parseEvents.go +++ b/pkg/indexer/parser/parseEvents.go @@ -4,20 +4,27 @@ package parser import ( + "context" + "strings" + "time" + + astria "buf.build/gen/go/astria/protocol-apis/protocolbuffers/go/astria/protocol/asset/v1alpha1" "github.com/celenium-io/astria-indexer/internal/currency" "github.com/celenium-io/astria-indexer/internal/storage" "github.com/celenium-io/astria-indexer/pkg/indexer/decode" + "github.com/celenium-io/astria-indexer/pkg/node" "github.com/celenium-io/astria-indexer/pkg/types" "github.com/pkg/errors" "github.com/shopspring/decimal" + "google.golang.org/protobuf/proto" ) -func parseEvents(events []types.Event, ctx *decode.Context) error { +func parseEvents(ctx context.Context, events []types.Event, decodeCtx *decode.Context, api node.Api) error { for i := range events { var err error switch events[i].Type { case "tx.fees": - err = parseTxFees(events[i].Attributes, ctx) + err = parseTxFees(ctx, events[i].Attributes, decodeCtx, api) default: continue } @@ -29,7 +36,39 @@ func parseEvents(events []types.Event, ctx *decode.Context) error { return nil } -func parseTxFees(attrs []types.EventAttribute, ctx *decode.Context) error { +var ( + assets = map[string]string{ + "704031c868fd3d3c84a1cfa8cb45deba4ea746b44697f7f4a6ed1b8f6c239b82": string(currency.Nria), + } +) + +func getAsset(ctx context.Context, api node.Api, val string) (string, error) { + if !strings.HasPrefix(val, "ibc") { + return val, nil + } + parts := strings.Split(val, "/") + hash := parts[len(parts)-1] + if asset, ok := assets[hash]; ok { + return asset, nil + } + + timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + metadata, err := api.GetAssetInfo(timeoutCtx, hash) + if err != nil { + return "", errors.Wrap(err, val) + } + + var response astria.DenomResponse + if err := proto.Unmarshal(metadata.Response.Value, &response); err != nil { + return "", errors.Wrap(err, val) + } + assets[hash] = response.GetDenom() + return response.GetDenom(), nil +} + +func parseTxFees(ctx context.Context, attrs []types.EventAttribute, decodeCtx *decode.Context, api node.Api) error { var ( fee = new(storage.Fee) err error @@ -37,11 +76,11 @@ func parseTxFees(attrs []types.EventAttribute, ctx *decode.Context) error { for i := range attrs { switch attrs[i].Key { case "asset": - fee.Asset = attrs[i].Value - // TODO: think about general logic with IBC channels - if fee.Asset == "ibc/704031c868fd3d3c84a1cfa8cb45deba4ea746b44697f7f4a6ed1b8f6c239b82" { - fee.Asset = string(currency.Nria) + asset, err := getAsset(ctx, api, attrs[i].Value) + if err != nil { + return err } + fee.Asset = asset case "feeAmount": fee.Amount, err = decimal.NewFromString(attrs[i].Value) if err != nil { @@ -53,6 +92,6 @@ func parseTxFees(attrs []types.EventAttribute, ctx *decode.Context) error { } } - ctx.AddFee(fee) + decodeCtx.AddFee(fee) return nil } diff --git a/pkg/indexer/parser/parseTxs.go b/pkg/indexer/parser/parseTxs.go index 37cae18..e85a25a 100644 --- a/pkg/indexer/parser/parseTxs.go +++ b/pkg/indexer/parser/parseTxs.go @@ -4,14 +4,17 @@ package parser import ( + "context" + "github.com/celenium-io/astria-indexer/internal/storage" storageTypes "github.com/celenium-io/astria-indexer/internal/storage/types" "github.com/celenium-io/astria-indexer/pkg/indexer/decode" + "github.com/celenium-io/astria-indexer/pkg/node" "github.com/celenium-io/astria-indexer/pkg/types" "github.com/pkg/errors" ) -func parseTxs(b types.BlockData, ctx *decode.Context) ([]*storage.Tx, error) { +func parseTxs(ctx context.Context, b types.BlockData, decodeCtx *decode.Context, api node.Api) ([]*storage.Tx, error) { count := len(b.Block.Txs) index := 0 if count == 0 { @@ -21,24 +24,24 @@ func parseTxs(b types.BlockData, ctx *decode.Context) ([]*storage.Tx, error) { if len(b.Block.Txs) >= 2 && len(b.Block.Txs[0]) == 32 && len(b.Block.Txs[1]) == 32 { count -= 2 index = 2 - ctx.BytesInBlock += 64 + decodeCtx.BytesInBlock += 64 } txs := make([]*storage.Tx, count) for i := index; i < len(b.TxsResults); i++ { - if err := parseEvents(b.TxsResults[i].Events, ctx); err != nil { + if err := parseEvents(ctx, b.TxsResults[i].Events, decodeCtx, api); err != nil { return nil, errors.Wrap(err, "parse events") } - t, err := parseTx(b, i, b.TxsResults[i], ctx) + t, err := parseTx(b, i, b.TxsResults[i], decodeCtx) if err != nil { return nil, err } txs[i-index] = &t - ctx.GasWanted += b.TxsResults[i].GasWanted - ctx.GasUsed += b.TxsResults[i].GasUsed + decodeCtx.GasWanted += b.TxsResults[i].GasWanted + decodeCtx.GasUsed += b.TxsResults[i].GasUsed } return txs, nil diff --git a/pkg/indexer/parser/parseTxs_test.go b/pkg/indexer/parser/parseTxs_test.go index 41fd45d..44ca5ce 100644 --- a/pkg/indexer/parser/parseTxs_test.go +++ b/pkg/indexer/parser/parseTxs_test.go @@ -4,20 +4,26 @@ package parser import ( + "context" "testing" storageTypes "github.com/celenium-io/astria-indexer/internal/storage/types" testsuite "github.com/celenium-io/astria-indexer/internal/test_suite" "github.com/celenium-io/astria-indexer/pkg/indexer/decode" + "github.com/celenium-io/astria-indexer/pkg/node/mock" "github.com/celenium-io/astria-indexer/pkg/types" "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" ) func TestParseTxs_EmptyTxsResults(t *testing.T) { block, _ := testsuite.EmptyBlock() - ctx := decode.NewContext() - resultTxs, err := parseTxs(block, &ctx) + decodeCtx := decode.NewContext() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + api := mock.NewMockApi(ctrl) + resultTxs, err := parseTxs(context.Background(), block, &decodeCtx, api) assert.NoError(t, err) assert.Empty(t, resultTxs) @@ -52,7 +58,11 @@ func TestParseTxs_SuccessTx(t *testing.T) { } block, now := testsuite.CreateTestBlock(txRes, true) ctx := decode.NewContext() - resultTxs, err := parseTxs(block, &ctx) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + api := mock.NewMockApi(ctrl) + + resultTxs, err := parseTxs(context.Background(), block, &ctx, api) assert.NoError(t, err) assert.Len(t, resultTxs, 1) @@ -79,7 +89,10 @@ func TestParseTxs_FailedTx(t *testing.T) { } block, now := testsuite.CreateTestBlock(txRes, true) ctx := decode.NewContext() - resultTxs, err := parseTxs(block, &ctx) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + api := mock.NewMockApi(ctrl) + resultTxs, err := parseTxs(context.Background(), block, &ctx, api) assert.NoError(t, err) assert.Len(t, resultTxs, 1) @@ -106,7 +119,10 @@ func TestParseTxs_FailedTxWithNonstandardErrorCode(t *testing.T) { } block, now := testsuite.CreateTestBlock(txRes, true) ctx := decode.NewContext() - resultTxs, err := parseTxs(block, &ctx) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + api := mock.NewMockApi(ctrl) + resultTxs, err := parseTxs(context.Background(), block, &ctx, api) assert.NoError(t, err) assert.Len(t, resultTxs, 1) diff --git a/pkg/indexer/parser/parser.go b/pkg/indexer/parser/parser.go index 3e63038..f495704 100644 --- a/pkg/indexer/parser/parser.go +++ b/pkg/indexer/parser/parser.go @@ -6,11 +6,14 @@ package parser import ( "context" + "github.com/celenium-io/astria-indexer/pkg/node" "github.com/dipdup-net/indexer-sdk/pkg/modules" ) type Module struct { modules.BaseModule + + api node.Api } var _ modules.Module = (*Module)(nil) @@ -21,9 +24,10 @@ const ( StopOutput = "stop" ) -func NewModule() Module { +func NewModule(api node.Api) Module { m := Module{ BaseModule: modules.New("parser"), + api: api, } m.CreateInput(InputName) m.CreateOutput(OutputName) diff --git a/pkg/indexer/parser/parser_test.go b/pkg/indexer/parser/parser_test.go index e20139a..34ce97b 100644 --- a/pkg/indexer/parser/parser_test.go +++ b/pkg/indexer/parser/parser_test.go @@ -9,20 +9,24 @@ import ( "time" "github.com/celenium-io/astria-indexer/internal/storage" + "github.com/celenium-io/astria-indexer/pkg/node/mock" "github.com/celenium-io/astria-indexer/pkg/types" cometTypes "github.com/cometbft/cometbft/types" "github.com/dipdup-net/indexer-sdk/pkg/modules" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" ) var testTime = time.Now() -func createModules(t *testing.T) (modules.BaseModule, string, Module) { +func createModules(t *testing.T, ctrl *gomock.Controller) (modules.BaseModule, string, Module) { writerModule := modules.New("writer-module") outputName := "write" writerModule.CreateOutput(outputName) - parserModule := NewModule() + + api := mock.NewMockApi(ctrl) + parserModule := NewModule(api) err := parserModule.AttachTo(&writerModule, outputName, InputName) assert.NoError(t, err) @@ -134,7 +138,10 @@ func getBlock() types.BlockData { } func TestParserModule_Success(t *testing.T) { - writerModule, outputName, parserModule := createModules(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + writerModule, outputName, parserModule := createModules(t, ctrl) readerModule := modules.New("reader-module") readerInputName := "read" @@ -169,7 +176,10 @@ func TestParserModule_Success(t *testing.T) { } func TestModule_OnClosedChannel(t *testing.T) { - _, _, parserModule := createModules(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + _, _, parserModule := createModules(t, ctrl) stopperModule := modules.New("stopper-module") stopInputName := "stop-signal" @@ -198,7 +208,10 @@ func TestModule_OnClosedChannel(t *testing.T) { } func TestModule_OnParseError(t *testing.T) { - writerModule, writerOutputName, parserModule := createModules(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + writerModule, writerOutputName, parserModule := createModules(t, ctrl) stopperModule := modules.New("stopper-module") stopInputName := "stop-signal" diff --git a/pkg/node/api.go b/pkg/node/api.go index 42d2084..c735791 100644 --- a/pkg/node/api.go +++ b/pkg/node/api.go @@ -20,4 +20,5 @@ type Api interface { Genesis(ctx context.Context) (types.Genesis, error) BlockData(ctx context.Context, level pkgTypes.Level) (pkgTypes.BlockData, error) BlockDataGet(ctx context.Context, level pkgTypes.Level) (pkgTypes.BlockData, error) + GetAssetInfo(ctx context.Context, asset string) (types.DenomMetadataResponse, error) } diff --git a/pkg/node/mock/api.go b/pkg/node/mock/api.go index aa86578..2f8c1b5 100644 --- a/pkg/node/mock/api.go +++ b/pkg/node/mock/api.go @@ -239,6 +239,45 @@ func (c *MockApiGenesisCall) DoAndReturn(f func(context.Context) (types.Genesis, return c } +// GetAssetInfo mocks base method. +func (m *MockApi) GetAssetInfo(ctx context.Context, asset string) (types.DenomMetadataResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAssetInfo", ctx, asset) + ret0, _ := ret[0].(types.DenomMetadataResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAssetInfo indicates an expected call of GetAssetInfo. +func (mr *MockApiMockRecorder) GetAssetInfo(ctx, asset any) *MockApiGetAssetInfoCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAssetInfo", reflect.TypeOf((*MockApi)(nil).GetAssetInfo), ctx, asset) + return &MockApiGetAssetInfoCall{Call: call} +} + +// MockApiGetAssetInfoCall wrap *gomock.Call +type MockApiGetAssetInfoCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockApiGetAssetInfoCall) Return(arg0 types.DenomMetadataResponse, arg1 error) *MockApiGetAssetInfoCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockApiGetAssetInfoCall) Do(f func(context.Context, string) (types.DenomMetadataResponse, error)) *MockApiGetAssetInfoCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockApiGetAssetInfoCall) DoAndReturn(f func(context.Context, string) (types.DenomMetadataResponse, error)) *MockApiGetAssetInfoCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Head mocks base method. func (m *MockApi) Head(ctx context.Context) (types0.ResultBlock, error) { m.ctrl.T.Helper() diff --git a/pkg/node/rpc/asset.go b/pkg/node/rpc/asset.go new file mode 100644 index 0000000..f10968a --- /dev/null +++ b/pkg/node/rpc/asset.go @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2024 PK Lab AG +// SPDX-License-Identifier: MIT + +package rpc + +import ( + "context" + "fmt" + + "github.com/celenium-io/astria-indexer/pkg/node/types" + "github.com/pkg/errors" +) + +const pathAbciQuery = "abci_query" + +func (api *API) GetAssetInfo(ctx context.Context, asset string) (types.DenomMetadataResponse, error) { + args := make(map[string]string) + args["path"] = fmt.Sprintf(`"asset/denom/%s"`, asset) + + var gbr types.Response[types.DenomMetadataResponse] + if err := api.get(ctx, pathAbciQuery, args, &gbr); err != nil { + return gbr.Result, errors.Wrap(err, "api.get") + } + + if gbr.Error != nil { + return gbr.Result, errors.Wrapf(types.ErrRequest, "request %d error: %s", gbr.Id, gbr.Error.Error()) + } + + return gbr.Result, nil +} diff --git a/pkg/node/types/denom.go b/pkg/node/types/denom.go new file mode 100644 index 0000000..49851d2 --- /dev/null +++ b/pkg/node/types/denom.go @@ -0,0 +1,21 @@ +// SPDX-FileCopyrightText: 2024 PK Lab AG +// SPDX-License-Identifier: MIT + +package types + +import "encoding/json" + +type DenomMetadata struct { + Code int `json:"code"` + Log string `json:"log"` + Info string `json:"info"` + Index json.Number `json:"index"` + Key []byte `json:"key"` + Value []byte `json:"value"` + Height json.Number `json:"height"` + Codespace string `json:"codespace"` +} + +type DenomMetadataResponse struct { + Response DenomMetadata `json:"response"` +}