FAQ:UDFs
This question comes up often on the Community Slack channel and elsewhere. Users want to call out to external services from their own UDFs, UDAFs or UDTFs, or to perform some other task that may take a long time or introduce a deliberate delay, for example ensuring messages aren't processed until 30 minutes after the event time recorded in the message.
The question is, can this work with ksqlDB?
The short answer is that this is not recommended.
The slightly longer answer is that it's not recommended because, first and foremost, it will kill the throughput and multiply the processing latency of your system, and that's just the happy path. If the delay becomes high enough, for example if the remote system becomes unavailable for some time, you risk destabilizing your processing.
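A rough back-of-envelope bound makes the throughput point concrete. The numbers are illustrative assumptions, not measurements: if every row makes one blocking remote call taking $t_{\text{call}}$ seconds and the query has $N_{\text{threads}}$ processing threads, each row's latency grows by at least $t_{\text{call}}$, and

$$
\text{max throughput} \le \frac{N_{\text{threads}}}{t_{\text{call}}},
\qquad \text{e.g.} \quad \frac{4}{0.05\ \text{s}} = 80\ \text{rows/s}
$$

If the remote service stalls so that each call waits for, say, a 30-second client timeout, the same four threads manage only about $4/30 \approx 0.13$ rows per second.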
UDFs in ksqlDB are expected to perform relatively quick computations. They are invoked inline, as each row of data is processed. Under the hood, KSQL uses Kafka Streams, which in turn uses Kafka Consumer Groups. Consumer Groups are Kafka's way of spreading work across multiple client application instances and threads; in this case, KSQL servers. Consumer Groups are designed to detect group members that are not making progress or have failed. When such a member is detected, it is ejected from the group and the work assigned to it is given to another member. The challenge with long-running UDFs is that they can make it appear as though KSQL is not making any progress.
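To put numbers on "not making progress", here is a minimal Java sketch. It is plain arithmetic, not ksqlDB or Kafka code, and the class and method names are hypothetical. It assumes, for illustration only, that a thread must finish a whole polled batch of 500 rows before it polls again, and that the liveness limit is 300000 ms, the plain Kafka consumer default for `max.poll.interval.ms` (Kafka Streams or ksqlDB may configure it differently, so treat the value as an assumption).

```java
/** Hypothetical helper: estimates whether slow per-row calls make a consumer look stalled. */
final class PollGapEstimate {

    private PollGapEstimate() {}

    /** Time between polls if every row in the polled batch makes one blocking call. */
    static long pollGapMs(int rowsPerPoll, long udfCallMs) {
        return rowsPerPoll * udfCallMs;
    }

    /** True if that gap exceeds the liveness limit, i.e. the member would be removed from the group. */
    static boolean wouldBeEjected(int rowsPerPoll, long udfCallMs, long maxPollIntervalMs) {
        return pollGapMs(rowsPerPoll, udfCallMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long limitMs = 300_000L; // assumption: the plain Kafka consumer default for max.poll.interval.ms
        // 500 rows at a healthy 10 ms per call: 5,000 ms between polls, well within the limit.
        System.out.println(pollGapMs(500, 10) + " ms, ejected=" + wouldBeEjected(500, 10, limitMs));
        // The same 500 rows while each call hangs for 1 s: 500,000 ms, so the member looks dead.
        System.out.println(pollGapMs(500, 1_000) + " ms, ejected=" + wouldBeEjected(500, 1_000, limitMs));
    }
}
```

A call that is perfectly fine when healthy can, during a slowdown, push the gap between polls past the limit, and the member is ejected even though it is still working.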
A slow-running UDF can cause the data being processed to be reassigned to a different server or thread. The new thread then invokes the UDF and may also block, causing the data to be reassigned to the next thread, and the next, until potentially every thread processing the query is blocked inside your UDF. This may be acceptable if the remote system your UDF calls is completely unavailable, but it is less ideal if the remote system has only a partial outage (say 1 in 10 servers are running really slowly), or if your UDF is trying to delay processing by some amount of event time: one event that needs delaying may block all threads, stopping other events that don't need delaying from being processed.
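The head-of-line effect described above can be sketched in a few lines of Java. This is a simplified, hypothetical model, not how Kafka Streams actually schedules work: rows on one partition are processed strictly in order by a single thread, so each row's completion time is the running sum of the processing times of the rows ahead of it. One row whose UDF waits 30 minutes (1,800,000 ms) delays every row behind it, even rows that needed no delay at all.

```java
import java.util.Arrays;

/** Hypothetical model of in-order processing of one partition by one thread. */
final class HeadOfLineBlocking {

    private HeadOfLineBlocking() {}

    /** Returns the time (ms from start) at which each row finishes, processing rows strictly in order. */
    static long[] completionTimesMs(long[] processingMs) {
        long[] done = new long[processingMs.length];
        long clock = 0;
        for (int i = 0; i < processingMs.length; i++) {
            clock += processingMs[i]; // a row cannot start until every earlier row has finished
            done[i] = clock;
        }
        return done;
    }

    public static void main(String[] args) {
        // Five rows needing 1 ms each, except the second, whose UDF sleeps for 30 minutes.
        long[] rows = {1, 1_800_000, 1, 1, 1};
        System.out.println(Arrays.toString(completionTimesMs(rows)));
    }
}
```

The third row could have finished after 2 ms; in this model it finishes after more than 30 minutes.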
OK, hopefully that gives a high-level overview of the problem. How about a little help? There are some patterns that may help.
- Can you replicate the data you're obtaining from your external system into KSQL? You could start a connector from KSQL to ingest the data, so that the table is available in-process to join against. For a microservice, you might have the microservice push its data to Kafka and load it into KSQL as a reference table. This pattern gives much better throughput, latency, scaling and fault tolerance than calling the external service directly.
- Could your UDF output some special value if the remote service is unavailable? Could rows with this special value be post-processed in some way to be recovered?
- It may be possible to tweak the Consumer Group settings to get your use case to work in a way that is acceptable to you. You can use the `SET` command to set any consumer properties. At the time of writing, `session.timeout.ms` is probably a good one to start with; depending on your Kafka version, `max.poll.interval.ms`, which bounds the time allowed between polls, may be the more relevant setting. However, this isn't a clean solution. You're likely to run into issues: maybe not today, maybe not tomorrow, but soon, and only at a crucial point in production.
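The special-value idea from the list above can be sketched as follows. This is a minimal, hypothetical Java sketch, not a ksqlDB API: `RemoteLookup`, `GuardedLookup`, the `UNAVAILABLE` sentinel and the 500 ms budget are all assumptions for illustration. A real ksqlDB UDF would be a class annotated with `@UdfDescription` whose method is annotated with `@Udf` (both in `io.confluent.ksql.function.udf`); those annotations are left out so the sketch compiles on its own. The remote call is bounded by a timeout, so a slow or failing service yields a sentinel the query can filter on, and the rows carrying it can be collected for a later retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical client for the external service. */
@FunctionalInterface
interface RemoteLookup {
    String lookup(String key) throws Exception;
}

/** Wraps a remote lookup so it never blocks the calling thread for longer than a fixed budget. */
final class GuardedLookup {

    /** Sentinel returned instead of a real value when the remote call fails or is too slow. */
    static final String UNAVAILABLE = "__UNAVAILABLE__";

    private final RemoteLookup remote;
    private final long timeoutMs;

    GuardedLookup(RemoteLookup remote, long timeoutMs) {
        this.remote = remote;
        this.timeoutMs = timeoutMs;
    }

    /** The would-be UDF body: the remote value, or UNAVAILABLE on timeout or error. */
    String enrich(String key) {
        // Runs on the common pool. An abandoned call keeps running there after we stop waiting;
        // a real implementation would use a dedicated, bounded executor and a client-level timeout.
        CompletableFuture<String> call = CompletableFuture.supplyAsync(() -> {
            try {
                return remote.lookup(key);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        try {
            return call.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return UNAVAILABLE;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return UNAVAILABLE;
        }
    }

    /** Post-processing step: keys whose result was the sentinel, so they can be retried later. */
    static List<String> keysToRetry(List<String> keys, List<String> results) {
        List<String> retry = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            if (UNAVAILABLE.equals(results.get(i))) {
                retry.add(keys.get(i));
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        GuardedLookup fast = new GuardedLookup(key -> "value-for-" + key, 500);
        GuardedLookup slow = new GuardedLookup(key -> { Thread.sleep(2_000); return "late"; }, 500);
        System.out.println(fast.enrich("a")); // fast path: the remote value
        System.out.println(slow.enrich("b")); // slow path: the sentinel, after roughly 500 ms
    }
}
```

During a full outage this still costs up to the budget for every row, so it limits the damage rather than removing it; the throughput concerns above still apply.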
If none of these patterns can help, then maybe KSQL isn't the right tool. If you switch to writing a Kafka Streams app, you could choose to store the message in a state-store if the remote service was unavailable, or the message needed delaying/capturing for some other reason. Kafka Streams offers much richer functionality that can be used to implement a much wider set of use-cases.
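To show why a state store helps with the delay case, here is a minimal Java sketch of the idea, using an in-memory `TreeMap` as a hypothetical stand-in for a Kafka Streams state store. Rows that must wait are parked under the time they become due instead of blocking the thread, rows that need no delay would simply be forwarded, and a periodic step releases whatever has become due. In an actual Kafka Streams application this would typically be a `KeyValueStore` plus a punctuation registered with `ProcessorContext#schedule`; the `DelayBuffer` class below is illustrative only and, unlike a real state store, is not backed by a changelog topic, so it is not fault tolerant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a state store that parks rows until their due time. */
final class DelayBuffer {

    // Due time (ms) -> rows due at that time; TreeMap keeps entries ordered by due time.
    private final TreeMap<Long, List<String>> parked = new TreeMap<>();

    /** Park a row instead of blocking the processing thread. */
    void park(String row, long dueAtMs) {
        parked.computeIfAbsent(dueAtMs, t -> new ArrayList<>()).add(row);
    }

    /** Called periodically (like a punctuator): remove and return every row due at or before nowMs. */
    List<String> releaseDue(long nowMs) {
        List<String> released = new ArrayList<>();
        Map<Long, List<String>> due = parked.headMap(nowMs, true);
        for (List<String> rows : due.values()) {
            released.addAll(rows);
        }
        due.clear(); // headMap is a live view, so this removes the released entries from the buffer
        return released;
    }

    /** Number of rows still waiting. */
    int size() {
        return parked.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        DelayBuffer buffer = new DelayBuffer();
        buffer.park("order-1", 30 * 60 * 1000L); // due 30 minutes from the start
        buffer.park("order-2", 5 * 60 * 1000L);  // due 5 minutes from the start
        System.out.println(buffer.releaseDue(10 * 60 * 1000L)); // only order-2 is due
        System.out.println(buffer.releaseDue(30 * 60 * 1000L)); // now order-1 is due
    }
}
```

No thread ever sleeps here: parked rows wait in the store while the thread keeps processing, which is exactly what a blocking UDF cannot do.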
Failing that, you're likely looking at writing a custom application, in any of the many languages for which there is a Kafka client.