Skip to content

Commit

Permalink
add total thread count to process info returned by process.GetInfoForPid
Browse files Browse the repository at this point in the history
* On linux:
  -  it extends the information parsed from /proc/[PID]/stat to get the total thread count
  -  unit tests added, they mock /proc/[PID]/stat
* On darwin:
  - it uses the syscall proc_pidinfo
  - unit test only validates GetInfoForPid does not return an error and the total thread count isn't zero
* On windows:
  - it uses the Process Snapshotting APIs: https://learn.microsoft.com/en-us/previous-versions/windows/desktop/proc_snap/overview-of-process-snapshotting
  - unit test only validates GetInfoForPid does not return an error and the total thread count isn't zero
  - there is a manual test using a prebuilt C++ binary, its src is included, which calls the same windows APIs so the returned total thread count can be compared against the Go implementation.
  • Loading branch information
AndersonQ committed Sep 18, 2023
1 parent dc997bd commit c0d90b9
Show file tree
Hide file tree
Showing 16 changed files with 603 additions and 79 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ This project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [0.5.0]

### Added

- Collect the total thread count as part of the information collected by `metric/system/process.GetInfoForPid`.

### Fixed

- Fix thread safety in process code #43
- Fix process package build on AIX #54
- Ensure correct devID width in cgv2 #74
Expand Down
20 changes: 9 additions & 11 deletions metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

//go:build (darwin && cgo) || freebsd || linux || windows || aix
// +build darwin,cgo freebsd linux windows aix

package process

Expand Down Expand Up @@ -74,7 +73,7 @@ func GetPIDState(hostfs resolve.Resolver, pid int) (PidState, error) {
if !exists {
return "", ProcNotExist
}
//GetInfoForPid will return the smallest possible dataset for a PID
// GetInfoForPid will return the smallest possible dataset for a PID
procState, err := GetInfoForPid(hostfs, pid)
if err != nil {
return "", fmt.Errorf("error getting state info for pid %d: %w", pid, err)
Expand All @@ -85,7 +84,7 @@ func GetPIDState(hostfs resolve.Resolver, pid int) (PidState, error) {

// Get fetches the configured processes and returns a list of formatted events and root ECS fields
func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
//If the user hasn't configured any kind of process glob, return
// If the user hasn't configured any kind of process glob, return
if len(procStats.Procs) == 0 {
return nil, nil, nil
}
Expand All @@ -112,18 +111,17 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
} else {
totalPhyMem = memStats.Total
}

}

//Format the list to the MapStr type used by the outputs
procs := []mapstr.M{}
rootEvents := []mapstr.M{}
// Format the list to the MapStr type used by the outputs
var procs []mapstr.M
var rootEvents []mapstr.M

for _, process := range plist {
process := process
// Add the RSS pct memory first
process.Memory.Rss.Pct = GetProcMemPercentage(process, totalPhyMem)
//Create the root event
// Create the root event
root := process.FormatForRoot()
rootMap := mapstr.M{}
_ = typeconv.Convert(&rootMap, root)
Expand Down Expand Up @@ -207,7 +205,7 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
}
}

//If we've passed the filter, continue to fill out the rest of the metrics
// If we've passed the filter, continue to fill out the rest of the metrics
status, err = FillPidMetrics(procStats.Hostfs, pid, status, procStats.isWhitelistedEnvVar)
if err != nil {
return status, true, fmt.Errorf("FillPidMetrics: %w", err)
Expand All @@ -216,7 +214,7 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
status.Cmdline = strings.Join(status.Args, " ")
}

