diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index d14ddffdaeb..419db1b9f67 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -77,9 +77,20 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } } + var pipeline_options argocompiler.Options + for _, platform := range t.platformSpec.Platforms { + if platform.PipelineConfig.SemaphoreKey != "" || platform.PipelineConfig.MutexName != "" { + pipeline_options = argocompiler.Options{ + SemaphoreKey: platform.PipelineConfig.SemaphoreKey, + MutexName: platform.PipelineConfig.MutexName, + } + break + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, &pipeline_options) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher}) } @@ -300,9 +311,20 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u } } + var pipeline_options *argocompiler.Options + for _, platform := range t.platformSpec.Platforms { + if platform.PipelineConfig.SemaphoreKey != "" || platform.PipelineConfig.MutexName != "" { + pipeline_options = &argocompiler.Options{ + SemaphoreKey: platform.PipelineConfig.SemaphoreKey, + MutexName: platform.PipelineConfig.MutexName, + } + break + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, pipeline_options) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil) } diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index 1f1c19ed3ec..41e86295564 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -28,6 +28,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" k8score "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" k8sres "k8s.io/apimachinery/pkg/api/resource" k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -40,6 +41,8 @@ type Options struct { // optional PipelineRoot string // TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode. + SemaphoreKey string + MutexName string } func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) { @@ -76,6 +79,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S } } + var semaphore_key, mutex_name string + if opts != nil && opts.SemaphoreKey != "" { + semaphore_key = opts.SemaphoreKey + } + if opts != nil && opts.MutexName != "" { + mutex_name = opts.MutexName + } + var kubernetesSpec *pipelinespec.SinglePlatformSpec if kubernetesSpecArg != nil { // clone kubernetesSpecArg, because we don't want to change it @@ -94,13 +105,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ObjectMeta: k8smeta.ObjectMeta{ GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-", - // Note, uncomment the following during development to view argo inputs/outputs in KFP UI. - // TODO(Bobgy): figure out what annotations we should use for v2 engine. - // For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts - // suitable for debugging. - // + // Uncomment during development for better debugging in KFP UI // Annotations: map[string]string{ - // "pipelines.kubeflow.org/v2_pipeline": "true", + // "pipelines.kubeflow.org/v2_pipeline": "true", // }, }, Spec: wfapi.WorkflowSpec{ @@ -117,8 +124,22 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, + Synchronization: &wfapi.Synchronization{ + Semaphore: &wfapi.SemaphoreRef{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "semaphore-config", + }, + Key: semaphore_key, + }, + }, + Mutex: &wfapi.Mutex{ + Name: mutex_name, + }, + }, }, } + c := &workflowCompiler{ wf: wf, templates: make(map[string]*wfapi.Template), diff --git a/backend/src/v2/compiler/visitor.go b/backend/src/v2/compiler/visitor.go index f6b7204a45c..277fadcbec8 100644 --- a/backend/src/v2/compiler/visitor.go +++ b/backend/src/v2/compiler/visitor.go @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec } // Add kubernetes spec to annotation - if state.kubernetesSpec != nil { + if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil { kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel] if ok { state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)