Skip to content

Commit

Permalink
feat: adds periodic health reporting to the k8s inventory agent
Browse files Browse the repository at this point in the history
WIP: DO NOT MERGE!!

Addresses:
   - Enterprise-4118
   - Enterprise-4119

Signed-off-by: Bob Melander <[email protected]>
  • Loading branch information
bobmel committed Sep 11, 2024
1 parent 70a81f1 commit 42ea730
Show file tree
Hide file tree
Showing 13 changed files with 745 additions and 12 deletions.
11 changes: 11 additions & 0 deletions anchore-k8s-inventory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ log:
# enable/disable checking for application updates on startup
check-for-app-update: true

registration:
# The id to register the agent as with Enterprise (should be a freshly generated UUIDv4)
# fallback-register-id: db2963a1-b029-4d7c-b42c-4e8f81c263dd
# Force registration to be done with fallback id, not just as a fallback. Defaults to false.
always-use-fallback-register-id: false
# The name to register the agent as with Enterprise. Defaults to anchore_k8s_inventory_agent.
fallback-register-name: anchore_k8s_inventory_agent

kubeconfig:
path:
cluster: docker-desktop
Expand Down Expand Up @@ -117,6 +125,9 @@ ignore-not-running: true
# Only respected if mode is periodic
polling-interval-seconds: 300

# Only respected if mode is periodic
health-report-interval-seconds: 60

# Batch Request configuration
inventory-report-limits:
namespaces: 0 # default of 0 means no limit per report
Expand Down
74 changes: 71 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"errors"
"fmt"
"github.com/anchore/k8s-inventory/pkg/healthreporter"
"github.com/anchore/k8s-inventory/pkg/integration"
"os"
"runtime/pprof"

Expand Down Expand Up @@ -45,9 +47,57 @@ var rootCmd = &cobra.Command{
os.Exit(1)
}

// TODO(bob): Add global variable to disable all health reporting
// * If registration REST call fails with HTTP 404 Not found **API endpoint**
// then that is an indication that agent is interacting with an older version of
// Enterprise that lacks the Integration feature. We should then
// disable all health reporting functionality in the agent but otherwise
// let it perform its normal tasks.
// * If registration REST call fails with HTTP 404 Not found **Account or User**
// then that means the agent has an incorrect configuration and should not be
// allowed to proceed. Log the error and then exit.
// * if
// * If registration REST call fails with HTTP 500 Internal error then what?
// Log the error and then exit?
// * If registration REST call fails with HTTP 409 re-register with different
// integration_id then the agent should change its integration_id to that value
// and re-register.

// TODO(bob): How to handle Enterprise being offline during registration and
// periodic health-reporting. Reasons for being offline could be that:
// - Enterprise is not started (this could be infinite)
// - Enterprise is starting (this could take minutes)
// - Enterprise undergoes upgrade (this could take hours)
// - other reasons?
// The current behavior of an k8s inventory agent is that its starts and then periodically
// starts sending inventory reports. If a sending fails a new attempt will be made in the
// next cycle. In that sense it will wait indefinitely for Enterprise to come online.
// To be backward compatible with the current behavior of k8s inventory agent, failed
// *registrations* should therefore be retried indefinitely.
// -- Do we really want infinite to literally mean infinite?
// Or just a large enough number of retries with exponential backoff
// -- K8s Deployment, StatefulSets etc will normally restart failed (or just exited) pods
// -- Docker also has ability to restart failed (or just exited)
// Failure to send a health report should just be logged and then ignored. A new attempt
// to send a health report will be performed in the next iteration.

instance, err := callHome()
if err != nil {
os.Exit(1)
}

