Skip to content

Commit

Permalink
feat: adds periodic health reporting to the k8s inventory agent
Browse files Browse the repository at this point in the history
Addresses: Enterprise-2483

Signed-off-by: Bob Melander <[email protected]>
  • Loading branch information
bobmel committed Oct 9, 2024
1 parent 70a81f1 commit da3b770
Show file tree
Hide file tree
Showing 17 changed files with 2,440 additions and 15 deletions.
14 changes: 14 additions & 0 deletions anchore-k8s-inventory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ log:
# enable/disable checking for application updates on startup
check-for-app-update: true

anchore-registration:
# The id to register the agent as with Enterprise, so Enterprise can map the agent to its integration uuid.
# If left unspecified, the agent will attempt to set registration-id to the uid of the K8s Deployment for the agent.
# If that fails (e.g., if the agent is not deployed on K8s), the agent will generate a UUID to use as registration-id.
registration-id:
# The name that the agent should have. If left unspecified, the agent will attempt to set it to the name of the K8s
# Deployment for the agent. If that fails it will be empty.
integration-name:
# A short description for the agent
integration-description:

kubeconfig:
path:
cluster: docker-desktop
Expand Down Expand Up @@ -117,6 +128,9 @@ ignore-not-running: true
# Only respected if mode is periodic
polling-interval-seconds: 300

# Only respected if mode is periodic
health-report-interval-seconds: 60

# Batch Request configuration
inventory-report-limits:
namespaces: 0 # default of 0 means no limit per report
Expand Down
34 changes: 28 additions & 6 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package cmd
import (
"errors"
"fmt"
"os"
"runtime/pprof"

"github.com/anchore/k8s-inventory/pkg/healthreporter"
"github.com/anchore/k8s-inventory/pkg/integration"
"github.com/anchore/k8s-inventory/pkg/mode"
"github.com/anchore/k8s-inventory/pkg/reporter"
"os"
"runtime/pprof"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -45,9 +46,29 @@ var rootCmd = &cobra.Command{
os.Exit(1)
}

instance, err := integration.PerformRegistration(appConfig)
if err != nil {
os.Exit(1)
}

switch appConfig.RunMode {
case mode.PeriodicPolling:
pkg.PeriodicallyGetInventoryReport(appConfig)
neverDone := make(chan bool, 1)

var gatedReportInfo *healthreporter.GatedReportInfo
if !integration.HealthReportingDisabled {
gatedReportInfo = &healthreporter.GatedReportInfo{
AccountInventoryReports: make(healthreporter.AccountK8SInventoryReports, 0),
}

go healthreporter.PeriodicallySendHealthReport(appConfig, instance, gatedReportInfo)
} else {
gatedReportInfo = nil
}

go pkg.PeriodicallyGetInventoryReport(appConfig, gatedReportInfo)

<-neverDone
default:
reports, err := pkg.GetInventoryReports(appConfig)
if appConfig.Dev.ProfileCPU {
Expand All @@ -58,18 +79,19 @@ var rootCmd = &cobra.Command{
os.Exit(1)
}
anErrorOccurred := false
reportInfo := healthreporter.InventoryReportInfo{}
for account, reportsForAccount := range reports {
for count, report := range reportsForAccount {
log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount))
err = pkg.HandleReport(report, appConfig, account)
err = pkg.HandleReport(report, &reportInfo, appConfig, account)
if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) {
// Retry with default account
retryAccount := appConfig.AnchoreDetails.Account
if appConfig.AccountRouteByNamespaceLabel.DefaultAccount != "" {
retryAccount = appConfig.AccountRouteByNamespaceLabel.DefaultAccount
}
log.Warnf("Error sending to Anchore Account %s, sending to default account", account)
err = pkg.HandleReport(report, appConfig, retryAccount)
err = pkg.HandleReport(report, &reportInfo, appConfig, retryAccount)
}
if err != nil {
log.Errorf("Failed to handle Image Results: %+v", err)
Expand Down
250 changes: 250 additions & 0 deletions internal/anchore/anchoreclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package anchore

import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"github.com/anchore/k8s-inventory/internal/config"
"github.com/anchore/k8s-inventory/internal/log"
"github.com/anchore/k8s-inventory/internal/tracker"
"github.com/h2non/gock"
"io"
"net/http"
"net/url"
"os"
"strings"
"syscall"
"time"
)

type ControllerErrorDetails struct {
Type string `json:"type"`
Title string `json:"title"`
Detail string `json:"detail"`
Status int `json:"status"`
}

type APIErrorDetails struct {
Message string `json:"message"`
Detail map[string]interface{} `json:"detail"`
HTTPCode int `json:"httpcode"`
}

type APIClientError struct {
HTTPStatusCode int
Message string
Path string
Method string
Body *[]byte
APIErrorDetails *APIErrorDetails
ControllerErrorDetails *ControllerErrorDetails
}

func (e *APIClientError) Error() string {
return fmt.Sprintf("API errorMsg(%d): %s Path: %q %v %v", e.HTTPStatusCode, e.Message, e.Path,
e.APIErrorDetails, e.ControllerErrorDetails)
}

func Post(requestBody []byte, id string, path string, anchoreDetails config.AnchoreInfo, operation string) (*[]byte, error) {
defer tracker.TrackFunctionTime(time.Now(), fmt.Sprintf("Sent %s request to Anchore", operation))

log.Debugf("Performing %s to Anchore using endpoint: %s", operation, strings.Replace(path, "{{id}}", id, 1))

client := getClient(anchoreDetails)

anchoreURL, err := getURL(anchoreDetails, path, id)
if err != nil {
return nil, err
}

request, err := getPostRequest(anchoreDetails, anchoreURL, requestBody, operation)
if err != nil {
return nil, err
}

return doPost(client, request, operation)
}

func getClient(anchoreDetails config.AnchoreInfo) *http.Client {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: anchoreDetails.HTTP.Insecure},
} // #nosec G402

client := &http.Client{
Transport: tr,
Timeout: time.Duration(anchoreDetails.HTTP.TimeoutSeconds) * time.Second,
}
gock.InterceptClient(client) // Required to use gock for testing custom client

return client
}

