Skip to content

Commit

Permalink
Add gRPC authenticator for exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
saxocellphone committed Dec 17, 2024
1 parent e3cfede commit 8713eed
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void setAuthenticatorOnDelegate(Object builder, Authenticator authenticat
field.setAccessible(true);
Object value = field.get(builder);
if (value instanceof GrpcExporterBuilder) {
throw new IllegalArgumentException("GrpcExporterBuilder not supported yet.");
((GrpcExporterBuilder<?>) value).setAuthenticator(authenticator);
} else if (value instanceof HttpExporterBuilder) {
((HttpExporterBuilder<?>) value).setAuthenticator(authenticator);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsConfigHelper;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
private TlsConfigHelper tlsConfigHelper = new TlsConfigHelper();
@Nullable private RetryPolicy retryPolicy = RetryPolicy.getDefault();
private Supplier<MeterProvider> meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider;
@Nullable private Authenticator authenticator;

// Use Object type since gRPC may not be on the classpath.
@Nullable private Object grpcChannel;
Expand Down Expand Up @@ -147,6 +149,11 @@ public GrpcExporterBuilder<T> setMeterProvider(Supplier<MeterProvider> meterProv
return this;
}

public GrpcExporterBuilder<T> setAuthenticator(Authenticator authenticator) {
this.authenticator = authenticator;
return this;
}

@SuppressWarnings("BuilderReturnThis")
public GrpcExporterBuilder<T> copy() {
GrpcExporterBuilder<T> copy =
Expand Down Expand Up @@ -209,7 +216,8 @@ public GrpcExporter<T> build() {
grpcStubFactory,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager());
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
authenticator);
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter<>(exporterName, type, grpcSender, meterProviderSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.grpc;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
Expand Down Expand Up @@ -40,5 +41,6 @@ <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager);
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@
{
"name":"io.opentelemetry.exporter.internal.compression.Compressor",
"queryAllDeclaredMethods":true
},
{
"name":"io.opentelemetry.exporter.internal.auth.Authenticator",
"queryAllDeclaredMethods":true
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,16 +33,22 @@ void getHeaders() {

@Test
void setAuthenticatorOnDelegate_Success() {
HttpExporterBuilder<?> builder =
// For HTTP exporter
HttpExporterBuilder<?> httpBuilder =
new HttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test");

assertThat(builder).extracting("authenticator").isNull();

assertThat(httpBuilder).extracting("authenticator").isNull();
Authenticator authenticator = Collections::emptyMap;
Authenticator.setAuthenticatorOnDelegate(new WithDelegate(httpBuilder), authenticator);
assertThat(httpBuilder)
.extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class)))
.isSameAs(authenticator);

Authenticator.setAuthenticatorOnDelegate(new WithDelegate(builder), authenticator);

assertThat(builder)
// For GRPC exporter
GrpcExporterBuilder<?> grpcBuilder =
new GrpcExporterBuilder<>("otlp", "test", 60, URI.create("test"), null, "/test");
assertThat(grpcBuilder).extracting("authenticator").isNull();
Authenticator.setAuthenticatorOnDelegate(new WithDelegate(grpcBuilder), authenticator);
assertThat(grpcBuilder)
.extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class)))
.isSameAs(authenticator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void setUp() {
Collections::emptyMap,
null,
null,
null,
null),
MeterProvider::noop);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@

package io.opentelemetry.exporter.sender.grpc.managedchannel.internal;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.CompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
Expand All @@ -21,6 +26,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand All @@ -47,7 +53,8 @@ public <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
boolean shutdownChannel = false;
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
Expand Down Expand Up @@ -83,11 +90,29 @@ public OutputStream compress(OutputStream os) throws IOException {
compression = compressor.getEncoding();
}

CallCredentials cred =
new CallCredentials() {
@Override
public void applyRequestMetadata(
RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
Metadata headers = new Metadata();
if (authenticator != null) {
// For each header provided in the authenticator, put it in the header of the
// Metadata.
for (Map.Entry<String, String> e : authenticator.getHeaders().entrySet()) {
headers.put(Metadata.Key.of(e.getKey(), ASCII_STRING_MARSHALLER), e.getValue());
}

Check warning on line 104 in exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java#L103-L104

Added lines #L103 - L104 were not covered by tests
}
metadataApplier.apply(headers);
}
};

MarshalerServiceStub<T, ?, ?> stub =
stubFactory
.get()
.apply((Channel) managedChannel, authorityOverride)
.withCompression(compression);
.withCompression(compression)
.withCallCredentials(cred);

return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import io.opentelemetry.api.internal.InstrumentationUtil;
import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
Expand All @@ -42,6 +43,7 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
Expand Down Expand Up @@ -80,7 +82,8 @@ public OkHttpGrpcSender(
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder()
.dispatcher(OkHttpUtil.newDispatcher())
Expand All @@ -90,7 +93,6 @@ public OkHttpGrpcSender(
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable));
}

boolean isPlainHttp = endpoint.startsWith("http://");
if (isPlainHttp) {
clientBuilder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
Expand All @@ -102,6 +104,20 @@ public OkHttpGrpcSender(
}
}

if (authenticator != null) {
Map<String, List<String>> headers = headersSupplier.get();

Check warning on line 108 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L108

Added line #L108 was not covered by tests
// Convert the auth header type of Map<String, String> to the expected type of
// OkHttpGrpcSender of Map<String, List<String>>
Map<String, List<String>> authHeaders =
authenticator.getHeaders().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> Collections.singletonList(e.getValue())));

Check warning on line 115 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L111-L115

Added lines #L111 - L115 were not covered by tests
// The authenticator headers will override the default headers if there are duplicate keys.
headers.putAll(authHeaders);
headersSupplier = () -> headers;

Check warning on line 118 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L117-L118

Added lines #L117 - L118 were not covered by tests
}

this.client = clientBuilder.build();
this.headersSupplier = headersSupplier;
this.url = HttpUrl.get(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.sender.okhttp.internal;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
Expand Down Expand Up @@ -41,7 +42,8 @@ public <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
return new OkHttpGrpcSender<>(
endpoint.resolve(endpointPath).toString(),
compressor,
Expand All @@ -50,6 +52,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
headersSupplier,
retryPolicy,
sslContext,
trustManager);
trustManager,
authenticator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender<DummyMarshaler> sender, Runnable onSuccess, Runnable
@Override
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
return new OkHttpGrpcSender<>(
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null);
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null, null);
}

protected static class DummyMarshaler extends MarshalerWithSize {
Expand Down

0 comments on commit 8713eed

Please sign in to comment.