Skip to content

Commit

Permalink
Optimization: event filters query
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed May 4, 2024
1 parent 4c04877 commit 29ba95d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 19 deletions.
32 changes: 22 additions & 10 deletions internal/storage/postgres/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,38 @@ func NewEvent(db *database.Bun) *Event {

// Filter -
func (event *Event) Filter(ctx context.Context, fltr []storage.EventFilter, opts ...storage.FilterOption) (result []storage.Event, err error) {
query := event.DB().NewSelect().Model(&result)
query := event.DB().NewSelect().Model((*storage.Event)(nil))
query = query.WhereGroup(" AND ", func(q1 *bun.SelectQuery) *bun.SelectQuery {
for i := range fltr {
q1 = q1.WhereGroup(" OR ", func(q *bun.SelectQuery) *bun.SelectQuery {
q = integerFilter(q, "event.id", fltr[i].ID)
q = integerFilter(q, "event.height", fltr[i].Height)
q = timeFilter(q, "event.time", fltr[i].Time)
q = idFilter(q, "event.contract_id", fltr[i].Contract, "Contract")
q = idFilter(q, "event.from_id", fltr[i].From, "From")
q = stringFilter(q, "event.name", fltr[i].Name)
q = jsonFilter(q, "event.parsed_data", fltr[i].ParsedData)
q = integerFilter(q, "id", fltr[i].ID)
q = integerFilter(q, "height", fltr[i].Height)
q = timeFilter(q, "time", fltr[i].Time)
q = idFilter(q, "contract_id", fltr[i].Contract)
q = idFilter(q, "from_id", fltr[i].From)
q = stringFilter(q, "name", fltr[i].Name)
q = jsonFilter(q, "parsed_data", fltr[i].ParsedData)
return q
})
}
return q1
})

query = optionsFilter(query, "event", opts...)
query.Relation("Contract").Relation("From")

err = query.Scan(ctx)
var opt storage.FilterOptions
for i := range opts {
opts[i](&opt)
}

q := event.DB().NewSelect().
TableExpr("(?) as event", query).
ColumnExpr("event.*").
ColumnExpr("contract.id as contract__id, contract.class_id as contract__class_id, contract.height as contract__height, contract.hash as contract__hash").
ColumnExpr("from_addr.id as from__id, from_addr.class_id as from__class_id, from_addr.height as from__height, from_addr.hash as from__hash").
Join("left join address as contract on contract.id = event.contract_id").
Join("left join address as from_addr on from_addr.id = event.from_id")
q = addSort(q, opt.SortField, opt.SortOrder)
err = q.Scan(ctx, &result)
return
}
6 changes: 2 additions & 4 deletions internal/storage/postgres/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,15 @@ func addressFilter(q *bun.SelectQuery, name string, fltr storage.BytesFilter, jo
return q
}

func idFilter(q *bun.SelectQuery, name string, fltr storage.IdFilter, joinColumn string) *bun.SelectQuery {
if name == "" || joinColumn == "" {
func idFilter(q *bun.SelectQuery, name string, fltr storage.IdFilter) *bun.SelectQuery {
if name == "" {
return q
}

switch {
case fltr.Eq > 0:
q = q.Relation(joinColumn)
q = q.Where("? = ?", bun.Safe(name), fltr.Eq)
case len(fltr.In) > 0:
q = q.Relation(joinColumn)
q = q.Where("? IN (?)", bun.Safe(name), bun.In(fltr.In))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (client *Client) subscribe(ctx context.Context, req *pb.SubscribeRequest) (
grpcStream := grpc.NewStream[pb.Subscription](stream)

client.G.GoCtx(ctx, func(ctx context.Context) {
client.handleMessage(ctx, grpcStream)
client.handleMessage(grpcStream)
})

id, err := grpcStream.Subscribe(ctx)
Expand All @@ -164,7 +164,7 @@ func (client *Client) sendToOutput(name string, data any) error {
return nil
}

func (client *Client) handleMessage(ctx context.Context, stream *grpcSDK.Stream[pb.Subscription]) {
func (client *Client) handleMessage(stream *grpcSDK.Stream[pb.Subscription]) {
for {
select {
case <-stream.Context().Done():
Expand Down
6 changes: 3 additions & 3 deletions pkg/grpc/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (module *Server) syncTables(ctx context.Context, tables tables, targetHeigh
}

if head := tables[0].Head(); head != nil {
if err := sendModelToClient(ctx, subscriptionID, stream, head); err != nil {
if err := sendModelToClient(subscriptionID, stream, head); err != nil {
return err
}
}
Expand All @@ -301,7 +301,7 @@ func (module *Server) syncTables(ctx context.Context, tables tables, targetHeigh
return nil
}

func sendModelToClient(ctx context.Context, subscriptionID uint64, stream pb.IndexerService_SubscribeServer, model any) error {
func sendModelToClient(subscriptionID uint64, stream pb.IndexerService_SubscribeServer, model any) error {
var msg pb.Subscription
switch typ := model.(type) {
case storage.Invoke:
Expand Down Expand Up @@ -423,7 +423,7 @@ func (module *Server) syncTokenBalances(ctx context.Context, fltr []storage.Toke
end = len(data) < limit
offset += len(data)
for i := range data {
if err := sendModelToClient(ctx, subscriptionID, stream, data[i]); err != nil {
if err := sendModelToClient(subscriptionID, stream, data[i]); err != nil {
return err
}
}
Expand Down

0 comments on commit 29ba95d

Please sign in to comment.