Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC protobuf metadata: add internal codec #12

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ group=com.jauntsdn.rsocket
version=1.5.1

dependencyManagementPluginVersion=1.1.0
protobufPluginVersion=0.9.2
protobufPluginVersion=0.8.19
googleJavaFormatPluginVersion=0.9
gitPluginVersion=0.13.0
versionsPluginVersion=0.45.0
Expand All @@ -18,9 +18,11 @@ javaxInjectVersion=1
javaxAnnotationVersion=1.3.2
jakartaInjectVersion=2.0.1
jakartaAnnotationVersion=2.1.1

protobufVersion=3.23.4

junitVersion=5.9.0
assertjVersion=3.23.1

release=false
virtualthreads=true
toolchains=true
Expand Down
2 changes: 1 addition & 1 deletion gradle/bom.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ ext.removeDependencyScope = { pom ->

ext.managedDependencyModules = { parent ->
parent.subprojects
.findAll { !it.name.endsWith("bom")}
.findAll { !it.name.endsWith("bom") && !it.name.endsWith("test")}
.sort { "$it.name" }
}
5 changes: 5 additions & 0 deletions gradle/dependency-management.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ subprojects {
imports {
mavenBom "io.netty:netty-bom:${nettyBomVersion}"
mavenBom "io.projectreactor:reactor-bom:${reactorBomVersion}"
mavenBom "org.junit:junit-bom:${junitVersion}"
}

dependencies {
Expand All @@ -20,6 +21,10 @@ subprojects {
dependency "javax.inject:javax.inject:${javaxInjectVersion}"
dependency "javax.annotation:javax.annotation-api:${javaxAnnotationVersion}"
dependency "com.google.code.findbugs:jsr305:${jsr305Version}"

dependency "com.google.protobuf:protobuf-java:${protobufVersion}"
dependency "com.google.protobuf:protoc:${protobufVersion}"
dependency "org.assertj:assertj-core:${assertjVersion}"
}
generatedPomCustomization {
enabled = false
Expand Down
63 changes: 44 additions & 19 deletions rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
import java.util.Objects;

public final class Headers {
private static final Headers EMPTY = new Headers(false, Collections.emptyList());
private static final Headers DEFAULT_SERVICE = new Headers(true, Collections.emptyList());
private static final Headers EMPTY = new Headers(false, Collections.emptyList(), 0);
private static final Headers DEFAULT_SERVICE = new Headers(true, Collections.emptyList(), 0);

private final boolean isDefaultService;
private final int serializedSize;
private final List<String> nameValues;
private volatile ByteBuf cache;

private Headers(boolean isDefaultService, List<String> nameValues) {
private Headers(boolean isDefaultService, List<String> nameValues, int serializedSize) {
this.isDefaultService = isDefaultService;
this.nameValues = nameValues;
this.serializedSize = serializedSize;
}

public boolean isDefaultService() {
Expand Down Expand Up @@ -139,11 +141,11 @@ public static Headers create(String... headers) {
}

public static Headers create(boolean isDefaultService, String... headers) {
requireValid(headers, "headers");
int serializedSize = requireValid(headers, "headers");
if (headers.length == 0) {
return isDefaultService ? DEFAULT_SERVICE : EMPTY;
}
return new Headers(isDefaultService, Arrays.asList(headers));
return new Headers(isDefaultService, Arrays.asList(headers), serializedSize);
}

public static Headers empty() {
Expand All @@ -163,11 +165,11 @@ public static Headers.Builder newBuilder(int size) {
}

static Headers create(List<String> headers) {
requireValid(headers, "headers");
int serializedSize = requireValid(headers, "headers");
if (headers.isEmpty()) {
return EMPTY;
}
return new Headers(false, headers);
return new Headers(false, headers, serializedSize);
}

ByteBuf cache() {
Expand All @@ -184,9 +186,14 @@ List<String> headers() {
return nameValues;
}

public int serializedSize() {
return serializedSize;
}

public static final class Builder {
private final List<String> nameValues;
private boolean isDefaultService;
private int serializedSize;

private Builder(int size, List<String> headers) {
int length = headers.size();
Expand Down Expand Up @@ -215,6 +222,8 @@ public Builder add(String name, String value) {
List<String> nv = nameValues;
nv.add(name);
nv.add(value);
serializedSize +=
Rpc.ProtoMetadata.serializedSize(name) + Rpc.ProtoMetadata.serializedSize(value);
return this;
}

Expand All @@ -223,8 +232,14 @@ public Builder remove(String name) {
List<String> nv = nameValues;
for (int i = nv.size() - 2; i >= 0; i -= 2) {
if (name.equals(nv.get(i))) {
nv.remove(i + 1);
nv.remove(i);
String removedValue = nv.remove(i + 1);
String removedName = nv.remove(i);
if (removedValue != null) {
serializedSize -= Rpc.ProtoMetadata.serializedSize(removedValue);
}
if (removedName != null) {
serializedSize -= Rpc.ProtoMetadata.serializedSize(removedName);
}
}
}
return this;
Expand All @@ -236,15 +251,21 @@ public Builder remove(String name, String value) {
List<String> nv = nameValues;
for (int i = nv.size() - 2; i >= 0; i -= 2) {
if (name.equals(nv.get(i)) && value.equals(nv.get(i + 1))) {
nv.remove(i + 1);
nv.remove(i);
String removedValue = nv.remove(i + 1);
String removedName = nv.remove(i);
if (removedValue != null) {
serializedSize -= Rpc.ProtoMetadata.serializedSize(removedValue);
}
if (removedName != null) {
serializedSize -= Rpc.ProtoMetadata.serializedSize(removedName);
}
}
}
return this;
}

public Headers build() {
return new Headers(isDefaultService, nameValues);
return new Headers(isDefaultService, nameValues, serializedSize);
}
}

Expand All @@ -256,13 +277,14 @@ private static String requireNonEmpty(String seq, String message) {
return seq;
}

private static List<String> requireValid(List<String> keyValues, String message) {
private static int requireValid(List<String> keyValues, String message) {
Objects.requireNonNull(keyValues, "keyValues");
int size = keyValues.size();
if (size % 2 != 0) {
int length = keyValues.size();
if (length % 2 != 0) {
throw new IllegalArgumentException(message + " size must be even");
}
for (int i = 0; i < size; i++) {
int size = 0;
for (int i = 0; i < length; i++) {
String kv = keyValues.get(i);
if (kv == null) {
throw new IllegalArgumentException(message + " elements must be non-null");
Expand All @@ -271,16 +293,18 @@ private static List<String> requireValid(List<String> keyValues, String message)
if (isKey && kv.isEmpty()) {
throw new IllegalArgumentException(message + " keys must be non-empty");
}
size += Rpc.ProtoMetadata.serializedSize(kv);
}
return keyValues;
return size;
}

private static String[] requireValid(String[] keyValues, String message) {
private static int requireValid(String[] keyValues, String message) {
Objects.requireNonNull(keyValues, "keyValues");
int length = keyValues.length;
if (length % 2 != 0) {
throw new IllegalArgumentException(message + " size must be even");
}
int size = 0;
for (int i = 0; i < length; i++) {
String kv = keyValues[i];
if (kv == null) {
Expand All @@ -290,7 +314,8 @@ private static String[] requireValid(String[] keyValues, String message) {
if (isKey && kv.isEmpty()) {
throw new IllegalArgumentException(message + " keys must be non-empty");
}
size += Rpc.ProtoMetadata.serializedSize(kv);
}
return keyValues;
return size;
}
}
95 changes: 95 additions & 0 deletions rsocket-messages/src/main/java/com/jauntsdn/rsocket/Rpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,21 @@

package com.jauntsdn.rsocket;

import com.jauntsdn.rsocket.exceptions.MetadataException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public final class Rpc {

Expand Down Expand Up @@ -328,5 +335,93 @@ public static ByteBuf getCache(Headers headers) {
public static void setCache(Headers headers, ByteBuf cache) {
headers.cache(cache);
}

private static final int LEN_TAG = /*field*/ 1 << 3 | /*wire type LEN*/ 2;
private static final int VARINT_BYTE_MAX = 128;
private static final int HEADER_LENGTH_MAX = 8192;

public static ByteBuf encodeHeaders(Headers headers) {
Objects.requireNonNull(headers, "headers");
if (headers.isEmpty()) {
return Unpooled.EMPTY_BUFFER;
}
ByteBuf cache = headers.cache();
if (cache != null) {
return cache;
}
int serializedSize = headers.serializedSize();
ByteBuf byteBuf =
new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, serializedSize, serializedSize);

List<String> asciiHeaders = headers.headers();
for (int i = 0; i < asciiHeaders.size(); i++) {
String asciiHeader = asciiHeaders.get(i);
encodeLen(byteBuf, asciiHeader.length());
ByteBufUtil.writeAscii(byteBuf, asciiHeader);
}
headers.cache(byteBuf);
return byteBuf;
}

static void encodeLen(ByteBuf byteBuf, int len) {
if (len < VARINT_BYTE_MAX) {
byteBuf.writeShort(LEN_TAG << 8 | len);
} else {
byteBuf.writeByte(LEN_TAG);
int varintLen = (len & 0x7F | /*cont bit*/ 0x80) << 8 | ((len >> 7) & 0x7F);
byteBuf.writeShort(varintLen);
}
}

static int serializedSize(String asciiString) {
int headerLength = asciiString.length();
return headerLength < VARINT_BYTE_MAX ? headerLength + 2 : headerLength + 3;
}

public static Headers decodeHeaders(ByteBuf metadata) {
Objects.requireNonNull(metadata, "metadata");
if (metadata.readableBytes() == 0) {
return Headers.empty();
}
List<String> headers = null;
int remaining = metadata.readableBytes();
do {
if (remaining < 2) {
throw new MetadataException("unexpected metadata structure");
}
remaining -= Short.BYTES;
short tagLenStart = metadata.readShort();
int tag = tagLenStart >> 8;
if (tag != LEN_TAG) {
throw new MetadataException("unexpected protobuf metadata message tag: " + tag);
}
int lenStart = tagLenStart & 0xFF;
int len;
if ((lenStart & /*cont bit*/ 0x80) == 0) {
len = lenStart & 0x7F;
} else {
remaining -= Byte.BYTES;
byte lenEnd = metadata.readByte();
if ((lenEnd & /*cont bit*/ 0x80) != 0) {
throw new MetadataException(
"unexpected protobuf metadata header length, exceeds " + HEADER_LENGTH_MAX);
}
len = lenStart & 0x7F | lenEnd << 7;
if (len > HEADER_LENGTH_MAX) {
throw new MetadataException(
"unexpected protobuf metadata header length, exceeds "
+ HEADER_LENGTH_MAX
+ ": "
+ len);
}
}
if (headers == null) {
headers = new ArrayList<>(4);
}
remaining -= len;
headers.add(metadata.readCharSequence(len, StandardCharsets.US_ASCII).toString());
} while (remaining > 0);
return Headers.create(headers);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2023 - present Maksym Ostroverkhov.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.jauntsdn.rsocket.exceptions;

public final class MetadataException extends ApplicationErrorException {
private static final long serialVersionUID = -5140098709867987805L;

public MetadataException(String message) {
super(message);
}
}
Loading