Tutorial 4: Policy-based Player
Tutorial 4: Policy-based Player¶
Week 3, Day 5: Reinforcement Learning for Games and Deep Learning Thinking 3
By Neuromatch Academy
Content creators: Mandana Samiei, Raymond Chua, Tim Lillicrap, Blake Richards
Content reviewers: Arush Tagade, Lily Cheng, Melvin Selim Atay, Kelson Shilling-Scrivo
Content editors: Melvin Selim Atay, Spiros Chavlis, Gunnar Blohm
Production editors: Namrata Bafna, Gagana B, Spiros Chavlis
Tutorial Objectives¶
In this tutorial, you will learn how to implement a game loop and improve the performance of a random player.
The specific objectives for this tutorial:
Understand the format of two-player games
Learn about value networks and policy networks
In the Bonus sections you will learn about Monte Carlo Tree Search (MCTS) and compare its performance to policy-based and value-based players.
Setup¶
Install dependencies¶
# @title Install dependencies
!pip install coloredlogs --quiet
!pip3 install vibecheck datatops --quiet
from vibecheck import DatatopsContentReviewContainer
def content_review(notebook_section: str):
return DatatopsContentReviewContainer(
"", # No text prompt
notebook_section,
{
"url": "https://pmyvdlilci.execute-api.us-east-1.amazonaws.com/klab",
"name": "public_testbed",
"user_key": "3zg0t05r",
},
).render()
# Imports
import os
import time
import torch
import random
import logging
import coloredlogs
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm.notebook import tqdm
from pickle import Unpickler
log = logging.getLogger(__name__)
coloredlogs.install(level='INFO') # Change this to DEBUG to see more info.
Set random seed¶
Executing set_seed(seed=seed) you are setting the seed
# @title Set random seed
# @markdown Executing `set_seed(seed=seed)` you are setting the seed
# For DL it's critical to set the random seed so that students can have a
# baseline to compare their results to expected results.
# Read more here: https://pytorch.org/docs/stable/notes/randomness.html
# Call `set_seed` function in the exercises to ensure reproducibility.
import random
import torch
def set_seed(seed=None, seed_torch=True):
"""
Function that controls randomness. NumPy and random modules must be imported.
Args:
seed : Integer
A non-negative integer that defines the random state. Default is `None`.
seed_torch : Boolean
If `True` sets the random seed for pytorch tensors, so pytorch module
must be imported. Default is `True`.
Returns:
Nothing.
"""
if seed is None:
seed = np.random.choice(2 ** 32)
random.seed(seed)
np.random.seed(seed)
if seed_torch:
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
print(f'Random seed {seed} has been set.')
# In case that `DataLoader` is used
def seed_worker(worker_id):
"""
DataLoader will reseed workers following randomness in
multi-process data loading algorithm.
Args:
worker_id: integer
ID of subprocess to seed. 0 means that
the data will be loaded in the main process
Refer: https://pytorch.org/docs/stable/data.html#data-loading-randomness for more details
Returns:
Nothing
"""
worker_seed = torch.initial_seed() % 2**32
np.random.seed(worker_seed)
random.seed(worker_seed)
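For completeness, here is a minimal sketch (not used anywhere else in this notebook) of how seed_worker is typically wired into a PyTorch DataLoader together with a seeded generator; the tiny TensorDataset is just a stand-in.
# Sketch: pass `seed_worker` and a seeded generator to DataLoader for reproducible workers.
from torch.utils.data import DataLoader, TensorDataset

loader_generator = torch.Generator()
loader_generator.manual_seed(0)
toy_loader = DataLoader(TensorDataset(torch.arange(8.0)), batch_size=2, shuffle=True,
                        num_workers=2, worker_init_fn=seed_worker,
                        generator=loader_generator)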
Set device (GPU or CPU). Execute set_device()¶
# @title Set device (GPU or CPU). Execute `set_device()`
# especially if torch modules are used.
# Inform the user if the notebook uses GPU or CPU.
def set_device():
"""
Set the device. CUDA if available, CPU otherwise
Args:
None
Returns:
Nothing
"""
device = "cuda" if torch.cuda.is_available() else "cpu"
if device != "cuda":
print("WARNING: For this notebook to perform best, "
"if possible, in the menu under `Runtime` -> "
"`Change runtime type.` select `GPU` ")
else:
print("GPU is enabled in this notebook.")
return device
SEED = 2021
set_seed(seed=SEED)
DEVICE = set_device()
Random seed 2021 has been set.
GPU is enabled in this notebook.
Download the modules¶
# @title Download the modules
# @markdown Run this cell!
# @markdown Download from OSF. The original repo is https://github.com/raymondchua/nma_rl_games.git
import os, io, sys, shutil, zipfile
from urllib.request import urlopen
# download from github repo directly
#!git clone git://github.com/raymondchua/nma_rl_games.git --quiet
REPO_PATH = 'nma_rl_games'
if os.path.exists(REPO_PATH):
download_string = "Redownloading"
shutil.rmtree(REPO_PATH)
else:
download_string = "Downloading"
zipurl = 'https://osf.io/kf4p9/download'
print(f"{download_string} and unzipping the file... Please wait.")
with urlopen(zipurl) as zipresp:
with zipfile.ZipFile(io.BytesIO(zipresp.read())) as zfile:
zfile.extractall()
print("Download completed.")
print(f"Add the {REPO_PATH} in the path and import the modules.")
# add the repo in the path
sys.path.append('nma_rl_games/alpha-zero')
# @markdown Import modules designed for use in this notebook
import Arena
from utils import *
from Game import Game
from NeuralNet import NeuralNet
from othello.OthelloLogic import Board
Redownloading and unzipping the file... Please wait.
Download completed.
Add nma_rl_games to the path and import the modules.
Helper functions from previous tutorials¶
# @title Helper functions from previous tutorials
class OthelloGame(Game):
"""
Instantiate Othello Game
"""
square_content = {
-1: "X",
+0: "-",
+1: "O"
}
@staticmethod
def getSquarePiece(piece):
return OthelloGame.square_content[piece]
def __init__(self, n):
self.n = n
def getInitBoard(self):
# Return initial board (numpy board)
b = Board(self.n)
return np.array(b.pieces)
def getBoardSize(self):
# (a,b) tuple
return (self.n, self.n)
def getActionSize(self):
# Return number of actions, n is the board size and +1 is for no-op action
return self.n*self.n + 1
def getCanonicalForm(self, board, player):
# Return state if player==1, else return -state if player==-1
return player*board
def stringRepresentation(self, board):
return board.tobytes()
def stringRepresentationReadable(self, board):
board_s = "".join(self.square_content[square] for row in board for square in row)
return board_s
def getScore(self, board, player):
b = Board(self.n)
b.pieces = np.copy(board)
return b.countDiff(player)
@staticmethod
def display(board):
n = board.shape[0]
print(" ", end="")
for y in range(n):
print(y, end=" ")
print("")
print("-----------------------")
for y in range(n):
print(y, "|", end="") # Print the row
for x in range(n):
piece = board[y][x] # Get the piece to print
print(OthelloGame.square_content[piece], end=" ")
print("|")
print("-----------------------")
@staticmethod
def displayValidMoves(moves):
# Display possible moves
# Infer the board size from the moves vector (its final entry is the no-op/pass action)
n = int(np.sqrt(len(moves) - 1))
A = np.reshape(moves[0:-1], (n, n))
print(" ")
print("possible moves")
print(" ", end="")
for y in range(n):
print(y, end=" ")
print("")
print("-----------------------")
for y in range(n):
print(y, "|", end="") # Print the row
for x in range(n):
piece = A[y][x] # Get the piece to print
print(OthelloGame.square_content[piece], end=" ")
print("|")
print("-----------------------")
def getNextState(self, board, player, action):
"""
Helper function to make valid move
If player takes action on board, return next (board,player)
and action must be a valid move
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
player: Integer
ID of current player
action: int
Index of the action to take (self.n*self.n encodes a pass)
Returns:
(board,player) tuple signifying next state
"""
if action == self.n*self.n:
return (board, -player)
b = Board(self.n)
b.pieces = np.copy(board)
move = (int(action/self.n), action%self.n)
b.execute_move(move, player)
return (b.pieces, -player)
def getValidMoves(self, board, player):
"""
Helper function to compute valid moves
Returns a fixed-size binary vector marking the valid moves for the given player
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
player: Integer
ID of current player
Returns:
valids: np.ndarray
Returns a fixed size binary vector
"""
valids = [0]*self.getActionSize()
b = Board(self.n)
b.pieces = np.copy(board)
legalMoves = b.get_legal_moves(player)
if len(legalMoves)==0:
valids[-1]=1
return np.array(valids)
for x, y in legalMoves:
valids[self.n*x+y]=1
return np.array(valids)
def getGameEnded(self, board, player):
"""
Helper function to signify if game has ended
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
player: Integer
ID of current player
Returns:
0 if the game has not ended, 1 if the given player won, -1 if the given player lost
"""
b = Board(self.n)
b.pieces = np.copy(board)
if b.has_legal_moves(player):
return 0
if b.has_legal_moves(-player):
return 0
if b.countDiff(player) > 0:
return 1
return -1
def getSymmetries(self, board, pi):
"""
Get mirror/rotational configurations of board
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
pi: np.ndarray
Policy vector of length n*n + 1 (one entry per square plus the pass action)
Returns:
l: list
List of symmetric (board, pi) pairs: the four rotations of the board, each with and without a left-right flip
"""
assert(len(pi) == self.n**2+1) # 1 for pass
pi_board = np.reshape(pi[:-1], (self.n, self.n))
l = []
for i in range(1, 5):
for j in [True, False]:
newB = np.rot90(board, i)
newPi = np.rot90(pi_board, i)
if j:
newB = np.fliplr(newB)
newPi = np.fliplr(newPi)
l += [(newB, list(newPi.ravel()) + [pi[-1]])]
return l
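# Illustrative sketch (not part of the original helpers): getSymmetries expands one
# (board, pi) training pair into 8 variants (4 rotations x 2 reflections), which is
# useful as data augmentation when training on expert games. Assumes the 6x6 board
# size used throughout this notebook; the temporary names below are not reused.
_sym_game = OthelloGame(6)
_sym_pi = [1.0 / _sym_game.getActionSize()] * _sym_game.getActionSize()
print(len(_sym_game.getSymmetries(_sym_game.getInitBoard(), _sym_pi)))  # -> 8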
class RandomPlayer():
"""
Simulates Random Player
"""
def __init__(self, game):
self.game = game
def play(self, board):
"""
Simulates game play
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
Returns:
a: int
Randomly chosen move
"""
# Compute the valid moves using getValidMoves()
valids = self.game.getValidMoves(board, 1)
# Compute the probability of each move being played (random player means this should
# be uniform for valid moves, 0 for others)
prob = valids/valids.sum()
# Pick an action based on the probabilities (hint: np.random.choice is useful)
a = np.random.choice(self.game.getActionSize(), p=prob)
return a
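# Quick illustrative check (a sketch, not part of the original helpers): on the initial
# 6x6 board the random player picks uniformly among the four valid opening moves.
_rand_game = OthelloGame(6)
print(RandomPlayer(_rand_game).play(_rand_game.getInitBoard()))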
class OthelloNNet(nn.Module):
"""
Instantiate Othello Neural Net with following configuration
nn.Conv2d(1, args.num_channels, 3, stride=1, padding=1) # Convolutional Layer 1
nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1, padding=1) # Convolutional Layer 2
nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1) # Convolutional Layer 3
nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1) # Convolutional Layer 4
nn.BatchNorm2d(args.num_channels) X 4
nn.Linear(args.num_channels * (self.board_x - 4) * (self.board_y - 4), 1024) # Fully-connected Layer 1
nn.Linear(1024, 512) # Fully-connected Layer 2
nn.Linear(512, self.action_size) # Fully-connected Layer 3
nn.Linear(512, 1) # Fully-connected Layer 4
"""
def __init__(self, game, args):
"""
Initialise game parameters
Args:
game: OthelloGame instance
Instance of the OthelloGame class above;
args: dictionary
Dictionary specifying the number of iterations and episodes, the temperature threshold, queue length,
arena and checkpointing settings, and the neural network hyperparameters:
learning rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,
num_channels: 512
Returns:
Nothing
"""
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
self.args = args
super(OthelloNNet, self).__init__()
self.conv1 = nn.Conv2d(1, args.num_channels, 3, stride=1, padding=1)
self.conv2 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1,
padding=1)
self.conv3 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1)
self.conv4 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1)
self.bn1 = nn.BatchNorm2d(args.num_channels)
self.bn2 = nn.BatchNorm2d(args.num_channels)
self.bn3 = nn.BatchNorm2d(args.num_channels)
self.bn4 = nn.BatchNorm2d(args.num_channels)
self.fc1 = nn.Linear(args.num_channels * (self.board_x - 4) * (self.board_y - 4), 1024)
self.fc_bn1 = nn.BatchNorm1d(1024)
self.fc2 = nn.Linear(1024, 512)
self.fc_bn2 = nn.BatchNorm1d(512)
self.fc3 = nn.Linear(512, self.action_size)
self.fc4 = nn.Linear(512, 1)
def forward(self, s):
"""
Controls forward pass of OthelloNNet
Args:
s: np.ndarray
Array of size (batch_size x board_x x board_y)
Returns:
Probability distribution over actions at the current state and the value of the current state.
"""
s = s.view(-1, 1, self.board_x, self.board_y) # batch_size x 1 x board_x x board_y
s = F.relu(self.bn1(self.conv1(s))) # batch_size x num_channels x board_x x board_y
s = F.relu(self.bn2(self.conv2(s))) # batch_size x num_channels x board_x x board_y
s = F.relu(self.bn3(self.conv3(s))) # batch_size x num_channels x (board_x-2) x (board_y-2)
s = F.relu(self.bn4(self.conv4(s))) # batch_size x num_channels x (board_x-4) x (board_y-4)
s = s.view(-1, self.args.num_channels * (self.board_x - 4) * (self.board_y - 4)) # reshape to batch_size x (num_channels * (board_x-4) * (board_y-4))
s = F.dropout(F.relu(self.fc_bn1(self.fc1(s))), p=self.args.dropout, training=self.training) # batch_size x 1024
s = F.dropout(F.relu(self.fc_bn2(self.fc2(s))), p=self.args.dropout, training=self.training) # batch_size x 512
pi = self.fc3(s) # batch_size x action_size
v = self.fc4(s) # batch_size x 1
# Returns probability distribution over actions at the current state and the value of the current state.
return F.log_softmax(pi, dim=1), torch.tanh(v)
class ValueBasedPlayer():
"""
Simulate Value Based Player
"""
def __init__(self, game, vnet):
"""
Initialise value based player parameters
Args:
game: OthelloGame instance
Instance of the OthelloGame class above;
vnet: Value Network instance
Instance of the Value Network class above;
Returns:
Nothing
"""
self.game = game
self.vnet = vnet
def play(self, board):
"""
Simulate game play
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
Returns:
a: int
Action whose predicted next-state value is highest among the evaluated candidates
"""
valids = self.game.getValidMoves(board, 1)
candidates = []
max_num_actions = 4
va = np.where(valids)[0]
va_list = va.tolist()
random.shuffle(va_list)
for a in va_list:
# Return next board state using getNextState() function
nextBoard, _ = self.game.getNextState(board, 1, a)
# Predict the value of next state using value network
value = self.vnet.predict(nextBoard)
# Add the value and the action as a tuple to the candidate list; note that you might need to change the sign of the value based on the player
candidates += [(-value, a)]
if len(candidates) == max_num_actions:
break
# Sort by the values
candidates.sort()
# Return action associated with highest value
return candidates[0][1]
class ValueNetwork(NeuralNet):
"""
Initiates the Value Network
"""
def __init__(self, game):
"""
Initialise network parameters
Args:
game: OthelloGame instance
Instance of the OthelloGame class above;
Returns:
Nothing
"""
self.nnet = OthelloNNet(game, args)
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
self.nnet.to(args.device)
def train(self, games):
"""
Function to train value network
Args:
games: list
List of example lists, where each example is of the form (board, pi, v)
Returns:
Nothing
"""
optimizer = optim.Adam(self.nnet.parameters())
for examples in games:
for epoch in range(args.epochs):
print('EPOCH ::: ' + str(epoch + 1))
self.nnet.train()
v_losses = [] # To store the losses per epoch
batch_count = int(len(examples) / args.batch_size) # len(examples)=200, batch-size=64, batch_count=3
t = tqdm(range(batch_count), desc='Training Value Network')
for _ in t:
sample_ids = np.random.randint(len(examples), size=args.batch_size) # Read the ground truth information from MCTS simulation using the loaded examples
boards, pis, vs = list(zip(*[examples[i] for i in sample_ids])) # Length of boards, pis, vs = 64
boards = torch.FloatTensor(np.array(boards).astype(np.float64))
target_vs = torch.FloatTensor(np.array(vs).astype(np.float64))
# Predict
# To run on GPU if available
boards, target_vs = boards.contiguous().to(args.device), target_vs.contiguous().to(args.device)
# Compute output
_, out_v = self.nnet(boards)
l_v = self.loss_v(target_vs, out_v) # Total loss
# Record loss
v_losses.append(l_v.item())
t.set_postfix(Loss_v=l_v.item())
# Compute gradient and do SGD step
optimizer.zero_grad()
l_v.backward()
optimizer.step()
def predict(self, board):
"""
Function to perform prediction
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
Returns:
v: np.ndarray
Predicted value of the given board (a scalar in [-1, 1] from the tanh output)
"""
# Timing
start = time.time()
# Preparing input
board = torch.FloatTensor(board.astype(np.float64))
board = board.contiguous().to(args.device)
board = board.view(1, self.board_x, self.board_y)
self.nnet.eval()
with torch.no_grad():
_, v = self.nnet(board)
return v.data.cpu().numpy()[0]
def loss_v(self, targets, outputs):
"""
Calculates Mean squared error
Args:
targets: np.ndarray
Ground Truth variables corresponding to input
outputs: np.ndarray
Predictions of Network
Returns:
Mean squared error: the squared difference between the model's predictions
and the ground truth, averaged over the batch
"""
# Mean squared error (MSE)
return torch.sum((targets - outputs.view(-1)) ** 2) / targets.size()[0]
def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
"""
Code Checkpointing
Args:
folder: string
Directory in which the checkpoint is saved
filename: string
Name of the checkpoint file
Returns:
Nothing
"""
filepath = os.path.join(folder, filename)
if not os.path.exists(folder):
print("Checkpoint Directory does not exist! Making directory {}".format(folder))
os.mkdir(folder)
else:
print("Checkpoint Directory exists! ")
torch.save({'state_dict': self.nnet.state_dict(),}, filepath)
print("Model saved! ")
def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
"""
Load code checkpoint
Args:
folder: string
Directory from which the checkpoint is loaded
filename: string
Name of the checkpoint file
Returns:
Nothing
"""
# https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
filepath = os.path.join(folder, filename)
if not os.path.exists(filepath):
raise ("No model in path {}".format(filepath))
checkpoint = torch.load(filepath, map_location=args.device)
self.nnet.load_state_dict(checkpoint['state_dict'])
def loadTrainExamples(folder, filename):
"""
Helper function to load Training examples
Args:
folder: string
Path specifying training examples
filename: string
File name of training examples
Returns:
trainExamplesHistory: list
Training examples that were previously collected and stored alongside the model
"""
trainExamplesHistory = []
modelFile = os.path.join(folder, filename)
examplesFile = modelFile + ".examples"
if not os.path.isfile(examplesFile):
print(f'File "{examplesFile}" with trainExamples not found!')
r = input("Continue? [y|n]")
if r != "y":
sys.exit()
else:
print("File with train examples found. Loading it...")
with open(examplesFile, "rb") as f:
trainExamplesHistory = Unpickler(f).load()
print('Loading done!')
return trainExamplesHistory
The hyperparameters used throughout the notebook.
args = dotdict({
'numIters': 1, # In training, number of iterations = 1000 and num of episodes = 100
'numEps': 1, # Number of complete self-play games to simulate during a new iteration.
'tempThreshold': 15, # To control exploration and exploitation
'updateThreshold': 0.6, # During arena playoff, new neural net will be accepted if threshold or more of games are won.
'maxlenOfQueue': 200, # Number of game examples to train the neural networks.
'numMCTSSims': 15, # Number of game moves for MCTS to simulate.
'arenaCompare': 10, # Number of games to play during arena play to determine if new net will be accepted.
'cpuct': 1,
'maxDepth':5, # Maximum number of rollouts
'numMCsims': 5, # Number of monte carlo simulations
'mc_topk': 3, # Top k actions for monte carlo rollout
'checkpoint': './temp/',
'load_model': False,
'load_folder_file': ('/dev/models/8x100x50','best.pth.tar'),
'numItersForTrainExamplesHistory': 20,
# Define neural network arguments
'lr': 0.001, # lr: Learning Rate
'dropout': 0.3,
'epochs': 10,
'batch_size': 64,
'device': DEVICE,
'num_channels': 512,
})
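Since args is a dotdict, its entries are read as attributes throughout the notebook (e.g., args.lr rather than args['lr']). A minimal check:
# Sketch: dotdict gives attribute-style access to the hyperparameters above.
print(args.lr, args.batch_size, args.num_channels, args.device)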
Load in trained value network
# @markdown Load in trained value network
model_save_name = 'ValueNetwork.pth.tar'
path = "nma_rl_games/alpha-zero/pretrained_models/models/"
set_seed(seed=SEED)
game = OthelloGame(6)
vnet = ValueNetwork(game)
vnet.load_checkpoint(folder=path, filename=model_save_name)
# Alternative if downloading the trained model didn't work (this will train the model)
if not os.listdir('nma_rl_games/alpha-zero/pretrained_models/models/'):
path = "nma_rl_games/alpha-zero/pretrained_models/data/"
loaded_games = loadTrainExamples(folder=path, filename='checkpoint_1.pth.tar')
set_seed(seed=SEED)
game = OthelloGame(6)
vnet = ValueNetwork(game)
vnet.train(loaded_games)
Random seed 2021 has been set.
A reminder of the network architecture:
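Since the architecture figure is not reproduced here, the sketch below stands in for it: it prints the OthelloNNet layers and runs a dummy forward pass to confirm the two output shapes, log-probabilities over the 37 actions and a scalar value (the temporary objects are built only for this check).
# Sketch standing in for the architecture figure: print the layers and check output shapes.
_tmp_net = OthelloNNet(game, args).to(args.device)
print(_tmp_net)
_tmp_net.eval()
with torch.no_grad():
    _log_pi, _v = _tmp_net(torch.zeros(1, 6, 6, device=args.device))
print(_log_pi.shape, _v.shape)  # torch.Size([1, 37]) torch.Size([1, 1])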
Section 1: Train a policy network from expert game data¶
Time estimate: ~25mins
Goal: How to train a policy network via supervised learning / behavioural cloning.
Exercise:
Train a network to predict the next move in an expert dataset by maximizing the log likelihood of the next action.
Video 1: Train a policy network¶
Submit your feedback¶
# @title Submit your feedback
content_review("W3D5_train_policy_network")
Coding Exercise 1: Implement PolicyNetwork¶
Previously, with the value-based player, we simply chose the move with the highest predicted value of the next state. Here, we will use a different approach. We will train a network to directly produce a policy, i.e., a distribution over all possible discrete actions, given the current state. Learning will be based on expert moves; thus, we call this behavioral cloning.
We will use the exact same network that we have used above for the value function learning. But now we will train the network explicitly on every single move of expert players.
For computing our objective function, we will use the negative log-likelihood of the targets \(t_i\), i.e., the cross-entropy \(-\sum_i t_i \log(p_i)\), where \(p_i\) are the predicted action probabilities.
Note: remember that OthelloNNet already returns the log-softmax of the output of the third linear layer (fc3).
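As a generic toy illustration of this objective (standalone tensors, unrelated to the exercise variables below): with log-softmax outputs, the negative log-likelihood of a target class is simply the negative log-probability the network assigns to that class, and exponentiating the log-softmax recovers a proper probability distribution.
# Toy illustration (standalone tensors): NLL from log-softmax outputs.
_toy_log_probs = F.log_softmax(torch.randn(2, 5), dim=1)  # e.g., 2 samples, 5 actions
_toy_classes = torch.tensor([2, 0])                       # indices of the "expert" moves
print(F.nll_loss(_toy_log_probs, _toy_classes))           # mean NLL over the batch
print(torch.exp(_toy_log_probs).sum(dim=1))               # probabilities sum to 1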
class PolicyNetwork(NeuralNet):
"""
Initialise Policy Network
"""
def __init__(self, game):
"""
Initalise policy network paramaters
Args:
game: OthelloGame instance
Instance of the OthelloGame class above;
Returns:
Nothing
"""
self.nnet = OthelloNNet(game, args)
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
self.nnet.to(args.device)
def train(self, games):
"""
Function for Policy Network Training
Args:
games: list
List of example lists, where each example is of the form (board, pi, v)
Return:
Nothing
"""
optimizer = optim.Adam(self.nnet.parameters())
for examples in games:
for epoch in range(args.epochs):
print('EPOCH ::: ' + str(epoch + 1))
self.nnet.train()
pi_losses = []
batch_count = int(len(examples) / args.batch_size)
t = tqdm(range(batch_count), desc='Training Policy Network')
for _ in t:
sample_ids = np.random.randint(len(examples), size=args.batch_size)
boards, pis, _ = list(zip(*[examples[i] for i in sample_ids]))
boards = torch.FloatTensor(np.array(boards).astype(np.float64))
target_pis = torch.FloatTensor(np.array(pis))
# Predict
boards, target_pis = boards.contiguous().to(args.device), target_pis.contiguous().to(args.device)
#################################################
## TODO for students: ##
## 1. Compute the policy (pi) predicted by OthelloNNet() ##
## 2. Implement the loss_pi() function below and then use it to update the policy loss. ##
# Fill out function and remove
raise NotImplementedError("Compute the output")
#################################################
# Compute output
out_pi, _ = ...
l_pi = ...
# Record loss
pi_losses.append(l_pi.item())
t.set_postfix(Loss_pi=l_pi.item())
# Compute gradient and do SGD step
optimizer.zero_grad()
l_pi.backward()
optimizer.step()
def predict(self, board):
"""
Function to perform prediction
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
Returns:
Probability distribution over actions for the given board (np.ndarray of length action_size)
"""
# Timing
start = time.time()
# Preparing input
board = torch.FloatTensor(board.astype(np.float64))
board = board.contiguous().to(args.device)
board = board.view(1, self.board_x, self.board_y)
self.nnet.eval()
with torch.no_grad():
pi,_ = self.nnet(board)
return torch.exp(pi).data.cpu().numpy()[0]
def loss_pi(self, targets, outputs):
"""
Calculates the negative log likelihood (NLL) of the targets
Args:
targets: np.ndarray
Ground Truth variables corresponding to input
outputs: np.ndarray
Predictions of Network
Returns:
Negative log likelihood (NLL): the cross-entropy between the target distribution and the
network's predicted log-probabilities, averaged over the batch. Whenever the network assigns
high confidence to the correct class the NLL is low, and when it assigns low confidence to
the correct class the NLL is high.
"""
#################################################
## TODO for students: To implement the loss function, please compute and return the negative log likelihood of targets.
## For more information, here is a reference that connects the expression to the neg-log-prob: https://gombru.github.io/2018/05/23/cross_entropy_loss/
# Fill out function and remove
raise NotImplementedError("Compute the loss")
#################################################
return ...
def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
"""
Code Checkpointing
Args:
folder: string
Directory in which the checkpoint is saved
filename: string
Name of the checkpoint file
Returns:
Nothing
"""
filepath = os.path.join(folder, filename)
if not os.path.exists(folder):
print("Checkpoint Directory does not exist! Making directory {}".format(folder))
os.mkdir(folder)
else:
print("Checkpoint Directory exists! ")
torch.save({'state_dict': self.nnet.state_dict(),}, filepath)
print("Model saved! ")
def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
"""
Load code checkpoint
Args:
folder: string
Directory from which the checkpoint is loaded
filename: string
Name of the checkpoint file
Returns:
Nothing
"""
# https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
filepath = os.path.join(folder, filename)
if not os.path.exists(filepath):
raise ("No model in path {}".format(filepath))
checkpoint = torch.load(filepath, map_location=args.device)
self.nnet.load_state_dict(checkpoint['state_dict'])
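Before training, a quick sanity check (a minimal sketch using a throwaway, untrained network): because predict exponentiates the log-softmax output, it should return a proper probability distribution over the 37 actions even with random weights.
# Sketch: an untrained PolicyNetwork still outputs a valid probability distribution.
_check_pnet = PolicyNetwork(game)
_check_probs = _check_pnet.predict(game.getInitBoard())
print(_check_probs.shape, _check_probs.sum())  # (37,) and ~1.0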
Train the policy network¶
Important: Only run this cell if you do not have access to the pretrained models in the nma_rl_games repository.
if not os.listdir('nma_rl_games/alpha-zero/pretrained_models/models/'):
set_seed(seed=SEED)
game = OthelloGame(6)
pnet = PolicyNetwork(game)
pnet.train(loaded_games)
Submit your feedback¶
# @title Submit your feedback
content_review("W3D5_policy_network")
Section 2: Use a trained policy network to play games¶
Time estimate: ~25mins
Goal: Use a policy network to play games.
Exercise:
Use the policy network to give probabilities for the next move.
Build a player that takes the move given the maximum probability by the network.
Compare this to another player that samples moves according to the probability distribution output by the network.
Video 2: Play games using a policy network¶
Submit your feedback¶
# @title Submit your feedback
content_review("W3D5_play_games_policy_network")
Note: in the video's softmax function, \(T=1\) is the softmax temperature and \(z_i\) is the network's output before the softmax transformation.
Coding Exercise 2: Implement the PolicyBasedPlayer¶
First we initialize the game and load in the pre-trained policy net.
model_save_name = 'PolicyNetwork.pth.tar'
path = "nma_rl_games/alpha-zero/pretrained_models/models/"
set_seed(seed=SEED)
game = OthelloGame(6)
pnet = PolicyNetwork(game)
pnet.load_checkpoint(folder=path, filename=model_save_name)
Random seed 2021 has been set.
Next we create our policy-based player by using the policy network to produce action probabilities for all valid moves.
There are then at least two ways to choose the next action (both are illustrated in the toy sketch after this list):
sampling-based player: we sample from the action probability distribution, so actions with higher probabilities are selected more often than actions with lower probabilities.
“greedy” player: we always choose the action with the highest action probability.
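A toy illustration of the two choices (standalone probabilities, not the exercise variables below):
# Toy sketch: greedy choice vs. sampling from an action probability distribution.
toy_probs = np.array([0.1, 0.6, 0.3])
greedy_action = np.argmax(toy_probs)                             # always index 1
sampled_action = np.random.choice(len(toy_probs), p=toy_probs)   # index 1 about 60% of the time
print(greedy_action, sampled_action)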
class PolicyBasedPlayer():
"""
Simulate Policy Based Player
"""
def __init__(self, game, pnet, greedy=True):
"""
Initialize Policy based player parameters
Args:
game: OthelloGame instance
Instance of the OthelloGame class above;
pnet: Policy Network instance
Instance of the Policy Network class above
greedy: Boolean
If true, implement greedy approach
Else, implement random sample policy based player
Returns:
Nothing
"""
self.game = game
self.pnet = pnet
self.greedy = greedy
def play(self, board):
"""
Simulate game play
Args:
board: np.ndarray
Board of size n x n [6x6 in this case]
Returns:
a: int
If greedy, the valid action with the highest probability;
else, an action sampled from the valid-action probability distribution
"""
valids = self.game.getValidMoves(board, 1)
#################################################
## TODO for students: ##
## 1. Compute the action probabilities using policy network pnet()
## 2. Mask invalid moves (set their action probability to 0) using the valids variable and the action probabilities computed above.
## 3. Compute the sum over the probabilities of the valid actions and store them in sum_vap.
# Fill out function and remove
raise NotImplementedError("Define the play")
#################################################
action_probs = ...
vap = ... # Masking invalid moves
sum_vap = ...
if sum_vap > 0:
vap /= sum_vap # Renormalize
else:
# If all valid moves were masked we make all valid moves equally probable
print("All valid moves were masked, doing a workaround.")
vap = vap + valids
vap /= np.sum(vap)
if self.greedy:
# Greedy policy player
a = np.where(vap == np.max(vap))[0][0]
else:
# Sample-based policy player
a = np.random.choice(self.game.getActionSize(), p=vap)
return a
# Playing games
set_seed(seed=SEED)
num_games = 20
player1 = PolicyBasedPlayer(game, pnet, greedy=True).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
## Uncomment below to test!
# result = arena.playGames(num_games, verbose=False)
# print(f"\n\n{result}")
# win_rate_player1 = result[0] / num_games
# print(f"\nWin rate for greedy policy player 1 (vs random player 2) over {num_games} games: {round(win_rate_player1*100, 1)}%")
Random seed 2021 has been set.
Win rate for greedy policy player 1 (vs random player 2) over 20 games: 80.0%
model_save_name = 'PolicyNetwork.pth.tar'
path = "nma_rl_games/alpha-zero/pretrained_models/models/"
set_seed(seed=SEED)
game = OthelloGame(6)
pnet = PolicyNetwork(game)
pnet.load_checkpoint(folder=path, filename=model_save_name)
Random seed 2021 has been set.
Submit your feedback¶
# @title Submit your feedback
content_review("W3D5_PolicyBasedPlayer")
Section 3: Player comparisons¶
Time estimate: ~10mins
Next we want to compare how our different players fare: random vs. value-based vs. policy-based (greedy or sampling-based). Feel free to explore comparisons we have not explicitly provided below.
Comparing a sampling-based policy based player versus a random player¶
There is often randomness in the results because we run only a small number of games (20, to keep compute and time costs low), so you might not get the expected result when running these cells. To better measure the strength of the players, you can run more games!
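For intuition about that randomness, a quick back-of-the-envelope sketch: the standard error of an estimated win rate \(p\) over \(n\) games is roughly \(\sqrt{p(1-p)/n}\), so 20 games leaves a wide margin.
# Sketch: binomial standard error of a win-rate estimate for different numbers of games.
for n_games in [20, 100, 500]:
    p_win = 0.8  # e.g., a true win rate of 80%
    se = np.sqrt(p_win * (1 - p_win) / n_games)
    print(f"n={n_games}: win rate {p_win:.0%} +/- {se:.1%} (1 s.e.)")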
set_seed(seed=SEED)
num_games = 20
game = OthelloGame(6)
player1 = PolicyBasedPlayer(game, pnet, greedy=False).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
result = arena.playGames(num_games, verbose=False)
print(f"\n\n{result}")
Random seed 2021 has been set.
win_rate_player1 = result[0]/num_games
print(f"Win rate for sample-based policy based player 1 (vs random player 2) over {num_games} games: {round(win_rate_player1*100, 1)}%")
Win rate for sample-based policy based player 1 (vs random player 2) over 20 games: 95.0%
Compare greedy policy based player versus value based player¶
set_seed(seed=SEED)
num_games = 20
game = OthelloGame(6)
player1 = PolicyBasedPlayer(game, pnet).play
player2 = ValueBasedPlayer(game, vnet).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
result = arena.playGames(num_games, verbose=False)
print(f"\n\n{result}")
Random seed 2021 has been set.
win_rate_player1 = result[0]/num_games
print(f"Win rate for greedy policy based player 1 vs value based player) over {num_games} games: {round(win_rate_player1*100, 1)}%")
Compare greedy policy based player versus sampling-based policy player¶
set_seed(seed=SEED)
num_games = 20
game = OthelloGame(6)
player1 = PolicyBasedPlayer(game, pnet).play # greedy player
player2 = PolicyBasedPlayer(game, pnet, greedy=False).play # sample-based player
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
result = arena.playGames(num_games, verbose=False)
print(f"\n\n{result}")
Random seed 2021 has been set.
win_rate_player1 = result[0]/num_games
print(f"Win rate for greedy policy player 1 (vs sample based policy player) over {num_games} games: {round(win_rate_player1*100, 1)}%")
We've been diving into the code, so take a few minutes with your group to recap what the different players are and how they choose their actions (random player, value-based player, greedy policy player, sample-based policy player).
Submit your feedback¶
# @title Submit your feedback
content_review("W3D5_Player_Comparisons")
Section 4: Ethical aspects¶
Time estimate: ~5mins
Video 3: Unstoppable opponents¶
Submit your feedback¶
# @title Submit your feedback
content_review("W3D1_Unstoppable opponents")
Summary¶
In this tutorial, you have learned about policy-based players and compared them to random and value-based players.