Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] Workload ID: Add WorkloadIdentity local service and cache config (#49942) #49990

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/auth/accesspoint/accesspoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Config struct {
Users services.UsersService
WebSession types.WebSessionInterface
WebToken types.WebTokenInterface
WorkloadIdentity cache.WorkloadIdentityReader
WindowsDesktops services.WindowsDesktops
AutoUpdateService services.AutoUpdateServiceGetter
}
Expand Down Expand Up @@ -198,6 +199,7 @@ func NewCache(cfg Config) (*cache.Cache, error) {
Users: cfg.Users,
WebSession: cfg.WebSession,
WebToken: cfg.WebToken,
WorkloadIdentity: cfg.WorkloadIdentity,
WindowsDesktops: cfg.WindowsDesktops,
}

Expand Down
9 changes: 9 additions & 0 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,13 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
return nil, trace.Wrap(err, "creating SPIFFEFederation service")
}
}
if cfg.WorkloadIdentity == nil {
workloadIdentity, err := local.NewWorkloadIdentityService(cfg.Backend)
if err != nil {
return nil, trace.Wrap(err, "creating WorkloadIdentity service")
}
cfg.WorkloadIdentity = workloadIdentity
}

limiter, err := limiter.NewConnectionsLimiter(limiter.Config{
MaxConnections: defaults.LimiterMaxConcurrentSignatures,
Expand Down Expand Up @@ -455,6 +462,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
BotInstance: cfg.BotInstance,
SPIFFEFederations: cfg.SPIFFEFederations,
StaticHostUser: cfg.StaticHostUsers,
WorkloadIdentities: cfg.WorkloadIdentity,
}

as := Server{
Expand Down Expand Up @@ -668,6 +676,7 @@ type Services struct {
services.BotInstance
services.StaticHostUser
services.AutoUpdateService
services.WorkloadIdentities
}

// GetWebSession returns existing web session described by req.
Expand Down
7 changes: 7 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
workloadidentityv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -1211,6 +1212,12 @@ type Cache interface {
// GetAccessGraphSettings returns the access graph settings.
GetAccessGraphSettings(context.Context) (*clusterconfigpb.AccessGraphSettings, error)

// GetWorkloadIdentity gets a WorkloadIdentity by name.
GetWorkloadIdentity(ctx context.Context, name string) (*workloadidentityv1pb.WorkloadIdentity, error)
// ListWorkloadIdentities lists all SPIFFE Federations using Google style
// pagination.
ListWorkloadIdentities(ctx context.Context, pageSize int, lastToken string) ([]*workloadidentityv1pb.WorkloadIdentity, string, error)

// ListStaticHostUsers lists static host users.
ListStaticHostUsers(ctx context.Context, pageSize int, startKey string) ([]*userprovisioningpb.StaticHostUser, string, error)
// GetStaticHostUser returns a static host user by name.
Expand Down
1 change: 1 addition & 0 deletions lib/auth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
SecReports: svces.SecReports,
SnowflakeSession: svces.Identity,
SPIFFEFederations: svces.SPIFFEFederations,
WorkloadIdentity: svces.WorkloadIdentities,
StaticHostUsers: svces.StaticHostUser,
Trust: svces.TrustInternal,
UserGroups: svces.UserGroups,
Expand Down
4 changes: 4 additions & 0 deletions lib/auth/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ type InitConfig struct {
// SPIFFEFederations is a service that manages storing SPIFFE federations.
SPIFFEFederations services.SPIFFEFederations

// WorkloadIdentity is the service for storing and retrieving
// WorkloadIdentity resources.
WorkloadIdentity services.WorkloadIdentities

// StaticHostUsers is a service that manages host users that should be
// created on SSH nodes.
StaticHostUsers services.StaticHostUser
Expand Down
12 changes: 12 additions & 0 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func ForAuth(cfg Config) Config {
{Kind: types.KindUserTask},
{Kind: types.KindAutoUpdateVersion},
{Kind: types.KindAutoUpdateConfig},
{Kind: types.KindWorkloadIdentity},
}
cfg.QueueSize = defaults.AuthQueueSize
// We don't want to enable partial health for auth cache because auth uses an event stream
Expand Down Expand Up @@ -536,6 +537,7 @@ type Cache struct {
accessMontoringRuleCache services.AccessMonitoringRules
spiffeFederationCache spiffeFederationCacher
staticHostUsersCache *local.StaticHostUserService
workloadIdentityCache workloadIdentityCacher

// closed indicates that the cache has been closed
closed atomic.Bool
Expand Down Expand Up @@ -716,6 +718,9 @@ type Config struct {
SPIFFEFederations SPIFFEFederationReader
// StaticHostUsers is the static host user service.
StaticHostUsers services.StaticHostUser
// WorkloadIdentity is the upstream Workload Identities service that we're
// caching
WorkloadIdentity WorkloadIdentityReader
// Backend is a backend for local cache
Backend backend.Backend
// MaxRetryPeriod is the maximum period between cache retries on failures
Expand Down Expand Up @@ -969,6 +974,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

workloadIdentityCache, err := local.NewWorkloadIdentityService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

staticHostUserCache, err := local.NewStaticHostUserService(config.Backend)
if err != nil {
cancel()
Expand Down Expand Up @@ -1019,6 +1030,7 @@ func New(config Config) (*Cache, error) {
kubeWaitingContsCache: kubeWaitingContsCache,
spiffeFederationCache: spiffeFederationCache,
staticHostUsersCache: staticHostUserCache,
workloadIdentityCache: workloadIdentityCache,
Logger: log.WithFields(log.Fields{
teleport.ComponentKey: config.Component,
}),
Expand Down
13 changes: 13 additions & 0 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type testPack struct {
spiffeFederations *local.SPIFFEFederationService
staticHostUsers services.StaticHostUser
autoUpdateService services.AutoUpdateService
workloadIdentity *local.WorkloadIdentityService
}

// testFuncs are functions to support testing an object in a cache.
Expand Down Expand Up @@ -350,6 +351,12 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) {
}
p.spiffeFederations = spiffeFederationsSvc

workloadIdentitySvc, err := local.NewWorkloadIdentityService(p.backend)
if err != nil {
return nil, trace.Wrap(err)
}
p.workloadIdentity = workloadIdentitySvc

databaseObjectsSvc, err := local.NewDatabaseObjectService(p.backend)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -428,6 +435,7 @@ func newPack(dir string, setupConfig func(c Config) Config, opts ...packOption)
DatabaseObjects: p.databaseObjects,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -836,6 +844,7 @@ func TestCompletenessInit(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -917,6 +926,7 @@ func TestCompletenessReset(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -1124,6 +1134,7 @@ func TestListResources_NodesTTLVariant(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
neverOK: true, // ensure reads are never healthy
Expand Down Expand Up @@ -1216,6 +1227,7 @@ func initStrategy(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -3454,6 +3466,7 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) {
types.KindUserTask: types.Resource153ToLegacy(newUserTasks(t)),
types.KindAutoUpdateConfig: types.Resource153ToLegacy(newAutoUpdateConfig(t)),
types.KindAutoUpdateVersion: types.Resource153ToLegacy(newAutoUpdateVersion(t)),
types.KindWorkloadIdentity: types.Resource153ToLegacy(newWorkloadIdentity("some_identifier")),
}

for name, cfg := range cases {
Expand Down
11 changes: 11 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
workloadidentityv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -265,6 +266,7 @@ type cacheCollections struct {
spiffeFederations collectionReader[SPIFFEFederationReader]
autoUpdateConfigs collectionReader[autoUpdateConfigGetter]
autoUpdateVersions collectionReader[autoUpdateVersionGetter]
workloadIdentity collectionReader[WorkloadIdentityReader]
}

// setupCollections returns a registry of collections.
Expand Down Expand Up @@ -784,6 +786,15 @@ func setupCollections(c *Cache, watches []types.WatchKind) (*cacheCollections, e
watch: watch,
}
collections.byKind[resourceKind] = collections.accessGraphSettings
case types.KindWorkloadIdentity:
if c.Config.WorkloadIdentity == nil {
return nil, trace.BadParameter("missing parameter WorkloadIdentity")
}
collections.workloadIdentity = &genericCollection[*workloadidentityv1pb.WorkloadIdentity, WorkloadIdentityReader, workloadIdentityExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.workloadIdentity
case types.KindAutoUpdateConfig:
if c.AutoUpdateService == nil {
return nil, trace.BadParameter("missing parameter AutoUpdateService")
Expand Down
119 changes: 119 additions & 0 deletions lib/cache/resource_workload_identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

//nolint:unused // Because the executors generate a large amount of false positives.
package cache

import (
"context"

"github.com/gravitational/trace"

workloadidentityv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
)

// WorkloadIdentityReader is an interface that defines the methods for getting
// WorkloadIdentity. This is returned as the reader for the WorkloadIdentity
// collection but is also used by the executor to read the full list of
// WorkloadIdentity on initialization.
type WorkloadIdentityReader interface {
ListWorkloadIdentities(ctx context.Context, pageSize int, nextToken string) ([]*workloadidentityv1pb.WorkloadIdentity, string, error)
GetWorkloadIdentity(ctx context.Context, name string) (*workloadidentityv1pb.WorkloadIdentity, error)
}

// workloadIdentityCacher is used for storing and retrieving WorkloadIdentity
// from the cache's local backend.
type workloadIdentityCacher interface {
WorkloadIdentityReader
UpsertWorkloadIdentity(ctx context.Context, resource *workloadidentityv1pb.WorkloadIdentity) (*workloadidentityv1pb.WorkloadIdentity, error)
DeleteWorkloadIdentity(ctx context.Context, name string) error
DeleteAllWorkloadIdentities(ctx context.Context) error
}

type workloadIdentityExecutor struct{}

var _ executor[*workloadidentityv1pb.WorkloadIdentity, WorkloadIdentityReader] = workloadIdentityExecutor{}

func (workloadIdentityExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*workloadidentityv1pb.WorkloadIdentity, error) {
var out []*workloadidentityv1pb.WorkloadIdentity
var nextToken string
for {
var page []*workloadidentityv1pb.WorkloadIdentity
var err error

const defaultPageSize = 0
page, nextToken, err = cache.Config.WorkloadIdentity.ListWorkloadIdentities(ctx, defaultPageSize, nextToken)
if err != nil {
return nil, trace.Wrap(err)
}
out = append(out, page...)
if nextToken == "" {
break
}
}
return out, nil
}

func (workloadIdentityExecutor) upsert(ctx context.Context, cache *Cache, resource *workloadidentityv1pb.WorkloadIdentity) error {
_, err := cache.workloadIdentityCache.UpsertWorkloadIdentity(ctx, resource)
return trace.Wrap(err)
}

func (workloadIdentityExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return trace.Wrap(cache.workloadIdentityCache.DeleteAllWorkloadIdentities(ctx))
}

func (workloadIdentityExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
return trace.Wrap(cache.workloadIdentityCache.DeleteWorkloadIdentity(ctx, resource.GetName()))
}

func (workloadIdentityExecutor) isSingleton() bool { return false }

func (workloadIdentityExecutor) getReader(cache *Cache, cacheOK bool) WorkloadIdentityReader {
if cacheOK {
return cache.workloadIdentityCache
}
return cache.Config.WorkloadIdentity
}

// ListWorkloadIdentities returns a paginated list of WorkloadIdentity resources.
func (c *Cache) ListWorkloadIdentities(ctx context.Context, pageSize int, nextToken string) ([]*workloadidentityv1pb.WorkloadIdentity, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListWorkloadIdentities")
defer span.End()

rg, err := readCollectionCache(c, c.collections.workloadIdentity)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
out, nextKey, err := rg.reader.ListWorkloadIdentities(ctx, pageSize, nextToken)
return out, nextKey, trace.Wrap(err)
}

// GetWorkloadIdentity returns a single WorkloadIdentity by name
func (c *Cache) GetWorkloadIdentity(ctx context.Context, name string) (*workloadidentityv1pb.WorkloadIdentity, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetWorkloadIdentity")
defer span.End()

rg, err := readCollectionCache(c, c.collections.workloadIdentity)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
out, err := rg.reader.GetWorkloadIdentity(ctx, name)
return out, trace.Wrap(err)
}
Loading
Loading