-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Message class as an interface. Model the producer message and polled message as implementations #137
Conversation
@@ -0,0 +1,9 @@ | |||
package com.flipkart.varadhi.entities; | |||
|
|||
public interface MessageWithOffset<O extends Offset> extends Message { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to merge it with PolledMessage, when it is needed independently meaning if there is Offset associated with message it should have other properties like topic/partition etc ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree; we don't need a separate message with offset.
PolledMessage acts as the Message contract from the consumer side. It has the info on partitions and offsets.
@@ -25,11 +28,41 @@ public PulsarOffset getOffset() { | |||
return new PulsarOffset(msg.getMessageId()); | |||
} | |||
|
|||
@Override | |||
public String getMessageId() { | |||
throw new UnsupportedOperationException("Not implemented"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notimplementedexception ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
import java.util.List; | ||
|
||
public class ProducerMessage implements Message { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be moved to messaging.produce (PulsarProducer test might require a test fixture in entities for this)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PulsarProducerTest is able to access producer message via the entities dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, suggestion was to check if this can be moved to messaging.produce as this is specific to produce path alone. ?
e663e2b
to
5042f5d
Compare
Test Results218 tests 218 ✅ 43s ⏱️ Results for commit c607d3a. ♻️ This comment has been updated with latest results. |
} | ||
|
||
@Override | ||
public void onConsumed(MessageConsumptionStatus status) { | ||
committer.commitIndividualAsync(message); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since PolledMessage is now a Message, we don't need to implement a Wrapper to make it work with the MessageTracker contracts
@@ -25,11 +29,41 @@ public PulsarOffset getOffset() { | |||
return new PulsarOffset(msg.getMessageId()); | |||
} | |||
|
|||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to figure out, how to conform to the Message contracts around headers, message and group Id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
planning to take it independently ?
@@ -6,4 +6,5 @@ plugins { | |||
dependencies { | |||
api(project(':entities')) | |||
api('com.fasterxml.jackson.core:jackson-databind') | |||
testFixturesImplementation("com.google.guava:guava:$guava_version") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is needed so that Dummy Consumer can import Guava Multimap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comments, please check prior to merge.
|
||
import java.util.List; | ||
|
||
public class ProducerMessage implements Message { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, suggestion was to check if this can be moved to messaging.produce as this is specific to produce path alone. ?
@@ -25,11 +29,41 @@ public PulsarOffset getOffset() { | |||
return new PulsarOffset(msg.getMessageId()); | |||
} | |||
|
|||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
planning to take it independently ?
Fixes: #144
This PR proposes the following changes:
Messages can have extra details associated with them on the consume side, like offset, topic name, partition, etc. Here, PolledMessage can act as the entity to represent the consume-side message contracts.
The benefit of the above is that the message concept is unified in the codebase, and extensions are provided via PolledMessage to model the consume side.
This also simplifies the MessageSrc implementation where we can implement MessageTracker over PolledMessage without writing the code to convert underlying PolledMessage to Message (it's simple now since both share the same type)