Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Termination Grace Period to write_timeout for functions to allow them to complete during a scale down event. #869

Merged
merged 3 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.PHONY: build local push namespaces install charts start-kind stop-kind build-buildx render-charts
TAG?=latest
OWNER?=openfaas
SERVER?=ghcr.io
export DOCKER_CLI_EXPERIMENTAL=enabled

TOOLS_DIR := .tools
Expand All @@ -24,16 +25,16 @@ local:

build-docker:
docker build \
-t ghcr.io/$(OWNER)/faas-netes:$(TAG) .
-t $(SERVER)/$(OWNER)/faas-netes:$(TAG) .

.PHONY: build-buildx
build-buildx:
@echo ghcr.io/$(OWNER)/faas-netes:$(TAG) && \
@echo $(SERVER)/$(OWNER)/faas-netes:$(TAG) && \
docker buildx create --use --name=multiarch --node=multiarch && \
docker buildx build \
--push \
--platform linux/amd64 \
--tag ghcr.io/$(OWNER)/faas-netes:$(TAG) \
--tag $(SERVER)/$(OWNER)/faas-netes:$(TAG) \
.

.PHONY: build-buildx-all
Expand All @@ -42,21 +43,21 @@ build-buildx-all:
docker buildx build \
--platform linux/amd64,linux/arm/v7,linux/arm64 \
--output "type=image,push=false" \
--tag ghcr.io/$(OWNER)/faas-netes:$(TAG) \
--tag $(SERVER)/$(OWNER)/faas-netes:$(TAG) \
.

.PHONY: publish-buildx-all
publish-buildx-all:
@echo ghcr.io/$(OWNER)/faas-netes:$(TAG) && \
@echo $(SERVER)/$(OWNER)/faas-netes:$(TAG) && \
docker buildx create --use --name=multiarch --node=multiarch && \
docker buildx build \
--platform linux/amd64,linux/arm/v7,linux/arm64 \
--push=true \
--tag ghcr.io/$(OWNER)/faas-netes:$(TAG) \
--tag $(SERVER)/$(OWNER)/faas-netes:$(TAG) \
.

push:
docker push ghcr.io/$(OWNER)/faas-netes:$(TAG)
docker push $(SERVER)/$(OWNER)/faas-netes:$(TAG)

charts:
cd chart && helm package openfaas/ && helm package kafka-connector/ && helm package cron-connector/ && helm package nats-connector/ && helm package mqtt-connector/ && helm package pro-builder/
Expand Down
23 changes: 22 additions & 1 deletion pkg/controller/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"strings"
"time"

"github.com/google/go-cmp/cmp"
faasv1 "github.com/openfaas/faas-netes/pkg/apis/openfaas/v1"
Expand Down Expand Up @@ -56,6 +57,25 @@ func newDeployment(
}
}

// add 2s jitter to avoid a race condition between write_timeout and grace period
jitter := time.Second * 2
terminationGracePeriod := time.Second*30 + jitter

if function.Spec.Environment != nil {
e := *function.Spec.Environment
if v, ok := e["write_timeout"]; ok && len(v) > 0 {
period, err := time.ParseDuration(v)
if err != nil {
glog.Warningf("Function %s failed to parse write_timeout: %s",
function.Spec.Name, err.Error())
}

terminationGracePeriod = period + jitter
}
}

terminationGracePeriodSeconds := int64(terminationGracePeriod.Seconds())

allowPrivilegeEscalation := false

