Skip to content

Commit

Permalink
Merge pull request #47 from alibaba/branch-0.2.0
Browse files Browse the repository at this point in the history
Branch 0.2.0
  • Loading branch information
wuchaochen authored Apr 2, 2020
2 parents 11dea7c + 75d176e commit 4da60fb
Show file tree
Hide file tree
Showing 18 changed files with 65 additions and 56 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ This project combines TensorFlow with Flink and provides users with more conveni
**Currently, Flink job code uses java language and the algorithm code uses python language.**

## Support Version
TensorFlow: 1.11.0
TensorFlow: 1.13.1

Flink: 1.8.0
Flink: 1.10.0

## Quick Start

Expand Down Expand Up @@ -112,7 +112,7 @@ export PATH=${PATH}:${MAVEN_HOME}/bin
```

### Build From Source
**Compiling source code depends on tensorflow 1.11.0. Compiling commands will automatically install tensorflow 1.11.0**
**Compiling source code depends on tensorflow 1.13.1. Compiling commands will automatically install tensorflow 1.13.1**

```shell
mvn -DskipTests=true clean install
Expand Down Expand Up @@ -412,7 +412,7 @@ sh start_cluster.sh
* **Copy virtual environment package to hdfs**

```shell
docker exec flink-jm /opt/hadoop-2.7.0/bin/hadoop fs -put -f /opt/work_home/temp/test/tfenv.zip /user/root/tfenv.zip
docker exec flink-jm /opt/hadoop-2.8.0/bin/hadoop fs -put -f /opt/work_home/temp/test/tfenv.zip /user/root/tfenv.zip
```

* **Download mnist data**
Expand All @@ -434,7 +434,7 @@ zip -r ${projectRoot}/flink-ml-examples/target/code.zip code

* **Put code package to hdfs**
```shell
docker exec flink-jm /opt/hadoop-2.7.0/bin/hadoop fs -put -f /opt/work_home/flink-ml-examples/target/code.zip hdfs://minidfs:9000/user/root/
docker exec flink-jm /opt/hadoop-2.8.0/bin/hadoop fs -put -f /opt/work_home/flink-ml-examples/target/code.zip hdfs://minidfs:9000/user/root/
```

### Submit train job
Expand Down
25 changes: 12 additions & 13 deletions docker/flink/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,24 @@ WORKDIR /opt
# RUN cp opt/flink-table_2.11-1.6.0.jar lib/

# set up hadoop
# RUN cd /opt; wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz; tar xf hadoop-2.7.0.tar.gz; rm -f hadoop-2.7.0.tar.gz
RUN wget http://etaose.oss-cn-hangzhou-zmf.aliyuncs.com/test/flink/package/hadoop-2.7.0.tar.gz; tar xf hadoop-2.7.0.tar.gz; rm -f hadoop-2.7.0.tar.gz
ENV HADOOP_HDFS_HOME /opt/hadoop-2.7.0
ENV HADOOP_HOME /opt/hadoop-2.7.0
COPY core-site.xml /opt/hadoop-2.7.0/etc/hadoop/

# install flink 1.8
RUN rm -rf /opt/flink; wget http://etaose.oss-cn-hangzhou-zmf.aliyuncs.com/test/flink/package/flink-1.8.0-bin-scala_2.11.tgz
RUN tar xf flink-1.8.0-bin-scala_2.11.tgz
RUN mv flink-1.8.0 flink
RUN rm -f flink-1.8.0-bin-scala_2.11.tgz
RUN wget https://flink-ai-extended.oss-cn-beijing.aliyuncs.com/hadoop-2.8.0.tar.gz; tar xf hadoop-2.8.0.tar.gz; rm -f hadoop-2.8.0.tar.gz
ENV HADOOP_HDFS_HOME /opt/hadoop-2.8.0
ENV HADOOP_HOME /opt/hadoop-2.8.0
COPY core-site.xml /opt/hadoop-2.8.0/etc/hadoop/

