Skip to content

Commit

Permalink
simplify status and perform in correct order
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed Oct 25, 2023
1 parent bf763c8 commit e0d1742
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 280 deletions.
17 changes: 9 additions & 8 deletions controllers/cosmosfullnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewFullNode(
configMapControl: fullnode.NewConfigMapControl(client),
nodeKeyControl: fullnode.NewNodeKeyControl(client),
peerCollector: fullnode.NewPeerCollector(client),
podControl: fullnode.NewPodControl(client, cacheController),
podControl: fullnode.NewPodControl(client),
pvcControl: fullnode.NewPVCControl(client),
recorder: recorder,
serviceControl: fullnode.NewServiceControl(client),
Expand Down Expand Up @@ -118,7 +118,10 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
reporter := kube.NewEventReporter(logger, r.recorder, crd)

fullnode.ResetStatus(crd)
defer r.updateStatus(ctx, crd)

syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)

defer r.updateStatus(ctx, crd, &syncInfo)

errs := &kube.ReconcileErrors{}

Expand Down Expand Up @@ -169,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

// Reconcile pods.
podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums)
podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, &syncInfo)
if err != nil {
errs.Append(err)
}
Expand Down Expand Up @@ -218,16 +221,14 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e
return stopResult, err
}

func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) {
consensus := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)

