Skip to content


added the mnist example
Browse files Browse the repository at this point in the history
  • Loading branch information
malitsky authored and malitsky committed Feb 11, 2018
1 parent da52022 commit 8a0ebcb
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 0 deletions.
12 changes: 12 additions & 0 deletions examples/mnist/README.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

Running Spark-MPI with Jupyter

export HYDRA_PROXY_PORT=55555

export PYSPARK_DRIVER_PYTHON='jupyter'
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=7777'

pyspark --master local[*]

pkill -9 "hydra_pmi_proxy"
64 changes: 64 additions & 0 deletions examples/mnist/
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# It is defined after Horovod's example:

import tensorflow as tf
import horovod.tensorflow as hvd

layers = tf.contrib.layers

def make_model(feature, target, mode):
"""2-layer convolution model."""
# Convert the target to a one-hot tensor of shape (batch_size, 10) and
# with a on-value of 1 for each one-hot vector of length 10.
target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)

# Reshape feature to 4d tensor with 2nd and 3rd dimensions being
# image width and height final dimension being the number of color channels.
feature = tf.reshape(feature, [-1, 28, 28, 1])

# First conv layer will compute 32 features for each 5x5 patch
with tf.variable_scope('conv_layer1'):
h_conv1 = layers.conv2d(
feature, 32, kernel_size=[5, 5], activation_fn=tf.nn.relu)
h_pool1 = tf.nn.max_pool(
h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

# Second conv layer will compute 64 features for each 5x5 patch.
with tf.variable_scope('conv_layer2'):
h_conv2 = layers.conv2d(
h_pool1, 64, kernel_size=[5, 5], activation_fn=tf.nn.relu)
h_pool2 = tf.nn.max_pool(
h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# reshape tensor into a batch of vectors
h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

# Densely connected layer with 1024 neurons.
h_fc1 = layers.dropout(
h_pool2_flat, 1024, activation_fn=tf.nn.relu),
is_training=mode == tf.contrib.learn.ModeKeys.TRAIN)

# Compute logits (1 per class) and compute loss.
logits = layers.fully_connected(h_fc1, 10, activation_fn=None)
loss = tf.losses.softmax_cross_entropy(target, logits)

return tf.argmax(logits, 1), loss

def make_hooks(hvd_size, global_step, loss):

hooks = [
# Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
# from rank 0 to all other processes. This is necessary to ensure consistent
# initialization of all workers when training is started with random weights
# or restored from a checkpoint.

# Horovod: adjust number of steps based on number of GPUs.
tf.train.StopAtStepHook(last_step=20000 // hvd.size()),

tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},

return hooks
250 changes: 250 additions & 0 deletions examples/mnist/spark_horovod.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"# The MNIST Application"
"cell_type": "markdown",
"metadata": {},
"source": [
"This example demostrates integration of the Horovod MPI-based distributed deep learning framework and the Spark platform within the context of the MNIST application. "
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"import os\n",
"import time\n",
"from datetime import timedelta, datetime, tzinfo"
"cell_type": "markdown",
"metadata": {
"collapsed": true
"source": [
"## Initialize the Spark RDD collection associated with MPI workers"
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"spark-horovod-mnist\").getOrCreate()\n",
"partitions = 2\n",
"rdd = spark.sparkContext.parallelize(range(partitions), partitions)"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Download the MNIST dataset"
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
"outputs": [
"data": {
"text/plain": [
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
"source": [
"def read_data_sets(pid, it):\n",
" \n",
" import tensorflow as tf\n",
" \n",
" learn = tf.contrib.learn\n",
" learn.datasets.mnist.read_data_sets('MNIST-data-%d' % pid)\n",
" \n",
" yield pid\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start the PMI server"
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"if os.system(\"/opt/spark-mpi/bin/pmiserv -n \" + str(partitions) + \" hello &\") != 0:\n",
" print (\"pmiserv: ERROR\")"
"cell_type": "markdown",
"metadata": {
"collapsed": true
"source": [
"## Train the Horovod MPI-based distributed engine on the Spark workers"
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false,
"scrolled": true
"outputs": [
"data": {
"text/plain": [
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
"source": [
"# The train method is defined after horovod'd example\n",
"def train(pid, it):\n",
" \n",
" import os \n",
" os.environ['CUDA_VISIBLE_DEVICES'] = '-1'\n",
" \n",
" import tensorflow as tf\n",
" import horovod.tensorflow as hvd\n",
" import mnist_factory\n",
" \n",
" # define the MPI enviromental variables \n",
" os.environ[\"PMI_PORT\"] = os.uname()[1] + \":\" + os.getenv(\"HYDRA_PROXY_PORT\")\n",
" os.environ[\"PMI_ID\"] = str(pid)\n",
" \n",
" # initialize Horovod \n",
" hvd.init()\n",
" \n",
" # Extract the MNIST dataset\n",
" learn = tf.contrib.learn\n",
" mnist = learn.datasets.mnist.read_data_sets('MNIST-data-%d' % hvd.rank())\n",
" \n",
" # Build model...\n",
" with tf.name_scope('input'):\n",
" image = tf.placeholder(tf.float32, [None, 784], name='image')\n",
" label = tf.placeholder(tf.float32, [None], name='label')\n",
" predict, loss = mnist_factory.make_model(image, label, tf.contrib.learn.ModeKeys.TRAIN)\n",
" \n",
" global_step = tf.contrib.framework.get_or_create_global_step()\n",
" \n",
" # Horovod: add Horovod Distributed Optimizer.\n",
" opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())\n",
" opt = hvd.DistributedOptimizer(opt)\n",
" train_op = opt.minimize(loss, global_step=global_step)\n",
" \n",
" # Create hooks\n",
" hooks = mnist_factory.make_hooks(hvd.size(), global_step, loss)\n",
" \n",
" # Horovod: save checkpoints only on worker 0 \n",
" checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None\n",
" \n",
" # The MonitoredTrainingSession takes care of session initialization,\n",
" # restoring from a checkpoint, saving to a checkpoint, and closing when done\n",
" # or an error occurs.\n",
" with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,\n",
" hooks=hooks) as mon_sess:\n",
" while not mon_sess.should_stop():\n",
" # Run a training step synchronously.\n",
" image_, label_ = mnist.train.next_batch(100)\n",
", feed_dict={image: image_, label: label_})\n",
" \n",
" yield pid\n",
" \n",
"cell_type": "markdown",
"metadata": {
"collapsed": true
"source": [
"## Stop the PMI server"
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"if os.system(\"pkill -9 \\\"hydra_pmi_proxy\\\" &\") != 0:\n",
" print (\"pkill: ERROR\")"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": []
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
"nbformat": 4,
"nbformat_minor": 2

0 comments on commit 8a0ebcb

Please sign in to comment.