Skip to content

Commit

Permalink
Consolidate logger to model agent sidecar and add readiness probe to …
Browse files Browse the repository at this point in the history
…model container (kubeflow#1223)

* Add batcher docker publisher

* logger readiness probe

* Consolidate logger to agent

* Inject logger to agent

* Update to golang 1.14

* Inject agent when logger is specified

* Fix port

* Add readiness probe when injecting logger

* Enable logger test

* Fix logger and agent tests

* Remove logger build

* Consolidate files

* Add dispatcher in test

* Add cloud event check test

* Fix agent image

* Add debugging

* Use a non-common port number
  • Loading branch information
yuzisun authored Dec 3, 2020
1 parent a5a434b commit 45ac80f
Show file tree
Hide file tree
Showing 27 changed files with 865 additions and 588 deletions.
76 changes: 76 additions & 0 deletions .github/workflows/batcher-docker-publisher.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: Batcher Docker Publisher

on:
  push:
    # Publish `master` as Docker `latest` image.
    branches:
      - master

    # Publish `v1.2.3` tags as releases.
    tags:
      - v*

  # Run tests for any PRs.
  pull_request:

env:
  IMAGE_NAME: batcher

jobs:
  # Run tests.
  # See also https://docs.docker.com/docker-hub/builds/automated-testing/
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2

      - name: Run tests
        run: |
          if [ -f docker-compose.test.yml ]; then
            docker-compose --file docker-compose.test.yml build
            docker-compose --file docker-compose.test.yml run sut
          else
            docker build . --file batcher.Dockerfile
          fi

  # Push image to GitHub Packages.
  # See also https://docs.docker.com/docker-hub/builds/
  push:
    # Ensure test job passes before pushing image.
    needs: test

    runs-on: ubuntu-latest
    # Only push on branch/tag pushes, never on pull requests.
    if: github.event_name == 'push'

    steps:
      - uses: actions/checkout@v2

      - name: Build image
        run: docker build . --file batcher.Dockerfile --tag $IMAGE_NAME

      - name: Log into registry
        run: docker login -u ${{ secrets.DOCKER_USER }} -p ${{ secrets.DOCKER_PASSWORD }}

      - name: Push image
        run: |
          IMAGE_ID=kfserving/$IMAGE_NAME
          # Change all uppercase to lowercase
          IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
          # Strip git ref prefix from version
          VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
          # Strip "v" prefix from tag name
          # [[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo $VERSION | sed -e 's/^v//')
          # Use Docker `latest` tag convention
          [ "$VERSION" == "master" ] && VERSION=latest
          echo IMAGE_ID=$IMAGE_ID
          echo VERSION=$VERSION
          docker tag $IMAGE_NAME $IMAGE_ID:$VERSION
          docker push $IMAGE_ID:$VERSION
7 changes: 4 additions & 3 deletions agent.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
# Build the inference-agent binary
FROM golang:1.14.0 as builder

# Copy in the go src
WORKDIR /go/src/github.com/kubeflow/kfserving

# Copy module files first so the dependency download layer is cached
# independently of source changes.
COPY go.mod go.mod
COPY go.sum go.sum

RUN go mod download

COPY pkg/ pkg/
COPY cmd/ cmd/

# Build a statically linked linux/amd64 binary.
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o agent ./cmd/agent
Expand Down
179 changes: 179 additions & 0 deletions cmd/agent/execprobe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strconv"
"time"

"k8s.io/apimachinery/pkg/util/wait"
network "knative.dev/networking/pkg"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/queue/health"
"knative.dev/serving/pkg/queue/readiness"
)

const (
	// healthURLPrefix is the scheme+host prefix of the local probe target;
	// callers append the serving port number to form the full URL.
	healthURLPrefix = "http://127.0.0.1:"

	// The 25 millisecond retry interval is an unscientific compromise between wanting to get
	// started as early as possible while still wanting to give the container some breathing
	// room to get up and running.
	aggressivePollInterval = 25 * time.Millisecond

	// badProbeTemplate prefixes the 400 response body when the probe header
	// carries an unexpected value.
	badProbeTemplate = "unexpected probe header value: "
)

