Ack Kafka messages only after writing to another data source. #111

Open
lemanchester opened this issue Oct 24, 2022 · 3 comments
@lemanchester
Member

One of the features of schaufel is:

Read messages from Kafka and write them to Postgres. It seems to do this through an in-memory queue: Schaufel reads a message from Kafka, acks it, and adds it to a queue to be written to Postgres.

The problem is that if the Schaufel process is hard-killed, or there is a problem writing to Postgres, we end up with data loss because the messages were already acknowledged to Kafka.

We need to make sure messages are acked only after they have been successfully written to Postgres or any other data source.

@alip
Contributor

alip commented Mar 6, 2023

It sounds like the current implementation of Schaufel has a potential data loss issue if there is a problem with writing to the target data source. To avoid this issue, Schaufel should only acknowledge messages from Kafka after successfully writing them to the target data source.
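
To make the ordering concrete, here is a minimal sketch of "ack after write" in plain C. It is not Schaufel code and does not use a Kafka client: `ack_state`, `consume_with_late_ack`, and the `flaky_sink` helper are hypothetical names, and the array of strings stands in for a partition. With librdkafka this would roughly correspond to disabling `enable.auto.commit` and committing offsets explicitly once the write has succeeded, but how that would fit into Schaufel's queue is an assumption.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical sink callback: returns true once the message is durably written. */
typedef bool (*sink_write_fn)(const char *msg, void *ctx);

/* Tracks the next offset that is safe to acknowledge. */
typedef struct {
    size_t committed; /* messages [0, committed) have been written and acked */
} ack_state;

/*
 * Process msgs[state->committed .. n) in order. The committed offset advances
 * only after the sink reports success, so a failed write (or a crash before
 * the increment) leaves the message unacknowledged and it is read again on
 * the next call. Returns the number of messages written in this call.
 */
size_t consume_with_late_ack(ack_state *state, const char *const *msgs,
                             size_t n, sink_write_fn write, void *ctx)
{
    size_t written = 0;
    while (state->committed < n) {
        if (!write(msgs[state->committed], ctx))
            break;          /* no ack: resume from the same offset later */
        state->committed++; /* ack only after the write succeeded */
        written++;
    }
    return written;
}

/* Illustration-only sink that fails on exactly one call number. */
typedef struct {
    size_t calls;
    size_t fail_on_call;
} flaky_sink;

bool flaky_write(const char *msg, void *ctx)
{
    flaky_sink *s = ctx;
    (void)msg;
    s->calls++;
    return s->calls != s->fail_on_call;
}
```

The trade-off is at-least-once delivery: a message whose write succeeded but whose ack was lost will be delivered again, so the target should tolerate duplicates.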

One possible solution is a two-phase approach in which Schaufel first writes the message to a staging area and then commits it to the target data source. If the write to the target fails, the message is still in the staging area and can be rolled back or replayed, so no data is lost. Another option is a retry mechanism: Schaufel retries a failed write a bounded number of times before giving up and treating the message as failed.

It's important to ensure that the solution does not introduce additional performance overhead or latency to the message processing pipeline.

@alip
Contributor

alip commented Mar 6, 2023

Using a message queue with persistent storage is a good way to ensure reliable message delivery and handle transient errors. A message queue with persistent storage will write the messages to disk, so if the consumer process crashes, the messages will still be available for processing when the process restarts.

Regarding the use of a write-ahead log (WAL), it can be an effective way to ensure data durability in case of failures. A WAL records changes to a database in a sequential manner, which can be replayed to restore the database to a consistent state after a failure. However, implementing a WAL can be complex and may require considerable changes to the existing codebase.

Message retries are another feature that can help handle transient errors. If a message fails to be processed or written to the database, it can be retried a certain number of times before being moved to the dead-letter queue.
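
A bounded retry with a dead-letter fallback could look roughly like the sketch below. Everything here is hypothetical (`write_with_retry`, `dead_letter_queue`, `transient_sink`); the dead-letter queue is a small in-memory array purely for illustration, whereas a real one would need to be persistent, and a real retry loop would wait between attempts.

```c
#include <stdbool.h>
#include <stddef.h>

#define DLQ_CAPACITY 16

/* Hypothetical write callback: returns true when the write succeeded. */
typedef bool (*try_write_fn)(const char *msg, void *ctx);

/* Illustration-only dead-letter queue held in memory. */
typedef struct {
    const char *items[DLQ_CAPACITY];
    size_t len;
} dead_letter_queue;

typedef enum {
    WRITE_DELIVERED,     /* written within max_attempts */
    WRITE_DEAD_LETTERED, /* all attempts failed, parked in the DLQ */
    WRITE_DROPPED        /* all attempts failed and the DLQ is full */
} write_outcome;

write_outcome write_with_retry(const char *msg, unsigned max_attempts,
                               try_write_fn write, void *ctx,
                               dead_letter_queue *dlq)
{
    for (unsigned attempt = 0; attempt < max_attempts; attempt++) {
        if (write(msg, ctx))
            return WRITE_DELIVERED;
        /* A real implementation would back off here before retrying. */
    }
    if (dlq->len < DLQ_CAPACITY) {
        dlq->items[dlq->len++] = msg;
        return WRITE_DEAD_LETTERED;
    }
    return WRITE_DROPPED; /* caller should not ack this message */
}

/* Illustration-only sink: fails the first `failures_left` calls. */
typedef struct {
    unsigned failures_left;
} transient_sink;

bool transient_write(const char *msg, void *ctx)
{
    transient_sink *s = ctx;
    (void)msg;
    if (s->failures_left > 0) {
        s->failures_left--;
        return false;
    }
    return true;
}
```

Only `WRITE_DELIVERED` and `WRITE_DEAD_LETTERED` would be safe points to acknowledge the message, since in both cases it has been stored somewhere.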

Making the commit interval configurable can have a significant impact on data resiliency.
Note that it is currently hardcoded to 2000 in the PostgreSQL producer:

schaufel/src/postgres.c

Lines 319 to 324 in d119875

    m->count = m->count + 1;
    if (m->count == 2000)
    {
        commit(&m);
    }
    pthread_mutex_unlock(&m->commit_mutex);

Hardcoding the commit interval to a specific value may not always be the best approach as it may not be optimal for all scenarios. For example, if the commit interval is set too high, then there is a risk of losing data in the event of a failure before the interval has been reached. On the other hand, if the commit interval is set too low, then it can lead to a higher number of transactions and overhead on the database.

By making the commit interval configurable, it allows for greater flexibility in adapting to different use cases and scenarios. The optimal commit interval may vary based on the size and complexity of the data, the rate of data ingestion, and the resources available for processing and storage.

In addition, making the commit interval configurable can also allow for easier tuning and optimization of the system over time as conditions and requirements change. It can also enable better monitoring and management of data resiliency, as different commit intervals can be tested and evaluated to determine the best approach for ensuring reliable and consistent data delivery.
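
As a rough sketch of what a configurable interval could look like, the code below parses a setting and uses it in place of the constant. The default of 2000 comes from the snippet above; `parse_commit_interval`, `batch_state`, and `batch_add` are hypothetical names, the fallback to the default on invalid input is an assumption, and the counter reset stands in for whatever the real `commit()` does.

```c
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

#define DEFAULT_COMMIT_INTERVAL 2000

/*
 * Parse a commit interval from a configuration string.
 * NULL, empty, non-numeric, or non-positive values fall back to the default
 * (an assumption; rejecting the configuration would be equally valid).
 */
int parse_commit_interval(const char *value)
{
    if (value == NULL || *value == '\0')
        return DEFAULT_COMMIT_INTERVAL;

    char *end = NULL;
    errno = 0;
    long n = strtol(value, &end, 10);
    if (errno != 0 || *end != '\0' || n < 1 || n > INT_MAX)
        return DEFAULT_COMMIT_INTERVAL;
    return (int)n;
}

/* Illustration-only stand-in for the producer's per-connection state. */
typedef struct {
    int count;           /* messages buffered since the last commit */
    int commit_interval; /* commit once this many messages are buffered */
    int commits;         /* number of commits issued so far */
} batch_state;

/* Account for one message; returns 1 if it triggered a commit, else 0. */
int batch_add(batch_state *b)
{
    b->count++;
    if (b->count >= b->commit_interval) {
        b->commits++; /* stand-in for the real database commit */
        b->count = 0;
        return 1;
    }
    return 0;
}
```

Using `>=` rather than `==` is a small defensive choice: the commit still fires if the counter ever overshoots the interval.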

@alip
Contributor

alip commented Mar 6, 2023

The approach of using a Write Ahead Log (WAL) to persist messages in case of an abnormal exit and replaying them on startup is a solid strategy to ensure data resiliency. However, implementing it in a multi-threaded environment like Schaufel could be challenging.

To ensure xmarks and other processing by hooks are persisted in the message, we could consider saving the messages with their associated metadata (like xmarks) to the WAL, instead of just the raw messages. This would ensure that when replaying the WAL, the messages are processed by hooks and sent to the correct destination.

As for the simplest approach, we could start with a per-producer WAL that saves only the raw messages and replay them on startup. This approach would be easier to implement but may require additional logic to ensure that messages are sent to the correct destination and processed by hooks. We can then improve on this by adding support for metadata like xmarks and making the WAL configurable.
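
For the simple raw-message variant, a minimal sketch could be a length-prefixed append-only file that is replayed from the start on startup. This is an illustration under several assumptions, not a design for Schaufel: the record format, `wal_append`, `wal_replay`, and `WAL_MAX_RECORD` are all made up here, there is no locking for multiple threads, no checksums, no truncation after a successful commit, and `fflush` alone does not guarantee the data has reached the disk.

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define WAL_MAX_RECORD (1u << 20) /* assumed upper bound on one message */

/* Append one raw message as [uint32 length][payload]. Returns 0 on success. */
int wal_append(FILE *wal, const void *msg, uint32_t len)
{
    if (len > WAL_MAX_RECORD)
        return -1;
    if (fwrite(&len, sizeof len, 1, wal) != 1)
        return -1;
    if (len > 0 && fwrite(msg, 1, len, wal) != len)
        return -1;
    /* fflush hands the data to the OS; a durable WAL would also fsync. */
    return fflush(wal) == 0 ? 0 : -1;
}

/* Callback invoked for each replayed message; nonzero aborts the replay. */
typedef int (*wal_replay_fn)(const void *msg, uint32_t len, void *ctx);

/*
 * Replay complete records from the beginning of the log. A truncated or
 * oversized trailing record (for example from a crash mid-append) ends the
 * replay. Returns the number of records replayed, or -1 on callback or
 * allocation failure.
 */
long wal_replay(FILE *wal, wal_replay_fn fn, void *ctx)
{
    long replayed = 0;
    uint32_t len;

    rewind(wal);
    while (fread(&len, sizeof len, 1, wal) == 1) {
        if (len > WAL_MAX_RECORD)
            break; /* treat as corruption */
        char *buf = malloc(len > 0 ? len : 1);
        if (buf == NULL)
            return -1;
        if (len > 0 && fread(buf, 1, len, wal) != len) {
            free(buf);
            break; /* truncated record */
        }
        int rc = fn(buf, len, ctx);
        free(buf);
        if (rc != 0)
            return -1;
        replayed++;
    }
    return replayed;
}

/* Illustration-only callback: adds each payload length to a size_t total. */
int count_bytes(const void *msg, uint32_t len, void *ctx)
{
    (void)msg;
    *(size_t *)ctx += len;
    return 0;
}
```

Persisting metadata such as xmarks would mean extending the record with additional fields before the payload, which this sketch does not attempt.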

alip added a commit that referenced this issue Apr 5, 2023
Previously we had the commit interval hardcoded to 2000 in postgres
producer. It is useful to make this configurable for cases where data
resiliency is more important than performance.

This commit introduces no functionality change as long as
commit_interval is not specified in schaufel configuration. In this
case, the default is unchanged and we commit once every 2000 messages.

Ref: #111