Skip to content

Commit

Permalink
support postfilter
Browse files Browse the repository at this point in the history
  • Loading branch information
Gekko0114 committed Oct 28, 2023
1 parent 6708ed4 commit c94d7e2
Show file tree
Hide file tree
Showing 20 changed files with 531 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ guest/.tinygo-target.json: scheduler/go.mod
build-wat: $(wildcard scheduler/test/testdata/*/*.wat)
@for f in $^; do \
wasm=$$(echo $$f | sed -e 's/\.wat/\.wasm/'); \
wat2wasm -o $$wasm --debug-names $$f; \
wat2wasm $$f -o $$wasm --debug-names $$f; \
done

.PHONY: testdata
Expand Down
10 changes: 10 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ type FilterPlugin interface {
Filter(state CycleState, pod proto.Pod, nodeInfo NodeInfo) *Status
}

// TODO: fix
// PostFilterPlugin is a WebAssembly implementation of framework.PostFilterPlugin.
type PostFilterPlugin interface {
Plugin

PostFilter(state CycleState, pod proto.Pod, filteredNodeStatusMap NodeToStatusMap) (nominatedNodeName string, nominatingMode int32, status *Status)
}

// EnqueueExtensions is a WebAssembly implementation of framework.EnqueueExtensions.
type EnqueueExtensions interface {
EventsToRegister() []ClusterEvent
Expand Down Expand Up @@ -97,3 +105,5 @@ type NodeInfo interface {

Node() proto.Node
}

type NodeToStatusMap map[string]*Status
22 changes: 22 additions & 0 deletions guest/internal/postfilter/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

//go:wasmimport k8s.io/scheduler result.nominated_node_name
func setNominatedNodeNameResult(ptr, size uint32)
22 changes: 22 additions & 0 deletions guest/internal/postfilter/imports_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build !tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

// setNominatedNodeNameResult is stubbed for compilation outside TinyGo.
func setNominatedNodeNameResult(uint32, uint32) {}
74 changes: 74 additions & 0 deletions guest/internal/postfilter/postfilter_exports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package postfilter is defined internally so that it can export Pod as
// cyclestate.Pod, without circular dependencies or exporting it publicly.
package postfilter

import (
"runtime"
"unsafe"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

// TODO: should postfilter be defined internally?
// postfilter is the current plugin assigned with SetPlugin.
var postfilter api.PostFilterPlugin

// SetPlugin is exposed to prevent package cycles.
func SetPlugin(postfilterPlugin api.PostFilterPlugin) {
if postfilterPlugin == nil {
panic("nil postfilterPlugin")
}
postfilter = postfilterPlugin
plugin.MustSet(postfilterPlugin)
}

// prevent unused lint errors (lint is run with normal go).
var _ func() uint64 = _postfilter

// _postfilter is only exported to the host.
//
//export postfilter
func _postfilter() uint64 { //nolint
// This function begins a new scheduling cycle: zero out any cycle state.
currentPod = nil
currentCycleState = map[string]any{}
currentNodeToStatusMap = map[string]any{}

if postfilter == nil { // Then, the user didn't define one.
// Unlike most plugins we always export postfilter so that we can reset
// the cycle state: return success to avoid no-op overhead.
return 0
}

// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
nominatedNodeName, nominatingMode, status := postfilter.PostFilter(CycleState, Pod, nil)

cString := []byte(nominatedNodeName)
if cString != nil {
ptr := uint32(uintptr(unsafe.Pointer(&cString[0])))
size := uint32(len(cString))
setNominatedNodeNameResult(ptr, size)
runtime.KeepAlive(cString) // until ptr is no longer needed.
}

return (uint64(nominatingMode) << uint64(32)) | uint64(imports.StatusToCode(status))
}
72 changes: 72 additions & 0 deletions guest/internal/postfilter/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package postfilter

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
internalproto "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/proto"
protoapi "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/api"
)

// Pod is exposed for the cyclestate package.
var Pod proto.Pod = pod{}

// CycleState is exposed for the cyclestate package.
var CycleState api.CycleState = cycleState{}

var currentCycleState = map[string]any{}

var currentNodeToStatusMap = map[string]any{}

type cycleState struct{}

func (cycleState) Read(key string) (val any, ok bool) {
val, ok = currentCycleState[key]
return
}

func (cycleState) Write(key string, val any) {
currentCycleState[key] = val
}

func (cycleState) Delete(key string) {
delete(currentCycleState, key)
}

type pod struct{}

func (pod) GetName() string {
return internalproto.GetName(lazyPod())
}

func (pod) GetNamespace() string {
return internalproto.GetNamespace(lazyPod())
}

func (pod) GetUid() string {
return internalproto.GetUid(lazyPod())
}

func (pod) Spec() *protoapi.PodSpec {
return lazyPod().Spec
}

func (pod) Status() *protoapi.PodStatus {
return lazyPod().Status
}

var currentPod *protoapi.Pod

// lazyPod lazy initializes currentPod from imports.Pod.
func lazyPod() *protoapi.Pod {
if pod := currentPod; pod != nil {
return pod
}

var msg protoapi.Pod
if err := imports.Pod(msg.UnmarshalVT); err != nil {
panic(err.Error())
}
currentPod = &msg
return currentPod
}
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
Expand Down Expand Up @@ -51,6 +52,9 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.FilterPlugin); ok {
filter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PostFilterPlugin); ok {
postfilter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PreScorePlugin); ok {
prescore.SetPlugin(plugin)
}
Expand Down
53 changes: 53 additions & 0 deletions guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package postfilter exports an api.PostFilterPlugin to the host.
package postfilter

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
internalpostfilter "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/postfilter"
)

// TODO fix explanation
// SetPlugin should be called in `main` to assign an api.PostFilterPlugin
// instance.
//
// For example:
//
// func main() {
// plugin := filterPlugin{}
// prefilter.SetPlugin(plugin)
// filter.SetPlugin(plugin)
// }
//
// type filterPlugin struct{}
//
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod, nodeList proto.NodeList) {
// // Write state you need on Filter
// }
//
// func (filterPlugin) Filter(state api.CycleState, pod api.Pod, nodeInfo api.NodeInfo) (status *api.Status) {
// var Filter int32
// // Derive Filter for the node name using state set on PreFilter!
// return Filter, nil
// }
//
// Note: This may be set without filter.SetPlugin, if the pre-filter plugin has
// the only filtering logic, or only used to configure api.CycleState.
func SetPlugin(postfilterPlugin api.PostFilterPlugin) {
internalpostfilter.SetPlugin(postfilterPlugin)
}
70 changes: 48 additions & 22 deletions scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,25 @@ import (
)

const (
guestExportMemory = "memory"
guestExportEnqueue = "enqueue"
guestExportPreFilter = "prefilter"
guestExportFilter = "filter"
guestExportPreScore = "prescore"
guestExportScore = "score"
guestExportMemory = "memory"
guestExportEnqueue = "enqueue"
guestExportPreFilter = "prefilter"
guestExportFilter = "filter"
guestExportPostFilter = "postfilter"
guestExportPreScore = "prescore"
guestExportScore = "score"
)

type guest struct {
guest wazeroapi.Module
out *bytes.Buffer
enqueueFn wazeroapi.Function
prefilterFn wazeroapi.Function
filterFn wazeroapi.Function
prescoreFn wazeroapi.Function
scoreFn wazeroapi.Function
callStack []uint64
guest wazeroapi.Module
out *bytes.Buffer
enqueueFn wazeroapi.Function
prefilterFn wazeroapi.Function
filterFn wazeroapi.Function
postfilterFn wazeroapi.Function
prescoreFn wazeroapi.Function
scoreFn wazeroapi.Function
callStack []uint64
}

func compileGuest(ctx context.Context, runtime wazero.Runtime, guestBin []byte) (guest wazero.CompiledModule, err error) {
Expand Down Expand Up @@ -82,14 +84,15 @@ func (pl *wasmPlugin) newGuest(ctx context.Context) (*guest, error) {
callStack := make([]uint64, 1)

return &guest{
guest: g,
out: &out,
enqueueFn: g.ExportedFunction(guestExportEnqueue),
prefilterFn: g.ExportedFunction(guestExportPreFilter),
filterFn: g.ExportedFunction(guestExportFilter),
prescoreFn: g.ExportedFunction(guestExportPreScore),
scoreFn: g.ExportedFunction(guestExportScore),
callStack: callStack,
guest: g,
out: &out,
enqueueFn: g.ExportedFunction(guestExportEnqueue),
prefilterFn: g.ExportedFunction(guestExportPreFilter),
filterFn: g.ExportedFunction(guestExportFilter),
postfilterFn: g.ExportedFunction(guestExportPostFilter),
prescoreFn: g.ExportedFunction(guestExportPreScore),
scoreFn: g.ExportedFunction(guestExportScore),
callStack: callStack,
}, nil
}

Expand Down Expand Up @@ -131,6 +134,24 @@ func (g *guest) filter(ctx context.Context) *framework.Status {
return framework.NewStatus(framework.Code(statusCode), statusReason)
}

// postFilter calls guestExportPostFilter.
func (g *guest) postFilter(ctx context.Context) (*framework.PostFilterResult, *framework.Status) {
defer g.out.Reset()
callStack := g.callStack

if err := g.postfilterFn.CallWithStack(ctx, callStack); err != nil {
return nil, framework.AsStatus(decorateError(g.out, guestExportPostFilter, err))
}
nominatedNodeName := paramsFromContext(ctx).resultNominatedNodeName
nominatingMode := framework.NominatingMode(int32(callStack[0] >> 32))

statusCode := int32(callStack[0])
statusReason := paramsFromContext(ctx).resultStatusReason

nominatingInfo := &framework.NominatingInfo{NominatedNodeName: nominatedNodeName, NominatingMode: nominatingMode}
return &framework.PostFilterResult{NominatingInfo: nominatingInfo}, framework.NewStatus(framework.Code(statusCode), statusReason)
}

// preScore calls guestExportPreScore.
func (g *guest) preScore(ctx context.Context) *framework.Status {
defer g.out.Reset()
Expand Down Expand Up @@ -188,6 +209,11 @@ func detectInterfaces(exportedFns map[string]wazeroapi.FunctionDefinition) (inte
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
}
e |= iFilterPlugin
case guestExportPostFilter:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i64}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i64)", name)
}
e |= iPostFilterPlugin
case guestExportPreScore:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i32}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
Expand Down
Loading

0 comments on commit c94d7e2

Please sign in to comment.