Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: restructure vnet for windows support #49921

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/teleterm/vnet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *Service) Start(ctx context.Context, req *api.StartRequest) (*api.StartR
}

s.clusterConfigCache = vnet.NewClusterConfigCache(s.cfg.Clock)
processManager, err := vnet.SetupAndRun(ctx, &vnet.SetupAndRunConfig{
processManager, err := vnet.Run(ctx, &vnet.RunConfig{
AppProvider: appProvider,
ClusterConfigCache: s.clusterConfigCache,
})
Expand Down
157 changes: 157 additions & 0 deletions lib/vnet/admin_process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package vnet

import (
"context"
"os"
"time"

"github.com/gravitational/trace"
"golang.zx2c4.com/wireguard/tun"

"github.com/gravitational/teleport/lib/vnet/daemon"
)

// RunAdminProcess must run as root. It creates and sets up a TUN device and passes
// the file descriptor for that device over the unix socket found at config.socketPath.
//
// It also handles host OS configuration that must run as root, and stays alive to keep the host configuration
// up to date. It will stay running until the socket at config.socketPath is deleted or until encountering an
// unrecoverable error.
//
// OS configuration is updated every [osConfigurationInterval]. During the update, it temporarily
// changes egid and euid of the process to that of the client connecting to the daemon.
func RunAdminProcess(ctx context.Context, config daemon.Config) error {
if err := config.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

tunName, err := createAndSendTUNDevice(ctx, config.SocketPath)
if err != nil {
return trace.Wrap(err)
}

errCh := make(chan error)
go func() {
errCh <- trace.Wrap(osConfigurationLoop(ctx, tunName, config.IPv6Prefix, config.DNSAddr, config.HomePath, config.ClientCred))
}()

// Stay alive until we get an error on errCh, indicating that the osConfig loop exited.
// If the socket is deleted, indicating that the unprivileged process exited, cancel the context
// and then wait for the osConfig loop to exit and send an err on errCh.
ticker := time.NewTicker(daemon.CheckUnprivilegedProcessInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if _, err := os.Stat(config.SocketPath); err != nil {
log.DebugContext(ctx, "failed to stat socket path, assuming parent exited")
cancel()
return trace.Wrap(<-errCh)
}
case err := <-errCh:
return trace.Wrap(err)
}
}
}

// createAndSendTUNDevice creates a virtual network TUN device and sends the open file descriptor on
// [socketPath]. It returns the name of the TUN device or an error.
func createAndSendTUNDevice(ctx context.Context, socketPath string) (string, error) {
tun, tunName, err := createTUNDevice(ctx)
if err != nil {
return "", trace.Wrap(err, "creating TUN device")
}

defer func() {
// We can safely close the TUN device in the admin process after it has been sent on the socket.
if err := tun.Close(); err != nil {
log.WarnContext(ctx, "Failed to close TUN device.", "error", trace.Wrap(err))
}
}()

if err := sendTUNNameAndFd(socketPath, tunName, tun.File()); err != nil {
return "", trace.Wrap(err, "sending TUN over socket")
}
return tunName, nil
}

func createTUNDevice(ctx context.Context) (tun.Device, string, error) {
log.DebugContext(ctx, "Creating TUN device.")
dev, err := tun.CreateTUN("utun", mtu)
if err != nil {
return nil, "", trace.Wrap(err, "creating TUN device")
}
name, err := dev.Name()
if err != nil {
return nil, "", trace.Wrap(err, "getting TUN device name")
}
return dev, name, nil
}

// osConfigurationLoop will keep running until [ctx] is canceled or an unrecoverable error is encountered, in
// order to keep the host OS configuration up to date.
func osConfigurationLoop(ctx context.Context, tunName, ipv6Prefix, dnsAddr, homePath string, clientCred daemon.ClientCred) error {
osConfigurator, err := newOSConfigurator(tunName, ipv6Prefix, dnsAddr, homePath, clientCred)
if err != nil {
return trace.Wrap(err, "creating OS configurator")
}
defer func() {
if err := osConfigurator.close(); err != nil {
log.ErrorContext(ctx, "Error while closing OS configurator", "error", err)
}
}()

// Clean up any stale configuration left by a previous VNet instance that may have failed to clean up.
// This is necessary in case any stale /etc/resolver/<proxy address> entries are still present, we need to
// be able to reach the proxy in order to fetch the vnet_config.
if err := osConfigurator.deconfigureOS(ctx); err != nil {
return trace.Wrap(err, "cleaning up OS configuration on startup")
}

defer func() {
// Shutting down, deconfigure OS. Pass context.Background because [ctx] has likely been canceled
// already but we still need to clean up.
if err := osConfigurator.deconfigureOS(context.Background()); err != nil {
log.ErrorContext(ctx, "Error deconfiguring host OS before shutting down.", "error", err)
}
}()

if err := osConfigurator.updateOSConfiguration(ctx); err != nil {
return trace.Wrap(err, "applying initial OS configuration")
}

// Re-configure the host OS every 10 seconds. This will pick up any newly logged-in clusters by
// reading profiles from TELEPORT_HOME.
const osConfigurationInterval = 10 * time.Second
ticker := time.NewTicker(osConfigurationInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := osConfigurator.updateOSConfiguration(ctx); err != nil {
return trace.Wrap(err, "updating OS configuration")
}
case <-ctx.Done():
return ctx.Err()
}
}
}
54 changes: 27 additions & 27 deletions lib/vnet/app_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,24 @@ type DialOptions struct {
InsecureSkipVerify bool
}

