diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 2a393f8957f..aeb54067887 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -761,7 +761,7 @@ func (c *Coordinator) compute() (map[string]interface{}, []component.Component, configInjector = c.monitorMgr.MonitoringConfig } - comps, err := c.specs.ToComponents(cfg, configInjector, c.state.logLevel) + comps, err := c.specs.ToComponents(cfg, configInjector, c.state.logLevel, c.agentInfo) if err != nil { return nil, nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index bfefd7a140b..7c15ae16fd5 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -144,6 +144,11 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, return err } + agentInfo, err := info.NewAgentInfoWithLog("error", false) + if err != nil { + return fmt.Errorf("could not load agent info: %w", err) + } + if opts.includeMonitoring { // Load the requirements before trying to load the configuration. These should always load // even if the configuration is wrong. @@ -160,7 +165,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, if err != nil { return fmt.Errorf("failed to get monitoring: %w", err) } - components, binaryMapping, err := specs.PolicyToComponents(cfg, lvl) + components, binaryMapping, err := specs.PolicyToComponents(cfg, lvl, agentInfo) if err != nil { return fmt.Errorf("failed to get binary mappings: %w", err) } @@ -252,8 +257,13 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen return fmt.Errorf("failed to get monitoring: %w", err) } + agentInfo, err := info.NewAgentInfoWithLog("error", false) + if err != nil { + return fmt.Errorf("could not load agent info: %w", err) + } + // Compute the components from the computed configuration. - comps, err := specs.ToComponents(m, monitorFn, lvl) + comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo) if err != nil { return fmt.Errorf("failed to render components: %w", err) } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 15e43cca9d1..f2f46da2036 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -167,7 +167,7 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi if err != nil { return nil, errors.New("failed to create a map from config", err) } - allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel) + allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/pkg/component/component.go b/pkg/component/component.go index 8afd94b0df5..4174b7b540b 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -23,9 +23,15 @@ import ( // GenerateMonitoringCfgFn is a function that can inject information into the model generation process. type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string) (map[string]interface{}, error) +type HeadersProvider interface { + Headers() map[string]string +} + const ( // defaultUnitLogLevel is the default log level that a unit will get if one is not defined. defaultUnitLogLevel = client.UnitLogLevelInfo + headersKey = "headers" + elasticsearchType = "elasticsearch" ) // ErrInputRuntimeCheckFail error is used when an input specification runtime prevention check occurs. @@ -105,8 +111,8 @@ func (c *Component) Type() string { } // ToComponents returns the components that should be running based on the policy and the current runtime specification. -func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInjector GenerateMonitoringCfgFn, ll logp.Level) ([]Component, error) { - components, binaryMapping, err := r.PolicyToComponents(policy, ll) +func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInjector GenerateMonitoringCfgFn, ll logp.Level, headers HeadersProvider) ([]Component, error) { + components, binaryMapping, err := r.PolicyToComponents(policy, ll, headers) if err != nil { return nil, err } @@ -119,7 +125,7 @@ func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInj if monitoringCfg != nil { // monitoring is enabled - monitoringComps, _, err := r.PolicyToComponents(monitoringCfg, ll) + monitoringComps, _, err := r.PolicyToComponents(monitoringCfg, ll, headers) if err != nil { return nil, fmt.Errorf("failed to generate monitoring components: %w", err) } @@ -133,8 +139,8 @@ func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInj // PolicyToComponents takes the policy and generated a component model along with providing a mapping between component // and the running binary. -func (r *RuntimeSpecs) PolicyToComponents(policy map[string]interface{}, ll logp.Level) ([]Component, map[string]string, error) { - outputsMap, err := toIntermediate(policy, r.aliasMapping, ll) +func (r *RuntimeSpecs) PolicyToComponents(policy map[string]interface{}, ll logp.Level, headers HeadersProvider) ([]Component, map[string]string, error) { + outputsMap, err := toIntermediate(policy, r.aliasMapping, ll, headers) if err != nil { return nil, nil, err } @@ -421,7 +427,7 @@ func getSupportedShipper(r *RuntimeSpecs, output outputI, inputSpec InputRuntime // toIntermediate takes the policy and returns it into an intermediate representation that is easier to map into a set // of components. -func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level) (map[string]outputI, error) { +func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level, headers HeadersProvider) (map[string]outputI, error) { const ( outputsKey = "outputs" enabledKey = "enabled" @@ -470,6 +476,28 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin if err != nil { return nil, fmt.Errorf("invalid 'outputs.%s.log_level', %w", name, err) } + + // inject headers configured during enroll + if t == elasticsearchType && headers != nil { + // can be nil when called from install/uninstall + if agentHeaders := headers.Headers(); len(agentHeaders) > 0 { + headers := make(map[string]interface{}) + if existingHeadersRaw, found := output[headersKey]; found { + existingHeaders, ok := existingHeadersRaw.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid 'outputs.headers', expected a map not a %T", outputRaw) + } + headers = existingHeaders + } + + for headerName, headerVal := range agentHeaders { + headers[headerName] = headerVal + } + + output[headersKey] = headers + } + } + outputsMap[name] = outputI{ name: name, enabled: enabled, diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 5a72eb84037..c1c14415167 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -41,6 +41,7 @@ func TestToComponents(t *testing.T) { LogLevel logp.Level Err string Result []Component + headers HeadersProvider }{ { Name: "Empty policy", @@ -1588,6 +1589,215 @@ func TestToComponents(t *testing.T) { }, }, }, + { + Name: "Headers injection", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + "headers": map[string]interface{}{ + "header-one": "val-1", + }, + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "header-one": "val-1", + }}, + }, { + Name: "Headers injection merge", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + "headers": map[string]interface{}{ + "header-two": "val-2", + }, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + "headers": map[string]interface{}{ + "header-two": "val-2", + "header-one": "val-1", + }, + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "header-one": "val-1", + }}, + }, + { + Name: "Headers injection not injecting kafka", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "kafka", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "kafka", + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "header-one": "val-1", + }}, + }, + { + Name: "Headers injection not injecting logstash", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "logstash", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "logstash", + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "header-one": "val-1", + }}, + }, } for _, scenario := range scenarios { @@ -1595,7 +1805,7 @@ func TestToComponents(t *testing.T) { runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), scenario.Platform, SkipBinaryCheck()) require.NoError(t, err) - result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel) + result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers) if scenario.Err != "" { assert.Equal(t, scenario.Err, err.Error()) } else { @@ -1755,3 +1965,11 @@ func mustExpectedConfigForceType(cfg map[string]interface{}, forceType string) * res.Type = forceType return res } + +type testHeadersProvider struct { + headers map[string]string +} + +func (h *testHeadersProvider) Headers() map[string]string { + return h.headers +}