Skip to content

Commit

Permalink
init mcp output
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Mar 12, 2024
1 parent 086a923 commit 7ac677d
Show file tree
Hide file tree
Showing 22 changed files with 1,018 additions and 260 deletions.
3 changes: 3 additions & 0 deletions controller/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and
test: manifests generate envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -gcflags="all=-N -l" -race \
./... -coverprofile cover.out -covermode=atomic -coverpkg=./...
# rerun controller integration test with different output
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -tags mcp_output -gcflags="all=-N -l" -race \
./tests/integration/controller -coverprofile cover.out -covermode=atomic -coverpkg=./...

.PHONY: benchmark
benchmark: manifests generate envtest ## Run benchmarks
Expand Down
30 changes: 28 additions & 2 deletions controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"flag"
"fmt"
"os"

istioscheme "istio.io/client-go/pkg/clientset/versioned/scheme"
Expand All @@ -34,7 +35,9 @@ import (
mosniov1 "mosn.io/htnn/controller/api/v1"
"mosn.io/htnn/controller/internal/config"
"mosn.io/htnn/controller/internal/controller"
controlleroutput "mosn.io/htnn/controller/internal/controller/output"
"mosn.io/htnn/controller/internal/registry"
"mosn.io/htnn/controller/pkg/procession"
"mosn.io/htnn/pkg/log"
)

Expand Down Expand Up @@ -69,8 +72,17 @@ func main() {
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")

var outputDest string
flag.StringVar(&outputDest, "output", "k8s", "The output destination of reconciliation result, mcp or k8s. Default is k8s.")

flag.Parse()

if outputDest != "mcp" && outputDest != "k8s" {
setupLog.Error(fmt.Errorf("unknown output: %s", outputDest), "unable to start")
os.Exit(1)
}

ctrl.SetLogger(log.DefaultLogger)

config.Init()
Expand Down Expand Up @@ -103,23 +115,37 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
var output procession.Output
if outputDest == "k8s" {
output = controlleroutput.NewK8sOutput(mgr.GetClient())
} else {
output, err = controlleroutput.NewMcpOutput(ctx)
if err != nil {
setupLog.Error(err, "unable to new mcp output")
os.Exit(1)
}
}

if err = (&controller.HTTPFilterPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Output: output,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HTTPFilterPolicy")
os.Exit(1)
}
if err = (&controller.ConsumerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Output: output,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Consumer")
os.Exit(1)
}

registry.InitRegistryManager(&registry.RegistryManagerOption{
Client: mgr.GetClient(),
Output: output,
})
if err = (&controller.ServiceRegistryReconciler{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -159,7 +185,7 @@ func main() {
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand Down
12 changes: 12 additions & 0 deletions controller/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ var (
logger = log.DefaultLogger.WithName("config")
)

// istio's xds server listen address plus 100
var mcpServerListenAddress = ":15110"

func McpServerListenAddress() string {
return mcpServerListenAddress
}

func GoSoPath() string {
return "/etc/libgolang.so"
}
Expand Down Expand Up @@ -54,4 +61,9 @@ func Init() {
if cfgRootNamespace != "" {
rootNamespace = cfgRootNamespace
}

addr := viper.GetString("mcp.listen")
if addr != "" {
mcpServerListenAddress = addr

Check warning on line 67 in controller/internal/config/config.go

View check run for this annotation

Codecov / codecov/patch

controller/internal/config/config.go#L65-L67

Added lines #L65 - L67 were not covered by tests
}
}
2 changes: 2 additions & 0 deletions controller/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ func TestInit(t *testing.T) {

// Check default values
assert.Equal(t, "istio-system", RootNamespace())
assert.Equal(t, ":15110", McpServerListenAddress())

viper.AddConfigPath("./testdata")
Init()

assert.Equal(t, "htnn", RootNamespace())
assert.Equal(t, ":9989", McpServerListenAddress())
}
2 changes: 2 additions & 0 deletions controller/internal/config/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
istio:
rootNamespace: htnn
mcp:
listen: ":9989"
52 changes: 7 additions & 45 deletions controller/internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,16 @@ import (
"mosn.io/htnn/controller/internal/config"
"mosn.io/htnn/controller/internal/istio"
"mosn.io/htnn/controller/internal/model"
"mosn.io/htnn/controller/pkg/procession"
)

// ConsumerReconciler reconciles a Consumer object
type ConsumerReconciler struct {
client.Client
Scheme *runtime.Scheme
Output procession.Output
}

const (
ConsumerEnvoyFilterName = "htnn-consumer"
)

//+kubebuilder:rbac:groups=mosn.io,resources=consumers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=mosn.io,resources=consumers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=mosn.io,resources=consumers/finalizers,verbs=update
Expand All @@ -68,7 +66,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

err = r.generateCustomResource(ctx, &logger, state)
err = r.generateCustomResource(ctx, state)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -120,7 +118,7 @@ func (r *ConsumerReconciler) consumersToState(ctx context.Context, logger *logr.
return state, nil
}

func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, logger *logr.Logger, state *consumerReconcileState) error {
func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, state *consumerReconcileState) error {
consumerData := map[string]interface{}{}
for ns, consumers := range state.namespaceToConsumers {
data := make(map[string]interface{}, len(consumers))
Expand All @@ -137,49 +135,13 @@ func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, logger

ef := istio.GenerateConsumers(consumerData)
ef.Namespace = config.RootNamespace()
ef.Name = ConsumerEnvoyFilterName
ef.Name = model.ConsumerEnvoyFilterName
if ef.Labels == nil {
ef.Labels = map[string]string{}
}
ef.Labels[model.LabelCreatedBy] = "Consumer"

nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace}
var envoyfilters istiov1a3.EnvoyFilterList
if err := r.List(ctx, &envoyfilters, client.MatchingLabels{model.LabelCreatedBy: "Consumer"}); err != nil {
return fmt.Errorf("failed to list EnvoyFilter: %w", err)
}

var envoyfilter *istiov1a3.EnvoyFilter
for _, e := range envoyfilters.Items {
if e.Namespace != nsName.Namespace || e.Name != nsName.Name {
logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace)

if err := r.Delete(ctx, e); err != nil {
return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v",
err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace})
}
} else {
envoyfilter = e
}
}

if envoyfilter == nil {
logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace)

if err := r.Create(ctx, ef.DeepCopy()); err != nil {
return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName)
}
} else {
logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace)

