diff --git a/controller/registries/consul/config.go b/controller/registries/consul/config.go index 417a656b..2a18cda3 100644 --- a/controller/registries/consul/config.go +++ b/controller/registries/consul/config.go @@ -17,11 +17,15 @@ package consul import ( "fmt" "net/url" + "regexp" + "strings" "sync" "sync/atomic" "time" consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + istioapi "istio.io/api/networking/v1alpha3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "mosn.io/htnn/controller/pkg/registry" @@ -39,6 +43,7 @@ func init() { store: store, name: om.Name, softDeletedServices: map[consulService]bool{}, + subscriptions: make(map[string]*watch.Plan), done: make(chan struct{}), } return reg, nil @@ -54,6 +59,7 @@ type Consul struct { lock sync.RWMutex watchingServices map[consulService]bool + subscriptions map[string]*watch.Plan softDeletedServices map[consulService]bool done chan struct{} @@ -67,8 +73,13 @@ type Client struct { DataCenter string NameSpace string Token string + Address string } +var ( + RegistryType = "consul" +) + func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { uri, err := url.Parse(config.ServerUrl) if err != nil { @@ -91,6 +102,7 @@ func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { DataCenter: config.DataCenter, NameSpace: config.Namespace, Token: config.Token, + Address: clientConfig.Address, }, nil } @@ -115,6 +127,15 @@ func (reg *Consul) Start(c registrytype.RegistryConfig) error { return err } + for key := range services { + err = reg.subscribe(key.Tag, key.ServiceName) + if err != nil { + reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, key) + // the service will be resubscribed after refresh interval + delete(services, key) + } + } + reg.watchingServices = services dur := 30 * time.Second @@ -159,15 +180,72 @@ func (reg *Consul) Stop() error { reg.lock.Lock() defer reg.lock.Unlock() + for key := range reg.softDeletedServices { + if _, ok := reg.watchingServices[key]; !ok { + reg.store.Delete(reg.getServiceEntryKey(key.Tag, key.ServiceName)) + } + } + for key := range reg.watchingServices { + reg.removeService(key) + } return nil } func (reg *Consul) Reload(c registrytype.RegistryConfig) error { - fmt.Println(c) + config := c.(*consul.Config) + + client, err := reg.NewClient(config) + if err != nil { + return err + } + + fetchedServices, err := reg.fetchAllServices(client) + if err != nil { + return fmt.Errorf("fetch all services error: %v", err) + } + + for key := range reg.softDeletedServices { + if _, ok := fetchedServices[key]; !ok { + reg.store.Delete(reg.getServiceEntryKey(key.Tag, key.ServiceName)) + } + } + reg.softDeletedServices = map[consulService]bool{} + + for key := range reg.watchingServices { + // unsubscribe with the previous client + if _, ok := fetchedServices[key]; !ok { + reg.removeService(key) + } else { + err = reg.unsubscribe(key.ServiceName) + if err != nil { + reg.logger.Errorf("failed to unsubscribe service, err: %v, service: %v", err, key) + } + } + } + + reg.client = client + + for key := range fetchedServices { + err = reg.subscribe(key.Tag, key.ServiceName) + if err != nil { + reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, key) + delete(fetchedServices, key) + } + } + reg.watchingServices = fetchedServices + return nil } +func (reg *Consul) removeService(key consulService) { + err := reg.unsubscribe(key.ServiceName) + if err != nil { + reg.logger.Errorf("failed to unsubscribe service, err: %v, service: %v", err, key) + } + reg.store.Delete(reg.getServiceEntryKey(key.Tag, key.ServiceName)) +} + func (reg *Consul) fetchAllServices(client *Client) (map[consulService]bool, error) { q := &consulapi.QueryOptions{} q.Datacenter = client.DataCenter @@ -181,46 +259,140 @@ func (reg *Consul) fetchAllServices(client *Client) (map[consulService]bool, err } serviceMap := make(map[consulService]bool) for serviceName, tags := range services { - for _, tag := range tags { - service := consulService{ - Tag: tag, - ServiceName: serviceName, - } - serviceMap[service] = true + tag := strings.Join(tags, "-") + service := consulService{ + Tag: tag, + ServiceName: serviceName, } + serviceMap[service] = true } + return serviceMap, nil } -func (reg *Consul) subscribe(serviceName string) error { - fmt.Println(serviceName) +func (reg *Consul) getServiceEntryKey(tag, serviceName string) string { + host := strings.Join([]string{tag, serviceName, reg.client.NameSpace, reg.client.DataCenter, reg.name, RegistryType}, ".") + host = strings.ReplaceAll(host, "_", "-") + + re := regexp.MustCompile(`\.+`) + h := re.ReplaceAllString(host, ".") + h = strings.Trim(h, ".") + return strings.ToLower(h) +} + +func (reg *Consul) generateServiceEntry(host string, services []*consulapi.ServiceEntry) *registry.ServiceEntryWrapper { + portList := make([]*istioapi.ServicePort, 0, 1) + endpoints := make([]*istioapi.WorkloadEntry, 0, len(services)) + + for _, service := range services { + protocol := registry.HTTP + if service.Service.Meta == nil { + service.Service.Meta = make(map[string]string) + } + + if service.Service.Meta["protocol"] != "" { + protocol = registry.ParseProtocol(service.Service.Meta["protocol"]) + } + + port := &istioapi.ServicePort{ + Name: string(protocol), + Number: uint32(service.Service.Port), + Protocol: string(protocol), + } + if len(portList) == 0 { + portList = append(portList, port) + } + + endpoint := istioapi.WorkloadEntry{ + Address: service.Service.Address, + Ports: map[string]uint32{port.Protocol: port.Number}, + Labels: service.Service.Meta, + } + endpoints = append(endpoints, &endpoint) + } + + return ®istry.ServiceEntryWrapper{ + ServiceEntry: istioapi.ServiceEntry{ + Hosts: []string{host}, + Ports: portList, + Location: istioapi.ServiceEntry_MESH_INTERNAL, + Resolution: istioapi.ServiceEntry_STATIC, + Endpoints: endpoints, + }, + Source: RegistryType, + } +} + +func (reg *Consul) subscribe(tag, serviceName string) error { + plan, err := watch.Parse(map[string]interface{}{ + "type": "service", + "service": serviceName, + }) + if err != nil { + return err + } + + plan.Handler = reg.getSubscribeCallback(tag, serviceName) + plan.Token = reg.client.Token + plan.Datacenter = reg.client.DataCenter + reg.subscriptions[serviceName] = plan + + go func() { + err := plan.Run(reg.client.Address) + if err != nil { + reg.logger.Errorf("failed to subscribe ,err=%v", err) + } + }() + return nil } +func (reg *Consul) getSubscribeCallback(tag, serviceName string) func(idx uint64, data interface{}) { + host := reg.getServiceEntryKey(tag, serviceName) + return func(idx uint64, data interface{}) { + services, ok := data.([]*consulapi.ServiceEntry) + if !ok { + reg.logger.Infof("Unexpected type for data in callback: %t", data) + return + } + if reg.stopped.Load() { + return + } + reg.store.Update(host, reg.generateServiceEntry(host, services)) + } + +} + func (reg *Consul) unsubscribe(serviceName string) error { - fmt.Println(serviceName) + plan, exists := reg.subscriptions[serviceName] + if !exists { + return fmt.Errorf("no subscription found for service %s", serviceName) + } + + plan.Stop() + delete(reg.subscriptions, serviceName) return nil } func (reg *Consul) refresh(services map[string][]string) { + serviceMap := make(map[consulService]bool) + for serviceName, tags := range services { - for _, tag := range tags { - service := consulService{ - Tag: tag, - ServiceName: serviceName, - } - serviceMap[service] = true - if _, ok := reg.watchingServices[service]; !ok { - err := reg.subscribe(serviceName) - if err != nil { - reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, serviceName) - delete(serviceMap, service) - } + tag := strings.Join(tags, "-") + service := consulService{ + Tag: tag, + ServiceName: serviceName, + } + serviceMap[service] = true + if _, ok := reg.watchingServices[service]; !ok { + err := reg.subscribe("", service.ServiceName) + if err != nil { + reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, service.ServiceName) + delete(serviceMap, service) } } } - prevFetchServices := reg.watchingServices reg.watchingServices = serviceMap diff --git a/controller/registries/consul/config_test.go b/controller/registries/consul/config_test.go index bbbc1753..c451f100 100644 --- a/controller/registries/consul/config_test.go +++ b/controller/registries/consul/config_test.go @@ -17,12 +17,19 @@ package consul import ( "errors" "reflect" + "sync" + "sync/atomic" "testing" "github.com/agiledragon/gomonkey/v2" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + istioapi "istio.io/api/networking/v1alpha3" + "mosn.io/htnn/controller/pkg/registry" "mosn.io/htnn/controller/pkg/registry/log" "mosn.io/htnn/types/registries/consul" ) @@ -47,6 +54,7 @@ func TestNewClient(t *testing.T) { assert.Error(t, err) assert.Nil(t, client) + } func TestStart(t *testing.T) { @@ -54,22 +62,25 @@ func TestStart(t *testing.T) { logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), - done: make(chan struct{}), + softDeletedServices: map[consulService]bool{}, + subscriptions: make(map[string]*watch.Plan), + done: make(chan struct{}), + lock: sync.RWMutex{}, } + config := &consul.Config{} + patches := gomonkey.ApplyPrivateMethod(reflect.TypeOf(reg), "fetchAllServices", func(_ *Consul, client *Client) (map[consulService]bool, error) { return map[consulService]bool{ {ServiceName: "service1", Tag: "tag1"}: true, {ServiceName: "service2", Tag: "tag2"}: true, }, nil }) - config := &consul.Config{} - err := reg.Start(config) - assert.Nil(t, err) - err = reg.subscribe("123") - assert.Nil(t, err) + patches.ApplyPrivateMethod(reg, "subscribe", func(tag, serviceName string) error { return nil }) + patches.ApplyPrivateMethod(reg, "unsubscribe", func(serviceName string) error { return nil }) + patches.ApplyPrivateMethod(reg, "removeService", func(key consulService) {}) - err = reg.unsubscribe("123") + err := reg.Start(config) assert.Nil(t, err) err = reg.Stop() @@ -77,31 +88,25 @@ func TestStart(t *testing.T) { patches.Reset() - config = &consul.Config{} - reg = &Consul{ logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), - done: make(chan struct{}), + softDeletedServices: map[consulService]bool{}, + subscriptions: make(map[string]*watch.Plan), + done: make(chan struct{}), + lock: sync.RWMutex{}, + } + + config = &consul.Config{ + ServerUrl: "::::::::::::", } err = reg.Start(config) assert.Error(t, err) - close(reg.done) } -func TestReload(t *testing.T) { - reg := &Consul{} - config := &consul.Config{ - ServerUrl: "http://127.0.0.1:8500", - } - - err := reg.Reload(config) - assert.NoError(t, err) -} - func TestRefresh(t *testing.T) { reg := &Consul{ logger: log.NewLogger(&log.RegistryLoggerOptions{ @@ -110,24 +115,34 @@ func TestRefresh(t *testing.T) { softDeletedServices: map[consulService]bool{}, done: make(chan struct{}), watchingServices: map[consulService]bool{}, + lock: sync.RWMutex{}, } config := &consul.Config{ - ServerUrl: "http://127.0.0.1:8500", + ServerUrl: "::::::::::::::::::", } + client, _ := reg.NewClient(config) reg.client = client services := map[string][]string{ - "service1": {"dc1", "dc2"}, - "service2": {"dc1"}, + "service1": {"tag1", "tag2"}, + "service2": {"tag1"}, } + patches := gomonkey.ApplyPrivateMethod(reflect.TypeOf(reg), "fetchAllServices", func(_ *Consul, client *Client) (map[consulService]bool, error) { + return map[consulService]bool{ + {ServiceName: "service1", Tag: "tag1"}: true, + {ServiceName: "service2", Tag: "tag2"}: true, + }, nil + }) + patches.ApplyPrivateMethod(reg, "subscribe", func(serviceName string) error { return nil }) + defer patches.Reset() + reg.refresh(services) - assert.Len(t, reg.watchingServices, 3) - assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service1", Tag: "dc1"}) - assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service1", Tag: "dc2"}) - assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service2", Tag: "dc1"}) + assert.Len(t, reg.watchingServices, 2) + assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service1", Tag: "tag1-tag2"}) + assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service2", Tag: "tag1"}) assert.Empty(t, reg.softDeletedServices) reg = &Consul{ @@ -136,8 +151,9 @@ func TestRefresh(t *testing.T) { }), softDeletedServices: map[consulService]bool{}, watchingServices: map[consulService]bool{ - {ServiceName: "service1", Tag: "dc1"}: true, + {ServiceName: "service1", Tag: "tag1"}: true, }, + lock: sync.RWMutex{}, } services = map[string][]string{} @@ -155,6 +171,7 @@ func TestFetchAllServices(t *testing.T) { logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), + lock: sync.RWMutex{}, } client := &Client{ consulCatalog: &api.Catalog{}, @@ -174,8 +191,7 @@ func TestFetchAllServices(t *testing.T) { services, err := reg.fetchAllServices(client) assert.NoError(t, err) assert.NotNil(t, services) - assert.True(t, services[consulService{ServiceName: "service1", Tag: "tag1"}]) - assert.True(t, services[consulService{ServiceName: "service1", Tag: "tag2"}]) + assert.True(t, services[consulService{ServiceName: "service1", Tag: "tag1-tag2"}]) assert.True(t, services[consulService{ServiceName: "service2", Tag: "tag3"}]) }) @@ -184,6 +200,7 @@ func TestFetchAllServices(t *testing.T) { logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), + lock: sync.RWMutex{}, } client := &Client{ consulCatalog: &api.Catalog{}, @@ -203,3 +220,210 @@ func TestFetchAllServices(t *testing.T) { assert.Nil(t, services) }) } +func TestGenerateServiceEntry(t *testing.T) { + host := "test.default.default-dc.earth.consul" + reg := &Consul{} + + type test struct { + name string + services []*api.ServiceEntry + port *istioapi.ServicePort + endpoint *istioapi.WorkloadEntry + } + tests := []test{} + for input, proto := range registry.ProtocolMap { + s := string(proto) + tests = append(tests, test{ + name: input, + services: []*api.ServiceEntry{ + { + Service: &api.AgentService{ + Port: 80, + Address: "1.1.1.1", + Meta: map[string]string{ + "protocol": input, + }, + Namespace: "default", + }, + }, + }, + port: &istioapi.ServicePort{ + Name: s, + Protocol: s, + Number: 80, + }, + endpoint: &istioapi.WorkloadEntry{ + Address: "1.1.1.1", + Ports: map[string]uint32{s: 80}, + Labels: map[string]string{ + "protocol": input, + }, + }, + }) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + se := reg.generateServiceEntry(host, tt.services) + require.True(t, proto.Equal(se.ServiceEntry.Ports[0], tt.port)) + require.True(t, proto.Equal(se.ServiceEntry.Endpoints[0], tt.endpoint)) + }) + } +} + +func TestGetServiceEntryKey(t *testing.T) { + reg := &Consul{ + client: &Client{ + NameSpace: "default_namespace", + DataCenter: "dc1", + }, + name: "test_registry", + } + + testCases := []struct { + serviceName string + expectedKey string + tag string + }{ + { + serviceName: "service", + expectedKey: "service.default-namespace.dc1.test-registry.consul", + }, + { + serviceName: "service", + expectedKey: "service.default-namespace.dc1.test-registry.consul", + }, + { + tag: "default", + serviceName: "service", + expectedKey: "default.service.default-namespace.dc1.test-registry.consul", + }, + } + + for _, tt := range testCases { + t.Run(tt.serviceName, func(t *testing.T) { + result := reg.getServiceEntryKey(tt.tag, tt.serviceName) + assert.Equal(t, tt.expectedKey, result) + }) + } +} + +type fakeServiceEntryStore struct { +} + +func (f *fakeServiceEntryStore) Delete(service string) { +} + +func (f *fakeServiceEntryStore) Update(service string, se *registry.ServiceEntryWrapper) { +} + +func TestGetSubscribeCallback(t *testing.T) { + reg := &Consul{ + store: &fakeServiceEntryStore{}, + stopped: atomic.Bool{}, + } + + patch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(reg), "getServiceEntryKey", func(_ *Consul, serviceName string) string { + return "test.default.default-dc.earth.consul" + }) + defer patch.Reset() + + patch.ApplyPrivateMethod(reflect.TypeOf(reg), "generateServiceEntry", func(_ *Consul, host string, services []*api.ServiceEntry) *registry.ServiceEntryWrapper { + return ®istry.ServiceEntryWrapper{} + }) + + callback := reg.getSubscribeCallback("", "test-service") + + var services []*api.ServiceEntry + callback(0, services) + +} + +func TestReload(t *testing.T) { + reg := &Consul{ + watchingServices: make(map[consulService]bool), + softDeletedServices: make(map[consulService]bool), + subscriptions: make(map[string]*watch.Plan), + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + store: &fakeServiceEntryStore{}, + lock: sync.RWMutex{}, + } + + patches := gomonkey.ApplyFunc(reg.NewClient, func(config *consul.Config) (*Client, error) { + return &Client{ + Address: "new-client-address", + Token: "new-token", + DataCenter: "new-datacenter", + }, nil + }) + + service := consulService{"test-service", "new-datacenter"} + patches.ApplyPrivateMethod(reflect.TypeOf(reg), "fetchAllServices", func(client *Client) (map[consulService]bool, error) { + return map[consulService]bool{ + service: true, + }, nil + }) + + patches.ApplyPrivateMethod(reflect.TypeOf(reg), "subscribe", func(_ *Consul, serviceName string) error { + return nil + }) + + patches.ApplyPrivateMethod(reflect.TypeOf(reg), "unsubscribe", func(_ *Consul, serviceName string) error { + return nil + }) + + err := reg.Reload(&consul.Config{}) + + assert.Nil(t, err) + assert.Equal(t, reg.client.Address, "127.0.0.1:8500") + assert.Contains(t, reg.watchingServices, consulService{"test-service", "new-datacenter"}) + + reg.removeService(service) + + patches.Reset() +} + +func TestSubscribe(t *testing.T) { + reg := &Consul{ + client: &Client{ + Token: "test-token", + DataCenter: "dc1", + Address: "127.0.0.1:8500", + }, + subscriptions: make(map[string]*watch.Plan), + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + store: &fakeServiceEntryStore{}, + lock: sync.RWMutex{}, + } + + plan := &watch.Plan{ + Token: "test-token", + Datacenter: "dc1", + Handler: func(idx uint64, data interface{}) { + }, + } + + patch := gomonkey.ApplyMethod(reflect.TypeOf(plan), "Run", func(_ *watch.Plan, address string) error { + return nil + }) + + err := reg.subscribe("", "test-service") + + assert.Nil(t, err) + assert.NotNil(t, reg.subscriptions["test-service"]) + assert.Equal(t, reg.subscriptions["test-service"].Token, "test-token") + assert.Equal(t, reg.subscriptions["test-service"].Datacenter, "dc1") + + patch.ApplyMethod(reflect.TypeOf(&watch.Plan{}), "Stop", func(_ *watch.Plan) {}) + err = reg.unsubscribe("test-service") + patch.Reset() + + assert.Nil(t, err) + assert.Nil(t, reg.subscriptions["test-service"]) + _, exists := reg.subscriptions["test-service"] + assert.False(t, exists) +} diff --git a/controller/registries/registries.go b/controller/registries/registries.go index 6cb426fe..348c9a0a 100644 --- a/controller/registries/registries.go +++ b/controller/registries/registries.go @@ -15,5 +15,6 @@ package registries import ( + _ "mosn.io/htnn/controller/registries/consul" _ "mosn.io/htnn/controller/registries/nacos" ) diff --git a/controller/tests/integration/registries/consul_test.go b/controller/tests/integration/registries/consul_test.go new file mode 100644 index 00000000..15dca978 --- /dev/null +++ b/controller/tests/integration/registries/consul_test.go @@ -0,0 +1,277 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registries + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "path/filepath" + "strconv" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + istioapi "istio.io/api/networking/v1alpha3" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/pkg/constant" + "mosn.io/htnn/controller/tests/integration/helper" + "mosn.io/htnn/controller/tests/pkg" + mosniov1 "mosn.io/htnn/types/apis/v1" +) + +var ( + currConsul *mosniov1.ServiceRegistry +) + +func enableConsul(consulInstance string) { + var input []map[string]interface{} + fn := filepath.Join("testdata", "consul", consulInstance+".yml") + helper.MustReadInput(fn, &input) + for _, in := range input { + obj := pkg.MapToObj(in) + Expect(k8sClient.Create(ctx, obj)).Should(Succeed()) + Eventually(func() bool { + err := k8sClient.Get(ctx, client.ObjectKey{Name: consulInstance, Namespace: "default"}, obj) + currConsul = obj.(*mosniov1.ServiceRegistry) + return err == nil + }, 10*time.Millisecond, 5*time.Second).Should(BeTrue()) + } +} + +func disableConsul(consulInstance string) { + sr := &mosniov1.ServiceRegistry{} + sr.SetName(consulInstance) + sr.SetNamespace("default") + Expect(k8sClient.Delete(context.Background(), sr)).Should(Succeed()) +} + +func listConsulServiceEntries() []*istiov1a3.ServiceEntry { + var entries istiov1a3.ServiceEntryList + Expect(k8sClient.List(ctx, &entries, client.MatchingLabels{constant.LabelCreatedBy: "ServiceRegistry"})).Should(Succeed()) + return entries.Items +} + +func registerConsulInstance(consulPort string, name string, ip string, port string, metadata map[string]any) { + consulServerURL := "http://0.0.0.0:" + consulPort + + portInt, err := strconv.Atoi(port) + Expect(err).To(BeNil()) + + service := map[string]any{ + "Node": "node1", + "Address": ip, + "Service": map[string]any{ + "ID": name + ip + port, + "Service": name, + "Address": ip, + "Port": portInt, + }, + } + + if metadata != nil { + service["Service"].(map[string]any)["Meta"] = metadata + } + + body, err := json.Marshal(service) + Expect(err).To(BeNil()) + + fullURL := consulServerURL + "/v1/catalog/register" + + req, err := http.NewRequest("PUT", fullURL, bytes.NewBuffer(body)) + req.Header.Set("Content-Type", "application/json") + Expect(err).To(BeNil()) + + client := &http.Client{} + resp, err := client.Do(req) + Expect(err).To(BeNil()) + + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(200)) +} + +func deregisterConsulInstance(consulPort string, name string, ip string, port string) { + consulServerURL := "http://0.0.0.0:" + consulPort + + serviceID := name + ip + port + body := map[string]any{ + "Node": "node1", + "ServiceID": serviceID, + } + + bodyJSON, err := json.Marshal(body) + Expect(err).To(BeNil()) + + fullURL := consulServerURL + "/v1/catalog/deregister" + + req, err := http.NewRequest("PUT", fullURL, bytes.NewBuffer(bodyJSON)) + req.Header.Set("Content-Type", "application/json") + Expect(err).To(BeNil()) + + client := &http.Client{} + resp, err := client.Do(req) + Expect(err).To(BeNil()) + Expect(resp.StatusCode).To(Equal(200)) +} + +func deleteConsulService(consulPort string, name string, ip string, port string) { + deregisterConsulInstance(consulPort, name, ip, port) +} + +var _ = Describe("Consul", func() { + + const ( + timeout = time.Second * 30 + interval = time.Millisecond * 250 + ) + + helper.WaitServiceUp(":8500", "Consul") + + AfterEach(func() { + var registries mosniov1.ServiceRegistryList + if err := k8sClient.List(ctx, ®istries); err == nil { + for _, e := range registries.Items { + pkg.DeleteK8sResource(ctx, k8sClient, &e) + } + } + + Eventually(func() bool { + entries := listConsulServiceEntries() + return len(entries) == 0 + }, timeout, interval).Should(BeTrue()) + }) + + It("service life cycle", func() { + enableConsul("default") + + registerConsulInstance("8500", "test", "1.2.3.4", "8080", nil) + + var entries []*istiov1a3.ServiceEntry + Eventually(func() bool { + entries = listConsulServiceEntries() + return len(entries) == 2 + }, timeout, interval).Should(BeTrue()) + + Expect(entries[1].Name).To(Equal("test.default.consul")) + Expect(entries[1].Spec.GetHosts()).To(Equal([]string{"test.default.consul"})) + Expect(entries[1].Spec.Location).To(Equal(istioapi.ServiceEntry_MESH_INTERNAL)) + Expect(entries[1].Spec.Resolution).To(Equal(istioapi.ServiceEntry_STATIC)) + Expect(len(entries[1].Spec.Endpoints)).To(Equal(1)) + Expect(entries[1].Spec.Endpoints[0].Address).To(Equal("1.2.3.4")) + Expect(entries[1].Spec.Endpoints[0].Ports).To(Equal(map[string]uint32{ + "HTTP": 8080, + })) + + registerConsulInstance("8500", "test", "1.2.3.5", "8080", nil) + + Eventually(func() bool { + entries = listConsulServiceEntries() + return len(entries[1].Spec.Endpoints) == 2 + }, timeout, interval).Should(BeTrue()) + + deregisterConsulInstance("8500", "test", "1.2.3.4", "8080") + + Eventually(func() bool { + entries = listConsulServiceEntries() + return len(entries[1].Spec.Endpoints) == 1 + }, timeout, interval).Should(BeTrue()) + + deleteConsulService("8500", "test", "1.2.3.5", "8080") + }) + + It("stop consul should remove service entries", func() { + registerConsulInstance("8500", "test", "1.2.3.4", "8080", nil) + enableConsul("default") + + Eventually(func() bool { + entries := listConsulServiceEntries() + return len(entries) == 2 + }, timeout, interval).Should(BeTrue()) + + disableConsul("default") + + Eventually(func() bool { + entries := listConsulServiceEntries() + return len(entries) == 0 + }, timeout, interval).Should(BeTrue()) + + deleteConsulService("8500", "test", "1.2.3.4", "8080") + }) + + It("reload", func() { + registerConsulInstance("8500", "test", "1.2.3.4", "8080", nil) + registerConsulInstance("8500", "test1", "1.2.3.4", "8080", nil) + registerConsulInstance("8500", "test2", "1.2.3.4", "8080", nil) + registerConsulInstance("8501", "test", "1.2.3.5", "8080", nil) + registerConsulInstance("8501", "test3", "1.2.3.5", "8080", nil) + + // old + enableConsul("default") + var entries []*istiov1a3.ServiceEntry + Eventually(func() bool { + entries = listConsulServiceEntries() + return len(entries) == 4 + }, timeout, interval).Should(BeTrue()) + Expect(entries[1].Spec.Endpoints[0].Address).To(Equal("1.2.3.4")) + + // new + base := client.MergeFrom(currConsul.DeepCopy()) + currConsul.Spec.Config.Raw = []byte(`{"serviceRefreshInterval":"1s", "serverUrl":"http://127.0.0.1:8501"}`) + Expect(k8sClient.Patch(ctx, currConsul, base)).Should(Succeed()) + Eventually(func() bool { + entries = listConsulServiceEntries() + + return len(entries) == 3 && entries[1].Spec.Endpoints[0].Address == "1.2.3.5" + }, timeout, interval).Should(BeTrue()) + + // refresh & unsubscribe + deleteConsulService("8501", "test3", "1.2.3.5", "8080") + time.Sleep(1 * time.Second) + entries = listConsulServiceEntries() + + Expect(len(entries)).To(Equal(5)) + + // ServiceEntry is removed only when the configuration changed + base = client.MergeFrom(currConsul.DeepCopy()) + currConsul.Spec.Config.Raw = []byte(`{"serviceRefreshInterval":"2s", "serverUrl":"http://127.0.0.1:8501"}`) + Expect(k8sClient.Patch(ctx, currConsul, base)).Should(Succeed()) + Eventually(func() bool { + entries = listConsulServiceEntries() + return len(entries) == 2 + }, timeout, interval).Should(BeTrue()) + + // subscribe change + registerConsulInstance("8501", "test", "1.2.4.5", "8080", nil) + deleteConsulService("8500", "test", "1.2.3.4", "8080") // should be ignored + Eventually(func() bool { + entries = listConsulServiceEntries() + return len(entries[1].Spec.Endpoints) == 2 + }, timeout, interval).Should(BeTrue()) + + // unsubscribe + disableConsul("default") + Eventually(func() bool { + entries := listConsulServiceEntries() + return len(entries) == 0 + }, timeout, interval).Should(BeTrue()) + deleteConsulService("8500", "test1", "1.2.3.4", "8080") + deleteConsulService("8500", "test2", "1.2.3.4", "8080") + deleteConsulService("8501", "test", "1.2.4.5", "8080") + }) +}) diff --git a/controller/tests/integration/registries/testdata/consul/default.yml b/controller/tests/integration/registries/testdata/consul/default.yml new file mode 100644 index 00000000..d8dbd41a --- /dev/null +++ b/controller/tests/integration/registries/testdata/consul/default.yml @@ -0,0 +1,10 @@ +- apiVersion: htnn.mosn.io/v1 + kind: ServiceRegistry + metadata: + name: default + namespace: default + spec: + type: consul + config: + serverUrl: http://127.0.0.1:8500 + serviceRefreshInterval: 1s diff --git a/controller/tests/testdata/services/docker-compose.yml b/controller/tests/testdata/services/docker-compose.yml index 728b50f5..fd2e2fd7 100644 --- a/controller/tests/testdata/services/docker-compose.yml +++ b/controller/tests/testdata/services/docker-compose.yml @@ -17,6 +17,23 @@ services: - "8849:8848" networks: service: + consul: + image: docker.m.daocloud.io/library/consul:1.15.4 + ports: + - "8500:8500" + - "8600:8600" + command: agent -server -bootstrap -client=0.0.0.0 + networks: + service: + consul1: + image: docker.m.daocloud.io/library/consul:1.15.4 + ports: + - "8501:8500" + - "8601:8600" + command: agent -server -bootstrap -client=0.0.0.0 + networks: + service: + networks: