Skip to content

Commit

Permalink
polled messages now extends messages
Browse files Browse the repository at this point in the history
  • Loading branch information
AayuStark007 committed May 9, 2024
1 parent 2aa9af5 commit e663e2b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.entities.ProducerMessage;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
import com.google.common.collect.ArrayListMultimap;

/**
* Message tracking implementation for PolledMessage type.
Expand All @@ -21,22 +19,11 @@ public PolledMessageTracker(Consumer<O> committer, PolledMessage<O> message) {

@Override
public Message getMessage() {
return new PolledMessageWrapper<>(message);
return message;
}

@Override
public void onConsumed(MessageConsumptionStatus status) {
committer.commitIndividualAsync(message);
}

// TODO(aayush): come up with better modeling of message and message with offset
static class PolledMessageWrapper<O extends Offset> extends ProducerMessage {
PolledMessage<O> polledMessage;
// keeping headers as properties and outside the payload

public PolledMessageWrapper(PolledMessage<O> polledMessage) {
super(polledMessage.getPayload(), ArrayListMultimap.create());
this.polledMessage = polledMessage;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.flipkart.varadhi.pulsar.entities;

import com.flipkart.varadhi.spi.services.PolledMessage;
import com.google.common.collect.Multimap;
import lombok.RequiredArgsConstructor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageIdImpl;

import java.util.List;

@RequiredArgsConstructor
public class PulsarMessage implements PolledMessage<PulsarOffset> {

Expand All @@ -25,11 +28,41 @@ public PulsarOffset getOffset() {
return new PulsarOffset(msg.getMessageId());
}

@Override
public String getMessageId() {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public String getGroupId() {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public boolean hasHeader(String key) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public String getHeader(String key) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public List<String> getHeaders(String key) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public byte[] getPayload() {
return msg.getValue();
}

@Override
public Multimap<String, String> getRequestHeaders() {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public void release() {
msg.release();
Expand Down
1 change: 1 addition & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ plugins {
dependencies {
api(project(':entities'))
api('com.fasterxml.jackson.core:jackson-databind')
testFixturesImplementation("com.google.guava:guava:$guava_version")
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.flipkart.varadhi.spi.services;

import com.flipkart.varadhi.entities.MessageWithOffset;
import com.flipkart.varadhi.entities.Offset;

/**
* Represents a single message polled from some partition.
*/
// TODO: Keeping payload as byte[] only for now. When implementing, there will be opportunity to evaluate how to avoid
// unnecessary deserialization / array copy & then we can tweak this interface.
public interface PolledMessage<O extends Offset> {
public interface PolledMessage<O extends Offset> extends MessageWithOffset<O> {

String getTopicName();

Expand All @@ -16,11 +17,6 @@ public interface PolledMessage<O extends Offset> {
*/
int getPartition();

/**
* @return the offset of this message in the partition.
*/
O getOffset();

/**
* @return the payload of this message.
*/
Expand All @@ -29,7 +25,6 @@ public interface PolledMessage<O extends Offset> {

// TODO: evaluate method for message properties that live outside of payload.


/**
* releases any resources that may be associated to it. Accessing message object once released is undefined.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.flipkart.varadhi.spi.services;

import com.flipkart.varadhi.spi.services.DummyProducer.DummyOffset;
import com.google.common.collect.Multimap;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
Expand Down Expand Up @@ -83,11 +84,41 @@ public DummyOffset getOffset() {
return new DummyOffset(1);
}

@Override
public String getMessageId() {
return null;
}

@Override
public String getGroupId() {
return null;
}

@Override
public boolean hasHeader(String key) {
return false;
}

@Override
public String getHeader(String key) {
return null;
}

@Override
public List<String> getHeaders(String key) {
return null;
}

@Override
public byte[] getPayload() {
return message.getBytes(StandardCharsets.UTF_8);
}

@Override
public Multimap<String, String> getRequestHeaders() {
return null;
}

@Override
public void release() {
// no op
Expand Down

0 comments on commit e663e2b

Please sign in to comment.