Skip to content

Commit

Permalink
update(plugins/cloudtrail): Add an S3Interval option
Browse files Browse the repository at this point in the history
Add an "S3Interval" option, which limits log downloads to the specified
time interval. Intervals can be a simple "relative time in the past to
now", "an absolute timestamp until now", or a range of those two
formats.

The interval is applied at the far end using the StartAfter parameter
and locally to filter log pathnames. In my informal testing here the
time from capture start to first event is ~2s.

Signed-off-by: Gerald Combs <[email protected]>
  • Loading branch information
geraldcombs committed May 3, 2023
1 parent 0a01e4f commit 5acdf4e
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 24 deletions.
15 changes: 15 additions & 0 deletions plugins/cloudtrail/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,25 @@ The json object has the following properties:

* `sqsDelete`: value is boolean. If true, then the plugin will delete sqs messages from the queue immediately after receiving them. (Default: true)
* `s3DownloadConcurrency`: value is numeric. Controls the number of background goroutines used to download S3 files. (Default: 1)
* `S3Interval`: value is string. Download log files matching the specified time interval. Note that this matches log file *names*, not event timestamps. CloudTrail logs usually cover [the previous 5 minutes of activity](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/get-and-view-cloudtrail-log-files.html). See *Time Intervals* below for possible formats.
* `useS3SNS`: value is boolean. If true, then the plugin will expect SNS messages to originate from S3 instead of directly from Cloudtrail (Default: false)

The init string can be the empty string, which is treated identically to `{}`.

### Time Intervals

S3Interval values can be individual duration values or RFC 3339-style timestamps. In this case the interval will start at the specified time and end at the current time:
* `5d`: A simple duration relative to the current time.
* `2021-03-30T18:07:17Z`: A duration starting from the specified RFC 3339 formatted time.

Simple durations must be a positive integer followed by `w` for weeks, `d` for days, `h` for hours, `m` for minutes, or `s` for seconds.
RFC 3339-style times must be formatted as a datestamp, the letter `T`, a timestamp with no fractional seconds, and the letter `Z`, e.g. `2021-03-30T18:07:17Z`.

Values can also cover a range:
* `5d-2d`: A simple duration interval relative to the current time.
* `2023-04-05T06:00:00Z-2023-04-05T12:00:10Z`: An RFC 3339-style timestamp interval.
* `2023-04-05T06:00:00Z-5d`: A combination of an RFC 3339-style timestamp and a duration.

### Plugin Open Params

