From ca4b78001eef407d18082646c4982f759ad0b08c Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Fri, 18 Aug 2023 15:35:37 +0300 Subject: [PATCH] Rpc protobuf metadata: add internal codec with no rsocket-rpc-metadata-idl dependency --- gradle.properties | 6 +- gradle/bom.gradle | 2 +- gradle/dependency-management.gradle | 5 + .../java/com/jauntsdn/rsocket/Headers.java | 63 +- .../main/java/com/jauntsdn/rsocket/Rpc.java | 95 +++ .../rsocket/exceptions/MetadataException.java | 25 + rsocket-test/build.gradle | 68 ++ rsocket-test/gradle.lockfile | 25 + .../java/io/rsocket/rpc/RpcCallMetadata.java | 594 ++++++++++++++++++ .../rsocket/rpc/RpcCallMetadataOrBuilder.java | 34 + .../io/rsocket/rpc/RpcCallMetadataProto.java | 49 ++ .../rsocket/ProtobufMetadataTest.java | 151 +++++ settings.gradle | 3 +- 13 files changed, 1097 insertions(+), 23 deletions(-) create mode 100644 rsocket-messages/src/main/java/com/jauntsdn/rsocket/exceptions/MetadataException.java create mode 100644 rsocket-test/build.gradle create mode 100644 rsocket-test/gradle.lockfile create mode 100644 rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadata.java create mode 100644 rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataOrBuilder.java create mode 100644 rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataProto.java create mode 100644 rsocket-test/src/test/java/com/jauntsdn/rsocket/ProtobufMetadataTest.java diff --git a/gradle.properties b/gradle.properties index e58ec95..c3385fc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ group=com.jauntsdn.rsocket version=1.5.1 dependencyManagementPluginVersion=1.1.0 -protobufPluginVersion=0.9.2 +protobufPluginVersion=0.8.19 googleJavaFormatPluginVersion=0.9 gitPluginVersion=0.13.0 versionsPluginVersion=0.45.0 @@ -18,9 +18,11 @@ javaxInjectVersion=1 javaxAnnotationVersion=1.3.2 jakartaInjectVersion=2.0.1 jakartaAnnotationVersion=2.1.1 - protobufVersion=3.23.4 +junitVersion=5.9.0 +assertjVersion=3.23.1 + release=false virtualthreads=true toolchains=true diff --git a/gradle/bom.gradle b/gradle/bom.gradle index b7bcdde..8dfe04e 100644 --- a/gradle/bom.gradle +++ b/gradle/bom.gradle @@ -18,6 +18,6 @@ ext.removeDependencyScope = { pom -> ext.managedDependencyModules = { parent -> parent.subprojects - .findAll { !it.name.endsWith("bom")} + .findAll { !it.name.endsWith("bom") && !it.name.endsWith("test")} .sort { "$it.name" } } diff --git a/gradle/dependency-management.gradle b/gradle/dependency-management.gradle index 3165ffd..957e59e 100644 --- a/gradle/dependency-management.gradle +++ b/gradle/dependency-management.gradle @@ -9,6 +9,7 @@ subprojects { imports { mavenBom "io.netty:netty-bom:${nettyBomVersion}" mavenBom "io.projectreactor:reactor-bom:${reactorBomVersion}" + mavenBom "org.junit:junit-bom:${junitVersion}" } dependencies { @@ -20,6 +21,10 @@ subprojects { dependency "javax.inject:javax.inject:${javaxInjectVersion}" dependency "javax.annotation:javax.annotation-api:${javaxAnnotationVersion}" dependency "com.google.code.findbugs:jsr305:${jsr305Version}" + + dependency "com.google.protobuf:protobuf-java:${protobufVersion}" + dependency "com.google.protobuf:protoc:${protobufVersion}" + dependency "org.assertj:assertj-core:${assertjVersion}" } generatedPomCustomization { enabled = false diff --git a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java index abea2c7..755c1b2 100644 --- a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java +++ b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java @@ -27,16 +27,18 @@ import java.util.Objects; public final class Headers { - private static final Headers EMPTY = new Headers(false, Collections.emptyList()); - private static final Headers DEFAULT_SERVICE = new Headers(true, Collections.emptyList()); + private static final Headers EMPTY = new Headers(false, Collections.emptyList(), 0); + private static final Headers DEFAULT_SERVICE = new Headers(true, Collections.emptyList(), 0); private final boolean isDefaultService; + private final int serializedSize; private final List nameValues; private volatile ByteBuf cache; - private Headers(boolean isDefaultService, List nameValues) { + private Headers(boolean isDefaultService, List nameValues, int serializedSize) { this.isDefaultService = isDefaultService; this.nameValues = nameValues; + this.serializedSize = serializedSize; } public boolean isDefaultService() { @@ -139,11 +141,11 @@ public static Headers create(String... headers) { } public static Headers create(boolean isDefaultService, String... headers) { - requireValid(headers, "headers"); + int serializedSize = requireValid(headers, "headers"); if (headers.length == 0) { return isDefaultService ? DEFAULT_SERVICE : EMPTY; } - return new Headers(isDefaultService, Arrays.asList(headers)); + return new Headers(isDefaultService, Arrays.asList(headers), serializedSize); } public static Headers empty() { @@ -163,11 +165,11 @@ public static Headers.Builder newBuilder(int size) { } static Headers create(List headers) { - requireValid(headers, "headers"); + int serializedSize = requireValid(headers, "headers"); if (headers.isEmpty()) { return EMPTY; } - return new Headers(false, headers); + return new Headers(false, headers, serializedSize); } ByteBuf cache() { @@ -184,9 +186,14 @@ List headers() { return nameValues; } + public int serializedSize() { + return serializedSize; + } + public static final class Builder { private final List nameValues; private boolean isDefaultService; + private int serializedSize; private Builder(int size, List headers) { int length = headers.size(); @@ -215,6 +222,8 @@ public Builder add(String name, String value) { List nv = nameValues; nv.add(name); nv.add(value); + serializedSize += + Rpc.ProtoMetadata.serializedSize(name) + Rpc.ProtoMetadata.serializedSize(value); return this; } @@ -223,8 +232,14 @@ public Builder remove(String name) { List nv = nameValues; for (int i = nv.size() - 2; i >= 0; i -= 2) { if (name.equals(nv.get(i))) { - nv.remove(i + 1); - nv.remove(i); + String removedValue = nv.remove(i + 1); + String removedName = nv.remove(i); + if (removedValue != null) { + serializedSize -= Rpc.ProtoMetadata.serializedSize(removedValue); + } + if (removedName != null) { + serializedSize -= Rpc.ProtoMetadata.serializedSize(removedName); + } } } return this; @@ -236,15 +251,21 @@ public Builder remove(String name, String value) { List nv = nameValues; for (int i = nv.size() - 2; i >= 0; i -= 2) { if (name.equals(nv.get(i)) && value.equals(nv.get(i + 1))) { - nv.remove(i + 1); - nv.remove(i); + String removedValue = nv.remove(i + 1); + String removedName = nv.remove(i); + if (removedValue != null) { + serializedSize -= Rpc.ProtoMetadata.serializedSize(removedValue); + } + if (removedName != null) { + serializedSize -= Rpc.ProtoMetadata.serializedSize(removedName); + } } } return this; } public Headers build() { - return new Headers(isDefaultService, nameValues); + return new Headers(isDefaultService, nameValues, serializedSize); } } @@ -256,13 +277,14 @@ private static String requireNonEmpty(String seq, String message) { return seq; } - private static List requireValid(List keyValues, String message) { + private static int requireValid(List keyValues, String message) { Objects.requireNonNull(keyValues, "keyValues"); - int size = keyValues.size(); - if (size % 2 != 0) { + int length = keyValues.size(); + if (length % 2 != 0) { throw new IllegalArgumentException(message + " size must be even"); } - for (int i = 0; i < size; i++) { + int size = 0; + for (int i = 0; i < length; i++) { String kv = keyValues.get(i); if (kv == null) { throw new IllegalArgumentException(message + " elements must be non-null"); @@ -271,16 +293,18 @@ private static List requireValid(List keyValues, String message) if (isKey && kv.isEmpty()) { throw new IllegalArgumentException(message + " keys must be non-empty"); } + size += Rpc.ProtoMetadata.serializedSize(kv); } - return keyValues; + return size; } - private static String[] requireValid(String[] keyValues, String message) { + private static int requireValid(String[] keyValues, String message) { Objects.requireNonNull(keyValues, "keyValues"); int length = keyValues.length; if (length % 2 != 0) { throw new IllegalArgumentException(message + " size must be even"); } + int size = 0; for (int i = 0; i < length; i++) { String kv = keyValues[i]; if (kv == null) { @@ -290,7 +314,8 @@ private static String[] requireValid(String[] keyValues, String message) { if (isKey && kv.isEmpty()) { throw new IllegalArgumentException(message + " keys must be non-empty"); } + size += Rpc.ProtoMetadata.serializedSize(kv); } - return keyValues; + return size; } } diff --git a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Rpc.java b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Rpc.java index a85070b..57679aa 100644 --- a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Rpc.java +++ b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Rpc.java @@ -16,14 +16,21 @@ package com.jauntsdn.rsocket; +import com.jauntsdn.rsocket.exceptions.MetadataException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledHeapByteBuf; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; public final class Rpc { @@ -328,5 +335,93 @@ public static ByteBuf getCache(Headers headers) { public static void setCache(Headers headers, ByteBuf cache) { headers.cache(cache); } + + private static final int LEN_TAG = /*field*/ 1 << 3 | /*wire type LEN*/ 2; + private static final int VARINT_BYTE_MAX = 128; + private static final int HEADER_LENGTH_MAX = 8192; + + public static ByteBuf encodeHeaders(Headers headers) { + Objects.requireNonNull(headers, "headers"); + if (headers.isEmpty()) { + return Unpooled.EMPTY_BUFFER; + } + ByteBuf cache = headers.cache(); + if (cache != null) { + return cache; + } + int serializedSize = headers.serializedSize(); + ByteBuf byteBuf = + new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, serializedSize, serializedSize); + + List asciiHeaders = headers.headers(); + for (int i = 0; i < asciiHeaders.size(); i++) { + String asciiHeader = asciiHeaders.get(i); + encodeLen(byteBuf, asciiHeader.length()); + ByteBufUtil.writeAscii(byteBuf, asciiHeader); + } + headers.cache(byteBuf); + return byteBuf; + } + + static void encodeLen(ByteBuf byteBuf, int len) { + if (len < VARINT_BYTE_MAX) { + byteBuf.writeShort(LEN_TAG << 8 | len); + } else { + byteBuf.writeByte(LEN_TAG); + int varintLen = (len & 0x7F | /*cont bit*/ 0x80) << 8 | ((len >> 7) & 0x7F); + byteBuf.writeShort(varintLen); + } + } + + static int serializedSize(String asciiString) { + int headerLength = asciiString.length(); + return headerLength < VARINT_BYTE_MAX ? headerLength + 2 : headerLength + 3; + } + + public static Headers decodeHeaders(ByteBuf metadata) { + Objects.requireNonNull(metadata, "metadata"); + if (metadata.readableBytes() == 0) { + return Headers.empty(); + } + List headers = null; + int remaining = metadata.readableBytes(); + do { + if (remaining < 2) { + throw new MetadataException("unexpected metadata structure"); + } + remaining -= Short.BYTES; + short tagLenStart = metadata.readShort(); + int tag = tagLenStart >> 8; + if (tag != LEN_TAG) { + throw new MetadataException("unexpected protobuf metadata message tag: " + tag); + } + int lenStart = tagLenStart & 0xFF; + int len; + if ((lenStart & /*cont bit*/ 0x80) == 0) { + len = lenStart & 0x7F; + } else { + remaining -= Byte.BYTES; + byte lenEnd = metadata.readByte(); + if ((lenEnd & /*cont bit*/ 0x80) != 0) { + throw new MetadataException( + "unexpected protobuf metadata header length, exceeds " + HEADER_LENGTH_MAX); + } + len = lenStart & 0x7F | lenEnd << 7; + if (len > HEADER_LENGTH_MAX) { + throw new MetadataException( + "unexpected protobuf metadata header length, exceeds " + + HEADER_LENGTH_MAX + + ": " + + len); + } + } + if (headers == null) { + headers = new ArrayList<>(4); + } + remaining -= len; + headers.add(metadata.readCharSequence(len, StandardCharsets.US_ASCII).toString()); + } while (remaining > 0); + return Headers.create(headers); + } } } diff --git a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/exceptions/MetadataException.java b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/exceptions/MetadataException.java new file mode 100644 index 0000000..4a7fb6c --- /dev/null +++ b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/exceptions/MetadataException.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 - present Maksym Ostroverkhov. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.jauntsdn.rsocket.exceptions; + +public final class MetadataException extends ApplicationErrorException { + private static final long serialVersionUID = -5140098709867987805L; + + public MetadataException(String message) { + super(message); + } +} diff --git a/rsocket-test/build.gradle b/rsocket-test/build.gradle new file mode 100644 index 0000000..8fde3fb --- /dev/null +++ b/rsocket-test/build.gradle @@ -0,0 +1,68 @@ +/* + * Copyright 2020 - present Maksym Ostroverkhov. + */ + +plugins { + id "java" + id "com.google.protobuf" + id "idea" +} + +dependencies { + protobuf project(":rsocket-rpc-metadata-idl") + implementation "com.google.protobuf:protobuf-java" + + testImplementation project(":rsocket-messages") + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.junit.jupiter:junit-jupiter-params" + testImplementation "org.assertj:assertj-core" + + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine" +} + +sourceSets { + main { + java { srcDir "src/generated" } + } +} + +test { + useJUnitPlatform() + + testLogging { + events "failed" + exceptionFormat "full" + } +} + +/*generate proto on build*/ +if (findProperty("generateProto") == "true") { + protobuf { + generatedFilesBaseDir = "${projectDir}/src/generated" + protoc { + artifact = "com.google.protobuf:protoc" + } + } + + clean { + delete "src/generated" + } +/*skip generate proto on build (default)*/ +} else { + tasks.whenTaskAdded { task -> + if (task.name.contains("generateProto")) { + task.enabled = false + } + } +} + +idea { + module { + sourceDirs += file("src/main/proto") + sourceDirs += file("src/generated/main/java") + + generatedSourceDirs += file("src/generated/main/java") + } +} + +description = "RSocket-jvm tests module" \ No newline at end of file diff --git a/rsocket-test/gradle.lockfile b/rsocket-test/gradle.lockfile new file mode 100644 index 0000000..54a6edd --- /dev/null +++ b/rsocket-test/gradle.lockfile @@ -0,0 +1,25 @@ +# This is a Gradle generated file for dependency locking. +# Manual edits can break the build and are not advised. +# This file is expected to be part of source control. +com.google.code.findbugs:jsr305:3.0.2=googleJavaFormat1.6 +com.google.errorprone:error_prone_annotations:2.0.18=googleJavaFormat1.6 +com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 +com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 +com.google.guava:guava:22.0=googleJavaFormat1.6 +com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 +com.google.protobuf:protobuf-java:3.23.4=compileClasspath,compileProtoPath,testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +com.google.protobuf:protoc:3.23.4=protobufToolsLocator_protoc +io.netty:netty-buffer:4.1.96.Final=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +io.netty:netty-common:4.1.96.Final=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +net.bytebuddy:byte-buddy:1.12.10=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath +org.assertj:assertj-core:3.23.1=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +org.codehaus.mojo:animal-sniffer-annotations:1.14=googleJavaFormat1.6 +org.junit.jupiter:junit-jupiter-api:5.9.0=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter-engine:5.9.0=testRuntimeClasspath +org.junit.jupiter:junit-jupiter-params:5.9.0=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +org.junit.platform:junit-platform-commons:1.9.0=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +org.junit.platform:junit-platform-engine:1.9.0=testRuntimeClasspath +org.junit:junit-bom:5.9.0=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +org.opentest4j:opentest4j:1.2.0=testCompileClasspath,testCompileProtoPath,testRuntimeClasspath +empty=annotationProcessor,protobuf,testAnnotationProcessor,testProtobuf diff --git a/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadata.java b/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadata.java new file mode 100644 index 0000000..acb9faf --- /dev/null +++ b/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadata.java @@ -0,0 +1,594 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: rsocket/call_metadata.proto + +package io.rsocket.rpc; + +/** + * Protobuf type {@code io.rsocket.rpc.RpcCallMetadata} + */ +public final class RpcCallMetadata extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:io.rsocket.rpc.RpcCallMetadata) + RpcCallMetadataOrBuilder { +private static final long serialVersionUID = 0L; + // Use RpcCallMetadata.newBuilder() to construct. + private RpcCallMetadata(com.google.protobuf.GeneratedMessageV3.Builder builder) { + super(builder); + } + private RpcCallMetadata() { + nameValues_ = + com.google.protobuf.LazyStringArrayList.emptyList(); + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + UnusedPrivateParameter unused) { + return new RpcCallMetadata(); + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return io.rsocket.rpc.RpcCallMetadataProto.internal_static_io_rsocket_rpc_RpcCallMetadata_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return io.rsocket.rpc.RpcCallMetadataProto.internal_static_io_rsocket_rpc_RpcCallMetadata_fieldAccessorTable + .ensureFieldAccessorsInitialized( + io.rsocket.rpc.RpcCallMetadata.class, io.rsocket.rpc.RpcCallMetadata.Builder.class); + } + + public static final int NAME_VALUES_FIELD_NUMBER = 1; + @SuppressWarnings("serial") + private com.google.protobuf.LazyStringArrayList nameValues_ = + com.google.protobuf.LazyStringArrayList.emptyList(); + /** + * repeated string name_values = 1; + * @return A list containing the nameValues. + */ + public com.google.protobuf.ProtocolStringList + getNameValuesList() { + return nameValues_; + } + /** + * repeated string name_values = 1; + * @return The count of nameValues. + */ + public int getNameValuesCount() { + return nameValues_.size(); + } + /** + * repeated string name_values = 1; + * @param index The index of the element to return. + * @return The nameValues at the given index. + */ + public java.lang.String getNameValues(int index) { + return nameValues_.get(index); + } + /** + * repeated string name_values = 1; + * @param index The index of the value to return. + * @return The bytes of the nameValues at the given index. + */ + public com.google.protobuf.ByteString + getNameValuesBytes(int index) { + return nameValues_.getByteString(index); + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + for (int i = 0; i < nameValues_.size(); i++) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 1, nameValues_.getRaw(i)); + } + getUnknownFields().writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < nameValues_.size(); i++) { + dataSize += computeStringSizeNoTag(nameValues_.getRaw(i)); + } + size += dataSize; + size += 1 * getNameValuesList().size(); + } + size += getUnknownFields().getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof io.rsocket.rpc.RpcCallMetadata)) { + return super.equals(obj); + } + io.rsocket.rpc.RpcCallMetadata other = (io.rsocket.rpc.RpcCallMetadata) obj; + + if (!getNameValuesList() + .equals(other.getNameValuesList())) return false; + if (!getUnknownFields().equals(other.getUnknownFields())) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (getNameValuesCount() > 0) { + hash = (37 * hash) + NAME_VALUES_FIELD_NUMBER; + hash = (53 * hash) + getNameValuesList().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + public static io.rsocket.rpc.RpcCallMetadata parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + + public static io.rsocket.rpc.RpcCallMetadata parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static io.rsocket.rpc.RpcCallMetadata parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(io.rsocket.rpc.RpcCallMetadata prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code io.rsocket.rpc.RpcCallMetadata} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:io.rsocket.rpc.RpcCallMetadata) + io.rsocket.rpc.RpcCallMetadataOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return io.rsocket.rpc.RpcCallMetadataProto.internal_static_io_rsocket_rpc_RpcCallMetadata_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return io.rsocket.rpc.RpcCallMetadataProto.internal_static_io_rsocket_rpc_RpcCallMetadata_fieldAccessorTable + .ensureFieldAccessorsInitialized( + io.rsocket.rpc.RpcCallMetadata.class, io.rsocket.rpc.RpcCallMetadata.Builder.class); + } + + // Construct using io.rsocket.rpc.RpcCallMetadata.newBuilder() + private Builder() { + + } + + private Builder( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + super(parent); + + } + @java.lang.Override + public Builder clear() { + super.clear(); + bitField0_ = 0; + nameValues_ = + com.google.protobuf.LazyStringArrayList.emptyList(); + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return io.rsocket.rpc.RpcCallMetadataProto.internal_static_io_rsocket_rpc_RpcCallMetadata_descriptor; + } + + @java.lang.Override + public io.rsocket.rpc.RpcCallMetadata getDefaultInstanceForType() { + return io.rsocket.rpc.RpcCallMetadata.getDefaultInstance(); + } + + @java.lang.Override + public io.rsocket.rpc.RpcCallMetadata build() { + io.rsocket.rpc.RpcCallMetadata result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public io.rsocket.rpc.RpcCallMetadata buildPartial() { + io.rsocket.rpc.RpcCallMetadata result = new io.rsocket.rpc.RpcCallMetadata(this); + if (bitField0_ != 0) { buildPartial0(result); } + onBuilt(); + return result; + } + + private void buildPartial0(io.rsocket.rpc.RpcCallMetadata result) { + int from_bitField0_ = bitField0_; + if (((from_bitField0_ & 0x00000001) != 0)) { + nameValues_.makeImmutable(); + result.nameValues_ = nameValues_; + } + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + com.google.protobuf.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof io.rsocket.rpc.RpcCallMetadata) { + return mergeFrom((io.rsocket.rpc.RpcCallMetadata)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(io.rsocket.rpc.RpcCallMetadata other) { + if (other == io.rsocket.rpc.RpcCallMetadata.getDefaultInstance()) return this; + if (!other.nameValues_.isEmpty()) { + if (nameValues_.isEmpty()) { + nameValues_ = other.nameValues_; + bitField0_ |= 0x00000001; + } else { + ensureNameValuesIsMutable(); + nameValues_.addAll(other.nameValues_); + } + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + java.lang.String s = input.readStringRequireUtf8(); + ensureNameValuesIsMutable(); + nameValues_.add(s); + break; + } // case 10 + default: { + if (!super.parseUnknownField(input, extensionRegistry, tag)) { + done = true; // was an endgroup tag + } + break; + } // default: + } // switch (tag) + } // while (!done) + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.unwrapIOException(); + } finally { + onChanged(); + } // finally + return this; + } + private int bitField0_; + + private com.google.protobuf.LazyStringArrayList nameValues_ = + com.google.protobuf.LazyStringArrayList.emptyList(); + private void ensureNameValuesIsMutable() { + if (!nameValues_.isModifiable()) { + nameValues_ = new com.google.protobuf.LazyStringArrayList(nameValues_); + } + bitField0_ |= 0x00000001; + } + /** + * repeated string name_values = 1; + * @return A list containing the nameValues. + */ + public com.google.protobuf.ProtocolStringList + getNameValuesList() { + nameValues_.makeImmutable(); + return nameValues_; + } + /** + * repeated string name_values = 1; + * @return The count of nameValues. + */ + public int getNameValuesCount() { + return nameValues_.size(); + } + /** + * repeated string name_values = 1; + * @param index The index of the element to return. + * @return The nameValues at the given index. + */ + public java.lang.String getNameValues(int index) { + return nameValues_.get(index); + } + /** + * repeated string name_values = 1; + * @param index The index of the value to return. + * @return The bytes of the nameValues at the given index. + */ + public com.google.protobuf.ByteString + getNameValuesBytes(int index) { + return nameValues_.getByteString(index); + } + /** + * repeated string name_values = 1; + * @param index The index to set the value at. + * @param value The nameValues to set. + * @return This builder for chaining. + */ + public Builder setNameValues( + int index, java.lang.String value) { + if (value == null) { throw new NullPointerException(); } + ensureNameValuesIsMutable(); + nameValues_.set(index, value); + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + /** + * repeated string name_values = 1; + * @param value The nameValues to add. + * @return This builder for chaining. + */ + public Builder addNameValues( + java.lang.String value) { + if (value == null) { throw new NullPointerException(); } + ensureNameValuesIsMutable(); + nameValues_.add(value); + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + /** + * repeated string name_values = 1; + * @param values The nameValues to add. + * @return This builder for chaining. + */ + public Builder addAllNameValues( + java.lang.Iterable values) { + ensureNameValuesIsMutable(); + com.google.protobuf.AbstractMessageLite.Builder.addAll( + values, nameValues_); + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + /** + * repeated string name_values = 1; + * @return This builder for chaining. + */ + public Builder clearNameValues() { + nameValues_ = + com.google.protobuf.LazyStringArrayList.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001);; + onChanged(); + return this; + } + /** + * repeated string name_values = 1; + * @param value The bytes of the nameValues to add. + * @return This builder for chaining. + */ + public Builder addNameValuesBytes( + com.google.protobuf.ByteString value) { + if (value == null) { throw new NullPointerException(); } + checkByteStringIsUtf8(value); + ensureNameValuesIsMutable(); + nameValues_.add(value); + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:io.rsocket.rpc.RpcCallMetadata) + } + + // @@protoc_insertion_point(class_scope:io.rsocket.rpc.RpcCallMetadata) + private static final io.rsocket.rpc.RpcCallMetadata DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new io.rsocket.rpc.RpcCallMetadata(); + } + + public static io.rsocket.rpc.RpcCallMetadata getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + @java.lang.Override + public RpcCallMetadata parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + Builder builder = newBuilder(); + try { + builder.mergeFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(builder.buildPartial()); + } catch (com.google.protobuf.UninitializedMessageException e) { + throw e.asInvalidProtocolBufferException().setUnfinishedMessage(builder.buildPartial()); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException(e) + .setUnfinishedMessage(builder.buildPartial()); + } + return builder.buildPartial(); + } + }; + + public static com.google.protobuf.Parser parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public io.rsocket.rpc.RpcCallMetadata getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + +} + diff --git a/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataOrBuilder.java b/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataOrBuilder.java new file mode 100644 index 0000000..9e63b08 --- /dev/null +++ b/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataOrBuilder.java @@ -0,0 +1,34 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: rsocket/call_metadata.proto + +package io.rsocket.rpc; + +public interface RpcCallMetadataOrBuilder extends + // @@protoc_insertion_point(interface_extends:io.rsocket.rpc.RpcCallMetadata) + com.google.protobuf.MessageOrBuilder { + + /** + * repeated string name_values = 1; + * @return A list containing the nameValues. + */ + java.util.List + getNameValuesList(); + /** + * repeated string name_values = 1; + * @return The count of nameValues. + */ + int getNameValuesCount(); + /** + * repeated string name_values = 1; + * @param index The index of the element to return. + * @return The nameValues at the given index. + */ + java.lang.String getNameValues(int index); + /** + * repeated string name_values = 1; + * @param index The index of the value to return. + * @return The bytes of the nameValues at the given index. + */ + com.google.protobuf.ByteString + getNameValuesBytes(int index); +} diff --git a/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataProto.java b/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataProto.java new file mode 100644 index 0000000..b1526df --- /dev/null +++ b/rsocket-test/src/generated/main/java/io/rsocket/rpc/RpcCallMetadataProto.java @@ -0,0 +1,49 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: rsocket/call_metadata.proto + +package io.rsocket.rpc; + +public final class RpcCallMetadataProto { + private RpcCallMetadataProto() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + static final com.google.protobuf.Descriptors.Descriptor + internal_static_io_rsocket_rpc_RpcCallMetadata_descriptor; + static final + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_io_rsocket_rpc_RpcCallMetadata_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\033rsocket/call_metadata.proto\022\016io.rsocke" + + "t.rpc\"&\n\017RpcCallMetadata\022\023\n\013name_values\030" + + "\001 \003(\tB(\n\016io.rsocket.rpcB\024RpcCallMetadata" + + "ProtoP\001b\006proto3" + }; + descriptor = com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }); + internal_static_io_rsocket_rpc_RpcCallMetadata_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_io_rsocket_rpc_RpcCallMetadata_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_io_rsocket_rpc_RpcCallMetadata_descriptor, + new java.lang.String[] { "NameValues", }); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/rsocket-test/src/test/java/com/jauntsdn/rsocket/ProtobufMetadataTest.java b/rsocket-test/src/test/java/com/jauntsdn/rsocket/ProtobufMetadataTest.java new file mode 100644 index 0000000..cef4b51 --- /dev/null +++ b/rsocket-test/src/test/java/com/jauntsdn/rsocket/ProtobufMetadataTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2023 - present Maksym Ostroverkhov. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.jauntsdn.rsocket; + +import com.jauntsdn.rsocket.exceptions.MetadataException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.rsocket.rpc.RpcCallMetadata; +import java.util.Arrays; +import java.util.List; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ProtobufMetadataTest { + + @Test + void encodeLength() { + for (int l = 1; l < 8192; l++) { + String str = "s".repeat(l); + Headers headers = Headers.create(str, str); + int lenSize = l <= 127 ? 2 : 3; + ByteBuf expectedLenBuf = encodeProtobufJava(headers).slice(0, lenSize); + ByteBuf actualLenBuf = encodeProtobufHeadersLen(str.length()); + try { + Assertions.assertThat(actualLenBuf.readableBytes()).isEqualTo(lenSize); + Assertions.assertThat((actualLenBuf)).isEqualTo((expectedLenBuf)); + } finally { + expectedLenBuf.release(); + actualLenBuf.release(); + } + } + } + + @Test + void encodeSmallHeaders() { + Headers headers = Headers.create("k", "v", "a", "b"); + ByteBuf expectedBuf = encodeProtobufJava(headers); + ByteBuf actualBuf = encodeProtobufHeaders(headers); + try { + Assertions.assertThat(actualBuf).isEqualTo(expectedBuf); + } finally { + expectedBuf.release(); + actualBuf.release(); + } + } + + @Test + void encodeLargeHeaders() { + String k = "k".repeat(300); + String v = "v".repeat(300); + String a = "a".repeat(300); + String b = "b".repeat(300); + Headers headers = Headers.create(k, v, a, b); + ByteBuf expectedBuf = encodeProtobufJava(headers); + ByteBuf actualBuf = encodeProtobufHeaders(headers); + try { + Assertions.assertThat(actualBuf).isEqualTo(expectedBuf); + } finally { + expectedBuf.release(); + actualBuf.release(); + } + } + + @Test + void decodeHeaders() { + for (int l = 1; l < 8192; l++) { + String key1 = "k".repeat(l); + String value1 = "v".repeat(l); + String key2 = "a".repeat(l); + String value2 = "b".repeat(l); + Headers expected = Headers.create(key1, value1, key2, value2); + ByteBuf metadata = encodeProtobufJava(expected); + int actualSerializedSize = metadata.readableBytes(); + Headers actual = Rpc.ProtoMetadata.decodeHeaders(metadata); + try { + Assertions.assertThat(expected.serializedSize()).isEqualTo(actualSerializedSize); + Assertions.assertThat(expected.headers()).isEqualTo(actual.headers()); + } finally { + metadata.release(); + } + } + } + + @Test + void decodeTooLargeHeaders() { + String key = "k".repeat(42_000); + String value = "v".repeat(22_000); + ByteBuf metadata = encodeProtobufJava(Arrays.asList(key, value)); + try { + org.junit.jupiter.api.Assertions.assertThrows( + MetadataException.class, + () -> { + Headers headers = Rpc.ProtoMetadata.decodeHeaders(metadata); + }); + } finally { + metadata.release(); + } + } + + @Test + void decodeEmptyHeaders() { + Headers actual = Rpc.ProtoMetadata.decodeHeaders(Unpooled.EMPTY_BUFFER); + Assertions.assertThat(actual).isSameAs(Headers.empty()); + } + + public static ByteBuf encodeProtobufHeadersLen(int len) { + ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(3); + Rpc.ProtoMetadata.encodeLen(buffer, len); + return buffer; + } + + public static ByteBuf encodeProtobufHeaders(Headers headers) { + return Rpc.ProtoMetadata.encodeHeaders(headers); + } + + public static ByteBuf encodeProtobufJava(Headers headers) { + return encodeProtobufJava(Rpc.ProtoMetadata.getHeaders(headers)); + } + + public static ByteBuf encodeProtobufJava(List headers) { + RpcCallMetadata message = RpcCallMetadata.newBuilder().addAllNameValues(headers).build(); + + int length = message.getSerializedSize(); + ByteBuf content = ByteBufAllocator.DEFAULT.buffer(length, length); + try { + message.writeTo( + com.google.protobuf.CodedOutputStream.newInstance(content.internalNioBuffer(0, length))); + content.writerIndex(length); + return content; + } catch (Throwable t) { + content.release(); + throw new com.jauntsdn.rsocket.exceptions.SerializationException( + "Message serialization error", t); + } + } +} diff --git a/settings.gradle b/settings.gradle index f05adaf..e7e0bbb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,6 +14,7 @@ include "rsocket-rpc-idl" include "rsocket-rpc-metadata-idl" include "rsocket-bom" include "rsocket-messages" +include "rsocket-test" include "rsocket-reactor" include "rsocket-rpc-reactor" include "rsocket-rxjava" @@ -39,4 +40,4 @@ boolean includeVirtualThreads() { } def virtualthreads = properties.get("virtualthreads") return virtualthreads == null || virtualthreads.isEmpty() || virtualthreads == "true" -} \ No newline at end of file +}