-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtorcs.py
202 lines (168 loc) · 6.65 KB
/
torcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import copy
import collections as col
import math
import os
import random
import time
import gym
from gym import spaces
import numpy as np
from .snakeoil3 import Client as snakeoil3
# Address of the TORCS server. Both parts can be overridden through the
# TORCS_HOST / TORCS_PORT environment variables.
HOST = os.getenv('TORCS_HOST', 'localhost')
PORT = int(os.getenv('TORCS_PORT', '3101'))
# Directory that contains this module, with symlinks resolved.
FILEDIR = os.path.split(os.path.realpath(__file__))[0]
class Torcs:
    """Gym-style driving environment backed by a TORCS server.

    The environment talks to the server through a ``snakeoil3`` client that
    is created anew on every :meth:`reset`. Actions are
    ``[steering, throttle]`` pairs clipped to ``[-1, 1]``; a positive
    throttle accelerates, anything else brakes. Observations are the scaled
    sensor vector produced by :meth:`make_observation`.
    """

    # Speed limit is applied after this step
    terminal_judge_start = 50
    # [km/h], episode terminates if car is running slower than this limit
    termination_limit_progress = 1

    def __init__(self):
        """Set up the action/observation spaces; no connection is opened."""
        self.initial_reset = True
        self.initial_run = True
        self.client = None
        # Created here so the attributes exist before the first reset().
        self.time_step = 0
        self.observation = None
        self.last_u = None

        # Action Space
        low = [-1., -1.]  # steering, throttle/brake
        high = [1., 1.]  # steering, throttle/brake
        self.action_space = spaces.Box(np.array(low), np.array(high))

        # Observation Space (bounds assume 19 track sensors and 4 wheels).
        # The angle is divided by 3.1416 in make_observation() and step()
        # treats it as a signed quantity, so its lower bound is -1, not 0.
        # NOTE(review): step() also handles negative track readings (car off
        # track), which fall slightly below the 0 bound used here.
        low = ([-1.] +  # angle
               [0.] * 19 +  # track sensors
               [-np.inf] +  # trackPos
               [-np.inf, -np.inf, -np.inf] +  # speedX, speedY, speedZ
               [-np.inf] * 4 +  # wheelSpinVel
               [-np.inf])  # rpm
        high = ([1.] +  # angle
                [1.] * 19 +  # track sensors
                [np.inf] +  # trackPos
                [np.inf, np.inf, np.inf] +  # speedX, speedY, speedZ
                [np.inf] * 4 +  # wheelSpinVel
                [np.inf])  # rpm
        self.observation_space = spaces.Box(np.array(low), np.array(high))

    def step(self, action):
        """Send one action to the server and return the resulting transition.

        Args:
            action: Sequence ``[steering, throttle]``. Steering is clipped to
                ``[-1, 1]``. The magnitude of ``throttle`` (clipped to
                ``[0, 1]``) is applied as acceleration when ``throttle > 0``
                and as brake otherwise.

        Returns:
            Tuple ``(observation, reward, done, info)`` where ``done`` is the
            client's ``meta`` flag (set to ``True`` once a termination
            condition fired) and ``info`` is an empty dict.

        Raises:
            EnvironmentError: If the connection to the server is lost.
        """
        client = self.client

        # Apply Action
        action_torcs = client.R.d
        action_torcs['steer'] = np.clip(action[0], -1, 1)
        throttle = np.clip(np.abs(action[1]), 0, 1)
        if action[1] > 0:
            action_torcs['accel'] = throttle
            action_torcs['brake'] = 0
        else:
            action_torcs['accel'] = 0
            action_torcs['brake'] = throttle

        # Automatic gear shifting: the highest speed threshold that is
        # exceeded selects the gear (2..6); below all of them gear 1 is used.
        action_torcs['gear'] = 1
        speed_x = client.S.d['speedX']
        for gear, min_speed in enumerate((50, 80, 110, 140, 170), start=2):
            if speed_x > min_speed:
                action_torcs['gear'] = gear

        # Save the previous full-obs from torcs for the reward calculation
        obs_pre = copy.deepcopy(client.S.d)
        try:
            client.respond_to_server()  # Apply the Agent's action into torcs
            client.get_servers_input()  # Get the response of TORCS
        except ConnectionError as err:
            # Force the next reset() to open a fresh connection.
            self.initial_reset = True
            raise EnvironmentError('Torcs server went away.') from err

        # Get the current full-observation from torcs
        obs = client.S.d
        # Make an observation from a raw observation vector from TORCS
        self.observation = self.make_observation(obs)

        # Compute reward.
        # TODO: Make plugable
        speed = np.array(obs['speedX'])
        heading = np.cos(obs['angle'])
        progress = speed * heading  # forward progress
        reward = speed * (heading -  # encourage forward
                          np.abs(np.sin(obs['angle'])) -  # discourage sideways
                          np.abs(obs['trackPos']))  # discourage off-center

        # Termination judgement. Every condition raises the client's 'meta'
        # flag, which doubles as the done signal returned to the caller.
        # Additional terminations on low speed (< 10) and on driving
        # backwards (cos(angle) < 0) are currently disabled.

        # Collision detection.
        if obs['damage'] - obs_pre['damage'] > 0:
            reward = -1
            print('terminated due to damage taken')
            client.R.d['meta'] = True

        # Episode is terminated if the car is out of track
        if np.min(obs['track']) < 0:
            print('terminated due to out of track')
            reward = -1
            client.R.d['meta'] = True

        print('progress {:.2f}, reward {:.2f}'.format(progress, reward))

        # Episode terminates if the progress of agent is small
        if (self.terminal_judge_start < self.time_step and
                progress < self.termination_limit_progress):
            reward = -1
            client.R.d['meta'] = True

        # Send a reset signal
        if client.R.d['meta'] is True:
            self.initial_run = False
            try:
                client.respond_to_server()
            except ConnectionError as err:
                self.initial_reset = True
                raise EnvironmentError('Torcs server went away.') from err

        self.time_step += 1
        return self.observation, reward, client.R.d['meta'], {}

    def reset(self):
        """Start a new episode and return its first observation.

        Unless this is the first reset (or the connection was lost), the
        current client is asked to restart the race first. A new client is
        then created and its initial sensor reading is converted into an
        observation. Connection errors trigger another reset attempt.
        """
        self.time_step = 0

        if not self.initial_reset:
            self.client.R.d['meta'] = True
            try:
                self.client.respond_to_server()
            except ConnectionError:
                # Old connection is dead: skip the restart request next time.
                self.initial_reset = True
                return self.reset()

        # Modify here if you use multiple tracks in the environment
        # Open new UDP in vtorcs
        self.client = snakeoil3(H=HOST, p=PORT, vision=False)
        self.client.MAX_STEPS = np.inf
        try:
            self.client.get_servers_input()  # Get the initial input from torcs
        except ConnectionError:
            self.initial_reset = True
            return self.reset()

        obs = self.client.S.d  # Get the current full-observation from torcs
        self.observation = self.make_observation(obs)
        self.last_u = None
        self.initial_reset = False
        return self.observation

    def make_observation(self, obs):
        """Flatten a raw sensor dict into one scaled 1-D array.

        The output concatenates, in this order: angle, track sensors,
        trackPos, speedX, speedY, speedZ, wheelSpinVel, rpm. Each entry is
        divided by a fixed per-sensor constant.

        Args:
            obs: Mapping from sensor name to a scalar or sequence of numbers.

        Returns:
            ``numpy.ndarray`` with all scaled sensor values stacked.
        """
        sensors = (('angle', 3.1416),
                   ('track', 200),
                   ('trackPos', 1),
                   ('speedX', 300),
                   ('speedY', 300),
                   ('speedZ', 300),
                   ('wheelSpinVel', 1),
                   ('rpm', 10000))
        data = [np.array(obs[sensor]) / div for sensor, div in sensors]
        return np.hstack(data)

    def render(self, close=False):
        """Not implemented. Always renders."""
        ...

    def close(self):
        """Shutdown connection to torcs server.

        Does nothing when no client has been created yet (reset() was never
        called).
        """
        if self.client is not None:
            self.client.shutdown()