Reinforcement Learning tutorial

Posted November 3 by Rokas Balsys


Epsilon-Greedy in Deep Q learning

In the previous tutorial I said that in the next tutorial we'd try to implement the Prioritized Experience Replay (PER) method, but before doing that I decided we should cover the epsilon-greedy method and fix/prepare the source code for PER. So this will be quite a short tutorial.

The epsilon-greedy algorithm is very simple and occurs in several areas of machine learning. One common use of epsilon-greedy is in the so-called multi-armed bandit problem.

Let's take an example. Suppose we are standing in front of k = 3 slot machines. Each machine pays out according to a different probability distribution, and these distributions are unknown to us. And suppose we can play a total of 100 times.

We have two goals. The first goal is to experiment with a few coins to try and determine which machine pays out the best. The second, related, goal is to get as much money as possible. The terms “explore” and “exploit” are used to indicate that we have to use some coins to explore to find the best machine, and we want to use as many coins as possible on the best machine to exploit our knowledge.

Epsilon-greedy is almost too simple. As we play the machines, we keep track of the average payout of each machine. Then, we select the machine with the highest current average payout with probability = (1 – epsilon) + (epsilon / k) where epsilon is a small value like 0.10. And we select machines that don’t have the highest current payout average with probability = epsilon / k.
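
Here is a minimal sketch of this selection rule in Python (the function name pick_machine and the avg_payouts list are only illustrative; they are not part of the agent code later in this tutorial):

import random

def pick_machine(avg_payouts, epsilon=0.10):
    # exploit with probability (1 - epsilon): play the machine with the
    # highest average payout observed so far
    if random.random() > epsilon:
        return max(range(len(avg_payouts)), key=lambda i: avg_payouts[i])
    # explore with probability epsilon: pick any machine uniformly at random;
    # the best machine can still be chosen here, which is where the extra
    # epsilon / k in its selection probability comes from
    return random.randrange(len(avg_payouts))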

It's much easier to understand with a concrete example. Suppose that, after our first 12 pulls, we have played machine #1 four times, winning 1 dollar twice and 0 dollars twice. The average for machine #1 is 2/4 = 0.50 dollars.

And suppose we've played machine #2 five times, winning 1 dollar three times and 0 dollars twice. The average payout for machine #2 is 3/5 = 0.60 dollars.

And suppose we've played machine #3 three times, winning 1 dollar once and 0 dollars twice. The average payout for machine #3 is 1/3 = 0.33 dollars.

Now we have to select a machine to play on try number 13. We generate a random number p, between 0.0 and 1.0. Suppose we have set epsilon = 0.10. If p > 0.10 (which it will be 90% of the time), we select machine #2 because it has the current highest average payout. But if p < 0.10 (which it will be only 10% of the time), we select a random machine, so each machine has a 1/3 chance of being selected.

Notice that machine #2 might get picked anyway because we select randomly from all machines.

Over time, the best machine will be played more and more often because it will pay out more often. In short, epsilon-greedy means pick the current best option ("greedy") most of the time, but pick a random option with a small (epsilon) probability sometimes.
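
To see that behaviour, here is a small toy simulation of 100 pulls, assuming made-up true win probabilities of 0.3, 0.5 and 0.4 for the three machines (these numbers are only for illustration and are unrelated to the averages above):

import random

true_win_prob = [0.3, 0.5, 0.4]  # assumed payout probabilities, unknown to the player
pulls = [0, 0, 0]                # how many times each machine was played
wins = [0, 0, 0]                 # total payout per machine
epsilon = 0.10

for t in range(100):
    averages = [wins[i] / pulls[i] if pulls[i] else 0.0 for i in range(3)]
    if t < 10 or random.random() < epsilon:
        machine = random.randrange(3)                       # explore (first 10 pulls are forced exploration)
    else:
        machine = max(range(3), key=lambda i: averages[i])  # exploit the best current average
    pulls[machine] += 1
    wins[machine] += 1 if random.random() < true_win_prob[machine] else 0

print("plays per machine:", pulls)  # the machine with the highest true payout is usually played the most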

There are many other algorithms for the multi-armed bandit problem. But epsilon-greedy is incredibly simple, and often works as well as, or even better than, more sophisticated algorithms such as UCB ("upper confidence bound") variations.

The idea is that in the beginning, we’ll use the epsilon greedy strategy:

  • We specify an exploration rate “epsilon,” which we set to 1 in the beginning. This is the fraction of steps that we'll take randomly. In the beginning, this rate must be at its highest value, because we don't know anything about the values in the Q-table, which means we need to do a lot of exploration by randomly choosing our actions.
  • We generate a random number. If this number > epsilon, then we will do “exploitation” (this means we use what we already know to select the best action at each step). Else, we’ll do exploration.
  • The idea is that we must have a big epsilon at the beginning of the training of the Q-function. Then, reduce it progressively as the agent becomes more confident at estimating Q-values.

So, what did we change in the code? Actually, almost nothing. We added a Boolean option, self.epsilon_greedy = True # use epsilon greedy strategy, so we can choose whether or not to use this method. The exploration hyperparameters for epsilon and the epsilon-greedy strategy stay pretty much the same:

self.epsilon = 1.0  # exploration probability at start
self.epsilon_min = 0.01  # minimum exploration probability
self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob

From our previous tutorial we have the following "remember" function:

def remember(self, state, action, reward, next_state, done):
    self.memory.append((state, action, reward, next_state, done))
    if len(self.memory) > self.train_start:
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

We change it to the following:

def remember(self, state, action, reward, next_state, done):
    experience = state, action, reward, next_state, done
    self.memory.append(experience)

Before, we were checking whether our memory list had grown past self.train_start before starting to decay epsilon. From now on we'll train from the very first episode of the game, without waiting to fill the memory with samples first. So we remove all of those lines and implement the epsilon-greedy logic in a different place.

Until now we were using the following "act" function:

def act(self, state):
    if np.random.random() <= self.epsilon:
        return random.randrange(self.action_size)
    else:
        return np.argmax(self.model.predict(state))

But now we'll implement a different epsilon-greedy function, where we can switch between the two epsilon methods with a Boolean. Here we'll use an improved version of our epsilon-greedy strategy for Q-learning, where we reduce epsilon progressively as the agent becomes more confident at estimating Q-values. The function is almost the same; we simply inserted the explore_probability calculation into the epsilon-greedy strategy:

def act_e_greedy(self, state, decay_step):
    if self.epsilon_greedy:
        # EPSILON GREEDY STRATEGY: an improved version for Q-learning,
        # epsilon decays exponentially with the total number of steps taken so far
        explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
    else:
        # OLD EPSILON STRATEGY: multiply epsilon by a constant factor on every call
        if self.epsilon > self.epsilon_min:
            self.epsilon *= (1-self.epsilon_decay)
        explore_probability = self.epsilon

    if explore_probability > np.random.rand():
        # Make a random action (exploration)
        return random.randrange(self.action_size), explore_probability
    else:
        # Get action from Q-network (exploitation)
        # Estimate the Q values of this state and take the biggest one (= the best action)
        return np.argmax(self.model.predict(state)), explore_probability
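
To get a feel for how the new schedule behaves with the hyperparameters above (epsilon = 1.0, epsilon_min = 0.01, epsilon_decay = 0.0005), here is a quick check of explore_probability at a few decay steps; the numbers are just the formula evaluated, not measurements from training:

import numpy as np

epsilon, epsilon_min, epsilon_decay = 1.0, 0.01, 0.0005
for decay_step in (0, 1000, 5000, 10000):
    explore_probability = epsilon_min + (epsilon - epsilon_min) * np.exp(-epsilon_decay * decay_step)
    print(decay_step, round(explore_probability, 3))
# prints roughly 1.0, 0.61, 0.091 and 0.017, so exploration fades out
# smoothly over roughly the first 10,000 training steps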

Throughout the code I am removing the following lines, so that the agent starts training from its first steps:

if len(self.memory) < self.train_start:
    return

And finally, we are modifying our def run(self): function:

  • Inserting a decay_step = 0 variable, which is incremented on every step
  • Instead of the old action = self.act(state) call, we now call action, explore_probability = self.act_e_greedy(state, decay_step)
  • We change our log print line: where before we were printing self.epsilon, we now print explore_probability

So there weren't a lot of changes, but we still needed to make them. Here is the full code:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import random
import gym
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense, Dropout, Lambda, Add
from keras.optimizers import Adam, RMSprop
import pylab
from keras import backend as k

def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input

    # 'Dense' is the basic form of a neural network layer
    # Input Layer of state size(4) and Hidden Layer with 512 nodes
    X = Dense(512, input_shape=input_shape, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)
    
    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
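        # Dueling architecture: split the network into a state-value stream V(s)
        # and an action-advantage stream A(s, a), then recombine them as
        # Q(s, a) = V(s) + (A(s, a) - mean(A)), so V and A are learned separately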
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: k.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a[:, :] - k.mean(a[:, :], keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output Layer with # of actions: 2 nodes (left, right)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs = X_input, outputs = X, name='CartPole DDDQN model')
    model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model

class DQNAgent:
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        # by default, CartPole-v1 has max episode steps = 500
        # we can use this to experiment beyond 500
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.EPISODES = 500
        
        # Instantiate memory
        self.memory = deque(maxlen=2000)

        self.gamma = 0.95    # discount rate
        
        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0  # exploration probability at start
        self.epsilon_min = 0.01  # minimum exploration probability 
        self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob
        
        self.batch_size = 32
        self.train_start = 1000

        # defining model parameters
        self.ddqn = True # use double deep q network
        self.Soft_Update = False # use soft parameter update
        self.dueling = True # use dueling network
        self.epsilon_greedy = True # use epsilon greedy strategy
        
        self.TAU = 0.1 # target network soft update hyperparameter

        self.Save_Path = 'Models'
        self.Model_name = 'CartPole'
        pylab.figure(figsize=(18, 9))
        self.scores, self.episodes, self.average = [], [], []
        self.i_records = 0

        if self.ddqn:
            print("----------Double DQN--------")
            self.Model_name = self.Save_Path+"/"+"DDQN_"+self.Model_name+".h5"
        else:
            print("-------------DQN------------")
            self.Model_name = self.Save_Path+"/"+"DQN_"+self.Model_name+".h5"

        # create main model and target model
        self.model = OurModel(input_shape=(self.state_size,), action_space = self.action_size, dueling = self.dueling)
        self.target_model = OurModel(input_shape=(self.state_size,), action_space = self.action_size, dueling = self.dueling)   

    # after some time interval update the target model to be same with model
    def update_target_model(self):
        if not self.Soft_Update and self.ddqn:
            self.target_model.set_weights(self.model.get_weights())
            return
        if self.Soft_Update and self.ddqn:
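            # soft update: target_weights = TAU * model_weights + (1 - TAU) * target_weights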
            q_model_theta = self.model.get_weights()
            target_model_theta = self.target_model.get_weights()
            counter = 0
            for q_weight, target_weight in zip(q_model_theta, target_model_theta):
                target_weight = target_weight * (1-self.TAU) + q_weight * self.TAU
                target_model_theta[counter] = target_weight
                counter += 1
            self.target_model.set_weights(target_model_theta)

    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        self.memory.append(experience)

    def act_e_greedy(self, state, decay_step):
        if self.epsilon_greedy:
            # EPSILON GREEDY STRATEGY: an improved version for Q-learning,
            # epsilon decays exponentially with the total number of steps taken so far
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        else:
            # OLD EPSILON STRATEGY: multiply epsilon by a constant factor on every call
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1-self.epsilon_decay)
            explore_probability = self.epsilon

        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size), explore_probability
        else:
            # Get action from Q-network (exploitation)
            # Estimate the Q values of this state and take the biggest one (= the best action)
            return np.argmax(self.model.predict(state)), explore_probability

    def replay(self):
        # Randomly sample minibatch from the deque memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        state = np.zeros((len(minibatch), self.state_size))
        next_state = np.zeros((len(minibatch), self.state_size))
        action, reward, done = [], [], []

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop       
        for i in range(len(minibatch)):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        # do batch prediction to save speed
        # predict Q-values for starting state using the main network
        target = self.model.predict(state)
        target_old = np.array(target)
        # predict best action in ending state using the main network
        target_next = self.model.predict(next_state)
        # predict Q-values for ending state using the target network
        target_val = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # the key point of Double DQN
                # selection of action is from model
                # update is from target model
                if self.ddqn: # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * (target_val[i][a])
                else: # Standard - DQN
                    # DQN chooses the max Q value among next actions
                    # selection and evaluation of action is on the target Q Network
                    # Q_max = max_a' Q_target(s', a')
                    target[i][action[i]] = reward[i] + self.gamma * (np.amax(target_next[i]))
                
        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)


    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)

    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        dqn = 'DQN'
        softupdate = ''
        dueling = ''
        PER = ''
        if self.ddqn:
            dqn = 'DDQN'
        if self.Soft_Update:
            softupdate = '_soft'
        if self.dueling:
            dueling = "_dueling"
        try:
            pylab.savefig(dqn+"_cartpole"+softupdate+dueling+PER+".png")
        except OSError:
            pass
        
        return str(self.average[-1])[:5]

    def TrackScores(self, i):
        # maybe better would be tracking mean of scores
        if self.i_records <= i:
            self.i_records = i
            print("model save", i)
            if not os.path.exists(self.Save_Path):
                os.makedirs(self.Save_Path)
            #self.save(self.Model_name)
            
    def run(self):
        decay_step = 0
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                decay_step += 1
                #self.env.render()
                action, explore_probability = self.act_e_greedy(state, decay_step)
                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_size])
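                # give a -100 penalty if the episode ends before the step limit (i.e. the pole fell)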
                if not done or i == self.env._max_episode_steps-1:
                    reward = reward
                else:
                    reward = -100
                self.remember(state, action, reward, next_state, done)
                
                state = next_state
                i += 1
                if done:
                    # save best model
                    self.TrackScores(i)

                    # every step update target model
                    self.update_target_model()

                    # every episode, plot the result
                    average = self.PlotModel(i, e)
                    
                    print("episode: {}/{}, score: {}, e: {:.3}, average: {}".format(e, self.EPISODES, i, explore_probability, average))

                self.replay()

    def test(self):
        self.load(self.Model_name)
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                self.env.render()
                action = np.argmax(self.model.predict(state))
                next_state, reward, done, _ = self.env.step(action)
                state = np.reshape(next_state, [1, self.state_size])
                i += 1
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    break


if __name__ == "__main__":
    agent = DQNAgent()
    agent.run()
    #agent.test()

Conclusion:

I won't compare how our agent performs with the new epsilon-greedy schedule versus the old one, because we were already using a very similar way of reducing epsilon, so there wouldn't be a significant difference. In this short tutorial we simply prepared our code for the coming tutorial, where we'll integrate the Prioritized Experience Replay method into our agent. With that method our agent should improve significantly, so keep watching!