Skip to content

Commit

Permalink
feat: refactor to remove print logs, add .envrc to .gitignore, config…
Browse files Browse the repository at this point in the history
…ure proper resource shutdown for partitionClient

Signed-off-by: Igor Eulalio <[email protected]>
  • Loading branch information
IgorEulalio committed Dec 16, 2024
1 parent bd182ca commit dc2544a
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 8 deletions.
1 change: 1 addition & 0 deletions plugins/k8saudit-aks/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ test_files
libk8saudit-aks.so
.vscode
falco.yaml
.envrc
Empty file.
16 changes: 12 additions & 4 deletions plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) {
}

func (p *Plugin) Open(_ string) (source.Instance, error) {
ctx, _ := context.WithCancel(context.Background())

ctx, cancel := context.WithCancel(context.Background())
checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil)
if err != nil {
p.Logger.Printf("error opening connection to blob storage: %v", err)
Expand All @@ -152,7 +152,6 @@ func (p *Plugin) Open(_ string) (source.Instance, error) {
p.Logger.Printf("error creating consumer client: %v", err)
return nil, err
}
// Do not defer closing the consumerClient here

processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
Expand All @@ -173,10 +172,17 @@ func (p *Plugin) Open(_ string) (source.Instance, error) {

go func() {
for {
partitionClient := processor.NextPartitionClient(context.Background())
partitionClient := processor.NextPartitionClient(ctx)
if partitionClient == nil {
break
}
defer func() {
// Ensure that pc.Close() is called when the goroutine ends,
// regardless of whether Process returned an error.
if cerr := partitionClient.Close(ctx); cerr != nil {
p.Logger.Printf("error closing partition client: %v", cerr)
}
}()
go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) {
if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil {
p.Logger.Printf("error processing partition client: %v", err)
Expand Down Expand Up @@ -228,6 +234,8 @@ func (p *Plugin) Open(_ string) (source.Instance, error) {
}
// Close pushEventC to signal no more events
close(pushEventC)
// Cancel the context so that the processor stops
cancel()
}),
)
}
4 changes: 0 additions & 4 deletions shared/go/azure/eventhub/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
Expand Down Expand Up @@ -35,9 +34,7 @@ func (p *Processor) Process(
ctx := context.Background()

receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute)
fmt.Printf("Receiving events on partitionId %v\n", partitionClient.PartitionID())
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
fmt.Printf("Received %d events on partitionId %v\n", len(events), partitionClient.PartitionID())
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
Expand All @@ -60,7 +57,6 @@ func (p *Processor) Process(
if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil {
return err
}
fmt.Printf("Updated checkpoint for partitionId %v\n", partitionClient.PartitionID())
}
}
}
Expand Down

0 comments on commit dc2544a

Please sign in to comment.