Skip to content

Commit

Permalink
[system/process] add support for mutlierr (#166)
Browse files Browse the repository at this point in the history
## What does this PR do?

- Previously, we weren't passing errors to the caller while monitoring
set of processes.
- With the recent introduction of the status reporter for metricsets, it
is impossible to change the status to degraded if such errors are not
passed to the caller.
- Fix this by passing errors to the caller. We also populate the process
related information to our best-effort.

## Checklist

- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added an entry in `CHANGELOG.md`

<!-- Recommended
Link related issues below. Insert the issue link or reference after the
word "Closes" if merging this should automatically close it.

- Closes #123
- Relates #123
- Requires #123
- Superseds #123
-->
- Closes
#164

---------

Co-authored-by: Craig MacKenzie <[email protected]>
  • Loading branch information
VihasMakwana and cmacknz authored Jul 23, 2024
1 parent a9ecc37 commit 3abd5c0
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 35 deletions.
76 changes: 76 additions & 0 deletions metric/system/process/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package process

import (
"errors"
"fmt"
"syscall"
"testing"

"github.com/stretchr/testify/require"
)

func TestErrors(t *testing.T) {
cases := []struct {
name string
check func(t *testing.T)
}{
{
name: "non fatal error",
check: func(t *testing.T) {
err := fmt.Errorf("Faced non-fatal error: %w", NonFatalErr{Err: syscall.EPERM})
require.True(t, isNonFatal(err), "Should be a non fatal error")
},
},
{
name: "non fatal error - unwrapped",
check: func(t *testing.T) {
err := fmt.Errorf("Faced non-fatal error: %w", syscall.EPERM)
require.True(t, isNonFatal(err), "Should be a non fatal error")
},
},
{
name: "non fatal error - hierarchy",
check: func(t *testing.T) {
err := fmt.Errorf("Faced non-fatal error: %w", syscall.EPERM)
err2 := errors.Join(toNonFatal(err))
require.True(t, isNonFatal(err2), "Should be a non fatal error")
},
},
{
name: "fatal error",
check: func(t *testing.T) {
err := fmt.Errorf("Faced fatal error: %w", errors.New("FATAL"))
err = toNonFatal(err) // shouldn't have any effect as it's a fatal error
require.Falsef(t, isNonFatal(err), "Should be a fatal error")
},
},
{
name: "fatal error - hierarchy",
check: func(t *testing.T) {
err := fmt.Errorf("Faced fatal error: %w", errors.New("FATAL"))
err2 := errors.Join(err)
require.Falsef(t, isNonFatal(err2), "Should be a fatal error")
},
},
}
for _, c := range cases {
t.Run(c.name, c.check)
}
}
10 changes: 10 additions & 0 deletions metric/system/process/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,13 @@ func GetProcCPUPercentage(s0, s1 ProcState) ProcState {
return s1

}

func toNonFatal(err error) error {
if err == nil {
return nil
}
if !isNonFatal(err) {
return err
}
return NonFatalErr{Err: err}
}
35 changes: 35 additions & 0 deletions metric/system/process/helpers_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build !windows

package process

import (
"errors"
"syscall"
)

func isNonFatal(err error) bool {
if err == nil {
return true
}
return (errors.Is(err, syscall.EACCES) ||
errors.Is(err, syscall.EPERM) ||
errors.Is(err, syscall.EINVAL) ||
errors.Is(err, NonFatalErr{}))
}
37 changes: 37 additions & 0 deletions metric/system/process/helpers_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build windows

package process

import (
"errors"
"syscall"

"golang.org/x/sys/windows"
)

func isNonFatal(err error) bool {
if err == nil {
return true
}
return errors.Is(err, windows.ERROR_ACCESS_DENIED) ||
errors.Is(err, syscall.EPERM) ||
errors.Is(err, syscall.EINVAL) ||
errors.Is(err, windows.ERROR_INVALID_PARAMETER) || errors.Is(err, NonFatalErr{})
}
55 changes: 34 additions & 21 deletions metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"sort"
"strings"
"syscall"
"time"

psutil "github.com/shirou/gopsutil/v3/process"
Expand Down Expand Up @@ -54,11 +55,11 @@ func ListStates(hostfs resolve.Resolver) ([]ProcState, error) {

// actually fetch the PIDs from the OS-specific code
_, plist, err := init.FetchPids()
if err != nil {
if err != nil && !isNonFatal(err) {
return nil, fmt.Errorf("error gathering PIDs: %w", err)
}

return plist, nil
return plist, toNonFatal(err)
}

// GetPIDState returns the state of a given PID
Expand Down Expand Up @@ -90,10 +91,10 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
}

// actually fetch the PIDs from the OS-specific code
pidMap, plist, err := procStats.FetchPids()
pidMap, plist, wrappedErr := procStats.FetchPids()

if err != nil {
return nil, nil, fmt.Errorf("error gathering PIDs: %w", err)
if wrappedErr != nil && !isNonFatal(wrappedErr) {
return nil, nil, fmt.Errorf("error gathering PIDs: %w", wrappedErr)
}
// We use this to track processes over time.
procStats.ProcsMap.SetMap(pidMap)
Expand Down Expand Up @@ -133,13 +134,13 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
rootEvents = append(rootEvents, rootMap)
}

return procs, rootEvents, nil
return procs, rootEvents, toNonFatal(wrappedErr)
}

