Skip to content

Commit

Permalink
query, rule: make endpoint discovery dynamically reloadable
Browse files Browse the repository at this point in the history
* Removed previously deprecated and hidden flags to configure endpoints ( --rule, --target, ...)
* Removed --store.sd-file and --store.sd-interval flags
* Added new flags --endpoint.sd-config, --endpoint.sd-config-reload-interval to configure a dynamic SD file
* Moved endpoint set construction into cmd/thanos/endpointset.go for a little cleanup

The new config makes it possible to also set "strict" and "group" flags on the endpoint instead
of only their addresses, making it possible to have file based service discovery for endpoint groups too.

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Nov 10, 2024
1 parent bfbabbb commit f08f79e
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 524 deletions.
38 changes: 38 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ import (

"github.com/KimMachineGun/automemlimit/memlimit"
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extgrpc/snappy"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/shipper"
)
Expand Down Expand Up @@ -58,6 +64,38 @@ func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig {
return gc
}

// grpcClientConfig holds the flag-backed settings from which the gRPC client
// dial options are built.
type grpcClientConfig struct {
// secure is bound to --grpc-client-tls-secure: use TLS when talking to the gRPC server.
secure bool
// skipVerify is bound to --grpc-client-tls-skip-verify: disable TLS certificate verification.
skipVerify bool
// cert, key and caCert are bound to --grpc-client-tls-cert, --grpc-client-tls-key
// and --grpc-client-tls-ca respectively.
cert, key, caCert string
// serverName is bound to --grpc-client-server-name: the name used to verify the
// hostname on the certificates returned by the server.
serverName string
// compression is bound to --grpc-compression; when it equals compressionNone no
// compressor call option is added to the dial options.
compression string
}

// registerFlag binds the gRPC client TLS and compression flags on cmd to the
// fields of gc. It returns gc so the call can be chained.
func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig {
	// TLS related flags.
	cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").
		Default("false").
		BoolVar(&gc.secure)
	cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").
		Default("false").
		BoolVar(&gc.skipVerify)
	cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").
		Default("").
		StringVar(&gc.cert)
	cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").
		Default("").
		StringVar(&gc.key)
	cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").
		Default("").
		StringVar(&gc.caCert)
	cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").
		Default("").
		StringVar(&gc.serverName)

	// Compression flag: the same list of names feeds both the help text and
	// the set of values the flag accepts.
	supported := []string{snappy.Name, compressionNone}
	cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+strings.Join(supported, ", ")).
		Default(compressionNone).
		EnumVar(&gc.compression, supported...)

	return gc
}

// dialOptions builds the gRPC dial options for this client configuration.
// The base options come from extgrpc.StoreClientGRPCOpts; when a compression
// algorithm other than compressionNone is configured, a default call option
// selecting that compressor is appended.
func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) {
	opts, err := extgrpc.StoreClientGRPCOpts(
		logger, reg, tracer,
		gc.secure, gc.skipVerify,
		gc.cert, gc.key, gc.caCert,
		gc.serverName,
	)
	if err != nil {
		return nil, errors.Wrapf(err, "building gRPC client")
	}
	if gc.compression == compressionNone {
		return opts, nil
	}

	compressor := grpc.UseCompressor(gc.compression)
	return append(opts, grpc.WithDefaultCallOptions(compressor)), nil
}

type httpConfig struct {
bindAddress string
tlsConfig string
Expand Down
287 changes: 287 additions & 0 deletions cmd/thanos/endpointset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"context"
"fmt"
"sync"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/errors"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/runutil"
)

// EndpointSpec describes a single endpoint entry of the endpoint configuration.
type EndpointSpec struct {
// Strict is forwarded as the strict argument when the gRPC endpoint spec is built.
// It must not be combined with a dynamic (service discovery) address.
Strict bool `yaml:"strict"`
// Group marks the address as an endpoint group; such addresses are dialed through
// the "thanos:///" scheme. It must not be combined with a dynamic address.
Group bool `yaml:"group"`
// Address is the endpoint address, either a static address or a dynamic
// (service discovery) one as recognized by dns.IsDynamicNode.
Address string `yaml:"address"`
}

// EndpointConfig is the YAML document shape of the endpoint configuration:
// a list of endpoint specs under the "endpoints" key.
type EndpointConfig struct {
Endpoints []EndpointSpec `yaml:"endpoints"`
}

// endpointConfigProvider holds the current endpoint configuration, which is the
// content of the config file merged with the statically configured endpoints.
type endpointConfigProvider struct {
// mu guards cfg, which is replaced when the config file is reloaded.
mu sync.Mutex
cfg EndpointConfig

// statically defined endpoints from flags for backwards compatibility
endpoints []string
endpointGroups []string
strictEndpoints []string
strictEndpointGroups []string
}

