Skip to content

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfractal authored Feb 23, 2023
0 parents commit 86445f9
Show file tree
Hide file tree
Showing 9 changed files with 839 additions and 0 deletions.
68 changes: 68 additions & 0 deletions abci.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package keeper

import (
"encoding/hex"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/ingenuity-build/quicksilver/x/interchainquery/types"
)

const (
RetryInterval = 25
)

// EndBlocker of interchainquery module
func (k Keeper) EndBlocker(ctx sdk.Context) {
defer telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), telemetry.MetricKeyBeginBlocker)
_ = k.Logger(ctx)
events := sdk.Events{}
// emit events for periodic queries
k.IterateQueries(ctx, func(_ int64, queryInfo types.Query) (stop bool) {
// if queryInfo.QueryType == "ibc.ClientUpdate" && queryInfo.LastEmission.AddRaw(1000).LTE(sdk.NewInt(ctx.BlockHeight())) {
// k.DeleteQuery(ctx, queryInfo.Id)
// k.Logger(ctx).Error("Deleting stale query")
// return false
// }
if queryInfo.LastEmission.IsNil() || queryInfo.LastEmission.IsZero() || queryInfo.LastEmission.Add(queryInfo.Period).Equal(sdk.NewInt(ctx.BlockHeight())) {
k.Logger(ctx).Debug("Interchainquery event emitted", "id", queryInfo.Id)
event := sdk.NewEvent(
sdk.EventTypeMessage,
sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory),
sdk.NewAttribute(sdk.AttributeKeyAction, types.AttributeValueQuery),
sdk.NewAttribute(types.AttributeKeyQueryID, queryInfo.Id),
sdk.NewAttribute(types.AttributeKeyChainID, queryInfo.ChainId),
sdk.NewAttribute(types.AttributeKeyConnectionID, queryInfo.ConnectionId),
sdk.NewAttribute(types.AttributeKeyType, queryInfo.QueryType),
// TODO: add height to request type
sdk.NewAttribute(types.AttributeKeyHeight, "0"),
sdk.NewAttribute(types.AttributeKeyRequest, hex.EncodeToString(queryInfo.Request)),
)

events = append(events, event)
queryInfo.LastEmission = sdk.NewInt(ctx.BlockHeight())
k.SetQuery(ctx, queryInfo)

}
return false
})

if len(events) > 0 {
ctx.EventManager().EmitEvents(events)
}

k.IterateDatapoints(ctx, func(_ int64, dp types.DataPoint) bool {
q, found := k.GetQuery(ctx, dp.Id)
if !found {
// query was removed; delete datapoint
k.DeleteDatapoint(ctx, dp.Id)
} else if dp.LocalHeight.Int64()+int64(q.Ttl) > ctx.BlockHeader().Height {
// gc old data
k.DeleteDatapoint(ctx, dp.Id)
}

return false
})
}
56 changes: 56 additions & 0 deletions abci_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package keeper_test

import (
sdk "github.com/cosmos/cosmos-sdk/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"

"github.com/ingenuity-build/quicksilver/x/interchainquery/keeper"
)

func (suite *KeeperTestSuite) TestEndBlocker() {
qvr := stakingtypes.QueryValidatorsResponse{
Validators: suite.GetSimApp(suite.chainB).StakingKeeper.GetBondedValidatorsByPower(suite.chainB.GetContext()),
}

bondedQuery := stakingtypes.QueryValidatorsRequest{Status: stakingtypes.BondStatusBonded}
bz, err := bondedQuery.Marshal()
suite.NoError(err)

id := keeper.GenerateQueryHash(suite.path.EndpointB.ConnectionID, suite.chainB.ChainID, "cosmos.staking.v1beta1.Query/Validators", bz, "")

query := suite.GetSimApp(suite.chainA).InterchainQueryKeeper.NewQuery(
suite.chainA.GetContext(),
"",
suite.path.EndpointB.ConnectionID,
suite.chainB.ChainID,
"cosmos.staking.v1beta1.Query/Validators",
bz,
sdk.NewInt(200),
"",
0,
)

// set the query
suite.GetSimApp(suite.chainA).InterchainQueryKeeper.SetQuery(suite.chainA.GetContext(), *query)

// call end blocker
suite.GetSimApp(suite.chainA).InterchainQueryKeeper.EndBlocker(suite.chainA.GetContext())

err = suite.GetSimApp(suite.chainA).InterchainQueryKeeper.SetDatapointForID(
suite.chainA.GetContext(),
id,
suite.GetSimApp(suite.chainB).AppCodec().MustMarshalJSON(&qvr),
sdk.NewInt(suite.chainB.CurrentHeader.Height),
)
suite.NoError(err)

dataPoint, err := suite.GetSimApp(suite.chainA).InterchainQueryKeeper.GetDatapointForID(suite.chainA.GetContext(), id)
suite.NoError(err)
suite.NotNil(dataPoint)

// set the query
suite.GetSimApp(suite.chainA).InterchainQueryKeeper.DeleteQuery(suite.chainA.GetContext(), id)

// call end blocker
suite.GetSimApp(suite.chainA).InterchainQueryKeeper.EndBlocker(suite.chainA.GetContext())
}
49 changes: 49 additions & 0 deletions grpc_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package keeper

