diff --git a/CHANGELOG.md b/CHANGELOG.md index 51bc48127..30a8f056b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,15 +15,69 @@ This project adheres to [Semantic Versioning](http://semver.org/). ### Fixed -- Fix thread safety in process code #43 -- Fix process package build on AIX #54 -- Ensure correct devID width in cgv2 #74 +## [0.7.0] + +### Added + +- Collect the number of threads as part of the information collected per process +on `metric/system/process.GetInfoForPid`. + +### Fixed + +## [0.6.1] + +### Changed + +-Bump up go-sysinfo dependency version #86 + +## [0.6.0] + +### Added + +- Add user data to monitoring setup #80 + +### Changed +- Update host functions with `go-sysinfo` API changes #81 +- Move SetupInfoUserMetrics out of a cgo build constraint #82 +- Bumping up version for go-sysinfo dependency #84 + +### Fixed + +-Ensure correct devID width in cgv2 #74 + +## [0.4.6] + +### Fixed + +- Fix type issues for MIPS platforms +- metric/system/cgroup/cgv2: ensure Rdev is correct width + +## [0.4.5] + +### Added + +- add network data to process metrics +- add Process State API + +### Changed +- Go version 1.18.4 +- move exported helpers to _common.go file + +### Removed + +- remove hostfs checks + +### Fixed + +- fix(process): Typo in getargs error message +- fix(process): Make process package buildable on AIX ## [0.4.4] ### Fixed - Fix thread safety in process code #43 + ## [0.4.3] ## Fixed diff --git a/NOTICE.txt b/NOTICE.txt index 169c869d7..d5bc83116 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -645,11 +645,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-structform@v -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-sysinfo -Version: v1.10.0 +Version: v1.10.1 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.10.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.10.1/LICENSE.txt: Apache License @@ -1935,11 +1935,11 @@ Contents of probable licence file $GOMODCACHE/go.elastic.co/go-licence-detector@ -------------------------------------------------------------------------------- Dependency : golang.org/x/sys -Version: v0.6.0 +Version: v0.7.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.6.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.7.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -2379,11 +2379,11 @@ Apache License -------------------------------------------------------------------------------- Dependency : github.com/docker/docker -Version: v20.10.24+incompatible +Version: v23.0.3+incompatible Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/docker/docker@v20.10.24+incompatible/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/docker/docker@v23.0.3+incompatible/LICENSE: Apache License @@ -5903,37 +5903,6 @@ DEALINGS IN THE SOFTWARE. --------------------------------------------------------------------------------- -Dependency : github.com/sirupsen/logrus -Version: v1.9.0 -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/sirupsen/logrus@v1.9.0/LICENSE: - -The MIT License (MIT) - -Copyright (c) 2014 Simon Eskildsen - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. - - -------------------------------------------------------------------------------- Dependency : github.com/spf13/cobra Version: v1.3.0 diff --git a/metric/system/process/process.go b/metric/system/process/process.go index d0cce2a7d..37bed52aa 100644 --- a/metric/system/process/process.go +++ b/metric/system/process/process.go @@ -16,7 +16,6 @@ // under the License. //go:build (darwin && cgo) || freebsd || linux || windows || aix -// +build darwin,cgo freebsd linux windows aix package process @@ -74,7 +73,7 @@ func GetPIDState(hostfs resolve.Resolver, pid int) (PidState, error) { if !exists { return "", ProcNotExist } - //GetInfoForPid will return the smallest possible dataset for a PID + // GetInfoForPid will return the smallest possible dataset for a PID procState, err := GetInfoForPid(hostfs, pid) if err != nil { return "", fmt.Errorf("error getting state info for pid %d: %w", pid, err) @@ -85,7 +84,7 @@ func GetPIDState(hostfs resolve.Resolver, pid int) (PidState, error) { // Get fetches the configured processes and returns a list of formatted events and root ECS fields func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) { - //If the user hasn't configured any kind of process glob, return + // If the user hasn't configured any kind of process glob, return if len(procStats.Procs) == 0 { return nil, nil, nil } @@ -112,18 +111,17 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) { } else { totalPhyMem = memStats.Total } - } - //Format the list to the MapStr type used by the outputs - procs := []mapstr.M{} - rootEvents := []mapstr.M{} + // Format the list to the MapStr type used by the outputs + var procs []mapstr.M + var rootEvents []mapstr.M for _, process := range plist { process := process // Add the RSS pct memory first process.Memory.Rss.Pct = GetProcMemPercentage(process, totalPhyMem) - //Create the root event + // Create the root event root := process.FormatForRoot() rootMap := mapstr.M{} _ = typeconv.Convert(&rootMap, root) @@ -207,7 +205,7 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) { } } - //If we've passed the filter, continue to fill out the rest of the metrics + // If we've passed the filter, continue to fill out the rest of the metrics status, err = FillPidMetrics(procStats.Hostfs, pid, status, procStats.isWhitelistedEnvVar) if err != nil { return status, true, fmt.Errorf("FillPidMetrics: %w", err) @@ -216,7 +214,7 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) { status.Cmdline = strings.Join(status.Args, " ") } - //postprocess with cgroups and percentages + // postprocess with cgroups and percentages last, ok := procStats.ProcsMap.GetPid(status.Pid.ValueOr(0)) status.SampleTime = time.Now() if procStats.EnableCgroups { @@ -343,7 +341,7 @@ func (procStats *Stats) includeTopProcesses(processes []ProcState) []ProcState { // isWhitelistedEnvVar returns true if the given variable name is a match for // the whitelist. If the whitelist is empty it returns false. -func (procStats Stats) isWhitelistedEnvVar(varName string) bool { +func (procStats *Stats) isWhitelistedEnvVar(varName string) bool { if len(procStats.envRegexps) == 0 { return false } diff --git a/metric/system/process/process_common.go b/metric/system/process/process_common.go index df0559e27..f2382edfd 100644 --- a/metric/system/process/process_common.go +++ b/metric/system/process/process_common.go @@ -16,7 +16,6 @@ // under the License. //go:build darwin || freebsd || linux || windows || aix || netbsd || openbsd -// +build darwin freebsd linux windows aix netbsd openbsd package process @@ -31,13 +30,13 @@ import ( "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" "github.com/elastic/go-sysinfo/types" - sysinfo "github.com/elastic/go-sysinfo" + "github.com/elastic/go-sysinfo" ) // ProcNotExist indicates that a process was not found. var ProcNotExist = errors.New("process does not exist") -//ProcsMap is a convinence wrapper for the oft-used ideom of map[int]ProcState +// ProcsMap is a convenience wrapper for the oft-used idiom of map[int]ProcState type ProcsMap map[int]ProcState // ProcsTrack is a thread-safe wrapper for a process Stat object's internal map of processes. @@ -109,31 +108,31 @@ type Stats struct { host types.Host } -//PidState are the constants for various PID states +// PidState are the constants for various PID states type PidState string var ( - //Dead state, on linux this is both "x" and "X" + // Dead state, on linux this is both "x" and "X" Dead PidState = "dead" - //Running state + // Running state Running PidState = "running" - //Sleeping state + // Sleeping state Sleeping PidState = "sleeping" - //Idle state. + // Idle state. Idle PidState = "idle" - //DiskSleep is uninterruptible disk sleep + // DiskSleep is uninterruptible disk sleep DiskSleep PidState = "disk_sleep" - //Stopped state. + // Stopped state. Stopped PidState = "stopped" - //Zombie state. + // Zombie state. Zombie PidState = "zombie" - //WakeKill is a linux state only found on kernels 2.6.33-3.13 + // WakeKill is a linux state only found on kernels 2.6.33-3.13 WakeKill PidState = "wakekill" - //Waking is a linux state only found on kernels 2.6.33-3.13 + // Waking is a linux state only found on kernels 2.6.33-3.13 Waking PidState = "waking" - //Parked is a linux state. On the proc man page, it says it's available on 3.9-3.13, but it appears to still be in the code. + // Parked is a linux state. On the proc man page, it says it's available on 3.9-3.13, but it appears to still be in the code. Parked PidState = "parked" - //Unknown state + // Unknown state Unknown PidState = "unknown" ) @@ -163,7 +162,7 @@ func (procStats *Stats) Init() error { procStats.logger.Warnf("Getting host details: %v", err) } - //footcannon prevention + // footcannon prevention if procStats.Hostfs == nil { procStats.Hostfs = resolve.NewTestResolver("/") } diff --git a/metric/system/process/process_darwin.go b/metric/system/process/process_darwin.go index aa9c59ed7..0b189959c 100644 --- a/metric/system/process/process_darwin.go +++ b/metric/system/process/process_darwin.go @@ -85,9 +85,15 @@ func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) { func GetInfoForPid(_ resolve.Resolver, pid int) (ProcState, error) { info := C.struct_proc_taskallinfo{} - err := taskInfo(pid, &info) - if err != nil { - return ProcState{}, fmt.Errorf("could not read task for pid %d", pid) + size := C.int(unsafe.Sizeof(info)) + ptr := unsafe.Pointer(&info) + + // For docs, see the link below. Check the `proc_taskallinfo` struct, which + // is a composition of `proc_bsdinfo` and `proc_taskinfo`. + // https://opensource.apple.com/source/xnu/xnu-1504.3.12/bsd/sys/proc_info.h.auto.html + n := C.proc_pidinfo(C.int(pid), C.PROC_PIDTASKALLINFO, 0, ptr, size) + if n != size { + return ProcState{}, fmt.Errorf("could not read process info for pid %d: proc_pidinfo returned %d", int(n), pid) } status := ProcState{} @@ -112,6 +118,7 @@ func GetInfoForPid(_ resolve.Resolver, pid int) (ProcState, error) { status.Ppid = opt.IntWith(int(info.pbsd.pbi_ppid)) status.Pid = opt.IntWith(pid) status.Pgid = opt.IntWith(int(info.pbsd.pbi_pgid)) + status.NumThreads = opt.IntWith(int(info.ptinfo.pti_threadnum)) // Get process username. Fallback to UID if username is not available. uid := strconv.Itoa(int(info.pbsd.pbi_uid)) @@ -224,18 +231,6 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M, return argv, exeName, envVars, nil } -func taskInfo(pid int, info *C.struct_proc_taskallinfo) error { - size := C.int(unsafe.Sizeof(*info)) - ptr := unsafe.Pointer(info) - - n := C.proc_pidinfo(C.int(pid), C.PROC_PIDTASKALLINFO, 0, ptr, size) - if n != size { - return fmt.Errorf("could not read process info for pid %d", pid) - } - - return nil -} - func sysctl(mib []C.int, old *byte, oldlen *uintptr, new *byte, newlen uintptr) (err error) { p0 := unsafe.Pointer(&mib[0]) diff --git a/metric/system/process/process_linux_common.go b/metric/system/process/process_linux_common.go index f58d9cd3a..bc47adb49 100644 --- a/metric/system/process/process_linux_common.go +++ b/metric/system/process/process_linux_common.go @@ -16,7 +16,6 @@ // under the License. //go:build freebsd || linux -// +build freebsd linux package process @@ -121,7 +120,6 @@ func FillPidMetrics(hostfs resolve.Resolver, pid int, state ProcState, filter fu return state, fmt.Errorf("error getting metadata for pid %d: %w", pid, err) } - //username state.Username, err = getUser(hostfs, pid) if err != nil { return state, fmt.Errorf("error creating username for pid %d: %w", pid, err) @@ -129,56 +127,73 @@ func FillPidMetrics(hostfs resolve.Resolver, pid int, state ProcState, filter fu return state, nil } -// GetInfoForPid fetches the basic hostinfo from /proc/[PID]/stat -func GetInfoForPid(hostfs resolve.Resolver, pid int) (ProcState, error) { - path := hostfs.Join("proc", strconv.Itoa(pid), "stat") - data, err := ioutil.ReadFile(path) +// GetInfoForPid fetches and parses the process information of the process +// identified by pid from /proc/[PID]/stat +func GetInfoForPid(hostFS resolve.Resolver, pid int) (ProcState, error) { + path := hostFS.Join("proc", strconv.Itoa(pid), "stat") + data, err := os.ReadFile(path) // Transform the error into a more sensible error in cases where the directory doesn't exist, i.e the process is gone if err != nil { if os.IsNotExist(err) { return ProcState{}, syscall.ESRCH } return ProcState{}, fmt.Errorf("error reading procdir %s: %w", path, err) + } + state, err := parseProcStat(data) + state.Pid = opt.IntWith(pid) + if err != nil { + return state, fmt.Errorf("failed to parse information for pid %d': %w", pid, err) } + return state, nil +} + +func parseProcStat(data []byte) (ProcState, error) { + const minFields = 36 state := ProcState{} // Extract the comm value with is surrounded by parentheses. lIdx := bytes.Index(data, []byte("(")) rIdx := bytes.LastIndex(data, []byte(")")) if lIdx < 0 || rIdx < 0 || lIdx >= rIdx || rIdx+2 >= len(data) { - return state, fmt.Errorf("failed to extract comm for pid %d from '%v': %w", pid, string(data), err) + return state, fmt.Errorf("failed to extract 'comm' field from '%v'", + string(data)) } state.Name = string(data[lIdx+1 : rIdx]) // Extract the rest of the fields that we are interested in. fields := bytes.Fields(data[rIdx+2:]) - if len(fields) <= 36 { - return state, fmt.Errorf("expected more stat fields for pid %d from '%v': %w", pid, string(data), err) + if len(fields) <= minFields { + return state, fmt.Errorf("expected at least %d stat fields from '%v'", + minFields, string(data)) } + // See https://man7.org/linux/man-pages/man5/proc.5.html for all fields. interests := bytes.Join([][]byte{ - fields[0], // state - fields[1], // ppid - fields[2], // pgrp + fields[0], // state + fields[1], // ppid + fields[2], // pgrp + fields[17], // num_threads }, []byte(" ")) var procState string - var ppid, pgid int - - _, err = fmt.Fscan(bytes.NewBuffer(interests), + var ppid, pgid, numThreads int + _, err := fmt.Fscan(bytes.NewBuffer(interests), &procState, &ppid, &pgid, + &numThreads, ) if err != nil { - return state, fmt.Errorf("failed to parse stat fields for pid %d from '%v': %w", pid, string(data), err) + return state, fmt.Errorf("failed to parse stat fields from '%s': %w", + string(data), err) } + state.State = getProcState(procState[0]) state.Ppid = opt.IntWith(ppid) state.Pgid = opt.IntWith(pgid) - state.Pid = opt.IntWith(pid) + state.NumThreads = opt.IntWith(numThreads) return state, nil } @@ -384,7 +399,7 @@ func getFDStats(hostfs resolve.Resolver, pid int) (ProcFDInfo, error) { pathFD := hostfs.Join("proc", strconv.Itoa(pid), "fd") fds, err := ioutil.ReadDir(pathFD) - if errors.Is(err, os.ErrPermission) { //ignore permission errors, passthrough other data + if errors.Is(err, os.ErrPermission) { // ignore permission errors, passthrough other data return state, nil } else if err != nil { return state, fmt.Errorf("error reading FD directory for pid %d: %w", pid, err) diff --git a/metric/system/process/process_linux_darwin_test.go b/metric/system/process/process_linux_darwin_test.go new file mode 100644 index 000000000..0701b93b7 --- /dev/null +++ b/metric/system/process/process_linux_darwin_test.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build darwin || linux + +package process + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" +) + +func TestGetInfoForPid_numThreads(t *testing.T) { + want := 42 + + cmd := runThreads(t) + got, err := GetInfoForPid( + resolve.NewTestResolver("/"), cmd.Process.Pid) + require.NoError(t, err, "failed to GetInfoForPid") + + if !got.NumThreads.Exists() { + bs, err := json.Marshal(got) + if err != nil { + t.Logf("could not marshal ProcState: %v", err) + } + t.Fatalf("num_thread was not collected. Collected info: %s", bs) + } + + numThreads := got.NumThreads.ValueOr(-1) + assert.Equalf(t, want, numThreads, + "want %d threads, got %d", want, numThreads) +} diff --git a/metric/system/process/process_linux_test.go b/metric/system/process/process_linux_test.go index d24fbc660..401b0533c 100644 --- a/metric/system/process/process_linux_test.go +++ b/metric/system/process/process_linux_test.go @@ -16,7 +16,6 @@ // under the License. //go:build darwin || freebsd || linux || windows -// +build darwin freebsd linux windows package process @@ -26,10 +25,12 @@ import ( "strconv" "testing" - "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" - "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/opt" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" + "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) func TestFetchProcessFromOtherUser(t *testing.T) { @@ -97,3 +98,38 @@ func TestFetchProcessFromOtherUser(t *testing.T) { require.NoError(t, err) t.Logf("got: %s", pidData.StringToPrint()) } + +func TestGetInfoForPid_NumThreads(t *testing.T) { + want := ProcState{ + Name: "elastic-agent", + State: "sleeping", + Pid: opt.IntWith(42), + Ppid: opt.IntWith(1), + Pgid: opt.IntWith(4067478), + NumThreads: opt.IntWith(26), + } + + got, err := GetInfoForPid(resolve.NewTestResolver("testdata"), 42) + require.NoError(t, err, "GetInfoForPid returned an error when it should have succeeded") + + assert.Equal(t, want, got) +} + +func TestParseProcStat(t *testing.T) { + data := []byte("4067478 (elastic-agent) S 1 4067478 4067478 0 -1 4194560 151900 " + + "1587 0 0 8229 3989 0 1 32 12 26" + + " 0 200791940 2675654656 15487 18446744073709551615 1 1 0 0 0 0 0 0 2143420159 0 0 0 17 9 0 0 0 0 0 0 0 0 0 0 0 0 0") + + want := ProcState{ + Name: "elastic-agent", + State: getProcState(byte('S')), + Ppid: opt.IntWith(1), + Pgid: opt.IntWith(4067478), + NumThreads: opt.IntWith(26), + } + + got, err := parseProcStat(data) + require.NoError(t, err, "parseProcStat returned and error") + + assert.Equal(t, want, got, "") +} diff --git a/metric/system/process/process_test.go b/metric/system/process/process_test.go index fe1182b21..3237d57af 100644 --- a/metric/system/process/process_test.go +++ b/metric/system/process/process_test.go @@ -16,14 +16,18 @@ // under the License. //go:build darwin || freebsd || linux || windows -// +build darwin freebsd linux windows package process import ( + "bytes" + "fmt" "os" + "os/exec" + "path" "runtime" "sort" + "strings" "testing" "time" @@ -38,11 +42,63 @@ import ( "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) +// BenchmarkGetProcess runs a benchmark of the GetProcess method with caching +// of the command line and environment variables. +func BenchmarkGetProcess(b *testing.B) { + stat, err := initTestResolver() + if err != nil { + b.Fatalf("Failed init: %s", err) + } + procs := make(map[int]mapstr.M, 1) + pid := os.Getpid() + b.ResetTimer() + for i := 0; i < b.N; i++ { + + process, err := stat.GetOne(pid) + if err != nil { + continue + } + + procs[pid] = process + } +} + +func BenchmarkGetTop(b *testing.B) { + stat, err := initTestResolver() + if err != nil { + b.Fatalf("Failed init: %s", err) + } + procs := make(map[int][]mapstr.M) + + for i := 0; i < b.N; i++ { + list, _, err := stat.Get() + if err != nil { + b.Fatalf("error: %s", err) + } + procs[i] = list + } +} + func TestGetState(t *testing.T) { - // Getpid is really the only way to test this in a cross-platform way - state, err := GetPIDState(resolve.NewTestResolver("/"), os.Getpid()) - require.NoError(t, err) - require.Equal(t, Running, state) + want := Running + pid := os.Getpid() + hostfs := resolve.NewTestResolver("/") + + var got PidState + var err error + test := func() bool { + // Getpid is really the only way to test this in a cross-platform way + got, err = GetPIDState(hostfs, pid) + if err != nil { + return false + } + + return want == got + } + + assert.Eventuallyf(t, test, + time.Second, 50*time.Millisecond, + "want process state %q, got %q. Last error: %v", want, got, err) } func TestGetOne(t *testing.T) { @@ -122,7 +178,7 @@ func TestNetworkFilter(t *testing.T) { } func TestFilter(t *testing.T) { - //The logic itself is os-independent, so we'll only test this on the platform least likly to have CI issues + // The logic itself is os-independent, so we'll only test this on the platform least likely to have CI issues if runtime.GOOS != "linux" { t.Skip("Run on Linux only") } @@ -288,7 +344,7 @@ func TestProcCpuPercentage(t *testing.T) { } newState := GetProcCPUPercentage(p1, p2) - //GetProcCPUPercentage wil return a number that varies based on the host, due to NumCPU() + // GetProcCPUPercentage wil return a number that varies based on the host, due to NumCPU() // So "un-normalize" it, then re-normalized with a constant. cpu := float64(runtime.NumCPU()) unNormalized := newState.CPU.Total.Norm.Pct.ValueOr(0) * cpu @@ -298,43 +354,6 @@ func TestProcCpuPercentage(t *testing.T) { assert.EqualValues(t, 3.459, newState.CPU.Total.Pct.ValueOr(0)) } -// BenchmarkGetProcess runs a benchmark of the GetProcess method with caching -// of the command line and environment variables. -func BenchmarkGetProcess(b *testing.B) { - stat, err := initTestResolver() - if err != nil { - b.Fatalf("Failed init: %s", err) - } - procs := make(map[int]mapstr.M, 1) - pid := os.Getpid() - b.ResetTimer() - for i := 0; i < b.N; i++ { - - process, err := stat.GetOne(pid) - if err != nil { - continue - } - - procs[pid] = process - } -} - -func BenchmarkGetTop(b *testing.B) { - stat, err := initTestResolver() - if err != nil { - b.Fatalf("Failed init: %s", err) - } - procs := make(map[int][]mapstr.M) - - for i := 0; i < b.N; i++ { - list, _, err := stat.Get() - if err != nil { - b.Fatalf("error: %s", err) - } - procs[i] = list - } -} - func TestIncludeTopProcesses(t *testing.T) { processes := []ProcState{ { @@ -555,6 +574,54 @@ func TestIncludeTopProcesses(t *testing.T) { } } +// runThreads run the threads binary for the current GOOS. +// +//go:generate docker run --rm -v ./testdata:/app --entrypoint g++ docker.elastic.co/beats-dev/golang-crossbuild:1.21.0-main -pthread -std=c++11 -o /app/threads /app/threads.cpp +//go:generate docker run --rm -v ./testdata:/app --entrypoint o64-clang++ docker.elastic.co/beats-dev/golang-crossbuild:1.21.0-darwin -pthread -std=c++11 -o /app/threads-darwin /app/threads.cpp +//go:generate docker run --rm -v ./testdata:/app --entrypoint x86_64-w64-mingw32-g++-posix docker.elastic.co/beats-dev/golang-crossbuild:1.21.0-main -pthread -std=c++11 -o /app/threads.exe /app/threads.cpp +func runThreads(t *testing.T) *exec.Cmd { + t.Helper() + + supportedPlatforms := []string{"linux/amd64", "darwin/amd64", "windows/amd64"} + + platform := fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH) + if !sliceContains(supportedPlatforms, platform) { + t.Skipf("not supported for %s/%s. Supported patforms: %v", + runtime.GOOS, runtime.GOARCH, supportedPlatforms) + } + + threads := path.Join("testdata", "threads") + + switch runtime.GOOS { + case "linux": + // nothing to do + case "darwin": + threads += "-darwin" + case "windows": + threads += ".exe" + } + + var b bytes.Buffer + cmd := exec.Command(threads) + cmd.Stdout = &b + cmd.Stderr = &b + + err := cmd.Start() + require.NoErrorf(t, err, "failed to start %q", threads) + + var log string + require.Eventually(t, + func() bool { + log += b.String() + return strings.Contains(log, "running") + }, + time.Second, 50*time.Millisecond, + "could not determine if %q is running. Output: %q", + threads, log) + + return cmd +} + func initTestResolver() (Stats, error) { err := logp.DevelopmentSetup() if err != nil { @@ -580,3 +647,13 @@ func initTestResolver() (Stats, error) { err = testConfig.Init() return testConfig, err } + +func sliceContains(s []string, e string) bool { + for _, v := range s { + if e == v { + return true + } + } + + return false +} diff --git a/metric/system/process/process_types.go b/metric/system/process/process_types.go index e2ab30c0e..27740b23c 100644 --- a/metric/system/process/process_types.go +++ b/metric/system/process/process_types.go @@ -29,12 +29,13 @@ import ( // ProcState is the main struct for process information and metrics. type ProcState struct { // Basic Process data - Name string `struct:"name,omitempty"` - State PidState `struct:"state,omitempty"` - Username string `struct:"username,omitempty"` - Pid opt.Int `struct:"pid,omitempty"` - Ppid opt.Int `struct:"ppid,omitempty"` - Pgid opt.Int `struct:"pgid,omitempty"` + Name string `struct:"name,omitempty"` + State PidState `struct:"state,omitempty"` + Username string `struct:"username,omitempty"` + Pid opt.Int `struct:"pid,omitempty"` + Ppid opt.Int `struct:"ppid,omitempty"` + Pgid opt.Int `struct:"pgid,omitempty"` + NumThreads opt.Int `struct:"num_threads,omitempty"` // Extended Process Data Args []string `struct:"args,omitempty"` diff --git a/metric/system/process/process_windows.go b/metric/system/process/process_windows.go index cbfba4ef6..a6fc9d35b 100644 --- a/metric/system/process/process_windows.go +++ b/metric/system/process/process_windows.go @@ -18,19 +18,19 @@ package process import ( + "errors" "fmt" "path/filepath" "syscall" + "unsafe" + + xsyswindows "golang.org/x/sys/windows" "github.com/elastic/elastic-agent-libs/opt" "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" "github.com/elastic/gosigar/sys/windows" ) -var ( - processQueryLimitedInfoAccess = windows.PROCESS_QUERY_LIMITED_INFORMATION -) - // FetchPids returns a map and array of pids func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) { pids, err := windows.EnumProcesses() @@ -40,7 +40,9 @@ func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) { procMap := make(ProcsMap, 0) var plist []ProcState - // This is probably the only implementation that doesn't benefit from our little fillPid callback system. We'll need to iterate over everything manually. + // This is probably the only implementation that doesn't benefit from our + // little fillPid callback system. We'll need to iterate over everything + // manually. for _, pid := range pids { procMap, plist = procStats.pidIter(int(pid), procMap, plist) } @@ -50,25 +52,65 @@ func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) { // GetInfoForPid returns basic info for the process func GetInfoForPid(_ resolve.Resolver, pid int) (ProcState, error) { - state := ProcState{} + var err error + var errs []error + state := ProcState{Pid: opt.IntWith(pid)} name, err := getProcName(pid) if err != nil { - return state, fmt.Errorf("error fetching name: %w", err) + errs = append(errs, fmt.Errorf("error fetching name: %w", err)) + } else { + state.Name = name } - state.Name = name - state.Pid = opt.IntWith(pid) // system/process doesn't need this here, but system/process_summary does. status, err := getPidStatus(pid) if err != nil { - return state, fmt.Errorf("error fetching status: %w", err) + errs = append(errs, fmt.Errorf("error fetching status: %w", err)) + } else { + state.State = status + } + + if numThreads, err := FetchNumThreads(pid); err != nil { + errs = append(errs, fmt.Errorf("error fetching num threads: %w", err)) + } else { + state.NumThreads = opt.IntWith(numThreads) + } + + if err := errors.Join(errs...); err != nil { + return state, fmt.Errorf("could not get all information for PID %d: %w", + pid, err) } - state.State = status return state, nil } +func FetchNumThreads(pid int) (int, error) { + pHandle, err := syscall.OpenProcess( + xsyswindows.PROCESS_QUERY_INFORMATION, + false, + uint32(pid)) + if err != nil { + return 0, fmt.Errorf("OpenProcess failed for PID %d: %w", pid, err) + } + defer syscall.CloseHandle(pHandle) + + var snapshotHandle syscall.Handle + err = PssCaptureSnapshot(pHandle, PSSCaptureThreads, 0, &snapshotHandle) + if err != nil { + return 0, fmt.Errorf("PssCaptureSnapshot failed: %w", err) + } + + info := PssThreadInformation{} + buffSize := unsafe.Sizeof(info) + err = PssQuerySnapshot(snapshotHandle, PssQueryThreadInformation, &info, uint32(buffSize)) + if err != nil { + return 0, fmt.Errorf("PssQuerySnapshot failed: %w", err) + } + + return int(info.ThreadsCaptured), nil +} + // FillPidMetrics is the windows implementation func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, _ func(string) bool) (ProcState, error) { user, err := getProcCredName(pid) @@ -107,8 +149,11 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, _ func(string) } func getProcArgs(pid int) ([]string, error) { - - handle, err := syscall.OpenProcess(processQueryLimitedInfoAccess|windows.PROCESS_VM_READ, false, uint32(pid)) + handle, err := syscall.OpenProcess( + windows.PROCESS_QUERY_LIMITED_INFORMATION| + windows.PROCESS_VM_READ, + false, + uint32(pid)) if err != nil { return nil, fmt.Errorf("OpenProcess failed: %w", err) } @@ -137,7 +182,7 @@ func getProcArgs(pid int) ([]string, error) { } func getProcTimes(pid int) (uint64, uint64, uint64, error) { - handle, err := syscall.OpenProcess(processQueryLimitedInfoAccess, false, uint32(pid)) + handle, err := syscall.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid)) if err != nil { return 0, 0, 0, fmt.Errorf("OpenProcess failed for pid=%v: %w", pid, err) } @@ -155,7 +200,11 @@ func getProcTimes(pid int) (uint64, uint64, uint64, error) { } func procMem(pid int) (uint64, uint64, error) { - handle, err := syscall.OpenProcess(processQueryLimitedInfoAccess|windows.PROCESS_VM_READ, false, uint32(pid)) + handle, err := syscall.OpenProcess( + windows.PROCESS_QUERY_LIMITED_INFORMATION| + windows.PROCESS_VM_READ, + false, + uint32(pid)) if err != nil { return 0, 0, fmt.Errorf("OpenProcess failed for pid=%v: %w", pid, err) } @@ -172,7 +221,7 @@ func procMem(pid int) (uint64, uint64, error) { // getProcName returns the process name associated with the PID. func getProcName(pid int) (string, error) { - handle, err := syscall.OpenProcess(processQueryLimitedInfoAccess, false, uint32(pid)) + handle, err := syscall.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid)) if err != nil { return "", fmt.Errorf("OpenProcess failed for pid=%v: %w", pid, err) } @@ -190,7 +239,7 @@ func getProcName(pid int) (string, error) { // getProcStatus returns the status of a process. func getPidStatus(pid int) (PidState, error) { - handle, err := syscall.OpenProcess(processQueryLimitedInfoAccess, false, uint32(pid)) + handle, err := syscall.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid)) if err != nil { return Unknown, fmt.Errorf("OpenProcess failed for pid=%v: %w", pid, err) } @@ -204,7 +253,7 @@ func getPidStatus(pid int) (PidState, error) { return Unknown, fmt.Errorf("GetExitCodeProcess failed for pid=%v: %w", pid, err) } - if exitCode == 259 { //still active + if exitCode == 259 { // still active return Running, nil } return Sleeping, nil @@ -212,7 +261,7 @@ func getPidStatus(pid int) (PidState, error) { // getParentPid returns the parent process ID of a process. func getParentPid(pid int) (int, error) { - handle, err := syscall.OpenProcess(processQueryLimitedInfoAccess, false, uint32(pid)) + handle, err := syscall.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid)) if err != nil { return 0, fmt.Errorf("OpenProcess failed for pid=%v: %w", pid, err) } diff --git a/metric/system/process/process_windows_test.go b/metric/system/process/process_windows_test.go new file mode 100644 index 000000000..e56efe8e4 --- /dev/null +++ b/metric/system/process/process_windows_test.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package process + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" +) + +func TestGetInfoForPid_numThreads(t *testing.T) { + const want = 42 + // On windows programs always start with more threads than the code + // is launching. It seems to be due to it loading dll/tables in parallel + // or due to the default thread pool. See the following for more info: + // https://stackoverflow.com/questions/42789199/why-there-are-three-unexpected-worker-threads-when-a-win32-console-application-s/42789684#42789684 + // + // Use the Process Explorer https://learn.microsoft.com/en-us/sysinternals/downloads/process-explorer + // to check the number of threads allocated to a process. To see the number + // of threads in, use you'd need to add the Threads column yourself. + // To add it: + // right click on one of the column titles -> select columns -> process performance tab -> select Threads + expected := 45 + + cmd := runThreads(t) + got, err := GetInfoForPid( + resolve.NewTestResolver("/"), cmd.Process.Pid) + require.NoError(t, err, "failed to GetInfoForPid") + + if !got.NumThreads.Exists() { + bs, err := json.Marshal(got) + if err != nil { + t.Logf("could not marshal ProcState: %v", err) + } + t.Fatalf("num_thread was not collected. Collected info: %s", bs) + } + + numThreads := got.NumThreads.ValueOr(-1) + if expected != numThreads { + // it might be an older Windows version or, by the time we got the num_threads, + // the program was indeed using only what it spawned + assert.Equalf(t, want, numThreads, + "got %d, want %d or %d. tl;dr: on Windows process starts with more "+ + "threads than they request, see the test code for details", + numThreads, want, expected) + } +} diff --git a/metric/system/process/syscall_windows.go b/metric/system/process/syscall_windows.go new file mode 100644 index 000000000..e7d965a41 --- /dev/null +++ b/metric/system/process/syscall_windows.go @@ -0,0 +1,78 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package process + +// Windows Process Snapshotting docs: +// - https://learn.microsoft.com/en-us/previous-versions/windows/desktop/proc_snap/overview-of-process-snapshotting +// PssCaptureSnapshot docs in https://learn.microsoft.com/en-us/windows/win32/api/processsnapshot/nf-processsnapshot-psscapturesnapshot +// PssQuerySnapshot docs in https://learn.microsoft.com/en-us/windows/win32/api/processsnapshot/nf-processsnapshot-pssquerysnapshot + +// Use golang.org/x/sys/windows/mkwinsyscall instead of adriansr/mksyscall +// below once https://github.com/golang/go/issues/42373 is fixed. +//go:generate go get github.com/adriansr/mkwinsyscall +//go:generate $GOPATH/bin/mkwinsyscall.exe -systemdll -output zsyscall_windows.go syscall_windows.go + +//sys PssCaptureSnapshot(processHandle syscall.Handle, captureFlags PSSCaptureFlags, threadContextFlags uint32, snapshotHandle *syscall.Handle) (err error) [failretval!=0] = kernel32.PssCaptureSnapshot +//sys PssQuerySnapshot(snapshotHandle syscall.Handle, informationClass uint32, buffer *PssThreadInformation, bufferLength uint32) (err error) [failretval!=0] = kernel32.PssQuerySnapshot + +// The following constants are PssQueryInformationClass as defined on +// https://learn.microsoft.com/en-us/windows/win32/api/processsnapshot/ne-processsnapshot-pss_query_information_class +const ( + PssQueryProcessInformation uint32 = iota + PssQueryVaCloneInformation + PssQueryAuxiliaryPagesInformation + PssQueryVaSpaceInformation + PssQueryHandleInformation + PssQueryThreadInformation + PssQueryHandleTraceInformation + PssQueryPerformanceCounters +) + +// PSSCaptureFlags from +// https://learn.microsoft.com/en-us/windows/win32/api/processsnapshot/ne-processsnapshot-pss_capture_flags +type PSSCaptureFlags uint32 + +const ( + PSSCaptureNone PSSCaptureFlags = 0x00000000 + PSSCaptureVAClone PSSCaptureFlags = 0x00000001 + PSSCaptureReserved00000002 PSSCaptureFlags = 0x00000002 + PSSCaptureHandles PSSCaptureFlags = 0x00000004 + PSSCaptureHandleNameInformation PSSCaptureFlags = 0x00000008 + PSSCaptureHandleBasicInformation PSSCaptureFlags = 0x00000010 + PSSCaptureHandleTypeSpecificInformation PSSCaptureFlags = 0x00000020 + PSSCaptureHandleTrace PSSCaptureFlags = 0x00000040 + PSSCaptureThreads PSSCaptureFlags = 0x00000080 + PSSCaptureThreadContext PSSCaptureFlags = 0x00000100 + PSSCaptureThreadContextExtended PSSCaptureFlags = 0x00000200 + PSSCaptureReserved00000400 PSSCaptureFlags = 0x00000400 + PSSCaptureVASpace PSSCaptureFlags = 0x00000800 + PSSCaptureVASpaceSectionInformation PSSCaptureFlags = 0x00001000 + PSSCaptureIPTTrace PSSCaptureFlags = 0x00002000 + PSSCaptureReserved00004000 PSSCaptureFlags = 0x00004000 + PSSCreateBreakawayOptional PSSCaptureFlags = 0x04000000 + PSSCreateBreakaway PSSCaptureFlags = 0x08000000 + PSSCreateForceBreakaway PSSCaptureFlags = 0x10000000 + PSSCreateUseVMAllocations PSSCaptureFlags = 0x20000000 + PSSCreateMeasurePerformance PSSCaptureFlags = 0x40000000 + PSSCreateReleaseSection PSSCaptureFlags = 0x80000000 +) + +type PssThreadInformation struct { + ThreadsCaptured uint32 + ContextLength uint32 +} diff --git a/metric/system/process/testdata/proc/42/stat b/metric/system/process/testdata/proc/42/stat new file mode 100644 index 000000000..d8d5c9daf --- /dev/null +++ b/metric/system/process/testdata/proc/42/stat @@ -0,0 +1 @@ +4067478 (elastic-agent) S 1 4067478 4067478 0 -1 4194560 151900 1587 0 0 8229 3989 0 1 32 12 26 0 200791940 2675654656 15487 18446744073709551615 1 1 0 0 0 0 0 0 2143420159 0 0 0 17 9 0 0 0 0 0 0 0 0 0 0 0 0 0" diff --git a/metric/system/process/testdata/threads b/metric/system/process/testdata/threads new file mode 100755 index 000000000..80917ef32 Binary files /dev/null and b/metric/system/process/testdata/threads differ diff --git a/metric/system/process/testdata/threads-darwin b/metric/system/process/testdata/threads-darwin new file mode 100755 index 000000000..e7d968cc9 Binary files /dev/null and b/metric/system/process/testdata/threads-darwin differ diff --git a/metric/system/process/testdata/threads.cpp b/metric/system/process/testdata/threads.cpp new file mode 100644 index 000000000..a23d4c137 --- /dev/null +++ b/metric/system/process/testdata/threads.cpp @@ -0,0 +1,18 @@ +#include +#include + +void importantStuff() { + std::this_thread::sleep_for(std::chrono::minutes(42)); +} + +int main(int argc, char* argv[]) { + std::vector threads; + for (int i = 0; i < 41; ++i) { + threads.push_back(std::thread(importantStuff)); + } + + std::cout << "running...\n" << std::flush; + importantStuff(); + + return 0; +} diff --git a/metric/system/process/testdata/threads.exe b/metric/system/process/testdata/threads.exe new file mode 100755 index 000000000..35aa95018 Binary files /dev/null and b/metric/system/process/testdata/threads.exe differ diff --git a/metric/system/process/zsyscall_windows.go b/metric/system/process/zsyscall_windows.go new file mode 100644 index 000000000..e961c7c49 --- /dev/null +++ b/metric/system/process/zsyscall_windows.go @@ -0,0 +1,78 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by 'go generate'; DO NOT EDIT. + +package process + +import ( + "syscall" + "unsafe" + + "golang.org/x/sys/windows" +) + +var _ unsafe.Pointer + +// Do the interface allocations only once for common +// Errno values. +const ( + errnoERROR_IO_PENDING = 997 +) + +var ( + errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING) + errERROR_EINVAL error = syscall.EINVAL +) + +// errnoErr returns common boxed Errno values, to prevent +// allocations at runtime. +func errnoErr(e syscall.Errno) error { + switch e { + case 0: + return errERROR_EINVAL + case errnoERROR_IO_PENDING: + return errERROR_IO_PENDING + } + // TODO: add more here, after collecting data on the common + // error values see on Windows. (perhaps when running + // all.bat?) + return e +} + +var ( + modkernel32 = windows.NewLazySystemDLL("kernel32.dll") + + procPssCaptureSnapshot = modkernel32.NewProc("PssCaptureSnapshot") + procPssQuerySnapshot = modkernel32.NewProc("PssQuerySnapshot") +) + +func PssCaptureSnapshot(processHandle syscall.Handle, captureFlags PSSCaptureFlags, threadContextFlags uint32, snapshotHandle *syscall.Handle) (err error) { + r1, _, e1 := syscall.Syscall6(procPssCaptureSnapshot.Addr(), 4, uintptr(processHandle), uintptr(captureFlags), uintptr(threadContextFlags), uintptr(unsafe.Pointer(snapshotHandle)), 0, 0) + if r1 != 0 { + err = errnoErr(e1) + } + return +} + +func PssQuerySnapshot(snapshotHandle syscall.Handle, informationClass uint32, buffer *PssThreadInformation, bufferLength uint32) (err error) { + r1, _, e1 := syscall.Syscall6(procPssQuerySnapshot.Addr(), 4, uintptr(snapshotHandle), uintptr(informationClass), uintptr(unsafe.Pointer(buffer)), uintptr(bufferLength), 0, 0) + if r1 != 0 { + err = errnoErr(e1) + } + return +}