Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for running csi sanity tests. #244

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ script:
- go test -v github.com/opensds/nbp/csi/... -cover
- go test -v github.com/opensds/nbp/flexvolume/... -cover
- go test -v github.com/opensds/nbp/cindercompatibleapi/... -cover
- go test -v github.com/opensds/nbp/csi/server/sanity/... -cover

after_success:
# Clean OpenSDS northbound plugin built data
Expand Down
4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
name = "k8s.io/client-go"
version = "kubernetes-1.13.0-beta.1"

[[constraint]]
name = "github.com/kubernetes-csi/csi-test"
version = "1.1.0"

[prune]
non-go = true
go-tests = true
Expand Down
130 changes: 93 additions & 37 deletions csi/server/plugin/opensds/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ func (p *Plugin) ControllerPublishVolume(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, msg)
}

if req.GetVolumeCapability() == nil {
msg := "volume capability must be provided"
glog.Error(msg)
return nil, status.Error(codes.InvalidArgument, msg)
}

if req.GetReadonly() {
return nil, status.Error(codes.AlreadyExists, "read only volumes are not supported")
}

glog.V(5).Infof("plugin information %#v", p)
glog.V(5).Infof("current storage type: %s", p.PluginStorageType)

Expand Down Expand Up @@ -193,11 +203,29 @@ func (p *Plugin) ControllerUnpublishVolume(
}

// ValidateVolumeCapabilities implementation
func (p *Plugin) ValidateVolumeCapabilities(
ctx context.Context,
func (p *Plugin) ValidateVolumeCapabilities(ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest) (
*csi.ValidateVolumeCapabilitiesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")

volId := req.GetVolumeId()
if volId == "" {
return nil, status.Error(codes.InvalidArgument, "")
}

if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "")
}

vol, err := p.Client.GetVolume(volId)
if vol == nil || err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}

return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeCapabilities: req.VolumeCapabilities,
},
}, nil
}

