Cloudtrail s3 interval #275

Merged: 5 commits, Jul 17, 2023
Changes from all commits
4 changes: 2 additions & 2 deletions plugins/cloudtrail/Makefile
@@ -28,8 +28,8 @@ all: $(OUTPUT)
clean:
	@rm -f $(OUTPUT)

$(OUTPUT): pkg/cloudtrail/*.go plugin/cloudtrail.go
	@$(GODEBUGFLAGS) $(GO) build -buildmode=c-shared -o $(OUTPUT) ./plugin

readme:
	@$(READMETOOL) -p ./$(OUTPUT) -f README.md
15 changes: 15 additions & 0 deletions plugins/cloudtrail/README.md
@@ -106,10 +106,25 @@ The json object has the following properties:

* `sqsDelete`: value is boolean. If true, then the plugin will delete SQS messages from the queue immediately after receiving them. (Default: true)
* `s3DownloadConcurrency`: value is numeric. Controls the number of background goroutines used to download S3 files. (Default: 32)
* `s3Interval`: value is string. Download log files matching the specified time interval. Note that this matches log file *names*, not event timestamps. CloudTrail logs usually cover [the previous 5 minutes of activity](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/get-and-view-cloudtrail-log-files.html). See *Time Intervals* below for possible formats.
* `useS3SNS`: value is boolean. If true, then the plugin will expect SNS messages to originate from S3 instead of directly from CloudTrail. (Default: false)

The init string can be the empty string, which is treated identically to `{}`.
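
For example, a hypothetical init string that limits collection to the most recent two days of logs could look like this (the concurrency value is illustrative, not a recommendation):

```json
{
    "s3Interval": "2d",
    "s3DownloadConcurrency": 4
}
```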

### Time Intervals

`s3Interval` values can be a single duration or an RFC 3339-style timestamp, in which case the interval starts at the specified point and ends at the current time:
* `5d`: A simple duration relative to the current time.
* `2021-03-30T18:07:17Z`: An interval starting at the specified RFC 3339-formatted time.

Simple durations must be a positive integer followed by `w` for weeks, `d` for days, `h` for hours, `m` for minutes, or `s` for seconds.
RFC 3339-style times must be formatted as a date, the letter `T`, a 24-hour time with no fractional seconds, and the letter `Z`, e.g. `2021-03-30T18:07:17Z`.

Values can also cover a range:
* `5d-2d`: A simple duration interval relative to the current time.
* `2023-04-05T06:00:00Z-2023-04-05T12:00:10Z`: An RFC 3339-style timestamp interval.
* `2023-04-05T06:00:00Z-5d`: A combination of an RFC 3339-style timestamp and a duration.
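
As a rough sketch, these strings are resolved by the `ParseInterval` helper added in this PR; the import path below is an assumption based on the falcosecurity/plugins repository layout:

```go
package main

import (
	"fmt"
	"time"

	"github.com/falcosecurity/plugins/plugins/cloudtrail/pkg/cloudtrail"
)

func main() {
	// "5d-2d": an interval from five days ago until two days ago.
	start, end, err := cloudtrail.ParseInterval("5d-2d")
	if err != nil {
		panic(err)
	}
	// A missing end endpoint would leave end as the zero time, meaning "until now".
	fmt.Println(start.Format(time.RFC3339), end.Format(time.RFC3339))
}
```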

### Plugin Open Params

The open params string is a URI-like string with one of the following forms:
2 changes: 2 additions & 0 deletions plugins/cloudtrail/pkg/cloudtrail/config.go
@@ -19,6 +19,7 @@ package cloudtrail
// Struct for plugin init config
type PluginConfig struct {
	S3DownloadConcurrency int    `json:"s3DownloadConcurrency" jsonschema:"title=S3 download concurrency,description=Controls the number of background goroutines used to download S3 files (Default: 32),default=32"`
	S3Interval            string `json:"s3Interval" jsonschema:"title=S3 log interval,description=Download log files over the specified interval (Default: no interval),default="`
	SQSDelete             bool   `json:"sqsDelete" jsonschema:"title=Delete SQS messages,description=If true then the plugin will delete SQS messages from the queue immediately after receiving them (Default: true),default=true"`
	UseAsync              bool   `json:"useAsync" jsonschema:"title=Use async extraction,description=If true then async extraction optimization is enabled (Default: true),default=true"`
	UseS3SNS              bool   `json:"useS3SNS" jsonschema:"title=Use S3 SNS,description=If true then the plugin will expect SNS messages to originate from S3 instead of directly from Cloudtrail (Default: false),default=false"`
@@ -29,6 +30,7 @@ type PluginConfig struct {
func (p *PluginConfig) Reset() {
	p.SQSDelete = true
	p.S3DownloadConcurrency = 32
	p.S3Interval = ""
	p.UseAsync = true
	p.UseS3SNS = false
	p.AWS.Reset()
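As a rough sketch of how these defaults interact with an init string: `Reset()` establishes the defaults, and the init string then overrides only the fields it names. The snippet below uses plain `encoding/json` and an assumed import path; the real wiring goes through the plugin SDK rather than calling these directly.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/falcosecurity/plugins/plugins/cloudtrail/pkg/cloudtrail"
)

func main() {
	cfg := &cloudtrail.PluginConfig{}
	cfg.Reset() // establish defaults first
	if err := json.Unmarshal([]byte(`{"s3Interval": "5d-2d"}`), cfg); err != nil {
		panic(err)
	}
	// Fields absent from the init string keep their Reset() values.
	fmt.Println(cfg.S3Interval, cfg.S3DownloadConcurrency) // "5d-2d" 32
}
```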
76 changes: 76 additions & 0 deletions plugins/cloudtrail/pkg/cloudtrail/interval.go
@@ -0,0 +1,76 @@
/*
Copyright (C) 2022 The Falco Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudtrail

import (
	"regexp"
	"strconv"
	"time"
)

// RFC3339Simple is the layout for RFC 3339-style timestamps: a date,
// the letter "T", a 24-hour time with whole seconds, and a literal "Z".
var RFC3339Simple = "2006-01-02T15:04:05Z"

// parseEndpoint converts a single interval endpoint, either a simple
// duration such as "5d" or an RFC 3339-style timestamp, into an
// absolute UTC time. Durations are interpreted relative to now.
func parseEndpoint(endpoint string) (time.Time, error) {
	utc := time.Now().UTC()
	var endpointTime time.Time
	var err error

	durationRE := regexp.MustCompile(`^(\d+)([wdhms])$`)
	matches := durationRE.FindStringSubmatch(endpoint)
	if matches != nil {
		var durI int
		durI, err = strconv.Atoi(matches[1])
		if err == nil {
			duration := time.Duration(durI)
			switch matches[2] {
			case "w":
				duration *= time.Hour * 24 * 7
			case "d":
				duration *= time.Hour * 24
			case "h":
				duration *= time.Hour
			case "m":
				duration *= time.Minute
			case "s":
				duration *= time.Second
			}
			endpointTime = utc.Add(-duration)
		}
	} else {
		endpointTime, err = time.Parse(RFC3339Simple, endpoint)
	}
	return endpointTime, err
}

// ParseInterval parses an s3Interval value into start and end times.
// endTime will be the zero time if no end endpoint was supplied.
func ParseInterval(interval string) (time.Time, time.Time, error) {
	var startTime time.Time
	var endTime time.Time
	var err error

	// First, see if we have an interval. The second endpoint is anchored
	// at the end of the string so that the hyphens inside an RFC 3339
	// start timestamp are not mistaken for the interval separator.
	intervalRE := regexp.MustCompile(`(.*)\s*-\s*(\d+[wdhms]|\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)$`)
	matches := intervalRE.FindStringSubmatch(interval)
	if matches != nil {
		startTime, err = parseEndpoint(matches[1])
		if err == nil {
			endTime, err = parseEndpoint(matches[2])
		}
	} else if interval != "" {
		startTime, err = parseEndpoint(interval)
	}
	return startTime, endTime, err
}
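
A quick sanity check for the parser, sketched as a Go test (durations depend on the current clock, so this only pins down the explicit-timestamp and zero-end cases):

```go
package cloudtrail

import (
	"testing"
	"time"
)

func TestParseInterval(t *testing.T) {
	// An explicit timestamp range should round-trip exactly.
	start, end, err := ParseInterval("2023-04-05T06:00:00Z-2023-04-05T12:00:10Z")
	if err != nil {
		t.Fatal(err)
	}
	wantStart := time.Date(2023, 4, 5, 6, 0, 0, 0, time.UTC)
	wantEnd := time.Date(2023, 4, 5, 12, 0, 10, 0, time.UTC)
	if !start.Equal(wantStart) || !end.Equal(wantEnd) {
		t.Errorf("got %v and %v, want %v and %v", start, end, wantStart, wantEnd)
	}

	// A single duration endpoint leaves the end time zero ("until now").
	_, end, err = ParseInterval("5d")
	if err != nil || !end.IsZero() {
		t.Errorf("expected zero end time and nil error, got %v, %v", end, err)
	}
}
```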
141 changes: 117 additions & 24 deletions plugins/cloudtrail/pkg/cloudtrail/source.go
@@ -26,6 +26,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
@@ -172,38 +173,130 @@ func (oCtx *PluginInstance) openS3(input string) error {
		return err
	}

	type listOrigin struct {
		prefix     *string
		startAfter *string
	}

	var inputParams []listOrigin
	ctx := context.Background()

	// XXX Make empty mean no startTime.
	startTime, endTime, err := ParseInterval(oCtx.config.S3Interval)
	if err != nil {
		return fmt.Errorf(PluginName+" invalid interval: \"%s\": %s", oCtx.config.S3Interval, err.Error())
	}

	// CloudTrail logs have the format
	// bucket_name/prefix_name/AWSLogs/Account ID/CloudTrail/region/YYYY/MM/DD/AccountID_CloudTrail_RegionName_YYYYMMDDTHHmmZ_UniqueString.json.gz
	// Reduce the number of pages we have to process using "StartAfter" parameters
	// here, then trim individual filepaths below.

	intervalPrefix := prefix

	// For durations, carve out a special case for "Copy S3 URI" in the AWS console,
	// which gives you bucket_name/prefix_name/AWSLogs/<Account ID>/
	awsLogsRE := regexp.MustCompile(`AWSLogs/\d+/?$`)
	if awsLogsRE.MatchString(prefix) {
		if !strings.HasSuffix(intervalPrefix, "/") {
			intervalPrefix += "/"
		}
		intervalPrefix += "CloudTrail/"
	}

	if strings.HasSuffix(intervalPrefix, "/CloudTrail/") {
		delimiter := "/"
		// Fetch the list of regions.
		output, err := oCtx.s3.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:    &oCtx.s3.bucket,
			Prefix:    &intervalPrefix,
			Delimiter: &delimiter,
		})
		if err == nil {
			for _, commonPrefix := range output.CommonPrefixes {
				params := listOrigin{prefix: commonPrefix.Prefix}
				if !startTime.IsZero() {
					// startAfter doesn't have to be a real key.
					// Keys sort by date (YYYY/MM/DD), so starting after a
					// prefix built from the start date skips earlier days.
					startAfterSuffix := startTime.Format("2006/01/02/")
					startAfter := *commonPrefix.Prefix + startAfterSuffix
					params.startAfter = &startAfter
				}
				inputParams = append(inputParams, params)
			}
		}
	}

	filepathRE := regexp.MustCompile(`.*_CloudTrail_[^_]+_([^_]+)Z_`)
	var startTS string
	var endTS string

	if len(inputParams) > 0 {
		if !startTime.IsZero() {
			// Use a 24-hour layout to match the HHmm timestamps
			// embedded in CloudTrail log file names.
			startTS = startTime.Format("20060102T1504")
			if !endTime.IsZero() {
				endTS = endTime.Format("20060102T1504")
				if endTS < startTS {
					return fmt.Errorf(PluginName+" start time %s must be less than end time %s", startTime.Format(RFC3339Simple), endTime.Format(RFC3339Simple))
				}
			}
		}
	} else {
		// No region prefixes found, just use what we were given.
		params := listOrigin{prefix: &prefix, startAfter: nil}
		inputParams = append(inputParams, params)
	}

	// Would it make sense to do this concurrently?
	for _, params := range inputParams {
		// Fetch the list of keys
		paginator := s3.NewListObjectsV2Paginator(oCtx.s3.client, &s3.ListObjectsV2Input{
			Bucket:     &oCtx.s3.bucket,
			Prefix:     params.prefix,
			StartAfter: params.startAfter,
		})

		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				// Try friendlier error sources first.
				var aErr smithy.APIError
				if errors.As(err, &aErr) {
					return fmt.Errorf(PluginName+" plugin error: %s: %s", aErr.ErrorCode(), aErr.ErrorMessage())
				}

				var oErr *smithy.OperationError
				if errors.As(err, &oErr) {
					return fmt.Errorf(PluginName+" plugin error: %s: %s", oErr.Service(), oErr.Unwrap())
				}

				return fmt.Errorf(PluginName + " plugin error: failed to list objects: " + err.Error())
			}
			for _, obj := range page.Contents {
				path := obj.Key

				if startTS != "" {
					matches := filepathRE.FindStringSubmatch(*path)
					if matches != nil {
						pathTS := matches[1]
						if pathTS < startTS {
							continue
						}
						if endTS != "" && pathTS > endTS {
							continue
						}
					}
				}

				isCompressed := strings.HasSuffix(*path, ".json.gz")
				if filepath.Ext(*path) != ".json" && !isCompressed {
					continue
				}

				var fi fileInfo = fileInfo{name: *path, isCompressed: isCompressed}
				oCtx.files = append(oCtx.files, fi)
			}
		}
	}
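
To make the filename filter concrete, here is a hedged sketch (the object key is invented, following the naming scheme in the comment above) of how the regex extracts a log file's timestamp and why plain string comparison suffices:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as source.go: capture the YYYYMMDDTHHmm timestamp
	// embedded in a CloudTrail log file name.
	filepathRE := regexp.MustCompile(`.*_CloudTrail_[^_]+_([^_]+)Z_`)

	// A hypothetical key following the documented format.
	key := "AWSLogs/123456789012/CloudTrail/us-east-1/2023/04/05/" +
		"123456789012_CloudTrail_us-east-1_20230405T0650Z_a1b2c3d4.json.gz"

	if matches := filepathRE.FindStringSubmatch(key); matches != nil {
		pathTS := matches[1] // "20230405T0650"
		// Both sides use the same fixed-width 24-hour layout, so
		// lexicographic comparison is chronological comparison.
		fmt.Println(pathTS >= "20230405T0600" && pathTS <= "20230405T1200") // true
	}
}
```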