ef = ef.DeepCopy()
ef.SetResourceVersion(envoyfilter.ResourceVersion)
if err := r.Update(ctx, ef); err != nil {
return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName)
}
}

return nil
return r.Output.WriteEnvoyFilters(ctx, procession.ConfigSourceConsumer,
map[string]*istiov1a3.EnvoyFilter{model.ConsumerEnvoyFilterName: ef})
}

func (r *ConsumerReconciler) updateConsumers(ctx context.Context, consumers *mosniov1.ConsumerList) error {
Expand Down
68 changes: 6 additions & 62 deletions controller/internal/controller/httpfilterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"time"

"github.com/go-logr/logr"
"google.golang.org/protobuf/proto"
istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -43,17 +41,17 @@ import (
gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

mosniov1 "mosn.io/htnn/controller/api/v1"
"mosn.io/htnn/controller/internal/config"
"mosn.io/htnn/controller/internal/k8s"
"mosn.io/htnn/controller/internal/metrics"
"mosn.io/htnn/controller/internal/model"
"mosn.io/htnn/controller/internal/translation"
"mosn.io/htnn/controller/pkg/procession"
)

// HTTPFilterPolicyReconciler reconciles a HTTPFilterPolicy object
type HTTPFilterPolicyReconciler struct {
client.Client
Scheme *runtime.Scheme
Output procession.Output

istioGatewayIndexer *IstioGatewayIndexer
k8sGatewayIndexer *K8sGatewayIndexer
Expand Down Expand Up @@ -102,7 +100,7 @@ func (r *HTTPFilterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Req
// We can add a configured concurrency to write to API server in parallel, if
// the performance is not good. Note that the API server probably has rate limit.

err = r.translationStateToCustomResource(ctx, &logger, finalState)
err = r.translationStateToCustomResource(ctx, finalState)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -350,65 +348,11 @@ func (r *HTTPFilterPolicyReconciler) policyToTranslationState(ctx context.Contex
return initState, nil
}

func fillEnvoyFilterMeta(ef *istiov1a3.EnvoyFilter) {
ef.Namespace = config.RootNamespace()
if ef.Labels == nil {
ef.Labels = map[string]string{}
}
ef.Labels[model.LabelCreatedBy] = "HTTPFilterPolicy"
}

func (r *HTTPFilterPolicyReconciler) translationStateToCustomResource(ctx context.Context, logger *logr.Logger,
func (r *HTTPFilterPolicyReconciler) translationStateToCustomResource(ctx context.Context,
finalState *translation.FinalState) error {

var envoyfilters istiov1a3.EnvoyFilterList
if err := r.List(ctx, &envoyfilters,
client.MatchingLabels{model.LabelCreatedBy: "HTTPFilterPolicy"},
); err != nil {
return fmt.Errorf("failed to list EnvoyFilter: %w", err)
}

preEnvoyFilterMap := make(map[string]*istiov1a3.EnvoyFilter, len(envoyfilters.Items))
for _, e := range envoyfilters.Items {
if _, ok := finalState.EnvoyFilters[e.Name]; !ok || e.Namespace != config.RootNamespace() {
logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace)
if err := r.Delete(ctx, e); err != nil {
return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v",
err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace})
}
} else {
preEnvoyFilterMap[e.Name] = e
}
}

for _, ef := range finalState.EnvoyFilters {
envoyfilter, ok := preEnvoyFilterMap[ef.Name]
if !ok {
logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace)
fillEnvoyFilterMeta(ef)

if err := r.Create(ctx, ef); err != nil {
nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace}
return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName)
}

} else {
if proto.Equal(&envoyfilter.Spec, &ef.Spec) {
continue
}

logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace)
fillEnvoyFilterMeta(ef)
// Address metadata.resourceVersion: Invalid value: 0x0 error
ef.SetResourceVersion(envoyfilter.ResourceVersion)
if err := r.Update(ctx, ef); err != nil {
nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace}
return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName)
}
}
}

return nil
generatedEnvoyFilters := finalState.EnvoyFilters
return r.Output.WriteEnvoyFilters(ctx, procession.ConfigSourceHTTPFilterPolicy, generatedEnvoyFilters)
}

func (r *HTTPFilterPolicyReconciler) updatePolicies(ctx context.Context,
Expand Down
Loading

0 comments on commit 7ac677d

Please sign in to comment.