diff --git a/src/poprank/functional/__init__.py b/src/poprank/functional/__init__.py index 2889ef0..9242680 100644 --- a/src/poprank/functional/__init__.py +++ b/src/poprank/functional/__init__.py @@ -6,6 +6,7 @@ bradleyterry_with_context_draw ) from .elo import elo +from .bayeselo import bayeselo from .glicko import glicko, glicko2 from .melo import mElo from .nashavg import nashavg diff --git a/src/poprank/functional/_bayeselo/core.py b/src/poprank/functional/_bayeselo/core.py new file mode 100644 index 0000000..4ecd706 --- /dev/null +++ b/src/poprank/functional/_bayeselo/core.py @@ -0,0 +1,238 @@ +from math import log + +from poprank.rates import EloRate +from .data import PopulationPairwiseStatistics, PairwiseStatistics + + +class BayesEloRating: + """Rates players by calculating their new elo using a bayeselo approach + Given a set of interactions and initial elo ratings, uses a + Minorization-Maximization algorithm to estimate maximum-likelihood + ratings. + Made to imitate https://www.remi-coulom.fr/Bayesian-Elo/ + + Args: + pairwise_stats (PopulationPairwisestatistics): The summary + of all interactions between players + elos (list[EloRate]): The ititial ratings of the players + elo_advantage (float, optional): The home-field-advantage + expressed as rating points. Defaults to 32.8. + elo_draw (float, optional): The probability of drawing. + Defaults to 97.3. + base (float, optional): The base of the exponent in the elo + formula. Defaults to 10.0 + spread (float, optional): The divisor of the exponent in the elo + formula. Defaults to 400.0. + home_field_bias (float, optional): _description_. Defaults to 0.0. + draw_bias (float, optional): _description_. Defaults to 0.0. + + Methods: + update_ratings(self) -> None: Performs one iteration of the + Minorization-Maximization algorithm + update_home_field_bias(self) -> float: Use interaction statistics + to update the home_field_bias automatically + update_draw_bias(self) -> float: Use interaction statistics to + update the draw_bias automatically + compute_difference(self, ratings: "list[float]", + next_ratings: "list[float]") -> float: Compute the impact of + the current interation on ratings + minorize_maximize(self, learn_home_field_bias: bool, + home_field_bias: float, learn_draw_bias: bool, + draw_bias: float, iterations: int, tolerance: float + ) -> None: Perform the MM algorithm for generalized + Bradley-Terry models. + """ + + def __init__( + self, pairwise_stats: PopulationPairwiseStatistics, + elos: "list[EloRate]", elo_advantage: float = 32.8, + elo_draw: float = 97.3, base=10., spread=400., + home_field_bias=0.0, draw_bias=0.0 + ): + + # Condensed results + self.pairwise_stats: PopulationPairwiseStatistics = pairwise_stats + self.elos = elos # Players elos + self.elo_advantage = elo_advantage # advantage of playing white + self.elo_draw = elo_draw # likelihood of drawing + self.ratings = [0. for x in range(pairwise_stats.num_players)] + self.next_ratings = [0. for x in range(pairwise_stats.num_players)] + self.base = base + self.spread = spread + self.home_field_bias: float = home_field_bias + self.draw_bias: float = draw_bias + + def update_ratings(self) -> None: + """Performs one iteration of the Minorization-Maximization algorithm""" + for player in range(self.pairwise_stats.num_players-1, -1, -1): + A: float = 0.0 + B: float = 0.0 + + for opponent in range( + self.pairwise_stats.num_opponents_per_player[player]-1, -1, -1): + result: PairwiseStatistics = \ + self.pairwise_stats.statistics[player][opponent] + + if result.opponent_idx > player: + opponent_rating = self.next_ratings[result.opponent_idx] + else: + opponent_rating = self.ratings[result.opponent_idx] + + A += result.w_ij + result.d_ij + result.l_ji + result.d_ji + + B += ((result.d_ij + result.w_ij) * self.home_field_bias / + (self.home_field_bias * self.ratings[player] + + self.draw_bias * opponent_rating) + + (result.d_ij + result.l_ij) * self.draw_bias * + self.home_field_bias / + (self.draw_bias * self.home_field_bias * + self.ratings[player] + + opponent_rating) + + (result.d_ji + result.w_ji) * self.draw_bias / + (self.home_field_bias * opponent_rating + + self.draw_bias * self.ratings[player]) + + (result.d_ji + result.l_ji) / + (self.draw_bias * self.home_field_bias * + opponent_rating + + self.ratings[player])) + + self.next_ratings[player] = A / B + + self.ratings, self.next_ratings = self.next_ratings, self.ratings + + def update_home_field_bias(self) -> float: + """Use interaction statistics to update the home_field_bias + automatically""" + numerator: float = 0. + denominator: float = 0. + + for player in range(self.pairwise_stats.num_players-1, -1, -1): + for opponent in range( + self.pairwise_stats.num_opponents_per_player[player]-1, -1, -1): + result = self.pairwise_stats.statistics[player][opponent] + opponent_rating = self.ratings[result.opponent_idx] + + numerator += result.w_ij + result.d_ij + denominator += ((result.d_ij + result.w_ij) * + self.ratings[player] / + (self.home_field_bias * self.ratings[player] + + self.draw_bias * opponent_rating) + + (result.d_ij + result.l_ij) * self.draw_bias * + self.ratings[player] / + (self.draw_bias * self.home_field_bias * + self.ratings[player] + opponent_rating)) + + return numerator / denominator + + def update_draw_bias(self) -> float: + """Use interaction statistics to update the draw_bias automatically""" + numerator: float = 0. + denominator: float = 0. + + for player in range(self.pairwise_stats.num_players-1, -1, -1): + for opponent in range( + self.pairwise_stats.num_opponents_per_player[player]-1, -1, -1): + result = self.pairwise_stats.statistics[player][opponent] + opponent_rating = self.ratings[result.opponent_idx] + + numerator += result.d_ij + denominator += ((result.d_ij + result.w_ij) * opponent_rating / + (self.home_field_bias * self.ratings[player] + + self.draw_bias * opponent_rating) + + (result.d_ij + result.l_ij) * + self.home_field_bias * + self.ratings[player] / + (self.draw_bias * self.home_field_bias * + self.ratings[player] + opponent_rating)) + + c: float = numerator / denominator + return c + (c * c + 1)**0.5 + + def compute_difference(self, ratings: "list[float]", + next_ratings: "list[float]") -> float: + """Compute the impact of the current interation on ratings""" + return max([abs(a-b)/(a+b) for a, b in zip(ratings, next_ratings)]) + + def minorize_maximize( + self, + learn_home_field_bias: bool = False, + home_field_bias: float = 1., + learn_draw_bias: bool = False, + draw_bias: float = 1., + iterations: int = 10000, + tolerance: float = 1e-5, + ) -> None: + """Perform the MM algorithm for generalized Bradley-Terry models. + + The Minorization-Maximization algorithm is performed for the number of + specified iterations or until the changes are below the tolerance + value, whichever comes first. + Args: + use_home_field_bias (bool, optional): _description_. Defaults to + False. + home_field_bias (float, optional): _description_. Defaults to 1.0. + learn_draw_bias (bool, optional): _description_. Defaults to False. + draw_bias (float, optional): _description_. Defaults to 1.0. + iterations (int, optional): _description_. Defaults to 10000. + tolerance (float, optional): _description_. Defaults to 1e-5. + """ + + # Set initial values + self.home_field_bias = home_field_bias + self.draw_bias = draw_bias + self.ratings = [1. for p in range(self.pairwise_stats.num_players)] + + # Main MM loop + for player in range(iterations): + self.update_ratings() + diff = self.compute_difference(self.ratings, self.next_ratings) + + if learn_home_field_bias: + new_home_field_bias = self.update_home_field_bias() + home_field_bias_diff = \ + abs(self.home_field_bias - new_home_field_bias) + if home_field_bias_diff > diff: + diff = home_field_bias_diff + self.home_field_bias = new_home_field_bias + + if learn_draw_bias: + new_draw_bias = self.update_draw_bias() + draw_bias_diff = abs(self.draw_bias - new_draw_bias) + if draw_bias_diff > diff: + diff = draw_bias_diff + self.draw_bias = new_draw_bias + + if diff < tolerance: + break + + # Convert back to Elos + total: float = \ + sum([log(self.ratings[player], self.base) * self.spread + for player in range(self.pairwise_stats.num_players)]) + + offset: float = -total / self.pairwise_stats.num_players + + for player in range(self.pairwise_stats.num_players-1, -1, -1): + self.elos[player].mu = log( + self.ratings[player], self.base) * self.spread + offset + + if learn_home_field_bias: + self.elo_advantage = \ + log(self.home_field_bias, self.base) * self.spread + if learn_draw_bias: + self.elo_draw = log(self.draw_bias, self.base) * self.spread + + def rescale_elos(self) -> None: + """Rescales the elos by a common factor""" + # EloScale # TODO: Figure out what on earth that is + for i, e in enumerate(self.elos): + x: float = e.base**(-self.elo_draw/e.spread) + elo_scale: float = x * 4.0 / ((1 + x) ** 2) + tmp_base: float = self.elos[i].base + tmp_spread: float = self.elos[i].spread + self.elos[i]: EloRate = EloRate( + self.elos[i].mu * elo_scale, + self.elos[i].std + ) + self.elos[i].base = tmp_base + self.elos[i].spread = tmp_spread diff --git a/src/poprank/functional/_bayeselo/data.py b/src/poprank/functional/_bayeselo/data.py new file mode 100644 index 0000000..109176a --- /dev/null +++ b/src/poprank/functional/_bayeselo/data.py @@ -0,0 +1,204 @@ +from dataclasses import dataclass +from popcore import Interaction + + +@dataclass +class PairwiseStatistics: # cr + """A condensed summary of all the interactions between two players. + + Args: + player_idx (int, optional): Id of the player. Defaults to -1. + opponent_idx (int, optional): Id of the opponent. Defaults to -1. + total_games (int, optional): Total number of games played. + Defaults to 0. + w_ij (float, optional): # wins of player i against opponent j. + Defaults to 0. + d_ij (float, optional): # draws of player i against opponent j. + Defaults to 0. + l_ij (float, optional): # losses of player i against opponent j. + Defaults to 0. + w_ji (float, optional): # wins of opponent j against player i. + Defaults to 0. + d_ji (float, optional): # draws of opponent j against player i. + Defaults to 0. + l_ji (float, optional): # losses of opponent j against player i. + Defaults to 0. + """ + + player_idx: int = -1 # id of the player + opponent_idx: int = -1 # id of the opponent + total_games: int = 0 # Total number of games played + w_ij: float = 0 # win player i against player j + d_ij: float = 0 # draw player i against player j + l_ij: float = 0 # loss player i against player j + w_ji: float = 0 # win player j against player i + d_ji: float = 0 # draw player j against player i + l_ji: float = 0 # loss player j against player i + + +@dataclass +class PopulationPairwiseStatistics: # crs + """The pairwise statistics of an entire population + + Args: + num_players(int): Number of players in the population + num_opponents_per_players (list[int]): Number of opponents for + each player + statistics (list[list[PairwiseStatistics]]): Results for each + pair of players + + Static Methods: + from_interactions( + players: 'list[str]', + interactions: 'list[Interaction]', + add_draw_prior: bool = True, + draw_prior: float = 2.0 + ) -> 'PopulationPairwiseStatistics': Turn a list of interactions + into pairwise statistics + + Instance Methods: + add_opponent( + player: str, + opponent: str, + ppcr_ids: "list[int]", + indx: "dict[str, int]" + ) -> None: Add an opponent to the player + + def add_prior(draw_prior: float = 2.0) -> None: + Add prior draws to pairwise statistics + """ + num_players: int # Number of players in the pop + num_opponents_per_player: "list[int]" # nbr of opponents for each player + statistics: "list[list[PairwiseStatistics]]" # Results for each match + + def add_opponent( + self, + player: str, + opponent: str, + ppcr_ids: "list[int]", + indx: "dict[str, int]" + ) -> None: + """Add an opponent to the player""" + ppcr_ids[indx[player]].append(opponent) + self.statistics[indx[player]].append(PairwiseStatistics( + player_idx=indx[player], + opponent_idx=indx[opponent] + )) + self.num_opponents_per_player[indx[player]] += 1 + + def add_prior(self, draw_prior: float = 2.0) -> None: + """Add prior draws to pairwise statistics""" + for player, _ in enumerate(self.statistics): + prior: float = draw_prior * 0.25 / self.count_total_opponent_games( + player) + + for opponent in range(self.num_opponents_per_player[player]): + cr_player = self.statistics[player][opponent] + cr_opponent = self.find_opponent( + cr_player.opponent_idx, player) + this_prior: float = prior * cr_player.total_games + cr_player.d_ij += this_prior + cr_player.d_ji += this_prior + cr_opponent.d_ij += this_prior + cr_opponent.d_ji += this_prior + + def find_opponent( + self, + player_idx: int, + opponent_idx: int, + ) -> PairwiseStatistics: + """Return the pairwise interaction statistics between the player + and the opponent + + Args: + player_idx (int): Id of the player + opponent_idx (int): Id of the opponent + + Raises: + RuntimeError: If the opponent could not be foud + """ + for x in range(self.num_opponents_per_player[player_idx]): + if self.statistics[player_idx][x].opponent_idx == opponent_idx: + return self.statistics[player_idx][x] + raise RuntimeError(f"Cound not find opponent {opponent_idx} \ + for player {player_idx}") + + def count_total_opponent_games( + self, + player_idx: int, + ) -> int: + """Return the sum of all games played by opponents of the player + + Args: + player_idx (int): Id of the player + """ + + return sum([ + opponent.total_games for opponent in self.statistics[player_idx] + ]) + + @staticmethod + def from_interactions( + players: 'list[str]', + interactions: 'list[Interaction]', + add_draw_prior: bool = True, + draw_prior: float = 2.0 + ) -> 'PopulationPairwiseStatistics': + """Turn a list of interactions into pairwise statistics + + Args: + players (list[str]): The list of players + interactions (list[Interaction]): The list of interactions to + turn into pairwise statistics. + add_draw_prior (bool, optional): If true, draws will be added to + pairwise statistics to avoid division by zero errors. + Defaults to True. + draw_prior (float, optional): Value of the draws to add. + Defaults to 2.0. + """ + + # have fun figuring out this indexing mess :) + num_opponents_per_player: "list[int]" = [0 for p in players] + statistics: "list[list[PairwiseStatistics]]" = [[] for p in players] + ppcr_ids: "list[list[str]]" = [[] for p in players] + indx: "dict[str, int]" = {p: i for i, p in enumerate(players)} + + pps: PopulationPairwiseStatistics = PopulationPairwiseStatistics( + num_players=len(players), + num_opponents_per_player=num_opponents_per_player, + statistics=statistics + ) + + for i in interactions: + + # If the players have never played together before + if i.players[1] not in ppcr_ids[indx[i.players[0]]]: + # Add player 1 to the list of opponents of player 0 + pps.add_opponent(i.players[0], i.players[1], ppcr_ids, indx) + + # Add player 0 to the list of opponents of player 1 + pps.add_opponent(i.players[1], i.players[0], ppcr_ids, indx) + + p1_relative_id = ppcr_ids[indx[i.players[0]]].index(i.players[1]) + p0_relative_id = ppcr_ids[indx[i.players[1]]].index(i.players[0]) + + if i.outcomes[0] > i.outcomes[1]: # White wins + pps.statistics[indx[i.players[0]]][p1_relative_id].w_ij += 1 + pps.statistics[indx[i.players[1]]][p0_relative_id].w_ji += 1 + + elif i.outcomes[0] < i.outcomes[1]: # Black wins + pps.statistics[indx[i.players[0]]][p1_relative_id].l_ij += 1 + pps.statistics[indx[i.players[1]]][p0_relative_id].l_ji += 1 + + else: # Draw + pps.statistics[indx[i.players[0]]][p1_relative_id].d_ij += 1 + pps.statistics[indx[i.players[1]]][p0_relative_id].d_ji += 1 + + # Update total games + pps.statistics[indx[i.players[0]]][p1_relative_id].total_games += 1 + pps.statistics[indx[i.players[1]]][p0_relative_id].total_games += 1 + + if add_draw_prior: + pps.add_prior(draw_prior) + + return pps diff --git a/src/poprank/functional/_trueskill/factor_graph.py b/src/poprank/functional/_trueskill/factor_graph.py index e8d00ec..5959e73 100644 --- a/src/poprank/functional/_trueskill/factor_graph.py +++ b/src/poprank/functional/_trueskill/factor_graph.py @@ -58,12 +58,13 @@ from dataclasses import dataclass from math import sqrt -from scipy.stats import norm +from statistics import NormalDist from typing import Callable from poprank import Rate from typing import List INF: float = float("inf") +_norm: NormalDist = NormalDist() @dataclass @@ -306,8 +307,8 @@ def v_win(diff: float, draw_margin: float) -> float: variation of a mean. """ x: float = diff - draw_margin - denom: float = norm.cdf(x) - return (norm.pdf(x) / denom) if denom else -x + denom: float = _norm.cdf(x) + return (_norm.pdf(x) / denom) if denom else -x def v_draw(diff: float, draw_margin: float) -> float: @@ -315,8 +316,8 @@ def v_draw(diff: float, draw_margin: float) -> float: abs_diff: float = abs(diff) a: float = draw_margin - abs_diff b: float = -draw_margin - abs_diff - denom: float = norm.cdf(a) - norm.cdf(b) - numer: float = norm.pdf(b) - norm.pdf(a) + denom: float = _norm.cdf(a) - _norm.cdf(b) + numer: float = _norm.pdf(b) - _norm.pdf(a) return ((numer / denom) if denom else a) * (-1 if diff < 0 else +1) @@ -337,11 +338,11 @@ def w_draw(diff: float, draw_margin: float) -> float: abs_diff: float = abs(diff) a: float = draw_margin - abs_diff b: float = -draw_margin - abs_diff - denom: float = norm.cdf(a) - norm.cdf(b) + denom: float = _norm.cdf(a) - _norm.cdf(b) if denom == 0.: raise FloatingPointError() v: float = v_draw(abs_diff, draw_margin) - return (v ** 2) + (a * norm.pdf(a) - b * norm.pdf(b)) / denom + return (v ** 2) + (a * _norm.pdf(a) - b * _norm.pdf(b)) / denom def flatten(array: List) -> List: diff --git a/src/poprank/functional/bayeselo.py b/src/poprank/functional/bayeselo.py new file mode 100644 index 0000000..73610a6 --- /dev/null +++ b/src/poprank/functional/bayeselo.py @@ -0,0 +1,126 @@ +from popcore import Interaction +from poprank.rates import EloRate + +from ._bayeselo.data import ( + PopulationPairwiseStatistics +) +from ._bayeselo.core import BayesEloRating + + +def bayeselo( + players: "list[str]", interactions: "list[Interaction]", + elos: "list[EloRate]", elo_base: float = 10., elo_spread: float = 400., + elo_draw: float = 97.3, elo_advantage: float = 32.8, + iterations: int = 10000, tolerance: float = 1e-5 +) -> "list[EloRate]": + """Rates players by calculating their new elo using a bayeselo approach + + Given a set of interactions and initial elo ratings, uses a + Minorization-Maximization algorithm to estimate maximum-likelihood + ratings. + The Minorization-Maximization algorithm is performed for the number of + specified iterations or until the changes are below the tolerance + value, whichever comes first. + Made to imitate https://www.remi-coulom.fr/Bayesian-Elo/ + + Args: + players (list[str]): The list of all players + interactions (list[Interactions]): The list of all interactions + elos (list[EloRate]): The initial ratings of the players + elo_base (float, optional): The base of the exponent in the elo + formula. Defaults to 10.0 + elo_spread (float, optional): The divisor of the exponent in the elo + formula. Defaults to 400.0. + elo_draw (float, optional): The probability of drawing. + Defaults to 97.3. + elo_advantage (float, optional): The home-field-advantage + expressed as rating points. Defaults to 32.8. + iterations (int, optional): The maximum number of iterations the + Minorization-Maximization algorithm will go through. + Defaults to 10000. + tolerance (float, optional): The error threshold below which the + Minorization-Maximization algorithm stopt. Defaults to 1e-5. + + Returns: + list[EloRate]: The updated ratings of all players + """ + + # This check is necessary, otherwise the algorithm raises a + # divide by 0 error + if len(interactions) == 0: + return elos + + if len(players) != len(elos): + raise ValueError(f"Players and elos length mismatch\ +: {len(players)} != {len(elos)}") + + for elo in elos: + if not isinstance(elo, EloRate): + raise TypeError("elos must be of type list[EloRate]") + + players_in_interactions = set() + + for interaction in interactions: + players_in_interactions = \ + players_in_interactions.union(interaction.players) + if len(interaction.players) != 2 or len(interaction.outcomes) != 2: + raise ValueError("Bayeselo only accepts interactions involving \ +both a pair of players and a pair of outcomes") + + if interaction.players[0] not in players \ + or interaction.players[1] not in players: + raise ValueError("Players(s) in interactions absent from player \ +list") + + if interaction.outcomes[0] not in (0, .5, 1) or \ + interaction.outcomes[1] not in (0, .5, 1) or \ + sum(interaction.outcomes) != 1: + raise Warning("Bayeselo takes outcomes in the (1, 0), (0, 1), \ +(.5, .5) format, other values may have unspecified behavior") + + for e in elos: + if e.base != elo_base or e.spread != elo_spread: + raise ValueError(f"Elos with different bases and \ +spreads are not compatible (expected base {elo_base}, spread {elo_spread} but \ +got base {e.base}, spread {e.spread})") + + players_in_interactions = [ + player for player in players + if player in players_in_interactions + ] + elos_to_update = [ + elo for elo, player in zip(elos, players) + if player in players_in_interactions + ] + + pairwise_stats = PopulationPairwiseStatistics.from_interactions( + players=players_in_interactions, + interactions=interactions + ) + + bradley_terry = BayesEloRating( + pairwise_stats, elos=elos_to_update, elo_draw=elo_draw, + elo_advantage=elo_advantage, + base=elo_base, spread=elo_spread + ) + + bradley_terry.minorize_maximize( + learn_home_field_bias=False, + home_field_bias=elo_base ** (elo_advantage / elo_spread), + learn_draw_bias=False, + draw_bias=elo_base ** (elo_draw / elo_spread), + iterations=iterations, + tolerance=tolerance + ) + + bradley_terry.rescale_elos() + + new_elos = [] + for i, p in enumerate(players): + if p in players_in_interactions: + new_elos.append(bradley_terry.elos[0]) + bradley_terry.elos = bradley_terry.elos[1:] + else: + new_elos.append(elos[i]) + + return new_elos diff --git a/src/poprank/functional/elo.py b/src/poprank/functional/elo.py index 77d907f..acdefd9 100644 --- a/src/poprank/functional/elo.py +++ b/src/poprank/functional/elo.py @@ -97,560 +97,3 @@ def elo( for i, e in enumerate(elos)] return rates - - -@dataclass -class PairwiseStatistics: # cr - """A condensed summary of all the interactions between two players. - - Args: - player_idx (int, optional): Id of the player. Defaults to -1. - opponent_idx (int, optional): Id of the opponent. Defaults to -1. - total_games (int, optional): Total number of games played. - Defaults to 0. - w_ij (float, optional): # wins of player i against opponent j. - Defaults to 0. - d_ij (float, optional): # draws of player i against opponent j. - Defaults to 0. - l_ij (float, optional): # losses of player i against opponent j. - Defaults to 0. - w_ji (float, optional): # wins of opponent j against player i. - Defaults to 0. - d_ji (float, optional): # draws of opponent j against player i. - Defaults to 0. - l_ji (float, optional): # losses of opponent j against player i. - Defaults to 0. - """ - - player_idx: int = -1 # id of the player - opponent_idx: int = -1 # id of the opponent - total_games: int = 0 # Total number of games played - w_ij: float = 0 # win player i against player j - d_ij: float = 0 # draw player i against player j - l_ij: float = 0 # loss player i against player j - w_ji: float = 0 # win player j against player i - d_ji: float = 0 # draw player j against player i - l_ji: float = 0 # loss player j against player i - - -def count_total_opponent_games( - player_idx: int, - statistics: "list[list[PairwiseStatistics]]") -> int: - """Return the sum of all games played by opponents of the player - - Args: - player_idx (int): Id of the player - num_opponents_per_player (list[int]): Number of opponents per player - statistics (list[list[PairwiseStatistics]]): The array of pairwise - statistics - """ - return sum([opponent.total_games for opponent in statistics[player_idx]]) - - -def find_opponent(player_idx: int, - opponent_idx: int, - num_opponents_per_player: "list[int]", - statistics: "list[list[PairwiseStatistics]]" - ) -> PairwiseStatistics: - """Return the pairwise interaction statistics between the player and the - opponent - - Args: - player_idx (int): Id of the player - opponent_idx (int): Id of the opponent - num_opponents_per_player (list[int]): Number of opponents per player - statistics (list[list[PairwiseStatistics]]): The array of pairwise - statistics - - Raises: - RuntimeError: If the opponent could not be foud - """ - for x in range(num_opponents_per_player[player_idx]): - if statistics[player_idx][x].opponent_idx == opponent_idx: - return statistics[player_idx][x] - raise RuntimeError(f"Cound not find opponent {opponent_idx} \ - for player {player_idx}") - - -@dataclass -class PopulationPairwiseStatistics: # crs - """The pairwise statistics of an entire population - - Args: - num_players(int): Number of players in the population - num_opponents_per_players (list[int]): Number of opponents for - each player - statistics (list[list[PairwiseStatistics]]): Results for each - pair of players - - Static Methods: - from_interactions( - players: 'list[str]', - interactions: 'list[Interaction]', - add_draw_prior: bool = True, - draw_prior: float = 2.0 - ) -> 'PopulationPairwiseStatistics': Turn a list of interactions - into pairwise statistics - - Instance Methods: - add_opponent( - player: str, - opponent: str, - ppcr_ids: "list[int]", - indx: "dict[str, int]" - ) -> None: Add an opponent to the player - - def add_prior(draw_prior: float = 2.0) -> None: - Add prior draws to pairwise statistics - """ - num_players: int # Number of players in the pop - num_opponents_per_player: "list[int]" # nbr of opponents for each player - statistics: "list[list[PairwiseStatistics]]" # Results for each match - - def add_opponent( - self, - player: str, - opponent: str, - ppcr_ids: "list[int]", - indx: "dict[str, int]" - ) -> None: - """Add an opponent to the player""" - ppcr_ids[indx[player]].append(opponent) - self.statistics[indx[player]].append(PairwiseStatistics( - player_idx=indx[player], - opponent_idx=indx[opponent] - )) - self.num_opponents_per_player[indx[player]] += 1 - - def add_prior(self, draw_prior: float = 2.0) -> None: - """Add prior draws to pairwise statistics""" - for player, _ in enumerate(self.statistics): - prior: float = draw_prior * 0.25 / count_total_opponent_games( - player, self.statistics) - - for opponent in range(self.num_opponents_per_player[player]): - cr_player = self.statistics[player][opponent] - cr_opponent = find_opponent( - cr_player.opponent_idx, player, - self.num_opponents_per_player, self.statistics) - this_prior: float = prior * cr_player.total_games - cr_player.d_ij += this_prior - cr_player.d_ji += this_prior - cr_opponent.d_ij += this_prior - cr_opponent.d_ji += this_prior - - @staticmethod - def from_interactions( - players: 'list[str]', - interactions: 'list[Interaction]', - add_draw_prior: bool = True, - draw_prior: float = 2.0 - ) -> 'PopulationPairwiseStatistics': - """Turn a list of interactions into pairwise statistics - - Args: - players (list[str]): The list of players - interactions (list[Interaction]): The list of interactions to - turn into pairwise statistics. - add_draw_prior (bool, optional): If true, draws will be added to - pairwise statistics to avoid division by zero errors. - Defaults to True. - draw_prior (float, optional): Value of the draws to add. - Defaults to 2.0. - """ - - # have fun figuring out this indexing mess :) - num_opponents_per_player: "list[int]" = [0 for p in players] - statistics: "list[list[PairwiseStatistics]]" = [[] for p in players] - ppcr_ids: "list[list[str]]" = [[] for p in players] - indx: "dict[str, int]" = {p: i for i, p in enumerate(players)} - - pps: PopulationPairwiseStatistics = PopulationPairwiseStatistics( - num_players=len(players), - num_opponents_per_player=num_opponents_per_player, - statistics=statistics - ) - - for i in interactions: - - # If the players have never played together before - if i.players[1] not in ppcr_ids[indx[i.players[0]]]: - # Add player 1 to the list of opponents of player 0 - pps.add_opponent(i.players[0], i.players[1], ppcr_ids, indx) - - # Add player 0 to the list of opponents of player 1 - pps.add_opponent(i.players[1], i.players[0], ppcr_ids, indx) - - p1_relative_id = ppcr_ids[indx[i.players[0]]].index(i.players[1]) - p0_relative_id = ppcr_ids[indx[i.players[1]]].index(i.players[0]) - - if i.outcomes[0] > i.outcomes[1]: # White wins - pps.statistics[indx[i.players[0]]][p1_relative_id].w_ij += 1 - pps.statistics[indx[i.players[1]]][p0_relative_id].w_ji += 1 - - elif i.outcomes[0] < i.outcomes[1]: # Black wins - pps.statistics[indx[i.players[0]]][p1_relative_id].l_ij += 1 - pps.statistics[indx[i.players[1]]][p0_relative_id].l_ji += 1 - - else: # Draw - pps.statistics[indx[i.players[0]]][p1_relative_id].d_ij += 1 - pps.statistics[indx[i.players[1]]][p0_relative_id].d_ji += 1 - - # Update total games - pps.statistics[indx[i.players[0]]][p1_relative_id].total_games += 1 - pps.statistics[indx[i.players[1]]][p0_relative_id].total_games += 1 - - if add_draw_prior: - pps.add_prior(draw_prior) - - return pps - - -class BayesEloRating: - """Rates players by calculating their new elo using a bayeselo approach - Given a set of interactions and initial elo ratings, uses a - Minorization-Maximization algorithm to estimate maximum-likelihood - ratings. - Made to imitate https://www.remi-coulom.fr/Bayesian-Elo/ - - Args: - pairwise_stats (PopulationPairwisestatistics): The summary - of all interactions between players - elos (list[EloRate]): The ititial ratings of the players - elo_advantage (float, optional): The home-field-advantage - expressed as rating points. Defaults to 32.8. - elo_draw (float, optional): The probability of drawing. - Defaults to 97.3. - base (float, optional): The base of the exponent in the elo - formula. Defaults to 10.0 - spread (float, optional): The divisor of the exponent in the elo - formula. Defaults to 400.0. - home_field_bias (float, optional): _description_. Defaults to 0.0. - draw_bias (float, optional): _description_. Defaults to 0.0. - - Methods: - update_ratings(self) -> None: Performs one iteration of the - Minorization-Maximization algorithm - update_home_field_bias(self) -> float: Use interaction statistics - to update the home_field_bias automatically - update_draw_bias(self) -> float: Use interaction statistics to - update the draw_bias automatically - compute_difference(self, ratings: "list[float]", - next_ratings: "list[float]") -> float: Compute the impact of - the current interation on ratings - minorize_maximize(self, learn_home_field_bias: bool, - home_field_bias: float, learn_draw_bias: bool, - draw_bias: float, iterations: int, tolerance: float - ) -> None: Perform the MM algorithm for generalized - Bradley-Terry models. - """ - - def __init__( - self, pairwise_stats: PopulationPairwiseStatistics, - elos: "list[EloRate]", elo_advantage: float = 32.8, - elo_draw: float = 97.3, base=10., spread=400., - home_field_bias=0.0, draw_bias=0.0 - ): - - # Condensed results - self.pairwise_stats: PopulationPairwiseStatistics = pairwise_stats - self.elos = elos # Players elos - self.elo_advantage = elo_advantage # advantage of playing white - self.elo_draw = elo_draw # likelihood of drawing - self.ratings = [0. for x in range(pairwise_stats.num_players)] - self.next_ratings = [0. for x in range(pairwise_stats.num_players)] - self.base = base - self.spread = spread - self.home_field_bias: float = home_field_bias - self.draw_bias: float = draw_bias - - def update_ratings(self) -> None: - """Performs one iteration of the Minorization-Maximization algorithm""" - for player in range(self.pairwise_stats.num_players-1, -1, -1): - A: float = 0.0 - B: float = 0.0 - - for opponent in range( - self.pairwise_stats.num_opponents_per_player[player]-1, -1, -1): - result: PairwiseStatistics = \ - self.pairwise_stats.statistics[player][opponent] - - if result.opponent_idx > player: - opponent_rating = self.next_ratings[result.opponent_idx] - else: - opponent_rating = self.ratings[result.opponent_idx] - - A += result.w_ij + result.d_ij + result.l_ji + result.d_ji - - B += ((result.d_ij + result.w_ij) * self.home_field_bias / - (self.home_field_bias * self.ratings[player] + - self.draw_bias * opponent_rating) + - (result.d_ij + result.l_ij) * self.draw_bias * - self.home_field_bias / - (self.draw_bias * self.home_field_bias * - self.ratings[player] + - opponent_rating) + - (result.d_ji + result.w_ji) * self.draw_bias / - (self.home_field_bias * opponent_rating + - self.draw_bias * self.ratings[player]) + - (result.d_ji + result.l_ji) / - (self.draw_bias * self.home_field_bias * - opponent_rating + - self.ratings[player])) - - self.next_ratings[player] = A / B - - self.ratings, self.next_ratings = self.next_ratings, self.ratings - - def update_home_field_bias(self) -> float: - """Use interaction statistics to update the home_field_bias - automatically""" - numerator: float = 0. - denominator: float = 0. - - for player in range(self.pairwise_stats.num_players-1, -1, -1): - for opponent in range( - self.pairwise_stats.num_opponents_per_player[player]-1, -1, -1): - result = self.pairwise_stats.statistics[player][opponent] - opponent_rating = self.ratings[result.opponent_idx] - - numerator += result.w_ij + result.d_ij - denominator += ((result.d_ij + result.w_ij) * - self.ratings[player] / - (self.home_field_bias * self.ratings[player] + - self.draw_bias * opponent_rating) + - (result.d_ij + result.l_ij) * self.draw_bias * - self.ratings[player] / - (self.draw_bias * self.home_field_bias * - self.ratings[player] + opponent_rating)) - - return numerator / denominator - - def update_draw_bias(self) -> float: - """Use interaction statistics to update the draw_bias automatically""" - numerator: float = 0. - denominator: float = 0. - - for player in range(self.pairwise_stats.num_players-1, -1, -1): - for opponent in range( - self.pairwise_stats.num_opponents_per_player[player]-1, -1, -1): - result = self.pairwise_stats.statistics[player][opponent] - opponent_rating = self.ratings[result.opponent_idx] - - numerator += result.d_ij - denominator += ((result.d_ij + result.w_ij) * opponent_rating / - (self.home_field_bias * self.ratings[player] + - self.draw_bias * opponent_rating) + - (result.d_ij + result.l_ij) * - self.home_field_bias * - self.ratings[player] / - (self.draw_bias * self.home_field_bias * - self.ratings[player] + opponent_rating)) - - c: float = numerator / denominator - return c + (c * c + 1)**0.5 - - def compute_difference(self, ratings: "list[float]", - next_ratings: "list[float]") -> float: - """Compute the impact of the current interation on ratings""" - return max([abs(a-b)/(a+b) for a, b in zip(ratings, next_ratings)]) - - def minorize_maximize( - self, - learn_home_field_bias: bool = False, - home_field_bias: float = 1., - learn_draw_bias: bool = False, - draw_bias: float = 1., - iterations: int = 10000, - tolerance: float = 1e-5, - ) -> None: - """Perform the MM algorithm for generalized Bradley-Terry models. - - The Minorization-Maximization algorithm is performed for the number of - specified iterations or until the changes are below the tolerance - value, whichever comes first. - Args: - use_home_field_bias (bool, optional): _description_. Defaults to - False. - home_field_bias (float, optional): _description_. Defaults to 1.0. - learn_draw_bias (bool, optional): _description_. Defaults to False. - draw_bias (float, optional): _description_. Defaults to 1.0. - iterations (int, optional): _description_. Defaults to 10000. - tolerance (float, optional): _description_. Defaults to 1e-5. - """ - - # Set initial values - self.home_field_bias = home_field_bias - self.draw_bias = draw_bias - self.ratings = [1. for p in range(self.pairwise_stats.num_players)] - - # Main MM loop - for player in range(iterations): - self.update_ratings() - diff = self.compute_difference(self.ratings, self.next_ratings) - - if learn_home_field_bias: - new_home_field_bias = self.update_home_field_bias() - home_field_bias_diff = \ - abs(self.home_field_bias - new_home_field_bias) - if home_field_bias_diff > diff: - diff = home_field_bias_diff - self.home_field_bias = new_home_field_bias - - if learn_draw_bias: - new_draw_bias = self.update_draw_bias() - draw_bias_diff = abs(self.draw_bias - new_draw_bias) - if draw_bias_diff > diff: - diff = draw_bias_diff - self.draw_bias = new_draw_bias - - if diff < tolerance: - break - - # Convert back to Elos - total: float = \ - sum([log(self.ratings[player], self.base) * self.spread - for player in range(self.pairwise_stats.num_players)]) - - offset: float = -total / self.pairwise_stats.num_players - - for player in range(self.pairwise_stats.num_players-1, -1, -1): - self.elos[player].mu = log( - self.ratings[player], self.base) * self.spread + offset - - if learn_home_field_bias: - self.elo_advantage = \ - log(self.home_field_bias, self.base) * self.spread - if learn_draw_bias: - self.elo_draw = log(self.draw_bias, self.base) * self.spread - - def rescale_elos(self) -> None: - """Rescales the elos by a common factor""" - # EloScale # TODO: Figure out what on earth that is - for i, e in enumerate(self.elos): - x: float = e.base**(-self.elo_draw/e.spread) - elo_scale: float = x * 4.0 / ((1 + x) ** 2) - tmp_base: float = self.elos[i].base - tmp_spread: float = self.elos[i].spread - self.elos[i]: EloRate = EloRate( - self.elos[i].mu * elo_scale, - self.elos[i].std - ) - self.elos[i].base = tmp_base - self.elos[i].spread = tmp_spread - - -def bayeselo( - players: "list[str]", interactions: "list[Interaction]", - elos: "list[EloRate]", elo_base: float = 10., elo_spread: float = 400., - elo_draw: float = 97.3, elo_advantage: float = 32.8, - iterations: int = 10000, tolerance: float = 1e-5 -) -> "list[EloRate]": - """Rates players by calculating their new elo using a bayeselo approach - - Given a set of interactions and initial elo ratings, uses a - Minorization-Maximization algorithm to estimate maximum-likelihood - ratings. - The Minorization-Maximization algorithm is performed for the number of - specified iterations or until the changes are below the tolerance - value, whichever comes first. - Made to imitate https://www.remi-coulom.fr/Bayesian-Elo/ - - Args: - players (list[str]): The list of all players - interactions (list[Interactions]): The list of all interactions - elos (list[EloRate]): The initial ratings of the players - elo_base (float, optional): The base of the exponent in the elo - formula. Defaults to 10.0 - elo_spread (float, optional): The divisor of the exponent in the elo - formula. Defaults to 400.0. - elo_draw (float, optional): The probability of drawing. - Defaults to 97.3. - elo_advantage (float, optional): The home-field-advantage - expressed as rating points. Defaults to 32.8. - iterations (int, optional): The maximum number of iterations the - Minorization-Maximization algorithm will go through. - Defaults to 10000. - tolerance (float, optional): The error threshold below which the - Minorization-Maximization algorithm stopt. Defaults to 1e-5. - - Returns: - list[EloRate]: The updated ratings of all players - """ - - # This check is necessary, otherwise the algorithm raises a - # divide by 0 error - if len(interactions) == 0: - return elos - - if len(players) != len(elos): - raise ValueError(f"Players and elos length mismatch\ -: {len(players)} != {len(elos)}") - - for elo in elos: - if not isinstance(elo, EloRate): - raise TypeError("elos must be of type list[EloRate]") - - players_in_interactions = set() - - for interaction in interactions: - players_in_interactions = \ - players_in_interactions.union(interaction.players) - if len(interaction.players) != 2 or len(interaction.outcomes) != 2: - raise ValueError("Bayeselo only accepts interactions involving \ -both a pair of players and a pair of outcomes") - - if interaction.players[0] not in players \ - or interaction.players[1] not in players: - raise ValueError("Players(s) in interactions absent from player \ -list") - - if interaction.outcomes[0] not in (0, .5, 1) or \ - interaction.outcomes[1] not in (0, .5, 1) or \ - sum(interaction.outcomes) != 1: - raise Warning("Bayeselo takes outcomes in the (1, 0), (0, 1), \ -(.5, .5) format, other values may have unspecified behavior") - - for e in elos: - if e.base != elo_base or e.spread != elo_spread: - raise ValueError(f"Elos with different bases and \ -spreads are not compatible (expected base {elo_base}, spread {elo_spread} but \ -got base {e.base}, spread {e.spread})") - - players_in_interactions = [p for p in players if - p in players_in_interactions] - elos_to_update = [e for e, p in zip(elos, players) - if p in players_in_interactions] - - pairwise_stats: PopulationPairwiseStatistics = \ - PopulationPairwiseStatistics.from_interactions( - players=players_in_interactions, - interactions=interactions - ) - - bt: BayesEloRating = BayesEloRating( - pairwise_stats, elos=elos_to_update, elo_draw=elo_draw, - elo_advantage=elo_advantage, - base=elo_base, spread=elo_spread - ) - - bt.minorize_maximize( - learn_home_field_bias=False, - home_field_bias=elo_base ** (elo_advantage/elo_spread), - learn_draw_bias=False, - draw_bias=elo_base ** (elo_draw/elo_spread), - iterations=iterations, - tolerance=tolerance - ) - - bt.rescale_elos() - - new_elos = [] - for i, p in enumerate(players): - if p in players_in_interactions: - new_elos.append(bt.elos[0]) - bt.elos = bt.elos[1:] - else: - new_elos.append(elos[i]) - - return new_elos diff --git a/src/poprank/functional/trueskill.py b/src/poprank/functional/trueskill.py index c3a28b2..895eb92 100644 --- a/src/poprank/functional/trueskill.py +++ b/src/poprank/functional/trueskill.py @@ -1,7 +1,7 @@ from copy import deepcopy from math import sqrt from typing import Callable -from scipy.stats import norm +from statistics import NormalDist from popcore import Interaction, Team, Player from poprank import Rate from ._trueskill.factor_graph import ( @@ -152,7 +152,7 @@ def trueskill( for i, team_diff_var in enumerate(team_diff_variables): # TODO: Make if statement for dynamic draw probability size: int = sum([len(x) for x in sorted_ratings[i:i+2]]) - draw_margin: float = norm.ppf((draw_probability + 1) / 2.) \ + draw_margin: float = NormalDist().inv_cdf((draw_probability + 1) / 2.) \ * sqrt(size) * beta v_func: Callable[[float, float], float] w_func: Callable[[float, float], float] diff --git a/src/poprank/rates.py b/src/poprank/rates.py index 1e48d3a..37f5202 100644 --- a/src/poprank/rates.py +++ b/src/poprank/rates.py @@ -1,6 +1,8 @@ from dataclasses import dataclass from math import sqrt, log, pi, e -from scipy.stats import norm +from statistics import NormalDist + + from abc import ( ABC, abstractmethod ) @@ -26,7 +28,7 @@ def __init__(self, mu: float = 0, std: float = 1): self.__std = std def sample(self) -> float: - pass + raise NotImplementedError() def __lt__(self, other: 'Rate') -> bool: # TODO: is this right? @@ -34,6 +36,8 @@ def __lt__(self, other: 'Rate') -> bool: @property def mu(self) -> float: + """ + """ return self.__mu @mu.setter @@ -53,8 +57,8 @@ def expected_outcome(self, opponent: "Rate"): """probability that player rate > opponent rate given both distributions""" mean = self.mu - opponent.mu - standard_dev = sqrt(self.std**2 + opponent.std**2) - return 1 - norm.cdf(x=0, loc=mean, scale=standard_dev) + standard_dev = sqrt(self.std ** 2 + opponent.std ** 2) + return 1.0 - NormalDist(mean, standard_dev).cdf(x=0) class RateModule(ABC): diff --git a/test/test_bayeselo.py b/test/test_bayeselo.py index c17ab9d..58030c5 100644 --- a/test/test_bayeselo.py +++ b/test/test_bayeselo.py @@ -3,17 +3,17 @@ from os.path import dirname from popcore import Interaction from poprank import EloRate -from poprank.functional.elo import bayeselo # , elo +from poprank.functional import bayeselo class TestBayeseloFunctional(unittest.TestCase): - def translateoutcome(self, outcome: str): + def outcome_to_numeric(self, outcome: str): if outcome == "1-0": return (1, 0) if outcome == "0-1": return (0, 1) - return (.5, .5) + return (0.5, 0.5) def test_implementation_against_bayeselo(self): """Results BayesElo gives for this file @@ -41,7 +41,7 @@ def test_implementation_against_bayeselo(self): with open(games_filepath, "r") as f: games = json.load(f) - self.assertEqual(len(games), 7999) # Sanity check + assert len(games) == 7999 # Sanity check players = [] interactions = [] @@ -52,35 +52,42 @@ def test_implementation_against_bayeselo(self): players.append(x[1]) interactions.append( Interaction(players=[x[0], x[1]], - outcomes=self.translateoutcome(x[2]))) - - elos = [EloRate(mu=0., std=0.) for x in players] - - actual_elos = [EloRate(x, 0) for x in - [215, 201, 200, 186, 179, 155, 152, 108, 71, -29, -78, - -94, -136, -216, -283, -303, -328]] - actual_ranking = ["Hiarcs 11.1", - "Hiarcs 11", - "Shredder 10", - "Loop for Chess960", - "Hiarcs X54", - "Spike 1.2 Turin", - "Fruit 2.2.1", - "Naum 2.1", - "Glaurung 1.2.1", - "Pharaon 3.5.1", - "Ufim 8.02", - "Movei 00.8.383", - "Movei 00.8.366", - "Hermann 1.9", - "Hermann 1.7", - "Aice 0.99.2", - "Ayito 0.2.994"] + outcomes=self.outcome_to_numeric(x[2]))) + + elos = [EloRate(mu=0., std=0.) for _ in players] + + actual_elos = [ + EloRate(elo, 0) for elo in [ + 215, 201, 200, 186, 179, 155, 152, 108, 71, + -29, -78, -94, -136, -216, -283, -303, -328] + ] + actual_ranking = [ + "Hiarcs 11.1", + "Hiarcs 11", + "Shredder 10", + "Loop for Chess960", + "Hiarcs X54", + "Spike 1.2 Turin", + "Fruit 2.2.1", + "Naum 2.1", + "Glaurung 1.2.1", + "Pharaon 3.5.1", + "Ufim 8.02", + "Movei 00.8.383", + "Movei 00.8.366", + "Hermann 1.9", + "Hermann 1.7", + "Aice 0.99.2", + "Ayito 0.2.994" + ] results = bayeselo(players, interactions, elos) - ranked_players = [tmp for (_, tmp) in - sorted(zip(results, players), - key=lambda x: x[0].mu, reverse=True)] + ranked_players = sorted( + zip(results, players), + key=lambda x: x[0].mu, + reverse=True + ) + ranked_players = [player for elo, player in ranked_players] results.sort(key=lambda x: x.mu, reverse=True) results = [EloRate(round(r.mu), 0) for r in results] @@ -105,25 +112,33 @@ def test_implementation_full_scale(self): actual_elos = [EloRate(x, 0) for x in expected_results_500k["ratings"]] actual_ranking = expected_results_500k["actual_ranking"] - self.assertEqual(len(games), 549907) # Sanity check + assert len(games) == 549907 # Sanity check players = [] interactions = [] - for x in games: - if not x[0] in players: - players.append(x[0]) - if not x[1] in players: - players.append(x[1]) + for game in games: + player, opponent, outcome = game + if player not in players: + players.append(player) + if opponent not in players: + players.append(opponent) interactions.append( - Interaction(players=[x[0], x[1]], - outcomes=self.translateoutcome(x[2]))) + Interaction( + players=[player, opponent], + outcomes=self.outcome_to_numeric(outcome) + ) + ) elos = [EloRate(mu=0., std=0.) for x in players] results = bayeselo(players, interactions, elos) - ranked_players = [tmp for (_, tmp) in - sorted(zip(results, players), - key=lambda x: x[0].mu, reverse=True)] + ranked_players = sorted( + zip(results, players), + key=lambda x: x[0].mu, + reverse=True + ) + ranked_players = [player for elo, player in ranked_players] + results.sort(key=lambda x: x.mu, reverse=True) results = [EloRate(round(r.mu), 0) for r in results]