Skip to content

Commit

Permalink
Gossip Router improvements (#1802)
Browse files Browse the repository at this point in the history
* Increase the probe timeout
* Allow setting the CPU and Memory resources

Co-authored-by: Dominika Vagnerova <[email protected]>
  • Loading branch information
pruivo and domiborges authored Aug 10, 2023
1 parent b354682 commit 5f676d4
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 19 deletions.
6 changes: 6 additions & 0 deletions api/v1/infinispan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ type DiscoverySiteSpec struct {
// Enables (default) or disables the Gossip Router pod and cross-site services
// +optional
LaunchGossipRouter *bool `json:"launchGossipRouter,omitempty"`
// Memory resource request for Gossip Router if enabled
// +optional
Memory string `json:"memory,omitempty"`
// CPU resource request for Gossip Router if enabled
// +optional
CPU string `json:"cpu,omitempty"`
}

// Specifies the discovery mode for cross-site configuration
Expand Down
29 changes: 29 additions & 0 deletions api/v1/infinispan_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,35 @@ func (i *Infinispan) validate() error {
log.Info(errMsg, "Request.Namespace", i.Namespace, "Request.Name", i.Name)
}

// validate Gossip Router resources requests
if i.HasSites() {
gr := i.Spec.Service.Sites.Local.Discovery
path := field.NewPath("spec").Child("service").Child("sites").Child("local").Child("discovery")
if gr.CPU != "" {
req, limit, err := gr.CpuResources()
if err != nil {
allErrs = append(allErrs, field.Invalid(path.Child("cpu"), gr.CPU, err.Error()))
}

if req.Cmp(limit) > 0 {
msg := fmt.Sprintf("CPU request '%s' exceeds limit '%s'", req.String(), limit.String())
allErrs = append(allErrs, field.Invalid(path.Child("cpu"), gr.CPU, msg))
}
}

if gr.Memory != "" {
req, limit, err := gr.MemoryResources()
if err != nil {
allErrs = append(allErrs, field.Invalid(path.Child("memory"), gr.Memory, err.Error()))
}

if req.Cmp(limit) > 0 {
msg := fmt.Sprintf("Memory request '%s' exceeds limit '%s'", req.String(), limit.String())
allErrs = append(allErrs, field.Invalid(path.Child("memory"), gr.Memory, msg))
}
}
}

return errorListToError(i, allErrs)
}

Expand Down
20 changes: 20 additions & 0 deletions api/v1/infinispan_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/infinispan/infinispan-operator/pkg/hash"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

// +kubebuilder:scaffold:imports
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -281,6 +282,21 @@ var _ = Describe("Infinispan Webhooks", func() {
Memory: "1Gi:5Gi",
CPU: "1000m:2000m",
},
Service: InfinispanServiceSpec{
Type: ServiceTypeDataGrid,
Sites: &InfinispanSitesSpec{
Local: InfinispanSitesLocalSpec{
Name: "site1",
Expose: CrossSiteExposeSpec{
Type: CrossSiteExposeTypeClusterIP,
},
Discovery: &DiscoverySiteSpec{
Memory: "1Gi:5Gi",
CPU: "1000m:2000m",
},
},
},
},
},
}

Expand All @@ -293,6 +309,10 @@ var _ = Describe("Infinispan Webhooks", func() {
metav1.CauseTypeFieldValueInvalid, "spec.configListener.cpu", "exceeds limit",
}, {
metav1.CauseTypeFieldValueInvalid, "spec.configListener.memory", "exceeds limit",
}, {
metav1.CauseTypeFieldValueInvalid, "spec.service.sites.local.discovery.cpu", "exceeds limit",
}, {
metav1.CauseTypeFieldValueInvalid, "spec.service.sites.local.discovery.memory", "exceeds limit",
}}...)
})

Expand Down
10 changes: 10 additions & 0 deletions api/v1/types_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,16 @@ func (spec *ConfigListenerSpec) MemoryResources() (requests resource.Quantity, l
return getRequestLimits(spec.Memory)
}

// CpuResources returns the CPU request and limit values to be used by the Gossip Router pod.
// The values are derived from spec.CPU by getRequestLimits; err is non-nil when
// that helper rejects the string (for example, when it is empty).
func (spec *DiscoverySiteSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.CPU)
}

