diff --git a/cmd/tetra/getevents/io_reader_client.go b/cmd/tetra/getevents/io_reader_client.go index d86f2d077c3..acd3b87d0ef 100644 --- a/cmd/tetra/getevents/io_reader_client.go +++ b/cmd/tetra/getevents/io_reader_client.go @@ -16,10 +16,11 @@ import ( hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) // ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient. -// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads FGS events +// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads Tetragon events type ioReaderClient struct { scanner *bufio.Scanner allowlist hubbleFilters.FilterFuncs @@ -106,10 +107,18 @@ func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) { if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) { continue } + filterEvent := &res + if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters + // We need a copy of the exec event as modifing the original message + // can cause issues in the process cache (we keep a copy of that message there). + filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse) + } for _, filter := range i.fieldFilters { - filter.Filter(&res) + // we need not to change res + // maybe only for exec events + filter.Filter(filterEvent) } - return &res, nil + return filterEvent, nil } if err := i.scanner.Err(); err != nil { return nil, err diff --git a/pkg/server/server.go b/pkg/server/server.go index e243f2b0c29..3df14b94e81 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/version" "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" ) type Listener interface { @@ -162,14 +163,22 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon // Filter the GetEventsResponse fields filters := filters.FieldFiltersFromGetEventsRequest(request) + filterEvent := event + if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters + // We need a copy of the exec event as modifing the original message + // can cause issues in the process cache (we keep a copy of that message there). + filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse) + } for _, filter := range filters { - filter.Filter(event) + // we need not to change res + // maybe only for exec events + filter.Filter(filterEvent) } if aggregator != nil { // Send event to aggregator. select { - case aggregator.GetEventChannel() <- event: + case aggregator.GetEventChannel() <- filterEvent: default: logger.GetLogger(). WithField("request", request). @@ -177,7 +186,7 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon } } else { // No need to aggregate. Directly send out the response. - if err = server.Send(event); err != nil { + if err = server.Send(filterEvent); err != nil { s.ctxCleanupWG.Done() return err }