Skip to content

Commit

Permalink
feat: add local mode for YurtHub to enhance operational efficiency of…
Browse files Browse the repository at this point in the history
… K8s cluster in user's IDC (#2156)
  • Loading branch information
huangchenzhao committed Oct 29, 2024
1 parent 549c765 commit 3a4d21e
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 147 deletions.
82 changes: 54 additions & 28 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
type YurtHubConfiguration struct {
LBMode string
RemoteServers []*url.URL
TenantKasService string // ip:port, used in local mode
GCFrequency int
NodeName string
HeartbeatFailedRetry int
Expand Down Expand Up @@ -99,11 +100,12 @@ type YurtHubConfiguration struct {
CoordinatorStorageAddr string // ip:port
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
us, err := parseRemoteServers(options.ServerAddr)
us, err := parseRemoteServers(options.WorkingMode, options.ServerAddr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -173,42 +175,53 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
}

certMgr, err := certificatemgr.NewYurtHubCertManager(options, us)
if err != nil {
return nil, err
}
certMgr.Start()
err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 4*time.Minute, true, func(ctx context.Context) (bool, error) {
isReady := certMgr.Ready()
if isReady {
return true, nil
// if yurthub is in local mode, certMgr and networkMgr are no need to start
if cfg.WorkingMode != util.WorkingModeLocal {
certMgr, err := certificatemgr.NewYurtHubCertManager(options, us)
if err != nil {
return nil, err

Check warning on line 185 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L185

Added line #L185 was not covered by tests
}
return false, nil
})
if err != nil {
return nil, fmt.Errorf("hub certificates preparation failed, %v", err)
}
cfg.CertManager = certMgr

if options.EnableDummyIf {
klog.V(2).Infof("create dummy network interface %s(%s) and init iptables manager", options.HubAgentDummyIfName, options.HubAgentDummyIfIP)
networkMgr, err := network.NewNetworkManager(options)
certMgr.Start()
err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 4*time.Minute, true, func(ctx context.Context) (bool, error) {
isReady := certMgr.Ready()
if isReady {
return true, nil
}
return false, nil
})
if err != nil {
return nil, fmt.Errorf("could not create network manager, %w", err)
return nil, fmt.Errorf("hub certificates preparation failed, %v", err)

Check warning on line 196 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L196

Added line #L196 was not covered by tests
}
cfg.NetworkMgr = networkMgr
}
cfg.CertManager = certMgr

if err = prepareServerServing(options, certMgr, cfg); err != nil {
return nil, err
if options.EnableDummyIf {
klog.V(2).Infof("create dummy network interface %s(%s) and init iptables manager", options.HubAgentDummyIfName, options.HubAgentDummyIfIP)
networkMgr, err := network.NewNetworkManager(options)
if err != nil {
return nil, fmt.Errorf("could not create network manager, %w", err)

Check warning on line 204 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L201-L204

Added lines #L201 - L204 were not covered by tests
}
cfg.NetworkMgr = networkMgr

Check warning on line 206 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L206

Added line #L206 was not covered by tests
}

if err = prepareServerServing(options, certMgr, cfg); err != nil {
return nil, err

Check warning on line 210 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L210

Added line #L210 was not covered by tests
}
} else {

Check warning on line 212 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L212

Added line #L212 was not covered by tests
// if yurthub is in local mode, cfg.TenantKasService is used to represented as the service address (ip:port) of multiple apiserver daemonsets
cfg.TenantKasService = options.ServerAddr

Check warning on line 214 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L214

Added line #L214 was not covered by tests
}

return cfg, nil
}

func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
func parseRemoteServers(workingMode string, serverAddr string) ([]*url.URL, error) {
// if yurthub is in local mode, the format of serverAddr is ip:port, skip this function
if workingMode == string(util.WorkingModeLocal) {
return nil, nil

Check warning on line 223 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L223

Added line #L223 was not covered by tests
}
if serverAddr == "" {
return make([]*url.URL, 0), fmt.Errorf("--server-addr should be set for hub agent")
}
Expand Down Expand Up @@ -242,7 +255,12 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
func createClientAndSharedInformers(options *options.YurtHubOptions) (kubernetes.Interface, informers.SharedInformerFactory, dynamicinformer.DynamicSharedInformerFactory, error) {
var kubeConfig *rest.Config
var err error
kubeConfig, err = clientcmd.BuildConfigFromFlags(fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), "")
// If yurthub is in local mode, create kubeconfig for host control plane to prepare informerFactory.
if util.WorkingMode(options.WorkingMode) == util.WorkingModeLocal {
kubeConfig, err = clientcmd.BuildConfigFromFlags(fmt.Sprintf("http://%s", options.HostControlPlaneAddr), "")

Check warning on line 260 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L260

Added line #L260 was not covered by tests
} else {
kubeConfig, err = clientcmd.BuildConfigFromFlags(fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), "")
}
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -273,7 +291,7 @@ func createClientAndSharedInformers(options *options.YurtHubOptions) (kubernetes
return client, informers.NewSharedInformerFactory(client, 24*time.Hour), dynamicInformerFactory, nil
}

// registerInformers reconstruct configmap/secret/pod informers
// registerInformers reconstruct configmap/secret/pod/endpoints informers
func registerInformers(options *options.YurtHubOptions,
informerFactory informers.SharedInformerFactory,
workingMode util.WorkingMode,
Expand Down Expand Up @@ -320,6 +338,14 @@ func registerInformers(options *options.YurtHubOptions,
return informer
}
informerFactory.InformerFor(&corev1.Service{}, newServiceInformer)

// endpoints informer is used in local working mode
if workingMode == util.WorkingModeLocal {
newEndpointsInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return coreinformers.NewFilteredEndpointsInformer(client, "kube-public", resyncPeriod, nil, nil)

Check warning on line 345 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L344-L345

Added lines #L344 - L345 were not covered by tests
}
informerFactory.InformerFor(&corev1.Endpoints{}, newEndpointsInformer)

Check warning on line 347 in cmd/yurthub/app/config/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/config/config.go#L347

Added line #L347 was not covered by tests
}
}

