From 1ba08153e2b0248033555bcf585d609a3a7e8a96 Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Fri, 5 Jul 2024 12:02:39 +0800 Subject: [PATCH 1/6] Refactor and add unit tests for apolloconfig --- config/apolloconfig/apollo.go | 86 +------------ config/apolloconfig/apollo_test.go | 72 +++++++++++ config/apolloconfig/entry.go | 91 +++++--------- config/apolloconfig/entry_test.go | 189 +++++++++++++++++++++++++++++ 4 files changed, 293 insertions(+), 145 deletions(-) create mode 100644 config/apolloconfig/apollo_test.go create mode 100644 config/apolloconfig/entry_test.go diff --git a/config/apolloconfig/apollo.go b/config/apolloconfig/apollo.go index 32f3c6b7..a6fce701 100644 --- a/config/apolloconfig/apollo.go +++ b/config/apolloconfig/apollo.go @@ -94,33 +94,10 @@ func handleStruct(v reflect.Value) error { // Process differently for each type if field.CanAddr() && field.Addr().Type().Implements(textUnmarshalerType) { loadTextUnmarshaler(field, key) - } else if field.Kind() == reflect.Struct { - loadStruct(field, key) - } else if field.CanInt() { - field.SetInt(NewIntEntry(key, field.Int()).Get()) - } else if field.CanUint() { - field.SetUint(NewIntEntry(key, field.Uint()).Get()) } else if field.Kind() == reflect.String { field.SetString(NewStringEntry(key, field.String()).Get()) - } else if field.Kind() == reflect.Bool { - field.SetBool(NewBoolEntry(key, field.Bool()).Get()) - } else if field.Kind() == reflect.Slice { - switch field.Type().Elem().Kind() { - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - loadIntSlice(field, key) - case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: - loadUintSlice(field, key) - case reflect.String: - loadStringSlice(field, key) - default: - if reflect.New(field.Type().Elem()).Type().Implements(textUnmarshalerType) { - loadTextUnmarshalerSlice(field, key) - } else { - logger.Debugf("Load apollo: field %v has invalid type %v", structField.Name, structField.Type) - } - } } else { - logger.Errorf("Load apollo: field %v has invalid type %v", structField.Name, structField.Type) + loadJson(field, key) } } @@ -135,7 +112,7 @@ func handleStruct(v reflect.Value) error { return nil } -func loadStruct(v reflect.Value, key string) { +func loadJson(v reflect.Value, key string) { s := NewStringEntry(key, "").Get() if s == "" { return @@ -143,7 +120,6 @@ func loadStruct(v reflect.Value, key string) { // Create a clone so we won't change the original values unexpectedly temp := reflect.New(v.Type()).Interface() - // Always use JSON for struct err := json.Unmarshal([]byte(s), &temp) if err != nil { return @@ -151,44 +127,6 @@ func loadStruct(v reflect.Value, key string) { v.Set(reflect.ValueOf(temp).Elem()) } -func loadIntSlice(v reflect.Value, key string) { - list, err := NewIntSliceEntry(key, []int64{}).GetWithErr() - if err != nil { - return - } - - temp := reflect.MakeSlice(v.Type(), len(list), len(list)) - elemType := v.Type().Elem() - for i, x := range list { - temp.Index(i).Set(reflect.ValueOf(x).Convert(elemType)) - } - - v.Set(temp) -} - -func loadUintSlice(v reflect.Value, key string) { - list, err := NewIntSliceEntry(key, []uint64{}).GetWithErr() - if err != nil { - return - } - - temp := reflect.MakeSlice(v.Type(), len(list), len(list)) - elemType := v.Type().Elem() - for i, x := range list { - temp.Index(i).Set(reflect.ValueOf(x).Convert(elemType)) - } - - v.Set(temp) -} - -func loadStringSlice(v reflect.Value, key string) { - list, err := NewStringSliceEntry(key, []string{}).GetWithErr() - if err != nil { - return - } - v.Set(reflect.ValueOf(list)) -} - func loadTextUnmarshaler(v reflect.Value, key string) { s, err := NewStringEntry(key, "").GetWithErr() if err != nil { @@ -202,23 +140,3 @@ func loadTextUnmarshaler(v reflect.Value, key string) { v.Set(reflect.Indirect(reflect.ValueOf(temp))) } - -func loadTextUnmarshalerSlice(v reflect.Value, key string) { - list, err := NewStringSliceEntry(key, []string{}).GetWithErr() - if err != nil { - return - } - - temp := reflect.MakeSlice(v.Type(), len(list), len(list)) - elemType := v.Type().Elem() - for i, s := range list { - tempElem := reflect.New(elemType).Interface().(encoding.TextUnmarshaler) - err = tempElem.UnmarshalText([]byte(s)) - if err != nil { - return - } - temp.Index(i).Set(reflect.Indirect(reflect.ValueOf(tempElem))) - } - - v.Set(temp) -} diff --git a/config/apolloconfig/apollo_test.go b/config/apolloconfig/apollo_test.go new file mode 100644 index 00000000..308a224c --- /dev/null +++ b/config/apolloconfig/apollo_test.go @@ -0,0 +1,72 @@ +package apolloconfig + +import ( + "testing" + "time" + + "github.com/0xPolygonHermez/zkevm-node/config/types" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestLoad(t *testing.T) { + type SubStruct struct { + E map[common.Address]common.Address `apollo:"d.e"` + F []common.Address `apollo:"d.f"` + G string `apollo:"d.g"` + H []string `apollo:"d.h"` + I types.Duration `apollo:"d.i"` + } + type StructTest struct { + A int64 `apollo:"a"` + B []int32 `apollo:"b"` + C bool `apollo:"c"` + D SubStruct `apollo:"d"` + } + + // Mocking the result from Apollo server + resultMapping := map[string]string{ + "a": `123`, + "b": `[1,2]`, + "c": `true`, + "d.e": `{"0x167985f547e5087DA14084b80762104d36c08756":"0xB82381A3fBD3FaFA77B3a7bE693342618240067b", "0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1":"0xB82381A3fBD3FaFA77B3a7bE693342618240067b"}`, + "d.f": `["0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1"]`, + "d.g": `dgstring`, + "d": `{"F":["0x167985f547e5087DA14084b80762104d36c08756","0xB82381A3fBD3FaFA77B3a7bE693342618240067b"],"G":"dg","H":["s1","s2","s3"],"I":"3m5s"}`, + } + + expected := StructTest{ + A: 123, + B: []int32{1, 2}, + C: true, + D: SubStruct{ + E: map[common.Address]common.Address{ + common.HexToAddress("0x167985f547e5087DA14084b80762104d36c08756"): common.HexToAddress("0xB82381A3fBD3FaFA77B3a7bE693342618240067b"), + common.HexToAddress("0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1"): common.HexToAddress("0xB82381A3fBD3FaFA77B3a7bE693342618240067b"), + }, + F: []common.Address{common.HexToAddress("0x0392D4076E31Fa4cd6AB5c3491046F46e06901B1")}, + G: "dgstring", + H: []string{"s1", "s2", "s3"}, + I: types.NewDuration(3*time.Minute + 5*time.Second), + }, + } + + enabled = true + getStringFn = func(_, key string, result *string) error { + s, ok := resultMapping[key] + if !ok { + return errors.New("key not found") + } + *result = s + return nil + } + + var output StructTest + err := Load(output) + require.Error(t, err) + + err = Load(&output) + require.NoError(t, err) + require.Equal(t, expected, output) +} diff --git a/config/apolloconfig/entry.go b/config/apolloconfig/entry.go index 07a1fc04..23b6da03 100644 --- a/config/apolloconfig/entry.go +++ b/config/apolloconfig/entry.go @@ -1,11 +1,9 @@ package apolloconfig import ( + "encoding/json" "fmt" - "strconv" - "strings" - "github.com/apolloconfig/agollo/v4" "github.com/pkg/errors" "golang.org/x/exp/constraints" ) @@ -16,7 +14,7 @@ type Entry[T any] interface { } // An interface to get the config from Apollo client (and convert it if needed) -type getterFunction[T any] func(client *agollo.Client, namespace, key string) (T, error) +type getterFunction[T any] func(namespace, key string, result *T) error type entryImpl[T any] struct { namespace string @@ -50,24 +48,27 @@ func newEntry[T any](key string, defaultValue T, getterFn getterFunction[T], opt } func NewIntEntry[T constraints.Integer](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { - return newEntry(key, defaultValue, getInt[T], opts...) + return newEntry(key, defaultValue, getJson[T], opts...) } func NewIntSliceEntry[T constraints.Integer](key string, defaultValue []T, opts ...entryOption[[]T]) Entry[[]T] { - return newEntry(key, defaultValue, getIntSlice[T], opts...) + return newEntry(key, defaultValue, getJson[[]T], opts...) } func NewBoolEntry(key string, defaultValue bool, opts ...entryOption[bool]) Entry[bool] { - return newEntry(key, defaultValue, getBool, opts...) + return newEntry(key, defaultValue, getJson[bool], opts...) } func NewStringEntry(key string, defaultValue string, opts ...entryOption[string]) Entry[string] { - return newEntry(key, defaultValue, getString, opts...) + return newEntry(key, defaultValue, getStringFn, opts...) } -// String array is separated by commas, so this will work incorrectly if we have comma in the elements func NewStringSliceEntry(key string, defaultValue []string, opts ...entryOption[[]string]) Entry[[]string] { - return newEntry(key, defaultValue, getStringSlice, opts...) + return newEntry(key, defaultValue, getJson[[]string], opts...) +} + +func NewJsonEntry[T any](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { + return newEntry(key, defaultValue, getJson[T], opts...) } func (e *entryImpl[T]) String() string { @@ -89,17 +90,12 @@ func (e *entryImpl[T]) GetWithErr() (T, error) { return e.defaultValue, errors.New("apollo disabled") } - // If client is not initialized, return the default value - client := GetClient() - if client == nil { - return e.defaultValue, errors.New("apollo client is nil") - } - if e.getterFn == nil { return e.defaultValue, errors.New("getterFn is nil") } - v, err := e.getterFn(client, e.namespace, e.key) + var v T + err := e.getterFn(e.namespace, e.key, &v) if err != nil { return e.defaultValue, errors.Wrap(err, "getterFn error") } @@ -108,58 +104,31 @@ func (e *entryImpl[T]) GetWithErr() (T, error) { // ----- Getter functions ----- -func getString(client *agollo.Client, namespace, key string) (string, error) { +var getStringFn = getString + +func getString(namespace, key string, result *string) error { + client := GetClient() + if client == nil { + return errors.New("apollo client is nil") + } v, err := client.GetConfig(namespace).GetCache().Get(key) if err != nil { - return "", err + return err } s, ok := v.(string) if !ok { - return "", fmt.Errorf("value is not string, type: %T", v) - } - return s, nil -} - -// String array is separated by commas, so this will work incorrectly if we have comma in the elements -func getStringSlice(client *agollo.Client, namespace, key string) ([]string, error) { - s, err := getString(client, namespace, key) - if err != nil { - return nil, err - } - return strings.Split(s, comma), nil -} - -func getInt[T constraints.Integer](client *agollo.Client, namespace, key string) (T, error) { - s, err := getString(client, namespace, key) - if err != nil { - return 0, err - } - res, err := strconv.ParseInt(s, parseIntBase, parseIntBitSize) - return T(res), err -} - -func getIntSlice[T constraints.Integer](client *agollo.Client, namespace, key string) ([]T, error) { - s, err := getString(client, namespace, key) - if err != nil { - return nil, err - } - - sArr := strings.Split(s, comma) - result := make([]T, len(sArr)) - for i := range sArr { - v, err := strconv.ParseInt(sArr[i], parseIntBase, parseIntBitSize) - if err != nil { - return nil, err - } - result[i] = T(v) + return fmt.Errorf("value is not string, type: %T", v) } - return result, nil + *result = s + return nil } -func getBool(client *agollo.Client, namespace, key string) (bool, error) { - s, err := getString(client, namespace, key) +func getJson[T any](namespace, key string, result *T) error { + var s string + err := getStringFn(namespace, key, &s) if err != nil { - return false, err + return err } - return strconv.ParseBool(s) + err = json.Unmarshal([]byte(s), result) + return err } diff --git a/config/apolloconfig/entry_test.go b/config/apolloconfig/entry_test.go new file mode 100644 index 00000000..b8ece7eb --- /dev/null +++ b/config/apolloconfig/entry_test.go @@ -0,0 +1,189 @@ +package apolloconfig + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetInt(t *testing.T) { + testCases := []struct { + inputString string + outputResult int + outputHasErr bool + }{ + {"1", 1, false}, + {"-11", -11, false}, + {"0", 0, false}, + {"", 0, true}, + {" 22 ", 22, false}, + {"asd", 0, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res int + err := getJson("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, res, "Case #%v", i) + } + } +} + +func TestGetFloat(t *testing.T) { + testCases := []struct { + inputString string + outputResult float64 + outputHasErr bool + }{ + {"1", 1, false}, + {"-11.5", -11.5, false}, + {"0.222", 0.222, false}, + {"", 0, true}, + {" 22.1234 ", 22.1234, false}, + {"asd", 0, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res float64 + err := getJson("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, res, "Case #%v", i) + } + } +} + +func TestGetIntSlice(t *testing.T) { + testCases := []struct { + inputString string + outputResult []int + outputHasErr bool + }{ + {"[1,2,3]", []int{1, 2, 3}, false}, + {"[]", []int{}, false}, + {"0", nil, true}, + {"1,2,3", nil, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res []int + err := getJson("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, res, "Case #%v", i) + } + } +} + +func TestGetStringSlice(t *testing.T) { + testCases := []struct { + inputString string + outputResult []string + outputHasErr bool + }{ + {"[\"ab\",\"c\",\"d\\\"\"]", []string{"ab", "c", "d\""}, false}, + {"[]", []string{}, false}, + {"\"a\"", nil, true}, + {"abc", nil, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res []string + err := getJson("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, res, "Case #%v", i) + } + } +} + +func TestJsonStruct(t *testing.T) { + type S1 struct { + A int `json:"a"` + B string `json:"b"` + } + + type S2 struct { + C S1 `json:"sub"` + D bool `json:"d"` + } + + testCases := []struct { + inputString string + outputResult *S2 + outputHasErr bool + }{ + {`{"sub":{"a":1,"b":"abc"},"d":true}`, &S2{S1{1, "abc"}, true}, false}, + {"{}", &S2{S1{0, ""}, false}, false}, + {"", nil, true}, + {"abc", nil, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res *S2 + err := getJson("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, res, "Case #%v", i) + } + } +} + +func TestJsonMap(t *testing.T) { + testCases := []struct { + inputString string + outputResult map[string]string + outputHasErr bool + }{ + {`{"sub":{"a":1,"b":"abc"},"d":true}`, nil, true}, + {`{"a":"1","b":"2"}`, map[string]string{"a": "1", "b": "2"}, false}, + {`{"a":"1","b":2}`, nil, true}, + {"abc", nil, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res map[string]string + err := getJson("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, res, "Case #%v", i) + } + } +} From a1b218c5e37c94a8638bc19a1297daaf46ba05af Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Mon, 8 Jul 2024 14:29:28 +0800 Subject: [PATCH 2/6] Add config change listener --- config/apolloconfig/apollo.go | 95 ++++++++++---------- config/apolloconfig/entry.go | 15 ++++ config/apolloconfig/entry_test.go | 33 ++++++- config/apolloconfig/listener.go | 115 ++++++++++++++++++++++++ config/apolloconfig/listener_test.go | 130 +++++++++++++++++++++++++++ 5 files changed, 339 insertions(+), 49 deletions(-) create mode 100644 config/apolloconfig/listener.go create mode 100644 config/apolloconfig/listener_test.go diff --git a/config/apolloconfig/apollo.go b/config/apolloconfig/apollo.go index a6fce701..61305a2a 100644 --- a/config/apolloconfig/apollo.go +++ b/config/apolloconfig/apollo.go @@ -78,65 +78,66 @@ func handleStruct(v reflect.Value) error { // Iterate and handle each field for i := 0; i < v.NumField(); i++ { - origField := v.Field(i) - field := reflect.Indirect(origField) + field := v.Field(i) structField := v.Type().Field(i) - - // Get the config key from the field tag - key := structField.Tag.Get(tagName) - if key != "" && key != "-" { - if !field.CanSet() { - logger.Errorf("Load apollo: field %v cannot be set", structField.Name) - continue - } - - // If config key is not empty, use it to query from Apollo server - // Process differently for each type - if field.CanAddr() && field.Addr().Type().Implements(textUnmarshalerType) { - loadTextUnmarshaler(field, key) - } else if field.Kind() == reflect.String { - field.SetString(NewStringEntry(key, field.String()).Get()) - } else { - loadJson(field, key) - } - } - - if field.Kind() == reflect.Struct { - err := handleStruct(field) - if err != nil { - logger.Errorf("Load apollo: field %v of type %v error: %v", structField.Name, structField.Type, err) - } + err := handleObject(field, structField.Tag.Get(tagName)) + if err != nil { + logger.Errorf("Load apollo: field %v of type %v error: %v", structField.Name, structField.Type, err) } } return nil } -func loadJson(v reflect.Value, key string) { - s := NewStringEntry(key, "").Get() - if s == "" { - return - } +func handleObject(v reflect.Value, key string) error { + field := reflect.Indirect(v) - // Create a clone so we won't change the original values unexpectedly - temp := reflect.New(v.Type()).Interface() - err := json.Unmarshal([]byte(s), &temp) - if err != nil { - return + if key != "" && key != "-" { + val, err := NewStringEntry(key, "").GetWithErr() + if err != nil { + return err + } + err = decodeStringToValue(val, field) + if err != nil { + return err + } } - v.Set(reflect.ValueOf(temp).Elem()) + + return nil } -func loadTextUnmarshaler(v reflect.Value, key string) { - s, err := NewStringEntry(key, "").GetWithErr() - if err != nil { - return +func decodeStringToValue(val string, v reflect.Value) error { + if !v.CanSet() { + return errors.New("cannot set value") } - temp := reflect.New(v.Type()).Interface().(encoding.TextUnmarshaler) - err = temp.UnmarshalText([]byte(s)) - if err != nil { - return + + if v.CanAddr() && v.Addr().Type().Implements(textUnmarshalerType) { + temp := reflect.New(v.Type()).Interface().(encoding.TextUnmarshaler) + err := temp.UnmarshalText([]byte(val)) + if err != nil { + return errors.Wrap(err, "UnmarshalText error") + } + v.Set(reflect.Indirect(reflect.ValueOf(temp))) + } else if v.Kind() == reflect.String { + v.SetString(val) + } else { + // Decode json value (including struct, map, int, float, array, etc.) + // Create a clone so we won't change the original values unexpectedly + temp := reflect.New(v.Type()).Interface() + err := json.Unmarshal([]byte(val), &temp) + if err != nil { + return errors.Wrap(err, "json unmarshal error") + } + v.Set(reflect.ValueOf(temp).Elem()) } - v.Set(reflect.Indirect(reflect.ValueOf(temp))) + // Inner fields' values on Apollo have higher priorities + // So we need to re-load the inner fields if this is a struct + if v.Kind() == reflect.Struct { + err := handleStruct(v) + if err != nil { + return errors.Wrap(err, "handleStruct error") + } + } + return nil } diff --git a/config/apolloconfig/entry.go b/config/apolloconfig/entry.go index 23b6da03..679cec58 100644 --- a/config/apolloconfig/entry.go +++ b/config/apolloconfig/entry.go @@ -1,6 +1,7 @@ package apolloconfig import ( + "encoding" "encoding/json" "fmt" @@ -71,6 +72,10 @@ func NewJsonEntry[T any](key string, defaultValue T, opts ...entryOption[T]) Ent return newEntry(key, defaultValue, getJson[T], opts...) } +func NewTextUnmarshalerEntry[T encoding.TextUnmarshaler](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { + return newEntry(key, defaultValue, getTextUnmarshaler[T], opts...) +} + func (e *entryImpl[T]) String() string { return fmt.Sprintf("%v", e.Get()) } @@ -123,6 +128,16 @@ func getString(namespace, key string, result *string) error { return nil } +func getTextUnmarshaler[T encoding.TextUnmarshaler](namespace, key string, result *T) error { + var s string + err := getStringFn(namespace, key, &s) + if err != nil { + return err + } + err = (*result).UnmarshalText([]byte(s)) + return err +} + func getJson[T any](namespace, key string, result *T) error { var s string err := getStringFn(namespace, key, &s) diff --git a/config/apolloconfig/entry_test.go b/config/apolloconfig/entry_test.go index b8ece7eb..e2171204 100644 --- a/config/apolloconfig/entry_test.go +++ b/config/apolloconfig/entry_test.go @@ -3,6 +3,7 @@ package apolloconfig import ( "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" ) @@ -122,7 +123,7 @@ func TestGetStringSlice(t *testing.T) { } } -func TestJsonStruct(t *testing.T) { +func TestGetJsonStruct(t *testing.T) { type S1 struct { A int `json:"a"` B string `json:"b"` @@ -160,7 +161,7 @@ func TestJsonStruct(t *testing.T) { } } -func TestJsonMap(t *testing.T) { +func TestGetJsonMap(t *testing.T) { testCases := []struct { inputString string outputResult map[string]string @@ -187,3 +188,31 @@ func TestJsonMap(t *testing.T) { } } } + +func TestGetTextUnmarshaler(t *testing.T) { + testCases := []struct { + inputString string + outputResult common.Address + outputHasErr bool + }{ + {"0x167985f547e5087DA14084b80762104d36c08756", common.HexToAddress("0x167985f547e5087DA14084b80762104d36c08756"), false}, + {"[]", common.Address{}, true}, + {"", common.Address{}, true}, + {`"0x167985f547e5087DA14084b80762104d36c08756"`, common.Address{}, true}, + } + + for i, c := range testCases { + getStringFn = func(_ string, _ string, result *string) error { + *result = c.inputString + return nil + } + var res = &common.Address{} + err := getTextUnmarshaler("", "", &res) + if c.outputHasErr { + require.Errorf(t, err, "Case #%v", i) + } else { + require.NoErrorf(t, err, "Case #%v", i) + require.Equalf(t, c.outputResult, *res, "Case #%v", i) + } + } +} diff --git a/config/apolloconfig/listener.go b/config/apolloconfig/listener.go new file mode 100644 index 00000000..4740ea03 --- /dev/null +++ b/config/apolloconfig/listener.go @@ -0,0 +1,115 @@ +package apolloconfig + +import ( + "reflect" + "sync" + + "github.com/apolloconfig/agollo/v4/storage" +) + +// ConfigChangeListener implements agollo's storage.ChangeListener to handle the config changes from Apollo remote server +type ConfigChangeListener struct { + changeHandlers map[string][]*changeHandler +} + +var listener *ConfigChangeListener + +// GetDefaultListener is a singleton getter for ConfigChangeListener +func GetDefaultListener() *ConfigChangeListener { + if listener != nil { + return listener + } + + listener = &ConfigChangeListener{ + changeHandlers: make(map[string][]*changeHandler), + } + return listener +} + +func RegisterChangeHandler(key string, opts ...handlerOpt) { + GetDefaultListener().RegisterHandler(key, opts...) +} + +func (l *ConfigChangeListener) OnChange(event *storage.ChangeEvent) { + getLogger().Debugf("ConfigChangeListener#OnChange received: %+v", event) + + for key, change := range event.Changes { + // Only handle ADDED and MODIFIED type + if change.ChangeType == storage.DELETED { + continue + } + for _, handler := range l.changeHandlers[key] { + handler.handle(change, key) + } + } +} + +func (l *ConfigChangeListener) OnNewestChange(_ *storage.FullChangeEvent) {} + +func (l *ConfigChangeListener) RegisterHandler(key string, opts ...handlerOpt) { + if len(opts) == 0 { + return + } + + handler := &changeHandler{} + for _, f := range opts { + f(handler) + } + l.changeHandlers[key] = append(l.changeHandlers[key], handler) +} + +// changeHandler contains the information for handling the config change for one config, in a specific context +type changeHandler struct { + obj any + callbackFn func(key string, change *storage.ConfigChange) + locker sync.Locker +} + +func (h *changeHandler) handle(change *storage.ConfigChange, key string) { + if h.locker != nil { + h.locker.Lock() + defer h.locker.Unlock() + } + + if h.obj != nil { + err := decodeStringToObject(change.NewValue.(string), h.obj) + if err != nil { + getLogger().WithFields("key", key).Errorf("changeHandler#handle decodeStringToObject error: %v", err) + } + } + + if h.callbackFn != nil { + h.callbackFn(key, change) + } +} + +type handlerOpt func(handler *changeHandler) + +// WithConfigObj assigns an object to be updated when a specific config key is changed. +// The logic for updating can be found in decodeStringToValue function +// obj must be a pointer +func WithConfigObj[T any](obj *T) handlerOpt { + return func(handler *changeHandler) { + handler.obj = obj + } +} + +// WithCallbackFn assigns a callback function that will be called when a config key is changed +func WithCallbackFn(callbackFn func(string, *storage.ConfigChange)) handlerOpt { + return func(handler *changeHandler) { + handler.callbackFn = callbackFn + } +} + +// WithLocker assigns a locker object (e.g. sync.Mutex, sync.RWMutex) +// When a config change is received, we will first wrap the config update operations inside lock.Lock()/locker.Unlock() +func WithLocker(locker sync.Locker) handlerOpt { + return func(handler *changeHandler) { + handler.locker = locker + } +} + +func decodeStringToObject(val string, obj any) error { + v := reflect.Indirect(reflect.ValueOf(obj)) + return decodeStringToValue(val, v) +} diff --git a/config/apolloconfig/listener_test.go b/config/apolloconfig/listener_test.go new file mode 100644 index 00000000..f0a29759 --- /dev/null +++ b/config/apolloconfig/listener_test.go @@ -0,0 +1,130 @@ +package apolloconfig + +import ( + "testing" + + "github.com/apolloconfig/agollo/v4/storage" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestConfigChangeListener(t *testing.T) { + type SubStruct struct { + C float64 + D map[string]bool `apollo:"mp"` + E string `apollo:"e"` + } + + type StructTest struct { + A string `apollo:"stringField"` + B SubStruct `apollo:"sub"` + } + + // Mocking the result from Apollo server + configMapping := map[string]string{ + "stringField": "aaa", + "mp": `{"a": true, "b": false, "c": false}`, + "sub": `{"C":0.55, "E": "e1"}`, + } + + enabled = true + getStringFn = func(_, key string, result *string) error { + s, ok := configMapping[key] + if !ok { + return errors.New("key not found") + } + *result = s + return nil + } + + expected := StructTest{ + A: "aaa", + B: SubStruct{ + C: 0.55, + D: map[string]bool{"a": true, "b": false, "c": false}, + E: "e1", + }, + } + + cnt := make(map[string]int) + callback := func(key string, _ *storage.ConfigChange) { + cnt[key]++ + } + + var s StructTest + err := Load(&s) + require.NoError(t, err) + require.Equal(t, expected, s) + + var stringField = s.A + RegisterChangeHandler("stringField", WithConfigObj(&stringField), WithCallbackFn(callback)) + RegisterChangeHandler("stringField", WithConfigObj(&s.A), WithCallbackFn(callback)) + RegisterChangeHandler("sub", WithConfigObj(&s.B), WithCallbackFn(callback)) + RegisterChangeHandler("e", WithConfigObj(&s.B.E), WithCallbackFn(callback)) + RegisterChangeHandler("mp", WithConfigObj(&s.B.D), WithCallbackFn(callback)) + + listener := GetDefaultListener() + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "stringField": { + ChangeType: storage.MODIFIED, + NewValue: "bbb", + }, + "sub": { + ChangeType: storage.MODIFIED, + NewValue: `{"C": 1.5, "D": {"z": true}, "E": "e2"}`, + }, + }, + }) + expected.A = "bbb" + expected.B.C = 1.5 + expected.B.E = "e2" + require.Equal(t, expected, s) + require.Equal(t, "bbb", stringField) + require.Equal(t, 2, cnt["stringField"]) + require.Equal(t, 1, cnt["sub"]) + + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "stringField": { + ChangeType: storage.MODIFIED, + NewValue: "ccc", + }, + "e": { + ChangeType: storage.ADDED, + NewValue: "e3", + }, + }, + }) + expected.A = "ccc" + expected.B.E = "e3" + require.Equal(t, expected, s) + require.Equal(t, "ccc", stringField) + require.Equal(t, 4, cnt["stringField"]) + require.Equal(t, 1, cnt["sub"]) + require.Equal(t, 1, cnt["e"]) + + // Test invalid new value + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "mp": { + ChangeType: storage.MODIFIED, + NewValue: "---", + }, + }, + }) + require.Equal(t, expected, s) + require.Equal(t, 1, cnt["mp"]) + + listener.OnChange(&storage.ChangeEvent{ + Changes: map[string]*storage.ConfigChange{ + "mp": { + ChangeType: storage.MODIFIED, + NewValue: `{"z": false}`, + }, + }, + }) + expected.B.D = map[string]bool{"z": false} + require.Equal(t, expected, s) + require.Equal(t, 1, cnt["mp"]) +} From 604d15a30fa385d3b1744eb04754d0d1c521c04f Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Tue, 9 Jul 2024 10:33:08 +0800 Subject: [PATCH 3/6] Change all Apollo config entries to use change listener instead --- claimtxman/claimtxman.go | 34 ++--- claimtxman/claimtxman_xlayer.go | 25 ++- cmd/run.go | 5 +- cmd/run_xlayer.go | 4 +- config/apolloconfig/apollo.go | 20 ++- config/apolloconfig/apollo_test.go | 7 +- config/apolloconfig/consts.go | 4 - config/apolloconfig/entry.go | 149 ------------------ config/apolloconfig/entry_test.go | 218 --------------------------- config/apolloconfig/listener.go | 11 +- config/apolloconfig/listener_test.go | 21 +-- config/apolloconfig/utils.go | 7 + estimatetime/estimatetime.go | 16 +- messagepush/kafkaproducer.go | 26 ++-- pushtask/committedbatchtask.go | 20 ++- pushtask/l1blocknumtask.go | 4 +- pushtask/verifiedbatchtask.go | 23 ++- redisstorage/redisstorage.go | 20 ++- server/service.go | 31 ++-- server/service_xlayer.go | 62 +++++--- synchronizer/synchronizer_xlayer.go | 11 +- test/operations/manager.go | 2 +- test/operations/mockserver.go | 4 +- utils/constant.go | 6 +- utils/helpers.go | 6 + 25 files changed, 229 insertions(+), 507 deletions(-) delete mode 100644 config/apolloconfig/entry.go delete mode 100644 config/apolloconfig/entry_test.go diff --git a/claimtxman/claimtxman.go b/claimtxman/claimtxman.go index 1db3a67f..6f0309f2 100644 --- a/claimtxman/claimtxman.go +++ b/claimtxman/claimtxman.go @@ -10,7 +10,6 @@ import ( "time" ctmtypes "github.com/0xPolygonHermez/zkevm-bridge-service/claimtxman/types" - "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/0xPolygonHermez/zkevm-bridge-service/messagepush" @@ -58,13 +57,11 @@ type ClaimTxManager struct { // Producer to push the transaction status change to front end messagePushProducer messagepush.KafkaProducer redisStorage redisstorage.RedisStorage - monitorTxsLimit apolloconfig.Entry[uint] + monitorTxsLimit uint } // NewClaimTxManager creates a new claim transaction manager. -func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, - l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, producer messagepush.KafkaProducer, - redisStorage redisstorage.RedisStorage, rollupID uint) (*ClaimTxManager, error) { +func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, rollupID uint) (*ClaimTxManager, error) { ctx := context.Background() client, err := utils.NewClient(ctx, l2NodeURL, l2BridgeAddr) if err != nil { @@ -77,21 +74,18 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot ctx, cancel := context.WithCancel(ctx) auth, err := client.GetSignerFromKeystore(ctx, cfg.PrivateKey) return &ClaimTxManager{ - ctx: ctx, - cancel: cancel, - l2Node: client, - l2NetworkID: l2NetworkID, - bridgeService: bridgeService, - cfg: cfg, - chExitRootEvent: chExitRootEvent, - chSynced: chSynced, - storage: storage.(storageInterface), - auth: auth, - rollupID: rollupID, - nonceCache: cache, - messagePushProducer: producer, - redisStorage: redisStorage, - monitorTxsLimit: apolloconfig.NewIntEntry("claimtxman.monitorTxsLimit", uint(128)), //nolint:gomnd + ctx: ctx, + cancel: cancel, + l2Node: client, + l2NetworkID: l2NetworkID, + bridgeService: bridgeService, + cfg: cfg, + chExitRootEvent: chExitRootEvent, + chSynced: chSynced, + storage: storage.(storageInterface), + auth: auth, + rollupID: rollupID, + nonceCache: cache, }, err } diff --git a/claimtxman/claimtxman_xlayer.go b/claimtxman/claimtxman_xlayer.go index 9fe20bbb..0a334f1d 100644 --- a/claimtxman/claimtxman_xlayer.go +++ b/claimtxman/claimtxman_xlayer.go @@ -8,10 +8,13 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl/pb" ctmtypes "github.com/0xPolygonHermez/zkevm-bridge-service/claimtxman/types" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/0xPolygonHermez/zkevm-bridge-service/messagepush" "github.com/0xPolygonHermez/zkevm-bridge-service/metrics" "github.com/0xPolygonHermez/zkevm-bridge-service/pushtask" + "github.com/0xPolygonHermez/zkevm-bridge-service/redisstorage" "github.com/0xPolygonHermez/zkevm-bridge-service/utils" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror" "github.com/0xPolygonHermez/zkevm-node/pool" @@ -23,6 +26,26 @@ import ( "github.com/pkg/errors" ) +const ( + defaultMonitorTxsLimit = 128 +) + +// NewClaimTxManagerXLayer creates a new claim transaction manager. +func NewClaimTxManagerXLayer(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, + l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}, producer messagepush.KafkaProducer, + redisStorage redisstorage.RedisStorage, rollupID uint) (*ClaimTxManager, error) { + tm, err := NewClaimTxManager(cfg, chExitRootEvent, chSynced, l2NodeURL, l2NetworkID, l2BridgeAddr, bridgeService, storage, rollupID) + if err != nil { + return tm, err + } + tm.messagePushProducer = producer + tm.redisStorage = redisStorage + tm.monitorTxsLimit = defaultMonitorTxsLimit + apolloconfig.RegisterChangeHandler("claimtxman.monitorTxsLimit", &tm.monitorTxsLimit) + apolloconfig.RegisterChangeHandler("ClaimTxManager", &tm.cfg) + return tm, nil +} + // StartXLayer will start the tx management, reading txs from storage, // send then to the blockchain and keep monitoring them until they // get mined @@ -423,7 +446,7 @@ func (tm *ClaimTxManager) monitorTxsXLayer(ctx context.Context) error { mLog.Infof("monitorTxs begin") statusesFilter := []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatusCreated} - mTxs, err := tm.storage.GetClaimTxsByStatusWithLimit(ctx, statusesFilter, tm.monitorTxsLimit.Get(), 0, dbTx) + mTxs, err := tm.storage.GetClaimTxsByStatusWithLimit(ctx, statusesFilter, tm.monitorTxsLimit, 0, dbTx) if err != nil { mLog.Errorf("failed to get created monitored txs: %v", err) rollbackErr := tm.storage.Rollback(tm.ctx, dbTx) diff --git a/cmd/run.go b/cmd/run.go index 539a8f7b..eadea0fc 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -21,7 +21,6 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/utils" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/client" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/urfave/cli/v2" ) @@ -101,7 +100,7 @@ func start(ctx *cli.Context) error { return err } rollupID := l1Etherman.GetRollupID() - bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, []*utils.Client{nil}, []*bind.TransactOpts{nil}, apiStorage, rollupID) + bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, apiStorage, rollupID) err = server.RunServer(c.BridgeServer, bridgeService) if err != nil { log.Error(err) @@ -122,7 +121,7 @@ func start(ctx *cli.Context) error { for i := 0; i < len(c.Etherman.L2URLs); i++ { // we should match the orders of L2URLs between etherman and claimtxman // since we are using the networkIDs in the same order - claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, nil, nil, rollupID) + claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, rollupID) if err != nil { log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err) } diff --git a/cmd/run_xlayer.go b/cmd/run_xlayer.go index bef6577f..327a6379 100644 --- a/cmd/run_xlayer.go +++ b/cmd/run_xlayer.go @@ -208,7 +208,7 @@ func startServer(ctx *cli.Context, opts ...runOptionFunc) error { utils.InitChainIdManager(networkIDs, chainIDs) rollupID := l1Etherman.GetRollupID() - bridgeService := server.NewBridgeService(c.BridgeServer, c.BridgeController.Height, networkIDs, l2NodeClients, l2Auths, apiStorage, rollupID). + bridgeService := server.NewBridgeServiceXLayer(c.BridgeServer, c.BridgeController.Height, networkIDs, l2NodeClients, l2Auths, apiStorage, rollupID). WithRedisStorage(redisStorage).WithMainCoinsCache(localcache.GetDefaultCache()).WithMessagePushProducer(messagePushProducer) // Initialize inner chain id conf @@ -279,7 +279,7 @@ func startServer(ctx *cli.Context, opts ...runOptionFunc) error { for i := 0; i < len(c.Etherman.L2URLs); i++ { // we should match the orders of L2URLs between etherman and claimtxman // since we are using the networkIDs in the same order - claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, messagePushProducer, redisStorage, rollupID) + claimTxManager, err := claimtxman.NewClaimTxManagerXLayer(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, messagePushProducer, redisStorage, rollupID) if err != nil { log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err) } diff --git a/config/apolloconfig/apollo.go b/config/apolloconfig/apollo.go index 61305a2a..26ebd05c 100644 --- a/config/apolloconfig/apollo.go +++ b/config/apolloconfig/apollo.go @@ -3,6 +3,7 @@ package apolloconfig import ( "encoding" "encoding/json" + "fmt" "reflect" "strings" @@ -49,6 +50,7 @@ func Init(c Config) error { return errors.Wrap(err, "start apollo client error") } + client.AddChangeListener(GetDefaultListener()) defaultClient = client disableEntryDebugLog = c.DisableEntryDebugLog return nil @@ -93,7 +95,7 @@ func handleObject(v reflect.Value, key string) error { field := reflect.Indirect(v) if key != "" && key != "-" { - val, err := NewStringEntry(key, "").GetWithErr() + val, err := getString(key) if err != nil { return err } @@ -141,3 +143,19 @@ func decodeStringToValue(val string, v reflect.Value) error { } return nil } + +var getString = func(key string) (string, error) { + client := GetClient() + if client == nil { + return "", errors.New("apollo client is nil") + } + v, err := client.GetConfig(defaultNamespace).GetCache().Get(key) + if err != nil { + return "", err + } + s, ok := v.(string) + if !ok { + return "", fmt.Errorf("value is not string, type: %T", v) + } + return s, nil +} diff --git a/config/apolloconfig/apollo_test.go b/config/apolloconfig/apollo_test.go index 308a224c..e3c1303a 100644 --- a/config/apolloconfig/apollo_test.go +++ b/config/apolloconfig/apollo_test.go @@ -53,13 +53,12 @@ func TestLoad(t *testing.T) { } enabled = true - getStringFn = func(_, key string, result *string) error { + getString = func(key string) (string, error) { s, ok := resultMapping[key] if !ok { - return errors.New("key not found") + return "", errors.New("key not found") } - *result = s - return nil + return s, nil } var output StructTest diff --git a/config/apolloconfig/consts.go b/config/apolloconfig/consts.go index b6a0b721..34a2fcfe 100644 --- a/config/apolloconfig/consts.go +++ b/config/apolloconfig/consts.go @@ -4,9 +4,5 @@ const ( loggerFieldKey = "component" loggerFieldValue = "apolloconfig" - parseIntBase = 10 - parseIntBitSize = 64 - - comma = "," defaultNamespace = "application" ) diff --git a/config/apolloconfig/entry.go b/config/apolloconfig/entry.go deleted file mode 100644 index 679cec58..00000000 --- a/config/apolloconfig/entry.go +++ /dev/null @@ -1,149 +0,0 @@ -package apolloconfig - -import ( - "encoding" - "encoding/json" - "fmt" - - "github.com/pkg/errors" - "golang.org/x/exp/constraints" -) - -type Entry[T any] interface { - Get() T - GetWithErr() (T, error) -} - -// An interface to get the config from Apollo client (and convert it if needed) -type getterFunction[T any] func(namespace, key string, result *T) error - -type entryImpl[T any] struct { - namespace string - key string - defaultValue T - getterFn getterFunction[T] -} - -type entryOption[T any] func(*entryImpl[T]) - -func WithNamespace[T any](namespace string) entryOption[T] { - return func(e *entryImpl[T]) { - e.namespace = namespace - } -} - -// newEntry is a generic constructor for apolloconfig.Entry -func newEntry[T any](key string, defaultValue T, getterFn getterFunction[T], opts ...entryOption[T]) Entry[T] { - e := &entryImpl[T]{ - namespace: defaultNamespace, - key: key, - defaultValue: defaultValue, - getterFn: getterFn, - } - - for _, o := range opts { - o(e) - } - - return e -} - -func NewIntEntry[T constraints.Integer](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { - return newEntry(key, defaultValue, getJson[T], opts...) -} - -func NewIntSliceEntry[T constraints.Integer](key string, defaultValue []T, opts ...entryOption[[]T]) Entry[[]T] { - return newEntry(key, defaultValue, getJson[[]T], opts...) -} - -func NewBoolEntry(key string, defaultValue bool, opts ...entryOption[bool]) Entry[bool] { - return newEntry(key, defaultValue, getJson[bool], opts...) -} - -func NewStringEntry(key string, defaultValue string, opts ...entryOption[string]) Entry[string] { - return newEntry(key, defaultValue, getStringFn, opts...) -} - -func NewStringSliceEntry(key string, defaultValue []string, opts ...entryOption[[]string]) Entry[[]string] { - return newEntry(key, defaultValue, getJson[[]string], opts...) -} - -func NewJsonEntry[T any](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { - return newEntry(key, defaultValue, getJson[T], opts...) -} - -func NewTextUnmarshalerEntry[T encoding.TextUnmarshaler](key string, defaultValue T, opts ...entryOption[T]) Entry[T] { - return newEntry(key, defaultValue, getTextUnmarshaler[T], opts...) -} - -func (e *entryImpl[T]) String() string { - return fmt.Sprintf("%v", e.Get()) -} - -func (e *entryImpl[T]) Get() T { - logger := getLogger().WithFields("key", e.key) - v, err := e.GetWithErr() - if err != nil && !disableEntryDebugLog { - logger.Debugf("error[%v], returning default value", err) - } - return v -} - -func (e *entryImpl[T]) GetWithErr() (T, error) { - // If Apollo config is not enabled, just return the default value - if !enabled { - return e.defaultValue, errors.New("apollo disabled") - } - - if e.getterFn == nil { - return e.defaultValue, errors.New("getterFn is nil") - } - - var v T - err := e.getterFn(e.namespace, e.key, &v) - if err != nil { - return e.defaultValue, errors.Wrap(err, "getterFn error") - } - return v, nil -} - -// ----- Getter functions ----- - -var getStringFn = getString - -func getString(namespace, key string, result *string) error { - client := GetClient() - if client == nil { - return errors.New("apollo client is nil") - } - v, err := client.GetConfig(namespace).GetCache().Get(key) - if err != nil { - return err - } - s, ok := v.(string) - if !ok { - return fmt.Errorf("value is not string, type: %T", v) - } - *result = s - return nil -} - -func getTextUnmarshaler[T encoding.TextUnmarshaler](namespace, key string, result *T) error { - var s string - err := getStringFn(namespace, key, &s) - if err != nil { - return err - } - err = (*result).UnmarshalText([]byte(s)) - return err -} - -func getJson[T any](namespace, key string, result *T) error { - var s string - err := getStringFn(namespace, key, &s) - if err != nil { - return err - } - err = json.Unmarshal([]byte(s), result) - return err -} diff --git a/config/apolloconfig/entry_test.go b/config/apolloconfig/entry_test.go deleted file mode 100644 index e2171204..00000000 --- a/config/apolloconfig/entry_test.go +++ /dev/null @@ -1,218 +0,0 @@ -package apolloconfig - -import ( - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/require" -) - -func TestGetInt(t *testing.T) { - testCases := []struct { - inputString string - outputResult int - outputHasErr bool - }{ - {"1", 1, false}, - {"-11", -11, false}, - {"0", 0, false}, - {"", 0, true}, - {" 22 ", 22, false}, - {"asd", 0, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res int - err := getJson("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, res, "Case #%v", i) - } - } -} - -func TestGetFloat(t *testing.T) { - testCases := []struct { - inputString string - outputResult float64 - outputHasErr bool - }{ - {"1", 1, false}, - {"-11.5", -11.5, false}, - {"0.222", 0.222, false}, - {"", 0, true}, - {" 22.1234 ", 22.1234, false}, - {"asd", 0, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res float64 - err := getJson("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, res, "Case #%v", i) - } - } -} - -func TestGetIntSlice(t *testing.T) { - testCases := []struct { - inputString string - outputResult []int - outputHasErr bool - }{ - {"[1,2,3]", []int{1, 2, 3}, false}, - {"[]", []int{}, false}, - {"0", nil, true}, - {"1,2,3", nil, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res []int - err := getJson("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, res, "Case #%v", i) - } - } -} - -func TestGetStringSlice(t *testing.T) { - testCases := []struct { - inputString string - outputResult []string - outputHasErr bool - }{ - {"[\"ab\",\"c\",\"d\\\"\"]", []string{"ab", "c", "d\""}, false}, - {"[]", []string{}, false}, - {"\"a\"", nil, true}, - {"abc", nil, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res []string - err := getJson("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, res, "Case #%v", i) - } - } -} - -func TestGetJsonStruct(t *testing.T) { - type S1 struct { - A int `json:"a"` - B string `json:"b"` - } - - type S2 struct { - C S1 `json:"sub"` - D bool `json:"d"` - } - - testCases := []struct { - inputString string - outputResult *S2 - outputHasErr bool - }{ - {`{"sub":{"a":1,"b":"abc"},"d":true}`, &S2{S1{1, "abc"}, true}, false}, - {"{}", &S2{S1{0, ""}, false}, false}, - {"", nil, true}, - {"abc", nil, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res *S2 - err := getJson("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, res, "Case #%v", i) - } - } -} - -func TestGetJsonMap(t *testing.T) { - testCases := []struct { - inputString string - outputResult map[string]string - outputHasErr bool - }{ - {`{"sub":{"a":1,"b":"abc"},"d":true}`, nil, true}, - {`{"a":"1","b":"2"}`, map[string]string{"a": "1", "b": "2"}, false}, - {`{"a":"1","b":2}`, nil, true}, - {"abc", nil, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res map[string]string - err := getJson("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, res, "Case #%v", i) - } - } -} - -func TestGetTextUnmarshaler(t *testing.T) { - testCases := []struct { - inputString string - outputResult common.Address - outputHasErr bool - }{ - {"0x167985f547e5087DA14084b80762104d36c08756", common.HexToAddress("0x167985f547e5087DA14084b80762104d36c08756"), false}, - {"[]", common.Address{}, true}, - {"", common.Address{}, true}, - {`"0x167985f547e5087DA14084b80762104d36c08756"`, common.Address{}, true}, - } - - for i, c := range testCases { - getStringFn = func(_ string, _ string, result *string) error { - *result = c.inputString - return nil - } - var res = &common.Address{} - err := getTextUnmarshaler("", "", &res) - if c.outputHasErr { - require.Errorf(t, err, "Case #%v", i) - } else { - require.NoErrorf(t, err, "Case #%v", i) - require.Equalf(t, c.outputResult, *res, "Case #%v", i) - } - } -} diff --git a/config/apolloconfig/listener.go b/config/apolloconfig/listener.go index 4740ea03..ff091d42 100644 --- a/config/apolloconfig/listener.go +++ b/config/apolloconfig/listener.go @@ -26,12 +26,15 @@ func GetDefaultListener() *ConfigChangeListener { return listener } -func RegisterChangeHandler(key string, opts ...handlerOpt) { +func RegisterChangeHandler[T any](key string, obj *T, opts ...handlerOpt) { + if obj != nil { + opts = append(opts, withConfigObj(obj)) + } GetDefaultListener().RegisterHandler(key, opts...) } func (l *ConfigChangeListener) OnChange(event *storage.ChangeEvent) { - getLogger().Debugf("ConfigChangeListener#OnChange received: %+v", event) + getLogger().Debugf("ConfigChangeListener#OnChange received: %v", toJson(event)) for key, change := range event.Changes { // Only handle ADDED and MODIFIED type @@ -85,10 +88,10 @@ func (h *changeHandler) handle(change *storage.ConfigChange, key string) { type handlerOpt func(handler *changeHandler) -// WithConfigObj assigns an object to be updated when a specific config key is changed. +// withConfigObj assigns an object to be updated when a specific config key is changed. // The logic for updating can be found in decodeStringToValue function // obj must be a pointer -func WithConfigObj[T any](obj *T) handlerOpt { +func withConfigObj[T any](obj *T) handlerOpt { return func(handler *changeHandler) { handler.obj = obj } diff --git a/config/apolloconfig/listener_test.go b/config/apolloconfig/listener_test.go index f0a29759..970e11b4 100644 --- a/config/apolloconfig/listener_test.go +++ b/config/apolloconfig/listener_test.go @@ -1,6 +1,7 @@ package apolloconfig import ( + "sync" "testing" "github.com/apolloconfig/agollo/v4/storage" @@ -28,13 +29,12 @@ func TestConfigChangeListener(t *testing.T) { } enabled = true - getStringFn = func(_, key string, result *string) error { + getString = func(key string) (string, error) { s, ok := configMapping[key] if !ok { - return errors.New("key not found") + return "", errors.New("key not found") } - *result = s - return nil + return s, nil } expected := StructTest{ @@ -57,11 +57,12 @@ func TestConfigChangeListener(t *testing.T) { require.Equal(t, expected, s) var stringField = s.A - RegisterChangeHandler("stringField", WithConfigObj(&stringField), WithCallbackFn(callback)) - RegisterChangeHandler("stringField", WithConfigObj(&s.A), WithCallbackFn(callback)) - RegisterChangeHandler("sub", WithConfigObj(&s.B), WithCallbackFn(callback)) - RegisterChangeHandler("e", WithConfigObj(&s.B.E), WithCallbackFn(callback)) - RegisterChangeHandler("mp", WithConfigObj(&s.B.D), WithCallbackFn(callback)) + mutex := &sync.Mutex{} + RegisterChangeHandler("stringField", &stringField, WithCallbackFn(callback), WithLocker(mutex)) + RegisterChangeHandler("stringField", &s.A, WithCallbackFn(callback), WithLocker(mutex)) + RegisterChangeHandler("sub", &s.B, WithCallbackFn(callback), WithLocker(mutex)) + RegisterChangeHandler("e", &s.B.E, WithCallbackFn(callback), WithLocker(mutex)) + RegisterChangeHandler("mp", &s.B.D, WithCallbackFn(callback), WithLocker(mutex)) listener := GetDefaultListener() listener.OnChange(&storage.ChangeEvent{ @@ -126,5 +127,5 @@ func TestConfigChangeListener(t *testing.T) { }) expected.B.D = map[string]bool{"z": false} require.Equal(t, expected, s) - require.Equal(t, 1, cnt["mp"]) + require.Equal(t, 2, cnt["mp"]) } diff --git a/config/apolloconfig/utils.go b/config/apolloconfig/utils.go index 3ae02367..9d2af34c 100644 --- a/config/apolloconfig/utils.go +++ b/config/apolloconfig/utils.go @@ -1,6 +1,8 @@ package apolloconfig import ( + "encoding/json" + "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/apolloconfig/agollo/v4" ) @@ -12,3 +14,8 @@ func getLogger() *log.Logger { func SetLogger() { agollo.SetLogger(getLogger()) } + +func toJson(x any) string { + b, _ := json.Marshal(x) + return string(b) +} diff --git a/estimatetime/estimatetime.go b/estimatetime/estimatetime.go index c1e67963..091eb755 100644 --- a/estimatetime/estimatetime.go +++ b/estimatetime/estimatetime.go @@ -45,8 +45,8 @@ func GetDefaultCalculator() Calculator { type calculatorImpl struct { storage DBStorage estimateTime []uint32 // In minutes - defaultEstTimeConfig apolloconfig.Entry[[]uint32] - sampleLimit apolloconfig.Entry[uint] + defaultEstTimeConfig []uint32 + sampleLimit uint } func NewCalculator(storage interface{}) (Calculator, error) { @@ -56,10 +56,12 @@ func NewCalculator(storage interface{}) (Calculator, error) { c := &calculatorImpl{ storage: storage.(DBStorage), estimateTime: make([]uint32, estTimeSize), - defaultEstTimeConfig: apolloconfig.NewIntSliceEntry[uint32](estTimeConfigKey, []uint32{defaultL1EstimateTime, defaultL2EstimateTime}), - sampleLimit: apolloconfig.NewIntEntry[uint](sampleLimitConfigKey, defaultSampleLimit), + defaultEstTimeConfig: []uint32{defaultL1EstimateTime, defaultL2EstimateTime}, + sampleLimit: defaultSampleLimit, } - def := c.defaultEstTimeConfig.Get() + apolloconfig.RegisterChangeHandler("estimateTime.defaultTime", &c.defaultEstTimeConfig) + apolloconfig.RegisterChangeHandler("estimateTime.sampleLimit", &c.sampleLimit) + def := c.defaultEstTimeConfig for i := 0; i < estTimeSize; i++ { c.estimateTime[i] = def[i] } @@ -93,7 +95,7 @@ func (c *calculatorImpl) refresh(ctx context.Context, networkID uint) error { if networkID > 1 { return fmt.Errorf("invalid networkID %v", networkID) } - deposits, err := c.storage.GetLatestReadyDeposits(ctx, networkID, c.sampleLimit.Get(), nil) + deposits, err := c.storage.GetLatestReadyDeposits(ctx, networkID, c.sampleLimit, nil) if err != nil { log.Errorf("GetLatestReadyDeposits err:%v", err) return err @@ -121,7 +123,7 @@ func (c *calculatorImpl) refresh(ctx context.Context, networkID uint) error { } newTime := uint32(math.Ceil(sum / float64(len(fMinutes)))) log.Debugf("Re-calculate estimate time, networkID[%v], fMinutes[%v], newTime[%v]", networkID, fMinutes, newTime) - defaultTime := c.defaultEstTimeConfig.Get()[networkID] + defaultTime := c.defaultEstTimeConfig[networkID] if newTime > defaultTime { newTime = defaultTime } diff --git a/messagepush/kafkaproducer.go b/messagepush/kafkaproducer.go index 64eb7888..e6db9590 100644 --- a/messagepush/kafkaproducer.go +++ b/messagepush/kafkaproducer.go @@ -48,9 +48,9 @@ type KafkaProducer interface { type kafkaProducerImpl struct { producer sarama.SyncProducer - defaultTopic apolloconfig.Entry[string] - defaultPushKey apolloconfig.Entry[string] - bizCode apolloconfig.Entry[string] + defaultTopic string + defaultPushKey string + bizCode string } func NewKafkaProducer(cfg Config) (KafkaProducer, error) { @@ -87,12 +87,16 @@ func NewKafkaProducer(cfg Config) (KafkaProducer, error) { if err != nil { return nil, errors.Wrap(err, "NewKafkaProducer: NewSyncProducer error") } - return &kafkaProducerImpl{ + p := &kafkaProducerImpl{ producer: producer, - defaultTopic: apolloconfig.NewStringEntry("MessagePushProducer.Topic", cfg.Topic), - defaultPushKey: apolloconfig.NewStringEntry("MessagePushProducer.PushKey", cfg.PushKey), - bizCode: apolloconfig.NewStringEntry("MessagePushProducer.BizCode", BizCodeBridgeOrder), - }, nil + defaultTopic: cfg.Topic, + defaultPushKey: cfg.PushKey, + bizCode: BizCodeBridgeOrder, + } + apolloconfig.RegisterChangeHandler("MessagePushProducer.Topic", &p.defaultTopic) + apolloconfig.RegisterChangeHandler("MessagePushProducer.PushKey", &p.defaultPushKey) + apolloconfig.RegisterChangeHandler("MessagePushProducer.BizCode", &p.bizCode) + return p, nil } // Produce send a message to the Kafka topic @@ -104,8 +108,8 @@ func (p *kafkaProducerImpl) Produce(msg interface{}, optFns ...produceOptFunc) e return nil } opts := &produceOptions{ - topic: p.defaultTopic.Get(), - pushKey: p.defaultPushKey.Get(), + topic: p.defaultTopic, + pushKey: p.defaultPushKey, } for _, f := range optFns { f(opts) @@ -159,7 +163,7 @@ func (p *kafkaProducerImpl) PushTransactionUpdate(tx *pb.Transaction, optFns ... } msg := &PushMessage{ - BizCode: p.bizCode.Get(), + BizCode: p.bizCode, WalletAddress: tx.GetDestAddr(), RequestID: utils.GenerateTraceID(), PushContent: fmt.Sprintf("[%v]", string(b)), diff --git a/pushtask/committedbatchtask.go b/pushtask/committedbatchtask.go index 9ea5740b..6fdf9688 100644 --- a/pushtask/committedbatchtask.go +++ b/pushtask/committedbatchtask.go @@ -25,11 +25,17 @@ const ( ) var ( - minCommitDuration = apolloconfig.NewIntEntry[uint64]("pushtask.minCommitDuration", 2) //nolint:gomnd - defaultCommitDuration = apolloconfig.NewIntEntry[uint64]("pushtask.defaultCommitDuration", 10) //nolint:gomnd - commitDurationListLen = apolloconfig.NewIntEntry[int]("pushtask.commitDurationListLen", 5) //nolint:gomnd + minCommitDuration uint64 = 2 //nolint:gomnd + defaultCommitDuration uint64 = 10 //nolint:gomnd + commitDurationListLen = 5 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("pushtask.minCommitDuration", &minCommitDuration) + apolloconfig.RegisterChangeHandler("pushtask.defaultCommitDuration", &defaultCommitDuration) + apolloconfig.RegisterChangeHandler("pushtask.commitDurationListLen", &commitDurationListLen) +} + type CommittedBatchHandler struct { rpcUrl string client *ethclient.Client @@ -214,7 +220,7 @@ func (ins *CommittedBatchHandler) freshRedisForAvgCommitDuration(ctx context.Con if err != nil { return err } - if listLen <= int64(commitDurationListLen.Get()) { + if listLen <= int64(commitDurationListLen) { log.Infof("redis duration list is not enough, so skip count the avg duration!") return nil } @@ -297,18 +303,18 @@ func (ins *CommittedBatchHandler) pushMsgForDeposit(deposit *etherman.Deposit, l // checkAvgDurationLegal duration has a default range, 2-10 minutes, if over range, maybe dirty data, drop the data func (ins *CommittedBatchHandler) checkAvgDurationLegal(avgDuration int64) bool { - return avgDuration > int64(minCommitDuration.Get()) && avgDuration < int64(defaultCommitDuration.Get()) + return avgDuration > int64(minCommitDuration) && avgDuration < int64(defaultCommitDuration) } func GetAvgCommitDuration(ctx context.Context, redisStorage redisstorage.RedisStorage) uint64 { avgDuration, err := redisStorage.GetAvgCommitDuration(ctx) if err != nil && !errors.Is(err, redis.Nil) { log.Errorf("get avg commit duration from redis failed, error: %v", err) - return defaultCommitDuration.Get() + return defaultCommitDuration } if avgDuration == 0 { log.Infof("get avg commit duration from redis is 0, so use default") - return defaultCommitDuration.Get() + return defaultCommitDuration } return avgDuration } diff --git a/pushtask/l1blocknumtask.go b/pushtask/l1blocknumtask.go index 29ef457e..645ea904 100644 --- a/pushtask/l1blocknumtask.go +++ b/pushtask/l1blocknumtask.go @@ -105,8 +105,8 @@ func (t *L1BlockNumTask) doTask(ctx context.Context) { }(blockNum) // Minus 64 to get the target query block num - oldBlockNum -= utils.Min(utils.L1TargetBlockConfirmations.Get(), oldBlockNum) - blockNum -= utils.Min(utils.L1TargetBlockConfirmations.Get(), blockNum) + oldBlockNum -= utils.Min(utils.L1TargetBlockConfirmations, oldBlockNum) + blockNum -= utils.Min(utils.L1TargetBlockConfirmations, blockNum) if blockNum <= oldBlockNum { return } diff --git a/pushtask/verifiedbatchtask.go b/pushtask/verifiedbatchtask.go index dda6999c..3317dd24 100644 --- a/pushtask/verifiedbatchtask.go +++ b/pushtask/verifiedbatchtask.go @@ -17,12 +17,19 @@ const ( ) var ( - minVerifyDuration = apolloconfig.NewIntEntry[uint64]("pushtask.minVerifyDuration", 2) //nolint:gomnd - defaultVerifyDuration = apolloconfig.NewIntEntry[uint64]("pushtask.defaultVerifyDuration", 10) //nolint:gomnd - maxVerifyDuration = apolloconfig.NewIntEntry[uint64]("pushtask.maxVerifyDuration", 60) //nolint:gomnd - verifyDurationListLen = apolloconfig.NewIntEntry("pushtask.verifyDurationListLen", 5) //nolint:gomnd + minVerifyDuration uint64 = 2 //nolint:gomnd + defaultVerifyDuration uint64 = 10 //nolint:gomnd + maxVerifyDuration uint64 = 60 //nolint:gomnd + verifyDurationListLen = 5 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("pushtask.minVerifyDuration", &minVerifyDuration) + apolloconfig.RegisterChangeHandler("pushtask.defaultVerifyDuration", &defaultVerifyDuration) + apolloconfig.RegisterChangeHandler("pushtask.maxVerifyDuration", &maxVerifyDuration) + apolloconfig.RegisterChangeHandler("pushtask.verifyDurationListLen", &verifyDurationListLen) +} + type VerifiedBatchHandler struct { rpcUrl string redisStorage redisstorage.RedisStorage @@ -109,7 +116,7 @@ func (ins *VerifiedBatchHandler) freshRedisForAvgCommitDuration(ctx context.Cont if err != nil { return err } - if listLen <= int64(verifyDurationListLen.Get()) { + if listLen <= int64(verifyDurationListLen) { log.Infof("redis verify duration list is not enough, so skip count the avg duration!") return nil } @@ -151,18 +158,18 @@ func (ins *VerifiedBatchHandler) checkLatestBatchLegal(ctx context.Context, late // checkAvgDurationLegal duration has a default range, 2-30 minutes, if over range, maybe dirty data, drop the data func (ins *VerifiedBatchHandler) checkAvgDurationLegal(avgDuration int64) bool { - return avgDuration > int64(minVerifyDuration.Get()) && avgDuration < int64(maxVerifyDuration.Get()) + return avgDuration > int64(minVerifyDuration) && avgDuration < int64(maxVerifyDuration) } func GetAvgVerifyDuration(ctx context.Context, redisStorage redisstorage.RedisStorage) uint64 { avgDuration, err := redisStorage.GetAvgVerifyDuration(ctx) if err != nil && !errors.Is(err, redis.Nil) { log.Errorf("get avg verify duration from redis failed, error: %v", err) - return defaultVerifyDuration.Get() + return defaultVerifyDuration } if avgDuration == 0 { log.Infof("get avg verify duration from redis is 0, so use default") - return defaultVerifyDuration.Get() + return defaultVerifyDuration } return avgDuration } diff --git a/redisstorage/redisstorage.go b/redisstorage/redisstorage.go index a54fab69..2ce18742 100644 --- a/redisstorage/redisstorage.go +++ b/redisstorage/redisstorage.go @@ -51,8 +51,8 @@ const ( // redisStorageImpl implements RedisStorage interface type redisStorageImpl struct { client RedisClient - enableCoinPriceCfg apolloconfig.Entry[bool] - keyPrefix apolloconfig.Entry[string] + enableCoinPriceCfg bool + keyPrefix string } func NewRedisStorage(cfg Config) (RedisStorage, error) { @@ -79,14 +79,18 @@ func NewRedisStorage(cfg Config) (RedisStorage, error) { return nil, errors.Wrap(err, "cannot connect to redis server") } log.Debugf("redis health check done, result: %v", res) - return &redisStorageImpl{client: client, - enableCoinPriceCfg: apolloconfig.NewBoolEntry("CoinPrice.Enabled", cfg.EnablePrice), - keyPrefix: apolloconfig.NewStringEntry("Redis.KeyPrefix", cfg.KeyPrefix), - }, nil + s := &redisStorageImpl{client: client, + enableCoinPriceCfg: cfg.EnablePrice, + keyPrefix: cfg.KeyPrefix, + } + apolloconfig.RegisterChangeHandler("CoinPrice.Enabled", &s.enableCoinPriceCfg) + apolloconfig.RegisterChangeHandler("Redis.KeyPrefix", &s.keyPrefix) + + return s, nil } func (s *redisStorageImpl) addKeyPrefix(key string) string { - return s.keyPrefix.Get() + key + return s.keyPrefix + key } func (s *redisStorageImpl) SetCoinPrice(ctx context.Context, prices []*pb.SymbolPrice) error { @@ -174,7 +178,7 @@ func (s *redisStorageImpl) GetCoinPrice(ctx context.Context, symbols []*pb.Symbo log.Debugf("GetCoinPrice size[%v]", len(symbols)) var priceList []*pb.SymbolPrice var err error - if s.enableCoinPriceCfg.Get() { + if s.enableCoinPriceCfg { priceList, err = s.getCoinPrice(ctx, symbols) if err != nil { return nil, err diff --git a/server/service.go b/server/service.go index 44eca1bb..95da8395 100644 --- a/server/service.go +++ b/server/service.go @@ -7,7 +7,6 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl" "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl/pb" - "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/localcache" "github.com/0xPolygonHermez/zkevm-bridge-service/log" @@ -26,8 +25,8 @@ type bridgeService struct { storage bridgeServiceStorage networkIDs map[uint]uint8 height uint8 - defaultPageLimit apolloconfig.Entry[uint32] - maxPageLimit apolloconfig.Entry[uint32] + defaultPageLimit uint32 + maxPageLimit uint32 version string cache *lru.Cache[string, [][]byte] pb.UnimplementedBridgeServiceServer @@ -41,16 +40,10 @@ type bridgeService struct { } // NewBridgeService creates new bridge service. -func NewBridgeService(cfg Config, height uint8, networks []uint, l2Clients []*utils.Client, l2Auths []*bind.TransactOpts, storage interface{}, rollupID uint) *bridgeService { +func NewBridgeService(cfg Config, height uint8, networks []uint, storage interface{}, rollupID uint) *bridgeService { var networkIDs = make(map[uint]uint8) - var nodeClients = make(map[uint]*utils.Client, len(networks)) - var authMap = make(map[uint]*bind.TransactOpts, len(networks)) for i, network := range networks { networkIDs[network] = uint8(i) - if i > 0 { - nodeClients[network] = l2Clients[i-1] - authMap[network] = l2Auths[i-1] - } } cache, err := lru.New[string, [][]byte](cfg.CacheSize) if err != nil { @@ -61,10 +54,8 @@ func NewBridgeService(cfg Config, height uint8, networks []uint, l2Clients []*ut storage: storage.(bridgeServiceStorage), height: height, networkIDs: networkIDs, - nodeClients: nodeClients, - auths: authMap, - defaultPageLimit: apolloconfig.NewIntEntry("BridgeServer.DefaultPageLimit", cfg.DefaultPageLimit), - maxPageLimit: apolloconfig.NewIntEntry("BridgeServer.MaxPageLimit", cfg.MaxPageLimit), + defaultPageLimit: cfg.DefaultPageLimit, + maxPageLimit: cfg.MaxPageLimit, version: cfg.BridgeVersion, cache: cache, } @@ -263,10 +254,10 @@ func (s *bridgeService) CheckAPI(ctx context.Context, req *pb.CheckAPIRequest) ( func (s *bridgeService) GetBridges(ctx context.Context, req *pb.GetBridgesRequest) (*pb.GetBridgesResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } totalCount, err := s.storage.GetDepositCount(ctx, req.DestAddr, nil) if err != nil { @@ -318,10 +309,10 @@ func (s *bridgeService) GetBridges(ctx context.Context, req *pb.GetBridgesReques func (s *bridgeService) GetClaims(ctx context.Context, req *pb.GetClaimsRequest) (*pb.GetClaimsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } totalCount, err := s.storage.GetClaimCount(ctx, req.DestAddr, nil) if err != nil { diff --git a/server/service_xlayer.go b/server/service_xlayer.go index 30f4c170..c237d4b1 100644 --- a/server/service_xlayer.go +++ b/server/service_xlayer.go @@ -21,6 +21,7 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/utils" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/gerror" "github.com/0xPolygonHermez/zkevm-bridge-service/utils/messagebridge" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/pkg/errors" "github.com/redis/go-redis/v9" ) @@ -31,9 +32,32 @@ const ( ) var ( - minReadyTimeLimitForWaitClaimSeconds = apolloconfig.NewIntEntry[int64]("api.minReadyTimeLimitForWaitClaim", 24*60*1000) //nolint:gomnd + minReadyTimeLimitForWaitClaimSeconds int64 = 24 * 60 * 1000 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("api.minReadyTimeLimitForWaitClaim", &minReadyTimeLimitForWaitClaimSeconds) +} + +// NewBridgeServiceXLayer creates new bridge service. +func NewBridgeServiceXLayer(cfg Config, height uint8, networks []uint, l2Clients []*utils.Client, l2Auths []*bind.TransactOpts, storage interface{}, rollupID uint) *bridgeService { + s := NewBridgeService(cfg, height, networks, storage, rollupID) + + var nodeClients = make(map[uint]*utils.Client, len(networks)) + var authMap = make(map[uint]*bind.TransactOpts, len(networks)) + for i, network := range networks { + if i > 0 { + nodeClients[network] = l2Clients[i-1] + authMap[network] = l2Auths[i-1] + } + } + s.nodeClients = nodeClients + s.auths = authMap + apolloconfig.RegisterChangeHandler("BridgeServer.DefaultPageLimit", &s.defaultPageLimit) + apolloconfig.RegisterChangeHandler("BridgeServer.MaxPageLimit", &s.maxPageLimit) + return s +} + func (s *bridgeService) WithRedisStorage(storage redisstorage.RedisStorage) *bridgeService { s.redisStorage = storage return s @@ -129,10 +153,10 @@ func (s *bridgeService) GetMainCoins(ctx context.Context, req *pb.GetMainCoinsRe func (s *bridgeService) GetPendingTransactions(ctx context.Context, req *pb.GetPendingTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } deposits, err := s.storage.GetPendingTransactions(ctx, req.DestAddr, uint(limit+1), uint(req.Offset), messagebridge.GetContractAddressList(), nil) @@ -178,7 +202,7 @@ func (s *bridgeService) GetPendingTransactions(ctx context.Context, req *pb.GetP // For L1->L2, when ready_for_claim is false, but there have been more than 64 block confirmations, // should also display the status as "L2 executing" (pending auto claim) if deposit.NetworkID == 0 { - if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations.Get() { + if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations { transaction.Status = uint32(pb.TransactionStatus_TX_PENDING_AUTO_CLAIM) } } else { @@ -228,10 +252,10 @@ func (s *bridgeService) setDurationForL2Deposit(ctx context.Context, l2AvgCommit func (s *bridgeService) GetAllTransactions(ctx context.Context, req *pb.GetAllTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } deposits, err := s.storage.GetDepositsXLayer(ctx, req.DestAddr, uint(limit+1), uint(req.Offset), messagebridge.GetContractAddressList(), nil) @@ -291,7 +315,7 @@ func (s *bridgeService) GetAllTransactions(ctx context.Context, req *pb.GetAllTr // For L1->L2, when ready_for_claim is false, but there have been more than 64 block confirmations, // should also display the status as "L2 executing" (pending auto claim) if deposit.NetworkID == 0 { - if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations.Get() { + if l1BlockNum-deposit.BlockNumber >= utils.L1TargetBlockConfirmations { transaction.Status = uint32(pb.TransactionStatus_TX_PENDING_AUTO_CLAIM) } } else { @@ -325,10 +349,10 @@ func (s *bridgeService) GetAllTransactions(ctx context.Context, req *pb.GetAllTr func (s *bridgeService) GetNotReadyTransactions(ctx context.Context, req *pb.GetNotReadyTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } deposits, err := s.storage.GetNotReadyTransactions(ctx, uint(limit+1), uint(req.Offset), nil) @@ -363,10 +387,10 @@ func (s *bridgeService) GetNotReadyTransactions(ctx context.Context, req *pb.Get func (s *bridgeService) GetMonitoredTxsByStatus(ctx context.Context, req *pb.GetMonitoredTxsByStatusRequest) (*pb.CommonMonitoredTxsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } mTxs, err := s.storage.GetClaimTxsByStatusWithLimit(ctx, []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatus(req.Status)}, uint(limit+1), uint(req.Offset), nil) @@ -502,13 +526,13 @@ func (s *bridgeService) ManualClaim(ctx context.Context, req *pb.ManualClaimRequ func (s *bridgeService) GetReadyPendingTransactions(ctx context.Context, req *pb.GetReadyPendingTransactionsRequest) (*pb.CommonTransactionsResponse, error) { limit := req.Limit if limit == 0 { - limit = s.defaultPageLimit.Get() + limit = s.defaultPageLimit } - if limit > s.maxPageLimit.Get() { - limit = s.maxPageLimit.Get() + if limit > s.maxPageLimit { + limit = s.maxPageLimit } - minReadyTime := time.Now().Add(time.Duration(-minReadyTimeLimitForWaitClaimSeconds.Get()) * time.Second) + minReadyTime := time.Now().Add(time.Duration(-minReadyTimeLimitForWaitClaimSeconds) * time.Second) deposits, err := s.storage.GetReadyPendingTransactions(ctx, uint(req.NetworkId), uint(limit+1), uint(req.Offset), minReadyTime, nil) if err != nil { diff --git a/synchronizer/synchronizer_xlayer.go b/synchronizer/synchronizer_xlayer.go index 15674cd1..a68611be 100644 --- a/synchronizer/synchronizer_xlayer.go +++ b/synchronizer/synchronizer_xlayer.go @@ -21,14 +21,17 @@ import ( ) const ( - num1 = 1 - wstETHRedisLockKey = "wst_eth_l2_token_not_withdrawn_lock_" + num1 = 1 ) var ( - largeTxUsdLimit = apolloconfig.NewIntEntry[uint64]("Synchronizer.LargeTxUsdLimit", 100000) //nolint:gomnd + largeTxUsdLimit = 100000 //nolint:gomnd ) +func init() { + apolloconfig.RegisterChangeHandler("Synchronizer.LargeTxUsdLimit", &largeTxUsdLimit) +} + func (s *ClientSynchronizer) beforeProcessDeposit(deposit *etherman.Deposit) { messagebridge.ReplaceDepositDestAddresses(deposit) } @@ -142,7 +145,7 @@ func (s *ClientSynchronizer) filterLargeTransaction(ctx context.Context, transac tokenDecimal := new(big.Float).SetPrec(uint(transaction.GetLogoInfo().Decimal)).SetFloat64(math.Pow10(int(transaction.GetLogoInfo().Decimal))) tokenAmount, _ := new(big.Float).Quo(originNum, tokenDecimal).Float64() usdAmount := priceInfos[0].Price * tokenAmount - if usdAmount < float64(largeTxUsdLimit.Get()) { + if usdAmount < float64(largeTxUsdLimit) { log.Infof("tx usd amount less than limit, so skip, tx usd amount: %v, tx: %v", usdAmount, transaction.GetTxHash()) return } diff --git a/test/operations/manager.go b/test/operations/manager.go index 93fdfb53..b96e2321 100644 --- a/test/operations/manager.go +++ b/test/operations/manager.go @@ -119,7 +119,7 @@ func NewManager(ctx context.Context, cfg *Config) (*Manager, error) { if err != nil { return nil, err } - bService := server.NewBridgeService(cfg.BS, cfg.BT.Height, []uint{0, 1}, []*utils.Client{nil}, []*bind.TransactOpts{nil}, pgst, rollupID) + bService := server.NewBridgeService(cfg.BS, cfg.BT.Height, []uint{0, 1}, pgst, rollupID) opsman.storage = st.(StorageInterface) opsman.bridgetree = bt opsman.bridgeService = bService diff --git a/test/operations/mockserver.go b/test/operations/mockserver.go index 497b6f6d..192300ba 100644 --- a/test/operations/mockserver.go +++ b/test/operations/mockserver.go @@ -7,8 +7,6 @@ import ( "github.com/0xPolygonHermez/zkevm-bridge-service/bridgectrl" "github.com/0xPolygonHermez/zkevm-bridge-service/db/pgstorage" "github.com/0xPolygonHermez/zkevm-bridge-service/server" - "github.com/0xPolygonHermez/zkevm-bridge-service/utils" - "github.com/ethereum/go-ethereum/accounts/abi/bind" ) // RunMockServer runs mock server @@ -46,6 +44,6 @@ func RunMockServer(dbType string, height uint8, networks []uint) (*bridgectrl.Br MaxPageLimit: 100, //nolint:gomnd BridgeVersion: "v1", } - bridgeService := server.NewBridgeService(cfg, btCfg.Height, networks, []*utils.Client{nil}, []*bind.TransactOpts{nil}, store, rollupID) + bridgeService := server.NewBridgeService(cfg, btCfg.Height, networks, store, rollupID) return bt, store, server.RunServer(cfg, bridgeService) } diff --git a/utils/constant.go b/utils/constant.go index 933d2ecc..68e7df8b 100644 --- a/utils/constant.go +++ b/utils/constant.go @@ -17,5 +17,9 @@ const ( var ( // L1TargetBlockConfirmations is the number of block confirmations need to wait for the transaction to be synced from L1 to L2 - L1TargetBlockConfirmations = apolloconfig.NewIntEntry[uint64]("l1TargetBlockConfirmations", 64) //nolint:gomnd + L1TargetBlockConfirmations uint64 = 64 //nolint:gomnd ) + +func init() { + apolloconfig.RegisterChangeHandler("l1TargetBlockConfirmations", &L1TargetBlockConfirmations) +} diff --git a/utils/helpers.go b/utils/helpers.go index 3320d474..6eff8e25 100644 --- a/utils/helpers.go +++ b/utils/helpers.go @@ -2,6 +2,7 @@ package utils import ( "crypto/sha256" + "encoding/json" "math/rand" ) @@ -19,3 +20,8 @@ func GenerateRandomHash() [sha256.Size]byte { rs := generateRandomString(10) //nolint:gomnd return sha256.Sum256([]byte(rs)) } + +func ToJson(x any) string { + b, _ := json.Marshal(x) + return string(b) +} From af4598c0673d4a29a9c2f1d4782ea4ebc9e7f62d Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Tue, 9 Jul 2024 15:16:28 +0800 Subject: [PATCH 4/6] Use Apollo config change listener --- config/apolloconfig/apollo.go | 6 +---- config/apolloconfig/apollo_test.go | 1 - config/apolloconfig/listener.go | 35 +++++++++++++++++----------- config/apolloconfig/listener_test.go | 16 +++++++------ server/iprestriction/client.go | 6 +++++ server/tokenlogoinfo/Client.go | 6 +++++ utils/innerchainIdmanager.go | 29 ++++++++++++++++++++++- utils/messagebridge/processor.go | 28 ++++++++++++++++++++++ utils/messagebridge/usdclxly.go | 22 +++++++++++++++++ utils/messagebridge/wsteth.go | 22 +++++++++++++++++ 10 files changed, 144 insertions(+), 27 deletions(-) diff --git a/config/apolloconfig/apollo.go b/config/apolloconfig/apollo.go index 26ebd05c..4a79249a 100644 --- a/config/apolloconfig/apollo.go +++ b/config/apolloconfig/apollo.go @@ -13,9 +13,7 @@ import ( ) var ( - enabled = false - disableEntryDebugLog = false - defaultClient *agollo.Client + defaultClient *agollo.Client textUnmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem() ) @@ -31,7 +29,6 @@ func GetClient() *agollo.Client { // Init initializes the connection to the Apollo server // This should not be called if Apollo config is disabled for the service func Init(c Config) error { - enabled = true SetLogger() cfg := &agolloConfig.AppConfig{ AppID: c.AppID, @@ -52,7 +49,6 @@ func Init(c Config) error { client.AddChangeListener(GetDefaultListener()) defaultClient = client - disableEntryDebugLog = c.DisableEntryDebugLog return nil } diff --git a/config/apolloconfig/apollo_test.go b/config/apolloconfig/apollo_test.go index e3c1303a..2498fbec 100644 --- a/config/apolloconfig/apollo_test.go +++ b/config/apolloconfig/apollo_test.go @@ -52,7 +52,6 @@ func TestLoad(t *testing.T) { }, } - enabled = true getString = func(key string) (string, error) { s, ok := resultMapping[key] if !ok { diff --git a/config/apolloconfig/listener.go b/config/apolloconfig/listener.go index ff091d42..f1925e64 100644 --- a/config/apolloconfig/listener.go +++ b/config/apolloconfig/listener.go @@ -37,10 +37,6 @@ func (l *ConfigChangeListener) OnChange(event *storage.ChangeEvent) { getLogger().Debugf("ConfigChangeListener#OnChange received: %v", toJson(event)) for key, change := range event.Changes { - // Only handle ADDED and MODIFIED type - if change.ChangeType == storage.DELETED { - continue - } for _, handler := range l.changeHandlers[key] { handler.handle(change, key) } @@ -63,9 +59,10 @@ func (l *ConfigChangeListener) RegisterHandler(key string, opts ...handlerOpt) { // changeHandler contains the information for handling the config change for one config, in a specific context type changeHandler struct { - obj any - callbackFn func(key string, change *storage.ConfigChange) - locker sync.Locker + obj any + beforeFn func(key string, change *storage.ConfigChange) + afterFn func(key string, change *storage.ConfigChange, obj any) + locker sync.Locker } func (h *changeHandler) handle(change *storage.ConfigChange, key string) { @@ -74,15 +71,20 @@ func (h *changeHandler) handle(change *storage.ConfigChange, key string) { defer h.locker.Unlock() } - if h.obj != nil { + if h.beforeFn != nil { + h.beforeFn(key, change) + } + + if h.obj != nil && change.ChangeType != storage.DELETED { + // Only update the object if change is ADDED or MODIFIED err := decodeStringToObject(change.NewValue.(string), h.obj) if err != nil { getLogger().WithFields("key", key).Errorf("changeHandler#handle decodeStringToObject error: %v", err) } } - if h.callbackFn != nil { - h.callbackFn(key, change) + if h.afterFn != nil { + h.afterFn(key, change, h.obj) } } @@ -97,10 +99,17 @@ func withConfigObj[T any](obj *T) handlerOpt { } } -// WithCallbackFn assigns a callback function that will be called when a config key is changed -func WithCallbackFn(callbackFn func(string, *storage.ConfigChange)) handlerOpt { +// WithBeforeFn assigns a function to be called before the config object is updated +func WithBeforeFn(beforeFn func(string, *storage.ConfigChange)) handlerOpt { + return func(handler *changeHandler) { + handler.beforeFn = beforeFn + } +} + +// WithAfterFn assigns a function to be called after the config object is updated +func WithAfterFn(afterFn func(string, *storage.ConfigChange, any)) handlerOpt { return func(handler *changeHandler) { - handler.callbackFn = callbackFn + handler.afterFn = afterFn } } diff --git a/config/apolloconfig/listener_test.go b/config/apolloconfig/listener_test.go index 970e11b4..5c774ef9 100644 --- a/config/apolloconfig/listener_test.go +++ b/config/apolloconfig/listener_test.go @@ -28,7 +28,6 @@ func TestConfigChangeListener(t *testing.T) { "sub": `{"C":0.55, "E": "e1"}`, } - enabled = true getString = func(key string) (string, error) { s, ok := configMapping[key] if !ok { @@ -47,7 +46,10 @@ func TestConfigChangeListener(t *testing.T) { } cnt := make(map[string]int) - callback := func(key string, _ *storage.ConfigChange) { + before := func(key string, _ *storage.ConfigChange) { + cnt[key]++ + } + after := func(key string, _ *storage.ConfigChange, _ any) { cnt[key]++ } @@ -58,11 +60,11 @@ func TestConfigChangeListener(t *testing.T) { var stringField = s.A mutex := &sync.Mutex{} - RegisterChangeHandler("stringField", &stringField, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("stringField", &s.A, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("sub", &s.B, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("e", &s.B.E, WithCallbackFn(callback), WithLocker(mutex)) - RegisterChangeHandler("mp", &s.B.D, WithCallbackFn(callback), WithLocker(mutex)) + RegisterChangeHandler("stringField", &stringField, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("stringField", &s.A, WithBeforeFn(before), WithLocker(mutex)) + RegisterChangeHandler("sub", &s.B, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("e", &s.B.E, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("mp", &s.B.D, WithBeforeFn(before), WithLocker(mutex)) listener := GetDefaultListener() listener.OnChange(&storage.ChangeEvent{ diff --git a/server/iprestriction/client.go b/server/iprestriction/client.go index aee82759..adcb87b5 100644 --- a/server/iprestriction/client.go +++ b/server/iprestriction/client.go @@ -6,8 +6,10 @@ import ( "net/http" "net/url" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" "github.com/0xPolygonHermez/zkevm-bridge-service/nacos" + "github.com/apolloconfig/agollo/v4/storage" ) type Client struct { @@ -36,6 +38,10 @@ func InitClient(c Config) { Timeout: c.Timeout.Duration, }, } + apolloconfig.RegisterChangeHandler( + "IPRestriction", + &client.cfg, + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange, any) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func GetClient() *Client { diff --git a/server/tokenlogoinfo/Client.go b/server/tokenlogoinfo/Client.go index 2b7e4abd..4ef3eaa8 100644 --- a/server/tokenlogoinfo/Client.go +++ b/server/tokenlogoinfo/Client.go @@ -10,9 +10,11 @@ import ( "strconv" "strings" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/models/tokenlogo" "github.com/0xPolygonHermez/zkevm-bridge-service/nacos" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/apolloconfig/agollo/v4/storage" ) const ( @@ -48,6 +50,10 @@ func InitClient(c Config) { Timeout: c.Timeout.Duration, }, } + apolloconfig.RegisterChangeHandler( + "TokenLogoServiceConfig", + &client.cfg, + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func (c *Client) GetTokenLogoInfos(tokenAddArr []*tokenlogo.QueryLogoParam) (map[string]tokenlogo.LogoInfo, error) { diff --git a/utils/innerchainIdmanager.go b/utils/innerchainIdmanager.go index ce18e2ad..b3296852 100644 --- a/utils/innerchainIdmanager.go +++ b/utils/innerchainIdmanager.go @@ -1,13 +1,34 @@ package utils import ( + "sync" + + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" ) -var standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64 +var ( + standardIdKeyMapper, innerIdKeyMapper map[uint64]uint64 + chainIDMapperLock = &sync.RWMutex{} +) func InnitOkInnerChainIdMapper(cfg businessconfig.Config) { + initOkInnerChainIdMapper(cfg) + + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, _ *storage.ConfigChange, c any) { + initOkInnerChainIdMapper(*c.(*businessconfig.Config)) + })) +} + +func initOkInnerChainIdMapper(cfg businessconfig.Config) { + chainIDMapperLock.Lock() + defer chainIDMapperLock.Unlock() + standardIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds)) innerIdKeyMapper = make(map[uint64]uint64, len(cfg.StandardChainIds)) if cfg.StandardChainIds == nil { @@ -22,6 +43,9 @@ func InnitOkInnerChainIdMapper(cfg businessconfig.Config) { } func GetStandardChainIdByInnerId(innerChainId uint64) uint64 { + chainIDMapperLock.RLock() + defer chainIDMapperLock.RUnlock() + chainId, found := innerIdKeyMapper[innerChainId] if !found { return innerChainId @@ -30,6 +54,9 @@ func GetStandardChainIdByInnerId(innerChainId uint64) uint64 { } func GetInnerChainIdByStandardId(chainId uint64) uint64 { + chainIDMapperLock.RLock() + defer chainIDMapperLock.RUnlock() + innerChainId, found := standardIdKeyMapper[chainId] if !found { return chainId diff --git a/utils/messagebridge/processor.go b/utils/messagebridge/processor.go index 237cdd67..9429467b 100644 --- a/utils/messagebridge/processor.go +++ b/utils/messagebridge/processor.go @@ -2,6 +2,7 @@ package messagebridge import ( "math/big" + "sync" "github.com/0xPolygonHermez/zkevm-bridge-service/etherman" "github.com/0xPolygonHermez/zkevm-bridge-service/utils" @@ -18,12 +19,14 @@ const ( var ( emptyAddress = common.Address{} processorMap = make(map[ProcessorType]*Processor) + mutexMap = make(map[ProcessorType]*sync.RWMutex) ) // Processor hosts the processing functions for an LxLy bridge using the message bridge feature // Each Processor object should be used for one type of bridged token only // Current supported tokens: USDC, wstETH type Processor struct { + pType ProcessorType contractToTokenMapping map[common.Address]common.Address // DecodeMetadata decodes the metadata of the message bridge, returns the actual destination address and bridged amount DecodeMetadataFn func(metadata []byte) (common.Address, *big.Int) @@ -31,6 +34,10 @@ type Processor struct { // GetContractAddressList returns the list of contract addresses that need to be processed through this struct func (u *Processor) GetContractAddressList() []common.Address { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + result := make([]common.Address, 0) for addr := range u.contractToTokenMapping { result = append(result, addr) @@ -40,6 +47,10 @@ func (u *Processor) GetContractAddressList() []common.Address { // GetTokenAddressList returns the list of original token addresses func (u *Processor) GetTokenAddressList() []common.Address { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + result := make([]common.Address, 0) for _, addr := range u.contractToTokenMapping { result = append(result, addr) @@ -49,6 +60,10 @@ func (u *Processor) GetTokenAddressList() []common.Address { // CheckContractAddress returns true if the input address is in the contract address list of this bridge func (u *Processor) CheckContractAddress(address common.Address) bool { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + if _, ok := u.contractToTokenMapping[address]; ok { return true } @@ -57,6 +72,10 @@ func (u *Processor) CheckContractAddress(address common.Address) bool { // GetTokenFromContract return the token address from the bridge contract address, for displaying func (u *Processor) GetTokenFromContract(contractAddress common.Address) (common.Address, bool) { + mutex := getMutex(u.pType) + mutex.RLock() + defer mutex.RUnlock() + if token, ok := u.contractToTokenMapping[contractAddress]; ok { return token, true } @@ -131,3 +150,12 @@ func ReplaceDepositInfo(deposit *etherman.Deposit, overwriteOrigNetworkID bool) } processor.ReplaceDepositInfo(deposit, overwriteOrigNetworkID) } + +func getMutex(tp ProcessorType) *sync.RWMutex { + mutex := mutexMap[tp] + if mutex == nil { + mutex = &sync.RWMutex{} + mutexMap[tp] = mutex + } + return mutex +} diff --git a/utils/messagebridge/usdclxly.go b/utils/messagebridge/usdclxly.go index 92c0ebf5..c1f86e0f 100644 --- a/utils/messagebridge/usdclxly.go +++ b/utils/messagebridge/usdclxly.go @@ -3,11 +3,33 @@ package messagebridge import ( "math/big" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" "github.com/ethereum/go-ethereum/common" ) func InitUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) { + initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses) + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) { + cfg := c.(*businessconfig.Config) + if change.ChangeType == storage.DELETED || len(cfg.USDCContractAddresses) == 0 || len(cfg.USDCContractAddresses) == 0 { + delete(processorMap, USDC) + return + } + initUSDCLxLyProcessor(cfg.USDCContractAddresses, cfg.USDCTokenAddresses) + })) +} + +func initUSDCLxLyProcessor(usdcContractAddresses, usdcTokenAddresses []common.Address) { + mutex := getMutex(USDC) + mutex.Lock() + defer mutex.Unlock() + log.Debugf("USDCLxLyMapping: contracts[%v] tokens[%v]", usdcContractAddresses, usdcTokenAddresses) if len(usdcContractAddresses) != len(usdcTokenAddresses) { log.Errorf("InitUSDCLxLyProcessor: contract addresses (%v) and token addresses (%v) have different length", len(usdcContractAddresses), len(usdcTokenAddresses)) diff --git a/utils/messagebridge/wsteth.go b/utils/messagebridge/wsteth.go index 7d8f27af..059ecbee 100644 --- a/utils/messagebridge/wsteth.go +++ b/utils/messagebridge/wsteth.go @@ -3,11 +3,33 @@ package messagebridge import ( "math/big" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/apolloconfig" + "github.com/0xPolygonHermez/zkevm-bridge-service/config/businessconfig" "github.com/0xPolygonHermez/zkevm-bridge-service/log" + "github.com/apolloconfig/agollo/v4/storage" "github.com/ethereum/go-ethereum/common" ) func InitWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) { + initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses) + apolloconfig.RegisterChangeHandler( + "BusinessConfig", + &businessconfig.Config{}, + apolloconfig.WithAfterFn(func(_ string, change *storage.ConfigChange, c any) { + cfg := c.(*businessconfig.Config) + if change.ChangeType == storage.DELETED || len(cfg.WstETHContractAddresses) == 0 || len(cfg.WstETHTokenAddresses) == 0 { + delete(processorMap, WstETH) + return + } + initUSDCLxLyProcessor(cfg.WstETHContractAddresses, cfg.WstETHTokenAddresses) + })) +} + +func initWstETHProcessor(wstETHContractAddresses, wstETHTokenAddresses []common.Address) { + mutex := getMutex(WstETH) + mutex.Lock() + defer mutex.Unlock() + log.Debugf("WstETHMapping: contracts[%v] tokens[%v]", wstETHContractAddresses, wstETHTokenAddresses) if len(wstETHContractAddresses) != len(wstETHTokenAddresses) { log.Errorf("InitWstETHProcessor: contract addresses (%v) and token addresses (%v) have different length", len(wstETHContractAddresses), len(wstETHTokenAddresses)) From 3e0692972dcb22b7bbd676f59a0ea369235a1c57 Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Tue, 9 Jul 2024 15:25:16 +0800 Subject: [PATCH 5/6] Fix --- server/tokenlogoinfo/Client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tokenlogoinfo/Client.go b/server/tokenlogoinfo/Client.go index 4ef3eaa8..638988df 100644 --- a/server/tokenlogoinfo/Client.go +++ b/server/tokenlogoinfo/Client.go @@ -53,7 +53,7 @@ func InitClient(c Config) { apolloconfig.RegisterChangeHandler( "TokenLogoServiceConfig", &client.cfg, - apolloconfig.WithAfterFn(func(string, *storage.ConfigChange) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) + apolloconfig.WithAfterFn(func(string, *storage.ConfigChange, any) { client.httpClient.Timeout = client.cfg.Timeout.Duration })) } func (c *Client) GetTokenLogoInfos(tokenAddArr []*tokenlogo.QueryLogoParam) (map[string]tokenlogo.LogoInfo, error) { From 1d82b3e92367158e3574afcf75529d06f7eaa9c7 Mon Sep 17 00:00:00 2001 From: trunghai95 Date: Tue, 9 Jul 2024 15:36:26 +0800 Subject: [PATCH 6/6] Add test case for config listener --- config/apolloconfig/listener_test.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/config/apolloconfig/listener_test.go b/config/apolloconfig/listener_test.go index 5c774ef9..9003aaa7 100644 --- a/config/apolloconfig/listener_test.go +++ b/config/apolloconfig/listener_test.go @@ -49,9 +49,6 @@ func TestConfigChangeListener(t *testing.T) { before := func(key string, _ *storage.ConfigChange) { cnt[key]++ } - after := func(key string, _ *storage.ConfigChange, _ any) { - cnt[key]++ - } var s StructTest err := Load(&s) @@ -60,12 +57,20 @@ func TestConfigChangeListener(t *testing.T) { var stringField = s.A mutex := &sync.Mutex{} - RegisterChangeHandler("stringField", &stringField, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("stringField", &stringField, WithBeforeFn(before), WithLocker(mutex)) RegisterChangeHandler("stringField", &s.A, WithBeforeFn(before), WithLocker(mutex)) - RegisterChangeHandler("sub", &s.B, WithAfterFn(after), WithLocker(mutex)) - RegisterChangeHandler("e", &s.B.E, WithAfterFn(after), WithLocker(mutex)) + RegisterChangeHandler("sub", &s.B, WithBeforeFn(before), WithLocker(mutex), + WithAfterFn(func(_ string, _ *storage.ConfigChange, obj any) { + b, ok := obj.(*SubStruct) + require.True(t, ok) + require.Equal(t, expected.B, *b) + })) + RegisterChangeHandler("e", &s.B.E, WithBeforeFn(before), WithLocker(mutex)) RegisterChangeHandler("mp", &s.B.D, WithBeforeFn(before), WithLocker(mutex)) + expected.A = "bbb" + expected.B.C = 1.5 + expected.B.E = "e2" listener := GetDefaultListener() listener.OnChange(&storage.ChangeEvent{ Changes: map[string]*storage.ConfigChange{ @@ -79,14 +84,13 @@ func TestConfigChangeListener(t *testing.T) { }, }, }) - expected.A = "bbb" - expected.B.C = 1.5 - expected.B.E = "e2" require.Equal(t, expected, s) require.Equal(t, "bbb", stringField) require.Equal(t, 2, cnt["stringField"]) require.Equal(t, 1, cnt["sub"]) + expected.A = "ccc" + expected.B.E = "e3" listener.OnChange(&storage.ChangeEvent{ Changes: map[string]*storage.ConfigChange{ "stringField": { @@ -99,8 +103,6 @@ func TestConfigChangeListener(t *testing.T) { }, }, }) - expected.A = "ccc" - expected.B.E = "e3" require.Equal(t, expected, s) require.Equal(t, "ccc", stringField) require.Equal(t, 4, cnt["stringField"]) @@ -119,6 +121,7 @@ func TestConfigChangeListener(t *testing.T) { require.Equal(t, expected, s) require.Equal(t, 1, cnt["mp"]) + expected.B.D = map[string]bool{"z": false} listener.OnChange(&storage.ChangeEvent{ Changes: map[string]*storage.ConfigChange{ "mp": { @@ -127,7 +130,6 @@ func TestConfigChangeListener(t *testing.T) { }, }, }) - expected.B.D = map[string]bool{"z": false} require.Equal(t, expected, s) require.Equal(t, 2, cnt["mp"]) }