Skip to content

Commit

Permalink
Merge pull request #99 from gnufied/update-release-10-branch
Browse files Browse the repository at this point in the history
Update release 1.0 branch
  • Loading branch information
k8s-ci-robot authored Aug 24, 2020
2 parents d017b1f + dddcb17 commit de2e56e
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 110 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Note that the external-resizer does not scale with more replicas. Only one exter

* `--leader-election-namespace`: Namespace where the leader election resource lives. Defaults to the pod namespace if not set.

* `--csiTimeout <duration>`: Timeout of all calls to CSI driver. It should be set to value that accommodates majority of `ControllerExpandVolume` calls. 15 seconds is used by default.
* `--timeout <duration>`: Timeout of all calls to CSI driver. It should be set to value that accommodates majority of `ControllerExpandVolume` calls. 10 seconds is used by default.

* `--retry-interval-start`: The starting value of the exponential backoff for failures. 1 second is used by default.

Expand Down
12 changes: 7 additions & 5 deletions cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"context"
"flag"
"fmt"
"os"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os"
"time"

"k8s.io/client-go/util/workqueue"

Expand All @@ -44,8 +45,9 @@ var (
resyncPeriod = flag.Duration("resync-period", time.Minute*10, "Resync period for cache")
workers = flag.Int("workers", 10, "Concurrency to process multiple resize requests")

csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
csiTimeout = flag.Duration("csiTimeout", 15*time.Second, "Timeout for waiting for CSI driver socket.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
timeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")

showVersion = flag.Bool("version", false, "Show version")

retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume resize. It exponentially increases with each failure, up to retry-interval-max.")
Expand Down Expand Up @@ -99,7 +101,7 @@ func main() {

csiResizer, err := resizer.NewResizer(
*csiAddress,
*csiTimeout,
*timeout,
kubeClient,
informerFactory,
*metricsAddress,
Expand Down
164 changes: 114 additions & 50 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
Expand All @@ -55,15 +55,18 @@ type resizeController struct {
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
pvLister corelisters.PersistentVolumeLister
pvSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced cache.InformerSynced

usedPVCs *inUsePVCStore

podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podLister corelisters.PodLister
podListerSynced cache.InformerSynced

// a cache to store PersistentVolume objects
volumes cache.Store
// a cache to store PersistentVolumeClaim objects
claims cache.Store
handleVolumeInUseError bool
}

Expand Down Expand Up @@ -91,11 +94,11 @@ func NewResizeController(
name: name,
resizer: resizer,
kubeClient: kubeClient,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
pvcSynced: pvcInformer.Informer().HasSynced,
claimQueue: claimQueue,
volumes: pvInformer.Informer().GetStore(),
claims: pvcInformer.Informer().GetStore(),
eventRecorder: eventRecorder,
usedPVCs: newUsedPVCStore(),
handleVolumeInUseError: handleVolumeInUseError,
Expand Down Expand Up @@ -266,6 +269,7 @@ func (ctrl *resizeController) syncPVCs() {

if err := ctrl.syncPVC(key.(string)); err != nil {
// Put PVC back to the queue so that we can retry later.
klog.Errorf("Error syncing PVC: %v", err)
ctrl.claimQueue.AddRateLimited(key)
} else {
ctrl.claimQueue.Forget(key)
Expand All @@ -278,33 +282,42 @@ func (ctrl *resizeController) syncPVC(key string) error {

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
return err
return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
}

pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
pvcObject, exists, err := ctrl.claims.GetByKey(key)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
return nil
}
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
return err
return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
}

if !exists {
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
return nil
}

pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("expected PVC got: %v", pvcObject)
}

if !ctrl.pvcNeedResize(pvc) {
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
return nil
}

pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
return nil
}
klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
return err
return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
}

if !exists {
klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
return nil
}

pv, ok := volumeObj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("expected volume but got %+v", volumeObj)
}

if !ctrl.pvNeedResize(pvc, pv) {
Expand Down Expand Up @@ -369,8 +382,7 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
return err
return fmt.Errorf("marking pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
} else if updatedPVC != nil {
pvc = updatedPVC
}
Expand Down Expand Up @@ -406,7 +418,6 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
// Record an event to indicate that resize operation is failed.
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
}

return err
}

Expand All @@ -426,25 +437,48 @@ func (ctrl *resizeController) resizeVolume(
newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)

if err != nil {
klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
// if this error was a in-use error then it must be tracked so as we don't retry without
// first verifying if volume is in-use
if inUseError(err) {
ctrl.usedPVCs.addPVCWithInUseError(pvc)
}
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
return newSize, fsResizeRequired, fmt.Errorf("resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
}
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)

if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
err = ctrl.updatePVCapacity(pv, newSize)
if err != nil {
return newSize, fsResizeRequired, err
}
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())

return newSize, fsResizeRequired, nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
}
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

_, err := ctrl.patchClaim(pvc, newPVC)

if err != nil {
return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
}

klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")

return nil
}

func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Expand All @@ -455,7 +489,12 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{progressCondition})
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)

updatedPVC, err := ctrl.patchClaim(pvc, newPVC)
if err != nil {
return nil, err
}
return updatedPVC, nil
}

func (ctrl *resizeController) markPVCResizeFinished(
Expand All @@ -464,9 +503,10 @@ func (ctrl *resizeController) markPVCResizeFinished(
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
return err

_, err := ctrl.patchClaim(pvc, newPVC)
if err != nil {
return fmt.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
}

klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
Expand All @@ -475,28 +515,52 @@ func (ctrl *resizeController) markPVCResizeFinished(
return nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
func (ctrl *resizeController) patchClaim(oldPVC, newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := util.GetPVCPatchData(oldPVC, newPVC)
if err != nil {
return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", util.PVCKey(oldPVC), err)
}
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
return err
updatedClaim, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return nil, fmt.Errorf("can't patch status of PVC %s with %v", util.PVCKey(oldPVC), updateErr)
}
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")
err = ctrl.claims.Update(updatedClaim)
if err != nil {
return nil, fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
}

return updatedClaim, nil
}

func (ctrl *resizeController) updatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity) error {
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
newPV := pv.DeepCopy()
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity

_, err := ctrl.patchPersistentVolume(pv, newPV)
if err != nil {
return fmt.Errorf("updating capacity of PV %q to %s failed: %v", pv.Name, newCapacity.String(), err)
}
return nil
}

func (ctrl *resizeController) patchPersistentVolume(oldPV, newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
patchBytes, err := util.GetPatchData(oldPV, newPV)
if err != nil {
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", newPV.Name, err)
}
updatedPV, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), newPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if updateErr != nil {
return nil, fmt.Errorf("update capacity of PV %s failed: %v", newPV.Name, updateErr)
}
err = ctrl.volumes.Update(updatedPV)
if err != nil {
return nil, fmt.Errorf("error updating PV %s in local cache: %v", newPV.Name, err)
}
return updatedPV, nil
}

func parsePod(obj interface{}) *v1.Pod {
if obj == nil {
return nil
Expand Down
Loading

0 comments on commit de2e56e

Please sign in to comment.