Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apollo config listener #117

Draft
wants to merge 7 commits into
base: release/v0.3.1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions claimtxman/claimtxman.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

ctmtypes "github.com/0xPolygonHermez/zkevm-bridge-service/claimtxman/types"
"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/etherman"
"github.com/0xPolygonHermez/zkevm-bridge-service/log"
"github.com/0xPolygonHermez/zkevm-bridge-service/messagepush"
Expand Down Expand Up @@ -58,13 +57,11 @@ type ClaimTxManager struct {
// Producer to push the transaction status change to front end
messagePushProducer messagepush.KafkaProducer
redisStorage redisstorage.RedisStorage
monitorTxsLimit apolloconfig.Entry[uint]
monitorTxsLimit uint
}

// NewClaimTxManager creates a new claim transaction manager.
func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint,
l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, producer messagepush.KafkaProducer,
redisStorage redisstorage.RedisStorage, rollupID uint) (*ClaimTxManager, error) {
func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, rollupID uint) (*ClaimTxManager, error) {
ctx := context.Background()
client, err := utils.NewClient(ctx, l2NodeURL, l2BridgeAddr)
if err != nil {
Expand All @@ -77,21 +74,18 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot
ctx, cancel := context.WithCancel(ctx)
auth, err := client.GetSignerFromKeystore(ctx, cfg.PrivateKey)
return &ClaimTxManager{
ctx: ctx,
cancel: cancel,
l2Node: client,
l2NetworkID: l2NetworkID,
bridgeService: bridgeService,
cfg: cfg,
chExitRootEvent: chExitRootEvent,
chSynced: chSynced,
storage: storage.(storageInterface),
auth: auth,
rollupID: rollupID,
nonceCache: cache,
messagePushProducer: producer,
redisStorage: redisStorage,
monitorTxsLimit: apolloconfig.NewIntEntry("claimtxman.monitorTxsLimit", uint(128)), //nolint:gomnd
ctx: ctx,
cancel: cancel,
l2Node: client,
l2NetworkID: l2NetworkID,
bridgeService: bridgeService,
cfg: cfg,
chExitRootEvent: chExitRootEvent,
chSynced: chSynced,
storage: storage.(storageInterface),
auth: auth,
rollupID: rollupID,
nonceCache: cache,
}, err
}

Expand Down
25 changes: 24 additions & 1 deletion claimtxman/claimtxman_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (

"github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl/pb"
ctmtypes "github.com/0xPolygonHermez/zkevm-bridge-service/claimtxman/types"
"github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig"
"github.com/0xPolygonHermez/zkevm-bridge-service/etherman"
"github.com/0xPolygonHermez/zkevm-bridge-service/log"
"github.com/0xPolygonHermez/zkevm-bridge-service/messagepush"
"github.com/0xPolygonHermez/zkevm-bridge-service/metrics"
"github.com/0xPolygonHermez/zkevm-bridge-service/pushtask"
"github.com/0xPolygonHermez/zkevm-bridge-service/redisstorage"
"github.com/0xPolygonHermez/zkevm-bridge-service/utils"
"github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror"
"github.com/0xPolygonHermez/zkevm-node/pool"
Expand All @@ -23,6 +26,26 @@ import (
"github.com/pkg/errors"
)

const (
defaultMonitorTxsLimit = 128
)

// NewClaimTxManagerXLayer creates a new claim transaction manager.
func NewClaimTxManagerXLayer(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint,
l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, producer messagepush.KafkaProducer,
redisStorage redisstorage.RedisStorage, rollupID uint) (*ClaimTxManager, error) {
tm, err := NewClaimTxManager(cfg, chExitRootEvent, chSynced, l2NodeURL, l2NetworkID, l2BridgeAddr, bridgeService, storage, rollupID)
if err != nil {
return tm, err
}
tm.messagePushProducer = producer
tm.redisStorage = redisStorage
tm.monitorTxsLimit = defaultMonitorTxsLimit
apolloconfig.RegisterChangeHandler("claimtxman.monitorTxsLimit", &tm.monitorTxsLimit)
apolloconfig.RegisterChangeHandler("ClaimTxManager", &tm.cfg)
return tm, nil
}

// StartXLayer will start the tx management, reading txs from storage,
// send then to the blockchain and keep monitoring them until they
// get mined
Expand Down Expand Up @@ -423,7 +446,7 @@ func (tm *ClaimTxManager) monitorTxsXLayer(ctx context.Context) error {
mLog.Infof("monitorTxs begin")

statusesFilter := []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatusCreated}
mTxs, err := tm.storage.GetClaimTxsByStatusWithLimit(ctx, statusesFilter, tm.monitorTxsLimit.Get(), 0, dbTx)
mTxs, err := tm.storage.GetClaimTxsByStatusWithLimit(ctx, statusesFilter, tm.monitorTxsLimit, 0, dbTx)
if err != nil {
mLog.Errorf("failed to get created monitored txs: %v", err)
rollbackErr := tm.storage.Rollback(tm.ctx, dbTx)
Expand Down
5 changes: 2 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/0xPolygonHermez/zkevm-bridge-service/utils"
"github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -101,7 +100,7 @@ func start(ctx *cli.Context) error {
return err
}
rollupID := l1Etherman.GetRollupID()
bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, []*utils.Client{nil}, []*bind.TransactOpts{nil}, apiStorage, rollupID)
bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, apiStorage, rollupID)
err = server.RunServer(c.BridgeServer, bridgeService)
if err != nil {
log.Error(err)
Expand All @@ -122,7 +121,7 @@ func start(ctx *cli.Context) error {
for i := 0; i < len(c.Etherman.L2URLs); i++ {
// we should match the orders of L2URLs between etherman and claimtxman
// since we are using the networkIDs in the same order
claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, nil, nil, rollupID)
claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, rollupID)
if err != nil {
log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/run_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func startServer(ctx *cli.Context, opts ...runOptionFunc) error {
utils.InitChainIdManager(networkIDs, chainIDs)

rollupID := l1Etherman.GetRollupID()
bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, l2NodeClients, l2Auths, apiStorage, rollupID).
bridgeService := server.NewBridgeServiceXLayer(c.BridgeServer, c.BridgeController.Height, networkIDs, l2NodeClients, l2Auths, apiStorage, rollupID).
WithRedisStorage(redisStorage).WithMainCoinsCache(localcache.GetDefaultCache()).WithMessagePushProducer(messagePushProducer)

// Initialize inner chain id conf
Expand Down Expand Up @@ -279,7 +279,7 @@ func startServer(ctx *cli.Context, opts ...runOptionFunc) error {
for i := 0; i < len(c.Etherman.L2URLs); i++ {
// we should match the orders of L2URLs between etherman and claimtxman
// since we are using the networkIDs in the same order
claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, messagePushProducer, redisStorage, rollupID)
claimTxManager, err := claimtxman.NewClaimTxManagerXLayer(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, messagePushProducer, redisStorage, rollupID)
if err != nil {
log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err)
}
Expand Down
183 changes: 58 additions & 125 deletions config/apolloconfig/apollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package apolloconfig
import (
"encoding"
"encoding/json"
"fmt"
"reflect"
"strings"

Expand All @@ -12,9 +13,7 @@ import (
)

var (
enabled = false
disableEntryDebugLog = false
defaultClient *agollo.Client
defaultClient *agollo.Client

textUnmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem()
)
Expand All @@ -30,7 +29,6 @@ func GetClient() *agollo.Client {
// Init initializes the connection to the Apollo server
// This should not be called if Apollo config is disabled for the service
func Init(c Config) error {
enabled = true
SetLogger()
cfg := &agolloConfig.AppConfig{
AppID: c.AppID,
Expand All @@ -49,8 +47,8 @@ func Init(c Config) error {
return errors.Wrap(err, "start apollo client error")
}

client.AddChangeListener(GetDefaultListener())
defaultClient = client
disableEntryDebugLog = c.DisableEntryDebugLog
return nil
}

Expand Down Expand Up @@ -78,147 +76,82 @@ func handleStruct(v reflect.Value) error {

// Iterate and handle each field
for i := 0; i < v.NumField(); i++ {
origField := v.Field(i)
field := reflect.Indirect(origField)
field := v.Field(i)
structField := v.Type().Field(i)

// Get the config key from the field tag
key := structField.Tag.Get(tagName)
if key != "" && key != "-" {
if !field.CanSet() {
logger.Errorf("Load apollo: field %v cannot be set", structField.Name)
continue
}

// If config key is not empty, use it to query from Apollo server
// Process differently for each type
if field.CanAddr() && field.Addr().Type().Implements(textUnmarshalerType) {
loadTextUnmarshaler(field, key)
} else if field.Kind() == reflect.Struct {
loadStruct(field, key)
} else if field.CanInt() {
field.SetInt(NewIntEntry(key, field.Int()).Get())
} else if field.CanUint() {
field.SetUint(NewIntEntry(key, field.Uint()).Get())
} else if field.Kind() == reflect.String {
field.SetString(NewStringEntry(key, field.String()).Get())
} else if field.Kind() == reflect.Bool {
field.SetBool(NewBoolEntry(key, field.Bool()).Get())
} else if field.Kind() == reflect.Slice {
switch field.Type().Elem().Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
loadIntSlice(field, key)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
loadUintSlice(field, key)
case reflect.String:
loadStringSlice(field, key)
default:
if reflect.New(field.Type().Elem()).Type().Implements(textUnmarshalerType) {
loadTextUnmarshalerSlice(field, key)
} else {
logger.Debugf("Load apollo: field %v has invalid type %v", structField.Name, structField.Type)
}
}
} else {
logger.Errorf("Load apollo: field %v has invalid type %v", structField.Name, structField.Type)
}
}

if field.Kind() == reflect.Struct {
err := handleStruct(field)
if err != nil {
logger.Errorf("Load apollo: field %v of type %v error: %v", structField.Name, structField.Type, err)
}
err := handleObject(field, structField.Tag.Get(tagName))
if err != nil {
logger.Errorf("Load apollo: field %v of type %v error: %v", structField.Name, structField.Type, err)
}
}

return nil
}

func loadStruct(v reflect.Value, key string) {
s := NewStringEntry(key, "").Get()
if s == "" {
return
}

// Create a clone so we won't change the original values unexpectedly
temp := reflect.New(v.Type()).Interface()
// Always use JSON for struct
err := json.Unmarshal([]byte(s), &temp)
if err != nil {
return
}
v.Set(reflect.ValueOf(temp).Elem())
}
func handleObject(v reflect.Value, key string) error {
field := reflect.Indirect(v)

func loadIntSlice(v reflect.Value, key string) {
list, err := NewIntSliceEntry(key, []int64{}).GetWithErr()
if err != nil {
return
}

temp := reflect.MakeSlice(v.Type(), len(list), len(list))
elemType := v.Type().Elem()
for i, x := range list {
temp.Index(i).Set(reflect.ValueOf(x).Convert(elemType))
if key != "" && key != "-" {
val, err := getString(key)
if err != nil {
return err
}
err = decodeStringToValue(val, field)
if err != nil {
return err
}
}

v.Set(temp)
return nil
}

func loadUintSlice(v reflect.Value, key string) {
list, err := NewIntSliceEntry(key, []uint64{}).GetWithErr()
if err != nil {
return
func decodeStringToValue(val string, v reflect.Value) error {
if !v.CanSet() {
return errors.New("cannot set value")
}

temp := reflect.MakeSlice(v.Type(), len(list), len(list))
elemType := v.Type().Elem()
for i, x := range list {
temp.Index(i).Set(reflect.ValueOf(x).Convert(elemType))
if v.CanAddr() && v.Addr().Type().Implements(textUnmarshalerType) {
temp := reflect.New(v.Type()).Interface().(encoding.TextUnmarshaler)
err := temp.UnmarshalText([]byte(val))
if err != nil {
return errors.Wrap(err, "UnmarshalText error")
}
v.Set(reflect.Indirect(reflect.ValueOf(temp)))
} else if v.Kind() == reflect.String {
v.SetString(val)
} else {
// Decode json value (including struct, map, int, float, array, etc.)
// Create a clone so we won't change the original values unexpectedly
temp := reflect.New(v.Type()).Interface()
err := json.Unmarshal([]byte(val), &temp)
if err != nil {
return errors.Wrap(err, "json unmarshal error")
}
v.Set(reflect.ValueOf(temp).Elem())
}

v.Set(temp)
}

func loadStringSlice(v reflect.Value, key string) {
list, err := NewStringSliceEntry(key, []string{}).GetWithErr()
if err != nil {
return
// Inner fields' values on Apollo have higher priorities
// So we need to re-load the inner fields if this is a struct
if v.Kind() == reflect.Struct {
err := handleStruct(v)
if err != nil {
return errors.Wrap(err, "handleStruct error")
}
}
v.Set(reflect.ValueOf(list))
return nil
}

func loadTextUnmarshaler(v reflect.Value, key string) {
s, err := NewStringEntry(key, "").GetWithErr()
if err != nil {
return
var getString = func(key string) (string, error) {
client := GetClient()
if client == nil {
return "", errors.New("apollo client is nil")
}
temp := reflect.New(v.Type()).Interface().(encoding.TextUnmarshaler)
err = temp.UnmarshalText([]byte(s))
v, err := client.GetConfig(defaultNamespace).GetCache().Get(key)
if err != nil {
return
return "", err
}

v.Set(reflect.Indirect(reflect.ValueOf(temp)))
}

func loadTextUnmarshalerSlice(v reflect.Value, key string) {
list, err := NewStringSliceEntry(key, []string{}).GetWithErr()
if err != nil {
return
s, ok := v.(string)
if !ok {
return "", fmt.Errorf("value is not string, type: %T", v)
}

temp := reflect.MakeSlice(v.Type(), len(list), len(list))
elemType := v.Type().Elem()
for i, s := range list {
tempElem := reflect.New(elemType).Interface().(encoding.TextUnmarshaler)
err = tempElem.UnmarshalText([]byte(s))
if err != nil {
return
}
temp.Index(i).Set(reflect.Indirect(reflect.ValueOf(tempElem)))
}

v.Set(temp)
return s, nil
}
Loading
Loading