Skip to content

Commit

Permalink
Use a message copy to apply fieldFilters in exec events
Browse files Browse the repository at this point in the history
[upstream commit: dca73e3]

As we keep them in the process cache, removing fields may affect the
correctness of the process cache. In order to fix that, we create a copy
of the event and apply fieldFilters only on the copy.

FIXES: #1428

Signed-off-by: Anastasios Papagiannis <[email protected]>
  • Loading branch information
tpapagian authored and willfindlay committed Oct 31, 2023
1 parent a621852 commit 5a43f7e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
15 changes: 12 additions & 3 deletions cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient.
// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads FGS events
// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads Tetragon events
type ioReaderClient struct {
scanner *bufio.Scanner
allowlist hubbleFilters.FilterFuncs
Expand Down Expand Up @@ -106,10 +107,18 @@ func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) {
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) {
continue
}
filterEvent := &res
if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse)
}
for _, filter := range i.fieldFilters {
filter.Filter(&res)
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
}
return &res, nil
return filterEvent, nil
}
if err := i.scanner.Err(); err != nil {
return nil, err
Expand Down
15 changes: 12 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type Listener interface {
Expand Down Expand Up @@ -162,22 +163,30 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon

// Filter the GetEventsResponse fields
filters := filters.FieldFiltersFromGetEventsRequest(request)
filterEvent := event
if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse)
}
for _, filter := range filters {
filter.Filter(event)
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
}

if aggregator != nil {
// Send event to aggregator.
select {
case aggregator.GetEventChannel() <- event:
case aggregator.GetEventChannel() <- filterEvent:
default:
logger.GetLogger().
WithField("request", request).
Warn("Aggregator buffer is full. Consider increasing AggregatorOptions.channel_buffer_size.")
}
} else {
// No need to aggregate. Directly send out the response.
if err = server.Send(event); err != nil {
if err = server.Send(filterEvent); err != nil {
s.ctxCleanupWG.Done()
return err
}
Expand Down

0 comments on commit 5a43f7e

Please sign in to comment.