Skip to content

Commit

Permalink
dynamicconfig: implement data plane (2/n)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Sep 6, 2024
1 parent 3dbd5a2 commit 8008026
Show file tree
Hide file tree
Showing 24 changed files with 940 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ issues:
- path: registries/ # ditto
linters:
- forcetypeassert
- path: dynamicconfigs/ # ditto
linters:
- forcetypeassert
# Show the complete output
max-issues-per-linter: 0
max-same-issues: 0
154 changes: 154 additions & 0 deletions api/pkg/dynamicconfig/dynamicconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dynamicconfig

import (
"errors"
"fmt"

xds "github.com/cncf/xds/go/xds/type/v3"
capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"

"mosn.io/htnn/api/internal/proto"
"mosn.io/htnn/api/pkg/filtermanager/api"
"mosn.io/htnn/api/pkg/log"
)

var (
logger = log.DefaultLogger.WithName("dynamicconfig")

dynamicConfigProviders = map[string]DynamicConfigProvider{}
dynamicConfigHandlers = map[string]DynamicConfigHandler{}
)

type dynamicConfigFilter struct {
capi.PassThroughStreamFilter

callbacks capi.FilterCallbackHandler
}

func DynamicConfigFactory(c interface{}) capi.StreamFilterFactory {
return func(callbacks capi.FilterCallbackHandler) capi.StreamFilter {
return &dynamicConfigFilter{
callbacks: callbacks,

Check warning on line 47 in api/pkg/dynamicconfig/dynamicconfig.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/dynamicconfig/dynamicconfig.go#L44-L47

Added lines #L44 - L47 were not covered by tests
}
}
}

type DynamicConfigParser struct {
}

func (p *DynamicConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) {
configStruct := &xds.TypedStruct{}

placeholder := &struct{}{}
// No configuration
if any.GetTypeUrl() == "" {
return placeholder, nil
}

if err := any.UnmarshalTo(configStruct); err != nil {
return nil, err
}

if configStruct.Value == nil {
return nil, errors.New("bad TypedStruct format")
}

fields := configStruct.Value.GetFields()
name := fields["name"].GetStringValue()
cfg := fields["config"]
if name == "" || cfg == nil {
return nil, fmt.Errorf("invalid dynamic config format: %s", configStruct.Value.String())
}

cb, ok := dynamicConfigHandlers[name]
if !ok {
// ignore unknown dynamic config as like ignoring unknown plugin
api.LogInfof("no callback for dynamic config %s", name)
return placeholder, nil
}

conf := cb.Config()
data, err := cfg.MarshalJSON()
if err != nil {
return nil, err

Check warning on line 89 in api/pkg/dynamicconfig/dynamicconfig.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/dynamicconfig/dynamicconfig.go#L89

Added line #L89 was not covered by tests
}

api.LogInfof("receive dynamic config %s, configuration: %s", name, data)
err = proto.UnmarshalJSON(data, conf)
if err != nil {
return nil, err

Check warning on line 95 in api/pkg/dynamicconfig/dynamicconfig.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/dynamicconfig/dynamicconfig.go#L95

Added line #L95 was not covered by tests
}

err = conf.Validate()
if err != nil {
return nil, err

Check warning on line 100 in api/pkg/dynamicconfig/dynamicconfig.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/dynamicconfig/dynamicconfig.go#L100

Added line #L100 was not covered by tests
}

err = cb.OnUpdate(conf)
if err != nil {
return nil, err

Check warning on line 105 in api/pkg/dynamicconfig/dynamicconfig.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/dynamicconfig/dynamicconfig.go#L105

Added line #L105 was not covered by tests
}

return placeholder, nil
}

func (p *DynamicConfigParser) Merge(parent interface{}, child interface{}) interface{} {
return child

Check warning on line 112 in api/pkg/dynamicconfig/dynamicconfig.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/dynamicconfig/dynamicconfig.go#L111-L112

Added lines #L111 - L112 were not covered by tests
}

type DynamicConfig interface {
ProtoReflect() protoreflect.Message
Validate() error
}

type DynamicConfigProvider interface {
Config() DynamicConfig
}

type DynamicConfigHandler interface {
DynamicConfigProvider

OnUpdate(config any) error
}

// We extra RegisterDynamicConfigProvider out of RegisterDynamicConfigHandler, so that
// the control plane can register the definition of the DynamicConfigHandler, and only the
// data plane needs to know the implementation. Of course, you can also call
// RegisterDynamicConfigHandler only, which is more convenient for the developer.

func RegisterDynamicConfigProvider(name string, c DynamicConfigProvider) {
if _, ok := dynamicConfigHandlers[name]; !ok {
// As RegisterDynamicConfigHandler also calls RegisterDynamicConfigProvider, we only log for the first time.
// Otherwise, we will log twice for the load in the data plane.
logger.Info("register dynamic config provider", "name", name)
}
dynamicConfigProviders[name] = c
}

func LoadDynamicConfigProvider(name string) DynamicConfigProvider {
return dynamicConfigProviders[name]
}

func RegisterDynamicConfigHandler(name string, c DynamicConfigHandler) {
logger.Info("register dynamic config handler", "name", name)

dynamicConfigHandlers[name] = c
// We don't force developer to divide their dynamic configs into two parts for better DX.
RegisterDynamicConfigProvider(name, c)
}
86 changes: 86 additions & 0 deletions api/pkg/dynamicconfig/dynamicconfig_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dynamicconfig

