Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aws sd custom changes #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 75 additions & 11 deletions provider/awssd/aws_sd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

const (
sdDefaultRecordTTL = 300
sdDefaultRecordTTL = 15

sdNamespaceTypePublic = "public"
sdNamespaceTypePrivate = "private"
Expand Down Expand Up @@ -196,25 +196,19 @@ func (p *AWSSDProvider) ApplyChanges(ctx context.Context, changes *plan.Changes)
creates, deletes := p.updatesToCreates(changes)
changes.Delete = append(changes.Delete, deletes...)
changes.Create = append(changes.Create, creates...)

dedupedCreate, dedupedDelete := p.DedupDeletesAndCreates(changes.Create, changes.Delete)

namespaces, err := p.ListNamespaces()
if err != nil {
return err
}

// Deletes must be executed first to support update case.
// When just list of targets is updated `[1.2.3.4] -> [1.2.3.4, 1.2.3.5]` it is translated to:
// ```
// deletes = [1.2.3.4]
// creates = [1.2.3.4, 1.2.3.5]
// ```
// then when deletes are executed after creates it will miss the `1.2.3.4` instance.
err = p.submitDeletes(namespaces, changes.Delete)
err = p.submitDeletes(namespaces, dedupedDelete)
if err != nil {
return err
}

err = p.submitCreates(namespaces, changes.Create)
err = p.submitCreates(namespaces, dedupedCreate)
if err != nil {
return err
}
Expand Down Expand Up @@ -243,6 +237,76 @@ func (p *AWSSDProvider) updatesToCreates(changes *plan.Changes) (creates []*endp
return creates, deletes
}

// DedupDeletesAndCreates removes targets that appear identically as both deletes and creates.
// These targets are redundant and result in overlapping API calls. Without deduplication, RegisterInstance could be
// invoked while DeregisterInstance is still in progress in AWS, resulting in failure to register the instance, and
// therefore in service disruption. Redundant targets may have been introduced by updatesToCreates or srvChangesHostnameToIP.
// Code is from https://github.com/kubernetes-sigs/external-dns/pull/1911
func (p *AWSSDProvider) DedupDeletesAndCreates(creates []*endpoint.Endpoint, deletes []*endpoint.Endpoint) ([]*endpoint.Endpoint, []*endpoint.Endpoint) {
// contains all targets appearing in "deletes", mapped by the DNS name of the respective endpoint
targetsByDeleteEp := map[string]map[string]bool{}
// contains all duplicate targets (appearing in both "deletes" and "creates"), mapped by the DNS name of the respective endpoint
dupTargetsByEp := map[string]map[string]bool{}

// populate targetsByDeleteEp
for _, e := range deletes {
if targetsByDeleteEp[e.DNSName] == nil {
targetsByDeleteEp[e.DNSName] = map[string]bool{}
}
for _, t := range e.Targets {
if _, ok := targetsByDeleteEp[e.DNSName][t]; !ok {
targetsByDeleteEp[e.DNSName][t] = true
}
}
}

// loop create endpoints and remove duplicate targets
for _, create := range creates {
// if no delete endpoint for this DNS name, then skip this endpoint
if targetsByDeleteEp[create.DNSName] == nil {
continue
}
TargetsDelete := targetsByDeleteEp[create.DNSName]
// i is the length of the deduplicated targets for the endpoint (initial targets count - duplicate targets count)
i := 0
// loop all targets in this endpoint
for _, createTarget := range create.Targets {
// if the target is not duplicate
if _, ok := TargetsDelete[createTarget]; !ok {
// copy the target and increment i
create.Targets[i] = createTarget
i++
// if the target is duplicate, add it to dupTargetsByEp
} else {
if dupTargetsByEp[create.DNSName] == nil {
dupTargetsByEp[create.DNSName] = map[string]bool{}
}
dupTargetsByEp[create.DNSName][createTarget] = true
}
}
// cut the slice up to i (count of deduplicated targets)
create.Targets = create.Targets[:i]
}

// loop delete endpoints and remove duplicate targets
for _, delete := range deletes {
if dupTargetsByEp[delete.DNSName] == nil {
continue
}
TargetsDup := dupTargetsByEp[delete.DNSName]
i := 0
for _, deleteTarget := range delete.Targets {
if _, ok := TargetsDup[deleteTarget]; !ok {
delete.Targets[i] = deleteTarget
i++
}
}
delete.Targets = delete.Targets[:i]
}

return creates, deletes
}

func (p *AWSSDProvider) submitCreates(namespaces []*sd.NamespaceSummary, changes []*endpoint.Endpoint) error {
changesByNamespaceID := p.changesByNamespaceID(namespaces, changes)

Expand Down
19 changes: 10 additions & 9 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (sc *serviceSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e
mergedEndpoints[lastMergedEndpoint].RecordType == endpoints[i].RecordType &&
mergedEndpoints[lastMergedEndpoint].SetIdentifier == endpoints[i].SetIdentifier &&
mergedEndpoints[lastMergedEndpoint].RecordTTL == endpoints[i].RecordTTL {
mergedEndpoints[lastMergedEndpoint].Targets = append(mergedEndpoints[lastMergedEndpoint].Targets, endpoints[i].Targets[0])
mergedEndpoints[lastMergedEndpoint].Targets = append(mergedEndpoints[lastMergedEndpoint].Targets, endpoints[i].Targets...)
} else {
mergedEndpoints = append(mergedEndpoints, endpoints[i])
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
}
}
if pod == nil {
log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
log.Debugf("Pod %s not found for address %v", address.TargetRef.Name, address)
continue
}

Expand Down Expand Up @@ -380,9 +380,9 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End

var endpoints []*endpoint.Endpoint
for _, hostname := range hostnames {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false)...)
}

endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false, false)...)
}
return endpoints, nil
}

Expand All @@ -394,15 +394,16 @@ func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
providerSpecific, setIdentifier := getProviderSpecificAnnotations(svc.Annotations)
var hostnameList []string
var internalHostnameList []string
var exportEndpointForClusterIP bool = getExportEndpointFromAnnotations(svc.Annotations)

hostnameList = getHostnamesFromAnnotations(svc.Annotations)
for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false)...)
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false, exportEndpointForClusterIP)...)
}

internalHostnameList = getInternalHostnamesFromAnnotations(svc.Annotations)
for _, hostname := range internalHostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, true)...)
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, true, exportEndpointForClusterIP)...)
}
}
return endpoints
Expand Down Expand Up @@ -458,7 +459,7 @@ func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endp
}
}

func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, providerSpecific endpoint.ProviderSpecific, setIdentifier string, useClusterIP bool) []*endpoint.Endpoint {
func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, providerSpecific endpoint.ProviderSpecific, setIdentifier string, useClusterIP bool, exportEndpointForClusterIP bool) []*endpoint.Endpoint {
hostname = strings.TrimSuffix(hostname, ".")
ttl, err := getTTLFromAnnotations(svc.Annotations)
if err != nil {
Expand Down Expand Up @@ -503,7 +504,7 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, pro
targets = extractLoadBalancerTargets(svc, sc.resolveLoadBalancerHostname)
}
case v1.ServiceTypeClusterIP:
if svc.Spec.ClusterIP == v1.ClusterIPNone {
if svc.Spec.ClusterIP == v1.ClusterIPNone || exportEndpointForClusterIP {
endpoints = append(endpoints, sc.extractHeadlessEndpoints(svc, hostname, ttl)...)
} else if sc.publishInternal {
targets = extractServiceIps(svc)
Expand Down
7 changes: 7 additions & 0 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
controllerAnnotationValue = "dns-controller"
// The annotation used for defining the desired hostname
internalHostnameAnnotationKey = "external-dns.alpha.kubernetes.io/internal-hostname"
// The annotation used for exporting endpoint for service with ClusterIP
exportEndpointAnnotationKey = "external-dns.alpha.kubernetes.io/export-endpoint"
)

const (
Expand Down Expand Up @@ -174,6 +176,11 @@ func splitHostnameAnnotation(annotation string) []string {
return strings.Split(strings.Replace(annotation, " ", "", -1), ",")
}

func getExportEndpointFromAnnotations(annotations map[string]string) bool {
exportEndpointAnnotation, exists := annotations[exportEndpointAnnotationKey]
return exists && exportEndpointAnnotation == "true"
}

func getAliasFromAnnotations(annotations map[string]string) bool {
aliasAnnotation, exists := annotations[aliasAnnotationKey]
return exists && aliasAnnotation == "true"
Expand Down