// config returns a snapshot of the current endpoint configuration. The
// endpoint list is copied while holding the mutex so the caller can use it
// without racing against a concurrent reload.
func (er *endpointConfigProvider) config() EndpointConfig {
	er.mu.Lock()
	defer er.mu.Unlock()

	current := er.cfg.Endpoints
	snapshot := append(make([]EndpointSpec, 0, len(current)), current...)
	return EndpointConfig{Endpoints: snapshot}
}

// parse reads the content behind configFile and decodes it as YAML into an
// EndpointConfig. On any failure it returns the zero EndpointConfig together
// with an error that mentions the config file path.
func (er *endpointConfigProvider) parse(configFile *extflag.PathOrContent) (EndpointConfig, error) {
	var (
		parsed EndpointConfig
		none   EndpointConfig
	)

	raw, err := configFile.Content()
	if err != nil {
		return none, errors.Wrapf(err, "unable to load config content: %s", configFile.Path())
	}
	if err = yaml.Unmarshal(raw, &parsed); err != nil {
		return none, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path())
	}
	return parsed, nil
}

// addStaticEndpoints appends the statically configured endpoints to cfg.
// The order is: plain endpoints, endpoint groups, strict endpoints, strict
// endpoint groups; each entry carries the Group/Strict markers of its list.
func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) {
	sources := []struct {
		addrs         []string
		group, strict bool
	}{
		{addrs: er.endpoints},
		{addrs: er.endpointGroups, group: true},
		{addrs: er.strictEndpoints, strict: true},
		{addrs: er.strictEndpointGroups, group: true, strict: true},
	}

	for _, src := range sources {
		for _, addr := range src.addrs {
			cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{
				Address: addr,
				Group:   src.group,
				Strict:  src.strict,
			})
		}
	}
}

// validateEndpointConfig rejects configurations in which a dynamic (service
// discovery) address is marked as strict or as a group. It returns an error
// for the first offending endpoint; the strict check takes precedence over
// the group check. It returns nil when every endpoint is acceptable.
func validateEndpointConfig(cfg EndpointConfig) error {
	for i := range cfg.Endpoints {
		spec := cfg.Endpoints[i]
		// Static addresses may freely be strict and/or grouped.
		if !dns.IsDynamicNode(spec.Address) {
			continue
		}
		switch {
		case spec.Strict:
			return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", spec.Address)
		case spec.Group:
			return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under group mode.", spec.Address)
		}
	}
	return nil
}

// newEndpointConfigProvider creates a provider for the endpoint configuration.
//
// The initial configuration is the parsed content of configFile merged with
// the statically configured endpoints (endpoints, endpointGroups,
// strictEndpoints, strictEndpointGroups). It is validated before being
// accepted, exactly like a reloaded configuration, so that an invalid setup
// fails at construction time instead of being accepted once and then rejected
// on every subsequent reload.
//
// When configFile has a path, a reloader is installed that re-parses,
// re-merges and re-validates the configuration at configReloadInterval; a
// reload that fails to parse or validate is logged and the previous
// configuration is kept.
//
// It returns an error when the initial configuration cannot be loaded or is
// invalid, or when the reloader cannot be created.
func newEndpointConfigProvider(
	logger log.Logger,
	configFile *extflag.PathOrContent,
	configReloadInterval time.Duration,
	endpoints []string,
	endpointGroups []string,
	strictEndpoints []string,
	strictEndpointGroups []string,
) (*endpointConfigProvider, error) {
	res := &endpointConfigProvider{
		endpoints:            endpoints,
		endpointGroups:       endpointGroups,
		strictEndpoints:      strictEndpoints,
		strictEndpointGroups: strictEndpointGroups,
	}

	cfg, err := res.parse(configFile)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to load config file")
	}
	res.addStaticEndpoints(&cfg)
	// Apply the same validation the reload path uses; without it an invalid
	// initial configuration would be served until the process restarts.
	if err := validateEndpointConfig(cfg); err != nil {
		return nil, errors.Wrapf(err, "unable to validate endpoint config")
	}
	res.cfg = cfg

	// only static endpoints
	if len(configFile.Path()) == 0 {
		return res, nil
	}

	// The reloader is started with context.Background(), so it is not tied to
	// any cancellation signal owned by this function.
	if err := extkingpin.PathContentReloader(context.Background(), configFile, logger, func() {
		res.mu.Lock()
		defer res.mu.Unlock()

		level.Info(logger).Log("msg", "reloading endpoint config")
		cfg, err := res.parse(configFile)
		if err != nil {
			level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err)
			return
		}
		res.addStaticEndpoints(&cfg)
		if err := validateEndpointConfig(cfg); err != nil {
			level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err)
			return
		}
		res.cfg = cfg
	}, configReloadInterval); err != nil {
		return nil, errors.Wrapf(err, "unable to create config reloader")
	}
	return res, nil
}

