Added discard filter #186

Open · wants to merge 1 commit into base: master
Conversation

helgeolav
Contributor

In this PR I have added a discard filter. This filter allows me to

  1. Discard an event from further processing when I know that I will not need it. Today this has to be done by using a cond output, but you still need to send the event through all stages of processing before it is discarded.
  2. Discard the event in case we have back-pressure issues. This can be handy when the output supports back pressure but the input does not, and discarding the event is your best option.

I also rewrote the filter handling routine to allow an event to start at a specific filter, instead of at the beginning of the list of filters. I think there are many good ways to use this. I am currently working on a filter whose goal is to identify and remove all events of a kind except the last one. To do this I need to discard all but the last event, which I then inject for further processing at the next filter in the pipeline.

Other good examples of uses for the discard code are logstash aggregate or logstash throttling.
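
(To make the discard idea concrete, here is a minimal, self-contained Go sketch of a filter chain that drops an event as soon as one filter asks for a discard, so later filters and the outputs never see it. This is not the PR's actual code; event, filterFunc and runFilters are illustrative stand-ins for gogstash's logevent.LogEvent and filter interface.)

package main

import "fmt"

// Placeholder standing in for logevent.LogEvent; the real type lives in
// gogstash's logevent package and carries more fields.
type event struct {
	message string
	noisy   bool
}

// filterFunc is a simplified stand-in for a gogstash filter. In this sketch
// the boolean return value means "discard this event".
type filterFunc func(event) (event, bool)

// runFilters walks the filter chain and drops an event as soon as any
// filter asks for a discard, so later (possibly expensive) filters and the
// outputs never see it.
func runFilters(filters []filterFunc, ev event) (event, bool) {
	for _, f := range filters {
		var discard bool
		ev, discard = f(ev)
		if discard {
			return ev, true
		}
	}
	return ev, false
}

func main() {
	filters := []filterFunc{
		// a discard-style filter: drop events we know we will not need
		func(ev event) (event, bool) { return ev, ev.noisy },
		// an expensive enrichment filter that discarded events never reach
		func(ev event) (event, bool) { ev.message += " [enriched]"; return ev, false },
	}

	for _, ev := range []event{{message: "keep me"}, {message: "drop me", noisy: true}} {
		if out, dropped := runFilters(filters, ev); !dropped {
			fmt.Println(out.message) // only "keep me [enriched]" is printed
		}
	}
}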

@tsaikd
Owner

tsaikd commented Jan 31, 2022

For filter execution priority, I prefer control via the user's config instead of FilterPos, so that users are more flexible in combining multiple components for their goals, even using the same filter multiple times and in multiple positions with different configs.

In your case, you can suggest in your README that users put the filter at the beginning.

@helgeolav
Contributor Author

Hi

I think I did not explain this well. FilterPos was meant to serve two purposes:

  1. being able to completely discard events in the filter chain, as shown with the discard filter.
  2. having a way for a filter to take an event out of processing so that it can be continued later.

For purpose number two I am working on a filter that is a bit like logstash aggregate. I am running this filter in test now, but currently as a standalone program (gogstash instance 1 -> my filter -> gogstash instance 2).

This filter looks at similar events (based on the value of a configurable key) and queues them up in memory; if there is more than one event with the same key, only the last is kept and the others are discarded. After some configured period of inactivity (no new events with that key value) the event is sent back into gogstash.

A workflow like this is not possible in gogstash without a mechanism like FilterPos. It is not my intention to use this field to jump over or skip parts of the chain; for that we have cond.
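
(As a rough illustration of the keep-last-per-key behaviour described above, here is a self-contained Go sketch. keepLast, its fields and the emit callback are hypothetical names, not gogstash APIs; emit is where the surviving event would be handed back into the chain after the inactivity window.)

package keeplast

import (
	"sync"
	"time"
)

// Placeholder event type; in gogstash this would be logevent.LogEvent.
type event struct {
	key     string
	payload string
}

// keepLast holds only the most recent event per key and re-emits it after
// a period of inactivity for that key.
type keepLast struct {
	mu       sync.Mutex
	pending  map[string]event     // last event seen per key
	lastSeen map[string]time.Time // when that key was last updated
	idle     time.Duration        // inactivity window before re-emitting
	emit     func(event)          // e.g. hand the event back into the filter chain
}

func newKeepLast(idle time.Duration, emit func(event)) *keepLast {
	k := &keepLast{
		pending:  make(map[string]event),
		lastSeen: make(map[string]time.Time),
		idle:     idle,
		emit:     emit,
	}
	go k.reaper()
	return k
}

// Add stores ev, replacing any earlier event with the same key; the earlier
// event is effectively discarded.
func (k *keepLast) Add(ev event) {
	k.mu.Lock()
	defer k.mu.Unlock()
	k.pending[ev.key] = ev
	k.lastSeen[ev.key] = time.Now()
}

// reaper periodically flushes keys that have been idle long enough.
func (k *keepLast) reaper() {
	ticker := time.NewTicker(k.idle / 2)
	defer ticker.Stop()
	for range ticker.C {
		cutoff := time.Now().Add(-k.idle)
		k.mu.Lock()
		for key, seen := range k.lastSeen {
			if seen.Before(cutoff) {
				k.emit(k.pending[key])
				delete(k.pending, key)
				delete(k.lastSeen, key)
			}
		}
		k.mu.Unlock()
	}
}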

@tsaikd
Owner

tsaikd commented Jan 31, 2022

I am thinking about this kind of filter:

  1. When a new event is received for the first time, keep it in memory; no output yet.
  2. When the next event is received, check whether an event with the same key already exists in memory.
    2-1. If the key exists: keep only the last one.
    2-2. If the key does not exist: store the event in memory.
  3. Flush memory at your configured time period.

@helgeolav
Contributor Author

The way the filter flow works is that it is single-threaded (unless you have more workers), so I need a way to signal back that I want a message dropped. I cannot hold up the queue; all filters have to do their work as quickly as possible.

The way you described it above is exactly how I planned to implement it. Step 3 is the hardest: to be able to flush, I need a mechanism to send an event into the next filter in the filter chain.

I can’t see any good way to handle this other than being able to insert an event into the chain at a configurable spot.

@tsaikd
Owner

tsaikd commented Jan 31, 2022

There is only one input channel chInFilter and one output channel chFilterOut in a process, and all filters run in one goroutine, so I think it's OK to treat it as a single-threaded model from the filter's point of view.

Flush example (FIFO):

	var eventPool ThreadSafeTreeWithTimeKey // member of filter; events ordered by time
	timer := time.NewTimer(config.FlushDuration)
	for {
		select {
		case <-timer.C:
			// only events older than this cut-off are flushed
			processBeforeTime := time.Now().Add(-config.FlushBeforeDuration)
			for {
				eventPool.lock()
				event := eventPool.first() // oldest event in the pool
				if event == nil || event.Time.After(processBeforeTime) {
					// pool is empty or the oldest event is still too recent
					eventPool.unlock()
					break
				}
				eventPool.remove(event)
				eventPool.unlock()
				chFilterOut <- event // send straight to the outputs
			}
			timer.Reset(config.FlushDuration)
		}
	}

@helgeolav
Contributor Author

From the code snippet above I see that you are sending the event straight to the outputs. I was more inclined to let all filters handle the event. Below is an example of a pipeline along the lines of what I want to do, assuming that the input codec has prepared all the fields for me.

Filter 1: discard

Looks into the event to decide whether I want to continue with it or discard it. If I discard it, no further processing is done on the server.

Filter 2: aggregate

This filter will look at each event and see if it is a continuation of a previous event. If it is, it updates the state; if not, it creates a new state.

A dispatcher will scan for events that are ready to be sent further into the chain. The dispatched event could be sent back to chFilterIn so it can be processed by filter 3 and filter 4 (see the sketch after the filter list below).

With my PR I want to use the discard option to discard the event so it is not processed any further.

Filter 3: ip2location

Get IP information from the event.

Filter 4: hash

Make a hash of the event.
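
(A minimal sketch of the re-injection idea from Filter 2: the dispatcher puts a flushed event back on chFilterIn with a filter position set so that only filters 3 and 4 run. The event type, its filterPos field, the trivial filter functions and the channel stand-ins are illustrative only, not the PR's actual code.)

package main

import "fmt"

// Placeholder for logevent.LogEvent; the real gogstash type carries many
// more fields. filterPos is hypothetical and only illustrates the
// "resume at a specific filter" idea from this thread.
type event struct {
	message   string
	filterPos int
}

func main() {
	// Stand-ins for gogstash's internal channels (chFilterIn / chFilterOut).
	chFilterIn := make(chan event, 1)
	chFilterOut := make(chan event, 1)

	// A minimal chain mirroring the example: 0=discard, 1=aggregate,
	// 2=ip2location, 3=hash (all reduced to trivial transformations here).
	filters := []func(event) event{
		func(ev event) event { return ev },                         // discard (pass-through here)
		func(ev event) event { return ev },                         // aggregate
		func(ev event) event { ev.message += " +geo"; return ev },  // ip2location
		func(ev event) event { ev.message += " +hash"; return ev }, // hash
	}

	// The aggregate filter's dispatcher re-injects a flushed event with
	// filterPos set to 2, so processing resumes at ip2location and the
	// discard and aggregate filters are skipped.
	chFilterIn <- event{message: "flushed", filterPos: 2}

	// Simplified single pass of the filter worker loop.
	ev := <-chFilterIn
	for i := ev.filterPos; i < len(filters); i++ {
		ev = filters[i](ev)
	}
	chFilterOut <- ev

	fmt.Println((<-chFilterOut).message) // prints "flushed +geo +hash"
}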

@tsaikd
Owner

tsaikd commented Feb 3, 2022

In your Filter 2: aggregate, it is the decision maker for discarding or not, so I think the better solution is to make the filter itself discard events (i.e. not dispatch the event to chFilterOut).
For example, extend the return value of Filter.Event():

type FilterReturn struct {
	event   logevent.LogEvent
	stop    bool // do not process the next filter
	discard bool // discard the event
	// more flow control functions
}

-func (TypeFilterConfig).Event(context.Context, logevent.LogEvent) (logevent.LogEvent, bool)
+func (TypeFilterConfig).Event(context.Context, logevent.LogEvent) (FilterReturn)

Why not extend the control properties in logevent.LogEvent? Because they are not used by the inputs and outputs.
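
(A sketch of how the filter worker loop might consume the proposed FilterReturn. The LogEvent stand-in, the filter interface and the process function are simplified illustrations of the proposal above, not existing gogstash code.)

package filterflow

import "context"

// Simplified stand-in for logevent.LogEvent.
type LogEvent struct{ Message string }

// FilterReturn mirrors the struct proposed above.
type FilterReturn struct {
	event   LogEvent
	stop    bool // do not process the next filter
	discard bool // discard the event
}

// filter is a reduced version of the proposed TypeFilterConfig.Event signature.
type filter interface {
	Event(ctx context.Context, ev LogEvent) FilterReturn
}

// process runs an event through the chain and honours the proposed
// flow-control flags; the boolean result is false when the event should
// never reach chFilterOut.
func process(ctx context.Context, filters []filter, ev LogEvent) (LogEvent, bool) {
	for _, f := range filters {
		ret := f.Event(ctx, ev)
		if ret.discard {
			return ret.event, false // drop the event entirely
		}
		ev = ret.event
		if ret.stop {
			break // skip the remaining filters but still emit the event
		}
	}
	return ev, true
}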
