Skip to content

Commit

Permalink
fix: keep rs informer updated (#3091)
Browse files Browse the repository at this point in the history
* keep rs informer updated

Signed-off-by: zachaller <[email protected]>

* correct bad log

Signed-off-by: zachaller <[email protected]>

* add error context

Signed-off-by: zachaller <[email protected]>

---------

Signed-off-by: zachaller <[email protected]>
  • Loading branch information
zachaller authored Oct 12, 2023
1 parent 017d362 commit 28d9502
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
3 changes: 2 additions & 1 deletion rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type reconcilerBase struct {
replicaSetSynced cache.InformerSynced
rolloutsInformer cache.SharedIndexInformer
rolloutsLister listers.RolloutLister
replicaSetInformer cache.SharedIndexInformer
rolloutsSynced cache.InformerSynced
rolloutsIndexer cache.Indexer
servicesLister v1.ServiceLister
Expand Down Expand Up @@ -176,7 +177,6 @@ func NewController(cfg ControllerConfig) *Controller {
controllerutil.EnqueueAfter(obj, duration, cfg.RolloutWorkQueue)
},
}

base := reconcilerBase{
kubeclientset: cfg.KubeClientSet,
argoprojclientset: cfg.ArgoProjClientset,
Expand All @@ -185,6 +185,7 @@ func NewController(cfg ControllerConfig) *Controller {
replicaSetLister: cfg.ReplicaSetInformer.Lister(),
replicaSetSynced: cfg.ReplicaSetInformer.Informer().HasSynced,
rolloutsInformer: cfg.RolloutsInformer.Informer(),
replicaSetInformer: cfg.ReplicaSetInformer.Informer(),
rolloutsIndexer: cfg.RolloutsInformer.Informer().GetIndexer(),
rolloutsLister: cfg.RolloutsInformer.Lister(),
rolloutsSynced: cfg.RolloutsInformer.Informer().HasSynced,
Expand Down
4 changes: 3 additions & 1 deletion rollout/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error {
_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
if err == nil {
c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
c.replicaSetInformer.GetIndexer().Update(rs)
}
return err
}
Expand All @@ -56,9 +57,10 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet, scaleDownDelay
}
deadline := timeutil.MetaNow().Add(scaleDownDelaySeconds).UTC().Format(time.RFC3339)
patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, deadline)
_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
rs, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
if err == nil {
c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds)
c.replicaSetInformer.GetIndexer().Update(rs)
}
return err
}
Expand Down
21 changes: 17 additions & 4 deletions rollout/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,29 @@ func TestReconcileNewReplicaSet(t *testing.T) {
rollout := newBlueGreenRollout("foo", test.rolloutReplicas, nil, "", "")
fake := fake.Clientset{}
k8sfake := k8sfake.Clientset{}

f := newFixture(t)
defer f.Close()
f.objects = append(f.objects, rollout)
f.replicaSetLister = append(f.replicaSetLister, oldRS, newRS)
f.kubeobjects = append(f.kubeobjects, oldRS, newRS)
_, informers, k8sInformer := f.newController(noResyncPeriodFunc)
stopCh := make(chan struct{})
informers.Start(stopCh)
informers.WaitForCacheSync(stopCh)
close(stopCh)

roCtx := rolloutContext{
log: logutil.WithRollout(rollout),
rollout: rollout,
newRS: newRS,
stableRS: oldRS,
reconcilerBase: reconcilerBase{
argoprojclientset: &fake,
kubeclientset: &k8sfake,
recorder: record.NewFakeEventRecorder(),
resyncPeriod: 30 * time.Second,
argoprojclientset: &fake,
kubeclientset: &k8sfake,
recorder: record.NewFakeEventRecorder(),
resyncPeriod: 30 * time.Second,
replicaSetInformer: k8sInformer.Apps().V1().ReplicaSets().Informer(),
},
pauseContext: &pauseContext{
rollout: rollout,
Expand Down
10 changes: 9 additions & 1 deletion rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) {
if annotationsUpdated || minReadySecondsNeedsUpdate || affinityNeedsUpdate {
rsCopy.Spec.MinReadySeconds = c.rollout.Spec.MinReadySeconds
rsCopy.Spec.Template.Spec.Affinity = replicasetutil.GenerateReplicaSetAffinity(*c.rollout)
return c.kubeclientset.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
rs, err := c.kubeclientset.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err != nil {
c.log.WithError(err).Error("Error: updating replicaset revision")
return nil, fmt.Errorf("error updating replicaset revision: %v", err)
}
c.log.Infof("Synced revision on ReplicaSet '%s' to '%s'", rs.Name, newRevision)
c.replicaSetInformer.GetIndexer().Update(rs)
return rs, nil
}

// Should use the revision in existingNewRS's annotation, since it set by before
Expand Down Expand Up @@ -370,6 +377,7 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32,
scaled = true
revision, _ := replicasetutil.Revision(rs)
c.recorder.Eventf(rollout, record.EventOptions{EventReason: conditions.ScalingReplicaSetReason}, conditions.ScalingReplicaSetMessage, scalingOperation, rs.Name, revision, oldScale, newScale)
c.replicaSetInformer.GetIndexer().Update(rs)
}
}
return scaled, rs, err
Expand Down

0 comments on commit 28d9502

Please sign in to comment.