Skip to content

Commit

Permalink
fix: find zone by region
Browse files Browse the repository at this point in the history
If PV has allowedTopologies option, kubernetes schedule does not find the best location.
We will get first node which has storage type.

Signed-off-by: Serge Logvinov <[email protected]>
  • Loading branch information
sergelogvinov committed Jan 1, 2024
1 parent d8c98ea commit e76681f
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 17 deletions.
28 changes: 21 additions & 7 deletions pkg/csi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,36 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
accessibleTopology := request.GetAccessibilityRequirements()

region, zone := locationFromTopologyRequirement(accessibleTopology)
if region == "" || zone == "" {
klog.Errorf("CreateVolume: region or zone is empty: accessibleTopology=%+v", accessibleTopology)
if region == "" {
klog.Errorf("CreateVolume: region is empty: accessibleTopology=%+v", accessibleTopology)

return nil, status.Error(codes.InvalidArgument, "cannot find best region and zone")
return nil, status.Error(codes.Internal, "cannot find best region")
}

cl, err := d.Cluster.GetProxmoxCluster(region)
if err != nil {
klog.Errorf("failed to get proxmox cluster: %v", err)
klog.Errorf("CreateVolume: failed to get proxmox cluster: %v", err)

return nil, status.Error(codes.Internal, err.Error())
}

if zone == "" {
if zone, err = getNodeWithStorage(cl, params[StorageIDKey]); err != nil {
klog.Errorf("CreateVolume: failed to get node with storage: %v", err)

return nil, status.Errorf(codes.Internal, "cannot find best region and zone: %v", err)
}
}

if region == "" || zone == "" {
klog.Errorf("CreateVolume: region or zone is empty: accessibleTopology=%+v", accessibleTopology)

return nil, status.Error(codes.Internal, "cannot find best region and zone")
}

storageConfig, err := cl.GetStorageConfig(params[StorageIDKey])
if err != nil {
klog.Errorf("failed to get proxmox storage config: %v", err)
klog.Errorf("CreateVolume: failed to get proxmox storage config: %v", err)

return nil, status.Error(codes.Internal, err.Error())
}
Expand All @@ -157,7 +171,7 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
// https://pve.proxmox.com/wiki/Storage only block/local storage are supported
switch storageConfig["type"].(string) {
case "nfs", "cifs", "pbs":
return nil, status.Error(codes.InvalidArgument, "error: shared storage type nfs,cifs,pbs are not supported")
return nil, status.Error(codes.Internal, "error: shared storage type nfs,cifs,pbs are not supported")
}

topology = &csi.Topology{
Expand All @@ -176,7 +190,7 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
size, err := getVolumeSize(cl, vol)
if err != nil {
if err.Error() != ErrorNotFound {
klog.Errorf("failed to check if pvc exists: %v", err)
klog.Errorf("CreateVolume: failed to check if pvc exists: %v", err)

return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
52 changes: 45 additions & 7 deletions pkg/csi/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ clusters:
},
)

httpmock.RegisterResponder("GET", "https://127.0.0.1:8006/api2/json/nodes",
func(req *http.Request) (*http.Response, error) {
return httpmock.NewJsonResponse(200, map[string]interface{}{
"data": []interface{}{
map[string]interface{}{
"node": "pve-1",
"status": "online",
},
map[string]interface{}{
"node": "pve-2",
"status": "online",
},
},
})
},
)

httpmock.RegisterResponder("GET", "https://127.0.0.1:8006/api2/json/nodes/pve-1/qemu/100/config",
func(req *http.Request) (*http.Response, error) {
return httpmock.NewJsonResponse(200, map[string]interface{}{
Expand Down Expand Up @@ -398,26 +415,28 @@ func (ts *csiTestSuite) TestCreateVolume() {
VolumeCapabilities: []*proto.VolumeCapability{volcap},
CapacityRange: volsize,
},
expectedError: status.Error(codes.InvalidArgument, "cannot find best region and zone"),
expectedError: status.Error(codes.Internal, "cannot find best region"),
},
{
msg: "EmptyZone",
request: &proto.CreateVolumeRequest{
Name: "volume-id",
Parameters: volParam,
Name: "volume-id",
Parameters: map[string]string{
"storage": "fake-storage",
},
VolumeCapabilities: []*proto.VolumeCapability{volcap},
CapacityRange: volsize,
AccessibilityRequirements: &proto.TopologyRequirement{
Preferred: []*proto.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: "region",
corev1.LabelTopologyRegion: "cluster-1",
},
},
},
},
},
expectedError: status.Error(codes.InvalidArgument, "cannot find best region and zone"),
expectedError: status.Error(codes.Internal, "cannot find best region and zone: failed to find node with storage fake-storage"),
},
{
msg: "EmptyRegion",
Expand All @@ -436,7 +455,26 @@ func (ts *csiTestSuite) TestCreateVolume() {
},
},
},
expectedError: status.Error(codes.InvalidArgument, "cannot find best region and zone"),
expectedError: status.Error(codes.Internal, "cannot find best region"),
},
{
msg: "UnknowRegion",
request: &proto.CreateVolumeRequest{
Name: "volume-id",
Parameters: volParam,
VolumeCapabilities: []*proto.VolumeCapability{volcap},
CapacityRange: volsize,
AccessibilityRequirements: &proto.TopologyRequirement{
Preferred: []*proto.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: "unknown-region",
},
},
},
},
},
expectedError: status.Error(codes.Internal, "proxmox cluster unknown-region not found"),
},
{
msg: "NonSupportZonalSMB",
Expand All @@ -458,7 +496,7 @@ func (ts *csiTestSuite) TestCreateVolume() {
},
},
},
expectedError: status.Error(codes.InvalidArgument, "error: shared storage type nfs,cifs,pbs are not supported"),
expectedError: status.Error(codes.Internal, "error: shared storage type nfs,cifs,pbs are not supported"),
},
{
msg: "WrongClusterNotFound",
Expand Down
23 changes: 21 additions & 2 deletions pkg/csi/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func locationFromTopologyRequirement(tr *proto.TopologyRequirement) (region, zon
return "", ""
}

for _, top := range tr.Preferred {
for _, top := range tr.GetPreferred() {
segment := top.GetSegments()

tsr := segment[corev1.LabelTopologyRegion]
Expand All @@ -69,9 +69,28 @@ func locationFromTopologyRequirement(tr *proto.TopologyRequirement) (region, zon
if tsr != "" && tsz != "" {
return tsr, tsz
}

if tsr != "" && region == "" {
region = tsr
}
}

for _, top := range tr.GetRequisite() {
segment := top.GetSegments()

tsr := segment[corev1.LabelTopologyRegion]
tsz := segment[corev1.LabelTopologyZone]

if tsr != "" && tsz != "" {
return tsr, tsz
}

if tsr != "" && region == "" {
region = tsr
}
}

return "", ""
return region, ""
}

func stripSecrets(msg interface{}) string {
Expand Down
31 changes: 30 additions & 1 deletion pkg/csi/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,21 @@ func TestLocationFromTopologyRequirement(t *testing.T) {
},
},
},
expectedRegion: "",
expectedRegion: "region1",
expectedZone: "",
},
{
msg: "EmptyTopologyRequisiteZone",
topology: &proto.TopologyRequirement{
Requisite: []*proto.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: "region1",
},
},
},
},
expectedRegion: "region1",
expectedZone: "",
},
{
Expand Down Expand Up @@ -127,6 +141,21 @@ func TestLocationFromTopologyRequirement(t *testing.T) {
expectedRegion: "region1",
expectedZone: "zone1",
},
{
msg: "TopologyRequisite",
topology: &proto.TopologyRequirement{
Requisite: []*proto.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: "region1",
corev1.LabelTopologyZone: "zone1",
},
},
},
},
expectedRegion: "region1",
expectedZone: "zone1",
},
}

for _, testCase := range tests {
Expand Down
28 changes: 28 additions & 0 deletions pkg/csi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,34 @@ type storageContent struct {
size int64
}

func getNodeWithStorage(cl *pxapi.Client, storageName string) (string, error) {
data, err := cl.GetNodeList()
if err != nil {
return "", fmt.Errorf("failed to get node list: %v", err)
}

if data["data"] == nil {
return "", fmt.Errorf("failed to parce node list: %v", err)
}

for _, item := range data["data"].([]interface{}) {
node, ok := item.(map[string]interface{})
if !ok {
continue
}

vmr := pxapi.NewVmRef(vmID)
vmr.SetNode(node["node"].(string))
vmr.SetVmType("qemu")

if _, err := cl.GetStorageStatus(vmr, storageName); err == nil {
return vmr.Node(), nil
}
}

return "", fmt.Errorf("failed to find node with storage %s", storageName)
}

func getStorageContent(cl *pxapi.Client, vol *volume.Volume) (*storageContent, error) {
vmr := pxapi.NewVmRef(vmID)
vmr.SetNode(vol.Node())
Expand Down

0 comments on commit e76681f

Please sign in to comment.