{ "cells": [ { "cell_type": "markdown", "metadata": { "colab_type": "text", "execution": {}, "id": "view-in-github" }, "source": [ "\"Open   \"Open" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "# Bonus Tutorial: Planning with Monte Carlo Tree Search\n", "\n", "**Week 3, Day 5: Reinforcement Learning for Games & DL Thinking 3**\n", "\n", "**By Neuromatch Academy**\n", "\n", "__Content creators:__ Mandana Samiei, Raymond Chua, Kushaan Gupta, Tim Lilicrap, Blake Richards\n", "\n", "__Content reviewers:__ Arush Tagade, Lily Cheng, Melvin Selim Atay, Kelson Shilling-Scrivo\n", "\n", "__Content editors:__ Melvin Selim Atay, Spiros Chavlis, Gunnar Blohm\n", "\n", "__Production editors:__ Namrata Bafna, Gagana B, Spiros Chavlis" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "---\n", "# Tutorial Objectives\n", "\n", "In this tutorial, you will learn about Monte Carlo Tree Search (MCTS) and compare its performance to policy-based, value-based players, and Monte Carlo planners." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "remove-input" ] }, "outputs": [], "source": [ "# @markdown\n", "from IPython.display import IFrame\n", "from ipywidgets import widgets\n", "out = widgets.Output()\n", "with out:\n", " print(f\"If you want to download the slides: https://osf.io/download/h4utj/\")\n", " display(IFrame(src=f\"https://mfr.ca-1.osf.io/render?url=https://osf.io/h4utj/?direct%26mode=render%26action=download%26mode=render\", width=730, height=410))\n", "display(out)" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "These are the slides for the videos in the tutorial. If you want to locally download the slides, click [here](https://osf.io/h4utj/download)." 
] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "---\n", "# Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Install dependencies\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Install dependencies\n", "!pip install coloredlogs --quiet" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Install and import feedback gadget\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Install and import feedback gadget\n", "\n", "!pip3 install vibecheck datatops --quiet\n", "\n", "from vibecheck import DatatopsContentReviewContainer\n", "def content_review(notebook_section: str):\n", " return DatatopsContentReviewContainer(\n", " \"\", # No text prompt\n", " notebook_section,\n", " {\n", " \"url\": \"https://pmyvdlilci.execute-api.us-east-1.amazonaws.com/klab\",\n", " \"name\": \"neuromatch_dl\",\n", " \"user_key\": \"f379rz8y\",\n", " },\n", " ).render()\n", "\n", "\n", "feedback_prefix = \"W3D5_T3_Bonus\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "# Imports\n", "import os\n", "import math\n", "import random\n", "import time\n", "import torch\n", "import random\n", "import logging\n", "import coloredlogs\n", "\n", "import numpy as np\n", "import torch.nn as nn\n", "import torch.optim as optim\n", "import torch.nn.functional as F\n", "\n", "from tqdm.notebook import tqdm\n", "\n", "log = logging.getLogger(__name__)\n", "coloredlogs.install(level='INFO') # Change this to DEBUG to see more info." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " Executing `set_seed(seed=seed)` you are setting the seed\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Set random seed\n", "\n", "# @markdown Executing `set_seed(seed=seed)` you are setting the seed\n", "\n", "# For DL its critical to set the random seed so that students can have a\n", "# baseline to compare their results to expected results.\n", "# Read more here: https://pytorch.org/docs/stable/notes/randomness.html\n", "\n", "# Call `set_seed` function in the exercises to ensure reproducibility.\n", "def set_seed(seed=None, seed_torch=True):\n", " \"\"\"\n", " Function that controls randomness. NumPy and random modules must be imported.\n", "\n", " Args:\n", " seed : Integer\n", " A non-negative integer that defines the random state. Default is `None`.\n", " seed_torch : Boolean\n", " If `True` sets the random seed for pytorch tensors, so pytorch module\n", " must be imported. 
Default is `True`.\n", "\n", " Returns:\n", " Nothing.\n", " \"\"\"\n", " if seed is None:\n", " seed = np.random.choice(2 ** 32)\n", " random.seed(seed)\n", " np.random.seed(seed)\n", " if seed_torch:\n", " torch.manual_seed(seed)\n", " torch.cuda.manual_seed_all(seed)\n", " torch.cuda.manual_seed(seed)\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", " print(f'Random seed {seed} has been set.')\n", "\n", "\n", "# In case that `DataLoader` is used\n", "def seed_worker(worker_id):\n", " \"\"\"\n", " DataLoader will reseed workers following randomness in\n", " multi-process data loading algorithm.\n", "\n", " Args:\n", " worker_id: integer\n", " ID of subprocess to seed. 0 means that\n", " the data will be loaded in the main process\n", " Refer: https://pytorch.org/docs/stable/data.html#data-loading-randomness for more details\n", "\n", " Returns:\n", " Nothing\n", " \"\"\"\n", " worker_seed = torch.initial_seed() % 2**32\n", " np.random.seed(worker_seed)\n", " random.seed(worker_seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set device (GPU or CPU). Execute `set_device()`\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Set device (GPU or CPU). Execute `set_device()`\n", "# especially if torch modules used.\n", "\n", "# Inform the user if the notebook uses GPU or CPU.\n", "\n", "def set_device():\n", " \"\"\"\n", " Set the device. CUDA if available, CPU otherwise\n", "\n", " Args:\n", " None\n", "\n", " Returns:\n", " Nothing\n", " \"\"\"\n", " device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", " if device != \"cuda\":\n", " print(\"WARNING: For this notebook to perform best, \"\n", " \"if possible, in the menu under `Runtime` -> \"\n", " \"`Change runtime type.` select `GPU` \")\n", " else:\n", " print(\"GPU is enabled in this notebook.\")\n", "\n", " return device" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "SEED = 2023\n", "set_seed(seed=SEED)\n", "DEVICE = set_device()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download the modules\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Download the modules\n", "\n", "# @markdown Run this cell!\n", "\n", "# @markdown Download from OSF. The original repo is https://github.com/raymondchua/nma_rl_games.git\n", "\n", "import os, io, sys, shutil, zipfile\n", "from urllib.request import urlopen\n", "\n", "# download from github repo directly\n", "#!git clone git://github.com/raymondchua/nma_rl_games.git --quiet\n", "REPO_PATH = 'nma_rl_games'\n", "\n", "if not os.path.exists(REPO_PATH):\n", " download_string = \"Downloading\"\n", " zipurl = 'https://osf.io/kf4p9/download'\n", " print(f\"{download_string} and unzipping the file... 
Please wait.\")\n", " with urlopen(zipurl) as zipresp:\n", " with zipfile.ZipFile(io.BytesIO(zipresp.read())) as zfile:\n", " zfile.extractall()\n", " print(\"Download completed.\")\n", "\n", "print(f\"Add the {REPO_PATH} in the path and import the modules.\")\n", "# add the repo in the path\n", "sys.path.append('nma_rl_games/alpha-zero')\n", "\n", "# @markdown Import modules designed for use in this notebook\n", "import Arena\n", "\n", "from utils import *\n", "from Game import Game\n", "from MCTS import MCTS\n", "from NeuralNet import NeuralNet\n", "\n", "# from othello.OthelloPlayers import *\n", "from othello.OthelloLogic import Board\n", "# from othello.OthelloGame import OthelloGame\n", "from othello.pytorch.NNet import NNetWrapper as NNet" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Helper functions from previous tutorials\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Helper functions from previous tutorials\n", "\n", "def loadTrainExamples(folder, filename):\n", " \"\"\"\n", " Helper function to load training examples\n", "\n", " Args:\n", " folder: string\n", " Path specifying training examples\n", " filename: string\n", " File name of training examples\n", "\n", " Returns:\n", " trainExamplesHistory: list\n", " Returns examples based on the model were already collected (loaded)\n", " \"\"\"\n", " trainExamplesHistory = []\n", " modelFile = os.path.join(folder, filename)\n", " examplesFile = modelFile + \".examples\"\n", " if not os.path.isfile(examplesFile):\n", " print(f'File \"{examplesFile}\" with trainExamples not found!')\n", " r = input(\"Continue? [y|n]\")\n", " if r != \"y\":\n", " sys.exit()\n", " else:\n", " print(\"File with train examples found. Loading it...\")\n", " with open(examplesFile, \"rb\") as f:\n", " trainExamplesHistory = Unpickler(f).load()\n", " print('Loading done!')\n", " return trainExamplesHistory\n", "\n", "\n", "def save_model_checkpoint(folder, filename, nnet):\n", " filepath = os.path.join(folder, filename)\n", "\n", " if not os.path.exists(folder):\n", " print(\"Checkpoint Directory does not exist! 
Making directory {}\".format(folder))\n", " os.mkdir(folder)\n", " else:\n", " print(\"Checkpoint Directory exists!\")\n", "\n", " torch.save({'state_dict': nnet.state_dict()}, filepath)\n", " print(\"Model saved!\")\n", "\n", "def load_model_checkpoint(folder, filename, nnet, device):\n", " filepath = os.path.join(folder, filename)\n", "\n", " if not os.path.exists(filepath):\n", " raise FileNotFoundError(\"No model in path {}\".format(filepath))\n", "\n", " checkpoint = torch.load(filepath, map_location=device)\n", " nnet.load_state_dict(checkpoint['state_dict'])\n", "\n", "\n", "class OthelloGame(Game):\n", " \"\"\"\n", " Othello game board\n", " \"\"\"\n", " square_content = {\n", " -1: \"X\",\n", " +0: \"-\",\n", " +1: \"O\"\n", " }\n", "\n", " @staticmethod\n", " def getSquarePiece(piece):\n", " return OthelloGame.square_content[piece]\n", "\n", " def __init__(self, n):\n", " self.n = n\n", "\n", " def getInitBoard(self):\n", " b = Board(self.n)\n", " return np.array(b.pieces)\n", "\n", " def getBoardSize(self):\n", " return (self.n, self.n)\n", "\n", " def getActionSize(self):\n", " # Return number of actions, n is the board size and +1 is for no-op action\n", " return self.n * self.n + 1\n", "\n", " def getCanonicalForm(self, board, player):\n", " # Return state if player==1, else return -state if player==-1\n", " return player * board\n", "\n", " def stringRepresentation(self, board):\n", " return board.tobytes()\n", "\n", " def stringRepresentationReadable(self, board):\n", " board_s = \"\".join(self.square_content[square] for row in board for square in row)\n", " return board_s\n", "\n", " def getScore(self, board, player):\n", " b = Board(self.n)\n", " b.pieces = np.copy(board)\n", " return b.countDiff(player)\n", "\n", " @staticmethod\n", " def display(board):\n", " n = board.shape[0]\n", " print(\" \", end=\"\")\n", " for y in range(n):\n", " print(y, end=\" \")\n", " print(\"\")\n", " print(\"-----------------------\")\n", " for y in range(n):\n", " print(y, \"|\", end=\"\") # Print the row\n", " for x in range(n):\n", " piece = board[y][x] # Get the piece to print\n", " print(OthelloGame.square_content[piece], end=\" \")\n", " print(\"|\")\n", " print(\"-----------------------\")\n", "\n", " @staticmethod\n", " def displayValidMoves(moves):\n", " A=np.reshape(moves[0:-1], board.shape)\n", " n = board.shape[0]\n", " print(\" \")\n", " print(\"possible moves\")\n", " print(\" \", end=\"\")\n", " for y in range(n):\n", " print(y, end=\" \")\n", " print(\"\")\n", " print(\"-----------------------\")\n", " for y in range(n):\n", " print(y, \"|\", end=\"\") # Print the row\n", " for x in range(n):\n", " piece = A[y][x] # Get the piece to print\n", " print(OthelloGame.square_content[piece], end=\" \")\n", " print(\"|\")\n", " print(\"-----------------------\")\n", "\n", " def getNextState(self, board, player, action):\n", " \"\"\"\n", " Make valid move. 
If player takes action on board, return next (board,player)\n", " and action must be a valid move\n", "\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", " player: Integer\n", " ID of current player\n", " action: np.ndarray\n", " Space of actions\n", "\n", " Returns:\n", " (board, player): tuple\n", " Next state representation\n", " \"\"\"\n", " if action == self.n*self.n:\n", " return (board, -player)\n", " b = Board(self.n)\n", " b.pieces = np.copy(board)\n", " move = (int(action/self.n), action%self.n)\n", " b.execute_move(move, player)\n", " return (b.pieces, -player)\n", "\n", " def getValidMoves(self, board, player):\n", " \"\"\"\n", " Get all valid moves for player\n", "\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", " player: Integer\n", " ID of current player\n", " action: np.ndarray\n", " Space of action\n", "\n", " Returns:\n", " valids: np.ndarray\n", " Valid moves for player\n", " \"\"\"\n", " valids = [0]*self.getActionSize()\n", " b = Board(self.n)\n", " b.pieces = np.copy(board)\n", " legalMoves = b.get_legal_moves(player)\n", " if len(legalMoves)==0:\n", " valids[-1]=1\n", " return np.array(valids)\n", " for x, y in legalMoves:\n", " valids[self.n*x+y]=1\n", " return np.array(valids)\n", "\n", " def getGameEnded(self, board, player):\n", " \"\"\"\n", " Check if game ended\n", "\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", " player: Integer\n", " ID of current player\n", "\n", " Returns:\n", " 0 if not ended, 1 if player 1 won, -1 if player 1 lost\n", " \"\"\"\n", " b = Board(self.n)\n", " b.pieces = np.copy(board)\n", " if b.has_legal_moves(player):\n", " return 0\n", " if b.has_legal_moves(-player):\n", " return 0\n", " if b.countDiff(player) > 0:\n", " return 1\n", " return -1\n", "\n", " def getSymmetries(self, board, pi):\n", " \"\"\"\n", " Get mirror/rotational configurations of board\n", "\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", " pi: np.ndarray\n", " Dimension of board\n", "\n", " Returns:\n", " l: list\n", " 90 degree of board, 90 degree of pi_board\n", " \"\"\"\n", " assert(len(pi) == self.n**2+1) # 1 for pass\n", " pi_board = np.reshape(pi[:-1], (self.n, self.n))\n", " l = []\n", "\n", " for i in range(1, 5):\n", " for j in [True, False]:\n", " newB = np.rot90(board, i)\n", " newPi = np.rot90(pi_board, i)\n", " if j:\n", " newB = np.fliplr(newB)\n", " newPi = np.fliplr(newPi)\n", " l += [(newB, list(newPi.ravel()) + [pi[-1]])]\n", " return l\n", "\n", "\n", "class RandomPlayer():\n", "\n", " def __init__(self, game):\n", " self.game = game\n", "\n", " def play(self, board):\n", " \"\"\"\n", " Simulates game play\n", "\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " a: int\n", " Randomly chosen move\n", " \"\"\"\n", "\n", " # Compute the valid moves using getValidMoves()\n", " valids = self.game.getValidMoves(board, 1)\n", "\n", " # Compute the probability of each move being played (random player means this should\n", " # be uniform for valid moves, 0 for others)\n", " prob = valids/valids.sum()\n", "\n", " # Pick an action based on the probabilities (hint: np.choice is useful)\n", " a = np.random.choice(self.game.getActionSize(), p=prob)\n", "\n", " return a\n", "\n", "\n", "class OthelloNNet(nn.Module):\n", "\n", " def __init__(self, game, args):\n", " \"\"\"\n", " Initialise game parameters\n", "\n", " Args:\n", " game: OthelloGame instance\n", " 
Instance of the OthelloGame class above;\n", " args: dictionary\n", " Instantiates number of iterations and episodes, controls temperature threshold, queue length,\n", " arena, checkpointing, and neural network parameters:\n", " learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,\n", " num_channels: 512\n", " \"\"\"\n", " self.board_x, self.board_y = game.getBoardSize()\n", " self.action_size = game.getActionSize()\n", " self.args = args\n", "\n", " super(OthelloNNet, self).__init__()\n", " self.conv1 = nn.Conv2d(in_channels=1, out_channels=args.num_channels,\n", " kernel_size=3, stride=1, padding=1)\n", " self.conv2 = nn.Conv2d(in_channels=args.num_channels,\n", " out_channels=args.num_channels, kernel_size=3,\n", " stride=1, padding=1)\n", " self.conv3 = nn.Conv2d(in_channels=args.num_channels,\n", " out_channels=args.num_channels, kernel_size=3,\n", " stride=1)\n", " self.conv4 = nn.Conv2d(in_channels=args.num_channels,\n", " out_channels=args.num_channels, kernel_size=3,\n", " stride=1)\n", "\n", " self.bn1 = nn.BatchNorm2d(num_features=args.num_channels)\n", " self.bn2 = nn.BatchNorm2d(num_features=args.num_channels)\n", " self.bn3 = nn.BatchNorm2d(num_features=args.num_channels)\n", " self.bn4 = nn.BatchNorm2d(num_features=args.num_channels)\n", "\n", " self.fc1 = nn.Linear(in_features=args.num_channels * (self.board_x - 4) * (self.board_y - 4),\n", " out_features=1024)\n", " self.fc_bn1 = nn.BatchNorm1d(num_features=1024)\n", "\n", " self.fc2 = nn.Linear(in_features=1024, out_features=512)\n", " self.fc_bn2 = nn.BatchNorm1d(num_features=512)\n", "\n", " self.fc3 = nn.Linear(in_features=512, out_features=self.action_size)\n", "\n", " self.fc4 = nn.Linear(in_features=512, out_features=1)\n", "\n", " def forward(self, s):\n", " \"\"\"\n", " Controls forward pass of OthelloNNet\n", "\n", " Args:\n", " s: np.ndarray\n", " Array of size (batch_size x board_x x board_y)\n", "\n", " Returns:\n", " prob, v: tuple of torch.Tensor\n", " Probability distribution over actions at the current state and the value\n", " of the current state.\n", " \"\"\"\n", " s = s.view(-1, 1, self.board_x, self.board_y) # batch_size x 1 x board_x x board_y\n", " s = F.relu(self.bn1(self.conv1(s))) # batch_size x num_channels x board_x x board_y\n", " s = F.relu(self.bn2(self.conv2(s))) # batch_size x num_channels x board_x x board_y\n", " s = F.relu(self.bn3(self.conv3(s))) # batch_size x num_channels x (board_x-2) x (board_y-2)\n", " s = F.relu(self.bn4(self.conv4(s))) # batch_size x num_channels x (board_x-4) x (board_y-4)\n", " s = s.view(-1, self.args.num_channels * (self.board_x - 4) * (self.board_y - 4))\n", "\n", " s = F.dropout(F.relu(self.fc_bn1(self.fc1(s))), p=self.args.dropout, training=self.training) # batch_size x 1024\n", " s = F.dropout(F.relu(self.fc_bn2(self.fc2(s))), p=self.args.dropout, training=self.training) # batch_size x 512\n", "\n", " pi = self.fc3(s) # batch_size x action_size\n", " v = self.fc4(s) # batch_size x 1\n", "\n", " return F.log_softmax(pi, dim=1), torch.tanh(v)\n", "\n", "\n", "class ValueNetwork(NeuralNet):\n", "\n", " def __init__(self, game):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame\n", " Instance of the OthelloGame class above\n", " \"\"\"\n", " self.nnet = OthelloNNet(game, args)\n", " self.board_x, self.board_y = game.getBoardSize()\n", " self.action_size = game.getActionSize()\n", " self.nnet.to(args.device)\n", "\n", " def train(self, games):\n", " \"\"\"\n", " Args:\n", " games: list\n", " List of examples with each example is of form (board, pi, 
v)\n", " \"\"\"\n", " optimizer = optim.Adam(self.nnet.parameters())\n", " for examples in games:\n", " for epoch in range(args.epochs):\n", " print('EPOCH ::: ' + str(epoch + 1))\n", " self.nnet.train()\n", " v_losses = [] # To store the losses per epoch\n", " batch_count = int(len(examples) / args.batch_size) # len(examples)=200, batch-size=64, batch_count=3\n", " t = tqdm(range(batch_count), desc='Training Value Network')\n", " for _ in t:\n", " sample_ids = np.random.randint(len(examples), size=args.batch_size) # Read the ground truth information from MCTS simulation using the loaded examples\n", " boards, pis, vs = list(zip(*[examples[i] for i in sample_ids])) # Length of boards, pis, vis = 64\n", " boards = torch.FloatTensor(np.array(boards).astype(np.float64))\n", " target_vs = torch.FloatTensor(np.array(vs).astype(np.float64))\n", "\n", " # Predict\n", " # To run on GPU if available\n", " boards, target_vs = boards.contiguous().to(args.device), target_vs.contiguous().to(args.device)\n", "\n", " # Compute output\n", " _, out_v = self.nnet(boards)\n", " l_v = self.loss_v(target_vs, out_v) # Total loss\n", "\n", " # Record loss\n", " v_losses.append(l_v.item())\n", " t.set_postfix(Loss_v=l_v.item())\n", "\n", " # Compute gradient and do SGD step\n", " optimizer.zero_grad()\n", " l_v.backward()\n", " optimizer.step()\n", "\n", " def predict(self, board):\n", " \"\"\"\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " v: OthelloNet instance\n", " Data of the OthelloNet class instance above;\n", " \"\"\"\n", " # Timing\n", " start = time.time()\n", "\n", " # Preparing input\n", " board = torch.FloatTensor(board.astype(np.float64))\n", " board = board.contiguous().to(args.device)\n", " board = board.view(1, self.board_x, self.board_y)\n", " self.nnet.eval()\n", " with torch.no_grad():\n", " _, v = self.nnet(board)\n", " return v.data.cpu().numpy()[0]\n", "\n", " def loss_v(self, targets, outputs):\n", " \"\"\"\n", " Args:\n", " targets: np.ndarray\n", " Ground Truth variables corresponding to input\n", " outputs: np.ndarray\n", " Predictions of Network\n", "\n", " Returns:\n", " MSE Loss averaged across the whole dataset\n", " \"\"\"\n", " # Mean squared error (MSE)\n", " return torch.sum((targets - outputs.view(-1)) ** 2) / targets.size()[0]\n", "\n", " def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):\n", " save_model_checkpoint(folder, filename, self.nnet)\n", "\n", " def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):\n", " load_model_checkpoint(folder, filename, self.nnet, args.device)\n", "\n", "class ValueBasedPlayer():\n", "\n", " def __init__(self, game, vnet):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame instance\n", " Instance of the OthelloGame class\n", " vnet: Value Network instance\n", " Instance of the Value Network class\n", " \"\"\"\n", " self.game = game\n", " self.vnet = vnet\n", "\n", " def play(self, board):\n", " \"\"\"\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " candidates: List\n", " Collection of tuples describing action and values of future predicted\n", " states\n", " \"\"\"\n", " valids = self.game.getValidMoves(board, 1)\n", " candidates = []\n", " max_num_actions = 4\n", " va = np.where(valids)[0]\n", " va_list = va.tolist()\n", " random.shuffle(va_list)\n", " for a in va_list:\n", " # Return next board state using getNextState() function\n", " nextBoard, _ = 
self.game.getNextState(board, 1, a)\n", " # Predict the value of next state using value network\n", " value = self.vnet.predict(nextBoard)\n", " # Add the value and the action as a tuple to the candidate lists, note that you might need to change the sign of the value based on the player\n", " candidates += [(-value, a)]\n", "\n", " if len(candidates) == max_num_actions:\n", " break\n", "\n", " # Sort by the values\n", " candidates.sort()\n", "\n", " # Return action associated with highest value\n", " return candidates[0][1]\n", "\n", "\n", "class PolicyNetwork(NeuralNet):\n", "\n", " def __init__(self, game):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame\n", " Instance of the OthelloGame class\n", " \"\"\"\n", " self.nnet = OthelloNNet(game, args)\n", " self.board_x, self.board_y = game.getBoardSize()\n", " self.action_size = game.getActionSize()\n", " self.nnet.to(args.device)\n", "\n", " def train(self, games):\n", " \"\"\"\n", " Args:\n", " games: list\n", " List of examples where each example is of form (board, pi, v)\n", " \"\"\"\n", " optimizer = optim.Adam(self.nnet.parameters())\n", "\n", " for examples in games:\n", " for epoch in range(args.epochs):\n", " print('EPOCH ::: ' + str(epoch + 1))\n", " self.nnet.train()\n", " pi_losses = []\n", "\n", " batch_count = int(len(examples) / args.batch_size)\n", "\n", " t = tqdm(range(batch_count), desc='Training Policy Network')\n", " for _ in t:\n", " sample_ids = np.random.randint(len(examples), size=args.batch_size)\n", " boards, pis, _ = list(zip(*[examples[i] for i in sample_ids]))\n", " boards = torch.FloatTensor(np.array(boards).astype(np.float64))\n", " target_pis = torch.FloatTensor(np.array(pis))\n", "\n", " # Predict\n", " boards, target_pis = boards.contiguous().to(args.device), target_pis.contiguous().to(args.device)\n", "\n", " # Compute output\n", " out_pi, _ = self.nnet(boards)\n", " l_pi = self.loss_pi(target_pis, out_pi)\n", "\n", " # Record loss\n", " pi_losses.append(l_pi.item())\n", " t.set_postfix(Loss_pi=l_pi.item())\n", "\n", " # Compute gradient and do SGD step\n", " optimizer.zero_grad()\n", " l_pi.backward()\n", " optimizer.step()\n", "\n", " def predict(self, board):\n", " \"\"\"\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " Data from the OthelloNet instance\n", " \"\"\"\n", " # Timing\n", " start = time.time()\n", "\n", " # Preparing input\n", " board = torch.FloatTensor(board.astype(np.float64))\n", " board = board.contiguous().to(args.device)\n", " board = board.view(1, self.board_x, self.board_y)\n", " self.nnet.eval()\n", " with torch.no_grad():\n", " pi,_ = self.nnet(board)\n", " return torch.exp(pi).data.cpu().numpy()[0]\n", "\n", " def loss_pi(self, targets, outputs):\n", " \"\"\"\n", " Calculates Negative Log Likelihood(NLL) of Targets\n", "\n", " Args:\n", " targets: np.ndarray\n", " Ground Truth variables corresponding to input\n", " outputs: np.ndarray\n", " Predictions of Network\n", "\n", " Returns:\n", " Negative Log Likelihood calculated as: When training a model, we aspire to\n", " find the minima of a loss function given a set of parameters (in a neural\n", " network, these are the weights and biases).\n", " Sum the loss function to all the correct classes. 
So, whenever the network\n", " assigns high confidence at the correct class, the NLL is low, but when the\n", " network assigns low confidence at the correct class, the NLL is high.\n", " \"\"\"\n", " ## For more information, here is a reference that connects the expression to\n", " # the neg-log-prob: https://gombru.github.io/2018/05/23/cross_entropy_loss/\n", " return -torch.sum(targets * outputs) / targets.size()[0]\n", "\n", " def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):\n", " save_model_checkpoint(folder, filename, self.nnet)\n", "\n", " def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):\n", " load_model_checkpoint(folder, filename, self.nnet, args.device)\n", "\n", "\n", "class PolicyBasedPlayer():\n", "\n", " def __init__(self, game, pnet, greedy=True):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame instance\n", " Instance of the OthelloGame class above;\n", " pnet: Policy Network instance\n", " Instance of the Policy Network class above\n", " greedy: Boolean\n", " If true, implement greedy approach\n", " Else, implement random sample policy based player\n", " \"\"\"\n", " self.game = game\n", " self.pnet = pnet\n", " self.greedy = greedy\n", "\n", " def play(self, board):\n", " \"\"\"\n", " Args:\n", " board: np.ndarray\n", " Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " a: np.ndarray\n", " If greedy, implement greedy policy player\n", " Else, implement random sample policy based player\n", " \"\"\"\n", " valids = self.game.getValidMoves(board, 1)\n", " action_probs = self.pnet.predict(board)\n", " vap = action_probs*valids # Masking invalid moves\n", " sum_vap = np.sum(vap)\n", "\n", " if sum_vap > 0:\n", " vap /= sum_vap # Renormalize\n", " else:\n", " # If all valid moves were masked we make all valid moves equally probable\n", " print(\"All valid moves were masked, doing a workaround.\")\n", " vap = vap + valids\n", " vap /= np.sum(vap)\n", "\n", " if self.greedy:\n", " # Greedy policy player\n", " a = np.where(vap == np.max(vap))[0][0]\n", " else:\n", " # Sample-based policy player\n", " a = np.random.choice(self.game.getActionSize(), p=vap)\n", "\n", " return a\n", "\n", "\n", "class MonteCarlo():\n", "\n", " def __init__(self, game, nnet, args):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame instance\n", " Instance of the OthelloGame class above;\n", " nnet: OthelloNet instance\n", " Instance of the OthelloNNet class above;\n", " args: dictionary\n", " Instantiates number of iterations and episodes, controls temperature threshold, queue length,\n", " arena, checkpointing, and neural network parameters:\n", " learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,\n", " num_channels: 512\n", " \"\"\"\n", " self.game = game\n", " self.nnet = nnet\n", " self.args = args\n", "\n", " self.Ps = {} # Stores initial policy (returned by neural net)\n", " self.Es = {} # Stores game.getGameEnded ended for board s\n", "\n", " # Call this rollout\n", " def simulate(self, canonicalBoard):\n", " \"\"\"\n", " Simulate one Monte Carlo rollout\n", "\n", " Args:\n", " canonicalBoard: np.ndarray\n", " Canonical Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " temp_v:\n", " Terminal State\n", " \"\"\"\n", " s = self.game.stringRepresentation(canonicalBoard)\n", " init_start_state = s\n", " temp_v = 0\n", " isfirstAction = None\n", " current_player = -1 # opponent's turn (the agent has already taken an action before the simulation)\n", " self.Ps[s], _ = 
self.nnet.predict(canonicalBoard)\n", "\n", " for i in range(self.args.maxDepth): # maxDepth\n", "\n", " if s not in self.Es:\n", " self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)\n", " if self.Es[s] != 0:\n", " # Terminal state\n", " temp_v = self.Es[s] * current_player\n", " break\n", "\n", " self.Ps[s], v = self.nnet.predict(canonicalBoard)\n", " valids = self.game.getValidMoves(canonicalBoard, 1)\n", " self.Ps[s] = self.Ps[s] * valids # Masking invalid moves\n", " sum_Ps_s = np.sum(self.Ps[s])\n", "\n", " if sum_Ps_s > 0:\n", " self.Ps[s] /= sum_Ps_s # Renormalize\n", " else:\n", " # If all valid moves were masked make all valid moves equally probable\n", " # NB! All valid moves may be masked if either your NNet architecture is insufficient or you've get overfitting or something else.\n", " # If you have got dozens or hundreds of these messages you should pay attention to your NNet and/or training process.\n", " log.error(\"All valid moves were masked, doing a workaround.\")\n", " self.Ps[s] = self.Ps[s] + valids\n", " self.Ps[s] /= np.sum(self.Ps[s])\n", "\n", " # Choose action according to the policy distribution\n", " a = np.random.choice(self.game.getActionSize(), p=self.Ps[s])\n", " # Find the next state and the next player\n", " next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)\n", " canonicalBoard = self.game.getCanonicalForm(next_s, next_player)\n", " s = self.game.stringRepresentation(next_s)\n", " current_player *= -1\n", " # Initial policy\n", " self.Ps[s], v = self.nnet.predict(canonicalBoard)\n", " temp_v = v.item() * current_player\n", "\n", " return temp_v\n", "\n", "\n", "class MonteCarloBasedPlayer():\n", " \"\"\"\n", " Simulate Player based on Monte Carlo Algorithm\n", " \"\"\"\n", "\n", " def __init__(self, game, nnet, args):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame instance\n", " Instance of the OthelloGame class above;\n", " nnet: OthelloNet instance\n", " Instance of the OthelloNNet class above;\n", " args: dictionary\n", " Instantiates number of iterations and episodes, controls temperature threshold, queue length,\n", " arena, checkpointing, and neural network parameters:\n", " learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,\n", " num_channels: 512\n", " \"\"\"\n", " self.game = game\n", " self.nnet = nnet\n", " self.args = args\n", " self.mc = MonteCarlo(game, nnet, args)\n", " self.K = self.args.mc_topk\n", "\n", " def play(self, canonicalBoard):\n", " \"\"\"\n", " Simulate Play on Canonical Board\n", "\n", " Args:\n", " canonicalBoard: np.ndarray\n", " Canonical Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " best_action: tuple\n", " (avg_value, action) i.e., Average value associated with corresponding action\n", " i.e., Action with the highest topK probability\n", " \"\"\"\n", " self.qsa = []\n", " s = self.game.stringRepresentation(canonicalBoard)\n", " Ps, v = self.nnet.predict(canonicalBoard)\n", " valids = self.game.getValidMoves(canonicalBoard, 1)\n", " Ps = Ps * valids # Masking invalid moves\n", " sum_Ps_s = np.sum(Ps)\n", "\n", " if sum_Ps_s > 0:\n", " Ps /= sum_Ps_s # Renormalize\n", " else:\n", " # If all valid moves were masked make all valid moves equally probable\n", " # NB! 
All valid moves may be masked if either your NNet architecture is insufficient or you've get overfitting or something else.\n", " # If you have got dozens or hundreds of these messages you should pay attention to your NNet and/or training process.\n", " log = logging.getLogger(__name__)\n", " log.error(\"All valid moves were masked, doing a workaround.\")\n", " Ps = Ps + valids\n", " Ps /= np.sum(Ps)\n", "\n", " num_valid_actions = np.shape(np.nonzero(Ps))[1]\n", "\n", " if num_valid_actions < self.K:\n", " top_k_actions = np.argpartition(Ps,-num_valid_actions)[-num_valid_actions:]\n", " else:\n", " top_k_actions = np.argpartition(Ps,-self.K)[-self.K:] # To get actions that belongs to top k prob\n", "\n", " for action in top_k_actions:\n", " next_s, next_player = self.game.getNextState(canonicalBoard, 1, action)\n", " next_s = self.game.getCanonicalForm(next_s, next_player)\n", "\n", " values = []\n", "\n", " # Do some rollouts\n", " for rollout in range(self.args.numMCsims):\n", " value = self.mc.simulate(next_s)\n", " values.append(value)\n", "\n", " # Average out values\n", " avg_value = np.mean(values)\n", " self.qsa.append((avg_value, action))\n", "\n", " self.qsa.sort(key=lambda a: a[0])\n", " self.qsa.reverse()\n", " best_action = self.qsa[0][1]\n", " return best_action\n", "\n", " def getActionProb(self, canonicalBoard, temp=1):\n", " \"\"\"\n", " Get probabilities associated with each action\n", "\n", " Args:\n", " canonicalBoard: np.ndarray\n", " Canonical Board of size n x n [6x6 in this case]\n", " temp: Integer\n", " Signifies if game is in terminal state\n", "\n", " Returns:\n", " action_probs: List\n", " Probability associated with corresponding action\n", " \"\"\"\n", " if self.game.getGameEnded(canonicalBoard, 1) != 0:\n", " return np.zeros((self.game.getActionSize()))\n", "\n", " else:\n", " action_probs = np.zeros((self.game.getActionSize()))\n", " best_action = self.play(canonicalBoard)\n", " action_probs[best_action] = 1\n", "\n", " return action_probs" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "The hyperparameters used throughout the notebook." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "args = dotdict({\n", " 'numIters': 1, # In training, number of iterations = 1000 and num of episodes = 100\n", " 'numEps': 1, # Number of complete self-play games to simulate during a new iteration.\n", " 'tempThreshold': 15, # To control exploration and exploitation\n", " 'updateThreshold': 0.6, # During arena playoff, new neural net will be accepted if threshold or more of games are won.\n", " 'maxlenOfQueue': 200, # Number of game examples to train the neural networks.\n", " 'numMCTSSims': 15, # Number of games moves for MCTS to simulate.\n", " 'arenaCompare': 10, # Number of games to play during arena play to determine if new net will be accepted.\n", " 'cpuct': 1,\n", " 'maxDepth':5, # Maximum number of rollouts\n", " 'numMCsims': 5, # Number of monte carlo simulations\n", " 'mc_topk': 3, # Top k actions for monte carlo rollout\n", "\n", " 'checkpoint': './temp/',\n", " 'load_model': False,\n", " 'load_folder_file': ('/dev/models/8x100x50','best.pth.tar'),\n", " 'numItersForTrainExamplesHistory': 20,\n", "\n", " # Define neural network arguments\n", " 'lr': 0.001, # learning rate\n", " 'dropout': 0.3,\n", " 'epochs': 10,\n", " 'batch_size': 64,\n", " 'device': DEVICE,\n", " 'num_channels': 512,\n", "})" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "---\n", "# Section 1: Plan using Monte Carlo Tree Search (MCTS)\n", "\n", "*Time estimate: ~30 mins*" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "**Goal:** Teach students to understand the core ideas behind Monte Carlo Tree Search (MCTS)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Video 1: Plan with MCTS\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "remove-input" ] }, "outputs": [], "source": [ "# @title Video 1: Plan with MCTS\n", "from ipywidgets import widgets\n", "from ipywidgets import widgets\n", "from IPython.display import YouTubeVideo\n", "from IPython.display import IFrame\n", "from IPython.display import display\n", "\n", "\n", "class PlayVideo(IFrame):\n", " def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n", " self.id = id\n", " if source == 'Bilibili':\n", " src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n", " elif source == 'Osf':\n", " src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n", " super(PlayVideo, self).__init__(src, width, height, **kwargs)\n", "\n", "\n", "def display_videos(video_ids, W=400, H=300, fs=1):\n", " tab_contents = []\n", " for i, video_id in enumerate(video_ids):\n", " out = widgets.Output()\n", " with out:\n", " if video_ids[i][0] == 'Youtube':\n", " video = YouTubeVideo(id=video_ids[i][1], width=W,\n", " height=H, fs=fs, rel=0)\n", " print(f'Video available at https://youtube.com/watch?v={video.id}')\n", " else:\n", " video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n", " height=H, fs=fs, autoplay=False)\n", " if video_ids[i][0] == 'Bilibili':\n", " print(f'Video available at https://www.bilibili.com/video/{video.id}')\n", " elif video_ids[i][0] == 'Osf':\n", " print(f'Video available at https://osf.io/{video.id}')\n", " display(video)\n", " tab_contents.append(out)\n", " return tab_contents\n", "\n", "\n", "video_ids = [('Youtube', 'Hhw6Ed0Zmco'), ('Bilibili', 'BV1yQ4y127Sr')]\n", "tab_contents = 
display_videos(video_ids, W=730, H=410)\n", "tabs = widgets.Tab()\n", "tabs.children = tab_contents\n", "for i in range(len(tab_contents)):\n", " tabs.set_title(i, video_ids[i][0])\n", "display(tabs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Submit your feedback\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Submit your feedback\n", "content_review(f\"{feedback_prefix}_Plan_with_MCTS_Video\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "## Coding Exercise 1: MCTS planner\n", "\n", "In building the MCTS planner, we will focus on the action selection part, particularly the objective function used. MCTS will use a combination of the current action-value function $Q$ and the policy prior as follows:\n", "\n", "\\begin{equation}\n", "\\underset{a}{\\operatorname{argmax}} (Q(s_t, a)+u(s_t, a))\n", "\\end{equation}\n", "\n", "with $u(s_t, a)=c_{puct} \\cdot P(s,a) \\cdot \\frac{\\sqrt{\\sum_b N(s,b)}}{1+N(s,a)}$. This effectively implements an Upper Confidence bound applied to Trees (UCT). UCT balances exploration and exploitation by taking into account the values stored during MCTS. The trade-off is parametrized by $c_{puct}$.\n", "\n", "**Note**: Polynomial Upper Confidence Trees (PUCT) is the technical term for the algorithm below, in which we sequentially run MCTS and store/use information from previous runs to explore and find optimal actions.\n", "\n", "
\n", "\n", "**Exercise**:\n", "* Finish the MCTS planner by using UCT to select actions to build the tree.\n", "* Deploy the MCTS planner to build a tree search for a given board position, producing value estimates and action counts for that position." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "class MCTS():\n", "\n", " def __init__(self, game, nnet, args):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame instance\n", " Instance of the OthelloGame class above;\n", " nnet: OthelloNet instance\n", " Instance of the OthelloNNet class above;\n", " args: dictionary\n", " Instantiates number of iterations and episodes, controls temperature threshold, queue length,\n", " arena, checkpointing, and neural network parameters:\n", " learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,\n", " num_channels: 512\n", " \"\"\"\n", " self.game = game\n", " self.nnet = nnet\n", " self.args = args\n", " self.Qsa = {} # Stores Q values for s,a (as defined in the paper)\n", " self.Nsa = {} # Stores #times edge s,a was visited\n", " self.Ns = {} # Stores #times board s was visited\n", " self.Ps = {} # Stores initial policy (returned by neural net)\n", " self.Es = {} # Stores game.getGameEnded ended for board s\n", " self.Vs = {} # Stores game.getValidMoves for board s\n", "\n", " def search(self, canonicalBoard):\n", " \"\"\"\n", " Perform one iteration of MCTS.\n", "\n", " It is recursively called till a leaf node is found. The action chosen at\n", " each node is one that has the maximum upper confidence bound.\n", " Once a leaf node is found, the neural network is called to return an\n", " initial policy P and a value v for the state. This value is propagated\n", " up the search path. In case the leaf node is a terminal state, the\n", " outcome is propagated up the search path. The values of Ns, Nsa, Qsa are\n", " updated.\n", " NOTE: the return values are the negative of the value of the current\n", " state. This is done since v is in [-1,1] and if v is the value of a\n", " state for the current player, then its value is -v for the other player.\n", "\n", " Args:\n", " canonicalBoard: np.ndarray\n", " Canonical Board of size n x n [6x6 in this case]\n", "\n", " Returns:\n", " v: Float\n", " The negative of the value of the current canonicalBoard\n", " \"\"\"\n", " s = self.game.stringRepresentation(canonicalBoard)\n", "\n", " if s not in self.Es:\n", " self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)\n", " if self.Es[s] != 0:\n", " # Terminal node\n", " return -self.Es[s]\n", "\n", " if s not in self.Ps:\n", " # Leaf node\n", " self.Ps[s], v = self.nnet.predict(canonicalBoard)\n", " valids = self.game.getValidMoves(canonicalBoard, 1)\n", " self.Ps[s] = self.Ps[s] * valids # Masking invalid moves\n", " sum_Ps_s = np.sum(self.Ps[s])\n", " if sum_Ps_s > 0:\n", " self.Ps[s] /= sum_Ps_s # Renormalize\n", " else:\n", " # If all valid moves were masked make all valid moves equally probable\n", " # NB! 
All valid moves may be masked if either your NNet architecture is\n", " # insufficient or you've get overfitting or something else.\n", " # If you have got dozens or hundreds of these messages you should\n", " # pay attention to your NNet and/or training process.\n", " log = logging.getLogger(__name__)\n", " log.error(\"All valid moves were masked, doing a workaround.\")\n", " self.Ps[s] = self.Ps[s] + valids\n", " self.Ps[s] /= np.sum(self.Ps[s])\n", "\n", " self.Vs[s] = valids\n", " self.Ns[s] = 0\n", "\n", " return -v\n", "\n", " valids = self.Vs[s]\n", " cur_best = -float('inf')\n", " best_act = -1\n", "\n", " ############################################################################\n", " ## TODO for students:\n", " # Implement the highest upper confidence bound depending whether we observed\n", " # the state-action pair which is stored in self.Qsa[(s, a)].\n", " # You can find the formula in the slide 52 in video 8 above.\n", " # Fill out function and remove\n", " raise NotImplementedError(\"Complete the for loop\")\n", " ############################################################################\n", " # Pick the action with the highest upper confidence bound\n", " for a in range(self.game.getActionSize()):\n", " if valids[a]:\n", " if (s, a) in self.Qsa:\n", " u = ... + ... * ... * math.sqrt(...) / (1 + ...)\n", " else:\n", " u = ... * ... * math.sqrt(... + 1e-8)\n", "\n", " if u > cur_best:\n", " cur_best = u\n", " best_act = a\n", "\n", " a = best_act\n", " next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)\n", " next_s = self.game.getCanonicalForm(next_s, next_player)\n", "\n", " v = self.search(next_s)\n", "\n", " if (s, a) in self.Qsa:\n", " self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)\n", " self.Nsa[(s, a)] += 1\n", "\n", " else:\n", " self.Qsa[(s, a)] = v\n", " self.Nsa[(s, a)] = 1\n", "\n", " self.Ns[s] += 1\n", " return -v\n", "\n", " def getNsa(self):\n", " return self.Nsa" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "execution": {} }, "source": [ "[*Click for solution*](https://github.com/NeuromatchAcademy/course-content-dl/tree/main/tutorials/W3D5_ReinforcementLearningForGamesAndDlThinking3/solutions/W3D5_Tutorial3_Solution_2aef50a6.py)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Submit your feedback\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Submit your feedback\n", "content_review(f\"{feedback_prefix}_MCTS_Planner_Exercise\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "---\n", "# Section 2: Use MCTS to play games\n", "\n", "*Time estimate: ~10 mins*\n" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "**Goal:** Learn how to use the results of MCTS to play games.\n", "\n", "**Exercise:**\n", "* Plug the MCTS planner into an agent.\n", "* Play games against other agents.\n", "* Explore the contributions of prior network, value function, number of simulations/time to play and explore/exploit parameters." 
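] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "As a warm-up for this section: once MCTS has been run from the current position, the visit counts $N(s, a)$ at the root summarize how promising each move looked during search. The short sketch below (using purely hypothetical counts, not taken from a real game) shows how such counts can be converted into a move distribution with a temperature parameter; the MCTS-based player you will build in Coding Exercise 2 performs essentially this computation." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "# A minimal sketch with hypothetical visit counts (not from a real game):\n", "# convert root visit counts N(s, a) into action probabilities using a\n", "# temperature parameter. temp -> 0 plays greedily (all probability mass on\n", "# the most-visited action); larger temp keeps more exploration.\n", "toy_counts = np.array([0., 12., 3., 0., 35., 0.])  # hypothetical N(s, a) per action\n", "\n", "def counts_to_probs(counts, temp=1.0):\n", "  if temp == 0:\n", "    probs = np.zeros_like(counts)\n", "    probs[np.argmax(counts)] = 1.0\n", "    return probs\n", "  scaled = counts ** (1.0 / temp)\n", "  return scaled / scaled.sum()\n", "\n", "for temp in (1.0, 0.5, 0):\n", "  print(f\"temp={temp}: {np.round(counts_to_probs(toy_counts, temp), 3)}\")"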
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Video 2: Play with MCTS\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "remove-input" ] }, "outputs": [], "source": [ "# @title Video 2: Play with MCTS\n", "from ipywidgets import widgets\n", "from IPython.display import YouTubeVideo\n", "from IPython.display import IFrame\n", "from IPython.display import display\n", "\n", "\n", "class PlayVideo(IFrame):\n", " def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n", " self.id = id\n", " if source == 'Bilibili':\n", " src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n", " elif source == 'Osf':\n", " src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n", " super(PlayVideo, self).__init__(src, width, height, **kwargs)\n", "\n", "\n", "def display_videos(video_ids, W=400, H=300, fs=1):\n", " tab_contents = []\n", " for i, video_id in enumerate(video_ids):\n", " out = widgets.Output()\n", " with out:\n", " if video_ids[i][0] == 'Youtube':\n", " video = YouTubeVideo(id=video_ids[i][1], width=W,\n", " height=H, fs=fs, rel=0)\n", " print(f'Video available at https://youtube.com/watch?v={video.id}')\n", " else:\n", " video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n", " height=H, fs=fs, autoplay=False)\n", " if video_ids[i][0] == 'Bilibili':\n", " print(f'Video available at https://www.bilibili.com/video/{video.id}')\n", " elif video_ids[i][0] == 'Osf':\n", " print(f'Video available at https://osf.io/{video.id}')\n", " display(video)\n", " tab_contents.append(out)\n", " return tab_contents\n", "\n", "\n", "video_ids = [('Youtube', '1BRXb-igKAU'), ('Bilibili', 'BV1ng411M7Gz')]\n", "tab_contents = display_videos(video_ids, W=730, H=410)\n", "tabs = widgets.Tab()\n", "tabs.children = tab_contents\n", "for i in range(len(tab_contents)):\n", " tabs.set_title(i, video_ids[i][0])\n", "display(tabs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Submit your feedback\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Submit your feedback\n", "content_review(f\"{feedback_prefix}_Play_with_MCTS_Video\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "## Coding Exercise 2: Agent that uses an MCTS planner\n", "Now we can use the MCTS planner and play the game! We will again let the MCTS planner play against players with other policies." 
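, "\n", "\n", "Before you start, here is the overall wiring as a sketch (it mirrors the commented-out check at the bottom of the exercise cell; `game`, `n1`, `args1`, and `num_games` are defined in the cells below, and the snippet assumes you have already completed the `MonteCarloTreeSearchBasedPlayer` class):\n", "\n", "```python\n", "# Sketch only: run it after completing the exercise below.\n", "mcts_agent = MonteCarloTreeSearchBasedPlayer(game, n1, args1)\n", "# Wrap the agent as a function that maps a board to a single action index.\n", "mcts_player = lambda board: np.argmax(mcts_agent.getActionProb(board, temp=0))\n", "opponent = RandomPlayer(game).play\n", "# The Arena plays num_games games and counts how many games each side wins.\n", "arena = Arena.Arena(mcts_player, opponent, game, display=OthelloGame.display)\n", "result = arena.playGames(num_games, verbose=False)  # result[0], result[1]: wins for player1, player2\n", "```"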
] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "# Load MCTS model from the repository\n", "mcts_model_save_name = 'MCTS.pth.tar'\n", "path = \"nma_rl_games/alpha-zero/pretrained_models/models/\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "class MonteCarloTreeSearchBasedPlayer():\n", "\n", " def __init__(self, game, nnet, args):\n", " \"\"\"\n", " Args:\n", " game: OthelloGame instance\n", " Instance of the OthelloGame class above;\n", " nnet: OthelloNet instance\n", " Instance of the OthelloNNet class above;\n", " args: dictionary\n", " Instantiates number of iterations and episodes, controls temperature threshold, queue length,\n", " arena, checkpointing, and neural network parameters:\n", " learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,\n", " num_channels: 512\n", " \"\"\"\n", " self.game = game\n", " self.nnet = nnet\n", " self.args = args\n", " self.mcts = MCTS(game, nnet, args)\n", "\n", " def play(self, canonicalBoard, temp=1):\n", " \"\"\"\n", " Args:\n", " canonicalBoard: np.ndarray\n", " Canonical Board of size n x n [6x6 in this case]\n", " temp: Integer\n", " Signifies if game is in terminal state\n", "\n", " Returns:\n", " List of probabilities for all actions if temp is 0\n", " Best action based on max probability otherwise\n", " \"\"\"\n", " for i in range(self.args.numMCTSSims):\n", "\n", " ##########################################################################\n", " ## TODO for students:\n", " # Run MCTS search function.\n", " # Fill out function and remove\n", " raise NotImplementedError(\"Plug the planner\")\n", " ##########################################################################\n", " ...\n", "\n", " s = self.game.stringRepresentation(canonicalBoard)\n", " ############################################################################\n", " ## TODO for students:\n", " # Call the Nsa function from MCTS class and store it in the self.Nsa\n", " # Fill out function and remove\n", " raise NotImplementedError(\"Compute Nsa (number of times edge s,a was visited)\")\n", " ############################################################################\n", " self.Nsa = ...\n", " self.counts = [self.Nsa[(s, a)] if (s, a) in self.Nsa else 0 for a in range(self.game.getActionSize())]\n", "\n", " if temp == 0:\n", " bestAs = np.array(np.argwhere(self.counts == np.max(self.counts))).flatten()\n", " bestA = np.random.choice(bestAs)\n", " probs = [0] * len(self.counts)\n", " probs[bestA] = 1\n", " return probs\n", "\n", " self.counts = [x ** (1. 
/ temp) for x in self.counts]\n", " self.counts_sum = float(sum(self.counts))\n", " probs = [x / self.counts_sum for x in self.counts]\n", " return np.argmax(probs)\n", "\n", " def getActionProb(self, canonicalBoard, temp=1):\n", " \"\"\"\n", " Args:\n", " canonicalBoard: np.ndarray\n", " Canonical Board of size n x n [6x6 in this case]\n", " temp: Integer\n", " Signifies if game is in terminal state\n", "\n", " Returns:\n", " action_probs: List\n", " Probability associated with corresponding action\n", " \"\"\"\n", " action_probs = np.zeros((self.game.getActionSize()))\n", " best_action = self.play(canonicalBoard)\n", " action_probs[best_action] = 1\n", "\n", " return action_probs\n", "\n", "\n", "set_seed(seed=SEED)\n", "game = OthelloGame(6)\n", "rp = RandomPlayer(game).play # All players\n", "num_games = 20 # Games\n", "n1 = NNet(game) # nnet players\n", "n1.load_checkpoint(folder=path, filename=mcts_model_save_name)\n", "args1 = dotdict({'numMCTSSims': 50, 'cpuct':1.0})\n", "\n", "## Uncomment below to check your agent!\n", "# print('\\n******MCTS player versus random player******')\n", "# mcts1 = MonteCarloTreeSearchBasedPlayer(game, n1, args1)\n", "# n1p = lambda x: np.argmax(mcts1.getActionProb(x, temp=0))\n", "# arena = Arena.Arena(n1p, rp, game, display=OthelloGame.display)\n", "# MCTS_result = arena.playGames(num_games, verbose=False)\n", "# print(f\"\\nNumber of games won by player1 = {MCTS_result[0]}, \"\n", "# f\"number of games won by player2 = {MCTS_result[1]}, out of {num_games} games\")\n", "# win_rate_player1 = MCTS_result[0]/num_games\n", "# print(f\"\\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "```\n", "Number of games won by player1 = 19, num of games won by player2 = 1, out of 20 games\n", "\n", "Win rate for player1 over 20 games: 95.0%\n", "```" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "execution": {} }, "source": [ "[*Click for solution*](https://github.com/NeuromatchAcademy/course-content-dl/tree/main/tutorials/W3D5_ReinforcementLearningForGamesAndDlThinking3/solutions/W3D5_Tutorial3_Solution_9d9515c8.py)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load in trained value and policy networks\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Load in trained value and policy networks\n", "model_save_name = 'ValueNetwork.pth.tar'\n", "path = \"nma_rl_games/alpha-zero/pretrained_models/models/\"\n", "set_seed(seed=SEED)\n", "game = OthelloGame(6)\n", "vnet = ValueNetwork(game)\n", "vnet.load_checkpoint(folder=path, filename=model_save_name)\n", "\n", "\n", "model_save_name = 'PolicyNetwork.pth.tar'\n", "path = \"nma_rl_games/alpha-zero/pretrained_models/models/\"\n", "set_seed(seed=SEED)\n", "game = OthelloGame(6)\n", "pnet = PolicyNetwork(game)\n", "pnet.load_checkpoint(folder=path, filename=model_save_name)\n", "\n", "\n", "# Alternative if the downloading of trained model didn't work (will train the model)\n", "if not os.listdir('nma_rl_games/alpha-zero/pretrained_models/models/'):\n", " path = \"nma_rl_games/alpha-zero/pretrained_models/data/\"\n", " loaded_games = loadTrainExamples(folder=path, filename='checkpoint_1.pth.tar')\n", "\n", " set_seed(seed=SEED)\n", " game = OthelloGame(6)\n", " vnet = ValueNetwork(game)\n", " vnet.train(loaded_games)\n", "\n", " 
set_seed(seed=SEED)\n", " game = OthelloGame(6)\n", " pnet = PolicyNetwork(game)\n", " pnet.train(loaded_games)" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "### MCTS player against Value-based player" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "print('\\n******MCTS player versus value-based player******')\n", "set_seed(seed=SEED)\n", "vp = ValueBasedPlayer(game, vnet).play # Value-based player\n", "arena = Arena.Arena(n1p, vp, game, display=OthelloGame.display)\n", "MC_result = arena.playGames(num_games, verbose=False)\n", "\n", "print(f\"\\nNumber of games won by player1 = {MC_result[0]}, \"\n", " f\"number of games won by player2 = {MC_result[1]}, out of {num_games} games\")\n", "win_rate_player1 = MC_result[0]/num_games\n", "print(f\"\\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "```\n", "Number of games won by player1 = 17, number of games won by player2 = 3, out of 20 games\n", "\n", "Win rate for player1 over 20 games: 85.0%\n", "```" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "### MCTS player against Policy-based player" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "print('\\n******MCTS player versus policy-based player******')\n", "set_seed(seed=SEED)\n", "pp = PolicyBasedPlayer(game, pnet).play # Policy-based player\n", "arena = Arena.Arena(n1p, pp, game, display=OthelloGame.display)\n", "MC_result = arena.playGames(num_games, verbose=False)\n", "\n", "print(f\"\\nNumber of games won by player1 = {MC_result[0]}, \"\n", " f\"number of games won by player2 = {MC_result[1]}, out of {num_games} games\")\n", "win_rate_player1 = MC_result[0]/num_games\n", "print(f\"\\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "```\n", "Number of games won by player1 = 20, number of games won by player2 = 0, out of 20 games\n", "\n", "Win rate for player1 over 20 games: 100.0%\n", "```" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "### MCTS player against Monte-Carlo player" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "mc_model_save_name = 'MC.pth.tar'\n", "path = \"nma_rl_games/alpha-zero/pretrained_models/models/\"\n", "\n", "n2 = NNet(game) # nNet players\n", "n2.load_checkpoint(folder=path, filename=mc_model_save_name)\n", "args2 = dotdict({'numMCsims': 10, 'maxRollouts':5, 'maxDepth':5, 'mc_topk': 3})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": {} }, "outputs": [], "source": [ "print('\\n******MCTS player versus MC player******')\n", "set_seed(seed=SEED)\n", "mc = MonteCarloBasedPlayer(game, n2, args2)\n", "n2p = lambda x: np.argmax(mc.getActionProb(x))\n", "arena = Arena.Arena(n1p, n2p, game, display=OthelloGame.display)\n", "MC_result = arena.playGames(num_games, verbose=False)\n", "\n", "print(f\"\\nNumber of games won by player1 = {MC_result[0]}, \"\n", " f\"number of games won by player2 = {MC_result[1]}, out of {num_games} games\")\n", "win_rate_player1 = MC_result[0]/num_games\n", "print(f\"\\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%\")" ] }, { "cell_type": "markdown", "metadata": { 
"execution": {} }, "source": [ "```\n", "Number of games won by player1 = 16, number of games won by player2 = 4, out of 20 games\n", "\n", "Win rate for player1 over 20 games: 80.0%\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Submit your feedback\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "execution": {}, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# @title Submit your feedback\n", "content_review(f\"{feedback_prefix}_Play_Games_MCTS_Exercise\")" ] }, { "cell_type": "markdown", "metadata": { "execution": {} }, "source": [ "---\n", "# Summary\n", "\n", "In this tutorial, you have learned about players with Monte Carlo Tree Search planner and compared them to random, value-based, policy-based, and Monte-Carlo players." ] } ], "metadata": { "accelerator": "GPU", "colab": { "collapsed_sections": [], "include_colab_link": true, "name": "W3D5_Tutorial3", "provenance": [], "toc_visible": true }, "kernel": { "display_name": "Python 3", "language": "python", "name": "python3" }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.10" } }, "nbformat": 4, "nbformat_minor": 0 }