Skip to content

Commit

Permalink
Fix regression in handling of default heap memory config (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 16, 2020
1 parent 6f7192b commit b5eb196
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 76 deletions.
22 changes: 20 additions & 2 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,17 @@ Below is the list of fields in the custom resource and their description:
* **taskSlots** `type:int32 required=true`
Number of task slots per task manager.

* **offHeapMemoryFraction** `type:float64`
**For Flink 1.10 and below**
A value between 0 and 1 that represents % of container memory dedicated to system / off heap. The
remaining memory is given to the taskmanager. Note that Flink may further reserve some of this
memory for off-heap uses like network buffers, so you may see the JVM heap size configured to
a lower amount. This configures `taskmanager.heap.size`.

* **systemMemoryFraction** `type:float64`
A value between 0 and 1 that represents % of container memory dedicated to the system. The remaining memory is given to the taskmanager.
**For Flink 1.11 and above**
A value between 0 and 1 that represents % of container memory dedicated to the system. The remaining memory is
given to the taskmanager process. This configures `taskmanager.memory.process.size`.

* **nodeSelector** `type:map[string]string`
Configuration for the node selectors used for the task manager.
Expand All @@ -61,8 +70,17 @@ Below is the list of fields in the custom resource and their description:
Number of job managers for the flink cluster. If multiple job managers are provided, the user has to ensure that
correct environment variables are set for High availability mode.

* **offHeapMemoryFraction** `type:float64`
**For Flink 1.10 and below**
A value between 0 and 1 that represents % of container memory dedicated to system / off heap. The
remaining memory is given to the jobmanager. Note that Flink may further reserve some of this
memory for off-heap uses like network buffers, so you may see the JVM heap size configured to
a lower amount. This configures `jobmanager.heap.size`.

* **systemMemoryFraction** `type:float64`
A value between 0 and 1 that represents % of container memory dedicated to the system. The remaining memory is given to the job manager.
**For Flink 1.11 and above**
A value between 0 and 1 that represents % of container memory dedicated to the system. The remaining memory is
given to the jobmanager process. This configures `jobmanager.memory.process.size`.

* **nodeSelector** `type:map[string]string`
Configuration for the node selectors used for the job manager.
Expand Down
30 changes: 14 additions & 16 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,25 +107,23 @@ func (in *FlinkConfig) DeepCopy() *FlinkConfig {
}

type JobManagerConfig struct {
Resources *apiv1.ResourceRequirements `json:"resources,omitempty"`
EnvConfig EnvironmentConfig `json:"envConfig"`
Replicas *int32 `json:"replicas,omitempty"`
// Deprecated: use SystemMemoryFraction instead
OffHeapMemoryFraction *float64 `json:"offHeapMemoryFraction,omitempty"`
SystemMemoryFraction *float64 `json:"systemMemoryFraction,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
Resources *apiv1.ResourceRequirements `json:"resources,omitempty"`
EnvConfig EnvironmentConfig `json:"envConfig"`
Replicas *int32 `json:"replicas,omitempty"`
OffHeapMemoryFraction *float64 `json:"offHeapMemoryFraction,omitempty"`
SystemMemoryFraction *float64 `json:"systemMemoryFraction,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
}

type TaskManagerConfig struct {
Resources *apiv1.ResourceRequirements `json:"resources,omitempty"`
EnvConfig EnvironmentConfig `json:"envConfig"`
TaskSlots *int32 `json:"taskSlots,omitempty"`
// Deprecated: use SystemMemoryFraction instead
OffHeapMemoryFraction *float64 `json:"offHeapMemoryFraction,omitempty"`
SystemMemoryFraction *float64 `json:"systemMemoryFraction,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
Resources *apiv1.ResourceRequirements `json:"resources,omitempty"`
EnvConfig EnvironmentConfig `json:"envConfig"`
TaskSlots *int32 `json:"taskSlots,omitempty"`
OffHeapMemoryFraction *float64 `json:"offHeapMemoryFraction,omitempty"`
SystemMemoryFraction *float64 `json:"systemMemoryFraction,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
}

type EnvironmentConfig struct {
Expand Down
66 changes: 40 additions & 26 deletions pkg/controller/flink/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,11 @@ func firstNonNil(x *int32, y int32) int32 {
return y
}

func getFraction(systemMemoryFraction *float64, offHeapMemoryFraction *float64) float64 {
if isValidFraction(systemMemoryFraction) {
return *systemMemoryFraction
}
if isValidFraction(offHeapMemoryFraction) {
return *offHeapMemoryFraction
}
if offHeapMemoryFraction != nil {
return OffHeapMemoryDefaultFraction
func getValidFraction(x *float64, d float64) float64 {
if x != nil && *x >= float64(0) && *x <= float64(1) {
return *x
}

return SystemMemoryDefaultFraction
}

func isValidFraction(fraction *float64) bool {
return fraction != nil && *fraction >= float64(0) && *fraction <= float64(1)
return d
}

func getTaskmanagerSlots(app *v1beta1.FlinkApplication) int32 {
Expand Down Expand Up @@ -108,13 +97,31 @@ func computeMemory(memoryInBytes float64, fraction float64) string {
return fmt.Sprintf("%dk", kbs)
}

func getTaskManagerMemory(app *v1beta1.FlinkApplication, fraction float64) string {
// heap memory configs are used for Flink < 1.11

// getTaskManagerHeapMemory returns the memory string used for the
// `taskmanager.heap.size` setting. It takes the task manager's requested
// memory and applies the configured OffHeapMemoryFraction, falling back to
// OffHeapMemoryDefaultFraction when the field is unset or outside [0, 1].
func getTaskManagerHeapMemory(app *v1beta1.FlinkApplication) string {
	requestedBytes := getRequestedTaskManagerMemory(app)
	offHeapFraction := getValidFraction(
		app.Spec.TaskManagerConfig.OffHeapMemoryFraction,
		OffHeapMemoryDefaultFraction,
	)
	return computeMemory(float64(requestedBytes), offHeapFraction)
}

func getJobManagerMemory(app *v1beta1.FlinkApplication, fraction float64) string {
// getJobManagerHeapMemory returns the memory string used for the
// `jobmanager.heap.size` setting. It takes the job manager's requested
// memory and applies the configured OffHeapMemoryFraction, falling back to
// OffHeapMemoryDefaultFraction when the field is unset or outside [0, 1].
func getJobManagerHeapMemory(app *v1beta1.FlinkApplication) string {
	requestedBytes := getRequestedJobManagerMemory(app)
	offHeapFraction := getValidFraction(
		app.Spec.JobManagerConfig.OffHeapMemoryFraction,
		OffHeapMemoryDefaultFraction,
	)
	return computeMemory(float64(requestedBytes), offHeapFraction)
}

// process memory configs are used for Flink >= 1.11

// getTaskManagerProcessMemory returns the memory string used for the
// `taskmanager.memory.process.size` setting. It takes the task manager's
// requested memory and applies the configured SystemMemoryFraction, falling
// back to SystemMemoryDefaultFraction when the field is unset or outside
// [0, 1].
func getTaskManagerProcessMemory(app *v1beta1.FlinkApplication) string {
	requestedBytes := getRequestedTaskManagerMemory(app)
	systemFraction := getValidFraction(
		app.Spec.TaskManagerConfig.SystemMemoryFraction,
		SystemMemoryDefaultFraction,
	)
	return computeMemory(float64(requestedBytes), systemFraction)
}

// getJobManagerProcessMemory returns the memory string used for the
// `jobmanager.memory.process.size` setting. It takes the job manager's
// requested memory and applies the configured SystemMemoryFraction, falling
// back to SystemMemoryDefaultFraction when the field is unset or outside
// [0, 1].
func getJobManagerProcessMemory(app *v1beta1.FlinkApplication) string {
	requestedBytes := getRequestedJobManagerMemory(app)
	systemFraction := getValidFraction(
		app.Spec.JobManagerConfig.SystemMemoryFraction,
		SystemMemoryDefaultFraction,
	)
	return computeMemory(float64(requestedBytes), systemFraction)
}

Expand Down Expand Up @@ -143,17 +150,24 @@ func renderFlinkConfig(app *v1beta1.FlinkApplication) (string, error) {
appVersion, err := version.NewVersion(getFlinkVersion(app))
v11, _ := version.NewVersion("1.11")

//nolint // fall back to the old config for backwards-compatibility
jobManagerFraction := getFraction(app.Spec.JobManagerConfig.SystemMemoryFraction, app.Spec.JobManagerConfig.OffHeapMemoryFraction)
//nolint // fall back to the old config for backwards-compatibility
taskManagerFraction := getFraction(app.Spec.TaskManagerConfig.SystemMemoryFraction, app.Spec.TaskManagerConfig.OffHeapMemoryFraction)

if err != nil || appVersion == nil || appVersion.LessThan(v11) {
(*config)["jobmanager.heap.size"] = getJobManagerMemory(app, jobManagerFraction)
(*config)["taskmanager.heap.size"] = getTaskManagerMemory(app, taskManagerFraction)
// if process memory is specified for < 11, error out
if app.Spec.JobManagerConfig.SystemMemoryFraction != nil || app.Spec.TaskManagerConfig.SystemMemoryFraction != nil {
return "", fmt.Errorf("systemMemoryFraction config cannot be used with flinkVersion < 1.11', use " +
"offHeapMemoryFraction instead")
}

(*config)["jobmanager.heap.size"] = getJobManagerHeapMemory(app)
(*config)["taskmanager.heap.size"] = getTaskManagerHeapMemory(app)
} else {
(*config)["jobmanager.memory.process.size"] = getJobManagerMemory(app, jobManagerFraction)
(*config)["taskmanager.memory.process.size"] = getTaskManagerMemory(app, taskManagerFraction)
// if heap memory is used for >= 1.11, error out
if app.Spec.JobManagerConfig.OffHeapMemoryFraction != nil || app.Spec.TaskManagerConfig.OffHeapMemoryFraction != nil {
return "", fmt.Errorf("offHeapMemoryFraction config cannot be used with flinkVersion >= 1.11'; " +
"use systemMemoryFraction istead")
}

(*config)["jobmanager.memory.process.size"] = getJobManagerProcessMemory(app)
(*config)["taskmanager.memory.process.size"] = getTaskManagerProcessMemory(app)
}

// get the keys for the map
Expand Down
33 changes: 24 additions & 9 deletions pkg/controller/flink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestEnsureNoFractionalHeapMemory(t *testing.T) {
//nolint // fall back to the old config for backwards-compatibility
app.Spec.TaskManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction

assert.Equal(t, "41287k", getTaskManagerMemory(&app, offHeapMemoryFraction))
assert.Equal(t, "41287k", getTaskManagerHeapMemory(&app))
}

func TestGetTaskManagerHeapMemory(t *testing.T) {
Expand All @@ -164,10 +164,9 @@ func TestGetTaskManagerHeapMemory(t *testing.T) {
}
offHeapMemoryFraction := float64(0.5)
app.Spec.TaskManagerConfig.Resources = &tmResources
//nolint // fall back to the old config for backwards-compatibility
app.Spec.TaskManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction

assert.Equal(t, "32768k", getTaskManagerMemory(&app, offHeapMemoryFraction))
assert.Equal(t, "32768k", getTaskManagerHeapMemory(&app))
}

func TestGetJobManagerHeapMemory(t *testing.T) {
Expand All @@ -184,10 +183,26 @@ func TestGetJobManagerHeapMemory(t *testing.T) {
}
offHeapMemoryFraction := float64(0.5)
app.Spec.JobManagerConfig.Resources = &jmResources
//nolint // fall back to the old config for backwards-compatibility
app.Spec.JobManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction

assert.Equal(t, "32768k", getJobManagerMemory(&app, offHeapMemoryFraction))
assert.Equal(t, "32768k", getJobManagerHeapMemory(&app))
}

// TestGetJobManagerDefaultHeapMemory checks the job manager heap size when
// OffHeapMemoryFraction is left unset on the application: with a 64Mi memory
// request the expected result is "32768k", i.e. the default fraction is applied.
func TestGetJobManagerDefaultHeapMemory(t *testing.T) {
app := v1beta1.FlinkApplication{}
jmResources := coreV1.ResourceRequirements{
Requests: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
}
app.Spec.JobManagerConfig.Resources = &jmResources

// No OffHeapMemoryFraction is set here on purpose, so the default applies.
assert.Equal(t, "32768k", getJobManagerHeapMemory(&app))
}

func TestGetJobManagerProcessMemory(t *testing.T) {
Expand All @@ -206,7 +221,7 @@ func TestGetJobManagerProcessMemory(t *testing.T) {
app.Spec.JobManagerConfig.Resources = &jmResources
app.Spec.JobManagerConfig.SystemMemoryFraction = &systemMemoryFraction

assert.Equal(t, "52428k", getJobManagerMemory(&app, systemMemoryFraction))
assert.Equal(t, "52428k", getJobManagerProcessMemory(&app))
}

func TestGetTaskManagerProcessMemory(t *testing.T) {
Expand All @@ -225,7 +240,7 @@ func TestGetTaskManagerProcessMemory(t *testing.T) {
app.Spec.TaskManagerConfig.Resources = &tmResources
app.Spec.TaskManagerConfig.SystemMemoryFraction = &systemMemoryFraction

assert.Equal(t, "52428k", getTaskManagerMemory(&app, systemMemoryFraction))
assert.Equal(t, "52428k", getTaskManagerProcessMemory(&app))
}

func MemoryConfigurationForVersion(t *testing.T, version string) []string {
Expand Down Expand Up @@ -321,12 +336,12 @@ func TestMemoryConfigurationForVersionBelow11(t *testing.T) {

expected := []string{
fmt.Sprintf("blob.server.port: %d", BlobDefaultPort),
"jobmanager.heap.size: 419430k",
"jobmanager.heap.size: 262144k",
fmt.Sprintf("jobmanager.rpc.port: %d", RPCDefaultPort),
fmt.Sprintf("jobmanager.web.port: %d", UIDefaultPort),
fmt.Sprintf("metrics.internal.query-service.port: %d", MetricsQueryDefaultPort),
fmt.Sprintf("query.server.port: %d", QueryDefaultPort),
"taskmanager.heap.size: 1677721k",
"taskmanager.heap.size: 1048576k",
fmt.Sprintf("taskmanager.numberOfTaskSlots: %d", TaskManagerDefaultSlots),
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"

// Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
const testAppHash = "371961d2"
const testAppHash = "752c76d3"
const testAppName = "app-name"
const testNamespace = "ns"
const testJobID = "j1"
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = annotations
hash := "10a41c95"
hash := "5e7c7283"
app.Annotations = common.DuplicateMap(annotations)
expectedLabels := map[string]string{
"flink-app": "app-name",
Expand Down Expand Up @@ -105,10 +105,10 @@ func TestJobManagerCreateSuccess(t *testing.T) {
assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion)
assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind)

assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 2516582k\n"+
assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 838860k\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"jobmanager.rpc.address: app-name-"+hash+"\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
app.Spec.FlinkConfig = map[string]interface{}{
"high-availability": "zookeeper",
}
hash := "a860c62b"
hash := "52623ded"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand All @@ -187,10 +187,10 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion)
assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind)

assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 2516582k\n"+
assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 838860k\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"high-availability.cluster-id: app-name-"+hash+"\n"+
"jobmanager.rpc.address: $HOST_IP\n",
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestJobManagerSecurityContextAssignment(t *testing.T) {
RunAsNonRoot: &runAsNonRoot,
}

hash := "26ca0a3a"
hash := "c06b960b"

ctr := 0
mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster)
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestJobManagerCreateSuccessWithVersion(t *testing.T) {
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = common.DuplicateMap(annotations)
hash := "6f67fe75"
hash := "5cb5943e"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand All @@ -401,10 +401,10 @@ func TestJobManagerCreateSuccessWithVersion(t *testing.T) {
assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion)
assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind)

assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 2516582k\n"+
assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 838860k\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"jobmanager.rpc.address: app-name-"+hash+"\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
Expand Down
Loading

0 comments on commit b5eb196

Please sign in to comment.