Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use address for remote calls in vertx clustered event bus #165

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,12 @@ public static String getHostName() throws UnknownHostException {
log.debug("getHostName: completed");
return host;
}

public static String getHostAddress() throws UnknownHostException {
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
// debug to see how much time it takes, in case DNS resolution is taking time.
log.debug("getHostAddress: started");
String host = InetAddress.getLocalHost().getHostAddress();
log.debug("getHostAddress: completed");
return host;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,13 @@ public void TestGetHostName() throws UnknownHostException {
Assertions.assertNotNull(host);
Assertions.assertEquals(InetAddress.getLocalHost().getHostName(), host);
}

@Test
public void TestGetHostAddress() throws UnknownHostException {
// dummy test.. no validations here.
String address = HostUtils.getHostAddress();
Assertions.assertNotNull(address);
Assertions.assertEquals(InetAddress.getLocalHost().getHostAddress(), address);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void preTest() {
@Test
public void testAddConsumerNode() {
MemberInfo memberInfo =
new MemberInfo("Consumer.01", 0, new ComponentKind[]{ComponentKind.Consumer}, new NodeCapacity());
new MemberInfo("Consumer.01", "", 0, new ComponentKind[]{ComponentKind.Consumer}, new NodeCapacity());
ConsumerInfo consumerInfo = ConsumerInfo.from(memberInfo);
ConsumerNode consumerNode = new ConsumerNode(memberInfo);
doReturn(CompletableFuture.completedFuture(consumerInfo)).when(consumerApi).getConsumerInfo();
Expand All @@ -74,7 +74,7 @@ public void testAddConsumerNode() {
@Test
public void testAddConsumerNodeWhenGetConsumerInfoFailsExceptionally() {
MemberInfo memberInfo =
new MemberInfo("Consumer.01", 0, new ComponentKind[]{ComponentKind.Consumer}, new NodeCapacity());
new MemberInfo("Consumer.01", "", 0, new ComponentKind[]{ComponentKind.Consumer}, new NodeCapacity());
ConsumerNode consumerNode = new ConsumerNode(memberInfo);
doReturn(CompletableFuture.failedFuture(
new ReplyException(ReplyFailure.NO_HANDLERS, "Host not available."))).when(consumerApi)
Expand All @@ -89,7 +89,7 @@ public void testAddConsumerNodeWhenGetConsumerInfoFailsExceptionally() {
@Test
public void testAddConsumerNodeWhenGetConsumerInfoThrows() {
MemberInfo memberInfo =
new MemberInfo("Consumer.01", 0, new ComponentKind[]{ComponentKind.Consumer}, new NodeCapacity());
new MemberInfo("Consumer.01", "", 0, new ComponentKind[]{ComponentKind.Consumer}, new NodeCapacity());
ConsumerNode consumerNode = new ConsumerNode(memberInfo);
doThrow(new RuntimeException("Some unknown failure.")).when(consumerApi).getConsumerInfo();
RuntimeException re =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

public record MemberInfo(
String hostname,
String address,
int port,
ComponentKind[] roles,
NodeCapacity provisionedCapacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ public static List<ConsumerNode> getConsumerNodes(int numNodes) {
public static List<ConsumerNode> getConsumerNodes(int numNodes, NodeCapacity capacity) {
List<ConsumerNode> nodes = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
nodes.add(new ConsumerNode(new MemberInfo("test.consumer-node." + i, 0, new ComponentKind[]{ComponentKind.Consumer}, capacity)));
nodes.add(new ConsumerNode(new MemberInfo("test.consumer-node." + i, "", 0, new ComponentKind[]{ComponentKind.Consumer}, capacity)));
}
return nodes;
}

public static ConsumerNode getConsumerNode(String nodeName, NodeCapacity capacity) {
return new ConsumerNode(new MemberInfo(nodeName, 0, new ComponentKind[]{ComponentKind.Consumer}, capacity));
return new ConsumerNode(new MemberInfo(nodeName, "", 0, new ComponentKind[]{ComponentKind.Consumer}, capacity));
}

public static NodeCapacity getNodeCapacity(int qps, int throughputKbps) {
Expand Down
20 changes: 11 additions & 9 deletions server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.flipkart.varadhi;

import com.flipkart.varadhi.entities.cluster.NodeCapacity;
import com.flipkart.varadhi.utils.JsonMapper;
import com.flipkart.varadhi.verticles.consumer.ConsumerVerticle;
import com.flipkart.varadhi.verticles.webserver.WebServerVerticle;
import com.flipkart.varadhi.entities.cluster.MemberInfo;
import com.flipkart.varadhi.cluster.VaradhiClusterManager;
import com.flipkart.varadhi.cluster.custom.VaradhiZkClusterManager;
import com.flipkart.varadhi.entities.cluster.ComponentKind;
import com.flipkart.varadhi.verticles.controller.ControllerVerticle;
import com.flipkart.varadhi.config.AppConfiguration;
import com.flipkart.varadhi.config.MemberConfig;
import com.flipkart.varadhi.entities.cluster.ComponentKind;
import com.flipkart.varadhi.entities.cluster.MemberInfo;
import com.flipkart.varadhi.entities.cluster.NodeCapacity;
import com.flipkart.varadhi.exceptions.InvalidConfigException;
import com.flipkart.varadhi.utils.CuratorFrameworkCreator;
import com.flipkart.varadhi.utils.HostUtils;
import com.flipkart.varadhi.utils.JsonMapper;
import com.flipkart.varadhi.verticles.consumer.ConsumerVerticle;
import com.flipkart.varadhi.verticles.controller.ControllerVerticle;
import com.flipkart.varadhi.verticles.webserver.WebServerVerticle;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
Expand Down Expand Up @@ -75,10 +75,11 @@ public static void main(String[] args) {
}

private static MemberInfo getMemberInfo(MemberConfig memberConfig) throws UnknownHostException {
String host = HostUtils.getHostName();
String hostName = HostUtils.getHostName();
String hostAddress = HostUtils.getHostAddress();
int networkKBps = memberConfig.getNetworkMBps() * 1000;
NodeCapacity provisionedCapacity = new NodeCapacity(memberConfig.getMaxQps(), networkKBps);
return new MemberInfo(host, memberConfig.getClusterPort(), memberConfig.getRoles(), provisionedCapacity);
return new MemberInfo(hostName, hostAddress, memberConfig.getClusterPort(), memberConfig.getRoles(), provisionedCapacity);
}

private static VaradhiZkClusterManager getClusterManager(AppConfiguration config, String host) {
Expand All @@ -97,6 +98,7 @@ private static Future<Vertx> createClusteredVertx(
EventBusOptions eventBusOptions = new EventBusOptions()
.setHost(memberInfo.hostname())
.setPort(port)
.setClusterPublicHost(memberInfo.address())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in eventbus options, what is host used for if cluster public host is already present?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

host will be used for nodeId (internal to vertx) but for actual comms, address will be used.

.setClusterNodeMetadata(memberInfoJson);

VertxOptions vertxOptions = config.getVertxOptions()
Expand Down
Loading