feat: Implement ListVolumes #618

Merged 1 commit on Dec 23, 2020
2 changes: 2 additions & 0 deletions pkg/azuredisk/azuredisk.go
@@ -142,6 +142,8 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
})
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER})
d.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
63 changes: 62 additions & 1 deletion pkg/azuredisk/controllerserver.go
@@ -538,7 +538,68 @@ func (d *Driver) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (

// ListVolumes returns all available volumes
func (d *Driver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
var err error
start := 0
if req.StartingToken != "" {
start, err = strconv.Atoi(req.StartingToken)
if err != nil {
return nil, status.Errorf(codes.Aborted, "ListVolumes starting token(%s) parsing with error: %v", req.StartingToken, err)
}
if start < 0 {
return nil, status.Errorf(codes.Aborted, "ListVolumes starting token(%d) can not be negative", start)
}
}

disks, derr := d.cloud.DisksClient.ListByResourceGroup(ctx, d.cloud.ResourceGroup)
Review comment (Member):

Actually, there are scenarios where disks are created in a different resource group, so this ListVolumes result is only accurate for the default setup. On the other hand, the agent nodes are all in d.cloud.ResourceGroup, so listing the volumes of all agent nodes would be accurate. I think the right solution could be to list the disks attached to all agent nodes under d.cloud.ResourceGroup.

@andyzhangx (Member) commented on Dec 17, 2020:

I am thinking of using d.cloud.KubeClient.CoreV1().PersistentVolumes() to get all PV info, extracting every diskURI from pv.csi.volumeid, and then reading disk.ManagedBy; that would provide a complete disk list (see the sketch after this file's diff). Could refer to:

@andyzhangx (Member) commented on Dec 17, 2020:

This would require much more work. While it could list all disks this cluster is using, the current approach cannot cover all disks and may return an incomplete list; I am not sure what happens if the disk list is incomplete.

if derr != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("ListVolumes on rg(%s) failed with error: %v", d.cloud.ResourceGroup, derr.Error()))
}

if start != 0 && start >= len(disks) {
return nil, status.Errorf(codes.Aborted, "ListVolumes starting token(%d) on rg(%s) is greater than total number of volumes", start, d.cloud.ResourceGroup)
}

maxEntries := len(disks) - start
if req.MaxEntries > 0 && int(req.MaxEntries) < maxEntries {
maxEntries = int(req.MaxEntries)
}
pageEnd := start + maxEntries

vmSet := d.cloud.VMSet
entries := []*csi.ListVolumesResponse_Entry{}
for i := start; i < pageEnd; i++ {
d := disks[i]
nodeList := []string{}

if d.ManagedBy != nil {
attachedNode, err := vmSet.GetNodeNameByProviderID(*d.ManagedBy)
if err != nil {
return nil, err
}
nodeList = append(nodeList, string(attachedNode))
}

entries = append(entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: *d.ID,
},
Status: &csi.ListVolumesResponse_VolumeStatus{
PublishedNodeIds: nodeList,
},
})
}

nextTokenString := ""
if pageEnd < len(disks) {
nextTokenString = strconv.Itoa(pageEnd)
}

listVolumesResp := &csi.ListVolumesResponse{
Entries: entries,
NextToken: nextTokenString,
}

return listVolumesResp, nil
}

// ControllerExpandVolume controller expand volume
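For reference, here is a minimal sketch of the PV-based enumeration described in the review thread above. It is not part of this PR: the Kubernetes client wiring and the driverName argument are assumptions, it presumes a client-go version whose List call takes a context, and it stops at collecting disk URIs; resolving each URI to a compute.Disk to read ManagedBy would follow the same pattern as the loop in ListVolumes above.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listCSIDiskURIs walks every PersistentVolume in the cluster and collects the
// volume handles (ARM disk URIs) of those provisioned by the given CSI driver.
// Unlike ListByResourceGroup, this also covers disks created in other resource
// groups, at the cost of an extra lookup per disk to read ManagedBy.
func listCSIDiskURIs(ctx context.Context, client kubernetes.Interface, driverName string) ([]string, error) {
	pvList, err := client.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("listing persistent volumes: %w", err)
	}

	var diskURIs []string
	for _, pv := range pvList.Items {
		csiSource := pv.Spec.PersistentVolumeSource.CSI
		if csiSource == nil || csiSource.Driver != driverName {
			continue // not a CSI volume, or provisioned by a different driver
		}
		// For the Azure disk CSI driver the volume handle is the disk URI, e.g.
		// /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.Compute/disks/<name>.
		diskURIs = append(diskURIs, csiSource.VolumeHandle)
	}
	return diskURIs, nil
}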
133 changes: 127 additions & 6 deletions pkg/azuredisk/controllerserver_test.go
@@ -1481,12 +1481,133 @@ func TestGetCapacity(t *testing.T) {
}

func TestListVolumes(t *testing.T) {
d, _ := NewFakeDriver(t)
req := csi.ListVolumesRequest{}
resp, err := d.ListVolumes(context.Background(), &req)
assert.Nil(t, resp)
if !reflect.DeepEqual(err, status.Error(codes.Unimplemented, "")) {
t.Errorf("Unexpected error: %v", err)
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{
name: "Valid list without max_entries or starting_token",
testFunc: func(t *testing.T) {
req := csi.ListVolumesRequest{}
d, _ := NewFakeDriver(t)
fakeVolumeID := "test"
disk := compute.Disk{ID: &fakeVolumeID}
disks := []compute.Disk{}
disks = append(disks, disk)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
d.cloud.DisksClient.(*mockdiskclient.MockInterface).EXPECT().ListByResourceGroup(gomock.Any(), gomock.Any()).Return(disks, nil).AnyTimes()
expectedErr := error(nil)
listVolumesResponse, err := d.ListVolumes(context.TODO(), &req)
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
if listVolumesResponse.NextToken != "" {
t.Errorf("actualNextToken: (%v), expectedNextToken: (%v)", listVolumesResponse.NextToken, "")
}
},
},
{
name: "Valid list with max_entries",
testFunc: func(t *testing.T) {
req := csi.ListVolumesRequest{
MaxEntries: 1,
}
d, _ := NewFakeDriver(t)
fakeVolumeID := "test"
disk1, disk2 := compute.Disk{ID: &fakeVolumeID}, compute.Disk{ID: &fakeVolumeID}
disks := []compute.Disk{}
disks = append(disks, disk1, disk2)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
d.cloud.DisksClient.(*mockdiskclient.MockInterface).EXPECT().ListByResourceGroup(gomock.Any(), gomock.Any()).Return(disks, nil).AnyTimes()
expectedErr := error(nil)
listVolumesResponse, err := d.ListVolumes(context.TODO(), &req)
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
if len(listVolumesResponse.Entries) != int(req.MaxEntries) {
t.Errorf("Actual number of entries: (%v), Expected number of entries: (%v)", len(listVolumesResponse.Entries), req.MaxEntries)
}
if listVolumesResponse.NextToken != "1" {
t.Errorf("actualNextToken: (%v), expectedNextToken: (%v)", listVolumesResponse.NextToken, "1")
}
},
},
{
name: "Valid list with max_entries and starting_token",
testFunc: func(t *testing.T) {
req := csi.ListVolumesRequest{
StartingToken: "1",
MaxEntries: 1,
}
d, _ := NewFakeDriver(t)
fakeVolumeID1, fakeVolumeID2 := "test1", "test2"
disk1, disk2 := compute.Disk{ID: &fakeVolumeID1}, compute.Disk{ID: &fakeVolumeID2}
disks := []compute.Disk{}
disks = append(disks, disk1, disk2)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
d.cloud.DisksClient.(*mockdiskclient.MockInterface).EXPECT().ListByResourceGroup(gomock.Any(), gomock.Any()).Return(disks, nil).AnyTimes()
expectedErr := error(nil)
listVolumesResponse, err := d.ListVolumes(context.TODO(), &req)
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
if len(listVolumesResponse.Entries) != int(req.MaxEntries) {
t.Errorf("Actual number of entries: (%v), Expected number of entries: (%v)", len(listVolumesResponse.Entries), req.MaxEntries)
}
if listVolumesResponse.NextToken != "" {
t.Errorf("actualNextToken: (%v), expectedNextToken: (%v)", listVolumesResponse.NextToken, "")
}
if listVolumesResponse.Entries[0].Volume.VolumeId != fakeVolumeID2 {
t.Errorf("actualVolumeId: (%v), expectedVolumeId: (%v)", listVolumesResponse.Entries[0].Volume.VolumeId, fakeVolumeID2)
}
},
},
{
name: "ListVolumes request with starting token but no entries in response",
testFunc: func(t *testing.T) {
req := csi.ListVolumesRequest{
StartingToken: "1",
}
d, _ := NewFakeDriver(t)
disks := []compute.Disk{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
d.cloud.DisksClient.(*mockdiskclient.MockInterface).EXPECT().ListByResourceGroup(gomock.Any(), gomock.Any()).Return(disks, nil).AnyTimes()
expectedErr := status.Error(codes.Aborted, "ListVolumes starting token(1) on rg(rg) is greater than total number of volumes")
_, err := d.ListVolumes(context.TODO(), &req)
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
},
},
{
name: "ListVolumes list resource error",
testFunc: func(t *testing.T) {
req := csi.ListVolumesRequest{
StartingToken: "1",
}
d, _ := NewFakeDriver(t)
disks := []compute.Disk{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
rerr := &retry.Error{
RawError: fmt.Errorf("test"),
}
d.cloud.DisksClient.(*mockdiskclient.MockInterface).EXPECT().ListByResourceGroup(gomock.Any(), gomock.Any()).Return(disks, rerr).AnyTimes()
expectedErr := status.Error(codes.Internal, "ListVolumes on rg(rg) failed with error: Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: test")
_, err := d.ListVolumes(context.TODO(), &req)
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
},
},
}

for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
}
}

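The cases above each exercise a single page. As a usage illustration (not part of this PR), a gRPC client could walk all pages by feeding NextToken back as StartingToken; the socket path and page size below are placeholders.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder endpoint; the driver normally listens on a unix domain socket.
	conn, err := grpc.Dial("unix:///tmp/csi.sock", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dialing CSI endpoint: %v", err)
	}
	defer conn.Close()

	controller := csi.NewControllerClient(conn)
	startingToken := ""
	for {
		resp, err := controller.ListVolumes(context.Background(), &csi.ListVolumesRequest{
			MaxEntries:    1, // small page size just to exercise pagination
			StartingToken: startingToken,
		})
		if err != nil {
			log.Fatalf("ListVolumes failed: %v", err)
		}
		for _, entry := range resp.Entries {
			fmt.Println(entry.GetVolume().GetVolumeId(), entry.GetStatus().GetPublishedNodeIds())
		}
		if resp.NextToken == "" {
			break // last page reached
		}
		startingToken = resp.NextToken
	}
}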
2 changes: 2 additions & 0 deletions pkg/azuredisk/fake_azuredisk.go
@@ -78,6 +78,8 @@ func NewFakeDriver(t *testing.T) (*Driver, error) {
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
})
driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER})
driver.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
3 changes: 3 additions & 0 deletions test/integration/run-test.sh
@@ -67,6 +67,9 @@ echo 'Attach volume test:'
"$CSC_BIN" controller publish --endpoint "$endpoint" --node-id "$node" --cap 1,block "$volumeid"
sleep 20

echo 'ListVolumes test:'
"$CSC_BIN" controller list-volumes --endpoint "$endpoint" --max-entries 1 --starting-token 0

echo 'Detach volume test:'
"$CSC_BIN" controller unpublish --endpoint "$endpoint" --node-id "$node" "$volumeid"
sleep 30
2 changes: 1 addition & 1 deletion test/sanity/run-test.sh
@@ -35,4 +35,4 @@ _output/azurediskplugin --endpoint "$endpoint" --nodeid "$nodeid" -v=5 &

echo 'Begin to run sanity test...'
readonly CSI_SANITY_BIN='csi-sanity'
"$CSI_SANITY_BIN" --ginkgo.v --csi.endpoint="$endpoint" --ginkgo.skip='should work|should fail when volume does not exist on the specified path'
"$CSI_SANITY_BIN" --ginkgo.v --csi.endpoint="$endpoint" --ginkgo.skip='should work|should fail when volume does not exist on the specified path|should return appropriate capabilities|pagination should detect volumes added between pages and accept tokens when the last volume from a page is deleted'