diff --git a/.spelling b/.spelling index 8a3401f1ca5b..27bc74f90714 100644 --- a/.spelling +++ b/.spelling @@ -102,6 +102,7 @@ PVCs Peixuan Ploomber Postgres +PriorityClass RCs Roadmap RoleBinding diff --git a/cmd/argo/commands/cron/create.go b/cmd/argo/commands/cron/create.go index fd97f535c58e..541be7efc464 100644 --- a/cmd/argo/commands/cron/create.go +++ b/cmd/argo/commands/cron/create.go @@ -90,7 +90,7 @@ func CreateCronWorkflows(ctx context.Context, filePaths []string, cliOpts *cliCr if err != nil { return fmt.Errorf("Failed to create cron workflow: %v", err) } - fmt.Print(getCronWorkflowGet(created)) + fmt.Print(getCronWorkflowGet(ctx, created)) } return nil } diff --git a/cmd/argo/commands/cron/get.go b/cmd/argo/commands/cron/get.go index 75e5e242e150..56f94c813730 100644 --- a/cmd/argo/commands/cron/get.go +++ b/cmd/argo/commands/cron/get.go @@ -33,7 +33,7 @@ func NewGetCommand() *cobra.Command { if err != nil { return err } - printCronWorkflow(cronWf, output) + printCronWorkflow(ctx, cronWf, output) } return nil }, diff --git a/cmd/argo/commands/cron/list.go b/cmd/argo/commands/cron/list.go index d4ccfc4b118b..bbcb6a71e358 100644 --- a/cmd/argo/commands/cron/list.go +++ b/cmd/argo/commands/cron/list.go @@ -1,6 +1,7 @@ package cron import ( + "context" "fmt" "os" "text/tabwriter" @@ -50,7 +51,7 @@ func NewListCommand() *cobra.Command { } switch listArgs.output { case "", "wide": - printTable(cronWfList.Items, &listArgs) + printTable(ctx, cronWfList.Items, &listArgs) case "name": for _, cronWf := range cronWfList.Items { fmt.Println(cronWf.ObjectMeta.Name) @@ -67,7 +68,7 @@ func NewListCommand() *cobra.Command { return command } -func printTable(wfList []wfv1.CronWorkflow, listArgs *listFlags) { +func printTable(ctx context.Context, wfList []wfv1.CronWorkflow, listArgs *listFlags) { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) if listArgs.allNamespaces { _, _ = fmt.Fprint(w, "NAMESPACE\t") @@ -85,7 +86,7 @@ func printTable(wfList []wfv1.CronWorkflow, listArgs *listFlags) { cleanLastScheduledTime = "N/A" } var cleanNextScheduledTime string - if next, err := GetNextRuntime(&cwf); err == nil { + if next, err := GetNextRuntime(ctx, &cwf); err == nil { cleanNextScheduledTime = humanize.RelativeDurationShort(next, time.Now()) } else { cleanNextScheduledTime = "N/A" diff --git a/cmd/argo/commands/cron/update.go b/cmd/argo/commands/cron/update.go index e1340267fe95..5f782ddf6ff5 100644 --- a/cmd/argo/commands/cron/update.go +++ b/cmd/argo/commands/cron/update.go @@ -89,7 +89,7 @@ func updateCronWorkflows(ctx context.Context, filePaths []string, cliOpts *cliUp if err != nil { return fmt.Errorf("Failed to update workflow template: %v", err) } - fmt.Print(getCronWorkflowGet(updated)) + fmt.Print(getCronWorkflowGet(ctx, updated)) } return nil } diff --git a/cmd/argo/commands/cron/util.go b/cmd/argo/commands/cron/util.go index eefbd754c0c1..82db7983dcde 100644 --- a/cmd/argo/commands/cron/util.go +++ b/cmd/argo/commands/cron/util.go @@ -1,6 +1,7 @@ package cron import ( + "context" "encoding/json" "fmt" "log" @@ -22,10 +23,10 @@ import ( // GetNextRuntime returns the next time the workflow should run in local time. It assumes the workflow-controller is in // UTC, but nevertheless returns the time in the local timezone. 
-func GetNextRuntime(cwf *v1alpha1.CronWorkflow) (time.Time, error) { +func GetNextRuntime(ctx context.Context, cwf *v1alpha1.CronWorkflow) (time.Time, error) { var nextRunTime time.Time now := time.Now().UTC() - for _, schedule := range cwf.Spec.GetSchedulesWithTimezone() { + for _, schedule := range cwf.Spec.GetSchedulesWithTimezone(ctx) { cronSchedule, err := cron.ParseStandard(schedule) if err != nil { return time.Time{}, err @@ -77,7 +78,7 @@ func unmarshalCronWorkflows(wfBytes []byte, strict bool) []wfv1.CronWorkflow { return nil } -func printCronWorkflow(wf *wfv1.CronWorkflow, outFmt string) { +func printCronWorkflow(ctx context.Context, wf *wfv1.CronWorkflow, outFmt string) { switch outFmt { case "name": fmt.Println(wf.ObjectMeta.Name) @@ -88,13 +89,13 @@ func printCronWorkflow(wf *wfv1.CronWorkflow, outFmt string) { outBytes, _ := yaml.Marshal(wf) fmt.Print(string(outBytes)) case "wide", "": - fmt.Print(getCronWorkflowGet(wf)) + fmt.Print(getCronWorkflowGet(ctx, wf)) default: log.Fatalf("Unknown output format: %s", outFmt) } } -func getCronWorkflowGet(cwf *wfv1.CronWorkflow) string { +func getCronWorkflowGet(ctx context.Context, cwf *wfv1.CronWorkflow) string { const fmtStr = "%-30s %v\n" out := "" @@ -116,7 +117,7 @@ func getCronWorkflowGet(cwf *wfv1.CronWorkflow) string { out += fmt.Sprintf(fmtStr, "LastScheduledTime:", humanize.Timestamp(cwf.Status.LastScheduledTime.Time)) } - next, err := GetNextRuntime(cwf) + next, err := GetNextRuntime(ctx, cwf) if err == nil { out += fmt.Sprintf(fmtStr, "NextScheduledTime:", humanize.Timestamp(next)+" (assumes workflow-controller is in UTC)") } diff --git a/cmd/argo/commands/cron/util_test.go b/cmd/argo/commands/cron/util_test.go index 0eb696d14ab9..e1e0ab9e623c 100644 --- a/cmd/argo/commands/cron/util_test.go +++ b/cmd/argo/commands/cron/util_test.go @@ -1,6 +1,7 @@ package cron import ( + "context" "testing" "time" @@ -56,13 +57,13 @@ Conditions: func TestPrintCronWorkflow(t *testing.T) { var cronWf = v1alpha1.MustUnmarshalCronWorkflow(invalidCwf) - out := getCronWorkflowGet(cronWf) + out := getCronWorkflowGet(context.Background(), cronWf) assert.Contains(t, out, expectedOut) } func TestNextRuntime(t *testing.T) { var cronWf = v1alpha1.MustUnmarshalCronWorkflow(invalidCwf) - next, err := GetNextRuntime(cronWf) + next, err := GetNextRuntime(context.Background(), cronWf) require.NoError(t, err) assert.LessOrEqual(t, next.Unix(), time.Now().Add(1*time.Minute).Unix()) assert.Greater(t, next.Unix(), time.Now().Unix()) @@ -94,7 +95,7 @@ spec: func TestNextRuntimeWithMultipleSchedules(t *testing.T) { var cronWf = v1alpha1.MustUnmarshalCronWorkflow(cronMultipleSchedules) - next, err := GetNextRuntime(cronWf) + next, err := GetNextRuntime(context.Background(), cronWf) require.NoError(t, err) assert.LessOrEqual(t, next.Unix(), time.Now().Add(1*time.Minute).Unix()) assert.Greater(t, next.Unix(), time.Now().Unix()) diff --git a/config/config.go b/config/config.go index edccd9fa1ba3..74f29272d7a8 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,8 @@ import ( "net/url" "time" + metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -288,13 +290,28 @@ type MetricsConfig struct { Temporality MetricsTemporality `json:"temporality,omitempty"` } -func (mc MetricsConfig) GetSecure(defaultValue bool) bool { +func (mc *MetricsConfig) GetSecure(defaultValue bool) bool { if mc.Secure != nil { 
 		return *mc.Secure
 	}
 	return defaultValue
 }

+func (mc *MetricsConfig) GetTemporality() metricsdk.TemporalitySelector {
+	switch mc.Temporality {
+	case MetricsTemporalityCumulative:
+		return func(metricsdk.InstrumentKind) metricdata.Temporality {
+			return metricdata.CumulativeTemporality
+		}
+	case MetricsTemporalityDelta:
+		return func(metricsdk.InstrumentKind) metricdata.Temporality {
+			return metricdata.DeltaTemporality
+		}
+	default:
+		return metricsdk.DefaultTemporalitySelector
+	}
+}
+
 type WorkflowRestrictions struct {
 	TemplateReferencing TemplateReferencing `json:"templateReferencing,omitempty"`
 }
diff --git a/docs/deprecations.md b/docs/deprecations.md
new file mode 100644
index 000000000000..b830701e6794
--- /dev/null
+++ b/docs/deprecations.md
@@ -0,0 +1,75 @@
+# Deprecations
+
+Sometimes a feature of Argo Workflows is deprecated.
+You should stop using a deprecated feature as it may be removed in a future minor or major release of Argo Workflows.
+
+To determine if you are using a deprecated feature, the [`deprecated_feature`](metrics.md#deprecated_feature) metric can help.
+This metric will go up for each use of a deprecated feature by the workflow controller.
+This means it may go up once or many times for a single event.
+If the number is going up, the feature is still in use by your system.
+If the metric is not present, or is no longer increasing, you are no longer using the monitored deprecated features.
+
+## `cronworkflow schedule`
+
+The spec field `schedule` which takes a single value is replaced by `schedules` which takes a list.
+To update this, replace `schedule` with `schedules` as in the following example:
+
+```yaml
+spec:
+  schedule: "30 1 * * *"
+```
+
+is replaced with
+
+```yaml
+spec:
+  schedules:
+    - "30 1 * * *"
+```
+
+## `synchronization mutex`
+
+The synchronization field `mutex` which takes a single value is replaced by `mutexes` which takes a list.
+To update this, replace `mutex` with `mutexes` as in the following example:
+
+```yaml
+synchronization:
+  mutex:
+    name: foobar
+```
+
+is replaced with
+
+```yaml
+synchronization:
+  mutexes:
+    - name: foobar
+```
+
+## `synchronization semaphore`
+
+The synchronization field `semaphore` which takes a single value is replaced by `semaphores` which takes a list.
+To update this, replace `semaphore` with `semaphores` as in the following example:
+
+```yaml
+synchronization:
+  semaphore:
+    configMapKeyRef:
+      name: my-config
+      key: workflow
+```
+
+is replaced with
+
+```yaml
+synchronization:
+  semaphores:
+    - configMapKeyRef:
+        name: my-config
+        key: workflow
+```
+
+## `workflow podpriority`
+
+The Workflow spec field `podPriority` which takes a numeric value is deprecated and `podPriorityClassName` should be used instead.
+To update this, you will need a [PriorityClass](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass) in your cluster and refer to it using `podPriorityClassName`.
diff --git a/docs/metrics.md b/docs/metrics.md
index d9bae309d34e..a1cd4adb586f 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -265,6 +265,23 @@ Suppressed runs due to `concurrencyPolicy: Forbid` will not be counted.
 | `name` | ⚠️ The name of the CronWorkflow |
 | `namespace` | The namespace of the CronWorkflow |

+#### `deprecated_feature`
+
+A counter which goes up when a [deprecated](deprecations.md) feature is used.
+🚨 This counter may go up much more than once for a single use of the feature.
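+
+A hypothetical sample of what this counter can look like when scraped (the count and namespace here are purely illustrative, and depending on your setup the name may carry an `argo_workflows_` prefix on the Prometheus endpoint):
+
+```text
+deprecated_feature{feature="cronworkflow schedule",namespace="argo"} 3
+```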
+ +| attribute | explanation | +|-------------|---------------------------------------------| +| `feature` | The name of the feature used | +| `namespace` | The namespace of the item using the feature | + +`feature` will be one of: + +- [`cronworkflow schedule`](deprecations.md#cronworkflow_schedule) +- [`synchronization mutex`](deprecations.md#synchronization_mutex) +- [`synchronization semaphore`](deprecations.md#synchronization_semaphore) +- [`workflow podpriority`](deprecations.md#workflow_podpriority) + #### `gauge` A gauge of the number of workflows currently in the cluster in each phase. The `Running` count does not mean that a workflows pods are running, just that the controller has scheduled them. A workflow can be stuck in `Running` with pending pods for a long time. diff --git a/docs/upgrading.md b/docs/upgrading.md index d17f155899a8..27a01747556d 100644 --- a/docs/upgrading.md +++ b/docs/upgrading.md @@ -31,6 +31,7 @@ The following are new metrics: * `cronworkflows_concurrencypolicy_triggered` * `cronworkflows_triggered_total` +* `deprecated_feature` * `is_leader` * `k8s_request_duration` * `pod_pending_count` diff --git a/mkdocs.yml b/mkdocs.yml index 52e1dbb74f3b..e232b7cf9cd1 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -248,6 +248,7 @@ nav: - offloading-large-workflows.md - workflow-archive.md - metrics.md + - deprecations.md - workflow-executors.md - workflow-restrictions.md - sidecar-injection.md diff --git a/pkg/apiclient/offline-cron-workflow-service-client.go b/pkg/apiclient/offline-cron-workflow-service-client.go index 6104a207c89b..6dbd1a7bd407 100644 --- a/pkg/apiclient/offline-cron-workflow-service-client.go +++ b/pkg/apiclient/offline-cron-workflow-service-client.go @@ -19,7 +19,7 @@ type OfflineCronWorkflowServiceClient struct { var _ cronworkflow.CronWorkflowServiceClient = &OfflineCronWorkflowServiceClient{} func (o OfflineCronWorkflowServiceClient) LintCronWorkflow(ctx context.Context, req *cronworkflow.LintCronWorkflowRequest, _ ...grpc.CallOption) (*v1alpha1.CronWorkflow, error) { - err := validate.ValidateCronWorkflow(o.namespacedWorkflowTemplateGetterMap.GetNamespaceGetter(req.Namespace), o.clusterWorkflowTemplateGetter, req.CronWorkflow) + err := validate.ValidateCronWorkflow(ctx, o.namespacedWorkflowTemplateGetterMap.GetNamespaceGetter(req.Namespace), o.clusterWorkflowTemplateGetter, req.CronWorkflow) if err != nil { return nil, err } diff --git a/pkg/apis/workflow/v1alpha1/cron_workflow_types.go b/pkg/apis/workflow/v1alpha1/cron_workflow_types.go index d6c2d69aaf29..28bc4cca44b4 100644 --- a/pkg/apis/workflow/v1alpha1/cron_workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/cron_workflow_types.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "context" "strings" v1 "k8s.io/api/core/v1" @@ -8,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" + "github.com/argoproj/argo-workflows/v3/util/deprecation" ) // CronWorkflow is the definition of a scheduled workflow resource @@ -169,17 +171,17 @@ func (c *CronWorkflowSpec) getScheduleString(withTimezone bool) string { // GetSchedulesWithTimezone returns all schedules configured for the CronWorkflow with a timezone. 
It handles // both Spec.Schedules and Spec.Schedule for backwards compatibility -func (c *CronWorkflowSpec) GetSchedulesWithTimezone() []string { - return c.getSchedules(true) +func (c *CronWorkflowSpec) GetSchedulesWithTimezone(ctx context.Context) []string { + return c.getSchedules(ctx, true) } // GetSchedules returns all schedules configured for the CronWorkflow. It handles both Spec.Schedules // and Spec.Schedule for backwards compatibility -func (c *CronWorkflowSpec) GetSchedules() []string { - return c.getSchedules(false) +func (c *CronWorkflowSpec) GetSchedules(ctx context.Context) []string { + return c.getSchedules(ctx, false) } -func (c *CronWorkflowSpec) getSchedules(withTimezone bool) []string { +func (c *CronWorkflowSpec) getSchedules(ctx context.Context, withTimezone bool) []string { var schedules []string if c.Schedule != "" { schedule := c.Schedule @@ -187,6 +189,7 @@ func (c *CronWorkflowSpec) getSchedules(withTimezone bool) []string { schedule = c.withTimezone(c.Schedule) } schedules = append(schedules, schedule) + deprecation.Record(ctx, deprecation.Schedule) } else { schedules = make([]string, len(c.Schedules)) for i, schedule := range c.Schedules { diff --git a/pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go b/pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go index 1822eec4f941..021b60f6e9de 100644 --- a/pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go +++ b/pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -21,14 +22,14 @@ func TestCronWorkflowSpec_GetScheduleStrings(t *testing.T) { Timezone: "", Schedule: "* * * * *", } - - assert.Equal(t, []string{"* * * * *"}, cwfSpec.GetSchedules()) - assert.Equal(t, []string{"* * * * *"}, cwfSpec.GetSchedulesWithTimezone()) + ctx := context.Background() + assert.Equal(t, []string{"* * * * *"}, cwfSpec.GetSchedules(ctx)) + assert.Equal(t, []string{"* * * * *"}, cwfSpec.GetSchedulesWithTimezone(ctx)) assert.Equal(t, "* * * * *", cwfSpec.GetScheduleString()) cwfSpec.Timezone = "America/Los_Angeles" - assert.Equal(t, []string{"* * * * *"}, cwfSpec.GetSchedules()) - assert.Equal(t, []string{"CRON_TZ=America/Los_Angeles * * * * *"}, cwfSpec.GetSchedulesWithTimezone()) + assert.Equal(t, []string{"* * * * *"}, cwfSpec.GetSchedules(ctx)) + assert.Equal(t, []string{"CRON_TZ=America/Los_Angeles * * * * *"}, cwfSpec.GetSchedulesWithTimezone(ctx)) assert.Equal(t, "* * * * *", cwfSpec.GetScheduleString()) assert.Equal(t, "CRON_TZ=America/Los_Angeles * * * * *", cwfSpec.GetScheduleWithTimezoneString()) @@ -39,8 +40,8 @@ func TestCronWorkflowSpec_GetScheduleStrings(t *testing.T) { assert.Equal(t, "* * * * *,0 * * * *", cwfSpec.GetScheduleString()) cwfSpec.Timezone = "America/Los_Angeles" - assert.Equal(t, []string{"* * * * *", "0 * * * *"}, cwfSpec.GetSchedules()) - assert.Equal(t, []string{"CRON_TZ=America/Los_Angeles * * * * *", "CRON_TZ=America/Los_Angeles 0 * * * *"}, cwfSpec.GetSchedulesWithTimezone()) + assert.Equal(t, []string{"* * * * *", "0 * * * *"}, cwfSpec.GetSchedules(ctx)) + assert.Equal(t, []string{"CRON_TZ=America/Los_Angeles * * * * *", "CRON_TZ=America/Los_Angeles 0 * * * *"}, cwfSpec.GetSchedulesWithTimezone(ctx)) assert.Equal(t, "* * * * *,0 * * * *", cwfSpec.GetScheduleString()) assert.Equal(t, "CRON_TZ=America/Los_Angeles * * * * *,CRON_TZ=America/Los_Angeles 0 * * * *", cwfSpec.GetScheduleWithTimezoneString()) } diff --git a/server/cronworkflow/cron_workflow_server.go 
b/server/cronworkflow/cron_workflow_server.go index 49d45bffea4c..cd8012a61320 100644 --- a/server/cronworkflow/cron_workflow_server.go +++ b/server/cronworkflow/cron_workflow_server.go @@ -35,7 +35,7 @@ func (c *cronWorkflowServiceServer) LintCronWorkflow(ctx context.Context, req *c cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates()) c.instanceIDService.Label(req.CronWorkflow) creator.Label(ctx, req.CronWorkflow) - err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow) + err := validate.ValidateCronWorkflow(ctx, wftmplGetter, cwftmplGetter, req.CronWorkflow) if err != nil { return nil, sutils.ToStatusError(err, codes.InvalidArgument) } @@ -64,7 +64,7 @@ func (c *cronWorkflowServiceServer) CreateCronWorkflow(ctx context.Context, req creator.Label(ctx, req.CronWorkflow) wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace)) cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates()) - err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow) + err := validate.ValidateCronWorkflow(ctx, wftmplGetter, cwftmplGetter, req.CronWorkflow) if err != nil { return nil, sutils.ToStatusError(err, codes.InvalidArgument) } @@ -91,7 +91,7 @@ func (c *cronWorkflowServiceServer) UpdateCronWorkflow(ctx context.Context, req } wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace)) cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates()) - if err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow); err != nil { + if err := validate.ValidateCronWorkflow(ctx, wftmplGetter, cwftmplGetter, req.CronWorkflow); err != nil { return nil, sutils.ToStatusError(err, codes.InvalidArgument) } crWf, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(ctx, req.CronWorkflow, metav1.UpdateOptions{}) diff --git a/test/e2e/metrics_test.go b/test/e2e/metrics_test.go index 3c17afdc0175..61d8d092d6ac 100644 --- a/test/e2e/metrics_test.go +++ b/test/e2e/metrics_test.go @@ -94,6 +94,74 @@ func (s *MetricsSuite) TestDAGMetrics() { }) } +func (s *MetricsSuite) TestDeprecatedCronSchedule() { + s.Given(). + CronWorkflow(`@testdata/cronworkflow-deprecated-schedule.yaml`). + When(). + CreateCronWorkflow(). + Wait(1 * time.Minute). // This pattern is used in cron_test.go too + Then(). + ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) { + s.e(s.T()).GET(""). + Expect(). + Status(200). + Body(). + Contains(`deprecated_feature{feature="cronworkflow schedule",namespace="argo"}`) // Count unimportant and unknown + }) +} + +func (s *MetricsSuite) TestDeprecatedMutex() { + s.Given(). + Workflow(`@testdata/synchronization-deprecated-mutex.yaml`). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeSucceeded). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase) + s.e(s.T()).GET(""). + Expect(). + Status(200). + Body(). + Contains(`deprecated_feature{feature="synchronization mutex",namespace="argo"}`) // Count unimportant and unknown + }) +} + +func (s *MetricsSuite) TestDeprecatedPodPriority() { + s.Given(). 
+ Workflow(`@testdata/workflow-deprecated-podpriority.yaml`). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeErrored). // Fails as kubernetes we test on do not support this1 + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + s.e(s.T()).GET(""). + Expect(). + Status(200). + Body(). + Contains(`deprecated_feature{feature="workflow podpriority",namespace="argo"}`) // Count unimportant and unknown + }) +} + +func (s *MetricsSuite) TestDeprecatedSemaphore() { + s.Given(). + Workflow(`@testdata/synchronization-deprecated-semaphore.yaml`). + When(). + CreateConfigMap("my-config", map[string]string{"workflow": "1"}, map[string]string{}). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeSucceeded). + DeleteConfigMap("my-config"). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase) + s.e(s.T()).GET(""). + Expect(). + Status(200). + Body(). + Contains(`deprecated_feature{feature="synchronization semaphore",namespace="argo"}`) // Count unimportant and unknown + }) +} + func (s *MetricsSuite) TestFailedMetric() { s.Given(). Workflow(`@testdata/template-status-failed-conditional-metric.yaml`). diff --git a/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml b/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml new file mode 100644 index 000000000000..abfda0b51fde --- /dev/null +++ b/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml @@ -0,0 +1,21 @@ +apiVersion: argoproj.io/v1alpha1 +kind: CronWorkflow +metadata: + name: test-cron-deprecated-schedule +spec: + schedules: + - "* * * * *" + concurrencyPolicy: "Forbid" + startingDeadlineSeconds: 0 + workflowSpec: + metadata: + labels: + workflows.argoproj.io/test: "true" + podGC: + strategy: OnPodCompletion + entrypoint: sleep + templates: + - name: sleep + container: + image: alpine:latest + command: [sh, -c, "sleep 120"] diff --git a/test/e2e/testdata/cronworkflow-metrics-forbid.yaml b/test/e2e/testdata/cronworkflow-metrics-forbid.yaml index 7e4cb97c4ad7..0b3a40ad97a9 100644 --- a/test/e2e/testdata/cronworkflow-metrics-forbid.yaml +++ b/test/e2e/testdata/cronworkflow-metrics-forbid.yaml @@ -3,7 +3,8 @@ kind: CronWorkflow metadata: name: test-cron-metric-forbid spec: - schedule: "* * * * *" + schedules: + - "* * * * *" concurrencyPolicy: "Forbid" startingDeadlineSeconds: 0 workflowSpec: diff --git a/test/e2e/testdata/cronworkflow-metrics-replace.yaml b/test/e2e/testdata/cronworkflow-metrics-replace.yaml index 7b2a8534b24f..e67b490c8aa2 100644 --- a/test/e2e/testdata/cronworkflow-metrics-replace.yaml +++ b/test/e2e/testdata/cronworkflow-metrics-replace.yaml @@ -3,7 +3,8 @@ kind: CronWorkflow metadata: name: test-cron-metric-replace spec: - schedule: "* * * * *" + schedules: + - "* * * * *" concurrencyPolicy: "Replace" startingDeadlineSeconds: 0 workflowSpec: diff --git a/test/e2e/testdata/cronworkflow_deprecated_schedule.yaml b/test/e2e/testdata/cronworkflow_deprecated_schedule.yaml new file mode 100644 index 000000000000..0b3a40ad97a9 --- /dev/null +++ b/test/e2e/testdata/cronworkflow_deprecated_schedule.yaml @@ -0,0 +1,21 @@ +apiVersion: argoproj.io/v1alpha1 +kind: CronWorkflow +metadata: + name: test-cron-metric-forbid +spec: + schedules: + - "* * * * *" + concurrencyPolicy: "Forbid" + startingDeadlineSeconds: 0 + workflowSpec: + metadata: + labels: + workflows.argoproj.io/test: "true" + podGC: + strategy: OnPodCompletion + entrypoint: sleep + templates: + - 
name: sleep + container: + image: alpine:latest + command: [sh, -c, "sleep 120"] diff --git a/test/e2e/testdata/synchronization-deprecated-mutex.yaml b/test/e2e/testdata/synchronization-deprecated-mutex.yaml new file mode 100644 index 000000000000..31dfd1a912a0 --- /dev/null +++ b/test/e2e/testdata/synchronization-deprecated-mutex.yaml @@ -0,0 +1,13 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: mutex-deprecated- +spec: + entrypoint: whalesay + synchronization: + mutex: + name: test-mutex + templates: + - name: whalesay + container: + image: argoproj/argosay:v2 diff --git a/test/e2e/testdata/synchronization-deprecated-semaphore.yaml b/test/e2e/testdata/synchronization-deprecated-semaphore.yaml new file mode 100644 index 000000000000..5f2b9e8a5d22 --- /dev/null +++ b/test/e2e/testdata/synchronization-deprecated-semaphore.yaml @@ -0,0 +1,15 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: semaphore-deprecated- +spec: + entrypoint: whalesay + synchronization: + semaphore: + configMapKeyRef: + name: my-config + key: workflow + templates: + - name: whalesay + container: + image: argoproj/argosay:v2 diff --git a/test/e2e/testdata/workflow-deprecated-podpriority.yaml b/test/e2e/testdata/workflow-deprecated-podpriority.yaml new file mode 100644 index 000000000000..17c1e7e43471 --- /dev/null +++ b/test/e2e/testdata/workflow-deprecated-podpriority.yaml @@ -0,0 +1,11 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: podpriority-deprecated- +spec: + entrypoint: whalesay + podPriority: 9999 + templates: + - name: whalesay + container: + image: argoproj/argosay:v2 diff --git a/util/context/context.go b/util/context/context.go new file mode 100644 index 000000000000..891a6675011f --- /dev/null +++ b/util/context/context.go @@ -0,0 +1,33 @@ +// Package context contains common functions for storing and retriving information from +// standard go context +package context + +import ( + "context" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + name string = `object_name` + namespace string = `object_namespace` +) + +func InjectObjectMeta(ctx context.Context, meta *meta.ObjectMeta) context.Context { + ctx = context.WithValue(ctx, name, meta.Name) + return context.WithValue(ctx, namespace, meta.Namespace) +} + +func ObjectName(ctx context.Context) string { + if n, ok := ctx.Value(name).(string); ok { + return n + } + return "" +} + +func ObjectNamespace(ctx context.Context) string { + if n, ok := ctx.Value(namespace).(string); ok { + return n + } + return "" +} diff --git a/util/context/context_test.go b/util/context/context_test.go new file mode 100644 index 000000000000..a6dde3809d30 --- /dev/null +++ b/util/context/context_test.go @@ -0,0 +1,16 @@ +package context + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestObjectMeta(t *testing.T) { + ctx := context.Background() + ctx = InjectObjectMeta(ctx, &meta.ObjectMeta{Name: "foo", Namespace: "bar"}) + assert.Equal(t, "foo", ObjectName(ctx)) + assert.Equal(t, "bar", ObjectNamespace(ctx)) +} diff --git a/util/deprecation/deprecation.go b/util/deprecation/deprecation.go new file mode 100644 index 000000000000..d8eeb66205d6 --- /dev/null +++ b/util/deprecation/deprecation.go @@ -0,0 +1,56 @@ +// Package deprecation records uses of deprecated features so that users can be made aware of +// things that may be removed in a future version and move away from them. 
+package deprecation
+
+// This is a deliberate singleton devised to be functional when initialised with an instance of metrics, and otherwise to remain quiet.
+//
+// This avoids the problem of injecting the metrics package (or whatever recording method the deprecation
+// recorder is using) temporarily into packages and then painfully removing the injection later when the
+// package no longer has deprecated features (as they've been removed)
+
+import (
+	"context"
+
+	wfctx "github.com/argoproj/argo-workflows/v3/util/context"
+)
+
+type metricsFunc func(context.Context, string, string)
+
+var (
+	metricsF metricsFunc
+)
+
+type Type int
+
+const (
+	Schedule Type = iota
+	Mutex
+	Semaphore
+	PodPriority
+)
+
+func (t *Type) asString() string {
+	switch *t {
+	case Schedule:
+		return `cronworkflow schedule`
+	case Mutex:
+		return `synchronization mutex`
+	case Semaphore:
+		return `synchronization semaphore`
+	case PodPriority:
+		return `workflow podpriority`
+	default:
+		return `unknown`
+	}
+}
+
+func Initialize(m metricsFunc) {
+	metricsF = m
+}
+
+func Record(ctx context.Context, deprecation Type) {
+	if metricsF != nil {
+		metricsF(ctx, deprecation.asString(), wfctx.ObjectNamespace(ctx))
+	}
+}
diff --git a/util/deprecation/deprecation_test.go b/util/deprecation/deprecation_test.go
new file mode 100644
index 000000000000..ab22f4a13727
--- /dev/null
+++ b/util/deprecation/deprecation_test.go
@@ -0,0 +1,38 @@
+package deprecation
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestUninitalized(t *testing.T) {
+	metricsF = nil
+	Record(context.Background(), Schedule)
+}
+
+func TestInitalized(t *testing.T) {
+	count := 0
+	countSchedule := 0
+	countMutex := 0
+	fn := func(_ context.Context, deprecation, _ string) {
+		count += 1
+		if deprecation == "cronworkflow schedule" {
+			countSchedule += 1
+		}
+		if deprecation == "synchronization mutex" {
+			countMutex += 1
+		}
+	}
+	Initialize(fn)
+	ctx := context.Background()
+	Record(ctx, Schedule)
+	assert.Equal(t, 1, count)
+	assert.Equal(t, 1, countSchedule)
+	assert.Equal(t, 0, countMutex)
+	Record(ctx, Mutex)
+	assert.Equal(t, 2, count)
+	assert.Equal(t, 1, countSchedule)
+	assert.Equal(t, 1, countMutex)
+}
diff --git a/util/telemetry/attributes.go b/util/telemetry/attributes.go
index 562f92f69a3d..e2ca6700731f 100644
--- a/util/telemetry/attributes.go
+++ b/util/telemetry/attributes.go
@@ -13,6 +13,8 @@ const (
 	AttribCronWFName string = `name`
 	AttribConcurrencyPolicy string = `concurrency_policy`

+	AttribDeprecatedFeature string = "feature"
+
 	AttribErrorCause string = "cause"

 	AttribLogLevel string = `level`
diff --git a/util/telemetry/counter_deprecations.go b/util/telemetry/counter_deprecations.go
new file mode 100644
index 000000000000..7f4bbec320f2
--- /dev/null
+++ b/util/telemetry/counter_deprecations.go
@@ -0,0 +1,28 @@
+package telemetry
+
+import (
+	"context"
+)
+
+const (
+	nameDeprecated = `deprecated_feature`
+)
+
+func AddDeprecationCounter(_ context.Context, m *Metrics) error {
+	return m.CreateInstrument(Int64Counter,
+		nameDeprecated,
+		"Incidents of deprecated feature being used.",
+		"{feature}",
+		WithAsBuiltIn(),
+	)
+}
+
+func (m *Metrics) DeprecatedFeature(ctx context.Context, deprecation string, namespace string) {
+	attribs := InstAttribs{
+		{Name: AttribDeprecatedFeature, Value: deprecation},
+	}
+	if namespace != "" {
+		attribs = append(attribs, InstAttrib{Name: AttribWorkflowNamespace, Value: namespace})
+	}
+	m.AddInt(ctx, nameDeprecated, 1, attribs)
+}
diff --git
a/util/telemetry/metrics.go b/util/telemetry/metrics.go index 2a6be32c38fd..a4583fe92aed 100644 --- a/util/telemetry/metrics.go +++ b/util/telemetry/metrics.go @@ -9,13 +9,10 @@ import ( log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" - wfconfig "github.com/argoproj/argo-workflows/v3/config" - "go.opentelemetry.io/contrib/instrumentation/runtime" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/metric" metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) @@ -28,7 +25,7 @@ type Config struct { IgnoreErrors bool Secure bool Modifiers map[string]Modifier - Temporality wfconfig.MetricsTemporality + Temporality metricsdk.TemporalitySelector } type Metrics struct { @@ -55,7 +52,7 @@ func NewMetrics(ctx context.Context, serviceName, prometheusName string, config _, otlpMetricsEnabled := os.LookupEnv(`OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`) if otlpEnabled || otlpMetricsEnabled { log.Info("Starting OTLP metrics exporter") - otelExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithTemporalitySelector(getTemporality(config))) + otelExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithTemporalitySelector(config.Temporality)) if err != nil { return nil, err } @@ -103,18 +100,3 @@ func (m *Metrics) Populate(ctx context.Context, adders ...AddMetric) error { } return nil } - -func getTemporality(config *Config) metricsdk.TemporalitySelector { - switch config.Temporality { - case wfconfig.MetricsTemporalityCumulative: - return func(metricsdk.InstrumentKind) metricdata.Temporality { - return metricdata.CumulativeTemporality - } - case wfconfig.MetricsTemporalityDelta: - return func(metricsdk.InstrumentKind) metricdata.Temporality { - return metricdata.DeltaTemporality - } - default: - return metricsdk.DefaultTemporalitySelector - } -} diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go index 7d166e5b0276..72d2bbe28bf4 100644 --- a/workflow/controller/agent.go +++ b/workflow/controller/agent.go @@ -230,7 +230,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro } tmpl := &wfv1.Template{} - woc.addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl, "") + woc.addSchedulingConstraints(ctx, pod, woc.execWf.Spec.DeepCopy(), tmpl, "") woc.addMetadata(pod, tmpl) woc.addDNSConfig(pod) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4b542b2cdb2b..5ed3f8bfc944 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -49,6 +49,8 @@ import ( wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/plugins/spec" authutil "github.com/argoproj/argo-workflows/v3/util/auth" + wfctx "github.com/argoproj/argo-workflows/v3/util/context" + "github.com/argoproj/argo-workflows/v3/util/deprecation" "github.com/argoproj/argo-workflows/v3/util/diff" "github.com/argoproj/argo-workflows/v3/util/env" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" @@ -230,6 +232,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli WorkflowCondition: wfc.getWorkflowConditionMetrics, IsLeader: wfc.IsLeader, }) + deprecation.Initialize(wfc.metrics.Metrics.DeprecatedFeature) if err != nil { return nil, err @@ -431,7 +434,7 @@ func (wfc *WorkflowController) initManagers(ctx 
context.Context) error { return err } - wfc.syncManager.Initialize(wfList.Items) + wfc.syncManager.Initialize(ctx, wfList.Items) if err := wfc.throttler.Init(wfList.Items); err != nil { return err @@ -888,6 +891,7 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { woc.persistUpdates(ctx) return true } + ctx = wfctx.InjectObjectMeta(ctx, &woc.wf.ObjectMeta) startTime := time.Now() woc.operate(ctx) wfc.metrics.OperationCompleted(ctx, time.Since(startTime).Seconds()) @@ -1413,7 +1417,7 @@ func (wfc *WorkflowController) getMetricsServerConfig() *telemetry.Config { IgnoreErrors: wfc.Config.MetricsConfig.IgnoreErrors, Secure: wfc.Config.MetricsConfig.GetSecure(true), Modifiers: modifiers, - Temporality: wfc.Config.MetricsConfig.Temporality, + Temporality: wfc.Config.MetricsConfig.GetTemporality(), } return &metricsConfig } diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 1b609be9fd10..8f708eb02d05 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -454,7 +454,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex // Release acquired lock completed task. if processedTmpl != nil { - woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) + woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) } scope, err := woc.buildLocalScopeFromTask(dagCtx, task) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 5ddf1cdfce00..69dd6997532c 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -250,7 +250,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { // Workflow Level Synchronization lock if woc.execWf.Spec.Synchronization != nil { - acquired, wfUpdate, msg, failedLockName, err := woc.controller.syncManager.TryAcquire(woc.wf, "", woc.execWf.Spec.Synchronization) + acquired, wfUpdate, msg, failedLockName, err := woc.controller.syncManager.TryAcquire(ctx, woc.wf, "", woc.execWf.Spec.Synchronization) if err != nil { woc.log.Warnf("Failed to acquire the lock %s", failedLockName) woc.markWorkflowFailed(ctx, fmt.Sprintf("Failed to acquire the synchronization lock. 
%s", err.Error())) @@ -1942,7 +1942,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, if node != nil { fulfilledNode := woc.handleNodeFulfilled(ctx, nodeName, node, processedTmpl) if fulfilledNode != nil { - woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) + woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) return fulfilledNode, nil } woc.log.Debugf("Executing node %s of %s is %s", nodeName, node.Type, node.Phase) @@ -1972,7 +1972,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, unlockedNode := false if processedTmpl.Synchronization != nil { - lockAcquired, wfUpdated, msg, failedLockName, err := woc.controller.syncManager.TryAcquire(woc.wf, woc.wf.NodeID(nodeName), processedTmpl.Synchronization) + lockAcquired, wfUpdated, msg, failedLockName, err := woc.controller.syncManager.TryAcquire(ctx, woc.wf, woc.wf.NodeID(nodeName), processedTmpl.Synchronization) if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err } @@ -2061,7 +2061,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, if node != nil { fulfilledNode := woc.handleNodeFulfilled(ctx, nodeName, node, processedTmpl) if fulfilledNode != nil { - woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) + woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) return fulfilledNode, nil } // Memoized nodes don't have StartedAt. @@ -2120,7 +2120,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } } if processedTmpl.Synchronization != nil { - woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) + woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) } _, lastChildNode := getChildNodeIdsAndLastRetriedNode(retryParentNode, woc.wf.Status.Nodes) if lastChildNode != nil { @@ -2210,13 +2210,13 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } } if release { - woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) + woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) return node, err } } if node.Fulfilled() { - woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) + woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) } retrieveNode, err := woc.wf.GetNodeByName(node.Name) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 298725327cf2..ea227ff0f236 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -8537,7 +8537,7 @@ func TestMutexWfPendingWithNoPod(t *testing.T) { ctx := context.Background() controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) { }, workflowExistenceFunc) - _, _, _, _, err := controller.syncManager.TryAcquire(wf, "test", &wfv1.Synchronization{Mutex: &wfv1.Mutex{Name: "welcome"}}) + _, _, _, _, err := controller.syncManager.TryAcquire(ctx, wf, "test", &wfv1.Synchronization{Mutex: &wfv1.Mutex{Name: "welcome"}}) require.NoError(t, err) woc := newWorkflowOperationCtx(wf, controller) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index bc5fb5870040..0d2bb0986327 100644 --- 
a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -20,6 +20,7 @@ import ( "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/deprecation" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" "github.com/argoproj/argo-workflows/v3/util/intstr" "github.com/argoproj/argo-workflows/v3/util/template" @@ -255,7 +256,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin initCtr := woc.newInitContainer(tmpl) pod.Spec.InitContainers = []apiv1.Container{initCtr} - woc.addSchedulingConstraints(pod, wfSpec, tmpl, nodeName) + woc.addSchedulingConstraints(ctx, pod, wfSpec, tmpl, nodeName) woc.addMetadata(pod, tmpl) // Set initial progress from pod metadata if exists. @@ -772,7 +773,7 @@ func (woc *wfOperationCtx) addDNSConfig(pod *apiv1.Pod) { } // addSchedulingConstraints applies any node selectors or affinity rules to the pod, either set in the workflow or the template -func (woc *wfOperationCtx) addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, nodeName string) { +func (woc *wfOperationCtx) addSchedulingConstraints(ctx context.Context, pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, nodeName string) { // Get boundaryNode Template (if specified) boundaryTemplate, err := woc.GetBoundaryTemplate(nodeName) if err != nil { @@ -818,8 +819,10 @@ func (woc *wfOperationCtx) addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1 // Set priority (if specified) if tmpl.Priority != nil { pod.Spec.Priority = tmpl.Priority + deprecation.Record(ctx, deprecation.PodPriority) } else if wfSpec.PodPriority != nil { pod.Spec.Priority = wfSpec.PodPriority + deprecation.Record(ctx, deprecation.PodPriority) } // set hostaliases diff --git a/workflow/cron/controller.go b/workflow/cron/controller.go index 8cedef1e459a..106bc33e588d 100644 --- a/workflow/cron/controller.go +++ b/workflow/cron/controller.go @@ -28,6 +28,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" + wfctx "github.com/argoproj/argo-workflows/v3/util/context" "github.com/argoproj/argo-workflows/v3/util/env" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/events" @@ -172,6 +173,7 @@ func (cc *Controller) processNextCronItem(ctx context.Context) bool { logCtx.WithError(err).Error("malformed cron workflow: could not convert from unstructured") return true } + ctx = wfctx.InjectObjectMeta(ctx, &cronWf.ObjectMeta) cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer) @@ -193,7 +195,7 @@ func (cc *Controller) processNextCronItem(ctx context.Context) bool { // The job is currently scheduled, remove it and re add it. 
cc.cron.Delete(key.(string)) - for _, schedule := range cronWf.Spec.GetSchedulesWithTimezone() { + for _, schedule := range cronWf.Spec.GetSchedulesWithTimezone(ctx) { lastScheduledTimeFunc, err := cc.cron.AddJob(key.(string), schedule, cronWorkflowOperationCtx) if err != nil { logCtx.WithError(err).Error("could not schedule CronWorkflow") diff --git a/workflow/cron/operator.go b/workflow/cron/operator.go index d4f8198acb26..fdef6573c8db 100644 --- a/workflow/cron/operator.go +++ b/workflow/cron/operator.go @@ -136,7 +136,7 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti func (woc *cronWfOperationCtx) validateCronWorkflow(ctx context.Context) error { wftmplGetter := informer.NewWorkflowTemplateFromInformerGetter(woc.wftmplInformer, woc.cronWf.ObjectMeta.Namespace) cwftmplGetter := informer.NewClusterWorkflowTemplateFromInformerGetter(woc.cwftmplInformer) - err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, woc.cronWf) + err := validate.ValidateCronWorkflow(ctx, wftmplGetter, cwftmplGetter, woc.cronWf) if err != nil { woc.reportCronWorkflowError(ctx, v1alpha1.ConditionTypeSpecError, fmt.Sprint(err)) } else { @@ -323,7 +323,7 @@ func (woc *cronWfOperationCtx) terminateOutstandingWorkflows(ctx context.Context } func (woc *cronWfOperationCtx) runOutstandingWorkflows(ctx context.Context) (bool, error) { - missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx) if err != nil { return false, err } @@ -334,14 +334,14 @@ func (woc *cronWfOperationCtx) runOutstandingWorkflows(ctx context.Context) (boo return false, nil } -func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (time.Time, error) { +func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun(ctx context.Context) (time.Time, error) { // If the CronWorkflow schedule was just updated, then do not run any outstanding workflows. 
if woc.cronWf.IsUsingNewSchedule() { return time.Time{}, nil } // If this CronWorkflow has been run before, check if we have missed any scheduled executions if woc.cronWf.Status.LastScheduledTime != nil { - for _, schedule := range woc.cronWf.Spec.GetSchedules() { + for _, schedule := range woc.cronWf.Spec.GetSchedules(ctx) { var now time.Time var cronSchedule cron.Schedule if woc.cronWf.Spec.Timezone != "" { diff --git a/workflow/cron/operator_test.go b/workflow/cron/operator_test.go index 693a62aff000..9b8d3c7a3b64 100644 --- a/workflow/cron/operator_test.go +++ b/workflow/cron/operator_test.go @@ -55,6 +55,7 @@ var scheduledWf = ` func TestRunOutstandingWorkflows(t *testing.T) { // To ensure consistency, always start at the next 30 second mark _, _, sec := time.Now().Clock() + ctx := context.Background() var toWait time.Duration if sec <= 30 { toWait = time.Duration(30-sec) * time.Second @@ -78,7 +79,7 @@ func TestRunOutstandingWorkflows(t *testing.T) { log: logrus.WithFields(logrus.Fields{}), } woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString()) - missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) // The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix()) @@ -90,14 +91,14 @@ func TestRunOutstandingWorkflows(t *testing.T) { cronWf: &cronWf, log: logrus.WithFields(logrus.Fields{}), } - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) // Same test, but simulate a change to the schedule immediately prior by setting a different last-used-schedule annotation // In this case, since a schedule change is detected, not workflow should be run woc.cronWf.SetSchedule("0 * * * *") - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) @@ -119,7 +120,7 @@ func TestRunOutstandingWorkflows(t *testing.T) { } // Reset last-used-schedule as if the current schedule has been used before woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString()) - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) // The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix()) @@ -131,14 +132,14 @@ func TestRunOutstandingWorkflows(t *testing.T) { cronWf: &cronWf, log: logrus.WithFields(logrus.Fields{}), } - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) // Same test, but simulate a change to the schedule immediately prior by setting a different last-used-schedule annotation // In this case, since a schedule change is detected, not workflow should be run woc.cronWf.SetSchedule("0 * * * *") - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, 
missedExecutionTime.IsZero()) } @@ -310,6 +311,7 @@ spec: func TestLastUsedSchedule(t *testing.T) { var cronWf v1alpha1.CronWorkflow + ctx := context.Background() v1alpha1.MustUnmarshal([]byte(lastUsedSchedule), &cronWf) cs := fake.NewSimpleClientset() @@ -325,7 +327,7 @@ func TestLastUsedSchedule(t *testing.T) { scheduledTimeFunc: inferScheduledTime, } - missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.Equal(t, time.Time{}, missedExecutionTime) @@ -383,6 +385,7 @@ status: ` func TestMissedScheduleAfterCronScheduleWithForbid(t *testing.T) { + ctx := context.Background() var cronWf v1alpha1.CronWorkflow v1alpha1.MustUnmarshal([]byte(forbidMissedSchedule), &cronWf) // StartingDeadlineSeconds is after the current second, so cron should be run @@ -395,7 +398,7 @@ func TestMissedScheduleAfterCronScheduleWithForbid(t *testing.T) { log: logrus.WithFields(logrus.Fields{}), } woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString()) - missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) }) @@ -589,6 +592,7 @@ func TestSpecErrorWithValidAndInvalidSchedules(t *testing.T) { func TestRunOutstandingWorkflowsWithMultipleSchedules(t *testing.T) { // To ensure consistency, always start at the next 30 second mark _, _, sec := time.Now().Clock() + ctx := context.Background() var toWait time.Duration if sec <= 30 { toWait = time.Duration(30-sec) * time.Second @@ -612,7 +616,7 @@ func TestRunOutstandingWorkflowsWithMultipleSchedules(t *testing.T) { log: logrus.WithFields(logrus.Fields{}), } woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString()) - missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) // The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix()) @@ -624,14 +628,14 @@ func TestRunOutstandingWorkflowsWithMultipleSchedules(t *testing.T) { cronWf: &cronWf, log: logrus.WithFields(logrus.Fields{}), } - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) // Same test, but simulate a change to the schedule immediately prior by setting a different last-used-schedule annotation // In this case, since a schedule change is detected, not workflow should be run woc.cronWf.SetSchedules([]string{"0 * * * *,1 * * * *"}) - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) @@ -653,7 +657,7 @@ func TestRunOutstandingWorkflowsWithMultipleSchedules(t *testing.T) { } // Reset last-used-schedule as if the current schedule has been used before woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString()) - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) // The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime assert.Equal(t, 
inferScheduledTime().Unix(), missedExecutionTime.Unix()) @@ -665,14 +669,14 @@ func TestRunOutstandingWorkflowsWithMultipleSchedules(t *testing.T) { cronWf: &cronWf, log: logrus.WithFields(logrus.Fields{}), } - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) // Same test, but simulate a change to the schedule immediately prior by setting a different last-used-schedule annotation // In this case, since a schedule change is detected, not workflow should be run woc.cronWf.SetSchedules([]string{"0 * * * *,1 * * * *"}) - missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun() + missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx) require.NoError(t, err) assert.True(t, missedExecutionTime.IsZero()) } diff --git a/workflow/metrics/metrics.go b/workflow/metrics/metrics.go index 46f5b6195996..19243036be6e 100644 --- a/workflow/metrics/metrics.go +++ b/workflow/metrics/metrics.go @@ -23,6 +23,7 @@ func New(ctx context.Context, serviceName, prometheusName string, config *teleme err = m.Populate(ctx, telemetry.AddVersion, + telemetry.AddDeprecationCounter, ) if err != nil { return nil, err diff --git a/workflow/sync/multiple_test.go b/workflow/sync/multiple_test.go index 5eaf5179014c..76d4d7efbb82 100644 --- a/workflow/sync/multiple_test.go +++ b/workflow/sync/multiple_test.go @@ -38,6 +38,7 @@ func templatedWorkflow(name string, syncBlock string) *wfv1.Workflow { } func TestMultipleMutexLock(t *testing.T) { + ctx := context.Background() kube := fake.NewSimpleClientset() syncLimitFunc := GetSyncLimitFunc(kube) t.Run("MultipleMutex", func(t *testing.T) { @@ -62,7 +63,7 @@ func TestMultipleMutexLock(t *testing.T) { - name: three `) // Acquire 1 - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -70,7 +71,7 @@ func TestMultipleMutexLock(t *testing.T) { assert.True(t, wfUpdate) // Acquire 2 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -78,7 +79,7 @@ func TestMultipleMutexLock(t *testing.T) { assert.True(t, wfUpdate) // Acquire 3 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf3, "", wf3.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf3, "", wf3.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -86,7 +87,7 @@ func TestMultipleMutexLock(t *testing.T) { assert.True(t, wfUpdate) // Fail to acquire because one locked - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/one", failedLockName) @@ -95,7 +96,7 @@ func TestMultipleMutexLock(t *testing.T) { syncManager.ReleaseAll(wf1) // Fail to acquire because two locked - status, wfUpdate, msg, failedLockName, err = 
syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -104,7 +105,7 @@ func TestMultipleMutexLock(t *testing.T) { syncManager.ReleaseAll(wf2) // Fail to acquire because three locked - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/three", failedLockName) @@ -113,7 +114,7 @@ func TestMultipleMutexLock(t *testing.T) { syncManager.ReleaseAll(wf3) // Now lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -139,7 +140,7 @@ func TestMultipleMutexLock(t *testing.T) { - name: two `) // Acquire 1 - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -147,7 +148,7 @@ func TestMultipleMutexLock(t *testing.T) { assert.True(t, wfUpdate) // Fail to acquire because one locked - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/one", failedLockName) @@ -155,7 +156,7 @@ func TestMultipleMutexLock(t *testing.T) { assert.True(t, wfUpdate) // Attempt 2, but blocked by all - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -163,7 +164,7 @@ func TestMultipleMutexLock(t *testing.T) { assert.False(t, wfUpdate) // Fail to acquire because one locked - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/one", failedLockName) @@ -174,7 +175,7 @@ func TestMultipleMutexLock(t *testing.T) { syncManager.ReleaseAll(wf2) // Now lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfall, "", wfall.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfall, "", wfall.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -237,7 +238,7 @@ func TestMutexAndSemaphore(t *testing.T) { name: my-config `) // Acquire sem + 1 - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wfmands1, "", wfmands1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := 
syncManager.TryAcquire(ctx, wfmands1, "", wfmands1.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -245,7 +246,7 @@ func TestMutexAndSemaphore(t *testing.T) { assert.True(t, wfUpdate) // Acquire sem + 2 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfmands2, "", wfmands2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfmands2, "", wfmands2.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -253,7 +254,7 @@ func TestMutexAndSemaphore(t *testing.T) { assert.True(t, wfUpdate) // Fail 1 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/one", failedLockName) @@ -261,7 +262,7 @@ func TestMutexAndSemaphore(t *testing.T) { assert.True(t, wfUpdate) // Fail 2 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -269,7 +270,7 @@ func TestMutexAndSemaphore(t *testing.T) { assert.True(t, wfUpdate) // Fail sem - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfsem, "", wfsem.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfsem, "", wfsem.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/double", failedLockName) @@ -280,7 +281,7 @@ func TestMutexAndSemaphore(t *testing.T) { syncManager.ReleaseAll(wfmands1) // Succeed 1 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -288,7 +289,7 @@ func TestMutexAndSemaphore(t *testing.T) { assert.True(t, wfUpdate) // Fail 2 - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -296,7 +297,7 @@ func TestMutexAndSemaphore(t *testing.T) { assert.False(t, wfUpdate) // Succeed sem - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfsem, "", wfsem.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfsem, "", wfsem.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -307,7 +308,7 @@ func TestMutexAndSemaphore(t *testing.T) { syncManager.ReleaseAll(wfsem) // And reacquire in a sem+mutex wf - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfmands1copy, "", wfmands1copy.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfmands1copy, "", wfmands1copy.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -317,6 +318,7 @@ func 
TestMutexAndSemaphore(t *testing.T) { }) } func TestPriority(t *testing.T) { + ctx := context.Background() kube := fake.NewSimpleClientset() syncLimitFunc := GetSyncLimitFunc(kube) t.Run("Priority", func(t *testing.T) { @@ -340,7 +342,7 @@ func TestPriority(t *testing.T) { - name: two `) // Acquire 1 + 2 as low - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wflow, "", wflow.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wflow, "", wflow.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -348,7 +350,7 @@ func TestPriority(t *testing.T) { assert.True(t, wfUpdate) // Attempt to acquire 2, fail - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -356,7 +358,7 @@ func TestPriority(t *testing.T) { assert.True(t, wfUpdate) // Attempt get 1 + 2 as high but fail - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfhigh, "", wfhigh.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfhigh, "", wfhigh.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/one", failedLockName) @@ -364,7 +366,7 @@ func TestPriority(t *testing.T) { assert.True(t, wfUpdate) // Attempt to acquire 2 again as two, fail - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -375,7 +377,7 @@ func TestPriority(t *testing.T) { syncManager.ReleaseAll(wflow) // Attempt to acquire 2 again, but priority blocks - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/two", failedLockName) @@ -383,7 +385,7 @@ func TestPriority(t *testing.T) { assert.False(t, wfUpdate) // Attempt get 1 + 2 as high and priority succeeds - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wfhigh, "", wfhigh.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wfhigh, "", wfhigh.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -393,6 +395,7 @@ func TestPriority(t *testing.T) { } func TestDuplicates(t *testing.T) { + ctx := context.Background() kube := fake.NewSimpleClientset() syncLimitFunc := GetSyncLimitFunc(kube) t.Run("Mutex", func(t *testing.T) { @@ -403,7 +406,7 @@ func TestDuplicates(t *testing.T) { - name: one - name: one `) - _, _, _, _, err := syncManager.TryAcquire(wfdupmutex, "", wfdupmutex.Spec.Synchronization) + _, _, _, _, err := syncManager.TryAcquire(ctx, wfdupmutex, "", wfdupmutex.Spec.Synchronization) assert.Error(t, err) }) t.Run("Semaphore", func(t *testing.T) { @@ -418,7 +421,7 @@ func TestDuplicates(t *testing.T) { key: double name: my-config `) - _, _, _, _, err := syncManager.TryAcquire(wfdupsemaphore, "", 
wfdupsemaphore.Spec.Synchronization) + _, _, _, _, err := syncManager.TryAcquire(ctx, wfdupsemaphore, "", wfdupsemaphore.Spec.Synchronization) assert.Error(t, err) }) } diff --git a/workflow/sync/mutex_test.go b/workflow/sync/mutex_test.go index fc3e5e19d95f..507d88479e90 100644 --- a/workflow/sync/mutex_test.go +++ b/workflow/sync/mutex_test.go @@ -110,6 +110,7 @@ status: ` func TestMutexLock(t *testing.T) { + ctx := context.Background() kube := fake.NewSimpleClientset() syncLimitFunc := GetSyncLimitFunc(kube) t.Run("InitializeSynchronization", func(t *testing.T) { @@ -121,7 +122,7 @@ func TestMutexLock(t *testing.T) { ctx := context.Background() wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(wfList.Items) + syncManager.Initialize(ctx, wfList.Items) assert.Len(t, syncManager.syncLockMap, 1) }) t.Run("WfLevelMutexAcquireAndRelease", func(t *testing.T) { @@ -133,7 +134,7 @@ func TestMutexLock(t *testing.T) { wf1 := wf.DeepCopy() wf2 := wf.DeepCopy() wf3 := wf.DeepCopy() - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -145,7 +146,7 @@ func TestMutexLock(t *testing.T) { assert.Equal(t, getHolderKey(wf, ""), wf.Status.Synchronization.Mutex.Holding[0].Holder) // Try to acquire again - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.True(t, status) assert.Empty(t, msg) @@ -153,7 +154,7 @@ func TestMutexLock(t *testing.T) { assert.False(t, wfUpdate) wf1.Name = "two" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/test", failedLockName) @@ -163,7 +164,7 @@ func TestMutexLock(t *testing.T) { wf2.Name = "three" wf2.Spec.Priority = ptr.To(int32(5)) holderKey2 := getHolderKey(wf2, "") - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/test", failedLockName) @@ -171,20 +172,20 @@ func TestMutexLock(t *testing.T) { assert.True(t, wfUpdate) wf3.Name = "four" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf3, "", wf3.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf3, "", wf3.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/test", failedLockName) assert.False(t, status) assert.True(t, wfUpdate) - syncManager.Release(wf, "", wf.Spec.Synchronization) + syncManager.Release(ctx, wf, "", wf.Spec.Synchronization) assert.Equal(t, holderKey2, nextWorkflow) require.NotNil(t, wf.Status.Synchronization) assert.Empty(t, wf.Status.Synchronization.Mutex.Holding) // Low priority workflow try to acquire the lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", 
wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/test", failedLockName) @@ -192,7 +193,7 @@ func TestMutexLock(t *testing.T) { assert.False(t, wfUpdate) // High Priority workflow acquires the lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -214,7 +215,7 @@ func TestMutexLock(t *testing.T) { wf1 := wf.DeepCopy() wf2 := wf.DeepCopy() wf3 := wf.DeepCopy() - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -227,7 +228,7 @@ func TestMutexLock(t *testing.T) { assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) // Try to acquire again - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.True(t, status) assert.Empty(t, failedLockName) @@ -236,7 +237,7 @@ func TestMutexLock(t *testing.T) { wf1.Name = "two" wf1.Namespace = "two" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "other/Mutex/test", failedLockName) @@ -247,7 +248,7 @@ func TestMutexLock(t *testing.T) { wf2.Namespace = "three" wf2.Spec.Priority = ptr.To(int32(5)) holderKey2 := getHolderKey(wf2, "") - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "other/Mutex/test", failedLockName) @@ -256,20 +257,20 @@ func TestMutexLock(t *testing.T) { wf3.Name = "four" wf3.Namespace = "four" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf3, "", wf3.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf3, "", wf3.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "other/Mutex/test", failedLockName) assert.False(t, status) assert.True(t, wfUpdate) - syncManager.Release(wf, "", wf.Spec.Synchronization) + syncManager.Release(ctx, wf, "", wf.Spec.Synchronization) assert.Equal(t, holderKey2, nextWorkflow) require.NotNil(t, wf.Status.Synchronization) assert.Empty(t, wf.Status.Synchronization.Mutex.Holding) // Low priority workflow try to acquire the lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "other/Mutex/test", failedLockName) @@ -277,7 +278,7 @@ func TestMutexLock(t *testing.T) { assert.False(t, wfUpdate) // 
High Priority workflow acquires the lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -392,6 +393,7 @@ status: ` func TestMutexTmplLevel(t *testing.T) { + ctx := context.Background() kube := fake.NewSimpleClientset() syncLimitFunc := GetSyncLimitFunc(kube) @@ -403,7 +405,7 @@ func TestMutexTmplLevel(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(mutexWfWithTmplLevel) tmpl := wf.Spec.Templates[1] - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -415,14 +417,14 @@ func TestMutexTmplLevel(t *testing.T) { assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) // Try to acquire again - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization) require.NoError(t, err) assert.True(t, wfUpdate) assert.Equal(t, "default/Mutex/welcome", failedLockName) assert.False(t, status) assert.NotEmpty(t, msg) - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-1432992664", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "synchronization-tmpl-level-mutex-vjcdk-1432992664", tmpl.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/welcome", failedLockName) @@ -431,12 +433,12 @@ func TestMutexTmplLevel(t *testing.T) { expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474") assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) - syncManager.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) + syncManager.Release(ctx, wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) require.NotNil(t, wf.Status.Synchronization) require.NotNil(t, wf.Status.Synchronization.Mutex) assert.Empty(t, wf.Status.Synchronization.Mutex.Holding) - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -448,7 +450,7 @@ func TestMutexTmplLevel(t *testing.T) { assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) assert.NotEqual(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder) - syncManager.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) + syncManager.Release(ctx, wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) require.NotNil(t, wf.Status.Synchronization) require.NotNil(t, 
wf.Status.Synchronization.Mutex) assert.NotEmpty(t, wf.Status.Synchronization.Mutex.Holding) diff --git a/workflow/sync/sync_manager.go b/workflow/sync/sync_manager.go index fccf99857065..fb0feda826d5 100644 --- a/workflow/sync/sync_manager.go +++ b/workflow/sync/sync_manager.go @@ -1,6 +1,7 @@ package sync import ( + "context" "fmt" "strings" "sync" @@ -105,9 +106,9 @@ const ( // a synchronization exists both at the template level // and at the workflow level -> impossible to upgrade correctly // due to ambiguity. Currently we just assume workflow level. -func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelType, error) { +func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName string) (SyncLevelType, error) { if wf.Spec.Synchronization != nil { - syncItems, err := allSyncItems(wf.Spec.Synchronization) + syncItems, err := allSyncItems(ctx, wf.Spec.Synchronization) if err != nil { return ErrorLevel, err } @@ -126,7 +127,7 @@ func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelTy var lastErr error for _, template := range wf.Spec.Templates { if template.Synchronization != nil { - syncItems, err := allSyncItems(template.Synchronization) + syncItems, err := allSyncItems(ctx, template.Synchronization) if err != nil { return ErrorLevel, err } @@ -149,7 +150,7 @@ func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelTy return ErrorLevel, lastErr } -func (sm *Manager) Initialize(wfs []wfv1.Workflow) { +func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) { for _, wf := range wfs { if wf.Status.Synchronization == nil { continue @@ -168,7 +169,7 @@ func (sm *Manager) Initialize(wfs []wfv1.Workflow) { } for _, holders := range holding.Holders { - level, err := getWorkflowSyncLevelByName(&wf, holding.Semaphore) + level, err := getWorkflowSyncLevelByName(ctx, &wf, holding.Semaphore) if err != nil { log.Warnf("cannot obtain lock level for '%s' : %v", holding.Semaphore, err) continue @@ -188,7 +189,7 @@ func (sm *Manager) Initialize(wfs []wfv1.Workflow) { if mutex == nil { mutex := sm.initializeMutex(holding.Mutex) if holding.Holder != "" { - level, err := getWorkflowSyncLevelByName(&wf, holding.Mutex) + level, err := getWorkflowSyncLevelByName(ctx, &wf, holding.Mutex) if err != nil { log.Warnf("cannot obtain lock level for '%s' : %v", holding.Mutex, err) continue @@ -206,7 +207,7 @@ func (sm *Manager) Initialize(wfs []wfv1.Workflow) { // TryAcquire tries to acquire the lock from semaphore. 
// It returns status of acquiring a lock , status of Workflow status updated, waiting message if lock is not available, the failed lock, and any error encountered -func (sm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, string, error) { +func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, string, error) { sm.lock.Lock() defer sm.lock.Unlock() @@ -215,7 +216,7 @@ func (sm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *w } failedLockName := "" - syncItems, err := allSyncItems(syncLockRef) + syncItems, err := allSyncItems(ctx, syncLockRef) if err != nil { return false, false, "", failedLockName, fmt.Errorf("requested configuration is invalid: %w", err) } @@ -305,7 +306,7 @@ func (sm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *w } } -func (sm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization) { +func (sm *Manager) Release(ctx context.Context, wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization) { if syncRef == nil { return } @@ -316,7 +317,7 @@ func (sm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Syn holderKey := getHolderKey(wf, nodeName) // Ignoring error here is as good as it's going to be, we shouldn't get here as we should // should never have acquired anything if this errored - syncItems, _ := allSyncItems(syncRef) + syncItems, _ := allSyncItems(ctx, syncRef) for _, syncItem := range syncItems { lockName, err := getLockName(syncItem, wf.Namespace) diff --git a/workflow/sync/sync_manager_test.go b/workflow/sync/sync_manager_test.go index 4e926a12c8f1..b70fd89de56e 100644 --- a/workflow/sync/sync_manager_test.go +++ b/workflow/sync/sync_manager_test.go @@ -344,7 +344,7 @@ func TestSemaphoreWfLevel(t *testing.T) { wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(wfList.Items) + syncManager.Initialize(ctx, wfList.Items) assert.Len(t, syncManager.syncLockMap, 1) }) t.Run("InitializeSynchronizationWithInvalid", func(t *testing.T) { @@ -356,7 +356,7 @@ func TestSemaphoreWfLevel(t *testing.T) { wfclientset := fakewfclientset.NewSimpleClientset(wf) wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(wfList.Items) + syncManager.Initialize(ctx, wfList.Items) assert.Empty(t, syncManager.syncLockMap) }) @@ -369,7 +369,7 @@ func TestSemaphoreWfLevel(t *testing.T) { wf1 := wf.DeepCopy() wf2 := wf.DeepCopy() wf3 := wf.DeepCopy() - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -382,7 +382,7 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) // Try to acquire again - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.True(t, status) assert.Empty(t, msg) @@ -390,7 +390,7 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.False(t, wfUpdate) 
wf1.Name = "two" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/workflow", failedLockName) @@ -400,7 +400,7 @@ func TestSemaphoreWfLevel(t *testing.T) { wf2.Name = "three" wf2.Spec.Priority = ptr.To(int32(5)) holderKey2 := getHolderKey(wf2, "") - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/workflow", failedLockName) @@ -408,20 +408,20 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.True(t, wfUpdate) wf3.Name = "four" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf3, "", wf3.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf3, "", wf3.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/workflow", failedLockName) assert.False(t, status) assert.True(t, wfUpdate) - syncManager.Release(wf, "", wf.Spec.Synchronization) + syncManager.Release(ctx, wf, "", wf.Spec.Synchronization) assert.Equal(t, holderKey2, nextKey) require.NotNil(t, wf.Status.Synchronization) assert.Empty(t, wf.Status.Synchronization.Semaphore.Holding[0].Holders) // Low priority workflow try to acquire the lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/workflow", failedLockName) @@ -429,7 +429,7 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.True(t, wfUpdate) // High Priority workflow acquires the lock - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -470,7 +470,7 @@ func TestResizeSemaphoreSize(t *testing.T) { wf.CreationTimestamp = metav1.Time{Time: time.Now()} wf1 := wf.DeepCopy() wf2 := wf.DeepCopy() - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -482,7 +482,7 @@ func TestResizeSemaphoreSize(t *testing.T) { assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) wf1.Name = "two" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/workflow", failedLockName) @@ -490,7 +490,7 @@ func TestResizeSemaphoreSize(t *testing.T) { assert.True(t, wfUpdate) wf2.Name = "three" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", 
wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/workflow", failedLockName) @@ -504,7 +504,7 @@ func TestResizeSemaphoreSize(t *testing.T) { _, err = kube.CoreV1().ConfigMaps("default").Update(ctx, cm, metav1.UpdateOptions{}) require.NoError(t, err) - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.True(t, status) assert.Empty(t, msg) @@ -515,7 +515,7 @@ func TestResizeSemaphoreSize(t *testing.T) { key = getHolderKey(wf1, "") assert.Equal(t, key, wf1.Status.Synchronization.Semaphore.Holding[0].Holders[0]) - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -546,7 +546,7 @@ func TestSemaphoreTmplLevel(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(wfWithTmplSemaphore) tmpl := wf.Spec.Templates[2] - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -558,26 +558,26 @@ func TestSemaphoreTmplLevel(t *testing.T) { assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) // Try to acquire again - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) require.NoError(t, err) assert.True(t, status) assert.Empty(t, failedLockName) assert.False(t, wfUpdate) assert.Empty(t, msg) - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "semaphore-tmpl-level-xjvln-1607747183", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "semaphore-tmpl-level-xjvln-1607747183", tmpl.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/ConfigMap/my-config/template", failedLockName) assert.True(t, wfUpdate) assert.False(t, status) - syncManager.Release(wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) + syncManager.Release(ctx, wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) require.NotNil(t, wf.Status.Synchronization) require.NotNil(t, wf.Status.Synchronization.Semaphore) assert.Empty(t, wf.Status.Synchronization.Semaphore.Holding[0].Holders) - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf, "semaphore-tmpl-level-xjvln-1607747183", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf, "semaphore-tmpl-level-xjvln-1607747183", tmpl.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -611,7 +611,7 @@ func TestTriggerWFWithAvailableLock(t *testing.T) { for i := 0; i < 3; i++ { wf := 
wfv1.MustUnmarshalWorkflow(wfWithSemaphore) wf.Name = fmt.Sprintf("%s-%d", "acquired", i) - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.Empty(msg) assert.Empty(failedLockName) @@ -623,21 +623,22 @@ func TestTriggerWFWithAvailableLock(t *testing.T) { for i := 0; i < 3; i++ { wf := wfv1.MustUnmarshalWorkflow(wfWithSemaphore) wf.Name = fmt.Sprintf("%s-%d", "wait", i) - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(msg) assert.Equal("default/ConfigMap/my-config/workflow", failedLockName) assert.False(status) assert.True(wfUpdate) } - syncManager.Release(&wfs[0], "", wfs[0].Spec.Synchronization) + syncManager.Release(ctx, &wfs[0], "", wfs[0].Spec.Synchronization) triggerCount = 0 - syncManager.Release(&wfs[1], "", wfs[1].Spec.Synchronization) + syncManager.Release(ctx, &wfs[1], "", wfs[1].Spec.Synchronization) assert.Equal(2, triggerCount) }) } func TestMutexWfLevel(t *testing.T) { + ctx := context.Background() kube := fake.NewSimpleClientset() syncLimitFunc := GetSyncLimitFunc(kube) t.Run("WorkflowLevelMutexAcquireAndRelease", func(t *testing.T) { @@ -649,7 +650,7 @@ func TestMutexWfLevel(t *testing.T) { wf1 := wf.DeepCopy() wf2 := wf.DeepCopy() - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "", wf.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "", wf.Spec.Synchronization) require.NoError(t, err) assert.Empty(t, msg) assert.Empty(t, failedLockName) @@ -660,7 +661,7 @@ func TestMutexWfLevel(t *testing.T) { require.NotNil(t, wf.Status.Synchronization.Mutex.Holding) wf1.Name = "two" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf1, "", wf1.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/my-mutex", failedLockName) @@ -668,7 +669,7 @@ func TestMutexWfLevel(t *testing.T) { assert.True(t, wfUpdate) wf2.Name = "three" - status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(wf2, "", wf2.Spec.Synchronization) + status, wfUpdate, msg, failedLockName, err = syncManager.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) assert.NotEmpty(t, msg) assert.Equal(t, "default/Mutex/my-mutex", failedLockName) @@ -709,10 +710,10 @@ func TestCheckWorkflowExistence(t *testing.T) { wfSema := wfv1.MustUnmarshalWorkflow(wfWithSemaphore) wfSema1 := wfSema.DeepCopy() wfSema1.Name = "test2" - _, _, _, _, _ = syncManager.TryAcquire(wfMutex, "", wfMutex.Spec.Synchronization) - _, _, _, _, _ = syncManager.TryAcquire(wfMutex1, "", wfMutex.Spec.Synchronization) - _, _, _, _, _ = syncManager.TryAcquire(wfSema, "", wfSema.Spec.Synchronization) - _, _, _, _, _ = syncManager.TryAcquire(wfSema1, "", wfSema.Spec.Synchronization) + _, _, _, _, _ = syncManager.TryAcquire(ctx, wfMutex, "", wfMutex.Spec.Synchronization) + _, _, _, _, _ = syncManager.TryAcquire(ctx, wfMutex1, "", wfMutex.Spec.Synchronization) + _, _, _, _, _ = syncManager.TryAcquire(ctx, wfSema, "", wfSema.Spec.Synchronization) + _, _, _, _, _ = syncManager.TryAcquire(ctx, 
wfSema1, "", wfSema.Spec.Synchronization) mutex := syncManager.syncLockMap["default/Mutex/my-mutex"].(*prioritySemaphore) semaphore := syncManager.syncLockMap["default/ConfigMap/my-config/workflow"] @@ -872,7 +873,7 @@ status: }, WorkflowExistenceFunc) t.Run("InitializeMutex", func(t *testing.T) { tmpl := wf.Spec.Templates[1] - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "synchronization-tmpl-level-sgg6t-1949670081", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "synchronization-tmpl-level-sgg6t-1949670081", tmpl.Synchronization) require.NoError(t, err) assert.Empty(msg) assert.Empty(failedLockName) @@ -883,7 +884,7 @@ status: }) t.Run("InitializeSemaphore", func(t *testing.T) { tmpl := wf.Spec.Templates[2] - status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(wf, "synchronization-tmpl-level-sgg6t-1899337224", tmpl.Synchronization) + status, wfUpdate, msg, failedLockName, err := syncManager.TryAcquire(ctx, wf, "synchronization-tmpl-level-sgg6t-1899337224", tmpl.Synchronization) require.NoError(t, err) assert.Empty(msg) assert.Empty(failedLockName) @@ -1125,6 +1126,7 @@ status: ` func TestMutexMigration(t *testing.T) { + ctx := context.Background() assert := assert.New(t) require := require.New(t) kube := fake.NewSimpleClientset() @@ -1148,9 +1150,9 @@ func TestMutexMigration(t *testing.T) { syncMgr.syncLockMap = make(map[string]semaphore) wfs := []wfv1.Workflow{*wfMutex2.DeepCopy()} - syncMgr.Initialize(wfs) + syncMgr.Initialize(ctx, wfs) - syncItems, err := allSyncItems(wfMutex2.Spec.Synchronization) + syncItems, err := allSyncItems(ctx, wfMutex2.Spec.Synchronization) require.NoError(err) lockName, err := getLockName(syncItems[0], wfMutex2.Namespace) require.NoError(err) @@ -1165,7 +1167,7 @@ func TestMutexMigration(t *testing.T) { assert.Equal(holderKey, holders[0]) // We should already have this lock since we acquired it above - status, _, _, _, err := syncMgr.TryAcquire(wfMutex2, "", wfMutex.Spec.Synchronization) + status, _, _, _, err := syncMgr.TryAcquire(ctx, wfMutex2, "", wfMutex.Spec.Synchronization) require.NoError(err) // BUG NOT PRESENT: https://github.com/argoproj/argo-workflows/issues/8684 assert.True(status) @@ -1191,9 +1193,9 @@ func TestMutexMigration(t *testing.T) { assert.Equal(1, numFound) wfs := []wfv1.Workflow{*wfMutex3.DeepCopy()} - syncMgr.Initialize(wfs) + syncMgr.Initialize(ctx, wfs) - syncItems, err := allSyncItems(wfMutex3.Spec.Templates[1].Synchronization) + syncItems, err := allSyncItems(ctx, wfMutex3.Spec.Templates[1].Synchronization) require.NoError(err) lockName, err := getLockName(syncItems[0], wfMutex3.Namespace) require.NoError(err) @@ -1209,7 +1211,7 @@ func TestMutexMigration(t *testing.T) { // PROVE: bug absent assert.Equal(holderKey, holders[0]) - status, _, _, _, err := syncMgr.TryAcquire(wfMutex3, foundNodeID, wfMutex.Spec.Synchronization) + status, _, _, _, err := syncMgr.TryAcquire(ctx, wfMutex3, foundNodeID, wfMutex.Spec.Synchronization) require.NoError(err) // BUG NOT PRESENT: https://github.com/argoproj/argo-workflows/issues/8684 assert.True(status) diff --git a/workflow/sync/syncitems.go b/workflow/sync/syncitems.go index 8e8cd79a3860..db28f1fde0ff 100644 --- a/workflow/sync/syncitems.go +++ b/workflow/sync/syncitems.go @@ -1,10 +1,12 @@ package sync import ( + "context" "errors" "reflect" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/deprecation" ) type syncItem struct { @@ 
-12,13 +14,15 @@ type syncItem struct { mutex *v1alpha1.Mutex } -func allSyncItems(sync *v1alpha1.Synchronization) ([]*syncItem, error) { +func allSyncItems(ctx context.Context, sync *v1alpha1.Synchronization) ([]*syncItem, error) { var syncItems []*syncItem if sync.Semaphore != nil { syncItems = append(syncItems, &syncItem{semaphore: sync.Semaphore}) + deprecation.Record(ctx, deprecation.Semaphore) } if sync.Mutex != nil { syncItems = append(syncItems, &syncItem{mutex: sync.Mutex}) + deprecation.Record(ctx, deprecation.Mutex) } for _, semaphore := range sync.Semaphores { syncItems = append(syncItems, &syncItem{semaphore: semaphore}) diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index a466bc52f51b..f27375a7b3a9 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -1,6 +1,7 @@ package validate import ( + "context" "encoding/json" "fmt" "reflect" @@ -366,7 +367,7 @@ func ValidateClusterWorkflowTemplate(wftmplGetter templateresolution.WorkflowTem } // ValidateCronWorkflow validates a CronWorkflow -func ValidateCronWorkflow(wftmplGetter templateresolution.WorkflowTemplateNamespacedGetter, cwftmplGetter templateresolution.ClusterWorkflowTemplateGetter, cronWf *wfv1.CronWorkflow) error { +func ValidateCronWorkflow(ctx context.Context, wftmplGetter templateresolution.WorkflowTemplateNamespacedGetter, cwftmplGetter templateresolution.ClusterWorkflowTemplateGetter, cronWf *wfv1.CronWorkflow) error { if len(cronWf.Spec.Schedules) > 0 && cronWf.Spec.Schedule != "" { return fmt.Errorf("cron workflow cant be configured with both Spec.Schedule and Spec.Schedules") } @@ -377,7 +378,7 @@ func ValidateCronWorkflow(wftmplGetter templateresolution.WorkflowTemplateNamesp return fmt.Errorf("cron workflow name %q must not be more than 52 characters long (currently %d)", cronWf.Name, len(cronWf.Name)) } - for _, schedule := range cronWf.Spec.GetSchedules() { + for _, schedule := range cronWf.Spec.GetSchedules(ctx) { if _, err := cron.ParseStandard(schedule); err != nil { return errors.Errorf(errors.CodeBadRequest, "cron schedule %s is malformed: %s", schedule, err) } diff --git a/workflow/validate/validate_test.go b/workflow/validate/validate_test.go index e5a597d86a43..3f541cfb5cac 100644 --- a/workflow/validate/validate_test.go +++ b/workflow/validate/validate_test.go @@ -2802,7 +2802,7 @@ func TestMaxLengthName(t *testing.T) { require.EqualError(t, err, "cluster workflow template name \"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\" must not be more than 63 characters long (currently 70)") cwf := &wfv1.CronWorkflow{ObjectMeta: metav1.ObjectMeta{Name: strings.Repeat("a", 60)}} - err = ValidateCronWorkflow(wftmplGetter, cwftmplGetter, cwf) + err = ValidateCronWorkflow(context.Background(), wftmplGetter, cwftmplGetter, cwf) require.EqualError(t, err, "cron workflow name \"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\" must not be more than 52 characters long (currently 60)") }
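Reviewer note (not part of the diff): the changes above thread a context.Context as the first argument through the sync manager (Initialize, TryAcquire, Release), allSyncItems, getWorkflowSyncLevelByName, ValidateCronWorkflow, GetNextRuntime and the cron printing helpers, so that deprecation.Record can attach the new deprecation counter to the metrics carried in that context. A minimal sketch of the resulting caller-side pattern follows, assuming an already-constructed *sync.Manager and workflow; runWithLock and its arguments are illustrative only, not code from this PR.

package example

import (
	"context"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo-workflows/v3/workflow/sync"
)

// runWithLock shows only the post-change signatures; sm and wf are assumed to be
// built elsewhere (their construction is unchanged by this diff).
func runWithLock(ctx context.Context, sm *sync.Manager, wf *wfv1.Workflow) (bool, error) {
	// TryAcquire now takes ctx first; the return values keep their order:
	// acquired, workflow-status-updated, wait message, failed lock name, error.
	acquired, _, msg, failedLock, err := sm.TryAcquire(ctx, wf, "", wf.Spec.Synchronization)
	if err != nil {
		return false, err
	}
	if !acquired {
		// A caller would typically surface msg and failedLock, e.g. "default/Mutex/my-mutex".
		_ = msg
		_ = failedLock
		return false, nil
	}
	// ... run the synchronized work ...
	// Release mirrors TryAcquire and also takes ctx now.
	sm.Release(ctx, wf, "", wf.Spec.Synchronization)
	return true, nil
}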