-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Failing to commit offset at the end of the Observable #240
Comments
Hello @fchaillou, Not sure what were you trying to achieve with this example... However I think that is not correctly formulated. In the test, the observable is set to fail if in 5 seconds in those cases where we have not received the next element (which will always fail since you are only sending two messages). On the other hand the This is my guess, but probably something wrong is happening there that is forcing the consumer to close before we get to commit the records. Hope it helps :) |
Hello @paualarco, Basically, the topic we're reading from is populated in batch fashion and so we want to consume until there is nothing available anymore, that's the purpose of The issue here is that the consumer is closed too soon, and when we reach the |
hi @fchaillou, sorry for the late response, I think I did not understand the issue when I saw it from first time. This how it would look like: def manualCommitResource[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
K: Deserializer[K],
V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
for {
consumer <- Resource.make(createConsumer[K, V](cfg, topics)){ consumer =>
Task.evalAsync(consumer.synchronized{ blocking(consumer.close())})
}
observable <- Resource.pure(manualCommit(cfg, consumer)))
} yield (observable)
} @Avasil do you agree this is a better way of releasing the kafka consumer? In case yes, should we deprecate the old methods? |
It should also fix this #186 |
Trigger ci 1 Remove printline Update scala version build Refinement Bring back serialization test
Hello @paualarco Your suggestion makes a lot of sense but doesn't work because the private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = {
// Forced asynchronous boundary
val cancelTask = Task.evalAsync {
consumer.synchronized(blocking(consumer.close()))
}
// By applying memoization, we are turning this
// into an idempotent action, such that we are
// guaranteed that consumer.close() happens
// at most once
cancelTask.memoize
} I'm wondering if the best solution should be to have this change done : trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
protected def config: KafkaConsumerConfig
protected def consumer: Resource[Task, Consumer[K, V]]] |
Hi @fchaillou, |
Oh, I didn't understand this was related to another PR. |
Hello,
I'm using monix-kafka's
KafkaConsumerObservable.manualCommit
consumer to consume data from kafka in a batch application.We consume data until there is no more (using
.timeoutOnSlowUpstreamTo(5.seconds, Observable.empty)
to detect the end).I'm processing the data and at the end of the Observable (but still part of the Observable), i commit the offset with a CommitableBatch.
The commit fails with :
Here is my main logic FYI :
And here is a repo with a ready sbt project to reproduce it : https://github.com/fchaillou/monix-kafka-issue
Let me know if there is anything else i can do to help
Thanks
Fabien
The text was updated successfully, but these errors were encountered: