# qn.py
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'   # uncomment to force CPU-only execution
import random
from collections import deque

import gym
import numpy as np
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.optimizers import Adam
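
# OurModel builds the Q-network: a small fully connected net that maps a state
# vector to one Q-value per action (linear output layer), trained with an MSE
# loss against the bootstrapped targets computed in DQNAgent.replay().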
def OurModel(input_shape, action_space):
    X_input = Input(input_shape)
    X = Dense(128, activation="relu", kernel_initializer='he_uniform')(X_input)
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)
    X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs=X_input, outputs=X)
    model.compile(loss="mse", optimizer=Adam(learning_rate=0.001))
    model.summary()
    return model
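
# DQNAgent wraps the CartPole-v1 environment, a replay memory of
# (state, action, reward, next_state, done) tuples, the epsilon-greedy policy
# and the Q-network above. The environment calls in run()/test() follow the
# classic gym API (reset() returns only the observation, step() returns four
# values), i.e. gym < 0.26; newer gym/gymnasium releases would need those calls
# adapted.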
class DQNAgent:
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        # by default, CartPole-v1 has max episode steps = 500
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.EPISODES = 1000
        self.memory = deque(maxlen=1024 * 4)

        self.gamma = 0.95        # discount rate
        self.epsilon = 1.0       # exploration rate
        self.epsilon_min = 0.03
        self.epsilon_decay = 0.999
        self.batch_size = 1024
        self.epoch_count = 4
        self.train_start = 1024
        self.visualize = True

        # create main model
        self.model = OurModel(input_shape=(self.state_size,), action_space=self.action_size)
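
    # remember() stores one transition in the replay buffer; once train_start
    # samples have been collected, epsilon is decayed after every stored step,
    # annealing exploration from 1.0 down to epsilon_min.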
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) >= self.train_start:
            if self.epsilon >= self.epsilon_min:
                self.epsilon *= self.epsilon_decay

    def act(self, state):
        if np.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            return np.argmax(self.model.predict(state, verbose=0))
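
    # replay() samples a random minibatch from memory and fits the network toward
    # the standard DQN target:
    #   target(s, a) = r                                if the episode ended
    #   target(s, a) = r + gamma * max_a' Q(s', a')     otherwise
    # The same network is used both to act and to evaluate the bootstrap target
    # (there is no separate target network in this implementation).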
    def replay(self):
        if len(self.memory) < self.train_start:
            return
        # Randomly sample a minibatch from the memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        state = np.zeros((self.batch_size, self.state_size))
        next_state = np.zeros((self.batch_size, self.state_size))
        action, reward, done = [], [], []

        # unpack the minibatch before prediction; for speed this could be done
        # at the tensor level, but a loop is easier to understand
        for i in range(self.batch_size):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        # batch prediction is much faster than per-sample calls
        target = self.model.predict(state, verbose=0)
        target_next = self.model.predict(next_state, verbose=0)

        for i in range(self.batch_size):
            # correct the Q value only for the action that was taken
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # standard DQN: bootstrap from the max Q value over next actions,
                # Q_max = max_a' Q(s', a')
                target[i][action[i]] = reward[i] + self.gamma * np.amax(target_next[i])

        # train the network on the whole minibatch
        self.model.fit(state, target, batch_size=self.batch_size, epochs=self.epoch_count, verbose=0)

    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)
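
    # run() is the training loop: epsilon-greedy rollouts in CartPole-v1, with a
    # shaped reward of -100 when the pole falls before the 500-step time limit.
    # The model is saved at the end of every episode, and replay() then fits the
    # network on a sampled minibatch.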
    def run(self):
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                if self.visualize:
                    self.env.render()
                action = self.act(state)
                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_size])
                # penalize failure before the time limit; a full-length episode
                # keeps its normal reward on the final step
                if done and i != self.env._max_episode_steps - 1:
                    reward = -100
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
                if done:
                    print("episode: {}/{}, score: {}, e: {:.2}".format(e, self.EPISODES, i, self.epsilon))
                    print("Saving trained model as cartpole-dqn2.h5")
                    self.save("cartpole-dqn2.h5")
                    #return
            self.replay()
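
    # test() loads a previously saved model ("cartpole-dqn.h5" is expected to
    # exist) and runs greedy episodes without exploration, printing each score.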
    def test(self):
        self.load("cartpole-dqn.h5")
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                self.env.render()
                action = np.argmax(self.model.predict(state, verbose=0))
                next_state, reward, done, _ = self.env.step(action)
                state = np.reshape(next_state, [1, self.state_size])
                i += 1
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    break
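
# Typical usage: construct the agent and either train it (run) or evaluate a
# saved model (test); the commented-out lines below show how to resume from
# saved weights with exploration disabled.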
if __name__ == "__main__":
    agent = DQNAgent()
    #agent.model.load_weights('cartpole-dqn2.h5')
    #agent.epsilon_min = 0.0
    #agent.epsilon = agent.epsilon_min
    #agent.visualize = True
    #agent.run()
    #agent.test()