DQN PER with Convolutional Neural Networks

This tutorial will show you how to implement one of the most groundbreaking Reinforcement Learning algorithms - DQN from pixels. By the end of this tutorial, you will have created an agent that successfully plays almost 'any' game using only pixel inputs.

We used game-specific inputs (like cart position or pole angle) in all of the previous DQN tutorials. Now we'll be more general and use something that all games have in common - pixels. To begin, let's come back to our first DQN tutorial, where we wrote our first agent code that takes random actions. We'll do the same thing here, but instead of using environment-specific inputs, we'll use pixels. This is the random action code from the first tutorial:

import gym
import random

env = gym.make("CartPole-v0")
env.reset()

def Random_games():
    for episode in range(10):
        env.reset()
        for t in range(500):
            env.render()
            action = env.action_space.sample()
            next_state, reward, done, info = env.step(action)
            print(t, next_state, reward, done, info, action)
            if done:
                break
                
Random_games()

This short piece of code played random games for us. Now we'll need to build our own GetImage, reset, and step functions (plus a small imshow helper) so the agent can work with pixels.

Before, we used the env.render() function when we wanted to see how our agent plays the game. The gym documentation says that if we call env.render(mode='rgb_array'), it returns the rendered frame as an RGB array; that's how we'll get our image. Yes, it's unfortunate that we'll need to render every frame of the game - training will be much slower than training without rendering - but there is no other way to get game frames in a realistic situation. We could pick gym environments that return frames without rendering, but here we'll stick with the real-life approach. So, here is the call that returns the rendered pixels:

img = env.render(mode='rgb_array') 

We'll see the following RGB image:

The CartPole gym environment renders 600x400 RGB frames (arrays of shape 400x600x3). That's far more pixels than we need for such a simple task. We'll convert the frame to grayscale and downsize it with the following lines:

img_rgb = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
img_rgb_resized = cv2.resize(img_rgb, (240, 160), interpolation=cv2.INTER_CUBIC)

And we'll receive the following results:

This will use fewer resources during training because the image now has a single channel and a smaller size. To simplify things even further, we'll threshold the frame so that every pixel that isn't pure white background becomes black, leaving a fully black CartPole:

img_rgb_resized[img_rgb_resized < 255] = 0

So, for the final training results, we'll use the following images:

But do we have all the information to determine what’s really going on? No, we don’t have any information about the movements of the game objects, and I hope we all can agree that’s crucial in the world of games. So, how can we overcome this?

Let’s for each observation stack two consecutive frames:

Okay, now we can see the direction and the velocity of the pole. But do we know its acceleration? No. Let’s stack three frames then:

Now we can derive the direction, velocity, and acceleration of the moving objects. Still, since not every game is rendered at the same pace, we'll keep 4 frames to be sure that we have all the necessary information:

Assuming that we store the last four frames at each step, our input shape will be 4x160x240 (four stacked 160x240 frames). So now we'll do the following steps. First, we'll define our 4-frame image memory: image_memory = np.zeros((4, 160, 240)). Every time before adding a new image to image_memory, we shift the existing data back by one frame, similar to how a deque works: image_memory = np.roll(image_memory, 1, axis = 0).

The last step is to write the newest image into the freed slot: image_memory[0,:,:] = img_rgb_resized.
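To make the frame-stacking mechanics concrete, here is a small optional sketch (my own illustration, not part of the tutorial code) that uses tiny 2x3 arrays as stand-in frames, so you can see how np.roll pushes older frames back while index 0 always holds the newest one:

import numpy as np

# Tiny stand-in for image_memory: 4 "frames" of size 2x3
image_memory = np.zeros((4, 2, 3))

def push_frame(memory, frame):
    # Shift all stored frames one slot back (the oldest falls off the end)
    memory = np.roll(memory, 1, axis=0)
    # The newest frame always lives at index 0
    memory[0, :, :] = frame
    return memory

for step in range(1, 6):
    fake_frame = np.full((2, 3), step)      # fake frame filled with the step number
    image_memory = push_frame(image_memory, fake_frame)
    print(step, image_memory[:, 0, 0])      # first pixel of each stored frame

# After 5 steps the last line prints [5. 4. 3. 2.]:
# the four most recent frames, newest first.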

So, this is what our complete code for random steps looks like:

import gym
import random
import numpy as np
import cv2

class DQN_CNN_Agent:
    def __init__(self, env_name):
        self.env_name = env_name       
        self.env = gym.make(env_name)
        self.ROWS = 160
        self.COLS = 240
        self.REM_STEP = 4

        self.EPISODES = 10

        self.image_memory = np.zeros((self.REM_STEP, self.ROWS, self.COLS))

    def imshow(self, image, rem_step=0):
        cv2.imshow(self.env_name+str(rem_step), image[rem_step,...])
        if cv2.waitKey(25) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            return

    def GetImage(self):
        img = self.env.render(mode='rgb_array')
  
        img_rgb = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        img_rgb_resized = cv2.resize(img_rgb, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)
        img_rgb_resized[img_rgb_resized < 255] = 0
        img_rgb_resized = img_rgb_resized / 255

        self.image_memory = np.roll(self.image_memory, 1, axis = 0)
        self.image_memory[0,:,:] = img_rgb_resized

        self.imshow(self.image_memory,0)
        
        return np.expand_dims(self.image_memory, axis=0)
    
    def reset(self):
        self.env.reset()
        for i in range(self.REM_STEP):
            state = self.GetImage()
        return state

    def step(self,action):
        next_state, reward, done, info = self.env.step(action)
        next_state = self.GetImage()
        return next_state, reward, done, info

    def run(self):
        # Each of these episodes is its own game.
        for episode in range(self.EPISODES):
            self.reset()
            # this is each frame, up to 500... but we won't make it that far with random actions.
            for t in range(500):               
                # This will just create a sample action in any environment.
                # In this environment, the action can be 0 or 1, which is left or right
                action = self.env.action_space.sample()

                # this executes the environment with an action, 
                # and returns the observation of the environment, 
                # the reward, if the env is over, and other info.
                next_state, reward, done, info = self.step(action)
                
                # lets print everything in one line:
                #print(t, next_state, reward, done, info, action)
                if done:
                    break

if __name__ == "__main__":
    env_name = 'CartPole-v1'
    agent = DQN_CNN_Agent(env_name)
    agent.run()

You can test this random-action code yourself; it's pretty simple.

DQN with Convolutional Neural Network:

Before merging everything into one script, we must make one major improvement: add a Convolutional Neural Network (CNN) to our current code. If you are not familiar with CNNs, I have several tutorials about them; I recommend checking them out before moving forward. So, first, we'll implement a few more pieces for our CNN, starting with the imports:

from keras.layers import Flatten, Conv2D, Activation

Below is the modified agent model, now using a CNN. The input to the neural network consists of four stacked 160x240 grayscale frames (input shape (4, 160, 240), channels first). The first hidden layer convolves 64 filters of 5x5 with stride 3 over the input image and applies a ReLU activation. The second hidden layer convolves 64 filters of 4x4 with stride 2, followed by a ReLU activation. The third convolutional layer convolves 64 filters of 3x3 with stride 1, followed by a ReLU activation. The convolutional output is then flattened, and after that come the same dense layers as before:

def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input
    
    X = Conv2D(64, 5, strides=(3, 3),padding="valid", input_shape=input_shape, activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 4, strides=(2, 2),padding="valid", activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 3, strides=(1, 1),padding="valid", activation="relu", data_format="channels_first")(X)
    X = Flatten()(X)
    # 'Dense' is the basic form of a neural network layer
    # Hidden layer with 512 nodes
    X = Dense(512, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)
    
    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: K.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a[:, :] - K.mean(a[:, :], keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output Layer with # of actions: 2 nodes (left, right)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs = X_input, outputs = X, name='CartPole PER D3QN CNN model')
    model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model
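If you want to sanity-check this architecture before training, here is a small optional sketch (my addition, not part of the tutorial code) that computes the spatial size after each 'valid' convolution for the 4x160x240 input, using the standard formula floor((size - kernel) / stride) + 1. The final number should match the Flatten layer size reported by model.summary():

# Optional sanity check, assuming the 4x160x240 channels-first input used in this tutorial
def conv_out(size, kernel, stride):
    # Output size of a "valid" convolution along one dimension
    return (size - kernel) // stride + 1

rows, cols = 160, 240
for kernel, stride in [(5, 3), (4, 2), (3, 1)]:
    rows, cols = conv_out(rows, kernel, stride), conv_out(cols, kernel, stride)
    print("kernel", kernel, "stride", stride, "->", rows, "x", cols)

print("Flatten size:", 64 * rows * cols)
# Expected: 52x79 -> 25x38 -> 23x36, flatten = 64 * 23 * 36 = 52992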

Now it's time to merge the above random-code functions (GetImage, reset, step) with the code from our previous tutorials. We'll also remove the pieces of the previous tutorial that we no longer need, because our input will now be pixels. We'll use some newly defined variables:

self.ROWS = 160
self.COLS = 240
self.REM_STEP = 4

self.image_memory = np.zeros((self.REM_STEP, self.ROWS, self.COLS))
self.state_size = (self.REM_STEP, self.ROWS, self.COLS)

Here we define the image size that we'll feed to our Neural Network; this is why we need the new self.state_size. Because of this change, we must update self.model and self.target_model as follows:

self.model = OurModel(input_shape=self.state_size, action_space = self.action_size, dueling = self.dueling)
self.target_model = OurModel(input_shape=self.state_size, action_space = self.action_size, dueling = self.dueling)

Because we changed the inputs to our model, we must make two changes in the replay function (a short shape sketch follows this list):

  • We change state = np.zeros((self.batch_size, self.state_size)) to state = np.zeros((self.batch_size,) + self.state_size);
  • We change next_state = np.zeros((self.batch_size, self.state_size)) to next_state = np.zeros((self.batch_size,) + self.state_size).
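The reason for the (self.batch_size,) + self.state_size form is that self.state_size is now a tuple instead of a single integer, so the batch shape has to be built by tuple concatenation. A minimal sketch of the difference, using the sizes from this tutorial:

import numpy as np

batch_size = 32
state_size = (4, 160, 240)              # REM_STEP, ROWS, COLS

# The old form only works when state_size is a single integer;
# with a tuple it raises a TypeError:
# state = np.zeros((batch_size, state_size))

# Tuple concatenation builds the full batch shape:
state = np.zeros((batch_size,) + state_size)
print(state.shape)                      # (32, 4, 160, 240)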

Also, as mentioned while merging the above code with the previous tutorial's code, in the main run function we change:

  • state = self.env.reset() to state = self.reset();
  • self.env.step(action) to self.step(action).

Here is the complete tutorial code (also available on GitHub):

# Tutorial by www.pylessons.com
# Tutorial written for - Tensorflow 1.15, Keras 2.2.4

import os
import random
import gym
import pylab
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten
from keras.optimizers import Adam, RMSprop
from keras import backend as K
from PER import *
import cv2

def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input
    
    X = Conv2D(64, 5, strides=(3, 3),padding="valid", input_shape=input_shape, activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 4, strides=(2, 2),padding="valid", activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 3, strides=(1, 1),padding="valid", activation="relu", data_format="channels_first")(X)
    X = Flatten()(X)
    # 'Dense' is the basic form of a neural network layer
    # Hidden layer with 512 nodes
    X = Dense(512, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)
    
    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: K.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a[:, :] - K.mean(a[:, :], keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output Layer with # of actions: 2 nodes (left, right)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs = X_input, outputs = X, name='CartPole PER D3QN CNN model')
    model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model

class DQNAgent:
    def __init__(self, env_name):
        self.env_name = env_name       
        self.env = gym.make(env_name)
        self.env.seed(0)  
        # by default, CartPole-v1 has max episode steps = 500
        # we can use this to experiment beyond 500
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.EPISODES = 1000
        
        # Instantiate memory
        memory_size = 10000
        self.MEMORY = Memory(memory_size)
        self.memory = deque(maxlen=2000)

        self.gamma = 0.95    # discount rate
        
        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0  # exploration probability at start
        self.epsilon_min = 0.01  # minimum exploration probability 
        self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob
        
        self.batch_size = 32

        # defining model parameters
        self.ddqn = True # use double deep Q network (DDQN)
        self.Soft_Update = False # use soft parameter update
        self.dueling = True # use dueling network
        self.epsilon_greedy = False # use epsilon greedy strategy
        self.USE_PER = True # use Prioritized Experience Replay
        
        self.TAU = 0.1 # target network soft update hyperparameter

        self.Save_Path = 'Models'
        if not os.path.exists(self.Save_Path): os.makedirs(self.Save_Path)
        self.scores, self.episodes, self.average = [], [], []

        self.Model_name = os.path.join(self.Save_Path, self.env_name+"_PER_D3QN_CNN.h5")

        self.ROWS = 160
        self.COLS = 240
        self.REM_STEP = 4

        self.image_memory = np.zeros((self.REM_STEP, self.ROWS, self.COLS))
        self.state_size = (self.REM_STEP, self.ROWS, self.COLS)
        
        # create main model and target model
        self.model = OurModel(input_shape=self.state_size, action_space = self.action_size, dueling = self.dueling)
        self.target_model = OurModel(input_shape=self.state_size, action_space = self.action_size, dueling = self.dueling)  

    # after some time interval update the target model to be same with model
    def update_target_model(self):
        if not self.Soft_Update and self.ddqn:
            self.target_model.set_weights(self.model.get_weights())
            return
        if self.Soft_Update and self.ddqn:
            q_model_theta = self.model.get_weights()
            target_model_theta = self.target_model.get_weights()
            counter = 0
            for q_weight, target_weight in zip(q_model_theta, target_model_theta):
                target_weight = target_weight * (1-self.TAU) + q_weight * self.TAU
                target_model_theta[counter] = target_weight
                counter += 1
            self.target_model.set_weights(target_model_theta)

    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        if self.USE_PER:
            self.MEMORY.store(experience)
        else:
            self.memory.append((experience))

    def act(self, state, decay_step):
        # EPSILON GREEDY STRATEGY
        if self.epsilon_greedy:
        # Here we'll use an improved version of our epsilon greedy strategy for Q-learning
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        # OLD EPSILON STRATEGY
        else:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1-self.epsilon_decay)
            explore_probability = self.epsilon
    
        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size), explore_probability
        else:
            # Get action from Q-network (exploitation)
            # Estimate the Qs values state
            # Take the biggest Q value (= the best action)
            return np.argmax(self.model.predict(state)), explore_probability
                
    def replay(self):
        if self.USE_PER:
            # Sample minibatch from the PER memory
            tree_idx, minibatch  = self.MEMORY.sample(self.batch_size)
        else:
            # Randomly sample minibatch from the deque memory
            minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        state = np.zeros((self.batch_size,) + self.state_size)
        next_state = np.zeros((self.batch_size,) + self.state_size)
        action, reward, done = [], [], []

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop       
        for i in range(len(minibatch)):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        # do batch prediction to save speed
        # predict Q-values for starting state using the main network
        target = self.model.predict(state)
        target_old = np.array(target)
        # predict best action in ending state using the main network
        target_next = self.model.predict(next_state)
        # predict Q-values for ending state using the target network
        target_val = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # the key point of Double DQN
                # selection of action is from model
                # update is from target model
                if self.ddqn: # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * (target_val[i][a])
                else: # Standard - DQN
                    # DQN chooses the max Q value among next actions
                    # selection and evaluation of action is on the target Q Network
                    # Q_max = max_a' Q_target(s', a')
                    target[i][action[i]] = reward[i] + self.gamma * (np.amax(target_next[i]))

        if self.USE_PER:
            indices = np.arange(self.batch_size, dtype=np.int32)
            # priorities are based on the error of the Q value for the action that was taken
            absolute_errors = np.abs(target_old[indices, np.array(action)] - target[indices, np.array(action)])
            # Update priority
            self.MEMORY.batch_update(tree_idx, absolute_errors)
                
        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)

    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)

    pylab.figure(figsize=(18, 9))
    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        pylab.ylabel('Score', fontsize=18)
        pylab.xlabel('Steps', fontsize=18)
        dqn = 'DQN_'
        softupdate = ''
        dueling = ''
        greedy = ''
        PER = ''
        if self.ddqn: dqn = 'DDQN_'
        if self.Soft_Update: softupdate = '_soft'
        if self.dueling: dueling = '_Dueling'
        if self.epsilon_greedy: greedy = '_Greedy'
        if self.USE_PER: PER = '_PER'
        try:
            pylab.savefig(dqn+self.env_name+softupdate+dueling+greedy+PER+"_CNN.png")
        except OSError:
            pass

        return str(self.average[-1])[:5]

    def imshow(self, image, rem_step=0):
        cv2.imshow("cartpole"+str(rem_step), image[rem_step,...])
        if cv2.waitKey(25) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            return

    def GetImage(self):
        img = self.env.render(mode='rgb_array')
        
        img_rgb = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        img_rgb_resized = cv2.resize(img_rgb, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)
        img_rgb_resized[img_rgb_resized < 255] = 0
        img_rgb_resized = img_rgb_resized / 255

        self.image_memory = np.roll(self.image_memory, 1, axis = 0)
        self.image_memory[0,:,:] = img_rgb_resized

        #self.imshow(self.image_memory,0)
        
        return np.expand_dims(self.image_memory, axis=0)

    def reset(self):
        self.env.reset()
        for i in range(self.REM_STEP):
            state = self.GetImage()
        return state

    def step(self,action):
        next_state, reward, done, info = self.env.step(action)
        next_state = self.GetImage()
        return next_state, reward, done, info
    
    def run(self):
        decay_step = 0
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            i = 0
            while not done:
                decay_step += 1
                action, explore_probability = self.act(state, decay_step)
                next_state, reward, done, _ = self.step(action)
                if not done or i == self.env._max_episode_steps-1:
                    reward = reward
                else:
                    reward = -100
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
                if done:
                    # every REM_STEP update target model
                    if e % self.REM_STEP == 0:
                        self.update_target_model()
                    
                    # every episode, plot the result
                    average = self.PlotModel(i, e)
                    
                    print("episode: {}/{}, score: {}, e: {:.2}, average: {}".format(e, self.EPISODES, i, explore_probability, average))
                    if i == self.env._max_episode_steps:
                        print("Saving trained model to", self.Model_name)
                        #self.save(self.Model_name)
                        break
                self.replay()

    def test(self):
        self.load(self.Model_name)
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            i = 0
            while not done:
                action = np.argmax(self.model.predict(state))
                next_state, reward, done, _ = self.step(action)
                state = next_state
                i += 1
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    break

if __name__ == "__main__":
    env_name = 'CartPole-v1'
    agent = DQNAgent(env_name)
    agent.run()
    #agent.test()

DQN CNN agent performance:

I think you are interested in how our new agent performed with only pixel inputs. I'll tell you: not as well as I wanted or expected. But at least we can see that it works, and there is a lot of room for improvement.

So, the same as before, I trained the agent for 500 episodes to save time. As I mentioned, this time we keep the current frame plus three history frames, so I ran training tests with 1, 2, 3, and 4 stacked frames. Here are the results:

From the above chart, we can see that our agent with just one input (channels 0) performed the worst; even our random agent could perform similarly. We can also see that agents with more than one channel perform better, and we can assume that the more channels our agent sees, the better it performs. But there is one downside: the more channels it has, the more resources training requires. So, I tried to train the four-channel agent for longer, around 3600 steps:

 

The red dots in the graph are the actual episode scores, and the blue line is the 50-game moving average score. Our agent reached its best performance after around 1000 training steps and did not improve beyond that. The average game score was about 100, which is much worse than using the four state parameters as input instead of pixels. But in a real situation, it's often impossible to get such clean input parameters; they always carry some noise or error.

Conclusion:

That's all! We have created a more intelligent agent that learns to play the balancing CartPole game from image pixels. Awesome! Remember that if you want an agent with outstanding performance on pixel data, you need many more GPU hours (about two days of training).

Don't forget to implement each part of the code by yourself. It's essential to try to modify the code I gave you: change the NN architecture, change the learning rate, use a more challenging environment, and so on.

Remember that this was quite a long tutorial series, so be sure to understand why we use these new strategies, how they work, and their advantages.

If you don't understand how everything works up to this point, analyzing and understanding a more complicated system later will be much more challenging.

In the next tutorial, I will cover more reinforcement learning strategies in simple environments. We'll try reinforcement learning on more complicated tasks only after covering all the most known and popular strategies.