Added port option
dmmiller612 committed May 18, 2019
1 parent 30d7625 commit a46f4e7
Showing 4 changed files with 57 additions and 25 deletions.
README.md: 21 changes (11 additions, 10 deletions)
@@ -1,24 +1,25 @@
# SparkFlow

-This is an implementation of Tensorflow on Spark. The goal of this library is to provide a simple, understandable interface
-in using Tensorflow on Spark. With SparkFlow, you can easily integrate your deep learning model with a ML Spark Pipeline.
-Underneath, SparkFlow uses a parameter server to train the Tensorflow network in a distributed manner. Through the api,
+This is an implementation of TensorFlow on Spark. The goal of this library is to provide a simple, understandable interface
+in using TensorFlow on Spark. With SparkFlow, you can easily integrate your deep learning model with a ML Spark Pipeline.
+Underneath, SparkFlow uses a parameter server to train the TensorFlow network in a distributed manner. Through the api,
the user can specify the style of training, whether that is Hogwild or async with locking.

[![Build Status](https://api.travis-ci.org/lifeomic/sparkflow.svg?branch=master)](https://travis-ci.org/lifeomic/sparkflow)
[![license](https://img.shields.io/github/license/mashape/apistatus.svg?maxAge=2592000)](https://github.com/lifeomic/sparkflow/blob/master/LICENSE)

## Why should I use this?
-While there are other libraries that use Tensorflow on Apache Spark, Sparkflow's objective is to work seemlessly
-with ML Pipelines, provide a simple interface for training Tensorflow graphs, and give basic abstractions for
-faster development. For training, Sparkflow uses a parameter server which lives on the driver and allows for asynchronous training. This tool
+While there are other libraries that use TensorFlow on Apache Spark, SparkFlow's objective is to work seamlessly
+with ML Pipelines, provide a simple interface for training TensorFlow graphs, and give basic abstractions for
+faster development. For training, SparkFlow uses a parameter server which lives on the driver and allows for asynchronous training. This tool
provides faster training time when using big data.

## Installation

-Install sparkflow via pip: `pip install sparkflow`
+Install SparkFlow via pip: `pip install sparkflow`

-SparkFlow requires Apache Spark >= 2.0, flask, and Tensorflow to all be installed.
+SparkFlow requires Apache Spark >= 2.0, flask, dill, and TensorFlow to be installed. As of sparkflow >= 0.7.0, only
+python >= 3.5 will be supported.


## Example
@@ -162,7 +163,7 @@ optimizerOptions: Json options to apply to tensorflow optimizers.

#### Optimization Configuration

-As of Sparkflow version 0.2.1, Tensorflow optimization configuration options can be added to SparkAsyncDL for more control
+As of SparkFlow version 0.2.1, TensorFlow optimization configuration options can be added to SparkAsyncDL for more control
over the optimizer. While the user can supply the configuration json directly, there are a few provided utility
functions that include the parameters necessary. An example is provided below.

@@ -208,7 +209,7 @@ Contributions are always welcome. This could be fixing a bug, changing documenta
new changes against existing tests, we have provided a Docker container which takes in an argument of the python version.
This allows the user to check their work before pushing to Github, where travis-ci will run.

-For 2.7:
+For 2.7 (sparkflow <= 0.6.0):
```
docker build -t local-test --build-arg PYTHON_VERSION=2.7 .
docker run --rm local-test:latest bash -i -c "python tests/dl_runner.py"
```
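For context, here is a minimal sketch of how the new `port` option is used end to end, modeled on the `test_change_port` test added in this commit. `create_model` and `df` are placeholder names for a user-supplied graph-building function and a feature DataFrame, and the `tfOutput` tensor name depends on your graph:

```python
from sparkflow.graph_utils import build_graph
from sparkflow.tensorflow_async import SparkAsyncDL

# Serialize the TensorFlow graph; create_model is a placeholder for your
# own graph-building function, as in the repo's tests.
mg = build_graph(create_model)

spark_model = SparkAsyncDL(
    inputCol='features',
    tensorflowGraph=mg,
    tfInput='x:0',
    tfLabel='y:0',
    tfOutput='out:0',          # placeholder; use your graph's output tensor name
    tfOptimizer='adam',
    tfLearningRate=.1,
    iters=35,
    partitions=2,
    predictionCol='predicted',
    labelCol='label',
    port=3000                  # new in this commit: replaces the hard-coded 5000
)
fitted = spark_model.fit(df)   # df: a DataFrame with 'features' and 'label' columns
```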
sparkflow/HogwildSparkModel.py: 22 changes (12 additions, 10 deletions)
@@ -127,15 +127,16 @@ def __init__(self,
shuffle=True,
verbose=0,
partition_shuffles=1,
-loss_callback=None):
+loss_callback=None,
+port=5000):
self.tensorflowGraph = tensorflowGraph
self.iters = iters
self.tfInput = tfInput
self.tfLabel = tfLabel
self.acquire_lock = acquire_lock
graph = tf.MetaGraphDef()
metagraph = json_format.Parse(tensorflowGraph, graph)
-self.start_server(metagraph, optimizer)
+self.start_server(metagraph, optimizer, port)
#allow server to start up on separate thread
time.sleep(serverStartup)
self.mini_batch = mini_batch
@@ -144,24 +145,25 @@ def __init__(self,
self.shuffle = shuffle
self.partition_shuffles= partition_shuffles
self.loss_callback = loss_callback
-self.master_url = master_url if master_url is not None else HogwildSparkModel.determine_master()
+self.master_url = master_url if master_url is not None else HogwildSparkModel.determine_master(port)
+self.port = port

@staticmethod
-def determine_master():
+def determine_master(port):
"""
Get the url of the driver node. This is kind of crap on mac.
"""
try:
-master_url = socket.gethostbyname(socket.gethostname()) + ':5000'
+master_url = socket.gethostbyname(socket.gethostname()) + ':' + str(port)
return master_url
except:
-return 'localhost:5000'
+return 'localhost:' + str(port)

-def start_server(self, tg, optimizer):
+def start_server(self, tg, optimizer, port):
"""
Starts the server with a copy of the argument for weird tensorflow multiprocessing issues
"""
-self.server = Process(target=self.start_service, args=(tg, optimizer))
+self.server = Process(target=self.start_service, args=(tg, optimizer, port))
self.server.daemon = True
self.server.start()

@@ -172,7 +174,7 @@ def stop_server(self):
self.server.terminate()
self.server.join()

-def start_service(self, metagraph, optimizer):
+def start_service(self, metagraph, optimizer, port):
"""
Asynchronous flask service. This may be a bit confusing why the server starts here and not init.
It is basically because this is ran in a separate process, and when python call fork, we want to fork from this
@@ -241,7 +243,7 @@ def update_parameters():

return 'completed'

-self.app.run(host='0.0.0.0', use_reloader=False, threaded=True, port=5000)
+self.app.run(host='0.0.0.0', use_reloader=False, threaded=True, port=port)

def train(self, rdd):
try:
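To make the change easier to follow, here is a minimal, self-contained sketch of the pattern HogwildSparkModel uses: the parameter server is a Flask app launched in a daemon Process on the driver, and this commit threads a configurable port through `determine_master`, `start_server`, and `start_service` in place of the hard-coded 5000. The `/parameters` route below is illustrative only, not sparkflow's actual endpoint:

```python
import socket
import time
from multiprocessing import Process

from flask import Flask


def determine_master(port):
    # Mirrors HogwildSparkModel.determine_master: resolve the driver's
    # hostname, falling back to localhost when resolution fails.
    try:
        return socket.gethostbyname(socket.gethostname()) + ':' + str(port)
    except Exception:
        return 'localhost:' + str(port)


def start_service(port):
    # Stand-in for HogwildSparkModel.start_service; the real version
    # serves and updates the TensorFlow weights over HTTP.
    app = Flask(__name__)

    @app.route('/parameters')
    def parameters():
        # Illustrative route only; not sparkflow's actual endpoint.
        return 'weights would be served here'

    app.run(host='0.0.0.0', use_reloader=False, threaded=True, port=port)


if __name__ == '__main__':
    port = 3000
    server = Process(target=start_service, args=(port,))
    server.daemon = True
    server.start()
    time.sleep(2)  # let the server come up, as the model does with serverStartup
    print('parameter server at', determine_master(port))
    server.terminate()
    server.join()
```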
sparkflow/tensorflow_async.py: 19 changes (14 additions, 5 deletions)
@@ -118,6 +118,7 @@ class SparkAsyncDL(Estimator, HasInputCol, HasPredictionCol, HasLabelCol,Pyspark
toKeepDropout = Param(Params._dummy(), "toKeepDropout", "", typeConverter=TypeConverters.toBoolean)
partitionShuffles = Param(Params._dummy(), "partitionShuffles", "", typeConverter=TypeConverters.toInt)
optimizerOptions = Param(Params._dummy(), "optimizerOptions", "", typeConverter=TypeConverters.toString)
+port = Param(Params._dummy(), "port", "", typeConverter=TypeConverters.toInt)

@keyword_only
def __init__(self,
@@ -140,7 +141,8 @@ def __init__(self,
verbose=None,
labelCol=None,
partitionShuffles=None,
-optimizerOptions=None):
+optimizerOptions=None,
+port=None):
"""
:param inputCol: Spark dataframe inputCol. Similar to other spark ml inputCols
:param tensorflowGraph: The protobuf tensorflow graph. You can use the utility function in graph_utils
@@ -168,6 +170,7 @@ def __init__(self,
if you have 2 partition shuffles and 100 iterations, it will run 100 iterations then reshuffle and run 100 iterations again.
The repartition hits performance and should be used with care.
:param optimizerOptions: Json options to apply to tensorflow optimizers.
+:param port: Flask Port
"""
super(SparkAsyncDL, self).__init__()
self._setDefault(inputCol='transformed', tensorflowGraph='',
@@ -176,7 +179,7 @@ def setParams(self,
miniBatchSize=128, miniStochasticIters=-1,
shufflePerIter=True, tfDropout=None, acquireLock=False, verbose=0,
iters=1000, toKeepDropout=False, predictionCol='predicted', labelCol=None,
-partitionShuffles=1, optimizerOptions=None)
+partitionShuffles=1, optimizerOptions=None, port=5000)
kwargs = self._input_kwargs
self.setParams(**kwargs)

@@ -201,7 +204,8 @@ def setParams(self,
verbose=None,
labelCol=None,
partitionShuffles=None,
-optimizerOptions=None):
+optimizerOptions=None,
+port=None):
kwargs = self._input_kwargs
return self._set(**kwargs)

@@ -256,6 +260,9 @@ def getPartitionShuffles(self):
def getOptimizerOptions(self):
return self.getOrDefault(self.optimizerOptions)

+def getPort(self):
+return self.getOrDefault(self.port)

def _fit(self, dataset):
inp_col = self.getInputCol()
graph_json = self.getTensorflowGraph()
@@ -278,6 +285,7 @@ def _fit(self, dataset):
tf_dropout = self.getTfDropout()
to_keep_dropout = self.getToKeepDropout()
partition_shuffles = self.getPartitionShuffles()
+port = self.getPort()

df = dataset.rdd.map(lambda x: handle_data(x, inp_col, label))
df = df.coalesce(partitions) if partitions < df.getNumPartitions() else df
@@ -288,13 +296,14 @@ def _fit(self, dataset):
tfInput=tf_input,
tfLabel=tf_label,
optimizer=tf_optimizer,
-master_url=SparkContext._active_spark_context.getConf().get("spark.driver.host").__str__() + ":5000",
+master_url=SparkContext._active_spark_context.getConf().get("spark.driver.host").__str__() + ":" + str(port),
acquire_lock=acquire_lock,
mini_batch=mbs,
mini_stochastic_iters=msi,
shuffle=spi,
verbose=verbose,
-partition_shuffles=partition_shuffles
+partition_shuffles=partition_shuffles,
+port=port
)

weights = spark_model.train(df)
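The SparkAsyncDL side of the change follows the standard PySpark ML Param pattern: declare a `Param` on the class, register a default in `_setDefault`, and read it back with `getOrDefault`. Below is a stripped-down sketch of that pattern; `PortedEstimator` is an illustrative class, not part of sparkflow:

```python
from pyspark.ml import Estimator
from pyspark.ml.param import Param, Params, TypeConverters


class PortedEstimator(Estimator):
    # Illustrative class showing the Param plumbing this commit adds to SparkAsyncDL.
    port = Param(Params._dummy(), "port", "port for the parameter server",
                 typeConverter=TypeConverters.toInt)

    def __init__(self, port=None):
        super(PortedEstimator, self).__init__()
        self._setDefault(port=5000)  # matches the default SparkAsyncDL registers
        if port is not None:
            self._set(port=port)

    def getPort(self):
        # Same pattern as the new SparkAsyncDL.getPort
        return self.getOrDefault(self.port)

    def _fit(self, dataset):
        raise NotImplementedError  # training logic elided in this sketch


est = PortedEstimator(port=3000)
print(est.getPort())  # 3000; prints 5000 when constructed without a port
```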
tests/dl_runner.py: 20 changes (20 additions, 0 deletions)
@@ -291,6 +291,26 @@ def test_auto_encoder(self):
encoded = spark_model.fit(processed).transform(processed).take(10)
print(encoded[0]['predicted'])

+def test_change_port(self):
+processed = self.generate_random_data()
+mg = build_graph(SparkFlowTests.create_random_model)
+
+spark_model = SparkAsyncDL(
+inputCol='features',
+tensorflowGraph=mg,
+tfInput='x:0',
+tfLabel='y:0',
+tfOutput='outer/Sigmoid:0',
+tfOptimizer='adam',
+tfLearningRate=.1,
+iters=35,
+partitions=2,
+predictionCol='predicted',
+labelCol='label',
+port=3000
+)
+self.handle_assertions(spark_model, processed)


if __name__ == '__main__':
unittest.main()
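Since `test_change_port` joins the existing suite in `tests/dl_runner.py`, it runs under the same Docker flow the README describes, for example:

```
docker run --rm local-test:latest bash -i -c "python tests/dl_runner.py"
```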