// Besides serving as a long-lived proxy, this binary can operate as an exec
// readiness probe when the `--probe-period` flag is passed.
//
// In that mode the exec probe repeatedly issues an HTTP request, tagged with
// a Knative probe header, against the local proxy server. The server-side
// handler (knativeProbeHandler) performs the real health check against the
// user container. The exec probe process exits 0 once it observes a 200
// response, or 1 if it never does.
//
// An exec probe is used instead of a plain HTTP probe because it can start
// polling the instant the container launches; an HTTP probe would initially
// race the server startup, fail, and then be throttled by Kubernetes'
// effective one-second minimum retry period. Polling locally on
// aggressivePollInterval yields a first readiness verdict much sooner.
func standaloneProbeMain(timeout time.Duration, transport http.RoundTripper, port string) (exitCode int) {
	// Ports are 16-bit unsigned values; reject anything that doesn't parse.
	portNum, parseErr := strconv.ParseUint(port, 10, 16)
	if parseErr != nil {
		fmt.Fprintln(os.Stderr, "parse queue port:", parseErr)
		return 1
	}
	if portNum == 0 {
		fmt.Fprintln(os.Stderr, "port must be a positive value, got 0")
		return 1
	}

	// Fall back to the knative default poll timeout when none was supplied.
	if timeout == 0 {
		timeout = readiness.PollTimeout
	}

	if probeErr := probeQueueHealthPath(timeout, int(portNum), transport); probeErr != nil {
		fmt.Fprintln(os.Stderr, probeErr)
		return 1
	}

	return 0
}

// probeQueueHealthPath repeatedly sends a Knative probe request to the local
// serving port until the target reports ready, the target signals shutdown,
// or timeout elapses. It returns nil once a ready response is observed.
func probeQueueHealthPath(timeout time.Duration, servingPort int, transport http.RoundTripper) error {
	url := healthURLPrefix + strconv.Itoa(servingPort)

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return fmt.Errorf("probe failed: error creating request: %w", err)
	}

	// Add the header to indicate this is a probe request.
	req.Header.Add(network.ProbeHeaderName, queue.Name)
	req.Header.Add(network.UserAgentKey, network.QueueProxyUserAgent)

	// The same timeout bounds both each individual request and (via ctx
	// below) the overall polling loop.
	httpClient := &http.Client{
		Timeout:   timeout,
		Transport: transport,
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// lastErr records the most recent request/shutdown failure; it takes
	// precedence over a plain timeout in the error reporting below.
	var (
		lastErr error
		res     *http.Response
	)
	// Using PollImmediateUntil instead of PollImmediate because if timeout is reached while waiting for first
	// invocation of conditionFunc, it exits immediately without trying for a second time.
	timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) {
		res, lastErr = httpClient.Do(req)
		if lastErr != nil {
			// Return nil error for retrying.
			return false, nil
		}

		defer func() {
			// Ensure body is read and closed to ensure connection can be re-used via keep-alive.
			// No point handling errors here, connection just won't be reused.
			io.Copy(ioutil.Discard, res.Body)
			res.Body.Close()
		}()

		// Fail readiness immediately rather than retrying if we get a header indicating we're shutting down.
		if health.IsHTTPProbeShuttingDown(res) {
			lastErr = errors.New("failing probe deliberately for shutdown")
			return false, lastErr
		}
		return health.IsHTTPProbeReady(res), nil
	}, ctx.Done())

	if lastErr != nil {
		return fmt.Errorf("failed to probe: %w", lastErr)
	}

	// An http.StatusOK was never returned during probing.
	if timeoutErr != nil {
		return errors.New("probe returned not ready")
	}

	return nil
}

// ProbeHandler returns a http.HandlerFunc that responds to health checks if the
// knative network probe header is passed, and otherwise delegates to the next handler.
//
// Probe requests carrying an unexpected header value receive a 400; a nil
// prober yields a 500. NOTE(review): tracingEnabled is currently unused in
// the body — kept for interface compatibility.
func ProbeHandler(healthState *health.State, prober func() bool, isAggressive bool, tracingEnabled bool, next http.Handler) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ph := network.KnativeProbeHeader(r)

		// Non-probe traffic passes straight through to the next handler.
		if ph == "" {
			next.ServeHTTP(w, r)
			return
		}

		// Only the queue proxy's own probe value is accepted.
		if ph != queue.Name {
			http.Error(w, badProbeTemplate+ph, http.StatusBadRequest)
			return
		}

		if prober == nil {
			http.Error(w, "no probe", http.StatusInternalServerError)
			return
		}

		// Pass prober directly: the previous closure (`if !prober() { return
		// false }; return true`) merely forwarded prober's result unchanged.
		healthState.HandleHealthProbe(prober, isAggressive, w)
	}
}
Loading

0 comments on commit 45ac80f

Please sign in to comment.