func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.YurtCertificateManager, cfg *YurtHubConfiguration) error {
Expand Down
48 changes: 28 additions & 20 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type YurtHubOptions struct {
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
HostControlPlaneAddr string
DiskCachePath string
EnableResourceFilter bool
DisabledResourceFilters []string
Expand Down Expand Up @@ -149,30 +150,36 @@ func (options *YurtHubOptions) Validate() error {
return fmt.Errorf("server-address is empty")
}

if options.BootstrapMode != certificate.KubeletCertificateBootstrapMode {
if len(options.JoinToken) == 0 && len(options.BootstrapFile) == 0 {
return fmt.Errorf("bootstrap token and bootstrap file are empty, one of them must be set")
if options.WorkingMode != string(util.WorkingModeLocal) {
if options.BootstrapMode != certificate.KubeletCertificateBootstrapMode {
if len(options.JoinToken) == 0 && len(options.BootstrapFile) == 0 {
return fmt.Errorf("bootstrap token and bootstrap file are empty, one of them must be set")
}
}
}

if !util.IsSupportedLBMode(options.LBMode) {
return fmt.Errorf("lb mode(%s) is not supported", options.LBMode)
}
if !util.IsSupportedLBMode(options.LBMode) {
return fmt.Errorf("lb mode(%s) is not supported", options.LBMode)
}

if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) {
return fmt.Errorf("working mode %s is not supported", options.WorkingMode)
}
if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) {
return fmt.Errorf("working mode %s is not supported", options.WorkingMode)
}

if err := options.verifyDummyIP(); err != nil {
return fmt.Errorf("dummy ip %s is not invalid, %w", options.HubAgentDummyIfIP, err)
}
if err := options.verifyDummyIP(); err != nil {
return fmt.Errorf("dummy ip %s is not invalid, %w", options.HubAgentDummyIfIP, err)
}

if len(options.HubAgentDummyIfName) > 15 {
return fmt.Errorf("dummy name %s length should not be more than 15", options.HubAgentDummyIfName)
}
if len(options.HubAgentDummyIfName) > 15 {
return fmt.Errorf("dummy name %s length should not be more than 15", options.HubAgentDummyIfName)

Check warning on line 173 in cmd/yurthub/app/options/options.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/options/options.go#L173

Added line #L173 was not covered by tests
}

if len(options.CACertHashes) == 0 && !options.UnsafeSkipCAVerification {
return fmt.Errorf("set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue")
if len(options.CACertHashes) == 0 && !options.UnsafeSkipCAVerification {
return fmt.Errorf("set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue")
}
} else {
if len(options.HostControlPlaneAddr) == 0 {
return fmt.Errorf("host-control-plane-address is empty")

Check warning on line 181 in cmd/yurthub/app/options/options.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/options/options.go#L179-L181

Added lines #L179 - L181 were not covered by tests
}
}

return nil
Expand All @@ -186,7 +193,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver")
fs.IntVar(&o.YurtHubProxySecurePort, "proxy-secure-port", o.YurtHubProxySecurePort, "the port on which to proxy HTTPS requests to kube-apiserver")
fs.StringVar(&o.YurtHubNamespace, "namespace", o.YurtHubNamespace, "the namespace of YurtHub Server")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver,the format is: \"server1,server2,...\"")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver, the format is: \"server1,server2,...\"; when yurthub is in local mode, server-addr represents the service address of apiservers, the format is: \"ip:port\".")
fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...")
fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).")
fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent")
Expand All @@ -212,7 +219,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableResourceFilter, "enable-resource-filter", o.EnableResourceFilter, "enable to filter response that comes back from reverse proxy")
fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response")
fs.StringVar(&o.NodePoolName, "nodepool-name", o.NodePoolName, "the name of node pool that runs hub agent")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud, local).")
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
fs.MarkDeprecated("enable-node-pool", "It is planned to be removed from OpenYurt in the future version, please use --enable-pool-service-topology instead")
Expand All @@ -225,6 +232,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Yurt-Coordinator etcd, in the format host:port")
bindFlags(&o.LeaderElection, fs)
fs.BoolVar(&o.EnablePoolServiceTopology, "enable-pool-service-topology", o.EnablePoolServiceTopology, "enable service topology feature in the node pool.")
fs.StringVar(&o.HostControlPlaneAddr, "host-control-plane-address", o.HostControlPlaneAddr, "the address (ip:port) of host kubernetes cluster that used for yurthub local mode.")
}

// bindFlags binds the LeaderElectionConfiguration struct fields to a flagset
Expand Down
Loading

0 comments on commit 3a4d21e

Please sign in to comment.