Skip to content

Commit

Permalink
Merge pull request #1025 from tu1h/refactor_manifest_localartifact
Browse files Browse the repository at this point in the history
Refactor controllers of Manifest and Localartifact
  • Loading branch information
ErikJiang authored Nov 20, 2023
2 parents dc9e294 + b2d208c commit 4a99ec9
Show file tree
Hide file tree
Showing 6 changed files with 471 additions and 908 deletions.
4 changes: 4 additions & 0 deletions api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ const Group_vars_yml = "group_vars.yml"
const SSH_privatekey = "ssh-privatekey"

const KubeanClusterHasCompleted = "hasCompleted"

const KeySprayRelease = "kubean.io/sprayRelease"

const KeySprayCommit = "kubean.io/sprayCommit"
169 changes: 50 additions & 119 deletions pkg/controllers/infomanifest/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"reflect"
"sort"
"time"

"github.com/kubean-io/kubean/pkg/util"
Expand All @@ -30,8 +29,6 @@ import (

const Loop = time.Second * 30

const OriginLabel = "origin"

const LocalServiceConfigMap = "kubean-localservice"

type Controller struct {
Expand All @@ -47,77 +44,6 @@ func (c *Controller) Start(ctx context.Context) error {
return nil
}

// FetchLatestInfoManifest , get infomanifest exclude the global-infomanifest.
func (c *Controller) FetchLatestInfoManifest() (*manifestv1alpha1.Manifest, error) {
result, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().List(context.Background(), metav1.ListOptions{})
if err != nil {
return nil, err
}
items := make([]*manifestv1alpha1.Manifest, 0)
for i := range result.Items {
item := result.Items[i]
if item.Name == constants.InfoManifestGlobal {
continue
}
items = append(items, &item)
}
if len(items) == 0 {
return nil, fmt.Errorf("not found the latest InfoManifest")
}
sort.Slice(items, func(i, j int) bool {
return items[i].CreationTimestamp.After(items[j].CreationTimestamp.Time)
})
return items[0], nil
}

func NewGlobalInfoManifest(latestInfoManifest *manifestv1alpha1.Manifest) *manifestv1alpha1.Manifest {
return &manifestv1alpha1.Manifest{
TypeMeta: metav1.TypeMeta{
Kind: "Manifest",
APIVersion: "kubean.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: constants.InfoManifestGlobal,
Labels: map[string]string{OriginLabel: latestInfoManifest.Name},
},
Spec: latestInfoManifest.Spec,
}
}

func (c *Controller) FetchGlobalInfoManifest() (*manifestv1alpha1.Manifest, error) {
global, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().Get(context.Background(), constants.InfoManifestGlobal, metav1.GetOptions{})
if err != nil {
return nil, err
}
return global, nil
}

func (c *Controller) EnsureGlobalInfoManifestBeingLatest(latestInfoManifest *manifestv1alpha1.Manifest) (*manifestv1alpha1.Manifest, error) {
currentGlobalInfoManifest, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().Get(context.Background(), constants.InfoManifestGlobal, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
// create global-infomanifest
global, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().Create(context.Background(), NewGlobalInfoManifest(latestInfoManifest), metav1.CreateOptions{})
if err != nil {
return nil, err
}
return global, nil
}
if err != nil {
// other error
return nil, err
}
if currentGlobalInfoManifest.Labels == nil || len(currentGlobalInfoManifest.Labels) == 0 || currentGlobalInfoManifest.Labels[OriginLabel] != latestInfoManifest.Name {
currentGlobalInfoManifest.Labels = map[string]string{OriginLabel: latestInfoManifest.Name}
currentGlobalInfoManifest.Spec = latestInfoManifest.Spec
global, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().Update(context.Background(), currentGlobalInfoManifest, metav1.UpdateOptions{})
if err != nil {
return nil, err
}
return global, nil
}
return currentGlobalInfoManifest, nil
}

func (c *Controller) FetchLocalServiceCM(namespace string) (*corev1.ConfigMap, error) {
if localServiceCM, _ := c.ClientSet.CoreV1().ConfigMaps(namespace).Get(context.Background(), LocalServiceConfigMap, metav1.GetOptions{}); localServiceCM != nil {
return localServiceCM, nil
Expand Down Expand Up @@ -146,54 +72,62 @@ func (c *Controller) ParseConfigMapToLocalService(localServiceConfigMap *corev1.
return localService, nil
}

func (c *Controller) UpdateGlobalLocalService() {
// UpdateLocalService sync the content of local-service configmap into spec.
func (c *Controller) UpdateLocalService(manifests []manifestv1alpha1.Manifest) bool {
if c.IsOnlineENV() {
// if not airgap env , then do nothing and return
return
return false
}
localServiceConfigMap, err := c.FetchLocalServiceCM(util.GetCurrentNSOrDefault())
if err != nil {
klog.Warningf("ignoring %s", err.Error())
return
klog.Warningf("ignoring error: %s", err.Error())
return false
}
localService, err := c.ParseConfigMapToLocalService(localServiceConfigMap)
if err != nil {
klog.Warningf("ignoring %s", err.Error())
return
klog.Warningf("ignoring error: %s", err.Error())
return false
}
global, err := c.FetchGlobalInfoManifest()
if err != nil {
klog.Warningf("ignoring %s", err.Error())
return
}
if !reflect.DeepEqual(&global.Spec.LocalService, localService) {
global.Spec.LocalService = *localService
klog.Warningf("update the global InfoManifest LocalService")
_, err = c.InfoManifestClientSet.KubeanV1alpha1().Manifests().Update(context.Background(), global, metav1.UpdateOptions{})
if err != nil {
klog.Warningf("ignoring %s", err.Error())
klog.Warningf("ignoring error: %s", err.Error())
return false
}
for _, manifest := range manifests {
if !reflect.DeepEqual(&manifest.Spec.LocalService, localService) {
manifest.Spec.LocalService = *localService
klog.Infof("Update local-service for %s", manifest.Name)
_, err = c.InfoManifestClientSet.KubeanV1alpha1().Manifests().Update(context.Background(), &manifest, metav1.UpdateOptions{})
if err != nil {
klog.Warningf("ignoring error: %s", err.Error())
}
return true
}
}
return false
}

// UpdateLocalAvailableImage update image infos to global-infomanifest-cr.
func (c *Controller) UpdateLocalAvailableImage() {
global, err := c.FetchGlobalInfoManifest()
if err != nil {
klog.Warningf("ignoring %s", err.Error())
return
}
imageRepo := "ghcr.m.daocloud.io"
if len(global.Spec.LocalService.GetGHCRImageRepo()) != 0 {
imageRepo = global.Spec.LocalService.GetGHCRImageRepo() // ghcr.io
}
newImageName := fmt.Sprintf("%s/kubean-io/spray-job:%s", imageRepo, global.Spec.KubeanVersion)
if global.Status.LocalAvailable.KubesprayImage != newImageName {
global.Status.LocalAvailable.KubesprayImage = newImageName
_, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().UpdateStatus(context.Background(), global, metav1.UpdateOptions{})
if err != nil {
klog.Warningf("ignoring %s", err.Error())
return
// UpdateLocalAvailableImage update image infos into status.
func (c *Controller) UpdateLocalAvailableImage(manifests []manifestv1alpha1.Manifest) {
for _, manifest := range manifests {
imageRepo := "ghcr.m.daocloud.io"
if len(manifest.Spec.LocalService.GetGHCRImageRepo()) != 0 {
imageRepo = manifest.Spec.LocalService.GetGHCRImageRepo() // ghcr.io
}
var newImageName string
sprayRelease := manifest.ObjectMeta.Annotations[constants.KeySprayRelease]
sprayCommit := manifest.ObjectMeta.Annotations[constants.KeySprayCommit]
if sprayRelease != "" && sprayCommit != "" {
newImageName = fmt.Sprintf("%s/kubean-io/spray-job:%s-%s", imageRepo, sprayRelease, sprayCommit)
} else {
newImageName = fmt.Sprintf("%s/kubean-io/spray-job:%s", imageRepo, manifest.Spec.KubeanVersion)
}
if manifest.Status.LocalAvailable.KubesprayImage != newImageName {
manifest.Status.LocalAvailable.KubesprayImage = newImageName
_, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().UpdateStatus(context.Background(), &manifest, metav1.UpdateOptions{})
if err != nil {
klog.Warningf("ignoring error: %s", err.Error())
return
}
}
}
}
Expand All @@ -212,22 +146,19 @@ func (c *Controller) IsOnlineENV() bool {
}

func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
if req.Name == constants.InfoManifestGlobal {
return controllerruntime.Result{}, nil
}
klog.Infof("InfoManifest Controller receive event %s", req.Name)
latestInfoManifest, err := c.FetchLatestInfoManifest()
manifestItems, err := c.InfoManifestClientSet.KubeanV1alpha1().Manifests().List(ctx, metav1.ListOptions{})
if err != nil {
klog.Warningf("%s ", err.Error())
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
klog.Error(err)
return controllerruntime.Result{RequeueAfter: Loop}, nil
}
_, err = c.EnsureGlobalInfoManifestBeingLatest(latestInfoManifest)
if err != nil {
klog.Warningf("%s ", err.Error())

if requeue := c.UpdateLocalService(manifestItems.Items); requeue {
return controllerruntime.Result{RequeueAfter: Loop}, nil
}
c.UpdateGlobalLocalService()
c.UpdateLocalAvailableImage()
c.UpdateLocalAvailableImage(manifestItems.Items)
return controllerruntime.Result{RequeueAfter: Loop}, nil
}

Expand Down
Loading

0 comments on commit 4a99ec9

Please sign in to comment.