Bonus Tutorial: Function approximation¶
Week 3, Day 4: Basic Reinforcement Learning (RL)
By Neuromatch Academy
Content creators: Marcelo G Mattar, Eric DeWitt, Matt Krause, Matthew Sargent, Anoop Kulkarni, Sowmya Parthiban, Feryal Behbahani, Jane Wang
Content reviewers: Ella Batty, Byron Galbraith, Michael Waskom, Ezekiel Williams, Mehul Rastogi, Lily Cheng, Roberto Guidotti, Arush Tagade, Kelson Shilling-Scrivo
Production editors: Gagana B, Spiros Chavlis
Tutorial Objectives¶
This notebook contains bonus material to give you a better sense of more complex reinforcement learning models, as well as real-world applications of RL. Previously, we implemented fundamental ideas of RL in basic Python. Here, we will show how these can be implemented using the Acme library by DeepMind. For the project on GitHub, see the deepmind/acme repository.
By the end of the tutorial, you should be able to:
Implement, train, and test an NFQ agent
Use a Deep Q-Network (DQN)
Learn about policy gradient and actor-critic methods
Use RL in real-world applications
Setup¶
Run the following Setup cells in order to set up needed functions. Don’t worry about the code for now!
Note: There is an issue with some images not showing up if you’re using a Safari browser. Please switch to Chrome if this is the case.
Kaggle users: You need to downgrade acme to version 0.2. In the cell below, comment out the last three lines and add:
!pip install dm-acme==0.2 --quiet
Install requirements¶
# @title Install requirements
# @markdown We install the acme library, see [here](https://github.com/deepmind/acme) for more info.
# @markdown **WARNING:** There may be *errors* and/or *warnings* reported during the installation. However, they should be ignored.
!pip install --upgrade pip --quiet
!pip install imageio --quiet
!pip install imageio-ffmpeg --quiet
!pip install gym --quiet
!pip install enum34 --quiet
!pip install pandas --quiet
!pip install grpcio==1.34.0 --quiet
!pip install typing --quiet
!pip install einops --quiet
!pip install dm-acme[reverb] --quiet
!pip install dm-acme[jax,tensorflow] --quiet
!pip install dm-acme[envs] --quiet
WARNING: dm-acme 0.4.0 does not provide the extra 'reverb'
WARNING: dm-acme 0.4.0 does not provide the extra 'tensorflow'
# Import modules
import gym
import enum
import copy
import time
import acme
import torch
import base64
import dm_env
import IPython
import imageio
import warnings
import itertools
import collections
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
from typing import Sequence
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.distributions import Categorical
from acme import specs
from acme import wrappers
from acme.utils import tree_utils
from acme.utils import loggers
warnings.filterwarnings('ignore')
np.set_printoptions(precision=3, suppress=1)
Figure settings¶
# @title Figure settings
import ipywidgets as widgets # Interactive display
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.style.use("https://raw.githubusercontent.com/NeuromatchAcademy/content-creation/main/nma.mplstyle")
mpl.rc('image', cmap='Blues')
Helper Functions¶
Implement helpers for value visualisation
# @title Helper Functions
# @markdown Implement helpers for value visualisation
map_from_action_to_subplot = lambda a: (2, 6, 8, 4)[a]
map_from_action_to_name = lambda a: ("up", "right", "down", "left")[a]
def plot_values(values, colormap='pink', vmin=-1, vmax=10):
"""
Plots incoming values
Args:
values: List
List of values to be plotted
colormap: String
Defines colormap of plot
vmin: Integer
Smallest possible value within "values"
vmax: Integer
Highest possible value within "values"
Returns:
Nothing
"""
plt.imshow(values, interpolation="nearest",
cmap=colormap, vmin=vmin, vmax=vmax)
plt.yticks([])
plt.xticks([])
plt.colorbar(ticks=[vmin, vmax])
def plot_state_value(state_values, epsilon=0.1):
"""
Helper function to plot state value
Args:
state_values: np.ndarray
Action values with shape (9, 10, 4)
epsilon: Float
Sets the exploitation-exploration control hyperparameter [default=0.1]
Returns:
Nothing
"""
q = state_values
fig = plt.figure(figsize=(4, 4))
vmin = np.min(state_values)
vmax = np.max(state_values)
v = (1 - epsilon) * np.max(q, axis=-1) + epsilon * np.mean(q, axis=-1)
plot_values(v, colormap='summer', vmin=vmin, vmax=vmax)
plt.title("$v(s)$")
def plot_action_values(action_values, epsilon=0.1):
"""
Helper function to plot action value
Args:
action_values: np.ndarray
Action values with shape (9, 10, 4)
epsilon: Float
Sets the exploitation-exploration control hyperparameter [default=0.1]
Returns:
Nothing
"""
q = action_values
fig = plt.figure(figsize=(8, 8))
fig.subplots_adjust(wspace=0.3, hspace=0.3)
vmin = np.min(action_values)
vmax = np.max(action_values)
dif = vmax - vmin
for a in [0, 1, 2, 3]:
plt.subplot(3, 3, map_from_action_to_subplot(a))
plot_values(q[..., a], vmin=vmin - 0.05*dif, vmax=vmax + 0.05*dif)
action_name = map_from_action_to_name(a)
plt.title(r"$q(s, \mathrm{" + action_name + r"})$")
plt.subplot(3, 3, 5)
v = (1 - epsilon) * np.max(q, axis=-1) + epsilon * np.mean(q, axis=-1)
plot_values(v, colormap='summer', vmin=vmin, vmax=vmax)
plt.title("$v(s)$")
Set random seed¶
Executing set_seed(seed=seed) sets the random seed
# @title Set random seed
# @markdown Executing `set_seed(seed=seed)` sets the random seed
# For DL it's critical to set the random seed so that students can have a
# baseline to compare their results to expected results.
# Read more here: https://pytorch.org/docs/stable/notes/randomness.html
# Call `set_seed` function in the exercises to ensure reproducibility.
import random
import torch
def set_seed(seed=None, seed_torch=True):
"""
Function that controls randomness. NumPy and random modules must be imported.
Args:
seed : Integer
A non-negative integer that defines the random state. Default is `None`.
seed_torch : Boolean
If `True` sets the random seed for pytorch tensors, so pytorch module
must be imported. Default is `True`.
Returns:
Nothing.
"""
if seed is None:
seed = np.random.choice(2 ** 32)
random.seed(seed)
np.random.seed(seed)
if seed_torch:
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
print(f'Random seed {seed} has been set.')
# In case that `DataLoader` is used
def seed_worker(worker_id):
"""
DataLoader will reseed workers following randomness in
multi-process data loading algorithm.
Args:
worker_id: integer
ID of subprocess to seed. 0 means that
the data will be loaded in the main process
Refer: https://pytorch.org/docs/stable/data.html#data-loading-randomness for more details
Returns:
Nothing
"""
worker_seed = torch.initial_seed() % 2**32
np.random.seed(worker_seed)
random.seed(worker_seed)
Set device (GPU or CPU). Execute set_device()¶
# @title Set device (GPU or CPU). Execute `set_device()`
# especially if torch modules are used.
# Inform the user if the notebook uses GPU or CPU.
def set_device():
"""
Set the device. CUDA if available, CPU otherwise
Args:
None
Returns:
Nothing
"""
device = "cuda" if torch.cuda.is_available() else "cpu"
if device != "cuda":
print("WARNING: For this notebook to perform best, "
"if possible, in the menu under `Runtime` -> "
"`Change runtime type.` select `GPU` ")
else:
print("GPU is enabled in this notebook.")
return device
SEED = 2021
set_seed(seed=SEED)
DEVICE = set_device()
Random seed 2021 has been set.
WARNING: For this notebook to perform best, if possible, in the menu under `Runtime` -> `Change runtime type.` select `GPU`
Acme: a research framework for reinforcement learning¶
Acme is a library of reinforcement learning (RL) agents and agent building blocks by Google DeepMind. Acme strives to expose simple, efficient, and readable agents, that serve both as reference implementations of popular algorithms and as strong baselines, while still providing enough flexibility to do novel research. The design of Acme also attempts to provide multiple points of entry to the RL problem at differing levels of complexity.
For more information see the github’s repository deepmind/acme.
For this practical session, we will focus on a simple grid world environment, which consists of a 9 x 10 grid of wall and empty cells - depicted in black and white, respectively. The smiling agent starts from an initial location and needs to navigate to reach the goal square.

Below you will find an implementation of this Gridworld as a dm_env.Environment.
There is no coding in this section, but if you want, you can look over the provided code so that you can familiarize yourself with an example of how to set up a grid world environment.
Implement GridWorld¶
Double-click to inspect the contents of this cell.¶
# @title Implement GridWorld
# @markdown ##### *Double-click* to inspect the contents of this cell.
class ObservationType(enum.IntEnum):
"""
Class to examine observation type including goal position and state index;
Attributes:
observation_type: Enum
Enum observation type to use. One of:
* ObservationType.STATE_INDEX: int32 index of agent occupied tile.
* ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
agent is and 0 elsewhere.
* ObservationType.GRID: NxNx3 float32 grid of feature channels.
First channel contains walls (1 if wall, 0 otherwise), second the
agent position (1 if agent, 0 otherwise) and third goal position
(1 if goal, 0 otherwise)
* ObservationType.AGENT_GOAL_POS: float32 tuple with
(agent_y, agent_x, goal_y, goal_x)
"""
STATE_INDEX = enum.auto()
AGENT_ONEHOT = enum.auto()
GRID = enum.auto()
AGENT_GOAL_POS = enum.auto()
class GridWorld(dm_env.Environment):
"""
Build a grid environment.
Simple gridworld defined by a map layout, a start and a goal state.
Layout should be a NxN grid, containing:
* 0: Empty
* -1: Wall
* Any other positive value: value indicates reward;
Episode will terminate
"""
def __init__(self,
layout,
start_state,
goal_state=None,
observation_type=ObservationType.STATE_INDEX,
discount=0.9,
penalty_for_walls=-5,
reward_goal=10,
max_episode_length=None,
randomize_goals=False):
"""
Initiates grid environment
Args:
layout: List
NxN array of numbers, indicating the layout of the environment.
start_state: Tuple
Tuple (y, x) of starting location.
goal_state: Tuple
Optional tuple (y, x) of goal location. Will be randomly
sampled once if None.
observation_type: Enum
Enum observation type to use.
discount: Float
Discounting factor included in all Timesteps.
penalty_for_walls: Integer
Reward added when hitting a wall (should be negative).
reward_goal: Integer
Reward added when finding the goal (should be positive).
max_episode_length: Integer
If set, will terminate an episode after this many steps.
randomize_goals: Boolean
If true, randomize goal at every episode.
Returns:
None
"""
if observation_type not in ObservationType:
raise ValueError('observation_type should be an ObservationType instance.')
self._layout = np.array(layout)
self._start_state = start_state
self._state = self._start_state
self._number_of_states = np.prod(np.shape(self._layout))
self._discount = discount
self._penalty_for_walls = penalty_for_walls
self._reward_goal = reward_goal
self._observation_type = observation_type
self._layout_dims = self._layout.shape
self._max_episode_length = max_episode_length
self._num_episode_steps = 0
self._randomize_goals = randomize_goals
if goal_state is None:
# Randomly sample goal_state if not provided
goal_state = self._sample_goal()
self.goal_state = goal_state
def _sample_goal(self):
"""
Randomly sample reachable non-starting state.
Args:
None
Returns:
goal_state: Tuple (y, x) of the sampled goal location
"""
# Sample a new goal
n = 0
max_tries = 1e5
while n < max_tries:
goal_state = tuple(np.random.randint(d) for d in self._layout_dims)
if goal_state != self._state and self._layout[goal_state] == 0:
# Reachable state found!
return goal_state
n += 1
raise ValueError('Failed to sample a goal state.')
@property
def layout(self):
return self._layout
@property
def number_of_states(self):
return self._number_of_states
@property
def goal_state(self):
return self._goal_state
@property
def start_state(self):
return self._start_state
@property
def state(self):
return self._state
def set_state(self, x, y):
self._state = (y, x)
def action_spec(self):
return specs.DiscreteArray(4, dtype=int, name='action')
@goal_state.setter
def goal_state(self, new_goal):
if new_goal == self._state or self._layout[new_goal] < 0:
raise ValueError('This is not a valid goal!')
# Zero out any other goal
self._layout[self._layout > 0] = 0
# Setup new goal location
self._layout[new_goal] = self._reward_goal
self._goal_state = new_goal
def plot_greedy_policy(self, q):
greedy_actions = np.argmax(q, axis=2)
self.plot_policy(greedy_actions)
def observation_spec(self):
"""
Function to return the spec-list based on observation type
Args:
None
Returns:
Specification-list based on observation type
"""
if self._observation_type is ObservationType.AGENT_ONEHOT:
return specs.Array(
shape=self._layout_dims,
dtype=np.float32,
name='observation_agent_onehot')
elif self._observation_type is ObservationType.GRID:
return specs.Array(
shape=self._layout_dims + (3,),
dtype=np.float32,
name='observation_grid')
elif self._observation_type is ObservationType.AGENT_GOAL_POS:
return specs.Array(
shape=(4, ), dtype=np.float32, name='observation_agent_goal_pos')
elif self._observation_type is ObservationType.STATE_INDEX:
return specs.DiscreteArray(
self._number_of_states, dtype=int, name='observation_state_index')
def get_obs(self):
"""
Returns the current observation, in the format given by the observation type
Args:
None
Returns:
Observation
"""
if self._observation_type is ObservationType.AGENT_ONEHOT:
obs = np.zeros(self._layout.shape, dtype=np.float32)
# Place agent
obs[self._state] = 1
return obs
elif self._observation_type is ObservationType.GRID:
obs = np.zeros(self._layout.shape + (3,), dtype=np.float32)
obs[..., 0] = self._layout < 0
obs[self._state[0], self._state[1], 1] = 1
obs[self._goal_state[0], self._goal_state[1], 2] = 1
return obs
elif self._observation_type is ObservationType.AGENT_GOAL_POS:
return np.array(self._state + self._goal_state, dtype=np.float32)
elif self._observation_type is ObservationType.STATE_INDEX:
y, x = self._state
return y * self._layout.shape[1] + x
def reset(self):
"""
Helper function to reset GridWorld
Args:
None
Returns:
dm_env.TimeStep for the first step of a new episode
"""
self._state = self._start_state
self._num_episode_steps = 0
if self._randomize_goals:
self.goal_state = self._sample_goal()
return dm_env.TimeStep(
step_type=dm_env.StepType.FIRST,
reward=None,
discount=None,
observation=self.get_obs())
def step(self, action):
"""
Take one step in the environment given the chosen action.
Args:
action: Integer
if 0, move up; if 1, move right; if 2, move down; if 3, move left
Returns:
dm_env.TimeStep with the reward, discount and observation for the new position
"""
y, x = self._state
if action == 0: # Up
new_state = (y - 1, x)
elif action == 1: # Right
new_state = (y, x + 1)
elif action == 2: # Down
new_state = (y + 1, x)
elif action == 3: # Left
new_state = (y, x - 1)
else:
raise ValueError(
'Invalid action: {} is not 0, 1, 2, or 3.'.format(action))
new_y, new_x = new_state
step_type = dm_env.StepType.MID
if self._layout[new_y, new_x] == -1: # Wall
reward = self._penalty_for_walls
discount = self._discount
new_state = (y, x)
elif self._layout[new_y, new_x] == 0: # Empty cell
reward = 0.
discount = self._discount
else: # Goal
reward = self._layout[new_y, new_x]
discount = 0.
new_state = self._start_state
step_type = dm_env.StepType.LAST
self._state = new_state
self._num_episode_steps += 1
if (self._max_episode_length is not None and
self._num_episode_steps >= self._max_episode_length):
step_type = dm_env.StepType.LAST
return dm_env.TimeStep(
step_type=step_type,
reward=np.float32(reward),
discount=discount,
observation=self.get_obs())
def plot_grid(self, add_start=True):
"""
Helper function to plot GridWorld
Args:
add_start: Boolean
if True, add start/goal positions
Returns:
Nothing
"""
plt.figure(figsize=(4, 4))
plt.imshow(self._layout <= -1, interpolation='nearest')
ax = plt.gca()
ax.grid(0)
plt.xticks([])
plt.yticks([])
# Add start/goal
if add_start:
plt.text(
self._start_state[1],
self._start_state[0],
r'$\mathbf{S}$',
fontsize=16,
ha='center',
va='center')
plt.text(
self._goal_state[1],
self._goal_state[0],
r'$\mathbf{G}$',
fontsize=16,
ha='center',
va='center')
h, w = self._layout.shape
for y in range(h - 1):
plt.plot([-0.5, w - 0.5], [y + 0.5, y + 0.5], '-w', lw=2)
for x in range(w - 1):
plt.plot([x + 0.5, x + 0.5], [-0.5, h - 0.5], '-w', lw=2)
def plot_state(self, return_rgb=False):
"""
Helper function to plot agent state
Args:
return_rgb: Boolean
if True, process GridWorld with number-of-channels = 3
Returns:
data: np.ndarray
Array of size (h, w, 3) describing environment
"""
self.plot_grid(add_start=False)
# Add the agent location
plt.text(
self._state[1],
self._state[0],
r'$\mathbf{A}$',
fontsize=18,
ha='center',
va='center',
)
if return_rgb:
fig = plt.gcf()
plt.axis('tight')
plt.subplots_adjust(0, 0, 1, 1, 0, 0)
fig.canvas.draw()
data = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
w, h = fig.canvas.get_width_height()
data = data.reshape((h, w, 3))
plt.close(fig)
return data
def plot_policy(self, policy):
"""
Helper function to visualize the policy;
Args:
policy: PolicyGradientNet instance
Describes the principles that govern agent movement
Returns:
Nothing
"""
action_names = [
r'$\uparrow$', r'$\rightarrow$', r'$\downarrow$', r'$\leftarrow$'
]
self.plot_grid()
plt.title('Policy Visualization')
h, w = self._layout.shape
for y in range(h):
for x in range(w):
# if ((y, x) != self._start_state) and ((y, x) != self._goal_state):
if (y, x) != self._goal_state:
action_name = action_names[policy[y, x]]
plt.text(x, y, action_name, ha='center', va='center')
def build_gridworld_task(task,
discount=0.9,
penalty_for_walls=-5,
observation_type=ObservationType.STATE_INDEX,
max_episode_length=200):
"""
Construct a particular Gridworld layout with start/goal states.
Args:
task: String
String name of the task to use. One of {'simple', 'obstacle',
'random_goal'}.
discount: Float
Discounting factor included in all Timesteps.
penalty_for_walls: Integer
Reward added when hitting a wall (should be negative).
observation_type: Enum
Observation type to use. One of:
* ObservationType.STATE_INDEX: int32 index of agent occupied tile.
* ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
agent is and 0 elsewhere.
* ObservationType.GRID: NxNx3 float32 grid of feature channels.
First channel contains walls (1 if wall, 0 otherwise), second the
agent position (1 if agent, 0 otherwise) and third goal position
(1 if goal, 0 otherwise)
* ObservationType.AGENT_GOAL_POS: float32 tuple with
(agent_y, agent_x, goal_y, goal_x).
max_episode_length: Integer
If set, will terminate an episode after this many steps.
Returns:
GridWorld instance configured for the requested task
"""
tasks_specifications = {
'simple': {
'layout': [
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
],
'start_state': (2, 2),
'goal_state': (7, 2)
},
'obstacle': {
'layout': [
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
[-1, 0, 0, 0, 0, 0, -1, 0, 0, -1],
[-1, 0, 0, 0, -1, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
],
'start_state': (2, 2),
'goal_state': (2, 8)
},
'random_goal': {
'layout': [
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, -1, -1, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, 0, 0, 0, 0, 0, 0, 0, 0, -1],
[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
],
'start_state': (2, 2),
# 'randomize_goals': True
},
}
return GridWorld(
discount=discount,
penalty_for_walls=penalty_for_walls,
observation_type=observation_type,
max_episode_length=max_episode_length,
**tasks_specifications[task])
def setup_environment(environment):
"""
Returns the environment and its spec.
Args:
environment: acme.wrappers.single_precision.SinglePrecisionWrapper instance
Wrapped environment into single-precision floats [avoids floating point errors]
Returns:
environment: acme.wrappers.single_precision.SinglePrecisionWrapper instance
Wrapped environment into single-precision floats [avoids floating point errors]
environment_spec: acme.specs.EnvironmentSpec(dm_env.specs.Array instance, dm_env.specs.DiscreteArray instance, dm_env.specs.Array instance, dm_env.specs.BoundedArray instance)
Descibes specification of the GridWorld Environment
"""
# Make sure the environment outputs single-precision floats.
environment = wrappers.SinglePrecisionWrapper(environment)
# Grab the spec of the environment.
environment_spec = specs.make_environment_spec(environment)
return environment, environment_spec
We will use two distinct tabular GridWorlds:
simple, where the goal is at the bottom left of the grid; little navigation required.
obstacle, where the goal is behind an obstacle the agent must avoid.
You can visualize the grid worlds by running the cell below.
Note that S indicates the start state and G indicates the goal.
Visualise GridWorlds¶
# @title Visualise GridWorlds
# Instantiate two tabular environments, a simple task, and one that involves
# the avoidance of an obstacle.
simple_grid = build_gridworld_task(
task='simple', observation_type=ObservationType.GRID)
obstacle_grid = build_gridworld_task(
task='obstacle', observation_type=ObservationType.GRID)
# Plot them.
simple_grid.plot_grid()
plt.title('Simple')
obstacle_grid.plot_grid()
plt.title('Obstacle')
plt.show()


In this environment, the agent has four possible actions: up, right, down, and left. The reward is -5 for bumping into a wall, +10 for reaching the goal, and 0 otherwise. The episode ends when the agent reaches the goal, and otherwise continues. The discount on continuing steps is \(\gamma = 0.9\).
Before we start building an agent to interact with this environment, let’s first look at the types of objects the environment either returns (e.g., observations) or consumes (e.g., actions). The environment_spec
will show you the form of the observations, rewards and discounts that the environment exposes and the form of the actions that can be taken.
Look at environment_spec¶
Note: setup_environment is implemented in the same cell as GridWorld.¶
# @title Look at `environment_spec`
# @markdown ##### **Note:** `setup_environment` is implemented in the same cell as GridWorld.
environment, environment_spec = setup_environment(simple_grid)
print('actions:\n', environment_spec.actions, '\n')
print('observations:\n', environment_spec.observations, '\n')
print('rewards:\n', environment_spec.rewards, '\n')
print('discounts:\n', environment_spec.discounts, '\n')
actions:
DiscreteArray(shape=(), dtype=int32, name=action, minimum=0, maximum=3, num_values=4)
observations:
Array(shape=(9, 10, 3), dtype=dtype('float32'), name='observation_grid')
rewards:
Array(shape=(), dtype=dtype('float32'), name='reward')
discounts:
BoundedArray(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
We first set the environment to its initial state by calling the reset()
method which returns the first observation and resets the agent to the starting location.
environment.reset()
environment.plot_state()

Note that A indicates the agent’s location.
Now we want to take an action to interact with the environment. We do this by passing a valid action to the dm_env.Environment.step() method, which returns a dm_env.TimeStep namedtuple with fields (step_type, reward, discount, observation).
Let’s take an action and visualise the resulting state of the grid-world. (You’ll need to rerun the cell if you pick a new action.)
Note for Kaggle users: Kaggle does not render the interactive forms automatically, so read the instructions carefully and set the variable values manually.
Pick an action and see the state changing¶
# @title Pick an action and see the state changing
action = "down" #@param ["up", "right", "down", "left"] {type:"string"}
action_int = {'up': 0,
'right': 1,
'down': 2,
'left':3 }
action = int(action_int[action])
timestep = environment.step(action) # pytype: dm_env.TimeStep
environment.plot_state()

Run loop¶
# @title Run loop
# @markdown ##### This function runs an agent in the environment for a number of episodes, allowing it to learn.
# @markdown ##### *Double-click* to inspect the `run_loop` function.
def run_loop(environment,
agent,
num_episodes=None,
num_steps=None,
logger_time_delta=1.,
label='training_loop',
log_loss=False,
):
"""
Perform the Acme run loop.
Run the environment loop for `num_episodes` episodes. Each episode is itself
a loop which interacts first with the environment to get an observation and
then give that observation to the agent in order to retrieve an action. Upon
termination of an episode a new episode will be started. If the number of
episodes is not given then this will interact with the environment
infinitely.
Args:
environment: dm_env
Used to generate trajectories.
agent: acme.Actor
For selecting actions in the run loop.
num_steps: Integer
Number of steps to run the loop for. If `None` (default), runs
without limit.
num_episodes: Integer
Number of episodes to run the loop for. If `None` (default),
runs without limit.
logger_time_delta: Float
Time interval (in seconds) between consecutive logging steps.
label: String
Optional label used at logging steps.
log_loss: Boolean
If true, log_loss function is used to compute loss
Else, use raw loss function
Returns:
all_returns: List
Log of return per episode
"""
logger = loggers.TerminalLogger(label=label, time_delta=logger_time_delta)
iterator = range(num_episodes) if num_episodes else itertools.count()
all_returns = []
num_total_steps = 0
for episode in iterator:
# Reset any counts and start the environment.
start_time = time.time()
episode_steps = 0
episode_return = 0
episode_loss = 0
timestep = environment.reset()
# Make the first observation.
agent.observe_first(timestep)
# Run an episode.
while not timestep.last():
# Generate an action from the agent's policy and step the environment.
action = agent.select_action(timestep.observation)
timestep = environment.step(action)
# Have the agent observe the timestep and let the agent update itself.
agent.observe(action, next_timestep=timestep)
agent.update()
# Book-keeping.
episode_steps += 1
num_total_steps += 1
episode_return += timestep.reward
if log_loss:
episode_loss += agent.last_loss
if num_steps is not None and num_total_steps >= num_steps:
break
# Collect the results and combine with counts.
steps_per_second = episode_steps / (time.time() - start_time)
result = {
'episode': episode,
'episode_length': episode_steps,
'episode_return': episode_return,
}
if log_loss:
result['loss_avg'] = episode_loss/episode_steps
all_returns.append(episode_return)
# Log the given results.
logger.write(result)
if num_steps is not None and num_total_steps >= num_steps:
break
return all_returns
Implement the evaluation loop¶
# @title Implement the evaluation loop
# @markdown ##### This function runs the agent in the environment for a number of episodes, without allowing it to learn, in order to evaluate it.
# @markdown ##### *Double-click* to inspect the `evaluate` function.
def evaluate(environment: dm_env.Environment,
agent: acme.Actor,
evaluation_episodes: int):
"""
Helper function to run evaluation loop
Args:
environment: dm_env
Used to generate trajectories.
agent: acme.Actor
For selecting actions in the run loop.
evaluation_episodes: Integer
Number of episodes for which evaluation loop is to be run for.
Returns:
frames: List
Log of environment state for each time step.
"""
frames = []
for episode in range(evaluation_episodes):
timestep = environment.reset()
episode_return = 0
steps = 0
while not timestep.last():
frames.append(environment.plot_state(return_rgb=True))
action = agent.select_action(timestep.observation)
timestep = environment.step(action)
steps += 1
episode_return += timestep.reward
print(
f'Episode {episode} ended with reward {episode_return} in {steps} steps'
)
return frames
def display_video(frames: Sequence[np.ndarray],
filename: str = 'temp.mp4',
frame_rate: int = 12):
"""
Save and render video.
Args:
frames: Sequence[np.ndarray]
Log of environment state for each time step.
filename: String
Name for the video file generated
frame_rate: Integer
Specifies frequency at which frames are displayed.
Returns:
IPython.display.HTML(video_tag)
"""
# Write the frames to a video.
with imageio.get_writer(filename, fps=frame_rate) as video:
for frame in frames:
video.append_data(frame)
# Read video and display the video.
video = open(filename, 'rb').read()
b64_video = base64.b64encode(video)
video_tag = ('<video width="320" height="240" controls alt="test" '
'src="data:video/mp4;base64,{0}">').format(b64_video.decode())
return IPython.display.HTML(video_tag)
Section 1: Function Approximation¶
Time estimate: ~25mins
Video 1: Function approximation¶

So far we have only considered look-up tables for value functions. In all previous cases, every state-action pair \((\color{red}{s}, \color{blue}{a})\) had an entry in our \(\color{green}Q\)-table. This is possible in this environment because the number of states equals the number of cells in the grid. But it is not scalable to situations where, say, the goal location changes or the obstacles are in different locations at every episode (consider how big the table would need to be in such a situation).
An example (not covered in this tutorial) is ATARI from pixels, where the number of possible frames an agent can see is exponential in the number of pixels on the screen.

But what we really want is just to be able to compute the Q-value, when fed with a particular \((\color{red}{s}, \color{blue}{a})\) pair. So if we had a way to get a function to do this work instead of keeping a big table, we’d get around this problem.
To address this, we can use function approximation as a way to generalize Q-values over some representation of the very large state space, and train them to output the values they should. In this section, we will explore \(\color{green}Q\)-learning with function approximation, which (although it has been theoretically proven to diverge for some degenerate MDPs) can yield impressive results in very large environments. In particular, we will look at Neural Fitted Q (NFQ) Iteration and Deep Q-Networks (DQN).
Section 1.1 Replay Buffers¶
An important property of off-policy methods like \(\color{green}Q\)-learning is that they involve two policies: one for exploration and one that is being optimized (via the \(\color{green}Q\)-function updates). This means that we can generate data from the behavior policy and insert that data into some form of data storage—usually referred to as replay.
In order to optimize the \(\color{green}Q\)-function we can then sample data from the replay dataset and use that data to perform an update. An illustration of this learning loop is shown below.

In the next cell we will show how to implement a simple replay buffer. This can be as simple as a Python list containing transition data. In more complicated scenarios we might want a more performance-tuned variant, we might need to be more concerned about how large the replay is and what to do when it's full, and we might want to sample from replay in different ways. But a simple Python list can go a surprisingly long way.
# Simple replay buffer
# Create a convenient container for the SARS tuples required by deep RL agents.
Transitions = collections.namedtuple(
'Transitions', ['state', 'action', 'reward', 'discount', 'next_state'])
class ReplayBuffer(object):
"""
A simple Python Replay Buffer.
Queue based implementation.
"""
def __init__(self, capacity: int = None):
self.buffer = collections.deque(maxlen=capacity)
self._prev_state = None
def add_first(self, initial_timestep: dm_env.TimeStep):
self._prev_state = initial_timestep.observation
def add(self, action: int, timestep: dm_env.TimeStep):
transition = Transitions(
state=self._prev_state,
action=action,
reward=timestep.reward,
discount=timestep.discount,
next_state=timestep.observation,
)
self.buffer.append(transition)
self._prev_state = timestep.observation
def sample(self, batch_size: int) -> Transitions:
# Sample a random batch of Transitions as a list.
batch_as_list = random.sample(self.buffer, batch_size)
# Convert the list of `batch_size` Transitions into a single Transitions
# object where each field has `batch_size` stacked fields.
return tree_utils.stack_sequence_fields(batch_as_list)
def flush(self) -> Transitions:
entire_buffer = tree_utils.stack_sequence_fields(self.buffer)
self.buffer.clear()
return entire_buffer
def is_ready(self, batch_size: int) -> bool:
return batch_size <= len(self.buffer)
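As a quick, purely illustrative sanity check (the demo_env, demo_env_spec and buffer names below are made up for this example), the buffer can be exercised with the GridWorld helpers defined above:
# Illustrative usage of the replay buffer with a random behaviour policy.
buffer = ReplayBuffer(capacity=1000)
demo_env, demo_env_spec = setup_environment(
    build_gridworld_task(task='simple',
                         observation_type=ObservationType.AGENT_GOAL_POS))
timestep = demo_env.reset()
buffer.add_first(timestep)
# Collect a few transitions by acting uniformly at random.
for _ in range(20):
  action = np.random.randint(demo_env_spec.actions.num_values)
  timestep = demo_env.step(action)
  buffer.add(action, timestep)
  if timestep.last():
    timestep = demo_env.reset()
    buffer.add_first(timestep)
# Sample a stacked batch: every field gains a leading batch dimension.
if buffer.is_ready(batch_size=8):
  batch = buffer.sample(batch_size=8)
  print(batch.state.shape, batch.reward.shape)  # e.g. (8, 4) and (8,)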
Section 1.2: NFQ Agent¶
Neural Fitted Q Iteration (Riedmiller, 2005) was one of the first works to demonstrate how to leverage recent advances in Deep Learning to approximate the Q-value by a neural network\(^\dagger\). In other words, the values \(\color{green}Q(\color{red}{s}, \color{blue}{a})\) are approximated by the output of a neural network \(\color{green}{Q_w}(\color{red}{s}, \color{blue}{a})\) for each possible action \(\color{blue}{a} \in \color{blue}{\mathcal{A}}\)\(^\ddagger\).
When introducing function approximations, and neural networks in particular, we need to have a loss to optimize. But looking back at the tabular setting above, you can see that we already have some notion of error: the TD error.
By training our neural network to output values such that the TD error is minimized, we will also satisfy the Bellman Optimality Equation, which is a sufficient condition for obtaining an optimal policy. Thanks to automatic differentiation, we can simply write the TD error as a loss, e.g., with an \(\ell^2\) loss (but others would work too):
\[\mathcal{L}(\mathbf{w}) = \mathbb{E}\left[\left(\color{green}{r} + \gamma \max_{\color{blue}{a'}} \color{green}{Q_\mathbf{w}}(\color{red}{s'}, \color{blue}{a'}) - \color{green}{Q_\mathbf{w}}(\color{red}{s}, \color{blue}{a})\right)^2\right]\]
Then we can compute the gradient with respect to the parameters of the neural network and improve our Q-value approximation incrementally.
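To make this concrete, here is a minimal, purely illustrative sketch of an \(\ell^2\) TD loss on toy tensors (the toy_q_net and the random data below are made up for the example; this is not the exercise solution):
# Illustrative: mean squared TD error on toy data.
toy_q_net = nn.Linear(4, 2)          # Maps a 4-dim state to Q-values for 2 actions.
s = torch.randn(8, 4)                # Batch of states.
a = torch.randint(0, 2, (8,))        # Actions that were taken.
r = torch.randn(8)                   # Rewards.
next_s = torch.randn(8, 4)           # Next states.
gamma = 0.9
with torch.no_grad():                # Do not backpropagate through the target.
  target = r + gamma * toy_q_net(next_s).max(dim=-1)[0]
q_sa = toy_q_net(s).gather(1, a.view(-1, 1)).squeeze(-1)
td_loss = F.mse_loss(q_sa, target)   # Average squared TD error over the batch.
td_loss.backward()                   # Gradients w.r.t. the network parameters.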
NFQ builds on \(\color{green}Q\)-learning, but if one were to update the Q-values online directly, training could be unstable and very slow. Instead, NFQ uses a replay buffer, similar to the one implemented above, to update the Q-values in a batched setting.
When it was introduced, it was also entirely off-policy, using a uniformly random policy to collect data, which was prone to instability when applied to more complex environments (e.g., when the inputs are pixels or the tasks are longer and more complicated).
But it is a good stepping stone to the more complex agents used today. Here, we will look at a slightly different and modernised implementation of NFQ.
Below you will find an incomplete NFQ agent that takes in observations from a gridworld. Instead of receiving a tabular state, it receives an observation in the form of its \((x,y)\) coordinates in the gridworld, and the \((x,y)\) coordinates of the goal.
The goal of this coding exercise is to complete this agent by implementing the loss, using mean squared error.
\(^\dagger\) If you read the NFQ paper, they use a “control” notation, where there is a “cost to minimize”, instead of “rewards to maximize”, so don’t be surprised if signs/max/min do not correspond.
\(^\ddagger\) We could feed it \(\color{blue}{a}\) as well and ask \(Q_w\) for a single scalar value, but given we have a fixed number of actions and we usually need to take an \(argmax\) over them, it’s easiest to just output them all in one pass.
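Before starting the exercise, it can help to see what such an observation looks like. A quick illustrative check (the toy_grid and toy_env names are just for this snippet):
# Illustrative: inspect an AGENT_GOAL_POS observation.
toy_grid = build_gridworld_task(
    task='simple', observation_type=ObservationType.AGENT_GOAL_POS)
toy_env, toy_env_spec = setup_environment(toy_grid)
first_obs = toy_env.reset().observation
print(first_obs)  # [agent_y, agent_x, goal_y, goal_x], e.g. [2. 2. 7. 2.]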
Coding Exercise 1.1: Implement NFQ¶
# Create a convenient container for the SARS tuples required by NFQ.
Transitions = collections.namedtuple(
'Transitions', ['state', 'action', 'reward', 'discount', 'next_state'])
class NeuralFittedQAgent(acme.Actor):
"""
Implementation of a Neural Fitted Agent
"""
def __init__(self,
environment_spec: specs.EnvironmentSpec,
q_network: nn.Module,
replay_capacity: int = 100_000,
epsilon: float = 0.1,
batch_size: int = 1,
learning_rate: float = 3e-4):
"""
Neural Fitted Agent Initialisation
Args:
environment_spec: specs.EnvironmentSpec
* actions: DiscreteArray(shape=(), dtype=int32, name=action, minimum=0, maximum=3, num_values=4)
* observations: Array(shape=(9, 10, 3), dtype=dtype('float32'), name='observation_grid')
* rewards: Array(shape=(), dtype=dtype('float32'), name='reward')
* discounts: BoundedArray(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
q_network: nn.Module,
Q Network
replay_capacity: int,
Capacity of the replay buffer [default: 100000]
epsilon: float
Controls the exploration-exploitation tradeoff
batch_size: int
Batch Size [default = 1]
learning_rate: float
Rate at which the neural fitted agent learns [default = 3e-4]
Returns:
Nothing
"""
# Store agent hyperparameters and network.
self._num_actions = environment_spec.actions.num_values
self._epsilon = epsilon
self._batch_size = batch_size
self._q_network = q_network
# Container for the computed loss (see run_loop implementation above).
self.last_loss = 0.0
# Create the replay buffer.
self._replay_buffer = ReplayBuffer(replay_capacity)
# Setup optimizer that will train the network to minimize the loss.
self._optimizer = torch.optim.Adam(self._q_network.parameters(), lr=learning_rate)
self._loss_fn = nn.MSELoss()
def select_action(self, observation):
"""
Chooses epsilon-greedy action
Args:
observation: enum
* ObservationType.STATE_INDEX: int32 index of agent occupied tile.
* ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
agent is and 0 elsewhere.
* ObservationType.GRID: NxNx3 float32 grid of feature channels.
First channel contains walls (1 if wall, 0 otherwise), second the
agent position (1 if agent, 0 otherwise) and third goal position
(1 if goal, 0 otherwise)
* ObservationType.AGENT_GOAL_POS: float32 tuple with
(agent_y, agent_x, goal_y, goal_x)
Returns:
action: Integer
Chosen action based on epsilon-greedy policy
"""
# Compute Q-values.
q_values = self._q_network(torch.tensor(observation).unsqueeze(0)) # Adds batch dimension.
q_values = q_values.squeeze(0) # Removes batch dimension
# Select epsilon-greedy action.
if self._epsilon < torch.rand(1):
action = q_values.argmax(axis=-1)
else:
action = torch.randint(low=0, high=self._num_actions, size=(1, ), dtype=torch.int64).squeeze()
return action
def q_values(self, observation):
q_values = self._q_network(torch.tensor(observation).unsqueeze(0))
return q_values.squeeze(0).detach()
def update(self):
"""
Samples a batch of transitions from the replay buffer and performs one Q-network update
Args:
None
Returns:
Nothing
"""
if not self._replay_buffer.is_ready(self._batch_size):
# If the replay buffer is not ready to sample from, do nothing.
return
# Sample a minibatch of transitions from experience replay.
transitions = self._replay_buffer.sample(self._batch_size)
# Note: each of these tensors will be of shape [batch_size, ...].
s = torch.tensor(transitions.state)
a = torch.tensor(transitions.action)
r = torch.tensor(transitions.reward)
d = torch.tensor(transitions.discount)
next_s = torch.tensor(transitions.next_state)
# Compute the Q-values at next states in the transitions.
with torch.no_grad():
q_next_s = self._q_network(next_s) # Shape [batch_size, num_actions].
max_q_next_s = q_next_s.max(axis=-1)[0]
# Compute the TD error and then the losses.
target_q_value = r + d * max_q_next_s
# Compute the Q-values at original state.
q_s = self._q_network(s)
# Gather the Q-value corresponding to each action in the batch.
q_s_a = q_s.gather(1, a.view(-1, 1)).squeeze(-1)
#################################################
# Fill in missing code below (...),
# then remove or comment the line below to test your implementation
raise NotImplementedError("Student exercise: complete the NFQ Agent")
#################################################
# TODO Average the squared TD errors over the entire batch using
# self._loss_fn, which is defined above as nn.MSELoss()
# HINT: Take a look at the reference for nn.MSELoss here:
# https://pytorch.org/docs/stable/generated/torch.nn.MSELoss.html
# What should you put for the input and the target?
loss = ...
# Compute the gradients of the loss with respect to the q_network variables.
self._optimizer.zero_grad()
loss.backward()
# Apply the gradient update.
self._optimizer.step()
# Store the loss for logging purposes (see run_loop implementation above).
self.last_loss = loss.detach().numpy()
def observe_first(self, timestep: dm_env.TimeStep):
self._replay_buffer.add_first(timestep)
def observe(self, action: int, next_timestep: dm_env.TimeStep):
self._replay_buffer.add(action, next_timestep)
Train and Evaluate the NFQ Agent¶
Training the NFQ Agent¶
# @title Training the NFQ Agent
epsilon = 0.6 # @param {type:"number"}
max_episode_length = 200
# Create the environment.
grid = build_gridworld_task(
task='simple',
observation_type=ObservationType.AGENT_GOAL_POS,
max_episode_length=max_episode_length)
environment, environment_spec = setup_environment(grid)
# Define the neural function approximator (aka Q network).
q_network = nn.Sequential(nn.Linear(4, 50),
nn.ReLU(),
nn.Linear(50, 50),
nn.ReLU(),
nn.Linear(50, environment_spec.actions.num_values))
# Build the trainable Q-learning agent
agent = NeuralFittedQAgent(
environment_spec,
q_network,
epsilon=epsilon,
replay_capacity=100_000,
batch_size=10,
learning_rate=1e-3)
returns = run_loop(
environment=environment,
agent=agent,
num_episodes=500,
logger_time_delta=1.,
log_loss=True)
Evaluating the agent (set \(\epsilon=0\))¶
Temporarily change epsilon to be more greedy; remember to change it back.¶
# @title Evaluating the agent (set $\epsilon=0$)
# @markdown ##### Temporarily change epsilon to be more greedy; remember to change it back.
agent._epsilon = 0.0
# Record a few episodes.
frames = evaluate(environment, agent, evaluation_episodes=5)
# Change epsilon back.
agent._epsilon = epsilon
# Display the video of the episodes.
display_video(frames, frame_rate=6)
Episode 0 ended with reward -990.0 in 200 steps
Episode 1 ended with reward -990.0 in 200 steps
Episode 2 ended with reward -990.0 in 200 steps
Episode 3 ended with reward -990.0 in 200 steps
Episode 4 ended with reward -990.0 in 200 steps
Visualise the learned \(Q\) values¶
Evaluate the policy for every state, similar to tabular agents above.¶
# @title Visualise the learned $Q$ values
# @markdown ##### Evaluate the policy for every state, similar to tabular agents above.
environment.reset()
pi = np.zeros(grid._layout_dims, dtype=np.int32)
q = np.zeros(grid._layout_dims + (4, ))
for y in range(grid._layout_dims[0]):
for x in range(grid._layout_dims[1]):
# Hack observation to see what the Q-network would output at that point.
environment.set_state(x, y)
obs = environment.get_obs()
q[y, x] = np.asarray(agent.q_values(obs))
pi[y, x] = np.asarray(agent.select_action(obs))
plot_action_values(q)

Compare the Q-values approximated with the neural network with the tabular case. Notice how the neural network is generalizing from the visited states to the unvisited similar states, while in the tabular case we updated the value of each state only when we visited that state.
Compare the greedy and behaviour (\(\epsilon\)-greedy) policies¶
Compare the greedy policy with the agent’s policy¶
Notice that the agent’s behavior policy has a lot more randomness, due to the high \(\epsilon\). However, the greedy policy that’s learned is optimal.¶
# @title Compare the greedy policy with the agent's policy
# @markdown ##### Notice that the agent's behavior policy has a lot more randomness, due to the high $\epsilon$. However, the greedy policy that's learned is optimal.
environment.plot_greedy_policy(q)
plt.figtext(-.08, .95, 'Greedy policy using the learnt Q-values')
plt.title('')
plt.show()
environment.plot_policy(pi)
plt.figtext(-.08, .95, "Policy using the agent's behavior policy")
plt.title('')
plt.show()


Section 2: Deep Q-Networks (DQN)¶
Time estimate: ~30mins
Video 2: Deep Q-Networks (DQN)¶

In this section, we will look at an advanced deep RL Agent based on the following publication, Playing Atari with Deep Reinforcement Learning, which introduced the first deep learning model to successfully learn control policies directly from high-dimensional pixel inputs using RL.
Here the agent will act directly on a pixel representation of the gridworld. You can find an incomplete implementation below.
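To see what "acting on pixels" means here, you can inspect the GRID observation the agent will receive (an illustrative check; the pixel_grid and pixel_env names are just for this snippet):
# Illustrative: the GRID observation is a (9, 10, 3) float32 array whose channels
# encode walls, the agent position and the goal position.
pixel_grid = build_gridworld_task(task='simple',
                                  observation_type=ObservationType.GRID)
pixel_env, pixel_env_spec = setup_environment(pixel_grid)
print(pixel_env.reset().observation.shape)  # (9, 10, 3)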
Coding Exercise 2.1: Run a DQN Agent¶
class DQN(acme.Actor):
"""
Implementation of a Deep Q Network Agent
"""
def __init__(self,
environment_spec: specs.EnvironmentSpec,
network: nn.Module,
replay_capacity: int = 100_000,
epsilon: float = 0.1,
batch_size: int = 1,
learning_rate: float = 5e-4,
target_update_frequency: int = 10):
"""
DQN Based Agent Initialisation
Args:
environment_spec: specs.EnvironmentSpec
* actions: DiscreteArray(shape=(), dtype=int32, name=action, minimum=0, maximum=3, num_values=4)
* observations: Array(shape=(9, 10, 3), dtype=dtype('float32'), name='observation_grid')
* rewards: Array(shape=(), dtype=dtype('float32'), name='reward')
* discounts: BoundedArray(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
network: nn.Module
Deep Q Network
replay_capacity: int
Capacity of the replay buffer [default: 100000]
epsilon: float
Controls the exploration-exploitation tradeoff
batch_size: int
Batch Size [default = 1]
learning_rate: float
Rate at which the DQN agent learns [default = 5e-4]
target_update_frequency: int
Frequency with which target network is updated
Returns:
Nothing
"""
# Store agent hyperparameters and network.
self._num_actions = environment_spec.actions.num_values
self._epsilon = epsilon
self._batch_size = batch_size
self._q_network = network
# Create a second q net with the same structure and initial values, which
# we'll be updating separately from the learned q-network.
self._target_network = copy.deepcopy(self._q_network)
# Container for the computed loss (see run_loop implementation above).
self.last_loss = 0.0
# Create the replay buffer.
self._replay_buffer = ReplayBuffer(replay_capacity)
# Keep an internal tracker of steps
self._current_step = 0
# How often to update the target network
self._target_update_frequency = target_update_frequency
# Setup optimizer that will train the network to minimize the loss.
self._optimizer = torch.optim.Adam(self._q_network.parameters(), lr=learning_rate)
self._loss_fn = nn.MSELoss()
def select_action(self, observation):
"""
Action Selection Algorithm
Args:
observation: enum
* ObservationType.STATE_INDEX: int32 index of agent occupied tile.
* ObservationType.AGENT_ONEHOT: NxN float32 grid, with a 1 where the
agent is and 0 elsewhere.
* ObservationType.GRID: NxNx3 float32 grid of feature channels.
First channel contains walls (1 if wall, 0 otherwise), second the
agent position (1 if agent, 0 otherwise) and third goal position
(1 if goal, 0 otherwise)
* ObservationType.AGENT_GOAL_POS: float32 tuple with
(agent_y, agent_x, goal_y, goal_x)
Returns:
action: Integer
Chosen action based on epsilon-greedy policy
"""
# Compute Q-values.
# The network requires a batch dimension, which we squeeze out right after.
q_values = self._q_network(torch.tensor(observation).unsqueeze(0)) # Adds batch dimension.
q_values = q_values.squeeze(0) # Removes batch dimension
# Select epsilon-greedy action.
if self._epsilon < torch.rand(1):
action = q_values.argmax(axis=-1)
else:
action = torch.randint(low=0, high=self._num_actions , size=(1, ), dtype=torch.int64).squeeze()
return action
def q_values(self, observation):
q_values = self._q_network(torch.tensor(observation).unsqueeze(0))
return q_values.squeeze(0).detach()
def update(self):
"""
Samples a batch of transitions from the replay buffer and performs one Q-network update (periodically syncing the target network)
Args:
None
Returns:
Nothing
"""
self._current_step += 1
if not self._replay_buffer.is_ready(self._batch_size):
# If the replay buffer is not ready to sample from, do nothing.
return
# Sample a minibatch of transitions from experience replay.
transitions = self._replay_buffer.sample(self._batch_size)
# Optionally unpack the transitions to lighten notation.
# Note: each of these tensors will be of shape [batch_size, ...].
s = torch.tensor(transitions.state)
a = torch.tensor(transitions.action)
r = torch.tensor(transitions.reward)
d = torch.tensor(transitions.discount)
next_s = torch.tensor(transitions.next_state)
# Compute the Q-values at next states in the transitions.
with torch.no_grad():
#################################################
# Fill in missing code below (...),
# then remove or comment the line below to test your implementation
raise NotImplementedError("Student exercise: complete the DQN Agent")
#################################################
#TODO get the value of the next states evaluated by the target network
# HINT: use self._target_network, defined above.
q_next_s = ... # Shape [batch_size, num_actions].
max_q_next_s = q_next_s.max(axis=-1)[0]
# Compute the TD error and then the losses.
target_q_value = r + d * max_q_next_s
# Compute the Q-values at original state.
q_s = self._q_network(s)
# Gather the Q-value corresponding to each action in the batch.
q_s_a = q_s.gather(1, a.view(-1, 1)).squeeze(-1)
# Average the squared TD errors over the entire batch
loss = self._loss_fn(target_q_value, q_s_a)
# Compute the gradients of the loss with respect to the q_network variables.
self._optimizer.zero_grad()
loss.backward()
# Apply the gradient update.
self._optimizer.step()
if self._current_step % self._target_update_frequency == 0:
self._target_network.load_state_dict(self._q_network.state_dict())
# Store the loss for logging purposes (see run_loop implementation above).
self.last_loss = loss.detach().numpy()
def observe_first(self, timestep: dm_env.TimeStep):
self._replay_buffer.add_first(timestep)
def observe(self, action: int, next_timestep: dm_env.TimeStep):
self._replay_buffer.add(action, next_timestep)
# Create a convenient container for the SARS tuples required by DQN.
Transitions = collections.namedtuple(
'Transitions', ['state', 'action', 'reward', 'discount', 'next_state'])
Train and evaluate the DQN agent¶
# @title Train and evaluate the DQN agent
epsilon = 0.25 # @param {type: "number"}
num_episodes = 500 # @param {type: "integer"}
max_episode_length = 50 # @param {type: "integer"}
grid = build_gridworld_task(
task='simple',
observation_type=ObservationType.GRID,
max_episode_length=max_episode_length)
environment, environment_spec = setup_environment(grid)
class Permute(nn.Module):
"""
Module that permutes the dimensions of the input tensor (e.g., from NHWC to NCHW ordering for convolutions)
"""
def __init__(self, order: list):
super(Permute,self).__init__()
self.order = order
def forward(self, x):
return x.permute(self.order)
q_network = nn.Sequential(Permute([0, 3, 1, 2]),
nn.Conv2d(3, 32, kernel_size=4,
stride=2,padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=3,
stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(3, 1),
nn.Flatten(),
nn.Linear(384, 50),
nn.ReLU(),
nn.Linear(50, environment_spec.actions.num_values)
)
agent = DQN(
environment_spec=environment_spec,
network=q_network,
batch_size=10,
epsilon=epsilon,
target_update_frequency=25)
returns = run_loop(
environment=environment,
agent=agent,
num_episodes=num_episodes,
num_steps=100000)
Visualise the learned \(Q\) values¶
Evaluate the policy for every state, similar to tabular agents above.¶
# @title Visualise the learned $Q$ values
# @markdown ##### Evaluate the policy for every state, similar to tabular agents above.
pi = np.zeros(grid._layout_dims, dtype=np.int32)
q = np.zeros(grid._layout_dims + (4,))
for y in range(grid._layout_dims[0]):
for x in range(grid._layout_dims[1]):
# Hack observation to see what the Q-network would output at that point.
environment.set_state(x, y)
obs = environment.get_obs()
q[y, x] = np.asarray(agent.q_values(obs))
pi[y, x] = np.asarray(agent.select_action(obs))
plot_action_values(q)

Compare the greedy policy with the agent’s policy¶
# @title Compare the greedy policy with the agent's policy
environment.plot_greedy_policy(q)
plt.figtext(-.08, .95, "Greedy policy using the learnt Q-values")
plt.title('')
plt.show()
environment.plot_policy(pi)
plt.figtext(-.08, .95, "Policy using the agent's epsilon-greedy policy")
plt.title('')
plt.show()


Note: You get a better estimate of the value functions if you increase num_episodes and max_episode_length, but this will take longer to train. Feel free to play around after the day!
Section 3: Learning the policy directly¶
Time estimate: ~25mins
Video 3: Other Deep RL Methods¶
Cartpole task¶
Here we switch to training on a different kind of task, one with a continuous state space: CartPole in Gym. As you recall from the video, policy-based methods are particularly well-suited for these kinds of tasks. We will be exploring two of those methods below.

Make a CartPole environment, gym.make('CartPole-v1')
¶
# @title Make a CartPole environment, `gym.make('CartPole-v1')`
env = gym.make('CartPole-v1')
# Set seeds
# Note: recent gym versions removed `env.seed()`; the seed is passed to `reset()` instead.
env.reset(seed=SEED)
set_seed(SEED)
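You can quickly confirm the shape of the state and action spaces (illustrative check):
# Illustrative: CartPole has a 4-dimensional continuous state and 2 discrete actions.
print(env.observation_space)  # 4 continuous values: cart position/velocity, pole angle/velocity
print(env.action_space)       # 2 discrete actions: push the cart left or right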
Section 3.1: Policy gradient¶
Now we will turn to policy gradient methods. Rather than defining the policy in terms of a value function, i.e., \(\color{blue}\pi(\color{red}s) = \arg\max_{\color{blue}a}\color{green}Q(\color{red}s, \color{blue}a)\), we will directly parameterize the policy and write it as the distribution
\[\color{blue}{a_t} \sim \color{blue}{\pi_{\theta}}(\color{blue}{a_t} | \color{red}{s_t}).\]
Here \(\theta\) represents the parameters of the policy. We will update the policy parameters using gradient ascent to maximize expected future reward.
One convenient way to represent the conditional distribution above is as a function that takes a state \(\color{red}s\) and returns a distribution over actions \(\color{blue}a\).
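As a minimal, purely illustrative sketch of this idea (the toy_policy_net below is made up for the example), such a function can be written as a small network whose output parameterizes a Categorical distribution:
# Illustrative: a policy as a state -> distribution-over-actions function.
toy_policy_net = nn.Sequential(nn.Linear(4, 2), nn.Softmax(dim=-1))
state = torch.randn(1, 4)         # A (batched) CartPole-like state.
probs = toy_policy_net(state)     # Action probabilities, e.g. [[0.7, 0.3]].
dist = Categorical(probs)         # pi_theta(a | s)
action = dist.sample()            # Sample an action from the policy.
log_prob = dist.log_prob(action)  # log pi_theta(a | s), used in the policy gradient.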
Defined below is an agent which implements the REINFORCE algorithm. REINFORCE (Williams, 1992) is the simplest model-free general reinforcement learning technique.
The basic idea is to use probabilistic action choice. If the reward at the end turns out to be high, we make all actions in this sequence more likely (otherwise, we do the opposite).
This strategy could reinforce "bad" actions as well; however, these will tend to be part of trajectories with low reward and will likely not get accentuated.
From the lectures, we know that we need to compute
\[\nabla J(\theta) = \mathbb{E}\left[\sum_t \color{green}{G_t} \, \nabla \log \color{blue}{\pi_{\theta}}(\color{blue}{a_t} | \color{red}{s_t})\right],\]
where \(\color{green}{G_t}\) is the sum over future rewards from time \(t\), defined as
\[\color{green}{G_t} = \color{green}{r_{t+1}} + \gamma \color{green}{r_{t+2}} + \gamma^2 \color{green}{r_{t+3}} + \cdots = \sum_{k=0}^{\infty} \gamma^k \color{green}{r_{t+k+1}}.\]
The algorithm below will collect the state, action, and reward data in its buffer until it reaches a full trajectory. It will then update its policy given the above gradient (and the Adam optimizer).
A policy gradient trains an agent without explicitly mapping the value for every state-action pair in an environment by taking small steps and updating the policy based on the reward associated with that step. In this section, we will build a small network that trains using policy gradient using PyTorch.
The agent can receive a reward immediately for an action, or it can receive the reward at a later time, such as the end of the episode.
The policy function our agent will try to learn is \(\pi_\theta(a,s)\), where \(\theta\) is the parameter vector, \(s\) is a particular state, and \(a\) is an action.
Monte-Carlo Policy Gradient approach will be used, which means the agent will run through an entire episode and then update policy based on the rewards obtained.
Set the hyperparameters for Policy Gradient¶
Only used in Policy Gradient Method:
# @title Set the hyperparameters for Policy Gradient
num_steps = 300
learning_rate = 0.01 # @param {type:"number"}
gamma = 0.99 # @param {type:"number"}
dropout = 0.6 # @param {type:"number"}
# @markdown Only used in Policy Gradient Method:
hidden_neurons = 128 # @param {type:"integer"}
Coding Exercise 3.1: Creating a simple neural network¶
Below you will find some incomplete code. Fill in the missing code to construct the specified neural network.
Let us define a simple feedforward neural network with one hidden layer of \(128\) neurons and a dropout of \(0.6\). Use Adam as the optimizer with a learning rate of \(0.01\). Use the hyperparameters already defined above rather than hard-coding explicit values.
Using dropout will significantly improve the performance of the policy. Compare your results with and without dropout, and experiment with other hyperparameter values as well.
class PolicyGradientNet(nn.Module):
"""
Defines Policy Gradient Network with the following attributes:
Feed Forward Network with a single hidden layer
width: 128 neurons
dropout: 0.6
Optimizer: Adam
Learning Rate: 0.01
"""
def __init__(self):
"""
Initiate Policy Gradient Network with above mentioned parameters/hyperparameters
Args:
None
Returns:
Nothing
"""
super(PolicyGradientNet, self).__init__()
self.state_space = env.observation_space.shape[0]
self.action_space = env.action_space.n
#################################################
## TODO for students: Define two linear layers
## from the first expression
raise NotImplementedError("Student exercise: Create FF neural network.")
#################################################
# HINT: you can construct linear layers using nn.Linear(); what are the
# sizes of the inputs and outputs of each of the layers? Also remember
# that you need to use hidden_neurons (see hyperparameters section above).
# https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
self.l1 = ...
self.l2 = ...
self.gamma = gamma
# Episode policy and past rewards
self.past_policy = Variable(torch.Tensor())
self.reward_episode = []
# Overall reward and past loss
self.past_reward = []
self.past_loss = []
def forward(self, x):
model = torch.nn.Sequential(
self.l1,
nn.Dropout(p=dropout),
nn.ReLU(),
self.l2,
nn.Softmax(dim=-1)
)
return model(x)
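If you want to check your answer, here is a minimal sketch of an equivalent network built directly with nn.Sequential, using the hyperparameters defined above; the exercise's l1 and l2 correspond to the two nn.Linear layers. Try the exercise yourself before peeking.
# Sketch of the intended architecture, for checking your exercise solution.
# l1: state (env.observation_space.shape[0] = 4) -> hidden_neurons (128)
# l2: hidden_neurons (128) -> actions (env.action_space.n = 2)
sketch_net = nn.Sequential(
    nn.Linear(env.observation_space.shape[0], hidden_neurons),
    nn.Dropout(p=dropout),
    nn.ReLU(),
    nn.Linear(hidden_neurons, env.action_space.n),
    nn.Softmax(dim=-1),
)
print(sketch_net)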
Now let’s create an instance of the network we have defined and use Adam as the optimizer, with the learning_rate hyperparameter already defined above.
policy = PolicyGradientNet()
pg_optimizer = optim.Adam(policy.parameters(), lr=learning_rate)
Select Action¶
The select_action() function chooses an action based on our policy's probability distribution, using the PyTorch distributions package. Our policy returns a probability for each possible action in our action space (move left or move right) as an array of length two, such as \([0.7, 0.3]\). We then sample an action from this distribution, record its log-probability in our history, and return the action.
def select_action(state):
"""
Select an action (0 or 1) by running policy model and choosing based on the probabilities in state;
Args:
state: np.ndarray
Describes Agent's state
Returns:
action:
Returns chosen action based on policy's probability distribution
"""
state = torch.from_numpy(state).type(torch.FloatTensor)
state = policy(Variable(state))
c = Categorical(state)
action = c.sample()
# Add log probability of chosen action
if policy.past_policy.dim() != 0:
policy.past_policy = torch.cat([policy.past_policy, c.log_prob(action).reshape(1)])
else:
policy.past_policy = (c.log_prob(action).reshape(1))
return action
Update policy¶
This function updates the policy.
Reward \(G_t\)¶
We update our policy by sampling the action-value function \(Q^{\pi_\theta} (s_t,a_t)\) while playing through episodes of the game. \(Q^{\pi_\theta} (s_t,a_t)\) is defined as the expected return from taking action \(a\) in state \(s\) and then following policy \(\pi\).
We know that for every step the simulation continues, we receive a reward of \(1\). We can use this to calculate the policy gradient at each time step, where \(r\) is the reward for a particular state-action pair. Rather than the instantaneous reward \(r\), we use the long-term reward \(v_{t}\) (the return \(\color{green}G_t\) from above), the discounted sum of all future rewards for the remainder of the episode:
\[
v_t = \sum_{k=0}^{T-t} \gamma^{k}\, r_{t+k},
\]
where \(\gamma\) is the discount factor (\(0.99\)). For example, if an episode lasts 5 steps, the discounted reward for each step will be \([4.90, 3.94, 2.97, 1.99, 1]\). Next we scale our reward vector by subtracting the mean from each element and dividing by the standard deviation to give it unit variance. This practice is common in machine learning and is the same operation as scikit-learn's StandardScaler. It also has the effect of compensating for future uncertainty.
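You can verify the numbers in this example with a short sketch that mirrors what update_policy below does (gamma and the unit rewards are hard-coded here purely for illustration):
# Reproduce the discounted rewards for a 5-step episode of unit rewards (gamma = 0.99),
# then standardize them, as update_policy() below does.
gamma_demo = 0.99
T = 5
discounted = [sum(gamma_demo ** k for k in range(T - t)) for t in range(T)]
print(np.round(discounted, 2))  # [4.9  3.94 2.97 1.99 1.  ]
discounted = torch.FloatTensor(discounted)
standardized = (discounted - discounted.mean()) / (discounted.std() + np.finfo(np.float32).eps)
print(standardized)  # roughly zero mean and unit variance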
Update Policy: equation¶
After each episode we apply the Monte-Carlo policy gradient to improve our policy according to the equation:
\[
\Delta\theta_t = \alpha \, \nabla_\theta \log \pi_\theta({\color{blue}a_t} \mid {\color{red}s_t}) \, v_t,
\]
where \(\alpha\) is the learning rate.
We will then feed our policy history multiplied by our rewards to our optimizer and update the weights of our neural network using stochastic gradient ascent. This should increase the likelihood of actions that got our agent a larger reward.
The following function update_policy
updates the network weights and therefore the policy.
def update_policy():
"""
Helper function to update network weights and policy
Args:
None
Returns:
Nothing
"""
R = 0
rewards = []
# Discount future rewards back to the present using gamma
for r in policy.reward_episode[::-1]:
R = r + policy.gamma * R
rewards.insert(0, R)
# Scale rewards
rewards = torch.FloatTensor(rewards)
rewards = (rewards - rewards.mean()) / (rewards.std() +
np.finfo(np.float32).eps)
# Calculate loss
pg_loss = (torch.sum(torch.mul(policy.past_policy,
Variable(rewards)).mul(-1), -1))
# Update network weights
# Use zero_grad(), backward() and step() methods of the optimizer instance.
pg_optimizer.zero_grad()
pg_loss.backward()
# Update the weights
for param in policy.parameters():
param.grad.data.clamp_(-1, 1)
pg_optimizer.step()
# Save episode history and re-initialize the per-episode buffers
policy.past_loss.append(pg_loss.item())
policy.past_reward.append(np.sum(policy.reward_episode))
policy.past_policy = Variable(torch.Tensor())
policy.reward_episode= []
Training¶
This is our main policy training loop. For each step in a training episode, we choose an action, take a step through the environment, and record the resulting new state and reward. We call update_policy()
at the end of each episode to feed the episode history to our neural network and improve our policy.
def policy_gradient_train(episodes):
"""
Helper function to train policy gradient network
Args:
episodes: int
Number of episodes to train for
Returns:
Nothing
"""
running_reward = 10
for episode in range(episodes):
state = env.reset()
state = state[0] if isinstance(state, tuple) else state  # newer gym returns (obs, info)
done = False
for time in range(1000):
action = select_action(state)
# Step through environment using chosen action
# (newer gym also returns a separate `truncated` flag; fold it into `done`)
state, reward, done, *extra = env.step(action.item())
done = done or (len(extra) == 2 and extra[0])
# Save reward
policy.reward_episode.append(reward)
if done:
break
# Used to determine when the environment is solved.
running_reward = (running_reward * gamma) + (time * (1 - gamma))
update_policy()
if episode % 50 == 0:
print(f"Episode {episode}\tLast length: {time:5.0f}"
f"\tAverage length: {running_reward:.2f}")
if running_reward > env.spec.reward_threshold:
print(f"Solved! Running reward is now {running_reward} "
f"and the last episode runs to {time} time steps!")
break
Run the model¶
episodes = 500 # @param {type:"integer"}
policy_gradient_train(episodes)
Plot the results¶
Plot the training performance for policy gradient¶
# @title Plot the training performance for policy gradient
def plot_policy_gradient_training():
"""
Helper function to plot the training performance
of policy gradient network
Args:
None
Returns:
Nothing
"""
window = int(episodes / 20)
fig, ((ax1), (ax2)) = plt.subplots(1, 2, sharey=True, figsize=[15, 4]);
rolling_mean = pd.Series(policy.past_reward).rolling(window).mean()
std = pd.Series(policy.past_reward).rolling(window).std()
ax1.plot(rolling_mean)
ax1.fill_between(range(len(policy.past_reward)),
rolling_mean-std, rolling_mean+std,
color='orange', alpha=0.2)
ax1.set_title(f"Episode Length Moving Average ({window}-episode window)")
ax1.set_xlabel('Episode'); ax1.set_ylabel('Episode Length')
ax2.plot(policy.past_reward)
ax2.set_title('Episode Length')
ax2.set_xlabel('Episode')
ax2.set_ylabel('Episode Length')
fig.tight_layout(pad=2)
plt.show()
plot_policy_gradient_training()
Exercise 3.1: Explore different hyperparameters.¶
Try running the model again with modified hyperparameters and observe the outputs. Be sure to rerun the function definition cells so they pick up the updated values. A minimal sketch of one such rerun is given after the list below.
What do you see when you:
increase the learning rate
decrease the learning rate
decrease gamma (\(\gamma\))
increase the number of hidden neurons in the network
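For instance, here is a sketch of a rerun with a smaller learning rate; it assumes you have completed Coding Exercise 3.1 and uses the variables defined above.
# Retrain the policy gradient agent with a smaller learning rate.
learning_rate = 0.005
policy = PolicyGradientNet()
pg_optimizer = optim.Adam(policy.parameters(), lr=learning_rate)
policy_gradient_train(episodes)
plot_policy_gradient_training()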
Section 3.2: Actor-critic¶
Recall the policy gradient:
\[
\nabla J(\theta) = \mathbb{E}\left[ \sum_t {\color{green}G_t} \, \nabla_\theta \log \pi_{\theta}({\color{blue}a_t} \mid {\color{red}s_t}) \right].
\]
The policy parameters are updated with a Monte Carlo technique that relies on random samples. This introduces high variability in the log probabilities and cumulative reward values, which leads to noisy gradients and can cause unstable learning.
One way to reduce variance and increase stability is to subtract a baseline \(b({\color{red}s_t})\) from the cumulative reward:
\[
\nabla J(\theta) = \mathbb{E}\left[ \sum_t \left({\color{green}G_t} - b({\color{red}s_t})\right) \nabla_\theta \log \pi_{\theta}({\color{blue}a_t} \mid {\color{red}s_t}) \right].
\]
Intuitively, shrinking the cumulative-reward term yields smaller gradients and thus smaller and (hopefully) more stable updates.
From the lecture slides, we know that in Actor Critic Method:
The Critic estimates the value function. This could be the action-value (the Q value) or state-value (the V value).
The Actor updates the policy distribution in the direction suggested by the Critic (such as with policy gradients).
Both the Critic and Actor functions are parameterized with neural networks. The “Critic” network parameterizes the Q-value.
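To make this concrete, the sketch below computes the actor and critic loss terms on made-up numbers; the same computation appears in the training loop further down (all values here are purely illustrative).
# Sketch of the actor-critic losses on made-up numbers (illustrative only).
log_probs_demo = torch.log(torch.tensor([0.7, 0.4, 0.6]))    # log pi(a_t | s_t) for three steps
qvals_demo = torch.tensor([3.0, 2.0, 1.0])                   # discounted return targets
values_demo = torch.tensor([2.5, 2.2, 0.8])                  # critic's V(s_t) estimates
advantage_demo = qvals_demo - values_demo                    # how much better than the critic expected
actor_loss_demo = (-log_probs_demo * advantage_demo).mean()  # policy gradient with the critic as baseline
critic_loss_demo = 0.5 * advantage_demo.pow(2).mean()        # push V(s_t) toward the return target
print(actor_loss_demo + critic_loss_demo)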
Set the hyperparameters for Actor Critic¶
# @title Set the hyperparameters for Actor Critic
learning_rate = 0.01 # @param {type:"number"}
gamma = 0.99 # @param {type:"number"}
dropout = 0.6
# Only used in Actor-Critic Method
hidden_size = 256 # @param {type:"integer"}
num_steps = 300
Actor Critic Network¶
class ActorCriticNet(nn.Module):
"""
Build Actor Critic Network
"""
def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
"""
Initiate Actor Critic Network
Args:
num_inputs: int
Number of inputs incoming into the Network
num_actions: int
Number of actions
hidden_size: int
Size of hidden layer in the network
learning_rate: Float
Learning rate of Actor Critic Network
Returns:
Nothing
"""
super(ActorCriticNet, self).__init__()
self.num_actions = num_actions
self.critic_linear1 = nn.Linear(num_inputs, hidden_size)
self.critic_linear2 = nn.Linear(hidden_size, 1)
self.actor_linear1 = nn.Linear(num_inputs, hidden_size)
self.actor_linear2 = nn.Linear(hidden_size, num_actions)
self.all_rewards = []
self.all_lengths = []
self.average_lengths = []
def forward(self, state):
"""
Describes forward pass of Actor Critic Network
Args:
state: np.ndarray
Describes state
Returns:
Value and Policy Distribution
"""
state = Variable(torch.from_numpy(state).float().unsqueeze(0))
value = F.relu(self.critic_linear1(state))
value = self.critic_linear2(value)
policy_dist = F.relu(self.actor_linear1(state))
policy_dist = F.softmax(self.actor_linear2(policy_dist), dim=1)
return value, policy_dist
Training¶
def actor_critic_train(episodes):
"""
Helper function to train Actor Critic Network
Args:
episodes: int
Number of episodes to train for
Returns:
Nothing
"""
all_lengths = []
average_lengths = []
all_rewards = []
entropy_term = 0
for episode in range(episodes):
log_probs = []
values = []
rewards = []
state = env.reset()
state = state[0] if isinstance(state, tuple) else state  # newer gym returns (obs, info)
for steps in range(num_steps):
value, policy_dist = actor_critic.forward(state)
value = value.detach().numpy()[0, 0]
dist = policy_dist.detach().numpy()
action = np.random.choice(num_outputs, p=np.squeeze(dist))
log_prob = torch.log(policy_dist.squeeze(0)[action])
entropy = -np.sum(dist * np.log(dist))  # entropy of the action distribution, -sum(p * log p)
# Newer gym also returns a separate `truncated` flag; fold it into `done`
new_state, reward, done, *extra = env.step(action)
done = done or (len(extra) == 2 and extra[0])
rewards.append(reward)
values.append(value)
log_probs.append(log_prob)
entropy_term += entropy
state = new_state
if done or steps == num_steps - 1:
qval, _ = actor_critic.forward(new_state)
qval = qval.detach().numpy()[0, 0]
all_rewards.append(np.sum(rewards))
all_lengths.append(steps)
average_lengths.append(np.mean(all_lengths[-10:]))
if episode % 50 == 0:
print(f"episode: {episode},\treward: {np.sum(rewards)},"
f"\ttotal length: {steps},"
f"\taverage length: {average_lengths[-1]}")
break
# compute Q values
qvals = np.zeros_like(values)
for t in reversed(range(len(rewards))):
qval = rewards[t] + gamma * qval
qvals[t] = qval
#update actor critic
values = torch.FloatTensor(values)
qvals = torch.FloatTensor(qvals)
log_probs = torch.stack(log_probs)
advantage = qvals - values
actor_loss = (-log_probs * advantage).mean()
critic_loss = 0.5 * advantage.pow(2).mean()
ac_loss = actor_loss + critic_loss + 0.001 * entropy_term
ac_optimizer.zero_grad()
ac_loss.backward()
ac_optimizer.step()
# Store results
actor_critic.average_lengths = average_lengths
actor_critic.all_rewards = all_rewards
actor_critic.all_lengths = all_lengths
Run the model¶
episodes = 500 # @param {type:"integer"}
env.reset()
num_inputs = env.observation_space.shape[0]
num_outputs = env.action_space.n
actor_critic = ActorCriticNet(num_inputs, num_outputs, hidden_size)
ac_optimizer = optim.Adam(actor_critic.parameters(), lr=learning_rate)
actor_critic_train(episodes)
Plot the results¶
Plot the training performance for Actor Critic¶
# @title Plot the training performance for Actor Critic
def plot_actor_critic_training(actor_critic, episodes):
"""
Plot the training performance for Actor Critic
Args:
actor_critic: nn.module
Actor Critic Network whose performance is to be plotted
episodes: int
Number of episodes
Returns:
Nothing
"""
window = int(episodes / 20)
plt.figure(figsize=(15, 4))
plt.subplot(1, 2, 1)
smoothed_rewards = pd.Series(actor_critic.all_rewards).rolling(window).mean()
std = pd.Series(actor_critic.all_rewards).rolling(window).std()
plt.plot(smoothed_rewards, label='Smoothed rewards')
plt.fill_between(range(len(smoothed_rewards)),
smoothed_rewards - std, smoothed_rewards + std,
color='orange', alpha=0.2)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.subplot(1, 2, 2)
plt.plot(actor_critic.all_lengths, label='All lengths')
plt.plot(actor_critic.average_lengths, label='Average lengths')
plt.xlabel('Episode')
plt.ylabel('Episode length')
plt.legend()
plt.tight_layout()
plt.show()
plot_actor_critic_training(actor_critic, episodes)

Exercise 3.2.1: Effect of episodes on performance¶
Change the episodes from 500 to 3000 and observe the performance impact.
Exercise 3.2.2: Effect of learning rate on performance¶
Modify the learning_rate and gamma hyperparameters and observe the impact on performance.
Be sure to rerun the function definition cells so they pick up the updated values. A minimal sketch of such a rerun is given below.
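For example, a sketch of retraining the actor-critic agent with more episodes and a lower discount factor, using the variables defined above:
# Retrain the actor-critic agent with more episodes and a lower discount factor.
episodes = 3000
gamma = 0.9
actor_critic = ActorCriticNet(num_inputs, num_outputs, hidden_size)
ac_optimizer = optim.Adam(actor_critic.parameters(), lr=learning_rate)
actor_critic_train(episodes)
plot_actor_critic_training(actor_critic, episodes)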
Section 4: RL in the real world¶
Time estimate: ~10mins
Video 4: Real-world applications and ethics¶
Exercise 4: Group discussion¶
Form a group of 2-3 and have discussions (roughly 3 minutes each) of the following questions:
Safety: What are some safety issues that arise in RL that don't arise with, say, supervised learning?
Generalization: What happens if your RL agent is presented with data it hasn’t trained on (i.e., goes out of distribution)?
How important do you think interpretability is in the ethical and safe deployment of RL agents in the real world?
This should be a very open-ended discussion. Try to have everyone say at least one thing. You can either take these three questions in turn, with roughly 3 minutes allotted to each, or address them all at once and allow for a more natural conversation.
Appendix and further reading¶
Books and lecture notes
Sutton, R.S., Barto, A.G. (2018). Reinforcement learning: An introduction. MIT press, 2018. url: incompleteideas.net/book/RLbook2018
Szepesvari, C. (2010). Algorithms for reinforcement learning. Synthesis lectures on artificial intelligence and machine learning, 4(1): 1-103. url: sites.ualberta.ca/~szepesva/papers/RLAlgsInMDPs
Lectures and course
More practical:
Link to the tweet thread with resources recommended by the community.
This notebook is based on the EEML 2020 RL practical by Feryal Behbahani & Gheorghe Comanici. If you are interested in JAX, you should try the colab. If you are interested in TensorFlow, there is also a version of the colab for the MLSS 2020 RL Tutorial that you can try!