Skip to content

Commit

Permalink
Merge pull request #47 from reactiverse/feat/configurable-proxy
Browse files Browse the repository at this point in the history
feat/configurable-proxy
  • Loading branch information
aesteve authored Aug 30, 2020
2 parents 263e075 + 906501c commit c42b50d
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 16 deletions.
17 changes: 17 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
root = true

[*]
charset = utf-8
indent_style = space
indent_size = 2
trim_trailing_whitespace = true
end_of_line = lf
insert_final_newline = true

[Makefile]
indent_style = tab

[**/examples/**.java]
# 84 looks like a odd number, however
# it accounts for 4 spaces (class and example method indentation)
max_line_length = 84
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Artifacts are published [here](https://search.maven.org/search?q=a:vertx-aws-sdk

| Project | Vert.x | AWS sdk |
| ------- | ------ | ------- |
| 0.6.0 | 3.9.2 | 2.14.7 |
| 0.5.1 | 3.9.2 | 2.13.6 |
| 0.5.0 | 3.9.0 | 2.12.0 |
| 0.4.0 | 3.8.3 | 2.10.16 |
Expand Down
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
val vertxVersion = "3.9.2"
val awsSdkVersion = "2.13.6"
val awsSdkVersion = "2.14.7"
val junit5Version = "5.4.0"
val logbackVersion = "1.2.3"
val integrationOption = "tests.integration"

group = "io.reactiverse"
version = "0.5.1"
version = "0.6.0"

plugins {
`java-library`
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/io/reactiverse/awssdk/VertxNioAsyncHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,36 @@
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static java.util.Objects.requireNonNull;

public class VertxNioAsyncHttpClient implements SdkAsyncHttpClient {

private final Context context;
private final HttpClient client;
private final HttpClientOptions clientOptions;

private static final HttpClientOptions DEFAULT_CLIENT_OPTIONS = new HttpClientOptions()
.setSsl(true)
.setKeepAlive(true);

public VertxNioAsyncHttpClient(Context context) {
this.context = context;
this.client = createVertxHttpClient(context.owner());
this.context = context;
this.clientOptions = DEFAULT_CLIENT_OPTIONS;
this.client = createVertxHttpClient(context.owner());
}

public VertxNioAsyncHttpClient(Context context, HttpClientOptions clientOptions) {
requireNonNull(clientOptions);
this.context = context;
this.clientOptions = clientOptions;
this.client = createVertxHttpClient(context.owner());
}

private static HttpClient createVertxHttpClient(Vertx vertx) {
HttpClientOptions options = new HttpClientOptions()
.setSsl(true)
.setKeepAlive(true);
return vertx.createHttpClient(options);
private HttpClient createVertxHttpClient(Vertx vertx) {
return vertx.createHttpClient(clientOptions);
}

@Override
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/io/reactiverse/awssdk/VertxSdkClient.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package io.reactiverse.awssdk;

import io.vertx.core.Context;
import io.vertx.core.http.HttpClientOptions;
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.core.SdkClient;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;

public interface VertxSdkClient {

static<C extends SdkClient, B extends AwsAsyncClientBuilder<B, C>> B withVertx(B builder, Context context) {
return builder
.httpClient(new VertxNioAsyncHttpClient(context))
.asyncConfiguration(conf ->
conf.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, new VertxExecutor(context))
);
}
static<C extends SdkClient, B extends AwsAsyncClientBuilder<B, C>> B withVertx(B builder, Context context) {
return builder
.httpClient(new VertxNioAsyncHttpClient(context))
.asyncConfiguration(conf ->
conf.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, new VertxExecutor(context))
);
}

static<C extends SdkClient, B extends AwsAsyncClientBuilder<B, C>> B withVertx(B builder, HttpClientOptions clientOptions, Context context) {
return builder
.httpClient(new VertxNioAsyncHttpClient(context, clientOptions))
.asyncConfiguration(conf ->
conf.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, new VertxExecutor(context))
);
}

}
96 changes: 96 additions & 0 deletions src/test/java/io/reactiverse/awssdk/ProxyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.reactiverse.awssdk;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.net.ProxyOptions;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;

import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(VertxExtension.class)
public class ProxyTest {

private Vertx vertx;
private HttpServer proxyServer;

private static final int PROXY_PORT = 8000;
private static final String PROXY_HOST = "localhost";

private final AtomicInteger proxyAccess = new AtomicInteger(0);
private static final AwsCredentialsProvider credentialsProvider = () -> new AwsCredentials() {
@Override
public String accessKeyId() {
return "a";
}

@Override
public String secretAccessKey() {
return "a";
}
};

@BeforeEach
public void setUp(VertxTestContext ctx) {
vertx = Vertx.vertx();
proxyServer = vertx.createHttpServer();
proxyServer.requestHandler(req -> {
proxyAccess.incrementAndGet();
req.response().end();
});
proxyServer.listen(PROXY_PORT, PROXY_HOST, res -> {
assertTrue(res.succeeded());
ctx.completeNow();
});
}

@AfterEach
public void tearDown(VertxTestContext ctx) {
if (proxyServer == null) {
return;
}
proxyAccess.set(0);
proxyServer.close(res -> {
assertTrue(res.succeeded());
ctx.completeNow();
});
}

@Test
@Timeout(value = 15, timeUnit = TimeUnit.SECONDS)
public void testGetThroughProxy(VertxTestContext ctx) throws Exception {
final KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder()
.region(Region.EU_WEST_1)
.endpointOverride(new URI("http://localhost:1111")) // something that just doesn't exist, the only thing that matters is that every request has traveled through proxy
.credentialsProvider(credentialsProvider);
HttpClientOptions throughProxyOptions = new HttpClientOptions().setProxyOptions(new ProxyOptions().setHost(PROXY_HOST).setPort(PROXY_PORT));
KinesisAsyncClient kinesis = VertxSdkClient.withVertx(builder, throughProxyOptions, vertx.getOrCreateContext()).build();
assertEquals(proxyAccess.get(), 0, "Proxy access count should have been reset");
kinesis
.listStreams()
.handle((res, err) -> {
assertTrue(proxyAccess.get() > 0, "Requests should have been transferred through proxy");
ctx.completeNow();
return null;
});
}

}

0 comments on commit c42b50d

Please sign in to comment.