Conversion Guide

When converting a single-node TensorFlow application to a distributed TensorFlowOnSpark application, we recommend the following development process:

  1. Develop and test your TensorFlow application as a single-node application on small scale data. This will allow you to iterate faster and troubleshoot any TensorFlow-specific issues without introducing the complexities of Spark and distributed TensorFlow.
  2. Convert your single-node TensorFlow application into a distributed TensorFlow application. At this point, you will add TensorFlow-specific code to allow your application to work in a distributed manner. If you use the higher-level tf.keras or tf.estimator APIs, this will be much simpler than using the low-level APIs. You should still be able to test your code on a single machine by just running multiple processes/ports.
  3. Convert your distributed TensorFlow application to TensorFlowOnSpark. If your application is now "distributed", this conversion step should be fairly simple. If you have a single-node Spark Standalone installation, you can continue working on a single machine. Otherwise, use a small "cluster" of 1-3 executors while you sort out any Spark-specific issues (e.g. spark.executorEnv.LD_LIBRARY_PATH and spark.executorEnv.CLASSPATH). At this point, you will need to choose which InputMode to use. In general, you should prefer InputMode.TENSORFLOW, unless you need to feed data directly from a Spark RDD or DataFrame (without persisting it to disk first); both launch patterns are sketched just after this list.
  4. Gradually scale up your cluster. Once your application is working at a small scale, you can gradually increase the size of your cluster. You may need to adjust TensorFlow hyper-parameters as you scale out, e.g. batch sizes, learning rates, etc. Additionally, you may need to adjust various Spark settings, e.g. number of executors, memory limits, etc.
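
To make the InputMode choice in step 3 concrete, here is a minimal sketch of both launch patterns (dataRDD and args.epochs are placeholders for your own data and arguments; the TFCluster.run arguments mirror the example later in this guide):

# InputMode.TENSORFLOW: each TensorFlow worker reads its own data
# (e.g. TFRecords on HDFS) inside main_fun.
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps,
                        args.tensorboard, TFCluster.InputMode.TENSORFLOW)
cluster.shutdown()

# InputMode.SPARK: Spark feeds an RDD or DataFrame directly to the workers,
# without persisting the data to disk first.
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps,
                        args.tensorboard, TFCluster.InputMode.SPARK)
cluster.train(dataRDD, args.epochs)   # or cluster.inference(dataRDD)
cluster.shutdown()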

We have included several converted sample TensorFlow applications in this repository to help illustrate the conversion steps. For step 3, we'll highlight some of the main points below.

Replace the main() function with main_fun(argv, ctx)

The argv parameter will contain a full copy of the arguments supplied on the PySpark command line, while the ctx parameter will contain node metadata, like job_name and task_id. Also, make sure that the import tensorflow as tf occurs within this function, since it will be executed/imported on the executors. If any helper functions are used by the main function, ensure that they are defined or imported inside the main_fun block. If you see any Spark pickle errors, try separating the Spark and TensorFlow code into separate files and then use --py-files to ship the TensorFlow code to the executors.

# def main():
def main_fun(argv, ctx):
  import tensorflow as tf
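
Inside main_fun, the ctx argument carries the node metadata mentioned above. A minimal sketch (attribute names reflect the TFNode context object; verify them against your installed TensorFlowOnSpark version):

def main_fun(argv, ctx):
  import tensorflow as tf

  # Node metadata supplied by TensorFlowOnSpark (names assumed from the
  # TFNode context; check your version if they differ).
  job_name = ctx.job_name      # "ps" or "worker"
  task_index = ctx.task_index  # index of this task within its job
  worker_num = ctx.worker_num  # unique id across all executors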

Replace the tf.app.run method with code to launch the TensorFlowOnSpark cluster

tf.app.run() executes the TensorFlow main function. Replace it with the following code to set up PySpark and launch TensorFlow on the executors. Note that we're using argparse here mostly because the tf.app.FLAGS mechanism is currently not an officially supported TensorFlow API.

if __name__ == '__main__':
    # tf.app.run()
    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster
    import argparse
    
    sc = SparkContext(conf=SparkConf().setAppName("your_app_name"))
    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 1
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
    parser.add_argument("--num_ps", help="number of parameter servers", type=int, default=1)
    parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
    args = parser.parse_args()

    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
    cluster.shutdown()

Note: if your application is using tf.app.FLAGS, you can use the following code instead:

def main_fun(argv, ctx):
    import sys
    import tensorflow as tf

    sys.argv = argv
    FLAGS = tf.app.flags.FLAGS
    ...

if __name__ == '__main__':
    ...
    args, rem = parser.parse_known_args()
    cluster = TFCluster.run(sc, main_fun, rem, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)

For tf.estimator applications, use tf.estimator.train_and_evaluate instead of separate train and evaluate calls

The train_and_evaluate API is a utility function that "provides consistent behavior for both local (non-distributed) and distributed configurations", and it is required for distributed estimators.
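
As a reference, here is a minimal sketch of that pattern (my_model_fn, the input functions, model_dir, and the step count are placeholders for your own code):

estimator = tf.estimator.Estimator(model_fn=my_model_fn, model_dir=model_dir)

train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=10000)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)

# Behaves consistently for both single-node and distributed runs.
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)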

For tf.keras applications, use tf.keras.estimator.model_to_estimator with tf.estimator.train_and_evaluate or use a DistributionStrategy

The support for distributed training in the Keras API has been evolving over time, so there are multiple techniques. The model_to_estimator API is the oldest and most stable. It essentially converts the Keras model into an estimator and you can proceed from there. Alternatively, the DistributionStrategy API is the newest method and it will be the preferred method going forward, but it is still evolving.
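
A minimal sketch of the model_to_estimator route (the model architecture, input functions, model_dir, and step count are placeholders):

# Define and compile a Keras model as usual (placeholder architecture).
model = tf.keras.Sequential([
  tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
  tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

# Convert it to an Estimator, then reuse the train_and_evaluate pattern above.
estimator = tf.keras.estimator.model_to_estimator(keras_model=model, model_dir=model_dir)
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=10000)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)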

For low-level API applications

Note: the TensorFlow team recommends using the high-level Keras (and Estimator) APIs going forward.

Replace tf.train.Server with TFNode.start_cluster_server

There should be code that:

  1. extracts the addresses for the ps and worker nodes from the command line args
  2. creates a cluster spec
  3. starts the TensorFlow server

These can all be replaced as follows.

    # TFNode is provided by the tensorflowonspark package
    from tensorflowonspark import TFNode

    # ps_hosts = FLAGS.ps_hosts.split(',')
    # worker_hosts = FLAGS.worker_hosts.split(',')
    # cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})
    # server = tf.train.Server( {'ps': ps_hosts, 'worker': worker_hosts},
    #    job_name=FLAGS.job_name, task_index=FLAGS.task_id)
    cluster_spec, server = TFNode.start_cluster_server(ctx)

Add node-specific code

  # Node metadata from the TensorFlowOnSpark context
  job_name = ctx.job_name
  task_index = ctx.task_index

  if job_name == "ps":
    server.join()
  elif job_name == "worker":
    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
      worker_device="/job:worker/task:%d" % task_index,
      cluster=cluster_spec)):

Use tf.train.MonitoredTrainingSession

If your code uses tf.Session, you will need to use tf.train.MonitoredTrainingSession instead, e.g.

  with tf.train.MonitoredTrainingSession(master=server.target,
                                         is_chief=(task_index == 0),
                                         scaffold=tf.train.Scaffold(init_op=init_op, summary_op=summary_op, saver=saver),
                                         checkpoint_dir=model_dir,
                                         hooks=[tf.train.StopAtStepHook(last_step=args.steps)]) as sess:
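    # A typical training loop follows (sketch only; train_op and global_step
    # are placeholders for ops defined in your own graph). The StopAtStepHook
    # above causes should_stop() to return True once last_step is reached.
    while not sess.should_stop():
      _, step = sess.run([train_op, global_step])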

Enable TensorBoard

Finally, if using TensorBoard, ensure that the summaries are saved to the local disk of the "chief" worker with the following naming convention. The TensorBoard server on the "chief" worker will look in this specific directory, so do not change the path.

summary_writer = tf.summary.FileWriter("tensorboard_%d" % ctx.worker_num, graph=tf.get_default_graph())