Skip to content

Commit

Permalink
Added GrpcAccessLogFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
ajay-jalgaonkar committed Jul 19, 2024
1 parent 0f99aca commit 60c54a2
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ public class GrpcConfig {

@JsonProperty("server.executorThreads")
private int executorThreads = 0;

@JsonProperty("enableAccessLogs")
private boolean enableAccessLogs;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.flipkart.gjex.core.filter;

import com.flipkart.gjex.core.logging.Logging;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Metadata;

public class GrpcAccessLogFilter <R extends GeneratedMessageV3,
S extends GeneratedMessageV3> implements Filter<R, S>, Logging {
private long startTime = 0;
private ServerRequestParams serverRequestParams;
@Override
public Filter<R,S> getInstance(){
return new GrpcAccessLogFilter<>();
}

@Override
public void doProcessRequest(R request) {
this.startTime = System.currentTimeMillis();
}

@Override
public void doFilterRequest(ServerRequestParams serverRequestParams, Metadata headers) {
this.serverRequestParams = serverRequestParams;
}

@Override
public void doProcessResponseHeaders(Metadata responseHeaders) {}

@Override
public void doProcessResponse(S response) {
String size = null;
if (response != null){
size = String.valueOf(response.getSerializedSize());
}
StringBuilder sb = new StringBuilder()
.append(serverRequestParams.getClientIp()).append(" ")
.append(serverRequestParams.getMethodName()).append(" ")
.append(size).append(" ")
.append(System.currentTimeMillis()-startTime);
error(sb.toString());
}
}
1 change: 1 addition & 0 deletions examples/src/main/resources/hello_world_config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Grpc:
server.port: 50051
server.executorThreads : 4
enableAccessLogs: true

Dashboard:
service.port: 9999
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.flipkart.gjex.core.context.GJEXContext;
import com.flipkart.gjex.core.filter.Filter;
import com.flipkart.gjex.core.filter.GrpcAccessLogFilter;
import com.flipkart.gjex.core.filter.MethodFilters;
import com.flipkart.gjex.core.filter.ServerRequestParams;
import com.flipkart.gjex.core.logging.Logging;
Expand Down Expand Up @@ -45,7 +46,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -63,17 +63,18 @@ public class FilterInterceptor implements ServerInterceptor, Logging {
* Map of Filter instances mapped to Service and its method
*/
@SuppressWarnings("rawtypes")
private Map<String, List<Filter>> filtersMap = new HashMap<String, List<Filter>>();
private final Map<String, List<Filter>> filtersMap = new HashMap<>();

@SuppressWarnings("rawtypes")
public void registerFilters(List<Filter> filters, List<BindableService> services) {
public void registerFilters(List<Filter> filters, List<BindableService> services,
boolean enableAccessLogs) {
Map<Class<?>, Filter> classToInstanceMap = filters.stream()
.collect(Collectors.toMap(Object::getClass, Function.identity()));
services.forEach(service -> {
List<Pair<?, Method>> annotatedMethods = AnnotationUtils.getAnnotatedMethods(service.getClass(), MethodFilters.class);
if (annotatedMethods != null) {
annotatedMethods.forEach(pair -> {
List<Filter> filtersForMethod = new LinkedList<Filter>();
List<Filter> filtersForMethod = new LinkedList<>();
Arrays.asList(pair.getValue().getAnnotation(MethodFilters.class).value()).forEach(filterClass -> {
if (!classToInstanceMap.containsKey(filterClass)) {
throw new RuntimeException("Filter instance not bound for Filter class :" + filterClass.getName());
Expand All @@ -82,8 +83,12 @@ public void registerFilters(List<Filter> filters, List<BindableService> services
});
// Key is of the form <Service Name>+ "/" +<Method Name>
// reflecting the structure followed in the gRPC HandlerRegistry using MethodDescriptor#getFullMethodName()
filtersMap.put((service.bindService().getServiceDescriptor().getName() + "/" + pair.getValue().getName()).toLowerCase(),
filtersForMethod);
String methodSignature =
(service.bindService().getServiceDescriptor().getName() + "/" + pair.getValue().getName()).toLowerCase();
filtersMap.put(methodSignature, filtersForMethod);
if (enableAccessLogs){
filtersMap.get(methodSignature).add(new GrpcAccessLogFilter());
}
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public void registerResources(List<ResourceConfig> resourceConfigs) {
}

public void registerHttpFilters(List<HttpFilterParams> httpFilterParamsList,
boolean registerAccessLogFilter){
if (registerAccessLogFilter) {
boolean enableAccessLogs){
if (enableAccessLogs) {
context.addFilter(new FilterHolder(accessLogFilter), "/*" ,
EnumSet.of(DispatcherType.REQUEST));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import com.flipkart.gjex.grpc.interceptor.FilterInterceptor;
import com.flipkart.gjex.grpc.interceptor.StatusMetricInterceptor;
import com.flipkart.gjex.grpc.interceptor.TracingInterceptor;
import io.grpc.*;
import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.internal.GrpcUtil;
import io.grpc.protobuf.services.ProtoReflectionService;

Expand Down Expand Up @@ -107,8 +112,9 @@ public void doStop() {
info("GJEX GrpcServer stopped.");
}

public void registerFilters(@SuppressWarnings("rawtypes") List<Filter> filters, List<BindableService> services) {
this.filterInterceptor.registerFilters(filters, services);
public void registerFilters(@SuppressWarnings("rawtypes") List<Filter> filters,
List<BindableService> services, boolean enableAccessLogs) {
this.filterInterceptor.registerFilters(filters, services, enableAccessLogs);
}

public void registerTracingSamplers(List<TracingSampler> samplers, List<BindableService> services) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void run(T configuration, U configMap, Environment environment) {

// Add all Grpc Filters to the Grpc Server
filters = getInstances(baseInjector, Filter.class);
grpcServer.registerFilters(filters, bindableServices);
grpcServer.registerFilters(filters, bindableServices, configuration.getGrpc().isEnableAccessLogs());

// Add all Grpc Filters to the Grpc Server
tracingSamplers = getInstances(baseInjector, TracingSampler.class);
Expand Down

0 comments on commit 60c54a2

Please sign in to comment.