"""Module containing the core class of the Natural Evolution Strategies
:Date: 2019-03-11
:Version: 1
:Authors:
- Cedric Derstroff
- Janosch Moos
- Kay Hansel
"""
from utilities.estimations import *
class NES:
    """Core class of the NES algorithm. Contains all relevant
    parameters except for the policy. The central method is "do",
    which runs a single training step; evaluating samples during
    training is delegated to "estimate_fitness" from
    utilities.estimations.

    This class implements the Separable NES (SNES).

    Wierstra, D., Schaul, T., Glasmachers, T., Sun, Y., Peters, J.,
    Schmidhuber, J.: Natural Evolution Strategies. Journal of
    Machine Learning Research 15, 949–980 (2014).
    URL http://jmlr.org/papers/v15/wierstra14a.html

    Attributes
    -----------
    normalizer: Normalizer
        Normalizer for observations (initialized to None)

    Methods
    -----------
    do(env, policy, n_roll_outs)
        Runs a single training step:
        1. Draws a set of parameter samples
        2. Gets an evaluation (fitness) for all samples using n
           simulations (roll-outs) for each sample on the environment
        3. Updates parameters based on samples sorted by their fitness
    """

    def __init__(self, n_parameters: int, eta_sigma: float = None,
                 eta_mu: float = None, population_size: int = None,
                 sigma_lower_bound: float = 1e-10, sigma_init: float = 1.0):
        """
        :param n_parameters: Number of policy parameters to be trained
        :type n_parameters: int

        :param eta_sigma: Learning rate for the standard deviation
            sigma of the search distribution
        :type eta_sigma: float

        :param eta_mu: Learning rate for the mean mu of the search
            distribution, i.e. the policy parameters
        :type eta_mu: float

        :param population_size: Number of policy parameter samples
            drawn per training step
        :type population_size: int

        :param sigma_lower_bound: Lower bound for sigma; any value
            below this bound will be set to it
        :type sigma_lower_bound: float

        :param sigma_init: Initial value of sigma
        :type sigma_init: float
        """
        self.normalizer = None

        # pre-compute log(d) once for reuse below
        log_d = np.log(n_parameters)
# calculate population size if not specified
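        # heuristic default from the NES/CMA-ES literature:
        # lambda = 4 + floor(3 * ln d)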
if population_size is not None:
self.__population_size = population_size
else:
self.__population_size = 4 + int(3 * log_d)
# calculate eta_sigma if not specified
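        # default (3 + ln d) / (5 * sqrt(d)) matches the SNES setting
        # suggested in Wierstra et al. (2014)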
if eta_sigma is not None:
self.__eta_sigma = eta_sigma
else:
self.__eta_sigma = (3 + log_d) / np.sqrt(n_parameters) / 5
# set eta_mu
self.__eta_mu = eta_mu if eta_mu is not None else 1
# define lower bound for sigma to avoid artifacts in
# calculations
self.__sigma_lower_bound = sigma_lower_bound
        # the rank-based utility weights depend only on the population
        # size, hence they can be pre-computed here
log_half = np.log(0.5 * self.__population_size + 1)
log_k = np.log(np.arange(1, self.__population_size + 1))
numerator = np.maximum(0, log_half - log_k)
utilities = numerator / np.sum(numerator) - 1 / self.__population_size
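        # e.g. for population_size = 4 the weights are roughly
        # [0.48, 0.02, -0.25, -0.25]: they sum to zero and reward the
        # best-ranked samples while penalizing the worst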
# define sigma
if sigma_init <= self.__sigma_lower_bound:
sigma_init = self.__sigma_lower_bound
self.__sigma = np.ones(n_parameters) * sigma_init
self.__sigma_init = sigma_init
        # random number generator for drawing samples z_k; the seed is
        # scaled into the valid range [0, 2**32 - 1]
        seed: int = int(np.random.rand() * (2**32 - 1))
        self.__sampler = np.random.RandomState(seed)
        # fold the utility weights into the learning rates once, for
        # performance
        self.__u_eta_sigma_half = 0.5 * self.__eta_sigma * utilities
        self.__u_eta_mu = self.__eta_mu * utilities

# Main Functions
# ===============================================================
    def do(self, env, policy, n_roll_outs):
        """Runs a single training step:
        1. Draws a set of parameter samples
        2. Gets an evaluation (fitness) for all samples using n
           simulations (roll-outs) for each sample on the environment
        3. Updates parameters based on samples sorted by their fitness

        :param env: Contains the gym environment the simulations are
            performed on
        :type env: Environment

        :param policy: The policy to improve
        :type policy: Policy

        :param n_roll_outs: Number of roll-outs per policy sample
        :type n_roll_outs: int

        :return: the array of the fitness of the policies and the
            array of the time steps until the policy encountered the
            done flag
        :rtype: array, array
        """
mu = policy.get_parameters()
# draw samples from search distribution
s = self.__sampler.normal(0, 1, (self.__population_size, len(mu)))
z = mu + self.__sigma * s
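        # each row z_k ~ N(mu, diag(sigma^2)) is one candidate
        # parameter vector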
# evaluate fitness
fitness, steps = estimate_fitness(policy, env, z, n_roll_outs)
        # sort samples by fitness in descending order (best first)
        s_sorted = s[np.argsort(fitness, kind="mergesort")[::-1]]
# update parameters
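        # SNES natural-gradient update (cf. Wierstra et al., 2014):
        #   mu    <- mu + eta_mu * sigma * sum_k u_k * s_k
        #   sigma <- sigma * exp(eta_sigma / 2 * sum_k u_k * (s_k^2 - 1))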
mu += self.__sigma * (self.__u_eta_mu @ s_sorted)
self.__sigma *= np.exp(self.__u_eta_sigma_half @ (s_sorted ** 2 - 1))
        # clip sigma from below: it must stay positive and away from
        # numerical degeneration
        self.__sigma[self.__sigma < self.__sigma_lower_bound] = \
            self.__sigma_lower_bound
policy.set_parameters(mu)
return fitness, steps

    # getter-only properties
    # ===============================================================
    @property
    def title(self):
        """Generates a title for plotting results, containing all
        relevant parameters and the algorithm name.

        :return: the title for the plots
        :rtype: str
        """
return r"NES $\lambda = {}, " \
r"\sigma_0 = {}, " \
r"\eta_\sigma = {:.4f}, " \
r"\eta_\mu = {}$".format(self.__population_size,
self.__sigma_init,
self.__eta_sigma,
self.__eta_mu)

    @property
    def name(self):
        """Returns the algorithm name.

        :return: 'NES'
        :rtype: str
        """
        return 'NES'
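

# ----------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original
# module): the same SNES update applied to a toy quadratic objective
# f(x) = -||x||^2, bypassing the Environment/Policy machinery and
# estimate_fitness. All names below are local to this demo; it simply
# mirrors the computations in NES.do above.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    n = 10                                    # number of "parameters"
    mu = rng.normal(0, 1, n)                  # search distribution mean
    sigma = np.ones(n)                        # per-dimension std (separable)
    pop = 4 + int(3 * np.log(n))
    eta_mu = 1.0
    eta_sigma = (3 + np.log(n)) / np.sqrt(n) / 5

    # rank-based utilities, identical to the pre-computation in __init__
    log_half = np.log(0.5 * pop + 1)
    log_k = np.log(np.arange(1, pop + 1))
    numerator = np.maximum(0, log_half - log_k)
    u = numerator / np.sum(numerator) - 1 / pop

    for _ in range(200):
        s = rng.normal(0, 1, (pop, n))        # z_k = mu + sigma * s_k
        fitness = -np.sum((mu + sigma * s) ** 2, axis=1)
        s_sorted = s[np.argsort(fitness, kind="mergesort")[::-1]]
        mu += sigma * (eta_mu * (u @ s_sorted))
        sigma *= np.exp(0.5 * eta_sigma * (u @ (s_sorted ** 2 - 1)))

    print("final ||mu|| (should approach 0):", np.linalg.norm(mu))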