// TCPAppResolver implements [TCPHandlerResolver] for Teleport TCP apps.
type TCPAppResolver struct {
// tcpAppResolver implements [tcpHandlerResolver] for Teleport TCP apps.
type tcpAppResolver struct {
appProvider AppProvider
clusterConfigCache *ClusterConfigCache
log *slog.Logger
clock clockwork.Clock
}

// NewTCPAppResolver returns a new *TCPAppResolver which will resolve full-qualified domain names to
// TCPHandlers that will proxy TCP connection to Teleport TCP apps.
// newTCPAppResolver returns a new [*tcpAppResolver] which will resolve full-qualified domain names to
// [tcpHandler]s that will proxy TCP connection to Teleport TCP apps.
//
// It uses [appProvider] to list and retrieve cluster clients which are expected to be cached to avoid
// repeated/unnecessary dials to the cluster. These clients are then used to list TCP apps that should be
// handled.
//
// [appProvider] is also used to get app certificates used to dial the apps.
func NewTCPAppResolver(appProvider AppProvider, opts ...tcpAppResolverOption) (*TCPAppResolver, error) {
r := &TCPAppResolver{
func newTCPAppResolver(appProvider AppProvider, opts ...tcpAppResolverOption) (*tcpAppResolver, error) {
r := &tcpAppResolver{
appProvider: appProvider,
log: log.With(teleport.ComponentKey, "VNet.AppResolver"),
}
Expand All @@ -120,35 +120,35 @@ func NewTCPAppResolver(appProvider AppProvider, opts ...tcpAppResolverOption) (*
return r, nil
}

type tcpAppResolverOption func(*TCPAppResolver)
type tcpAppResolverOption func(*tcpAppResolver)

// withClock is a functional option to override the default clock (for tests).
func withClock(clock clockwork.Clock) tcpAppResolverOption {
return func(r *TCPAppResolver) {
return func(r *tcpAppResolver) {
r.clock = clock
}
}

// WithClusterConfigCache is a functional option to override the cluster config cache.
func WithClusterConfigCache(clusterConfigCache *ClusterConfigCache) tcpAppResolverOption {
return func(r *TCPAppResolver) {
return func(r *tcpAppResolver) {
r.clusterConfigCache = clusterConfigCache
}
}

// ResolveTCPHandler resolves a fully-qualified domain name to a [TCPHandlerSpec] for a Teleport TCP app that should
// resolveTCPHandler resolves a fully-qualified domain name to a [tcpHandlerSpec] for a Teleport TCP app that should
// be used to handle all future TCP connections to [fqdn].
// Avoid using [trace.Wrap] on [ErrNoTCPHandler] to prevent collecting a full stack trace on every unhandled
// Avoid using [trace.Wrap] on [errNoTCPHandler] to prevent collecting a full stack trace on every unhandled
// query.
func (r *TCPAppResolver) ResolveTCPHandler(ctx context.Context, fqdn string) (*TCPHandlerSpec, error) {
func (r *tcpAppResolver) resolveTCPHandler(ctx context.Context, fqdn string) (*tcpHandlerSpec, error) {
profileNames, err := r.appProvider.ListProfiles()
if err != nil {
return nil, trace.Wrap(err, "listing profiles")
}
for _, profileName := range profileNames {
if fqdn == fullyQualify(profileName) {
// This is a query for the proxy address, which we'll never want to handle.
return nil, ErrNoTCPHandler
return nil, errNoTCPHandler
}

clusterClient, err := r.clusterClientForAppFQDN(ctx, profileName, fqdn)
Expand All @@ -172,12 +172,12 @@ func (r *TCPAppResolver) ResolveTCPHandler(ctx context.Context, fqdn string) (*T
return r.resolveTCPHandlerForCluster(ctx, clusterClient, profileName, leafClusterName, fqdn)
}
// fqdn did not match any profile, forward the request upstream.
return nil, ErrNoTCPHandler
return nil, errNoTCPHandler
}

var errNoMatch = errors.New("cluster does not match queried FQDN")

func (r *TCPAppResolver) clusterClientForAppFQDN(ctx context.Context, profileName, fqdn string) (ClusterClient, error) {
func (r *tcpAppResolver) clusterClientForAppFQDN(ctx context.Context, profileName, fqdn string) (ClusterClient, error) {
rootClient, err := r.appProvider.GetCachedClient(ctx, profileName, "")
if err != nil {
r.log.ErrorContext(ctx, "Failed to get root cluster client, apps in this cluster will not be resolved.", "profile", profileName, "error", err)
Expand Down Expand Up @@ -236,15 +236,15 @@ func getLeafClusters(ctx context.Context, rootClient ClusterClient) ([]string, e
}
}

// resolveTCPHandlerForCluster takes a cluster client and resolves [fqdn] to a [TCPHandlerSpec] if a matching
// resolveTCPHandlerForCluster takes a cluster client and resolves [fqdn] to a [tcpHandlerSpec] if a matching
// app is found in that cluster.
// Avoid using [trace.Wrap] on [ErrNoTCPHandler] to prevent collecting a full stack trace on every unhandled
// Avoid using [trace.Wrap] on [errNoTCPHandler] to prevent collecting a full stack trace on every unhandled
// query.
func (r *TCPAppResolver) resolveTCPHandlerForCluster(
func (r *tcpAppResolver) resolveTCPHandlerForCluster(
ctx context.Context,
clusterClient ClusterClient,
profileName, leafClusterName, fqdn string,
) (*TCPHandlerSpec, error) {
) (*tcpHandlerSpec, error) {
log := r.log.With("profile", profileName, "leaf_cluster", leafClusterName, "fqdn", fqdn)
// An app public_addr could technically be full-qualified or not, match either way.
expr := fmt.Sprintf(`(resource.spec.public_addr == "%s" || resource.spec.public_addr == "%s") && hasPrefix(resource.spec.uri, "tcp://")`,
Expand All @@ -258,11 +258,11 @@ func (r *TCPAppResolver) resolveTCPHandlerForCluster(
// Don't return an unexpected error so we can try to find the app in different clusters or forward the
// request upstream.
log.InfoContext(ctx, "Failed to list application servers.", "error", err)
return nil, ErrNoTCPHandler
return nil, errNoTCPHandler
}
if len(resp.Resources) == 0 {
// Didn't find any matching app, forward the request upstream.
return nil, ErrNoTCPHandler
return nil, errNoTCPHandler
}
app := resp.Resources[0].GetApp()
appHandler, err := r.newTCPAppHandler(ctx, profileName, leafClusterName, app)
Expand All @@ -275,9 +275,9 @@ func (r *TCPAppResolver) resolveTCPHandlerForCluster(
return nil, trace.Wrap(err)
}

return &TCPHandlerSpec{
IPv4CIDRRange: clusterConfig.IPv4CIDRRange,
TCPHandler: appHandler,
return &tcpHandlerSpec{
ipv4CIDRRange: clusterConfig.IPv4CIDRRange,
tcpHandler: appHandler,
}, nil
}

Expand All @@ -293,7 +293,7 @@ type tcpAppHandler struct {
mu sync.Mutex
}

func (r *TCPAppResolver) newTCPAppHandler(
func (r *tcpAppResolver) newTCPAppHandler(
ctx context.Context,
profileName string,
leafClusterName string,
Expand Down Expand Up @@ -391,9 +391,9 @@ func (h *tcpAppHandler) getOrInitializeLocalProxy(ctx context.Context, localPort
return newLP, nil
}

// HandleTCPConnector handles an incoming TCP connection from VNet by passing it to the local alpn proxy,
// handleTCPConnector handles an incoming TCP connection from VNet by passing it to the local alpn proxy,
// which is set up with middleware to automatically handler certificate renewal and re-logins.
func (h *tcpAppHandler) HandleTCPConnector(ctx context.Context, localPort uint16, connector func() (net.Conn, error)) error {
func (h *tcpAppHandler) handleTCPConnector(ctx context.Context, localPort uint16, connector func() (net.Conn, error)) error {
lp, err := h.getOrInitializeLocalProxy(ctx, localPort)
if err != nil {
return trace.Wrap(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
"github.com/gravitational/teleport/lib/vnet/daemon"
)

// execAdminProcess is called from the normal user process to register and call
// the daemon process which runs as root.
func execAdminProcess(ctx context.Context, config daemon.Config) error {
return trace.Wrap(daemon.RegisterAndCall(ctx, config))
}

// DaemonSubcommand runs the VNet daemon process.
func DaemonSubcommand(ctx context.Context) error {
return trace.Wrap(daemon.Start(ctx, AdminSetup))
return trace.Wrap(daemon.Start(ctx, RunAdminProcess))
}
Loading
Loading