Skip to content

Commit

Permalink
perf: Add kube-client-opts for enabling caching
Browse files Browse the repository at this point in the history
fix not starting clusterWftmpl Informer in server
add more descriptive client store naming

Signed-off-by: Jakub Buczak <[email protected]>
  • Loading branch information
jakkubu committed Oct 11, 2024
1 parent dcc28d2 commit e7ab07a
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 38 deletions.
3 changes: 2 additions & 1 deletion pkg/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Client interface {

type Opts struct {
ArgoServerOpts ArgoServerOpts
ArgoKubeOpts ArgoKubeOpts
InstanceID string
AuthSupplier func() string
// DEPRECATED: use `ClientConfigSupplier`
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewClientFromOpts(opts Opts) (context.Context, Client, error) {
opts.ClientConfig = opts.ClientConfigSupplier()
}

ctx, client, err := newArgoKubeClient(opts.GetContext(), opts.ClientConfig, instanceid.NewService(opts.InstanceID))
ctx, client, err := newArgoKubeClient(opts.GetContext(), opts.ArgoKubeOpts, opts.ClientConfig, instanceid.NewService(opts.InstanceID))
return ctx, client, err
}
}
84 changes: 66 additions & 18 deletions pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package apiclient
import (
"context"
"fmt"
"log"

eventsource "github.com/argoproj/argo-events/pkg/client/eventsource/clientset/versioned"
sensor "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned"
Expand All @@ -11,8 +12,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo-workflows/v3/server/workflow/store"

"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate"
Expand All @@ -27,6 +26,8 @@ import (
cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow"
"github.com/argoproj/argo-workflows/v3/server/types"
workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow"
"github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowstore "github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate"
"github.com/argoproj/argo-workflows/v3/util/help"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
Expand All @@ -37,16 +38,34 @@ var (
NoArgoServerErr = fmt.Errorf("this is impossible if you are not using the Argo Server, see %s", help.CLI())
)

type ArgoKubeOpts struct {
// Closing caching channel will stop caching informers
CachingCloseCh chan struct{}

// Whether to cache WorkflowTemplates, ClusterWorkflowTemplates and Workflows
// This improves performance of reading
// It is especially visible during validating templates,
//
// Note that templates caching currently uses informers, so not all template
// get/list can use it, since informer has limited capabilities (such as filtering)
//
// Workflow caching uses in-memory SQLite DB and it provides full capabilities
UseCaching bool
}

type argoKubeClient struct {
opts ArgoKubeOpts
instanceIDService instanceid.Service
wfClient workflow.Interface
wfTmplStore types.WorkflowTemplateStore
cwfInformer types.ClusterWorkflowTemplateStore
cwfTmplStore types.ClusterWorkflowTemplateStore
wfLister workflowstore.WorkflowLister
wfStore workflowstore.WorkflowStore
}

var _ Client = &argoKubeClient{}

func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) {
func newArgoKubeClient(ctx context.Context, opts ArgoKubeOpts, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) {

Check failure on line 68 in pkg/apiclient/argo-kube-client.go

View workflow job for this annotation

GitHub Actions / Lint

`newArgoKubeClient` - `opts` is unused (unparam)
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return nil, nil, err
Expand All @@ -65,14 +84,6 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
wftmplStore, err := workflowtemplateserver.NewInformer(restConfig, namespace)
if err != nil {
return nil, nil, err
}
cwftmplInformer, err := clusterworkflowtmplserver.NewInformer(restConfig)
if err != nil {
return nil, nil, err
}
eventSourceInterface, err := eventsource.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -100,21 +111,58 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{instanceIDService, wfClient, wftmplStore, cwftmplInformer}, nil

client := &argoKubeClient{
instanceIDService: instanceIDService,
wfClient: wfClient,
}
err = client.startStores(restConfig, namespace)
if err != nil {
return nil, nil, err
}

return ctx, client, nil
}

func (a *argoKubeClient) startStores(restConfig *restclient.Config, namespace string) error {
if a.opts.UseCaching {
wftmplInformer, err := workflowtemplateserver.NewInformer(restConfig, namespace)
if err != nil {
return err
}
cwftmplInformer, err := clusterworkflowtmplserver.NewInformer(restConfig)
if err != nil {
return err
}
wfStore, err := store.NewSQLiteStore(a.instanceIDService)
if err != nil {
log.Fatal(err)
}
wftmplInformer.Run(a.opts.CachingCloseCh)
cwftmplInformer.Run(a.opts.CachingCloseCh)
a.wfStore = wfStore
a.wfLister = wfStore
a.wfTmplStore = wftmplInformer
a.cwfTmplStore = cwftmplInformer
} else {
a.wfLister = store.NewKubeLister(a.wfClient)
a.wfTmplStore = workflowtemplateserver.NewWorkflowTemplateClientStore()
a.cwfTmplStore = clusterworkflowtmplserver.NewClusterWorkflowTemplateClientStore()
}
return nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, a.wfTmplStore, a.cwfInformer, nil)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfLister, a.wfStore, a.wfTmplStore, a.cwfTmplStore, nil)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService, a.wfTmplStore, a.cwfInformer)}}, nil
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil
}

