diff --git a/docs/.buildinfo b/docs/.buildinfo
index 54c36710..038d092e 100644
--- a/docs/.buildinfo
+++ b/docs/.buildinfo
@@ -1,4 +1,4 @@
 # Sphinx build info version 1
 # This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
-config: c31f3f4c132de601af2c7dd3d80ea76f
+config: 7466c912b353c5a104638dccc44e9814
 tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/docs/_modules/index.html b/docs/_modules/index.html
index 821b0c21..ee9b96cc 100644
-Overview: module code — TensorFlowOnSpark 1.3.0 documentation
+Overview: module code — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/TFCluster.html b/docs/_modules/tensorflowonspark/TFCluster.html
index f7f69a35..aeba3ce4 100644
-tensorflowonspark.TFCluster — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFCluster — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/TFManager.html b/docs/_modules/tensorflowonspark/TFManager.html
index 05236afc..1d004fc6 100644
-tensorflowonspark.TFManager — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFManager — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/TFNode.html b/docs/_modules/tensorflowonspark/TFNode.html
index 4f68f2df..31e22ff6 100644
-tensorflowonspark.TFNode — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFNode — TensorFlowOnSpark 1.3.1 documentation

@@ -108,7 +108,7 @@
       cluster_spec = ctx.cluster_spec
       logging.info("{0}: Cluster spec: {1}".format(ctx.worker_num, cluster_spec))
     
    -  if tf.test.is_built_with_cuda():
    +  if tf.test.is_built_with_cuda() and num_gpus > 0:
         # GPU
         gpu_initialized = False
         retries = 3
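
The num_gpus guard above keeps a CUDA build of TensorFlow from attempting GPU discovery on executors that were assigned zero GPUs. A minimal sketch of the decision, assuming num_gpus is supplied by the caller (the choose_device helper is hypothetical, not part of the library):

    import tensorflow as tf

    def choose_device(num_gpus):
      # hypothetical helper mirroring the guard in the TFNode hunk above
      if tf.test.is_built_with_cuda() and num_gpus > 0:
        return 'gpu'   # proceed with GPU allocation/retries
      return 'cpu'     # a CUDA build on a CPU-only executor now falls through here

    print(choose_device(0))  # 'cpu', even when TF itself was built with CUDA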
diff --git a/docs/_modules/tensorflowonspark/TFSparkNode.html b/docs/_modules/tensorflowonspark/TFSparkNode.html
index c4ac9c9d..e0859ddd 100644
-tensorflowonspark.TFSparkNode — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFSparkNode — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/dfutil.html b/docs/_modules/tensorflowonspark/dfutil.html
index c1acbcd2..92c18e50 100644
-tensorflowonspark.dfutil — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.dfutil — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/gpu_info.html b/docs/_modules/tensorflowonspark/gpu_info.html
index 58479d66..7f6687cb 100644
-tensorflowonspark.gpu_info — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.gpu_info — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/marker.html b/docs/_modules/tensorflowonspark/marker.html
index 7e12be10..a74dfaae 100644
-tensorflowonspark.marker — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.marker — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/pipeline.html b/docs/_modules/tensorflowonspark/pipeline.html
index a81b546e..2810da05 100644
-tensorflowonspark.pipeline — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.pipeline — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/reservation.html b/docs/_modules/tensorflowonspark/reservation.html
index 7326a130..e0e72c04 100644
-tensorflowonspark.reservation — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.reservation — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/_modules/tensorflowonspark/util.html b/docs/_modules/tensorflowonspark/util.html
index 5e3e8307..2f33bc49 100644
-tensorflowonspark.util — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.util — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/genindex.html b/docs/genindex.html
index 834abf32..c825348b 100644
-Index — TensorFlowOnSpark 1.3.0 documentation
+Index — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/index.html b/docs/index.html
index 97e0b617..4c237ca2 100644
-Welcome to TensorFlowOnSpark’s documentation! — TensorFlowOnSpark 1.3.0 documentation
+Welcome to TensorFlowOnSpark’s documentation! — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/py-modindex.html b/docs/py-modindex.html
index 67eeed51..332cdc11 100644
-Python Module Index — TensorFlowOnSpark 1.3.0 documentation
+Python Module Index — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/search.html b/docs/search.html
index fecf7909..beff06fd 100644
-Search — TensorFlowOnSpark 1.3.0 documentation
+Search — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.TFCluster.html b/docs/tensorflowonspark.TFCluster.html
index 7d51bfe6..55e19271 100644
-tensorflowonspark.TFCluster module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFCluster module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.TFManager.html b/docs/tensorflowonspark.TFManager.html
index 4514bb1f..19f26c2f 100644
-tensorflowonspark.TFManager module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFManager module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.TFNode.html b/docs/tensorflowonspark.TFNode.html
index 5a9d7306..c53f1415 100644
-tensorflowonspark.TFNode module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFNode module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.TFSparkNode.html b/docs/tensorflowonspark.TFSparkNode.html
index 19aef37c..29f4c1e5 100644
-tensorflowonspark.TFSparkNode module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.TFSparkNode module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.dfutil.html b/docs/tensorflowonspark.dfutil.html
index 3b64749e..469b588f 100644
-tensorflowonspark.dfutil module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.dfutil module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.gpu_info.html b/docs/tensorflowonspark.gpu_info.html
index 11d0ed40..e125f548 100644
-tensorflowonspark.gpu_info module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.gpu_info module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.html b/docs/tensorflowonspark.html
index c8ee6eb6..1fa782ac 100644
-tensorflowonspark package — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark package — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.marker.html b/docs/tensorflowonspark.marker.html
index 35a31d55..8d609940 100644
-tensorflowonspark.marker module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.marker module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.pipeline.html b/docs/tensorflowonspark.pipeline.html
index 1ab4f1d5..73338af5 100644
-tensorflowonspark.pipeline module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.pipeline module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.reservation.html b/docs/tensorflowonspark.reservation.html
index 1152ea15..fd8141a3 100644
-tensorflowonspark.reservation module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.reservation module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.reservation_client.html b/docs/tensorflowonspark.reservation_client.html
index a34d1a84..ca178efb 100644
-tensorflowonspark.reservation_client module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.reservation_client module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/docs/tensorflowonspark.util.html b/docs/tensorflowonspark.util.html
index 8a6751d3..b4d94800 100644
-tensorflowonspark.util module — TensorFlowOnSpark 1.3.0 documentation
+tensorflowonspark.util module — TensorFlowOnSpark 1.3.1 documentation

diff --git a/examples/mnist/estimator/mnist_estimator.py b/examples/mnist/estimator/mnist_estimator.py
index 9aef2d22..32a5d7ab 100644
--- a/examples/mnist/estimator/mnist_estimator.py
+++ b/examples/mnist/estimator/mnist_estimator.py
@@ -179,8 +179,10 @@ def main(args, ctx):
   parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
   parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
   parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
+  parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
+
   args = parser.parse_args()
   print("args:", args)

-  cluster = TFCluster.run(sc, main, args, args.cluster_size, args.num_ps, tensorboard=False, input_mode=TFCluster.InputMode.TENSORFLOW, log_dir=args.model, master_node='master')
+  cluster = TFCluster.run(sc, main, args, args.cluster_size, args.num_ps, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.TENSORFLOW, log_dir=args.model, master_node='master')
   cluster.shutdown()
diff --git a/examples/mnist/keras/mnist_mlp_estimator.py b/examples/mnist/keras/mnist_mlp_estimator.py
new file mode 100644
index 00000000..0d66e68c
--- /dev/null
+++ b/examples/mnist/keras/mnist_mlp_estimator.py
@@ -0,0 +1,133 @@
+import numpy
+import tensorflow as tf
+from tensorflow.python import keras
+from tensorflow.python.keras.models import Sequential
+from tensorflow.python.keras.layers import Dense, Dropout
+from tensorflow.python.keras.optimizers import RMSprop
+from tensorflowonspark import TFNode
+
+
+def main_fun(args, ctx):
+  IMAGE_PIXELS = 28
+  num_classes = 10
+
+  # use Keras API to load data
+  from tensorflow.python.keras.datasets import mnist
+  (x_train, y_train), (x_test, y_test) = mnist.load_data()
+  x_train = x_train.reshape(60000, 784)
+  x_test = x_test.reshape(10000, 784)
+  x_train = x_train.astype('float32') / 255
+  x_test = x_test.astype('float32') / 255
+
+  # convert class vectors to binary class matrices
+  y_train = keras.utils.to_categorical(y_train, num_classes)
+  y_test = keras.utils.to_categorical(y_test, num_classes)
+
+  # setup a Keras model
+  model = Sequential()
+  model.add(Dense(512, activation='relu', input_shape=(784,)))
+  model.add(Dropout(0.2))
+  model.add(Dense(512, activation='relu'))
+  model.add(Dropout(0.2))
+  model.add(Dense(10, activation='softmax'))
+  model.compile(loss='categorical_crossentropy',
+                optimizer=RMSprop(),
+                metrics=['accuracy'])
+  model.summary()
+
+  # convert Keras model to tf.estimator
+  estimator = tf.keras.estimator.model_to_estimator(model, model_dir=args.model_dir)
+
+  # setup train_input_fn for InputMode.TENSORFLOW or InputMode.SPARK
+  if args.input_mode == 'tf':
+    train_input_fn = tf.estimator.inputs.numpy_input_fn(
+        x={"dense_1_input": x_train},
+        y=y_train,
+        batch_size=128,
+        num_epochs=None,
+        shuffle=True)
+  else:  # 'spark'
+    tf_feed = TFNode.DataFeed(ctx.mgr)
+
+    def rdd_generator():
+      while not tf_feed.should_stop():
+        batch = tf_feed.next_batch(1)
+        if len(batch) > 0:
+          record = batch[0]
+          image = numpy.array(record[0]).astype(numpy.float32) / 255.0
+          label = numpy.array(record[1]).astype(numpy.float32)
+          yield (image, label)
+
+    def train_input_fn():
+      ds = tf.data.Dataset.from_generator(rdd_generator,
+                                          (tf.float32, tf.float32),
+                                          (tf.TensorShape([IMAGE_PIXELS * IMAGE_PIXELS]), tf.TensorShape([10])))
+      ds = ds.batch(args.batch_size)
+      return ds
+
+  # eval_input_fn ALWAYS uses data loaded in memory, since InputMode.SPARK can only feed one RDD at a time
+  eval_input_fn = tf.estimator.inputs.numpy_input_fn(
+      x={"dense_1_input": x_test},
+      y=y_test,
+      num_epochs=args.epochs,
+      shuffle=False)
+
+  # setup tf.estimator.train_and_evaluate()
+  train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=args.steps)
+  eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
+  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
+
+  # export a saved_model, if export_dir provided
+  if args.export_dir:
+    def serving_input_receiver_fn():
+      """An input receiver that expects a serialized tf.Example."""
+      serialized_tf_example = tf.placeholder(dtype=tf.string,
+                                             shape=[args.batch_size],
+                                             name='input_example_tensor')
+      receiver_tensors = {'dense_1_input': serialized_tf_example}
+      feature_spec = {'dense_1_input': tf.FixedLenFeature(784, tf.string)}
+      features = tf.parse_example(serialized_tf_example, feature_spec)
+      return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)
+
+    estimator.export_savedmodel(args.export_dir, serving_input_receiver_fn)
+
+
+if __name__ == '__main__':
+  import argparse
+  from pyspark.context import SparkContext
+  from pyspark.conf import SparkConf
+  from tensorflowonspark import TFCluster
+
+  sc = SparkContext(conf=SparkConf().setAppName("mnist_mlp"))
+  executors = sc._conf.get("spark.executor.instances")
+  num_executors = int(executors) if executors is not None else 1
+  num_ps = 1
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
+  parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
+  parser.add_argument("--epochs", help="number of epochs of training data", type=int, default=1)
+  parser.add_argument("--export_dir", help="directory to export saved_model")
+  parser.add_argument("--images", help="HDFS path to MNIST images in parallelized CSV format")
+  parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
+  parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
+  parser.add_argument("--model_dir", help="directory to write model checkpoints")
+  parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
+  parser.add_argument("--steps", help="max number of steps to train", type=int, default=2000)
+  parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
+
+  args = parser.parse_args()
+  print("args:", args)
+
+  if args.input_mode == 'tf':
+    # for TENSORFLOW mode, each node will load/train entire dataset in memory per original example
+    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model_dir, master_node='master')
+    cluster.shutdown()
+  else:  # 'spark'
+    # for SPARK mode, just use CSV format as an example
+    images = sc.textFile(args.images).map(lambda ln: [float(x) for x in ln.split(',')])
+    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
+    dataRDD = images.zip(labels)
+    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model_dir, master_node='master')
+    cluster.train(dataRDD, args.epochs)
+    cluster.shutdown()
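
The heart of the new example is the bridge from a TensorFlowOnSpark feed into tf.estimator via tf.data. A condensed sketch of that pattern, assuming MNIST shapes (784 pixels, 10 classes) as in the file above; make_train_input_fn is a hypothetical refactoring of the example's inline code, not part of the library:

    import numpy
    import tensorflow as tf
    from tensorflowonspark import TFNode

    def make_train_input_fn(ctx, batch_size):
      """Wrap a TFoS DataFeed in a generator-backed tf.data.Dataset."""
      tf_feed = TFNode.DataFeed(ctx.mgr)

      def rdd_generator():
        # pull (image, label) records from the Spark RDD feed, one at a time
        while not tf_feed.should_stop():
          batch = tf_feed.next_batch(1)
          if len(batch) > 0:
            image = numpy.array(batch[0][0]).astype(numpy.float32) / 255.0
            label = numpy.array(batch[0][1]).astype(numpy.float32)
            yield (image, label)

      def train_input_fn():
        ds = tf.data.Dataset.from_generator(
            rdd_generator,
            (tf.float32, tf.float32),
            (tf.TensorShape([784]), tf.TensorShape([10])))
        return ds.batch(batch_size)  # re-batch on the TensorFlow side

      return train_input_fn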
diff --git a/pom.xml b/pom.xml
index b06881e8..6435bb35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
     2.11.8
     3.0.3
     3.7.0
-    1.4.0
+    1.8.0
diff --git a/setup.py b/setup.py
index 0a423c72..d0a8cb51 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 setup(
   name = 'tensorflowonspark',
   packages = ['tensorflowonspark'],
-  version = '1.3.0',
+  version = '1.3.1',
   description = 'Deep learning with TensorFlow on Apache Spark clusters',
   author = 'Yahoo, Inc.',
   url = 'https://github.com/yahoo/TensorFlowOnSpark',
diff --git a/tensorflowonspark/TFManager.py b/tensorflowonspark/TFManager.py
index 6cc21154..a83f8c03 100644
--- a/tensorflowonspark/TFManager.py
+++ b/tensorflowonspark/TFManager.py
@@ -10,6 +10,7 @@
 from multiprocessing.managers import BaseManager
 from multiprocessing import JoinableQueue

+
 class TFManager(BaseManager):
   """Python multiprocessing.Manager for distributed, multi-process communication."""
   pass
@@ -20,18 +21,22 @@ class TFManager(BaseManager):
 qdict = {}        # dictionary of queues
 kdict = {}        # dictionary of key-values

+
 def _get(key):
   return kdict[key]

+
 def _set(key, value):
   kdict[key] = value

+
 def _get_queue(qname):
   try:
     return qdict[qname]
   except KeyError:
     return None

+
 def start(authkey, queues, mode='local'):
   """Create a new multiprocess.Manager (or return existing one).
@@ -53,12 +58,13 @@ def start(authkey, queues, mode='local'):
   TFManager.register('get', callable=lambda key: _get(key))
   TFManager.register('set', callable=lambda key, value: _set(key, value))
   if mode == 'remote':
-    mgr = TFManager(address=('',0), authkey=authkey)
+    mgr = TFManager(address=('', 0), authkey=authkey)
   else:
     mgr = TFManager(authkey=authkey)
   mgr.start()
   return mgr

+
 def connect(address, authkey):
   """Connect to a multiprocess.Manager.
@@ -75,4 +81,3 @@ def connect(address, authkey):
   m = TFManager(address, authkey=authkey)
   m.connect()
   return m
-
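
For reference, a minimal local-mode sketch of the TFManager API touched above; the start() signature and get_queue() call are as exercised by test_TFNode.py further down, and the bytes authkey matches the Python 3 fix there:

    from tensorflowonspark import TFManager

    # authkey must be bytes under Python 3
    mgr = TFManager.start('abc'.encode('utf-8'), ['input', 'output'], 'local')

    q = mgr.get_queue('input')  # one JoinableQueue per name passed to start()
    q.put('hello')
    print(q.get())              # -> 'hello'
    q.task_done()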
diff --git a/tensorflowonspark/dfutil.py b/tensorflowonspark/dfutil.py
index 59c47ece..e27cec93 100644
--- a/tensorflowonspark/dfutil.py
+++ b/tensorflowonspark/dfutil.py
@@ -109,7 +109,10 @@ def _toTFFeature(name, dtype, row):
       elif dtype in int64_dtypes:
         feature = (name, tf.train.Feature(int64_list=tf.train.Int64List(value=[row[name]])))
       elif dtype in bytes_dtypes:
-        feature = (name, tf.train.Feature(bytes_list=tf.train.BytesList(value=[str(row[name])])))
+        if dtype == 'binary':
+          feature = (name, tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(row[name])])))
+        else:
+          feature = (name, tf.train.Feature(bytes_list=tf.train.BytesList(value=[str(row[name]).encode('utf-8')])))
       elif dtype in float_list_dtypes:
         feature = (name, tf.train.Feature(float_list=tf.train.FloatList(value=row[name])))
       elif dtype in int64_list_dtypes:
@@ -181,16 +184,15 @@ def fromTFExample(iter, binary_features=[]):
   """
   # convert from protobuf-like dict to DataFrame-friendly dict
   def _get_value(k, v):
-    # special handling for binary features
-    if k in binary_features:
-      return bytearray(v.bytes_list.value[0])
-
     if v.int64_list.value:
       result = v.int64_list.value
     elif v.float_list.value:
       result = v.float_list.value
-    else:
-      result = v.bytes_list.value
+    else:  # string or bytearray
+      if k in binary_features:
+        return bytearray(v.bytes_list.value[0])
+      else:
+        return v.bytes_list.value[0].decode('utf-8')

     if len(result) > 1:         # represent multi-item tensors as python lists
       return list(result)
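
The dfutil change above is a Python 3 round-trip fix: tf.train.BytesList only accepts bytes, so plain strings are now UTF-8 encoded on write and decoded on read, while true binary columns keep passing through as bytearray. A small self-contained illustration of both paths:

    import tensorflow as tf

    text = 'text string'
    blob = bytearray(b'\xff\xfe\xfd\xfc')

    # write side (as in _toTFFeature): encode strings, pass binary through
    f_text = tf.train.Feature(bytes_list=tf.train.BytesList(value=[text.encode('utf-8')]))
    f_blob = tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(blob)]))

    # read side (as in fromTFExample's _get_value): decode strings, wrap binary
    assert f_text.bytes_list.value[0].decode('utf-8') == text
    assert bytearray(f_blob.bytes_list.value[0]) == blob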
diff --git a/tensorflowonspark/pipeline.py b/tensorflowonspark/pipeline.py
index 825af1d4..73d9722a 100755
--- a/tensorflowonspark/pipeline.py
+++ b/tensorflowonspark/pipeline.py
@@ -391,7 +391,7 @@ def _fit(self, dataset):
       assert local_args.tfrecord_dir, "Please specify --tfrecord_dir to export DataFrame to TFRecord."
       if self.getInputMapping():
         # if input mapping provided, filter only required columns before exporting
-        dataset = dataset.select(self.getInputMapping().keys())
+        dataset = dataset.select(list(self.getInputMapping()))
       logging.info("Exporting DataFrame {} as TFRecord to: {}".format(dataset.dtypes, local_args.tfrecord_dir))
       dfutil.saveAsTFRecords(dataset, local_args.tfrecord_dir)
       logging.info("Done saving")
@@ -401,7 +401,7 @@ def _fit(self, dataset):
                           local_args.tensorboard, local_args.input_mode, driver_ps_nodes=local_args.driver_ps_nodes)
     if local_args.input_mode == TFCluster.InputMode.SPARK:
       # feed data, using a deterministic order for input columns (lexicographic by key)
-      input_cols = sorted(self.getInputMapping().keys())
+      input_cols = sorted(self.getInputMapping())
       cluster.train(dataset.select(input_cols).rdd, local_args.epochs)
     cluster.shutdown()
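
Both pipeline.py changes work around the same Python 3 behavior: dict.keys() returns a view object rather than a list, which DataFrame.select() does not accept. Since iterating a dict yields its keys, list(d) and sorted(d) are drop-in replacements:

    d = {'col1': 'x', 'col2': 'y_'}  # an input mapping, as used above

    print(type(d.keys()))  # <class 'dict_keys'> -- a view, not a list, in Python 3
    print(list(d))         # ['col1', 'col2']    -- safe to pass to DataFrame.select()
    print(sorted(d))       # ['col1', 'col2']    -- deterministic column ordering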
diff --git a/test/test.py b/test/test.py
index d948e7b7..27a1eed7 100644
--- a/test/test.py
+++ b/test/test.py
@@ -4,6 +4,7 @@
 from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession

+
 class SparkTest(unittest.TestCase):
   """Base class for unittests using Spark. Sets up and tears down a cluster per test class"""
diff --git a/test/test_TFCluster.py b/test/test_TFCluster.py
index d51fe271..ac7d7c70 100644
--- a/test/test_TFCluster.py
+++ b/test/test_TFCluster.py
@@ -2,6 +2,7 @@
 import test
 from tensorflowonspark import TFCluster, TFNode

+
 class TFClusterTest(test.SparkTest):
   @classmethod
   def setUpClass(cls):
@@ -17,12 +18,12 @@ def _map_fun(args, ctx):
       import tensorflow as tf
       x = tf.constant(args['x'])
       y = tf.constant(args['y'])
-      sum = tf.add(x,y)
+      sum = tf.add(x, y)
       with tf.Session() as sess:
         result = sess.run([sum])
       assert result[0] == 3

-    args = { 'x':1, 'y':2 }
+    args = {'x': 1, 'y': 2}
     cluster = TFCluster.run(self.sc, _map_fun, tf_args=args, num_executors=self.num_workers, num_ps=0)
     cluster.shutdown()
@@ -41,20 +42,20 @@ def _map_fun(args, ctx):
       sq = tf.square(x)
       init_op = tf.global_variables_initializer()
       sv = tf.train.Supervisor(is_chief=(ctx.task_index == 0),
-                              init_op=init_op)
+                               init_op=init_op)
       with sv.managed_session(server.target) as sess:
         tf_feed = TFNode.DataFeed(ctx.mgr, False)
         while not sv.should_stop() and not tf_feed.should_stop():
-          outputs = sess.run([sq], feed_dict={ x: tf_feed.next_batch(10) })
+          outputs = sess.run([sq], feed_dict={x: tf_feed.next_batch(10)})
           tf_feed.batch_results(outputs[0])
       sv.stop()

-    input = [ [x] for x in range(1000) ]  # set up input as tensors of shape [1] to match placeholder
+    input = [[x] for x in range(1000)]  # set up input as tensors of shape [1] to match placeholder
     rdd = self.sc.parallelize(input, 10)
     cluster = TFCluster.run(self.sc, _map_fun, tf_args={}, num_executors=self.num_workers, num_ps=0, input_mode=TFCluster.InputMode.SPARK)
     rdd_out = cluster.inference(rdd)
     rdd_sum = rdd_out.sum()
-    self.assertEqual(rdd_sum, sum( [x * x for x in range(1000)] ))
+    self.assertEqual(rdd_sum, sum([x * x for x in range(1000)]))
     cluster.shutdown()
diff --git a/test/test_TFNode.py b/test/test_TFNode.py
index a6c71d9b..d6be7d2b 100644
--- a/test/test_TFNode.py
+++ b/test/test_TFNode.py
@@ -3,6 +3,7 @@
 import unittest
 from tensorflowonspark import TFManager, TFNode

+
 class TFNodeTest(unittest.TestCase):
   def test_hdfs_path(self):
     """Normalization of absolution & relative string paths depending on filesystem"""
@@ -25,7 +26,7 @@ def test_hdfs_path(self):

   def test_datafeed(self):
     """TFNode.DataFeed basic operations"""
-    mgr = TFManager.start('abc', ['input', 'output'], 'local')
+    mgr = TFManager.start('abc'.encode('utf-8'), ['input', 'output'], 'local')

     # insert 10 numbers followed by an end-of-feed marker
     q = mgr.get_queue('input')
diff --git a/test/test_dfutil.py b/test/test_dfutil.py
index dd0e65ef..047c00a7 100644
--- a/test/test_dfutil.py
+++ b/test/test_dfutil.py
@@ -5,6 +5,7 @@
 from tensorflowonspark import dfutil

+
 class DFUtilTest(test.SparkTest):
   @classmethod
   def setUpClass(cls):
@@ -30,7 +31,7 @@ def test_dfutils(self):
     row1 = ('text string', 1, [2, 3, 4, 5], -1.1, [-2.2, -3.3, -4.4, -5.5], bytearray(b'\xff\xfe\xfd\xfc'))
     rdd = self.sc.parallelize([row1])
     df1 = self.spark.createDataFrame(rdd, ['a', 'b', 'c', 'd', 'e', 'f'])
-    print ("schema: {}".format(df1.schema))
+    print("schema: {}".format(df1.schema))

     # save the DataFrame as TFRecords
     dfutil.saveAsTFRecords(df1, self.tfrecord_dir)
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index 7d666f0a..2da38449 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -2,11 +2,13 @@
 import os
 import shutil
 import test
+import time
 import unittest

 from tensorflowonspark import TFCluster, dfutil
 from tensorflowonspark.pipeline import HasBatchSize, HasSteps, Namespace, TFEstimator, TFParams

+
 class PipelineTest(test.SparkTest):
   @classmethod
   def setUpClass(cls):
@@ -14,13 +16,13 @@ def setUpClass(cls):
     # create an artificial training dataset of two features with labels computed from known weights
     np.random.seed(1234)
-    cls.features = np.random.rand(1000,2)
+    cls.features = np.random.rand(1000, 2)
     cls.weights = np.array([3.14, 1.618])
     cls.labels = np.matmul(cls.features, cls.weights)

     # convert to Python types for use with Spark DataFrames
-    cls.train_examples = [ (cls.features[i].tolist(), [cls.labels[i].item()]) for i in range(1000) ]
+    cls.train_examples = [(cls.features[i].tolist(), [cls.labels[i].item()]) for i in range(1000)]

     # create a simple test dataset
-    cls.test_examples = [ ([1.0, 1.0], [0.0]) ]
+    cls.test_examples = [([1.0, 1.0], [0.0])]

     # define model_dir and export_dir for tests
     cls.model_dir = os.getcwd() + os.sep + "test_model"
@@ -44,13 +46,13 @@ def tearDown(self):

   def test_namespace(self):
     """Namespace class initializers"""
     # from dictionary
-    d = { 'string': 'foo', 'integer': 1, 'float': 3.14, 'array': [1,2,3], 'map': {'a':1, 'b':2} }
+    d = {'string': 'foo', 'integer': 1, 'float': 3.14, 'array': [1, 2, 3], 'map': {'a': 1, 'b': 2}}
     n1 = Namespace(d)
     self.assertEqual(n1.string, 'foo')
     self.assertEqual(n1.integer, 1)
     self.assertEqual(n1.float, 3.14)
-    self.assertEqual(n1.array, [1,2,3])
-    self.assertEqual(n1.map, {'a':1, 'b':2})
+    self.assertEqual(n1.array, [1, 2, 3])
+    self.assertEqual(n1.map, {'a': 1, 'b': 2})
     self.assertTrue('string' in n1)
     self.assertFalse('extra' in n1)
@@ -59,8 +61,8 @@ def test_namespace(self):
     self.assertEqual(n2.string, 'foo')
     self.assertEqual(n2.integer, 1)
     self.assertEqual(n2.float, 3.14)
-    self.assertEqual(n2.array, [1,2,3])
-    self.assertEqual(n2.map, {'a':1, 'b':2})
+    self.assertEqual(n2.array, [1, 2, 3])
+    self.assertEqual(n2.map, {'a': 1, 'b': 2})
     self.assertTrue('string' in n2)
     self.assertFalse('extra' in n2)
@@ -76,10 +78,10 @@ def __init__(self, args):
       super(Foo, self).__init__()
       self.args = args

-    n = Namespace({ 'a': 1, 'b': 2 })
+    n = Namespace({'a': 1, 'b': 2})
     f = Foo(n).setBatchSize(10).setSteps(100)
     combined_args = f.merge_args_params()
-    expected_args = Namespace({ 'a': 1, 'b': 2, 'batch_size': 10, 'steps': 100 })
+    expected_args = Namespace({'a': 1, 'b': 2, 'batch_size': 10, 'steps': 100})
     self.assertEqual(combined_args, expected_args)

   def test_spark_checkpoint(self):
@@ -91,7 +93,7 @@ def test_spark_checkpoint(self):
     # train model
     args = {}
     estimator = TFEstimator(self.get_function('spark/train'), args) \
-                  .setInputMapping( { 'col1': 'x', 'col2': 'y_' }) \
+                  .setInputMapping({'col1': 'x', 'col2': 'y_'}) \
                   .setModelDir(self.model_dir) \
                   .setClusterSize(self.num_workers) \
                   .setNumPS(1) \
@@ -104,8 +106,8 @@ def test_spark_checkpoint(self):
     testDF = self.spark.createDataFrame(self.test_examples, ['c1', 'c2'])

     # test model from checkpoint, referencing tensors directly
-    model.setInputMapping( { 'c1': 'x' }) \
-        .setOutputMapping( { 'y': 'cout' })
+    model.setInputMapping({'c1': 'x'}) \
+        .setOutputMapping({'y': 'cout'})
     preds = model.transform(testDF).head()          # take first/only result, e.g. [ Row(cout=[4.758000373840332])]
     pred = preds.cout[0]                            # unpack scalar from tensor
     self.assertAlmostEqual(pred, np.sum(self.weights), 5)
@@ -119,7 +121,7 @@ def test_spark_saved_model(self):
     # train model
     args = {}
     estimator = TFEstimator(self.get_function('spark/train'), args) \
-                  .setInputMapping( { 'col1': 'x', 'col2': 'y_' }) \
+                  .setInputMapping({'col1': 'x', 'col2': 'y_'}) \
                   .setExportDir(self.export_dir) \
                   .setClusterSize(self.num_workers) \
                   .setNumPS(1) \
@@ -134,8 +136,8 @@ def test_spark_saved_model(self):
     # test saved_model using exported signature
     model.setTagSet('test_tag') \
         .setSignatureDefKey('test_key') \
-        .setInputMapping({ 'c1': 'features' }) \
-        .setOutputMapping({ 'prediction': 'cout' })
+        .setInputMapping({'c1': 'features'}) \
+        .setOutputMapping({'prediction': 'cout'})
     preds = model.transform(testDF).head()          # take first/only result
     pred = preds.cout[0]                            # unpack scalar from tensor
     expected = np.sum(self.weights)
@@ -144,8 +146,8 @@ def test_spark_saved_model(self):
     # test saved_model using custom/direct mapping
     model.setTagSet('test_tag') \
         .setSignatureDefKey(None) \
-        .setInputMapping({ 'c1': 'x'}) \
-        .setOutputMapping({ 'y': 'cout1', 'y2': 'cout2' })
+        .setInputMapping({'c1': 'x'}) \
+        .setOutputMapping({'y': 'cout1', 'y2': 'cout2'})
     preds = model.transform(testDF).head()          # take first/only result
     pred = preds.cout1[0]                           # unpack pred scalar from tensor
     squared_pred = preds.cout2[0]                   # unpack squared pred from tensor
@@ -160,14 +162,14 @@ def test_tf_column_filter(self):
     trainDF = self.spark.createDataFrame(self.train_examples, ['col1', 'col2'])

     # and add some extra columns
-    df = trainDF.withColumn('extra1', trainDF.col1 )
+    df = trainDF.withColumn('extra1', trainDF.col1)
     df = df.withColumn('extra2', trainDF.col2)
     self.assertEquals(len(df.columns), 4)

     # train model
     args = {}
     estimator = TFEstimator(self.get_function('tf/train'), args, export_fn=self.get_function('tf/export')) \
-                  .setInputMapping( { 'col1': 'x', 'col2': 'y_' }) \
+                  .setInputMapping({'col1': 'x', 'col2': 'y_'}) \
                   .setInputMode(TFCluster.InputMode.TENSORFLOW) \
                   .setModelDir(self.model_dir) \
                   .setExportDir(self.export_dir) \
@@ -175,7 +177,7 @@ def test_tf_column_filter(self):
                   .setClusterSize(self.num_workers) \
                   .setNumPS(1) \
                   .setBatchSize(10)
-    model = estimator.fit(df)
+    estimator.fit(df)
     self.assertTrue(os.path.isdir(self.model_dir))
     self.assertTrue(os.path.isdir(self.tfrecord_dir))
@@ -191,7 +193,7 @@ def test_tf_checkpoint_with_export_fn(self):
     # train model
     args = {}
     estimator = TFEstimator(self.get_function('tf/train'), args, export_fn=self.get_function('tf/export')) \
-                  .setInputMapping( { 'col1': 'x', 'col2': 'y_' }) \
+                  .setInputMapping({'col1': 'x', 'col2': 'y_'}) \
                   .setInputMode(TFCluster.InputMode.TENSORFLOW) \
                   .setModelDir(self.model_dir) \
                   .setExportDir(self.export_dir) \
@@ -201,14 +203,15 @@ def test_tf_checkpoint_with_export_fn(self):
                   .setBatchSize(10)
     model = estimator.fit(trainDF)
     self.assertTrue(os.path.isdir(self.model_dir))
+    self.assertTrue(os.path.isdir(self.export_dir))

     # create a Spark DataFrame of test examples (features, labels)
     testDF = self.spark.createDataFrame(self.test_examples, ['c1', 'c2'])

     # test model from checkpoint, referencing tensors directly
     model.setTagSet('test_tag') \
-        .setInputMapping( { 'c1': 'x' }) \
-        .setOutputMapping( { 'y': 'cout1', 'y2': 'cout2' })
+        .setInputMapping({'c1': 'x'}) \
+        .setOutputMapping({'y': 'cout1', 'y2': 'cout2'})
     preds = model.transform(testDF).head()          # take first/only result, e.g. [ Row(cout=[4.758000373840332])]
     pred1, pred2 = preds.cout1[0], preds.cout2[0]
     self.assertAlmostEqual(pred1, np.sum(self.weights), 5)
@@ -233,7 +236,7 @@ def _spark_train(args, ctx):
                                                   cluster=cluster)):
       x = tf.placeholder(tf.float32, [None, 2], name='x')
       y_ = tf.placeholder(tf.float32, [None, 1], name='y_')
-      w = tf.Variable(tf.truncated_normal([2,1]), name='w')
+      w = tf.Variable(tf.truncated_normal([2, 1]), name='w')
       y = tf.matmul(x, w, name='y')
       y2 = tf.square(y, name="y2")        # extra/optional output for testing multiple output tensors
       cost = tf.reduce_mean(tf.square(y_ - y), name='cost')
@@ -242,15 +245,15 @@ def _spark_train(args, ctx):
       saver = tf.train.Saver()

       sv = tf.train.Supervisor(is_chief=(ctx.task_index == 0),
-                              init_op=init_op)
+                               init_op=init_op)
       with sv.managed_session(server.target) as sess:
         tf_feed = TFNode.DataFeed(ctx.mgr, input_mapping=args.input_mapping)
         while not sv.should_stop() and not tf_feed.should_stop():
           batch = tf_feed.next_batch(10)
           if args.input_mapping:
             if len(batch['x']) > 0:
-              feed = { x: batch['x'], y_: batch['y_'] }
-              opt = sess.run(optimizer, feed_dict=feed)
+              feed = {x: batch['x'], y_: batch['y_']}
+              sess.run(optimizer, feed_dict=feed)

         if sv.is_chief:
           if args.model_dir:
@@ -262,8 +265,8 @@ def _spark_train(args, ctx):
             # export a saved_model
             signatures = {
               'test_key': {
-                'inputs': { 'features': x },
-                'outputs': { 'prediction': y },
+                'inputs': {'features': x},
+                'outputs': {'prediction': y},
                 'method_name': 'test'
               }
             }
@@ -281,9 +284,10 @@ def _tf_train(args, ctx):
     tf.reset_default_graph()        # reset graph in case we're re-using a Spark python worker
     cluster, server = TFNode.start_cluster_server(ctx)

+
     def _get_examples(batch_size):
       """Generate test data (mocking a queue_runner of file inputs)"""
-      features = tf.random_uniform([batch_size,2])      # (batch_size x 2)
+      features = tf.random_uniform([batch_size, 2])     # (batch_size x 2)
       weights = tf.constant([[3.14], [1.618]])          # (2, 1)
       labels = tf.matmul(features, weights)
       return features, labels
@@ -295,7 +299,7 @@ def _get_examples(batch_size):
                                                   worker_device="/job:worker/task:%d" % ctx.task_index,
                                                   cluster=cluster)):
       x, y_ = _get_examples(10)       # no input placeholders, TF code reads (or in this case "generates") input
-      w = tf.Variable(tf.truncated_normal([2,1]), name='w')
+      w = tf.Variable(tf.truncated_normal([2, 1]), name='w')
       y = tf.matmul(x, w, name='y')

       global_step = tf.Variable(0)
@@ -306,7 +310,7 @@ def _get_examples(batch_size):
       saver = tf.train.Saver()

       sv = tf.train.Supervisor(is_chief=(ctx.task_index == 0),
-                              init_op=init_op)
+                               init_op=init_op)
       step = 0
       with sv.managed_session(server.target) as sess:
         while not sv.should_stop() and step < args.steps:
@@ -320,6 +324,9 @@ def _get_examples(batch_size):
           ckpt_name = args.model_dir + "/model.ckpt"
           print("Saving checkpoint to: {}".format(ckpt_name))
           saver.save(sess, ckpt_name)
+
+      # wait for rest of cluster to connect
+      time.sleep(30)
     sv.stop()

 def _tf_export(args):
@@ -329,7 +336,7 @@ def _tf_export(args):
     tf.reset_default_graph()        # reset graph in case we're re-using a Spark python worker
     x = tf.placeholder(tf.float32, [None, 2], name='x')
-    w = tf.Variable(tf.truncated_normal([2,1]), name='w')
+    w = tf.Variable(tf.truncated_normal([2, 1]), name='w')
     y = tf.matmul(x, w, name='y')
     y2 = tf.square(y, name="y2")        # extra/optional output for testing multiple output tensors
     saver = tf.train.Saver()
@@ -343,8 +350,8 @@ def _tf_export(args):
     # exported signatures defined in code
     signatures = {
       'test_key': {
-        'inputs': { 'features': x },
-        'outputs': { 'prediction': y, 'pred2': y2 },
+        'inputs': {'features': x},
+        'outputs': {'prediction': y, 'pred2': y2},
         'method_name': 'test'
       }
     }
diff --git a/test/test_reservation.py b/test/test_reservation.py
index 6b7f1858..05bda6fc 100644
--- a/test/test_reservation.py
+++ b/test/test_reservation.py
@@ -3,6 +3,7 @@
 from tensorflowonspark.reservation import Reservations, Server, Client

+
 class ReservationTest(unittest.TestCase):
   def test_reservation_class(self):
     """Test core reservation class, expecting 2 reservations"""
@@ -10,12 +11,12 @@ def test_reservation_class(self):
     self.assertFalse(r.done())

     # add first reservation
-    r.add({'node':1})
+    r.add({'node': 1})
     self.assertFalse(r.done())
     self.assertEquals(r.remaining(), 1)

     # add second reservation
-    r.add({'node':2})
+    r.add({'node': 2})
     self.assertTrue(r.done())
     self.assertEquals(r.remaining(), 0)
@@ -30,7 +31,7 @@ def test_reservation_server(self):

     # add first reservation
     c = Client(addr)
-    resp = c.register({'node':1})
+    resp = c.register({'node': 1})
     self.assertEqual(resp, 'OK')

     # get list of reservations
@@ -53,8 +54,8 @@ def test_reservation_server_multi(self):

     def reserve(num):
       c = Client(addr)
-      #time.sleep(random.randint(0,5))     # simulate varying start times
-      resp = c.register({'node':num})
+      # time.sleep(random.randint(0,5))    # simulate varying start times
+      resp = c.register({'node': num})
       self.assertEqual(resp, 'OK')
       c.await_reservations()
       c.close()
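
As a footnote on the reservation tests, the bookkeeping they exercise is small enough to sketch directly. The calls below are taken from the tests above; the Reservations constructor argument is assumed to be the expected reservation count, as the test's docstring suggests:

    from tensorflowonspark.reservation import Reservations

    r = Reservations(2)     # expect two cluster members to check in
    r.add({'node': 1})      # first executor registers
    assert not r.done() and r.remaining() == 1
    r.add({'node': 2})      # second executor registers
    assert r.done() and r.remaining() == 0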