import (
"context"

"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/query"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/ingenuity-build/quicksilver/x/interchainquery/types"
)

var _ types.QuerySrvrServer = Keeper{}

// Queries returns information about registered zones.
func (k Keeper) Queries(c context.Context, req *types.QueryRequestsRequest) (*types.QueryRequestsResponse, error) {
if req == nil {
return nil, status.Error(codes.InvalidArgument, "empty request")
}

ctx := sdk.UnwrapSDKContext(c)

var queries []types.Query
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixQuery)

pageRes, err := query.FilteredPaginate(store, req.Pagination, func(_, value []byte, accumulate bool) (bool, error) {
var query types.Query
if err := k.cdc.Unmarshal(value, &query); err != nil {
return false, err
}

if query.ChainId == req.ChainId && (query.LastEmission.IsNil() || query.LastEmission.IsZero() || query.LastEmission.GTE(query.LastHeight)) {
queries = append(queries, query)
return true, nil
}

return false, nil
})
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return &types.QueryRequestsResponse{
Queries: queries,
Pagination: pageRes,
}, nil
}
40 changes: 40 additions & 0 deletions grpc_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package keeper_test

import (
sdk "github.com/cosmos/cosmos-sdk/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"

icqtypes "github.com/ingenuity-build/quicksilver/x/interchainquery/types"
)

func (suite *KeeperTestSuite) TestQueries() {
bondedQuery := stakingtypes.QueryValidatorsRequest{Status: stakingtypes.BondStatusBonded}
bz, err := bondedQuery.Marshal()
suite.NoError(err)

query := suite.GetSimApp(suite.chainA).InterchainQueryKeeper.NewQuery(
suite.chainA.GetContext(),
"",
suite.path.EndpointB.ConnectionID,
suite.chainB.ChainID,
"cosmos.staking.v1beta1.Query/Validators",
bz,
sdk.NewInt(200),
"",
0,
)

// set the query
suite.GetSimApp(suite.chainA).InterchainQueryKeeper.SetQuery(suite.chainA.GetContext(), *query)

icqsrvSrv := icqtypes.QuerySrvrServer(suite.GetSimApp(suite.chainA).InterchainQueryKeeper)

res, err := icqsrvSrv.Queries(sdk.WrapSDKContext(suite.chainA.GetContext()), &icqtypes.QueryRequestsRequest{ChainId: suite.chainB.ChainID})
suite.NoError(err)
suite.Len(res.Queries, 1)
suite.Equal(suite.path.EndpointB.ConnectionID, res.Queries[0].ConnectionId)
suite.Equal(suite.chainB.ChainID, res.Queries[0].ChainId)
suite.Equal("cosmos.staking.v1beta1.Query/Validators", res.Queries[0].QueryType)
suite.Equal(sdk.NewInt(200), res.Queries[0].Period)
suite.Equal("", res.Queries[0].CallbackId)
}
151 changes: 151 additions & 0 deletions keeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package keeper

import (
"errors"
"fmt"

"cosmossdk.io/math"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
ibckeeper "github.com/cosmos/ibc-go/v5/modules/core/keeper"
"github.com/tendermint/tendermint/libs/log"

"github.com/ingenuity-build/quicksilver/x/interchainquery/types"
)

// Keeper of this module maintains collections of registered zones.
type Keeper struct {
cdc codec.Codec
storeKey storetypes.StoreKey
callbacks map[string]types.QueryCallbacks
IBCKeeper *ibckeeper.Keeper
}

// NewKeeper returns a new instance of zones Keeper
func NewKeeper(cdc codec.Codec, storeKey storetypes.StoreKey, ibckeeper *ibckeeper.Keeper) Keeper {
return Keeper{
cdc: cdc,
storeKey: storeKey,
callbacks: make(map[string]types.QueryCallbacks),
IBCKeeper: ibckeeper,
}
}