The format of the open params string is a uri-like string with one of the following forms:
Expand Down
1 change: 1 addition & 0 deletions plugins/cloudtrail/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ require (
github.com/aws/smithy-go v1.13.3
github.com/falcosecurity/plugin-sdk-go v0.6.2
github.com/valyala/fastjson v1.6.3
github.com/xhit/go-str2duration/v2 v2.1.0
)
2 changes: 2 additions & 0 deletions plugins/cloudtrail/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 2 additions & 0 deletions plugins/cloudtrail/pkg/cloudtrail/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cloudtrail
// Struct for plugin init config
type PluginConfig struct {
S3DownloadConcurrency int `json:"s3DownloadConcurrency" jsonschema:"title=S3 download concurrency,description=Controls the number of background goroutines used to download S3 files (Default: 1),default=1"`
S3Interval string `json:"s3Interval" jsonschema:"title=S3 log interval,description=Download log files over the specified interval (Default: 24h),default=24h"`
SQSDelete bool `json:"sqsDelete" jsonschema:"title=Delete SQS messages,description=If true then the plugin will delete SQS messages from the queue immediately after receiving them (Default: true),default=true"`
UseAsync bool `json:"useAsync" jsonschema:"title=Use async extraction,description=If true then async extraction optimization is enabled (Default: true),default=true"`
UseS3SNS bool `json:"useS3SNS" jsonschema:"title=Use S3 SNS,description=If true then the plugin will expect SNS messages to originate from S3 instead of directly from Cloudtrail (Default: false),default=false"`
Expand All @@ -29,6 +30,7 @@ type PluginConfig struct {
func (p *PluginConfig) Reset() {
p.SQSDelete = true
p.S3DownloadConcurrency = 1
p.S3Interval = "24h"
p.UseAsync = true
p.UseS3SNS = false
p.AWS.Reset()
Expand Down
63 changes: 63 additions & 0 deletions plugins/cloudtrail/pkg/cloudtrail/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright (C) 2022 The Falco Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudtrail

import (
"regexp"
"time"

"github.com/xhit/go-str2duration/v2"
)

var RFC3339Simple = "2006-01-02T03:04:05Z"

func parseEndpoint(endpoint string) (time.Time, error) {
utc := time.Now().UTC()
var endpointTime time.Time
var err error

epRE := regexp.MustCompile(`^\d+[wdhms]$`)
if (epRE.MatchString(endpoint)) {
duration, err := str2duration.ParseDuration(endpoint)
if (err == nil) {
endpointTime = utc.Add(- duration)
}
} else {
endpointTime, err = time.Parse(RFC3339Simple, endpoint)
}
return endpointTime, err
}

// endTime will be zero if no end interval was supplied.
func ParseInterval(interval string) (time.Time, time.Time, error) {
var startTime time.Time
var endTime time.Time
var err error

// First, see if we have an interval.
intervalRE := regexp.MustCompile(`(.*)(\s*-\s*)(\d+[wdhms]$|\d{4}-\d{2}\d{2}[^Z]*Z)`)
matches := intervalRE.FindStringSubmatch(interval)
if matches != nil {
startTime, err = parseEndpoint(matches[1])
if err == nil {
endTime, err = parseEndpoint(matches[3])
}
} else {
startTime, err = parseEndpoint(interval)
}
return startTime, endTime, err
}
135 changes: 111 additions & 24 deletions plugins/cloudtrail/pkg/cloudtrail/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -172,38 +173,124 @@ func (oCtx *PluginInstance) openS3(input string) error {
return err
}

// Fetch the list of keys

type listOrigin struct {
prefix *string
startAfter *string
}

var inputParams []listOrigin
ctx := context.Background()
paginator := s3.NewListObjectsV2Paginator(oCtx.s3.client, &s3.ListObjectsV2Input{
Bucket: &oCtx.s3.bucket,
Prefix: &prefix,
})

for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
// Try friendlier error sources first.
var aErr smithy.APIError
if errors.As(err, &aErr) {
return fmt.Errorf(PluginName + " plugin error: %s: %s", aErr.ErrorCode(), aErr.ErrorMessage())
}
startTime, endTime, err := ParseInterval(oCtx.config.S3Interval)
if err != nil {
return fmt.Errorf(PluginName + " invalid interval: \"%s\": %s", oCtx.config.S3Interval, err.Error())

}
if !endTime.IsZero() && endTime.Compare(startTime) > 0 {
return fmt.Errorf(PluginName + " start time %s must be less than end time %s", startTime.Format(RFC3339Simple), endTime.Format(RFC3339Simple))
}

// CloudTrail logs have the format
// bucket_name/prefix_name/AWSLogs/Account ID/CloudTrail/region/YYYY/MM/DD/AccountID_CloudTrail_RegionName_YYYYMMDDTHHmmZ_UniqueString.json.gz
// Reduce the number of pages we have to process using "StartAfter" parameters
// here, then trim individual filepaths below.

intervalPrefix := prefix
startAfterSuffix := startTime.Format("2006/01/02/")

var oErr *smithy.OperationError
if errors.As(err, &oErr) {
return fmt.Errorf(PluginName + " plugin error: %s: %s", oErr.Service(), oErr.Unwrap())
// For durations, carve out a special case for "Copy S3 URI" in the AWS console, which gives you
// bucket_name/prefix_name/AWSLogs/<Account ID>/
awsLogsRE := regexp.MustCompile(`AWSLogs/\d+/?$`)
if awsLogsRE.MatchString(prefix) {
if (! strings.HasSuffix(intervalPrefix, "/")) {
intervalPrefix += "/"
}
intervalPrefix += "CloudTrail/"
}

if strings.HasSuffix(intervalPrefix, "/CloudTrail/") {
delimiter := "/"
// Fetch the list of regions.
output, err := oCtx.s3.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: &oCtx.s3.bucket,
Prefix: &intervalPrefix,
Delimiter: &delimiter,
})
if err == nil {
for _, commonPrefix := range output.CommonPrefixes {
// startAfter doesn't have to be a real key.
startAfter := *commonPrefix.Prefix + startAfterSuffix
params := listOrigin {prefix: commonPrefix.Prefix, startAfter: &startAfter}
inputParams = append(inputParams, params)
}
}
}

filepathRE := regexp.MustCompile(`.*_CloudTrail_[^_]+_([^_]+)Z_`)
var startTS string
var endTS string

return fmt.Errorf(PluginName + " plugin error: failed to list objects: " + err.Error())
if len(inputParams) > 0 {
startTS = startTime.Format("20060102T0304")
if !endTime.IsZero() {
endTS = endTime.Format("20060102T0304")
}
for _, obj := range page.Contents {
path := obj.Key
isCompressed := strings.HasSuffix(*path, ".json.gz")
if filepath.Ext(*path) != ".json" && !isCompressed {
continue
} else {
// No region prefixes found, just use what we were given.
params := listOrigin {prefix: &prefix, startAfter: nil}
inputParams = append(inputParams, params)
}

// Would it make sense to do this concurrently?
for _, params := range inputParams {
// Fetch the list of keys
paginator := s3.NewListObjectsV2Paginator(oCtx.s3.client, &s3.ListObjectsV2Input{
Bucket: &oCtx.s3.bucket,
Prefix: params.prefix,
StartAfter: params.startAfter,
})

for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
// Try friendlier error sources first.
var aErr smithy.APIError
if errors.As(err, &aErr) {
return fmt.Errorf(PluginName + " plugin error: %s: %s", aErr.ErrorCode(), aErr.ErrorMessage())
}

var oErr *smithy.OperationError
if errors.As(err, &oErr) {
return fmt.Errorf(PluginName + " plugin error: %s: %s", oErr.Service(), oErr.Unwrap())
}

return fmt.Errorf(PluginName + " plugin error: failed to list objects: " + err.Error())
}
for _, obj := range page.Contents {
path := obj.Key

if startTS != "" {
matches := filepathRE.FindStringSubmatch(*path)
if matches != nil {
pathTS := matches[1]
if pathTS < startTS {
continue
}
if endTS != "" && pathTS > endTS {
continue
}
}
}

isCompressed := strings.HasSuffix(*path, ".json.gz")
if filepath.Ext(*path) != ".json" && !isCompressed {
continue
}

var fi fileInfo = fileInfo{name: *path, isCompressed: isCompressed}
oCtx.files = append(oCtx.files, fi)
var fi fileInfo = fileInfo{name: *path, isCompressed: isCompressed}
oCtx.files = append(oCtx.files, fi)
}
}
}

Expand Down

0 comments on commit 5acdf4e

Please sign in to comment.