Reinforcement Learning tutorial

Posted March 22 by Rokas Balsys


Asynchronous Actor-Critic Agent:

In this tutorial I will provide an implementation of the Asynchronous Advantage Actor-Critic (A3C) algorithm in TensorFlow and Keras. We will use it to solve a simple challenge in the Pong environment! If you are new to Deep Learning and Reinforcement Learning, I suggest checking out my previous tutorials before going through this post, so that you understand all the building blocks that will be used here. If you have been following the series: thank you! Writing these tutorials, I have learned a lot about RL over the past months, and I am happy to share it with everyone.

So what is A3C? The A3C algorithm was released by Google's DeepMind group in 2016, and it essentially beat DQN: it was faster, simpler, more robust, and able to achieve much better scores on the standard Deep RL tasks. On top of all that, it could work in continuous as well as discrete action spaces. A3C became the go-to Deep RL algorithm for new challenging problems with complex state and action spaces.

Asynchronous Advantage Actor Critic is quite hard to implement. Let’s start by breaking down the name, and then the mechanics behind the algorithm itself.

  • Asynchronous: The algorithm trains multiple worker agents in parallel, each with its own copy of the environment. This not only lets us train faster, since more workers are learning at the same time, but also provides more diverse training experience, as each worker's experience is independent of the others'.
  • Advantage: The advantage measures not only how good the agent's actions were, but also how much better they turned out than the critic expected. This allows the algorithm to focus on where the network's predictions were lacking. Intuitively, it measures the advantage of taking action a over following the policy π at the given time step (see the short sketch after this list).
  • Actor-Critic: The Actor-Critic aspect of the algorithm uses an architecture that shares layers between the policy (actor) and the value function (critic).
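
To make the advantage idea concrete, here is a minimal, self-contained sketch (my own illustration, not part of the tutorial's agent code) of how an advantage estimate can be computed from an episode's rewards and the critic's value predictions:

import numpy as np

def discounted_returns(rewards, gamma=0.99):
    # walk backwards through the episode, accumulating discounted reward
    returns = np.zeros(len(rewards), dtype=np.float32)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns

# advantage = "what we actually got" minus "what the critic expected"
rewards = [0.0, 0.0, 1.0]               # toy 3-step episode
values = np.array([0.3, 0.5, 0.8])      # critic's estimates V(s_t) for those steps
advantages = discounted_returns(rewards) - values
print(advantages)                       # positive where the outcome beat the critic's estimate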

How does A3C work?

At a high level, the A3C algorithm uses an asynchronous updating scheme that operates on fixed-length segments of experience in continuing environments and on complete episodes in episodic environments (as we do here). It uses these segments to compute estimates of the returns and of the advantage function. Each worker performs the following workflow cycle:

  1. Fetch the global network parameters
  2. Interact with the environment by following the local policy for n number of steps.
  3. Calculate value and policy loss
  4. Get gradients from losses
  5. Update the global network
  6. Repeat

With these training steps, we would expect to see a roughly linear speed-up in the number of agents. However, the number of agents our machine can support is bounded by the number of CPU cores available. In addition, A3C can even scale to more than one machine, and some newer research (such as IMPALA) supports scaling it even further. Check out the link for more in-depth information!

Implementation:

Because I already implemented an A2C agent in my previous tutorial, this part won't change the agent code that much; we mainly need to make our agents work in parallel. Let's first define what kind of model we'll be using: the master agent holds the global network, and each local worker agent keeps a copy of this network in its own thread.

parallel_training.png
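
For reference, here is a minimal sketch of a shared-layer Actor-Critic network in Keras. The layer sizes, names and losses are illustrative and may differ from the exact model in my A2C tutorial; the important part is that the actor (policy) head and the critic (value) head sit on top of the same shared layers:

from keras.models import Model
from keras.layers import Input, Flatten, Dense
from keras.optimizers import RMSprop

def build_actor_critic(input_shape, action_space, lr=2.5e-5):
    X_input = Input(input_shape)
    X = Flatten()(X_input)
    X = Dense(512, activation="elu", kernel_initializer="he_uniform")(X)  # shared hidden layer

    action = Dense(action_space, activation="softmax")(X)  # policy head (actor)
    value = Dense(1, activation="linear")(X)                # value head (critic)

    Actor = Model(inputs=X_input, outputs=action)
    Actor.compile(loss="categorical_crossentropy", optimizer=RMSprop(lr=lr))

    Critic = Model(inputs=X_input, outputs=value)
    Critic.compile(loss="mse", optimizer=RMSprop(lr=lr))
    return Actor, Critic

# e.g. Actor, Critic = build_actor_critic(input_shape=(4, 80, 80), action_space=6)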

I use my previous A2C tutorial code as the backbone, because we only need to make it work asynchronously. So the first thing I do is import the libraries needed to make TensorFlow and Keras work with parallel environments:

# imports needed for threading
import tensorflow as tf
from keras import backend as K
from keras.backend.tensorflow_backend import set_session
import threading
from threading import Thread, Lock
import time

# configure the Keras and TensorFlow session and graph
config = tf.ConfigProto()
config.gpu_options.allow_growth = True  # allocate GPU memory only as needed
sess = tf.Session(config=config)
set_session(sess)
K.set_session(sess)
graph = tf.get_default_graph()

Next we can move on to the initialization step. We will now use local, per-environment memory instead of a global one, so we remove the following lines:

self.states, self.actions, self.rewards = [], [], []

Before, we used self.image_memory to store the game frames. Because this memory will now be kept locally for every environment, we can remove it as well:

self.image_memory = np.zeros(self.state_size)

In the init function I added a self.episode = 0 attribute, which is used to track the total count of episodes played across all environments, and I defined self.lock = Lock(), which is used to let one thread update shared parameters without interruption from the other threads.
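
For reference, the new attributes might look roughly like this inside __init__ (a sketch; everything else in the A2C initialization stays as before):

# new attributes in A3CAgent.__init__ (sketch - the rest of the init is unchanged)
self.episode = 0     # total episodes played across all worker environments
self.lock = Lock()   # ensures only one worker trains, saves or plots at a time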

After creating and compiling our Actor and Critic models, we must make their predict functions usable from multiple threads and store the new graph as a global. We do this with the following lines:

# build the predict functions ahead of time so they can be called from multiple threads
self.Actor._make_predict_function()
self.Critic._make_predict_function()

# store the default graph so the worker threads can reuse it
global graph
graph = tf.get_default_graph()

If you were following my previous A2C code, we used the remember function to build up the memory; we now remove this function, along with the line that reset our states, actions and rewards memories. Instead of using global memory in the replay function, we pass this memory to the function as arguments:

def replay(self, states, actions, rewards):
    # reshape memory to appropriate shape for training
    states = np.vstack(states)
    actions = np.vstack(actions)
    ...

What we must change is how we work with every frame. Before, we did all of this at the global level; now we need to perform these steps for every environment in parallel, so the frame-processing functions take the image memory as an argument:

def GetImage(self, frame, image_memory):
    if image_memory.shape == (1,*self.state_size):
        image_memory = np.squeeze(image_memory)

    # cropping the frame to 80x80 size
    frame_cropped = frame[35:195:2, ::2,:]
    if frame_cropped.shape[0] != self.COLS or frame_cropped.shape[1] != self.ROWS:
        # OpenCV resize function
        frame_cropped = cv2.resize(frame, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)

    # converting to grayscale (numpy way)
    frame_rgb = 0.299*frame_cropped[:,:,0] + 0.587*frame_cropped[:,:,1] + 0.114*frame_cropped[:,:,2]

    # converting everything to black and white (the agent will train faster)
    frame_rgb[frame_rgb < 100] = 0
    frame_rgb[frame_rgb >= 100] = 255
    # converting to grayscale (OpenCV way)
    #frame_rgb = cv2.cvtColor(frame_cropped, cv2.COLOR_RGB2GRAY)

    # dividing by 255 expresses the values in a 0-1 range
    new_frame = np.array(frame_rgb).astype(np.float32) / 255.0

    # push the data back by 1 frame, similar to how a deque works
    image_memory = np.roll(image_memory, 1, axis=0)

    # insert the new frame into the freed slot
    image_memory[0,:,:] = new_frame

    # show image frame
    #self.imshow(image_memory,0)
    #self.imshow(image_memory,1)
    #self.imshow(image_memory,2)
    #self.imshow(image_memory,3)

    return np.expand_dims(image_memory, axis=0)

def reset(self, env):
    image_memory = np.zeros(self.state_size)
    frame = env.reset()
    for i in range(self.REM_STEP):
        state = self.GetImage(frame, image_memory)
        image_memory = state  # carry the updated frame stack into the next iteration
    return state

def step(self, action, env, image_memory):
    next_state, reward, done, info = env.step(action)
    next_state = self.GetImage(next_state, image_memory)
    return next_state, reward, done, info

Now that we have changed our image-processing functions, we can work with frames in parallel and they won't get mixed up. Next we must change the run function by giving it its own local memory:

def run(self):
    for e in range(self.EPISODES):
        state = self.reset(self.env)
        done, score, SAVING = False, 0, ''
        # Instantiate games memory
        states, actions, rewards = [], [], []
        while not done:
            #self.env.render()
            # Actor picks an action
            action = self.act(state)
            # Retrieve new state, reward, and whether the state is terminal
            next_state, reward, done, _ = self.step(action, self.env, state)
            # Memorize (state, action, reward) for training
            states.append(state)
            action_onehot = np.zeros([self.action_size])
            action_onehot[action] = 1
            actions.append(action_onehot)
            rewards.append(reward)
            # Update current state
            state = next_state
            score += reward
            if done:
                average = self.PlotModel(score, e)
                # saving best models
                if average >= self.max_average:
                    self.max_average = average
                    self.save()
                    SAVING = "SAVING"
                else:
                    SAVING = ""
                print("episode: {}/{}, score: {}, average: {:.2f} {}".format(e, self.EPISODES, score, average, SAVING))

                self.replay(states, actions, rewards)
                # reset training memory
                states, actions, rewards = [], [], []
                
    self.env.close()

Up to this point we have covered all the code we used in the previous tutorial; we could already use it to train an A2C model. Now it's time to write some simple threading code.

With the following code, a set of worker agents is created, each with its own environment. Each of these workers runs on a separate processor thread, so if there are more workers than CPU threads, the extra workers simply play the game more slowly; this won't give any additional speed-up.

def train(self, n_threads):
    self.env.close()
    # Instantiate one environment per thread
    envs = [gym.make(self.env_name) for i in range(n_threads)]

    # Create threads
    threads = [threading.Thread(
            target=self.train_threading,
            daemon=True,
            args=(self,
                envs[i],
                i)) for i in range(n_threads)]

    for t in threads:
        time.sleep(2)  # stagger the thread start-ups slightly
        t.start()
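
One thing to keep in mind: the workers are started as daemon threads, so if the main program returns right after calling train, the threads are killed along with it. One simple way to keep the process alive until the workers finish (my own addition at the end of train, not necessarily how the original repository handles it) is to join the threads:

    # optional: block until every worker has finished, so the daemon
    # threads are not killed when the main program exits (my addition)
    for t in threads:
        t.join()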

With the above code we close the self.env we created before, because we only used it to read the environment parameters. Then we create the defined number of environments and point each thread at the train_threading function, which is used to train the agents in parallel:

def train_threading(self, agent, env, thread):
    global graph
    with graph.as_default():
        while self.episode < self.EPISODES:
            # Reset episode
            score, done, SAVING = 0, False, ''
            state = self.reset(env)
            # Instantiate games memory
            states, actions, rewards = [], [], []
            while not done:
                action = agent.act(state)
                next_state, reward, done, _ = self.step(action, env, state)

                states.append(state)
                action_onehot = np.zeros([self.action_size])
                action_onehot[action] = 1
                actions.append(action_onehot)
                rewards.append(reward)
                
                score += reward
                state = next_state

            self.lock.acquire()
            self.replay(states, actions, rewards)
            self.lock.release()
            # reset training memory
            states, actions, rewards = [], [], []
                    
            # Update episode count
            with self.lock:
                average = self.PlotModel(score, self.episode)
                # saving best models
                if average >= self.max_average:
                    self.max_average = average
                    self.save()
                    SAVING = "SAVING"
                else:
                    SAVING = ""
                print("episode: {}/{}, thread: {}, score: {}, average: {:.2f} {}".format(self.episode, self.EPISODES, thread, score, average, SAVING))
                if(self.episode < self.EPISODES):
                    self.episode += 1
        env.close() 

To train our global model in parallel, we use the global graph we defined earlier. When a worker finishes a game episode and has its collected game memory, we call self.lock.acquire() to lock the other threads, so that the training step is not interrupted by other workers. When the training step is finished, we call self.lock.release() and let the other threads train the model with their own memory. We do something similar when saving our plot, using with self.lock:. All the other steps are much the same.
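
As a side note, lock.acquire()/lock.release() and the with statement are two ways of using the same threading.Lock; the with form also releases the lock if an exception is raised inside the critical section. A tiny standalone illustration:

import threading

lock = threading.Lock()

# explicit form, as used around self.replay(...)
lock.acquire()
try:
    pass  # critical section: only one worker trains the shared model at a time
finally:
    lock.release()

# equivalent context-manager form, as used around PlotModel and saving
with lock:
    pass  # critical section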

Once a successful update is made to the global network, the whole process repeats! The worker then resets its own network parameters to those of the global network, and the process begins again.

To start training the agent in parallel with 5 workers, I use the following code:

if __name__ == "__main__":
    env_name = 'Pong-v0'
    agent = A3CAgent(env_name)
    #agent.run() # use as A2C
    agent.train(n_threads=5) # use as A3C
    #agent.test('Pong-v0_A3C_2.5e-05_Actor', 'Pong-v0_A3C_2.5e-05_Critic')

So this was quite a long tutorial, and here is the full completed code:


Same as before, you probably want to see my training results with the Pong game, right? The end result was actually the same, but instead of training the agent for a few days, I was able to train it in half a day with 5 workers. With the asynchronous method we can train agents several times faster, which makes it easier to test different model hyper-parameters or simply to see whether our agent is improving. So, these are the results of my A3C agent playing PongDeterministic-v4:

PongDeterministic-v4_A3C_2.5e-05.png

As you can see, in the PongDeterministic-v4 environment our agent reached its best average score at around the 500th episode. Comparing A3C with A2C, the results are quite similar; the difference is time: training the agent for 1000 episodes took only a few hours. It's a similar win in the Pong-v0 environment. Because I gained such an advantage in time, I decided to train the agent for 20000 episodes to see whether it would keep improving. Here is the training curve:

Pong-v0_A3C_2.5e-05.png

So we can see that after 10k episodes our agent was still improving, and if we trained it further it would probably improve slightly more, but more and more slowly. So we need to change something to achieve results faster.

Conclusion:

I hope this tutorial has been helpful to those who are new to asynchronous reinforcement learning! Now you can build almost any reinforcement learning agent and train it in parallel. There is one more tutorial coming, covering one of the most popular algorithms, Proximal Policy Optimization (PPO), and I will implement it the same way - in parallel. I hope you understood how A3C works, because I will use it as a backbone. See you in the next tutorial. The code is uploaded on GitHub.