Skip to content

Commit

Permalink
feat: support Consul as ServiceRegistry (#695)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyt122 authored Aug 27, 2024
1 parent a8f4622 commit c0e2633
Show file tree
Hide file tree
Showing 6 changed files with 755 additions and 54 deletions.
218 changes: 195 additions & 23 deletions controller/registries/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ package consul
import (
"fmt"
"net/url"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
istioapi "istio.io/api/networking/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"mosn.io/htnn/controller/pkg/registry"
Expand All @@ -39,6 +43,7 @@ func init() {
store: store,
name: om.Name,
softDeletedServices: map[consulService]bool{},
subscriptions: make(map[string]*watch.Plan),
done: make(chan struct{}),
}
return reg, nil
Expand All @@ -54,6 +59,7 @@ type Consul struct {

lock sync.RWMutex
watchingServices map[consulService]bool
subscriptions map[string]*watch.Plan
softDeletedServices map[consulService]bool

done chan struct{}
Expand All @@ -67,8 +73,13 @@ type Client struct {
DataCenter string
NameSpace string
Token string
Address string
}

var (
RegistryType = "consul"
)

func (reg *Consul) NewClient(config *consul.Config) (*Client, error) {
uri, err := url.Parse(config.ServerUrl)
if err != nil {
Expand All @@ -91,6 +102,7 @@ func (reg *Consul) NewClient(config *consul.Config) (*Client, error) {
DataCenter: config.DataCenter,
NameSpace: config.Namespace,
Token: config.Token,
Address: clientConfig.Address,
}, nil
}

Expand All @@ -115,6 +127,15 @@ func (reg *Consul) Start(c registrytype.RegistryConfig) error {
return err
}

for key := range services {
err = reg.subscribe(key.Tag, key.ServiceName)
if err != nil {
reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, key)
// the service will be resubscribed after refresh interval
delete(services, key)
}
}

reg.watchingServices = services

dur := 30 * time.Second
Expand Down Expand Up @@ -159,15 +180,72 @@ func (reg *Consul) Stop() error {

reg.lock.Lock()
defer reg.lock.Unlock()
for key := range reg.softDeletedServices {
if _, ok := reg.watchingServices[key]; !ok {
reg.store.Delete(reg.getServiceEntryKey(key.Tag, key.ServiceName))
}
}
for key := range reg.watchingServices {
reg.removeService(key)
}

return nil
}

func (reg *Consul) Reload(c registrytype.RegistryConfig) error {
fmt.Println(c)
config := c.(*consul.Config)

client, err := reg.NewClient(config)
if err != nil {
return err
}

fetchedServices, err := reg.fetchAllServices(client)
if err != nil {
return fmt.Errorf("fetch all services error: %v", err)
}

for key := range reg.softDeletedServices {
if _, ok := fetchedServices[key]; !ok {
reg.store.Delete(reg.getServiceEntryKey(key.Tag, key.ServiceName))
}
}
reg.softDeletedServices = map[consulService]bool{}

for key := range reg.watchingServices {
// unsubscribe with the previous client
if _, ok := fetchedServices[key]; !ok {
reg.removeService(key)
} else {
err = reg.unsubscribe(key.ServiceName)
if err != nil {
reg.logger.Errorf("failed to unsubscribe service, err: %v, service: %v", err, key)
}
}
}

reg.client = client

for key := range fetchedServices {
err = reg.subscribe(key.Tag, key.ServiceName)
if err != nil {
reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, key)
delete(fetchedServices, key)
}
}
reg.watchingServices = fetchedServices

return nil
}

func (reg *Consul) removeService(key consulService) {
err := reg.unsubscribe(key.ServiceName)
if err != nil {
reg.logger.Errorf("failed to unsubscribe service, err: %v, service: %v", err, key)
}
reg.store.Delete(reg.getServiceEntryKey(key.Tag, key.ServiceName))
}

func (reg *Consul) fetchAllServices(client *Client) (map[consulService]bool, error) {
q := &consulapi.QueryOptions{}
q.Datacenter = client.DataCenter
Expand All @@ -181,46 +259,140 @@ func (reg *Consul) fetchAllServices(client *Client) (map[consulService]bool, err
}
serviceMap := make(map[consulService]bool)
for serviceName, tags := range services {
for _, tag := range tags {
service := consulService{
Tag: tag,
ServiceName: serviceName,
}
serviceMap[service] = true
tag := strings.Join(tags, "-")
service := consulService{
Tag: tag,
ServiceName: serviceName,
}
serviceMap[service] = true
}

return serviceMap, nil
}

func (reg *Consul) subscribe(serviceName string) error {
fmt.Println(serviceName)
func (reg *Consul) getServiceEntryKey(tag, serviceName string) string {
host := strings.Join([]string{tag, serviceName, reg.client.NameSpace, reg.client.DataCenter, reg.name, RegistryType}, ".")
host = strings.ReplaceAll(host, "_", "-")

re := regexp.MustCompile(`\.+`)
h := re.ReplaceAllString(host, ".")
h = strings.Trim(h, ".")
return strings.ToLower(h)
}

func (reg *Consul) generateServiceEntry(host string, services []*consulapi.ServiceEntry) *registry.ServiceEntryWrapper {
portList := make([]*istioapi.ServicePort, 0, 1)
endpoints := make([]*istioapi.WorkloadEntry, 0, len(services))

for _, service := range services {
protocol := registry.HTTP
if service.Service.Meta == nil {
service.Service.Meta = make(map[string]string)
}

if service.Service.Meta["protocol"] != "" {
protocol = registry.ParseProtocol(service.Service.Meta["protocol"])
}

port := &istioapi.ServicePort{
Name: string(protocol),
Number: uint32(service.Service.Port),
Protocol: string(protocol),
}
if len(portList) == 0 {
portList = append(portList, port)
}

endpoint := istioapi.WorkloadEntry{
Address: service.Service.Address,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: service.Service.Meta,
}
endpoints = append(endpoints, &endpoint)
}

return &registry.ServiceEntryWrapper{
ServiceEntry: istioapi.ServiceEntry{
Hosts: []string{host},
Ports: portList,
Location: istioapi.ServiceEntry_MESH_INTERNAL,
Resolution: istioapi.ServiceEntry_STATIC,
Endpoints: endpoints,
},
Source: RegistryType,
}
}

func (reg *Consul) subscribe(tag, serviceName string) error {
plan, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": serviceName,
})
if err != nil {
return err
}

plan.Handler = reg.getSubscribeCallback(tag, serviceName)
plan.Token = reg.client.Token
plan.Datacenter = reg.client.DataCenter
reg.subscriptions[serviceName] = plan

go func() {
err := plan.Run(reg.client.Address)
if err != nil {
reg.logger.Errorf("failed to subscribe ,err=%v", err)
}
}()

return nil
}

func (reg *Consul) getSubscribeCallback(tag, serviceName string) func(idx uint64, data interface{}) {
host := reg.getServiceEntryKey(tag, serviceName)
return func(idx uint64, data interface{}) {
services, ok := data.([]*consulapi.ServiceEntry)
if !ok {
reg.logger.Infof("Unexpected type for data in callback: %t", data)
return
}
if reg.stopped.Load() {
return
}
reg.store.Update(host, reg.generateServiceEntry(host, services))
}

}

func (reg *Consul) unsubscribe(serviceName string) error {
fmt.Println(serviceName)
plan, exists := reg.subscriptions[serviceName]
if !exists {
return fmt.Errorf("no subscription found for service %s", serviceName)
}

plan.Stop()
delete(reg.subscriptions, serviceName)
return nil
}

func (reg *Consul) refresh(services map[string][]string) {

serviceMap := make(map[consulService]bool)

for serviceName, tags := range services {
for _, tag := range tags {
service := consulService{
Tag: tag,
ServiceName: serviceName,
}
serviceMap[service] = true
if _, ok := reg.watchingServices[service]; !ok {
err := reg.subscribe(serviceName)
if err != nil {
reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, serviceName)
delete(serviceMap, service)
}
tag := strings.Join(tags, "-")
service := consulService{
Tag: tag,
ServiceName: serviceName,
}
serviceMap[service] = true
if _, ok := reg.watchingServices[service]; !ok {
err := reg.subscribe("", service.ServiceName)
if err != nil {
reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, service.ServiceName)
delete(serviceMap, service)
}
}
}

prevFetchServices := reg.watchingServices
reg.watchingServices = serviceMap

Expand Down
Loading

0 comments on commit c0e2633

Please sign in to comment.