Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 [WIP] Use SPDY over websockets and fallback to direct SPDY on client side #11125

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion controlplane/kubeadm/internal/proxy/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/pkg/errors"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could it be that PR title and description are slightly incorrect?

It seems to me that we are always using SPDY. The question is just if it's over webhooks or directly?

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
Expand All @@ -37,6 +39,7 @@ const defaultTimeout = 10 * time.Second
type Dialer struct {
proxy Proxy
clientset *kubernetes.Clientset
restConfig *rest.Config
proxyTransport http.RoundTripper
upgrader spdy.Upgrader
timeout time.Duration
Expand Down Expand Up @@ -74,6 +77,7 @@ func NewDialer(p Proxy, options ...func(*Dialer) error) (*Dialer, error) {
dialer.proxyTransport = proxyTransport
dialer.upgrader = upgrader
dialer.clientset = clientset
dialer.restConfig = p.KubeConfig
return dialer, nil
}

Expand All @@ -92,7 +96,17 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn
Name(addr).
SubResource("portforward")

dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())
spdyDialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())

websocketDialer, err := portforward.NewSPDYOverWebsocketDialer(req.URL(), d.restConfig)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know if this only works with certain Kubernetes apiserver versions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without diving to deep into the upstream k/k PR it looks to me like there is a server-side feature-gate to make this possible (PortForwardWebsockets) and it's only enabled per default with 1.31?
Is this correct?

xref:

If yes, this means for Kubernetes < 1.31 we are now with every single reconcile trying the websocketDialer but it won't work. Sounds like a lot of unnecessary "work" (not sure what it's exactly doing, requests?). We should probably try to avoid this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
Wondering if we should have two dials method and invoke them depending on the k8s version, or we should have a cache (might be a TTL cache) where we track IP that do not support web sockets

Copy link
Member Author

@chrischdi chrischdi Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We maybe should configure the fallback depending on the k8s version.

< 1.31 : spdy first, websocket second
>= 1.31 : websocket first, spdy second

?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be the simple option, it just doesn't take care of the case where someone disables the feature gate in 1.31.

Do we have an idea of how high the impact is of falling back on every single reconcile?

Copy link
Member

@sbueringer sbueringer Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah wait. We're talking about KCP, so KCP would actually know if someone enables/disables the feature gate (and not just the default is used). Although this could be tricky during rollouts, but probably fine if we just temporarily fallback a few additional times (should be okay as long as we don't permanently try to do something that has no chance of working?)

if err != nil {
return nil, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe let's wrap the error

}

// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
dialer := portforward.NewFallbackDialer(websocketDialer, spdyDialer, func(err error) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we can't tell from green e2e tests that websocketDialer worked, right? (it could be that we are always falling back to the spdyDialer, which I assume would lead to a lot of unnecessary stuff being done)

Would it make sense to check it once with Tilt manually? (if not already done)

return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})

// Create a new connection from the dialer.
//
Expand Down
16 changes: 15 additions & 1 deletion test/infrastructure/inmemory/pkg/server/proxy/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
Expand All @@ -39,6 +41,7 @@ type Dialer struct {
clientset *kubernetes.Clientset
proxyTransport http.RoundTripper
upgrader spdy.Upgrader
restConfig *rest.Config
timeout time.Duration
}

Expand Down Expand Up @@ -74,6 +77,7 @@ func NewDialer(p Proxy, options ...func(*Dialer) error) (*Dialer, error) {
dialer.proxyTransport = proxyTransport
dialer.upgrader = upgrader
dialer.clientset = clientset
dialer.restConfig = p.KubeConfig
return dialer, nil
}

Expand All @@ -92,7 +96,17 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn
Name(addr).
SubResource("portforward")

dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())
spdyDialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())

websocketDialer, err := portforward.NewSPDYOverWebsocketDialer(req.URL(), d.restConfig)
if err != nil {
return nil, err
}

// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
dialer := portforward.NewFallbackDialer(websocketDialer, spdyDialer, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})

// Create a new connection from the dialer.
//
Expand Down
Loading