Reinforcement Learning tutorial

Posted November 3 by Rokas Balsys

##### Epsilon-Greedy in Deep Q learning

In the previous tutorial I said that in the next tutorial we'd try to implement the Prioritized Experience Replay (PER) method, but before doing that I decided we should cover the epsilon-greedy method and fix/prepare the source code for PER. So this will be quite a short tutorial.

The epsilon-greedy algorithm is very simple and occurs in several areas of machine learning. One common use of epsilon-greedy is in the so-called multi-armed bandit problem.

Let's take an example. Suppose we are standing in front of k = 3 slot machines. Each machine pays out according to a different probability distribution, and these distributions are unknown to us. And suppose we can play a total of 100 times.

We have two goals. The first goal is to experiment with a few coins to try and determine which machine pays out the best. The second, related, goal is to get as much money as possible. The terms “explore” and “exploit” are used to indicate that we have to use some coins to explore to find the best machine, and we want to use as many coins as possible on the best machine to exploit our knowledge.

Epsilon-greedy is almost too simple. As we play the machines, we keep track of the average payout of each machine. Then we select the machine with the highest current average payout with probability = (1 – epsilon) + (epsilon / k), where epsilon is a small value like 0.10. And we select each machine that doesn't have the highest current payout average with probability = epsilon / k.

It's much easier to understand with a concrete example. Suppose that after our first 12 pulls, we played machine #1 four times and won 1 dollar twice and nothing twice. The average payout for machine #1 is 2/4 = 0.50 dollars.

And suppose we've played machine #2 five times and won 1 dollar three times and nothing twice. The average payout for machine #2 is 3/5 = 0.60 dollars.

And suppose we've played machine #3 three times and won 1 dollar once and nothing twice. The average payout for machine #3 is 1/3 = 0.33 dollars.

Now we have to select a machine to play on try number 13. We generate a random number p, between 0.0 and 1.0. Suppose we have set epsilon = 0.10. If p > 0.10 (which it will be 90% of the time), we select machine #2 because it has the current highest average payout. But if p < 0.10 (which it will be only 10% of the time), we select a random machine, so each machine has a 1/3 chance of being selected.
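The selection rule just described can be sketched in plain Python. The payout averages are the ones from the example above; `epsilon_greedy_pick` is a hypothetical helper for illustration, not part of the tutorial's agent:

```python
import random

def epsilon_greedy_pick(averages, epsilon=0.10):
    """Pick the arm with the best average payout with probability
    1 - epsilon; otherwise pick uniformly at random among all arms."""
    if random.random() > epsilon:
        # exploit: arm with the highest current average
        return max(range(len(averages)), key=lambda i: averages[i])
    # explore: any arm, uniformly (the best arm can still be picked here)
    return random.randrange(len(averages))

# averages after the first 12 pulls from the example above
averages = [2/4, 3/5, 1/3]  # machine #1, #2, #3

# on pull 13, the greedy branch (taken ~90% of the time) selects machine #2;
# overall, machine #2 is chosen with probability (1 - 0.10) + 0.10/3 ≈ 0.93
picks = [epsilon_greedy_pick(averages) for _ in range(10000)]
print(picks.count(1) / len(picks))
```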

Notice that machine #2 might get picked anyway because we select randomly from all machines.

Over time, the best machine will be played more and more often because it will pay out more often. In short, epsilon-greedy means pick the current best option ("greedy") most of the time, but pick a random option with a small (epsilon) probability sometimes.
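A quick simulation illustrates this takeover effect. The win probabilities and the `run_bandit` helper below are made up for illustration, not taken from the tutorial:

```python
import random

def run_bandit(seed, pulls=1000, epsilon=0.10):
    """Epsilon-greedy on 3 machines with hypothetical win probabilities.
    Returns how many times each machine was played."""
    rng = random.Random(seed)
    true_probs = [0.3, 0.6, 0.4]  # hypothetical; machine #2 (index 1) is best
    counts = [0, 0, 0]            # pulls per machine
    totals = [0.0, 0.0, 0.0]      # winnings per machine
    for _ in range(pulls):
        # current average payout of each machine (0 if never played)
        averages = [totals[i] / counts[i] if counts[i] else 0.0 for i in range(3)]
        if rng.random() > epsilon:
            arm = max(range(3), key=lambda i: averages[i])  # exploit
        else:
            arm = rng.randrange(3)                          # explore
        reward = 1.0 if rng.random() < true_probs[arm] else 0.0
        counts[arm] += 1
        totals[arm] += reward
    return counts

# the machine with the best true payout typically ends up
# with the large majority of the pulls
print(run_bandit(seed=42))
```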

There are many other algorithms for the multi-armed bandit problem. But epsilon-greedy is incredibly simple, and often works as well as, or even better than, more sophisticated algorithms such as UCB ("upper confidence bound") variations.

The same idea carries over to Deep Q-learning, where we'll use the epsilon-greedy strategy as follows:

- We specify an exploration rate "epsilon," which we set to 1 in the beginning. This is the fraction of steps we'll take randomly. At the start this rate must be at its highest value, because we don't know anything about the values in the Q-table, so we need to do a lot of exploration by choosing our actions randomly.
- We generate a random number. If this number is greater than epsilon, we do "exploitation" (we use what we already know to select the best action at each step). Otherwise, we do exploration.
- The idea is that we must have a big epsilon at the beginning of training the Q-function, then reduce it progressively as the agent becomes more confident at estimating Q-values.
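The progressive reduction in the last bullet can be an exponential decay from 1.0 down to a small floor (this is the same schedule the agent code further down uses); here it is evaluated at a few hypothetical step counts:

```python
import numpy as np

epsilon = 1.0           # exploration probability at start
epsilon_min = 0.01      # minimum exploration probability
epsilon_decay = 0.0005  # exponential decay rate

def explore_probability(decay_step):
    # decays from epsilon toward epsilon_min as decay_step grows
    return epsilon_min + (epsilon - epsilon_min) * np.exp(-epsilon_decay * decay_step)

# explore probability shrinks from 1.0 toward 0.01:
# roughly 1.0, 0.61, 0.091, 0.01 at these steps
for step in (0, 1000, 5000, 20000):
    print(step, round(float(explore_probability(step)), 3))
```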

So, what did we change in the code? Actually, almost nothing. We inserted a Boolean option, `self.epsilot_greedy = True  # use epsilon greedy strategy`, so we can toggle this method on or off. The exploration hyperparameters for epsilon and the epsilon-greedy strategy stay much the same:

```python
self.epsilon = 1.0  # exploration probability at start
self.epsilon_min = 0.01  # minimum exploration probability
self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob
```

From our previous tutorial we have the following `remember` function:

```python
def remember(self, state, action, reward, next_state, done):
    self.memory.append((state, action, reward, next_state, done))
    if len(self.memory) > self.train_start:
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
```

We change it to the following:

```python
def remember(self, state, action, reward, next_state, done):
    experience = state, action, reward, next_state, done
    self.memory.append(experience)
```

Before, we only started decaying epsilon once the memory held more than `self.train_start` samples. From now on we'll train from the first episode of the game, so we won't pre-fill the memory with samples. We remove those lines and implement the epsilon-greedy function in a different place.
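As a side note on the memory itself: it's a `deque` with `maxlen=2000`, so once it fills up, the oldest experiences are evicted automatically. A minimal illustration (toy capacity of 3, hypothetical entries):

```python
from collections import deque

memory = deque(maxlen=3)
for experience in ["e1", "e2", "e3", "e4"]:
    memory.append(experience)

# the oldest entry "e1" has been evicted to make room for "e4"
print(list(memory))  # ['e2', 'e3', 'e4']
```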

Until now we were using the following `act` function:

```python
def act(self, state):
    if np.random.random() <= self.epsilon:
        return random.randrange(self.action_size)
    else:
        return np.argmax(self.model.predict(state))
```

But now we'll implement another epsilon-greedy function, where we can switch between the two epsilon methods with a Boolean. Here we use an improved version of the epsilon-greedy strategy for Q-learning, where we reduce epsilon progressively as the agent becomes more confident at estimating Q-values. The function is almost the same; we simply inserted an `explore_probability` calculation into the epsilon-greedy branch:

```python
def act_e_greedy(self, state, decay_step):
    # EPSILON GREEDY STRATEGY
    if self.epsilot_greedy:
        # Improved version of our epsilon greedy strategy for Q-learning
        explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
    # OLD EPSILON STRATEGY
    else:
        if self.epsilon > self.epsilon_min:
            self.epsilon *= (1 - self.epsilon_decay)
        explore_probability = self.epsilon

    if explore_probability > np.random.rand():
        # Make a random action (exploration)
        return random.randrange(self.action_size), explore_probability
    else:
        # Get action from Q-network (exploitation)
        # Estimate the Q values of the state
        # Take the biggest Q value (= the best action)
        return np.argmax(self.model.predict(state)), explore_probability
```

Throughout the code I am removing the following lines, so the agent starts training from its first steps:

```python
if len(self.memory) < self.train_start:
    return
```

And finally, we modify our `def run(self):` function:

- Inserting a `decay_step = 0` variable.
- Instead of the old `action = self.act(state)` call, we now use `action, explore_probability = self.act_e_greedy(state, decay_step)`.
- We change our log print line: where before we printed `self.epsilon`, we now print `explore_probability`.

So there weren't a lot of changes, but we still needed to make them. Here is the full code:

```python
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import random
import gym
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense, Dropout, Lambda, Add
from keras.optimizers import Adam, RMSprop
import pylab
from keras import backend as k


def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input

    # 'Dense' is the basic form of a neural network layer
    # Input Layer of state size(4) and Hidden Layer with 512 nodes
    X = Dense(512, input_shape=input_shape, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: k.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a[:, :] - k.mean(a[:, :], keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output Layer with # of actions: 2 nodes (left, right)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs=X_input, outputs=X, name='CartPole_DDDQN_model')
    model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model


class DQNAgent:
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        # by default, CartPole-v1 has max episode steps = 500
        # we can use this to experiment beyond 500
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.EPISODES = 500

        # Instantiate memory
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # discount rate

        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0  # exploration probability at start
        self.epsilon_min = 0.01  # minimum exploration probability
        self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob

        self.batch_size = 32
        self.train_start = 1000

        # defining model parameters
        self.ddqn = True  # use double deep q network
        self.Soft_Update = False  # use soft parameter update
        self.dueling = True  # use dueling network
        self.epsilot_greedy = True  # use epsilon greedy strategy

        self.TAU = 0.1  # target network soft update hyperparameter

        self.Save_Path = 'Models'
        self.Model_name = 'CartPole'
        pylab.figure(figsize=(18, 9))
        self.scores, self.episodes, self.average = [], [], []
        self.i_records = 0

        if self.ddqn:
            print("----------Double DQN--------")
            self.Model_name = self.Save_Path+"/"+"DDQN_"+self.Model_name+".h5"
        else:
            print("-------------DQN------------")
            self.Model_name = self.Save_Path+"/"+"DQN_"+self.Model_name+".h5"

        # create main model and target model
        self.model = OurModel(input_shape=(self.state_size,), action_space=self.action_size, dueling=self.dueling)
        self.target_model = OurModel(input_shape=(self.state_size,), action_space=self.action_size, dueling=self.dueling)

    # after some time interval update the target model to be same with model
    def update_target_model(self):
        if not self.Soft_Update and self.ddqn:
            self.target_model.set_weights(self.model.get_weights())
            return
        if self.Soft_Update and self.ddqn:
            q_model_theta = self.model.get_weights()
            target_model_theta = self.target_model.get_weights()
            counter = 0
            for q_weight, target_weight in zip(q_model_theta, target_model_theta):
                target_weight = target_weight * (1-self.TAU) + q_weight * self.TAU
                target_model_theta[counter] = target_weight
                counter += 1
            self.target_model.set_weights(target_model_theta)

    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        self.memory.append(experience)

    def act_e_greedy(self, state, decay_step):
        # EPSILON GREEDY STRATEGY
        if self.epsilot_greedy:
            # Improved version of our epsilon greedy strategy for Q-learning
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        # OLD EPSILON STRATEGY
        else:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1 - self.epsilon_decay)
            explore_probability = self.epsilon

        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size), explore_probability
        else:
            # Get action from Q-network (exploitation)
            # Estimate the Q values of the state
            # Take the biggest Q value (= the best action)
            return np.argmax(self.model.predict(state)), explore_probability

    def replay(self):
        # Randomly sample minibatch from the deque memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        state = np.zeros((self.batch_size, self.state_size))
        next_state = np.zeros((self.batch_size, self.state_size))
        action, reward, done = [], [], []

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop
        for i in range(len(minibatch)):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        # do batch prediction to save speed
        # predict Q-values for starting state using the main network
        target = self.model.predict(state)
        target_old = np.array(target)
        # predict best action in ending state using the main network
        target_next = self.model.predict(next_state)
        # predict Q-values for ending state using the target network
        target_val = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # the key point of Double DQN:
                # selection of action is from the model,
                # update is from the target model
                if self.ddqn:  # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * (target_val[i][a])
                else:  # Standard - DQN
                    # DQN chooses the max Q value among next actions
                    # selection and evaluation of action is on the target Q Network
                    # Q_max = max_a' Q_target(s', a')
                    target[i][action[i]] = reward[i] + self.gamma * (np.amax(target_next[i]))

        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)

    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)

    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        dqn = 'DQN'
        softupdate = ''
        dueling = ''
        PER = ''
        if self.ddqn:
            dqn = 'DDQN'
        if self.Soft_Update:
            softupdate = '_soft'
        if self.dueling:
            dueling = '_dueling'
        try:
            pylab.savefig(dqn+"_cartpole"+softupdate+dueling+PER+".png")
        except OSError:
            pass
        return str(self.average[-1])[:5]

    def TrackScores(self, i):
        # maybe better would be tracking the mean of scores
        if self.i_records <= i:
            self.i_records = i
            print("model save", i)
            if not os.path.exists(self.Save_Path):
                os.makedirs(self.Save_Path)
            #self.save(self.Model_name)

    def run(self):
        decay_step = 0
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                decay_step += 1
                #self.env.render()
                action, explore_probability = self.act_e_greedy(state, decay_step)
                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_size])
                if not done or i == self.env._max_episode_steps-1:
                    reward = reward
                else:
                    reward = -100
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
                if done:
                    # save best model
                    self.TrackScores(i)
                    # every step update target model
                    self.update_target_model()
                    # every episode, plot the result
                    average = self.PlotModel(i, e)
                    print("episode: {}/{}, score: {}, e: {:.3}, average: {}".format(e, self.EPISODES, i, explore_probability, average))
                self.replay()

    def test(self):
        self.load(self.Model_name)
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                self.env.render()
                action = np.argmax(self.model.predict(state))
                next_state, reward, done, _ = self.env.step(action)
                state = np.reshape(next_state, [1, self.state_size])
                i += 1
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    break


if __name__ == "__main__":
    agent = DQNAgent()
    agent.run()
    #agent.test()
```

##### Conclusion:

I won't compare how our agent performs with and without the epsilon-greedy decay, because we were already using a very similar strategy to reduce epsilon, so there wouldn't be a significant difference. In this short tutorial we simply prepared our code for the coming tutorial, where we'll integrate the Prioritized Experience Replay method into our agent. With that method our agent should improve significantly, so keep watching!