switch appConfig.RunMode {
case mode.PeriodicPolling:
pkg.PeriodicallyGetInventoryReport(appConfig)
neverDone := make(chan bool, 1)

gatedReportInfo := healthreporter.GatedReportInfo{
AccountInventoryReports: make(healthreporter.AccountK8SInventoryReports, 0),
}

go pkg.PeriodicallyGetInventoryReport(appConfig, &gatedReportInfo)
go healthreporter.PeriodicallySendHealthReport(appConfig, &instance, &gatedReportInfo)

<-neverDone
default:
reports, err := pkg.GetInventoryReports(appConfig)
if appConfig.Dev.ProfileCPU {
Expand All @@ -58,18 +108,19 @@ var rootCmd = &cobra.Command{
os.Exit(1)
}
anErrorOccurred := false
reportInfo := healthreporter.InventoryReportInfo{}
for account, reportsForAccount := range reports {
for count, report := range reportsForAccount {
log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount))
err = pkg.HandleReport(report, appConfig, account)
err = pkg.HandleReport(report, &reportInfo, appConfig, account)
if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) {
// Retry with default account
retryAccount := appConfig.AnchoreDetails.Account
if appConfig.AccountRouteByNamespaceLabel.DefaultAccount != "" {
retryAccount = appConfig.AccountRouteByNamespaceLabel.DefaultAccount
}
log.Warnf("Error sending to Anchore Account %s, sending to default account", account)
err = pkg.HandleReport(report, appConfig, retryAccount)
err = pkg.HandleReport(report, &reportInfo, appConfig, retryAccount)
}
if err != nil {
log.Errorf("Failed to handle Image Results: %+v", err)
Expand All @@ -84,6 +135,23 @@ var rootCmd = &cobra.Command{
},
}

func callHome() (integration.Integration, error) {
namespace := os.Getenv("POD_NAMESPACE")
name := os.Getenv("HOSTNAME")
instance, err := pkg.GetIntegrationInfo(appConfig, namespace, name)
if err != nil {
log.Errorf("Failed to get Integration Info: %+v", err)
return integration.Integration{}, err
}
// Register this agent with enterprise
err = integration.Register(&instance, appConfig.AnchoreDetails)
if err != nil {
log.Errorf("Unable to register agent: %v", err)
return integration.Integration{}, err
}
return instance, nil
}

func init() {
opt := "kubeconfig"
rootCmd.Flags().StringP(opt, "k", "", "(optional) absolute path to the kubeconfig file")
Expand Down
154 changes: 154 additions & 0 deletions internal/anchore/anchoreclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package anchore

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/anchore/k8s-inventory/internal/config"
"github.com/anchore/k8s-inventory/internal/log"
"github.com/anchore/k8s-inventory/internal/tracker"
"github.com/h2non/gock"
"io"
"net/http"
"net/url"
"strings"
"time"
)

type APIError struct {
Message string `json:"message"`
Detail map[string]interface{} `json:"detail"`
HTTPCode int `json:"httpcode"`
}

func ErrAnchoreEndpointDoesNotExist(path string) error {
return fmt.Errorf("api endpoint does not exist: %s", path)
}

type ErrAchoreAPIClient struct {
HTTPStatusCode int
Message string
Path string
Body *[]byte
}

func (e *ErrAchoreAPIClient) Error() string {
return fmt.Sprintf("API error(%d): %s Path: %q", e.HTTPStatusCode, e.Message, e.Path)
}

func Put(requestBody []byte, id string, path string, anchoreDetails config.AnchoreInfo, operation string) (*[]byte, error) {
defer tracker.TrackFunctionTime(time.Now(), fmt.Sprintf("Sent %s request to Anchore", operation))

log.Debugf("Performing %s to Anchore using endpoint: %s", operation, strings.Replace(path, "{{id}}", id, 1))

client := getClient(anchoreDetails)

anchoreURL, err := getURL(anchoreDetails, path, id)
if err != nil {
return nil, err
}

request, err := getPutRequest(anchoreDetails, anchoreURL, requestBody, operation)
if err != nil {
return nil, err
}

return doPut(client, request, strings.Replace(path, "{{id}}", id, 1), operation)
}

func getClient(anchoreDetails config.AnchoreInfo) *http.Client {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: anchoreDetails.HTTP.Insecure},
} // #nosec G402

client := &http.Client{
Transport: tr,
Timeout: time.Duration(anchoreDetails.HTTP.TimeoutSeconds) * time.Second,
}
gock.InterceptClient(client) // Required to use gock for testing custom client

return client
}

func getURL(anchoreDetails config.AnchoreInfo, path string, id string) (string, error) {
anchoreURL, err := url.Parse(anchoreDetails.URL)
if err != nil {
return "", fmt.Errorf("failed to build path (%s) url: %w", path, err)
}

anchoreURL.Path += strings.Replace(path, "{{id}}", id, 1)
return anchoreURL.String(), nil
}