// GetOne fetches process data for a given PID if its name matches the regexes provided from the host.
func (procStats *Stats) GetOne(pid int) (mapstr.M, error) {
pidStat, _, err := procStats.pidFill(pid, false)
if err != nil {
if err != nil && !isNonFatal(err) {
return nil, fmt.Errorf("error fetching PID %d: %w", pid, err)
}

Expand All @@ -151,9 +152,9 @@ func (procStats *Stats) GetOne(pid int) (mapstr.M, error) {
// GetOneRootEvent is the same as `GetOne()` but it returns an
// event formatted as expected by ECS
func (procStats *Stats) GetOneRootEvent(pid int) (mapstr.M, mapstr.M, error) {
pidStat, _, err := procStats.pidFill(pid, false)
if err != nil {
return nil, nil, fmt.Errorf("error fetching PID %d: %w", pid, err)
pidStat, _, wrappedErr := procStats.pidFill(pid, false)
if wrappedErr != nil && !isNonFatal(wrappedErr) {
return nil, nil, fmt.Errorf("error fetching PID %d: %w", pid, wrappedErr)
}

procStats.ProcsMap.SetPid(pid, pidStat)
Expand All @@ -165,7 +166,7 @@ func (procStats *Stats) GetOneRootEvent(pid int) (mapstr.M, mapstr.M, error) {

rootMap := processRootEvent(&pidStat)

return procMap, rootMap, err
return procMap, rootMap, toNonFatal(wrappedErr)
}

// GetSelf gets process info for the beat itself
Expand All @@ -180,34 +181,41 @@ func (procStats *Stats) GetSelf() (ProcState, error) {
}

pidStat, _, err := procStats.pidFill(self, false)
if err != nil {
if err != nil && !isNonFatal(err) {
return ProcState{}, fmt.Errorf("error fetching PID %d: %w", self, err)
}

procStats.ProcsMap.SetPid(self, pidStat)

return pidStat, nil
return pidStat, toNonFatal(err)
}

// pidIter wraps a few lines of generic code that all OS-specific FetchPids() functions must call.
// this also handles the process of adding to the maps/lists in order to limit the code duplication in all the OS implementations
func (procStats *Stats) pidIter(pid int, procMap ProcsMap, proclist []ProcState) (ProcsMap, []ProcState) {
func (procStats *Stats) pidIter(pid int, procMap ProcsMap, proclist []ProcState) (ProcsMap, []ProcState, error) {
status, saved, err := procStats.pidFill(pid, true)
var nonFatalErr error
if err != nil {
if !errors.Is(err, NonFatalErr{}) {
procStats.logger.Debugf("Error fetching PID info for %d, skipping: %s", pid, err)
return procMap, proclist
// While monitoring a set of processes, some processes might get killed after we get all the PIDs
// So, there's no need to capture "process not found" error.
if errors.Is(err, syscall.ESRCH) {
return procMap, proclist, nil
}
return procMap, proclist, err
}
procStats.logger.Debugf("Non fatal error fetching PID some info for %d, metrics are valid, but partial: %s", pid, err)
nonFatalErr = fmt.Errorf("non fatal error fetching PID some info for %d, metrics are valid, but partial: %w", pid, err)
procStats.logger.Debugf(err.Error())
}
if !saved {
procStats.logger.Debugf("Process name does not match the provided regex; PID=%d; name=%s", pid, status.Name)
return procMap, proclist
return procMap, proclist, nonFatalErr
}
procMap[pid] = status
proclist = append(proclist, status)

return procMap, proclist
return procMap, proclist, nonFatalErr
}

// NonFatalErr is returned when there was an error
Expand All @@ -232,13 +240,17 @@ func (c NonFatalErr) Is(other error) bool {
return is
}

func (c NonFatalErr) Unwrap() error {
return c.Err
}

// pidFill is an entrypoint used by OS-specific code to fill out a pid.
// This in turn calls various OS-specific code to fill out the various bits of PID data
// This is done to minimize the code duplication between different OS implementations
// The second return value will only be false if an event has been filtered out.
func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
// Fetch proc state so we can get the name for filtering based on user's filter.

var wrappedErr error
// OS-specific entrypoint, get basic info so we can at least run matchProcess
status, err := GetInfoForPid(procStats.Hostfs, pid)
if err != nil {
Expand All @@ -265,7 +277,8 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
if !errors.Is(err, NonFatalErr{}) {
return status, true, fmt.Errorf("FillPidMetrics: %w", err)
}
procStats.logger.Debugf("Non-fatal error fetching PID metrics for %d, metrics are valid, but partial: %s", pid, err)
wrappedErr = errors.Join(wrappedErr, fmt.Errorf("non-fatal error fetching PID metrics for %d, metrics are valid, but partial: %w", pid, err))
procStats.logger.Debugf(wrappedErr.Error())
}

if status.CPU.Total.Ticks.Exists() {
Expand Down Expand Up @@ -320,7 +333,7 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
}
}

return status, true, nil
return status, true, wrappedErr
}

// cacheCmdLine fills out Env and arg metrics from any stored previous metrics for the pid
Expand Down
6 changes: 4 additions & 2 deletions metric/system/process/process_aix.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,22 @@ func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) {
pid := C.pid_t(0)

procMap := make(ProcsMap, 0)
var wrappedErr err
var plist []ProcState
for {
// getprocs first argument is a void*
num, err := C.getprocs(unsafe.Pointer(&info), C.sizeof_struct_procsinfo64, nil, 0, &pid, 1)
if err != nil {
return nil, nil, fmt.Errorf("error fetching PIDs: %w", err)
}
procMap, plist = procStats.pidIter(int(info.pi_pid), procMap, plist)
procMap, plist, err = procStats.pidIter(int(pid), procMap, plist)
wrappedErr = errors.Join(wrappedErr, err)

if num == 0 {
break
}
}
return procMap, plist, nil
return procMap, plist, toNonFatal(wrappedErr)
}

// GetInfoForPid returns basic info for the process
Expand Down
3 changes: 2 additions & 1 deletion metric/system/process/process_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package process

import (
"fmt"
"os"
"os/user"
"runtime"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestSystemHostFromContainer(t *testing.T) {
validateProcResult(t, result)
} else {
_, roots, err := testStats.Get()
require.NoError(t, err)
require.True(t, isNonFatal(err), fmt.Sprintf("Fatal error: %s", err))

for _, proc := range roots {
t.Logf("proc: %d: %s", proc["process"].(map[string]interface{})["pid"],
Expand Down
Loading

0 comments on commit 3abd5c0

Please sign in to comment.