We will learn how to use TensorFlow with GPUs: the operation performed is a simple matrix multiplication, run either on the CPU or on the GPU.
The first step is to install a version of TensorFlow that supports GPUs. The official TensorFlow installation instructions are your starting point. Remember that you need an environment that supports GPUs via CUDA and cuDNN.
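Note that for TensorFlow 1.x the GPU-enabled build is shipped as a separate package from the CPU-only one, so it is typically installed with pip install tensorflow-gpu.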
We proceed with the recipe as follows:
import sys
import numpy as np
import tensorflow as tf
from datetime import datetime

device_name = sys.argv[1]  # choose device from cmd line. Options: gpu or cpu
shape = (int(sys.argv[2]), int(sys.argv[2]))
if device_name == "gpu":
    device_name = "/gpu:0"
else:
    device_name = "/cpu:0"

with tf.device(device_name):
    random_matrix = tf.random_uniform(shape=shape, minval=0, maxval=1)
    dot_operation = tf.matmul(random_matrix, tf.transpose(random_matrix))
    sum_operation = tf.reduce_sum(dot_operation)

startTime = datetime.now()
with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as session:
    result = session.run(sum_operation)
    print(result)

print("Shape:", shape, "Device:", device_name)
print("Time taken:", datetime.now() - startTime)
This recipe explains how to assign TensorFlow computations either to CPUs or to GPUs. The code is pretty simple and it will be used as a basis for the next recipe.
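For instance, assuming the script above is saved as matmul_device.py (the filename is arbitrary), we can compare the two devices from the command line by passing the device and the matrix size as arguments:

python matmul_device.py cpu 1500
python matmul_device.py gpu 1500

Because the session is created with log_device_placement=True, TensorFlow also logs the device on which each operation is actually placed, which is a convenient way to verify the assignment.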
We will show an example of data parallelism, where the data is split across multiple GPUs.
This recipe is inspired by a good blog post written by Neil Tenenholtz, available online: https://clindatsci.com/blog/2017/5/31/distributed-tensorflow
We proceed with the recipe as follows:
# single GPU (baseline)
import tensorflow as tf

# place the initial data on the cpu
with tf.device('/cpu:0'):
    input_data = tf.Variable([[1., 2., 3.],
                              [4., 5., 6.],
                              [7., 8., 9.],
                              [10., 11., 12.]])
    b = tf.Variable([[1.], [1.], [2.]])

# compute the result on the 0th gpu
with tf.device('/gpu:0'):
    output = tf.matmul(input_data, b)

# create a session and run
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(output))
# in-graph replication
import tensorflow as tf

num_gpus = 2

# place the initial data on the cpu
with tf.device('/cpu:0'):
    input_data = tf.Variable([[1., 2., 3.],
                              [4., 5., 6.],
                              [7., 8., 9.],
                              [10., 11., 12.]])
    b = tf.Variable([[1.], [1.], [2.]])

# split the data into chunks for each gpu
inputs = tf.split(input_data, num_gpus)
outputs = []

# loop over available gpus and pass input data
for i in range(num_gpus):
    with tf.device('/gpu:' + str(i)):
        outputs.append(tf.matmul(inputs[i], b))

# merge the results of the devices
with tf.device('/cpu:0'):
    output = tf.concat(outputs, axis=0)

# create a session and run
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(output))
This is a very simple recipe where the graph is split into two parts by the CPU, which acts as the master, and distributed to two GPUs, which act as distributed workers. The result of the computation is collected back on the CPU.
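Since the input values are fixed, the single-GPU baseline and the in-graph replicated version compute exactly the same product, so both scripts should print:

[[ 9.]
 [21.]
 [33.]
 [45.]]

where each row is the dot product of the corresponding row of input_data with b.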
We will learn how to distribute a TensorFlow computation across multiple servers. The key assumption is that the same code runs on both the workers and the parameter servers; therefore the role of each computation node is passed as a command-line argument.
Again, this recipe is inspired by a good blog post written by Neil Tenenholtz, available online: https://clindatsci.com/blog/2017/5/31/distributed-tensorflow
We proceed with the recipe as follows:
import sys
import tensorflow as tf

# specify the cluster's architecture
cluster = tf.train.ClusterSpec({'ps': ['192.168.1.1:1111'],
                                'worker': ['192.168.1.2:1111',
                                           '192.168.1.3:1111']})

# parse command line to specify machine
job_type = sys.argv[1]       # job type: "worker" or "ps"
task_idx = int(sys.argv[2])  # index of the job within the worker or ps list,
                             # as defined in the ClusterSpec

# create a TensorFlow Server. This is how the machines communicate.
server = tf.train.Server(cluster, job_name=job_type, task_index=task_idx)
If the role is parameter server, then the only action is to join the server: note that in this case there is no code to execute, because the workers continuously push updates and the only thing the parameter server has to do is wait.
Otherwise, the worker code is executed on a specific device within the cluster. This part of the code is similar to the one executed on a single machine, where we first build the model and then train it locally. Note that all the distribution of the work and the collection of the updated results are done transparently by TensorFlow. Also note that TensorFlow provides the convenient tf.train.replica_device_setter, which automatically assigns operations to devices.
# parameter server is updated by remote clients.
# will not proceed beyond this if statement.
if job_type == 'ps':
    server.join()
else:
    # workers only
    with tf.device(tf.train.replica_device_setter(
            worker_device='/job:worker/task:' + str(task_idx),
            cluster=cluster)):
        # build your model here as if you were only using a single machine
        pass
    with tf.Session(server.target):
        # train your model here
        pass
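To bring the cluster up, the same script (saved here as distributed.py, a name chosen only for illustration) is launched on each machine with the role and the task index matching the ClusterSpec above:

python distributed.py ps 0        # on 192.168.1.1
python distributed.py worker 0    # on 192.168.1.2
python distributed.py worker 1    # on 192.168.1.3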
We have seen how to create a cluster with multiple computation nodes, where a node plays the role of either a parameter server or a worker.
In both cases the code executed is the same, but the execution path differs according to the parameters collected from the command line: the parameter server only needs to wait until the workers send updates. Note that tf.train.replica_device_setter(..) automatically assigns operations to available devices, while tf.train.ClusterSpec(..) is used for the cluster setup.
An example of distributed training for MNIST is available online. In addition to that, you can decide to have more than one parameter server for efficiency reasons: using multiple parameter servers provides better network utilization and allows models to scale to more parallel machines. The interested reader can have a look here.
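As a minimal sketch (the additional host address below is hypothetical), allocating a second parameter server is just a matter of extending the ps list in the ClusterSpec; tf.train.replica_device_setter will then, by default, spread the model variables across the ps tasks in a round-robin fashion:

cluster = tf.train.ClusterSpec({
    'ps': ['192.168.1.1:1111', '192.168.1.4:1111'],  # two parameter servers
    'worker': ['192.168.1.2:1111', '192.168.1.3:1111']
})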
In this recipe we train a full MNIST classifier in a distributed way. This recipe is inspired by the blog post at http://ischlag.github.io/2016/06/12/async-distributed-tensorflow/, and the code, which runs on TensorFlow 1.2, is available at https://github.com/ischlag/distributed-tensorflow-example. This recipe builds on the previous one, so it might be convenient to read them in order.
We proceed with the recipe as follows:
import tensorflow as tf
import sys
import time

# cluster specification
parameter_servers = ["pc-01:2222"]
workers = ["pc-02:2222",
           "pc-03:2222",
           "pc-04:2222"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers,
                                "worker": workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)

# config
batch_size = 100
learning_rate = 0.0005
training_epochs = 20
logs_path = "/tmp/mnist/1"

# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

        # count the number of updates
        global_step = tf.get_variable(
            'global_step',
            [],
            initializer=tf.constant_initializer(0),
            trainable=False)

        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.Variable(tf.random_normal([784, 100]))
            W2 = tf.Variable(tf.random_normal([100, 10]))

        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([100]))
            b2 = tf.Variable(tf.zeros([10]))

        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            z3 = tf.add(tf.matmul(a2, W2), b2)
            y = tf.nn.softmax(z3)

        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.reduce_mean(
                -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            grad_op = tf.train.GradientDescentOptimizer(learning_rate)
            train_op = grad_op.minimize(cross_entropy, global_step=global_step)

        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        # create a summary for our cost and accuracy
        tf.summary.scalar("cost", cross_entropy)
        tf.summary.scalar("accuracy", accuracy)

        # merge all summaries into a single "operation"
        # which we can execute in a session
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
        print("Variables initialized ...")

    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             global_step=global_step,
                             init_op=init_op)
    begin_time = time.time()
    frequency = 100
    with sv.prepare_or_wait_for_session(server.target) as sess:
        # create log writer object (this will log on every machine)
        writer = tf.summary.FileWriter(logs_path,
                                       graph=tf.get_default_graph())

        # perform training cycles
        start_time = time.time()
        for epoch in range(training_epochs):

            # number of batches in one epoch
            batch_count = int(mnist.train.num_examples / batch_size)
            count = 0
            for i in range(batch_count):
                batch_x, batch_y = mnist.train.next_batch(batch_size)

                # perform the operations we defined earlier on batch
                _, cost, summary, step = sess.run(
                    [train_op, cross_entropy, summary_op, global_step],
                    feed_dict={x: batch_x, y_: batch_y})
                writer.add_summary(summary, step)

                count += 1
                if count % frequency == 0 or i + 1 == batch_count:
                    elapsed_time = time.time() - start_time
                    start_time = time.time()
                    print("Step: %d," % (step + 1),
                          " Epoch: %2d," % (epoch + 1),
                          " Batch: %3d of %3d," % (i + 1, batch_count),
                          " Cost: %.4f," % cost,
                          " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                    count = 0

        print("Test-Accuracy: %2.2f" % sess.run(
            accuracy,
            feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
        print("Total Time: %3.2fs" % float(time.time() - begin_time))
        print("Final Cost: %.4f" % cost)

    sv.stop()
    print("done")
This recipe describes an example of a distributed MNIST classifier. In this example, TensorFlow allows us to define a cluster of four machines: one acts as a parameter server and three more machines are used as workers, each operating on separate batches of the training data.
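For instance, assuming the script is saved as mnist_distributed.py (an illustrative name), the four processes would be started as follows, one per machine:

python mnist_distributed.py --job_name=ps --task_index=0      # on pc-01
python mnist_distributed.py --job_name=worker --task_index=0  # on pc-02
python mnist_distributed.py --job_name=worker --task_index=1  # on pc-03
python mnist_distributed.py --job_name=worker --task_index=2  # on pc-04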
If you enjoyed this excerpt, check out the book TensorFlow 1.x Deep Learning Cookbook, to become an expert in implementing deep learning techniques in real-world applications.