From a4c6ab763769f1775591c47b44037a6fcb1a317f Mon Sep 17 00:00:00 2001 From: regadas Date: Thu, 6 Oct 2022 19:14:17 -0400 Subject: [PATCH] Support -1 as parallelism value for adaptive batch adaptive_scheduler --- apis/flinkcluster/v1beta1/flinkcluster_default.go | 1 + apis/flinkcluster/v1beta1/flinkcluster_validate.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default.go b/apis/flinkcluster/v1beta1/flinkcluster_default.go index 6834e2fa..e2ae463e 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_default.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_default.go @@ -27,6 +27,7 @@ import ( ) const ( + DefaultParallelism = -1 DefaultJobManagerReplicas = 1 DefaultTaskManagerReplicas = 3 ForceTearDownAfter = time.Second * 10 diff --git a/apis/flinkcluster/v1beta1/flinkcluster_validate.go b/apis/flinkcluster/v1beta1/flinkcluster_validate.go index cac603a1..5f962e02 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_validate.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_validate.go @@ -533,8 +533,8 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error { return fmt.Errorf("job jarFile or pythonFile or pythonModule is unspecified") } - if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 { - return fmt.Errorf("job parallelism must be >= 1") + if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 && *jobSpec.Parallelism != DefaultParallelism { + return fmt.Errorf("job parallelism must be -1 (adaptive) or >= 1") } if jobSpec.RestartPolicy == nil {