Skip to content
This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

feat: call cluster service to create identity cluster relation while creating user #745

Merged
merged 3 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions application/service/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type ClusterService interface {
Clusters(ctx context.Context, options ...rest.HTTPClientOption) ([]cluster.Cluster, error)
ClusterByURL(ctx context.Context, url string, options ...rest.HTTPClientOption) (*cluster.Cluster, error)
Status(ctx context.Context, options ...rest.HTTPClientOption) (bool, error)
UnlinkIdentityFromCluster(ctx context.Context, identityID uuid.UUID, clusterURL string, options ...rest.HTTPClientOption) error
LinkIdentityToCluster(ctx context.Context, identityID uuid.UUID, clusterURL string, options ...rest.HTTPClientOption) error
Stop()
}

Expand Down
10 changes: 10 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package cluster

import "context"

// Cluster represents an OpenShift cluster configuration
type Cluster struct {
Name string `mapstructure:"name"`
Expand All @@ -16,3 +18,11 @@ type Cluster struct {
AuthClientDefaultScope string `mapstructure:"auth-client-default-scope"`
CapacityExhausted bool `mapstructure:"capacity-exhausted"` // Optional in oso-clusters.conf ('false' by default)
}

type ClusterCache interface {
RLock()
RUnlock()
Clusters() map[string]Cluster
Start(ctx context.Context) error
Stop()
}
3 changes: 2 additions & 1 deletion cluster/factory/cluster_cache_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
servicecontext "github.com/fabric8-services/fabric8-auth/application/service/context"
"github.com/fabric8-services/fabric8-auth/authorization/token/manager"
"github.com/fabric8-services/fabric8-auth/cluster"
clusterservice "github.com/fabric8-services/fabric8-auth/cluster/service"
"github.com/fabric8-services/fabric8-auth/rest"
)

