diff --git a/claimtxman/claimtxman.go b/claimtxman/claimtxman.go index 1db3a67f..6f0309f2 100644 --- a/claimtxman/claimtxman.go +++ b/claimtxman/claimtxman.go @@ -10,7 +10,6 @@ import ( "time" ctmtypes "github.com/0xPolygonHermez/zkevm-bridge-service/claimtxman/types" - "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/0xPolygonHermez/zkevm-bridge-service/messagepush" @@ -58,13 +57,11 @@ type ClaimTxManager struct { // Producer to push the transaction status change to front end messagePushProducer messagepush.KafkaProducer redisStorage redisstorage.RedisStorage - monitorTxsLimit apolloconfig.Entry[uint] + monitorTxsLimit uint } // NewClaimTxManager creates a new claim transaction manager. -func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, - l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, producer messagepush.KafkaProducer, - redisStorage redisstorage.RedisStorage, rollupID uint) (*ClaimTxManager, error) { +func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, rollupID uint) (*ClaimTxManager, error) { ctx := context.Background() client, err := utils.NewClient(ctx, l2NodeURL, l2BridgeAddr) if err != nil { @@ -77,21 +74,18 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot ctx, cancel := context.WithCancel(ctx) auth, err := client.GetSignerFromKeystore(ctx, cfg.PrivateKey) return &ClaimTxManager{ - ctx: ctx, - cancel: cancel, - l2Node: client, - l2NetworkID: l2NetworkID, - bridgeService: bridgeService, - cfg: cfg, - chExitRootEvent: chExitRootEvent, - chSynced: chSynced, - storage: storage.(storageInterface), - auth: auth, - rollupID: rollupID, - nonceCache: cache, - messagePushProducer: producer, - redisStorage: redisStorage, - monitorTxsLimit: apolloconfig.NewIntEntry("claimtxman.monitorTxsLimit", uint(128)), //nolint:gomnd + ctx: ctx, + cancel: cancel, + l2Node: client, + l2NetworkID: l2NetworkID, + bridgeService: bridgeService, + cfg: cfg, + chExitRootEvent: chExitRootEvent, + chSynced: chSynced, + storage: storage.(storageInterface), + auth: auth, + rollupID: rollupID, + nonceCache: cache, }, err } diff --git a/claimtxman/claimtxman_xlayer.go b/claimtxman/claimtxman_xlayer.go index 9fe20bbb..0a334f1d 100644 --- a/claimtxman/claimtxman_xlayer.go +++ b/claimtxman/claimtxman_xlayer.go @@ -8,10 +8,13 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl/pb" ctmtypes "github.com/0xPolygonHermez/zkevm-bridge-service/claimtxman/types" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/0xPolygonHermez/zkevm-bridge-service/messagepush" "github.com/0xPolygonHermez/zkevm-bridge-service/metrics" "github.com/0xPolygonHermez/zkevm-bridge-service/pushtask" + "github.com/0xPolygonHermez/zkevm-bridge-service/redisstorage" "github.com/0xPolygonHermez/zkevm-bridge-service/utils" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror" "github.com/0xPolygonHermez/zkevm-node/pool" @@ -23,6 +26,26 @@ import ( "github.com/pkg/errors" ) +const ( + defaultMonitorTxsLimit = 128 +) + +// NewClaimTxManagerXLayer creates a new claim transaction manager. +func NewClaimTxManagerXLayer(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, + l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, producer messagepush.KafkaProducer, + redisStorage redisstorage.RedisStorage, rollupID uint) (*ClaimTxManager, error) { + tm, err := NewClaimTxManager(cfg, chExitRootEvent, chSynced, l2NodeURL, l2NetworkID, l2BridgeAddr, bridgeService, storage, rollupID) + if err != nil { + return tm, err + } + tm.messagePushProducer = producer + tm.redisStorage = redisStorage + tm.monitorTxsLimit = defaultMonitorTxsLimit + apolloconfig.RegisterChangeHandler("claimtxman.monitorTxsLimit", &tm.monitorTxsLimit) + apolloconfig.RegisterChangeHandler("ClaimTxManager", &tm.cfg) + return tm, nil +} + // StartXLayer will start the tx management, reading txs from storage, // send then to the blockchain and keep monitoring them until they // get mined @@ -423,7 +446,7 @@ func (tm *ClaimTxManager) monitorTxsXLayer(ctx context.Context) error { mLog.Infof("monitorTxs begin") statusesFilter := []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatusCreated} - mTxs, err := tm.storage.GetClaimTxsByStatusWithLimit(ctx, statusesFilter, tm.monitorTxsLimit.Get(), 0, dbTx) + mTxs, err := tm.storage.GetClaimTxsByStatusWithLimit(ctx, statusesFilter, tm.monitorTxsLimit, 0, dbTx) if err != nil { mLog.Errorf("failed to get created monitored txs: %v", err) rollbackErr := tm.storage.Rollback(tm.ctx, dbTx) diff --git a/cmd/run.go b/cmd/run.go index 539a8f7b..eadea0fc 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -21,7 +21,6 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/utils" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/client" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/urfave/cli/v2" ) @@ -101,7 +100,7 @@ func start(ctx *cli.Context) error { return err } rollupID := l1Etherman.GetRollupID() - bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, []*utils.Client{nil}, []*bind.TransactOpts{nil}, apiStorage, rollupID) + bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, apiStorage, rollupID) err = server.RunServer(c.BridgeServer, bridgeService) if err != nil { log.Error(err) @@ -122,7 +121,7 @@ func start(ctx *cli.Context) error { for i := 0; i < len(c.Etherman.L2URLs); i++ { // we should match the orders of L2URLs between etherman and claimtxman // since we are using the networkIDs in the same order - claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, nil, nil, rollupID) + claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, rollupID) if err != nil { log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err) } diff --git a/cmd/run_xlayer.go b/cmd/run_xlayer.go index bef6577f..327a6379 100644 --- a/cmd/run_xlayer.go +++ b/cmd/run_xlayer.go @@ -208,7 +208,7 @@ func startServer(ctx *cli.Context, opts ...runOptionFunc) error { utils.InitChainIdManager(networkIDs, chainIDs) rollupID := l1Etherman.GetRollupID() - bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, l2NodeClients, l2Auths, apiStorage, rollupID). + bridgeService := server.NewBridgeServiceXLayer(c.BridgeServer, c.BridgeController.Height, networkIDs, l2NodeClients, l2Auths, apiStorage, rollupID). WithRedisStorage(redisStorage).WithMainCoinsCache(localcache.GetDefaultCache()).WithMessagePushProducer(messagePushProducer) // Initialize inner chain id conf @@ -279,7 +279,7 @@ func startServer(ctx *cli.Context, opts ...runOptionFunc) error { for i := 0; i < len(c.Etherman.L2URLs); i++ { // we should match the orders of L2URLs between etherman and claimtxman // since we are using the networkIDs in the same order - claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, messagePushProducer, redisStorage, rollupID) + claimTxManager, err := claimtxman.NewClaimTxManagerXLayer(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, messagePushProducer, redisStorage, rollupID) if err != nil { log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err) } diff --git a/config/apolloconfig/apollo.go b/config/apolloconfig/apollo.go index 32f3c6b7..4a79249a 100644 --- a/config/apolloconfig/apollo.go +++ b/config/apolloconfig/apollo.go @@ -3,6 +3,7 @@ package apolloconfig import ( "encoding" "encoding/json" + "fmt" "reflect" "strings" @@ -12,9 +13,7 @@ import ( ) var ( - enabled = false - disableEntryDebugLog = false - defaultClient *agollo.Client + defaultClient *agollo.Client textUnmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem() ) @@ -30,7 +29,6 @@ func GetClient() *agollo.Client { // Init initializes the connection to the Apollo server // This should not be called if Apollo config is disabled for the service func Init(c Config) error { - enabled = true SetLogger() cfg := &agolloConfig.AppConfig{ AppID: c.AppID, @@ -49,8 +47,8 @@ func Init(c Config) error { return errors.Wrap(err, "start apollo client error") } + client.AddChangeListener(GetDefaultListener()) defaultClient = client - disableEntryDebugLog = c.DisableEntryDebugLog return nil } @@ -78,147 +76,82 @@ func handleStruct(v reflect.Value) error { // Iterate and handle each field for i := 0; i < v.NumField(); i++ { - origField := v.Field(i) - field := reflect.Indirect(origField) + field := v.Field(i) structField := v.Type().Field(i) - - // Get the config key from the field tag - key := structField.Tag.Get(tagName) - if key != "" && key != "-" { - if !field.CanSet() { - logger.Errorf("Load apollo: field %v cannot be set", structField.Name) - continue - } - - // If config key is not empty, use it to query from Apollo server - // Process differently for each type - if field.CanAddr() && field.Addr().Type().Implements(textUnmarshalerType) { - loadTextUnmarshaler(field, key) - } else if field.Kind() == reflect.Struct { - loadStruct(field, key) - } else if field.CanInt() { - field.SetInt(NewIntEntry(key, field.Int()).Get()) - } else if field.CanUint() { - field.SetUint(NewIntEntry(key, field.Uint()).Get()) - } else if field.Kind() == reflect.String { - field.SetString(NewStringEntry(key, field.String()).Get()) - } else if field.Kind() == reflect.Bool { - field.SetBool(NewBoolEntry(key, field.Bool()).Get()) - } else if field.Kind() == reflect.Slice { - switch field.Type().Elem().Kind() { - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - loadIntSlice(field, key) - case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: - loadUintSlice(field, key) - case reflect.String: - loadStringSlice(field, key) - default: - if reflect.New(field.Type().Elem()).Type().Implements(textUnmarshalerType) { - loadTextUnmarshalerSlice(field, key) - } else { - logger.Debugf("Load apollo: field %v has invalid type %v", structField.Name, structField.Type) - } - } - } else { - logger.Errorf("Load apollo: field %v has invalid type %v", structField.Name, structField.Type) - } - } - - if field.Kind() == reflect.Struct { - err := handleStruct(field) - if err != nil { - logger.Errorf("Load apollo: field %v of type %v error: %v", structField.Name, structField.Type, err) - } + err := handleObject(field, structField.Tag.Get(tagName)) + if err != nil { + logger.Errorf("Load apollo: field %v of type %v error: %v", structField.Name, structField.Type, err) } } return nil } -func loadStruct(v reflect.Value, key string) { - s := NewStringEntry(key, "").Get() - if s == "" { - return - } - - // Create a clone so we won't change the original values unexpectedly - temp := reflect.New(v.Type()).Interface() - // Always use JSON for struct - err := json.Unmarshal([]byte(s), &temp) - if err != nil { - return - } - v.Set(reflect.ValueOf(temp).Elem()) -} +func handleObject(v reflect.Value, key string) error { + field := reflect.Indirect(v) -func loadIntSlice(v reflect.Value, key string) { - list, err := NewIntSliceEntry(key, []int64{}).GetWithErr() - if err != nil { - return - } - - temp := reflect.MakeSlice(v.Type(), len(list), len(list)) - elemType := v.Type().Elem() - for i, x := range list { - temp.Index(i).Set(reflect.ValueOf(x).Convert(elemType)) + if key != "" && key != "-" { + val, err := getString(key) + if err != nil { + return err + } + err = decodeStringToValue(val, field) + if err != nil { + return err + } } - v.Set(temp) + return nil } -func loadUintSlice(v reflect.Value, key string) { - list, err := NewIntSliceEntry(key, []uint64{}).GetWithErr() - if err != nil { - return +func decodeStringToValue(val string, v reflect.Value) error { + if !v.CanSet() { + return errors.New("cannot set value") } - temp := reflect.MakeSlice(v.Type(), len(list), len(list)) - elemType := v.Type().Elem() - for i, x := range list { - temp.Index(i).Set(reflect.ValueOf(x).Convert(elemType)) + if v.CanAddr() && v.Addr().Type().Implements(textUnmarshalerType) { + temp := reflect.New(v.Type()).Interface().(encoding.TextUnmarshaler) + err := temp.UnmarshalText([]byte(val)) + if err != nil { + return errors.Wrap(err, "UnmarshalText error") + } + v.Set(reflect.Indirect(reflect.ValueOf(temp))) + } else if v.Kind() == reflect.String { + v.SetString(val) + } else { + // Decode json value (including struct, map, int, float, array, etc.) + // Create a clone so we won't change the original values unexpectedly + temp := reflect.New(v.Type()).Interface() + err := json.Unmarshal([]byte(val), &temp) + if err != nil { + return errors.Wrap(err, "json unmarshal error") + } + v.Set(reflect.ValueOf(temp).Elem()) } - v.Set(temp) -} - -func loadStringSlice(v reflect.Value, key string) { - list, err := NewStringSliceEntry(key, []string{}).GetWithErr() - if err != nil { - return + // Inner fields' values on Apollo have higher priorities + // So we need to re-load the inner fields if this is a struct + if v.Kind() == reflect.Struct { + err := handleStruct(v) + if err != nil { + return errors.Wrap(err, "handleStruct error") + } } - v.Set(reflect.ValueOf(list)) + return nil } -func loadTextUnmarshaler(v reflect.Value, key string) { - s, err := NewStringEntry(key, "").GetWithErr() - if err != nil { - return +var getString = func(key string) (string, error) { + client := GetClient() + if client == nil { + return "", errors.New("apollo client is nil") } - temp := reflect.New(v.Type()).Interface().(encoding.TextUnmarshaler) - err = temp.UnmarshalText([]byte(s)) + v, err := client.GetConfig(defaultNamespace).GetCache().Get(key) if err != nil { - return + return "", err } - - v.Set(reflect.Indirect(reflect.ValueOf(temp))) -} - -func loadTextUnmarshalerSlice(v reflect.Value, key string) { - list, err := NewStringSliceEntry(key, []string{}).GetWithErr() - if err != nil { - return + s, ok := v.(string) + if !ok { + return "", fmt.Errorf("value is not string, type: %T", v) } - - temp := reflect.MakeSlice(v.Type(), len(list), len(list)) - elemType := v.Type().Elem() - for i, s := range list { - tempElem := reflect.New(elemType).Interface().(encoding.TextUnmarshaler) - err = tempElem.UnmarshalText([]byte(s)) - if err != nil { - return - } - temp.Index(i).Set(reflect.Indirect(reflect.ValueOf(tempElem))) - } - - v.Set(temp) + return s, nil } diff --git a/config/apolloconfig/apollo_test.go b/config/apolloconfig/apollo_test.go new file mode 100644 index 00000000..2498fbec --- /dev/null +++ b/config/apolloconfig/apollo_test.go @@ -0,0 +1,70 @@ +package apolloconfig + +import ( + "testing" + "time" + + "github.com/0xPolygonHermez/zkevm-node/config/types" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestLoad(t *testing.T) { + type SubStruct struct { + E map[common.Address]common.Address `apollo:"d.e"` + F []common.Address `apollo:"d.f"` + G string `apollo:"d.g"` + H []string `apollo:"d.h"` + I types.Duration `apollo:"d.i"` + } + type StructTest struct { + A int64 `apollo:"a"` + B []int32 `apollo:"b"` + C bool `apollo:"c"` + D SubStruct `apollo:"d"` + } + + // Mocking the result from Apollo server + resultMapping := map[string]string{ + "a": `123`, + "b": `[1,2]`, + "c": `true`, + "d.e": `{"0x167985f547e5087DA14084b80762104d36c08756":"0xB82381A3fBD3FaFA77B3a7bE693342618240067b", "0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1":"0xB82381A3fBD3FaFA77B3a7bE693342618240067b"}`, + "d.f": `["0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1"]`, + "d.g": `dgstring`, + "d": `{"F":["0x167985f547e5087DA14084b80762104d36c08756","0xB82381A3fBD3FaFA77B3a7bE693342618240067b"],"G":"dg","H":["s1","s2","s3"],"I":"3m5s"}`, + } + + expected := StructTest{ + A: 123, + B: []int32{1, 2}, + C: true, + D: SubStruct{ + E: map[common.Address]common.Address{ + common.HexToAddress("0x167985f547e5087DA14084b80762104d36c08756"): common.HexToAddress("0xB82381A3fBD3FaFA77B3a7bE693342618240067b"), + common.HexToAddress("0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1"): common.HexToAddress("0xB82381A3fBD3FaFA77B3a7bE693342618240067b"), + }, + F: []common.Address{common.HexToAddress("0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1")}, + G: "dgstring", + H: []string{"s1", "s2", "s3"}, + I: types.NewDuration(3*time.Minute + 5*time.Second), + }, + } + + getString = func(key string) (string, error) { + s, ok := resultMapping[key] + if !ok { + return "", errors.New("key not found") + } + return s, nil + } + + var output StructTest + err := Load(output) + require.Error(t, err) + + err = Load(&output) + require.NoError(t, err) + require.Equal(t, expected, output) +} diff --git a/config/apolloconfig/consts.go b/config/apolloconfig/consts.go index b6a0b721..34a2fcfe 100644 --- a/config/apolloconfig/consts.go +++ b/config/apolloconfig/consts.go @@ -4,9 +4,5 @@ const ( loggerFieldKey = "component" loggerFieldValue = "apolloconfig" - parseIntBase = 10 - parseIntBitSize = 64 - - comma = "," defaultNamespace = "application" ) diff --git a/config/apolloconfig/entry.go b/config/apolloconfig/entry.go deleted file mode 100644 index 07a1fc04..00000000 --- a/config/apolloconfig/entry.go +++ /dev/null @@ -1,165 +0,0 @@ -package apolloconfig - -import ( - "fmt" - "strconv" - "strings" - - "github.com/apolloconfig/agollo/v4" - "github.com/pkg/errors" - "golang.org/x/exp/constraints" -) - -type Entry[T any] interface { - Get() T - GetWithErr() (T, error) -} - -// An interface to get the config from Apollo client (and convert it if needed) -type getterFunction[T any] func(client *agollo.Client, namespace, key string) (T, error) - -type entryImpl[T any] struct { - namespace string - key string - defaultValue T - getterFn getterFunction[T] -} - -type entryOption[T any] func(*entryImpl[T]) - -func WithNamespace[T any](namespace string) entryOption[T] { - return func(e *entryImpl[T]) { - e.namespace = namespace - } -} - -// newEntry is a generic constructor for apolloconfig.Entry -func newEntry[T any](key string, defaultValue T, getterFn getterFunction[T], opts ...entryOption[T]) Entry[T] { - e := &entryImpl[T]{ - namespace: defaultNamespace, - key: key, - defaultValue: defaultValue, - getterFn: getterFn, - } - - for _, o := range opts { - o(e) - } - - return e -} - -func NewIntEntry[T constraints.Integer](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { - return newEntry(key, defaultValue, getInt[T], opts...) -} - -func NewIntSliceEntry[T constraints.Integer](key string, defaultValue []T, opts ...entryOption[[]T]) Entry[[]T] { - return newEntry(key, defaultValue, getIntSlice[T], opts...) -} - -func NewBoolEntry(key string, defaultValue bool, opts ...entryOption[bool]) Entry[bool] { - return newEntry(key, defaultValue, getBool, opts...) -} - -func NewStringEntry(key string, defaultValue string, opts ...entryOption[string]) Entry[string] { - return newEntry(key, defaultValue, getString, opts...) -} - -// String array is separated by commas, so this will work incorrectly if we have comma in the elements -func NewStringSliceEntry(key string, defaultValue []string, opts ...entryOption[[]string]) Entry[[]string] { - return newEntry(key, defaultValue, getStringSlice, opts...) -} - -func (e *entryImpl[T]) String() string { - return fmt.Sprintf("%v", e.Get()) -} - -func (e *entryImpl[T]) Get() T { - logger := getLogger().WithFields("key", e.key) - v, err := e.GetWithErr() - if err != nil && !disableEntryDebugLog { - logger.Debugf("error[%v], returning default value", err) - } - return v -} - -func (e *entryImpl[T]) GetWithErr() (T, error) { - // If Apollo config is not enabled, just return the default value - if !enabled { - return e.defaultValue, errors.New("apollo disabled") - } - - // If client is not initialized, return the default value - client := GetClient() - if client == nil { - return e.defaultValue, errors.New("apollo client is nil") - } - - if e.getterFn == nil { - return e.defaultValue, errors.New("getterFn is nil") - } - - v, err := e.getterFn(client, e.namespace, e.key) - if err != nil { - return e.defaultValue, errors.Wrap(err, "getterFn error") - } - return v, nil -} - -// ----- Getter functions ----- - -func getString(client *agollo.Client, namespace, key string) (string, error) { - v, err := client.GetConfig(namespace).GetCache().Get(key) - if err != nil { - return "", err - } - s, ok := v.(string) - if !ok { - return "", fmt.Errorf("value is not string, type: %T", v) - } - return s, nil -} - -// String array is separated by commas, so this will work incorrectly if we have comma in the elements -func getStringSlice(client *agollo.Client, namespace, key string) ([]string, error) { - s, err := getString(client, namespace, key) - if err != nil { - return nil, err - } - return strings.Split(s, comma), nil -} - -func getInt[T constraints.Integer](client *agollo.Client, namespace, key string) (T, error) { - s, err := getString(client, namespace, key) - if err != nil { - return 0, err - } - res, err := strconv.ParseInt(s, parseIntBase, parseIntBitSize) - return T(res), err -} - -func getIntSlice[T constraints.Integer](client *agollo.Client, namespace, key string) ([]T, error) { - s, err := getString(client, namespace, key) - if err != nil { - return nil, err - } - - sArr := strings.Split(s, comma) - result := make([]T, len(sArr)) - for i := range sArr { - v, err := strconv.ParseInt(sArr[i], parseIntBase, parseIntBitSize) - if err != nil { - return nil, err - } - result[i] = T(v) - } - return result, nil -} - -func getBool(client *agollo.Client, namespace, key string) (bool, error) { - s, err := getString(client, namespace, key) - if err != nil { - return false, err - } - return strconv.ParseBool(s) -} diff --git a/config/apolloconfig/listener.go b/config/apolloconfig/listener.go new file mode 100644 index 00000000..f1925e64 --- /dev/null +++ b/config/apolloconfig/listener.go @@ -0,0 +1,127 @@ +package apolloconfig + +import ( + "reflect" + "sync" + + "github.com/apolloconfig/agollo/v4/storage" +) + +// ConfigChangeListener implements agollo's storage.ChangeListener to handle the config changes from Apollo remote server +type ConfigChangeListener struct { + changeHandlers map[string][]*changeHandler +} + +var listener *ConfigChangeListener + +// GetDefaultListener is a singleton getter for ConfigChangeListener +func GetDefaultListener() *ConfigChangeListener { + if listener != nil { + return listener + } + + listener = &ConfigChangeListener{ + changeHandlers: make(map[string][]*changeHandler), + } + return listener +} + +func RegisterChangeHandler[T any](key string, obj *T, opts ...handlerOpt) { + if obj != nil { + opts = append(opts, withConfigObj(obj)) + } + GetDefaultListener().RegisterHandler(key, opts...) +} + +func (l *ConfigChangeListener) OnChange(event *storage.ChangeEvent) { + getLogger().Debugf("ConfigChangeListener#OnChange received: %v", toJson(event)) + + for key, change := range event.Changes { + for _, handler := range l.changeHandlers[key] { + handler.handle(change, key) + } + } +} + +func (l *ConfigChangeListener) OnNewestChange(_ *storage.FullChangeEvent) {} + +func (l *ConfigChangeListener) RegisterHandler(key string, opts ...handlerOpt) { + if len(opts) == 0 { + return + } + + handler := &changeHandler{} + for _, f := range opts { + f(handler) + } + l.changeHandlers[key] = append(l.changeHandlers[key], handler) +} + +// changeHandler contains the information for handling the config change for one config, in a specific context +type changeHandler struct { + obj any + beforeFn func(key string, change *storage.ConfigChange) + afterFn func(key string, change *storage.ConfigChange, obj any) + locker sync.Locker +} + +func (h *changeHandler) handle(change *storage.ConfigChange, key string) { + if h.locker != nil { + h.locker.Lock() + defer h.locker.Unlock() + } + + if h.beforeFn != nil { + h.beforeFn(key, change) + } + + if h.obj != nil && change.ChangeType != storage.DELETED { + // Only update the object if change is ADDED or MODIFIED + err := decodeStringToObject(change.NewValue.(string), h.obj) + if err != nil { + getLogger().WithFields("key", key).Errorf("changeHandler#handle decodeStringToObject error: %v", err) + } + } + + if h.afterFn != nil { + h.afterFn(key, change, h.obj) + } +} + +type handlerOpt func(handler *changeHandler) + +// withConfigObj assigns an object to be updated when a specific config key is changed. +// The logic for updating can be found in decodeStringToValue function +// obj must be a pointer +func withConfigObj[T any](obj *T) handlerOpt { + return func(handler *changeHandler) { + handler.obj = obj + } +} + +// WithBeforeFn assigns a function to be called before the config object is updated +func WithBeforeFn(beforeFn func(string, *storage.ConfigChange)) handlerOpt { + return func(handler *changeHandler) { + handler.beforeFn = beforeFn + } +} + +// WithAfterFn assigns a function to be called after the config object is updated +func WithAfterFn(afterFn func(string, *storage.ConfigChange, any)) handlerOpt { + return func(handler *changeHandler) { + handler.afterFn = afterFn + } +} + +// WithLocker assigns a locker object (e.g. sync.Mutex, sync.RWMutex) +// When a config change is received, we will first wrap the config update operations inside lock.Lock()/locker.Unlock() +func WithLocker(locker sync.Locker) handlerOpt { + return func(handler *changeHandler) { + handler.locker = locker + } +} + +func decodeStringToObject(val string, obj any) error { + v := reflect.Indirect(reflect.ValueOf(obj)) + return decodeStringToValue(val, v) +} diff --git a/config/apolloconfig/listener_test.go b/config/apolloconfig/listener_test.go new file mode 100644 index 00000000..9003aaa7 --- /dev/null +++ b/config/apolloconfig/listener_test.go @@ -0,0 +1,135 @@ +package apolloconfig + +import ( + "sync" + "testing" + + "github.com/apolloconfig/agollo/v4/storage" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestConfigChangeListener(t *testing.T) { + type SubStruct struct { + C float64 + D map[string]bool `apollo:"mp"` + E string `apollo:"e"` + } + + type StructTest struct { + A string `apollo:"stringField"` + B SubStruct `apollo:"sub"` + } + + // Mocking the result from Apollo server + configMapping := map[string]string{ + "stringField": "aaa", + "mp": `{"a": true, "b": false, "c": false}`, + "sub": `{"C":0.55, "E": "e1"}`, + } + + getString = func(key string) (string, error) { + s, ok := configMapping[key] + if !ok { + return "", errors.New("key not found") + } + return s, nil + } + + expected := StructTest{ + A: "aaa", + B: SubStruct{ + C: 0.55, + D: map[string]bool{"a": true, "b": false, "c": false}, + E: "e1", + }, + } + + cnt := make(map[string]int) + before := func(key string, _ *storage.ConfigChange) { + cnt[key]++ + } + + var s StructTest + err := Load(&s) + require.NoError(t, err) + require.Equal(t, expected, s) + + var stringField = s.A + mutex := &sync.Mutex{} + RegisterChangeHandler("stringField", &stringField, WithBeforeFn(before), WithLocker(mutex)) + RegisterChangeHandler("stringField", &s.A, WithBeforeFn(before), WithLocker(mutex)) + RegisterChangeHandler("sub", &s.B, WithBeforeFn(before), WithLocker(mutex), + WithAfterFn(func(_ string, _ *storage.ConfigChange, obj any) { + b, ok := obj.(*SubStruct) + require.True(t, ok) + require.Equal(t, expected.B, *b) + })) + RegisterChangeHandler("e", &s.B.E, WithBeforeFn(before), WithLocker(mutex)) + RegisterChangeHandler("mp", &s.B.D, WithBeforeFn(before), WithLocker(mutex)) + + expected.A = "bbb" + expected.B.C = 1.5 + expected.B.E = "e2" + listener := GetDefaultListener() + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "stringField": { + ChangeType: storage.MODIFIED, + NewValue: "bbb", + }, + "sub": { + ChangeType: storage.MODIFIED, + NewValue: `{"C": 1.5, "D": {"z": true}, "E": "e2"}`, + }, + }, + }) + require.Equal(t, expected, s) + require.Equal(t, "bbb", stringField) + require.Equal(t, 2, cnt["stringField"]) + require.Equal(t, 1, cnt["sub"]) + + expected.A = "ccc" + expected.B.E = "e3" + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "stringField": { + ChangeType: storage.MODIFIED, + NewValue: "ccc", + }, + "e": { + ChangeType: storage.ADDED, + NewValue: "e3", + }, + }, + }) + require.Equal(t, expected, s) + require.Equal(t, "ccc", stringField) + require.Equal(t, 4, cnt["stringField"]) + require.Equal(t, 1, cnt["sub"]) + require.Equal(t, 1, cnt["e"]) + + // Test invalid new value + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "mp": { + ChangeType: storage.MODIFIED, + NewValue: "---", + }, + }, + }) + require.Equal(t, expected, s) + require.Equal(t, 1, cnt["mp"]) + + expected.B.D = map[string]bool{"z": false} + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "mp": { + ChangeType: storage.MODIFIED, + NewValue: `{"z": false}`, + }, + }, + }) + require.Equal(t, expected, s) + require.Equal(t, 2, cnt["mp"]) +} diff --git a/config/apolloconfig/utils.go b/config/apolloconfig/utils.go index 3ae02367..9d2af34c 100644 --- a/config/apolloconfig/utils.go +++ b/config/apolloconfig/utils.go @@ -1,6 +1,8 @@ package apolloconfig import ( + "encoding/json" + "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/apolloconfig/agollo/v4" ) @@ -12,3 +14,8 @@ func getLogger() *log.Logger { func SetLogger() { agollo.SetLogger(getLogger()) } + +func toJson(x any) string { + b, _ := json.Marshal(x) + return string(b) +} diff --git a/estimatetime/estimatetime.go b/estimatetime/estimatetime.go index c1e67963..091eb755 100644 --- a/estimatetime/estimatetime.go +++ b/estimatetime/estimatetime.go @@ -45,8 +45,8 @@ func GetDefaultCalculator() Calculator { type calculatorImpl struct { storage DBStorage estimateTime []uint32 // In minutes - defaultEstTimeConfig apolloconfig.Entry[[]uint32] - sampleLimit apolloconfig.Entry[uint] + defaultEstTimeConfig []uint32 + sampleLimit uint } func NewCalculator(storage interface{}) (Calculator, error) { @@ -56,10 +56,12 @@ func NewCalculator(storage interface{}) (Calculator, error) { c := &calculatorImpl{ storage: storage.(DBStorage), estimateTime: make([]uint32, estTimeSize), - defaultEstTimeConfig: apolloconfig.NewIntSliceEntry[uint32](estTimeConfigKey, []uint32{defaultL1EstimateTime, defaultL2EstimateTime}), - sampleLimit: apolloconfig.NewIntEntry[uint](sampleLimitConfigKey, defaultSampleLimit), + defaultEstTimeConfig: []uint32{defaultL1EstimateTime, defaultL2EstimateTime}, + sampleLimit: defaultSampleLimit, } - def := c.defaultEstTimeConfig.Get() + apolloconfig.RegisterChangeHandler("estimateTime.defaultTime", &c.defaultEstTimeConfig) + apolloconfig.RegisterChangeHandler("estimateTime.sampleLimit", &c.sampleLimit) + def := c.defaultEstTimeConfig for i := 0; i < estTimeSize; i++ { c.estimateTime[i] = def[i] } @@ -93,7 +95,7 @@ func (c *calculatorImpl) refresh(ctx context.Context, networkID uint) error { if networkID > 1 { return fmt.Errorf("invalid networkID %v", networkID) } - deposits, err := c.storage.GetLatestReadyDeposits(ctx, networkID, c.sampleLimit.Get(), nil) + deposits, err := c.storage.GetLatestReadyDeposits(ctx, networkID, c.sampleLimit, nil) if err != nil { log.Errorf("GetLatestReadyDeposits err:%v", err) return err @@ -121,7 +123,7 @@ func (c *calculatorImpl) refresh(ctx context.Context, networkID uint) error { } newTime := uint32(math.Ceil(sum / float64(len(fMinutes)))) log.Debugf("Re-calculate estimate time, networkID[%v], fMinutes[%v], newTime[%v]", networkID, fMinutes, newTime) - defaultTime := c.defaultEstTimeConfig.Get()[networkID] + defaultTime := c.defaultEstTimeConfig[networkID] if newTime > defaultTime { newTime = defaultTime } diff --git a/messagepush/kafkaproducer.go b/messagepush/kafkaproducer.go index 64eb7888..e6db9590 100644 --- a/messagepush/kafkaproducer.go +++ b/messagepush/kafkaproducer.go @@ -48,9 +48,9 @@ type KafkaProducer interface { type kafkaProducerImpl struct { producer sarama.SyncProducer - defaultTopic apolloconfig.Entry[string] - defaultPushKey apolloconfig.Entry[string] - bizCode apolloconfig.Entry[string] + defaultTopic string + defaultPushKey string + bizCode string } func NewKafkaProducer(cfg Config) (KafkaProducer, error) { @@ -87,12 +87,16 @@ func NewKafkaProducer(cfg Config) (KafkaProducer, error) { if err != nil { return nil, errors.Wrap(err, "NewKafkaProducer: NewSyncProducer error") } - return &kafkaProducerImpl{ + p := &kafkaProducerImpl{ producer: producer, - defaultTopic: apolloconfig.NewStringEntry("MessagePushProducer.Topic", cfg.Topic), - defaultPushKey: apolloconfig.NewStringEntry("MessagePushProducer.PushKey", cfg.PushKey), - bizCode: apolloconfig.NewStringEntry("MessagePushProducer.BizCode", BizCodeBridgeOrder), - }, nil + defaultTopic: cfg.Topic, + defaultPushKey: cfg.PushKey, + bizCode: BizCodeBridgeOrder, + } + apolloconfig.RegisterChangeHandler("MessagePushProducer.Topic", &p.defaultTopic) + apolloconfig.RegisterChangeHandler("MessagePushProducer.PushKey", &p.defaultPushKey) + apolloconfig.RegisterChangeHandler("MessagePushProducer.BizCode", &p.bizCode) + return p, nil } // Produce send a message to the Kafka topic @@ -104,8 +108,8 @@ func (p *kafkaProducerImpl) Produce(msg interface{}, optFns ...produceOptFunc) e return nil } opts := &produceOptions{ - topic: p.defaultTopic.Get(), - pushKey: p.defaultPushKey.Get(), + topic: p.defaultTopic, + pushKey: p.defaultPushKey, } for _, f := range optFns { f(opts) @@ -159,7 +163,7 @@ func (p *kafkaProducerImpl) PushTransactionUpdate(tx *pb.Transaction, optFns ... } msg := &PushMessage{ - BizCode: p.bizCode.Get(), + BizCode: p.bizCode, WalletAddress: tx.GetDestAddr(), RequestID: utils.GenerateTraceID(), PushContent: fmt.Sprintf("[%v]", string(b)), diff --git a/pushtask/committedbatchtask.go b/pushtask/committedbatchtask.go index 9ea5740b..6fdf9688 100644 --- a/pushtask/committedbatchtask.go +++ b/pushtask/committedbatchtask.go @@ -25,11 +25,17 @@ const ( ) var ( - minCommitDuration = apolloconfig.NewIntEntry[uint64]("pushtask.minCommitDuration", 2) //nolint:gomnd - defaultCommitDuration = apolloconfig.NewIntEntry[uint64]("pushtask.defaultCommitDuration", 10) //nolint:gomnd - commitDurationListLen = apolloconfig.NewIntEntry[int]("pushtask.commitDurationListLen", 5) //nolint:gomnd + minCommitDuration uint64 = 2 //nolint:gomnd + defaultCommitDuration uint64 = 10 //nolint:gomnd + commitDurationListLen = 5 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("pushtask.minCommitDuration", &minCommitDuration) + apolloconfig.RegisterChangeHandler("pushtask.defaultCommitDuration", &defaultCommitDuration) + apolloconfig.RegisterChangeHandler("pushtask.commitDurationListLen", &commitDurationListLen) +} + type CommittedBatchHandler struct { rpcUrl string client *ethclient.Client @@ -214,7 +220,7 @@ func (ins *CommittedBatchHandler) freshRedisForAvgCommitDuration(ctx context.Con if err != nil { return err } - if listLen <= int64(commitDurationListLen.Get()) { + if listLen <= int64(commitDurationListLen) { log.Infof("redis duration list is not enough, so skip count the avg duration!") return nil } @@ -297,18 +303,18 @@ func (ins *CommittedBatchHandler) pushMsgForDeposit(deposit *etherman.Deposit, l // checkAvgDurationLegal duration has a default range, 2-10 minutes, if over range, maybe dirty data, drop the data func (ins *CommittedBatchHandler) checkAvgDurationLegal(avgDuration int64) bool { - return avgDuration > int64(minCommitDuration.Get()) && avgDuration < int64(defaultCommitDuration.Get()) + return avgDuration > int64(minCommitDuration) && avgDuration < int64(defaultCommitDuration) } func GetAvgCommitDuration(ctx context.Context, redisStorage redisstorage.RedisStorage) uint64 { avgDuration, err := redisStorage.GetAvgCommitDuration(ctx) if err != nil && !errors.Is(err, redis.Nil) { log.Errorf("get avg commit duration from redis failed, error: %v", err) - return defaultCommitDuration.Get() + return defaultCommitDuration } if avgDuration == 0 { log.Infof("get avg commit duration from redis is 0, so use default") - return defaultCommitDuration.Get() + return defaultCommitDuration } return avgDuration } diff --git a/pushtask/l1blocknumtask.go b/pushtask/l1blocknumtask.go index 29ef457e..645ea904 100644 --- a/pushtask/l1blocknumtask.go +++ b/pushtask/l1blocknumtask.go @@ -105,8 +105,8 @@ func (t *L1BlockNumTask) doTask(ctx context.Context) { }(blockNum) // Minus 64 to get the target query block num - oldBlockNum -= utils.Min(utils.L1TargetBlockConfirmations.Get(), oldBlockNum) - blockNum -= utils.Min(utils.L1TargetBlockConfirmations.Get(), blockNum) + oldBlockNum -= utils.Min(utils.L1TargetBlockConfirmations, oldBlockNum) + blockNum -= utils.Min(utils.L1TargetBlockConfirmations, blockNum) if blockNum <= oldBlockNum { return } diff --git a/pushtask/verifiedbatchtask.go b/pushtask/verifiedbatchtask.go index dda6999c..3317dd24 100644 --- a/pushtask/verifiedbatchtask.go +++ b/pushtask/verifiedbatchtask.go @@ -17,12 +17,19 @@ const ( ) var ( - minVerifyDuration = apolloconfig.NewIntEntry[uint64]("pushtask.minVerifyDuration", 2) //nolint:gomnd - defaultVerifyDuration = apolloconfig.NewIntEntry[uint64]("pushtask.defaultVerifyDuration", 10) //nolint:gomnd - maxVerifyDuration = apolloconfig.NewIntEntry[uint64]("pushtask.maxVerifyDuration", 60) //nolint:gomnd - verifyDurationListLen = apolloconfig.NewIntEntry("pushtask.verifyDurationListLen", 5) //nolint:gomnd + minVerifyDuration uint64 = 2 //nolint:gomnd + defaultVerifyDuration uint64 = 10 //nolint:gomnd + maxVerifyDuration uint64 = 60 //nolint:gomnd + verifyDurationListLen = 5 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("pushtask.minVerifyDuration", &minVerifyDuration) + apolloconfig.RegisterChangeHandler("pushtask.defaultVerifyDuration", &defaultVerifyDuration) + apolloconfig.RegisterChangeHandler("pushtask.maxVerifyDuration", &maxVerifyDuration) + apolloconfig.RegisterChangeHandler("pushtask.verifyDurationListLen", &verifyDurationListLen) +} + type VerifiedBatchHandler struct { rpcUrl string redisStorage redisstorage.RedisStorage @@ -109,7 +116,7 @@ func (ins *VerifiedBatchHandler) freshRedisForAvgCommitDuration(ctx context.Cont if err != nil { return err } - if listLen <= int64(verifyDurationListLen.Get()) { + if listLen <= int64(verifyDurationListLen) { log.Infof("redis verify duration list is not enough, so skip count the avg duration!") return nil } @@ -151,18 +158,18 @@ func (ins *VerifiedBatchHandler) checkLatestBatchLegal(ctx context.Context, late // checkAvgDurationLegal duration has a default range, 2-30 minutes, if over range, maybe dirty data, drop the data func (ins *VerifiedBatchHandler) checkAvgDurationLegal(avgDuration int64) bool { - return avgDuration > int64(minVerifyDuration.Get()) && avgDuration < int64(maxVerifyDuration.Get()) + return avgDuration > int64(minVerifyDuration) && avgDuration < int64(maxVerifyDuration) } func GetAvgVerifyDuration(ctx context.Context, redisStorage redisstorage.RedisStorage) uint64 { avgDuration, err := redisStorage.GetAvgVerifyDuration(ctx) if err != nil && !errors.Is(err, redis.Nil) { log.Errorf("get avg verify duration from redis failed, error: %v", err) - return defaultVerifyDuration.Get() + return defaultVerifyDuration } if avgDuration == 0 { log.Infof("get avg verify duration from redis is 0, so use default") - return defaultVerifyDuration.Get() + return defaultVerifyDuration } return avgDuration } diff --git a/redisstorage/redisstorage.go b/redisstorage/redisstorage.go index a54fab69..2ce18742 100644 --- a/redisstorage/redisstorage.go +++ b/redisstorage/redisstorage.go @@ -51,8 +51,8 @@ const ( // redisStorageImpl implements RedisStorage interface type redisStorageImpl struct { client RedisClient - enableCoinPriceCfg apolloconfig.Entry[bool] - keyPrefix apolloconfig.Entry[string] + enableCoinPriceCfg bool + keyPrefix string } func NewRedisStorage(cfg Config) (RedisStorage, error) { @@ -79,14 +79,18 @@ func NewRedisStorage(cfg Config) (RedisStorage, error) { return nil, errors.Wrap(err, "cannot connect to redis server") } log.Debugf("redis health check done, result: %v", res) - return &redisStorageImpl{client: client, - enableCoinPriceCfg: apolloconfig.NewBoolEntry("CoinPrice.Enabled", cfg.EnablePrice), - keyPrefix: apolloconfig.NewStringEntry("Redis.KeyPrefix", cfg.KeyPrefix), - }, nil + s := &redisStorageImpl{client: client, + enableCoinPriceCfg: cfg.EnablePrice, + keyPrefix: cfg.KeyPrefix, + } + apolloconfig.RegisterChangeHandler("CoinPrice.Enabled", &s.enableCoinPriceCfg) + apolloconfig.RegisterChangeHandler("Redis.KeyPrefix", &s.keyPrefix) + + return s, nil } func (s *redisStorageImpl) addKeyPrefix(key string) string { - return s.keyPrefix.Get() + key + return s.keyPrefix + key } func (s *redisStorageImpl) SetCoinPrice(ctx context.Context, prices []*pb.SymbolPrice) error { @@ -174,7 +178,7 @@ func (s *redisStorageImpl) GetCoinPrice(ctx context.Context, symbols []*pb.Symbo log.Debugf("GetCoinPrice size[%v]", len(symbols)) var priceList []*pb.SymbolPrice var err error - if s.enableCoinPriceCfg.Get() { + if s.enableCoinPriceCfg { priceList, err = s.getCoinPrice(ctx, symbols) if err != nil { return nil, err diff --git a/server/iprestriction/client.go b/server/iprestriction/client.go index aee82759..adcb87b5 100644 --- a/server/iprestriction/client.go +++ b/server/iprestriction/client.go @@ -6,8 +6,10 @@ import ( "net/http" "net/url" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/0xPolygonHermez/zkevm-bridge-service/nacos" + "github.com/apolloconfig/agollo/v4/storage" ) type Client struct { @@ -36,6 +38,10 @@ func InitClient(c Config) { Timeout: c.Timeout.Duration, }, } + apolloconfig.RegisterChangeHandler( + "IPRestriction", + &client.cfg, + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange, any) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func GetClient() *Client { diff --git a/server/service.go b/server/service.go index 44eca1bb..95da8395 100644 --- a/server/service.go +++ b/server/service.go @@ -7,7 +7,6 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl" "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl/pb" - "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/localcache" "github.com/0xPolygonHermez/zkevm-bridge-service/log" @@ -26,8 +25,8 @@ type bridgeService struct { storage bridgeServiceStorage networkIDs map[uint]uint8 height uint8 - defaultPageLimit apolloconfig.Entry[uint32] - maxPageLimit apolloconfig.Entry[uint32] + defaultPageLimit uint32 + maxPageLimit uint32 version string cache *lru.Cache[string, [][]byte] pb.UnimplementedBridgeServiceServer @@ -41,16 +40,10 @@ type bridgeService struct { } // NewBridgeService creates new bridge service. -func NewBridgeService(cfg Config, height uint8, networks []uint, l2Clients []*utils.Client, l2Auths []*bind.TransactOpts, storage interface{}, rollupID uint) *bridgeService { +func NewBridgeService(cfg Config, height uint8, networks []uint, storage interface{}, rollupID uint) *bridgeService { var networkIDs = make(map[uint]uint8) - var nodeClients = make(map[uint]*utils.Client, len(networks)) - var authMap = make(map[uint]*bind.TransactOpts, len(networks)) for i, network := range networks { networkIDs[network] = uint8(i) - if i > 0 { - nodeClients[network] = l2Clients[i-1] - authMap[network] = l2Auths[i-1] - } } cache, err := lru.New[string, [][]byte](cfg.CacheSize) if err != nil { @@ -61,10 +54,8 @@ func NewBridgeService(cfg Config, height uint8, networks []uint, l2Clients []*ut storage: storage.(bridgeServiceStorage), height: height, networkIDs: networkIDs, - nodeClients: nodeClients, - auths: authMap, - defaultPageLimit: apolloconfig.NewIntEntry("BridgeServer.DefaultPageLimit", cfg.DefaultPageLimit), - maxPageLimit: apolloconfig.NewIntEntry("BridgeServer.MaxPageLimit", cfg.MaxPageLimit), + defaultPageLimit: cfg.DefaultPageLimit, + maxPageLimit: cfg.MaxPageLimit, version: cfg.BridgeVersion, cache: cache, } @@ -263,10 +254,10 @@ func (s *bridgeService) CheckAPI(ctx context.Context, req *pb.CheckAPIRequest) ( func (s *bridgeService) GetBridges(ctx context.Context, req *pb.GetBridgesRequest) (*pb.GetBridgesResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } totalCount, err := s.storage.GetDepositCount(ctx, req.DestAddr, nil) if err != nil { @@ -318,10 +309,10 @@ func (s *bridgeService) GetBridges(ctx context.Context, req *pb.GetBridgesReques func (s *bridgeService) GetClaims(ctx context.Context, req *pb.GetClaimsRequest) (*pb.GetClaimsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } totalCount, err := s.storage.GetClaimCount(ctx, req.DestAddr, nil) if err != nil { diff --git a/server/service_xlayer.go b/server/service_xlayer.go index 30f4c170..c237d4b1 100644 --- a/server/service_xlayer.go +++ b/server/service_xlayer.go @@ -21,6 +21,7 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/utils" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/messagebridge" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/pkg/errors" "github.com/redis/go-redis/v9" ) @@ -31,9 +32,32 @@ const ( ) var ( - minReadyTimeLimitForWaitClaimSeconds = apolloconfig.NewIntEntry[int64]("api.minReadyTimeLimitForWaitClaim", 24*60*1000) //nolint:gomnd + minReadyTimeLimitForWaitClaimSeconds int64 = 24 * 60 * 1000 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("api.minReadyTimeLimitForWaitClaim", &minReadyTimeLimitForWaitClaimSeconds) +} + +// NewBridgeServiceXLayer creates new bridge service. +func NewBridgeServiceXLayer(cfg Config, height uint8, networks []uint, l2Clients []*utils.Client, l2Auths []*bind.TransactOpts, storage interface{}, rollupID uint) *bridgeService { + s := NewBridgeService(cfg, height, networks, storage, rollupID) + + var nodeClients = make(map[uint]*utils.Client, len(networks)) + var authMap = make(map[uint]*bind.TransactOpts, len(networks)) + for i, network := range networks { + if i > 0 { + nodeClients[network] = l2Clients[i-1] + authMap[network] = l2Auths[i-1] + } + } + s.nodeClients = nodeClients + s.auths = authMap + apolloconfig.RegisterChangeHandler("BridgeServer.DefaultPageLimit", &s.defaultPageLimit) + apolloconfig.RegisterChangeHandler("BridgeServer.MaxPageLimit", &s.maxPageLimit) + return s +} + func (s *bridgeService) WithRedisStorage(storage redisstorage.RedisStorage) *bridgeService { s.redisStorage = storage return s @@ -129,10 +153,10 @@ func (s *bridgeService) GetMainCoins(ctx context.Context, req *pb.GetMainCoinsRe func (s *bridgeService) GetPendingTransactions(ctx context.Context, req *pb.GetPendingTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } deposits, err := s.storage.GetPendingTransactions(ctx, req.DestAddr, uint(limit+1), uint(req.Offset), messagebridge.GetContractAddressList(), nil) @@ -178,7 +202,7 @@ func (s *bridgeService) GetPendingTransactions(ctx context.Context, req *pb.GetP // For L1->L2, when ready_for_claim is false, but there have been more than 64 block confirmations, // should also display the status as "L2 executing" (pending auto claim) if deposit.NetworkID == 0 { - if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations.Get() { + if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations { transaction.Status = uint32(pb.TransactionStatus_TX_PENDING_AUTO_CLAIM) } } else { @@ -228,10 +252,10 @@ func (s *bridgeService) setDurationForL2Deposit(ctx context.Context, l2AvgCommit func (s *bridgeService) GetAllTransactions(ctx context.Context, req *pb.GetAllTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } deposits, err := s.storage.GetDepositsXLayer(ctx, req.DestAddr, uint(limit+1), uint(req.Offset), messagebridge.GetContractAddressList(), nil) @@ -291,7 +315,7 @@ func (s *bridgeService) GetAllTransactions(ctx context.Context, req *pb.GetAllTr // For L1->L2, when ready_for_claim is false, but there have been more than 64 block confirmations, // should also display the status as "L2 executing" (pending auto claim) if deposit.NetworkID == 0 { - if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations.Get() { + if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations { transaction.Status = uint32(pb.TransactionStatus_TX_PENDING_AUTO_CLAIM) } } else { @@ -325,10 +349,10 @@ func (s *bridgeService) GetAllTransactions(ctx context.Context, req *pb.GetAllTr func (s *bridgeService) GetNotReadyTransactions(ctx context.Context, req *pb.GetNotReadyTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } deposits, err := s.storage.GetNotReadyTransactions(ctx, uint(limit+1), uint(req.Offset), nil) @@ -363,10 +387,10 @@ func (s *bridgeService) GetNotReadyTransactions(ctx context.Context, req *pb.Get func (s *bridgeService) GetMonitoredTxsByStatus(ctx context.Context, req *pb.GetMonitoredTxsByStatusRequest) (*pb.CommonMonitoredTxsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } mTxs, err := s.storage.GetClaimTxsByStatusWithLimit(ctx, []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatus(req.Status)}, uint(limit+1), uint(req.Offset), nil) @@ -502,13 +526,13 @@ func (s *bridgeService) ManualClaim(ctx context.Context, req *pb.ManualClaimRequ func (s *bridgeService) GetReadyPendingTransactions(ctx context.Context, req *pb.GetReadyPendingTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } - minReadyTime := time.Now().Add(time.Duration(-minReadyTimeLimitForWaitClaimSeconds.Get()) * time.Second) + minReadyTime := time.Now().Add(time.Duration(-minReadyTimeLimitForWaitClaimSeconds) * time.Second) deposits, err := s.storage.GetReadyPendingTransactions(ctx, uint(req.NetworkId), uint(limit+1), uint(req.Offset), minReadyTime, nil) if err != nil { diff --git a/server/tokenlogoinfo/Client.go b/server/tokenlogoinfo/Client.go index 2b7e4abd..638988df 100644 --- a/server/tokenlogoinfo/Client.go +++ b/server/tokenlogoinfo/Client.go @@ -10,9 +10,11 @@ import ( "strconv" "strings" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/models/tokenlogo" "github.com/0xPolygonHermez/zkevm-bridge-service/nacos" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/apolloconfig/agollo/v4/storage" ) const ( @@ -48,6 +50,10 @@ func InitClient(c Config) { Timeout: c.Timeout.Duration, }, } + apolloconfig.RegisterChangeHandler( + "TokenLogoServiceConfig", + &client.cfg, + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange, any) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func (c *Client) GetTokenLogoInfos(tokenAddArr []*tokenlogo.QueryLogoParam) (map[string]tokenlogo.LogoInfo, error) { diff --git a/synchronizer/synchronizer_xlayer.go b/synchronizer/synchronizer_xlayer.go index 15674cd1..a68611be 100644 --- a/synchronizer/synchronizer_xlayer.go +++ b/synchronizer/synchronizer_xlayer.go @@ -21,14 +21,17 @@ import ( ) const ( - num1 = 1 - wstETHRedisLockKey = "wst_eth_l2_token_not_withdrawn_lock_" + num1 = 1 ) var ( - largeTxUsdLimit = apolloconfig.NewIntEntry[uint64]("Synchronizer.LargeTxUsdLimit", 100000) //nolint:gomnd + largeTxUsdLimit = 100000 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("Synchronizer.LargeTxUsdLimit", &largeTxUsdLimit) +} + func (s *ClientSynchronizer) beforeProcessDeposit(deposit *etherman.Deposit) { messagebridge.ReplaceDepositDestAddresses(deposit) } @@ -142,7 +145,7 @@ func (s *ClientSynchronizer) filterLargeTransaction(ctx context.Context, transac tokenDecimal := new(big.Float).SetPrec(uint(transaction.GetLogoInfo().Decimal)).SetFloat64(math.Pow10(int(transaction.GetLogoInfo().Decimal))) tokenAmount, _ := new(big.Float).Quo(originNum, tokenDecimal).Float64() usdAmount := priceInfos[0].Price * tokenAmount - if usdAmount < float64(largeTxUsdLimit.Get()) { + if usdAmount < float64(largeTxUsdLimit) { log.Infof("tx usd amount less than limit, so skip, tx usd amount: %v, tx: %v", usdAmount, transaction.GetTxHash()) return } diff --git a/test/operations/manager.go b/test/operations/manager.go index 93fdfb53..b96e2321 100644 --- a/test/operations/manager.go +++ b/test/operations/manager.go @@ -119,7 +119,7 @@ func NewManager(ctx context.Context, cfg *Config) (*Manager, error) { if err != nil { return nil, err } - bService := server.NewBridgeService(cfg.BS, cfg.BT.Height, []uint{0, 1}, []*utils.Client{nil}, []*bind.TransactOpts{nil}, pgst, rollupID) + bService := server.NewBridgeService(cfg.BS, cfg.BT.Height, []uint{0, 1}, pgst, rollupID) opsman.storage = st.(StorageInterface) opsman.bridgetree = bt opsman.bridgeService = bService diff --git a/test/operations/mockserver.go b/test/operations/mockserver.go index 497b6f6d..192300ba 100644 --- a/test/operations/mockserver.go +++ b/test/operations/mockserver.go @@ -7,8 +7,6 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl" "github.com/0xPolygonHermez/zkevm-bridge-service/db/pgstorage" "github.com/0xPolygonHermez/zkevm-bridge-service/server" - "github.com/0xPolygonHermez/zkevm-bridge-service/utils" - "github.com/ethereum/go-ethereum/accounts/abi/bind" ) // RunMockServer runs mock server @@ -46,6 +44,6 @@ func RunMockServer(dbType string, height uint8, networks []uint) (*bridgectrl.Br MaxPageLimit: 100, //nolint:gomnd BridgeVersion: "v1", } - bridgeService := server.NewBridgeService(cfg, btCfg.Height, networks, []*utils.Client{nil}, []*bind.TransactOpts{nil}, store, rollupID) + bridgeService := server.NewBridgeService(cfg, btCfg.Height, networks, store, rollupID) return bt, store, server.RunServer(cfg, bridgeService) } diff --git a/utils/constant.go b/utils/constant.go index 933d2ecc..68e7df8b 100644 --- a/utils/constant.go +++ b/utils/constant.go @@ -17,5 +17,9 @@ const ( var ( // L1TargetBlockConfirmations is the number of block confirmations need to wait for the transaction to be synced from L1 to L2 - L1TargetBlockConfirmations = apolloconfig.NewIntEntry[uint64]("l1TargetBlockConfirmations", 64) //nolint:gomnd + L1TargetBlockConfirmations uint64 = 64 //nolint:gomnd ) + +func init() { + apolloconfig.RegisterChangeHandler("l1TargetBlockConfirmations", &L1TargetBlockConfirmations) +} diff --git a/utils/helpers.go b/utils/helpers.go index 3320d474..6eff8e25 100644 --- a/utils/helpers.go +++ b/utils/helpers.go @@ -2,6 +2,7 @@ package utils import ( "crypto/sha256" + "encoding/json" "math/rand" ) @@ -19,3 +20,8 @@ func GenerateRandomHash() [sha256.Size]byte { rs := generateRandomString(10) //nolint:gomnd return sha256.Sum256([]byte(rs)) } + +func ToJson(x any) string { + b, _ := json.Marshal(x) + return string(b) +} diff --git a/utils/innerchainIdmanager.go b/utils/innerchainIdmanager.go index ce18e2ad..b3296852 100644 --- a/utils/innerchainIdmanager.go +++ b/utils/innerchainIdmanager.go @@ -1,13 +1,34 @@ package utils import ( + "sync" + + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" ) -var standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64 +var ( + standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64 + chainIDMapperLock = &sync.RWMutex{} +) func InnitOkInnerChainIdMapper(cfg businessconfig.Config) { + initOkInnerChainIdMapper(cfg) + + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, _ *storage.ConfigChange, c any) { + initOkInnerChainIdMapper(*c.(*businessconfig.Config)) + })) +} + +func initOkInnerChainIdMapper(cfg businessconfig.Config) { + chainIDMapperLock.Lock() + defer chainIDMapperLock.Unlock() + standardIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds)) innerIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds)) if cfg.StandardChainIds == nil { @@ -22,6 +43,9 @@ func InnitOkInnerChainIdMapper(cfg businessconfig.Config) { } func GetStandardChainIdByInnerId(innerChainId uint64) uint64 { + chainIDMapperLock.RLock() + defer chainIDMapperLock.RUnlock() + chainId, found := innerIdKeyMapper[innerChainId] if !found { return innerChainId @@ -30,6 +54,9 @@ func GetStandardChainIdByInnerId(innerChainId uint64) uint64 { } func GetInnerChainIdByStandardId(chainId uint64) uint64 { + chainIDMapperLock.RLock() + defer chainIDMapperLock.RUnlock() + innerChainId, found := standardIdKeyMapper[chainId] if !found { return chainId diff --git a/utils/messagebridge/processor.go b/utils/messagebridge/processor.go index 237cdd67..9429467b 100644 --- a/utils/messagebridge/processor.go +++ b/utils/messagebridge/processor.go @@ -2,6 +2,7 @@ package messagebridge import ( "math/big" + "sync" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/utils" @@ -18,12 +19,14 @@ const ( var ( emptyAddress = common.Address{} processorMap = make(map[ProcessorType]*Processor) + mutexMap = make(map[ProcessorType]*sync.RWMutex) ) // Processor hosts the processing functions for an LxLy bridge using the message bridge feature // Each Processor object should be used for one type of bridged token only // Current supported tokens: USDC, wstETH type Processor struct { + pType ProcessorType contractToTokenMapping map[common.Address]common.Address // DecodeMetadata decodes the metadata of the message bridge, returns the actual destination address and bridged amount DecodeMetadataFn func(metadata []byte) (common.Address, *big.Int) @@ -31,6 +34,10 @@ type Processor struct { // GetContractAddressList returns the list of contract addresses that need to be processed through this struct func (u *Processor) GetContractAddressList() []common.Address { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + result := make([]common.Address, 0) for addr := range u.contractToTokenMapping { result = append(result, addr) @@ -40,6 +47,10 @@ func (u *Processor) GetContractAddressList() []common.Address { // GetTokenAddressList returns the list of original token addresses func (u *Processor) GetTokenAddressList() []common.Address { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + result := make([]common.Address, 0) for _, addr := range u.contractToTokenMapping { result = append(result, addr) @@ -49,6 +60,10 @@ func (u *Processor) GetTokenAddressList() []common.Address { // CheckContractAddress returns true if the input address is in the contract address list of this bridge func (u *Processor) CheckContractAddress(address common.Address) bool { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + if _, ok := u.contractToTokenMapping[address]; ok { return true } @@ -57,6 +72,10 @@ func (u *Processor) CheckContractAddress(address common.Address) bool { // GetTokenFromContract return the token address from the bridge contract address, for displaying func (u *Processor) GetTokenFromContract(contractAddress common.Address) (common.Address, bool) { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + if token, ok := u.contractToTokenMapping[contractAddress]; ok { return token, true } @@ -131,3 +150,12 @@ func ReplaceDepositInfo(deposit *etherman.Deposit, overwriteOrigNetworkID bool) } processor.ReplaceDepositInfo(deposit, overwriteOrigNetworkID) } + +func getMutex(tp ProcessorType) *sync.RWMutex { + mutex := mutexMap[tp] + if mutex == nil { + mutex = &sync.RWMutex{} + mutexMap[tp] = mutex + } + return mutex +} diff --git a/utils/messagebridge/usdclxly.go b/utils/messagebridge/usdclxly.go index 92c0ebf5..c1f86e0f 100644 --- a/utils/messagebridge/usdclxly.go +++ b/utils/messagebridge/usdclxly.go @@ -3,11 +3,33 @@ package messagebridge import ( "math/big" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" "github.com/ethereum/go-ethereum/common" ) func InitUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) { + initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses) + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) { + cfg := c.(*businessconfig.Config) + if change.ChangeType == storage.DELETED || len(cfg.USDCContractAddresses) == 0 || len(cfg.USDCContractAddresses) == 0 { + delete(processorMap, USDC) + return + } + initUSDCLxLyProcessor(cfg.USDCContractAddresses, cfg.USDCTokenAddresses) + })) +} + +func initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) { + mutex := getMutex(USDC) + mutex.Lock() + defer mutex.Unlock() + log.Debugf("USDCLxLyMapping: contracts[%v] tokens[%v]", usdcContractAddresses, usdcTokenAddresses) if len(usdcContractAddresses) != len(usdcTokenAddresses) { log.Errorf("InitUSDCLxLyProcessor: contract addresses (%v) and token addresses (%v) have different length", len(usdcContractAddresses), len(usdcTokenAddresses)) diff --git a/utils/messagebridge/wsteth.go b/utils/messagebridge/wsteth.go index 7d8f27af..059ecbee 100644 --- a/utils/messagebridge/wsteth.go +++ b/utils/messagebridge/wsteth.go @@ -3,11 +3,33 @@ package messagebridge import ( "math/big" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" "github.com/ethereum/go-ethereum/common" ) func InitWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) { + initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses) + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) { + cfg := c.(*businessconfig.Config) + if change.ChangeType == storage.DELETED || len(cfg.WstETHContractAddresses) == 0 || len(cfg.WstETHTokenAddresses) == 0 { + delete(processorMap, WstETH) + return + } + initUSDCLxLyProcessor(cfg.WstETHContractAddresses, cfg.WstETHTokenAddresses) + })) +} + +func initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) { + mutex := getMutex(WstETH) + mutex.Lock() + defer mutex.Unlock() + log.Debugf("WstETHMapping: contracts[%v] tokens[%v]", wstETHContractAddresses, wstETHTokenAddresses) if len(wstETHContractAddresses) != len(wstETHTokenAddresses) { log.Errorf("InitWstETHProcessor: contract addresses (%v) and token addresses (%v) have different length", len(wstETHContractAddresses), len(wstETHTokenAddresses))