func getPutRequest(anchoreDetails config.AnchoreInfo, endpointURL string, reqBody []byte, operation string) (*http.Request, error) {
request, err := http.NewRequest("PUT", endpointURL, bytes.NewBuffer(reqBody))
if err != nil {
return nil, fmt.Errorf("failed to prepare %s request to Anchore: %w", operation, err)
}

request.SetBasicAuth(anchoreDetails.User, anchoreDetails.Password)
request.Header.Set("Content-Type", "application/json")
request.Header.Set("x-anchore-account", anchoreDetails.Account)
return request, nil
}

func doPut(client *http.Client, request *http.Request, path string, operation string) (*[]byte, error) {
response, err := client.Do(request)
if err != nil {
return nil, fmt.Errorf("failed to send %s to Anchore: %w", operation, err)
}
defer response.Body.Close()

err = checkHTTPErrors(response, request, path, operation)
if err != nil {
return nil, err
}

responseBody, _ := getBody(response, operation)
return responseBody, nil
}

func checkHTTPErrors(response *http.Response, _ *http.Request, path string, operation string) error {
switch {
case response.StatusCode == 403:
msg := fmt.Sprintf("forbidden response (403) from Anchore (during %s)", operation)
log.Debug(msg)
return &ErrAchoreAPIClient{Message: msg, Path: path, Body: nil, HTTPStatusCode: response.StatusCode}
// return fmt.Errorf("user account not found")
case response.StatusCode == 404:
msg := fmt.Sprintf("forbidden response (404) from Anchore (during %s)", operation)
log.Debugf("%s: path: %s. Please verify that correct version of Anchore is deployed.", msg, path)
return &ErrAchoreAPIClient{Message: msg, Path: path, Body: nil, HTTPStatusCode: response.StatusCode}
// return ErrAnchoreEndpointDoesNotExist(path)
case response.StatusCode == 409:
msg := fmt.Sprintf("conflict response (409) from Anchore (during %s)", operation)
log.Debug(msg)
respBody, _ := getBody(response, operation)
return &ErrAchoreAPIClient{Message: msg, Path: path, Body: respBody, HTTPStatusCode: response.StatusCode}
case response.StatusCode < 200 || response.StatusCode > 299:
msg := fmt.Sprintf("failed to perform %s to Anchore: %+v", operation, response)
log.Debugf(msg)
return &ErrAchoreAPIClient{Message: msg, Path: path, Body: nil, HTTPStatusCode: response.StatusCode}
// return fmt.Errorf("failed to perform %s to Anchore: %+v", operation, response)
}
return nil
}

func getBody(response *http.Response, operation string) (*[]byte, error) {
responseBody, err := io.ReadAll(response.Body)
if err != nil {
errMsg := fmt.Sprintf("failed to read %s response body from Anchore:", operation)
log.Debugf("%s %v", operation, errMsg)
return nil, fmt.Errorf("%s %w", errMsg, err)
}

// Check we received a valid JSON response from Anchore, this will help catch
// any redirect responses where it returns HTML login pages e.g. Enterprise
// running behind cloudflare where a login page is returned with the status 200
if len(responseBody) > 0 && !json.Valid(responseBody) {
log.Debugf("Anchore %s response body: %s", operation, string(responseBody))
return nil, fmt.Errorf("%s response from Anchore is not valid json: %+v", operation, response)
}
return &responseBody, nil
}
17 changes: 14 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"path"
"strings"

"github.com/anchore/k8s-inventory/pkg/mode"
"gopkg.in/yaml.v2"

"github.com/adrg/xdg"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/spf13/viper"

"github.com/anchore/k8s-inventory/internal"
"github.com/anchore/k8s-inventory/pkg/mode"
)