func getURL(anchoreDetails config.AnchoreInfo, path string, id string) (string, error) {
anchoreURL, err := url.Parse(anchoreDetails.URL)
if err != nil {
return "", fmt.Errorf("failed to build path (%s) url: %w", path, err)
}

anchoreURL.Path += strings.Replace(path, "{{id}}", id, 1)
return anchoreURL.String(), nil
}

func getPostRequest(anchoreDetails config.AnchoreInfo, endpointURL string, reqBody []byte, operation string) (*http.Request, error) {
request, err := http.NewRequest("POST", endpointURL, bytes.NewBuffer(reqBody))
if err != nil {
return nil, fmt.Errorf("failed to prepare %s request to Anchore: %w", operation, err)
}

request.SetBasicAuth(anchoreDetails.User, anchoreDetails.Password)
request.Header.Set("Content-Type", "application/json")
request.Header.Set("x-anchore-account", anchoreDetails.Account)
return request, nil
}

func doPost(client *http.Client, request *http.Request, operation string) (*[]byte, error) {
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()

err = checkHTTPErrors(response, operation)
if err != nil {
return nil, err
}

responseBody, _ := getBody(response, operation)
return responseBody, nil
}

func checkHTTPErrors(response *http.Response, operation string) error {
switch {
case response.StatusCode >= 400 && response.StatusCode <= 599:
msg := fmt.Sprintf("%s response from Anchore (during %s)", response.Status, operation)
log.Errorf(msg)

respBody, _ := getBody(response, operation)
if respBody == nil {
return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method,
Body: nil, HTTPStatusCode: response.StatusCode}
}

// Depending on where an errorMsg is discovered during request processing on the server, the
// errorMsg information in the response will be either an APIErrorDetails or a ControllerErrorDetails
apiError := APIErrorDetails{}
err := json.Unmarshal(*respBody, &apiError)
if err == nil {
return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method,
Body: nil, HTTPStatusCode: response.StatusCode, APIErrorDetails: &apiError}
}

controllerError := ControllerErrorDetails{}
err = json.Unmarshal(*respBody, &controllerError)
if err == nil {
return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method,
Body: nil, HTTPStatusCode: response.StatusCode, ControllerErrorDetails: &controllerError}
}

return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method,
Body: nil, HTTPStatusCode: response.StatusCode}
case response.StatusCode < 200 || response.StatusCode > 299:
msg := fmt.Sprintf("failed to perform %s to Anchore: %+v", operation, response)
log.Debugf(msg)
return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method,
Body: nil, HTTPStatusCode: response.StatusCode}
}
return nil
}

func getBody(response *http.Response, operation string) (*[]byte, error) {
responseBody, err := io.ReadAll(response.Body)
if err != nil {
errMsg := fmt.Sprintf("failed to read %s response body from Anchore:", operation)
log.Debugf("%s %v", operation, errMsg)
return nil, fmt.Errorf("%s %w", errMsg, err)
}

// Check we received a valid JSON response from Anchore, this will help catch
// any redirect responses where it returns HTML login pages e.g. Enterprise
// running behind cloudflare where a login page is returned with the status 200
if len(responseBody) > 0 && !json.Valid(responseBody) {
log.Debugf("Anchore %s response body: %s", operation, string(responseBody))
return nil, fmt.Errorf("%s response from Anchore is not valid json: %+v", operation, response)
}
return &responseBody, nil
}

func ServerIsOffline(err error) bool {
if os.IsTimeout(err) {
return true
}

if errors.Is(err, syscall.ECONNREFUSED) {
return true
}

if errors.Is(err, syscall.ECONNRESET) {
return true
}

var apiClientError *APIClientError
if errors.As(err, &apiClientError) {
if apiClientError.HTTPStatusCode == http.StatusBadGateway ||
apiClientError.HTTPStatusCode == http.StatusServiceUnavailable ||
apiClientError.HTTPStatusCode == http.StatusGatewayTimeout {
return true
}
}

return false
}

func ServerLacksAgentHealthAPISupport(err error) bool {
var apiClientError *APIClientError
if errors.As(err, &apiClientError) {
if apiClientError.ControllerErrorDetails == nil {
return false
}

if apiClientError.HTTPStatusCode == http.StatusNotFound &&
strings.Contains(apiClientError.ControllerErrorDetails.Detail, "The requested URL was not found") {
return true
}

if apiClientError.HTTPStatusCode == http.StatusMethodNotAllowed &&
apiClientError.ControllerErrorDetails.Detail == "Method Not Allowed" {
return true
}
}

return false
}

func UserLacksAPIPrivileges(err error) bool {
var apiClientError *APIClientError

if errors.As(err, &apiClientError) {
if apiClientError.APIErrorDetails == nil {
return false
}

if apiClientError.HTTPStatusCode == http.StatusForbidden &&
strings.Contains(apiClientError.APIErrorDetails.Message, "Not authorized. Requires permissions") {
return true
}
}
return false
}

func IncorrectCredentials(err error) bool {
// This covers user that does not exist or incorrect password for user
var apiClientError *APIClientError

if errors.As(err, &apiClientError) && apiClientError.HTTPStatusCode == http.StatusUnauthorized {
return true
}

return false
}
Loading

0 comments on commit da3b770

Please sign in to comment.