deploymentSpec := &appsv1.Deployment{
Expand Down Expand Up @@ -99,7 +119,8 @@ func newDeployment(
Annotations: annotations,
},
Spec: corev1.PodSpec{
NodeSelector: nodeSelector,
NodeSelector: nodeSelector,
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
Containers: []corev1.Container{
{
Name: function.Spec.Name,
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,67 @@ import (
"k8s.io/client-go/kubernetes/fake"
)

func Test_GracePeriodFromWriteTimeout(t *testing.T) {

scenarios := []struct {
name string
wantSeconds int64
envs map[string]string
}{
{"grace period is the default", 32, map[string]string{}},
{"grace period is set from write_timeout", 62, map[string]string{"write_timeout": "60s"}},
}

for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {

want := int64(s.wantSeconds)
function := &faasv1.Function{
ObjectMeta: metav1.ObjectMeta{
Name: "alpine",
},
Spec: faasv1.FunctionSpec{
Name: "alpine",
Image: "ghcr.io/openfaas/alpine:latest",
Annotations: &map[string]string{},
ReadOnlyRootFilesystem: true,
Environment: &s.envs},
}

factory := NewFunctionFactory(fake.NewSimpleClientset(),
k8s.DeploymentConfig{
HTTPProbe: false,
SetNonRootUser: true,
LivenessProbe: &k8s.ProbeConfig{
PeriodSeconds: 1,
TimeoutSeconds: 3,
InitialDelaySeconds: 0,
},
ReadinessProbe: &k8s.ProbeConfig{
PeriodSeconds: 1,
TimeoutSeconds: 3,
InitialDelaySeconds: 0,
},
})

secrets := map[string]*corev1.Secret{}

deployment := newDeployment(function, nil, secrets, factory)
got := deployment.Spec.Template.Spec.TerminationGracePeriodSeconds
if got == nil {
t.Errorf("TerminationGracePeriodSeconds not set, but want %d", want)
t.Fail()
return
}

if want != *got {
t.Errorf("TerminationGracePeriodSeconds want %d, but got %d", want, got)
t.Fail()
}
})
}
}

func Test_newDeployment(t *testing.T) {
function := &faasv1.Function{
ObjectMeta: metav1.ObjectMeta{
Expand Down
24 changes: 23 additions & 1 deletion pkg/handlers/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/openfaas/faas-netes/pkg/k8s"

Expand All @@ -24,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
glog "k8s.io/klog"
)

// initialReplicasCount how many replicas to start of creating for a function
Expand Down Expand Up @@ -172,6 +174,24 @@ func makeDeploymentSpec(request types.FunctionDeployment, existingSecrets map[st
return nil, err
}

// add 2s jitter to avoid a race condition between write_timeout and grace period
jitter := time.Second * 2
terminationGracePeriod := time.Second*30 + jitter

if request.EnvVars != nil {
if v, ok := request.EnvVars["write_timeout"]; ok && len(v) > 0 {
period, err := time.ParseDuration(v)
if err != nil {
glog.Warningf("Function %s failed to parse write_timeout: %s",
request.Service, err.Error())
}

terminationGracePeriod = period + jitter
}
}

terminationGracePeriodSeconds := int64(terminationGracePeriod.Seconds())

enableServiceLinks := false
allowPrivilegeEscalation := false

Expand Down Expand Up @@ -211,7 +231,9 @@ func makeDeploymentSpec(request types.FunctionDeployment, existingSecrets map[st
Annotations: annotations,
},
Spec: apiv1.PodSpec{
NodeSelector: nodeSelector,
NodeSelector: nodeSelector,
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,

Containers: []apiv1.Container{
{
Name: request.Service,
Expand Down
41 changes: 40 additions & 1 deletion pkg/handlers/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,46 @@ import (
apiv1 "k8s.io/api/core/v1"
)

func Test_GracePeriodFromWriteTimeout(t *testing.T) {

scenarios := []struct {
name string
wantSeconds int64
envs map[string]string
}{
{"grace period is the default", 32, map[string]string{}},
{"grace period is set from write_timeout", 62, map[string]string{"write_timeout": "60s"}},
}

for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
request := types.FunctionDeployment{Service: "testfunc", Image: "ghcr.io/openfaas/alpine:latest"}
factory := k8s.NewFunctionFactory(fake.NewSimpleClientset(), k8s.DeploymentConfig{
LivenessProbe: &k8s.ProbeConfig{},
ReadinessProbe: &k8s.ProbeConfig{},
SetNonRootUser: false,
}, nil)

request.EnvVars = s.envs
deployment, err := makeDeploymentSpec(request, map[string]*apiv1.Secret{}, factory)
if err != nil {
t.Errorf("unexpected makeDeploymentSpec error: %s", err.Error())
}
want := s.wantSeconds
got := deployment.Spec.Template.Spec.TerminationGracePeriodSeconds

if got == nil {
t.Fatalf("want: %d, got: nil", want)
}

if *got != want {
t.Errorf("TerminationGracePeriodSeconds want: %d, got: %d", want, *got)
}

})
}
}

func Test_buildAnnotations_Empty_In_CreateRequest(t *testing.T) {
request := types.FunctionDeployment{}

Expand Down Expand Up @@ -113,7 +153,6 @@ func Test_SetNonRootUser(t *testing.T) {
}
})
}

}

func Test_buildEnvVars_NoSortedKeys(t *testing.T) {
Expand Down