Reinforcement Learning tutorial

Posted November 3 by Rokas Balsys

##### Epsilon-Greedy in Deep Q learning

In the previous tutorial I said that in the next tutorial we would try to implement the Prioritized Experience Replay (PER) method. Before doing that, I decided we should cover the epsilon-greedy method and fix and prepare the source code for PER. So this will be quite a short tutorial.

The epsilon-greedy algorithm is very simple and occurs in several areas of machine learning. One common use of epsilon-greedy is in the so-called multi-armed bandit problem.

Let's take an example. Suppose we are standing in front of k = 3 slot machines. Each machine pays out according to a different probability distribution, and these distributions are unknown to us. And suppose we can play a total of 100 times.

We have two goals. The first goal is to experiment with a few coins to try and determine which machine pays out the best. The second, related, goal is to get as much money as possible. The terms “explore” and “exploit” are used to indicate that we have to use some coins to explore to find the best machine, and we want to use as many coins as possible on the best machine to exploit our knowledge.

Epsilon-greedy is almost too simple. As we play the machines, we keep track of the average payout of each machine. Then, we select the machine with the highest current average payout with probability = (1 – epsilon) + (epsilon / k) where epsilon is a small value like 0.10. And we select machines that don’t have the highest current payout average with probability = epsilon / k.

It's much easier to understand with a concrete example. Suppose that, after our first 12 pulls, we have played machine #1 four times and won 1 dollar two times and 0 dollars two times. The average payout for machine #1 is 2/4 = 0.50 dollars.

And suppose we’ve played machine #2 five times and won 1 dollar three times and 0 dollars two times. The average payout for machine #2 is 3/5 = 0.60 dollars.

And suppose we’ve played machine #3 three times and won 1 dollar one time and 0 dollars two times. The average payout for machine #3 is 1/3 = 0.33 dollars.

Now we have to select a machine to play on try number 13. We generate a random number p, between 0.0 and 1.0. Suppose we have set epsilon = 0.10. If p > 0.10 (which it will be 90% of the time), we select machine #2 because it has the current highest average payout. But if p < 0.10 (which it will be only 10% of the time), we select a random machine, so each machine has a 1/3 chance of being selected.

Notice that machine #2 might get picked anyway because we select randomly from all machines.
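
To make these numbers concrete, here is a small sketch that computes the chance of each machine being chosen on try number 13. The helper name selection_probabilities is made up for this illustration; the averages and epsilon come from the example above.

```python
def selection_probabilities(averages, epsilon):
    """Return the probability that epsilon-greedy picks each machine."""
    k = len(averages)
    best = max(range(k), key=lambda i: averages[i])
    # Every machine can be hit by the random branch: epsilon / k.
    probs = [epsilon / k] * k
    # The current best machine also gets the whole greedy branch: 1 - epsilon.
    probs[best] += 1.0 - epsilon
    return probs


# Average payouts of machines #1, #2 and #3 after the first 12 pulls.
probs = selection_probabilities([2 / 4, 3 / 5, 1 / 3], epsilon=0.10)
print([round(p, 4) for p in probs])  # [0.0333, 0.9333, 0.0333]
```

So machine #2 is picked with probability (1 - epsilon) + (epsilon / k) = 0.9333, and each of the other two with probability epsilon / k = 0.0333.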

Over time, the best machine will be played more and more often because it will pay out more often. In short, epsilon-greedy means pick the current best option ("greedy") most of the time, but pick a random option with a small (epsilon) probability sometimes.
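
The whole procedure can be sketched in a few lines of plain Python. This is only a minimal illustration, not code from the agent: the function name run_bandit, the fixed seed, and the payout probabilities passed in at the bottom are assumptions (in the real problem the payout distributions are unknown to the player).

```python
import random


def run_bandit(payout_probs, pulls=100, epsilon=0.10, seed=0):
    """Play `pulls` rounds of epsilon-greedy on simulated slot machines."""
    rng = random.Random(seed)
    k = len(payout_probs)
    counts = [0] * k      # how many times each machine was played
    totals = [0.0] * k    # total payout collected from each machine

    for _ in range(pulls):
        if rng.random() < epsilon:
            # Explore: pick any machine uniformly at random.
            machine = rng.randrange(k)
        else:
            # Exploit: pick the machine with the highest average so far
            # (machines that were never played count as 0.0).
            averages = [totals[i] / counts[i] if counts[i] else 0.0 for i in range(k)]
            machine = max(range(k), key=lambda i: averages[i])

        # Simulated payout: 1 dollar with the machine's hidden probability.
        reward = 1.0 if rng.random() < payout_probs[machine] else 0.0
        counts[machine] += 1
        totals[machine] += reward

    return counts, totals


# Hypothetical hidden payout probabilities for the three machines.
counts, totals = run_bandit([0.5, 0.6, 0.33])
print("plays per machine:", counts)
print("total winnings:", sum(totals))
```

With only 100 pulls the result depends a lot on luck, so try a few seeds and a larger number of pulls to see the best machine get played more and more often.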

There are many other algorithms for the multi-armed bandit problem. But epsilon-greedy is incredibly simple, and in practice it often works about as well as more sophisticated algorithms such as UCB ("upper confidence bound") variations.

In Q-learning we apply the same idea. In the beginning, we’ll use the epsilon-greedy strategy:

• We specify an exploration rate “epsilon,” which we set to 1 in the beginning. This is the fraction of steps that we take randomly. In the beginning, this rate must be at its highest value, because we don’t know anything about the values in the Q-table. This means we need to do a lot of exploration by choosing our actions randomly.
• We generate a random number. If this number > epsilon, we do “exploitation” (we use what we already know to select the best action at each step). Otherwise, we do exploration.
• We must have a big epsilon at the beginning of training the Q-function, then reduce it progressively as the agent becomes more confident at estimating Q-values.
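
The three points above can be sketched as follows. This is an illustration only: choose_action and the Q-values in q_row are hypothetical and stand in for one row of a Q-table (or one prediction of the Q-network).

```python
import random


def choose_action(q_values, epsilon, rng):
    """Epsilon-greedy choice over the Q-values of a single state."""
    if rng.random() > epsilon:
        # Exploitation: take the action with the biggest Q-value.
        return max(range(len(q_values)), key=lambda a: q_values[a])
    # Exploration: take a random action.
    return rng.randrange(len(q_values))


rng = random.Random(0)
q_row = [0.1, 0.7, 0.2]  # hypothetical Q-values, action 1 is the best one

# A big epsilon means mostly random actions, a small one mostly greedy actions.
for epsilon in (1.0, 0.5, 0.0):
    actions = [choose_action(q_row, epsilon, rng) for _ in range(1000)]
    greedy_share = actions.count(1) / len(actions)
    print("epsilon = {}: best action chosen {:.0%} of the time".format(epsilon, greedy_share))
```

With epsilon = 1 every action is random, so the best action shows up only about a third of the time; with epsilon = 0 the agent is purely greedy.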

So, what did we change in the code? Actually, not much. We added a Boolean option, self.epsilot_greedy = True # use epsilon greedy strategy, which selects whether we want to use this method or not. The exploration hyperparameters for both epsilon strategies stay almost the same:

self.epsilon = 1.0  # exploration probability at start
self.epsilon_min = 0.01  # minimum exploration probability
self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob


From our previous tutorial we have the following "remember" function:

def remember(self, state, action, reward, next_state, done):
    self.memory.append((state, action, reward, next_state, done))
    if len(self.memory) > self.train_start:
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


We change it to the following:

def remember(self, state, action, reward, next_state, done):
    experience = state, action, reward, next_state, done
    self.memory.append((experience))


Previously we started decaying epsilon only once the memory list held more than self.train_start samples. From now on we'll train from the first episode of the game, without pre-filling the memory with samples. So we remove those lines and implement the epsilon-greedy logic in a different place.

Until now we were using the following "act" function:

def act(self, state):
    if np.random.random() <= self.epsilon:
        return random.randrange(self.action_size)
    else:
        return np.argmax(self.model.predict(state))


But now we'll implement another epsilon-greedy function, where a Boolean lets us switch between the two epsilon methods. Here we use an improved version of our epsilon-greedy strategy for Q-learning, where we reduce epsilon progressively as the agent becomes more confident at estimating Q-values. The function is almost the same; we simply inserted the explore_probability calculation into the epsilon-greedy strategy:

def act_e_greedy(self, state, decay_step):
    # EPSILON GREEDY STRATEGY
    if self.epsilot_greedy:
        # Here we'll use an improved version of our epsilon greedy strategy for Q-learning
        explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
    # OLD EPSILON STRATEGY
    else:
        if self.epsilon > self.epsilon_min:
            self.epsilon *= (1-self.epsilon_decay)
        explore_probability = self.epsilon

    if explore_probability > np.random.rand():
        # Make a random action (exploration)
        return random.randrange(self.action_size), explore_probability
    else:
        # Get action from Q-network (exploitation)
        # Estimate the Qs values state
        # Take the biggest Q value (= the best action)
        return np.argmax(self.model.predict(state)), explore_probability
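
To get a feeling for the two strategies, here is a standalone sketch that evaluates both schedules with the hyperparameters shown earlier. The function names exponential_schedule and multiplicative_schedule are made up for this illustration; the formulas mirror the two branches of act_e_greedy.

```python
import math

EPSILON_START = 1.0     # exploration probability at start
EPSILON_MIN = 0.01      # minimum exploration probability
EPSILON_DECAY = 0.0005  # decay rate


def exponential_schedule(decay_step):
    """New strategy: computed directly from the global step counter."""
    return EPSILON_MIN + (EPSILON_START - EPSILON_MIN) * math.exp(-EPSILON_DECAY * decay_step)


def multiplicative_schedule(decay_step):
    """Old strategy: multiply by (1 - decay) on every step while above the minimum."""
    epsilon = EPSILON_START
    for _ in range(decay_step):
        if epsilon > EPSILON_MIN:
            epsilon *= 1 - EPSILON_DECAY
    return epsilon


for step in (0, 1000, 5000, 10000):
    print(step, round(exponential_schedule(step), 3), round(multiplicative_schedule(step), 3))
```

For a small decay rate, (1 - decay) multiplied n times is close to exp(-decay * n), so both schedules fall at a very similar speed. The main difference is that the new one approaches epsilon_min smoothly and depends only on decay_step, while the old one changes self.epsilon on every call.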


Throughout the code I am removing the following lines, so that the agent starts training from the first steps:

if len(self.memory) < self.train_start:
    return
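
Without this check, replay can be called while the memory still holds fewer samples than the batch size. A small sketch of why sampling still works in that case (sample_minibatch and the placeholder experiences are made up for this illustration; the capped random.sample call is the same idea the full code below uses):

```python
import random
from collections import deque

memory = deque(maxlen=2000)
batch_size = 32


def sample_minibatch(memory, batch_size):
    # random.sample raises ValueError when asked for more items than exist,
    # so the request is capped at the current memory size.
    return random.sample(memory, min(len(memory), batch_size))


# Early in training: only 5 experiences have been stored.
for step in range(5):
    memory.append(("state", "action", 1.0, "next_state", False))
print(len(sample_minibatch(memory, batch_size)))  # 5

# Later: the memory holds more than one batch.
for step in range(100):
    memory.append(("state", "action", 1.0, "next_state", False))
print(len(sample_minibatch(memory, batch_size)))  # 32
```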


And finally, we modify our run(self) function:

• We insert a decay_step = 0 variable and increase it by one on every step.
• Instead of the old action = self.act(state) call, we call the new function in the following way: action, explore_probability = self.act_e_greedy(state, decay_step)
• We change our log print line: where before we were printing self.epsilon, we now print explore_probability.

So there weren’t a lot of changes, but we still need to make them. Here is the full code:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # hide GPUs, run on CPU
import random
import gym
import numpy as np
from collections import deque
from keras.models import Model
from keras.layers import Input, Dense, Dropout, Lambda, Add
from keras.optimizers import RMSprop
import pylab
from keras import backend as k

def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input

    # 'Dense' is the basic form of a neural network layer
    # Input Layer of state size(4) and Hidden Layer with 512 nodes
    X = Dense(512, input_shape=input_shape, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
        # State-value stream V(s), kept as shape (batch, 1)
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: k.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        # NOTE: the advantage stream was missing from this listing. The lines below
        # restore it, assuming the standard dueling form Q = V + (A - mean(A))
        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a - k.mean(a, axis=1, keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output Layer with # of actions: 2 nodes (left, right)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs = X_input, outputs = X, name='CartPole DDDQN model')
    model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model

class DQNAgent:
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        # by default, CartPole-v1 has max episode steps = 500
        # we can use this to experiment beyond 500
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.EPISODES = 500

        # Instantiate memory
        self.memory = deque(maxlen=2000)

        self.gamma = 0.95    # discount rate

        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0  # exploration probability at start
        self.epsilon_min = 0.01  # minimum exploration probability
        self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob

        self.batch_size = 32
        self.train_start = 1000

        # defining model parameters
        self.ddqn = True # use double deep q network
        self.Soft_Update = False # use soft parameter update
        self.dueling = True # use dueling network
        self.epsilot_greedy = True # use epsilon greedy strategy

        self.TAU = 0.1 # target network soft update hyperparameter

        self.Save_Path = 'Models'
        self.Model_name = 'CartPole'
        pylab.figure(figsize=(18, 9))
        self.scores, self.episodes, self.average = [], [], []
        self.i_records = 0

        if self.ddqn:
            print("----------Double DQN--------")
            self.Model_name = self.Save_Path+"/"+"DDQN_"+self.Model_name+".h5"
        else:
            print("-------------DQN------------")
            self.Model_name = self.Save_Path+"/"+"DQN_"+self.Model_name+".h5"

        # create main model and target model
        self.model = OurModel(input_shape=(self.state_size,), action_space = self.action_size, dueling = self.dueling)
        self.target_model = OurModel(input_shape=(self.state_size,), action_space = self.action_size, dueling = self.dueling)

    # after some time interval update the target model to be same with model
    def update_target_model(self):
        if not self.Soft_Update and self.ddqn:
            self.target_model.set_weights(self.model.get_weights())
            return
        if self.Soft_Update and self.ddqn:
            q_model_theta = self.model.get_weights()
            target_model_theta = self.target_model.get_weights()
            counter = 0
            for q_weight, target_weight in zip(q_model_theta, target_model_theta):
                target_weight = target_weight * (1-self.TAU) + q_weight * self.TAU
                target_model_theta[counter] = target_weight
                counter += 1
            self.target_model.set_weights(target_model_theta)

    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        self.memory.append((experience))

    def act_e_greedy(self, state, decay_step):
        # EPSILON GREEDY STRATEGY
        if self.epsilot_greedy:
            # Here we'll use an improved version of our epsilon greedy strategy for Q-learning
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        # OLD EPSILON STRATEGY
        else:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1-self.epsilon_decay)
            explore_probability = self.epsilon

        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size), explore_probability
        else:
            # Get action from Q-network (exploitation)
            # Estimate the Qs values state
            # Take the biggest Q value (= the best action)
            return np.argmax(self.model.predict(state)), explore_probability

    def replay(self):
        # Randomly sample minibatch from the deque memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        state = np.zeros((self.batch_size, self.state_size))
        next_state = np.zeros((self.batch_size, self.state_size))
        action, reward, done = [], [], []

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop
        for i in range(len(minibatch)):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        # do batch prediction to save time
        # predict Q-values for starting state using the main network
        target = self.model.predict(state)
        target_old = np.array(target)
        # predict best action in ending state using the main network
        target_next = self.model.predict(next_state)
        # predict Q-values for ending state using the target network
        target_val = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # the key point of Double DQN
                # selection of action is from model
                # update is from target model
                if self.ddqn: # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * (target_val[i][a])
                else: # Standard - DQN
                    # DQN chooses the max Q value among next actions
                    # note: target_next comes from the main network here,
                    # so selection and evaluation both use self.model
                    # Q_max = max_a' Q(s', a')
                    target[i][action[i]] = reward[i] + self.gamma * (np.amax(target_next[i]))

        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)

    def save(self, name):
        if self.ddqn:
            self.model.save(name)
        else:
            self.model.save(name)

    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        dqn = 'DQN'
        softupdate = ''
        dueling = ''
        PER = ''
        if self.ddqn:
            dqn = 'DDQN'
        if self.Soft_Update:
            softupdate = '_soft'
        if self.dueling:
            dueling = "_dueling"
        try:
            pylab.savefig(dqn+"_cartpole"+softupdate+dueling+PER+".png")
        except OSError:
            pass

        return str(self.average[-1])[:5]

    def TrackScores(self, i):
        # maybe better would be tracking mean of scores
        if self.i_records <= i:
            self.i_records = i
            print("model save", i)
            if not os.path.exists(self.Save_Path):
                os.makedirs(self.Save_Path)
            #self.save(self.Model_name)

    def run(self):
        decay_step = 0
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                decay_step += 1
                #self.env.render()
                action, explore_probability = self.act_e_greedy(state, decay_step)
                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_size])
                if not done or i == self.env._max_episode_steps-1:
                    reward = reward
                else:
                    reward = -100
                self.remember(state, action, reward, next_state, done)

                state = next_state
                i += 1
                if done:
                    # save best model
                    self.TrackScores(i)

                    # update target model at the end of the episode
                    self.update_target_model()

                    # every episode, plot the result
                    average = self.PlotModel(i, e)

                    print("episode: {}/{}, score: {}, e: {:.3}, average: {}".format(e, self.EPISODES, i, explore_probability, average))

                self.replay()

    def test(self):
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                self.env.render()
                action = np.argmax(self.model.predict(state))
                next_state, reward, done, _ = self.env.step(action)
                state = np.reshape(next_state, [1, self.state_size])
                i += 1
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    break

if __name__ == "__main__":
    agent = DQNAgent()
    agent.run()
    #agent.test()


##### Conclusion:

I won't compare how our agent performs with and without the new epsilon-greedy strategy, because before we were using a very similar strategy to reduce epsilon, so there won't be a significant difference. In this short tutorial we simply prepared our code for the coming tutorial, where we'll integrate the Prioritized Experience Replay method into our agent. With this method our agent should improve significantly, so keep watching!