From 7ac677d582c84aa7ea597af7c04cd9325eab5449 Mon Sep 17 00:00:00 2001 From: spacewander Date: Tue, 12 Mar 2024 19:46:31 +0800 Subject: [PATCH] init mcp output Signed-off-by: spacewander --- controller/Makefile | 3 + controller/cmd/main.go | 30 ++- controller/internal/config/config.go | 12 ++ controller/internal/config/config_test.go | 2 + .../internal/config/testdata/config.yaml | 2 + .../controller/consumer_controller.go | 52 +---- .../controller/httpfilterpolicy_controller.go | 68 +------ .../internal/controller/output/k8s_output.go | 162 +++++++++++++++ .../internal/controller/output/mcp_output.go | 126 ++++++++++++ .../internal/controller/output/mcp_server.go | 185 ++++++++++++++++++ .../controller/output/service_entry_syncer.go | 182 +++++++++++++++++ .../output/service_entry_syncer_test.go} | 6 +- controller/internal/model/constant.go | 3 +- controller/internal/registry/registry.go | 7 +- controller/internal/registry/store.go | 152 ++------------ controller/pkg/procession/output.go | 40 ++++ controller/tests/benchmark/suite_test.go | 5 + .../integration/controller/suite_test.go | 9 +- controller/tests/integration/helper/k8s.go | 23 +++ controller/tests/integration/helper/mcp.go | 36 ++++ .../tests/integration/helper/mcp_client.go | 169 ++++++++++++++++ .../integration/registries/suite_test.go | 4 +- 22 files changed, 1018 insertions(+), 260 deletions(-) create mode 100644 controller/internal/controller/output/k8s_output.go create mode 100644 controller/internal/controller/output/mcp_output.go create mode 100644 controller/internal/controller/output/mcp_server.go create mode 100644 controller/internal/controller/output/service_entry_syncer.go rename controller/internal/{registry/store_test.go => controller/output/service_entry_syncer_test.go} (94%) create mode 100644 controller/pkg/procession/output.go create mode 100644 controller/tests/integration/helper/k8s.go create mode 100644 controller/tests/integration/helper/mcp.go create mode 100644 controller/tests/integration/helper/mcp_client.go diff --git a/controller/Makefile b/controller/Makefile index 4121cc61f..4967a0746 100644 --- a/controller/Makefile +++ b/controller/Makefile @@ -79,6 +79,9 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and test: manifests generate envtest ## Run tests. KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -gcflags="all=-N -l" -race \ ./... -coverprofile cover.out -covermode=atomic -coverpkg=./... +# rerun controller integration test with different output + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -tags mcp_output -gcflags="all=-N -l" -race \ + ./tests/integration/controller -coverprofile cover.out -covermode=atomic -coverpkg=./... .PHONY: benchmark benchmark: manifests generate envtest ## Run benchmarks diff --git a/controller/cmd/main.go b/controller/cmd/main.go index 443ebbd69..cb631097b 100644 --- a/controller/cmd/main.go +++ b/controller/cmd/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "fmt" "os" istioscheme "istio.io/client-go/pkg/clientset/versioned/scheme" @@ -34,7 +35,9 @@ import ( mosniov1 "mosn.io/htnn/controller/api/v1" "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" + controlleroutput "mosn.io/htnn/controller/internal/controller/output" "mosn.io/htnn/controller/internal/registry" + "mosn.io/htnn/controller/pkg/procession" "mosn.io/htnn/pkg/log" ) @@ -69,8 +72,17 @@ func main() { flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + + var outputDest string + flag.StringVar(&outputDest, "output", "k8s", "The output destination of reconciliation result, mcp or k8s. Default is k8s.") + flag.Parse() + if outputDest != "mcp" && outputDest != "k8s" { + setupLog.Error(fmt.Errorf("unknown output: %s", outputDest), "unable to start") + os.Exit(1) + } + ctrl.SetLogger(log.DefaultLogger) config.Init() @@ -103,9 +115,22 @@ func main() { os.Exit(1) } + ctx := ctrl.SetupSignalHandler() + var output procession.Output + if outputDest == "k8s" { + output = controlleroutput.NewK8sOutput(mgr.GetClient()) + } else { + output, err = controlleroutput.NewMcpOutput(ctx) + if err != nil { + setupLog.Error(err, "unable to new mcp output") + os.Exit(1) + } + } + if err = (&controller.HTTPFilterPolicyReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + Output: output, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "HTTPFilterPolicy") os.Exit(1) @@ -113,13 +138,14 @@ func main() { if err = (&controller.ConsumerReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + Output: output, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Consumer") os.Exit(1) } registry.InitRegistryManager(®istry.RegistryManagerOption{ - Client: mgr.GetClient(), + Output: output, }) if err = (&controller.ServiceRegistryReconciler{ Client: mgr.GetClient(), @@ -159,7 +185,7 @@ func main() { } setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/controller/internal/config/config.go b/controller/internal/config/config.go index e7d01fc8e..43d287880 100644 --- a/controller/internal/config/config.go +++ b/controller/internal/config/config.go @@ -24,6 +24,13 @@ var ( logger = log.DefaultLogger.WithName("config") ) +// istio's xds server listen address plus 100 +var mcpServerListenAddress = ":15110" + +func McpServerListenAddress() string { + return mcpServerListenAddress +} + func GoSoPath() string { return "/etc/libgolang.so" } @@ -54,4 +61,9 @@ func Init() { if cfgRootNamespace != "" { rootNamespace = cfgRootNamespace } + + addr := viper.GetString("mcp.listen") + if addr != "" { + mcpServerListenAddress = addr + } } diff --git a/controller/internal/config/config_test.go b/controller/internal/config/config_test.go index ac7092557..3a0b49108 100644 --- a/controller/internal/config/config_test.go +++ b/controller/internal/config/config_test.go @@ -26,9 +26,11 @@ func TestInit(t *testing.T) { // Check default values assert.Equal(t, "istio-system", RootNamespace()) + assert.Equal(t, ":15110", McpServerListenAddress()) viper.AddConfigPath("./testdata") Init() assert.Equal(t, "htnn", RootNamespace()) + assert.Equal(t, ":9989", McpServerListenAddress()) } diff --git a/controller/internal/config/testdata/config.yaml b/controller/internal/config/testdata/config.yaml index 93526a3f0..b1c324589 100644 --- a/controller/internal/config/testdata/config.yaml +++ b/controller/internal/config/testdata/config.yaml @@ -1,2 +1,4 @@ istio: rootNamespace: htnn +mcp: + listen: ":9989" diff --git a/controller/internal/controller/consumer_controller.go b/controller/internal/controller/consumer_controller.go index 9763d5ca1..dae48363c 100644 --- a/controller/internal/controller/consumer_controller.go +++ b/controller/internal/controller/consumer_controller.go @@ -36,18 +36,16 @@ import ( "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/istio" "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" ) // ConsumerReconciler reconciles a Consumer object type ConsumerReconciler struct { client.Client Scheme *runtime.Scheme + Output procession.Output } -const ( - ConsumerEnvoyFilterName = "htnn-consumer" -) - //+kubebuilder:rbac:groups=mosn.io,resources=consumers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=mosn.io,resources=consumers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=mosn.io,resources=consumers/finalizers,verbs=update @@ -68,7 +66,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - err = r.generateCustomResource(ctx, &logger, state) + err = r.generateCustomResource(ctx, state) if err != nil { return ctrl.Result{}, err } @@ -120,7 +118,7 @@ func (r *ConsumerReconciler) consumersToState(ctx context.Context, logger *logr. return state, nil } -func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, logger *logr.Logger, state *consumerReconcileState) error { +func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, state *consumerReconcileState) error { consumerData := map[string]interface{}{} for ns, consumers := range state.namespaceToConsumers { data := make(map[string]interface{}, len(consumers)) @@ -137,49 +135,13 @@ func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, logger ef := istio.GenerateConsumers(consumerData) ef.Namespace = config.RootNamespace() - ef.Name = ConsumerEnvoyFilterName + ef.Name = model.ConsumerEnvoyFilterName if ef.Labels == nil { ef.Labels = map[string]string{} } ef.Labels[model.LabelCreatedBy] = "Consumer" - - nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} - var envoyfilters istiov1a3.EnvoyFilterList - if err := r.List(ctx, &envoyfilters, client.MatchingLabels{model.LabelCreatedBy: "Consumer"}); err != nil { - return fmt.Errorf("failed to list EnvoyFilter: %w", err) - } - - var envoyfilter *istiov1a3.EnvoyFilter - for _, e := range envoyfilters.Items { - if e.Namespace != nsName.Namespace || e.Name != nsName.Name { - logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) - - if err := r.Delete(ctx, e); err != nil { - return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", - err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) - } - } else { - envoyfilter = e - } - } - - if envoyfilter == nil { - logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - - if err := r.Create(ctx, ef.DeepCopy()); err != nil { - return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - } else { - logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - - ef = ef.DeepCopy() - ef.SetResourceVersion(envoyfilter.ResourceVersion) - if err := r.Update(ctx, ef); err != nil { - return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - } - - return nil + return r.Output.WriteEnvoyFilters(ctx, procession.ConfigSourceConsumer, + map[string]*istiov1a3.EnvoyFilter{model.ConsumerEnvoyFilterName: ef}) } func (r *ConsumerReconciler) updateConsumers(ctx context.Context, consumers *mosniov1.ConsumerList) error { diff --git a/controller/internal/controller/httpfilterpolicy_controller.go b/controller/internal/controller/httpfilterpolicy_controller.go index 0d29b0c82..b754c0ac9 100644 --- a/controller/internal/controller/httpfilterpolicy_controller.go +++ b/controller/internal/controller/httpfilterpolicy_controller.go @@ -25,8 +25,6 @@ import ( "time" "github.com/go-logr/logr" - "google.golang.org/protobuf/proto" - istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" @@ -43,17 +41,17 @@ import ( gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" mosniov1 "mosn.io/htnn/controller/api/v1" - "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/k8s" "mosn.io/htnn/controller/internal/metrics" - "mosn.io/htnn/controller/internal/model" "mosn.io/htnn/controller/internal/translation" + "mosn.io/htnn/controller/pkg/procession" ) // HTTPFilterPolicyReconciler reconciles a HTTPFilterPolicy object type HTTPFilterPolicyReconciler struct { client.Client Scheme *runtime.Scheme + Output procession.Output istioGatewayIndexer *IstioGatewayIndexer k8sGatewayIndexer *K8sGatewayIndexer @@ -102,7 +100,7 @@ func (r *HTTPFilterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Req // We can add a configured concurrency to write to API server in parallel, if // the performance is not good. Note that the API server probably has rate limit. - err = r.translationStateToCustomResource(ctx, &logger, finalState) + err = r.translationStateToCustomResource(ctx, finalState) if err != nil { return ctrl.Result{}, err } @@ -350,65 +348,11 @@ func (r *HTTPFilterPolicyReconciler) policyToTranslationState(ctx context.Contex return initState, nil } -func fillEnvoyFilterMeta(ef *istiov1a3.EnvoyFilter) { - ef.Namespace = config.RootNamespace() - if ef.Labels == nil { - ef.Labels = map[string]string{} - } - ef.Labels[model.LabelCreatedBy] = "HTTPFilterPolicy" -} - -func (r *HTTPFilterPolicyReconciler) translationStateToCustomResource(ctx context.Context, logger *logr.Logger, +func (r *HTTPFilterPolicyReconciler) translationStateToCustomResource(ctx context.Context, finalState *translation.FinalState) error { - var envoyfilters istiov1a3.EnvoyFilterList - if err := r.List(ctx, &envoyfilters, - client.MatchingLabels{model.LabelCreatedBy: "HTTPFilterPolicy"}, - ); err != nil { - return fmt.Errorf("failed to list EnvoyFilter: %w", err) - } - - preEnvoyFilterMap := make(map[string]*istiov1a3.EnvoyFilter, len(envoyfilters.Items)) - for _, e := range envoyfilters.Items { - if _, ok := finalState.EnvoyFilters[e.Name]; !ok || e.Namespace != config.RootNamespace() { - logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) - if err := r.Delete(ctx, e); err != nil { - return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", - err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) - } - } else { - preEnvoyFilterMap[e.Name] = e - } - } - - for _, ef := range finalState.EnvoyFilters { - envoyfilter, ok := preEnvoyFilterMap[ef.Name] - if !ok { - logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - fillEnvoyFilterMeta(ef) - - if err := r.Create(ctx, ef); err != nil { - nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} - return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - - } else { - if proto.Equal(&envoyfilter.Spec, &ef.Spec) { - continue - } - - logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - fillEnvoyFilterMeta(ef) - // Address metadata.resourceVersion: Invalid value: 0x0 error - ef.SetResourceVersion(envoyfilter.ResourceVersion) - if err := r.Update(ctx, ef); err != nil { - nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} - return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - } - } - - return nil + generatedEnvoyFilters := finalState.EnvoyFilters + return r.Output.WriteEnvoyFilters(ctx, procession.ConfigSourceHTTPFilterPolicy, generatedEnvoyFilters) } func (r *HTTPFilterPolicyReconciler) updatePolicies(ctx context.Context, diff --git a/controller/internal/controller/output/k8s_output.go b/controller/internal/controller/output/k8s_output.go new file mode 100644 index 000000000..22e7a7fd7 --- /dev/null +++ b/controller/internal/controller/output/k8s_output.go @@ -0,0 +1,162 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "google.golang.org/protobuf/proto" + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" + "mosn.io/htnn/pkg/log" +) + +type k8sOutput struct { + client.Client + logger logr.Logger + + serviceEntrySyncer *serviceEntrySyncer +} + +func NewK8sOutput(c client.Client) procession.Output { + o := &k8sOutput{ + Client: c, + logger: log.DefaultLogger.WithName("k8s output"), + } + o.serviceEntrySyncer = newServiceEntrySyncer(c, &o.logger) + return o +} + +func fillEnvoyFilterMeta(ef *istiov1a3.EnvoyFilter) { + ef.Namespace = config.RootNamespace() + if ef.Labels == nil { + ef.Labels = map[string]string{} + } + ef.Labels[model.LabelCreatedBy] = "HTTPFilterPolicy" +} + +func (o *k8sOutput) fromHTTPFilterPolicy(ctx context.Context, generatedEnvoyFilters map[string]*istiov1a3.EnvoyFilter) error { + logger := o.logger + + var envoyfilters istiov1a3.EnvoyFilterList + if err := o.List(ctx, &envoyfilters, + client.MatchingLabels{model.LabelCreatedBy: "HTTPFilterPolicy"}, + ); err != nil { + return fmt.Errorf("failed to list EnvoyFilter: %w", err) + } + + preEnvoyFilterMap := make(map[string]*istiov1a3.EnvoyFilter, len(envoyfilters.Items)) + for _, e := range envoyfilters.Items { + if _, ok := generatedEnvoyFilters[e.Name]; !ok || e.Namespace != config.RootNamespace() { + logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) + if err := o.Delete(ctx, e); err != nil { + return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", + err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) + } + } else { + preEnvoyFilterMap[e.Name] = e + } + } + + for _, ef := range generatedEnvoyFilters { + envoyfilter, ok := preEnvoyFilterMap[ef.Name] + if !ok { + logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + fillEnvoyFilterMeta(ef) + + if err := o.Create(ctx, ef); err != nil { + nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} + return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + + } else { + if proto.Equal(&envoyfilter.Spec, &ef.Spec) { + continue + } + + logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + fillEnvoyFilterMeta(ef) + // Address metadata.resourceVersion: Invalid value: 0x0 error + ef.SetResourceVersion(envoyfilter.ResourceVersion) + if err := o.Update(ctx, ef); err != nil { + nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} + return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + } + } + + return nil +} + +func (o *k8sOutput) fromConsumer(ctx context.Context, ef *istiov1a3.EnvoyFilter) error { + logger := o.logger + + nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} + var envoyfilters istiov1a3.EnvoyFilterList + if err := o.List(ctx, &envoyfilters, client.MatchingLabels{model.LabelCreatedBy: "Consumer"}); err != nil { + return fmt.Errorf("failed to list EnvoyFilter: %w", err) + } + + var envoyfilter *istiov1a3.EnvoyFilter + for _, e := range envoyfilters.Items { + if e.Namespace != nsName.Namespace || e.Name != nsName.Name { + logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) + + if err := o.Delete(ctx, e); err != nil { + return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", + err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) + } + } else { + envoyfilter = e + } + } + + if envoyfilter == nil { + logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + + if err := o.Create(ctx, ef.DeepCopy()); err != nil { + return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + } else { + logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + + ef = ef.DeepCopy() + ef.SetResourceVersion(envoyfilter.ResourceVersion) + if err := o.Update(ctx, ef); err != nil { + return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + } + + return nil +} + +func (o *k8sOutput) WriteEnvoyFilters(ctx context.Context, src procession.ConfigSource, filters map[string]*istiov1a3.EnvoyFilter) error { + if src == procession.ConfigSourceHTTPFilterPolicy { + return o.fromHTTPFilterPolicy(ctx, filters) + } + return o.fromConsumer(ctx, filters[model.ConsumerEnvoyFilterName]) +} + +func (o *k8sOutput) WriteServiceEntries(ctx context.Context, src procession.ConfigSource, serviceEntries map[string]*istioapi.ServiceEntry) { + o.serviceEntrySyncer.Update(ctx, serviceEntries) +} diff --git a/controller/internal/controller/output/mcp_output.go b/controller/internal/controller/output/mcp_output.go new file mode 100644 index 000000000..6fe879475 --- /dev/null +++ b/controller/internal/controller/output/mcp_output.go @@ -0,0 +1,126 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "fmt" + "net" + "sync" + + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/anypb" + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/pkg/procession" + "mosn.io/htnn/pkg/log" +) + +type mcpOutput struct { + logger *logr.Logger + + // envoyFilters will be updated by multiple sources + envoyFilters sync.Map + + mcp *mcpServer +} + +func NewMcpOutput(ctx context.Context) (procession.Output, error) { + logger := log.DefaultLogger.WithName("mcp output") + s := grpc.NewServer() + + srv := NewMcpServer(&logger) + + discovery.RegisterAggregatedDiscoveryServiceServer(s, srv) + + addr := config.McpServerListenAddress() + logger.Info("listening as mcp server", "address", addr) + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen: %w", err) + } + + go func() { + ctx, cancel := context.WithCancel(ctx) + + go func() { + defer cancel() + + logger.Info("starting mcp server") + + err := s.Serve(l) + if err != nil { + logger.Error(err, "mcp server failed") + } + }() + + <-ctx.Done() + logger.Info("stopping mcp server") + srv.CloseSubscribers() + s.GracefulStop() + logger.Info("mcp server stopped") + }() + + return &mcpOutput{ + logger: &logger, + mcp: srv, + }, nil +} + +func (o *mcpOutput) WriteEnvoyFilters(ctx context.Context, src procession.ConfigSource, filters map[string]*istiov1a3.EnvoyFilter) error { + // Store the converted Any directly can save memory, but we keep the original EnvoyFilter here + // so that we can add observability in the future. + o.envoyFilters.Store(src, filters) + ress := make([]*anypb.Any, 0, len(filters)*2) + ok := true + o.envoyFilters.Range(func(_, value interface{}) bool { + efs := value.(map[string]*istiov1a3.EnvoyFilter) + for name, ef := range efs { + res, err := MarshalToMcpPb(name, &ef.Spec) + if err != nil { + o.logger.Error(err, "failed to marshal EnvoyFilter", "name", name) + // do not push partial configuration, this may cause service unavailable + ok = false + return false + } + ress = append(ress, res) + } + return true + }) + + if ok { + o.mcp.UpdateEnvoyFilters(ress) + } + return nil +} + +func (o *mcpOutput) WriteServiceEntries(ctx context.Context, src procession.ConfigSource, serviceEntries map[string]*istioapi.ServiceEntry) { + ress := make([]*anypb.Any, 0, len(serviceEntries)) + for name, se := range serviceEntries { + res, err := MarshalToMcpPb(name, se) + if err != nil { + o.logger.Error(err, "failed to marshal ServiceEntry", "name", name) + // do not push partial configuration, this may cause service unavailable + return + } + ress = append(ress, res) + } + + o.mcp.UpdateServiceEntries(ress) +} diff --git a/controller/internal/controller/output/mcp_server.go b/controller/internal/controller/output/mcp_server.go new file mode 100644 index 000000000..a54f013e2 --- /dev/null +++ b/controller/internal/controller/output/mcp_server.go @@ -0,0 +1,185 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "fmt" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + envoycfgcorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + mcpapi "istio.io/api/mcp/v1alpha1" + + "mosn.io/htnn/controller/internal/config" +) + +func MarshalToMcpPb(name string, src proto.Message) (*anypb.Any, error) { + body := &anypb.Any{} + if err := anypb.MarshalFrom(body, src, proto.MarshalOptions{}); err != nil { + return nil, fmt.Errorf("failed to marshal mcp body: %w", err) + } + + ns := config.RootNamespace() + mcpRes := &mcpapi.Resource{ + Metadata: &mcpapi.Metadata{ + Name: fmt.Sprintf("%s/%s", ns, name), + }, + Body: body, + } + + pb := &anypb.Any{} + if err := anypb.MarshalFrom(pb, mcpRes, proto.MarshalOptions{}); err != nil { + return nil, fmt.Errorf("failed to marshal mcp resource: %w", err) + } + + return pb, nil +} + +type ( + DiscoveryStream = discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer + DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer +) + +type mcpServer struct { + logger *logr.Logger + + subscribers sync.Map + nextSubscriberID atomic.Uint64 + + resourceLock sync.RWMutex + envoyFilters []*anypb.Any + serviceEntries []*anypb.Any +} + +func NewMcpServer(logger *logr.Logger) *mcpServer { + return &mcpServer{ + logger: logger, + } +} + +type subscriber struct { + id uint64 + + stream DiscoveryStream + closeStream func() +} + +func (srv *mcpServer) UpdateEnvoyFilters(envoyFilters []*anypb.Any) { + srv.resourceLock.Lock() + srv.envoyFilters = envoyFilters + srv.resourceLock.Unlock() + go func() { + typeUrl := "networking.istio.io/v1alpha3/EnvoyFilter" + srv.sendToSubscribers(typeUrl, envoyFilters) + }() +} + +func (srv *mcpServer) UpdateServiceEntries(serviceEntries []*anypb.Any) { + srv.resourceLock.Lock() + srv.serviceEntries = serviceEntries + srv.resourceLock.Unlock() + go func() { + typeUrl := "networking.istio.io/v1beta1/ServiceEntry" + srv.sendToSubscribers(typeUrl, serviceEntries) + }() +} + +func (srv *mcpServer) send(sub *subscriber, typeUrl string, mcpResources []*anypb.Any) { + if err := sub.stream.Send(&discovery.DiscoveryResponse{ + TypeUrl: typeUrl, + VersionInfo: strconv.FormatInt(time.Now().UnixNano(), 10), + Resources: mcpResources, + ControlPlane: &envoycfgcorev3.ControlPlane{ + Identifier: os.Getenv("POD_NAME"), + }, + }); err != nil { + id := sub.id + srv.logger.Error(err, "failed to send to subscriber", "id", id) + // let Istio to retry + sub.closeStream() + srv.subscribers.Delete(id) + } +} + +func (srv *mcpServer) sendToSubscribers(typeUrl string, mcpResources []*anypb.Any) { + srv.resourceLock.Lock() + defer srv.resourceLock.Unlock() + + srv.subscribers.Range(func(key, value any) bool { + srv.logger.Info("sending to subscriber", "id", key, "typeUrl", typeUrl, "length", len(mcpResources)) + srv.send(value.(*subscriber), typeUrl, mcpResources) + return true + }) +} + +func (srv *mcpServer) CloseSubscribers() { + srv.subscribers.Range(func(key, value any) bool { + srv.logger.Info("close subscriber", "id", key) + value.(*subscriber).closeStream() + srv.subscribers.Delete(key) + + return true + }) +} + +func (srv *mcpServer) initSubscriberResource(sub *subscriber) { + srv.resourceLock.Lock() + defer srv.resourceLock.Unlock() + + srv.logger.Info("sending initial conf to subscriber", "id", sub.id) + typeUrl := "networking.istio.io/v1beta1/ServiceEntry" + srv.send(sub, typeUrl, srv.serviceEntries) + typeUrl = "networking.istio.io/v1alpha3/EnvoyFilter" + srv.send(sub, typeUrl, srv.envoyFilters) +} + +// Implement discovery.AggregatedDiscoveryServiceServer + +func (srv *mcpServer) StreamAggregatedResources(downstream DiscoveryStream) error { + ctx, closeStream := context.WithCancel(downstream.Context()) + + sub := &subscriber{ + id: srv.nextSubscriberID.Add(1), + stream: downstream, + closeStream: closeStream, + } + srv.logger.Info("handle new subscriber", "id", sub.id) + + srv.subscribers.Store(sub.id, sub) + + go func() { + srv.initSubscriberResource(sub) + }() + + <-ctx.Done() + return nil +} + +func (srv *mcpServer) DeltaAggregatedResources(downstream DeltaDiscoveryStream) error { + // By now, Istio doesn't support MCP over delta ads + return status.Errorf(codes.Unimplemented, "not implemented") +} + +var _ discovery.AggregatedDiscoveryServiceServer = (*mcpServer)(nil) diff --git a/controller/internal/controller/output/service_entry_syncer.go b/controller/internal/controller/output/service_entry_syncer.go new file mode 100644 index 000000000..028452acf --- /dev/null +++ b/controller/internal/controller/output/service_entry_syncer.go @@ -0,0 +1,182 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "sync" + "time" + + "github.com/go-logr/logr" + "google.golang.org/protobuf/proto" + istioapi "istio.io/api/networking/v1beta1" + istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/model" +) + +type serviceEntrySyncer struct { + client client.Client + logger *logr.Logger + + lock sync.RWMutex + entries map[string]*istiov1b1.ServiceEntry + syncInterval time.Duration +} + +func newServiceEntrySyncer(c client.Client, logger *logr.Logger) *serviceEntrySyncer { + s := &serviceEntrySyncer{ + client: c, + logger: logger, + entries: make(map[string]*istiov1b1.ServiceEntry), + syncInterval: 20 * time.Second, + } + go s.Sync() + return s +} + +func (syncer *serviceEntrySyncer) getFromK8s(ctx context.Context, service string, se *istiov1b1.ServiceEntry) error { + err := syncer.client.Get(ctx, client.ObjectKey{ + Namespace: config.RootNamespace(), + Name: service, + }, se) + return err +} + +func (syncer *serviceEntrySyncer) deleteFromK8s(ctx context.Context, se *istiov1b1.ServiceEntry) { + c := syncer.client + syncer.logger.Info("delete ServiceEntry", "name", se.Name, "namespace", se.Namespace) + err := c.Delete(ctx, se) + if err != nil { + syncer.logger.Error(err, "failed to delete service entry from k8s", "service", se.Name) + return + } +} + +func (syncer *serviceEntrySyncer) addToK8s(ctx context.Context, service string, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { + c := syncer.client + ns := config.RootNamespace() + se := istiov1b1.ServiceEntry{ + Spec: *entry.DeepCopy(), + } + se.Namespace = ns + if se.Labels == nil { + se.Labels = map[string]string{} + } + se.Labels[model.LabelCreatedBy] = "ServiceRegistry" + se.Name = service + + syncer.logger.Info("create ServiceEntry", "name", service, "namespace", ns) + err := c.Create(ctx, &se) + if err != nil { + syncer.logger.Error(err, "failed to create service entry to k8s", "service", service) + } + + return &se +} + +func (syncer *serviceEntrySyncer) updateToK8s(ctx context.Context, se *istiov1b1.ServiceEntry, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { + if proto.Equal(&se.Spec, entry) { + return se + } + + c := syncer.client + syncer.logger.Info("update ServiceEntry", "name", se.Name, "namespace", se.Namespace) + se.SetResourceVersion(se.ResourceVersion) + se.Spec = *entry.DeepCopy() + if err := c.Update(ctx, se); err != nil { + syncer.logger.Error(err, "failed to update service entry to k8s", "service", se.Name) + return se + } + + return se +} + +func (syncer *serviceEntrySyncer) Update(ctx context.Context, entries map[string]*istioapi.ServiceEntry) { + syncer.lock.Lock() + defer syncer.lock.Unlock() + + var obj istiov1b1.ServiceEntry + for service, se := range syncer.entries { + if _, ok := entries[service]; !ok { + syncer.deleteFromK8s(ctx, se) + delete(syncer.entries, service) + } + } + + var latestServiceEntry *istiov1b1.ServiceEntry + for service, se := range entries { + if prev, ok := syncer.entries[service]; !ok { + if err := syncer.getFromK8s(ctx, service, &obj); err != nil { + if !apierrors.IsNotFound(err) { + syncer.logger.Error(err, "failed to get service entry from k8s", "service", service) + return + } + + latestServiceEntry = syncer.addToK8s(ctx, service, se) + } else { + latestServiceEntry = syncer.updateToK8s(ctx, &obj, se) + } + } else { + latestServiceEntry = syncer.updateToK8s(ctx, prev, se) + } + + syncer.entries[service] = latestServiceEntry + } +} + +func (syncer *serviceEntrySyncer) sync() { + syncer.lock.Lock() + defer syncer.lock.Unlock() + + c := syncer.client + ctx := context.Background() + var serviceEntries istiov1b1.ServiceEntryList + err := c.List(ctx, &serviceEntries, client.MatchingLabels{model.LabelCreatedBy: "ServiceRegistry"}) + if err != nil { + syncer.logger.Error(err, "failed to list service entries") + return + } + + persisted := make(map[string]*istiov1b1.ServiceEntry, len(serviceEntries.Items)) + for _, se := range serviceEntries.Items { + if _, ok := syncer.entries[se.Name]; !ok { + syncer.deleteFromK8s(ctx, se) + } else { + persisted[se.Name] = se + } + } + + for service, wrp := range syncer.entries { + entry := &wrp.Spec + if se, ok := persisted[service]; !ok { + syncer.addToK8s(ctx, service, entry) + } else { + syncer.updateToK8s(ctx, se, entry) + } + } +} + +func (syncer *serviceEntrySyncer) Sync() { + // We sync the service entries so we can retry if something wrong happened + ticker := time.NewTicker(syncer.syncInterval) + // For now we don't release the ticker + for range ticker.C { + syncer.sync() + } +} diff --git a/controller/internal/registry/store_test.go b/controller/internal/controller/output/service_entry_syncer_test.go similarity index 94% rename from controller/internal/registry/store_test.go rename to controller/internal/controller/output/service_entry_syncer_test.go index 2ac1df214..5d230d9cc 100644 --- a/controller/internal/registry/store_test.go +++ b/controller/internal/controller/output/service_entry_syncer_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package registry +package output import ( "context" @@ -26,10 +26,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "mosn.io/htnn/controller/tests/pkg" + "mosn.io/htnn/pkg/log" ) func TestSync(t *testing.T) { - store := newServiceEntryStore(pkg.FakeK8sClient(t)) + logger := log.DefaultLogger.WithName("test") + store := newServiceEntrySyncer(pkg.FakeK8sClient(t), &logger) var created, updated, deleted bool diff --git a/controller/internal/model/constant.go b/controller/internal/model/constant.go index 7f1479e1a..6d45692dc 100644 --- a/controller/internal/model/constant.go +++ b/controller/internal/model/constant.go @@ -15,5 +15,6 @@ package model const ( - LabelCreatedBy = "htnn.mosn.io/created-by" + ConsumerEnvoyFilterName = "htnn-consumer" + LabelCreatedBy = "htnn.mosn.io/created-by" ) diff --git a/controller/internal/registry/registry.go b/controller/internal/registry/registry.go index b3a190b63..ae5d50214 100644 --- a/controller/internal/registry/registry.go +++ b/controller/internal/registry/registry.go @@ -16,9 +16,9 @@ package registry import ( "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" mosniov1 "mosn.io/htnn/controller/api/v1" + "mosn.io/htnn/controller/pkg/procession" pkgRegistry "mosn.io/htnn/controller/pkg/registry" "mosn.io/htnn/pkg/log" ) @@ -31,12 +31,11 @@ var ( ) type RegistryManagerOption struct { - Client client.Client + Output procession.Output } func InitRegistryManager(opt *RegistryManagerOption) { - store = newServiceEntryStore(opt.Client) - go store.Sync() + store = newServiceEntryStore(opt.Output) } func UpdateRegistry(registry *mosniov1.ServiceRegistry) error { diff --git a/controller/internal/registry/store.go b/controller/internal/registry/store.go index 35297e1ec..b960c869f 100644 --- a/controller/internal/registry/store.go +++ b/controller/internal/registry/store.go @@ -17,64 +17,39 @@ package registry import ( "context" "sync" - "time" - "google.golang.org/protobuf/proto" istioapi "istio.io/api/networking/v1beta1" - istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "mosn.io/htnn/controller/internal/config" - "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" pkgRegistry "mosn.io/htnn/controller/pkg/registry" ) type serviceEntryStore struct { - client client.Client + output procession.Output lock sync.RWMutex - entries map[string]*istiov1b1.ServiceEntry - - syncInterval time.Duration + entries map[string]*istioapi.ServiceEntry } -func newServiceEntryStore(client client.Client) *serviceEntryStore { +func newServiceEntryStore(output procession.Output) *serviceEntryStore { return &serviceEntryStore{ - client: client, - entries: make(map[string]*istiov1b1.ServiceEntry), - syncInterval: 20 * time.Second, + output: output, + entries: make(map[string]*istioapi.ServiceEntry), } } +// Implement ServiceEntryStore interface + func (store *serviceEntryStore) Update(service string, se *pkgRegistry.ServiceEntryWrapper) { store.lock.Lock() defer store.lock.Unlock() ctx := context.Background() - var obj istiov1b1.ServiceEntry - var latestServiceEntry *istiov1b1.ServiceEntry - - if prev, ok := store.entries[service]; !ok { - if err := store.getFromK8s(ctx, service, &obj); err != nil { - if !apierrors.IsNotFound(err) { - logger.Error(err, "failed to get service entry from k8s", "service", service) - return - } - - latestServiceEntry = store.addToK8s(ctx, service, &se.ServiceEntry) - } else { - latestServiceEntry = store.updateToK8s(ctx, &obj, &se.ServiceEntry) - } - } else { - latestServiceEntry = store.updateToK8s(ctx, prev, &se.ServiceEntry) - } + store.entries[service] = &se.ServiceEntry - store.entries[service] = latestServiceEntry + store.output.WriteServiceEntries(ctx, procession.ConfigSourceServiceRegistry, store.entries) } -// Implement ServiceEntryStore interface - func (store *serviceEntryStore) Delete(service string) { store.lock.Lock() defer store.lock.Unlock() @@ -84,110 +59,5 @@ func (store *serviceEntryStore) Delete(service string) { } delete(store.entries, service) - - ctx := context.Background() - var se istiov1b1.ServiceEntry - if err := store.getFromK8s(ctx, service, &se); err != nil { - logger.Error(err, "failed to get service entry from k8s", "service", service) - return - } - store.deleteFromK8s(ctx, &se) -} - -func (store *serviceEntryStore) getFromK8s(ctx context.Context, service string, se *istiov1b1.ServiceEntry) error { - err := store.client.Get(ctx, client.ObjectKey{ - Namespace: config.RootNamespace(), - Name: service, - }, se) - return err -} - -func (store *serviceEntryStore) deleteFromK8s(ctx context.Context, se *istiov1b1.ServiceEntry) { - c := store.client - logger.Info("delete ServiceEntry", "name", se.Name, "namespace", se.Namespace) - err := c.Delete(ctx, se) - if err != nil { - logger.Error(err, "failed to delete service entry from k8s", "service", se.Name) - return - } -} - -func (store *serviceEntryStore) addToK8s(ctx context.Context, service string, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { - c := store.client - ns := config.RootNamespace() - se := istiov1b1.ServiceEntry{ - Spec: *entry.DeepCopy(), - } - se.Namespace = ns - if se.Labels == nil { - se.Labels = map[string]string{} - } - se.Labels[model.LabelCreatedBy] = "ServiceRegistry" - se.Name = service - - logger.Info("create ServiceEntry", "name", service, "namespace", ns) - err := c.Create(ctx, &se) - if err != nil { - logger.Error(err, "failed to create service entry to k8s", "service", service) - } - - return &se -} - -func (store *serviceEntryStore) updateToK8s(ctx context.Context, se *istiov1b1.ServiceEntry, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { - if proto.Equal(&se.Spec, entry) { - return se - } - - c := store.client - logger.Info("update ServiceEntry", "name", se.Name, "namespace", se.Namespace) - se.SetResourceVersion(se.ResourceVersion) - se.Spec = *entry.DeepCopy() - if err := c.Update(ctx, se); err != nil { - logger.Error(err, "failed to update service entry to k8s", "service", se.Name) - return se - } - - return se -} - -func (store *serviceEntryStore) sync() { - store.lock.Lock() - defer store.lock.Unlock() - - c := store.client - ctx := context.Background() - var serviceEntries istiov1b1.ServiceEntryList - err := c.List(ctx, &serviceEntries, client.MatchingLabels{model.LabelCreatedBy: "ServiceRegistry"}) - if err != nil { - logger.Error(err, "failed to list service entries") - return - } - - persisted := make(map[string]*istiov1b1.ServiceEntry, len(serviceEntries.Items)) - for _, se := range serviceEntries.Items { - if _, ok := store.entries[se.Name]; !ok { - store.deleteFromK8s(ctx, se) - } else { - persisted[se.Name] = se - } - } - - for service, wrp := range store.entries { - entry := &wrp.Spec - if se, ok := persisted[service]; !ok { - store.addToK8s(ctx, service, entry) - } else { - store.updateToK8s(ctx, se, entry) - } - } -} - -func (store *serviceEntryStore) Sync() { - // We sync the service entries so we can retry if something wrong happened - ticker := time.NewTicker(store.syncInterval) - // For now we don't release the ticker - for range ticker.C { - store.sync() - } + store.output.WriteServiceEntries(context.Background(), procession.ConfigSourceServiceRegistry, store.entries) } diff --git a/controller/pkg/procession/output.go b/controller/pkg/procession/output.go new file mode 100644 index 000000000..42cb23348 --- /dev/null +++ b/controller/pkg/procession/output.go @@ -0,0 +1,40 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package procession + +import ( + "context" + + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" +) + +// ConfigSource marks the source of the istio configuration +type ConfigSource int + +const ( + ConfigSourceHTTPFilterPolicy ConfigSource = iota + ConfigSourceConsumer + ConfigSourceServiceRegistry +) + +type Output interface { + // WriteEnvoyFilters writes the generated EnvoyFilters to the output + WriteEnvoyFilters(ctx context.Context, src ConfigSource, filters map[string]*istiov1a3.EnvoyFilter) error + // WriteServiceEntries writes the generated ServiceEntries to the output. Unlike the EnvoyFilter generators, + // the ServiceEntry generators assume the write already succeed, and don't retry on error, + // so the output should handle the retry by themselves. That's why the error is not returned here. + WriteServiceEntries(ctx context.Context, src ConfigSource, serviceEntries map[string]*istioapi.ServiceEntry) +} diff --git a/controller/tests/benchmark/suite_test.go b/controller/tests/benchmark/suite_test.go index c829864af..7bffbe3ec 100644 --- a/controller/tests/benchmark/suite_test.go +++ b/controller/tests/benchmark/suite_test.go @@ -50,6 +50,7 @@ import ( mosniov1 "mosn.io/htnn/controller/api/v1" "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" + controlleroutput "mosn.io/htnn/controller/internal/controller/output" ) var cfg *rest.Config @@ -178,9 +179,13 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + output, err := controlleroutput.NewMcpOutput(ctx) + Expect(err).ToNot(HaveOccurred()) + httpFilterPolicyReconciler = &controller.HTTPFilterPolicyReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + Output: output, } err = httpFilterPolicyReconciler.SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) diff --git a/controller/tests/integration/controller/suite_test.go b/controller/tests/integration/controller/suite_test.go index 502adafcc..afaf43b70 100644 --- a/controller/tests/integration/controller/suite_test.go +++ b/controller/tests/integration/controller/suite_test.go @@ -44,6 +44,7 @@ import ( "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" "mosn.io/htnn/controller/internal/registry" + "mosn.io/htnn/controller/tests/integration/helper" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to @@ -55,6 +56,7 @@ var testEnv *envtest.Environment var ctx context.Context var cancel context.CancelFunc var clientset *kubernetes.Clientset +var outputSuite = helper.OutputSuite{} func ptrstr(s string) *string { return &s @@ -63,7 +65,7 @@ func ptrstr(s string) *string { func TestControllers(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Controller Suite") + RunSpecs(t, fmt.Sprintf("Controller Suite [%s]", outputSuite.Name())) } var _ = BeforeSuite(func() { @@ -129,20 +131,23 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + output := outputSuite.Get(ctx, k8sManager.GetClient()) err = (&controller.HTTPFilterPolicyReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + Output: output, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) err = (&controller.ConsumerReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + Output: output, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) registry.InitRegistryManager(®istry.RegistryManagerOption{ - Client: k8sManager.GetClient(), + Output: output, }) err = (&controller.ServiceRegistryReconciler{ Client: k8sManager.GetClient(), diff --git a/controller/tests/integration/helper/k8s.go b/controller/tests/integration/helper/k8s.go new file mode 100644 index 000000000..381077ca4 --- /dev/null +++ b/controller/tests/integration/helper/k8s.go @@ -0,0 +1,23 @@ +//go:build !mcp_output + +package helper + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + + controlleroutput "mosn.io/htnn/controller/internal/controller/output" + "mosn.io/htnn/controller/pkg/procession" +) + +type OutputSuite struct { +} + +func (o *OutputSuite) Name() string { + return "k8s" +} + +func (o *OutputSuite) Get(ctx context.Context, c client.Client) procession.Output { + return controlleroutput.NewK8sOutput(c) +} diff --git a/controller/tests/integration/helper/mcp.go b/controller/tests/integration/helper/mcp.go new file mode 100644 index 000000000..988c68b79 --- /dev/null +++ b/controller/tests/integration/helper/mcp.go @@ -0,0 +1,36 @@ +//go:build mcp_output + +package helper + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + + controlleroutput "mosn.io/htnn/controller/internal/controller/output" + "mosn.io/htnn/controller/pkg/procession" +) + +type OutputSuite struct { +} + +func (o *OutputSuite) Name() string { + return "mcp" +} + +func (o *OutputSuite) Get(ctx context.Context, c client.Client) procession.Output { + output, err := controlleroutput.NewMcpOutput(ctx) + Expect(err).ToNot(HaveOccurred()) + + go func() { + defer GinkgoRecover() + + mc := NewMcpClient(c) + defer mc.Close() + mc.Init() + mc.Handle() + }() + return output +} diff --git a/controller/tests/integration/helper/mcp_client.go b/controller/tests/integration/helper/mcp_client.go new file mode 100644 index 000000000..986479b54 --- /dev/null +++ b/controller/tests/integration/helper/mcp_client.go @@ -0,0 +1,169 @@ +package helper + +import ( + "context" + "errors" + "io" + "strings" + "sync" + "time" + + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + . "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/anypb" + mcpapi "istio.io/api/mcp/v1alpha1" + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/controller/output" + "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" +) + +type mcpClient struct { + // Stream is the GRPC connection stream, allowing direct GRPC send operations. + // Set after Dial is called. + stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient + // xds client used to create a stream + client discovery.AggregatedDiscoveryServiceClient + conn *grpc.ClientConn + + lock sync.Mutex + + k8sClient client.Client + // To simulate k8s output in the existing tests, the simplest way is to use the k8s output directly + output procession.Output +} + +func NewMcpClient(cli client.Client) *mcpClient { + c := &mcpClient{ + k8sClient: cli, + output: output.NewK8sOutput(cli), + } + return c +} + +func (c *mcpClient) dial() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, config.McpServerListenAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), // ensure the connection is established + ) + if err != nil { + return err + } + c.conn = conn + return nil +} + +func (c *mcpClient) Init() { + c.lock.Lock() + defer c.lock.Unlock() + + for i := 0; i < 10; i++ { + err := c.dial() + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + c.client = discovery.NewAggregatedDiscoveryServiceClient(c.conn) + var err error + c.stream, err = c.client.StreamAggregatedResources(context.Background()) + Expect(err).NotTo(HaveOccurred()) + + // For now we don't care about the details in DiscoveryRequest + req := &discovery.DiscoveryRequest{} + err = c.stream.Send(req) + Expect(err).NotTo(HaveOccurred()) +} + +const ( + TypeUrlEnvoyFilter = "networking.istio.io/v1alpha3/EnvoyFilter" + TypeUrlServiceEntry = "networking.istio.io/v1beta1/ServiceEntry" +) + +func (c *mcpClient) Handle() { + for { + var err error + msg, err := c.stream.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + Expect(err).NotTo(HaveOccurred()) + } + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + switch msg.TypeUrl { + case TypeUrlEnvoyFilter: + efs := map[string]*istiov1a3.EnvoyFilter{} + for _, resource := range msg.Resources { + ef := c.convertAnyToEnvoyFilter(resource) + efs[ef.Name] = ef + } + if _, ok := efs[model.ConsumerEnvoyFilterName]; ok { + c.output.WriteEnvoyFilters(ctx, procession.ConfigSourceConsumer, efs) + } else { + c.output.WriteEnvoyFilters(ctx, procession.ConfigSourceHTTPFilterPolicy, efs) + } + case TypeUrlServiceEntry: + ses := map[string]*istioapi.ServiceEntry{} + for _, resource := range msg.Resources { + se := c.convertAnyToServiceEntry(resource) + ses[se.Name] = &se.Spec + } + c.output.WriteServiceEntries(ctx, procession.ConfigSourceServiceRegistry, ses) + default: + Expect(false).To(BeTrue(), "unknown type url: %s", msg.TypeUrl) + } + } +} + +func (c *mcpClient) convertAnyToEnvoyFilter(res *anypb.Any) *istiov1a3.EnvoyFilter { + mcpRes := &mcpapi.Resource{} + err := res.UnmarshalTo(mcpRes) + Expect(err).NotTo(HaveOccurred()) + + ef := &istiov1a3.EnvoyFilter{} + ss := strings.Split(mcpRes.Metadata.Name, "/") + ef.SetNamespace(ss[0]) + ef.SetName(ss[1]) + err = mcpRes.Body.UnmarshalTo(&ef.Spec) + Expect(err).NotTo(HaveOccurred()) + return ef +} + +func (c *mcpClient) convertAnyToServiceEntry(res *anypb.Any) *istiov1b1.ServiceEntry { + mcpRes := &mcpapi.Resource{} + err := res.UnmarshalTo(mcpRes) + Expect(err).NotTo(HaveOccurred()) + + se := &istiov1b1.ServiceEntry{} + ss := strings.Split(mcpRes.Metadata.Name, "/") + se.SetNamespace(ss[0]) + se.SetName(ss[1]) + err = mcpRes.Body.UnmarshalTo(&se.Spec) + Expect(err).NotTo(HaveOccurred()) + return se +} + +func (c *mcpClient) Close() { + c.lock.Lock() + defer c.lock.Unlock() + + if c.conn == nil { + return + } + c.conn.Close() +} diff --git a/controller/tests/integration/registries/suite_test.go b/controller/tests/integration/registries/suite_test.go index 95aa8203b..6df42763b 100644 --- a/controller/tests/integration/registries/suite_test.go +++ b/controller/tests/integration/registries/suite_test.go @@ -43,6 +43,7 @@ import ( mosniov1 "mosn.io/htnn/controller/api/v1" "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" + controlleroutput "mosn.io/htnn/controller/internal/controller/output" "mosn.io/htnn/controller/internal/registry" ) @@ -125,8 +126,9 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + output := controlleroutput.NewK8sOutput(k8sManager.GetClient()) registry.InitRegistryManager(®istry.RegistryManagerOption{ - Client: k8sManager.GetClient(), + Output: output, }) err = (&controller.ServiceRegistryReconciler{ Client: k8sManager.GetClient(),