diff --git a/internal/beater/beater.go b/internal/beater/beater.go index efe1191e1b9..05549a5a45a 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -179,43 +179,12 @@ func (s *Runner) Run(ctx context.Context) error { } } - // Obtain the memory limit for the APM Server process. Certain config - // values will be sized according to the maximum memory set for the server. - var memLimitGB float64 - if cgroupReader := newCgroupReader(); cgroupReader != nil { - if limit, err := cgroupMemoryLimit(cgroupReader); err != nil { - s.logger.Warn(err) - } else { - memLimitGB = float64(limit) / 1024 / 1024 / 1024 - } - } - if limit, err := systemMemoryLimit(); err != nil { - s.logger.Warn(err) - } else { - var fallback bool - if memLimitGB <= 0 { - s.logger.Info("no cgroups detected, falling back to total system memory") - fallback = true - } - if memLimitGB > float64(limit) { - s.logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory") - fallback = true - } - if fallback { - // If no cgroup limit is set, return a fraction of the total memory - // to have a margin of safety for other processes. The fraction value - // of 0.625 is used to keep the 80% of the total system memory limit - // to be 50% of the total for calculating the number of decoders. - memLimitGB = float64(limit) / 1024 / 1024 / 1024 * 0.625 - } - } - if memLimitGB <= 0 { - memLimitGB = 1 - s.logger.Infof( - "failed to discover memory limit, default to %0.1fgb of memory", - memLimitGB, - ) - } + memLimitGB := processMemoryLimit( + newCgroupReader(), + sysMemoryReaderFunc(systemMemoryLimit), + s.logger, + ) + if s.config.MaxConcurrentDecoders == 0 { s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB) s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory", @@ -1035,6 +1004,49 @@ func queryClusterUUID(ctx context.Context, esClient *elasticsearch.Client) error return nil } +// processMemoryLimit obtains the memory limit for the APM Server process. Certain config +// values will be sized according to the maximum memory set for the server. +func processMemoryLimit(cgroups cgroupReader, sys sysMemoryReader, logger *logp.Logger) (memLimitGB float64) { + var memLimit uint64 + if cgroups != nil { + if limit, err := cgroupMemoryLimit(cgroups); err != nil { + logger.Warn(err) + } else { + memLimit = limit + } + } + if limit, err := sys.Limit(); err != nil { + logger.Warn(err) + } else { + var fallback bool + if memLimit <= 0 { + logger.Info("no cgroups detected, falling back to total system memory") + fallback = true + } + if memLimit > limit { + logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory") + fallback = true + } + if fallback { + // If no cgroup limit is set, return a fraction of the total memory + // to have a margin of safety for other processes. The fraction value + // of 0.625 is used to keep the 80% of the total system memory limit + // to be 50% of the total for calculating the number of decoders. + memLimit = uint64(float64(limit) * 0.625) + } + } + // Convert the memory limit to gigabytes to calculate the config values. + memLimitGB = float64(memLimit) / (1 << 30) + if memLimitGB <= 0 { + memLimitGB = 1 + logger.Infof( + "failed to discover memory limit, default to %0.1fgb of memory", + memLimitGB, + ) + } + return +} + type nopProcessingSupporter struct { } diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index 4fec5b897f8..289ef975303 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -22,6 +22,7 @@ import ( "context" "encoding/json" "encoding/pem" + "errors" "fmt" "net/http" "net/http/httptest" @@ -40,6 +41,10 @@ import ( agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/opt" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2" "github.com/elastic/go-docappender/v2" ) @@ -279,3 +284,90 @@ func TestNewInstrumentation(t *testing.T) { assert.Equal(t, map[string]string{"k1": "val", "k2": "new val"}, <-labels) assert.Equal(t, "Bearer secret", auth) } + +func TestProcessMemoryLimit(t *testing.T) { + l := logp.NewLogger("test") + const gb = 1 << 30 + for name, testCase := range map[string]struct { + cgroups cgroupReader + sys sysMemoryReader + wantMemLimitGB float64 + }{ + "LimitErrShouldResultInDefaultLimit": { + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 0, errors.New("test") + }), + wantMemLimitGB: 1, + }, + "NilCgroupsShouldResultInScaledSysLimit": { + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + "CgroupsErrShouldResultInScaledSysLimit": { + cgroups: mockCgroupReader{errv: errors.New("test")}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + "CgroupsV1OkLimitShouldResultInCgroupsV1OkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{ + Memory: &cgv1.MemorySubsystem{ + Mem: cgv1.MemoryData{ + Limit: opt.Bytes{Bytes: gb}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 1, + }, + "CgroupsV1OverMaxLimitShouldResultInScaledSysLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{ + Memory: &cgv1.MemorySubsystem{ + Mem: cgv1.MemoryData{ + Limit: opt.Bytes{Bytes: 15 * gb}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + "CgroupsV2OkLimitShouldResultInCgroupsV1OkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{ + Memory: &cgv2.MemorySubsystem{ + Mem: cgv2.MemoryData{ + Max: opt.BytesOpt{Bytes: opt.UintWith(gb)}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 1, + }, + "CgroupsV2OverMaxLimitShouldResultInScaledSysLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{ + Memory: &cgv2.MemorySubsystem{ + Mem: cgv2.MemoryData{ + Max: opt.BytesOpt{Bytes: opt.UintWith(15 * gb)}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + } { + t.Run(name, func(t *testing.T) { + memLimitGB := processMemoryLimit(testCase.cgroups, testCase.sys, l) + assert.Equal(t, testCase.wantMemLimitGB, memLimitGB) + }) + } +} diff --git a/internal/beater/memlimit_cgroup.go b/internal/beater/memlimit_cgroup.go index 577f5c24025..08de74363e8 100644 --- a/internal/beater/memlimit_cgroup.go +++ b/internal/beater/memlimit_cgroup.go @@ -27,7 +27,16 @@ import ( "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) -func newCgroupReader() *cgroup.Reader { +// cgroupReader defines a short interface useful for testing purposes +// that provides a way to obtain cgroups process memory limit. +// Implemented by github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup Reader. +type cgroupReader interface { + CgroupsVersion(int) (cgroup.CgroupsVersion, error) + GetV1StatsForProcess(int) (*cgroup.StatsV1, error) + GetV2StatsForProcess(int) (*cgroup.StatsV2, error) +} + +func newCgroupReader() cgroupReader { cgroupOpts := cgroup.ReaderOptions{ RootfsMountpoint: resolve.NewTestResolver(""), IgnoreRootCgroups: true, @@ -37,13 +46,16 @@ func newCgroupReader() *cgroup.Reader { if isset { cgroupOpts.CgroupsHierarchyOverride = override } - reader, _ := cgroup.NewReaderOptions(cgroupOpts) + reader, err := cgroup.NewReaderOptions(cgroupOpts) + if err != nil { + return nil + } return reader } // Returns the cgroup maximum memory if running within a cgroup in GigaBytes, // otherwise, it returns 0 and an error. -func cgroupMemoryLimit(rdr *cgroup.Reader) (uint64, error) { +func cgroupMemoryLimit(rdr cgroupReader) (uint64, error) { pid := os.Getpid() vers, err := rdr.CgroupsVersion(pid) if err != nil { diff --git a/internal/beater/memlimit_cgroup_test.go b/internal/beater/memlimit_cgroup_test.go new file mode 100644 index 00000000000..c0c013ea523 --- /dev/null +++ b/internal/beater/memlimit_cgroup_test.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "errors" + "testing" + + "github.com/elastic/elastic-agent-libs/opt" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2" + + "github.com/stretchr/testify/assert" +) + +func TestCgroupMemoryLimit(t *testing.T) { + err := errors.New("test") + for name, testCase := range map[string]struct { + cgroups cgroupReader + wantErr bool + wantLimit uint64 + }{ + "CgroupsVersionErrShouldResultInError": { + cgroups: mockCgroupReader{errv: err}, + wantErr: true, + }, + "CgroupsInvalidVersionShouldResultInError": { + cgroups: mockCgroupReader{v: -1}, + wantErr: true, + }, + "CgroupsV1ErrShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, errv1: err}, + wantErr: true, + }, + "CgroupsV1NilLimitShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{}}, + wantErr: true, + }, + "CgroupsV1OkLimitShouldResultInOkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{ + Memory: &cgv1.MemorySubsystem{ + Mem: cgv1.MemoryData{ + Limit: opt.Bytes{Bytes: 1000}, + }, + }, + }}, + wantLimit: 1000, + }, + "CgroupsV2ErrShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, errv2: err}, + wantErr: true, + }, + "CgroupsV2NilLimitShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{}}, + wantErr: true, + }, + "CgroupsV2OkLimitShouldResultInOkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{ + Memory: &cgv2.MemorySubsystem{ + Mem: cgv2.MemoryData{ + Max: opt.BytesOpt{Bytes: opt.UintWith(1000)}, + }, + }, + }}, + wantLimit: 1000, + }, + } { + t.Run(name, func(t *testing.T) { + limit, err := cgroupMemoryLimit(testCase.cgroups) + if testCase.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, testCase.wantLimit, limit) + }) + } +} + +type mockCgroupReader struct { + v cgroup.CgroupsVersion + v1 *cgroup.StatsV1 + v2 *cgroup.StatsV2 + errv, errv1, errv2 error +} + +func (r mockCgroupReader) CgroupsVersion(int) (cgroup.CgroupsVersion, error) { + return r.v, r.errv +} + +func (r mockCgroupReader) GetV1StatsForProcess(int) (*cgroup.StatsV1, error) { + return r.v1, r.errv1 +} + +func (r mockCgroupReader) GetV2StatsForProcess(int) (*cgroup.StatsV2, error) { + return r.v2, r.errv2 +} diff --git a/internal/beater/memlimit_system.go b/internal/beater/memlimit_system.go index abb2f8e2036..b2c02d3ba52 100644 --- a/internal/beater/memlimit_system.go +++ b/internal/beater/memlimit_system.go @@ -17,9 +17,20 @@ package beater -import ( - "github.com/elastic/go-sysinfo" -) +import "github.com/elastic/go-sysinfo" + +// sysMemoryReader defines an interface useful for testing purposes +// that provides a way to obtain the total system memory limit. +type sysMemoryReader interface { + Limit() (uint64, error) +} + +// sysMemoryReaderFunc func implementation of sysMemoryReader. +type sysMemoryReaderFunc func() (uint64, error) + +func (f sysMemoryReaderFunc) Limit() (uint64, error) { + return f() +} // systemMemoryLimit returns the total system memory. func systemMemoryLimit() (uint64, error) {