Bonus Tutorial: Function approximation¶

Week 3, Day 4: Basic Reinforcement Learning (RL)

Content creators: Marcelo G Mattar, Eric DeWitt, Matt Krause, Matthew Sargent, Anoop Kulkarni, Sowmya Parthiban, Feryal Behbahani, Jane Wang

Content reviewers: Ella Batty, Byron Galbraith, Michael Waskom, Ezekiel Williams, Mehul Rastogi, Lily Cheng, Roberto Guidotti, Arush Tagade, Kelson Shilling-Scrivo

Production editors: Gagana B, Spiros Chavlis

Tutorial Objectives¶

This notebook contains bonus material that gives a better sense of more complex reinforcement learning models and of real-world applications of RL. Previously, we implemented fundamental ideas of RL in basic Python. Here, we show how these can be implemented using the Acme library by DeepMind. For the project on GitHub, see https://github.com/deepmind/acme.

By the end of the tutorial, you should be able to:

1. Implement, train, and test an NFQ agent

2. Use a Deep Q-network

3. Learn the Policy Gradient and the Actor Critic

4. Use RL in real-world applications

Setup¶

Run the following Setup cells in order to set up needed functions. Don’t worry about the code for now!

Note: There is an issue with some images not showing up if you’re using a Safari browser. Please switch to Chrome if this is the case.

Kaggle users: You need to downgrade Acme to version 0.2. In the cell below, comment out the last three lines and add:

!pip install dm-acme==0.2 --quiet

Install requirements¶

# @title Install requirements

# @markdown We install the acme library, see [here](https://github.com/deepmind/acme) for more info.

# @markdown **WARNING:** There may be *errors* and/or *warnings* reported during the installation. However, they should be ignored.

!pip install imageio --quiet
!pip install imageio-ffmpeg --quiet
!pip install gym --quiet
!pip install enum34 --quiet
!pip install pandas --quiet
!pip install grpcio==1.34.0 --quiet
!pip install typing --quiet
!pip install einops --quiet
!pip install dm-acme[reverb] --quiet
!pip install dm-acme[jax,tensorflow] --quiet
!pip install dm-acme[envs] --quiet

WARNING: dm-acme 0.4.0 does not provide the extra 'reverb'


WARNING: dm-acme 0.4.0 does not provide the extra 'tensorflow'


# Import modules
import gym
import enum
import copy
import time
import acme
import torch
import base64
import dm_env
import IPython
import imageio
import warnings
import itertools
import collections

import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt

from typing import Sequence

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical

from acme import specs
from acme import wrappers
from acme.utils import tree_utils
from acme.utils import loggers

warnings.filterwarnings('ignore')
np.set_printoptions(precision=3, suppress=1)


Figure settings¶

# @title Figure settings
import ipywidgets as widgets  # Interactive display
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
mpl.rc('image', cmap='Blues')


Helper Functions¶

Implement helpers for value visualisation

# @title Helper Functions
# @markdown Implement helpers for value visualisation

map_from_action_to_subplot = lambda a: (2, 6, 8, 4)[a]
map_from_action_to_name = lambda a: ("up", "right", "down", "left")[a]

def plot_values(values, colormap='pink', vmin=-1, vmax=10):
"""
Plots incoming values

Args:
values: List
List of values to be plotted
colormap: String
Defines colormap of plot
vmin: Integer
Smallest possible value within "values"
vmax: Integer
Highest possible value within "values"

Returns:
Nothing
"""
plt.imshow(values, interpolation="nearest",
cmap=colormap, vmin=vmin, vmax=vmax)
plt.yticks([])
plt.xticks([])
plt.colorbar(ticks=[vmin, vmax])

def plot_state_value(state_values, epsilon=0.1):
"""
Helper function to plot state value

Args:
state_values: np.ndarray
Action values with shape (9, 10, 4)
epsilon: Float
Sets the exploitation-exploration control hyperparameter [default=0.1]

Returns:
Nothing
"""
q = state_values
fig = plt.figure(figsize=(4, 4))
vmin = np.min(state_values)
vmax = np.max(state_values)
v = (1 - epsilon) * np.max(q, axis=-1) + epsilon * np.mean(q, axis=-1)
plot_values(v, colormap='summer', vmin=vmin, vmax=vmax)
plt.title("$v(s)$")

def plot_action_values(action_values, epsilon=0.1):
"""
Helper function to plot action value

Args:
action_values: np.ndarray
Action values with shape (9, 10, 4)
epsilon: Float
Sets the exploitation-exploration control hyperparameter [default=0.1]
Returns:
Nothing
"""
q = action_values
fig = plt.figure(figsize=(8, 8))
vmin = np.min(action_values)
vmax = np.max(action_values)
dif = vmax - vmin
for a in [0, 1, 2, 3]:
plt.subplot(3, 3, map_from_action_to_subplot(a))

plot_values(q[..., a], vmin=vmin - 0.05*dif, vmax=vmax + 0.05*dif)
action_name = map_from_action_to_name(a)
plt.title(r"$q(s, \mathrm{" + action_name + r"})$")

plt.subplot(3, 3, 5)
v = (1 - epsilon) * np.max(q, axis=-1) + epsilon * np.mean(q, axis=-1)
plot_values(v, colormap='summer', vmin=vmin, vmax=vmax)
plt.title("$v(s)$")


Set random seed¶

Executing set_seed(seed=seed) you are setting the seed

# @title Set random seed

# @markdown Executing set_seed(seed=seed) you are setting the seed

# For DL it's critical to set the random seed so that students can have a
# baseline to compare their results to expected results.

# Call set_seed function in the exercises to ensure reproducibility.
import random
import torch

def set_seed(seed=None, seed_torch=True):
"""
Function that controls randomness. NumPy and random modules must be imported.

Args:
seed : Integer
A non-negative integer that defines the random state. Default is None.
seed_torch : Boolean
If True sets the random seed for pytorch tensors, so pytorch module
must be imported. Default is True.

Returns:
Nothing.
"""
if seed is None:
seed = np.random.choice(2 ** 32)
random.seed(seed)
np.random.seed(seed)
if seed_torch:
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

print(f'Random seed {seed} has been set.')

# In case that DataLoader is used
def seed_worker(worker_id):
"""
DataLoader will reseed workers following randomness in

Args:
worker_id: integer
ID of subprocess to seed. 0 means that
the data will be loaded in the main process

Returns:
Nothing
"""
worker_seed = torch.initial_seed() % 2**32
np.random.seed(worker_seed)
random.seed(worker_seed)


Set device (GPU or CPU). Execute set_device()¶

# @title Set device (GPU or CPU). Execute set_device()
# especially if torch modules used.

# Inform the user if the notebook uses GPU or CPU.

def set_device():
"""
Set the device. CUDA if available, CPU otherwise

Args:
None

Returns:
Nothing
"""
device = "cuda" if torch.cuda.is_available() else "cpu"
if device != "cuda":
print("WARNING: For this notebook to perform best, "
"if possible, in the menu under Runtime -> "
"Change runtime type.  select GPU ")
else:
print("GPU is enabled in this notebook.")

return device

SEED = 2021
set_seed(seed=SEED)
DEVICE = set_device()

Random seed 2021 has been set.
WARNING: For this notebook to perform best, if possible, in the menu under Runtime -> Change runtime type.  select GPU


Acme: a research framework for reinforcement learning¶

Acme is a library of reinforcement learning (RL) agents and agent building blocks by Google DeepMind. Acme strives to expose simple, efficient, and readable agents, that serve both as reference implementations of popular algorithms and as strong baselines, while still providing enough flexibility to do novel research. The design of Acme also attempts to provide multiple points of entry to the RL problem at differing levels of complexity.

For this practical session, we will focus on a simple grid world environment, which consists of a 9 x 10 grid of wall and empty cells - depicted in black and white, respectively. The smiling agent starts from an initial location and needs to navigate to reach the goal square.

Below you will find an implementation of this Gridworld as a dm_env.Environment.

There is no coding in this section, but if you want, you can look over the provided code so that you can familiarize yourself with an example of how to set up a grid world environment.

Implement GridWorld¶

Double-click to inspect the contents of this cell.¶

# @title Implement GridWorld

# @markdown ##### *Double-click* to inspect the contents of this cell.

class ObservationType(enum.IntEnum):
"""
Class to examine observation type including goal position and state index;
Attributes:
observation_type: Enum
Enum observation type to use. One of:
* ObservationType.STATE_INDEX: int32 index of agent occupied tile.
* ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
agent is and 0 elsewhere.
* ObservationType.GRID: NxNx3 float32 grid of feature channels.
First channel contains walls (1 if wall, 0 otherwise), second the
agent position (1 if agent, 0 otherwise) and third goal position
(1 if goal, 0 otherwise)
* ObservationType.AGENT_GOAL_POS: float32 tuple with
(agent_y, agent_x, goal_y, goal_x)
"""
STATE_INDEX = enum.auto()
AGENT_ONEHOT = enum.auto()
GRID = enum.auto()
AGENT_GOAL_POS = enum.auto()

class GridWorld(dm_env.Environment):
"""
Build a grid environment.
Simple gridworld defined by a map layout, a start and a goal state.
Layout should be a NxN grid, containing:
* 0: Empty
* -1: Wall
* Any other positive value: value indicates reward;
Episode will terminate
"""

def __init__(self,
layout,
start_state,
goal_state=None,
observation_type=ObservationType.STATE_INDEX,
discount=0.9,
penalty_for_walls=-5,
reward_goal=10,
max_episode_length=None,
randomize_goals=False):
"""
Initiates grid environment

Args:
layout: List
NxN array of numbers, indicating the layout of the environment.
start_state: Tuple
Tuple (y, x) of starting location.
goal_state: Tuple
Optional tuple (y, x) of goal location. Will be randomly
sampled once if None.
observation_type: Enum
Enum observation type to use.
discount: Float
Discounting factor included in all Timesteps.
penalty_for_walls: Integer
Reward added when hitting a wall (should be negative).
reward_goal: Integer
Reward added when finding the goal (should be positive).
max_episode_length: Integer
If set, will terminate an episode after this many steps.
randomize_goals: Boolean
If true, randomize goal at every episode.

Returns:
None
"""
if observation_type not in ObservationType:
raise ValueError('observation_type should be an ObservationType instance.')
self._layout = np.array(layout)
self._start_state = start_state
self._state = self._start_state
self._number_of_states = np.prod(np.shape(self._layout))
self._discount = discount
self._penalty_for_walls = penalty_for_walls
self._reward_goal = reward_goal
self._observation_type = observation_type
self._layout_dims = self._layout.shape
self._max_episode_length = max_episode_length
self._num_episode_steps = 0
self._randomize_goals = randomize_goals
if goal_state is None:
# Randomly sample goal_state if not provided
goal_state = self._sample_goal()
self.goal_state = goal_state

def _sample_goal(self):
"""
Randomly sample reachable non-starting state.

Args:
None

Returns:
goal_state: Tuple
(y, x) location of the sampled goal
"""
# Sample a new goal
n = 0
max_tries = 1e5
while n < max_tries:
goal_state = tuple(np.random.randint(d) for d in self._layout_dims)
if goal_state != self._state and self._layout[goal_state] == 0:
# Reachable state found!
return goal_state
n += 1
raise ValueError('Failed to sample a goal state.')

@property
def layout(self):
return self._layout

@property
def number_of_states(self):
return self._number_of_states

@property
def goal_state(self):
return self._goal_state

@property
def start_state(self):
return self._start_state

@property
def state(self):
return self._state

def set_state(self, x, y):
self._state = (y, x)

def action_spec(self):
return specs.DiscreteArray(4, dtype=int, name='action')

@goal_state.setter
def goal_state(self, new_goal):
if new_goal == self._state or self._layout[new_goal] < 0:
raise ValueError('This is not a valid goal!')
# Zero out any other goal
self._layout[self._layout > 0] = 0
# Setup new goal location
self._layout[new_goal] = self._reward_goal
self._goal_state = new_goal

def plot_greedy_policy(self, q):
greedy_actions = np.argmax(q, axis=2)
self.plot_policy(greedy_actions)

def observation_spec(self):
"""
Function to return the spec-list based on observation type

Args:
None

Returns:
Specification-list based on observation type
"""
if self._observation_type is ObservationType.AGENT_ONEHOT:
return specs.Array(
shape=self._layout_dims,
dtype=np.float32,
name='observation_agent_onehot')
elif self._observation_type is ObservationType.GRID:
return specs.Array(
shape=self._layout_dims + (3,),
dtype=np.float32,
name='observation_grid')
elif self._observation_type is ObservationType.AGENT_GOAL_POS:
return specs.Array(
shape=(4, ), dtype=np.float32, name='observation_agent_goal_pos')
elif self._observation_type is ObservationType.STATE_INDEX:
return specs.DiscreteArray(
self._number_of_states, dtype=int, name='observation_state_index')

def get_obs(self):
"""
Returns observation initiating agent state, position, goal state

Args:
None

Returns:
Observation
"""
if self._observation_type is ObservationType.AGENT_ONEHOT:
obs = np.zeros(self._layout.shape, dtype=np.float32)
# Place agent
obs[self._state] = 1
return obs
elif self._observation_type is ObservationType.GRID:
obs = np.zeros(self._layout.shape + (3,), dtype=np.float32)
obs[..., 0] = self._layout < 0
obs[self._state[0], self._state[1], 1] = 1
obs[self._goal_state[0], self._goal_state[1], 2] = 1
return obs
elif self._observation_type is ObservationType.AGENT_GOAL_POS:
return np.array(self._state + self._goal_state, dtype=np.float32)
elif self._observation_type is ObservationType.STATE_INDEX:
y, x = self._state
return y * self._layout.shape[1] + x

def reset(self):
"""
Helper function to reset GridWorld

Args:
None

Returns:
Reset environment
"""
self._state = self._start_state
self._num_episode_steps = 0
if self._randomize_goals:
self.goal_state = self._sample_goal()
return dm_env.TimeStep(
step_type=dm_env.StepType.FIRST,
reward=None,
discount=None,
observation=self.get_obs())

def step(self, action):
"""
Helper function to process current position and
optimize future steps towards goal

Args:
action: Integer
if 0, move up; if 1, move right; if 2, move down and if 3, move left

Returns:
Observation from new position;
"""
y, x = self._state

if action == 0:  # Up
new_state = (y - 1, x)
elif action == 1:  # Right
new_state = (y, x + 1)
elif action == 2:  # Down
new_state = (y + 1, x)
elif action == 3:  # Left
new_state = (y, x - 1)
else:
raise ValueError(
'Invalid action: {} is not 0, 1, 2, or 3.'.format(action))

new_y, new_x = new_state
step_type = dm_env.StepType.MID
if self._layout[new_y, new_x] == -1:  # Wall
reward = self._penalty_for_walls
discount = self._discount
new_state = (y, x)
elif self._layout[new_y, new_x] == 0:  # Empty cell
reward = 0.
discount = self._discount
else:  # Goal
reward = self._layout[new_y, new_x]
discount = 0.
new_state = self._start_state
step_type = dm_env.StepType.LAST

self._state = new_state
self._num_episode_steps += 1
if (self._max_episode_length is not None and
self._num_episode_steps >= self._max_episode_length):
step_type = dm_env.StepType.LAST
return dm_env.TimeStep(
step_type=step_type,
reward=np.float32(reward),
discount=discount,
observation=self.get_obs())

def plot_grid(self):
"""
Helper function to plot GridWorld

Args:
None
Returns:
Nothing
"""
plt.figure(figsize=(4, 4))
plt.imshow(self._layout <= -1, interpolation='nearest')
ax = plt.gca()
ax.grid(0)
plt.xticks([])
plt.yticks([])
plt.text(
self._start_state[1],
self._start_state[0],
r'$\mathbf{S}$',
fontsize=16,
ha='center',
va='center')
plt.text(
self._goal_state[1],
self._goal_state[0],
r'$\mathbf{G}$',
fontsize=16,
ha='center',
va='center')
h, w = self._layout.shape
for y in range(h - 1):
plt.plot([-0.5, w - 0.5], [y + 0.5, y + 0.5], '-w', lw=2)
for x in range(w - 1):
plt.plot([x + 0.5, x + 0.5], [-0.5, h - 0.5], '-w', lw=2)

def plot_state(self, return_rgb=False):
"""
Helper function to plot agent state

Args:
return_rgb: Boolean
if True, process GridWorld with number-of-channels = 3

Returns:
data: np.ndarray
Array of size (h, w, 3) describing environment
"""
plt.text(
self._state[1],
self._state[0],
r'$\mathbf{A}$',
fontsize=18,
ha='center',
va='center',
)
if return_rgb:
fig = plt.gcf()
plt.axis('tight')
plt.subplots_adjust(0, 0, 1, 1, 0, 0)
fig.canvas.draw()
# np.frombuffer replaces the deprecated binary mode of np.fromstring.
data = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
w, h = fig.canvas.get_width_height()
data = data.reshape((h, w, 3))
plt.close(fig)
return data

def plot_policy(self, policy):
"""
Helper function to visualize the policy;

Args:
policy: np.ndarray
Action index chosen in each grid cell; describes the principles that govern agent movement

Returns:
Nothing
"""
action_names = [
r'$\uparrow$', r'$\rightarrow$', r'$\downarrow$', r'$\leftarrow$'
]
self.plot_grid()
plt.title('Policy Visualization')
h, w = self._layout.shape
for y in range(h):
for x in range(w):
# if ((y, x) != self._start_state) and ((y, x) != self._goal_state):
if (y, x) != self._goal_state:
action_name = action_names[policy[y, x]]
plt.text(x, y, action_name, ha='center', va='center')

discount=0.9,
penalty_for_walls=-5,
observation_type=ObservationType.STATE_INDEX,
max_episode_length=200):
"""
Construct a particular Gridworld layout with start/goal states.

Args:
String name of the task to use. One of {'simple', 'obstacle',
'random_goal'}.
discount: Float
Discounting factor included in all Timesteps.
penalty_for_walls: Integer
Reward added when hitting a wall (should be negative).
observation_type: Enum
Observation type to use. One of:
* ObservationType.STATE_INDEX: int32 index of agent occupied tile.
* ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
agent is and 0 elsewhere.
* ObservationType.GRID: NxNx3 float32 grid of feature channels.
First channel contains walls (1 if wall, 0 otherwise), second the
agent position (1 if agent, 0 otherwise) and third goal position
(1 if goal, 0 otherwise)
* ObservationType.AGENT_GOAL_POS: float32 tuple with
(agent_y, agent_x, goal_y, goal_x).
max_episode_length: Integer
If set, will terminate an episode after this many steps.

Returns:
Nothing
"""
'simple': {
'layout': [
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
],
'start_state': (2, 2),
'goal_state': (7, 2)
},
'obstacle': {
'layout': [
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
[-1, 0, 0, 0, 0, 0, -1, 0, 0, -1],
[-1, 0, 0, 0, -1, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
],
'start_state': (2, 2),
'goal_state': (2, 8)
},
'random_goal': {
'layout': [
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
],
'start_state': (2, 2),
# 'randomize_goals': True
},
}
return GridWorld(
discount=discount,
penalty_for_walls=penalty_for_walls,
observation_type=observation_type,
max_episode_length=max_episode_length,

def setup_environment(environment):
"""
Returns the environment and its spec.

Args:
environment: acme.wrappers.single_precision.SinglePrecisionWrapper instance
Wrapped environment into single-precision floats [avoids floating point errors]

Returns:
environment: acme.wrappers.single_precision.SinglePrecisionWrapper instance
Wrapped environment into single-precision floats [avoids floating point errors]
environment_spec: acme.specs.EnvironmentSpec(dm_env.specs.Array instance, dm_env.specs.DiscreteArray instance, dm_env.specs.Array instance, dm_env.specs.BoundedArray instance)
Describes specification of the GridWorld Environment
"""

# Make sure the environment outputs single-precision floats.
environment = wrappers.SinglePrecisionWrapper(environment)

# Grab the spec of the environment.
environment_spec = specs.make_environment_spec(environment)

return environment, environment_spec


We will use two distinct tabular GridWorlds:

• simple, where the goal is at the bottom left of the grid and little navigation is required.

• obstacle, where the goal is behind an obstacle the agent must avoid.

You can visualize the grid worlds by running the cell below.

Note that S indicates the start state and G indicates the goal.

Visualise GridWorlds¶

# @title Visualise GridWorlds

# Instantiate two tabular environments, a simple task, and one that involves
# the avoidance of an obstacle.

# Plot them.
simple_grid.plot_grid()
plt.title('Simple')

obstacle_grid.plot_grid()
plt.title('Obstacle')
plt.show()


In this environment, the agent has four possible actions: up, right, down, and left. The reward is -5 for bumping into a wall, +10 for reaching the goal, and 0 otherwise. The episode ends when the agent reaches the goal, and otherwise continues. The discount on continuing steps is $$\gamma = 0.9$$.
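To make the reward and discount settings concrete, here is a small worked example of the discounted return for two hand-written episodes in the simple task. It is only a sketch: the helper name `discounted_return` and the two reward sequences are illustrative assumptions, not part of the tutorial code.

```python
# Illustrative sketch: discounted return of one episode in the 'simple' task.
GAMMA = 0.9          # discount on continuing steps (from the text)
REWARD_GOAL = 10.0   # reward for reaching the goal
PENALTY_WALL = -5.0  # reward for bumping into a wall


def discounted_return(rewards, gamma=GAMMA):
    """Return sum_t gamma**t * r_t for the rewards of one episode."""
    g = 0.0
    # Accumulate backwards so each earlier step applies one more discount.
    for r in reversed(rewards):
        g = r + gamma * g
    return g


# Shortest path from start (2, 2) to goal (7, 2): five moves down.
direct = [0.0, 0.0, 0.0, 0.0, REWARD_GOAL]

# A hypothetical detour: left to (2, 1), left again (wall bump, -5),
# five moves down to (7, 1), then right into the goal at (7, 2).
with_bump = [0.0, PENALTY_WALL, 0.0, 0.0, 0.0, 0.0, 0.0, REWARD_GOAL]

print(round(discounted_return(direct), 4))     # about 6.561
print(round(discounted_return(with_bump), 4))  # about 0.283
```

The longer path is penalized twice: once by the wall bump and once because the goal reward is discounted over more steps.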

Before we start building an agent to interact with this environment, let’s first look at the types of objects the environment either returns (e.g., observations) or consumes (e.g., actions). The environment_spec will show you the form of the observations, rewards and discounts that the environment exposes and the form of the actions that can be taken.

Look at environment_spec¶

Note: setup_environment is implemented in the same cell as GridWorld.¶

# @title Look at environment_spec

# @markdown ##### **Note:** setup_environment is implemented in the same cell as GridWorld.
environment, environment_spec = setup_environment(simple_grid)

print('actions:\n', environment_spec.actions, '\n')
print('observations:\n', environment_spec.observations, '\n')
print('rewards:\n', environment_spec.rewards, '\n')
print('discounts:\n', environment_spec.discounts, '\n')

actions:
DiscreteArray(shape=(), dtype=int32, name=action, minimum=0, maximum=3, num_values=4)

observations:
Array(shape=(9, 10, 3), dtype=dtype('float32'), name='observation_grid')

rewards:
Array(shape=(), dtype=dtype('float32'), name='reward')

discounts:
BoundedArray(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
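The observation spec above has shape (9, 10, 3) because the environment was built with the GRID observation type. As a rough illustration of how the four observation types described in ObservationType encode the same state, the sketch below rebuilds them with plain Python lists. It is a simplification under stated assumptions: only the border walls are drawn, and the variable names are made up for this example.

```python
# Illustrative sketch (plain lists, no NumPy): four encodings of one state.
HEIGHT, WIDTH = 9, 10
agent = (2, 2)  # (y, x) start state of the 'simple' task
goal = (7, 2)   # (y, x) goal state of the 'simple' task

# STATE_INDEX: row-major index of the agent's cell.
state_index = agent[0] * WIDTH + agent[1]

# AGENT_ONEHOT: HEIGHT x WIDTH grid with a single 1 at the agent's cell.
onehot = [[0.0] * WIDTH for _ in range(HEIGHT)]
onehot[agent[0]][agent[1]] = 1.0

# AGENT_GOAL_POS: (agent_y, agent_x, goal_y, goal_x).
agent_goal_pos = agent + goal

# GRID: HEIGHT x WIDTH x 3 channels (walls, agent, goal).
# Assumption: only the border walls are marked, to keep the sketch short.
grid = [[[0.0, 0.0, 0.0] for _ in range(WIDTH)] for _ in range(HEIGHT)]
for y in range(HEIGHT):
    for x in range(WIDTH):
        if y in (0, HEIGHT - 1) or x in (0, WIDTH - 1):
            grid[y][x][0] = 1.0
grid[agent[0]][agent[1]][1] = 1.0
grid[goal[0]][goal[1]][2] = 1.0

print(state_index)     # 22
print(agent_goal_pos)  # (2, 2, 7, 2)
```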


We first set the environment to its initial state by calling the reset() method which returns the first observation and resets the agent to the starting location.

environment.reset()
environment.plot_state()


Note that A indicates the agent’s location.

Now we want to take an action to interact with the environment. We do this by passing a valid action to the dm_env.Environment.step() method which returns a dm_env.TimeStep namedtuple with fields (step_type, reward, discount, observation).
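The reset/step contract can be sketched without any dependencies. The example below uses stand-ins for `dm_env.StepType` and `dm_env.TimeStep` (a plain enum and a namedtuple with the four fields named above) and a hypothetical one-dimensional `Corridor` environment. None of these stand-ins are the real dm_env classes; they only mimic the behaviour of the GridWorld cell, where the first timestep has no reward and reaching the goal returns a discount of 0.

```python
import collections
import enum


class StepType(enum.IntEnum):
    """Stand-in for dm_env.StepType (assumption: three episode phases)."""
    FIRST = 0
    MID = 1
    LAST = 2


# Stand-in for dm_env.TimeStep with the four fields named in the text.
TimeStep = collections.namedtuple(
    'TimeStep', ['step_type', 'reward', 'discount', 'observation'])


class Corridor:
    """Hypothetical 1-D world: start in cell 0, goal in the last cell."""

    def __init__(self, length=4, discount=0.9, reward_goal=10.0):
        self._length = length
        self._discount = discount
        self._reward_goal = reward_goal
        self._pos = 0

    def reset(self):
        self._pos = 0
        # The first timestep carries no reward and no discount.
        return TimeStep(StepType.FIRST, None, None, self._pos)

    def step(self, action):
        # Action 1 moves right; any other action stays in place.
        if action == 1:
            self._pos += 1
        if self._pos == self._length - 1:
            # Reaching the goal ends the episode with discount 0.
            return TimeStep(StepType.LAST, self._reward_goal, 0.0, self._pos)
        return TimeStep(StepType.MID, 0.0, self._discount, self._pos)


env = Corridor()
timestep = env.reset()
episode_return, num_steps = 0.0, 0
while timestep.step_type != StepType.LAST:
    timestep = env.step(1)  # always move right
    episode_return += timestep.reward
    num_steps += 1

print(num_steps, episode_return, timestep.discount)  # 3 10.0 0.0
```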

Let’s take an action and visualise the resulting state of the grid-world. (You’ll need to rerun the cell if you pick a new action.)

Note for Kaggle users: Kaggle does not render the forms automatically, so read the instructions carefully and change the values of the variables manually.

Pick an action and see the state changing¶

# @title Pick an action and see the state changing
action = "down" #@param ["up", "right", "down", "left"] {type:"string"}

action_int = {'up': 0,
'right': 1,
'down': 2,
'left':3 }
action = int(action_int[action])
timestep = environment.step(action)  # pytype: dm_env.TimeStep
environment.plot_state()


Run loop¶

# @title Run loop

# @markdown ##### This function runs an agent in the environment for a number of episodes, allowing it to learn.

# @markdown ##### *Double-click* to inspect the run_loop function.

def run_loop(environment,
agent,
num_episodes=None,
num_steps=None,
logger_time_delta=1.,
label='training_loop',
log_loss=False,
):
"""
Perform the Acme run loop.
Run the environment loop for num_episodes episodes. Each episode is itself
a loop which interacts first with the environment to get an observation and
then give that observation to the agent in order to retrieve an action. Upon
termination of an episode a new episode will be started. If the number of
episodes is not given then this will interact with the environment
infinitely.

Args:
environment: dm_env
Used to generate trajectories.
agent: acme.Actor
For selecting actions in the run loop.
num_steps: Integer
Number of steps to run the loop for. If None (default), runs
without limit.
num_episodes: Integer
Number of episodes to run the loop for. If None (default),
runs without limit.
logger_time_delta: Float
Time interval (in seconds) between consecutive logging steps.
label: String
Optional label used at logging steps.
log_loss: Boolean
If True, accumulate agent.last_loss at every step and log the
average loss per episode. If False, the loss is not logged.

Returns:
all_returns: List
Log of return per episode
"""
logger = loggers.TerminalLogger(label=label, time_delta=logger_time_delta)
iterator = range(num_episodes) if num_episodes else itertools.count()
all_returns = []

num_total_steps = 0
for episode in iterator:
# Reset any counts and start the environment.
start_time = time.time()
episode_steps = 0
episode_return = 0
episode_loss = 0

timestep = environment.reset()

# Make the first observation.
agent.observe_first(timestep)

# Run an episode.
while not timestep.last():
# Generate an action from the agent's policy and step the environment.
action = agent.select_action(timestep.observation)
timestep = environment.step(action)

# Have the agent observe the timestep and let the agent update itself.
agent.observe(action, next_timestep=timestep)
agent.update()

# Book-keeping.
episode_steps += 1
num_total_steps += 1
episode_return += timestep.reward

if log_loss:
episode_loss += agent.last_loss

if num_steps is not None and num_total_steps >= num_steps:
break

# Collect the results and combine with counts.
steps_per_second = episode_steps / (time.time() - start_time)
result = {
'episode': episode,
'episode_length': episode_steps,
'episode_return': episode_return,
}
if log_loss:
result['loss_avg'] = episode_loss/episode_steps

all_returns.append(episode_return)

# Log the given results.
logger.write(result)

if num_steps is not None and num_total_steps >= num_steps:
break
return all_returns


Implement the evaluation loop¶

# @title Implement the evaluation loop

# @markdown ##### This function runs the agent in the environment for a number of episodes, without allowing it to learn, in order to evaluate it.

# @markdown ##### *Double-click* to inspect the evaluate function.

def evaluate(environment: dm_env.Environment,
agent: acme.Actor,
evaluation_episodes: int):
"""
Helper function to run evaluation loop

Args:
environment: dm_env
Used to generate trajectories.
agent: acme.Actor
For selecting actions in the run loop.
evaluation_episodes: Integer
Number of episodes for which evaluation loop is to be run for.

Returns:
frames: List
Log of environment state for each time step.
"""
frames = []

for episode in range(evaluation_episodes):
timestep = environment.reset()
episode_return = 0
steps = 0
while not timestep.last():
frames.append(environment.plot_state(return_rgb=True))

action = agent.select_action(timestep.observation)
timestep = environment.step(action)
steps += 1
episode_return += timestep.reward
print(
f'Episode {episode} ended with reward {episode_return} in {steps} steps'
)
return frames

def display_video(frames: Sequence[np.ndarray],
filename: str = 'temp.mp4',
frame_rate: int = 12):
"""
Save and render video.

Args:
frames: Sequence[np.ndarray]
Log of environment state for each time step.
filename: String
Name for the video file generated
frame_rate: Integer
Specifies frequency at which frames are displayed.

Returns:
IPython.display.HTML(video_tag)
"""
# Write the frames to a video.
with imageio.get_writer(filename, fps=frame_rate) as video:
for frame in frames:
video.append_data(frame)

# Read the saved video file back and encode it for display.
video = open(filename, 'rb').read()
b64_video = base64.b64encode(video)
video_tag = ('<video  width="320" height="240" controls alt="test" '
'src="data:video/mp4;base64,{0}">').format(b64_video.decode())
return IPython.display.HTML(video_tag)


Section 1: Function Approximation¶

Time estimate: ~25mins

Video 1: Function approximation¶

So far we have only considered look-up tables for value functions. In all previous cases, every state-action pair $$(\color{red}{s}, \color{blue}{a})$$ had an entry in our $$\color{green}Q$$-table. This is possible in this environment because the number of states equals the number of cells in the grid. But it does not scale to situations where, say, the goal location changes or the obstacles are in different locations at every episode (consider how big the table would have to be in that situation).

An example (not covered in this tutorial) is ATARI from pixels, where the number of possible frames an agent can see is exponential in the number of pixels on the screen.
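For the grid world itself, the growth of the table can be estimated with a little arithmetic. The sketch below counts Q-table entries for the simple layout with a fixed goal, and then for the case where the goal is re-sampled every episode so that the state has to include the goal location. The counting rule for the second case (agent and goal on distinct free cells) is an assumption made for this illustration.

```python
# The 'simple' layout from the GridWorld cell: -1 is a wall, 0 an empty cell.
LAYOUT = [
    [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
    [-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
    [-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
    [-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
    [-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
    [-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
    [-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
    [-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
    [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
]
NUM_ACTIONS = 4

num_cells = sum(len(row) for row in LAYOUT)
free_cells = sum(cell == 0 for row in LAYOUT for cell in row)

# Fixed goal: one table row per grid cell, one column per action.
fixed_goal_entries = num_cells * NUM_ACTIONS

# Goal re-sampled every episode: the state must also identify the goal,
# so we count (agent cell, goal cell) pairs on distinct free cells.
random_goal_entries = free_cells * (free_cells - 1) * NUM_ACTIONS

print(fixed_goal_entries)   # 360
print(random_goal_entries)  # 9800
```

Even in this tiny grid, letting the goal move multiplies the table size by more than an order of magnitude; moving obstacles would enlarge it much further.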

But what we really want is just to be able to compute the Q-value, when fed with a particular $$(\color{red}{s}, \color{blue}{a})$$ pair. So if we had a way to get a function to do this work instead of keeping a big table, we’d get around this problem.

To address this, we can use function approximation as a way to generalize Q-values over some representation of the very large state space, and train the approximator to output the correct values. In this section, we will explore $$\color{green}Q$$-learning with function approximation, which (although it has been theoretically proven to diverge for some degenerate MDPs) can yield impressive results in very large environments. In particular, we will look at Neural Fitted Q (NFQ) Iteration and Deep Q-Networks (DQN).
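As a minimal sketch of the idea, the example below replaces the table with a linear function of a four-number observation (agent and goal coordinates, as in the AGENT_GOAL_POS observation type). The linear model, its random initial weights, and the names `weights`, `biases`, and `q_values` are assumptions for illustration only; NFQ and DQN use neural networks instead.

```python
import random

NUM_ACTIONS = 4
NUM_FEATURES = 4  # (agent_y, agent_x, goal_y, goal_x)

rng = random.Random(0)
# Hypothetical untrained model: one weight vector and one bias per action.
weights = [[rng.uniform(-0.1, 0.1) for _ in range(NUM_FEATURES)]
           for _ in range(NUM_ACTIONS)]
biases = [0.0] * NUM_ACTIONS


def q_values(observation):
    """Return one estimated Q-value per action for a feature vector."""
    return [sum(w * x for w, x in zip(w_a, observation)) + b
            for w_a, b in zip(weights, biases)]


# The parameter count depends on the feature size, not on how many
# distinct (agent, goal) combinations exist.
num_parameters = NUM_ACTIONS * NUM_FEATURES + NUM_ACTIONS

q = q_values((2.0, 2.0, 7.0, 2.0))
greedy_action = max(range(NUM_ACTIONS), key=lambda a: q[a])
print(num_parameters)  # 20
```

The same 20 parameters produce a value for any agent and goal position, including ones never visited, which is what lets function approximation generalize where a table cannot.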

Section 1.1 Replay Buffers¶

An important property of off-policy methods like $$\color{green}Q$$-learning is that they involve two policies: one for exploration and one that is being optimized (via the $$\color{green}Q$$-function updates). This means that we can generate data from the behavior policy and insert that data into some form of data storage—usually referred to as replay.
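The two policies can be sketched as follows: an epsilon-greedy behavior policy that generates data and a greedy policy that the Q-function updates optimize. The Q-values in `q_row` are made up, and the function names are illustrative assumptions. The value formula mirrors the one used in the plotting helpers, $$(1 - \epsilon) \max_a q + \epsilon \, \mathrm{mean}_a q$$.

```python
import random


def greedy_action(q_row):
    """Policy being optimized: pick the action with the highest Q-value."""
    return max(range(len(q_row)), key=lambda a: q_row[a])


def epsilon_greedy_action(q_row, epsilon, rng):
    """Behavior policy: explore uniformly with probability epsilon."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_row))
    return greedy_action(q_row)


def epsilon_greedy_value(q_row, epsilon):
    """State value under the epsilon-greedy policy."""
    return (1 - epsilon) * max(q_row) + epsilon * sum(q_row) / len(q_row)


q_row = [0.0, 1.0, 4.0, -1.0]  # made-up values for up, right, down, left
rng = random.Random(0)
actions = [epsilon_greedy_action(q_row, 0.1, rng) for _ in range(1000)]

print(greedy_action(q_row))  # 2
print(actions.count(2) / len(actions))  # close to 0.925
```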

In order to optimize the $$\color{green}Q$$-function we can then sample data from the replay dataset and use that data to perform an update. An illustration of this learning loop is shown below.

In the next cell we show how to implement a simple replay buffer. This can be as simple as a Python list containing transition data. In more complicated scenarios we might want a more performance-tuned variant, we might need to care about how large the replay buffer is and what to do when it's full, and we might want to sample from it in different ways. But a simple Python list can go a surprisingly long way.

# Simple replay buffer

# Create a convenient container for the SARS tuples required by deep RL agents.
Transitions = collections.namedtuple(
    'Transitions', ['state', 'action', 'reward', 'discount', 'next_state'])

class ReplayBuffer(object):
  """
  A simple Python Replay Buffer.
  Queue based implementation.
  """

  def __init__(self, capacity: int = None):
    # A deque with maxlen drops its oldest entry once capacity is reached.
    self.buffer = collections.deque(maxlen=capacity)
    self._prev_state = None

  def add_first(self, initial_timestep: dm_env.TimeStep):
    # Store the first observation of an episode; no transition exists yet.
    self._prev_state = initial_timestep.observation

  def add(self, action: int, timestep: dm_env.TimeStep):
    transition = Transitions(
        state=self._prev_state,
        action=action,
        reward=timestep.reward,
        discount=timestep.discount,
        next_state=timestep.observation,
    )
    self.buffer.append(transition)
    self._prev_state = timestep.observation

  def sample(self, batch_size: int) -> Transitions:
    # Sample a random batch of Transitions as a list.
    batch_as_list = random.sample(self.buffer, batch_size)

    # Convert the list of batch_size Transitions into a single Transitions
    # object where each field has batch_size stacked fields.
    return tree_utils.stack_sequence_fields(batch_as_list)

  def flush(self) -> Transitions:
    entire_buffer = tree_utils.stack_sequence_fields(self.buffer)
    self.buffer.clear()
    return entire_buffer

  def is_ready(self, batch_size: int) -> bool:
    return batch_size <= len(self.buffer)
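To see what a capacity-limited buffer does when it fills up, here is a minimal standalone sketch that uses only the standard library. The transitions are made-up tuples rather than `dm_env` timesteps, and the capacity of 3 is chosen purely for illustration.

```python
import collections
import random

# Hypothetical toy transitions: (state, action, reward, next_state).
buffer = collections.deque(maxlen=3)
for t in range(5):
    buffer.append((t, 0, -1.0, t + 1))

# Only the three most recent transitions survive; the oldest were evicted.
states_kept = [transition[0] for transition in buffer]

# Uniform sampling without replacement, as in ReplayBuffer.sample.
random.seed(0)
batch = random.sample(list(buffer), 2)
print(states_kept, batch)
```

Evicting the oldest data first is one simple policy among several; it keeps the buffer biased towards recent behaviour.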


Section 1.2: NFQ Agent¶

Neural Fitted Q Iteration was one of the first papers to demonstrate how to leverage recent advances in Deep Learning to approximate the Q-value by a neural network$$^\dagger$$. In other words, the values $$\color{green}Q(\color{red}{s}, \color{blue}{a})$$ are approximated by the output of a neural network $$\color{green}{Q_w}(\color{red}{s}, \color{blue}{a})$$ for each possible action $$\color{blue}{a} \in \color{blue}{\mathcal{A}}$$$$^\ddagger$$.

When introducing function approximations, and neural networks in particular, we need to have a loss to optimize. But looking back at the tabular setting above, you can see that we already have some notion of error: the TD error.

By training our neural network to output values that minimize the TD error, we also push it towards satisfying the Bellman Optimality Equation, which is a sufficient condition for obtaining an optimal policy. Thanks to automatic differentiation, we can simply write the TD error as a loss, e.g., with an $$\ell^2$$ loss (others would work too):

(122)$$$L(\color{green}w) = \mathbb{E}\left[ \left( \color{green}{r} + \gamma \max_{\color{blue}{a'}} \color{green}{Q_w}(\color{red}{s'}, \color{blue}{a'}) - \color{green}{Q_w}(\color{red}{s}, \color{blue}{a}) \right)^2\right].$$$

Then we can compute the gradient with respect to the parameters of the neural network and improve our Q-value approximation incrementally.
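As a small worked example of this loss, the sketch below evaluates the TD errors and their mean square for a batch of two transitions. All numbers are made up, and plain Python lists stand in for network outputs.

```python
# Hypothetical minibatch of two transitions; every number here is made up.
gamma = 0.9
rewards = [1.0, 0.0]
q_next = [[0.5, 2.0], [1.0, 0.0]]   # Q_w(s', a') for each action a'
q_taken = [2.0, 1.5]                # Q_w(s, a) for the action actually taken

# TD error per transition: r + gamma * max_a' Q_w(s', a') - Q_w(s, a).
td_errors = [
    r + gamma * max(q_n) - q
    for r, q_n, q in zip(rewards, q_next, q_taken)
]

# The l2 loss is the mean of the squared TD errors over the batch.
loss = sum(e ** 2 for e in td_errors) / len(td_errors)
print(td_errors, loss)
```

Here the TD errors are approximately 0.8 and -0.6, so the loss is approximately 0.5.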

NFQ builds on $$\color{green}Q$$-learning, but if one were to update the Q-values online directly, the training can be unstable and very slow. Instead, NFQ uses a replay buffer, similar to what we see implemented above, to update the Q-value in a batched setting.

When it was introduced, it was also entirely off-policy, using a uniformly random policy to collect data, which made it prone to instability when applied to more complex environments (e.g., when the inputs are pixels or the tasks are longer and more complicated).

But it is a good stepping stone to the more complex agents used today. Here, we will look at a slightly different and modernised implementation of NFQ.

Below you will find an incomplete NFQ agent that takes in observations from a gridworld. Instead of receiving a tabular state, it receives an observation in the form of its $$(x,y)$$ coordinates in the gridworld, and the $$(x,y)$$ coordinates of the goal.

The goal of this coding exercise is to complete this agent by implementing the loss, using mean squared error.

$$^\dagger$$ If you read the NFQ paper, they use a “control” notation, where there is a “cost to minimize”, instead of “rewards to maximize”, so don’t be surprised if signs/max/min do not correspond.

$$^\ddagger$$ We could feed it $$\color{blue}{a}$$ as well and ask $$Q_w$$ for a single scalar value, but given we have a fixed number of actions and we usually need to take an $$\arg\max$$ over them, it’s easiest to just output them all in one pass.

Coding Exercise 1.1: Implement NFQ¶

# Create a convenient container for the SARS tuples required by NFQ.
Transitions = collections.namedtuple(
    'Transitions', ['state', 'action', 'reward', 'discount', 'next_state'])

class NeuralFittedQAgent(acme.Actor):
  """
  Implementation of a Neural Fitted Agent
  """

  def __init__(self,
               environment_spec: specs.EnvironmentSpec,
               q_network: nn.Module,
               replay_capacity: int = 100_000,
               epsilon: float = 0.1,
               batch_size: int = 1,
               learning_rate: float = 3e-4):
    """
    Neural Fitted Agent Initialisation

    Args:
      environment_spec: specs.EnvironmentSpec
        * actions: DiscreteArray(shape=(), dtype=int32, name=action, minimum=0, maximum=3, num_values=4)
        * observations: Array(shape=(9, 10, 3), dtype=dtype('float32'), name='observation_grid')
        * rewards: Array(shape=(), dtype=dtype('float32'), name='reward')
        * discounts: BoundedArray(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
      q_network: nn.Module,
        Q Network
      replay_capacity: int,
        Capacity of the replay buffer [default: 100000]
      epsilon: float
        Probability of taking a random action [default = 0.1]
      batch_size: int
        Batch Size [default = 1]
      learning_rate: float
        Rate at which the neural fitted agent learns [default = 3e-4]

    Returns:
      Nothing
    """
    # Store agent hyperparameters and network.
    self._num_actions = environment_spec.actions.num_values
    self._epsilon = epsilon
    self._batch_size = batch_size
    self._q_network = q_network

    # Container for the computed loss (see run_loop implementation above).
    self.last_loss = 0.0

    # Create the replay buffer.
    self._replay_buffer = ReplayBuffer(replay_capacity)

    # Setup optimizer that will train the network to minimize the loss.
    # (Adam is assumed here; update() only needs zero_grad() and step().)
    self._optimizer = torch.optim.Adam(self._q_network.parameters(),
                                       lr=learning_rate)
    self._loss_fn = nn.MSELoss()

  def select_action(self, observation):
    """
    Chooses epsilon-greedy action

    Args:
      observation: enum
        * ObservationType.STATE_INDEX: int32 index of agent occupied tile.
        * ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
          agent is and 0 elsewhere.
        * ObservationType.GRID: NxNx3 float32 grid of feature channels.
          First channel contains walls (1 if wall, 0 otherwise), second the
          agent position (1 if agent, 0 otherwise) and third goal position
          (1 if goal, 0 otherwise)
        * ObservationType.AGENT_GOAL_POS: float32 tuple with
          (agent_y, agent_x, goal_y, goal_x)

    Returns:
      action: Integer
        Chosen action based on epsilon-greedy policy
    """
    # Compute Q-values.
    q_values = self._q_network(torch.tensor(observation).unsqueeze(0))  # Adds batch dimension.
    q_values = q_values.squeeze(0)  # Removes batch dimension

    # Select epsilon-greedy action.
    if self._epsilon < torch.rand(1):
      action = q_values.argmax(axis=-1)
    else:
      action = torch.randint(low=0, high=self._num_actions, size=(1, ), dtype=torch.int64).squeeze()
    return action

  def q_values(self, observation):
    q_values = self._q_network(torch.tensor(observation).unsqueeze(0))
    return q_values.squeeze(0).detach()

  def update(self):
    """
    Performs one gradient step on a minibatch sampled from the replay buffer

    Args:
      None

    Returns:
      Nothing
    """
    # If the replay buffer is not ready to sample from, do nothing.
    if not self._replay_buffer.is_ready(self._batch_size):
      return

    # Sample a minibatch of transitions from experience replay.
    transitions = self._replay_buffer.sample(self._batch_size)

    # Note: each of these tensors will be of shape [batch_size, ...].
    s = torch.tensor(transitions.state)
    a = torch.tensor(transitions.action)
    r = torch.tensor(transitions.reward)
    d = torch.tensor(transitions.discount)
    next_s = torch.tensor(transitions.next_state)

    # Compute the Q-values at next states in the transitions.
    # The bootstrapped target is treated as a constant, so no gradient is tracked.
    with torch.no_grad():
      q_next_s = self._q_network(next_s)  # Shape [batch_size, num_actions].
      max_q_next_s = q_next_s.max(axis=-1)[0]
      # Compute the TD error and then the losses.
      target_q_value = r + d * max_q_next_s

    # Compute the Q-values at original state.
    q_s = self._q_network(s)

    # Gather the Q-value corresponding to each action in the batch.
    # squeeze(-1) gives shape [batch_size], matching target_q_value.
    q_s_a = q_s.gather(1, a.view(-1, 1)).squeeze(-1)
    #################################################
    # Fill in missing code below (...),
    # then remove or comment the line below to test your implementation
    raise NotImplementedError("Student exercise: complete the NFQ Agent")
    #################################################
    # TODO Average the squared TD errors over the entire batch using
    # self._loss_fn, which is defined above as nn.MSELoss()
    # HINT: Take a look at the reference for nn.MSELoss here:
    #  https://pytorch.org/docs/stable/generated/torch.nn.MSELoss.html
    #  What should you put for the input and the target?
    loss = ...

    # Compute the gradients of the loss with respect to the q_network variables.
    self._optimizer.zero_grad()

    loss.backward()
    self._optimizer.step()

    # Store the loss for logging purposes (see run_loop implementation above).
    self.last_loss = loss.detach().numpy()

  def observe_first(self, timestep: dm_env.TimeStep):
    self._replay_buffer.add_first(timestep)

  def observe(self, action: int, next_timestep: dm_env.TimeStep):
    self._replay_buffer.add(action, next_timestep)


Click for solution
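The `gather` call in `update` picks out, for every transition in the batch, the Q-value of the action that was actually taken. Here is a plain-Python sketch of that indexing, with made-up numbers (`q_s` and `a` are ordinary lists here, not tensors):

```python
# Hypothetical Q-values for a batch of two states and four actions.
q_s = [[0.1, 0.9, 0.3, 0.0],
       [0.5, 0.2, 0.7, 0.4]]
a = [1, 2]   # action taken in each transition

# Row-wise lookup: one scalar Q(s, a) per transition.
q_s_a = [row[action] for row, action in zip(q_s, a)]
print(q_s_a)  # [0.9, 0.7]
```

The result has one entry per transition, which is the shape the TD targets have as well.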

Train and Evaluate the NFQ Agent¶

Training the NFQ Agent¶

# @title Training the NFQ Agent
epsilon = 0.6 # @param {type:"number"}

max_episode_length = 200

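# Create the environment.
# (build_gridworld_task is assumed to be the gridworld helper defined earlier in this notebook.)
grid = build_gridworld_task(
    task='simple',
    observation_type=ObservationType.AGENT_GOAL_POS,
    max_episode_length=max_episode_length)
environment, environment_spec = setup_environment(grid)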

# Define the neural function approximator (aka Q network).
q_network = nn.Sequential(nn.Linear(4, 50),
nn.ReLU(),
nn.Linear(50, 50),
nn.ReLU(),
nn.Linear(50, environment_spec.actions.num_values))

# Build the trainable Q-learning agent
agent = NeuralFittedQAgent(
environment_spec,
q_network,
epsilon=epsilon,
replay_capacity=100_000,
batch_size=10,
learning_rate=1e-3)

returns = run_loop(
environment=environment,
agent=agent,
num_episodes=500,
logger_time_delta=1.,
log_loss=True)


Evaluating the agent (set $$\epsilon=0$$)¶

Temporarily change epsilon to be more greedy; remember to change it back.¶
# @title Evaluating the agent (set $\epsilon=0$)
# @markdown ##### Temporarily change epsilon to be more greedy; remember to change it back.

agent._epsilon = 0.0

# Record a few episodes.
frames = evaluate(environment, agent, evaluation_episodes=5)

# Change epsilon back.
agent._epsilon = epsilon

# Display the video of the episodes.
display_video(frames, frame_rate=6)

Episode 0 ended with reward -990.0 in 200 steps

Episode 1 ended with reward -990.0 in 200 steps

Episode 2 ended with reward -990.0 in 200 steps

Episode 3 ended with reward -990.0 in 200 steps

Episode 4 ended with reward -990.0 in 200 steps


Visualise the learned $$Q$$ values¶

Evaluate the policy for every state, similar to tabular agents above.¶
# @title Visualise the learned $Q$ values

# @markdown ##### Evaluate the policy for every state, similar to tabular agents above.

environment.reset()
pi = np.zeros(grid._layout_dims, dtype=np.int32)
q = np.zeros(grid._layout_dims + (4, ))
for y in range(grid._layout_dims[0]):
  for x in range(grid._layout_dims[1]):
    # Hack observation to see what the Q-network would output at that point.
    environment.set_state(x, y)
    obs = environment.get_obs()
    q[y, x] = np.asarray(agent.q_values(obs))
    pi[y, x] = np.asarray(agent.select_action(obs))

plot_action_values(q)


Compare the Q-values approximated with the neural network with the tabular case. Notice how the neural network is generalizing from the visited states to the unvisited similar states, while in the tabular case we updated the value of each state only when we visited that state.
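The difference can be illustrated with a deliberately tiny sketch. A linear function stands in for the neural network, states are one-dimensional positions, and the value targets are made up; none of this is the tutorial's gridworld.

```python
# Made-up value targets observed at two visited states (1-D positions).
visited = {0.0: 1.0, 2.0: 3.0}

# Tabular: an unvisited state keeps its initial value (0.0 here).
table = dict(visited)
tabular_estimate = table.get(1.0, 0.0)

# Function approximation: v(s) = w * s + b fitted to the two visited states.
(s0, v0), (s1, v1) = visited.items()
w = (v1 - v0) / (s1 - s0)
b = v0 - w * s0
approx_estimate = w * 1.0 + b   # prediction for the unvisited state s = 1

print(tabular_estimate, approx_estimate)  # 0.0 2.0
```

Whether such interpolation helps depends on how well the approximator's notion of similarity matches the task.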

Compare the greedy and behaviour ($$\epsilon$$-greedy) policies¶

Compare the greedy policy with the agent’s policy¶

Notice that the agent’s behavior policy has a lot more randomness, due to the high $$\epsilon$$. However, the greedy policy that’s learned is optimal.¶
# @title Compare the greedy policy with the agent's policy

# @markdown ##### Notice that the agent's behavior policy has a lot more randomness, due to the high $\epsilon$. However, the greedy policy that's learned is optimal.

environment.plot_greedy_policy(q)
plt.figtext(-.08, .95, 'Greedy policy using the learnt Q-values')
plt.title('')
plt.show()

environment.plot_policy(pi)
plt.figtext(-.08, .95, "Policy using the agent's behavior policy")
plt.title('')
plt.show()


Section 2: Deep Q-Networks (DQN)¶

Time estimate: ~30mins

Video 2: Deep Q-Networks (DQN)¶

In this section, we will look at an advanced deep RL Agent based on the following publication, Playing Atari with Deep Reinforcement Learning, which introduced the first deep learning model to successfully learn control policies directly from high-dimensional pixel inputs using RL.

Here the agent will act directly on a pixel representation of the gridworld. You can find an incomplete implementation below.

Coding Exercise 2.1: Run a DQN Agent¶

class DQN(acme.Actor):
  """
  Implementation of a Deep Q Network Agent
  """

  def __init__(self,
               environment_spec: specs.EnvironmentSpec,
               network: nn.Module,
               replay_capacity: int = 100_000,
               epsilon: float = 0.1,
               batch_size: int = 1,
               learning_rate: float = 5e-4,
               target_update_frequency: int = 10):
    """
    DQN Based Agent Initialisation

    Args:
      environment_spec: specs.EnvironmentSpec
        * actions: DiscreteArray(shape=(), dtype=int32, name=action, minimum=0, maximum=3, num_values=4)
        * observations: Array(shape=(9, 10, 3), dtype=dtype('float32'), name='observation_grid')
        * rewards: Array(shape=(), dtype=dtype('float32'), name='reward')
        * discounts: BoundedArray(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
      network: nn.Module
        Deep Q Network
      replay_capacity: int
        Capacity of the replay buffer [default: 100000]
      epsilon: float
        Probability of taking a random action [default = 0.1]
      batch_size: int
        Batch Size [default = 1]
      learning_rate: float
        Rate at which the DQN agent learns [default = 5e-4]
      target_update_frequency: int
        Frequency with which target network is updated

    Returns:
      Nothing
    """
    # Store agent hyperparameters and network.
    self._num_actions = environment_spec.actions.num_values
    self._epsilon = epsilon
    self._batch_size = batch_size
    self._q_network = network

    # Create a second q net with the same structure and initial values, which
    # we'll be updating separately from the learned q-network.
    self._target_network = copy.deepcopy(self._q_network)

    # Container for the computed loss (see run_loop implementation above).
    self.last_loss = 0.0

    # Create the replay buffer.
    self._replay_buffer = ReplayBuffer(replay_capacity)
    # Keep an internal tracker of steps
    self._current_step = 0

    # How often to update the target network
    self._target_update_frequency = target_update_frequency
    # Setup optimizer that will train the network to minimize the loss.
    # (Adam is assumed here; update() only needs zero_grad() and step().)
    self._optimizer = torch.optim.Adam(self._q_network.parameters(),
                                       lr=learning_rate)
    self._loss_fn = nn.MSELoss()

  def select_action(self, observation):
    """
    Action Selection Algorithm

    Args:
      observation: enum
        * ObservationType.STATE_INDEX: int32 index of agent occupied tile.
        * ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
          agent is and 0 elsewhere.
        * ObservationType.GRID: NxNx3 float32 grid of feature channels.
          First channel contains walls (1 if wall, 0 otherwise), second the
          agent position (1 if agent, 0 otherwise) and third goal position
          (1 if goal, 0 otherwise)
        * ObservationType.AGENT_GOAL_POS: float32 tuple with
          (agent_y, agent_x, goal_y, goal_x)

    Returns:
      action: Integer
        Chosen action based on epsilon-greedy policy
    """
    # Compute Q-values.
    # The network expects a batch dimension, which we squeeze out right after.
    q_values = self._q_network(torch.tensor(observation).unsqueeze(0))  # Adds batch dimension.
    q_values = q_values.squeeze(0)  # Removes batch dimension

    # Select epsilon-greedy action.
    if self._epsilon < torch.rand(1):
      action = q_values.argmax(axis=-1)
    else:
      action = torch.randint(low=0, high=self._num_actions, size=(1, ), dtype=torch.int64).squeeze()
    return action

  def q_values(self, observation):
    q_values = self._q_network(torch.tensor(observation).unsqueeze(0))
    return q_values.squeeze(0).detach()

  def update(self):
    """
    Performs one gradient step on a minibatch sampled from the replay buffer
    and periodically refreshes the target network

    Args:
      None

    Returns:
      Nothing
    """
    self._current_step += 1

    # If the replay buffer is not ready to sample from, do nothing.
    if not self._replay_buffer.is_ready(self._batch_size):
      return

    # Sample a minibatch of transitions from experience replay.
    transitions = self._replay_buffer.sample(self._batch_size)

    # Optionally unpack the transitions to lighten notation.
    # Note: each of these tensors will be of shape [batch_size, ...].
    s = torch.tensor(transitions.state)
    a = torch.tensor(transitions.action)
    r = torch.tensor(transitions.reward)
    d = torch.tensor(transitions.discount)
    next_s = torch.tensor(transitions.next_state)

    # Compute the Q-values at next states in the transitions.
    # The bootstrapped target is treated as a constant, so no gradient is tracked.
    with torch.no_grad():
      #################################################
      # Fill in missing code below (...),
      # then remove or comment the line below to test your implementation
      raise NotImplementedError("Student exercise: complete the DQN Agent")
      #################################################
      #TODO get the value of the next states evaluated by the target network
      # HINT: use self._target_network, defined above.
      q_next_s = ...  # Shape [batch_size, num_actions].

      max_q_next_s = q_next_s.max(axis=-1)[0]
      # Compute the TD error and then the losses.
      target_q_value = r + d * max_q_next_s

    # Compute the Q-values at original state.
    q_s = self._q_network(s)

    # Gather the Q-value corresponding to each action in the batch.
    # squeeze(-1) gives shape [batch_size], matching target_q_value.
    q_s_a = q_s.gather(1, a.view(-1, 1)).squeeze(-1)

    # Average the squared TD errors over the entire batch
    loss = self._loss_fn(target_q_value, q_s_a)

    # Compute the gradients of the loss with respect to the q_network variables.
    self._optimizer.zero_grad()

    loss.backward()
    self._optimizer.step()

    if self._current_step % self._target_update_frequency == 0:
      # Copy the online network's weights into the target network.
      self._target_network.load_state_dict(self._q_network.state_dict())
    # Store the loss for logging purposes (see run_loop implementation above).
    self.last_loss = loss.detach().numpy()

  def observe_first(self, timestep: dm_env.TimeStep):
    self._replay_buffer.add_first(timestep)

  def observe(self, action: int, next_timestep: dm_env.TimeStep):
    self._replay_buffer.add(action, next_timestep)

# Create a convenient container for the SARS tuples required by DQN.
Transitions = collections.namedtuple(
    'Transitions', ['state', 'action', 'reward', 'discount', 'next_state'])


Click for solution
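The role of `target_update_frequency` can be seen in isolation with a small sketch. A dictionary stands in for the network weights, the "gradient step" is a fake increment, and the frequency of 3 is a hypothetical value.

```python
import copy

online = {"w": 0.0}               # stand-in for the online network's weights
target = copy.deepcopy(online)    # the target starts as an exact copy
target_update_frequency = 3       # hypothetical value

history = []
for step in range(1, 8):
    online["w"] += 1.0            # pretend a gradient step changed the weights
    if step % target_update_frequency == 0:
        target = copy.deepcopy(online)   # hard update: overwrite the target
    history.append((step, online["w"], target["w"]))

for row in history:
    print(row)
```

Between updates the target stays frozen (0, 0, 3, 3, 3, 6, 6 in this run), so the regression targets change only every few steps rather than after every gradient update.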

Train and evaluate the DQN agent¶

# @title Train and evaluate the DQN agent

epsilon = 0.25  # @param {type: "number"}
num_episodes = 500  # @param {type: "integer"}
max_episode_length = 50  # @param {type: "integer"}

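# Create the environment.
# (build_gridworld_task is assumed to be the gridworld helper defined earlier in this notebook.)
grid = build_gridworld_task(
    task='simple',
    observation_type=ObservationType.GRID,
    max_episode_length=max_episode_length)
environment, environment_spec = setup_environment(grid)

class Permute(nn.Module):
  """
  Reorders the dimensions of its input (here: NHWC -> NCHW, as nn.Conv2d expects)
  """
  def __init__(self, order: list):
    super(Permute,self).__init__()
    self.order = order

  def forward(self, x):
    return x.permute(self.order)

# Build the agent's network.
# The convolutions use PyTorch's default stride and padding, so a 9x10 grid
# shrinks to 6x7, then 4x5, then 2x3 after pooling: 64 * 2 * 3 = 384 features.
q_network = nn.Sequential(Permute([0, 3, 1, 2]),
                          nn.Conv2d(3, 32, kernel_size=4),
                          nn.ReLU(),
                          nn.Conv2d(32, 64, kernel_size=3),
                          nn.ReLU(),
                          nn.MaxPool2d(3, 1),
                          nn.Flatten(),
                          nn.Linear(384, 50),
                          nn.ReLU(),
                          nn.Linear(50, environment_spec.actions.num_values)
                          )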

agent = DQN(
environment_spec=environment_spec,
network=q_network,
batch_size=10,
epsilon=epsilon,
target_update_frequency=25)

returns = run_loop(
environment=environment,
agent=agent,
num_episodes=num_episodes,
num_steps=100000)
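The `384` input features of the first linear layer can be checked by hand. The sketch below assumes a 9x10x3 grid observation (as listed in the agent's docstring) and stride 1 with no padding for both convolutions. That setting is an assumption; others, such as stride 2 with padding 1 in the first convolution and padding 1 in the second, also yield 384.

```python
def out_size(n, kernel, stride=1, padding=0):
    """Output length of a convolution or pooling window along one dimension."""
    return (n + 2 * padding - kernel) // stride + 1

h, w = 9, 10                              # grid height and width
h, w = out_size(h, 4), out_size(w, 4)     # first convolution, kernel 4
h, w = out_size(h, 3), out_size(w, 3)     # second convolution, kernel 3
h, w = out_size(h, 3), out_size(w, 3)     # max pooling, kernel 3, stride 1
flat = 64 * h * w                         # 64 output channels, flattened

print(h, w, flat)  # 2 3 384
```

If the grid size or the layer settings change, the first argument of `nn.Linear` has to change with them.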


Visualise the learned $$Q$$ values¶

Evaluate the policy for every state, similar to tabular agents above.¶
# @title Visualise the learned $Q$ values
# @markdown ##### Evaluate the policy for every state, similar to tabular agents above.

pi = np.zeros(grid._layout_dims, dtype=np.int32)
q = np.zeros(grid._layout_dims + (4,))
for y in range(grid._layout_dims[0]):
  for x in range(grid._layout_dims[1]):
    # Hack observation to see what the Q-network would output at that point.
    environment.set_state(x, y)
    obs = environment.get_obs()
    q[y, x] = np.asarray(agent.q_values(obs))
    pi[y, x] = np.asarray(agent.select_action(obs))

plot_action_values(q)


Compare the greedy policy with the agent’s policy¶

# @title Compare the greedy policy with the agent's policy

environment.plot_greedy_policy(q)
plt.figtext(-.08, .95, "Greedy policy using the learnt Q-values")
plt.title('')
plt.show()

environment.plot_policy(pi)
plt.figtext(-.08, .95, "Policy using the agent's epsilon-greedy policy")
plt.title('')
plt.show()


Note: You get a better estimate of the value functions if you increase num_episodes and max_episode_length, but this will take longer to train. Feel free to play around after the day!

Section 3: Learning the policy directly¶

Time estimate: ~25mins

Video 3: Other Deep RL Methods¶

Here we switch to training on a different kind of task: CartPole in Gym, which has a continuous state space and two discrete actions (push the cart left or right). As you recall from the video, policy-based methods are a natural fit for control tasks like this one. We will be exploring two of those methods below.

Make a CartPole environment, gym.make('CartPole-v1')¶

# @title Make a CartPole environment, gym.make('CartPole-v1')
env = gym.make('CartPole-v1')

# Set seeds. Gym 0.26 and later removed env.seed(); the environment is
# seeded through reset() instead.
env.reset(seed=SEED)
set_seed(SEED)


Now we will turn to policy gradient methods. Rather than defining the policy in terms of a value function, i.e., $$\color{blue}\pi(\color{red}s) = \arg\max_{\color{blue}a}\color{green}Q(\color{red}s, \color{blue}a)$$, we will directly parameterize the policy and write it as the distribution

(123)$$$\color{blue}a_t \sim \color{blue}\pi_{\theta}(\color{blue}a_t|\color{red}s_t).$$$

Here $$\theta$$ represents the parameters of the policy. We will update the policy parameters using gradient ascent to maximize expected future reward.

One convenient way to represent the conditional distribution above is as a function that takes a state $$\color{red}s$$ and returns a distribution over actions $$\color{blue}a$$.
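A minimal sketch of such a function, using only the standard library: a hypothetical linear policy with one weight vector per action, followed by a softmax. The parameters `theta` and the state are made up for illustration.

```python
import math
import random

def softmax(logits):
    m = max(logits)                               # subtract max for stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def policy(state, theta):
    # Hypothetical linear policy: one weight vector per action.
    logits = [sum(w * x for w, x in zip(weights, state)) for weights in theta]
    return softmax(logits)

theta = [[1.0, 0.0], [0.0, 1.0]]      # made-up parameters for two actions
probs = policy([2.0, 1.0], theta)     # distribution over actions in this state

random.seed(0)
action = random.choices(range(len(probs)), weights=probs)[0]   # a ~ pi(.|s)
print(probs, action)
```

For this state the logits are 2 and 1, so the first action gets probability of about 0.73.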

Defined below is an agent which implements the REINFORCE algorithm. REINFORCE (Williams, 1992) is one of the simplest general model-free reinforcement learning techniques.

The basic idea is to use probabilistic action choice. If the reward at the end turns out to be high, we make all actions in this sequence more likely (otherwise, we do the opposite).

This strategy could reinforce “bad” actions as well; however, such actions tend to be part of low-reward trajectories, so on average they will not be accentuated.
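The idea can be tried on a toy problem that is much simpler than CartPole. The sketch below is an illustration under stated assumptions, not the agent used in this tutorial: a two-armed bandit in which only action 1 pays off, a softmax policy with one logit per action, and a made-up step size.

```python
import math
import random

random.seed(0)
theta = [0.0, 0.0]         # one logit per action
arm_reward = [0.0, 1.0]    # hypothetical task: only action 1 is rewarded
alpha = 0.5                # made-up step size

def action_probs(theta):
    m = max(theta)
    exps = [math.exp(t - m) for t in theta]
    return [e / sum(exps) for e in exps]

for _ in range(200):
    p = action_probs(theta)
    a = random.choices([0, 1], weights=p)[0]   # probabilistic action choice
    G = arm_reward[a]                          # one-step episode: return = reward
    for k in range(2):
        # For a softmax policy, d log pi(a) / d theta_k = 1[k == a] - p[k].
        grad_log_pi = (1.0 if k == a else 0.0) - p[k]
        theta[k] += alpha * G * grad_log_pi    # gradient ascent step

final = action_probs(theta)
print(final)
```

Every rewarded pull of action 1 raises its logit, so its probability climbs towards 1, while unrewarded pulls leave the parameters unchanged.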

From the lectures, we know that we need to compute

(124)$$$\nabla J(\theta) = \mathbb{E} \left[ \sum_{t=0}^T \color{green} G_t \nabla\log\color{blue}\pi_\theta(\color{blue}{a_t}|\color{red}{s_t}) \right]$$$

where $$\color{green} G_t$$ is the sum over future rewards from time $$t$$, defined as

(125)$$$\color{green} G_t = \sum_{n=t}^T \gamma^{n-t} \color{green} R(\color{red}{s_n}, \color{blue}{a_n}, \color{red}{s_{n+1}}).$$$

The algorithm below will collect the state, action, and reward data in its buffer until it reaches a full trajectory. It will then update its policy given the above gradient (and the Adam optimizer).

A policy gradient method trains an agent without explicitly estimating the value of every state-action pair in an environment: it takes small gradient steps that adjust the policy according to the rewards obtained. In this section, we will build a small network in PyTorch and train it with policy gradient.

The agent can receive a reward immediately after an action, or it can receive the reward at a later time, such as the end of the episode.

The policy function our agent will try to learn is $$\pi_\theta(a|s)$$, where $$\theta$$ is the parameter vector, $$s$$ is a particular state, and $$a$$ is an action.

We will use the Monte-Carlo Policy Gradient approach, which means the agent will run through an entire episode and then update its policy based on the rewards obtained.

Set the hyperparameters for Policy Gradient¶

Only used in Policy Gradient Method:

# @title Set the hyperparameters for Policy Gradient

num_steps = 300

learning_rate = 0.01  # @param {type:"number"}
gamma = 0.99  # @param {type:"number"}
dropout = 0.6 # @param {type:"number"}

# @markdown Only used in Policy Gradient Method:
hidden_neurons = 128  # @param {type:"integer"}


Coding Exercise 3.1: Creating a simple neural network¶

Below you will find some incomplete code. Fill in the missing code to construct the specified neural network.

Let us define a simple feed forward neural network with one hidden layer of $$128$$ neurons and a dropout of $$0.6$$. Let’s use Adam as our optimizer and a learning rate of $$0.01$$. Use the hyperparameters already defined rather than using explicit values.

Using dropout can significantly improve the performance of the policy. Compare your results with and without dropout, and experiment with other hyperparameter values as well.

class PolicyGradientNet(nn.Module):
  """
  Defines Policy Gradient Network with the following attributes:
    Feed Forward Network with a single hidden layer
    width: 128 neurons
    dropout: 0.6
    Learning Rate: 0.01
  """

  def __init__(self):
    """
    Initiate Policy Gradient Network with above mentioned parameters/hyperparameters

    Args:
      None

    Returns:
      Nothing
    """
    # nn.Module must be initialised before any layers are assigned.
    super(PolicyGradientNet, self).__init__()
    self.state_space = env.observation_space.shape[0]
    self.action_space = env.action_space.n
    #################################################
    ## TODO for students: Define two linear layers
    ## from the first expression
    raise NotImplementedError("Student exercise: Create FF neural network.")
    #################################################
    # HINT: you can construct linear layers using nn.Linear(); what are the
    # sizes of the inputs and outputs of each of the layers? Also remember
    # that you need to use hidden_neurons (see hyperparameters section above).
    #   https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
    self.l1 = ...
    self.l2 = ...

    self.gamma = gamma
    # Episode policy and past rewards
    self.past_policy = Variable(torch.Tensor())
    self.reward_episode = []
    # Overall reward and past loss
    self.past_reward = []
    self.past_loss = []

  def forward(self, x):
    model = torch.nn.Sequential(
        self.l1,
        nn.Dropout(p=dropout),
        nn.ReLU(),
        self.l2,
        nn.Softmax(dim=-1)
    )
    return model(x)


Click for solution

Now let’s create an instance of the network we have defined and use Adam as the optimizer, with the learning_rate hyperparameter already defined above.

policy = PolicyGradientNet()
pg_optimizer = torch.optim.Adam(policy.parameters(), lr=learning_rate)


Select Action¶

The select_action() function chooses an action based on our policy probability distribution using the PyTorch distributions package. Our policy returns a probability for each possible action in our action space (move left or move right) as an array of length two such as $$[0.7, 0.3]$$. We then choose an action based on these probabilities, record our history, and return our action.

def select_action(state):
  """
  Select an action (0 or 1) by running policy model and choosing based on the probabilities in state;

  Args:
    state: np.ndarray
      Describes Agent's state

  Returns:
    action:
      Returns chosen action based on policy's probability distribution
  """
  state = torch.from_numpy(state).type(torch.FloatTensor)
  state = policy(Variable(state))
  c = Categorical(state)
  action = c.sample()

  # Add log probability of chosen action
  if policy.past_policy.dim() != 0:
    policy.past_policy = torch.cat([policy.past_policy, c.log_prob(action).reshape(1)])
  else:
    policy.past_policy = (c.log_prob(action).reshape(1))
  return action


Update policy¶

Reward $$G_t$$¶

We update our policy by taking a sample of the action value function $$Q^{\pi_\theta} (s_t,a_t)$$ by playing through episodes of the game. $$Q^{\pi_\theta} (s_t,a_t)$$ is defined as the expected return from taking action $$a_t$$ in state $$s_t$$ and following policy $$\pi_\theta$$ thereafter.

We know that for every step the simulation continues we receive a reward of $$1$$. We can use this to calculate the policy gradient at each time step, where $$r$$ is the reward for a particular state-action pair. Rather than using the instantaneous reward, $$r$$, we instead use a long-term return $$G_t$$, the discounted sum of all future rewards for the remainder of the episode. $$G_t$$ is then,

(126)$$$\color{green} G_t = \sum_{n=t}^T \gamma^{n-t} \color{green} R(\color{red}{s_n}, \color{blue}{a_n}, \color{red}{s_{n+1}}).$$$

where $$\gamma$$ is the discount factor ($$0.99$$). For example, if an episode lasts 5 steps, the return at each step will be $$[4.90, 3.94, 2.97, 1.99, 1]$$. Next we standardize the return vector by subtracting the mean from each element and dividing by the standard deviation, which gives it zero mean and unit variance. This practice is common in machine learning applications and is essentially the same operation as scikit-learn’s StandardScaler. It also has the effect of compensating for future uncertainty.
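The five-step example can be reproduced with a few lines of standard-library Python. This is a sketch of the arithmetic only; the sample standard deviation (dividing by n - 1) is used, which is what `torch.Tensor.std` computes by default.

```python
import statistics

gamma = 0.99
rewards = [1.0] * 5                  # CartPole gives +1 for every step

returns, running = [], 0.0
for r in reversed(rewards):
    running = r + gamma * running    # G_t = r_t + gamma * G_{t+1}
    returns.insert(0, running)

rounded = [round(g, 2) for g in returns]

# Standardize: zero mean, unit (sample) standard deviation.
mean = statistics.mean(returns)
std = statistics.stdev(returns)
scaled = [(g - mean) / std for g in returns]

print(rounded)   # [4.9, 3.94, 2.97, 1.99, 1.0]
print(scaled)
```

After standardization, early steps of the episode get positive weights and late steps get negative ones, regardless of the absolute size of the returns.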

Update Policy: equation¶

After each episode we apply Monte-Carlo Policy Gradient to improve our policy according to the equation:

(127)$$$\Delta\theta_t = \alpha\nabla_\theta \, \log \pi_\theta (a_t|s_t)G_t$$$

We will then feed our policy history multiplied by our rewards to our optimizer and update the weights of our neural network using stochastic gradient ascent. This should increase the likelihood of actions that got our agent a larger reward.
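Optimizers minimize by convention, so gradient ascent on the return-weighted log-probabilities is usually written as minimizing their negative. A small sketch of that surrogate loss with made-up numbers (plain floats stand in for tensors, so no gradients are computed here):

```python
import math

# Made-up episode: probability the policy assigned to each chosen action,
# and the standardized return that followed it.
chosen_probs = [0.7, 0.4, 0.9]
returns = [1.2, -0.3, -0.9]

log_probs = [math.log(p) for p in chosen_probs]

# Minimizing this loss is gradient ascent on sum_t G_t * log pi(a_t | s_t).
pg_loss = -sum(lp * g for lp, g in zip(log_probs, returns))
print(pg_loss)
```

Lowering this loss raises the log-probability of actions followed by positive returns and lowers it for actions followed by negative ones.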

The following function update_policy updates the network weights and therefore the policy.

def update_policy():
  """
  Helper function to update network weights and policy

  Args:
    None

  Returns:
    Nothing
  """
  R = 0
  rewards = []

  # Discount future rewards back to the present using gamma
  for r in policy.reward_episode[::-1]:
    R = r + policy.gamma * R
    rewards.insert(0, R)

  # Scale rewards
  rewards = torch.FloatTensor(rewards)
  rewards = (rewards - rewards.mean()) / (rewards.std() +
                                          np.finfo(np.float32).eps)

  # Calculate loss
  pg_loss = (torch.sum(torch.mul(policy.past_policy,
                                 Variable(rewards)).mul(-1), -1))

  # Update network weights
  # Use zero_grad(), backward() and step() methods of the optimizer instance.
  pg_optimizer.zero_grad()
  pg_loss.backward()

  # Update the weights (a single optimizer step covers all parameters)
  pg_optimizer.step()

  # Save and initialize episode past counters
  policy.past_loss.append(pg_loss.item())
  policy.past_reward.append(np.sum(policy.reward_episode))
  policy.past_policy = Variable(torch.Tensor())
  policy.reward_episode = []


Training¶

This is our main policy training loop. For each step in a training episode, we choose an action, take a step through the environment, and record the resulting new state and reward. We call update_policy() at the end of each episode to feed the episode history to our neural network and improve our policy.

def policy_gradient_train(episodes):
  """
  Helper function to train policy gradient network

  Args:
    episodes: int
      Number of training episodes

  Returns:
    Nothing
  """
  running_reward = 10
  for episode in range(episodes):
    # Gym 0.26 and later: reset() returns (observation, info).
    state, _ = env.reset()
    done = False

    for time in range(1000):
      action = select_action(state)
      # Step through environment using chosen action.
      # Gym 0.26 and later: step() returns
      # (observation, reward, terminated, truncated, info).
      state, reward, terminated, truncated, _ = env.step(action.item())
      done = terminated or truncated

      # Save reward
      policy.reward_episode.append(reward)
      if done:
        break

    # Used to determine when the environment is solved.
    running_reward = (running_reward * gamma) + (time * (1 - gamma))

    update_policy()

    if episode % 50 == 0:
      print(f"Episode {episode}\tLast length: {time:5.0f}"
            f"\tAverage length: {running_reward:.2f}")

    if running_reward > env.spec.reward_threshold:
      print(f"Solved! Running reward is now {running_reward} "
            f"and the last episode runs to {time} time steps!")
      break


Run the model¶

episodes = 500   # @param {type:"integer"}
policy_gradient_train(episodes)


Plot the results¶

Plot the training performance for policy gradient¶

# @title Plot the training performance for policy gradient

"""
Helper function to plot the training performance

Args:
None

Returns:
Nothing
"""
window = int(episodes / 20)

fig, ((ax1), (ax2)) = plt.subplots(1, 2, sharey=True, figsize=[15, 4]);
rolling_mean = pd.Series(policy.past_reward).rolling(window).mean()
std = pd.Series(policy.past_reward).rolling(window).std()
ax1.plot(rolling_mean)
ax1.fill_between(range(len(policy.past_reward)),
rolling_mean-std, rolling_mean+std,
color='orange', alpha=0.2)
ax1.set_title(f"Episode Length Moving Average ({window}-episode window)")
ax1.set_xlabel('Episode'); ax1.set_ylabel('Episode Length')

ax2.plot(policy.past_reward)
ax2.set_title('Episode Length')
ax2.set_xlabel('Episode')
ax2.set_ylabel('Episode Length')

plt.show()



Exercise 3.1: Explore different hyperparameters.¶

Try running the model again with modified hyperparameters and observe the outputs. Be sure to rerun the function definition cells so that they pick up the updated values.

What do you see when you:

1. increase learning rate

2. decrease learning rate

3. decrease gamma ($$\gamma$$)

4. increase number of hidden neurons in the network
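To build intuition for item 3, it may help to see how `gamma` weights future rewards. The sketch below computes the discounted return of a constant stream of +1 rewards for a few values of `gamma`. The helper `discounted_return` and the 200-step episode are illustrative assumptions, not part of the tutorial code.

```python
def discounted_return(rewards, gamma):
  """Return sum_t gamma**t * rewards[t], accumulated backwards in time."""
  g = 0.0
  for r in reversed(rewards):
    g = r + gamma * g
  return g


rewards = [1.0] * 200  # hypothetical episode: +1 reward for 200 steps
for gamma in [0.5, 0.9, 0.99]:
  print(f"gamma={gamma}: return={discounted_return(rewards, gamma):.2f}, "
        f"limit 1/(1-gamma)={1 / (1 - gamma):.2f}")
```

For small `gamma` the return saturates near 1/(1 - gamma), so rewards more than roughly that many steps ahead barely influence the update.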

Section 3.2: Actor-critic¶

The policy gradient is given by:

$$$\nabla J(\theta) = \mathbb{E} \left[ \sum_{t=0}^T {\color{green}{G_t}} \, \nabla\log {\color{blue}{\pi_\theta}}({\color{red}{s_t}}) \right] \tag{128}$$$

The policy parameters are updated with a Monte Carlo technique, which relies on random samples. This introduces high variability in the log probabilities and cumulative reward values, which leads to noisy gradients and can cause unstable learning.

One way to reduce variance and increase stability is to subtract a baseline $$b$$ from the cumulative reward:

$$$\nabla J(\theta) = \mathbb{E} \left[ \sum_{t=0}^T {\color{green}{(G_t - b)}} \, \nabla\log {\color{blue}{\pi_\theta}}({\color{red}{s_t}}) \right] \tag{129}$$$

Intuitively, subtracting a baseline shrinks the magnitude of the terms that weight the log-probability gradients, which gives smaller gradients and thus smaller and (hopefully) more stable updates.
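The effect of the baseline can be checked exactly on a toy problem. The sketch below assumes a hypothetical two-action policy with $$\pi(a=1) = \mathrm{sigmoid}(\theta)$$ and made-up returns of 10 and 11. It enumerates both actions to compute the mean and variance of the single-sample gradient estimate $$(G - b)\,\nabla_\theta \log \pi(a)$$. None of these names come from the tutorial code.

```python
import math


def gradient_stats(theta, returns, baseline):
  """Exact mean and variance of (G - b) * d/dtheta log pi(a), two actions.

  pi(a=1) = sigmoid(theta), pi(a=0) = 1 - sigmoid(theta).
  """
  p1 = 1.0 / (1.0 + math.exp(-theta))
  probs = [1.0 - p1, p1]
  # Score function d/dtheta log pi(a): -p1 for a=0, (1 - p1) for a=1
  scores = [-p1, 1.0 - p1]
  samples = [(returns[a] - baseline) * scores[a] for a in (0, 1)]
  mean = sum(p * s for p, s in zip(probs, samples))
  var = sum(p * (s - mean) ** 2 for p, s in zip(probs, samples))
  return mean, var


returns = [10.0, 11.0]  # hypothetical returns for actions 0 and 1
for b in [0.0, 10.5]:
  mean, var = gradient_stats(theta=0.0, returns=returns, baseline=b)
  print(f"baseline={b:5.1f}  mean gradient={mean:.4f}  variance={var:.4f}")
```

In this toy case the expected gradient is the same with and without the baseline, while the variance drops sharply when the baseline is close to the average return.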

From the lecture slides, we know that in the Actor Critic method:

1. The Critic estimates the value function. This could be the action-value (the Q value) or state-value (the V value).

2. The Actor updates the policy distribution in the direction suggested by the Critic (such as with policy gradients).

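Both the Critic and the Actor are parameterized with neural networks. In the code below, the Critic network outputs a single scalar value estimate per state, while the Q-value targets (qvals) are computed from the observed rewards, bootstrapped from the Critic's estimate of the final state.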
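One common way to combine the two networks is to use the Critic's value estimate as the baseline $$b$$, so that the Actor is updated with the advantage (return target minus value estimate). The sketch below illustrates that computation in plain Python with made-up numbers. The function names `bootstrapped_returns` and `advantages` are hypothetical, and this is an illustration of the idea rather than the tutorial's implementation.

```python
def bootstrapped_returns(rewards, bootstrap_value, gamma):
  """Discounted return targets, seeded with a value estimate of the final state."""
  targets = [0.0] * len(rewards)
  running = bootstrap_value
  for t in reversed(range(len(rewards))):
    running = rewards[t] + gamma * running
    targets[t] = running
  return targets


def advantages(targets, values):
  """Advantage = return target minus the Critic's value estimate."""
  return [q - v for q, v in zip(targets, values)]


rewards = [1.0, 1.0, 1.0]  # made-up rewards for three steps
values = [2.5, 1.8, 1.0]   # made-up Critic estimates for the visited states
targets = bootstrapped_returns(rewards, bootstrap_value=0.0, gamma=0.9)
print("targets:   ", targets)
print("advantages:", advantages(targets, values))
```

A positive advantage means the action led to a better outcome than the Critic expected, so the Actor raises its probability. A negative advantage lowers it.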

Set the hyperparameters for Actor Critic¶

# @title Set the hyperparameters for Actor Critic

learning_rate = 0.01  # @param {type:"number"}
gamma = 0.99  # @param {type:"number"}
dropout = 0.6

# Only used in Actor-Critic Method
hidden_size = 256  # @param {type:"integer"}

num_steps = 300


Actor Critic Network¶

class ActorCriticNet(nn.Module):
"""
Build Actor Critic Network
"""

def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
"""
Initiate Actor Critic Network

Args:
num_inputs: int
Number of inputs incoming into the Network
num_actions: int
Number of actions
hidden_size: int
Size of hidden layer in the network
learning_rate: Float
Learning rate of Actor Critic Network

Returns:
Nothing
"""
super(ActorCriticNet, self).__init__()

self.num_actions = num_actions
self.critic_linear1 = nn.Linear(num_inputs, hidden_size)
self.critic_linear2 = nn.Linear(hidden_size, 1)

self.actor_linear1 = nn.Linear(num_inputs, hidden_size)
self.actor_linear2 = nn.Linear(hidden_size, num_actions)

self.all_rewards = []
self.all_lengths = []
self.average_lengths = []

def forward(self, state):
"""
Describes forward pass of Actor Critic Network

Args:
state: np.ndarray
Describes state

Returns:
Value and Policy Distribution
"""
state = Variable(torch.from_numpy(state).float().unsqueeze(0))
value = F.relu(self.critic_linear1(state))
value = self.critic_linear2(value)

policy_dist = F.relu(self.actor_linear1(state))
policy_dist = F.softmax(self.actor_linear2(policy_dist), dim=1)

return value, policy_dist
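The Actor head ends in a softmax, which turns the unnormalized scores of the last linear layer into a probability distribution over actions. A minimal pure-Python sketch of that step with made-up scores follows. The `softmax` helper here is illustrative only, since the network itself calls `F.softmax`.

```python
import math
import random


def softmax(scores):
  """Numerically stable softmax over a list of scores."""
  shift = max(scores)  # subtracting the max avoids overflow in exp
  exps = [math.exp(s - shift) for s in scores]
  total = sum(exps)
  return [e / total for e in exps]


scores = [2.0, 0.5]            # made-up outputs of the last Actor layer
policy_dist = softmax(scores)  # probability of each action
# Sample an action index in proportion to its probability
action = random.choices(range(len(policy_dist)), weights=policy_dist)[0]
print(policy_dist, action)
```

Sampling from the distribution, rather than always taking the most probable action, is what lets the agent keep exploring during training.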


Training¶

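def actor_critic_train(episodes):
  """
  Helper function to train Actor Critic Network

  Args:
    episodes: int
      Number of training episodes

  Returns:
    Nothing
  """
  all_lengths = []
  average_lengths = []
  all_rewards = []
  entropy_term = 0

  for episode in range(episodes):
    log_probs = []
    values = []
    rewards = []

    # Newer gym releases return (observation, info) from reset();
    # older ones return only the observation. Handle both.
    state = env.reset()
    if isinstance(state, tuple):
      state = state[0]

    for steps in range(num_steps):
      value, policy_dist = actor_critic.forward(state)
      dist = policy_dist.detach().numpy()

      action = np.random.choice(num_outputs, p=np.squeeze(dist))
      log_prob = torch.log(policy_dist.squeeze(0)[action])
      # Shannon entropy of the policy distribution
      entropy = -np.sum(dist * np.log(dist))

      step_out = env.step(action)
      new_state, reward = step_out[0], step_out[1]
      # 4-tuple API: done flag; 5-tuple API: terminated or truncated
      done = step_out[2] if len(step_out) == 4 else (step_out[2] or step_out[3])

      rewards.append(reward)
      # Keep the value as a tensor so the critic loss can backpropagate
      values.append(value.squeeze())
      log_probs.append(log_prob)
      entropy_term += entropy
      state = new_state

      if done or steps == num_steps - 1:
        # Bootstrap from the critic's estimate of the final state
        qval, _ = actor_critic.forward(new_state)
        qval = qval.detach().numpy()[0, 0]
        all_rewards.append(np.sum(rewards))
        all_lengths.append(steps)
        average_lengths.append(np.mean(all_lengths[-10:]))
        if episode % 50 == 0:
          print(f"episode: {episode},\treward: {np.sum(rewards)},"
                f"\ttotal length: {steps},"
                f"\taverage length: {average_lengths[-1]}")
        break

    # compute Q values
    qvals = np.zeros(len(rewards))
    for t in reversed(range(len(rewards))):
      qval = rewards[t] + gamma * qval
      qvals[t] = qval

    # update actor critic
    values = torch.stack(values)
    qvals = torch.tensor(qvals, dtype=torch.float32)
    log_probs = torch.stack(log_probs)

    # Advantage: how much better the return was than the critic expected
    advantage = qvals - values
    # Detach the advantage here so this term only updates the actor
    actor_loss = (-log_probs * advantage.detach()).mean()
    critic_loss = 0.5 * advantage.pow(2).mean()
    # entropy_term is a NumPy scalar: it shifts the loss value but
    # contributes no gradient
    ac_loss = actor_loss + critic_loss + 0.001 * float(entropy_term)

    # ac_optimizer is assumed to be created together with actor_critic
    # before this function is called
    ac_optimizer.zero_grad()
    ac_loss.backward()
    ac_optimizer.step()

  # Store results
  actor_critic.average_lengths = average_lengths
  actor_critic.all_rewards = all_rewards
  actor_critic.all_lengths = all_lengths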
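The training loop tracks an entropy term for the policy distribution. For reference, the Shannon entropy of a categorical distribution is $$H(p) = -\sum_i p_i \log p_i$$, which is largest for a uniform policy and zero for a deterministic one. A small sketch with made-up distributions (the helper `categorical_entropy` is hypothetical):

```python
import math


def categorical_entropy(probs):
  """Shannon entropy in nats; terms with p == 0 contribute nothing."""
  return sum(-p * math.log(p) for p in probs if p > 0)


for probs in ([0.5, 0.5], [0.9, 0.1], [1.0, 0.0]):
  print(probs, round(categorical_entropy(probs), 4))
```

Monitoring this quantity gives a rough sense of how much the policy is still exploring: a value near zero suggests the policy has become almost deterministic.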


Run the model¶

episodes = 500   # @param {type:"integer"}

env.reset()

num_inputs = env.observation_space.shape[0]
num_outputs = env.action_space.n

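actor_critic = ActorCriticNet(num_inputs, num_outputs, hidden_size)
# Optimizer used inside actor_critic_train (Adam is an assumed choice)
ac_optimizer = torch.optim.Adam(actor_critic.parameters(), lr=learning_rate)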

actor_critic_train(episodes)



Plot the results¶

Plot the training performance for Actor Critic¶

# @title Plot the training performance for Actor Critic

def plot_actor_critic_training(actor_critic, episodes):
"""
Plot the training performance for Actor Critic

Args:
actor_critic: nn.Module
Actor Critic Network whose performance is to be plotted
episodes: int
Number of episodes

Returns:
Nothing
"""
window = int(episodes / 20)

plt.figure(figsize=(15, 4))
plt.subplot(1, 2, 1)

smoothed_rewards = pd.Series(actor_critic.all_rewards).rolling(window).mean()
std = pd.Series(actor_critic.all_rewards).rolling(window).std()

plt.plot(smoothed_rewards, label='Smoothed rewards')
plt.fill_between(range(len(smoothed_rewards)),
smoothed_rewards - std, smoothed_rewards + std,
color='orange', alpha=0.2)

plt.xlabel('Episode')
plt.ylabel('Reward')

plt.subplot(1, 2, 2)
plt.plot(actor_critic.all_lengths, label='All lengths')
plt.plot(actor_critic.average_lengths, label='Average lengths')
plt.xlabel('Episode')
plt.ylabel('Episode length')
plt.legend()

plt.tight_layout()
plt.show()

plot_actor_critic_training(actor_critic, episodes)


Exercise 3.2.1: Effect of episodes on performance¶

Change the number of episodes from 500 to 3000 and observe the impact on performance.

Exercise 3.2.2: Effect of learning rate on performance¶

Modify the learning_rate and gamma hyperparameters and observe the impact on performance.

Be sure to rerun the function definition cells so that they pick up the updated values.

Section 4: RL in the real world¶

Time estimate: ~10 mins

Exercise 4: Group discussion¶

Form a group of 2-3 and have discussions (roughly 3 minutes each) of the following questions:

1. Safety: What are some safety issues that arise in RL that don’t arise with, say, supervised learning?

2. Generalization: What happens if your RL agent is presented with data it hasn’t trained on (i.e., goes out of distribution)?

3. How important do you think interpretability is in the ethical and safe deployment of RL agents in the real world?

This should be a very open-ended discussion. Try to have everyone say at least one thing. You can either take these 3 questions in turn, with 3-4 minutes allotted to each, or address them all at once and allow for a more natural conversation.
