Skip to content

Commit

Permalink
Use cloud anonymization key (#46819)
Browse files Browse the repository at this point in the history
- When present, the cloud anonymization key is used to anonymize events
  • Loading branch information
michellescripts committed Nov 5, 2024
1 parent f04a16e commit 4b9fbc6
Show file tree
Hide file tree
Showing 12 changed files with 1,145 additions and 980 deletions.
1,940 changes: 999 additions & 941 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ message Features {
// NOTE: this flag is used to signal that Access Monitoring is *enabled* on a cluster.
// *Access* to the feature is gated on the `AccessMonitoring` entitlement.
bool AccessMonitoringConfigured = 36;
// CloudAnonymizationKey is a hash of the Salesforce ID used to anonymize usage events
bytes CloudAnonymizationKey = 37 [(gogoproto.jsontag) = "cloud_anonymization_key,omitempty"];
}

// EntitlementInfo is the state and limits of a particular entitlement
Expand Down
15 changes: 11 additions & 4 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,13 +1979,20 @@ func (a *Server) GetClusterID(ctx context.Context, opts ...services.MarshalOptio
}

// GetAnonymizationKey returns the anonymization key that identifies this client.
// It falls back to the cluster ID if the anonymization key is not set in license file.
// The anonymization key may be any of the following, in order of precedence:
// - (Teleport Cloud) a key provided by the Teleport Cloud API
// - a key embedded in the license file
// - the cluster's UUID
func (a *Server) GetAnonymizationKey(ctx context.Context, opts ...services.MarshalOption) (string, error) {
if a.license == nil || len(a.license.AnonymizationKey) == 0 {
return a.GetClusterID(ctx, opts...)
if key := modules.GetModules().Features().CloudAnonymizationKey; len(key) > 0 {
return string(key), nil
}

return string(a.license.AnonymizationKey), nil
if a.license != nil && len(a.license.AnonymizationKey) > 0 {
return string(a.license.AnonymizationKey), nil
}
id, err := a.GetClusterID(ctx, opts...)
return id, trace.Wrap(err)
}

// GetDomainName returns the domain name that identifies this authority server.
Expand Down
62 changes: 62 additions & 0 deletions lib/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4334,6 +4334,68 @@ func TestCleanupNotifications(t *testing.T) {
}, 3*time.Second, 100*time.Millisecond)
}

// TestServer_GetAnonymizationKey checks which value GetAnonymizationKey
// returns for each configuration: the cloud key from the modules features
// takes precedence over the license key, and the cluster ID is returned when
// neither key is set.
func TestServer_GetAnonymizationKey(t *testing.T) {
	type testCase struct {
		name        string
		testModules *modules.TestModules
		license     *license.License
		want        string
		errCheck    require.ErrorAssertionFunc
	}

	cases := []testCase{
		{
			// Both keys are configured; the cloud key must win.
			name: "returns CloudAnonymizationKey if present",
			testModules: &modules.TestModules{
				TestFeatures: modules.Features{CloudAnonymizationKey: []byte("cloud-key")},
			},
			license:  &license.License{AnonymizationKey: []byte("license-key")},
			want:     "cloud-key",
			errCheck: require.NoError,
		},
		{
			// Only the license carries a key.
			name:        "Returns license AnonymizationKey if no Cloud Key is present",
			testModules: &modules.TestModules{},
			license:     &license.License{AnonymizationKey: []byte("license-key")},
			want:        "license-key",
			errCheck:    require.NoError,
		},
		{
			// Neither source has a key, so the configured cluster ID is expected.
			name:        "Returns clusterID if no cloud key nor license key is present",
			testModules: &modules.TestModules{},
			license:     &license.License{},
			want:        "cluster-id",
			errCheck:    require.NoError,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Each case gets its own auth server with a fixed cluster ID.
			authSrv, err := NewTestAuthServer(TestAuthServerConfig{
				Dir:       t.TempDir(),
				Clock:     clockwork.NewFakeClock(),
				ClusterID: "cluster-id",
			})
			require.NoError(t, err)
			t.Cleanup(func() { require.NoError(t, authSrv.Close()) })

			tlsSrv, err := authSrv.NewTestTLSServer()
			require.NoError(t, err)
			t.Cleanup(func() { require.NoError(t, tlsSrv.Close()) })

			// Install the per-case modules and license after the servers exist.
			modules.SetTestModules(t, tc.testModules)
			tlsSrv.AuthServer.AuthServer.SetLicense(tc.license)

			key, err := tlsSrv.AuthServer.AuthServer.GetAnonymizationKey(context.Background())
			tc.errCheck(t, err)
			require.Equal(t, tc.want, key)
		})
	}
}

