Skip to content

Commit

Permalink
Merge pull request #554 from BSWANG/main
Browse files Browse the repository at this point in the history
support erdma
  • Loading branch information
l1b0k authored Jan 22, 2024
2 parents 3a329ef + 759f34c commit 301c557
Show file tree
Hide file tree
Showing 29 changed files with 330 additions and 78 deletions.
7 changes: 7 additions & 0 deletions daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func getPoolConfig(cfg *daemon.Config, daemonMode string, limit *instance.Limits
}

poolConfig.MaxIPPerENI = 1
if cfg.EnableERDMA {
poolConfig.ERdmaCapacity = limit.ERdmaAdapters
}
case daemon.ModeENIMultiIP:
maxENI = limit.Adapters
maxENI = int(float64(maxENI)*cfg.EniCapRatio) + cfg.EniCapShift - 1
Expand Down Expand Up @@ -112,6 +115,10 @@ func getPoolConfig(cfg *daemon.Config, daemonMode string, limit *instance.Limits
maxMemberENI = limit.MemberAdapterLimit

poolConfig.MaxIPPerENI = ipPerENI

if cfg.EnableERDMA {
poolConfig.ERdmaCapacity = limit.ERdmaAdapters * limit.IPv4PerAdapter
}
}

poolConfig.ENITags = cfg.ENITags
Expand Down
107 changes: 97 additions & 10 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/AliyunContainerService/terway/deviceplugin"
"github.com/AliyunContainerService/terway/pkg/aliyun/client"
"github.com/AliyunContainerService/terway/pkg/aliyun/credential"
Expand All @@ -31,14 +33,14 @@ import (
"github.com/AliyunContainerService/terway/types"
"github.com/AliyunContainerService/terway/types/daemon"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
k8sErr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/retry"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
Expand Down Expand Up @@ -185,7 +187,9 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}

if pod.ERdma {
req.LocalIPType = eni.LocalIPTypeERDMA
}
if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENIIP)) == 1 {
old := oldRes.GetResourceItemByType(daemon.ResourceTypeENIIP)[0]

Expand Down Expand Up @@ -839,6 +843,22 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
config.EnableENITrunking = false
}

if config.EnableERDMA {
if limit.ERDMARes() <= 0 {
serviceLog.Info("instance is not support erdma", "instanceType", config.InstanceType)
config.EnableERDMA = false
} else {
ok, err := utils.OSSupportERDMA()
if err != nil {
return nil, err
}
if !ok {
config.EnableERDMA = false
serviceLog.Info("os is not support erdma")
}
}
}