// MemoryResources returns the Memory request and limit values to be used by the Gossip Router pod.
// The values are derived from spec.Memory by getRequestLimits; err is non-nil when
// that helper rejects the string (for example, when it is empty).
func (spec *DiscoverySiteSpec) MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.Memory)
}

func getRequestLimits(str string) (requests resource.Quantity, limits resource.Quantity, err error) {
if str == "" {
err = fmt.Errorf("resource string cannot be empty")
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/infinispan.org_infinispans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -893,10 +893,18 @@ spec:
description: DiscoverySiteSpec configures the cross-site
replication discovery
properties:
cpu:
description: CPU resource request for Gossip Router
if enabled
type: string
launchGossipRouter:
description: Enables (default) or disables the Gossip
Router pod and cross-site services
type: boolean
memory:
description: Memory resource request for Gossip Router
if enabled
type: string
type:
description: Configures the discovery mode for cross-site
replication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ include::{topics}/proc_setting_up_cross_site_kube.adoc[leveloffset=+2]
endif::community[]
include::{topics}/proc_configuring_sites_automatically.adoc[leveloffset=+2]
include::{topics}/proc_configuring_sites_manually.adoc[leveloffset=+1]
include::{topics}/proc_allocating_cpu_memory_cross_site.adoc[leveloffset=+1]
include::{topics}/proc_disabling_gossip_router_cross_site.adoc[leveloffset=+1]
include::{topics}/ref_cross_site_resources.adoc[leveloffset=+1]
include::{topics}/proc_securing_cross_site_connections.adoc[leveloffset=+1]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[id='allocating-cpu-and-memory-cross-site_{context}']
= Allocating CPU and memory for Gossip router pod

[role="_abstract"]
Allocate CPU and memory resources to {brandname} Gossip router.

.Prerequisite
* Have Gossip router enabled. The `service.sites.local.discovery.launchGossipRouter` property must be set to `true`, which is the default value.

.Procedure

. Allocate the number of CPU units using the `service.sites.local.discovery.cpu` field.
. Allocate the amount of memory, in bytes, using the `service.sites.local.discovery.memory` field.
+
The `cpu` and `memory` fields have values in the format of `<limit>:<requests>`.
For example, `cpu: "2000m:1000m"` limits pods to a maximum of `2000m` of CPU and requests `1000m` of CPU for each pod at startup.
Specifying a single value sets both the limit and request.
+
. Apply your `Infinispan` CR.

[source,options="nowrap",subs=attributes+]
----
include::yaml/xsite_gossip_router_resources.yaml[]
----
8 changes: 7 additions & 1 deletion documentation/asciidoc/topics/ref_cross_site_resources.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ The following tables provides fields and descriptions for cross-site resources.
|Specifies the maximum number of pods that can send RELAY messages for cross-site replication. The default value is `1`.

|`service.sites.local.discovery.launchGossipRouter`
|If `false`, the cross-site services and the Gossip Router pod are not created in the local site. The default value is `true`.
|If `false`, the cross-site services and the Gossip router pod are not created in the local site. The default value is `true`.

|`service.sites.local.discovery.memory`
|Allocates the amount of memory in bytes. Uses the format `<limit>:<requests>`, for example `"2Gi:1Gi"`.

|`service.sites.local.discovery.cpu`
|Allocates the number of CPU units. Uses the format `<limit>:<requests>`, for example `"2000m:1000m"`.

|`service.sites.local.expose.type`
|Specifies the network service for cross-site replication. {brandname} clusters use this service to communicate and perform backup operations. You can set the value to `NodePort`, `LoadBalancer`, or `Route`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
spec:
service:
type: DataGrid
sites:
local:
name: LON
discovery:
launchGossipRouter: true
memory: "2Gi:1Gi"
cpu: "2000m:1000m"
21 changes: 14 additions & 7 deletions pkg/reconcile/pipeline/infinispan/handler/configure/xsite.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func getCrossSiteServiceHostPort(service *corev1.Service, ctx pipeline.Context,
}

func TransportTLS(i *ispnv1.Infinispan, ctx pipeline.Context) {
log := ctx.Log().WithName("xsite")
keyStoreSecret := &corev1.Secret{}
if err := ctx.Resources().Load(i.GetSiteTransportSecretName(), keyStoreSecret, pipeline.RetryOnErr); err != nil {
return
Expand All @@ -260,7 +261,7 @@ func TransportTLS(i *ispnv1.Infinispan, ctx pipeline.Context) {
password := string(keyStoreSecret.Data["password"])
alias := i.GetSiteTransportKeyStoreAlias()

if err := validaXSiteTLSKeyStore(keyStoreSecret.Name, keyStoreFileName, password, alias); err != nil {
if err := validateXSiteTLSKeyStore(keyStoreSecret.Name, keyStoreFileName, password, alias); err != nil {
ctx.Stop(err)
return
}
Expand All @@ -273,7 +274,13 @@ func TransportTLS(i *ispnv1.Infinispan, ctx pipeline.Context) {
Type: consts.GetWithDefault(string(keyStoreSecret.Data["type"]), "pkcs12"),
}

ctx.Log().Info("Transport TLS Configured.", "Keystore", keyStoreFileName, "Secret Name", keyStoreSecret.Name)
log.Info("Transport TLS Configured.", "Keystore", keyStoreFileName, "Secret Name", keyStoreSecret.Name)

// do not attempt to load the trust store secret if not configured
if i.GetSiteTrustoreSecretName() == "" {
log.Info("Truststore not configured.")
return
}

trustStoreSecret := &corev1.Secret{}
// Only configure Truststore if the Secret exists
Expand All @@ -286,11 +293,11 @@ func TransportTLS(i *ispnv1.Infinispan, ctx pipeline.Context) {
trustStoreFileName := i.GetSiteTrustStoreFileName()
password = string(trustStoreSecret.Data["password"])

if err := validaXSiteTLSTrustStore(trustStoreSecret.Name, trustStoreFileName, password); err != nil {
if err := validateXSiteTLSTrustStore(trustStoreSecret.Name, trustStoreFileName, password); err != nil {
ctx.Stop(err)
return
}
ctx.Log().Info("Found Truststore.", "Truststore", trustStoreFileName, "Secret Name", trustStoreSecret.ObjectMeta.Name)
log.Info("Found Truststore.", "Truststore", trustStoreFileName, "Secret Name", trustStoreSecret.ObjectMeta.Name)
configFiles.Transport.Truststore = &pipeline.Truststore{
File: trustStoreSecret.Data[trustStoreFileName],
Path: fmt.Sprintf("%s/%s", consts.SiteTrustStoreRoot, trustStoreFileName),
Expand All @@ -309,7 +316,7 @@ func GossipRouterTLS(i *ispnv1.Infinispan, ctx pipeline.Context) {
password := string(keyStoreSecret.Data["password"])
alias := i.GetSiteRouterKeyStoreAlias()

if err := validaXSiteTLSKeyStore(keyStoreSecret.Name, filename, password, alias); err != nil {
if err := validateXSiteTLSKeyStore(keyStoreSecret.Name, filename, password, alias); err != nil {
ctx.Stop(err)
return
}
Expand All @@ -335,7 +342,7 @@ func GossipRouterTLS(i *ispnv1.Infinispan, ctx pipeline.Context) {
}
}

func validaXSiteTLSKeyStore(secretName, filename, password, alias string) error {
func validateXSiteTLSKeyStore(secretName, filename, password, alias string) error {
if len(filename) == 0 {
return fmt.Errorf("filename is required for Keystore stored in Secret %s", secretName)
}
Expand All @@ -348,7 +355,7 @@ func validaXSiteTLSKeyStore(secretName, filename, password, alias string) error
return nil
}

func validaXSiteTLSTrustStore(secretName, filename, password string) error {
func validateXSiteTLSTrustStore(secretName, filename, password string) error {
if len(filename) == 0 {
return fmt.Errorf("filename is required for KeyStore stored in Secret %s", secretName)
}
Expand Down
61 changes: 50 additions & 11 deletions pkg/reconcile/pipeline/infinispan/handler/provision/gossiprouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@ func GossipRouter(i *ispnv1.Infinispan, ctx pipeline.Context) {
}

router.Labels = routerLabels

container := &corev1.Container{
Name: GossipRouterContainer,
Image: i.ImageName(),
Command: []string{"/opt/gossiprouter/bin/launch.sh"},
Args: args,
Env: []corev1.EnvVar{{Name: "ROUTER_JAVA_OPTIONS", Value: i.Spec.Container.RouterExtraJvmOpts}},
Ports: containerPorts,
LivenessProbe: TcpProbe(probePort, 5, 5, 0, 1, 60),
ReadinessProbe: TcpProbe(probePort, 5, 5, 0, 1, 60),
StartupProbe: TcpProbe(probePort, 15, 1, 1, 1, 60),
}

if podResources, err := gossipRouterPodResources(i.Spec.Service.Sites.Local.Discovery); err != nil {
return err
} else if podResources != nil {
container.Resources = *podResources
}

router.Spec = appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: routerLabels,
Expand All @@ -141,21 +160,12 @@ func GossipRouter(i *ispnv1.Infinispan, ctx pipeline.Context) {
Annotations: routerAnnotations,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: GossipRouterContainer,
Image: i.ImageName(),
Command: []string{"/opt/gossiprouter/bin/launch.sh"},
Args: args,
Env: []corev1.EnvVar{{Name: "ROUTER_JAVA_OPTIONS", Value: i.Spec.Container.RouterExtraJvmOpts}},
Ports: containerPorts,
LivenessProbe: TcpProbe(probePort, 5, 5, 0, 1, 60),
ReadinessProbe: TcpProbe(probePort, 5, 5, 0, 1, 60),
StartupProbe: TcpProbe(probePort, 5, 1, 1, 1, 60),
}},
Containers: []corev1.Container{*container},
},
},
Replicas: replicas,
}

if addKeystoreVolume {
AddSecretVolume(i.GetSiteRouterSecretName(), SiteRouterKeystoreVolumeName, consts.SiteRouterKeyStoreRoot, &router.Spec.Template.Spec, GossipRouterContainer)
}
Expand Down Expand Up @@ -203,3 +213,32 @@ func GossipRouter(i *ispnv1.Infinispan, ctx pipeline.Context) {
i.SetCondition(ispnv1.ConditionGossipRouterReady, metav1.ConditionTrue, "")
})
}

// gossipRouterPodResources builds the container resource requirements for the
// Gossip Router pod from the CPU and Memory fields of spec.
//
// It returns (nil, nil) when spec is nil or when neither field is set, so the
// caller can leave the container's resources untouched. Only the resources
// that are configured are added to the Requests and Limits lists. If a value
// cannot be converted, the error from MemoryResources or CpuResources is
// returned together with a nil result.
func gossipRouterPodResources(spec *ispnv1.DiscoverySiteSpec) (*corev1.ResourceRequirements, error) {
	// spec is a pointer and may be nil; treat that the same as "nothing configured"
	// instead of panicking on the field access below.
	if spec == nil || (spec.CPU == "" && spec.Memory == "") {
		return nil, nil
	}

	req := &corev1.ResourceRequirements{
		Limits:   corev1.ResourceList{},
		Requests: corev1.ResourceList{},
	}

	if spec.Memory != "" {
		memRequests, memLimits, err := spec.MemoryResources()
		if err != nil {
			// Do not hand back a half-populated result alongside an error.
			return nil, err
		}
		req.Requests[corev1.ResourceMemory] = memRequests
		req.Limits[corev1.ResourceMemory] = memLimits
	}

	if spec.CPU != "" {
		cpuRequests, cpuLimits, err := spec.CpuResources()
		if err != nil {
			// Do not hand back a half-populated result alongside an error.
			return nil, err
		}
		req.Requests[corev1.ResourceCPU] = cpuRequests
		req.Limits[corev1.ResourceCPU] = cpuLimits
	}
	return req, nil
}
4 changes: 4 additions & 0 deletions test/e2e/xsite/xsite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func crossSiteSpec(name string, replicas int32, primarySite, backupSite, siteNam
Port: exposePort,
},
MaxRelayNodes: 2,
Discovery: &ispnv1.DiscoverySiteSpec{
Memory: "500Mi",
CPU: "500m",
},
},
Locations: []ispnv1.InfinispanSiteLocationSpec{
{
Expand Down

0 comments on commit 5f676d4

Please sign in to comment.