Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NET-9467, NET-9468] Add partitions + exportedServices funcs #1940

Merged
merged 11 commits into from
May 28, 2024
Merged
2 changes: 1 addition & 1 deletion dependency/catalog_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
// Ensure implements
_ Dependency = (*CatalogServicesQuery)(nil)

// CatalogServicesQueryRe is the regular expression to use for CatalogNodesQuery.
// CatalogServicesQueryRe is the regular expression to use for CatalogServicesQuery.
CatalogServicesQueryRe = regexp.MustCompile(`\A` + queryRe + dcRe + `\z`)
)

Expand Down
143 changes: 143 additions & 0 deletions dependency/consul_exported_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package dependency

import (
"log"
"net/url"
"slices"
"time"

capi "github.com/hashicorp/consul/api"
"github.com/pkg/errors"
)

const (
exportedServicesEndpointLabel = "list.exportedServices"

// ListExportedServicesQuerySleepTime is the amount of time to sleep between
// queries, since the endpoint does not support blocking queries.
ListExportedServicesQuerySleepTime = 15 * time.Second
)

// Ensure implements
var _ Dependency = (*ListExportedServicesQuery)(nil)

// ListExportedServicesQuery is the representation of a requested exported services
// dependency from inside a template.
type ListExportedServicesQuery struct {
stopCh chan struct{}
partition string
}

type ExportedService struct {
// Name of the service
Service string

// Partition of the service
Partition string

// Namespace of the service
Namespace string

// Consumers is a list of downstream consumers of the service.
Consumers ResolvedConsumers
}

type ResolvedConsumers struct {
Peers []string
Partitions []string
}

func fromConsulExportedService(svc capi.ResolvedExportedService) ExportedService {
return ExportedService{
Service: svc.Service,
Consumers: ResolvedConsumers{
Peers: slices.Clone(svc.Consumers.Peers),
Partitions: slices.Clone(svc.Consumers.Partitions),
},
}
}

// NewListExportedServicesQuery parses a string of the format @dc.
func NewListExportedServicesQuery(s string) (*ListExportedServicesQuery, error) {
return &ListExportedServicesQuery{
stopCh: make(chan struct{}, 1),
jm96441n marked this conversation as resolved.
Show resolved Hide resolved
partition: s,
}, nil
}

func (c *ListExportedServicesQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
opts = opts.Merge(&QueryOptions{
ConsulPartition: c.partition,
})

log.Printf("[TRACE] %s: GET %s", c, &url.URL{
Path: "/v1/exported-services",
RawQuery: opts.String(),
})

// This is certainly not elegant, but the partitions endpoint does not support
// blocking queries, so we are going to "fake it until we make it". When we
// first query, the LastIndex will be "0", meaning we should immediately
// return data, but future calls will include a LastIndex. If we have a
// LastIndex in the query metadata, sleep for 15 seconds before asking Consul
// again.
//
jm96441n marked this conversation as resolved.
Show resolved Hide resolved
// This is probably okay given the frequency in which partitions actually
// change, but is technically not edge-triggering.
if opts.WaitIndex != 0 {
log.Printf("[TRACE] %s: long polling for %s", c, ListExportedServicesQuerySleepTime)

select {
case <-c.stopCh:
return nil, nil, ErrStopped
case <-time.After(ListExportedServicesQuerySleepTime):
}
}

// TODO Consider using a proper context
consulExportedServices, qm, err := clients.Consul().ExportedServices(opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrapf(err, c.String())
}

exportedServices := make([]ExportedService, 0, len(consulExportedServices))
for _, svc := range consulExportedServices {
exportedServices = append(exportedServices, fromConsulExportedService(svc))
}

log.Printf("[TRACE] %s: returned %d results", c, len(exportedServices))

slices.SortStableFunc(exportedServices, func(i, j ExportedService) int {
if i.Service < j.Service {
return -1
} else if i.Service > j.Service {
return 1
}
return 0
})

rm := &ResponseMetadata{
LastContact: qm.LastContact,
LastIndex: qm.LastIndex,
}

return exportedServices, rm, nil
}

// CanShare returns if this dependency is shareable.
// TODO What is this?
jm96441n marked this conversation as resolved.
Show resolved Hide resolved
func (c *ListExportedServicesQuery) CanShare() bool {
return true
}

func (c *ListExportedServicesQuery) String() string {
return exportedServicesEndpointLabel
}

func (c *ListExportedServicesQuery) Stop() {
close(c.stopCh)
}

func (c *ListExportedServicesQuery) Type() Type {
return TypeConsul
}
112 changes: 112 additions & 0 deletions dependency/consul_partitions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package dependency

import (
"context"
"log"
"net/url"
"slices"
"strings"
"time"

"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
)