const redacted = "******"
Expand All @@ -37,8 +37,9 @@ type CliOnlyOptions struct {
// All Application configurations
type Application struct {
ConfigPath string
Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"`
Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"`
Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"`
Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"`
Registration RegistrationOptions `mapstructure:"registration" json:"registration,omitempty" yaml:"registration"`
CliOptions CliOnlyOptions
Dev Development `mapstructure:"dev" json:"dev,omitempty" yaml:"dev"`
KubeConfig KubeConf `mapstructure:"kubeconfig" json:"kubeconfig,omitempty" yaml:"kubeconfig"`
Expand All @@ -54,12 +55,19 @@ type Application struct {
Mode string `mapstructure:"mode" json:"mode,omitempty" yaml:"mode"`
IgnoreNotRunning bool `mapstructure:"ignore-not-running" json:"ignore-not-running,omitempty" yaml:"ignore-not-running"`
PollingIntervalSeconds int `mapstructure:"polling-interval-seconds" json:"polling-interval-seconds,omitempty" yaml:"polling-interval-seconds"`
HealthReportIntervalSeconds int `mapstructure:"health-report-interval-seconds" json:"health-report-interval-seconds,omitempty" yaml:"health-report-interval-seconds"`
InventoryReportLimits InventoryReportLimits `mapstructure:"inventory-report-limits" json:"inventory-report-limits,omitempty" yaml:"inventory-report-limits"`
MetadataCollection MetadataCollection `mapstructure:"metadata-collection" json:"metadata-collection,omitempty" yaml:"metadata-collection"`
AnchoreDetails AnchoreInfo `mapstructure:"anchore" json:"anchore,omitempty" yaml:"anchore"`
VerboseInventoryReports bool `mapstructure:"verbose-inventory-reports" json:"verbose-inventory-reports,omitempty" yaml:"verbose-inventory-reports"`
}

type RegistrationOptions struct {
FallbackRegisterID string `mapstructure:"fallback-register-id" json:"fallback-register-id,omitempty" yaml:"fallback-register-id"`
AlwaysUseFallbackRegisterID bool `mapstructure:"always-use-fallback-register-id" json:"always-use-fallback-register-id,omitempty" yaml:"always-use-fallback-register-id"`
FallbackRegisterName string `mapstructure:"fallback-register-name" json:"fallback-register-name,omitempty" yaml:"fallback-register-name"`
}

// MissingTagConf details the policy for handling missing tags when reporting images
type MissingTagConf struct {
Policy string `mapstructure:"policy" json:"policy,omitempty" yaml:"policy"`
Expand Down Expand Up @@ -150,6 +158,8 @@ func setNonCliDefaultValues(v *viper.Viper) {
v.SetDefault("log.level", "")
v.SetDefault("log.file", "")
v.SetDefault("log.structured", false)
v.SetDefault("registration.always-use-fallback-register-id", false)
v.SetDefault("registration.fallback-register-name", "anchore_k8s_inventory_agent")
v.SetDefault("dev.profile-cpu", false)
v.SetDefault("anchore.account", "admin")
v.SetDefault("kubeconfig.anchore.account", "admin")
Expand All @@ -160,6 +170,7 @@ func setNonCliDefaultValues(v *viper.Viper) {
v.SetDefault("kubernetes.request-batch-size", 100)
v.SetDefault("kubernetes.worker-pool-size", 100)
v.SetDefault("ignore-not-running", true)
v.SetDefault("health-report-interval-seconds", 60)
v.SetDefault("missing-registry-override", "")
v.SetDefault("missing-tag-policy.policy", "digest")
v.SetDefault("missing-tag-policy.tag", "UNKNOWN")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ log:
levelopt: debug
level: debug
file: ./anchore-k8s-inventory.log
registration:
fallback-register-id: ""
always-use-fallback-register-id: false
fallback-register-name: anchore_k8s_inventory_agent
clioptions:
configpath: ../../anchore-k8s-inventory.yaml
verbosity: 0
Expand Down Expand Up @@ -44,6 +48,7 @@ runmode: 0
mode: adhoc
ignore-not-running: true
polling-interval-seconds: 300
health-report-interval-seconds: 60
inventory-report-limits:
namespaces: 0
metadata-collection:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ log:
levelopt: panic
level: ""
file: ""
registration:
fallback-register-id: ""
always-use-fallback-register-id: false
fallback-register-name: ""
clioptions:
configpath: ""
verbosity: 0
Expand Down Expand Up @@ -44,6 +48,7 @@ runmode: 0
mode: ""
ignore-not-running: false
polling-interval-seconds: 0
health-report-interval-seconds: 0
inventory-report-limits:
namespaces: 0
metadata-collection:
Expand Down
Loading

0 comments on commit 42ea730

Please sign in to comment.