From f3624c7312cbebca6459b1e26ce9bb358d2070b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francesco=20Pirro=CC=80?= Date: Wed, 20 Nov 2024 15:48:14 +0100 Subject: [PATCH 1/8] feat(plugins/aksaudit): add aks audit logs plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Francesco Pirrò update(plugins/gcpaudit): bump plugin version to 0.5.0 Signed-off-by: Francesco Pirrò chore(plugins/gcpaudit): update changelogs with v0.5.0 changes Signed-off-by: Francesco Pirrò add initial plugin structure Signed-off-by: Igor Eulalio add processor function, refactor workflow to leverage channels Signed-off-by: Igor Eulalio refactoring main function to handle Process in underlying package, introducing channels Signed-off-by: Igor Eulalio add makefile Signed-off-by: Igor Eulalio update regisry + readme for k8saudit-aks Signed-off-by: Thomas Labarussias add owners Signed-off-by: Thomas Labarussias fix Open method arg Signed-off-by: Thomas Labarussias refactor code to handle the channel logic, add Makefile helpers, add new rule Signed-off-by: Igor Eulalio add logs using proper plugin, finish configuration Signed-off-by: Igor Eulalio feat: add .envrc to gitignore Signed-off-by: Igor Eulalio feat: add .envrc to gitignore Signed-off-by: Igor Eulalio --- README.md | 1 + plugins/k8saudit-aks/.gitignore | 4 + plugins/k8saudit-aks/CHANGELOG.md | 1 + plugins/k8saudit-aks/Makefile | 43 ++++ plugins/k8saudit-aks/OWNERS | 3 + plugins/k8saudit-aks/README.md | 157 ++++++++++++ plugins/k8saudit-aks/falco_aks_audit.yaml | 7 + plugins/k8saudit-aks/falco_k8s_audit.yaml | 0 plugins/k8saudit-aks/go.mod | 40 +++ plugins/k8saudit-aks/go.sum | 134 ++++++++++ .../pkg/k8sauditaks/k8sauditaks.go | 233 ++++++++++++++++++ plugins/k8saudit-aks/plugin/main.go | 36 +++ registry.yaml | 28 +++ shared/go/azure/eventhub/go.mod | 16 ++ shared/go/azure/eventhub/go.sum | 52 ++++ shared/go/azure/eventhub/processor.go | 79 ++++++ 16 files changed, 834 insertions(+) create mode 100644 plugins/k8saudit-aks/.gitignore create mode 100644 plugins/k8saudit-aks/CHANGELOG.md create mode 100755 plugins/k8saudit-aks/Makefile create mode 100644 plugins/k8saudit-aks/OWNERS create mode 100644 plugins/k8saudit-aks/README.md create mode 100644 plugins/k8saudit-aks/falco_aks_audit.yaml create mode 100644 plugins/k8saudit-aks/falco_k8s_audit.yaml create mode 100644 plugins/k8saudit-aks/go.mod create mode 100644 plugins/k8saudit-aks/go.sum create mode 100644 plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go create mode 100644 plugins/k8saudit-aks/plugin/main.go create mode 100644 shared/go/azure/eventhub/go.mod create mode 100644 shared/go/azure/eventhub/go.sum create mode 100644 shared/go/azure/eventhub/processor.go diff --git a/README.md b/README.md index b87b3c93..c665de44 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ These comments and the text between them should not be edited by hand --> | [kafka](https://github.com/falcosecurity/plugins/tree/main/plugins/kafka) | **Event Sourcing**
ID: 18
`kafka` | Read events from Kafka topics into Falco

Authors: [Hunter Madison](https://falco.org/community)
License: Apache-2.0 | | [gitlab](https://github.com/an1245/falco-plugin-gitlab) | **Event Sourcing**
ID: 19
`gitlab`
**Field Extraction**
`gitlab` | Falco plugin providing basic runtime threat detection and auditing logging for GitLab

Authors: [Andy](https://github.com/an1245/falco-plugin-gitlab/issues)
License: Apache-2.0 | | [keycloak](https://github.com/mattiaforc/falco-keycloak-plugin) | **Event Sourcing**
ID: 20
`keycloak`
**Field Extraction**
`keycloak` | Falco plugin for sourcing and extracting Keycloak user/admin events

Authors: [Mattia Forcellese](https://github.com/mattiaforc/falco-keycloak-plugin/issues)
License: Apache-2.0 | +| [k8saudit-aks](https://github.com/falcosecurity/plugins/tree/main/plugins/k8saudit-aks) | **Event Sourcing**
ID: 21
`k8s_audit`
**Field Extraction**
`k8s_audit` | Read Kubernetes Audit Events from AWS AKS Clusters

Authors: [The Falco Authors](https://falco.org/community)
License: Apache-2.0 | diff --git a/plugins/k8saudit-aks/.gitignore b/plugins/k8saudit-aks/.gitignore new file mode 100644 index 00000000..831e20ff --- /dev/null +++ b/plugins/k8saudit-aks/.gitignore @@ -0,0 +1,4 @@ +test_files +libk8saudit-aks.so +.vscode +falco.yaml diff --git a/plugins/k8saudit-aks/CHANGELOG.md b/plugins/k8saudit-aks/CHANGELOG.md new file mode 100644 index 00000000..825c32f0 --- /dev/null +++ b/plugins/k8saudit-aks/CHANGELOG.md @@ -0,0 +1 @@ +# Changelog diff --git a/plugins/k8saudit-aks/Makefile b/plugins/k8saudit-aks/Makefile new file mode 100755 index 00000000..13f8664d --- /dev/null +++ b/plugins/k8saudit-aks/Makefile @@ -0,0 +1,43 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024 The Falco Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +SHELL=/bin/bash -o pipefail +GO ?= go + +NAME := k8saudit-aks +OUTPUT := lib$(NAME).so + +ifeq ($(DEBUG), 1) + GODEBUGFLAGS= GODEBUG=cgocheck=1 +else + GODEBUGFLAGS= GODEBUG=cgocheck=0 +endif + +all: $(OUTPUT) + +clean: + @rm -f *.so + +$(OUTPUT): clean + @$(GODEBUGFLAGS) $(GO) build -buildmode=c-shared -o $(OUTPUT) ./plugin + +readme: + @$(READMETOOL) -p ./$(OUTPUT) -f README.md + +copy: all + @cp ./$(OUTPUT) /usr/share/falco/plugins/ + @cp falco.yaml /etc/falco/falco.yaml + +run-falco: copy + @falco -c /etc/falco/falco.yaml diff --git a/plugins/k8saudit-aks/OWNERS b/plugins/k8saudit-aks/OWNERS new file mode 100644 index 00000000..78125ee5 --- /dev/null +++ b/plugins/k8saudit-aks/OWNERS @@ -0,0 +1,3 @@ +approvers: + - IgorEulalio + - Issif diff --git a/plugins/k8saudit-aks/README.md b/plugins/k8saudit-aks/README.md new file mode 100644 index 00000000..99ca5cef --- /dev/null +++ b/plugins/k8saudit-aks/README.md @@ -0,0 +1,157 @@ +# Kubernetes Audit Events Plugin for AKS + +## Introduction + +This plugin extends Falco to support [Kubernetes Audit Events](https://kubernetes.io/docs/tasks/debug-application-cluster/audit/#audit-backends) from AWS AKS clusters as a new data source. +For more details about what Audit logs are, see the [README of k8saudit plugin](https://github.com/falcosecurity/plugins/blob/main/plugins/k8saudit/README.md). + +### Functionality + +This plugin supports consuming Kubernetes Audit Events stored in Azure Event Hub for the AKS Clusters, see [Azure official documentation](https://learn.microsoft.com/en-us/azure/aks/monitor-aks#aks-control-planeresource-logs) for details. + +## Capabilities + +The `k8saudit-aks` uses the field extraction methods of the [`k8saudit`](https://github.com/falcosecurity/plugins/tree/main/plugins/k8saudit) plugin as the format for the Audit Logs is same. + +### Event Source + +The event source for Kubernetes Audit Events from AKS is `k8s_audit`, it allows to use same rules than `k8saudit` plugin. + +### Supported Fields + +Here is the current set of supported fields (from `k8saudit` plugin's extractor): + + +| NAME | TYPE | ARG | DESCRIPTION | +|----------------------------------------------------|-----------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `ka.auditid` | `string` | None | The unique id of the audit event | +| `ka.stage` | `string` | None | Stage of the request (e.g. RequestReceived, ResponseComplete, etc.) | +| `ka.auth.decision` | `string` | None | The authorization decision | +| `ka.auth.reason` | `string` | None | The authorization reason | +| `ka.auth.openshift.decision` | `string` | None | The authentication decision of the openshfit apiserver extention. Only available on openshift clusters | +| `ka.auth.openshift.username` | `string` | None | The user name performing the openshift authentication operation. Only available on openshift clusters | +| `ka.user.name` | `string` | None | The user name performing the request | +| `ka.user.groups` | `string (list)` | None | The groups to which the user belongs | +| `ka.impuser.name` | `string` | None | The impersonated user name | +| `ka.verb` | `string` | None | The action being performed | +| `ka.uri` | `string` | None | The request URI as sent from client to server | +| `ka.uri.param` | `string` | Key, Required | The value of a given query parameter in the uri (e.g. when uri=/foo?key=val, ka.uri.param[key] is val). | +| `ka.target.name` | `string` | None | The target object name | +| `ka.target.namespace` | `string` | None | The target object namespace | +| `ka.target.resource` | `string` | None | The target object resource | +| `ka.target.subresource` | `string` | None | The target object subresource | +| `ka.target.pod.name` | `string` | None | The target pod name | +| `ka.req.binding.subjects` | `string (list)` | None | When the request object refers to a cluster role binding, the subject (e.g. account/users) being linked by the binding | +| `ka.req.binding.role` | `string` | None | When the request object refers to a cluster role binding, the role being linked by the binding | +| `ka.req.binding.subject.has_name` | `string` | Key, Required | Deprecated, always returns "N/A". Only provided for backwards compatibility | +| `ka.req.configmap.name` | `string` | None | If the request object refers to a configmap, the configmap name | +| `ka.req.configmap.obj` | `string` | None | If the request object refers to a configmap, the entire configmap object | +| `ka.req.pod.containers.image` | `string (list)` | Index | When the request object refers to a pod, the container's images. | +| `ka.req.container.image` | `string` | None | Deprecated by ka.req.pod.containers.image. Returns the image of the first container only | +| `ka.req.pod.containers.image.repository` | `string (list)` | Index | The same as req.container.image, but only the repository part (e.g. falcosecurity/falco). | +| `ka.req.container.image.repository` | `string` | None | Deprecated by ka.req.pod.containers.image.repository. Returns the repository of the first container only | +| `ka.req.pod.host_ipc` | `string` | None | When the request object refers to a pod, the value of the hostIPC flag. | +| `ka.req.pod.host_network` | `string` | None | When the request object refers to a pod, the value of the hostNetwork flag. | +| `ka.req.container.host_network` | `string` | None | Deprecated alias for ka.req.pod.host_network | +| `ka.req.pod.host_pid` | `string` | None | When the request object refers to a pod, the value of the hostPID flag. | +| `ka.req.pod.containers.host_port` | `string (list)` | Index | When the request object refers to a pod, all container's hostPort values. | +| `ka.req.pod.containers.privileged` | `string (list)` | Index | When the request object refers to a pod, the value of the privileged flag for all containers. | +| `ka.req.container.privileged` | `string` | None | Deprecated by ka.req.pod.containers.privileged. Returns true if any container has privileged=true | +| `ka.req.pod.containers.allow_privilege_escalation` | `string (list)` | Index | When the request object refers to a pod, the value of the allowPrivilegeEscalation flag for all containers | +| `ka.req.pod.containers.read_only_fs` | `string (list)` | Index | When the request object refers to a pod, the value of the readOnlyRootFilesystem flag for all containers | +| `ka.req.pod.run_as_user` | `string` | None | When the request object refers to a pod, the runAsUser uid specified in the security context for the pod. See ....containers.run_as_user for the runAsUser for individual containers | +| `ka.req.pod.containers.run_as_user` | `string (list)` | Index | When the request object refers to a pod, the runAsUser uid for all containers | +| `ka.req.pod.containers.eff_run_as_user` | `string (list)` | Index | When the request object refers to a pod, the initial uid that will be used for all containers. This combines information from both the pod and container security contexts and uses 0 if no uid is specified | +| `ka.req.pod.run_as_group` | `string` | None | When the request object refers to a pod, the runAsGroup gid specified in the security context for the pod. See ....containers.run_as_group for the runAsGroup for individual containers | +| `ka.req.pod.containers.run_as_group` | `string (list)` | Index | When the request object refers to a pod, the runAsGroup gid for all containers | +| `ka.req.pod.containers.eff_run_as_group` | `string (list)` | Index | When the request object refers to a pod, the initial gid that will be used for all containers. This combines information from both the pod and container security contexts and uses 0 if no gid is specified | +| `ka.req.pod.containers.proc_mount` | `string (list)` | Index | When the request object refers to a pod, the procMount types for all containers | +| `ka.req.role.rules` | `string (list)` | None | When the request object refers to a role/cluster role, the rules associated with the role | +| `ka.req.role.rules.apiGroups` | `string (list)` | Index | When the request object refers to a role/cluster role, the api groups associated with the role's rules | +| `ka.req.role.rules.nonResourceURLs` | `string (list)` | Index | When the request object refers to a role/cluster role, the non resource urls associated with the role's rules | +| `ka.req.role.rules.verbs` | `string (list)` | Index | When the request object refers to a role/cluster role, the verbs associated with the role's rules | +| `ka.req.role.rules.resources` | `string (list)` | Index | When the request object refers to a role/cluster role, the resources associated with the role's rules | +| `ka.req.pod.fs_group` | `string` | None | When the request object refers to a pod, the fsGroup gid specified by the security context. | +| `ka.req.pod.supplemental_groups` | `string (list)` | None | When the request object refers to a pod, the supplementalGroup gids specified by the security context. | +| `ka.req.pod.containers.add_capabilities` | `string (list)` | Index | When the request object refers to a pod, all capabilities to add when running the container. | +| `ka.req.service.type` | `string` | None | When the request object refers to a service, the service type | +| `ka.req.service.ports` | `string (list)` | Index | When the request object refers to a service, the service's ports | +| `ka.req.pod.volumes.hostpath` | `string (list)` | Index | When the request object refers to a pod, all hostPath paths specified for all volumes | +| `ka.req.volume.hostpath` | `string` | Key, Required | Deprecated by ka.req.pod.volumes.hostpath. Return true if the provided (host) path prefix is used by any volume | +| `ka.req.pod.volumes.flexvolume_driver` | `string (list)` | Index | When the request object refers to a pod, all flexvolume drivers specified for all volumes | +| `ka.req.pod.volumes.volume_type` | `string (list)` | Index | When the request object refers to a pod, all volume types for all volumes | +| `ka.resp.name` | `string` | None | The response object name | +| `ka.response.code` | `string` | None | The response code | +| `ka.response.reason` | `string` | None | The response reason (usually present only for failures) | +| `ka.useragent` | `string` | None | The useragent of the client who made the request to the apiserver | +| `ka.sourceips` | `string (list)` | Index | The IP addresses of the client who made the request to the apiserver | +| `ka.cluster.name` | `string` | None | The name of the k8s cluster | + + +## Usage + +### Configuration + +Here's an example of configuration of `falco.yaml`: + +```yaml +plugins: + - name: k8saudit-aks + library_path: libk8saudit-aks.so + init_config: + event_hub_namespace_connection_string: "xxxx" + event_hub_name: "" + blob_storage_connection_string: "xxxxx" + blob_storage_container_name: "" + rate_limit_events_per_second: 100 + rate_limit_burst: 200 + open_params: "my-cluster" + - name: json + library_path: libjson.so + init_config: "" + +load_plugins: [k8saudit-aks, json] +``` + +**Initialization Config**: +* `event_hub_namespace_connection_string`: The connection string of the EventHub Namespace to read from +* `event_hub_name`: The name of the EventHub to read from +* `blob_storage_connection_string`: The connection string of the Blob Storage to use as checkpoint store +* `blob_storage_container_name`: The name of the Blob Storage container to use as checkpoint store +* `rate_limit_events_per_second`: The rate limit of events per second to read from EventHub +* `rate_limit_burst`: The rate limit burst of events to read from EventHub + +**Open Parameters** + +Todo + +### Rules + +The `k8saudit-aks` plugin ships with no default rule for test purpose, you can use the same rules than those for `k8saudit` plugin. See [here](https://github.com/falcosecurity/plugins/blob/main/plugins/k8saudit/rules/k8s_audit_rules.yaml). + +To test if it works anyway, you can still use this one for example: + +```yaml +- required_engine_version: 15 +- required_plugin_versions: + - name: k8saudit-aks + version: 0.1.0 + +- rule: Dummy rule + desc: > + Dummy rule + condition: > + ka.verb in (get,create,delete,update) + output: user=%ka.user.name verb=%ka.verb target=%ka.target.name target.namespace=%ka.target.namespace resource=%ka.target.resource + priority: WARNING + source: k8s_audit + tags: [k8s] +``` + +### Running locally + +Todo + +### Running in AKS + +Todo diff --git a/plugins/k8saudit-aks/falco_aks_audit.yaml b/plugins/k8saudit-aks/falco_aks_audit.yaml new file mode 100644 index 00000000..90c4e481 --- /dev/null +++ b/plugins/k8saudit-aks/falco_aks_audit.yaml @@ -0,0 +1,7 @@ +- rule: K8s Audit Event Detected + desc: A test rule that detects any Kubernetes audit event + condition: ka.req exists + output: "K8s Audit Event Detected: %ka.req" + priority: DEBUG + source: k8s_audit + tags: [testing, k8s_audit] diff --git a/plugins/k8saudit-aks/falco_k8s_audit.yaml b/plugins/k8saudit-aks/falco_k8s_audit.yaml new file mode 100644 index 00000000..e69de29b diff --git a/plugins/k8saudit-aks/go.mod b/plugins/k8saudit-aks/go.mod new file mode 100644 index 00000000..e1a98589 --- /dev/null +++ b/plugins/k8saudit-aks/go.mod @@ -0,0 +1,40 @@ +module github.com/falcosecurity/plugins/plugins/k8saudit-aks + +go 1.21.3 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0 + github.com/falcosecurity/plugin-sdk-go v0.7.4 + github.com/falcosecurity/plugins/plugins/k8saudit v0.11.0 + github.com/falcosecurity/plugins/plugins/k8saudit-eks v0.6.0 + github.com/falcosecurity/plugins/shared/go/azure/eventhub v0.0.0-00010101000000-000000000000 + github.com/invopop/jsonschema v0.12.0 + golang.org/x/time v0.8.0 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect + github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b // indirect + github.com/aws/aws-sdk-go v1.54.3 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect + github.com/falcosecurity/plugins/shared/go/aws/cloudwatchlogs v0.0.0-20240617170800-b69d0d091240 // indirect + github.com/falcosecurity/plugins/shared/go/aws/session v0.0.0-20240617170800-b69d0d091240 // indirect + github.com/iancoleman/orderedmap v0.3.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/rs/zerolog v1.33.0 // indirect + github.com/valyala/fastjson v1.6.4 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/falcosecurity/plugins/shared/go/azure/eventhub => ../../shared/go/azure/eventhub diff --git a/plugins/k8saudit-aks/go.sum b/plugins/k8saudit-aks/go.sum new file mode 100644 index 00000000..890f13dc --- /dev/null +++ b/plugins/k8saudit-aks/go.sum @@ -0,0 +1,134 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 h1:JZg6HRh6W6U4OLl6lk7BZ7BLisIzM9dG1R50zUk9C/M= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0/go.mod h1:YL1xnZ6QejvQHWJrX/AvhFl4WW4rqHVoKspWNVwFk0M= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 h1:6bVZts/82H+hax9b3vdmSpi7+Hw9uWvEaJHeKlafnW4= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3/go.mod h1:qf3s/6aV9ePKYGeEYPsbndK6GGfeS7SrbA6OE/T7NIA= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0/go.mod h1:tI9M2Q/ueFi287QRkdrhb9LHm6ZnXgkVYLRC3FhYkPw= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0 h1:mlmW46Q0B79I+Aj4azKC6xDMFN9a9SyZWESlGWYXbFs= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0/go.mod h1:PXe2h+LKcWTX9afWdZoHyODqR4fBa5boUM/8uJfZ0Jo= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b h1:doCpXjVwui6HUN+xgNsNS3SZ0/jUZ68Eb+mJRNOZfog= +github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b/go.mod h1:/n6+1/DWPltRLWL/VKyUxg6tzsl5kHUCcraimt4vr60= +github.com/aws/aws-sdk-go v1.44.51/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= +github.com/aws/aws-sdk-go v1.54.3 h1:Bk+EXoq6v5I1xmHR9GQGpsMWZZFXs+FD+5uPyEmfgX0= +github.com/aws/aws-sdk-go v1.54.3/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/falcosecurity/plugin-sdk-go v0.7.4 h1:iNV0pgWgJwOHqSCjTw4Hsvtu5WuwoqckAWzpIEy9giQ= +github.com/falcosecurity/plugin-sdk-go v0.7.4/go.mod h1:NP+y22DYOS+G3GDXIXNmzf0CBL3nfPPMoQuHvAzfitQ= +github.com/falcosecurity/plugins/plugins/k8saudit v0.11.0 h1:ywwQ8kQmMS0HL3PuwBSKUmERqePrCSnajxnSCNC0HQY= +github.com/falcosecurity/plugins/plugins/k8saudit v0.11.0/go.mod h1:RmSc1za6asI52w3uVhZGb/p6RoQr2OWmp/Zc8+kiMWw= +github.com/falcosecurity/plugins/plugins/k8saudit-eks v0.6.0 h1:jU3O1/Kng8OWAa+0Vx8HLbX6FlsGpgtlexmD9kgkcuw= +github.com/falcosecurity/plugins/plugins/k8saudit-eks v0.6.0/go.mod h1:bNQxQY1KMxW2UaOH2HZFyFE9q0O+0A+f/sDJYP6bCOk= +github.com/falcosecurity/plugins/shared/go/aws/cloudwatchlogs v0.0.0-20240617170800-b69d0d091240 h1:Qi+kDNXSLPhI3Z1kwv6OnqfFTsXGFXp/v9I6iEHqbiU= +github.com/falcosecurity/plugins/shared/go/aws/cloudwatchlogs v0.0.0-20240617170800-b69d0d091240/go.mod h1:CYl1dfwy+MAU+4rvPydDdGkYWwEalaHx/SHMQyx8GJ8= +github.com/falcosecurity/plugins/shared/go/aws/session v0.0.0-20240617170800-b69d0d091240 h1:zu8iIYjzOBXM0C1UzTUPD02SRQH7OOw+MQplH2SqMkw= +github.com/falcosecurity/plugins/shared/go/aws/session v0.0.0-20240617170800-b69d0d091240/go.mod h1:k9mEexvqw4joSDsoN9n5NCO0T6qXOFEIxI141ZLr3t4= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= +github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= +github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= +github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI= +github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= +github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go new file mode 100644 index 00000000..d7da998d --- /dev/null +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -0,0 +1,233 @@ +package k8sauditaks + +import ( + "context" + "encoding/json" + "log" + "os" + "regexp" + "strconv" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins" + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/source" + "github.com/falcosecurity/plugins/plugins/k8saudit/pkg/k8saudit" + falcoeventhub "github.com/falcosecurity/plugins/shared/go/azure/eventhub" + "github.com/invopop/jsonschema" + "golang.org/x/time/rate" +) + +const pluginName = "k8saudit-aks" +const regExpAuditID = `"auditID":[ a-z0-9-"]+` + +var regExpCAuditID *regexp.Regexp + +type Plugin struct { + k8saudit.Plugin + Logger *log.Logger + Config PluginConfig +} + +type PluginConfig struct { + EventHubNamespaceConnectionString string `json:"event_hub_namespace_connection_string" jsonschema:"title=event_hub_namespace_connection_string,description=The connection string of the EventHub Namespace to read from"` + EventHubName string `json:"event_hub_name" jsonschema:"title=event_hub_name,description=The name of the EventHub to read from"` + BlobStorageConnectionString string `json:"blob_storage_connection_string" jsonschema:"title=blob_storage_connection_string,description=The connection string of the Blob Storage to use as checkpoint store"` + BlobStorageContainerName string `json:"blob_storage_container_name" jsonschema:"title=blob_storage_container_name,description=The name of the Blob Storage container to use as checkpoint store"` + RateLimitEventsPerSecond int `json:"rate_limit_events_per_second" jsonschema:"title=rate_limit_events_per_second,description=The rate limit of events per second to read from EventHub"` + RateLimitBurst int `json:"rate_limit_burst" jsonschema:"title=rate_limit_burst,description=The rate limit burst of events to read from EventHub"` +} + +func (p *Plugin) Info() *plugins.Info { + return &plugins.Info{ + ID: 21, + Name: pluginName, + Description: "Read Kubernetes Audit Events for AKS from EventHub and use blob storage as checkpoint store", + Contact: "github.com/falcosecurity/plugins", + Version: "0.1.0", + EventSource: "k8s_audit", + } +} + +// Reset sets the configuration to its default values +func (p *PluginConfig) Reset() { + if i := os.Getenv("EVENTHUB_NAMESPACE_CONNECTION_STRING"); i != "" { + p.EventHubNamespaceConnectionString = i + } + if i := os.Getenv("EVENTHUB_NAME"); i != "" { + p.EventHubName = i + } + if i := os.Getenv("BLOB_STORAGE_CONNECTION_STRING"); i != "" { + p.BlobStorageConnectionString = i + } + if i := os.Getenv("BLOB_STORAGE_CONTAINER_NAME"); i != "" { + p.BlobStorageContainerName = i + } + if i := os.Getenv("RATE_LIMIT_EVENTS_PER_SECOND"); i != "" { + rateLimitEventsPerSecond, err := strconv.Atoi(i) + if err != nil { + return + } + p.RateLimitEventsPerSecond = rateLimitEventsPerSecond + } else { + p.RateLimitEventsPerSecond = 100 + } + if i := os.Getenv("RATE_LIMIT_BURST"); i != "" { + rateLimitBurst, err := strconv.Atoi(i) + if err != nil { + return + } + p.RateLimitBurst = rateLimitBurst + } else { + p.RateLimitBurst = 200 + } +} + +func (p *Plugin) Init(cfg string) error { + // read configuration + p.Plugin.Config.Reset() + p.Config.Reset() + + err := json.Unmarshal([]byte(cfg), &p.Config) + if err != nil { + return err + } + + regExpCAuditID, err = regexp.Compile(regExpAuditID) + if err != nil { + return err + } + + p.Logger = log.New(os.Stderr, "["+pluginName+"] ", log.LstdFlags|log.LUTC|log.Lmsgprefix) + + return nil +} + +func (p *Plugin) InitSchema() *sdk.SchemaInfo { + reflector := jsonschema.Reflector{ + // all properties are optional by default + RequiredFromJSONSchemaTags: true, + // unrecognized properties don't cause a parsing failures + AllowAdditionalProperties: true, + } + if schema, err := reflector.Reflect(&PluginConfig{}).MarshalJSON(); err == nil { + return &sdk.SchemaInfo{ + Schema: string(schema), + } + } + return nil +} + +func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) { + return []sdk.OpenParam{ + {Value: "default", Desc: "Cluster Name"}, + }, nil +} + +func (p *Plugin) Open(_ string) (source.Instance, error) { + ctx, _ := context.WithCancel(context.Background()) + + checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil) + if err != nil { + p.Logger.Printf("error opening connection to blob storage: %v", err) + return nil, err + } + p.Logger.Printf("opened connection to blob storage") + checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil) + if err != nil { + p.Logger.Printf("error opening blob checkpoint connection: %v", err) + return nil, err + } + p.Logger.Printf("opened blob checkpoint connection") + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString( + p.Config.EventHubNamespaceConnectionString, + p.Config.EventHubName, + azeventhubs.DefaultConsumerGroup, + nil, + ) + p.Logger.Printf("opened consumer client") + if err != nil { + p.Logger.Printf("error creating consumer client: %v", err) + return nil, err + } + // Do not defer closing the consumerClient here + + processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil) + if err != nil { + p.Logger.Printf("error creating eventhub processor: %v", err) + return nil, err + } + + rateLimiter := rate.NewLimiter(rate.Limit(p.Config.RateLimitEventsPerSecond), p.Config.RateLimitBurst) + + falcoEventHubProcessor := falcoeventhub.Processor{ + RateLimiter: rateLimiter, + } + + p.Logger.Printf("created eventhub processor") + + eventsC := make(chan falcoeventhub.Record) + pushEventC := make(chan source.PushEvent) + + go func() { + for { + partitionClient := processor.NextPartitionClient(context.Background()) + if partitionClient == nil { + break + } + go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) { + if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil { + p.Logger.Printf("error processing partition client: %v", err) + } + }(partitionClient, eventsC) + } + }() + + // Process events and send to pushEventC + go func() { + for { + select { + case i, ok := <-eventsC: + if !ok { + return + } + values, err := p.Plugin.ParseAuditEventsPayload([]byte(i.Properties.Log)) + if err != nil { + p.Logger.Println(err) + continue + } + for _, j := range values { + if j.Err != nil { + p.Logger.Println(j.Err) + continue + } + pushEventC <- *j + } + case <-ctx.Done(): + p.Logger.Println("context done in eventsC") + return + } + } + }() + + // Run the processor + go func() { + if err := processor.Run(ctx); err != nil { + p.Logger.Printf("error running processor: %v", err) + } + }() + + return source.NewPushInstance( + pushEventC, + source.WithInstanceClose(func() { + // Close consumerClient when the context is canceled + if err := consumerClient.Close(context.Background()); err != nil { + p.Logger.Printf("error closing consumer client: %v", err) + } + // Close pushEventC to signal no more events + close(pushEventC) + }), + ) +} diff --git a/plugins/k8saudit-aks/plugin/main.go b/plugins/k8saudit-aks/plugin/main.go new file mode 100644 index 00000000..a7aadee0 --- /dev/null +++ b/plugins/k8saudit-aks/plugin/main.go @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2023 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins" + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/extractor" + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/source" + "github.com/falcosecurity/plugins/plugins/k8saudit-aks/pkg/k8sauditaks" +) + +func init() { + plugins.SetFactory(func() plugins.Plugin { + p := &k8sauditaks.Plugin{} + source.Register(p) + extractor.Register(p) + return p + }) +} + +func main() {} diff --git a/registry.yaml b/registry.yaml index ceb4218d..60fabbd4 100644 --- a/registry.yaml +++ b/registry.yaml @@ -516,3 +516,31 @@ plugins: source: keycloak extraction: supported: true + - name: k8saudit-aks + description: Read Kubernetes Audit Events from AWS AKS Clusters + authors: The Falco Authors + contact: https://falco.org/community + maintainers: + - name: The Falco Authors + email: cncf-falco-dev@lists.cncf.io + url: https://github.com/falcosecurity/plugins/tree/main/plugins/k8saudit-aks + rules_url: https://github.com/falcosecurity/plugins/tree/main/plugins/k8saudit/rules + license: Apache-2.0 + signature: + cosign: + certificate-oidc-issuer: https://token.actions.githubusercontent.com + certificate-identity-regexp: https://github.com/falcosecurity/plugins/ + keywords: + - audit + - audit-log + - audit-events + - kubernetes + - aks + - azure + capabilities: + sourcing: + supported: true + id: 21 + source: k8s_audit + extraction: + supported: true \ No newline at end of file diff --git a/shared/go/azure/eventhub/go.mod b/shared/go/azure/eventhub/go.mod new file mode 100644 index 00000000..85219312 --- /dev/null +++ b/shared/go/azure/eventhub/go.mod @@ -0,0 +1,16 @@ +module github.com/falcosecurity/plugins/shared/go/azure/eventhub + +go 1.21.3 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 + golang.org/x/time v0.8.0 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/text v0.16.0 // indirect +) diff --git a/shared/go/azure/eventhub/go.sum b/shared/go/azure/eventhub/go.sum new file mode 100644 index 00000000..d333dd12 --- /dev/null +++ b/shared/go/azure/eventhub/go.sum @@ -0,0 +1,52 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 h1:6bVZts/82H+hax9b3vdmSpi7+Hw9uWvEaJHeKlafnW4= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3/go.mod h1:qf3s/6aV9ePKYGeEYPsbndK6GGfeS7SrbA6OE/T7NIA= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0/go.mod h1:tI9M2Q/ueFi287QRkdrhb9LHm6ZnXgkVYLRC3FhYkPw= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt2H7QXzZs0q8UBjgRbl56qo8GYM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go new file mode 100644 index 00000000..54225c4f --- /dev/null +++ b/shared/go/azure/eventhub/processor.go @@ -0,0 +1,79 @@ +package eventhub + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "golang.org/x/time/rate" +) + +type Processor struct { + RateLimiter *rate.Limiter +} + +type Record struct { + Properties struct { + Log string `json:"log"` + } `json:"properties"` +} + +type Event struct { + Records []Record `json:"records"` +} + +func (p *Processor) Process( + partitionClient *azeventhubs.ProcessorPartitionClient, + recordChan chan<- Record, +) error { + defer closePartitionResources(partitionClient) + + for { + ctx := context.Background() + + receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute) + fmt.Printf("Receiving events on partitionId %v\n", partitionClient.PartitionID()) + events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) + fmt.Printf("Received %d events on partitionId %v\n", len(events), partitionClient.PartitionID()) + receiveCtxCancel() + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return err + } + + for _, event := range events { + eventData, err := UnmarshallEvent(event.Body) + if err != nil { + return err + } + for _, record := range eventData.Records { + ctx := context.Background() + err := p.RateLimiter.Wait(ctx) + if err != nil { + continue + } + recordChan <- record + } + + if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil { + return err + } + fmt.Printf("Updated checkpoint for partitionId %v\n", partitionClient.PartitionID()) + } + } +} + +func UnmarshallEvent(eventJObj []byte) (*Event, error) { + var event Event + err := json.Unmarshal(eventJObj, &event) + if err != nil { + return nil, err + } + return &event, nil +} + +func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) { + defer partitionClient.Close(context.TODO()) +} From 382547cf799fd3535cd379be3c34587a3a9acd27 Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Mon, 16 Dec 2024 18:56:07 -0300 Subject: [PATCH 2/8] feat: refactor to remove print logs, add .envrc to .gitignore, configure proper resource shutdown for partitionClient Signed-off-by: Igor Eulalio --- plugins/k8saudit-aks/.gitignore | 1 + plugins/k8saudit-aks/falco_k8s_audit.yaml | 0 .../k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go | 16 ++++++++++++---- shared/go/azure/eventhub/processor.go | 4 ---- 4 files changed, 13 insertions(+), 8 deletions(-) delete mode 100644 plugins/k8saudit-aks/falco_k8s_audit.yaml diff --git a/plugins/k8saudit-aks/.gitignore b/plugins/k8saudit-aks/.gitignore index 831e20ff..449c6f7e 100644 --- a/plugins/k8saudit-aks/.gitignore +++ b/plugins/k8saudit-aks/.gitignore @@ -2,3 +2,4 @@ test_files libk8saudit-aks.so .vscode falco.yaml +.envrc diff --git a/plugins/k8saudit-aks/falco_k8s_audit.yaml b/plugins/k8saudit-aks/falco_k8s_audit.yaml deleted file mode 100644 index e69de29b..00000000 diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index d7da998d..a0bd3cde 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -127,8 +127,8 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) { } func (p *Plugin) Open(_ string) (source.Instance, error) { - ctx, _ := context.WithCancel(context.Background()) - + ctx, cancel := context.WithCancel(context.Background()) + checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil) if err != nil { p.Logger.Printf("error opening connection to blob storage: %v", err) @@ -152,7 +152,6 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { p.Logger.Printf("error creating consumer client: %v", err) return nil, err } - // Do not defer closing the consumerClient here processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil) if err != nil { @@ -173,10 +172,17 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { go func() { for { - partitionClient := processor.NextPartitionClient(context.Background()) + partitionClient := processor.NextPartitionClient(ctx) if partitionClient == nil { break } + defer func() { + // Ensure that pc.Close() is called when the goroutine ends, + // regardless of whether Process returned an error. + if cerr := partitionClient.Close(ctx); cerr != nil { + p.Logger.Printf("error closing partition client: %v", cerr) + } + }() go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) { if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil { p.Logger.Printf("error processing partition client: %v", err) @@ -228,6 +234,8 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { } // Close pushEventC to signal no more events close(pushEventC) + // Cancel the context so that the processor stops + cancel() }), ) } diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index 54225c4f..eb0c8ec2 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -35,9 +34,7 @@ func (p *Processor) Process( ctx := context.Background() receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute) - fmt.Printf("Receiving events on partitionId %v\n", partitionClient.PartitionID()) events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) - fmt.Printf("Received %d events on partitionId %v\n", len(events), partitionClient.PartitionID()) receiveCtxCancel() if err != nil && !errors.Is(err, context.DeadlineExceeded) { return err @@ -60,7 +57,6 @@ func (p *Processor) Process( if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil { return err } - fmt.Printf("Updated checkpoint for partitionId %v\n", partitionClient.PartitionID()) } } } From 3b9af8b05db3e84e29e96e35a69de30cac67ef40 Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Mon, 16 Dec 2024 19:13:52 -0300 Subject: [PATCH 3/8] feat: handling channel closenes in order Signed-off-by: Igor Eulalio --- plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go | 8 ++++---- shared/go/azure/eventhub/processor.go | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index a0bd3cde..f682f629 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -128,7 +128,7 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) { func (p *Plugin) Open(_ string) (source.Instance, error) { ctx, cancel := context.WithCancel(context.Background()) - + checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil) if err != nil { p.Logger.Printf("error opening connection to blob storage: %v", err) @@ -184,7 +184,7 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { } }() go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) { - if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil { + if err := falcoEventHubProcessor.Process(partitionClient, eventsC, ctx); err != nil { p.Logger.Printf("error processing partition client: %v", err) } }(partitionClient, eventsC) @@ -232,10 +232,10 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { if err := consumerClient.Close(context.Background()); err != nil { p.Logger.Printf("error closing consumer client: %v", err) } - // Close pushEventC to signal no more events - close(pushEventC) // Cancel the context so that the processor stops cancel() + // Close pushEventC to signal no more events + close(pushEventC) }), ) } diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index eb0c8ec2..8ff609fd 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -27,12 +27,11 @@ type Event struct { func (p *Processor) Process( partitionClient *azeventhubs.ProcessorPartitionClient, recordChan chan<- Record, + ctx context.Context, ) error { defer closePartitionResources(partitionClient) for { - ctx := context.Background() - receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute) events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) receiveCtxCancel() @@ -71,5 +70,5 @@ func UnmarshallEvent(eventJObj []byte) (*Event, error) { } func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) { - defer partitionClient.Close(context.TODO()) + defer partitionClient.Close(context.Background()) } From 69bd186026f445fadd5d56ad5a35bd96ba29fd5d Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Tue, 17 Dec 2024 13:15:55 -0300 Subject: [PATCH 4/8] chore: remove unused print Signed-off-by: Igor Eulalio --- plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index f682f629..ec01fb18 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -212,7 +212,6 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { pushEventC <- *j } case <-ctx.Done(): - p.Logger.Println("context done in eventsC") return } } From 3b08d18183996241722126cae9cd4e67754d150a Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Tue, 17 Dec 2024 13:35:53 -0300 Subject: [PATCH 5/8] chore: fix license description on all files, remove test rules file, fix typo on README Signed-off-by: Igor Eulalio --- plugins/k8saudit-aks/README.md | 2 +- plugins/k8saudit-aks/falco_aks_audit.yaml | 7 ------- .../k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go | 13 +++++++++++++ plugins/k8saudit-aks/plugin/main.go | 2 +- shared/go/azure/eventhub/processor.go | 14 ++++++++++++++ 5 files changed, 29 insertions(+), 9 deletions(-) delete mode 100644 plugins/k8saudit-aks/falco_aks_audit.yaml diff --git a/plugins/k8saudit-aks/README.md b/plugins/k8saudit-aks/README.md index 99ca5cef..5efc637c 100644 --- a/plugins/k8saudit-aks/README.md +++ b/plugins/k8saudit-aks/README.md @@ -2,7 +2,7 @@ ## Introduction -This plugin extends Falco to support [Kubernetes Audit Events](https://kubernetes.io/docs/tasks/debug-application-cluster/audit/#audit-backends) from AWS AKS clusters as a new data source. +This plugin extends Falco to support [Kubernetes Audit Events](https://kubernetes.io/docs/tasks/debug-application-cluster/audit/#audit-backends) from AKS clusters as a new data source. For more details about what Audit logs are, see the [README of k8saudit plugin](https://github.com/falcosecurity/plugins/blob/main/plugins/k8saudit/README.md). ### Functionality diff --git a/plugins/k8saudit-aks/falco_aks_audit.yaml b/plugins/k8saudit-aks/falco_aks_audit.yaml deleted file mode 100644 index 90c4e481..00000000 --- a/plugins/k8saudit-aks/falco_aks_audit.yaml +++ /dev/null @@ -1,7 +0,0 @@ -- rule: K8s Audit Event Detected - desc: A test rule that detects any Kubernetes audit event - condition: ka.req exists - output: "K8s Audit Event Detected: %ka.req" - priority: DEBUG - source: k8s_audit - tags: [testing, k8s_audit] diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index ec01fb18..b00659ec 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -1,3 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package k8sauditaks import ( diff --git a/plugins/k8saudit-aks/plugin/main.go b/plugins/k8saudit-aks/plugin/main.go index a7aadee0..7ecc42e4 100644 --- a/plugins/k8saudit-aks/plugin/main.go +++ b/plugins/k8saudit-aks/plugin/main.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 /* -Copyright (C) 2023 The Falco Authors. +Copyright (C) 2024 The Falco Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index 8ff609fd..85d3f82e 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -1,3 +1,17 @@ +// SPDX-License-Identifier: Apache-2.0 +/* + Copyright (C) 2024 The Falco Authors. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + package eventhub import ( From 80dafb5334014dffbd108803973f60a34bbef94f Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Tue, 17 Dec 2024 18:53:26 -0300 Subject: [PATCH 6/8] fix: add waitgroup to prevent sending messages on closed channels Signed-off-by: Igor Eulalio --- .../k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go | 14 +++++++++++--- shared/go/azure/eventhub/processor.go | 9 +++++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index b00659ec..54a934f8 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -20,6 +20,7 @@ import ( "os" "regexp" "strconv" + "sync" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" @@ -176,6 +177,7 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { falcoEventHubProcessor := falcoeventhub.Processor{ RateLimiter: rateLimiter, + Logger: p.Logger, } p.Logger.Printf("created eventhub processor") @@ -204,8 +206,10 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { } }() - // Process events and send to pushEventC + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() for { select { case i, ok := <-eventsC: @@ -244,9 +248,13 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { if err := consumerClient.Close(context.Background()); err != nil { p.Logger.Printf("error closing consumer client: %v", err) } - // Cancel the context so that the processor stops + + // Cancel must be used here instead of as a defer to ensure that the context is canceled only when + // the plugin receive a signal from Falco cancel() - // Close pushEventC to signal no more events + + wg.Wait() + close(eventsC) close(pushEventC) }), ) diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index 85d3f82e..de0b3399 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "log" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -26,6 +27,7 @@ import ( type Processor struct { RateLimiter *rate.Limiter + Logger *log.Logger } type Record struct { @@ -59,12 +61,15 @@ func (p *Processor) Process( return err } for _, record := range eventData.Records { - ctx := context.Background() err := p.RateLimiter.Wait(ctx) if err != nil { continue } - recordChan <- record + select { + case <-ctx.Done(): + return nil + case recordChan <- record: + } } if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil { From da64977419f8299cf1d49dadee6f708aaa6fa36e Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Fri, 20 Dec 2024 17:48:53 -0300 Subject: [PATCH 7/8] feat: refactor it to use init config instead environment variables, add plugin max event size configuration Signed-off-by: Igor Eulalio --- .../pkg/k8sauditaks/k8sauditaks.go | 48 +++++-------------- shared/go/azure/eventhub/processor.go | 2 +- 2 files changed, 12 insertions(+), 38 deletions(-) diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index 54a934f8..2e1ed2b4 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -19,7 +19,6 @@ import ( "log" "os" "regexp" - "strconv" "sync" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -52,6 +51,7 @@ type PluginConfig struct { BlobStorageContainerName string `json:"blob_storage_container_name" jsonschema:"title=blob_storage_container_name,description=The name of the Blob Storage container to use as checkpoint store"` RateLimitEventsPerSecond int `json:"rate_limit_events_per_second" jsonschema:"title=rate_limit_events_per_second,description=The rate limit of events per second to read from EventHub"` RateLimitBurst int `json:"rate_limit_burst" jsonschema:"title=rate_limit_burst,description=The rate limit burst of events to read from EventHub"` + MaxEventSize uint64 `json:"maxEventSize" jsonschema:"title=Maximum event size,description=Maximum size of single audit event (Default: 262144),default=262144"` } func (p *Plugin) Info() *plugins.Info { @@ -65,45 +65,20 @@ func (p *Plugin) Info() *plugins.Info { } } -// Reset sets the configuration to its default values -func (p *PluginConfig) Reset() { - if i := os.Getenv("EVENTHUB_NAMESPACE_CONNECTION_STRING"); i != "" { - p.EventHubNamespaceConnectionString = i - } - if i := os.Getenv("EVENTHUB_NAME"); i != "" { - p.EventHubName = i - } - if i := os.Getenv("BLOB_STORAGE_CONNECTION_STRING"); i != "" { - p.BlobStorageConnectionString = i - } - if i := os.Getenv("BLOB_STORAGE_CONTAINER_NAME"); i != "" { - p.BlobStorageContainerName = i - } - if i := os.Getenv("RATE_LIMIT_EVENTS_PER_SECOND"); i != "" { - rateLimitEventsPerSecond, err := strconv.Atoi(i) - if err != nil { - return - } - p.RateLimitEventsPerSecond = rateLimitEventsPerSecond - } else { - p.RateLimitEventsPerSecond = 100 - } - if i := os.Getenv("RATE_LIMIT_BURST"); i != "" { - rateLimitBurst, err := strconv.Atoi(i) - if err != nil { - return - } - p.RateLimitBurst = rateLimitBurst - } else { - p.RateLimitBurst = 200 - } +func (p *PluginConfig) SetDefault() { + p.RateLimitBurst = 200 + p.RateLimitEventsPerSecond = 100 +} + +// Resets sets the configuration to its default values +func (k *PluginConfig) Reset() { + k.MaxEventSize = uint64(sdk.DefaultEvtSize) } func (p *Plugin) Init(cfg string) error { - // read configuration - p.Plugin.Config.Reset() p.Config.Reset() - + p.Plugin.Config.Reset() + p.Config.SetDefault() err := json.Unmarshal([]byte(cfg), &p.Config) if err != nil { return err @@ -142,7 +117,6 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) { func (p *Plugin) Open(_ string) (source.Instance, error) { ctx, cancel := context.WithCancel(context.Background()) - checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil) if err != nil { p.Logger.Printf("error opening connection to blob storage: %v", err) diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index de0b3399..79a96762 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -48,7 +48,7 @@ func (p *Processor) Process( defer closePartitionResources(partitionClient) for { - receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute) + receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Second*10) events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) receiveCtxCancel() if err != nil && !errors.Is(err, context.DeadlineExceeded) { From f75ee797c2825ee31cdf722009f58e99f077bed8 Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Fri, 20 Dec 2024 18:08:52 -0300 Subject: [PATCH 8/8] chore: update docs Signed-off-by: Igor Eulalio --- plugins/k8saudit-aks/README.md | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/plugins/k8saudit-aks/README.md b/plugins/k8saudit-aks/README.md index 5efc637c..19c8c5e0 100644 --- a/plugins/k8saudit-aks/README.md +++ b/plugins/k8saudit-aks/README.md @@ -114,16 +114,16 @@ load_plugins: [k8saudit-aks, json] ``` **Initialization Config**: -* `event_hub_namespace_connection_string`: The connection string of the EventHub Namespace to read from -* `event_hub_name`: The name of the EventHub to read from -* `blob_storage_connection_string`: The connection string of the Blob Storage to use as checkpoint store -* `blob_storage_container_name`: The name of the Blob Storage container to use as checkpoint store -* `rate_limit_events_per_second`: The rate limit of events per second to read from EventHub -* `rate_limit_burst`: The rate limit burst of events to read from EventHub +* `event_hub_namespace_connection_string` (required): The connection string of the EventHub Namespace to read from +* `event_hub_name` (required) : The name of the EventHub to read from +* `blob_storage_connection_string` (required): The connection string of the Blob Storage to use as checkpoint store +* `blob_storage_container_name` (required): The name of the Blob Storage container to use as checkpoint store +* `rate_limit_events_per_second` (optional): The rate limit of events per second to read from EventHub +* `rate_limit_burst` (optional): The rate limit burst of events to read from EventHub **Open Parameters** -Todo +No open parameters are required for this plugin. ### Rules @@ -148,10 +148,3 @@ To test if it works anyway, you can still use this one for example: tags: [k8s] ``` -### Running locally - -Todo - -### Running in AKS - -Todo