diff --git a/internal/storage/postgres/event.go b/internal/storage/postgres/event.go index 7066d61..8e49069 100644 --- a/internal/storage/postgres/event.go +++ b/internal/storage/postgres/event.go @@ -6,6 +6,7 @@ import ( "github.com/dipdup-io/starknet-indexer/internal/storage" "github.com/dipdup-net/go-lib/database" "github.com/dipdup-net/indexer-sdk/pkg/storage/postgres" + "github.com/rs/zerolog/log" "github.com/uptrace/bun" ) @@ -23,17 +24,17 @@ func NewEvent(db *database.Bun) *Event { // Filter - func (event *Event) Filter(ctx context.Context, fltr []storage.EventFilter, opts ...storage.FilterOption) (result []storage.Event, err error) { - query := event.DB().NewSelect().Model(&result) + query := event.DB().NewSelect().Model((*storage.Event)(nil)) query = query.WhereGroup(" AND ", func(q1 *bun.SelectQuery) *bun.SelectQuery { for i := range fltr { q1 = q1.WhereGroup(" OR ", func(q *bun.SelectQuery) *bun.SelectQuery { - q = integerFilter(q, "event.id", fltr[i].ID) - q = integerFilter(q, "event.height", fltr[i].Height) - q = timeFilter(q, "event.time", fltr[i].Time) - q = idFilter(q, "event.contract_id", fltr[i].Contract, "Contract") - q = idFilter(q, "event.from_id", fltr[i].From, "From") - q = stringFilter(q, "event.name", fltr[i].Name) - q = jsonFilter(q, "event.parsed_data", fltr[i].ParsedData) + q = integerFilter(q, "id", fltr[i].ID) + q = integerFilter(q, "height", fltr[i].Height) + q = timeFilter(q, "time", fltr[i].Time) + q = idFilter(q, "contract_id", fltr[i].Contract) + q = idFilter(q, "from_id", fltr[i].From) + q = stringFilter(q, "name", fltr[i].Name) + q = jsonFilter(q, "parsed_data", fltr[i].ParsedData) return q }) } @@ -41,8 +42,22 @@ func (event *Event) Filter(ctx context.Context, fltr []storage.EventFilter, opts }) query = optionsFilter(query, "event", opts...) - query.Relation("Contract").Relation("From") - err = query.Scan(ctx) + var opt storage.FilterOptions + for i := range opts { + opts[i](&opt) + } + + q := event.DB().NewSelect(). + TableExpr("(?) as event", query). + ColumnExpr("event.*"). + ColumnExpr("contract.id as contract__id, contract.class_id as contract__class_id, contract.height as contract__height, contract.hash as contract__hash"). + ColumnExpr("from_addr.id as from__id, from_addr.class_id as from__class_id, from_addr.height as from__height, from_addr.hash as from__hash"). + Join("left join address as contract on contract.id = event.contract_id"). + Join("left join address as from_addr on from_addr.id = event.from_id") + q = addSort(q, opt.SortField, opt.SortOrder) + + log.Info().Msg(q.String()) + err = q.Scan(ctx, &result) return } diff --git a/internal/storage/postgres/filters.go b/internal/storage/postgres/filters.go index 3befa7b..3de33f8 100644 --- a/internal/storage/postgres/filters.go +++ b/internal/storage/postgres/filters.go @@ -122,17 +122,15 @@ func addressFilter(q *bun.SelectQuery, name string, fltr storage.BytesFilter, jo return q } -func idFilter(q *bun.SelectQuery, name string, fltr storage.IdFilter, joinColumn string) *bun.SelectQuery { - if name == "" || joinColumn == "" { +func idFilter(q *bun.SelectQuery, name string, fltr storage.IdFilter) *bun.SelectQuery { + if name == "" { return q } switch { case fltr.Eq > 0: - q = q.Relation(joinColumn) q = q.Where("? = ?", bun.Safe(name), fltr.Eq) case len(fltr.In) > 0: - q = q.Relation(joinColumn) q = q.Where("? IN (?)", bun.Safe(name), bun.In(fltr.In)) } diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index 45d70c2..d5d2a7d 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -137,7 +137,7 @@ func (client *Client) subscribe(ctx context.Context, req *pb.SubscribeRequest) ( grpcStream := grpc.NewStream[pb.Subscription](stream) client.G.GoCtx(ctx, func(ctx context.Context) { - client.handleMessage(ctx, grpcStream) + client.handleMessage(grpcStream) }) id, err := grpcStream.Subscribe(ctx) @@ -164,7 +164,7 @@ func (client *Client) sendToOutput(name string, data any) error { return nil } -func (client *Client) handleMessage(ctx context.Context, stream *grpcSDK.Stream[pb.Subscription]) { +func (client *Client) handleMessage(stream *grpcSDK.Stream[pb.Subscription]) { for { select { case <-stream.Context().Done(): diff --git a/pkg/grpc/sync.go b/pkg/grpc/sync.go index 417a79e..344cf08 100644 --- a/pkg/grpc/sync.go +++ b/pkg/grpc/sync.go @@ -286,7 +286,7 @@ func (module *Server) syncTables(ctx context.Context, tables tables, targetHeigh } if head := tables[0].Head(); head != nil { - if err := sendModelToClient(ctx, subscriptionID, stream, head); err != nil { + if err := sendModelToClient(subscriptionID, stream, head); err != nil { return err } } @@ -301,7 +301,7 @@ func (module *Server) syncTables(ctx context.Context, tables tables, targetHeigh return nil } -func sendModelToClient(ctx context.Context, subscriptionID uint64, stream pb.IndexerService_SubscribeServer, model any) error { +func sendModelToClient(subscriptionID uint64, stream pb.IndexerService_SubscribeServer, model any) error { var msg pb.Subscription switch typ := model.(type) { case storage.Invoke: @@ -423,7 +423,7 @@ func (module *Server) syncTokenBalances(ctx context.Context, fltr []storage.Toke end = len(data) < limit offset += len(data) for i := range data { - if err := sendModelToClient(ctx, subscriptionID, stream, data[i]); err != nil { + if err := sendModelToClient(subscriptionID, stream, data[i]); err != nil { return err } }