Skip to content

Commit

Permalink
[kube] fixes duplicated session recordings in root and leaf clusters (#…
Browse files Browse the repository at this point in the history
…48740)

This PR addresses a bug in Kubernetes session recordings where both the root proxy and the leaf cluster's Kubernetes services were recording the same session, resulting in the session being available in both clusters.

This behavior was inconsistent with other protocols, where recordings of leaf resources are only accessible in leaf clusters. To maintain consistency, this PR removes session recordings on the root clusters.

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato authored Nov 11, 2024
1 parent 378e901 commit 882ee26
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 28 deletions.
5 changes: 2 additions & 3 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {
loop:
for {
select {
case event := <-main.UploadEventsC:
case event := <-aux.UploadEventsC:
sessionID = event.SessionID
break loop
case <-timeoutC:
Expand All @@ -767,7 +767,7 @@ loop:
}

// read back the entire session and verify that it matches the stated output
capturedStream, err := main.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes)
capturedStream, err := aux.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes)
require.NoError(t, err)

require.Equal(t, sessionStream, string(capturedStream))
Expand Down Expand Up @@ -1583,7 +1583,6 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface,
}

s := getContainerStatusByName(p, containerName)
fmt.Println("test", s)
if s == nil {
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/ephemeral_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri
f.log.Errorf("Failed to set up forwarding headers: %v.", err)
return nil, trace.Wrap(err)
}
if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
sess.forwarder.ServeHTTP(w, req)
return nil, nil
}
Expand Down
43 changes: 23 additions & 20 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ type authContext struct {
kubeServers []types.KubeServer
// apiResource holds the information about the requested API resource.
apiResource apiResource
// isLocalKubernetesCluster is true if the target cluster is served by this teleport service.
// It is false if the target cluster is served by another teleport service or a different
// Teleport cluster.
isLocalKubernetesCluster bool
}

func (c authContext) String() string {
Expand Down Expand Up @@ -775,7 +779,8 @@ func (f *Forwarder) setupContext(
return nil, trace.NotFound("Kubernetes cluster %q not found", kubeCluster)
}
}
if f.isLocalKubeCluster(isRemoteCluster, kubeCluster) {
isLocalKubernetesCluster := f.isLocalKubeCluster(isRemoteCluster, kubeCluster)
if isLocalKubernetesCluster {
kubeResource, apiResource, err = f.parseResourceFromRequest(req, kubeCluster)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -809,10 +814,11 @@ func (f *Forwarder) setupContext(
remoteAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: req.RemoteAddr},
isRemote: isRemoteCluster,
},
kubeServers: kubeServers,
requestVerb: apiResource.getVerb(req),
apiResource: apiResource,
kubeResource: kubeResource,
kubeServers: kubeServers,
requestVerb: apiResource.getVerb(req),
apiResource: apiResource,
kubeResource: kubeResource,
isLocalKubernetesCluster: isLocalKubernetesCluster,
}, nil
}

Expand Down Expand Up @@ -865,9 +871,11 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat
)
defer span.End()

if sess.noAuditEvents {
// If the session is not local, don't emit the event.
if !sess.isLocalKubernetesCluster {
return
}

r := sess.apiResource
if r.skipEvent {
return
Expand Down Expand Up @@ -1161,7 +1169,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
return nil, trace.Wrap(err)
}

if !f.isLocalKubeCluster(ctx.teleportCluster.isRemote, ctx.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
return f.remoteJoin(ctx, w, req, p, sess)
}

Expand Down Expand Up @@ -1658,7 +1666,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
}
// proxy.Close closes the underlying connection and releases the resources.
defer proxy.Close()
if sess.noAuditEvents {
if !sess.isLocalKubernetesCluster {
// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
}
Expand Down Expand Up @@ -1777,7 +1785,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
}

onPortForward := func(addr string, success bool) {
if sess.noAuditEvents {
if !sess.isLocalKubernetesCluster {
return
}
portForward := &apievents.PortForward{
Expand Down Expand Up @@ -2048,7 +2056,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
return nil, trace.Wrap(err)
}

isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster := sess.isLocalKubernetesCluster
isListRequest := authCtx.requestVerb == types.KubeVerbList
// Watch requests can be send to a single resource or to a collection of resources.
// isWatchingCollectionRequest is true when the request is a watch request and
Expand Down Expand Up @@ -2145,10 +2153,8 @@ type clusterSession struct {
// nil otherwise.
kubeAPICreds kubeCreds
forwarder *reverseproxy.Forwarder
// noAuditEvents is true if this teleport service should leave audit event
// logging to another service.
noAuditEvents bool
targetAddr string
// targetAddr is the address of the target cluster.
targetAddr string
// kubeAddress is the address of this session's active connection (if there is one)
kubeAddress string
// upgradeToHTTP2 indicates whether the transport should be configured to use HTTP2.
Expand Down Expand Up @@ -2357,11 +2363,8 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont
func (f *Forwarder) newClusterSessionDirect(ctx context.Context, authCtx authContext) (*clusterSession, error) {
connCtx, cancel := context.WithCancelCause(ctx)
return &clusterSession{
parent: f,
authContext: authCtx,
// This session talks to a kubernetes_service, which should handle
// audit logging. Avoid duplicate logging.
noAuditEvents: true,
parent: f,
authContext: authCtx,
requestContext: ctx,
connCtx: connCtx,
connMonitorCancel: cancel,
Expand All @@ -2385,7 +2388,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo
reverseproxy.WithLogger(f.log),
reverseproxy.WithErrorHandler(f.formatForwardResponseError),
}
if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if sess.isLocalKubernetesCluster {
// If the target cluster is local, i.e. the cluster that is served by this
// teleport service, then we set up the forwarder to allow re-writing
// the response to the client to include user friendly error messages.
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_deletecollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (f *Forwarder) deleteResourcesCollection(sess *clusterSession, w http.Respo
defer span.End()
req = req.WithContext(ctx)
var (
isLocalKubeCluster = f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster = sess.isLocalKubernetesCluster
kubeObjType string
namespace string
)
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (f *Forwarder) listResources(sess *clusterSession, w http.ResponseWriter, r

req = req.WithContext(ctx)

isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster := sess.isLocalKubernetesCluster
supportsType := false
if isLocalKubeCluster {
_, supportsType = sess.rbacSupportedResources.getTeleportResourceKindFromAPIResource(sess.apiResource)
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/self_subject_reviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon

// only allow self subject access reviews for the service that proxies the
// request to the kubernetes API server.
if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if sess.isLocalKubernetesCluster {
if err := f.validateSelfSubjectAccessReview(sess, w, req); trace.IsAccessDenied(err) {
return nil, nil
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta
s.started = true
sessionStart := s.forwarder.cfg.Clock.Now().UTC()

if !s.sess.noAuditEvents {
if s.sess.isLocalKubernetesCluster {
s.terminalSizeQueue.callback = func(termSize terminalResizeMessage) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 882ee26

Please sign in to comment.