Skip to content

Commit

Permalink
Refactor the agent
Browse files Browse the repository at this point in the history
This commit broadly refactors the agent to:
- use kube Controller construct
- take advantage of Server Side Apply
- prepare for resource split and CDI+DRA
- don't put everything under a util directory
- use closer to kube upstream kube client
- update proto definitions for device plugins
- use kubelet pod resources monitoring interface rather than CRI to do
  slot reconciliation
- Use CRD definition in Rust code to generate yaml file

Signed-off-by: Nicolas Belouin <[email protected]>
  • Loading branch information
diconico07 committed Dec 28, 2023
1 parent a0a173c commit 8b30652
Show file tree
Hide file tree
Showing 61 changed files with 5,385 additions and 9,289 deletions.
760 changes: 601 additions & 159 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ blake2 = "0.9.0"
env_logger = "0.10.0"
futures = { version = "0.3.1", package = "futures" }
hyper = "0.14.2"
k8s-openapi = { version = "0.17.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.80.0", features = ["derive"] }
kube-runtime = "0.80.0"
itertools = "0.12.0"
k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.87.1", features = ["derive"] }
kube-runtime = { version = "0.87.1", features = ["unstable-runtime-reconcile-on"] }
lazy_static = "1.4"
log = "0.4"
mockall_double = "0.2.0"
Expand All @@ -34,8 +35,9 @@ serde = "1.0.104"
serde_derive = "1.0.104"
serde_json = "1.0.45"
serde_yaml = { version = "0.8.11", optional = true }
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] }
tokio-stream = { version = "0.1", features = ["net"] }
thiserror = "1.0.50"
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tonic = "0.5.2"
tower = "0.4.8"

Expand Down
12 changes: 10 additions & 2 deletions agent/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@
fn main() {
tonic_build::configure()
.build_client(true)
.out_dir("./src/util")
.compile(&["./proto/pluginapi.proto"], &["./proto"])
.out_dir("./src/plugin_manager")
.compile(
&["./proto/pluginapi.proto", "./proto/podresources.proto"],
&["./proto"],
)
.expect("failed to compile protos");
tonic_build::configure()
.build_client(false)
.out_dir("./src/plugin_manager")
.compile(&["./proto/pluginregistration.proto"], &["./proto"])
.expect("failed to compile protos");
}
352 changes: 200 additions & 152 deletions agent/proto/pluginapi.proto
Original file line number Diff line number Diff line change
@@ -1,152 +1,200 @@
syntax = 'proto3';

package v1beta1;


// Registration is the service advertised by the Kubelet
// Only when Kubelet answers with a success code to a Register Request
// may Device Plugins start their service
// Registration may fail when device plugin version is not supported by
// Kubelet or the registered resourceName is already taken by another
// active device plugin. Device plugin is expected to terminate upon registration failure
service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}

message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
}

message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
// Name of the unix socket the device plugin is listening on
// PATH = path.Join(DevicePluginPath, endpoint)
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}

message Empty {
}

// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disapears, ListAndWatch
// returns the new list
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disapears, ListAndWatch
// returns the new list
message ListAndWatchResponse {
repeated Device devices = 1;
}

/* E.g:
* struct Device {
* ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
* State: "Healthy",
*} */
message Device {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
string ID = 1;
// Health of the device, can be healthy or unhealthy, see constants.go
string health = 2;
}

// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
repeated string devicesIDs = 1;
}

// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}

// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
// environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
// the Devices requested
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}

message ContainerAllocateRequest {
repeated string devicesIDs = 1;
}

// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}

message ContainerAllocateResponse {
// List of environment variable to be set in the container to access one of more devices.
map<string, string> envs = 1;
// Mounts for the container.
repeated Mount mounts = 2;
// Devices for the container.
repeated DeviceSpec devices = 3;
// Container annotations to pass to the container runtime
map<string, string> annotations = 4;
}

// Mount specifies a host volume to mount into a container.
// where device library or tools are installed on host and container
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
string host_path = 2;
// If set, the mount is read-only.
bool read_only = 3;
}

// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}


syntax = "proto3";

package v1beta1;


// Registration is the service advertised by the Kubelet
// Only when Kubelet answers with a success code to a Register Request
// may Device Plugins start their service
// Registration may fail when device plugin version is not supported by
// Kubelet or the registered resourceName is already taken by another
// active device plugin. Device plugin is expected to terminate upon registration failure
service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}

message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
// Indicates if GetPreferredAllocation is implemented and available for calling
bool get_preferred_allocation_available = 2;
}

message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
// Name of the unix socket the device plugin is listening on
// PATH = path.Join(DevicePluginPath, endpoint)
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}

message Empty {
}

// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
rpc GetPreferredAllocation(PreferredAllocationRequest) returns (PreferredAllocationResponse) {}

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
message ListAndWatchResponse {
repeated Device devices = 1;
}

message TopologyInfo {
repeated NUMANode nodes = 1;
}

message NUMANode {
int64 ID = 1;
}

/* E.g:
* struct Device {
* ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
* Health: "Healthy",
* Topology:
* Node:
* ID: 1
*} */
message Device {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
string ID = 1;
// Health of the device, can be healthy or unhealthy, see constants.go
string health = 2;
// Topology for device
TopologyInfo topology = 3;
}

// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
repeated string devicesIDs = 1;
}

// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}

// PreferredAllocationRequest is passed via a call to GetPreferredAllocation()
// at pod admission time. The device plugin should take the list of
// `available_deviceIDs` and calculate a preferred allocation of size
// 'allocation_size' from them, making sure to include the set of devices
// listed in 'must_include_deviceIDs'.
message PreferredAllocationRequest {
repeated ContainerPreferredAllocationRequest container_requests = 1;
}

message ContainerPreferredAllocationRequest {
// List of available deviceIDs from which to choose a preferred allocation
repeated string available_deviceIDs = 1;
// List of deviceIDs that must be included in the preferred allocation
repeated string must_include_deviceIDs = 2;
// Number of devices to include in the preferred allocation
int32 allocation_size = 3;
}

// PreferredAllocationResponse returns a preferred allocation,
// resulting from a PreferredAllocationRequest.
message PreferredAllocationResponse {
repeated ContainerPreferredAllocationResponse container_responses = 1;
}

message ContainerPreferredAllocationResponse {
repeated string deviceIDs = 1;
}

// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
// environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
// the Devices requested
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}

message ContainerAllocateRequest {
repeated string devicesIDs = 1;
}

// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}

message ContainerAllocateResponse {
// List of environment variable to be set in the container to access one of more devices.
map<string, string> envs = 1;
// Mounts for the container.
repeated Mount mounts = 2;
// Devices for the container.
repeated DeviceSpec devices = 3;
// Container annotations to pass to the container runtime
map<string, string> annotations = 4;
}

// Mount specifies a host volume to mount into a container.
// where device library or tools are installed on host and container
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
string host_path = 2;
// If set, the mount is read-only.
bool read_only = 3;
}

// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}
Loading

0 comments on commit 8b30652

Please sign in to comment.