Add Flink sample that outputs to stdout
viren-nadkarni authored Dec 9, 2024
2 parents a64439e + 13dbede commit 88bba74
Showing 7 changed files with 351 additions and 2 deletions.
10 changes: 10 additions & 0 deletions java/Printer/README.md
@@ -0,0 +1,10 @@
# Printer

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)

The Flink application uses a synthetic source to generate ticker records
and prints the results to stdout.

It is based on the S3Sink application in this repo.
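
When the job runs, each print-sink subtask writes one line per record to stdout, prefixed with the subtask index. A representative line (values vary per record):

2> StockPrice{eventTime=2024-12-09 10:15:30.123, ticker='AMZN', price=42.17}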
184 changes: 184 additions & 0 deletions java/Printer/pom.xml
@@ -0,0 +1,184 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>msaf-printer</artifactId>
<version>1.0-SNAPSHOT</version>

<distributionManagement>
<repository>
<id>github</id>
<name>GitHub Packages</name>
<url>https://maven.pkg.github.com/localstack-samples/amazon-managed-service-for-apache-flink-examples</url>
</repository>
</distributionManagement>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<buildDirectory>${project.basedir}/target</buildDirectory>
<jar.finalName>${project.name}</jar.finalName>
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<flink.version>1.20.0</flink.version>
<flink.connector.version>5.0.0-1.20</flink.connector.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>1.12.677</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-runtime</artifactId>
<version>${kda.runtime.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

<build>
<directory>${buildDirectory}</directory>
<finalName>${jar.finalName}</finalName>

<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
<forceJavacCompilerUse>true</forceJavacCompilerUse>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

</project>
52 changes: 52 additions & 0 deletions java/Printer/src/main/java/com/amazonaws/services/msf/StockPrice.java
@@ -0,0 +1,52 @@
package com.amazonaws.services.msf;

import java.sql.Timestamp;

public class StockPrice {
private Timestamp eventTime;
private String ticker;
private Double price;


public StockPrice() {
}

public StockPrice(Timestamp eventTime, String ticker, Double price) {
this.eventTime = eventTime;
this.ticker = ticker;
this.price = price;
}

public Timestamp getEventTime() {
return eventTime;
}

public void setEventTime(Timestamp eventTime) {
this.eventTime = eventTime;
}

public String getTicker() {
return ticker;
}

public void setTicker(String ticker) {
this.ticker = ticker;
}

public Double getPrice() {
return price;
}

public void setPrice(Double price) {
this.price = price;
}

@Override
public String toString() {
return "StockPrice{" +
"eventTime=" + eventTime +
", ticker='" + ticker + '\'' +
", price=" + price +
'}';
}
}
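
Flink's POJO serializer can handle StockPrice because the class is public, has a public no-argument constructor, and exposes a getter/setter pair for every field. A quick standalone check of the toString() format (a hypothetical snippet, not part of the commit):

package com.amazonaws.services.msf;

import java.sql.Timestamp;

public class StockPriceDemo {
    public static void main(String[] args) {
        StockPrice p = new StockPrice(
                new Timestamp(System.currentTimeMillis()), "AAPL", 42.17);
        System.out.println(p);
        // prints e.g.: StockPrice{eventTime=2024-12-09 10:15:30.123, ticker='AAPL', price=42.17}
    }
}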
20 changes: 20 additions & 0 deletions java/Printer/src/main/java/com/amazonaws/services/msf/StockPriceGeneratorFunction.java
@@ -0,0 +1,20 @@
package com.amazonaws.services.msf;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.sql.Timestamp;
import java.time.Instant;

public class StockPriceGeneratorFunction implements GeneratorFunction<Long, StockPrice> {
private static final String[] TICKERS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"};

@Override
public StockPrice map(Long index) {
// The argument is the generator's sequence index; each record carries the
// current wall-clock time, a random ticker, and a random price.
return new StockPrice(
new Timestamp(Instant.now().toEpochMilli()),
TICKERS[RandomUtils.nextInt(0, TICKERS.length)],
RandomUtils.nextDouble(10, 100)
);
}
}
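
Because the generator is a plain class with no Flink runtime dependencies of its own, it can be sanity-checked outside a job by calling map directly (a hypothetical snippet, assuming the sample's classes and commons-lang3 are on the classpath):

package com.amazonaws.services.msf;

public class GeneratorSmokeTest {
    public static void main(String[] args) {
        StockPriceGeneratorFunction fn = new StockPriceGeneratorFunction();
        for (long i = 0; i < 3; i++) {
            // ticker and price are random on every call; i is only the sequence index
            System.out.println(fn.map(i));
        }
    }
}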
63 changes: 63 additions & 0 deletions java/Printer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -0,0 +1,63 @@
package com.amazonaws.services.msf;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class StreamingJob {

private static final Logger LOGGER = LogManager.getLogger(StreamingJob.class);

private static boolean isLocal(StreamExecutionEnvironment env) {
return env instanceof LocalStreamEnvironment;
}

private static DataGeneratorSource<StockPrice> getStockPriceDataGeneratorSource() {
long recordsPerSecond = 100;
return new DataGeneratorSource<>(
new StockPriceGeneratorFunction(),
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(recordsPerSecond),
TypeInformation.of(StockPrice.class));
}

public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// Local dev specific settings
if (isLocal(env)) {
// Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS
env.enableCheckpointing(60000);
env.setParallelism(2);
}

// Source
DataGeneratorSource<StockPrice> source = getStockPriceDataGeneratorSource();

// DataStream from the source
DataStream<StockPrice> stockPrices = env.fromSource(
source, WatermarkStrategy.noWatermarks(), "data-generator").setParallelism(1);

// Print every record to stdout
stockPrices.print();

env.execute("Flink Demo Printer Job");
}
}
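
The source above is unbounded (Long.MAX_VALUE records), so the job runs until it is cancelled. For quick local tests, bounding the source is a one-line change; a hypothetical variant using the same DataGeneratorSource constructor:

// Hypothetical bounded variant: emits 1,000 records at 100 records/second,
// then the source finishes and the job terminates on its own.
private static DataGeneratorSource<StockPrice> getBoundedStockPriceSource() {
    return new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            1_000L,                              // finite record count instead of Long.MAX_VALUE
            RateLimiterStrategy.perSecond(100),
            TypeInformation.of(StockPrice.class));
}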
12 changes: 12 additions & 0 deletions java/Printer/src/main/resources/log4j2.properties
@@ -0,0 +1,12 @@
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n

#logger.verbose.name = com.amazonaws.services.msf
#logger.verbose.level = debug
#logger.verbose.additivity = false
#logger.verbose.appenderRef.console.ref = ConsoleAppender

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
12 changes: 10 additions & 2 deletions java/S3Sink/pom.xml
@@ -6,7 +6,15 @@

<groupId>com.amazonaws</groupId>
<artifactId>flink-kds-s3</artifactId>
<version>1.0</version>
<version>1.0-SNAPSHOT</version>

<distributionManagement>
<repository>
<id>github</id>
<name>GitHub Packages</name>
<url>https://maven.pkg.github.com/localstack-samples/amazon-managed-service-for-apache-flink-examples</url>
</repository>
</distributionManagement>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -173,4 +181,4 @@
</plugins>
</build>

</project>