// ListVolumes implementation
Expand Down Expand Up @@ -296,6 +324,13 @@ func (p *Plugin) ControllerGetCapabilities(
},
},
},
&csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_PUBLISH_READONLY,
},
},
},
},
}, nil
}
Expand Down Expand Up @@ -426,12 +461,17 @@ func (p *Plugin) DeleteSnapshot(
glog.V(5).Infof("start to delete snapshot, snapshot id: %v, delete snapshot secrets: %v!",
req.SnapshotId, req.Secrets)

if 0 == len(req.SnapshotId) {
snpId := req.GetSnapshotId()
if snpId == "" {
return nil, status.Error(codes.InvalidArgument, "snapshot id cannot be empty")
}

err := p.Client.DeleteVolumeSnapshot(req.SnapshotId, nil)
snp, _ := p.Client.GetVolumeSnapshot(snpId)
if snp == nil {
return &csi.DeleteSnapshotResponse{}, nil
}

err := p.Client.DeleteVolumeSnapshot(req.SnapshotId, nil)
if nil != err {
msg := fmt.Sprintf("delete snapshot failed: %v", err)
glog.Error(msg)
Expand Down Expand Up @@ -488,14 +528,14 @@ func (p *Plugin) ListSnapshots(
break
case (0 == snapshotIDLen) && (0 != sourceVolumeIdLen):
if len(snapshotsFilterByVolumeId) <= 0 {
return nil, status.Error(codes.NotFound, fmt.Sprintf("no snapshot with source volume id %s", sourceVolumeId))
return &csi.ListSnapshotsResponse{Entries: []*csi.ListSnapshotsResponse_Entry{}}, nil
}

filterResult = snapshotsFilterByVolumeId
break
case (0 != snapshotIDLen) && (0 == sourceVolumeIdLen):
if len(snapshotsFilterById) <= 0 {
return nil, status.Error(codes.NotFound, fmt.Sprintf("no snapshot with id %s", snapshotId))
return &csi.ListSnapshotsResponse{Entries: []*csi.ListSnapshotsResponse_Entry{}}, nil
}

filterResult = snapshotsFilterById
Expand Down Expand Up @@ -687,28 +727,24 @@ func (p *Plugin) UnpublishRoutine() {

if err := p.Client.DeleteVolumeAttachment(act.Id, act); err != nil {
glog.Errorf("%s failed to unpublish: %v", act.Id, err)
} else {
waitAttachmentDeleted(act.Id, func(id string) (interface{}, error) {
return p.Client.GetVolumeAttachment(id)
}, e)
}

waitAttachmentDeleted(act.Id, func(id string) (interface{}, error) {
return p.Client.GetVolumeAttachment(id)
}, e)

// delete fileshare access control list if storage type is file
case *model.FileShareAclSpec:
act := e.Value.(*model.FileShareAclSpec)

if err := p.Client.DeleteFileShareAcl(act.Id); err != nil {
if strings.Contains(err.Error(), "Not Found") {
glog.Infof("delete attachment %s successfully", act.Id)
UnpublishAttachmentList.Delete(e)
} else {
glog.Errorf("%s failed to unpublish: %v", act.Id, err)
}
} else {
waitAttachmentDeleted(act.Id, func(id string) (interface{}, error) {
return p.Client.GetFileShareAcl(id)
}, e)
glog.Errorf("%s failed to unpublish: %v", act.Id, err)
}

waitAttachmentDeleted(act.Id, func(id string) (interface{}, error) {
return p.Client.GetFileShareAcl(id)
}, e)

}

time.Sleep(10 * time.Second)
Expand All @@ -726,16 +762,25 @@ func waitAttachmentDeleted(id string, f func(string) (interface{}, error), e *li
for {
select {
case <-ticker.C:
_, err := f(id)

if err != nil && strings.Contains(err.Error(), "Not Found") {
glog.Infof("delete attachment %s successfully", id)
UnpublishAttachmentList.Delete(e)
return
} else {
glog.Errorf("delete attachment failed: %v", err)
v, _ := f(id)

switch v.(type) {
case *model.VolumeAttachmentSpec:
if v.(*model.VolumeAttachmentSpec) == nil {
glog.Infof("delete attachment %s successfully", id)
UnpublishAttachmentList.Delete(e)
return
}
case *model.FileShareAclSpec:
if v.(*model.FileShareAclSpec) == nil {
glog.Infof("delete attachment %s successfully", id)
UnpublishAttachmentList.Delete(e)
return
}
}

glog.Errorf("delete attachment %#v failed", v)

case <-timeout:
glog.Errorf("waiting to delete %s timeout", id)
return
Expand All @@ -755,14 +800,14 @@ func extractISCSIInitiatorFromNodeInfo(nodeInfo string) (string, error) {
}

func extractNvmeofInitiatorFromNodeInfo(nodeInfo string) (string, error) {
for _, v := range strings.Split(nodeInfo, ",") {
if strings.Contains(v, "nqn") {
glog.V(5).Info("Nvmeof initiator is ", v)
return v, nil
}
}

return "", errors.New("no Nvmeof initiators found")
for _, v := range strings.Split(nodeInfo, ",") {
if strings.Contains(v, "nqn") {
glog.V(5).Info("Nvmeof initiator is ", v)
return v, nil
}
}

return "", errors.New("no Nvmeof initiators found")
}

func extractFCInitiatorFromNodeInfo(nodeInfo string) ([]string, error) {
Expand All @@ -782,6 +827,17 @@ func extractFCInitiatorFromNodeInfo(nodeInfo string) ([]string, error) {
return wwpns, nil
}

func extractIpFromNodeInfo(nodeInfo string) (string, error) {
for _, v := range strings.Split(nodeInfo, ",") {
ip := net.ParseIP(v)
if ip != nil {
return v, nil
}
}

return "", errors.New("cannot find valid ip address")
}

func getZone(requirement *csi.TopologyRequirement) string {
if requirement == nil {
return ""
Expand Down
10 changes: 7 additions & 3 deletions csi/server/plugin/opensds/fileshare.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ const (
)

type FileShare struct {
Client *client.Client
Client *client.Client
Mounter connector.Mounter
}

func NewFileshare(c *client.Client) *FileShare {
return &FileShare{Client: c}
func NewFileshare(c *client.Client, Mounter connector.Mounter) *FileShare {
return &FileShare{
Client: c,
Mounter: Mounter,
}
}

func (f *FileShare) CreateFileShare(req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
Expand Down
6 changes: 3 additions & 3 deletions csi/server/plugin/opensds/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (p *Plugin) NodeGetInfo(
glog.Info("start to get node info")
defer glog.Info("end to get node info")

hostName, err := connector.GetHostName()
hostName, err := p.Mounter.GetHostName()
if err != nil {
msg := fmt.Sprintf("failed to get node name: %v", err)
glog.Error(msg)
Expand All @@ -171,7 +171,7 @@ func (p *Plugin) NodeGetInfo(

var initiators []string

volDriverTypes := []string{connector.FcDriver, connector.IscsiDriver, connector.NvmeofDriver}
volDriverTypes := []string{connector.FcDriver, connector.IscsiDriver, connector.NvmeofDriver, connector.SampleDriver}

for _, volDriverType := range volDriverTypes {
volDriver := connector.NewConnector(volDriverType)
Expand All @@ -195,7 +195,7 @@ func (p *Plugin) NodeGetInfo(
return nil, status.Error(codes.FailedPrecondition, msg)
}

nodeId := hostName + "," + strings.Join(initiators, ",") + "," + connector.GetHostIP()
nodeId := hostName + "," + strings.Join(initiators, ",") + "," + p.Mounter.GetHostIP()

glog.Infof("node info is %s", nodeId)

Expand Down
7 changes: 5 additions & 2 deletions csi/server/plugin/opensds/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/opensds/nbp/client/opensds"
"github.com/opensds/nbp/csi/server/plugin"
"github.com/opensds/opensds/client"
"github.com/opensds/opensds/contrib/connector"
)

// Plugin define
Expand All @@ -27,6 +28,7 @@ type Plugin struct {
Client *client.Client
VolumeClient *Volume
FileShareClient *FileShare
Mounter connector.Mounter
}

func NewServer(endpoint, authStrategy, storageType string) (plugin.Service, error) {
Expand All @@ -40,8 +42,9 @@ func NewServer(endpoint, authStrategy, storageType string) (plugin.Service, erro
p := &Plugin{
PluginStorageType: storageType,
Client: client,
VolumeClient: NewVolume(client),
FileShareClient: NewFileshare(client),
VolumeClient: NewVolume(client, connector.GetCommonMounter()),
FileShareClient: NewFileshare(client, connector.GetCommonMounter()),
Mounter: connector.GetCommonMounter(),
}

// When there are multiple volumes unmount at the same time,
Expand Down
Loading