func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo *cosmosv1.SyncInfoStatus) {
if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
status.ObservedGeneration = crd.Status.ObservedGeneration
status.Phase = crd.Status.Phase
status.StatusMessage = crd.Status.StatusMessage
status.Peers = crd.Status.Peers
status.SyncInfo = &consensus
for _, v := range consensus.Pods {
status.SyncInfo = syncInfo
for _, v := range syncInfo.Pods {
if v.Height != nil && *v.Height > 0 {
if status.Height == nil {
status.Height = make(map[string]uint64)
Expand Down
5 changes: 0 additions & 5 deletions internal/cosmos/cache_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,6 @@ func (c *CacheController) SyncedPods(ctx context.Context, controller client.Obje
return kube.AvailablePods(c.Collect(ctx, controller).SyncedPods(), 5*time.Second, time.Now())
}

// PodsWithStatus collects the status items for the controller object identified by crd
// and converts them into a PodStatus slice.
func (c *CacheController) PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []PodStatus {
	key := client.ObjectKeyFromObject(crd)
	collected := c.Collect(ctx, key)
	return collected.PodsWithStatus(crd)
}

func (c *CacheController) listPods(ctx context.Context, controller client.ObjectKey) ([]corev1.Pod, error) {
var pods corev1.PodList
if err := c.client.List(ctx, &pods,
Expand Down
44 changes: 0 additions & 44 deletions internal/cosmos/status_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/samber/lo"
cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -114,46 +113,3 @@ func (coll StatusCollection) Synced() StatusCollection {
// SyncedPods maps every status item in coll.Synced() to its pod.
func (coll StatusCollection) SyncedPods() []*corev1.Pod {
	synced := coll.Synced()
	toPod := func(item StatusItem, _ int) *corev1.Pod {
		return item.GetPod()
	}
	return lo.Map(synced, toPod)
}

// PodStatus is the status of a pod.
// It is produced by StatusCollection.PodsWithStatus, one value per collected status item.
type PodStatus struct {
	// Pod is the pod this status describes.
	Pod *corev1.Pod
	// RPCReachable is true when the pod's collected status carries no error.
	RPCReachable bool
	// Synced is true when the collected status carries no error and its sync info
	// reports that the node is not catching up.
	Synced bool
	// AwaitingUpgrade is true when the image selected from the chain spec versions for
	// the pod's recorded height differs from the image of the pod's first container.
	AwaitingUpgrade bool
}

// PodsWithStatus returns one PodStatus per item in the collection, in collection order.
//
// For each item:
//   - AwaitingUpgrade is set when crd.Spec.ChainSpec.Versions is non-nil and the image
//     selected for the pod's recorded height (crd.Status.Height, 0 when the pod has no
//     entry) is non-empty and differs from the image of the pod's first container.
//     A pod without containers is never reported as awaiting an upgrade.
//   - RPCReachable is set when the item carries no error.
//   - Synced is set when the item carries no error and its sync info is not catching up.
func (coll StatusCollection) PodsWithStatus(crd *cosmosv1.CosmosFullNode) []PodStatus {
	out := make([]PodStatus, len(coll))
	for i, status := range coll {
		ps := PodStatus{
			Pod: status.GetPod(),
		}

		if crd.Spec.ChainSpec.Versions != nil {
			// A missing (or nil-map) entry yields the zero value, so an unknown pod is
			// evaluated at height 0.
			instanceHeight := crd.Status.Height[status.Pod.Name]

			// Select the image of the last version whose upgrade height has been reached.
			// NOTE(review): the early break assumes Versions is ordered by ascending
			// UpgradeHeight — confirm against the CRD validation.
			var image string
			for _, version := range crd.Spec.ChainSpec.Versions {
				if instanceHeight < version.UpgradeHeight {
					break
				}
				image = version.Image
			}

			// Guard the index: a pod with an empty container list must not panic here.
			containers := status.Pod.Spec.Containers
			if image != "" && len(containers) > 0 && containers[0].Image != image {
				ps.AwaitingUpgrade = true
			}
		}

		if status.Err == nil {
			ps.RPCReachable = true
			ps.Synced = !status.Status.Result.SyncInfo.CatchingUp
		}

		out[i] = ps
	}
	return out
}
81 changes: 48 additions & 33 deletions internal/fullnode/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/cosmos"
"github.com/strangelove-ventures/cosmos-operator/internal/diff"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
corev1 "k8s.io/api/core/v1"
Expand All @@ -16,10 +15,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// PodFilter supplies the per-pod status that PodControl consults when it has
// pod updates to apply.
type PodFilter interface {
	// PodsWithStatus returns the status of the pods associated with crd.
	PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus
}

// Client is a controller client. It is a subset of client.Client.
type Client interface {
client.Reader
Expand All @@ -31,22 +26,26 @@ type Client interface {
// PodControl reconciles pods for a CosmosFullNode.
type PodControl struct {
	// client is used to list and delete pods and to obtain the scheme when setting
	// the controller reference on new pods.
	client Client
	// podFilter reports per-pod status used while deciding which pods to update.
	podFilter PodFilter
	// computeRollout returns how many pods may be updated in one reconcile pass, given
	// the max-unavailable setting, the desired replica count and the number of ready pods.
	// NewPodControl sets it to kube.ComputeRollout.
	computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
}

// NewPodControl returns a valid PodControl.
func NewPodControl(client Client, filter PodFilter) PodControl {
func NewPodControl(client Client) PodControl {
return PodControl{
client: client,
podFilter: filter,
computeRollout: kube.ComputeRollout,
}
}

// Reconcile is the control loop for pods. The bool return value, if true, indicates the controller should requeue
// the request.
func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd *cosmosv1.CosmosFullNode, cksums ConfigChecksums) (bool, kube.ReconcileError) {
func (pc PodControl) Reconcile(
ctx context.Context,
reporter kube.Reporter,
crd *cosmosv1.CosmosFullNode,
cksums ConfigChecksums,
syncInfo *cosmosv1.SyncInfoStatus,
) (bool, kube.ReconcileError) {
var pods corev1.PodList
if err := pc.client.List(ctx, &pods,
client.InNamespace(crd.Namespace),
Expand All @@ -62,7 +61,7 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
diffed := diff.New(ptrSlice(pods.Items), wantPods)

for _, pod := range diffed.Creates() {
reporter.Info("Creating pod", "pod", pod.Name)
reporter.Info("Creating pod", "name", pod.Name)
if err := ctrl.SetControllerReference(crd, pod, pc.client.Scheme()); err != nil {
return true, kube.TransientError(fmt.Errorf("set controller reference on pod %q: %w", pod.Name, err))
}
Expand All @@ -72,7 +71,7 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
}

for _, pod := range diffed.Deletes() {
reporter.Info("Deleting pod", "pod", pod.Name)
reporter.Info("Deleting pod", "name", pod.Name)
if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); kube.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("delete pod %q: %w", pod.Name, err))
}
Expand All @@ -86,37 +85,54 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
diffedUpdates := diffed.Updates()
if len(diffedUpdates) > 0 {
var (
podsWithStatus = pc.podFilter.PodsWithStatus(ctx, crd)
upgradePods = make(map[string]bool)
otherUpdates = make(map[string]*corev1.Pod)
rpcReachablePods = make(map[string]bool)
inSyncPods = make(map[string]bool)
deletedPods = make(map[string]bool)
rpcReachablePods = 0
inSyncPods = 0
)

for _, ps := range podsWithStatus {
if ps.Synced {
inSyncPods[ps.Pod.Name] = true
PodsLoop:
for _, ps := range syncInfo.Pods {
for _, existing := range pods.Items {
if existing.Name == ps.Pod {
if existing.DeletionTimestamp != nil {
continue PodsLoop
}
break
}
}

if ps.InSync != nil && *ps.InSync {
inSyncPods++
}
if ps.RPCReachable {
rpcReachablePods[ps.Pod.Name] = true
rpcReachable := ps.Error == nil
if rpcReachable {
rpcReachablePods++
}
for _, update := range diffedUpdates {
if ps.Pod.Name == update.Name {
if ps.AwaitingUpgrade {
if !ps.RPCReachable {
upgradePods[ps.Pod.Name] = true
reporter.Info("Deleting pod for version upgrade", "podName", ps.Pod.Name)
if ps.Pod == update.Name {
awaitingUpgrade := false
for _, existing := range pods.Items {
if existing.Name == ps.Pod {
if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
awaitingUpgrade = true
}
break
}
}
if awaitingUpgrade {
if !rpcReachable {
upgradePods[ps.Pod] = true
reporter.Info("Deleting pod for version upgrade", "name", ps.Pod)
// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
if err := pc.client.Delete(ctx, ps.Pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod.Name, err))
if err := pc.client.Delete(ctx, update, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod, err))
}
deletedPods[ps.Pod.Name] = true
} else {
otherUpdates[ps.Pod.Name] = ps.Pod
otherUpdates[ps.Pod] = update
}
} else {
otherUpdates[ps.Pod.Name] = ps.Pod
otherUpdates[ps.Pod] = update
}
break
}
Expand All @@ -125,9 +141,9 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd

// If we don't have any pods in sync, we are down anyways, so we can use the number of RPC reachable pods for computing the rollout,
// with the goal of recovering the pods as quickly as possible.
ready := len(inSyncPods)
ready := inSyncPods
if ready == 0 {
ready = len(rpcReachablePods)
ready = rpcReachablePods
}

numUpdates := pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), ready)
Expand All @@ -150,7 +166,6 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("update pod %q: %w", podName, err))
}
deletedPods[podName] = true
updated++
if updated >= numUpdates {
// done for this round
Expand Down
Loading

0 comments on commit e0d1742

Please sign in to comment.