Skip to content

Commit

Permalink
Merge branch 'main' into filebeat_shipper_integration_test
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman authored Sep 6, 2023
2 parents 4c599cc + 2b0076b commit 1edf315
Show file tree
Hide file tree
Showing 22 changed files with 387 additions and 254 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Pin PyYAML version to 5.3.1 to avoid CI errors temporarily {pull}36091[36091]
- Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158]
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]

==== Deprecated

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix panic when redact option is not provided to CEL input. {issue}36387[36387] {pull}36388[36388]
- Remove 'onFilteredOut' and 'onDroppedOnPublish' callback logs {issue}36299[36299] {pull}36399[36399]
- Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496]
- Ensure winlog input retains metric collection when handling recoverable errors. {issue}36479[36479] {pull}36483[36483]

*Heartbeat*

Expand Down Expand Up @@ -139,6 +140,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Winlogbeat*

- Ensure event loggers retains metric collection when handling recoverable errors. {issue}36479[36479] {pull}36483[36483]

*Elastic Logging Plugin*

Expand All @@ -153,6 +155,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Mark `translate_sid` processor is GA. {issue}36279[36279] {pull}36280[36280]
- dns processor: Add support for forward lookups (`A`, `AAAA`, and `TXT`). {issue}11416[11416] {pull}36394[36394]
- Mark `syslog` processor as GA, improve docs about how processor handles syslog messages. {issue}36416[36416] {pull}36417[36417]
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]

*Auditbeat*

Expand Down Expand Up @@ -212,6 +215,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Make HTTPJSON response body decoding errors more informative. {pull}36481[36481]
- Allow fine-grained control of entity analytics API requests for Okta provider. {issue}36440[36440] {pull}36492[36492]
- Add support for expanding `journald.process.capabilities` into the human-readable effective capabilities in the ECS `process.thread.capabilities.effective` field. {issue}36454[36454] {pull}36470[36470]
- Allow fine-grained control of entity analytics API requests for AzureAD provider. {issue}36440[36440] {pull}36441[36441]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ The `aws` module requires AWS credentials configuration in order to make AWS API
Users can either use `access_key_id`, `secret_access_key` and/or
`session_token`, or use `role_arn` AWS IAM role, or use shared AWS credentials file.

Users may use `external_id` to support assuming a role in another account, see
https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html[the AWS documentation for use of external IDs].

Please see <<aws-credentials-options,AWS credentials options>> for more details.

