A.I. learns to play Pong with Deep Q Network

There's a massive difference between reading about Reinforcement Learning and implementing it. In this tutorial, I'll implement a Deep Neural Network for Reinforcement Learning (a Deep Q Network), and we will see how it learns and finally becomes good enough to beat the computer in Pong!

By the end of this post, you'll be able to do the following:

  • Write a Neural Network from scratch;
  • Implement a Deep Q Network with Reinforcement Learning;
  • Build an A.I. for Pong that can beat the computer in less than 300 lines of Python;
  • Use OpenAI gym.

Given the limited time, and because this is for learning purposes, I am not aiming for a perfectly trained agent. Still, I hope this project helps people get familiar with the basic workflow of DQN algorithms and Keras. Training the four different agents in this tutorial took two days on a computer with a GPU; a lot more training and tuning would be required to obtain production-quality results, which is not my focus.

Tutorial Prerequisites: familiarity with Neural Networks, supervised learning, Tensorflow 1.15, Keras 2.2.4, OpenAI gym.

This tutorial is part of my previous tutorial series, and I reuse the same code structure - this way, it's easier to follow line by line if you started from the first part. I should mention that there won't be any theory in this tutorial, because we already covered all the approaches we need in the previous parts. We only change several lines of code so that our network learns to play Pong instead of CartPole.

Setup:

  1. Open up the code to follow along and copy or clone it;
  2. Follow the instructions for installing OpenAI Gym. You may need to install CMake first;
  3. Run pip install gym[atari] (a quick sanity check for the installation is sketched right after this list);
  4. Let's get to the next part.
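
If the installation went smoothly, a quick check like the one below should open a Pong window and play a few random moves (a minimal sketch, assuming the standard gym API and the same Pong-v0 environment used later in this tutorial):

import gym

# create the environment and take a few random actions to confirm the Atari install works
env = gym.make('Pong-v0')
state = env.reset()
for _ in range(200):
    env.render()
    state, reward, done, info = env.step(env.action_space.sample())
    if done:
        state = env.reset()
env.close()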

We are given the following pieces:

  1. A sequence of images (frames) representing each frame of the Pong game;
  2. An indication of when we've won or lost the game;
  3. An opponent agent, the traditional Pong computer player;
  4. An agent we control, which we can tell to take one of six actions at each frame (the snippet below shows what those actions are).
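
Pong-v0 exposes six discrete actions (some are duplicates of up, down, and no-op). A small sketch, assuming the standard gym Atari wrapper, that prints what each action means:

import gym

env = gym.make('Pong-v0')
print(env.action_space)                      # Discrete(6)
# the Atari wrapper can list the meaning of each discrete action
print(env.unwrapped.get_action_meanings())   # e.g. ['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']
env.close()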

Can we use these pieces to train our agent to beat the computer? Moreover, can we make our solution good enough to be reused to win in games that aren't Pong?

Solution:

Indeed, we can! We'll do this by building a Neural Network that takes in each image and tells our A.I. which move to take.

We can break this down into the following steps:

  1. Take in images from the game and preprocess them (remove color, background, down-sample, etc.);
  2. Use the Neural Network to estimate a Q-value for each possible action;
  3. Pick an action with an epsilon-greedy strategy (usually the action with the highest Q-value, sometimes a random one) and tell the agent to move up, down, or stay in the same position;
  4. If the round is over (you missed the ball or the opponent missed the ball), find out whether you won or lost;
  5. When the episode has finished (someone got to 21 points), pass the results through the training algorithm (the core update is sketched right after this list);
  6. Repeat this process until our agent is tuned to the point where we can beat the computer. That's it! Let's start looking at how our code achieves this.
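
Step 5 hides the core of the Q-learning update: for every stored transition, the Q-value predicted for the chosen action is pushed towards a target built from the reward plus the discounted value of the next state. A minimal sketch of that target for a single transition (the q_target helper is hypothetical and for illustration only; the replay() method below does the same thing for whole minibatches, optionally with Double DQN):

import numpy as np

def q_target(reward, done, next_q_values, gamma=0.99):
    # hypothetical helper, not part of the tutorial code
    if done:
        return reward                              # terminal transition: no future value
    return reward + gamma * np.max(next_q_values)  # Bellman target: r + gamma * max_a' Q_target(s', a')

# example: reward 0, Q-values of the next state predicted by the target network
print(q_target(0.0, False, np.array([0.1, -0.3, 0.5, 0.0, 0.2, -0.1])))  # 0.495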

Ok, now that we’ve described the problem and its solution, let’s get to writing some code (Details in YouTube tutorial)!

Code:

You can find the complete video tutorial code and download the trained models on GitHub.

# Tutorial by www.pylessons.com
# Tutorial written for - Tensorflow 1.15, Keras 2.2.4

import os
import random
import gym
import pylab
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten
from keras.optimizers import Adam, RMSprop
from keras import backend as K
from PER import *
import cv2

def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input
    
    #X = Conv2D(64, 5, strides=(3, 3),padding="valid", input_shape=input_shape, activation="relu", data_format="channels_first")(X)
    X = Conv2D(32, 8, strides=(4, 4),padding="valid", input_shape=input_shape, activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 4, strides=(2, 2),padding="valid", activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 3, strides=(1, 1),padding="valid", activation="relu", data_format="channels_first")(X)
    X = Flatten()(X)
    
    # 'Dense' is the basic form of a neural network layer
    X = Dense(512, activation="relu", kernel_initializer='he_uniform')(X)

    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)

    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
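        # Dueling architecture: estimate a scalar state value V(s) and per-action
        # advantages A(s,a), then combine them as Q(s,a) = V(s) + (A(s,a) - mean(A(s,a)))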
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: K.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a[:, :] - K.mean(a[:, :], keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output Layer with one node per action (6 discrete actions in Pong)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs = X_input, outputs = X)
    model.compile(optimizer=Adam(lr=0.00005), loss='mean_squared_error')

    model.summary()
    return model

class DQNAgent:
    def __init__(self, env_name):
        self.env_name = env_name       
        self.env = gym.make(env_name)
        #self.env.seed(0)  
        self.action_size = self.env.action_space.n
        self.EPISODES = 1000
        
        # Instantiate memory
        memory_size = 25000
        self.MEMORY = Memory(memory_size)
        self.memory = deque(maxlen=memory_size)

        self.gamma = 0.99    # discount rate
        
        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0  # exploration probability at start
        self.epsilon_min = 0.02  # minimum exploration probability 
        self.epsilon_decay = 0.00002  # exponential decay rate for exploration prob
        
        self.batch_size = 32

        # defining model parameters
        self.ddqn = False # use Double Deep Q Network
        self.dueling = False # use Dueling network architecture
        self.epsilon_greedy = False # use the improved (exponential-decay) epsilon greedy strategy
        self.USE_PER = False # use Prioritized Experience Replay
        

        self.Save_Path = 'Models'
        if not os.path.exists(self.Save_Path): os.makedirs(self.Save_Path)
        self.scores, self.episodes, self.average = [], [], []

        self.Model_name = os.path.join(self.Save_Path, self.env_name+"_CNN.h5")

        self.ROWS = 80
        self.COLS = 80
        self.REM_STEP = 4
        self.update_model_steps = 1000
        
        self.state_size = (self.REM_STEP, self.ROWS, self.COLS)
        self.image_memory = np.zeros(self.state_size)
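        # image_memory stacks the last REM_STEP (4) preprocessed 80x80 frames along the
        # first axis, matching the channels_first format of the Conv2D layers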
        
        # create main model and target model
        self.model = OurModel(input_shape=self.state_size, action_space = self.action_size, dueling = self.dueling)
        self.target_model = OurModel(input_shape=self.state_size, action_space = self.action_size, dueling = self.dueling)  

    # after some time interval update the target model to be same with model
    def update_target_model(self, game_steps):
        if game_steps % self.update_model_steps == 0:
            self.target_model.set_weights(self.model.get_weights())
            return

    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        if self.USE_PER:
            self.MEMORY.store(experience)
        else:
            self.memory.append((experience))

    def act(self, state, decay_step):
        # EPSILON GREEDY STRATEGY
        if self.epsilon_greedy:
        # Here we'll use an improved version of our epsilon greedy strategy for Q-learning
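            # explore_probability decays exponentially from epsilon (1.0) towards epsilon_min (0.02);
            # for example, after 100000 decay steps: 0.02 + 0.98 * exp(-0.00002 * 100000) ≈ 0.15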
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        # OLD EPSILON STRATEGY
        else:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1-self.epsilon_decay)
            explore_probability = self.epsilon
    
        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size), explore_probability
        else:
            # Get action from Q-network (exploitation)
            # Estimate the Qs values state
            # Take the biggest Q value (= the best action)
            return np.argmax(self.model.predict(state)), explore_probability
                
    def replay(self):
        if self.USE_PER:
            # Sample minibatch from the PER memory
            tree_idx, minibatch  = self.MEMORY.sample(self.batch_size)
        else:
            if len(self.memory) > self.batch_size:
            # Randomly sample minibatch from the deque memory
                minibatch = random.sample(self.memory, self.batch_size)
            else:
                return

        state = np.zeros((self.batch_size, *self.state_size), dtype=np.float32)
        action = np.zeros(self.batch_size, dtype=np.int32)
        reward = np.zeros(self.batch_size, dtype=np.float32)
        next_state = np.zeros((self.batch_size, *self.state_size), dtype=np.float32)
        done = np.zeros(self.batch_size, dtype=np.uint8)

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop       
        for i in range(len(minibatch)):
            state[i], action[i], reward[i], next_state[i], done[i] = minibatch[i]

        # do batch prediction to save speed
        # predict Q-values for starting state using the main network
        target = self.model.predict(state)
        target_old = np.array(target)
        # predict best action in ending state using the main network
        target_next = self.model.predict(next_state)
        # predict Q-values for ending state using the target network
        target_val = self.target_model.predict(next_state)


        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # the key point of Double DQN
                # selection of action is from model
                # update is from target model
                if self.ddqn: # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * target_val[i][a]
                else: # Standard - DQN
                    # DQN chooses the max Q value among next actions
                    # selection and evaluation of action is on the target Q Network
                    # Q_max = max_a' Q_target(s', a')
                    # when using target model in simple DQN rules, we get better performance
                    target[i][action[i]] = reward[i] + self.gamma * np.amax(target_val[i])
            
        if self.USE_PER:
            indices = np.arange(self.batch_size, dtype=np.int32)
            absolute_errors = np.abs(target_old[indices, action]-target[indices, action])

            # Update priority
            self.MEMORY.batch_update(tree_idx, absolute_errors)
                
        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)

    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)

    pylab.figure(figsize=(18, 9))
    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        pylab.ylabel('Score', fontsize=18)
        pylab.xlabel('Games', fontsize=18)
        dqn = 'DQN_'
        dueling = ''
        greedy = ''
        PER = ''
        if self.ddqn: dqn = '_DDQN'
        if self.dueling: dueling = '_Dueling'
        if self.epsilon_greedy: greedy = '_Greedy'
        if self.USE_PER: PER = '_PER'
        try:
            pylab.savefig(self.env_name+dqn+dueling+greedy+PER+"_CNN.png")
        except OSError:
            pass
        # no need to worry about model, when doing a lot of experiments
        self.Model_name = os.path.join(self.Save_Path, self.env_name+dqn+dueling+greedy+PER+"_CNN.h5")

        return self.average[-1]

    def imshow(self, image, rem_step=0):
        cv2.imshow("pong"+str(rem_step), image[rem_step,...])
        if cv2.waitKey(25) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            return

    def GetImage(self, frame):
        self.env.render()
        
        # cropping frame to 80x80 size
        frame_cropped = frame[35:195:2, ::2,:]
        if frame_cropped.shape[0] != self.ROWS or frame_cropped.shape[1] != self.COLS:
            # OpenCV resize function 
            frame_cropped = cv2.resize(frame, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)
        
        # converting to grayscale (numpy way, using luminance weights)
        frame_rgb = 0.299*frame_cropped[:,:,0] + 0.587*frame_cropped[:,:,1] + 0.114*frame_cropped[:,:,2]
        # converting to grayscale (OpenCV way)
        #frame_rgb = cv2.cvtColor(frame_cropped, cv2.COLOR_RGB2GRAY)     

        # dividing by 255 normalizes pixel values to the 0-1 range
        new_frame = np.array(frame_rgb).astype(np.float32) / 255.0

        # shift our frame stack by 1, similar to how a deque works
        self.image_memory = np.roll(self.image_memory, 1, axis = 0)

        # inserting new frame to free space
        self.image_memory[0,:,:] = new_frame

        # show image frame   
        #self.imshow(self.image_memory,0)
        #self.imshow(self.image_memory,1)
        #self.imshow(self.image_memory,2)
        #self.imshow(self.image_memory,3)
        
        return np.expand_dims(self.image_memory, axis=0)

    def reset(self):
        frame = self.env.reset()
        for i in range(self.REM_STEP):
            state = self.GetImage(frame)
        return state

    def step(self,action):
        next_state, reward, done, info = self.env.step(action)
        next_state = self.GetImage(next_state)
        return next_state, reward, done, info
    
    def run(self):
        decay_step = 0
        max_average = -21.0
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            score = 0
            SAVING = ''
            while not done:
                decay_step += 1
                action, explore_probability = self.act(state, decay_step)
                next_state, reward, done, _ = self.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                score += reward
                if done:
                    # every episode, plot the result
                    average = self.PlotModel(score, e)

                    # saving best models
                    if average >= max_average:
                        max_average = average
                        self.save(self.Model_name)
                        SAVING = "SAVING"
                    else:
                        SAVING = ""
                    print("episode: {}/{}, score: {}, e: {:.2f}, average: {:.2f} {}".format(e, self.EPISODES, score, explore_probability, average, SAVING))
                    
                # update target model
                self.update_target_model(decay_step)

                # train model
                self.replay()

        # close environment when training is finished
        self.env.close()

    def test(self, Model_name):
        self.load(Model_name)
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            score = 0
            while not done:
                self.env.render()
                action = np.argmax(self.model.predict(state))
                # use self.step() so the frame is preprocessed and stacked before the next prediction
                state, reward, done, _ = self.step(action)
                score += reward
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, score))
                    break
        self.env.close()

if __name__ == "__main__":
    env_name = 'Pong-v0'
    agent = DQNAgent(env_name)
    #agent.run()
    agent.test('Models/Pong-v0_DQN_CNN.h5')
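
As written, the main block loads a pre-trained model from the Models folder and runs it in test mode. To train your own agent instead, uncomment agent.run() and comment out the agent.test(...) line (and flip the self.ddqn, self.dueling, self.epsilon_greedy and self.USE_PER flags in __init__ to try the different variants discussed below).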

Results:

So I started training with the simplest model, a plain Deep Q Network. The parameters I used:

self.ddqn = False # use Double Deep Q Network
self.dueling = False # use Dueling network architecture
self.USE_PER = False # use Prioritized Experience Replay

I assumed that, once trained, this would be the worst-performing model. The maximum average score it could reach was close to 5 points. But let's look at how the other models performed:

Ok, I thought, the simple DQN model performed quite nicely; I'll add a double network to it (self.ddqn = True) and see how it works. But the results were not satisfying; it performed even worse:

Then I added a dueling network (self.dueling = True), and I thought it really should do better than the two models trained above. But the results were again quite similar, with an average score just a little above zero...

Here was my final hope, self.USE_PER = True: because it performed best in our CartPole game, it should be the best here too. But the results were even worse:

Conclusion:

From the above results, I can say that when the environment is too complex for the Q function to be learned, DQN fails miserably. This is why this is my last DQN tutorial. Policy Gradients are generally believed to apply to a broader range of problems because they operate directly on the policy. So in the coming tutorial, I will cover Policy Gradient reinforcement learning.