Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add RESP2 support #2383

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,11 @@ public abstract class BaseClientConfiguration {
*/
private final ThreadPoolResource threadPoolResource;

/**
* Serialization protocol to be used with the server. If not set, {@link ProtocolVersion#RESP3}
* will be used.
*/
private final ProtocolVersion protocol;

public abstract BaseSubscriptionConfiguration getSubscriptionConfiguration();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

/** Represents the communication protocol with the server. */
public enum ProtocolVersion {
/** Use RESP2 to communicate with the server nodes. */
RESP3,
/** Use RESP3 to communicate with the server nodes. */
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
RESP2
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.ProtocolVersion;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConfigurationError;
import glide.connectors.handlers.ChannelHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -118,6 +120,10 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderBaseConfiguration
connectionRequestBuilder.setClientName(configuration.getClientName());
}

if (configuration.getProtocol() != null) {
connectionRequestBuilder.setProtocolValue(configuration.getProtocol().ordinal());
}

return connectionRequestBuilder;
}

Expand Down Expand Up @@ -145,7 +151,10 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClient(
}

if (configuration.getSubscriptionConfiguration() != null) {
// TODO throw ConfigurationError if RESP2
if (configuration.getProtocol() == ProtocolVersion.RESP2) {
throw new ConfigurationError(
"PubSub subscriptions require RESP3 protocol, but RESP2 was configured.");
}
var subscriptionsBuilder = PubSubSubscriptions.newBuilder();
for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) {
var channelsBuilder = PubSubChannelsOrPatterns.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.ProtocolVersion;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.ServerCredentials;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
Expand Down Expand Up @@ -143,6 +144,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
.build())
.databaseId(DATABASE_ID)
.clientName(CLIENT_NAME)
.protocol(ProtocolVersion.RESP3)
.subscriptionConfiguration(
StandaloneSubscriptionConfiguration.builder()
.subscription(EXACT, gs("channel_1"))
Expand Down Expand Up @@ -176,6 +178,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
.build())
.setDatabaseId(DATABASE_ID)
.setClientName(CLIENT_NAME)
.setProtocol(ConnectionRequestOuterClass.ProtocolVersion.RESP3)
.setPubsubSubscriptions(
PubSubSubscriptions.newBuilder()
.putAllChannelsOrPatternsByType(
Expand Down
36 changes: 17 additions & 19 deletions java/integTest/src/test/java/glide/ConnectionTests.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide;

import static glide.TestUtilities.commonClientConfig;
import static glide.TestUtilities.commonClusterClientConfig;

import glide.api.GlideClient;
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.GlideClusterClient;
import glide.api.models.configuration.ProtocolVersion;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

@Timeout(10) // seconds
public class ConnectionTests {

@Test
@ParameterizedTest
@EnumSource(ProtocolVersion.class)
@SneakyThrows
public void basic_client() {
public void basic_client(ProtocolVersion protocol) {
var regularClient =
GlideClient.createClient(
GlideClientConfiguration.builder()
.address(
NodeAddress.builder().port(TestConfiguration.STANDALONE_PORTS[0]).build())
.build())
.get();
GlideClient.createClient(commonClientConfig().protocol(protocol).build()).get();
regularClient.close();
}

@Test
@ParameterizedTest
@EnumSource(ProtocolVersion.class)
@SneakyThrows
public void cluster_client() {
var regularClient =
GlideClient.createClient(
GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(TestConfiguration.CLUSTER_PORTS[0]).build())
.build())
public void cluster_client(ProtocolVersion protocol) {
var clusterClient =
GlideClusterClient.createClient(commonClusterClientConfig().protocol(protocol).build())
.get();
regularClient.close();
clusterClient.close();
}
}
26 changes: 26 additions & 0 deletions java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback;
import glide.api.models.configuration.ClusterSubscriptionConfiguration;
import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode;
import glide.api.models.configuration.ProtocolVersion;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotType;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
Expand Down Expand Up @@ -280,6 +281,31 @@ private void skipTestsOnMac() {
"PubSub doesn't work on mac OS");
}

@SneakyThrows
@ParameterizedTest(name = "standalone = {0}")
@ValueSource(booleans = {true, false})
public void config_error_on_resp2(boolean standalone) {
if (standalone) {
var config =
commonClientConfig()
.subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder().build())
.protocol(ProtocolVersion.RESP2)
.build();
var exception =
assertThrows(ConfigurationError.class, () -> GlideClient.createClient(config));
assertTrue(exception.getMessage().contains("PubSub subscriptions require RESP3 protocol"));
} else {
var config =
commonClusterClientConfig()
.subscriptionConfiguration(ClusterSubscriptionConfiguration.builder().build())
.protocol(ProtocolVersion.RESP2)
.build();
var exception =
assertThrows(ConfigurationError.class, () -> GlideClusterClient.createClient(config));
assertTrue(exception.getMessage().contains("PubSub subscriptions require RESP3 protocol"));
}
}

/** Similar to `test_pubsub_exact_happy_path` in python client tests. */
@SneakyThrows
@ParameterizedTest(name = "standalone = {0}, read messages via {1}")
Expand Down
Loading
Loading