Skip to content

Commit

Permalink
Merge pull request #4 from clastix/issues/1
Browse files Browse the repository at this point in the history
Doing bare reverse proxy with label selector
  • Loading branch information
prometherion authored Sep 19, 2020
2 parents 3c69d06 + 1d9b502 commit 59bbe7d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 124 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/websocket v1.4.0
github.com/imdario/mergo v0.3.11 // indirect
github.com/stretchr/testify v1.6.1 // indirect
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
k8s.io/api v0.19.0-beta.2
k8s.io/apimachinery v0.19.0-beta.2
k8s.io/client-go v0.19.0-beta.2
k8s.io/utils v0.0.0-20200912215256-4140de9c8800 // indirect
Expand Down
14 changes: 8 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func main() {
var mgr ctrl.Manager

fs := flag.NewFlagSet("filter", flag.ExitOnError)
listeningPort := fs.Uint("listening-port", 9001, "HTTP port the proxy listens to")
k8sControlPlaneUrl := fs.String("k8s-control-plane-url", "https://kubernetes.default.svc", "Kubernetes control plane URL")
capsuleUserGroup := fs.String("capsule-user-group", "clastix.capsule.io", "The Capsule User Group eligible to create Namespace for Tenant resources")
listeningPort := fs.Uint("listening-port", 9001, "HTTP port the proxy listens to (default: 9001)")
k8sControlPlaneUrl := fs.String("k8s-control-plane-url", "https://kubernetes.default.svc", "Kubernetes control plane URL (default: https://kubernetes.default.svc)")
capsuleUserGroup := fs.String("capsule-user-group", "clastix.capsule.io", "The Capsule User Group eligible to create Namespace for Tenant resources (default: clastix.capsule.io)")
usernameClaimField := fs.String("oidc-username-claim", "preferred_username", "The OIDC field name used to identify the user (default: preferred_username)")
err = fs.Parse(os.Args[1:])

opts := zap.Options{}
Expand All @@ -48,9 +49,10 @@ func main() {
}

log.Info("---")
log.Info(fmt.Sprintf("Manager will listen to port %d", *listeningPort))
log.Info(fmt.Sprintf("Manager listening on port %d", *listeningPort))
log.Info(fmt.Sprintf("Connecting to the Kubernete API Server listening on %s", *k8sControlPlaneUrl))
log.Info(fmt.Sprintf("The selected Capsule User Group is %s", *capsuleUserGroup))
log.Info(fmt.Sprintf("The OIDC username filed %s", *usernameClaimField))
log.Info("---")

log.Info("Creating the manager")
Expand All @@ -65,15 +67,15 @@ func main() {

log.Info("Creating the Field Indexer")
ow := tenant.OwnerReference{}
err = mgr.GetFieldIndexer().IndexField(context.TODO(), ow.Object(), ow.Field(), ow.Func())
err = mgr.GetFieldIndexer().IndexField(context.Background(), ow.Object(), ow.Field(), ow.Func())
if err != nil {
log.Error(err, "cannot create new Field Indexer")
os.Exit(1)
}

var r manager.Runnable
log.Info("Creating the NamespaceFilter runner")
r, err = webserver.NewKubeFilter(*listeningPort, *k8sControlPlaneUrl, *capsuleUserGroup, ctrl.GetConfigOrDie())
r, err = webserver.NewKubeFilter(*listeningPort, *k8sControlPlaneUrl, *capsuleUserGroup, *usernameClaimField, ctrl.GetConfigOrDie())
if err != nil {
log.Error(err, "cannot create NamespaceFilter runner")
os.Exit(1)
Expand Down
12 changes: 9 additions & 3 deletions webserver/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Request interface {

type httpRequest struct {
*http.Request
usernameClaimField string
}

func (h httpRequest) getJwtClaims() jwt.MapClaims {
Expand All @@ -32,6 +33,11 @@ func (h httpRequest) getJwtClaims() jwt.MapClaims {

func (h httpRequest) IsUserInGroup(groupName string) (bool, error) {
claims := h.getJwtClaims()
// ignoring Service Accounts
iss, ok := claims["iss"]
if iss == "kubernetes/serviceaccount" {
return false, nil
}
g, ok := claims["groups"]
if !ok {
return false, fmt.Errorf("missing groups claim in JWT")
Expand All @@ -45,13 +51,13 @@ func (h httpRequest) IsUserInGroup(groupName string) (bool, error) {

func (h httpRequest) GetUserName() (string, error) {
claims := h.getJwtClaims()
username, ok := claims["preferred_username"]
username, ok := claims[h.usernameClaimField]
if !ok {
return "", fmt.Errorf("missing groups claim in JWT")
}
return username.(string), nil
}

func NewHttpRequest(request *http.Request) Request {
return &httpRequest{Request: request}
func NewHttpRequest(request *http.Request, usernameClaimField string) Request {
return &httpRequest{Request: request, usernameClaimField: usernameClaimField}
}
150 changes: 37 additions & 113 deletions webserver/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,8 @@ import (
"time"

capsulev1alpha1 "github.com/clastix/capsule/api/v1alpha1"
"github.com/gorilla/websocket"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/cert"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -29,7 +24,7 @@ var (
log = ctrl.Log.WithName("namespace_filter")
)

func NewKubeFilter(listeningPort uint, controlPlaneUrl, capsuleUserGroup string, config *rest.Config) (*kubeFilter, error) {
func NewKubeFilter(listeningPort uint, controlPlaneUrl, capsuleUserGroup, usernameClaimField string, config *rest.Config) (*kubeFilter, error) {
u, err := url.Parse(controlPlaneUrl)
if err != nil {
log.Error(err, "cannot parse Kubernetes Control Plane URL")
Expand Down Expand Up @@ -62,16 +57,12 @@ func NewKubeFilter(listeningPort uint, controlPlaneUrl, capsuleUserGroup string,
}(),
}

cs, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

return &kubeFilter{
capsuleUserGroup: capsuleUserGroup,
reverseProxy: reverseProxy,
listeningPort: listeningPort,
namespaceClientSet: cs.CoreV1().Namespaces(),
bearerToken: config.BearerToken,
usernameClaimField: usernameClaimField,
}, nil
}

Expand All @@ -80,7 +71,8 @@ type kubeFilter struct {
reverseProxy *httputil.ReverseProxy
client client.Client
listeningPort uint
namespaceClientSet v1.NamespaceInterface
bearerToken string
usernameClaimField string
}

func (n *kubeFilter) InjectClient(client client.Client) error {
Expand All @@ -98,19 +90,16 @@ func (n kubeFilter) isWatchEndpoint(request *http.Request) (ok bool) {

func (n kubeFilter) Start(stop <-chan struct{}) error {
http.HandleFunc("/api/v1/namespaces", func(writer http.ResponseWriter, request *http.Request) {
if n.isWatchEndpoint(request) {
log.Info("handling /api/v1/namespaces for WebSocket")
n.filterWsNamespace(writer, request)
}
if request.Method == "GET" {
log.Info("handling /api/v1/namespaces")
n.filterHttpNamespace(writer, request)
return
if request.Method == "GET" || n.isWatchEndpoint(request) {
log.Info("decorating /api/v1/namespaces request")
if err := n.decorateRequest(writer, request); err != nil {
n.handleError(err, writer)
return
}
}
n.reverseProxyFunc(writer, request)
})
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
log.Info("handling /")
n.reverseProxyFunc(writer, request)
})
http.HandleFunc("/_healthz", func(writer http.ResponseWriter, request *http.Request) {
Expand All @@ -130,16 +119,18 @@ func (n kubeFilter) Start(stop <-chan struct{}) error {
}

func (n kubeFilter) reverseProxyFunc(writer http.ResponseWriter, request *http.Request) {
log.Info("handling " + request.URL.String())
n.reverseProxy.ServeHTTP(writer, request)
}

type errorJson struct {
Error string `json:"error"`
}

func (n kubeFilter) handleError(err error, msg string, writer http.ResponseWriter) {
log.Error(err, msg)
func (n kubeFilter) handleError(err error, writer http.ResponseWriter) {
log.Error(err, "handling failed request")
writer.WriteHeader(500)
writer.Header().Set("Content-Type", "application/json")
b, _ := json.Marshal(errorJson{Error: err.Error()})
_, _ = writer.Write(b)
}
Expand All @@ -149,7 +140,7 @@ func (n kubeFilter) getOwnedNamespacesForUser(username string) (res NamespaceLis
f := client.MatchingFields{
".spec.owner.ownerkind": fmt.Sprintf("%s:%s", "User", username),
}
if err := n.client.List(context.TODO(), tl, f); err != nil {
if err := n.client.List(context.Background(), tl, f); err != nil {
return nil, fmt.Errorf("cannot retrieve Tenant list: %s", err.Error())
}

Expand Down Expand Up @@ -184,111 +175,44 @@ func (n kubeFilter) getLabelSelectorForUser(username string) (labels.Selector, e
return labels.NewSelector().Add(*req), nil
}

func (n kubeFilter) filterHttpNamespace(writer http.ResponseWriter, request *http.Request) {
r := NewHttpRequest(request)
func (n kubeFilter) decorateRequest(writer http.ResponseWriter, request *http.Request) error {
r := NewHttpRequest(request, n.usernameClaimField)

if ok, err := r.IsUserInGroup(n.capsuleUserGroup); err != nil {
n.handleError(err, "cannot determinate User group", writer)
return
return fmt.Errorf("cannot determinate User group: %s", err.Error())
} else if !ok {
n.reverseProxyFunc(writer, request)
return
// not a Capsule user, let's break
return nil
}

var username string
var err error
username, err = r.GetUserName()
if err != nil {
n.handleError(err, "cannot determinate username", writer)
return
return fmt.Errorf("cannot determinate username: %s", err.Error())
}

var s labels.Selector
s, err = n.getLabelSelectorForUser(username)
if err != nil {
n.handleError(err, "cannot create label selector", writer)
return
}

nl := &corev1.NamespaceList{}
err = n.client.List(context.TODO(), nl, &client.ListOptions{
LabelSelector: s,
})
if err != nil {
n.handleError(err, "cannot list Tenant resources", writer)
return
}
var b []byte
b, err = json.Marshal(nl)
if err != nil {
n.handleError(err, "cannot marshal Namespace List resource", writer)
return
}
_, _ = writer.Write(b)
}

func (n kubeFilter) namespacesGet(proxy *httputil.ReverseProxy) func(http.ResponseWriter, *http.Request) {
return func(writer http.ResponseWriter, request *http.Request) {
if len(request.Header.Get("Upgrade")) > 0 {
n.filterWsNamespace(writer, request)
return
}
if request.Method == "GET" {
n.filterHttpNamespace(writer, request)
return
}
n.reverseProxyFunc(writer, request)
}
}

func (n *kubeFilter) filterWsNamespace(writer http.ResponseWriter, request *http.Request) {
r := NewHttpRequest(request)

if ok, err := r.IsUserInGroup(n.capsuleUserGroup); err != nil {
log.Error(err, "cannot determinate User group")
panic(err)
} else if !ok {
n.reverseProxyFunc(writer, request)
return
}

var username string
var err error
username, err = r.GetUserName()
if err != nil {
log.Error(err, "cannot determinate User username")
panic(err)
return fmt.Errorf("cannot create label selector: %s", err)
}

u := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
c, err := u.Upgrade(writer, request, nil)
if err != nil {
log.Error(err, "cannot upgrade connection")
panic(err)
q := request.URL.Query()
if e := q.Get("labelSelector"); len(e) > 0 {
v := e + "," + s.String()
q.Add("labelSelector", v)
log.Info("labelSelector updated", "selector", v)
} else {
q.Add("labelSelector", s.String())
log.Info("labelSelector updated", "selector", s.String())
}
defer func() {
_ = c.Close()
}()
log.Info("updating RawQuery", "query", q.Encode())
request.URL.RawQuery = q.Encode()

s, _ := n.getLabelSelectorForUser(username)
watch, err := n.namespaceClientSet.Watch(context.Background(), metav1.ListOptions{
LabelSelector: s.String(),
Watch: true,
})
if err != nil {
log.Error(err, "cannot start watch")
panic(err)
}
log.Info("Updating the token", "token", n.bearerToken)
request.Header.Set("Authorization", "Bearer "+n.bearerToken)

for event := range watch.ResultChan() {
err = c.WriteMessage(websocket.TextMessage, NewMessage(event).Serialize())
if err != nil {
log.Error(err, "cannot write websocket message")
watch.Stop()
}
}
log.Info("proxying to API Server", "url", request.URL.String())
return nil
}

0 comments on commit 59bbe7d

Please sign in to comment.