Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a message copy to apply fieldFilters in exec events #1693

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient.
// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads FGS events
// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads Tetragon events
type ioReaderClient struct {
scanner *bufio.Scanner
allowlist hubbleFilters.FilterFuncs
Expand Down Expand Up @@ -106,10 +107,18 @@ func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) {
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) {
continue
}
filterEvent := &res
if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse)
}
for _, filter := range i.fieldFilters {
filter.Filter(&res)
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
}
return &res, nil
return filterEvent, nil
}
if err := i.scanner.Err(); err != nil {
return nil, err
Expand Down
15 changes: 12 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type Listener interface {
Expand Down Expand Up @@ -162,22 +163,30 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon

// Filter the GetEventsResponse fields
filters := filters.FieldFiltersFromGetEventsRequest(request)
filterEvent := event
if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse)
}
for _, filter := range filters {
filter.Filter(event)
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
}

if aggregator != nil {
// Send event to aggregator.
select {
case aggregator.GetEventChannel() <- event:
case aggregator.GetEventChannel() <- filterEvent:
default:
logger.GetLogger().
WithField("request", request).
Warn("Aggregator buffer is full. Consider increasing AggregatorOptions.channel_buffer_size.")
}
} else {
// No need to aggregate. Directly send out the response.
if err = server.Send(event); err != nil {
if err = server.Send(filterEvent); err != nil {
s.ctxCleanupWG.Done()
return err
}
Expand Down
Loading