
Backpressure handling #175

Open
helgeolav wants to merge 1 commit into master
Conversation

helgeolav
Contributor

We discussed this a bit in #135. This is my proposal for an interface that can be used to handle backpressure.

@tsaikd
Owner

tsaikd commented Oct 15, 2021

The message-channel-full case can be handled with native Go select syntax:

// func (t *InputModule) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error)
select {
case msgChan <- msg:
  // send the log event normally
default:
  // the message channel is full
}
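For reference, a blocking variant that waits instead of dropping the event, and still exits cleanly via the context passed to Start(), could look like this (just a sketch, not part of this PR):

select {
case msgChan <- msg:
  // event delivered to the pipeline
case <-ctx.Done():
  // gogstash is shutting down; stop producing events
  return ctx.Err()
}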

@helgeolav
Contributor Author

helgeolav commented Oct 15, 2021

I don't think that pattern is good enough. There is no way for the output to signal back that there are issues that need attention, and the select pattern only covers the case where the channel is full and blocking.

Let's use the HTTP output as an example. There are several possible outcomes from Output():

  1. The event was passed on. Everything is OK. This is the normal situation.
  2. An issue such as the server being down or network connectivity preventing the output from completing a three-way handshake. In this case the output blocks the channel for some time before a timeout occurs, an error is sent back and the next event is served.
  3. Another issue where the web server is either stopped (giving a connection reset immediately) or there is some internal issue in the receiving web app that makes it return an HTTP 5xx error.

In the two error cases gogstash will continue to process messages, and they will be lost. The problem for the input is that it does not know when it is OK to continue receiving messages and will just have to retry at random intervals.

While thinking about this: should there also be a Requeue() to allow the output to reschedule the event at a later time in case of issues?

@helgeolav
Contributor Author

I hope you see why I think we need a better kind of backpressure handling. I have been thinking, and I believe it would be cleaner to move RequestPause() and RequestResume() into output.OutputConfig, and RegisterInput() into config.InputConfig, still using CanPause so the developer gets some freedom in how to implement it. This way it looks more «integrated» into gogstash.

I can write a new proposal on this if you want.
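To make the idea concrete, here is a rough sketch of how these pieces could fit together. The names (CanPause, RequestPause, RequestResume, RegisterInput) come from the discussion above, but the exact signatures and types are assumptions, not the final proposal:

// CanPause is implemented by inputs that can stop and restart consumption
// without losing data.
type CanPause interface {
  Pause()
  Resume()
}

// Control keeps track of the inputs that opted in to pausing. Something like
// this could live in config.InputConfig / output.OutputConfig.
type Control struct {
  inputs []CanPause
}

// RegisterInput is called by an input to opt in to backpressure handling.
func (c *Control) RegisterInput(in CanPause) {
  c.inputs = append(c.inputs, in)
}

// RequestPause would be called by an output when the downstream system is unavailable.
func (c *Control) RequestPause() {
  for _, in := range c.inputs {
    in.Pause()
  }
}

// RequestResume is called when the downstream system has recovered.
func (c *Control) RequestResume() {
  for _, in := range c.inputs {
    in.Resume()
  }
}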

@tsaikd
Owner

tsaikd commented Oct 18, 2021

  1. An issue such as the server being down or network connectivity preventing the output from completing a three-way handshake. In this case the output blocks the channel for some time before a timeout occurs, an error is sent back and the next event is served.
  2. Another issue where the web server is either stopped (giving a connection reset immediately) or there is some internal issue in the receiving web app that makes it return an HTTP 5xx error.

Output modules can retry forever to prevent data loss. In other words, the error handling can be done in the output modules.

My questions:

  1. When the input module gets the RequestPause notification, what reaction do you want?
  2. How should we handle the case where some of the input/output modules do not support the pause function?

@helgeolav
Contributor Author

Hi

I disagree that the output modules can retry forever. At some point they will either drop the event or gogstash runs out of memory. In my case I am using gogstash to handle around 180k events per second, and if an output tries to queue everything it will not take long before I run out of memory and gogstash crashes.

If we look at the HTTP output: it will drop the message right away (or after a timeout) and report an error. The elastic output will not report an error once the message has been handed to olivere, but will just asynchronously log that there has been an error. The file output will drop the event if it for some reason cannot write to the file.

For your questions:

  1. I want to let the input decide for itself how to handle it. I have put a list of suggestions for each input below.
  2. If the input does not support pausing, it behaves as it does today. Not all sources can pause without losing data.

For the inputs:

  • beats: I don't know enough about beats to answer for this input. It seems to use a network connection to another daemon; if so, this input can close the connection and wait until it gets a Resume().
  • dockerlog: I don't know much about this input either, but it seems to have a "since" flag and can probably pause and resume at that point.
  • dockerstats: I don't know much about this input either, but from what I can see the event will be lost in either case. Pausing the input will just make sure that we keep gogstash running without too many errors in the log.
  • exec: This input, if paused, will not run its command and create an event until resumed.
  • file: With the since-db I assume that this input can pause reading and continue where it left off without any loss of events.
  • http: As with file, it will not go out and get new data until resumed.
  • httplisten: If paused it will either stop listening or return a 5xx error until resumed (see the sketch after this list). This way the sending side can decide on its own how to deal with the error; many systems have some kind of retry mechanism.
  • kafka: Kafka, by design, can pause reading and continue where it left off. I am not sure if this input supports it (between restarts) as there is no storage for the sequence number, but even without that it should be possible to pause the input and resume where it left off.
  • lorem: a pause will just stop it sending messages; good for testing.
  • nats: I can't tell for this input, but NATS is resilient, so there should be a way for this input to pause.
  • nsq: Supported in NSQ: just don't read the message and it will stay queued on the server.
  • redis: I am not sure how Redis works here, so I can't tell for sure.
  • socket: A Pause() should perhaps close all sockets so the other side cannot connect. This works fine for TCP, but with UDP the sending side will probably not know (or care) about any issues. Messages can be lost; it depends on the sending side.
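For httplisten, the "return a 5xx until resumed" idea could be as small as the handler below. This is only a sketch: the paused flag and handler are hypothetical, not actual gogstash code.

import (
  "net/http"
  "sync/atomic"
)

// paused is flipped by Pause()/Resume(); a non-zero value means the input is paused.
var paused int32

func handler(w http.ResponseWriter, r *http.Request) {
  if atomic.LoadInt32(&paused) != 0 {
    // Tell the sender to retry later instead of accepting an event we
    // cannot forward right now; most senders have a retry mechanism.
    http.Error(w, "output backpressure, retry later", http.StatusServiceUnavailable)
    return
  }
  // ... decode the request body into a log event and send it on msgChan ...
  w.WriteHeader(http.StatusOK)
}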

Implementing this is not an easy task, as each input we work on will take some time. My intention is to start on the framework and then work on the inputs/outputs I know, and make sure it works before we move on.

I also mentioned a Requeue() earlier. Providing something like this makes it easier for each output to requeue an event in case of a retryable error, and gives us a common way to handle such errors.
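As an illustration, a common Requeue() helper could look roughly like this. The channel, the delay handling and the logevent import path are assumptions; whether outputs should write back onto the pipeline channel at all is exactly what needs discussing.

import (
  "context"
  "time"

  "github.com/tsaikd/gogstash/config/logevent" // import path assumed
)

// Requeue puts an event back on the pipeline channel after a delay so an
// output can retry it later instead of dropping it on a retryable error.
func Requeue(ctx context.Context, msgChan chan<- logevent.LogEvent, msg logevent.LogEvent, delay time.Duration) {
  go func() {
    // Wait out the delay, but give up if gogstash is shutting down.
    select {
    case <-time.After(delay):
    case <-ctx.Done():
      return
    }
    select {
    case msgChan <- msg:
      // event rescheduled
    case <-ctx.Done():
      // shutting down; the event is dropped
    }
  }()
}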

tsaikd added a commit that referenced this pull request Oct 19, 2021
@tsaikd
Owner

tsaikd commented Oct 19, 2021

Does the interface in 50d72c4 fit your requirement?

@helgeolav
Contributor Author

I had a quick look and it seems to work. I suggest we just give it a try and get some experience.

tsaikd added a commit that referenced this pull request Oct 20, 2021
tsaikd added a commit that referenced this pull request Oct 20, 2021
@tsaikd tsaikd mentioned this pull request Oct 21, 2021
@helgeolav
Contributor Author

Is there something else you want me to do here?

tsaikd added a commit that referenced this pull request Oct 29, 2021
@tsaikd
Owner

tsaikd commented Oct 29, 2021

Up to you; keep it open for discussion or close it.

Could you help check whether daf1889 is working or not?

@helgeolav
Contributor Author

Hi

Sorry for the late answer. I have looked more into your control branch and implemented correct handling in two inputs in #178. I have tested both inputs by modifying outputstdout (that change is not committed) to pause for a few seconds after each message received.

Would it be possible to merge my change in and commit it to master?
