Skip to content

Commit

Permalink
Fix: add indexes and optimize transfer query
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Apr 5, 2024
1 parent 459f55d commit 0916de9
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 3 deletions.
32 changes: 32 additions & 0 deletions internal/storage/postgres/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func createIndices(ctx context.Context, conn *database.Bun) error {
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS storage_diff_contract_id_idx ON storage_diff (contract_id)`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS storage_diff_height_idx ON storage_diff USING BRIN (height)`); err != nil {
return err
}

// Invoke
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS invoke_height_idx ON invoke USING BRIN (height)`); err != nil {
Expand Down Expand Up @@ -224,6 +227,35 @@ func createIndices(ctx context.Context, conn *database.Bun) error {
return err
}

// Transfer
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS transfer_height_idx ON transfer USING BRIN (height)`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS transfer_from_id_idx ON transfer (from_id)`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS transfer_to_id_idx ON transfer (to_id)`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS transfer_contract_id_idx ON transfer (contract_id)`); err != nil {
return err
}

// Class Replace
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS class_replace_height_idx ON class_replace USING BRIN (height)`); err != nil {
return err
}

// Class
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS class_height_idx ON class USING BRIN (height)`); err != nil {
return err
}

// Block
if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS block_height_idx ON block USING BRIN (height)`); err != nil {
return err
}

return nil
})
}
Expand Down
8 changes: 8 additions & 0 deletions internal/storage/postgres/fixtures/transfer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- id: 1
height: 1
time: '2021-11-16T14:24:55+00:00'
contract_id: 8
from_id: 11
to_id: 12
amount: 1000
token_id: 1
13 changes: 10 additions & 3 deletions internal/storage/postgres/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewTransfer(db *database.Bun) *Transfer {

// Filter -
func (t *Transfer) Filter(ctx context.Context, fltr []storage.TransferFilter, opts ...storage.FilterOption) (result []storage.Transfer, err error) {
query := t.DB().NewSelect().Model(&result)
query := t.DB().NewSelect().Model((*storage.Transfer)(nil))
query = query.WhereGroup(" OR ", func(q1 *bun.SelectQuery) *bun.SelectQuery {
for i := range fltr {
q1 = q1.WhereGroup(" AND ", func(q *bun.SelectQuery) *bun.SelectQuery {
Expand All @@ -40,8 +40,15 @@ func (t *Transfer) Filter(ctx context.Context, fltr []storage.TransferFilter, op
return q1
})
query = optionsFilter(query, "transfer", opts...)
query.Relation("Contract").Relation("From").Relation("To")

err = query.Scan(ctx)
err = t.DB().NewSelect().TableExpr("(?) as transfer", query).
ColumnExpr("transfer.*").
ColumnExpr("from_addr.id as from__id, from_addr.class_id as from__class_id, from_addr.hash as from__hash, from_addr.height as from__height").
ColumnExpr("to_addr.id as to__id, to_addr.class_id as to__class_id, to_addr.hash as to__hash, to_addr.height as to__height").
ColumnExpr("contract.id as contract__id, contract.class_id as contract__class_id, contract.hash as contract__hash, contract.height as contract__height").
Join("left join address as from_addr on from_addr.id = transfer.from_id").
Join("left join address as to_addr on to_addr.id = transfer.to_id").
Join("left join address as contract on contract.id = transfer.contract_id").
Scan(ctx, &result)
return result, err
}
99 changes: 99 additions & 0 deletions internal/storage/postgres/transfer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package postgres

import (
"context"
"database/sql"
"testing"
"time"

"github.com/dipdup-io/starknet-indexer/internal/storage"
"github.com/dipdup-net/go-lib/config"
"github.com/dipdup-net/go-lib/database"
"github.com/go-testfixtures/testfixtures/v3"
_ "github.com/lib/pq"
"github.com/stretchr/testify/suite"
)

// TransferTestSuite -
type TransferTestSuite struct {
suite.Suite
psqlContainer *database.PostgreSQLContainer
storage Storage
pm database.RangePartitionManager
}

// SetupSuite -
func (s *TransferTestSuite) SetupSuite() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer ctxCancel()

psqlContainer, err := database.NewPostgreSQLContainer(ctx, database.PostgreSQLContainerConfig{
User: "user",
Password: "password",
Database: "db_test",
Port: 5432,
Image: "postgres:15",
})
s.Require().NoError(err)
s.psqlContainer = psqlContainer

store, err := Create(ctx, config.Database{
Kind: config.DBKindPostgres,
User: s.psqlContainer.Config.User,
Database: s.psqlContainer.Config.Database,
Password: s.psqlContainer.Config.Password,
Host: s.psqlContainer.Config.Host,
Port: s.psqlContainer.MappedPort().Int(),
})
s.Require().NoError(err)
s.storage = store

s.pm = database.NewPartitionManager(s.storage.Connection(), database.PartitionByYear)
currentTime, err := time.Parse(time.RFC3339, "2021-11-16T10:00:35+00:00")
s.Require().NoError(err)
err = s.pm.CreatePartition(ctx, currentTime, storage.Transfer{}.TableName())
s.Require().NoError(err)

db, err := sql.Open("postgres", s.psqlContainer.GetDSN())
s.Require().NoError(err)

fixtures, err := testfixtures.New(
testfixtures.Database(db),
testfixtures.Dialect("postgres"),
testfixtures.Files(
"fixtures/transfer.yml",
"fixtures/address.yml",
),
)
s.Require().NoError(err)
s.Require().NoError(fixtures.Load())
s.Require().NoError(db.Close())
}

// TearDownSuite -
func (s *TransferTestSuite) TearDownSuite() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

s.Require().NoError(s.storage.Close())
s.Require().NoError(s.psqlContainer.Terminate(ctx))
}

func (s *TransferTestSuite) TestFilterByHeight() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

transfers, err := s.storage.Transfer.Filter(ctx, []storage.TransferFilter{
{
Height: storage.IntegerFilter{
Eq: 1,
},
},
}, storage.WithLimitFilter(3))
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}

func TestSuiteTransfer_Run(t *testing.T) {
suite.Run(t, new(TransferTestSuite))
}

0 comments on commit 0916de9

Please sign in to comment.