# PPO_alone_with_binary_reward.py
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
np.random.seed(42)
# Define the neural network for the policy and value functions
class PolicyValueNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(PolicyValueNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)  # action logits; softmax is applied in forward
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        action_probs = torch.softmax(self.fc2(x), dim=-1)
        value = self.value_head(x)
        return action_probs, value
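# Quick shape check (illustrative only): for a batch of one single-feature state, the
# network returns a 2-way action distribution and a scalar value estimate, e.g.
#   probs, value = PolicyValueNetwork(1, 64, 2)(torch.zeros(1, 1))
#   probs.shape == torch.Size([1, 2]) and value.shape == torch.Size([1, 1])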
class WateringPlantEnv:
    def __init__(self):
        # Target soil moisture as a fraction of the soil's water-holding capacity
        # (1.0 means the soil has reached its limit of holding water).
        self.moisture_target = 0.7
        self.moisture_increase = 0.03
        self.moisture = self.moisture_target
        self.time_elapsed = 0
        self.history = []

    def moisture_decay_func(self, dt, k=1):
        # dt is the elapsed time (for one time step, dt = 1).
        # k is the loss rate (unit: 1/s); it depends on the current temperature and
        # must use the same time unit as dt.
        # The decay is expressed in the same units as moisture (fraction of capacity),
        # so a decay of 0.5 means the soil loses an amount of water equal to half of
        # its maximum capacity.
        decay = self.moisture * (1 - np.e ** (-k * dt))
        return decay
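    # Background on the decay formula (a worked check, assuming the dt = 1, k = 0.01
    # values used in step below): exponential decay m(t) = m(0) * exp(-k * t) implies
    # a one-step loss of m * (1 - exp(-k * dt)). With m = 0.7, k = 0.01, dt = 1 this is
    # about 0.7 * 0.00995 ~= 0.00697, i.e. roughly 1% of the current moisture per step.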
    def reset(self):
        self.moisture = self.moisture_target
        self.time_elapsed = 0
        self.history = []
        return torch.tensor([self.time_elapsed], dtype=torch.float32)

    def step(self, action):
        if action == 1:  # water the plant
            self.moisture += self.moisture_increase
            self.time_elapsed = 0
        else:  # do not water; moisture decays for one time step (dt = 1, k = 0.01)
            self.moisture -= self.moisture_decay_func(1, 0.01)
            self.time_elapsed += 1
        self.history.append(self.moisture)
        if len(self.history) > 100:
            self.history.pop(0)
        # Binary reward: 1.0 while the moisture stays in a narrow band around the target,
        # 0.0 otherwise. (A shaped alternative would be
        # reward = -10.0 * abs(self.moisture - self.moisture_target).)
        if 0.675 < self.moisture < 0.725:
            reward = 1.0
        else:
            reward = 0.0
        # The episode ends once the last 100 moisture readings all stayed within [0.65, 0.75].
        done = len(self.history) == 100 and all(0.65 <= m <= 0.75 for m in self.history)
        return torch.tensor([self.time_elapsed], dtype=torch.float32), reward, done
class PPO:
    def __init__(self, input_dim, hidden_dim, output_dim, lr=1e-3, gamma=0.99, epsilon=0.9, epsilon_decay=0.995):
        self.network = PolicyValueNetwork(input_dim, hidden_dim, output_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
        self.gamma = gamma
        # Note: epsilon here controls exploration (probability of sampling from the
        # policy rather than acting greedily), not the PPO clip range.
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay

    def get_action(self, state):
        action_probs, _ = self.network(state)
        if np.random.rand() > self.epsilon:
            action = torch.argmax(action_probs).item()  # exploit: greedy action
        else:
            action = torch.multinomial(action_probs, 1).item()  # explore: sample from the policy
        return action

    def compute_loss(self, states, actions, rewards, next_states, old_probs):
        probs, values = self.network(states)
        _, next_values = self.network(next_states)
        # One-step TD targets; the bootstrap term is detached so the targets are
        # treated as constants when backpropagating.
        td_targets = rewards + self.gamma * next_values.squeeze().detach()
        advantages = (td_targets - values.squeeze()).detach()
        new_probs = probs.gather(1, actions.unsqueeze(-1)).squeeze()
        ratio = new_probs / (old_probs + 1e-10)
        # Clipped surrogate objective; the fixed clamp range [0.8, 1.2] corresponds to a clip epsilon of 0.2.
        policy_loss = -torch.min(ratio * advantages, torch.clamp(ratio, 0.8, 1.2) * advantages).mean()
        value_loss = 0.5 * (td_targets - values.squeeze()).pow(2).mean()
        return policy_loss + value_loss

    def update(self, states, actions, rewards, next_states, old_probs):
        loss = self.compute_loss(states, actions, rewards, next_states, old_probs)
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
        self.optimizer.step()
        self.epsilon *= self.epsilon_decay  # decay exploration over time
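# Background on the objective above (standard PPO clipped surrogate, written out for reference):
#     L_policy = -E[ min(rho_t * A_t, clip(rho_t, 1 - eps, 1 + eps) * A_t) ]
# where rho_t = pi_new(a_t | s_t) / pi_old(a_t | s_t) is the probability ratio and
# A_t = r_t + gamma * V(s_{t+1}) - V(s_t) is the one-step advantage estimate.
# The value head is trained with L_value = 0.5 * (TD target - V(s_t))^2.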
# Parameters
num_epochs = 2500
max_timesteps = 100
batch_size = 256

env = WateringPlantEnv()
ppo = PPO(1, 64, 2)

for epoch in range(num_epochs):
    states, actions, rewards, next_states, old_probs = [], [], [], [], []
    state = env.reset()
    for t in range(max_timesteps):
        action = ppo.get_action(state.unsqueeze(0))
        next_state, reward, done = env.step(action)
        action_probs, _ = ppo.network(state.unsqueeze(0))
        prob = action_probs[0][action].item()
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        next_states.append(next_state)
        old_probs.append(prob)
        state = next_state
        if done:
            break
    # Convert collected data into tensors
    states = torch.stack(states)
    actions = torch.tensor(actions)
    rewards = torch.tensor(rewards)
    next_states = torch.stack(next_states)
    old_probs = torch.tensor(old_probs)
    # Update PPO in mini-batches
    for i in range(0, len(states), batch_size):
        ppo.update(states[i:i + batch_size], actions[i:i + batch_size], rewards[i:i + batch_size],
                   next_states[i:i + batch_size], old_probs[i:i + batch_size])
    # Print progress
    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Average Reward: {rewards.mean().item()}")
# Test the policy after training
max_test_timesteps = 1000
state = env.reset()
total_reward = 0
success = False  # tracks whether the policy passed the hydration test
for t in range(max_test_timesteps):
    action = ppo.get_action(state.unsqueeze(0))
    next_state, reward, done = env.step(action)
    total_reward += reward
    state = next_state
    if done:
        print(f"Great! The model is a hydration expert now at iteration {t}")
        success = True  # update the flag to indicate success
        break
# Only print the failure message if the success message was not printed
if not success:
    print("Unfortunately, the policy did not pass the 100-step hydration test within "
          f"{max_test_timesteps} timesteps, so it has not learned how to water the plant.")