Skip to content

Commit

Permalink
Merge pull request #96 from gnufied/read-pv-from-apiserver
Browse files Browse the repository at this point in the history
We need to ensure that we are not reading stale PV objects
  • Loading branch information
k8s-ci-robot authored Aug 21, 2020
2 parents a2dda6c + a1746e8 commit dddcb17
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 92 deletions.
164 changes: 114 additions & 50 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
Expand All @@ -55,15 +55,18 @@ type resizeController struct {
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
pvLister corelisters.PersistentVolumeLister
pvSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced cache.InformerSynced

usedPVCs *inUsePVCStore

podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podLister corelisters.PodLister
podListerSynced cache.InformerSynced

// a cache to store PersistentVolume objects
volumes cache.Store
// a cache to store PersistentVolumeClaim objects
claims cache.Store
handleVolumeInUseError bool
}

Expand Down Expand Up @@ -91,11 +94,11 @@ func NewResizeController(
name: name,
resizer: resizer,
kubeClient: kubeClient,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
pvcSynced: pvcInformer.Informer().HasSynced,
claimQueue: claimQueue,
volumes: pvInformer.Informer().GetStore(),
claims: pvcInformer.Informer().GetStore(),
eventRecorder: eventRecorder,
usedPVCs: newUsedPVCStore(),
handleVolumeInUseError: handleVolumeInUseError,
Expand Down Expand Up @@ -266,6 +269,7 @@ func (ctrl *resizeController) syncPVCs() {

if err := ctrl.syncPVC(key.(string)); err != nil {
// Put PVC back to the queue so that we can retry later.
klog.Errorf("Error syncing PVC: %v", err)
ctrl.claimQueue.AddRateLimited(key)
} else {
ctrl.claimQueue.Forget(key)
Expand All @@ -278,33 +282,42 @@ func (ctrl *resizeController) syncPVC(key string) error {

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
return err
return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
}

pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
pvcObject, exists, err := ctrl.claims.GetByKey(key)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
return nil
}
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
return err
return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
}

if !exists {
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
return nil
}

pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("expected PVC got: %v", pvcObject)
}

if !ctrl.pvcNeedResize(pvc) {
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
return nil
}

pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
return nil
}
klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
return err
return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
}

if !exists {
klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
return nil
}

pv, ok := volumeObj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("expected volume but got %+v", volumeObj)
}

if !ctrl.pvNeedResize(pvc, pv) {
Expand Down Expand Up @@ -369,8 +382,7 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
return err
return fmt.Errorf("marking pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
} else if updatedPVC != nil {
pvc = updatedPVC
}
Expand Down Expand Up @@ -406,7 +418,6 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
// Record an event to indicate that resize operation is failed.
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
}

return err
}

Expand All @@ -426,25 +437,48 @@ func (ctrl *resizeController) resizeVolume(
newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)

if err != nil {
klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
// if this error was a in-use error then it must be tracked so as we don't retry without
// first verifying if volume is in-use
if inUseError(err) {
ctrl.usedPVCs.addPVCWithInUseError(pvc)
}
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
return newSize, fsResizeRequired, fmt.Errorf("resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
}
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)

if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
err = ctrl.updatePVCapacity(pv, newSize)
if err != nil {
return newSize, fsResizeRequired, err
}
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())

return newSize, fsResizeRequired, nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
}
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

_, err := ctrl.patchClaim(pvc, newPVC)

if err != nil {
return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
}

klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")

return nil
}

func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Expand All @@ -455,7 +489,12 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{progressCondition})
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)

updatedPVC, err := ctrl.patchClaim(pvc, newPVC)
if err != nil {
return nil, err
}
return updatedPVC, nil
}