import (
"testing"

xds "github.com/cncf/xds/go/xds/type/v3"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

"mosn.io/htnn/api/internal/proto"
_ "mosn.io/htnn/api/plugins/tests/pkg/envoy" // mock log
)

func TestParse(t *testing.T) {
ts := xds.TypedStruct{}
ts.Value, _ = structpb.NewStruct(map[string]interface{}{})
any1 := proto.MessageToAny(&ts)
any2 := proto.MessageToAny(&xds.TypedStruct{})
tsNoCb := xds.TypedStruct{}
tsNoCb.Value, _ = structpb.NewStruct(map[string]interface{}{
"name": "unknown",
"config": map[string]interface{}{},
})
any3 := proto.MessageToAny(&tsNoCb)

cases := []struct {
name string
input *anypb.Any
err string
}{
{
name: "happy path without config",
input: &anypb.Any{},
},
{
name: "error UnmarshalTo",
input: &anypb.Any{
TypeUrl: "aaa",
},
err: "mismatched message type",
},
{
name: "invalid value",
input: any1,
err: "invalid dynamic config format",
},
{
name: "empty value",
input: any2,
err: "bad TypedStruct format",
},
{
name: "unknown value",
input: any3,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
parser := &DynamicConfigParser{}

_, err := parser.Parse(c.input, nil)
if c.err != "" {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), c.err)
} else {
assert.Nil(t, err)
}
})
}
}
41 changes: 39 additions & 2 deletions api/plugins/tests/integration/dataplane/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (b *bootstrap) SetAccessLogFormat(fmt string) *bootstrap {
return b
}

func (b *bootstrap) WriteTo(cfgFile *os.File) error {
func (b *bootstrap) buildConfiguration() (map[string]interface{}, error) {
var root map[string]interface{}
// check if the input is valid yaml
err := yaml.Unmarshal(boostrapTemplate, &root)
if err != nil {
return err
return nil, err
}

// TODO: simplify it with some third party lib if possible
Expand Down Expand Up @@ -135,10 +135,47 @@ func (b *bootstrap) WriteTo(cfgFile *os.File) error {
}
}

return root, nil
}

func (b *bootstrap) WriteTo(cfgFile *os.File) error {
root, err := b.buildConfiguration()
if err != nil {
return err
}

res, err := yaml.Marshal(&root)
if err != nil {
return err
}

_, err = cfgFile.Write(res)
return err
}

func (b *bootstrap) WriteToForValidation(cfgFile *os.File) error {
root, err := b.buildConfiguration()
if err != nil {
return err
}

for _, l := range root["static_resources"].(map[string]interface{})["listeners"].([]interface{}) {
listener := l.(map[string]interface{})
if listener["name"] == "dynamic_config" {
listener["internal_listener"] = nil
listener["address"] = map[string]interface{}{
"pipe": map[string]interface{}{
"path": "/tmp/fake_socket_to_pass_validation",
},
}
}
}

res, err := yaml.Marshal(&root)
if err != nil {
return err
}

_, err = cfgFile.Write(res)
return err
}
35 changes: 35 additions & 0 deletions api/plugins/tests/integration/dataplane/bootstrap.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ node:
id: id
cluster: cluster

bootstrap_extensions:
- name: envoy.bootstrap.internal_listener
typed_config:
"@type": type.googleapis.com/envoy.extensions.bootstrap.internal_listener.v3.InternalListener

static_resources:
listeners:
- name: listener_0
Expand Down Expand Up @@ -170,6 +175,36 @@ static_resources:
end
function envoy_on_response(handle)
end
- name: dynamic_config
internal_listener: {}
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
'@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
http_filters:
- name: htnn-consumer
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.golang.v3alpha.Config
library_id: dc
library_path: /etc/libgolang.so
plugin_name: dc
plugin_config:
"@type": type.googleapis.com/xds.type.v3.TypedStruct
value:
config:
key: value
name: demo
- name: envoy.filters.http.router
typed_config:
'@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
route_config:
name: htnn_dynamic_config
virtual_hosts:
- domains:
- '*'
name: htnn_dynamic_config
stat_prefix: htnn_dynamic_config

clusters:
- name: backend
Expand Down
8 changes: 8 additions & 0 deletions api/plugins/tests/integration/dataplane/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func StartDataPlane(t *testing.T, opt *Option) (*DataPlane, error) {
content, _ := os.ReadFile(cfgFile.Name())
digest := md5.Sum(content)
if _, ok := validationCache[digest]; !ok {
// Workaround for https://github.com/envoyproxy/envoy/issues/35961
// TODO: drop this once we upgrade to Envoy 1.30+
cfgFile, _ := os.Create(cfgFile.Name())
opt.Bootstrap.WriteToForValidation(cfgFile)

validateCmd := cmdline + " " + envoyValidateCmd
cmds := strings.Fields(validateCmd)
logger.Info("run validate cmd", "cmdline", validateCmd)
Expand All @@ -215,6 +220,9 @@ func StartDataPlane(t *testing.T, opt *Option) (*DataPlane, error) {
}

validationCache[digest] = struct{}{}

cfgFile, _ = os.Create(cfgFile.Name())
cfgFile.Write(content)
}

cmdline = cmdline + " " + envoyCmd
Expand Down
Loading

0 comments on commit 8008026

Please sign in to comment.