Skip to content

Commit

Permalink
Convert lib/proxy/peer to slog
Browse files Browse the repository at this point in the history
  • Loading branch information
espadolini committed Oct 16, 2024
1 parent 04f80a5 commit 5fcc162
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 40 deletions.
23 changes: 11 additions & 12 deletions lib/proxy/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package peer
import (
"context"
"crypto/tls"
"log/slog"
"math/rand/v2"
"net"
"sync"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/quic-go/quic-go"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -72,7 +72,7 @@ type ClientConfig struct {
// TLSCipherSuites optionally contains a list of TLS ciphersuites to use.
TLSCipherSuites []uint16
// Log is the proxy client logger.
Log logrus.FieldLogger
Log *slog.Logger
// Clock is used to control connection monitoring ticker.
Clock clockwork.Clock
// GracefulShutdownTimout is used set the graceful shutdown
Expand Down Expand Up @@ -112,10 +112,10 @@ func noopConnShuffler() connShuffler {
// checkAndSetDefaults checks and sets default values
func (c *ClientConfig) checkAndSetDefaults() error {
if c.Log == nil {
c.Log = logrus.New()
c.Log = slog.Default()
}

c.Log = c.Log.WithField(
c.Log = c.Log.With(
teleport.ComponentKey,
teleport.Component(teleport.ComponentProxyPeer),
)
Expand Down Expand Up @@ -414,30 +414,29 @@ func (c *Client) sync() {
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.Component(teleport.ComponentProxyPeer),
Client: c.config.AccessPoint,
// TODO(tross): use the configured logger after updating peering to use slog
// Logger: c.config.Logger,
Logger: c.config.Log,
},
ProxyDiffer: func(old, new types.Server) bool {
return old.GetPeerAddr() != new.GetPeerAddr()
},
})
if err != nil {
c.config.Log.Errorf("Error initializing proxy peer watcher: %+v.", err)
c.config.Log.ErrorContext(c.ctx, "Error initializing proxy peer watcher.", "error", err)
return
}
defer proxyWatcher.Close()

for {
select {
case <-c.ctx.Done():
c.config.Log.Debug("Stopping peer proxy sync: context done.")
c.config.Log.DebugContext(c.ctx, "Stopping peer proxy sync: context done.")
return
case <-proxyWatcher.Done():
c.config.Log.Debug("Stopping peer proxy sync: proxy watcher done.")
c.config.Log.DebugContext(c.ctx, "Stopping peer proxy sync: proxy watcher done.")
return
case proxies := <-proxyWatcher.ProxiesC:
if err := c.updateConnections(proxies); err != nil {
c.config.Log.Errorf("Error syncing peer proxies: %+v.", err)
c.config.Log.ErrorContext(c.ctx, "Error syncing peer proxies.", "error", err)
}
}
}
Expand Down Expand Up @@ -488,7 +487,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic)
if err != nil {
c.metrics.reportTunnelError(errorProxyPeerTunnelDial)
c.config.Log.Debugf("Error dialing peer proxy %+v at %+v", id, proxy.GetPeerAddr())
c.config.Log.DebugContext(c.ctx, "Error dialing peer proxy.", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
errs = append(errs, err)
continue
}
Expand Down Expand Up @@ -689,7 +688,7 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic)
if err != nil {
c.metrics.reportTunnelError(errorProxyPeerTunnelDirectDial)
c.config.Log.Debugf("Error direct dialing peer proxy %+v at %+v", id, proxy.GetPeerAddr())
c.config.Log.DebugContext(c.ctx, "Error direct dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
errs = append(errs, err)
continue
}
Expand Down
15 changes: 8 additions & 7 deletions lib/proxy/peer/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package peer

import (
"context"
"log/slog"
"net"

"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/credentials"

"github.com/gravitational/teleport/api/types"
Expand All @@ -37,16 +37,16 @@ type clientCredentials struct {
credentials.TransportCredentials
peerID string
peerAddr string
logger logrus.FieldLogger
log *slog.Logger
}

// newClientCredentials creates new clientCredentials from the given [crendentials.TransportCredentials].
func newClientCredentials(peerID, peerAddr string, logger logrus.FieldLogger, creds credentials.TransportCredentials) *clientCredentials {
func newClientCredentials(peerID, peerAddr string, log *slog.Logger, creds credentials.TransportCredentials) *clientCredentials {
return &clientCredentials{
TransportCredentials: creds,
peerID: peerID,
peerAddr: peerAddr,
logger: logger,
log: log,
}
}

Expand All @@ -73,15 +73,16 @@ func (c *clientCredentials) ClientHandshake(ctx context.Context, laddr string, c
return nil, nil, trace.Wrap(err)
}

const duplicatePeerMsg = "Detected multiple Proxy Peers with the same public address %q when connecting to Proxy %q which can lead to inconsistent state and problems establishing sessions. For best results ensure that `peer_public_addr` is unique per proxy and not a load balancer."
if err := validatePeer(c.peerID, identity); err != nil {
c.logger.Errorf(duplicatePeerMsg, c.peerAddr, c.peerID)
c.log.ErrorContext(ctx, duplicatePeerMsg, "peer_addr", c.peerAddr, "peer_id", c.peerID)
return nil, nil, trace.Wrap(err)
}

return conn, authInfo, nil
}

const duplicatePeerMsg = "Detected multiple Proxy Peers with the same public address when connecting to a Proxy which can lead to inconsistent state and problems establishing sessions. For best results ensure that `peer_public_addr` is unique per proxy and not a load balancer."

// getIdentity returns a [tlsca.Identity] that is created from the certificate
// presented during the TLS handshake.
func getIdentity(authInfo credentials.AuthInfo) (*tlsca.Identity, error) {
Expand Down Expand Up @@ -120,5 +121,5 @@ func validatePeer(peerID string, identity *tlsca.Identity) error {
return nil
}

return trace.AccessDenied("connected to unexpected proxy")
return trace.Wrap(wrongProxyError{})
}
2 changes: 1 addition & 1 deletion lib/proxy/peer/credentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestClientCredentials(t *testing.T) {
},
}

creds := newClientCredentials(test.expectedPeerID, test.peerAddr, utils.NewLoggerForTests(), newTestCredentials(cert))
creds := newClientCredentials(test.expectedPeerID, test.peerAddr, utils.NewSlogLoggerForTests(), newTestCredentials(cert))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Expand Down
63 changes: 44 additions & 19 deletions lib/proxy/peer/quicclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/binary"
"errors"
"io"
"log/slog"
"net"
"slices"
"sync"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/gravitational/trace"
"github.com/gravitational/trace/trail"
"github.com/quic-go/quic-go"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -45,11 +45,11 @@ import (
)

func (c *Client) connectQuic(peerID string, peerAddr string) (*quicClientConn, error) {
log := c.config.Log.WithFields(logrus.Fields{
"peer_id": peerID,
"peer_addr": peerAddr,
})
log.Info("Setting up a QUIC client conn.")
log := c.config.Log.With(
"peer_id", peerID,
"peer_addr", peerAddr,
)
log.InfoContext(c.ctx, "Setting up a QUIC client conn.")

udpAddr, err := net.ResolveUDPAddr("udp", peerAddr)
if err != nil {
Expand Down Expand Up @@ -106,7 +106,7 @@ type quicClientConn struct {
id string
addr *net.UDPAddr

log logrus.FieldLogger
log *slog.Logger

transport *quic.Transport

Expand Down Expand Up @@ -177,7 +177,7 @@ func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelT
return nil, trace.Wrap(err)
}

log := c.log.WithField("conn_nonce", nonce)
log := c.log.With("conn_nonce", nonce)

req := &quicpeeringv1a.DialRequest{
ServerId: nodeID,
Expand Down Expand Up @@ -212,9 +212,13 @@ func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelT
dialCtx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

log.Debug("Dialing peer proxy.")
log.DebugContext(dialCtx, "Dialing peer proxy.")
earlyConn, err := c.transport.DialEarly(dialCtx, c.addr, tlsConfig, c.quicConfig)
if err != nil {
if errors.Is(err, wrongProxyError{}) {
const duplicatePeerMsg = duplicatePeerMsg // to appease sloglint
log.ErrorContext(dialCtx, duplicatePeerMsg)
}
return nil, trace.Wrap(err)
}

Expand All @@ -226,17 +230,27 @@ func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelT
conn.CloseWithError(0, "")
}()

log.WithField("gso", conn.ConnectionState().GSO).Debug("Opened connection.")
log.DebugContext(conn.Context(),
"Opened connection.",
"gso", conn.ConnectionState().GSO,
)

respBuf, stream, err := quicSendUnary(deadline, sizedReqBuf, conn)
if err != nil {
if !errors.Is(err, quic.Err0RTTRejected) {
return nil, trace.Wrap(err)
}

log.Info("0-RTT attempt rejected, retrying with a full handshake.")
log.InfoContext(dialCtx, "0-RTT attempt rejected, retrying with a full handshake.")
nextConn, err := earlyConn.NextConnection(dialCtx)
if err != nil {
if errors.Is(err, wrongProxyError{}) {
// if we are hitting a QUIC-aware load balancer(?) it's possible
// to reach an unexpected peer proxy after a failed 0-RTT
// (failed because we got sent to the "wrong" peer)
const duplicatePeerMsg = duplicatePeerMsg // to appease sloglint
log.ErrorContext(dialCtx, duplicatePeerMsg)
}
return nil, trace.Wrap(err)
}
conn, earlyConn = nextConn, nil
Expand All @@ -245,14 +259,17 @@ func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelT
if conn.Context().Err() != nil {
return nil, trace.Wrap(context.Cause(conn.Context()))
}
log.Debug("Full handshake completed after 0-RTT rejection.")
log.DebugContext(conn.Context(), "Full handshake completed after 0-RTT rejection.")
respBuf, stream, err = quicSendUnary(deadline, sizedReqBuf, conn)
if err != nil {
return nil, trace.Wrap(err)
}
}

log.WithField("used_0rtt", conn.ConnectionState().Used0RTT).Debug("Exchanged dial request and response.")
log.DebugContext(conn.Context(),
"Exchanged dial request and response.",
"used_0rtt", conn.ConnectionState().Used0RTT,
)

resp := new(quicpeeringv1a.DialResponse)
if err := proto.Unmarshal(respBuf, resp); err != nil {
Expand Down Expand Up @@ -281,8 +298,6 @@ func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelT

src: src,
dst: dst,

log: log,
}

detach := context.AfterFunc(c.runCtx, func() { _ = sc.Close() })
Expand All @@ -291,7 +306,7 @@ func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelT
// so we're always allowed to add another one here
c.wg.Add(1)
context.AfterFunc(conn.Context(), func() {
log.Debug("Connection closed.")
log.DebugContext(conn.Context(), "Connection closed.")
c.wg.Done()
// remove the connection from the runCtx cancellation tree
detach()
Expand Down Expand Up @@ -344,8 +359,6 @@ type streamConn struct {

src net.Addr
dst net.Addr

log logrus.FieldLogger
}

var _ net.Conn = (*streamConn)(nil)
Expand Down Expand Up @@ -409,8 +422,20 @@ func verifyPeerCertificateIsSpecificProxy(peerID string) verifyPeerCertificateFu
}

if clientIdentity.Username != peerID {
return trace.AccessDenied("expected proxy %v, got %q", peerID, clientIdentity.Username)
return trace.Wrap(wrongProxyError{})
}
return nil
}
}

type wrongProxyError struct{}

func (wrongProxyError) Error() string {
return "connected to unexpected proxy"
}

func (e wrongProxyError) Unwrap() error {
return &trace.AccessDeniedError{
Message: e.Error(),
}
}
2 changes: 1 addition & 1 deletion lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4349,7 +4349,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
TLSCipherSuites: cfg.CipherSuites,
GetTLSCertificate: conn.ClientGetCertificate,
GetTLSRoots: conn.ClientGetPool,
Log: process.log,
Log: process.logger,
Clock: process.Clock,
ClusterName: clusterName,
QUICTransport: peerQUICTransport,
Expand Down

0 comments on commit 5fcc162

Please sign in to comment.