From 8a0ebcb94acd6e0bb116a2b7a00bfeb9d51b00a8 Mon Sep 17 00:00:00 2001
From: malitsky
Date: Sun, 11 Feb 2018 11:12:35 -0500
Subject: [PATCH] added the mnist example

---
 examples/mnist/README.txt          |  12 ++
 examples/mnist/mnist_factory.py    |  64 ++++++++
 examples/mnist/spark_horovod.ipynb | 250 +++++++++++++++++++++++++++++
 3 files changed, 326 insertions(+)
 create mode 100644 examples/mnist/README.txt
 create mode 100644 examples/mnist/mnist_factory.py
 create mode 100644 examples/mnist/spark_horovod.ipynb

diff --git a/examples/mnist/README.txt b/examples/mnist/README.txt
new file mode 100644
index 0000000..5c999ef
--- /dev/null
+++ b/examples/mnist/README.txt
@@ -0,0 +1,12 @@
+
+Running Spark-MPI with Jupyter
+-------------------------------
+
+export HYDRA_PROXY_PORT=55555
+
+export PYSPARK_DRIVER_PYTHON='jupyter'
+export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=7777'
+
+pyspark --master local[*]
+
+pkill -9 "hydra_pmi_proxy"
diff --git a/examples/mnist/mnist_factory.py b/examples/mnist/mnist_factory.py
new file mode 100644
index 0000000..e7266b9
--- /dev/null
+++ b/examples/mnist/mnist_factory.py
@@ -0,0 +1,64 @@
+# This model factory is adapted from Horovod's example:
+# https://github.com/uber/horovod/blob/master/examples/tensorflow_mnist.py
+
+import tensorflow as tf
+import horovod.tensorflow as hvd
+
+layers = tf.contrib.layers
+
+def make_model(feature, target, mode):
+    """2-layer convolution model."""
+    # Convert the target to a one-hot tensor of shape (batch_size, 10),
+    # with an on-value of 1 for each one-hot vector of length 10.
+    target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)
+
+    # Reshape feature to a 4d tensor with the 2nd and 3rd dimensions being
+    # image width and height, and the final dimension being the number of
+    # color channels.
+    feature = tf.reshape(feature, [-1, 28, 28, 1])
+
+    # First conv layer will compute 32 features for each 5x5 patch.
+    with tf.variable_scope('conv_layer1'):
+        h_conv1 = layers.conv2d(
+            feature, 32, kernel_size=[5, 5], activation_fn=tf.nn.relu)
+        h_pool1 = tf.nn.max_pool(
+            h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
+
+    # Second conv layer will compute 64 features for each 5x5 patch.
+    with tf.variable_scope('conv_layer2'):
+        h_conv2 = layers.conv2d(
+            h_pool1, 64, kernel_size=[5, 5], activation_fn=tf.nn.relu)
+        h_pool2 = tf.nn.max_pool(
+            h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
+        # Reshape the tensor into a batch of vectors.
+        h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])
+
+    # Densely connected layer with 1024 neurons.
+    h_fc1 = layers.dropout(
+        layers.fully_connected(
+            h_pool2_flat, 1024, activation_fn=tf.nn.relu),
+        keep_prob=0.5,
+        is_training=mode == tf.contrib.learn.ModeKeys.TRAIN)
+
+    # Compute logits (1 per class) and compute loss.
+    logits = layers.fully_connected(h_fc1, 10, activation_fn=None)
+    loss = tf.losses.softmax_cross_entropy(target, logits)
+
+    return tf.argmax(logits, 1), loss
+
+def make_hooks(hvd_size, global_step, loss):
+
+    hooks = [
+        # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable
+        # states from rank 0 to all other processes. This is necessary to
+        # ensure consistent initialization of all workers when training is
+        # started with random weights or restored from a checkpoint.
+        hvd.BroadcastGlobalVariablesHook(0),
+
+        # Horovod: adjust number of steps based on number of GPUs.
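+        # Each of the hvd_size workers processes one batch per step, so
+        # dividing the single-worker budget of 20000 steps by hvd_size
+        # keeps the total amount of work roughly constant.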
+        tf.train.StopAtStepHook(last_step=20000 // hvd_size),
+
+        tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
+                                   every_n_iter=10),
+    ]
+
+    return hooks
diff --git a/examples/mnist/spark_horovod.ipynb b/examples/mnist/spark_horovod.ipynb
new file mode 100644
index 0000000..b96ab14
--- /dev/null
+++ b/examples/mnist/spark_horovod.ipynb
@@ -0,0 +1,250 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# The MNIST Application"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This example demonstrates the integration of the Horovod MPI-based distributed deep learning framework with the Spark platform in the context of the MNIST application."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import time\n",
+    "from datetime import timedelta, datetime, tzinfo"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "collapsed": true
+   },
+   "source": [
+    "## Initialize the Spark RDD collection associated with MPI workers"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import SparkSession\n",
+    "spark = SparkSession.builder.appName(\"spark-horovod-mnist\").getOrCreate()\n",
+    "\n",
+    "partitions = 2\n",
+    "rdd = spark.sparkContext.parallelize(range(partitions), partitions)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Download the MNIST dataset"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "1"
+      ]
+     },
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "def read_data_sets(pid, it):\n",
+    "    \n",
+    "    import tensorflow as tf\n",
+    "    \n",
+    "    learn = tf.contrib.learn\n",
+    "    learn.datasets.mnist.read_data_sets('MNIST-data-%d' % pid)\n",
+    "    \n",
+    "    yield pid\n",
+    "\n",
+    "rdd.mapPartitionsWithIndex(read_data_sets).sum()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Start the PMI server"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "if os.system(\"/opt/spark-mpi/bin/pmiserv -n \" + str(partitions) + \" hello &\") != 0:\n",
+    "    print(\"pmiserv: ERROR\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "collapsed": true
+   },
+   "source": [
+    "## Train the model with the Horovod MPI-based distributed engine on the Spark workers"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {
+    "collapsed": false,
+    "scrolled": true
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "1"
+      ]
+     },
+     "execution_count": 5,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# The train method is adapted from Horovod's example:\n",
+    "# https://github.com/uber/horovod/blob/master/examples/tensorflow_mnist.py\n",
+    "\n",
+    "def train(pid, it):\n",
+    "    \n",
+    "    import os\n",
+    "    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'\n",
+    "    \n",
+    "    import tensorflow as tf\n",
+    "    import horovod.tensorflow as hvd\n",
+    "    import mnist_factory\n",
+    "    \n",
+    "    # define the MPI environment variables\n",
+    "    os.environ[\"PMI_PORT\"] = os.uname()[1] + \":\" + os.getenv(\"HYDRA_PROXY_PORT\")\n",
+    "    os.environ[\"PMI_ID\"] = str(pid)\n",
+    "    \n",
+    "    # initialize Horovod\n",
+    "    hvd.init()\n",
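+    "    # the PMI_ID set above determines this worker's MPI rank,\n",
+    "    # so hvd.rank() matches the Spark partition id pid\n",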
" \n", + " # Extract the MNIST dataset\n", + " learn = tf.contrib.learn\n", + " mnist = learn.datasets.mnist.read_data_sets('MNIST-data-%d' % hvd.rank())\n", + " \n", + " # Build model...\n", + " with tf.name_scope('input'):\n", + " image = tf.placeholder(tf.float32, [None, 784], name='image')\n", + " label = tf.placeholder(tf.float32, [None], name='label')\n", + " predict, loss = mnist_factory.make_model(image, label, tf.contrib.learn.ModeKeys.TRAIN)\n", + " \n", + " global_step = tf.contrib.framework.get_or_create_global_step()\n", + " \n", + " # Horovod: add Horovod Distributed Optimizer.\n", + " opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())\n", + " opt = hvd.DistributedOptimizer(opt)\n", + " train_op = opt.minimize(loss, global_step=global_step)\n", + " \n", + " # Create hooks\n", + " hooks = mnist_factory.make_hooks(hvd.size(), global_step, loss)\n", + " \n", + " # Horovod: save checkpoints only on worker 0 \n", + " checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None\n", + " \n", + " # The MonitoredTrainingSession takes care of session initialization,\n", + " # restoring from a checkpoint, saving to a checkpoint, and closing when done\n", + " # or an error occurs.\n", + " with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,\n", + " hooks=hooks) as mon_sess:\n", + " while not mon_sess.should_stop():\n", + " # Run a training step synchronously.\n", + " image_, label_ = mnist.train.next_batch(100)\n", + " mon_sess.run(train_op, feed_dict={image: image_, label: label_})\n", + " \n", + " yield pid\n", + " \n", + "rdd.mapPartitionsWithIndex(train).sum()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": true + }, + "source": [ + "## Stop the PMI server" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "if os.system(\"pkill -9 \\\"hydra_pmi_proxy\\\" &\") != 0:\n", + " print (\"pkill: ERROR\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "anaconda-cloud": {}, + "kernelspec": { + "display_name": "Python [default]", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.5.2" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}