include::../include/gs-link.asciidoc[]
Expand Down
8 changes: 4 additions & 4 deletions filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ runLoop:
records, err := api.Read()
if eventlog.IsRecoverable(err) {
log.Errorw("Encountered recoverable error when reading from Windows Event Log", "error", err)
if closeErr := api.Close(); closeErr != nil {
log.Errorw("Error closing Windows Event Log handle", "error", closeErr)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}
if !api.IsFile() && eventlog.IsChannelNotFound(err) {
log.Errorw("Encountered channel not found error when reading from Windows Event Log", "error", err)
if closeErr := api.Close(); closeErr != nil {
log.Errorw("Error closing Windows Event Log handle", "error", closeErr)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}
Expand Down
8 changes: 4 additions & 4 deletions winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ runLoop:
records, err := api.Read()
if eventlog.IsRecoverable(err) {
e.log.Warnw("Read() encountered recoverable error. Reopening handle...", "error", err, "channel", api.Channel())
if closeErr := api.Close(); closeErr != nil {
e.log.Warnw("Close() error.", "error", err)
if resetErr := api.Reset(); resetErr != nil {
e.log.Warnw("Reset() error.", "error", err)
}
continue runLoop
}
if !api.IsFile() && eventlog.IsChannelNotFound(err) {
e.log.Warnw("Read() encountered channel not found error for channel %q. Reopening handle...", "error", err, "channel", api.Channel())
if closeErr := api.Close(); closeErr != nil {
e.log.Warnw("Close() error.", "error", err)
if resetErr := api.Reset(); resetErr != nil {
e.log.Warnw("Reset() error.", "error", err)
}
continue runLoop
}
Expand Down
5 changes: 5 additions & 0 deletions winlogbeat/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type EventLog interface {
// reading and close the log.
Read() ([]Record, error)

// Reset closes the event log channel to allow recovering from recoverable
// errors. Open must be successfully called after a Reset before Read may
// be called.
Reset() error

// Close the event log. It should not be re-opened after closing.
Close() error

Expand Down
5 changes: 5 additions & 0 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,11 @@ func (l *winEventLog) createBookmarkFromEvent(evtHandle win.EvtHandle) (string,
return string(l.outputBuf.Bytes()), err
}

func (l *winEventLog) Reset() error {
debugf("%s Closing handle for reset", l.logPrefix)
return win.Close(l.subscription)
}

func (l *winEventLog) Close() error {
debugf("%s Closing handle", l.logPrefix)
l.metrics.close()
Expand Down
9 changes: 9 additions & 0 deletions winlogbeat/eventlog/wineventlog_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,18 @@ func (l *winEventLogExp) createBookmarkFromEvent(evtHandle win.EvtHandle) (strin
return bookmark.XML()
}

func (l *winEventLogExp) Reset() error {
l.log.Debug("Closing event log reader handles for reset.")
return l.close()
}

func (l *winEventLogExp) Close() error {
l.log.Debug("Closing event log reader handles.")
l.metrics.close()
return l.close()
}

func (l *winEventLogExp) close() error {
if l.iterator == nil {
return l.renderer.Close()
}
Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ Example configuration:
enabled: true
id: azure-1
provider: azure-ad
dataset: "all"
sync_interval: "12h"
update_interval: "30m"
client_id: "CLIENT_ID"
Expand All @@ -279,6 +280,14 @@ The client/application ID. Used for authentication. Field is required.

The secret value, used for authentication. Field is required.

[float]
===== `dataset`

The datasets to collect from the API. This can be one of "all", "users" or "devices",
or may be left empty for the default behavior which is to collect all entities.
When the `dataset` is set to "devices", some user entity data is collected in order
to populate the registered users and registered owner fields for each device.

[float]
===== `sync_interval`

Expand Down
42 changes: 31 additions & 11 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -267,19 +268,38 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
groupsDeltaLink = state.groupsLink
}

changedUsers, userLink, err := p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))

changedDevices, deviceLink, err := p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
var (
changedUsers []*fetcher.User
userLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "users":
changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))
default:
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
}

var (
changedDevices []*fetcher.Device
deviceLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "devices":
changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d devices from API", len(changedDevices))
default:
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
}
p.logger.Debugf("Received %d devices from API", len(changedUsers))

// Get group changes.
// Get group changes. Groups are required for both users and devices.
// So always collect these.
changedGroups, groupLink, err := p.fetcher.Groups(ctx, groupsDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azuread

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -18,36 +19,64 @@ import (
)

func TestAzure_DoFetch(t *testing.T) {
dbFilename := "TestAzure_DoFetch.db"
store := testSetupStore(t, dbFilename)
t.Cleanup(func() {
testCleanupStore(store, dbFilename)
})

a := azure{
logger: logp.L(),
auth: mockauth.New(""),
fetcher: mockfetcher.New(),
tests := []struct {
dataset string
wantUsers bool
wantDevices bool
}{
{dataset: "", wantUsers: true, wantDevices: true},
{dataset: "all", wantUsers: true, wantDevices: true},
{dataset: "users", wantUsers: true, wantDevices: false},
{dataset: "devices", wantUsers: false, wantDevices: true},
}

ss, err := newStateStore(store)
require.NoError(t, err)
defer ss.close(false)
for _, test := range tests {
t.Run(test.dataset, func(t *testing.T) {
suffix := test.dataset
if suffix != "" {
suffix = "_" + suffix
}
dbFilename := fmt.Sprintf("TestAzure_DoFetch%s.db", suffix)
store := testSetupStore(t, dbFilename)
t.Cleanup(func() {
testCleanupStore(store, dbFilename)
})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
gotUsers, gotDevices, err := a.doFetch(ctx, ss, false)
require.NoError(t, err)
a := azure{
conf: conf{Dataset: test.dataset},
logger: logp.L(),
auth: mockauth.New(""),
fetcher: mockfetcher.New(),
}

var wantModifiedUsers collections.UUIDSet
for _, v := range mockfetcher.UserResponse {
wantModifiedUsers.Add(v.ID)
}
var wantModifiedDevices collections.UUIDSet
for _, v := range mockfetcher.DeviceResponse {
wantModifiedDevices.Add(v.ID)
}
ss, err := newStateStore(store)
require.NoError(t, err)
defer ss.close(false)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
gotUsers, gotDevices, err := a.doFetch(ctx, ss, false)
require.NoError(t, err)

require.Equal(t, wantModifiedUsers.Values(), gotUsers.Values())
require.Equal(t, wantModifiedDevices.Values(), gotDevices.Values())
var wantModifiedUsers collections.UUIDSet
for _, v := range mockfetcher.UserResponse {
wantModifiedUsers.Add(v.ID)
}
var wantModifiedDevices collections.UUIDSet
for _, v := range mockfetcher.DeviceResponse {
wantModifiedDevices.Add(v.ID)
}

if test.wantUsers {
require.Equal(t, wantModifiedUsers.Values(), gotUsers.Values())
} else {
require.Equal(t, 0, gotUsers.Len())
}
if test.wantDevices {
require.Equal(t, wantModifiedDevices.Values(), gotDevices.Values())
} else {
require.Equal(t, 0, gotDevices.Len())
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azuread

import (
"errors"
"strings"
"time"
)

Expand All @@ -21,6 +22,7 @@ type conf struct {
TenantID string `config:"tenant_id" validate:"required"`
SyncInterval time.Duration `config:"sync_interval"`
UpdateInterval time.Duration `config:"update_interval"`
Dataset string `config:"dataset"`
}

// Validate runs validation against the config.
Expand All @@ -34,6 +36,11 @@ func (c *conf) Validate() error {
if c.UpdateInterval == 0 {
return errors.New("update_interval must not be zero")
}
switch strings.ToLower(c.Dataset) {
case "", "all", "users", "devices":
default:
return errors.New("dataset must be 'all', 'users', 'devices' or empty")
}

return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func TestConf_Validate(t *testing.T) {
},
WantErr: "sync_interval must be longer than update_interval",
},
"err-invalid-dataset": {
In: conf{
SyncInterval: defaultSyncInterval,
UpdateInterval: defaultUpdateInterval,
Dataset: "everything",
},
WantErr: "dataset must be 'all', 'users', 'devices' or empty",
},
}

for name, tc := range tests {
Expand Down
Loading

0 comments on commit 1edf315

Please sign in to comment.