
Intelligent mobile projects with TensorFlow: Build your first Reinforcement Learning model on Raspberry Pi [Tutorial]

05 Sep 2018

OpenAI Gym (https://gym.openai.com) is an open source Python toolkit that offers many simulated environments to help you develop, compare, and train reinforcement learning algorithms, so you don't have to buy all the sensors and train your robot in the real environment, which can be costly in both time and money.

In this article, we'll show you how to develop and train a reinforcement learning model on Raspberry Pi using TensorFlow in an OpenAI Gym simulated environment called CartPole (https://gym.openai.com/envs/CartPole-v0).

This tutorial is an excerpt from a book written by Jeff Tang titled Intelligent Mobile Projects with TensorFlow.


To install OpenAI Gym, run the following commands:

git clone https://github.com/openai/gym.git
cd gym
sudo pip install -e .


You can verify that you have TensorFlow 1.6 and gym installed by running pip list:

pi@raspberrypi:~ $ pip list
gym (0.10.4, /home/pi/gym)
tensorflow (1.6.0)


Or you can start IPython then import TensorFlow and gym:

pi@raspberrypi:~ $ ipython
Python 2.7.9 (default, Sep 17 2016, 20:26:04) 
IPython 5.5.0 -- An enhanced Interactive Python.

In [1]: import tensorflow as tf

In [2]: import gym

In [3]: tf.__version__
Out[3]: '1.6.0'

In [4]: gym.__version__
Out[4]: '0.10.4'


We're now all set to use TensorFlow and gym to build some interesting reinforcement learning models running on Raspberry Pi.

Understanding the CartPole simulated environment


CartPole is an environment that can be used to train a robot to stay in balance. In the CartPole environment, a pole is attached to a cart, which moves horizontally along a track. At each step you apply one of two actions to the cart: 1 (accelerate right) or 0 (accelerate left). The pole starts upright, and the goal is to prevent it from falling over. A reward of 1 is provided for every time step that the pole remains upright. An episode ends when the pole tilts more than 12 degrees from vertical (the threshold set in the gym source code, although the figure is often quoted as 15 degrees), or the cart moves more than 2.4 units from the center.

Let's play with the CartPole environment now. First, create a new environment and find out the possible actions an agent can take in the environment:

env = gym.make("CartPole-v0")
env.action_space
# Discrete(2)
env.action_space.sample()
# 0 or 1


Every observation (state) consists of four values: the cart's horizontal position, the cart's velocity, the pole's angle, and the pole's angular velocity:

obs=env.reset()
obs
# array([ 0.04052535, 0.00829587, -0.03525301, -0.00400378])


Each step (action) in the environment returns a new observation, the reward for the action, a flag telling whether the episode is done (if it is, you can't take any further steps), and some additional information:

obs, reward, done, info = env.step(1)
obs
# array([ 0.04069127, 0.2039052 , -0.03533309, -0.30759772])
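To see where these numbers come from, here is a minimal pure-Python sketch of one simulation step. The constants and the Euler update are assumptions modeled on gym's CartPole source, which this article does not show, and `cartpole_step` is a hypothetical helper name. It only illustrates that an action applies a fixed force to the cart and that the state then advances by a small time step.

```python
import math

# Assumed physical constants, modeled on gym's CartPole implementation.
GRAVITY = 9.8
MASS_CART = 1.0
MASS_POLE = 0.1
TOTAL_MASS = MASS_CART + MASS_POLE
HALF_POLE_LENGTH = 0.5
POLE_MASS_LENGTH = MASS_POLE * HALF_POLE_LENGTH
FORCE_MAG = 10.0
TAU = 0.02  # seconds between state updates


def cartpole_step(state, action):
    """Advance (x, x_dot, theta, theta_dot) by one Euler step."""
    x, x_dot, theta, theta_dot = state
    # Action 1 pushes the cart right, action 0 pushes it left.
    force = FORCE_MAG if action == 1 else -FORCE_MAG
    cos_t, sin_t = math.cos(theta), math.sin(theta)
    temp = (force + POLE_MASS_LENGTH * theta_dot ** 2 * sin_t) / TOTAL_MASS
    theta_acc = (GRAVITY * sin_t - cos_t * temp) / (
        HALF_POLE_LENGTH * (4.0 / 3.0 - MASS_POLE * cos_t ** 2 / TOTAL_MASS))
    x_acc = temp - POLE_MASS_LENGTH * theta_acc * cos_t / TOTAL_MASS
    return (x + TAU * x_dot, x_dot + TAU * x_acc,
            theta + TAU * theta_dot, theta_dot + TAU * theta_acc)


# The observation returned by env.reset() in the text above.
start = (0.04052535, 0.00829587, -0.03525301, -0.00400378)
next_state = cartpole_step(start, 1)
print(next_state)
```

Fed the reset observation from the text, the sketch lands on the same four values that env.step(1) printed, to within rounding.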


Remember that action (or step) 1 means accelerating the cart right, and 0 left. To see how long an episode can last when you keep pushing the cart right, run:

while not done:
    obs, reward, done, info = env.step(1)
    print(obs)

#[ 0.08048328 0.98696604 -0.09655727 -1.54009127]
#[ 0.1002226 1.18310769 -0.12735909 -1.86127705]
#[ 0.12388476 1.37937549 -0.16458463 -2.19063676]
#[ 0.15147227 1.5756628 -0.20839737 -2.52925864]
#[ 0.18298552 1.77178219 -0.25898254 -2.87789912]


Let's now manually go through a series of actions from start to end and print out the observation's first value (the horizontal position) and third value (the pole's angle from vertical; gym reports it in radians, and the 360/np.pi factor used here prints twice the angle in degrees), as they're the two values that determine whether an episode is done.
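For reference, here is a minimal sketch of the radian-to-degree conversion with Python's standard library, compared against the 360/np.pi scaling used in this walkthrough. The sample angle comes from the observation printed earlier; the variable names are illustrative only.

```python
import math

theta = -0.03533309  # pole angle in radians, from the earlier observation

true_degrees = math.degrees(theta)     # same as theta * 180 / math.pi
scaled_value = theta * 360 / math.pi   # the expression used in this walkthrough

print(true_degrees, scaled_value)
```

The second number is exactly double the first, so the printed angles in this section show the right trend and sign but should be halved to read them as degrees.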

First, reset the environment and accelerate the cart right a few times:

import numpy as np
obs=env.reset()
obs[0], obs[2]*360/np.pi
# (0.008710582898326602, 1.4858315848689436)

obs, reward, done, info = env.step(1)
obs[0], obs[2]*360/np.pi
# (0.009525842685697472, 1.5936049816642313)

obs, reward, done, info = env.step(1)
obs[0], obs[2]*360/np.pi
# (0.014239775393474322, 1.040038643681757)

obs, reward, done, info = env.step(1)
obs[0], obs[2]*360/np.pi
# (0.0228521194217381, -0.17418034908781568)


You can see that the cart's position value gets bigger and bigger as it's pushed right, the pole's angle soon starts getting smaller and smaller, and the last step shows a negative angle, meaning the pole is leaning to the left of the center. All this makes sense if you picture your favorite dog pushing a cart with a pole on it. Now change the action to accelerate the cart left (0) a few times:

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.03536432554326476, -2.0525933052704954)
obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.04397450935915654, -3.261322987287562)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.04868738508385764, -3.812330822419413)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.04950617929263011, -3.7134404042580687)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.04643238384389254, -2.968245724428785)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.039465670006712444, -1.5760901885345346)


You may be surprised at first to see that the 0 action lets the position (obs[0]) keep growing for several steps, but remember that the cart already has a rightward velocity, so one or several pushes in the opposite direction won't decrease the position value immediately. If you keep pushing the cart to the left, its position starts becoming smaller (moving toward the left). Now continue the 0 action and you'll see the position get smaller and smaller, with a negative value meaning the cart has entered the left side of the center, while the pole's angle gets bigger and bigger:

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.028603948219811447, 0.46789197320636305)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (0.013843572459953138, 3.1726728882727504)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (-0.00482029774222077, 6.551160678086707)

obs, reward, done, info = env.step(0)
obs[0], obs[2]*360/np.pi
# (-0.02739315127299434, 10.619948631208114)


For the CartPole environment, the reward value returned in each step call is always 1, and the info is always {}. That's all there is to know about the CartPole simulated environment. Now that we understand how CartPole works, let's see what kinds of policies we can come up with, so that at each state (observation) the policy tells us which action (step) to take to keep the pole upright for as long as possible, in other words, to maximize our rewards.

Using neural networks to build a better policy

To follow along, you can download the code files from the book's GitHub repository.

Let's first see how to build a random policy using a simple fully connected (dense) neural network. It takes the 4 values in an observation as input, uses a hidden layer of 4 neurons, and outputs the probability of the 0 action, from which the agent samples the next action, 0 or 1:

# nn_random_policy.py
import tensorflow as tf
import numpy as np
import gym

env = gym.make("CartPole-v0")

num_inputs = env.observation_space.shape[0]
inputs = tf.placeholder(tf.float32, shape=[None, num_inputs])
hidden = tf.layers.dense(inputs, 4, activation=tf.nn.relu)
outputs = tf.layers.dense(hidden, 1, activation=tf.nn.sigmoid)
action = tf.multinomial(tf.log(tf.concat([outputs, 1-outputs], 1)), 1)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    total_rewards = []
    for _ in range(1000):
        rewards = 0
        obs = env.reset()
        while True:
            a = sess.run(action, feed_dict={inputs: obs.reshape(1, num_inputs)})
            obs, reward, done, info = env.step(a[0][0])
            rewards += reward
            if done:
                break
        total_rewards.append(rewards)

    print(np.mean(total_rewards))


Note that we use the tf.multinomial function to sample an action based on the probability distribution of action 0 and 1, defined as outputs and 1-outputs, respectively (the sum of the two probabilities is 1). The mean of the total rewards will be around 20-something. This is a neural network that is generating a random policy, with no training at all.
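Conceptually, the sampling step behaves like the following pure-Python sketch. It is not how tf.multinomial is implemented, and `sample_action` is a hypothetical helper; it only shows what drawing an action from the two probabilities, outputs and 1-outputs, means.

```python
import random


def sample_action(prob_action_0, rng=random):
    """Return 0 with probability prob_action_0, otherwise 1."""
    return 0 if rng.random() < prob_action_0 else 1


rng = random.Random(0)  # seeded so the run is repeatable
samples = [sample_action(0.7, rng) for _ in range(10000)]
fraction_zero = samples.count(0) / len(samples)
print(fraction_zero)  # close to 0.7
```

An untrained network outputs probabilities that carry no information about the pole, which is why the resulting policy is effectively random.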

To train the network, we use tf.nn.sigmoid_cross_entropy_with_logits to define the loss between the network output and the desired y_target action. The target comes from a basic hand-written policy: choose action 0 (push left) when the pole leans left (obs[2] < 0), and action 1 otherwise. We therefore expect this neural network policy to achieve about the same rewards as that basic non-neural-network policy:

# nn_simple_policy.py
import tensorflow as tf
import numpy as np
import gym

env = gym.make("CartPole-v0")

num_inputs = env.observation_space.shape[0]
inputs = tf.placeholder(tf.float32, shape=[None, num_inputs])
y = tf.placeholder(tf.float32, shape=[None, 1])
hidden = tf.layers.dense(inputs, 4, activation=tf.nn.relu)
logits = tf.layers.dense(hidden, 1)
outputs = tf.nn.sigmoid(logits)
action = tf.multinomial(tf.log(tf.concat([outputs, 1-outputs], 1)), 1)

cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(0.01)
training_op = optimizer.minimize(cross_entropy)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    for _ in range(1000):
        obs = env.reset()

        while True:
            y_target = np.array([[1. if obs[2] < 0 else 0.]])
            a, _ = sess.run([action, training_op], feed_dict={inputs: obs.reshape(1, num_inputs), y: y_target})
            obs, reward, done, info = env.step(a[0][0])
            if done:
                break
    print("training done")
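As a rough illustration of what the loss measures, the sketch below evaluates the sigmoid cross-entropy for a single logit in pure Python, using the numerically stable form given in TensorFlow's documentation for tf.nn.sigmoid_cross_entropy_with_logits. The helper names and the sample observation are made up for this example.

```python
import math


def sigmoid_cross_entropy(logit, label):
    """Numerically stable form: max(x, 0) - x*z + log(1 + exp(-|x|))."""
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))


def target_for(obs):
    """Label from the simple policy: 1.0 (choose action 0) if the pole leans left."""
    return 1.0 if obs[2] < 0 else 0.0


obs = [0.04, 0.2, -0.035, -0.3]  # made-up observation; the pole leans left
label = target_for(obs)
for logit in (-2.0, 0.0, 2.0):
    print(logit, sigmoid_cross_entropy(logit, label))
```

With the label at 1.0, the loss shrinks as the logit grows, so minimizing it pushes the network toward a high probability for action 0 whenever the pole leans left.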


We define outputs as a sigmoid function of the logits net output, that is, the probability of action 0, and then use the tf.multinomial to sample an action. Note that we use the standard tf.train.AdamOptimizer and its minimize method to train the network. To test and see how good the policy is, run the following code:

    # still inside the `with tf.Session() as sess:` block, after training
    total_rewards = []
    for _ in range(1000):
        rewards = 0
        obs = env.reset()

        while True:
            a = sess.run(action, feed_dict={inputs: obs.reshape(1, num_inputs)})
            obs, reward, done, info = env.step(a[0][0])
            rewards += reward
            if done:
                break
        total_rewards.append(rewards)

    print(np.mean(total_rewards))


We're now all set to explore how we can implement a policy gradient method on top of this to make our neural network perform much better, getting rewards several times larger.

The basic idea of a policy gradient is that in order to train a neural network to generate a better policy, when all an agent knows from the environment is the rewards it can get when taking an action from any given state, we can adopt two new mechanisms:

  • Discounted rewards: Each action's value needs to take into account the rewards of the actions that follow it. For example, an action that gets an immediate reward of 1 but ends the episode two steps later should have a lower long-term reward than an action that gets an immediate reward of 1 but ends the episode 10 steps later.
  • Test runs and updates: Test run the current policy and see which actions lead to higher discounted rewards, then scale the current policy's gradients (of the loss with respect to the weights) by the discounted rewards, so that an action with higher discounted rewards will, after the network update, have a higher probability of being chosen next time. Repeat such test runs and updates many times to train the neural network toward a better policy.
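The discounting idea can be sketched in a few lines of pure Python. The function name is hypothetical, and the discount rate of 0.95 is the value this article uses later. The two sample episodes correspond to the example in the first bullet.

```python
def discounted_rewards(rewards, discount_rate):
    """Return, for each step, its reward plus the discounted rewards after it."""
    returns = [0.0] * len(rewards)
    running = 0.0
    # Walk backward so each step can reuse the total of the steps after it.
    for i in reversed(range(len(rewards))):
        running = rewards[i] + discount_rate * running
        returns[i] = running
    return returns


short_episode = discounted_rewards([1, 1, 1], 0.95)   # ends two steps later
long_episode = discounted_rewards([1] * 11, 0.95)     # ends 10 steps later
print(short_episode[0], long_episode[0])
```

The first action of the longer episode receives the larger discounted reward, even though both actions earned the same immediate reward of 1.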

Implementing a policy gradient in TensorFlow


Let's now see how to implement a policy gradient for our CartPole problem in TensorFlow.

First, import tensorflow, numpy, and gym, and define a helper method that calculates the normalized and discounted rewards:

import tensorflow as tf
import numpy as np
import gym

def normalized_discounted_rewards(rewards):
    dr = np.zeros(len(rewards))
    dr[-1] = rewards[-1]
    for n in range(2, len(rewards)+1):
        dr[-n] = rewards[-n] + dr[-n+1] * discount_rate
    return (dr - dr.mean()) / dr.std()


Next, create the CartPole gym environment, define the learning_rate and discount_rate hyper-parameters, and build the network with four input neurons, four hidden neurons, and one output neuron as before:

env = gym.make("CartPole-v0")

learning_rate = 0.05
discount_rate = 0.95

num_inputs = env.observation_space.shape[0]
inputs = tf.placeholder(tf.float32, shape=[None, num_inputs])
hidden = tf.layers.dense(inputs, 4, activation=tf.nn.relu) 
logits = tf.layers.dense(hidden, 1)
outputs = tf.nn.sigmoid(logits) 
action = tf.multinomial(tf.log(tf.concat([outputs, 1-outputs], 1)), 1)

prob_action_0 = tf.to_float(1-action)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=prob_action_0)
optimizer = tf.train.AdamOptimizer(learning_rate)


To manually adjust the gradients so that they take into account the discounted rewards for each action, we first use the compute_gradients method, then update the gradients the way we want, and finally call the apply_gradients method.

So let's now compute the gradients of the cross-entropy loss with respect to the network parameters (weights and biases), and set up gradient placeholders. These are fed later with values that combine the computed gradients and the discounted rewards of the actions taken using the current policy during a test run:

gvs = optimizer.compute_gradients(cross_entropy)
gvs = [(g, v) for g, v in gvs if g is not None]
gs = [g for g, _ in gvs]

gps = []
gvs_feed = []
for g, v in gvs:
    gp = tf.placeholder(tf.float32, shape=g.get_shape())
    gps.append(gp)
    gvs_feed.append((gp, v))
training_op = optimizer.apply_gradients(gvs_feed)


The gvs returned from optimizer.compute_gradients(cross_entropy) is a list of tuples, and each tuple consists of the gradient (of the cross_entropy with respect to a trainable variable) and the trainable variable itself.

If you run the script multiple times from IPython, the default graph of the tf object will contain trainable variables from previous runs. Unless you call tf.reset_default_graph(), you need gvs = [(g, v) for g, v in gvs if g is not None] to remove those obsolete trainable variables, for which compute_gradients returns None gradients.
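To make "the gradient of the cross-entropy" concrete, consider a single logit x with label z. The derivative of the sigmoid cross-entropy with respect to x is sigmoid(x) - z, which the sketch below checks against a finite difference. This is a pure-Python illustration with hypothetical helper names, not what compute_gradients does internally (TensorFlow differentiates the graph with respect to every weight and bias).

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def loss(x, z):
    """Sigmoid cross-entropy between logit x and label z."""
    return max(x, 0.0) - x * z + math.log1p(math.exp(-abs(x)))


def grad_wrt_logit(x, z):
    """Analytic derivative of loss(x, z) with respect to x."""
    return sigmoid(x) - z


x, z, eps = 0.3, 1.0, 1e-6
# Central finite difference as an independent check of the analytic formula.
numeric = (loss(x + eps, z) - loss(x - eps, z)) / (2 * eps)
print(grad_wrt_logit(x, z), numeric)
```

Here the label z plays the role of the sampled action (1.0 when action 0 was taken). The gradient is negative, so a plain descent step raises the logit and makes action 0 more likely; scaling the gradient by a negative discounted reward reverses that direction.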


Now, play some games and save the rewards and gradient values:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    for _ in range(1000):
        rewards, grads = [], []
        obs = env.reset()
        # using current policy to test play a game
        while True:
            a, gs_val = sess.run([action, gs], feed_dict={inputs: obs.reshape(1, num_inputs)})
            obs, reward, done, info = env.step(a[0][0])
            rewards.append(reward)
            grads.append(gs_val)
            if done:
                break


After the test play of a game, update the gradients with discounted rewards and train the network (remember that training_op is defined as optimizer.apply_gradients(gvs_feed)):

        # update gradients and do the training 
        nd_rewards = normalized_discounted_rewards(rewards)
        gp_val = {}
        for i, gp in enumerate(gps):
            gp_val[gp] = np.mean([grads[k][i] * reward for k, reward in 
                             enumerate(nd_rewards)], axis=0)
        sess.run(training_op, feed_dict=gp_val)
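The update above scales each step's gradient by that step's normalized discounted reward and then averages over the steps of the episode. A minimal pure-Python sketch of that arithmetic follows, with each step's gradient flattened to a short list of numbers. The function name and all values are made up for illustration.

```python
def reward_weighted_mean(step_grads, step_rewards):
    """Average per-step gradients after scaling each by its step's reward."""
    n_steps = len(step_grads)
    n_params = len(step_grads[0])
    return [
        sum(step_grads[k][j] * step_rewards[k] for k in range(n_steps)) / n_steps
        for j in range(n_params)
    ]


# Made-up values: three steps, two parameters.
step_grads = [[0.5, -1.0], [0.2, 0.4], [-0.3, 0.6]]
nd_rewards = [1.0, 0.0, -1.0]  # normalized rewards: the earliest step scored best
result = reward_weighted_mean(step_grads, nd_rewards)
print(result)
```

Steps with a positive normalized reward keep the sign of their gradient, steps with a negative one have it flipped, and steps near the average contribute almost nothing.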


Finally, after 1,000 iterations of test play and updates, we can test the trained model:

    total_rewards = []
    for _ in range(100):
        rewards = 0
        obs = env.reset()

        while True:
            a = sess.run(action, feed_dict={inputs: obs.reshape(1, num_inputs)})
            obs, reward, done, info = env.step(a[0][0])
            rewards += reward
            if done:
                break
        total_rewards.append(rewards)

    print(np.mean(total_rewards))


Note that we now use the trained policy network and sess.run to get the next action with the current observation as input. The mean of the total rewards will be about 200, which is the most CartPole-v0 allows, since it caps each episode at 200 steps.

You can also save a trained model after the training using tf.train.Saver:

  saver = tf.train.Saver()
  saver.save(sess, "./nnpg.ckpt")


Then you can reload it in a separate test program, after rebuilding the same network and creating a tf.train.Saver there, with:

with tf.Session() as sess:
  saver.restore(sess, "./nnpg.ckpt")


Now that you have a powerful neural-network-based policy model that can help your robot keep its balance, fully tested in a simulated environment, you can deploy it in a real physical environment after replacing the simulated environment's API returns with real environment data. The code to build and train the neural network reinforcement learning model can be reused largely as is.

If you liked this tutorial and would like to learn more such techniques, pick up this book, Intelligent Mobile Projects with TensorFlow, authored by Jeff Tang.
