From a093060b0e8a787e51212b5f2879dc839605da65 Mon Sep 17 00:00:00 2001 From: Matt Strapp Date: Mon, 26 Apr 2021 17:12:01 -0500 Subject: Revert "Refactor jsut about everything" This reverts commit e58a60ed18bde5db28ba96910df518a61b3999b2. --- dotsandboxes/agents/agent_AB.py | 57 -------- dotsandboxes/agents/agent_MCTS.py | 55 -------- dotsandboxes/agents/agent_random.py | 212 ---------------------------- dotsandboxes/agents/algorithms/MCTS.py | 151 -------------------- dotsandboxes/agents/algorithms/alphaBeta.py | 105 -------------- dotsandboxes/agents/algorithms/ann.py | 170 ---------------------- 6 files changed, 750 deletions(-) delete mode 100644 dotsandboxes/agents/agent_AB.py delete mode 100644 dotsandboxes/agents/agent_MCTS.py delete mode 100644 dotsandboxes/agents/agent_random.py delete mode 100644 dotsandboxes/agents/algorithms/MCTS.py delete mode 100644 dotsandboxes/agents/algorithms/alphaBeta.py delete mode 100644 dotsandboxes/agents/algorithms/ann.py (limited to 'dotsandboxes/agents') diff --git a/dotsandboxes/agents/agent_AB.py b/dotsandboxes/agents/agent_AB.py deleted file mode 100644 index 5564f11..0000000 --- a/dotsandboxes/agents/agent_AB.py +++ /dev/null @@ -1,57 +0,0 @@ -from python.alphaBeta import AlphaBeta -import dotsandboxes.dotsandboxesagent as dba - -import sys -import argparse -import logging -from GameState import GameState, DotsAndBoxesState -import alphaBeta - - -logger = logging.getLogger(__name__) -games = {} -agentclass = dba - - -class Agent(dba.DotsAndBoxesAgent): - def __init__(self, player, nb_rows, nb_cols, timelimit): - super(Agent, self).__init__(player, nb_rows, nb_cols, timelimit) - self.GameStateClass = DotsAndBoxesState - self.game_state = self.GameStateClass(nb_rows, nb_cols, player) - self.controller = AlphaBeta() - - def register_action(self, row, column, orientation, player): - super(Agent, self).register_action(row, column, orientation, player) - # adjust agent specific board representation - move = (row, column, orientation) - self.game_state.play_move(move) - - def next_action(self): - r, c, o = self.controller.get_next_move(self.game_state, time_allowed=self.timelimit) - return r, c, o - - def end_game(self): - super(Agent, self).end_game() - - -# Adapted from provided code -def main(argv=None): - global agentclass - parser = argparse.ArgumentParser(description='Start agent to play Dots and Boxes') - parser.add_argument('--verbose', '-v', action='count', default=0, help='Verbose output') - parser.add_argument('--quiet', '-q', action='count', default=0, help='Quiet output') - parser.add_argument('port', metavar='PORT', type=int, help='Port to use for server') - args = parser.parse_args(argv) - print(args) - - logger.setLevel(max(logging.INFO - 10 * (args.verbose - args.quiet), logging.DEBUG)) - logger.addHandler(logging.StreamHandler(sys.stdout)) - - agentclass = Agent - dba.agentclass = Agent - dba.start_server(args.port) - - -if __name__ == "__main__": - sys.exit(main()) - diff --git a/dotsandboxes/agents/agent_MCTS.py b/dotsandboxes/agents/agent_MCTS.py deleted file mode 100644 index b60f5ec..0000000 --- a/dotsandboxes/agents/agent_MCTS.py +++ /dev/null @@ -1,55 +0,0 @@ -import dotsandboxes.dotsandboxesagent as dba - -import sys -import argparse -import logging -from GameState import GameState, DotsAndBoxesState -from MCTS import MCTSNode, MCTSGameController - -logger = logging.getLogger(__name__) -games = {} -agentclass = dba - - -class Agent(dba.DotsAndBoxesAgent): - def __init__(self, player, nb_rows, nb_cols, timelimit): - super(Agent, self).__init__(player, nb_rows, nb_cols, timelimit) - self.GameStateClass = DotsAndBoxesState - self.game_state = self.GameStateClass(nb_rows, nb_cols, player) - self.controller = MCTSGameController() - - def register_action(self, row, column, orientation, player): - super(Agent, self).register_action(row, column, orientation, player) - # adjust agent specific board representation - move = (row, column, orientation) - self.game_state.play_move(move) - - def next_action(self): - r, c, o = self.controller.get_next_move(self.game_state, time_allowed=self.timelimit) - return r, c, o - - def end_game(self): - super(Agent, self).end_game() - - -# Adapted from provided code -def main(argv=None): - global agentclass - parser = argparse.ArgumentParser(description='Start agent to play Dots and Boxes') - parser.add_argument('--verbose', '-v', action='count', default=0, help='Verbose output') - parser.add_argument('--quiet', '-q', action='count', default=0, help='Quiet output') - parser.add_argument('port', metavar='PORT', type=int, help='Port to use for server') - args = parser.parse_args(argv) - print(args) - - logger.setLevel(max(logging.INFO - 10 * (args.verbose - args.quiet), logging.DEBUG)) - logger.addHandler(logging.StreamHandler(sys.stdout)) - - agentclass = Agent - dba.agentclass = Agent - dba.start_server(args.port) - - -if __name__ == "__main__": - sys.exit(main()) - diff --git a/dotsandboxes/agents/agent_random.py b/dotsandboxes/agents/agent_random.py deleted file mode 100644 index abf677b..0000000 --- a/dotsandboxes/agents/agent_random.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env python3 -# encoding: utf-8 -""" -dotsandboxesagent.py - -Template for the Machine Learning Project course at KU Leuven (2017-2018) -of Hendrik Blockeel and Wannes Meert. - -Copyright (c) 2018 KU Leuven. All rights reserved. -""" -import sys -import argparse -import logging -import asyncio -import websockets -import json -from collections import defaultdict -import random - -logger = logging.getLogger(__name__) -games = {} -agentclass = None - - -class DotsAndBoxesAgent: - """Example Dots and Boxes agent implementation base class. - It returns a random next move. - - A DotsAndBoxesAgent object should implement the following methods: - - __init__ - - add_player - - register_action - - next_action - - end_game - - This class does not necessarily use the best data structures for the - approach you want to use. - """ - def __init__(self, player, nb_rows, nb_cols, timelimit): - """Create Dots and Boxes agent. - - :param player: Player number, 1 or 2 - :param nb_rows: Rows in grid - :param nb_cols: Columns in grid - :param timelimit: Maximum time allowed to send a next action. - """ - self.player = {player} - self.timelimit = timelimit - self.ended = False - self.nb_rows = nb_rows - self.nb_cols = nb_cols - rows = [] - for ri in range(nb_rows + 1): - columns = [] - for ci in range(nb_cols + 1): - columns.append({"v": 0, "h": 0}) - rows.append(columns) - self.cells = rows - - def add_player(self, player): - """Use the same agent for multiple players.""" - self.player.add(player) - - def register_action(self, row, column, orientation, player): - """Register action played in game. - - :param row: - :param columns: - :param orientation: "v" or "h" - :param player: 1 or 2 - """ - self.cells[row][column][orientation] = player - - def next_action(self): - """Return the next action this agent wants to perform. - - In this example, the function implements a random move. Replace this - function with your own approach. - - :return: (row, column, orientation) - """ - logger.info("Computing next move (grid={}x{}, player={})"\ - .format(self.nb_rows, self.nb_cols, self.player)) - # Random move - free_lines = [] - for ri in range(len(self.cells)): - row = self.cells[ri] - for ci in range(len(row)): - cell = row[ci] - if ri < (len(self.cells) - 1) and cell["v"] == 0: - free_lines.append((ri, ci, "v")) - if ci < (len(row) - 1) and cell["h"] == 0: - free_lines.append((ri, ci, "h")) - if len(free_lines) == 0: - # Board full - return None - movei = random.randint(0, len(free_lines) - 1) - r, c, o = free_lines[movei] - return r, c, o - - def end_game(self): - self.ended = True - - -## MAIN EVENT LOOP - -async def handler(websocket, path): - logger.info("Start listening") - game = None - # msg = await websocket.recv() - try: - async for msg in websocket: - logger.info("< {}".format(msg)) - try: - msg = json.loads(msg) - except json.decoder.JSONDecodeError as err: - logger.error(err) - return False - game = msg["game"] - answer = None - if msg["type"] == "start": - # Initialize game - if msg["game"] in games: - games[msg["game"]].add_player(msg["player"]) - else: - nb_rows, nb_cols = msg["grid"] - games[msg["game"]] = agentclass(msg["player"], - nb_rows, - nb_cols, - msg["timelimit"]) - if msg["player"] == 1: - # Start the game - nm = games[game].next_action() - print('nm = {}'.format(nm)) - if nm is None: - # Game over - logger.info("Game over") - continue - r, c, o = nm - answer = { - 'type': 'action', - 'location': [r, c], - 'orientation': o - } - else: - # Wait for the opponent - answer = None - - elif msg["type"] == "action": - # An action has been played - r, c = msg["location"] - o = msg["orientation"] - games[game].register_action(r, c, o, msg["player"]) - if msg["nextplayer"] in games[game].player: - # Compute your move - nm = games[game].next_action() - if nm is None: - # Game over - logger.info("Game over") - continue - nr, nc, no = nm - answer = { - 'type': 'action', - 'location': [nr, nc], - 'orientation': no - } - else: - answer = None - - elif msg["type"] == "end": - # End the game - games[msg["game"]].end_game() - answer = None - else: - logger.error("Unknown message type:\n{}".format(msg)) - - if answer is not None: - print(answer) - await websocket.send(json.dumps(answer)) - logger.info("> {}".format(answer)) - except websockets.exceptions.ConnectionClosed as err: - logger.info("Connection closed") - logger.info("Exit handler") - - -def start_server(port): - server = websockets.serve(handler, 'localhost', port) - print("Running on ws://127.0.0.1:{}".format(port)) - asyncio.get_event_loop().run_until_complete(server) - asyncio.get_event_loop().run_forever() - - -## COMMAND LINE INTERFACE - -def main(argv=None): - global agentclass - parser = argparse.ArgumentParser(description='Start agent to play Dots and Boxes') - parser.add_argument('--verbose', '-v', action='count', default=0, help='Verbose output') - parser.add_argument('--quiet', '-q', action='count', default=0, help='Quiet output') - parser.add_argument('port', metavar='PORT', type=int, help='Port to use for server') - args = parser.parse_args(argv) - - logger.setLevel(max(logging.INFO - 10 * (args.verbose - args.quiet), logging.DEBUG)) - logger.addHandler(logging.StreamHandler(sys.stdout)) - - agentclass = DotsAndBoxesAgent - start_server(args.port) - - -if __name__ == "__main__": - sys.exit(main()) - diff --git a/dotsandboxes/agents/algorithms/MCTS.py b/dotsandboxes/agents/algorithms/MCTS.py deleted file mode 100644 index 6c71ba9..0000000 --- a/dotsandboxes/agents/algorithms/MCTS.py +++ /dev/null @@ -1,151 +0,0 @@ -import math -from copy import deepcopy -from time import perf_counter -from random import choice - -from GameState import GameState - -# Based on https://github.com/DieterBuys/mcts-player/ - -class GameController(object): - def get_next_move(self, state): - # when you get a new move, it is assumed that the game is not ended yet - assert state.get_moves() - - -class MCTSNode(object): - """Monte Carlo Tree Node. - Each node encapsulates a particular game state, the moves that - are possible from that state and the strategic information accumulated - by the tree search as it progressively samples the game space. - """ - - def __init__(self, state, parent=None, move=None): - self.parent = parent - self.move = move - self.state = state - - self.plays = 0 - self.score = 0 - - self.pending_moves = state.get_moves() - self.children = [] - - def select_child_ucb(self): - # Note that each node's plays count is equal - # to the sum of its children's plays - def ucb(child): - win_ratio = child.score / child.plays \ - + math.sqrt(2 * math.log(self.plays) / child.plays) - return win_ratio - - return max(self.children, key=ucb) - - def expand_move(self, move): - self.pending_moves.remove(move) # raises KeyError - - child_state = deepcopy(self.state) - child_state.play_move(move) - - child = MCTSNode(state=child_state, parent=self, move=move) - self.children.append(child) - return child - - def get_score(self, result): - # return result - if result == 0.5: - return result - - if self.state.player == 2: - if self.state.next_turn_player == result: - return 0.0 - else: - return 1.0 - else: - if self.state.next_turn_player == result: - return 1.0 - else: - return 0.0 - - if self.state.next_turn_player == result: - return 0.0 - else: - return 1.0 - - def __repr__(self): - s = 'ROOT\n' if self.parent is None else '' - - children_moves = [c.move for c in self.children] - - s += """Score ratio: {score} / {plays} -Pending moves: {pending_moves} -Children's moves: {children_moves} -State: -{state}\n""".format(children_moves=children_moves, **self.__dict__) - - return s - - -class MCTSGameController(GameController): - """Game controller that uses MCTS to determine the next move. - This is the class which implements the Monte Carlo Tree Search algorithm. - It builds a game tree of MCTSNodes and samples the game space until a set - time has elapsed. - """ - - def select(self): - node = self.root_node - - # Descend until we find a node that has pending moves, or is terminal - while node.pending_moves == set() and node.children != []: - node = node.select_child_ucb() - - return node - - def expand(self, node): - assert node.pending_moves != set() - - move = choice(tuple(node.pending_moves)) - return node.expand_move(move) - - def simulate(self, state, max_iterations=1000): - state = deepcopy(state) - - move = state.get_random_move() - while move is not None: - state.play_move(move) - move = state.get_random_move() - - max_iterations -= 1 - if max_iterations <= 0: - return 0.5 # raise exception? (game too deep to simulate) - - return state.game_result - - def update(self, node, result): - while node is not None: - node.plays += 1 - node.score += node.get_score(result) - node = node.parent - - def get_next_move(self, state, time_allowed=1.0): - super(MCTSGameController, self).get_next_move(state) - - # Create new tree (TODO: Preserve some state for better performance?) - self.root_node = MCTSNode(state) - iterations = 0 - - start_time = perf_counter() - while perf_counter() < start_time + time_allowed: - node = self.select() - - if node.pending_moves != set(): - node = self.expand(node) - - result = self.simulate(node.state) - self.update(node, result) - - iterations += 1 - - # Return most visited node's move - return max(self.root_node.children, key=lambda n: n.plays).move diff --git a/dotsandboxes/agents/algorithms/alphaBeta.py b/dotsandboxes/agents/algorithms/alphaBeta.py deleted file mode 100644 index 8e041fe..0000000 --- a/dotsandboxes/agents/algorithms/alphaBeta.py +++ /dev/null @@ -1,105 +0,0 @@ -from GameState import GameState -class GameController(object): - def get_next_move(self, state): - # when you get a new move, it is assumed that the game is not ended yet - assert state.get_moves() - - -def alpha_beta(node, alpha, beta): - - # Based on https://en.wikipedia.org/wiki/Alpha%E2%80%93beta_pruning#Pseudocode - # node needs to support three operations: isTerminal(), value(), getChildren(), maximizingPlayer() - - if node.isTerminal(): - return node.value() - - if node.maximizingPlayer(): - - v = float("-inf") - for child in node.getChildren(): - - v = max(v, alpha_beta(child, alpha, beta)) - alpha = max(alpha, v) - if beta <= alpha: - break - - else: - - v = float("inf") - for child in node.getChildren(): - - v = min(v, alpha_beta(child, alpha, beta)) - beta = min(beta, v) - if beta <= alpha: - break - - return v - - -# A class for defining algorithms used (minimax and alpha-beta pruning) -class AlphaBeta: - - def miniMax(State, Ply_num): # Function for the minimax algorithm - - for i in range(State.Current.dimY): - for j in range(State.Current.dimX): - if State.Current.Mat[i][j] == ' ' and (j, i) not in State.children: - State.Make(j, i, True) - if Ply_num < 2: - return (i, j) - - Minimum_Score = 1000 - i = 0 - - j = 0 - for k, z in State.children.items(): - Result = Algo.Maximum(z, Ply_num - 1, Minimum_Score) - if Minimum_Score > Result: - Minimum_Score = Result - i = k[0] - j = k[1] - - return (i, j) - - # Alpha-beta pruning function for taking care of Alpha values - def Maximum(State, Ply_num, Alpha): - if Ply_num == 0: - return State.CurrentScore - - for i in range(State.Current.dimY): - for j in range(State.Current.dimX): - if State.Current.Mat[i][j] == ' ' and (j, i) not in State.children: - State.Make(j, i, False) - - Maximum_Score = -1000 - i = 0 - j = 0 - for k, z in State.children.items(): - Result = Algo.Minimum(z, Ply_num - 1, Maximum_Score) - if Maximum_Score < Result: - Maximum_Score = Result - if Result > Alpha: - return Result - - return Maximum_Score - - def Minimum(State, Ply_num, Beta): # Alpha-beta pruning function for taking care of Beta values - if Ply_num == 0: - return State.CurrentScore - - for i in range(State.Current.dimY): - for j in range(State.Current.dimX): - if State.Current.Mat[i][j] == ' ' and (j, i) not in State.children: - State.Make(j, i, True) - - Minimum_Score = 1000 - i = 0 - j = 0 - for k, z in State.children.items(): - Result = Algo.Maximum(z, Ply_num - 1, Minimum_Score) - if Minimum_Score > Result: - Minimum_Score = Result - if Result < Beta: - return Result - - return Minimum_Score diff --git a/dotsandboxes/agents/algorithms/ann.py b/dotsandboxes/agents/algorithms/ann.py deleted file mode 100644 index 05ae647..0000000 --- a/dotsandboxes/agents/algorithms/ann.py +++ /dev/null @@ -1,170 +0,0 @@ -from numpy import * -from math import sqrt -from copy import deepcopy -from time import time - -class ANN: - - """ANN with one hidden layer, one output and full connections in between consecutive layers. - Initial weights are chosen from a normal distribution. - Activation function is tanh.""" - - INIT_SIGMA = 0.02 - REL_STOP_MARGIN = 0.01 - MAX_ITERATIONS = 1000000 - ACTIVATION = tanh - D_ACTIVATION = lambda x: 1 - tanh(x)**2 # Derivative of tanh - VEC_ACTIVATION = vectorize(ACTIVATION) - VEC_D_ACTIVATION = vectorize(D_ACTIVATION) - STEP_SIZE = 0.1 - - def __init__(self, input_size, hidden_size): - - #self.input_size = input_size - #self.hidden_size = hidden_size - self.hidden_weights = random.normal(0, ANN.INIT_SIGMA, (hidden_size, input_size)) - self.output_weights = random.normal(0, ANN.INIT_SIGMA, hidden_size) - - def get_weights(self): - return self.hidden_weights, self.output_weights - - def predict(self, input_vector): - - # Predicts the output for this input vector - # input_vector will be normalized - - input_vector = input_vector/linalg.norm(input_vector) - return ANN.ACTIVATION(dot(self.output_weights, ANN.VEC_ACTIVATION(dot(self.hidden_weights, input_vector)))) - - @staticmethod - def frob_norm(a, b): - - # Calculates the total Frobenius norm of both matrices A and B - return sqrt(linalg.norm(a)**2 + linalg.norm(b)**2) - - def train(self, examples): - - #print("Training") - start = time() - - # examples is a list of (input, output)-tuples - # input will be normalized - # We stop when the weights have converged within some relative margin - - for example in examples: - example[0] = example[0]/linalg.norm(example[0]) - - iteration = 0 - while True: - - - # Store old weights to check for convergence later - prev_hidden_weights = deepcopy(self.hidden_weights) - prev_output_weights = deepcopy(self.output_weights) - - for k in range(len(examples)): - - input_vector, output = examples[k] - - # Calculate outputs - hidden_input = dot(self.hidden_weights, input_vector) - hidden_output = ANN.VEC_ACTIVATION(hidden_input) - final_input = dot(self.output_weights, hidden_output) - predicted_output = ANN.ACTIVATION(final_input) - - #print("Output:", output) - #print("Predicted output:", predicted_output) - - # Used in calculations - prediction_error = output - predicted_output - output_derivative = ANN.D_ACTIVATION(final_input) - - # Adjust output weights and calculate requested hidden change - requested_hidden_change = prediction_error*output_derivative*self.output_weights - self.output_weights = self.output_weights + ANN.STEP_SIZE*prediction_error*hidden_output - - #print("After adjusting output weights:", ANN.ACTIVATION(dot(self.output_weights, hidden_output))) - - # Backpropagate requested hidden change to adjust hidden weights - self.hidden_weights = self.hidden_weights + ANN.STEP_SIZE*outer(requested_hidden_change*(ANN.VEC_D_ACTIVATION(hidden_input)), input_vector) - - #print("After adjusting hidden weights:", ANN.ACTIVATION(dot(self.output_weights, ANN.VEC_ACTIVATION(dot(self.hidden_weights, input_vector))))) - - # Check stop criteria - iteration += 1 - if iteration >= ANN.MAX_ITERATIONS: - break - - # Check stop criteria - if iteration >= ANN.MAX_ITERATIONS: - break - diff = ANN.frob_norm(self.hidden_weights - prev_hidden_weights, self.output_weights - prev_output_weights) - base = ANN.frob_norm(self.hidden_weights, self.output_weights) - #if base > 0 and diff/base < ANN.REL_STOP_MARGIN: - # break - - print(time() - start) - print("Stopped training after %s iterations."%iteration) - -# TESTING - -def print_difference(ann1, ann2): - - # Prints the differences in weights in between two ANN's with identical topology - - hidden_weights1, output_weights1 = ann1.get_weights() - hidden_weights2, output_weights2 = ann2.get_weights() - hidden_diff = hidden_weights1 - hidden_weights2 - output_diff = output_weights1 - output_weights2 - - print(hidden_diff) - print(output_diff) - print("Frobenius norms:") - print("Hidden weights difference:", linalg.norm(hidden_diff)) - print("Output weights difference:", linalg.norm(output_diff)) - print("Both:", ANN.frob_norm(hidden_diff, output_diff)) - -def RMSE(ann, examples): - - total = 0 - for input_vector, output in examples: - total += (output - ann.predict(input_vector))**2 - return sqrt(total/len(examples)) - -def generate_examples(amount, input_size, evaluate): - # evaluate is a function mapping an input vector onto a numerical value - examples = [] - inputs = random.normal(0, 100, (amount, input_size)) - for i in range(amount): - input_vector = inputs[i] - examples.append([input_vector, evaluate(input_vector)]) - return examples - -def test(): - - # Test the ANN by having it model another ANN with identical topology but unknown weights - - input_size = 5 - hidden_size = 3 - real = ANN(input_size, hidden_size) - model = ANN(input_size, hidden_size) - - # Generate training data - training_data = generate_examples(10000, input_size, real.predict) - validation_data = generate_examples(10000, input_size, real.predict) - - # Print initial difference, train, then print new difference - print("Initial difference:") - print_difference(real, model) - print("Initial RMSE (on training data):", RMSE(model, training_data)) - print("Initial RMSE (on validation data):", RMSE(model, validation_data)) - model.train(training_data) - print("After training:") - print_difference(real, model) - print("After training RMSE (on training data):", RMSE(model, training_data)) - print("After training RMSE (on validation data):", RMSE(model, validation_data)) - -if __name__ == "__main__": - test() - - -- cgit v1.2.3