diff --git a/plugins/k8saudit-aks/.gitignore b/plugins/k8saudit-aks/.gitignore index 831e20ff..449c6f7e 100644 --- a/plugins/k8saudit-aks/.gitignore +++ b/plugins/k8saudit-aks/.gitignore @@ -2,3 +2,4 @@ test_files libk8saudit-aks.so .vscode falco.yaml +.envrc diff --git a/plugins/k8saudit-aks/falco_k8s_audit.yaml b/plugins/k8saudit-aks/falco_k8s_audit.yaml deleted file mode 100644 index e69de29b..00000000 diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index d7da998d..a0bd3cde 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -127,8 +127,8 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) { } func (p *Plugin) Open(_ string) (source.Instance, error) { - ctx, _ := context.WithCancel(context.Background()) - + ctx, cancel := context.WithCancel(context.Background()) + checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil) if err != nil { p.Logger.Printf("error opening connection to blob storage: %v", err) @@ -152,7 +152,6 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { p.Logger.Printf("error creating consumer client: %v", err) return nil, err } - // Do not defer closing the consumerClient here processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil) if err != nil { @@ -173,10 +172,17 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { go func() { for { - partitionClient := processor.NextPartitionClient(context.Background()) + partitionClient := processor.NextPartitionClient(ctx) if partitionClient == nil { break } + defer func() { + // Ensure that pc.Close() is called when the goroutine ends, + // regardless of whether Process returned an error. + if cerr := partitionClient.Close(ctx); cerr != nil { + p.Logger.Printf("error closing partition client: %v", cerr) + } + }() go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) { if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil { p.Logger.Printf("error processing partition client: %v", err) @@ -228,6 +234,8 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { } // Close pushEventC to signal no more events close(pushEventC) + // Cancel the context so that the processor stops + cancel() }), ) } diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index 54225c4f..eb0c8ec2 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -35,9 +34,7 @@ func (p *Processor) Process( ctx := context.Background() receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute) - fmt.Printf("Receiving events on partitionId %v\n", partitionClient.PartitionID()) events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) - fmt.Printf("Received %d events on partitionId %v\n", len(events), partitionClient.PartitionID()) receiveCtxCancel() if err != nil && !errors.Is(err, context.DeadlineExceeded) { return err @@ -60,7 +57,6 @@ func (p *Processor) Process( if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil { return err } - fmt.Printf("Updated checkpoint for partitionId %v\n", partitionClient.PartitionID()) } } }