var (
// Ensure implements
_ Dependency = (*ListPartitionsQuery)(nil)

// ListPartitionsQuerySleepTime is the amount of time to sleep between
// queries, since the endpoint does not support blocking queries.
ListPartitionsQuerySleepTime = 15 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] wonder if we want to create just one LongPollingSleepTime set to 15 seconds for use across all of the non blocking query endpoints?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we kinda can, we do some re-assigning of these within tests and I'm not 100% on how that will play out if these tests run in parallel (since we could get a concurrent read and write) what I did though was move this to a DefaultNonBlockingQuerySleepTime constant and assign the individual sleep times to that const

)

// Partition is a partition in Consul.
type Partition struct {
Name string
Description string
}

// ListPartitionsQuery is the representation of a requested partitions
// dependency from inside a template.
type ListPartitionsQuery struct {
stopCh chan struct{}
}

func NewListPartitionsQuery() (*ListPartitionsQuery, error) {
return &ListPartitionsQuery{
stopCh: make(chan struct{}, 1),
}, nil
}

func (c *ListPartitionsQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
opts = opts.Merge(&QueryOptions{})

log.Printf("[TRACE] %s: GET %s", c, &url.URL{
Path: "/v1/partitions",
RawQuery: opts.String(),
})

// This is certainly not elegant, but the partitions endpoint does not support
// blocking queries, so we are going to "fake it until we make it". When we
// first query, the LastIndex will be "0", meaning we should immediately
// return data, but future calls will include a LastIndex. If we have a
// LastIndex in the query metadata, sleep for 15 seconds before asking Consul
// again.
//
// This is probably okay given the frequency in which partitions actually
// change, but is technically not edge-triggering.
if opts.WaitIndex != 0 {
log.Printf("[TRACE] %s: long polling for %s", c, ListPartitionsQuerySleepTime)

select {
case <-c.stopCh:
return nil, nil, ErrStopped
case <-time.After(ListPartitionsQuerySleepTime):
}
}

// TODO Consider using a proper context
jm96441n marked this conversation as resolved.
Show resolved Hide resolved
partitions, _, err := clients.Consul().Partitions().List(context.Background(), opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrapf(err, c.String())
}

log.Printf("[TRACE] %s: returned %d results", c, len(partitions))

slices.SortFunc(partitions, func(i, j *api.Partition) int {
return strings.Compare(i.Name, j.Name)
})

resp := []*Partition{}
for _, partition := range partitions {
if partition != nil {
resp = append(resp, &Partition{
Name: partition.Name,
Description: partition.Description,
})
}
}

// Use respWithMetadata which always increments LastIndex and results
// in fetching new data for endpoints that don't support blocking queries
return respWithMetadata(resp)
}

// CanShare returns if this dependency is shareable.
// TODO What is this?
jm96441n marked this conversation as resolved.
Show resolved Hide resolved
func (c *ListPartitionsQuery) CanShare() bool {
return true
}

func (c *ListPartitionsQuery) String() string {
return "list.partitions"
}

func (c *ListPartitionsQuery) Stop() {
close(c.stopCh)
}

func (c *ListPartitionsQuery) Type() Type {
return TypeConsul
}
10 changes: 10 additions & 0 deletions docs/templating-language.md
nathancoleman marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ provides the following functions:
* [`safeLs`](#safels)
* [`node`](#node)
* [`nodes`](#nodes)
* [`partitions`](#partitions)
* [`peerings`](#peerings)
* [`secret`](#secret)
+ [Format](#format)
Expand Down Expand Up @@ -506,6 +507,15 @@ To query a different data center and order by shortest trip time to ourselves:
To access map data such as `TaggedAddresses` or `Meta`, use
[Go's text/template][text-template] map indexing.

### `partitions`

Query [Consul][consul] for all partitions.

```golang
{{ range partitions }}
{{ . }}{{ end }}
```
nathancoleman marked this conversation as resolved.
Show resolved Hide resolved

### `peerings`

Query [Consul][consul] for all peerings.
Expand Down
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module github.com/hashicorp/consul-template

go 1.21
go 1.22

require (
github.com/BurntSushi/toml v1.3.2
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/hashicorp/consul/api v1.26.1
github.com/hashicorp/consul/api v1.28.3
github.com/hashicorp/consul/sdk v0.15.0
github.com/hashicorp/go-gatedio v0.5.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-rootcerts v1.0.2
github.com/hashicorp/go-sockaddr v1.0.6
Expand All @@ -25,14 +25,14 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sys v0.19.0
golang.org/x/sys v0.20.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/hashicorp/vault/api/auth/kubernetes v0.5.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/text v0.14.0
)

Expand All @@ -41,10 +41,11 @@ require (
github.com/Masterminds/semver/v3 v3.2.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/consul/proto-public v0.6.1 // indirect
github.com/hashicorp/cronexpr v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand All @@ -54,10 +55,10 @@ require (
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
Expand Down
Loading