From c5d792e9b85cc7b00a66538b4d19848934cdc9b2 Mon Sep 17 00:00:00 2001 From: Gaurav Ashok Date: Mon, 26 Jun 2023 00:54:45 +0530 Subject: [PATCH] grpc over http2.0 poc --- CONTRIBUTING.md | 3 +- ...art.varadhi.java-common-conventions.gradle | 5 +- server/build.gradle | 33 +++++++++++- .../com/flipkart/varadhi/CoreServices.java | 12 ++--- .../com/flipkart/varadhi/RestVerticle.java | 39 ++++++++++++-- .../java/com/flipkart/varadhi/Server.java | 1 - .../flipkart/varadhi/VerticleDeployer.java | 4 +- .../varadhi/utils/RequestBodyExtension.java | 17 ------ .../varadhi/utils/ResponseExtension.java | 30 ----------- .../flipkart/varadhi/web/AuthHandlers.java | 4 +- .../com/flipkart/varadhi/web/Extensions.java | 52 +++++++++++++++++++ .../com/flipkart/varadhi/web/HandlerUtil.java | 9 ---- ...urProvider.java => RouteConfigurator.java} | 2 +- .../varadhi/web/v1/HealthCheckHandler.java | 6 +-- .../varadhi/web/v1/TopicHandlers.java | 13 +++-- server/src/main/proto/api.v1.proto | 19 +++++++ 16 files changed, 161 insertions(+), 88 deletions(-) delete mode 100644 server/src/main/java/com/flipkart/varadhi/utils/RequestBodyExtension.java delete mode 100644 server/src/main/java/com/flipkart/varadhi/utils/ResponseExtension.java create mode 100644 server/src/main/java/com/flipkart/varadhi/web/Extensions.java delete mode 100644 server/src/main/java/com/flipkart/varadhi/web/HandlerUtil.java rename server/src/main/java/com/flipkart/varadhi/web/routes/{RouteBehaviourProvider.java => RouteConfigurator.java} (76%) create mode 100644 server/src/main/proto/api.v1.proto diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 71ce014d..a98f5cd9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,7 +13,8 @@ - No @author tags in any javadoc. - Use try-with-resources blocks whenever is possible. - TODOs should be associated to at least one issue. -- Always format the contributed code. +- Always format the contributed code. In Intellij, it is recommended to enable "Reformat Code" & "Optimize Imports" + via "Tools > Actions on Save". ## Unit tests diff --git a/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle b/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle index 749bd7fc..cd715708 100644 --- a/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle +++ b/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle @@ -16,7 +16,10 @@ repositories { ext { lombok_version = "1.18.26" slf4j_version = "2.0.7" - vertx_version = "4.4.1" + javax_version = "1.3.2" + vertx_version = "4.4.4" + protoc_version = "3.23.2" + grpc_version = "1.56.0" otl_version = "1.25.0" micrometer_version = "1.10.6" diff --git a/server/build.gradle b/server/build.gradle index 37295806..02e2a052 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -1,9 +1,36 @@ import org.gradle.nativeplatform.platform.internal.DefaultNativePlatform +buildscript { + dependencies { + // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.3' + } +} + plugins { id 'com.flipkart.varadhi.java-application-conventions' + id 'com.google.protobuf' version "0.9.3" } +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protoc_version" + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version" + } + vertx { + artifact = "io.vertx:vertx-grpc-protoc-plugin2:$vertx_version" + } + } + generateProtoTasks { + all()*.plugins { + grpc + vertx + } + } +} dependencies { @@ -16,11 +43,15 @@ dependencies { implementation('org.apache.logging.log4j:log4j-core') implementation('com.fasterxml.jackson.core:jackson-databind') + implementation("javax.annotation:javax.annotation-api:$javax_version") implementation("io.vertx:vertx-core") implementation("io.vertx:vertx-config") implementation("io.vertx:vertx-config-yaml") implementation("io.vertx:vertx-web") + implementation("io.vertx:vertx-grpc-server:$vertx_version") + implementation("io.vertx:vertx-grpc-client:$vertx_version") + implementation("io.vertx:vertx-grpc-context-storage:$vertx_version") implementation("io.vertx:vertx-auth-common") implementation("io.vertx:vertx-auth-jwt") implementation("io.vertx:vertx-opentelemetry") @@ -28,7 +59,7 @@ dependencies { // TODO: check why still getting warning on class not found. if (DefaultNativePlatform.getCurrentOperatingSystem().isMacOsX()) { - runtimeOnly('io.netty:netty-resolver-dns-native-macos') + runtimeOnly('io.netty:netty-resolver-dns-native-macos:4.1.91.Final:osx-x86_64') } implementation("io.opentelemetry:opentelemetry-sdk") diff --git a/server/src/main/java/com/flipkart/varadhi/CoreServices.java b/server/src/main/java/com/flipkart/varadhi/CoreServices.java index 629ed7af..aa57bd38 100644 --- a/server/src/main/java/com/flipkart/varadhi/CoreServices.java +++ b/server/src/main/java/com/flipkart/varadhi/CoreServices.java @@ -4,8 +4,8 @@ import com.flipkart.varadhi.db.MetaStoreOptions; import com.flipkart.varadhi.db.MetaStoreProvider; import com.flipkart.varadhi.exceptions.InvalidConfigException; -import com.flipkart.varadhi.services.MessagingStackProvider; import com.flipkart.varadhi.services.MessagingStackOptions; +import com.flipkart.varadhi.services.MessagingStackProvider; import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.jmx.JmxConfig; @@ -66,7 +66,7 @@ private MetaStoreProvider setupMetaStoreProvider(MetaStoreOptions metaStoreOptio provider.init(metaStoreOptions); return provider; } - + private MessagingStackProvider setupMessagingStackProvider(MessagingStackOptions messagingStackOptions) { MessagingStackProvider provider = loadClass(messagingStackOptions.getProviderClassName()); provider.init(messagingStackOptions); @@ -81,9 +81,7 @@ private T loadClass(String className) { } throw new InvalidConfigException("No class provided."); } catch (Exception e) { - String errorMsg = String.format("Fail to load class %s.", className); - log.error(errorMsg, e); - throw new InvalidConfigException(e); + throw new InvalidConfigException(String.format("Fail to load class %s.", className), e); } } @@ -115,10 +113,6 @@ private ObservabilityStack setupObservabilityStack(ServerConfiguration configura return new ObservabilityStack(openTelemetry, meterRegistry); } - - - - @Getter @AllArgsConstructor public static class ObservabilityStack { diff --git a/server/src/main/java/com/flipkart/varadhi/RestVerticle.java b/server/src/main/java/com/flipkart/varadhi/RestVerticle.java index bc47b93c..7d94e111 100644 --- a/server/src/main/java/com/flipkart/varadhi/RestVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/RestVerticle.java @@ -1,30 +1,38 @@ package com.flipkart.varadhi; import com.flipkart.varadhi.exceptions.InvalidStateException; +import com.flipkart.varadhi.web.Extensions; import com.flipkart.varadhi.web.FailureHandler; import com.flipkart.varadhi.web.routes.RouteBehaviour; -import com.flipkart.varadhi.web.routes.RouteBehaviourProvider; +import com.flipkart.varadhi.web.routes.RouteConfigurator; import com.flipkart.varadhi.web.routes.RouteDefinition; +import com.flipkart.varadhi.web.v1.proto.MessageProducerGrpc; +import com.flipkart.varadhi.web.v1.proto.SingleMessageResponse; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; +import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpVersion; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; +import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.server.GrpcServer; import lombok.extern.slf4j.Slf4j; +import java.util.Arrays; import java.util.List; import java.util.Map; @Slf4j public class RestVerticle extends AbstractVerticle { private final List apiRoutes; - private final Map behaviorProviders; + private final Map behaviorProviders; private HttpServer httpServer; public RestVerticle( - List apiRoutes, Map behaviorProviders + List apiRoutes, Map behaviorProviders ) { this.apiRoutes = apiRoutes; this.behaviorProviders = behaviorProviders; @@ -36,11 +44,32 @@ public void start(Promise startPromise) { log.info("HttpServer Starting."); Router router = Router.router(vertx); + // grpc + GrpcServer grpcServer = GrpcServer.server(vertx); + + grpcServer.callHandler(MessageProducerGrpc.getProduceMethod(), request -> { + request.response().status(GrpcStatus.OK).statusMessage("OK") + .end(SingleMessageResponse.newBuilder().setOffset("0001").build()); + }); + + router.route() + .consumes("application/grpc") + .produces("application/grpc") + .handler(req -> grpcServer.handle(req.request())); + + router.route() + .consumes("application/json") + .produces("application/json") + .path("/topics/produce") + .method(HttpMethod.POST) + .handler(rc -> Extensions.RoutingContextExtension.endRequestWithResponse(rc, 200, "0001")); + + // http FailureHandler failureHandler = new FailureHandler(); for (RouteDefinition def : apiRoutes) { Route route = router.route().method(def.method()).path(def.path()); def.behaviours().forEach(behaviour -> { - RouteBehaviourProvider behaviorProvider = behaviorProviders.getOrDefault(behaviour, null); + RouteConfigurator behaviorProvider = behaviorProviders.getOrDefault(behaviour, null); if (null != behaviorProvider) { behaviorProvider.configure(route, def); } else { @@ -58,6 +87,8 @@ public void start(Promise startPromise) { HttpServerOptions options = new HttpServerOptions(); // TODO: why? options.setDecompressionSupported(false); + options.setAlpnVersions(Arrays.asList(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1)); + options.setUseAlpn(true); // TODO: create config for http server httpServer = vertx.createHttpServer(options).requestHandler(router).listen(8080, h -> { diff --git a/server/src/main/java/com/flipkart/varadhi/Server.java b/server/src/main/java/com/flipkart/varadhi/Server.java index a9db1456..a9bcee41 100644 --- a/server/src/main/java/com/flipkart/varadhi/Server.java +++ b/server/src/main/java/com/flipkart/varadhi/Server.java @@ -25,7 +25,6 @@ public static void main(String[] args) { log.info("Server Started."); } catch (Exception e) { log.error("Failed to initialise the server.", e); - System.out.println("Failed to initialise the server:" + e); System.exit(-1); } // TODO: check need for shutdown hook diff --git a/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java b/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java index 8b5fd741..f88b5c51 100644 --- a/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java +++ b/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java @@ -8,7 +8,7 @@ import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.web.AuthHandlers; import com.flipkart.varadhi.web.routes.RouteBehaviour; -import com.flipkart.varadhi.web.routes.RouteBehaviourProvider; +import com.flipkart.varadhi.web.routes.RouteConfigurator; import com.flipkart.varadhi.web.routes.RouteDefinition; import com.flipkart.varadhi.web.v1.HealthCheckHandler; import com.flipkart.varadhi.web.v1.TopicHandlers; @@ -29,7 +29,7 @@ public class VerticleDeployer { private final TopicHandlers topicHandlers; private final HealthCheckHandler healthCheckHandler; - private final Map behaviorProviders = new HashMap<>(); + private final Map behaviorProviders = new HashMap<>(); public VerticleDeployer( Vertx vertx, diff --git a/server/src/main/java/com/flipkart/varadhi/utils/RequestBodyExtension.java b/server/src/main/java/com/flipkart/varadhi/utils/RequestBodyExtension.java deleted file mode 100644 index 917adb44..00000000 --- a/server/src/main/java/com/flipkart/varadhi/utils/RequestBodyExtension.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.flipkart.varadhi.utils; - -import io.vertx.ext.web.RequestBody; - -public class RequestBodyExtension { - - /* - Extension method for vertx RequestBody. - builtin asPojo() method is not working because of jackson issues i.e. - it needs default constructor and none final fields. - - Extending RequestBody to have asPojo() custom deserializer to convert requestBody to appropriate Pojo. - */ - public static T asPojo(RequestBody body, Class clazz) { - return JsonMapper.jsonDeserialize(body.asString(), clazz); - } -} diff --git a/server/src/main/java/com/flipkart/varadhi/utils/ResponseExtension.java b/server/src/main/java/com/flipkart/varadhi/utils/ResponseExtension.java deleted file mode 100644 index 89f420a8..00000000 --- a/server/src/main/java/com/flipkart/varadhi/utils/ResponseExtension.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.flipkart.varadhi.utils; - -import io.netty.handler.codec.http.HttpHeaderValues; -import io.vertx.core.http.HttpHeaders; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.ext.web.RoutingContext; -import lombok.extern.slf4j.Slf4j; - - -@Slf4j -public class ResponseExtension { - public static void endRequestWithResponse(RoutingContext ctx, T response) { - String responseBody = JsonMapper.jsonSerialize(response); - ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON); - ctx.response().putHeader(HttpHeaders.CONTENT_ENCODING, "utf-8"); - ctx.response().end(responseBody, (r) -> { - HttpServerRequest request = ctx.request(); - if (r.succeeded()) { - log.debug("Request {}:{} completed successfully.", request.method(), request.path()); - } else { - log.error("Request {}:{} Failed to send response: {}", request.method(), request.path(), r.cause()); - } - }); - } - - public static void endRequestWithResponse(RoutingContext ctx, int status, T response) { - ctx.response().setStatusCode(status); - endRequestWithResponse(ctx, response); - } -} diff --git a/server/src/main/java/com/flipkart/varadhi/web/AuthHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/AuthHandlers.java index 50803c87..2f3bf7c5 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/AuthHandlers.java +++ b/server/src/main/java/com/flipkart/varadhi/web/AuthHandlers.java @@ -5,7 +5,7 @@ import com.flipkart.varadhi.auth.AuthorizationProvider; import com.flipkart.varadhi.exceptions.InvalidConfigException; import com.flipkart.varadhi.exceptions.VaradhiException; -import com.flipkart.varadhi.web.routes.RouteBehaviourProvider; +import com.flipkart.varadhi.web.routes.RouteConfigurator; import com.flipkart.varadhi.web.routes.RouteDefinition; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -25,7 +25,7 @@ import static java.net.HttpURLConnection.HTTP_OK; -public class AuthHandlers implements RouteBehaviourProvider { +public class AuthHandlers implements RouteConfigurator { private final Handler authenticationHandler; private final AuthorizationHandlerBuilder authorizationHandlerBuilder; diff --git a/server/src/main/java/com/flipkart/varadhi/web/Extensions.java b/server/src/main/java/com/flipkart/varadhi/web/Extensions.java new file mode 100644 index 00000000..76f5de6c --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/web/Extensions.java @@ -0,0 +1,52 @@ +package com.flipkart.varadhi.web; + +import com.flipkart.varadhi.utils.JsonMapper; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.RoutingContext; +import lombok.extern.slf4j.Slf4j; + +public class Extensions { + + public static class RequestBodyExtension { + + /* + Extension method for vertx RequestBody. + builtin asPojo() method is not working because of jackson issues i.e. + it needs default constructor and none final fields. + + Extending RequestBody to have asPojo() custom deserializer to convert requestBody to appropriate Pojo. + */ + public static T asPojo(RequestBody body, Class clazz) { + return JsonMapper.jsonDeserialize(body.asString(), clazz); + } + } + + @Slf4j + public static class RoutingContextExtension { + public static void endRequestWithResponse(RoutingContext ctx, T response) { + String responseBody = JsonMapper.jsonSerialize(response); + ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON); + ctx.response().putHeader(HttpHeaders.CONTENT_ENCODING, "utf-8"); + ctx.response().end(responseBody, (r) -> { + HttpServerRequest request = ctx.request(); + if (r.succeeded()) { + log.debug("Request {}:{} completed successfully.", request.method(), request.path()); + } else { + log.error("Request {}:{} Failed to send response: {}", request.method(), request.path(), r.cause()); + } + }); + } + + public static void endRequestWithResponse(RoutingContext ctx, int status, T response) { + ctx.response().setStatusCode(status); + endRequestWithResponse(ctx, response); + } + + public static void todo(RoutingContext context) { + context.response().setStatusCode(500).setStatusMessage("Not Implemented").end(); + } + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/web/HandlerUtil.java b/server/src/main/java/com/flipkart/varadhi/web/HandlerUtil.java deleted file mode 100644 index b10671d0..00000000 --- a/server/src/main/java/com/flipkart/varadhi/web/HandlerUtil.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.flipkart.varadhi.web; - -import io.vertx.ext.web.RoutingContext; - -public class HandlerUtil { - public static void handleTodo(RoutingContext context) { - context.response().setStatusCode(500).setStatusMessage("Not Implemented").end(); - } -} diff --git a/server/src/main/java/com/flipkart/varadhi/web/routes/RouteBehaviourProvider.java b/server/src/main/java/com/flipkart/varadhi/web/routes/RouteConfigurator.java similarity index 76% rename from server/src/main/java/com/flipkart/varadhi/web/routes/RouteBehaviourProvider.java rename to server/src/main/java/com/flipkart/varadhi/web/routes/RouteConfigurator.java index d1229b1e..6bce9222 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/routes/RouteBehaviourProvider.java +++ b/server/src/main/java/com/flipkart/varadhi/web/routes/RouteConfigurator.java @@ -2,6 +2,6 @@ import io.vertx.ext.web.Route; -public interface RouteBehaviourProvider { +public interface RouteConfigurator { void configure(Route route, RouteDefinition routeDef); } diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/HealthCheckHandler.java b/server/src/main/java/com/flipkart/varadhi/web/v1/HealthCheckHandler.java index 46615562..e3d2f3ff 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/v1/HealthCheckHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/web/v1/HealthCheckHandler.java @@ -1,8 +1,8 @@ package com.flipkart.varadhi.web.v1; -import com.flipkart.varadhi.utils.ResponseExtension; -import com.flipkart.varadhi.web.routes.RouteProvider; +import com.flipkart.varadhi.web.Extensions.RoutingContextExtension; import com.flipkart.varadhi.web.routes.RouteDefinition; +import com.flipkart.varadhi.web.routes.RouteProvider; import io.vertx.core.Handler; import io.vertx.core.http.HttpMethod; import io.vertx.ext.web.RoutingContext; @@ -15,7 +15,7 @@ import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; -@ExtensionMethod({ResponseExtension.class}) +@ExtensionMethod({RoutingContextExtension.class}) public class HealthCheckHandler implements Handler, RouteProvider { // TODO: add appropriate checks diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/TopicHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/v1/TopicHandlers.java index 417efc2e..73e6174d 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/v1/TopicHandlers.java +++ b/server/src/main/java/com/flipkart/varadhi/web/v1/TopicHandlers.java @@ -7,11 +7,10 @@ import com.flipkart.varadhi.entities.VaradhiTopicFactory; import com.flipkart.varadhi.exceptions.DuplicateResourceException; import com.flipkart.varadhi.services.VaradhiTopicService; -import com.flipkart.varadhi.utils.RequestBodyExtension; -import com.flipkart.varadhi.utils.ResponseExtension; -import com.flipkart.varadhi.web.HandlerUtil; -import com.flipkart.varadhi.web.routes.RouteProvider; +import com.flipkart.varadhi.web.Extensions.RequestBodyExtension; +import com.flipkart.varadhi.web.Extensions.RoutingContextExtension; import com.flipkart.varadhi.web.routes.RouteDefinition; +import com.flipkart.varadhi.web.routes.RouteProvider; import com.flipkart.varadhi.web.routes.SubRoutes; import io.vertx.core.http.HttpMethod; import io.vertx.ext.web.RoutingContext; @@ -27,7 +26,7 @@ import static com.flipkart.varadhi.web.routes.RouteBehaviour.hasBody; @Slf4j -@ExtensionMethod({RequestBodyExtension.class, ResponseExtension.class}) +@ExtensionMethod({RequestBodyExtension.class, RoutingContextExtension.class}) public class TopicHandlers implements RouteProvider { private final VaradhiTopicFactory varadhiTopicFactory; @@ -67,7 +66,7 @@ public List get() { } public void get(RoutingContext ctx) { - HandlerUtil.handleTodo(ctx); + ctx.todo(); } public void create(RoutingContext ctx) { @@ -92,6 +91,6 @@ public void create(RoutingContext ctx) { public void delete(RoutingContext ctx) { - HandlerUtil.handleTodo(ctx); + ctx.todo(); } } diff --git a/server/src/main/proto/api.v1.proto b/server/src/main/proto/api.v1.proto new file mode 100644 index 00000000..886a6596 --- /dev/null +++ b/server/src/main/proto/api.v1.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.flipkart.varadhi.web.v1.proto"; +option java_outer_classname = "ProduceApi"; + +// The greeting service definition. +service MessageProducer { + // Sends a greeting + rpc produce (SingleMessageRequest) returns (SingleMessageResponse) {} +} + +message SingleMessageRequest { + string payload = 1; +} + +message SingleMessageResponse { + string offset = 1; +}