From e7a1c80fee45b559dfae9dbba46c5ed33116b0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakob=20=C3=98stergaard=20Jensen?= Date: Tue, 24 Sep 2024 10:35:06 +0200 Subject: [PATCH] Use the External DNS webhook server implementation This enables us to drop our own server implementation. --- cmd/webhook/main.go | 12 +- cmd/webhook/provider.go | 2 +- cmd/webhook/server.go | 16 -- cmd/webhook/webhook.go | 165 ------------------ vendor/modules.txt | 1 + .../provider/webhook/api/httpapi.go | 146 ++++++++++++++++ 6 files changed, 152 insertions(+), 190 deletions(-) delete mode 100644 cmd/webhook/webhook.go create mode 100644 vendor/sigs.k8s.io/external-dns/provider/webhook/api/httpapi.go diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index f135faf..62f4598 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -28,11 +28,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/sdk/metric" + "sigs.k8s.io/external-dns/provider/webhook/api" ) func main() { tidyEndpoint := flag.String("tidydns-endpoint", "", "DNS server address") logLevel := flag.String("log-level", "", "logging level (debug, info, warn, err)") + readTimeout := flag.Duration("read-timeout", (5 * time.Second), "The read timeout in duration format (default: 5s)") + writeTimeout := flag.Duration("write-timeout", (10 * time.Second), "The write timeout in duration format (default: 10s)") zoneArgDescription := "The intercval at which to update zone information format 00h00m00s e.g. 1h32m" zoneUpdateIntervalArg := flag.String("zone-update-interval", "10m", zoneArgDescription) @@ -96,15 +99,8 @@ func main() { panic(err.Error()) } - // Use the provider to make a webhook containing all the callable endpoints - webhook := newWebhook(provider) - // Start webserver to service requests from External-DNS - go func() { - if err = serveWebhook(webhook, "127.0.0.1:8888"); err != nil { - panic(err.Error()) - } - }() + go api.StartHTTPApi(provider, nil, *readTimeout, *writeTimeout, "127.0.0.1:8888") metricsHandler := promhttp.Handler() diff --git a/cmd/webhook/provider.go b/cmd/webhook/provider.go index 18975dd..eded37d 100644 --- a/cmd/webhook/provider.go +++ b/cmd/webhook/provider.go @@ -53,7 +53,7 @@ func newProvider(tidy tidydns.TidyDNSClient, zoneProvider ZoneProvider) (*tidyPr } // Get list of zones from Tidy and return a domain filter based on them. -func (p *tidyProvider) GetDomainFilter() endpoint.DomainFilter { +func (p *tidyProvider) GetDomainFilter() endpoint.DomainFilterInterface { // Make list of all zone names zoneNames := []string{} for _, zone := range p.zoneProvider.getZones() { diff --git a/cmd/webhook/server.go b/cmd/webhook/server.go index bd20f82..372694f 100644 --- a/cmd/webhook/server.go +++ b/cmd/webhook/server.go @@ -24,22 +24,6 @@ import ( type Samples []metrics.Sample -func serveWebhook(wh webhook, addr string) error { - slog.Debug("start webhook server on " + addr) - mux := http.NewServeMux() - mux.HandleFunc("GET /", wh.negociate) - mux.HandleFunc("GET /records", wh.getRecords) - mux.HandleFunc("POST /adjustendpoints", wh.adjustEndpoints) - mux.HandleFunc("POST /records", wh.applyChanges) - - server := http.Server{ - Addr: addr, - Handler: mux, - } - - return server.ListenAndServe() -} - func serveExposed(addr string, metricsHandler http.Handler) error { slog.Debug("start webhook server on " + addr) mux := http.NewServeMux() diff --git a/cmd/webhook/webhook.go b/cmd/webhook/webhook.go deleted file mode 100644 index 443ebbc..0000000 --- a/cmd/webhook/webhook.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2024 Netic A/S. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "context" - "encoding/json" - "io" - "log/slog" - "net/http" - - "sigs.k8s.io/external-dns/endpoint" - "sigs.k8s.io/external-dns/plan" -) - -type webhook interface { - negociate(w http.ResponseWriter, req *http.Request) - getRecords(w http.ResponseWriter, req *http.Request) - adjustEndpoints(w http.ResponseWriter, req *http.Request) - applyChanges(w http.ResponseWriter, req *http.Request) -} - -type tidyWebhook struct { - provider *tidyProvider -} - -const ( - headerKey = "Content-Type" - headerValue = "application/external.dns.webhook+json;version=1" -) - -func newWebhook(p *tidyProvider) webhook { - return &tidyWebhook{p} -} - -// Return list of domainfilters -func (wh *tidyWebhook) negociate(w http.ResponseWriter, req *http.Request) { - w.Header().Set(headerKey, headerValue) - - // Encode response - resp, err := wh.provider.GetDomainFilter().MarshalJSON() - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.Write(resp) -} - -// Return list of all records using the External-DNS Endpoint list format -func (wh *tidyWebhook) getRecords(w http.ResponseWriter, req *http.Request) { - w.Header().Set(headerKey, headerValue) - - // Get all tidy endpoints - endpoints, err := wh.provider.Records(context.Background()) - if err != nil { - slog.Error(err.Error()) - return - } - - // encode response - resp, err := json.Marshal(endpoints) - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.Write(resp) -} - -// Recieve a list of proposed endpoints, including endpoints that will later be -// filtered out by the domainfilter, and modify them so they are consumable to -// TidyDNS before returning them. This is to inform External-DNS how the records -// will look when saved so they can be checked for correctness. -func (wh *tidyWebhook) adjustEndpoints(w http.ResponseWriter, req *http.Request) { - w.Header().Set(headerKey, headerValue) - - // Read request - msg, err := io.ReadAll(req.Body) - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // Map request body to endpoint list - endpoints := []*endpoint.Endpoint{} - if err = json.Unmarshal(msg, &endpoints); err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // Process request - adjustedEndpoints, err := wh.provider.AdjustEndpoints(endpoints) - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // encode response - resp, err := json.Marshal(adjustedEndpoints) - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.Write(resp) -} - -// Consume a struct with 4 lists. Endpoints to create and delete, and a 2 lists -// representing changes to endpoints. The two changes lists are of equal length -// and represent the before and after spec of each endpoint to be changed. -func (wh *tidyWebhook) applyChanges(w http.ResponseWriter, req *http.Request) { - w.Header().Set(headerKey, headerValue) - - // Read request - msg, err := io.ReadAll(req.Body) - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // Map request body to endpoint list - changes := &plan.Changes{} - if err = json.Unmarshal(msg, changes); err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // Process request - err = wh.provider.ApplyChanges(context.Background(), changes) - if err != nil { - slog.Error(err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // The expected return code and content is left undocumented by External-DNS - // at this time but - // https://github.com/kubernetes-sigs/external-dns/blob/9fb831e97f77b31789df8d837e93f36a6e785562/provider/webhook/webhook.go#L229 - // reveals that it excepts an empty return with code 204 (no content) when - // calling POST /records - w.WriteHeader(http.StatusNoContent) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0bb0b0a..3757c9c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -246,6 +246,7 @@ k8s.io/utils/strings/slices sigs.k8s.io/external-dns/endpoint sigs.k8s.io/external-dns/plan sigs.k8s.io/external-dns/provider +sigs.k8s.io/external-dns/provider/webhook/api # sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd ## explicit; go 1.18 sigs.k8s.io/json diff --git a/vendor/sigs.k8s.io/external-dns/provider/webhook/api/httpapi.go b/vendor/sigs.k8s.io/external-dns/provider/webhook/api/httpapi.go new file mode 100644 index 0000000..717fee7 --- /dev/null +++ b/vendor/sigs.k8s.io/external-dns/provider/webhook/api/httpapi.go @@ -0,0 +1,146 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "context" + "encoding/json" + "net" + "net/http" + "time" + + "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/plan" + "sigs.k8s.io/external-dns/provider" + + log "github.com/sirupsen/logrus" +) + +const ( + MediaTypeFormatAndVersion = "application/external.dns.webhook+json;version=1" + ContentTypeHeader = "Content-Type" +) + +type WebhookServer struct { + Provider provider.Provider +} + +func (p *WebhookServer) RecordsHandler(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + records, err := p.Provider.Records(context.Background()) + if err != nil { + log.Errorf("Failed to get Records: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set(ContentTypeHeader, MediaTypeFormatAndVersion) + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(records); err != nil { + log.Errorf("Failed to encode records: %v", err) + } + return + case http.MethodPost: + var changes plan.Changes + if err := json.NewDecoder(req.Body).Decode(&changes); err != nil { + log.Errorf("Failed to decode changes: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + err := p.Provider.ApplyChanges(context.Background(), &changes) + if err != nil { + log.Errorf("Failed to apply changes: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + return + default: + log.Errorf("Unsupported method %s", req.Method) + w.WriteHeader(http.StatusBadRequest) + } +} + +func (p *WebhookServer) AdjustEndpointsHandler(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + log.Errorf("Unsupported method %s", req.Method) + w.WriteHeader(http.StatusBadRequest) + return + } + + pve := []*endpoint.Endpoint{} + if err := json.NewDecoder(req.Body).Decode(&pve); err != nil { + log.Errorf("Failed to decode in adjustEndpointsHandler: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set(ContentTypeHeader, MediaTypeFormatAndVersion) + pve, err := p.Provider.AdjustEndpoints(pve) + if err != nil { + log.Errorf("Failed to call adjust endpoints: %v", err) + w.WriteHeader(http.StatusInternalServerError) + } + if err := json.NewEncoder(w).Encode(&pve); err != nil { + log.Errorf("Failed to encode in adjustEndpointsHandler: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (p *WebhookServer) NegotiateHandler(w http.ResponseWriter, req *http.Request) { + w.Header().Set(ContentTypeHeader, MediaTypeFormatAndVersion) + json.NewEncoder(w).Encode(p.Provider.GetDomainFilter()) +} + +// StartHTTPApi starts a HTTP server given any provider. +// the function takes an optional channel as input which is used to signal that the server has started. +// The server will listen on port `providerPort`. +// The server will respond to the following endpoints: +// - / (GET): initialization, negotiates headers and returns the domain filter +// - /records (GET): returns the current records +// - /records (POST): applies the changes +// - /adjustendpoints (POST): executes the AdjustEndpoints method +func StartHTTPApi(provider provider.Provider, startedChan chan struct{}, readTimeout, writeTimeout time.Duration, providerPort string) { + p := WebhookServer{ + Provider: provider, + } + + m := http.NewServeMux() + m.HandleFunc("/", p.NegotiateHandler) + m.HandleFunc("/records", p.RecordsHandler) + m.HandleFunc("/adjustendpoints", p.AdjustEndpointsHandler) + + s := &http.Server{ + Addr: providerPort, + Handler: m, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + } + + l, err := net.Listen("tcp", providerPort) + if err != nil { + log.Fatal(err) + } + + if startedChan != nil { + startedChan <- struct{}{} + } + + if err := s.Serve(l); err != nil { + log.Fatal(err) + } +}