Skip to content

Commit

Permalink
New dependencies and updates
Browse files Browse the repository at this point in the history
  • Loading branch information
smoell committed Apr 23, 2018
1 parent 16beeb2 commit 2750746
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 46 deletions.
2 changes: 1 addition & 1 deletion services/kinesis-consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Kinesis Consumer
This AWS Lambda function consumes data from an Amazon Kinesis Stream and persists the data in an Amazon DynamoDB table.
This AWS Lambda function consumes data from an Amazon Kinesis Date Stream and persists the data in an Amazon DynamoDB table.

## General concept
In this Lambda function, the invocation code is separated from the business logic for better testing. The invocation code is triggered with a maximum number of 100 Kinesis events. Protobuf is used to reduce message size and saturate the Kinesis Stream. Unwrapping the data from Protobuf to Java objects is implemented in the invocation layer (`KinesisConsumerHandler`) of the Lambda function. The objects are passed to the business logic (`KinesisConsumer`) and used to persist data in a DynamoDB table. In order to reduce cold startup time, only few additional libraries are used as dependecies. In contract to the `redis-updater`-function, it is not necessary to access resources in private subnets, so it is sufficient to use the default networking environment of Lambda.
Expand Down
20 changes: 14 additions & 6 deletions services/kinesis-consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,39 @@
<groupId>com.amazon</groupId>
<artifactId>kinesis-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<version>1.1</version>
<name>kinesis-consumer</name>
<url>http://maven.apache.org</url>
<dependencies>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.11.163</version>
<version>1.11.311</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>1.3.0</version>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.9.0</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.3.1</version>
<version>3.5.1</version>
</dependency>

<dependency>
Expand All @@ -54,10 +60,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
import com.google.protobuf.InvalidProtocolBufferException;

import java.nio.ByteBuffer;