func newUserNotificationWithExpiry(t *testing.T, username string, title string, expires *timestamppb.Timestamp) *notificationsv1.Notification {
t.Helper()

Expand Down
3 changes: 3 additions & 0 deletions lib/auth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ import (
type TestAuthServerConfig struct {
// ClusterName is cluster name
ClusterName string
// ClusterID is the cluster ID. Optional; a random UUID string is used if not set.
ClusterID string
// Dir is directory for local backend
Dir string
// AcceptedUsage is an optional list of restricted
Expand Down Expand Up @@ -285,6 +287,7 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {

clusterName, err := services.NewClusterNameWithRandomID(types.ClusterNameSpecV2{
ClusterName: cfg.ClusterName,
ClusterID: cfg.ClusterID,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
5 changes: 5 additions & 0 deletions lib/modules/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Features struct {
SupportType proto.SupportType
// Entitlements reflect Cloud Entitlements including access and limits
Entitlements map[entitlements.EntitlementKind]EntitlementInfo
// CloudAnonymizationKey is the key used to anonymize usage events in a cluster.
// Only applicable for Cloud customers (self-hosted clusters get their anonymization key from the
// license file).
CloudAnonymizationKey []byte

// todo (michellescripts) have the following fields evaluated for deprecation, consolidation, or fetch from Cloud
// AdvancedAccessWorkflows is currently set to the value of the Cloud Access Requests entitlement
Expand Down Expand Up @@ -125,6 +129,7 @@ func (f Features) ToProto() *proto.Features {
RecoveryCodes: f.RecoveryCodes,
AccessMonitoringConfigured: f.AccessMonitoringConfigured,
Entitlements: f.EntitlementsToProto(),
CloudAnonymizationKey: f.CloudAnonymizationKey,
}

// remove setLegacyLogic in v18
Expand Down
2 changes: 2 additions & 0 deletions lib/modules/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestFeatures_ToProto(t *testing.T) {
Questionnaire: true,
RecoveryCodes: true,
AccessMonitoringConfigured: false,
CloudAnonymizationKey: []byte("001"),
Entitlements: map[string]*proto.EntitlementInfo{
string(entitlements.AccessLists): {Enabled: true, Limit: 111},
string(entitlements.AccessMonitoring): {Enabled: true, Limit: 2113},
Expand Down Expand Up @@ -180,6 +181,7 @@ func TestFeatures_ToProto(t *testing.T) {
Questionnaire: true,
RecoveryCodes: true,
AccessMonitoringConfigured: false,
CloudAnonymizationKey: []byte("001"),
Entitlements: map[entitlements.EntitlementKind]modules.EntitlementInfo{
entitlements.AccessLists: {Enabled: true, Limit: 111},
entitlements.AccessMonitoring: {Enabled: true, Limit: 2113},
Expand Down
38 changes: 19 additions & 19 deletions lib/usagereporter/teleport/aggregating/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type ReporterConfig struct {
// HostID is the host ID of the current Teleport instance, added to reports
// for auditing purposes. Required.
HostID string
// AnonymizationKey is the key used to anonymize data user or resource names. Optional.
AnonymizationKey string
// Anonymizer is used to anonymize user or resource names. Required.
Anonymizer utils.Anonymizer
}

// CheckAndSetDefaults checks the [ReporterConfig] for validity, returning nil
Expand All @@ -82,8 +82,8 @@ func (cfg *ReporterConfig) CheckAndSetDefaults() error {
if cfg.HostID == "" {
return trace.BadParameter("missing HostID")
}
if cfg.AnonymizationKey == "" {
return trace.BadParameter("missing AnonymizationKey")
if cfg.Anonymizer == nil {
return trace.BadParameter("missing Anonymizer")
}

if cfg.Logger == nil {
Expand All @@ -100,15 +100,10 @@ func NewReporter(ctx context.Context, cfg ReporterConfig) (*Reporter, error) {
return nil, trace.Wrap(err)
}

anonymizer, err := utils.NewHMACAnonymizer(cfg.AnonymizationKey)
if err != nil {
return nil, trace.Wrap(err)
}

baseCtx, baseCancel := context.WithCancel(ctx)

r := &Reporter{
anonymizer: anonymizer,
anonymizer: cfg.Anonymizer,
svc: reportService{cfg.Backend},
logger: cfg.Logger,
clock: cfg.Clock,
Expand All @@ -117,8 +112,8 @@ func NewReporter(ctx context.Context, cfg ReporterConfig) (*Reporter, error) {
closing: make(chan struct{}),
done: make(chan struct{}),

clusterName: anonymizer.AnonymizeNonEmpty(cfg.ClusterName.GetClusterName()),
hostID: anonymizer.AnonymizeNonEmpty(cfg.HostID),
clusterName: cfg.ClusterName.GetClusterName(),
hostID: cfg.HostID,

baseCancel: baseCancel,
}
Expand All @@ -145,10 +140,10 @@ type Reporter struct {
// done is closed at the end of the background goroutine.
done chan struct{}

// clusterName is the anonymized cluster name.
clusterName []byte
// hostID is the anonymized host ID of the reporter (this instance).
hostID []byte
// clusterName is the un-anonymized cluster name.
clusterName string
// hostID is the un-anonymized host ID of the reporter (this instance).
hostID string

// baseCancel cancels the context used by the background goroutine.
baseCancel context.CancelFunc
Expand Down Expand Up @@ -300,7 +295,6 @@ Ingest:
select {
case <-ticker.Chan():
case ae = <-r.ingest:

case <-ctx.Done():
r.closingOnce.Do(func() { close(r.closing) })
break Ingest
Expand Down Expand Up @@ -407,7 +401,10 @@ func (r *Reporter) persistUserActivity(ctx context.Context, startTime time.Time,
records = append(records, record)
}

reports, err := prepareUserActivityReports(r.clusterName, r.hostID, startTime, records)
anonymizedClusterName := r.anonymizer.AnonymizeNonEmpty(r.clusterName)
anonymizedHostID := r.anonymizer.AnonymizeNonEmpty(r.hostID)

reports, err := prepareUserActivityReports(anonymizedClusterName, anonymizedHostID, startTime, records)
if err != nil {
r.logger.ErrorContext(ctx, "Failed to prepare user activity report, dropping data.",
"start_time", startTime,
Expand Down Expand Up @@ -451,7 +448,10 @@ func (r *Reporter) persistResourcePresence(ctx context.Context, startTime time.T
records = append(records, record)
}

reports, err := prepareResourcePresenceReports(r.clusterName, r.hostID, startTime, records)
anonymizedClusterName := r.anonymizer.AnonymizeNonEmpty(r.clusterName)
anonymizedHostID := r.anonymizer.AnonymizeNonEmpty(r.hostID)

reports, err := prepareResourcePresenceReports(anonymizedClusterName, anonymizedHostID, startTime, records)
if err != nil {
r.logger.ErrorContext(ctx, "Failed to prepare resource presence report, dropping data.", "start_time", startTime, "error", err)
return
Expand Down
14 changes: 9 additions & 5 deletions lib/usagereporter/teleport/aggregating/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/services"
usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport"
"github.com/gravitational/teleport/lib/utils"
)

func TestReporter(t *testing.T) {
Expand Down Expand Up @@ -67,12 +68,15 @@ func TestReporter(t *testing.T) {
})
require.NoError(t, err)

anonymizer, err := utils.NewHMACAnonymizer("0123456789abcdef")
require.NoError(t, err)

r, err := NewReporter(ctx, ReporterConfig{
Backend: bk,
Clock: clk,
ClusterName: clusterName,
HostID: uuid.NewString(),
AnonymizationKey: "0123456789abcdef",
Backend: bk,
Clock: clk,
ClusterName: clusterName,
HostID: uuid.NewString(),
Anonymizer: anonymizer,
})
require.NoError(t, err)

Expand Down
12 changes: 4 additions & 8 deletions lib/usagereporter/teleport/usagereporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,12 @@ func (t *StreamingUsageReporter) Run(ctx context.Context) {

type SubmitFunc = usagereporter.SubmitFunc[prehogv1a.SubmitEventRequest]

func NewStreamingUsageReporter(logger *slog.Logger, clusterName types.ClusterName, anonymizationKey string, submitter SubmitFunc) (*StreamingUsageReporter, error) {
if anonymizationKey == "" {
return nil, trace.BadParameter("anonymization key is required")
}
anonymizer, err := utils.NewHMACAnonymizer(anonymizationKey)
if err != nil {
return nil, trace.Wrap(err)
func NewStreamingUsageReporter(logger *slog.Logger, clusterName types.ClusterName, anonymizer utils.Anonymizer, submitter SubmitFunc) (*StreamingUsageReporter, error) {
if anonymizer == nil {
return nil, trace.BadParameter("missing anonymizer")
}

err = metrics.RegisterPrometheusCollectors(usagereporter.UsagePrometheusCollectors...)
err := metrics.RegisterPrometheusCollectors(usagereporter.UsagePrometheusCollectors...)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
26 changes: 23 additions & 3 deletions lib/utils/anonymizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/sha256"
"encoding/base64"
"strings"
"sync"

"github.com/gravitational/trace"
)
Expand All @@ -38,16 +39,26 @@ type Anonymizer interface {
// AnonymizeNonEmpty anonymizes the given string into bytes if the string is
// nonempty, otherwise returns an empty slice.
AnonymizeNonEmpty(s string) []byte

// SetAnonymizationKey updates the underlying anonymization key.
SetAnonymizationKey(k []byte)
}

// hmacAnonymizer implements anonymization using HMAC
// HMACAnonymizer implements anonymization using HMAC
type HMACAnonymizer struct {
// key is the HMAC key
key []byte
mu sync.RWMutex
}

var _ Anonymizer = (*HMACAnonymizer)(nil)

// SetAnonymizationKey replaces the HMAC key used by subsequent anonymization
// calls. The key is copied, so the caller may reuse or modify k afterwards
// without affecting the anonymizer.
func (a *HMACAnonymizer) SetAnonymizationKey(k []byte) {
	// Detach from the caller's backing array: readers snapshot a.key under
	// the read lock and then use it outside the lock, so sharing memory with
	// the caller would allow a later mutation of k to race with them.
	key := append([]byte(nil), k...)

	a.mu.Lock()
	defer a.mu.Unlock()
	a.key = key
}

// NewHMACAnonymizer returns a new HMAC-based anonymizer
func NewHMACAnonymizer(key string) (*HMACAnonymizer, error) {
if strings.TrimSpace(key) == "" {
Expand All @@ -58,7 +69,11 @@ func NewHMACAnonymizer(key string) (*HMACAnonymizer, error) {

// Anonymize anonymizes the provided data using HMAC
func (a *HMACAnonymizer) Anonymize(data []byte) string {
h := hmac.New(sha256.New, a.key)
a.mu.RLock()
k := a.key
a.mu.RUnlock()

h := hmac.New(sha256.New, k)
h.Write(data)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
Expand All @@ -73,7 +88,12 @@ func (a *HMACAnonymizer) AnonymizeNonEmpty(s string) []byte {
if s == "" {
return nil
}
h := hmac.New(sha256.New, a.key)

a.mu.RLock()
k := a.key
a.mu.RUnlock()

h := hmac.New(sha256.New, k)
h.Write([]byte(s))
return h.Sum(nil)
}
6 changes: 6 additions & 0 deletions lib/web/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package web

import (
"bytes"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/entitlements"
)
Expand All @@ -30,6 +32,10 @@ func (h *Handler) SetClusterFeatures(features proto.Features) {
h.Mutex.Lock()
defer h.Mutex.Unlock()

if !bytes.Equal(h.clusterFeatures.CloudAnonymizationKey, features.CloudAnonymizationKey) {
h.log.Info("Received new cloud anonymization key from server")
}

entitlements.BackfillFeatures(&features)
h.clusterFeatures = features
}
Expand Down

0 comments on commit 4b9fbc6

Please sign in to comment.