func (a *argoKubeClient) NewWorkflowTemplateServiceClient() (workflowtemplate.WorkflowTemplateServiceClient, error) {
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService, a.wfTmplStore, a.cwfInformer)}}, nil
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil
}

func (a *argoKubeClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) {
Expand All @@ -126,5 +174,5 @@ func (a *argoKubeClient) NewInfoServiceClient() (infopkg.InfoServiceClient, erro
}

func (a *argoKubeClient) NewClusterWorkflowTemplateServiceClient() (clusterworkflowtemplate.ClusterWorkflowTemplateServiceClient, error) {
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService, a.cwfInformer)}}, nil
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService, a.cwfTmplStore)}}, nil
}
1 change: 1 addition & 0 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
grpcL := tcpm.Match(cmux.Any())

wftmplStore.Run(as.stopCh)
cwftmplInformer.Run(as.stopCh)
go eventServer.Run(as.stopCh)
go workflowServer.Run(as.stopCh)
go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ClusterWorkflowTemplateServer struct {

func NewClusterWorkflowTemplateServer(instanceID instanceid.Service, cwftmplStore servertypes.ClusterWorkflowTemplateStore) clusterwftmplpkg.ClusterWorkflowTemplateServiceServer {
if cwftmplStore == nil {
cwftmplStore = NewCwfClientStore()
cwftmplStore = NewClusterWorkflowTemplateClientStore()
}
return &ClusterWorkflowTemplateServer{instanceID, cwftmplStore}
}
Expand Down
2 changes: 0 additions & 2 deletions server/clusterworkflowtemplate/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ func NewInformer(restConfig *rest.Config) (*Informer, error) {
// Start informer in separate go-routine and block until cache sync
func (cwti *Informer) Run(stopCh <-chan struct{}) {

cwti.informer.Informer()

go cwti.informer.Informer().Run(stopCh)

if !cache.WaitForCacheSync(
Expand Down
8 changes: 4 additions & 4 deletions server/clusterworkflowtemplate/wf_client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (

// Store is a wrapper around informer
// if
type CwfClientStore struct {
type ClusterWorkflowTemplateClientStore struct {
}

func NewCwfClientStore() *CwfClientStore {
return &CwfClientStore{}
func NewClusterWorkflowTemplateClientStore() *ClusterWorkflowTemplateClientStore {
return &ClusterWorkflowTemplateClientStore{}
}

func (wcs *CwfClientStore) Getter(ctx context.Context) templateresolution.ClusterWorkflowTemplateGetter {
func (wcs *ClusterWorkflowTemplateClientStore) Getter(ctx context.Context) templateresolution.ClusterWorkflowTemplateGetter {
wfClient := auth.GetWfClient(ctx)
return templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
}
4 changes: 2 additions & 2 deletions server/cronworkflow/cron_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ type cronWorkflowServiceServer struct {
// NewCronWorkflowServer returns a new cronWorkflowServiceServer
func NewCronWorkflowServer(instanceIDService instanceid.Service, wftmplStore servertypes.WorkflowTemplateStore, cwftmplStore servertypes.ClusterWorkflowTemplateStore) cronworkflowpkg.CronWorkflowServiceServer {
if wftmplStore == nil {
wftmplStore = workflowtemplate.NewWfClientStore()
wftmplStore = workflowtemplate.NewWorkflowTemplateClientStore()
}
if cwftmplStore == nil {
cwftmplStore = clusterworkflowtemplate.NewCwfClientStore()
cwftmplStore = clusterworkflowtemplate.NewClusterWorkflowTemplateClientStore()
}
return &cronWorkflowServiceServer{instanceIDService, wftmplStore, cwftmplStore}
}
Expand Down
4 changes: 2 additions & 2 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ var _ workflowpkg.WorkflowServiceServer = &workflowServer{}
// NewWorkflowServer returns a new WorkflowServer
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore, wftmplStore servertypes.WorkflowTemplateStore, cwftmplStore servertypes.ClusterWorkflowTemplateStore, namespace *string) *workflowServer {
if wftmplStore == nil {
wftmplStore = workflowtemplate.NewWfClientStore()
wftmplStore = workflowtemplate.NewWorkflowTemplateClientStore()
}
if cwftmplStore == nil {
cwftmplStore = clusterworkflowtemplate.NewCwfClientStore()
cwftmplStore = clusterworkflowtemplate.NewClusterWorkflowTemplateClientStore()
}
ws := &workflowServer{
instanceIDService: instanceIDService,
Expand Down
2 changes: 0 additions & 2 deletions server/workflowtemplate/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func NewInformer(restConfig *rest.Config, managedNamespace string) (*Informer, e
// Start informer in separate go-routine and block until cache sync
func (wti *Informer) Run(stopCh <-chan struct{}) {

wti.informer.Informer()

go wti.informer.Informer().Run(stopCh)

if !cache.WaitForCacheSync(
Expand Down
8 changes: 4 additions & 4 deletions server/workflowtemplate/wf_client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (

// Store is a wrapper around informer
// if
type WfClientStore struct {
type WorkflowTemplateClientStore struct {
}

func NewWfClientStore() *WfClientStore {
return &WfClientStore{}
func NewWorkflowTemplateClientStore() *WorkflowTemplateClientStore {
return &WorkflowTemplateClientStore{}
}

func (wcs *WfClientStore) Getter(ctx context.Context, namespace string) templateresolution.WorkflowTemplateNamespacedGetter {
func (wcs *WorkflowTemplateClientStore) Getter(ctx context.Context, namespace string) templateresolution.WorkflowTemplateNamespacedGetter {
wfClient := auth.GetWfClient(ctx)
return templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(namespace))
}
4 changes: 2 additions & 2 deletions server/workflowtemplate/workflow_template_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ type WorkflowTemplateServer struct {

func NewWorkflowTemplateServer(instanceIDService instanceid.Service, wftmplStore servertypes.WorkflowTemplateStore, cwftmplStore servertypes.ClusterWorkflowTemplateStore) workflowtemplatepkg.WorkflowTemplateServiceServer {
if wftmplStore == nil {
wftmplStore = NewWfClientStore()
wftmplStore = NewWorkflowTemplateClientStore()
}
if cwftmplStore == nil {
cwftmplStore = clusterworkflowtemplate.NewCwfClientStore()
cwftmplStore = clusterworkflowtemplate.NewClusterWorkflowTemplateClientStore()
}
return &WorkflowTemplateServer{instanceIDService, wftmplStore, cwftmplStore}
}
Expand Down

0 comments on commit e7ab07a

Please sign in to comment.