netSrv.resourceDB, err = storage.NewDiskStorage(
resDBName, utils.NormalizePath(resDBPath), json.Marshal, func(bytes []byte) (interface{}, error) {
resourceRel := &daemon.PodResources{}
Expand Down Expand Up @@ -904,6 +924,10 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
found := false
for _, eni := range enis {
if eni.Trunk && eni.ID == preferTrunkID {
if eni.ERdma {
serviceLog.Info("erdma eni on trunk mode, disable erdma")
config.EnableERDMA = false
}
found = true

poolConfig.TrunkENIID = preferTrunkID
Expand Down Expand Up @@ -955,6 +979,30 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
nodeAnnotations[string(types.NormalIPTypeIPs)] = strconv.Itoa(poolConfig.Capacity)
}

attached, err := factory.GetAttachedNetworkInterface(poolConfig.TrunkENIID)
if err != nil {
return nil, err
}
if len(attached) >= limit.Adapters-limit.ERdmaAdapters {
if attachedERdma := lo.Filter(attached, func(ni *daemon.ENI, idx int) bool { return ni.ERdma }); len(attachedERdma)+limit.Adapters-len(attached) < limit.ERDMARes() {
serviceLog.Info("node has no enough free eni slot to attach more erdma to achieve erdma res: ", limit.ERDMARes())
config.EnableERDMA = false
}
}

if config.EnableERDMA {
if daemonMode == daemon.ModeENIMultiIP {
nodeAnnotations[string(types.NormalIPTypeIPs)] = strconv.Itoa(poolConfig.Capacity - limit.ERDMARes())
nodeAnnotations[string(types.ERDMAIPTypeIPs)] = strconv.Itoa(limit.ERDMARes())
poolConfig.ERdmaCapacity = limit.ERDMARes()
} else if daemonMode == daemon.ModeENIOnly {
nodeAnnotations[string(types.NormalIPTypeIPs)] = strconv.Itoa(poolConfig.Capacity - limit.ExclusiveERDMARes())
nodeAnnotations[string(types.ERDMAIPTypeIPs)] = strconv.Itoa(limit.ExclusiveERDMARes())
poolConfig.ERdmaCapacity = limit.ExclusiveERDMARes()
}

}

if !(daemonMode == daemon.ModeENIMultiIP && !config.EnableENITrunking) {
if !config.DisableDevicePlugin {
res := deviceplugin.ENITypeENI
Expand All @@ -969,6 +1017,17 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
}
}

if config.EnableERDMA {
if !config.DisableDevicePlugin {
res := deviceplugin.ENITypeERDMA
capacity := poolConfig.ERdmaCapacity
if capacity > 0 {
dp := deviceplugin.NewENIDevicePlugin(capacity, res)
go dp.Serve()
}
}
}

// ensure node annotations
err = netSrv.k8s.PatchNodeAnnotations(nodeAnnotations)
if err != nil {
Expand Down Expand Up @@ -1013,11 +1072,6 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (

var eniList []eni.NetworkInterface

attached, err := factory.GetAttachedNetworkInterface(poolConfig.TrunkENIID)
if err != nil {
return nil, err
}

if daemonMode == daemon.ModeVPC {
eniList = append(eniList, &eni.Veth{})
}
Expand All @@ -1036,26 +1090,59 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
}
}
} else {
var (
normalENICount int
erdmaENICount int
)
// the legacy mode
for _, ni := range attached {
eniList = append(eniList, eni.NewLocal(ni, "secondary", factory, poolConfig))
if config.EnableERDMA && ni.ERdma {
erdmaENICount++
eniList = append(eniList, eni.NewLocal(ni, "erdma", factory, poolConfig))
} else {
normalENICount++
eniList = append(eniList, eni.NewLocal(ni, "secondary", factory, poolConfig))
}
}
normalENINeeded := poolConfig.MaxENI - normalENICount
if config.EnableERDMA {
normalENINeeded = poolConfig.MaxENI - limit.ERdmaAdapters - normalENICount
for i := 0; i < limit.ERdmaAdapters-erdmaENICount; i++ {
eniList = append(eniList, eni.NewLocal(nil, "erdma", factory, poolConfig))
}
}

for i := 0; i < (poolConfig.MaxENI - len(attached)); i++ {
for i := 0; i < normalENINeeded; i++ {
eniList = append(eniList, eni.NewLocal(nil, "secondary", factory, poolConfig))
}
}
} else {
var (
normalENICount int
erdmaENICount int
)
for _, ni := range attached {
serviceLog.V(5).Info("found attached eni", "eni", ni)
if config.EnableENITrunking && ni.Trunk && poolConfig.TrunkENIID == ni.ID {
lo := eni.NewLocal(ni, "trunk", factory, poolConfig)
eniList = append(eniList, eni.NewTrunk(netSrv.k8s.GetClient(), lo))
} else if config.EnableERDMA && ni.ERdma {
erdmaENICount++
eniList = append(eniList, eni.NewLocal(ni, "erdma", factory, poolConfig))
} else {
normalENICount++
eniList = append(eniList, eni.NewLocal(ni, "secondary", factory, poolConfig))
}
}
normalENINeeded := poolConfig.MaxENI - normalENICount
if config.EnableERDMA {
normalENINeeded = poolConfig.MaxENI - limit.ERdmaAdapters - normalENICount
for i := 0; i < limit.ERdmaAdapters-erdmaENICount; i++ {
eniList = append(eniList, eni.NewLocal(nil, "erdma", factory, poolConfig))
}
}

for i := 0; i < (poolConfig.MaxENI - len(attached)); i++ {
for i := 0; i < normalENINeeded; i++ {
eniList = append(eniList, eni.NewLocal(nil, "secondary", factory, poolConfig))
}
}
Expand Down
83 changes: 67 additions & 16 deletions deviceplugin/eni.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"syscall"
"time"

"github.com/samber/lo"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -24,10 +26,12 @@ import (
const (
// ENI type identifiers: each value is a key of eniMap and is what callers
// pass as eniType to NewENIDevicePlugin.

// ENITypeENI is the eniMap key whose entry uses ENIResName.
ENITypeENI = "eni"
// ENITypeMember is the eniMap key whose entry uses MemberENIResName.
ENITypeMember = "member"
// ENITypeERDMA is the eniMap key whose entry uses ERDMAResName. Allocate
// attaches /dev/infiniband device specs only for plugins of this type.
ENITypeERDMA = "erdma"

// Resource names stored in eniRes.resName for each ENI type.

// ENIResName is the Aliyun ENI resource name used in Kubernetes container
// resources.
ENIResName = "aliyun/eni"
// MemberENIResName is the resource name paired with ENITypeMember.
MemberENIResName = "aliyun/member-eni"
// ERDMAResName is the resource name paired with ENITypeERDMA.
ERDMAResName = "aliyun/erdma"
)

type eniRes struct {
Expand All @@ -39,23 +43,29 @@ type eniRes struct {
var eniMap = map[string]eniRes{
ENITypeENI: {
resName: ENIResName,
re: regexp.MustCompile("^.*" + "-eni.sock"),
sock: pluginapi.DevicePluginPath + "%d-" + "eni.sock",
re: regexp.MustCompile("^.*-eni.sock"),
sock: pluginapi.DevicePluginPath + "%d-eni.sock",
},
ENITypeMember: {
resName: MemberENIResName,
re: regexp.MustCompile("^.*" + "-member-eni.sock"),
sock: pluginapi.DevicePluginPath + "%d-" + "member-eni.sock",
re: regexp.MustCompile("^.*-member-eni.sock"),
sock: pluginapi.DevicePluginPath + "%d-member-eni.sock",
},
ENITypeERDMA: {
resName: ERDMAResName,
re: regexp.MustCompile("^.*-erdma-eni.sock"),
sock: pluginapi.DevicePluginPath + "%d-erdma-eni.sock",
},
}

// ENIDevicePlugin implements the Kubernetes device plugin API
type ENIDevicePlugin struct {
socket string
server *grpc.Server
count int
stop chan struct{}
eniRes eniRes
socket string
server *grpc.Server
count int
stop chan struct{}
eniRes eniRes
eniType string
sync.Locker
}

Expand All @@ -67,9 +77,10 @@ func NewENIDevicePlugin(count int, eniType string) *ENIDevicePlugin {
}
pluginEndpoint := fmt.Sprintf(res.sock, time.Now().Unix())
return &ENIDevicePlugin{
socket: pluginEndpoint,
count: count,
eniRes: res,
socket: pluginEndpoint,
count: count,
eniRes: res,
eniType: eniType,
}
}

Expand Down Expand Up @@ -203,11 +214,51 @@ func (m *ENIDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateReq
ContainerResponses: []*pluginapi.ContainerAllocateResponse{},
}

klog.Infof("Request Containers: %v", r.GetContainerRequests())
klog.Infof("Request Containers: %v, eniType: %v", r.GetContainerRequests(), m.eniType)
for range r.GetContainerRequests() {
response.ContainerResponses = append(response.ContainerResponses,
&pluginapi.ContainerAllocateResponse{},
)
var devices []*pluginapi.DeviceSpec
if m.eniType == ENITypeERDMA {
infinibandDevs, err := os.ReadDir("/dev/infiniband/")
if err != nil || len(infinibandDevs) == 0 {
if os.IsNotExist(err) {
// maybe first erdma to attach, there is no infiniband dev on device plugin allocate
devices = []*pluginapi.DeviceSpec{
{
ContainerPath: "/dev/infiniband/uverbs0",
HostPath: "/dev/infiniband/uverbs0",
Permissions: "rw",
},
{
ContainerPath: "/dev/infiniband/rdma_cm",
HostPath: "/dev/infiniband/rdma_cm",
Permissions: "rw",
},
}
} else {
return nil, fmt.Errorf("error read infiniband dir: %+v, please check the erdma driver", err)
}
} else {
devices = lo.FilterMap(infinibandDevs, func(v os.DirEntry, _ int) (*pluginapi.DeviceSpec, bool) {
if v.Type()&os.ModeDevice != 0 {
return &pluginapi.DeviceSpec{
ContainerPath: "/dev/infiniband/" + v.Name(),
HostPath: "/dev/infiniband/" + v.Name(),
Permissions: "rw",
}, true
}
return nil, false
})
}

response.ContainerResponses = append(response.ContainerResponses,
&pluginapi.ContainerAllocateResponse{
Devices: devices,
},
)
} else {
response.ContainerResponses = append(response.ContainerResponses,
&pluginapi.ContainerAllocateResponse{})
}
}

return &response, nil
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.7
github.com/pkg/errors v0.9.1
github.com/pmorjan/kmod v1.1.0
github.com/prometheus/client_golang v1.15.1
github.com/pterm/pterm v0.12.62
github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
Expand Down Expand Up @@ -107,6 +109,7 @@ require (
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/term v0.15.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmorjan/kmod v1.1.0 h1:ZLb0WalLhz4ENECpySrXMgRqFfBkoqWju680MWL5X94=
github.com/pmorjan/kmod v1.1.0/go.mod h1:iGxkdcq8DCjMw61SXKPMxG7taOrEqjNTIQPPgfvgX88=
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA=
github.com/prometheus/client_golang v0.0.0-20180209125602-c332b6f63c06/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down Expand Up @@ -725,6 +727,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
github.com/safchain/ethtool v0.3.0 h1:gimQJpsI6sc1yIqP/y8GYgiXn/NjgvpM0RNoWLVVmP0=
github.com/safchain/ethtool v0.3.0/go.mod h1:SA9BwrgyAqNo7M+uaL6IYbxpm5wk3L7Mm6ocLW+CJUs=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
Expand Down
Loading

0 comments on commit 301c557

Please sign in to comment.