# install flink 1.10
RUN rm -rf /opt/flink; wget https://flink-ai-extended.oss-cn-beijing.aliyuncs.com/flink-1.10.0-bin-scala_2.11.tgz
RUN tar xf flink-1.10.0-bin-scala_2.11.tgz
RUN mv flink-1.10.0 flink
RUN rm -f flink-1.10.0-bin-scala_2.11.tgz

#RUN rm -rf /opt/flink; wget http://etaose.oss-cn-hangzhou-zmf.aliyuncs.com/test/flink/package/flink-1.8-SNAPSHOT.tgz
#RUN tar xf flink-1.8-SNAPSHOT.tgz
#RUN mv flink-1.8-SNAPSHOT flink
#RUN rm -f flink-1.8-SNAPSHOT.tgz
RUN cp flink/opt/* flink/lib/
RUN cp -r flink/opt/* flink/lib/

ENV HADOOP_CLASSPATH=/opt/hadoop-2.7.0/etc/hadoop:/opt/hadoop-2.7.0/share/hadoop/common/lib/*:/opt/hadoop-2.7.0/share/hadoop/common/*:/opt/hadoop-2.7.0/share/hadoop/hdfs:/opt/hadoop-2.7.0/share/hadoop/hdfs/lib/*:/opt/hadoop-2.7.0/share/hadoop/hdfs/*:/opt/hadoop-2.7.0/share/hadoop/yarn/lib/*:/opt/hadoop-2.7.0/share/hadoop/yarn/*:/opt/hadoop-2.7.0/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.7.0/share/hadoop/mapreduce/*:/opt/hadoop-2.7.0/share/hadoop/common/:/opt/hadoop-2.7.0/share/hadoop/common/lib/:/opt/hadoop-2.7.0/share/hadoop/common/lib/:/opt/hadoop-2.7.0/share/hadoop/hdfs/:/opt/hadoop-2.7.0/share/hadoop/hdfs/lib/:/opt/hadoop-2.7.0/share/hadoop/yarn/:/opt/hadoop-2.7.0/share/hadoop/yarn/lib/:/opt/hadoop-2.7.0/contrib/capacity-scheduler/*.jar
ENV HADOOP_CLASSPATH=/opt/hadoop-2.8.0/etc/hadoop:/opt/hadoop-2.8.0/share/hadoop/common/lib/*:/opt/hadoop-2.8.0/share/hadoop/common/*:/opt/hadoop-2.8.0/share/hadoop/hdfs:/opt/hadoop-2.8.0/share/hadoop/hdfs/lib/*:/opt/hadoop-2.8.0/share/hadoop/hdfs/*:/opt/hadoop-2.8.0/share/hadoop/yarn/lib/*:/opt/hadoop-2.8.0/share/hadoop/yarn/*:/opt/hadoop-2.8.0/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.8.0/share/hadoop/mapreduce/*:/opt/hadoop-2.8.0/share/hadoop/common/:/opt/hadoop-2.8.0/share/hadoop/common/lib/:/opt/hadoop-2.8.0/share/hadoop/common/lib/:/opt/hadoop-2.8.0/share/hadoop/hdfs/:/opt/hadoop-2.8.0/share/hadoop/hdfs/lib/:/opt/hadoop-2.8.0/share/hadoop/yarn/:/opt/hadoop-2.8.0/share/hadoop/yarn/lib/:/opt/hadoop-2.8.0/contrib/capacity-scheduler/*.jar

# set up apt
COPY sources.list /etc/apt/
Expand Down
2 changes: 1 addition & 1 deletion flink-ml-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink_ai_extended</artifactId>
<groupId>com.alibaba.flink.ml</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ private void inferenceWithJava(int batchSize, boolean toStream) throws Exception
helper.like(new WorkerRole().name(), tfConfig.getWorkerNum());
helper.like(new PsRole().name(), tfConfig.getPsNum());
helper.like(new AMRole().name(), 1);
StreamGraph streamGraph = helper.matchStreamGraph(flinkEnv.getStreamGraph());
StreamGraph streamGraph = helper.matchStreamGraph(
flinkEnv.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME,false));
flinkEnv.execute();
}
}
Expand Down
2 changes: 1 addition & 1 deletion flink-ml-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink_ai_extended</artifactId>
<groupId>com.alibaba.flink.ml</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion flink-ml-framework/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def build_extension(self, ext):

setup(
name='flink_ml_framework',
version='0.1.0',
version='0.2.0',
include_package_data=True,
packages=find_packages(),
ext_modules=[CMakeExtension('flink_ml_framework/flink_ml_framework')],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ public class SysUtilTest {

@Test
public void getProjectVersion() {
Assert.assertEquals("0.1.0", SysUtil.getProjectVersion());
Assert.assertEquals("0.2.0", SysUtil.getProjectVersion());
}
}
2 changes: 1 addition & 1 deletion flink-ml-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink_ai_extended</artifactId>
<groupId>com.alibaba.flink.ml</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion flink-ml-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink_ai_extended</artifactId>
<groupId>com.alibaba.flink.ml</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class MLInputFormat<OUT> extends RichInputFormat<OUT, MLInputSplit> {
private TypeInformation<OUT> outTI;
private transient FutureTask<Void> serverFuture;
private transient MLContext mlContext;
private AtomicBoolean isClose = new AtomicBoolean(false);
private final AtomicBoolean isClose = new AtomicBoolean(false);
private transient FlinkOpHookManager hookManager;
private transient DataExchange<OUT, OUT> dataExchange;

Expand Down Expand Up @@ -169,25 +169,27 @@ public OUT nextRecord(OUT reuse) throws IOException {

@Override
public void close() throws IOException {
if (!isClose.get()) {
try {
if (serverFuture != null && !serverFuture.isCancelled()) {
serverFuture.get();
}
} catch (ExecutionException e) {
LOG.error(mlContext.getIdentity() + " node server failed {}", e.getMessage());
throw new IOException(e);
} catch (InterruptedException e) {
LOG.warn("{} interrupted during waiting server join {}.", mlContext.getIdentity(), e.getMessage());
serverFuture.cancel(true);
} finally {
serverFuture = null;
if (mlContext != null) {
mlContext.close();
mlContext = null;
synchronized (isClose) {
if (!isClose.get()) {
try {
if (serverFuture != null && !serverFuture.isCancelled()) {
serverFuture.get();
}
} catch (ExecutionException e) {
LOG.error(mlContext.getIdentity() + " node server failed {}", e.getMessage());
throw new IOException(e);
} catch (InterruptedException e) {
LOG.warn("{} interrupted during waiting server join {}.", mlContext.getIdentity(), e.getMessage());
serverFuture.cancel(true);
} finally {
serverFuture = null;
if (mlContext != null) {
mlContext.close();
mlContext = null;
}
}
isClose.set(true);
}
isClose.set(true);
}
if (null != hookManager) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public static void execTableJobCustom(MLConfig mlConfig, StreamExecutionEnvironm
helper.like(MLTestConstants.SINK_CONVERSION, 1);
helper.like("debug_source", 1);
helper.like(MLTestConstants.SINK, 1);
StreamGraph streamGraph = helper.matchStreamGraph(streamEnv.getStreamGraph());
StreamGraph streamGraph = helper.matchStreamGraph(streamEnv.getStreamGraph(
StreamExecutionEnvironment.DEFAULT_JOB_NAME,
false));
String plan = FlinkJobHelper.streamPlan(streamGraph);
System.out.println(plan);
streamEnv.execute();
Expand Down
2 changes: 1 addition & 1 deletion flink-ml-pytorch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink_ai_extended</artifactId>
<groupId>com.alibaba.flink.ml</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion flink-ml-tensorflow/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink_ai_extended</artifactId>
<groupId>com.alibaba.flink.ml</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
4 changes: 2 additions & 2 deletions flink-ml-tensorflow/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ def build_extension(self, ext):

setup(
name='flink_ml_tensorflow',
version='0.1.0',
version='0.2.0',
include_package_data=True,
packages=find_packages(),
ext_modules=[CMakeExtension('flink_ml_tensorflow/flink_ml_tensorflow')],
cmdclass=dict(build_ext=CMakeBuild),
zip_safe=False,
install_requires = ['tensorflow==1.13.1', 'tensorboard==1.13.1', 'flink_ml_framework==0.1.0'],
install_requires = ['tensorflow==1.13.1', 'tensorboard==1.13.1', 'flink_ml_framework==0.2.0'],
)
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

public class TFUtils {
private static Logger LOG = LoggerFactory.getLogger(TFUtils.class);
static final TableSchema DUMMY_SCHEMA = new TableSchema(
new String[] { "a" }, new TypeInformation[] { Types.STRING });
private static AtomicInteger count = new AtomicInteger(0);

/* ***********************************************
****** API for Streaming Environment **********
Expand Down Expand Up @@ -428,7 +429,7 @@ private static <OUT> TypeInformation<OUT> getTypeInfo(Class<OUT> clazz) {


private static void writeToDummySink(Table tbl, TableEnvironment tableEnvironment) {
String sinkName = "dummy_sink" + UUID.randomUUID();
String sinkName = String.format("dummy_sink_%s", count.getAndIncrement());
tableEnvironment.registerTableSink(sinkName, new TableStreamDummySink());
tbl.insertInto(sinkName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ public void testJobTimeout() throws Exception {
TFUtils.train(streamEnv, null, tfConfig);
try {
streamEnv.execute();
} catch (JobExecutionException e) {
if (e.getCause().getMessage().matches(".*worker:0 has been idle for \\d+ seconds.*")) {
} catch (Exception e) {
System.out.println(e.getMessage());
if (e.getCause().getMessage().matches(".*Job failed.*")) {
// expected
Thread.sleep(5000);
return;
}
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ public static void execTableJobCustom(MLConfig mlConfig, StreamExecutionEnvironm
helper.like(MLTestConstants.SINK_CONVERSION, 1);
helper.like("debug_source", 1);
helper.like(MLTestConstants.SINK, 1);
StreamGraph streamGraph = helper.matchStreamGraph(streamEnv.getStreamGraph());
StreamGraph streamGraph = helper.matchStreamGraph(streamEnv.getStreamGraph(
StreamExecutionEnvironment.DEFAULT_JOB_NAME,
false));
String plan = FlinkJobHelper.streamPlan(streamGraph);
System.out.println(plan);
streamEnv.execute();
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<artifactId>flink_ai_extended</artifactId>
<name>${project.artifactId}</name>
<packaging>pom</packaging>
<version>0.1.0</version>
<version>0.2.0</version>
<description>This project extends support for AI computing framework on Flink</description>
<url>https://github.com/alibaba/flink-ai-extended</url>
<licenses>
Expand Down Expand Up @@ -58,13 +58,13 @@

<properties>
<protobuf.version>3.6.0</protobuf.version>
<flink.version>1.9.0</flink.version>
<flink.version>1.10.0</flink.version>
<scala.major.version>2.11</scala.major.version>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<grpc.version>1.13.2</grpc.version>
<log4j.version>2.11.1</log4j.version>
<hadoop.version>2.7.0</hadoop.version>
<hadoop.version>2.8.0</hadoop.version>
<tensorflow.version>1.13.1</tensorflow.version>
<tensorflow.package.name>tensorflow</tensorflow.package.name>
<netty.version>4.1.30.Final</netty.version>
Expand Down

0 comments on commit 4da60fb

Please sign in to comment.