func (ctrl *resizeController) markPVCResizeFinished(
Expand All @@ -464,9 +503,10 @@ func (ctrl *resizeController) markPVCResizeFinished(
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
return err

_, err := ctrl.patchClaim(pvc, newPVC)
if err != nil {
return fmt.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
}

klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
Expand All @@ -475,28 +515,52 @@ func (ctrl *resizeController) markPVCResizeFinished(
return nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
func (ctrl *resizeController) patchClaim(oldPVC, newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := util.GetPVCPatchData(oldPVC, newPVC)
if err != nil {
return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", util.PVCKey(oldPVC), err)
}
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
return err
updatedClaim, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return nil, fmt.Errorf("can't patch status of PVC %s with %v", util.PVCKey(oldPVC), updateErr)
}
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")
err = ctrl.claims.Update(updatedClaim)
if err != nil {
return nil, fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
}

return updatedClaim, nil
}

func (ctrl *resizeController) updatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity) error {
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
newPV := pv.DeepCopy()
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity

_, err := ctrl.patchPersistentVolume(pv, newPV)
if err != nil {
return fmt.Errorf("updating capacity of PV %q to %s failed: %v", pv.Name, newCapacity.String(), err)
}
return nil
}

func (ctrl *resizeController) patchPersistentVolume(oldPV, newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
patchBytes, err := util.GetPatchData(oldPV, newPV)
if err != nil {
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", newPV.Name, err)
}
updatedPV, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), newPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if updateErr != nil {
return nil, fmt.Errorf("update capacity of PV %s failed: %v", newPV.Name, updateErr)
}
err = ctrl.volumes.Update(updatedPV)
if err != nil {
return nil, fmt.Errorf("error updating PV %s in local cache: %v", newPV.Name, err)
}
return updatedPV, nil
}

func parsePod(obj interface{}) *v1.Pod {
if obj == nil {
return nil
Expand Down
87 changes: 87 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,93 @@ func TestController(t *testing.T) {
}
}

func TestResizePVC(t *testing.T) {
fsVolumeMode := v1.PersistentVolumeFilesystem

for _, test := range []struct {
Name string
PVC *v1.PersistentVolumeClaim
PV *v1.PersistentVolume

NodeResize bool
expansionFailure bool
expectFailure bool
}{
{
Name: "Resize PVC with FS resize",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
NodeResize: true,
},
{
Name: "Resize PVC with FS resize failure",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
NodeResize: true,
expansionFailure: true,
expectFailure: true,
},
} {
client := csi.NewMockClient("mock", test.NodeResize, true, true)
if test.expansionFailure {
client.SetExpansionFailed()
}
driverName, _ := client.GetDriverName(context.TODO())

initialObjects := []runtime.Object{}
if test.PVC != nil {
initialObjects = append(initialObjects, test.PVC)
}
if test.PV != nil {
test.PV.Spec.PersistentVolumeSource.CSI.Driver = driverName
initialObjects = append(initialObjects, test.PV)
}

kubeClient, informerFactory := fakeK8s(initialObjects)
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
podInformer := informerFactory.Core().V1().Pods()

metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
metricsAddress := ""
metricsPath := ""
csiResizer, err := resizer.NewResizerFromClient(client, 15*time.Second, kubeClient, informerFactory, metricsManager, metricsAddress, metricsPath)
if err != nil {
t.Fatalf("Test %s: Unable to create resizer: %v", test.Name, err)
}

controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultControllerRateLimiter(), true /* disableVolumeInUseErrorHandler*/)

ctrlInstance, _ := controller.(*resizeController)

stopCh := make(chan struct{})
informerFactory.Start(stopCh)

for _, obj := range initialObjects {
switch obj.(type) {
case *v1.PersistentVolume:
pvInformer.Informer().GetStore().Add(obj)
case *v1.PersistentVolumeClaim:
pvcInformer.Informer().GetStore().Add(obj)
case *v1.Pod:
podInformer.Informer().GetStore().Add(obj)
default:
t.Fatalf("Test %s: Unknown initalObject type: %+v", test.Name, obj)
}
}

err = ctrlInstance.resizePVC(test.PVC, test.PV)
if test.expectFailure && err == nil {
t.Errorf("for %s expected error got nothing", test.Name)
continue
}
if !test.expectFailure && err != nil {
t.Errorf("for %s, unexpected error: %v", test.Name, err)
}

}
}

func invalidPVC() *v1.PersistentVolumeClaim {
pvc := createPVC(1, 1)
pvc.ObjectMeta.Name = ""
Expand Down
Loading

0 comments on commit dddcb17

Please sign in to comment.