"""
Main script that runs the D4PG learning algorithm
(https://arxiv.org/pdf/1804.08617)
It features the standard DDPG algorithm with a number
of improvements from other researchers.
Namely:
Distributed rollouts (https://arxiv.org/pdf/1602.01783)
A distributional learner (http://arxiv.org/abs/1707.06887)
N-step returns (https://arxiv.org/pdf/1602.01783)
Prioritized experience replay (http://arxiv.org/abs/1511.05952)
This implementation does not use the
ApeX framework (https://arxiv.org/abs/1803.00933) as the original authors did.
Instead, it uses Python's 'threading' and 'multiprocessing' libraries.
Different tasks are contained in different threads. Tensorflow is thread-safe and automatically multi-threaded.
Each instance of the environment is contained in a different process due to scipy not being thread-safe.
===== Notes =====
No notes at the moment
@author: Kirk Hovell ([email protected])
Special thanks to:
- msinto93 (https://github.com/msinto93)
- SuReLI (https://github.com/SuReLI)
- DeepMind (https://github.com/deepmind)
- OpenAI (https://github.com/openai)
for publishing their code! The open-source attitude of AI research is wonderful.
Code started: October 15, 2018
Learning algorithm complete: May 6, 2019
"""
# Importing libraries & other classes
# Others'
import shutil
import os
import glob
import time
import threading
import multiprocessing
import random
import datetime
import psutil
import tensorflow as tf
import numpy as np
import sys
# My own
from learner import Learner
from replay_buffer import ReplayBuffer
from prioritized_replay_buffer import PrioritizedReplayBuffer
from settings import Settings
import saver
environment_file = __import__('environment_' + Settings.ENVIRONMENT)
agent_file = __import__('agent' + Settings.AGENT)
#%%
##########################
##### SETTING UP RUN #####
##########################
start_time = time.time()
# Clearing Tensorflow graph
tf.reset_default_graph()
# Setting Tensorflow configuration parameters
config = tf.ConfigProto()
config.intra_op_parallelism_threads = psutil.cpu_count(logical = False) # Number of CPU physical cores recommended
if psutil.cpu_count(logical = False) == 32:
    config.inter_op_parallelism_threads = 32 # RCDC has 32 sockets
else:
    config.inter_op_parallelism_threads = 1 # All my other computers have 1
# Set random seeds
tf.set_random_seed(Settings.RANDOM_SEED)
np.random.seed(Settings.RANDOM_SEED)
random.seed(Settings.RANDOM_SEED)
############################################################
##### New run or continuing a partially completed one? #####
############################################################
# If we're continuing a run
if Settings.RESUME_TRAINING:
    filename = Settings.RUN_NAME # Reuse the name too
    starting_episode_number = np.zeros(Settings.NUMBER_OF_ACTORS, dtype = np.int8) # initializing
    starting_iteration_number = 0 # initializing
    try:
        # Grab the tensorboard path
        old_tensorboard_filename = [i for i in os.listdir(Settings.MODEL_SAVE_DIRECTORY + filename) if i.endswith(Settings.TENSORBOARD_FILE_EXTENSION)][0]
        # For every entry in the tensorboard file
        for tensorboard_entry in tf.train.summary_iterator(Settings.MODEL_SAVE_DIRECTORY + filename + "/" + old_tensorboard_filename):
            # Search each one for the Loss value so we can find the final iteration number
            for tensorboard_value in tensorboard_entry.summary.value:
                if tensorboard_value.tag == 'Logging_Learning/Loss':
                    starting_iteration_number = max(tensorboard_entry.step, starting_iteration_number)
            # Search also for the actors so we can find which episode each one was on
            for agent_number in range(Settings.NUMBER_OF_ACTORS):
                for tensorboard_value in tensorboard_entry.summary.value:
                    if tensorboard_value.tag == 'Agent_' + str(agent_number + 1) + '/Number_of_timesteps':
                        starting_episode_number[agent_number] = max(tensorboard_entry.step, starting_episode_number[agent_number])
    except:
        # If the load failed, quit the run
        print("Couldn't load in old tensorboard file! Quitting run.")
        raise SystemExit
else: # Otherwise, we are starting from scratch
    # Generate a filename using Settings.RUN_NAME with the current timestamp
    filename = Settings.RUN_NAME + '-{:%Y-%m-%d_%H-%M}'.format(datetime.datetime.now())
    starting_episode_number = np.ones(Settings.NUMBER_OF_ACTORS, dtype = int) # All actors start at episode 1
    starting_iteration_number = 1 # The learner starts at iteration 1

# Generate the writer that will log Tensorboard scalars & the graph
writer = tf.summary.FileWriter(Settings.MODEL_SAVE_DIRECTORY + filename, filename_suffix = Settings.TENSORBOARD_FILE_EXTENSION)

# Saving a copy of all the Python files used in this run, for reference
# Make the directory if it doesn't already exist
os.makedirs(os.path.dirname(Settings.MODEL_SAVE_DIRECTORY + filename + '/code/'), exist_ok=True)
for each_file in glob.glob('*.py'):
    shutil.copy2(each_file, Settings.MODEL_SAVE_DIRECTORY + filename + '/code/')
#######################################
##### Starting Tensorflow session #####
#######################################
with tf.Session(config = config) as sess:
print("\nThis run is named " + filename)
print("\nThe environment file is: environment_" + Settings.ENVIRONMENT + '\n')
if Settings.TEST_ON_DYNAMICS:
print("At test time, full dynamics are being used\n")
else:
print("At test time, kinematics are being used\n")
if Settings.KINEMATIC_NOISE:
print("Noise is being applied to the kinematics during training to simulate a poor controller\n")
def get_size(obj, seen=None):
"""Recursively finds size of objects"""
size = sys.getsizeof(obj)
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
# Important mark as seen *before* entering recursion to gracefully handle
# self-referential objects
seen.add(obj_id)
if isinstance(obj, dict):
size += sum([get_size(v, seen) for v in obj.values()])
size += sum([get_size(k, seen) for k in obj.keys()])
elif hasattr(obj, '__dict__'):
size += get_size(obj.__dict__, seen)
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
size += sum([get_size(i, seen) for i in obj])
return size
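    # Illustrative usage note (this call is an assumption, not something the original
    # script does): get_size(replay_buffer) would estimate the replay buffer's total
    # memory footprint in bytes, for comparison against the RAM figure printed in the
    # monitoring loop near the end of this file.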
    ##############################
    ##### Initializing items #####
    ##############################
    # Initializing saver class (for loading & saving data)
    saver = saver.Saver(sess, filename)

    # Initializing replay buffer, with the option of a prioritized replay buffer
    if Settings.PRIORITY_REPLAY_BUFFER:
        replay_buffer = PrioritizedReplayBuffer()
    else:
        replay_buffer = ReplayBuffer()
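    # (Illustrative note, not from the original file.) A prioritized buffer typically
    # samples transition i with probability P(i) = p_i**alpha / sum_k(p_k**alpha), where
    # p_i is a TD-error-based priority, whereas the plain ReplayBuffer samples uniformly.
    # The exact scheme used here lives in prioritized_replay_buffer.py.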
# Initializing thread & process list
threads = []
environment_processes = []
# Event()s are used to communicate with threads while they run.
# In this case, it is used to signal to the threads when it is time to stop gracefully.
stop_run_flag = threading.Event() # Flag to stop all threads
replay_buffer_dump_flag = threading.Event() # Flag to pause data writing to the replay buffer
replay_buffer_dump_flag.set() # Set the flag to initially be True so that the agents will write data
# Generating the learner and assigning it to a thread
if Settings.USE_GPU_WHEN_AVAILABLE:
# Allow GPU use when appropriate
learner = Learner(sess, saver, replay_buffer, writer)
# Generate the queue responsible for communicating with the agent (for test distribution calculating)
agent_to_learner, learner_to_agent = learner.generate_queue()
else:
# Forcing to the CPU only
with tf.device('/device:CPU:0'):
learner = Learner(sess, saver, replay_buffer, writer)
# Generate the queue responsible for communicating with the agent (for test distribution calculating)
agent_to_learner, learner_to_agent = learner.generate_queue()
threads.append(threading.Thread(target = learner.run, args = (stop_run_flag, replay_buffer_dump_flag, starting_iteration_number)))
    # Generating the actors and placing them into their own threads
    for i in range(Settings.NUMBER_OF_ACTORS):
        if Settings.USE_GPU_WHEN_AVAILABLE:
            # Allow GPU use when appropriate
            # Make an instance of the environment which will be placed in its own process
            if Settings.ENVIRONMENT == 'gym':
                environment = environment_file.Environment(filename, i+1, Settings.CHECK_GREEDY_PERFORMANCE_EVERY_NUM_EPISODES, Settings.VIDEO_RECORD_FREQUENCY, Settings.MODEL_SAVE_DIRECTORY) # Additional parameters needed for gym
            else:
                environment = environment_file.Environment()
            # Set the environment seed
            environment.seed(Settings.RANDOM_SEED*(i+1))
            # Generate the queue responsible for communicating with the agent
            agent_to_env, env_to_agent = environment.generate_queue()
            # Generate the actor
            actor = agent_file.Agent(sess, i+1, agent_to_env, env_to_agent, replay_buffer, writer, filename, learner.actor.parameters, agent_to_learner, learner_to_agent)
        else:
            # Forcing to the CPU only
            with tf.device('/device:CPU:0'):
                # Make an instance of the environment which will be placed in its own process
                if Settings.ENVIRONMENT == 'gym':
                    environment = environment_file.Environment(filename, i+1, Settings.CHECK_GREEDY_PERFORMANCE_EVERY_NUM_EPISODES, Settings.VIDEO_RECORD_FREQUENCY, Settings.MODEL_SAVE_DIRECTORY) # Additional parameters needed for gym
                else:
                    environment = environment_file.Environment()
                # Set the environment seed
                environment.seed(Settings.RANDOM_SEED*(i+1))
                # Generate the queue responsible for communicating with the agent
                agent_to_env, env_to_agent = environment.generate_queue()
                # Generate the actor
                actor = agent_file.Agent(sess, i+1, agent_to_env, env_to_agent, replay_buffer, writer, filename, learner.actor.parameters, agent_to_learner, learner_to_agent)

        # Add thread and process to the list
        threads.append(threading.Thread(target = actor.run, args = (stop_run_flag, replay_buffer_dump_flag, starting_episode_number)))
        environment_processes.append(multiprocessing.Process(target = environment.run, daemon = True)) # daemon ensures process is killed when main ends
    # If desired, try to load in partially-trained parameters
    if Settings.RESUME_TRAINING == True:
        if not saver.load():
            # If loading was not successful -> quit program
            print("Could not load in parameters... quitting program")
            raise SystemExit
    else:
        # Don't try to load in parameters, just initialize them instead
        # Initialize saver
        saver.initialize()
        # Initialize Tensorflow variables
        sess.run(tf.global_variables_initializer())

    # Starting all environments
    for each_process in environment_processes:
        each_process.start()

    #############################################
    ##### STARTING EXECUTION OF ALL THREADS #####
    #############################################
    #                                           #
    #                                           #
    for each_thread in threads:                 #
    #                                           #
        each_thread.start()                     #
    #                                           #
    #                                           #
    #############################################
    ############## THREADS STARTED ##############
    #############################################

    # Write the Tensorflow computation graph to file, now that it has been fully built
    writer.add_graph(sess.graph)
    print('Done starting!')
    # For printing out all variables and their sizes
    def sizeof_fmt(num, suffix='B'):
        ''' by Fred Cirera, https://stackoverflow.com/a/1094933/1870254, modified'''
        for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
            if abs(num) < 1024.0:
                return "%3.1f %s%s" % (num, unit, suffix)
            num /= 1024.0
        return "%.1f %s%s" % (num, 'Yi', suffix)
    ####################################################
    ##### Waiting until all threads have completed #####
    ####################################################
    print("Running until threads finish or until you press Ctrl + C")

    # Getting the current process so its memory usage can be monitored
    process = psutil.Process(os.getpid())
    try:
        counter = 0
        while True:
            time.sleep(0.5)
            # Print a status report every 1200 iterations (i.e., roughly every 10 minutes)
            if counter % 1200 == 0:
                print("Main.py (Environment %s) is using %2.3f GB of RAM and the buffer has %i samples" %(Settings.RUN_NAME, process.memory_info().rss/1000000000.0, replay_buffer.how_filled()))
            counter += 1
            # If all agents have finished (only the learner thread remains alive), gracefully stop the learner and end
            if sum(each_thread.is_alive() for each_thread in threads) <= 1: # built-in sum; np.sum over a generator is deprecated
                print("All threads ended naturally.")
                # Gracefully stop learner
                stop_run_flag.set()
                # Join threads (suspends main.py until threads finish)
                for each_thread in threads:
                    each_thread.join()
                break
    except KeyboardInterrupt: # if someone pressed Ctrl + C
        print("Interrupted by user!")
        print("Stopping all the threads!!")
        # Gracefully stop all threads, ending episodes and saving data
        stop_run_flag.set()
        # Join threads (suspends main.py until threads finish)
        for each_thread in threads:
            each_thread.join()

    print("This run completed in %.3f hours." %((time.time() - start_time)/3600))
    print("Done closing! Goodbye :)")