Skip to content

Commit

Permalink
Add gRPC authenticator for exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
saxocellphone committed Dec 13, 2024
1 parent e3cfede commit 05f6085
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void setAuthenticatorOnDelegate(Object builder, Authenticator authenticat
field.setAccessible(true);
Object value = field.get(builder);
if (value instanceof GrpcExporterBuilder) {
throw new IllegalArgumentException("GrpcExporterBuilder not supported yet.");
((GrpcExporterBuilder<?>) value).setAuthenticator(authenticator);
} else if (value instanceof HttpExporterBuilder) {
((HttpExporterBuilder<?>) value).setAuthenticator(authenticator);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsConfigHelper;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
private TlsConfigHelper tlsConfigHelper = new TlsConfigHelper();
@Nullable private RetryPolicy retryPolicy = RetryPolicy.getDefault();
private Supplier<MeterProvider> meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider;
@Nullable private Authenticator authenticator;

// Use Object type since gRPC may not be on the classpath.
@Nullable private Object grpcChannel;
Expand Down Expand Up @@ -147,6 +149,11 @@ public GrpcExporterBuilder<T> setMeterProvider(Supplier<MeterProvider> meterProv
return this;
}

public GrpcExporterBuilder<T> setAuthenticator(Authenticator authenticator) {
this.authenticator = authenticator;
return this;
}

@SuppressWarnings("BuilderReturnThis")
public GrpcExporterBuilder<T> copy() {
GrpcExporterBuilder<T> copy =
Expand Down Expand Up @@ -209,7 +216,8 @@ public GrpcExporter<T> build() {
grpcStubFactory,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager());
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
authenticator);
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter<>(exporterName, type, grpcSender, meterProviderSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.grpc;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
Expand Down Expand Up @@ -40,5 +41,6 @@ <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager);
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,16 +33,22 @@ void getHeaders() {

@Test
void setAuthenticatorOnDelegate_Success() {
HttpExporterBuilder<?> builder =
// For HTTP exporter
HttpExporterBuilder<?> httpBuilder =
new HttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test");

assertThat(builder).extracting("authenticator").isNull();

assertThat(httpBuilder).extracting("authenticator").isNull();
Authenticator authenticator = Collections::emptyMap;
Authenticator.setAuthenticatorOnDelegate(new WithDelegate(httpBuilder), authenticator);
assertThat(httpBuilder)
.extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class)))
.isSameAs(authenticator);

Authenticator.setAuthenticatorOnDelegate(new WithDelegate(builder), authenticator);

assertThat(builder)
// For GRPC exporter
GrpcExporterBuilder<?> grpcBuilder =
new GrpcExporterBuilder<>("otlp", "test", 60, URI.create("test"), null, "/test");
assertThat(grpcBuilder).extracting("authenticator").isNull();
Authenticator.setAuthenticatorOnDelegate(new WithDelegate(grpcBuilder), authenticator);
assertThat(grpcBuilder)
.extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class)))
.isSameAs(authenticator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void setUp() {
Collections::emptyMap,
null,
null,
null,
null),
MeterProvider::noop);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@

package io.opentelemetry.exporter.sender.grpc.managedchannel.internal;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.CompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
Expand All @@ -21,6 +26,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand All @@ -47,7 +53,8 @@ public <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
boolean shutdownChannel = false;
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
Expand Down Expand Up @@ -83,11 +90,29 @@ public OutputStream compress(OutputStream os) throws IOException {
compression = compressor.getEncoding();
}

CallCredentials cred =
new CallCredentials() {
@Override
public void applyRequestMetadata(
RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
Metadata headers = new Metadata();
if (authenticator != null) {
// For each header provided in the authenticator, put it in the header of the
// Metadata.
for (Map.Entry<String, String> e : authenticator.getHeaders().entrySet()) {
headers.put(Metadata.Key.of(e.getKey(), ASCII_STRING_MARSHALLER), e.getValue());
}

Check warning on line 104 in exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java#L103-L104

Added lines #L103 - L104 were not covered by tests
}
metadataApplier.apply(headers);
}
};

MarshalerServiceStub<T, ?, ?> stub =
stubFactory
.get()
.apply((Channel) managedChannel, authorityOverride)
.withCompression(compression);
.withCompression(compression)
.withCallCredentials(cred);

return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import io.opentelemetry.api.internal.InstrumentationUtil;
import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
Expand All @@ -42,6 +43,7 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
Expand Down Expand Up @@ -80,7 +82,8 @@ public OkHttpGrpcSender(
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder()
.dispatcher(OkHttpUtil.newDispatcher())
Expand All @@ -90,7 +93,6 @@ public OkHttpGrpcSender(
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable));
}

boolean isPlainHttp = endpoint.startsWith("http://");
if (isPlainHttp) {
clientBuilder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
Expand All @@ -102,6 +104,20 @@ public OkHttpGrpcSender(
}
}

if (authenticator != null) {
Map<String, List<String>> headers = headersSupplier.get();

Check warning on line 108 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L108

Added line #L108 was not covered by tests
// Convert the auth header type of Map<String, String> to the expected type of
// OkHttpGrpcSender of Map<String, List<String>>
Map<String, List<String>> authHeaders =
authenticator.getHeaders().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> Collections.singletonList(e.getValue())));

Check warning on line 115 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L111-L115

Added lines #L111 - L115 were not covered by tests
// The authenticator headers will override the default headers if there are duplicate keys.
headers.putAll(authHeaders);
headersSupplier = () -> headers;

Check warning on line 118 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L117-L118

Added lines #L117 - L118 were not covered by tests
}

this.client = clientBuilder.build();
this.headersSupplier = headersSupplier;
this.url = HttpUrl.get(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.sender.okhttp.internal;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
Expand Down Expand Up @@ -41,7 +42,8 @@ public <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
return new OkHttpGrpcSender<>(
endpoint.resolve(endpointPath).toString(),
compressor,
Expand All @@ -50,6 +52,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
headersSupplier,
retryPolicy,
sslContext,
trustManager);
trustManager,
authenticator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender<DummyMarshaler> sender, Runnable onSuccess, Runnable
@Override
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
return new OkHttpGrpcSender<>(
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null);
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null, null);
}

protected static class DummyMarshaler extends MarshalerWithSize {
Expand Down

0 comments on commit 05f6085

Please sign in to comment.