func (k *Keeper) SetCallbackHandler(module string, handler types.QueryCallbacks) error {
_, found := k.callbacks[module]
if found {
return fmt.Errorf("callback handler already set for %s", module)
}
k.callbacks[module] = handler.RegisterCallbacks()
return nil
}

// Logger returns a module-specific logger.
func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With("module", fmt.Sprintf("x/%s", types.ModuleName))
}

func (k *Keeper) SetDatapointForID(ctx sdk.Context, id string, result []byte, height math.Int) error {
mapping := types.DataPoint{Id: id, RemoteHeight: height, LocalHeight: sdk.NewInt(ctx.BlockHeight()), Value: result}
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)
bz := k.cdc.MustMarshal(&mapping)
store.Set([]byte(id), bz)
return nil
}

func (k *Keeper) GetDatapointForID(ctx sdk.Context, id string) (types.DataPoint, error) {
mapping := types.DataPoint{}
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)
bz := store.Get([]byte(id))
if len(bz) == 0 {
return types.DataPoint{}, fmt.Errorf("unable to find data for id %s", id)
}

k.cdc.MustUnmarshal(bz, &mapping)
return mapping, nil
}

// IterateDatapoints iterate through datapoints
func (k Keeper) IterateDatapoints(ctx sdk.Context, fn func(index int64, dp types.DataPoint) (stop bool)) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)
iterator := sdk.KVStorePrefixIterator(store, nil)
defer iterator.Close()

i := int64(0)
for ; iterator.Valid(); iterator.Next() {
datapoint := types.DataPoint{}
k.cdc.MustUnmarshal(iterator.Value(), &datapoint)
stop := fn(i, datapoint)

if stop {
break
}
i++
}
}

// DeleteQuery delete datapoint
func (k Keeper) DeleteDatapoint(ctx sdk.Context, id string) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)
store.Delete([]byte(id))
}

func (k *Keeper) GetDatapoint(ctx sdk.Context, module string, connectionID string, chainID string, queryType string, request []byte) (types.DataPoint, error) {
id := GenerateQueryHash(connectionID, chainID, queryType, request, module)
return k.GetDatapointForID(ctx, id)
}

func (k *Keeper) GetDatapointOrRequest(ctx sdk.Context, module string, connectionID string, chainID string, queryType string, request []byte, maxAge uint64) (types.DataPoint, error) {
val, err := k.GetDatapoint(ctx, module, connectionID, chainID, queryType, request)
if err != nil {
// no datapoint
k.MakeRequest(ctx, connectionID, chainID, queryType, request, sdk.NewInt(-1), "", "", maxAge)
return types.DataPoint{}, errors.New("no data; query submitted")
}

if val.LocalHeight.LT(sdk.NewInt(ctx.BlockHeight() - int64(maxAge))) { // this is somewhat arbitrary; TODO: make this better
k.MakeRequest(ctx, connectionID, chainID, queryType, request, sdk.NewInt(-1), "", "", maxAge)
return types.DataPoint{}, errors.New("stale data; query submitted")
}
// check ttl
return val, nil
}

func (k *Keeper) MakeRequest(ctx sdk.Context, connectionID string, chainID string, queryType string, request []byte, period math.Int, module string, callbackID string, ttl uint64) {
k.Logger(ctx).Info(
"MakeRequest",
"connection_id", connectionID,
"chain_id", chainID,
"query_type", queryType,
"request", request,
"period", period,
"module", module,
"callback", callbackID,
"ttl", ttl,
)
key := GenerateQueryHash(connectionID, chainID, queryType, request, module)
existingQuery, found := k.GetQuery(ctx, key)
if !found {
if module != "" && callbackID != "" {
if _, exists := k.callbacks[module]; !exists {
err := fmt.Errorf("no callback handler registered for module %s", module)
k.Logger(ctx).Error(err.Error())
panic(err)
}
if exists := k.callbacks[module].Has(callbackID); !exists {
err := fmt.Errorf("no callback %s registered for module %s", callbackID, module)
k.Logger(ctx).Error(err.Error())
panic(err)
}
}
newQuery := k.NewQuery(ctx, module, connectionID, chainID, queryType, request, period, callbackID, ttl)
k.SetQuery(ctx, *newQuery)
} else {
// a re-request of an existing query triggers resetting of height to trigger immediately.
k.Logger(ctx).Info("re-request", "LastHeight", existingQuery.LastHeight)
existingQuery.LastHeight = sdk.ZeroInt()
k.SetQuery(ctx, existingQuery)
}
}
Loading

0 comments on commit 86445f9

Please sign in to comment.