Expand All @@ -34,5 +35,5 @@ type clusterCacheFactoryImpl struct {

// NewClusterCache creates a new cluster cache
func (f *clusterCacheFactoryImpl) NewClusterCache(ctx context.Context, options ...rest.HTTPClientOption) cluster.ClusterCache {
return cluster.NewCache(f.config, options...)
return clusterservice.NewCache(f.config, options...)
}
67 changes: 12 additions & 55 deletions cluster/cache.go → cluster/service/cache.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package cluster
package service

import (
"context"
"net/http"
"net/url"
"sync"
"time"

"github.com/fabric8-services/fabric8-auth/authorization/token/manager"
"github.com/fabric8-services/fabric8-auth/goasupport"
"github.com/fabric8-services/fabric8-auth/log"
"github.com/fabric8-services/fabric8-auth/rest"
"github.com/fabric8-services/fabric8-cluster-client/cluster"
clusterclient "github.com/fabric8-services/fabric8-cluster-client/cluster"

goaclient "github.com/goadesign/goa/client"
"github.com/fabric8-services/fabric8-auth/cluster"
"github.com/pkg/errors"
)

Expand All @@ -23,25 +22,17 @@ type clusterConfig interface {
GetClusterCacheRefreshInterval() time.Duration
}

type ClusterCache interface {
RLock()
RUnlock()
Clusters() map[string]Cluster
Start(ctx context.Context) error
Stop()
}

type cache struct {
sync.RWMutex

config clusterConfig
options []rest.HTTPClientOption
refresher *time.Ticker
stopCh chan bool
clusters map[string]Cluster
clusters map[string]cluster.Cluster
}

func NewCache(config clusterConfig, options ...rest.HTTPClientOption) ClusterCache {
func NewCache(config clusterConfig, options ...rest.HTTPClientOption) cluster.ClusterCache {
return &cache{
config: config,
refresher: time.NewTicker(config.GetClusterCacheRefreshInterval()),
Expand Down Expand Up @@ -81,7 +72,7 @@ func (c *cache) Stop() {
}
}

func (c *cache) Clusters() map[string]Cluster {
func (c *cache) Clusters() map[string]cluster.Cluster {
return c.clusters
}

Expand All @@ -99,13 +90,14 @@ func (c *cache) refreshCache(ctx context.Context) error {
}

// fetchClusters fetches a new list of clusters from Cluster Management Service
func (c *cache) fetchClusters(ctx context.Context) (map[string]Cluster, error) {
cln, err := c.createClientWithServiceAccountSigner(ctx)
func (c *cache) fetchClusters(ctx context.Context) (map[string]cluster.Cluster, error) {
signer := newJWTSASigner(ctx, c.config, c.options...)
cln, err := signer.createSignedClient()
if err != nil {
return nil, err
}

res, err := cln.ShowAuthClientClusters(goasupport.ForwardContextRequestID(ctx), cluster.ShowAuthClientClustersPath())
res, err := cln.ShowAuthClientClusters(goasupport.ForwardContextRequestID(ctx), clusterclient.ShowAuthClientClustersPath())
if err != nil {
return nil, err
}
Expand All @@ -125,10 +117,10 @@ func (c *cache) fetchClusters(ctx context.Context) (map[string]Cluster, error) {
return nil, err
}

clusterMap := map[string]Cluster{}
clusterMap := map[string]cluster.Cluster{}
if clusters.Data != nil {
for _, d := range clusters.Data {
cls := Cluster{
cls := cluster.Cluster{
Name: d.Name,
APIURL: d.APIURL,
AppDNS: d.AppDNS,
Expand All @@ -148,38 +140,3 @@ func (c *cache) fetchClusters(ctx context.Context) (map[string]Cluster, error) {
}
return clusterMap, nil
}

// createClientWithSASigner creates a client with a JWT signer which uses the Auth Service Account token
func (c *cache) createClientWithServiceAccountSigner(ctx context.Context) (*cluster.Client, error) {
cln, err := c.createClient(ctx)
if err != nil {
return nil, err
}
m, err := manager.DefaultManager(c.config)
if err != nil {
return nil, err
}
signer := m.AuthServiceAccountSigner()
cln.SetJWTSigner(signer)
return cln, nil
}

func (c *cache) createClient(ctx context.Context) (*cluster.Client, error) {
u, err := url.Parse(c.config.GetClusterServiceURL())
if err != nil {
return nil, err
}

httpClient := http.DefaultClient

if c.options != nil {
for _, opt := range c.options {
opt(httpClient)
}
}
cln := cluster.New(goaclient.HTTPClientDoer(httpClient))

cln.Host = u.Host
cln.Scheme = u.Scheme
return cln, nil
}
123 changes: 119 additions & 4 deletions cluster/service/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ import (
"github.com/fabric8-services/fabric8-auth/application/service/base"
servicecontext "github.com/fabric8-services/fabric8-auth/application/service/context"
"github.com/fabric8-services/fabric8-auth/cluster"
"github.com/fabric8-services/fabric8-auth/goasupport"
"github.com/fabric8-services/fabric8-auth/log"
"github.com/fabric8-services/fabric8-auth/rest"
clusterclient "github.com/fabric8-services/fabric8-cluster-client/cluster"
goaclient "github.com/goadesign/goa/client"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
"net/http"
"net/url"
)

type clusterServiceConfig interface {
Expand Down Expand Up @@ -77,6 +85,64 @@ func (s *clusterService) Stop() {
}
}

// LinkIdentityToCluster links Identity To Cluster using Cluster URL
func (s *clusterService) LinkIdentityToCluster(ctx context.Context, identityID uuid.UUID, clusterURL string, options ...rest.HTTPClientOption) error {
signer := newJWTSASigner(ctx, s.config, options...)
remoteClusterService, err := signer.createSignedClient()
if err != nil {
return errors.Wrapf(err, "failed to create JWT signer for cluster service")
}
identityToClusterData := &clusterclient.LinkIdentityToClusterData{
ClusterURL: clusterURL,
IdentityID: identityID.String(),
}
res, err := remoteClusterService.LinkIdentityToClusterClusters(goasupport.ForwardContextRequestID(ctx), clusterclient.LinkIdentityToClusterClustersPath(), identityToClusterData)
if err != nil {
return errors.Wrapf(err, "failed to link identity %s to cluster having url %s", identityID, clusterURL)
}
defer rest.CloseResponse(res)
bodyString := rest.ReadBody(res.Body) // To prevent FDs leaks
if res.StatusCode != http.StatusNoContent {
log.Error(ctx, map[string]interface{}{
"identity_id": identityID,
"cluster_url": clusterURL,
"response_status": res.Status,
"response_body": bodyString,
}, "unable to link identity to cluster in cluster management service")
return errors.Errorf("failed to link identity to cluster in cluster management service. Response status: %s. Response body: %s", res.Status, bodyString)
}
return nil
}

// UnlinkIdentityFromCluster removes linked Identity from Cluster using Cluster URL
func (s *clusterService) UnlinkIdentityFromCluster(ctx context.Context, identityID uuid.UUID, clusterURL string, options ...rest.HTTPClientOption) error {
signer := newJWTSASigner(ctx, s.config, options...)
remoteClusterService, err := signer.createSignedClient()
if err != nil {
return err
}
identityToClusterData := &clusterclient.UnLinkIdentityToClusterdata{
ClusterURL: clusterURL,
IdentityID: identityID.String(),
}
res, err := remoteClusterService.RemoveIdentityToClusterLinkClusters(goasupport.ForwardContextRequestID(ctx), clusterclient.RemoveIdentityToClusterLinkClustersPath(), identityToClusterData)
if err != nil {
return errors.Wrapf(err, "failed to unlink identity %s from cluster having url %s", identityID, clusterURL)
}
defer rest.CloseResponse(res)
bodyString := rest.ReadBody(res.Body) // To prevent FDs leaks
if res.StatusCode != http.StatusNoContent {
log.Error(ctx, map[string]interface{}{
"identity_id": identityID,
"cluster_url": clusterURL,
"response_status": res.Status,
"response_body": bodyString,
}, "unable to remove identity cluster relationship in cluster management service")
return errors.Errorf("failed to unlink identity to cluster in cluster management service. Response status: %s. Response body: %s", res.Status, bodyString)
}
return nil
}

// Start initializes the default Cluster cache if it's not initialized already
// Cache initialization loads the list of clusters from the cluster management service and starts regular cache refresher
func Start(ctx context.Context, factory service.ClusterCacheFactory, options ...rest.HTTPClientOption) (bool, error) {
Expand All @@ -93,10 +159,10 @@ func Start(ctx context.Context, factory service.ClusterCacheFactory, options ...
} else {
clusterCache = nil
}
return (clusterCache != nil && started == uint32(1)), err
return clusterCache != nil && started == uint32(1), err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this code tries to start the cache only once, then maybe https://golang.org/pkg/sync/#Once.Do would be more appropriate ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code can start the cache multiple times. It should initialize the cache successfully only once. But if it failed then it should keep trying. This is why we can't use once.Do() here.
And initialization can fail during auth service startup if cluster service is not ready yet (since it may be waiting for Auth service to start).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
}
return (clusterCache != nil && started == uint32(1)), nil
return clusterCache != nil && started == uint32(1), nil
}

// Clusters converts the given cluster map to an array slice
Expand All @@ -109,11 +175,60 @@ func Clusters(clusters map[string]cluster.Cluster) []cluster.Cluster {
}

func ClusterByURL(clusters map[string]cluster.Cluster, url string) *cluster.Cluster {
for apiURL, cluster := range clusters {
for apiURL, c := range clusters {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good work in avoiding var names with package names 👍

if strings.HasPrefix(rest.AddTrailingSlashToURL(url), apiURL) {
return &cluster
return &c
}
}

return nil
}

type saSigner interface {
createSignedClient() (*clusterclient.Client, error)
}

type jwtSASigner struct {
ctx context.Context
config clusterConfig
options []rest.HTTPClientOption
}

func newJWTSASigner(ctx context.Context, config clusterConfig, options ...rest.HTTPClientOption) saSigner {
return &jwtSASigner{ctx, config, options}
}

// CreateSignedClient creates a client with a JWT signer which uses the Auth Service Account token
func (c jwtSASigner) createSignedClient() (*clusterclient.Client, error) {
cln, err := c.createClient(c.ctx)
if err != nil {
return nil, err
}
m, err := manager.DefaultManager(c.config)
if err != nil {
return nil, err
}
signer := m.AuthServiceAccountSigner()
cln.SetJWTSigner(signer)
return cln, nil
}

func (c jwtSASigner) createClient(ctx context.Context) (*clusterclient.Client, error) {
u, err := url.Parse(c.config.GetClusterServiceURL())
if err != nil {
return nil, err
}

httpClient := http.DefaultClient

if c.options != nil {
for _, opt := range c.options {
opt(httpClient)
}
}
cln := clusterclient.New(goaclient.HTTPClientDoer(httpClient))

cln.Host = u.Host
cln.Scheme = u.Scheme
return cln, nil
}
Loading