//postprocess with cgroups and percentages
// postprocess with cgroups and percentages
last, ok := procStats.ProcsMap.GetPid(status.Pid.ValueOr(0))
status.SampleTime = time.Now()
if procStats.EnableCgroups {
Expand Down Expand Up @@ -343,7 +341,7 @@ func (procStats *Stats) includeTopProcesses(processes []ProcState) []ProcState {

// isWhitelistedEnvVar returns true if the given variable name is a match for
// the whitelist. If the whitelist is empty it returns false.
func (procStats Stats) isWhitelistedEnvVar(varName string) bool {
func (procStats *Stats) isWhitelistedEnvVar(varName string) bool {
if len(procStats.envRegexps) == 0 {
return false
}
Expand Down
7 changes: 3 additions & 4 deletions metric/system/process/process_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

//go:build darwin || freebsd || linux || windows || aix || netbsd || openbsd
// +build darwin freebsd linux windows aix netbsd openbsd

package process

Expand All @@ -31,13 +30,13 @@ import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo/types"

sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo"
)

// ProcNotExist is the sentinel error that indicates a process was not found.
var ProcNotExist = errors.New("process does not exist")

//ProcsMap is a convinence wrapper for the oft-used ideom of map[int]ProcState
// ProcsMap is a convenience wrapper for the oft-used idiom of map[int]ProcState.
type ProcsMap map[int]ProcState

// ProcsTrack is a thread-safe wrapper for a process Stat object's internal map of processes.
Expand Down Expand Up @@ -109,7 +108,7 @@ type Stats struct {
host types.Host
}

//PidState are the constants for various PID states
// PidState is a string type whose values name the various PID states.
type PidState string

var (
Expand Down
25 changes: 10 additions & 15 deletions metric/system/process/process_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,15 @@ func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) {
func GetInfoForPid(_ resolve.Resolver, pid int) (ProcState, error) {
info := C.struct_proc_taskallinfo{}

err := taskInfo(pid, &info)
if err != nil {
return ProcState{}, fmt.Errorf("could not read task for pid %d", pid)
size := C.int(unsafe.Sizeof(info))
ptr := unsafe.Pointer(&info)

// For docs, see the link below. Check the `proc_taskallinfo` struct, which
// is a composition of `proc_bsdinfo` and `proc_taskinfo`.
// https://opensource.apple.com/source/xnu/xnu-1504.3.12/bsd/sys/proc_info.h.auto.html
n := C.proc_pidinfo(C.int(pid), C.PROC_PIDTASKALLINFO, 0, ptr, size)
if n != size {
return ProcState{}, fmt.Errorf("could not read process info for pid %d: proc_pidinfo returned %d", int(n), pid)
}

status := ProcState{}
Expand All @@ -112,6 +118,7 @@ func GetInfoForPid(_ resolve.Resolver, pid int) (ProcState, error) {
status.Ppid = opt.IntWith(int(info.pbsd.pbi_ppid))
status.Pid = opt.IntWith(pid)
status.Pgid = opt.IntWith(int(info.pbsd.pbi_pgid))
status.NumThreads = opt.IntWith(int(info.ptinfo.pti_threadnum))

// Get process username. Fallback to UID if username is not available.
uid := strconv.Itoa(int(info.pbsd.pbi_uid))
Expand Down Expand Up @@ -224,18 +231,6 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M,
return argv, exeName, envVars, nil
}

// taskInfo asks proc_pidinfo for the PROC_PIDTASKALLINFO data of pid and
// stores it in info. It returns an error when the value returned by
// proc_pidinfo differs from the size of the info struct.
func taskInfo(pid int, info *C.struct_proc_taskallinfo) error {
	want := C.int(unsafe.Sizeof(*info))

	got := C.proc_pidinfo(C.int(pid), C.PROC_PIDTASKALLINFO, 0, unsafe.Pointer(info), want)
	if got == want {
		return nil
	}

	return fmt.Errorf("could not read process info for pid %d", pid)
}

func sysctl(mib []C.int, old *byte, oldlen *uintptr,
new *byte, newlen uintptr) (err error) {
p0 := unsafe.Pointer(&mib[0])
Expand Down
36 changes: 36 additions & 0 deletions metric/system/process/process_darwin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package process

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestFetchNumThreads only checks that GetInfoForPid reports a non-zero
// NumThreads for the current process. It cannot verify that the reported
// value is correct.
func TestFetchNumThreads(t *testing.T) {
	got, err := GetInfoForPid(nil, os.Getpid())
	require.NoError(t, err, "GetInfoForPid failed")

	// Assert on the unwrapped count rather than on the optional wrapper:
	// a wrapper that was explicitly set to 0 would presumably not compare
	// equal to its zero value, letting a zero thread count slip through.
	assert.NotZero(t, got.NumThreads.ValueOr(0),
		"numThreads should not be zero, try manually testing it")
}
52 changes: 34 additions & 18 deletions metric/system/process/process_linux_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

//go:build freebsd || linux
// +build freebsd linux

package process

Expand Down Expand Up @@ -121,64 +120,81 @@ func FillPidMetrics(hostfs resolve.Resolver, pid int, state ProcState, filter fu
return state, fmt.Errorf("error getting metadata for pid %d: %w", pid, err)
}

//username
// username
state.Username, err = getUser(hostfs, pid)
if err != nil {
return state, fmt.Errorf("error creating username for pid %d: %w", pid, err)
}
return state, nil
}

// GetInfoForPid fetches the basic hostinfo from /proc/[PID]/stat
func GetInfoForPid(hostfs resolve.Resolver, pid int) (ProcState, error) {
path := hostfs.Join("proc", strconv.Itoa(pid), "stat")
data, err := ioutil.ReadFile(path)
// GetInfoForPid fetches and parses the process information of the process
// identified by pid from /proc/[PID]/stat
func GetInfoForPid(hostFS resolve.Resolver, pid int) (ProcState, error) {
path := hostFS.Join("proc", strconv.Itoa(pid), "stat")
data, err := os.ReadFile(path)
// Transform the error into a more sensible error in cases where the directory doesn't exist, i.e the process is gone
if err != nil {
if os.IsNotExist(err) {
return ProcState{}, syscall.ESRCH
}
return ProcState{}, fmt.Errorf("error reading procdir %s: %w", path, err)
}

state, err := parseProcStat(data)
state.Pid = opt.IntWith(pid)
if err != nil {
return state, fmt.Errorf("failed to parse infos for pid %d': %w", pid, err)
}

return state, nil
}

func parseProcStat(data []byte) (ProcState, error) {
const minFields = 36
state := ProcState{}

// Extract the comm value with is surrounded by parentheses.
lIdx := bytes.Index(data, []byte("("))
rIdx := bytes.LastIndex(data, []byte(")"))
if lIdx < 0 || rIdx < 0 || lIdx >= rIdx || rIdx+2 >= len(data) {
return state, fmt.Errorf("failed to extract comm for pid %d from '%v': %w", pid, string(data), err)
return state, fmt.Errorf("failed to extract comm from '%v'",
string(data))
}
state.Name = string(data[lIdx+1 : rIdx])

// Extract the rest of the fields that we are interested in.
fields := bytes.Fields(data[rIdx+2:])
if len(fields) <= 36 {
return state, fmt.Errorf("expected more stat fields for pid %d from '%v': %w", pid, string(data), err)
if len(fields) <= minFields {
return state, fmt.Errorf("expected at least %d stat fields from '%v'",
minFields, string(data))
}

// See https://man7.org/linux/man-pages/man5/proc.5.html for all fields.
interests := bytes.Join([][]byte{
fields[0], // state
fields[1], // ppid
fields[2], // pgrp
fields[0], // state
fields[1], // ppid
fields[2], // pgrp
fields[17], // num_threads
}, []byte(" "))

var procState string
var ppid, pgid int

_, err = fmt.Fscan(bytes.NewBuffer(interests),
var ppid, pgid, numThreads int
_, err := fmt.Fscan(bytes.NewBuffer(interests),
&procState,
&ppid,
&pgid,
&numThreads,
)
if err != nil {
return state, fmt.Errorf("failed to parse stat fields for pid %d from '%v': %w", pid, string(data), err)
return state, fmt.Errorf("failed to parse stat fields from '%v': %w",
string(data), err)
}

state.State = getProcState(procState[0])
state.Ppid = opt.IntWith(ppid)
state.Pgid = opt.IntWith(pgid)
state.Pid = opt.IntWith(pid)
state.NumThreads = opt.IntWith(numThreads)

return state, nil
}
Expand Down Expand Up @@ -384,7 +400,7 @@ func getFDStats(hostfs resolve.Resolver, pid int) (ProcFDInfo, error) {

pathFD := hostfs.Join("proc", strconv.Itoa(pid), "fd")
fds, err := ioutil.ReadDir(pathFD)
if errors.Is(err, os.ErrPermission) { //ignore permission errors, passthrough other data
if errors.Is(err, os.ErrPermission) { // ignore permission errors, passthrough other data
return state, nil
} else if err != nil {
return state, fmt.Errorf("error reading FD directory for pid %d: %w", pid, err)
Expand Down
42 changes: 39 additions & 3 deletions metric/system/process/process_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

//go:build darwin || freebsd || linux || windows
// +build darwin freebsd linux windows

package process

Expand All @@ -26,10 +25,12 @@ import (
"strconv"
"testing"

"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

func TestFetchProcessFromOtherUser(t *testing.T) {
Expand Down Expand Up @@ -97,3 +98,38 @@ func TestFetchProcessFromOtherUser(t *testing.T) {
require.NoError(t, err)
t.Logf("got: %s", pidData.StringToPrint())
}

func TestGetInfoForPid_NumThreads(t *testing.T) {
want := ProcState{
Name: "elastic-agent",
State: "sleeping",
Pid: opt.IntWith(42),
Ppid: opt.IntWith(1),
Pgid: opt.IntWith(4067478),
NumThreads: opt.IntWith(26),
}

got, err := GetInfoForPid(resolve.NewTestResolver("testdata"), 42)
require.NoError(t, err, "GetInfoForPid returned an error when it should have succeeded")

assert.Equal(t, want, got)
}

// TestParseProcStat feeds parseProcStat a sample stat line and checks the
// name, state, ppid, pgid and thread count it extracts. Pid is expected to be
// left unset.
func TestParseProcStat(t *testing.T) {
	data := []byte("4067478 (elastic-agent) S 1 4067478 4067478 0 -1 4194560 151900 " +
		"1587 0 0 8229 3989 0 1 32 12 26" +
		" 0 200791940 2675654656 15487 18446744073709551615 1 1 0 0 0 0 0 0 2143420159 0 0 0 17 9 0 0 0 0 0 0 0 0 0 0 0 0 0")

	want := ProcState{
		Name:       "elastic-agent",
		State:      getProcState(byte('S')),
		Ppid:       opt.IntWith(1),
		Pgid:       opt.IntWith(4067478),
		NumThreads: opt.IntWith(26),
	}

	got, err := parseProcStat(data)
	require.NoError(t, err, "parseProcStat returned an error")

	assert.Equal(t, want, got)
}
Loading

0 comments on commit c0d90b9

Please sign in to comment.