Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v14] Handle resource cleanup on termination within the inventory control stream #44224

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion api/client/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ func (i *downstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStr
oneOf.Msg = &proto.UpstreamInventoryOneOf_AgentMetadata{
AgentMetadata: &msg,
}
case proto.UpstreamInventoryGoodbye:
oneOf.Msg = &proto.UpstreamInventoryOneOf_Goodbye{
Goodbye: &msg,
}
default:
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
continue
Expand Down Expand Up @@ -478,6 +482,8 @@ func (i *upstreamICS) runRecvLoop(stream proto.AuthService_InventoryControlStrea
msg = *oneOf.GetPong()
case oneOf.GetAgentMetadata() != nil:
msg = *oneOf.GetAgentMetadata()
case oneOf.GetGoodbye() != nil:
msg = *oneOf.GetGoodbye()
default:
// TODO: log unknown message variants once we have a better story around
// logging in api/* packages.
Expand Down Expand Up @@ -514,7 +520,7 @@ func (i *upstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStrea
UpdateLabels: &msg,
}
default:
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
sendMsg.errC <- trace.BadParameter("cannot send unexpected downstream msg type: %T", msg)
continue
}
err := stream.Send(&oneOf)
Expand Down
3,005 changes: 2,120 additions & 885 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/client/proto/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (p UpstreamInventoryPong) sealedUpstreamInventoryMessage() {}

func (a UpstreamInventoryAgentMetadata) sealedUpstreamInventoryMessage() {}

func (h UpstreamInventoryGoodbye) sealedUpstreamInventoryMessage() {}

// DownstreamInventoryMessage is a sealed interface representing the possible
// downstream messages of the inventory controls stream after initial hello.
type DownstreamInventoryMessage interface {
Expand Down
55 changes: 55 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2141,6 +2141,8 @@ message UpstreamInventoryOneOf {
UpstreamInventoryPong Pong = 3;
// UpstreamInventoryAgentMetadata advertises instance metadata.
UpstreamInventoryAgentMetadata AgentMetadata = 4;
// UpstreamInventoryGoodbye advertises that the instance is terminating.
UpstreamInventoryGoodbye Goodbye = 5;
}
}

Expand Down Expand Up @@ -2221,6 +2223,51 @@ message DownstreamInventoryHello {
string Version = 1;
// ServerID advertises the server ID of the auth server.
string ServerID = 2;

// SupportedCapabilities indicate which features of the ICS that
// the connect auth server supports. This allows agents to determine
// how they should interact with the auth server to maintain compatibility.
message SupportedCapabilities {
// ProxyHeartbeats indicates the ICS supports heartbeating proxy servers.
bool ProxyHeartbeats = 1;
// ProxyCleanup indicates the ICS supports deleting proxies when UpstreamInventoryGoodbye.DeleteResources is set.
bool ProxyCleanup = 2;
// ProxyHeartbeats indicates the ICS supports heartbeating proxy servers.
bool AuthHeartbeats = 3;
// ProxyCleanup indicates the ICS supports deleting proxies when UpstreamInventoryGoodbye.DeleteResources is set.
bool AuthCleanup = 4;
// NodeHeartbeats indicates the ICS supports heartbeating ssh servers.
bool NodeHeartbeats = 5;
// NodeCleanup indicates the ICS supports deleting nodes when UpstreamInventoryGoodbye.DeleteResources is set.
bool NodeCleanup = 6;
// AppHeartbeats indicates the ICS supports heartbeating app servers.
bool AppHeartbeats = 7;
// AppCleanup indicates the ICS supports deleting apps when UpstreamInventoryGoodbye.DeleteResources is set.
bool AppCleanup = 8;
// DatabaseHeartbeats indicates the ICS supports heartbeating databases.
bool DatabaseHeartbeats = 9;
// DatabaseCleanup indicates the ICS supports deleting databases when UpstreamInventoryGoodbye.DeleteResources is set.
bool DatabaseCleanup = 10;
// DatabaseServiceHeartbeats indicates the ICS supports heartbeating databse services.
bool DatabaseServiceHeartbeats = 11;
// DatabaseServiceCleanup indicates the ICS supports deleting database services when UpstreamInventoryGoodbye.DeleteResources is set.
bool DatabaseServiceCleanup = 12;
// WindowsDesktopHeartbeats indicates the ICS supports heartbeating windows desktop servers.
bool WindowsDesktopHeartbeats = 13;
// WindowsDesktopCleanup indicates the ICS supports deleting windows desktops when UpstreamInventoryGoodbye.DeleteResources is set.
bool WindowsDesktopCleanup = 14;
// WindowsDesktopHeartbeats indicates the ICS supports heartbeating windows desktop services.
bool WindowsDesktopServiceHeartbeats = 15;
// WindowsDesktopCleanup indicates the ICS supports deleting windows desktop services when UpstreamInventoryGoodbye.DeleteResources is set.
bool WindowsDesktopServiceCleanup = 16;
// KubernetesHeartbeats indicates the ICS supports heartbeating kubernetes clusters.
bool KubernetesHeartbeats = 17;
// KubernetesCleanup indicates the ICS supports deleting kubernetes clusters when UpstreamInventoryGoodbye.DeleteResources is set.
bool KubernetesCleanup = 18;
}

// SupportedCapabilities advertises the supported features of the auth server.
SupportedCapabilities Capabilities = 3;
}

// LabelUpdateKind is the type of service to update labels for.
Expand Down Expand Up @@ -2263,6 +2310,14 @@ message InventoryHeartbeat {
types.AppServerV3 AppServer = 2;
}

// UpstreamInventoryGoodbye informs the upstream service that instance
// is terminating
message UpstreamInventoryGoodbye {
// DeleteResources indicates that any heartbeats received from
// the instance should be terminated when the stream is closed.
bool DeleteResources = 1;
}

// InventoryStatusRequest requests inventory status info.
message InventoryStatusRequest {
// Connected requests summary of the inventory control streams registered with
Expand Down
5 changes: 5 additions & 0 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4055,6 +4055,11 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
downstreamHello := proto.DownstreamInventoryHello{
Version: teleport.Version,
ServerID: a.ServerID,
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
NodeHeartbeats: true,
AppHeartbeats: true,
AppCleanup: true,
},
}
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {
return trace.Wrap(err)
Expand Down
12 changes: 12 additions & 0 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
type Auth interface {
UpsertNode(context.Context, types.Server) (*types.KeepAlive, error)
UpsertApplicationServer(context.Context, types.AppServer) (*types.KeepAlive, error)
DeleteApplicationServer(ctx context.Context, namespace, hostID, name string) error

KeepAliveServer(context.Context, types.KeepAlive) error

Expand Down Expand Up @@ -310,6 +311,15 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
}

defer func() {
if handle.goodbye.GetDeleteResources() {
log.WithField("apps", len(handle.appServers)).Debug("Cleaning up resources in response to instance termination")
for _, app := range handle.appServers {
if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err)
}
}
}

for _, service := range handle.hello.Services {
c.serviceCounter.decrement(service)
}
Expand Down Expand Up @@ -360,6 +370,8 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
}
case proto.UpstreamInventoryPong:
c.handlePong(handle, m)
case proto.UpstreamInventoryGoodbye:
handle.goodbye = m
default:
log.Warnf("Unexpected upstream message type %T on control stream of server %q.", m, handle.Hello().ServerID)
handle.CloseWithError(trace.BadParameter("unexpected upstream message type %T", m))
Expand Down
Loading
Loading