Skip to content

Commit

Permalink
Ingest new policy types (#58)
Browse files Browse the repository at this point in the history
This PR enables the ingestion of the new Linkerd policy types that are outlined in linkerd/linkerd2#7709

Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev authored Apr 20, 2022
1 parent 84ef4af commit 6488061
Show file tree
Hide file tree
Showing 17 changed files with 989 additions and 404 deletions.
8 changes: 6 additions & 2 deletions agent/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
l5dApi "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/pkg/admin"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
tsclient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -72,9 +73,12 @@ func Main(args []string) {
l5dClient, err := l5dApi.NewForConfig(k8sConfig)
dieIf(err)

tsClient, err := tsclient.NewForConfig(k8sConfig)
dieIf(err)

sharedInformers := informers.NewSharedInformerFactory(k8sAPI.Interface, 10*time.Minute)

k8sClient := k8s.NewClient(sharedInformers, k8sAPI, l5dClient, *localMode)
k8sClient := k8s.NewClient(sharedInformers, k8sAPI, l5dClient, tsClient, *localMode)

// wait for discovery API to load

Expand Down Expand Up @@ -132,7 +136,7 @@ func Main(args []string) {
go manageAgentHandler.Start()

// run admin server
adminServer := admin.NewServer(*adminAddr)
adminServer := admin.NewServer(*adminAddr, false)
go adminServer.ListenAndServe()

// wait for shutdown
Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/handler/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestEvent(t *testing.T) {
k8sApi := &l5dk8s.KubernetesAPI{
Interface: cs,
}
k8sClient := k8s.NewClient(sharedInformers, k8sApi, nil, false)
k8sClient := k8s.NewClient(sharedInformers, k8sApi, nil, nil, false)

m := &api.MockBcloudClient{}
apiClient := api.NewClient(m)
Expand Down
25 changes: 23 additions & 2 deletions agent/pkg/handler/linkerd_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,30 @@ func (h *LinkerdInfo) handleAuthPolicyInfo(ctx context.Context) {
return
}

authPolicies, err := h.k8s.GetAuthorizationPolicies(ctx)
if err != nil {
h.log.Errorf("error getting authorization policies: %s", err)
return
}

meshTlsAuthentications, err := h.k8s.GetMeshTLSAuthentications(ctx)
if err != nil {
h.log.Errorf("error getting mesh tls authentications: %s", err)
return
}

networkAuthentications, err := h.k8s.GetNetworkAuthentications(ctx)
if err != nil {
h.log.Errorf("error getting network authentications: %s", err)
return
}

m := &pb.AuthPolicyInfo{
Servers: servers,
ServerAuthorizations: serverAuths,
Servers: servers,
ServerAuthorizations: serverAuths,
AuthorizationPolicies: authPolicies,
MeshTlsAuthentications: meshTlsAuthentications,
NetworkAuthentications: networkAuthentications,
}
h.log.Tracef("handleAuthPolicyInfo: %s", prototext.Format(m))

Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/handler/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestWorkloadStream(t *testing.T) {
k8sApi := &l5dk8s.KubernetesAPI{
Interface: cs,
}
k8sClient := k8s.NewClient(sharedInformers, k8sApi, nil, false)
k8sClient := k8s.NewClient(sharedInformers, k8sApi, nil, nil, false)

m := &api.MockBcloudClient{}
apiClient := api.NewClient(m)
Expand Down
4 changes: 2 additions & 2 deletions agent/pkg/k8s/certificates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestFindIdentityPod(t *testing.T) {
t.Run(tc.testName, func(t *testing.T) {
c := fakeClient(tc.pods...)
c.Sync(nil, time.Second)
client := NewClient(c.sharedInformers, nil, nil, false)
client := NewClient(c.sharedInformers, nil, nil, nil, false)

pod, err := client.getControlPlaneComponentPod(identityComponentName)
if tc.expectedErr != nil {
Expand Down Expand Up @@ -465,7 +465,7 @@ AiAtuoI5XuCtrGVRzSmRTl2ra28aV9MyTU7d5qnTAFHKSgIgRKCvluOSgA5O21p5
}

c.Sync(nil, time.Second)
client := NewClient(c.sharedInformers, c.k8sClient, nil, false)
client := NewClient(c.sharedInformers, c.k8sClient, nil, nil, false)

roots, err := client.extractRootsCerts(context.Background(), tc.container, "linkerd")
if tc.expectedErr != nil {
Expand Down
12 changes: 3 additions & 9 deletions agent/pkg/k8s/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ func fakeClient(objects ...runtime.Object) *Client {

k8sApi := &l5dk8s.KubernetesAPI{
Interface: cs,
TsClient: ts,
}

client := NewClient(sharedInformers, k8sApi, l5dApiClient, false)
client := NewClient(sharedInformers, k8sApi, l5dApiClient, ts, false)
client.ignoreCRDSupportCheck = true
return client
}
Expand All @@ -42,13 +41,8 @@ func fakeClientSets(objects ...runtime.Object) (kubernetes.Interface, l5dClient.

for _, obj := range objects {
switch obj.GetObjectKind().GroupVersionKind().Kind {
case "ServiceProfile":
l5dObjects = append(l5dObjects, obj)
case "ServerAuthorization":
l5dObjects = append(l5dObjects, obj)
case "Server":
l5dObjects = append(l5dObjects, obj)
case "Link":
case "ServiceProfile", "ServerAuthorization", "Server", "AuthorizationPolicy",
"MeshTLSAuthentication", "NetworkAuthentication", "Link":
l5dObjects = append(l5dObjects, obj)
case "TrafficSplit":
tsObjs = append(tsObjs, obj)
Expand Down
8 changes: 7 additions & 1 deletion agent/pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"time"

link "github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha1"
policy "github.com/linkerd/linkerd2/controller/gen/apis/policy/v1alpha1"
server "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
serverAuthorization "github.com/linkerd/linkerd2/controller/gen/apis/serverauthorization/v1beta1"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
l5dApi "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
l5dscheme "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/scheme"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
ts "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/split/v1alpha1"
tsclient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
tsscheme "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned/scheme"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -34,6 +36,7 @@ import (
type Client struct {
k8sClient *l5dk8s.KubernetesAPI
l5dClient l5dApi.Interface
tsClient tsclient.Interface

encoders map[runtime.GroupVersioner]runtime.Encoder

Expand Down Expand Up @@ -84,7 +87,7 @@ const (

var errSyncCache = errors.New("failed to sync caches")

func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8s.KubernetesAPI, l5dClient l5dApi.Interface, local bool) *Client {
func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8s.KubernetesAPI, l5dClient l5dApi.Interface, tsClient tsclient.Interface, local bool) *Client {
log := log.WithField("client", "k8s")
log.Debug("initializing")

Expand Down Expand Up @@ -116,6 +119,7 @@ func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8
link.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, link.SchemeGroupVersion),
serverAuthorization.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, serverAuthorization.SchemeGroupVersion),
server.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, server.SchemeGroupVersion),
policy.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, policy.SchemeGroupVersion),
}

podInformer := sharedInformers.Core().V1().Pods()
Expand All @@ -141,6 +145,8 @@ func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8

sharedInformers: sharedInformers,

tsClient: tsClient,

podLister: podInformer.Lister(),
rsLister: rsInformer.Lister(),
dsLister: dsInformer.Lister(),
Expand Down
86 changes: 84 additions & 2 deletions agent/pkg/k8s/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import (
"context"

pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
policy "github.com/linkerd/linkerd2/controller/gen/apis/policy/v1alpha1"
server "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
serverAuthorization "github.com/linkerd/linkerd2/controller/gen/apis/serverauthorization/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
sazGVR = serverAuthorization.SchemeGroupVersion.WithResource("serverauthorizations")
serverGVR = server.SchemeGroupVersion.WithResource("servers")
sazGVR = serverAuthorization.SchemeGroupVersion.WithResource("serverauthorizations")
serverGVR = server.SchemeGroupVersion.WithResource("servers")
authPolicyGVR = policy.SchemeGroupVersion.WithResource("authorizationpolicies")
meshTLSAuthGVR = policy.SchemeGroupVersion.WithResource("meshtlsauthentications")
netwrokAuthGVR = policy.SchemeGroupVersion.WithResource("networkauthentications")
)

func (c *Client) GetServers(ctx context.Context) ([]*pb.Server, error) {
Expand Down Expand Up @@ -65,3 +69,81 @@ func (c *Client) GetServerAuths(ctx context.Context) ([]*pb.ServerAuthorization,

return results, nil
}

func (c *Client) GetAuthorizationPolicies(ctx context.Context) ([]*pb.AuthorizationPolicy, error) {
supported, err := c.resourceSupported(authPolicyGVR)
if err != nil {
return nil, err
}

if !supported {
return nil, nil
}

policies, err := c.l5dClient.PolicyV1alpha1().AuthorizationPolicies(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

results := make([]*pb.AuthorizationPolicy, len(policies.Items))
for i, s := range policies.Items {
s := s
results[i] = &pb.AuthorizationPolicy{
AuthorizationPolicy: c.serialize(&s, policy.SchemeGroupVersion),
}
}

return results, nil
}

func (c *Client) GetMeshTLSAuthentications(ctx context.Context) ([]*pb.MeshTLSAuthentication, error) {
supported, err := c.resourceSupported(meshTLSAuthGVR)
if err != nil {
return nil, err
}

if !supported {
return nil, nil
}

meshAuths, err := c.l5dClient.PolicyV1alpha1().MeshTLSAuthentications(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

results := make([]*pb.MeshTLSAuthentication, len(meshAuths.Items))
for i, s := range meshAuths.Items {
s := s
results[i] = &pb.MeshTLSAuthentication{
MeshTlsAuthentication: c.serialize(&s, policy.SchemeGroupVersion),
}
}

return results, nil
}

func (c *Client) GetNetworkAuthentications(ctx context.Context) ([]*pb.NetworkAuthentication, error) {
supported, err := c.resourceSupported(netwrokAuthGVR)
if err != nil {
return nil, err
}

if !supported {
return nil, nil
}

networkAuths, err := c.l5dClient.PolicyV1alpha1().NetworkAuthentications(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

results := make([]*pb.NetworkAuthentication, len(networkAuths.Items))
for i, s := range networkAuths.Items {
s := s
results[i] = &pb.NetworkAuthentication{
NetworkAuthentication: c.serialize(&s, policy.SchemeGroupVersion),
}
}

return results, nil
}
Loading

0 comments on commit 6488061

Please sign in to comment.