public class KinesisConsumerHandler {
public class KinesisConsumerHandler implements RequestHandler<KinesisEvent, Void> {

private DynamoDB dynamoDB;

public void handleRequest(KinesisEvent kinesisEvent, Context context) {
@Override
public Void handleRequest(KinesisEvent kinesisEvent, Context context) {

LambdaLogger logger = context.getLogger();

Expand All @@ -54,7 +56,7 @@ public void handleRequest(KinesisEvent kinesisEvent, Context context) {
String tableName = System.getenv(Constants.TABLE_NAME);
logger.log("Received " + kinesisEvent.getRecords().size() + " raw Event Records.");

for (KinesisEvent.KinesisEventRecord eventRecord : kinesisEvent.getRecords()) {
for (KinesisEventRecord eventRecord : kinesisEvent.getRecords()) {
// Unwrap protobuf
try {
ByteBuffer buffer = eventRecord.getKinesis().getData();
Expand All @@ -81,7 +83,7 @@ public void handleRequest(KinesisEvent kinesisEvent, Context context) {
}
}

return;
return null;
}

private AmazonDynamoDB createDynamodbClient(final Region region) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.*;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
Expand Down Expand Up @@ -78,5 +79,10 @@ public void writeToDynamoDb() {
LOGGER.info(item);
Assert.assertNotNull(item);
Assert.assertEquals(item.getString("program_id"), trackingMessage.getProgramId());

DeleteItemSpec deleteItemSpec = new DeleteItemSpec()
.withPrimaryKey(new PrimaryKey("id", id));
DeleteItemOutcome deleteItemOutcome = table.deleteItem(deleteItemSpec);
LOGGER.info("DeleteItemOutcome: " + deleteItemOutcome);
}
}
2 changes: 1 addition & 1 deletion services/redis-updater/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Redis Updater
This AWS Lambda function consumes data from an Amazon Kinesis Stream, persists the data in Redis, and notifies subscribers of data changes.
This AWS Lambda function consumes data from an Amazon Kinesis Data Stream, persists the data in Redis, and notifies subscribers of data changes.

## General concept
In this Lambda function, the invocation code is separated from the business logic for better testing. The invocation code is triggered with a maximum of 100 Kinesis events. JSON is used as data format to send data using Kinesis. Unwrapping and conversion of the data from JSON to a POJO is implemented in the invocation layer (`RedisUpdaterHandler`) of the Lambda function. The objects are passed to the business logic (`RedisUpdater`) and used to persist data in Redis and to notify subscribers of data changes. In order to reduce cold startup time, only a few additional libraries are used a dependencies. For this Lambda-function, it is necessary to access the Amazon ElastiCache cluster in private subnets inside a VPC. In this case, it is necessary to provide additional VPC-specific configuration information that includes VPC subnet IDs and security group IDs. AWS Lambda uses this information to set up elastic network interfaces (ENIs) that enables the function to connect securely to other resources within the private VPC.
Expand Down
16 changes: 12 additions & 4 deletions services/redis-updater/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>com.amazon</groupId>
<artifactId>redis-updater</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<version>1.1</version>
<name>redis-updater</name>
<url>http://maven.apache.org</url>

Expand All @@ -18,13 +18,19 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>1.3.0</version>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.9.0</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -56,10 +62,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
import static com.amazon.util.Constants.REDIS_HOST;
import static com.amazon.util.Constants.REDIS_PORT;

public class RedisUpdaterHandler {
public class RedisUpdaterHandler implements RequestHandler<KinesisEvent, Void> {

private Jedis jedis;
private Charset charset = Charset.forName("UTF-8");
private ObjectMapper mapper = new ObjectMapper();

public void handleRequest(KinesisEvent kinesisEvent, Context context) {
@Override
public Void handleRequest(KinesisEvent kinesisEvent, Context context) {

RedisUpdater redisUpdater = new RedisUpdater();
LambdaLogger logger = context.getLogger();
Expand All @@ -49,7 +50,7 @@ public void handleRequest(KinesisEvent kinesisEvent, Context context) {

if (System.getenv(REDIS_HOST) == null) {
logger.log("Not Redis host specified");
return;
return null;
}
String redisHost = System.getenv(REDIS_HOST);
int redisPort = System.getenv(REDIS_PORT) == null ? 6379 : Integer.parseInt(System.getenv(REDIS_PORT));
Expand All @@ -76,7 +77,7 @@ public void handleRequest(KinesisEvent kinesisEvent, Context context) {
}
}

return;
return null;
}


Expand Down
16 changes: 16 additions & 0 deletions services/tracking-service/reactive-vertx/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
.git
Dockerfile
.DS_Store
.gitignore
README.md
pom.xml
src/*
.idea/*
.ipynb_checkpoints/*
.vertx/*
target/classes/*
target/generated-sources/*
target/maven-archiver/*
target/maven-status/*
maven-verticle.iml
reactive-vertx.iml
13 changes: 7 additions & 6 deletions services/tracking-service/reactive-vertx/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ FROM alpine:3.7

# Java Version and other ENV
ENV JAVA_VERSION_MAJOR=8 \
JAVA_VERSION_MINOR=162 \
JAVA_VERSION_BUILD=12 \
JAVA_VERSION_MINOR=171 \
JAVA_VERSION_BUILD=11 \
JAVA_PACKAGE=jdk \
HOTSWAP_AGENT_VERSION=1.2.0 \
JAVA_JCE=unlimited \
Expand All @@ -29,7 +29,7 @@ RUN set -ex && \
curl -L -o /tmp/dcevm/DCEVM-light-8u112-installer.jar "https://github.com/dcevm/dcevm/releases/download/light-jdk8u112%2B8/DCEVM-light-8u112-installer.jar" && \
mkdir /opt && \
curl -jksSLH "Cookie: oraclelicense=accept-securebackup-cookie" -o /tmp/java.tar.gz \
http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/0da788060d494f5095bf8624735fa2f1/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-linux-x64.tar.gz && \
http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/512cd62ec5174c3487ac17c61aaa89e8/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-linux-x64.tar.gz && \
JAVA_PACKAGE_SHA256=$(curl -sSL https://www.oracle.com/webfolder/s/digest/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}checksum.html | grep -E "${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-linux-x64\.tar\.gz" | grep -Eo '(sha256: )[^<]+' | cut -d: -f2 | xargs) && \
echo "${JAVA_PACKAGE_SHA256} /tmp/java.tar.gz" > /tmp/java.tar.gz.sha256 && \
sha256sum -c /tmp/java.tar.gz.sha256 && \
Expand All @@ -50,7 +50,7 @@ RUN set -ex && \
cp -v /tmp/UnlimitedJCEPolicyJDK8/*.jar /opt/jdk/jre/lib/security/; \
fi && \
sed -i s/#networkaddress.cache.ttl=-1/networkaddress.cache.ttl=10/ $JAVA_HOME/jre/lib/security/java.security && \
apk del curl glibc-i18n && \
apk del glibc-i18n && \
rm -rf /opt/jdk/*src.zip \
/opt/jdk/lib/missioncontrol \
/opt/jdk/lib/visualvm \
Expand Down Expand Up @@ -96,7 +96,8 @@ ENV JAVA_HOME /opt/jdk/
ADD target/reactive-vertx-1.1-fat.jar srv/vertx/

CMD ["java", "-server", "-XX:+DoEscapeAnalysis", "-XX:+UseStringDeduplication", \
"-XX:+UseCompressedOops", "-Xms100M", "-Xmx200M", "-XX:+UseG1GC", "-jar", \
"srv/vertx/reactive-vertx-1.1-fat.jar"]
"-XX:+UseCompressedOops", "-Xms100M", "-Xmx200M", "-XX:+UseG1GC", \
"-Dcom.amazonaws.sdk.disableCbor", \
"-jar", "srv/vertx/reactive-vertx-1.1-fat.jar"]

EXPOSE 8080
6 changes: 3 additions & 3 deletions services/tracking-service/reactive-vertx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.192</version>
<version>1.11.311</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.3.1</version>
<version>3.5.1</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
<version>24.1-jre</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class BootStrapVerticle extends AbstractVerticle {

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
Expand Down Expand Up @@ -146,13 +145,19 @@ private AmazonKinesisAsync createClient() {
AWSCredentialsProvider awsCredentialsProvider = new DefaultAWSCredentialsProviderChain();

// Configuring Kinesis-client with configuration
Region myRegion = Regions.getCurrentRegion();
LOGGER.debug("Using Region " + myRegion);
String myRegion = System.getenv("REGION");

if (null == myRegion || myRegion.trim().length() == 0) {
myRegion = Regions.US_EAST_1.getName();
LOGGER.info("Using default region");
}

LOGGER.info("Deploying in Region " + myRegion);

AmazonKinesisAsync kinesisClient = AmazonKinesisAsyncClientBuilder.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(awsCredentialsProvider)
.withRegion(myRegion.getName())
.withRegion(myRegion)
.build();

return kinesisClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void start() {
String envRedisHost = System.getenv(REDIS_HOST);
String envRedisPort = System.getenv(REDIS_PORT);

String redisHost = envRedisHost == null ? "localhost" : envRedisHost;
String redisHost = envRedisHost == null ? "some-redis" : envRedisHost;
int redisPort = envRedisPort == null ? 6379 : Integer.parseInt(envRedisPort);

LOGGER.info("--> Using Redis Host " + redisHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void after(TestContext context) {
public void writeFromRedisTest() {
LOGGER.info(" ---> Testcase: writeFromRedisTest");

String message = Json.encode(prepareData());
JsonObject message = JsonObject.mapFrom(prepareData());
eb.send(Constants.CACHE_REDIS_EVENTBUS_ADDRESS, message, res -> {
if (res.succeeded()) {
Object body = res.result().body();
Expand All @@ -88,13 +89,14 @@ public void readFromCacheTest() {
LOGGER.info(" ---> Testcase: readFromCacheTest");

TrackingMessage testMessage = prepareData();
String message = Json.encode(testMessage);
JsonObject message = JsonObject.mapFrom(testMessage);
eb.send(Constants.CACHE_EVENTBUS_ADDRESS, message, res -> {
if (res.succeeded()) {
Object body = res.result().body();
JsonObject body = (JsonObject)res.result().body();
LOGGER.info("Received result " + body + " -> " + body.getClass().getName());
Assert.assertNotNull(body);
TrackingMessage resultMessage = Json.decodeValue((String)body, TrackingMessage.class);
LOGGER.info(body.getClass().getName());
TrackingMessage resultMessage = Json.decodeValue(body.encode(), TrackingMessage.class);

Assert.assertEquals(testMessage.getProgramId(), resultMessage.getProgramId());

Expand Down
Loading

0 comments on commit 2750746

Please sign in to comment.