// setupEndpointset wires up endpoint discovery and returns the resulting
// query.EndpointSet.
//
// It builds an endpoint config provider from configFile and the statically
// configured endpoints, registers the gRPC resolver used for endpoint groups,
// and adds two actors to g:
//   - one that resolves the dynamic (service discovery) addresses of the
//     current configuration every dnsSDInterval, and
//   - one that calls Update on the endpoint set every endpointTimeout.
//
// The endpoint set obtains its specs from a callback that combines the static
// endpoints of the current configuration with the addresses resolved by the
// DNS provider, de-duplicated by address.
//
// It returns an error when the initial endpoint configuration cannot be
// loaded.
func setupEndpointset(
	g *run.Group,
	reg prometheus.Registerer,
	logger log.Logger,
	configFile *extflag.PathOrContent,
	configReloadInterval time.Duration,
	endpoints []string,
	endpointGroups []string,
	strictEndpoints []string,
	strictEndpointGroups []string,
	dnsSDResolver string,
	dnsSDInterval time.Duration,
	unhealthyTimeout time.Duration,
	endpointTimeout time.Duration,
	dialOpts []grpc.DialOption,
	queryConnMetricLabels ...string,
) (*query.EndpointSet, error) {
	configProvider, err := newEndpointConfigProvider(
		logger,
		configFile,
		configReloadInterval,
		endpoints,
		endpointGroups,
		strictEndpoints,
		strictEndpointGroups,
	)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to load config initially")
	}
	// Register resolver for the "thanos:///" scheme for endpoint-groups
	// NOTE(review): this provider and dnsEndpointProvider below both wrap reg
	// with the same "thanos_query_endpoints_" prefix; confirm that creating
	// two providers this way does not register the same metrics twice.
	dns.RegisterGRPCResolver(
		logger,
		dns.NewProvider(
			logger,
			extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
			dns.ResolverType(dnsSDResolver),
		),
		dnsSDInterval,
	)
	dnsEndpointProvider := dns.NewProvider(
		logger,
		extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
		dns.ResolverType(dnsSDResolver),
	)
	duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_query_duplicated_store_addresses_total",
		Help: "The number of times a duplicated store addresses is detected from the different configs in query",
	})

	// removeDuplicateEndpointSpecs keeps one spec per address (the last one
	// seen wins), logging and counting every duplicate it encounters.
	removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec {
		set := make(map[string]*query.GRPCEndpointSpec)
		for _, spec := range specs {
			addr := spec.Addr()
			if _, ok := set[addr]; ok {
				level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr)
				duplicatedStores.Inc()
			}
			set[addr] = spec
		}
		deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set))
		for _, value := range set {
			deduplicated = append(deduplicated, value)
		}
		return deduplicated
	}

	// Periodically resolve the dynamic addresses of the current configuration.
	ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background())
	g.Add(func() error {
		return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error {
			ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval)
			defer cancelUpdateIter()

			endpointConfig := configProvider.config()

			addresses := make([]string, 0, len(endpointConfig.Endpoints))
			for _, ecfg := range endpointConfig.Endpoints {
				if addr := ecfg.Address; dns.IsDynamicNode(addr) {
					addresses = append(addresses, addr)
				}
			}
			// A failed resolution is logged only; the loop keeps running.
			if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil {
				level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err)
			}
			return nil
		})
	}, func(error) {
		cancelDNSUpdate()
	})

	endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec {
		endpointConfig := configProvider.config()

		specs := make([]*query.GRPCEndpointSpec, 0, len(endpointConfig.Endpoints))
		// add static nodes
		for _, ecfg := range endpointConfig.Endpoints {
			strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address
			if dns.IsDynamicNode(addr) {
				continue
			}
			if group {
				// Cap the capacity of dialOpts so that append always copies
				// into a fresh backing array. A plain append(dialOpts, ...)
				// could write the group options into spare capacity shared
				// with every other user of dialOpts.
				groupDialOpts := append(dialOpts[:len(dialOpts):len(dialOpts)], extgrpc.EndpointGroupGRPCOpts()...)
				specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, groupDialOpts...))
			} else {
				specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...))
			}
		}
		// add dynamic nodes
		for _, addr := range dnsEndpointProvider.Addresses() {
			specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...))
		}
		return removeDuplicateEndpointSpecs(specs)
	}, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...)

	// Periodically refresh the endpoint set.
	ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background())
	g.Add(func() error {
		return runutil.Repeat(endpointTimeout, ctxEndpointUpdate.Done(), func() error {
			ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, endpointTimeout)
			defer cancelIter()

			endpointset.Update(ctxIter)
			return nil
		})
	}, func(error) {
		cancelEndpointUpdate()
	})

	return endpointset, nil
}
Loading

0 comments on commit f08f79e

Please sign in to comment.