forked from heronyang/airport-simulation
-
Notifications
You must be signed in to change notification settings - Fork 11
/
controller.py
170 lines (135 loc) · 7.16 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
Ground Controller oversees the aircraft movement on the ground. It continuously observes the world and sends
commands to pilots when there is a notable situation.
"""
import collections
from config import Config
class Controller:
    """Ground controller that observes aircraft on the ground once per tick.

    Each call to ``tick`` rebuilds the following state from
    ``ground.aircrafts``:

    * ``aircraft_location_lookup`` -- ``{link: [(aircraft, distance_on_link), ...]}``,
      each list sorted by ``distance_on_link``;
    * ``aircraft_ahead_lookup`` -- ``{aircraft: (speed_of_aircraft_ahead, relative_distance)}``
      for every aircraft that has another aircraft within ``PILOT_VISION``;
    * ``conflicts`` -- ``[(aircraft, aircraft_ahead), ...]`` for every pair whose
      relative distance is below ``CLOSE_NODE_THRESHOLD``.
    """

    def __init__(self, ground):
        """Store the ground object and read the thresholds from ``Config.params``."""
        self.ground = ground
        self.PILOT_VISION = Config.params["aircraft_model"]["pilot_vision"]
        self.CLOSE_NODE_THRESHOLD = Config.params["simulation"]["close_node_threshold"]
        # Created here (and rebuilt on every tick) so that
        # get_closest_aircraft_ahead() is usable before the first tick.
        self.aircraft_location_lookup = collections.defaultdict(list)
        self.aircraft_ahead_lookup = {}
        self.conflicts = []

    def tick(self):
        """Observe the current aircraft positions, then resolve the conflicts found."""
        self.__observe()
        self.__resolve_conflicts()

    def __observe(self):
        """Rebuild the location lookup, the aircraft-ahead lookup and the conflict list.

        Aircraft whose itinerary is completed are ignored. Every other aircraft
        is told about the aircraft ahead of it via ``set_fronter_info`` and
        ``set_fronter_aircraft``, or gets ``set_fronter_info(None)`` when no
        aircraft is found within ``PILOT_VISION``.
        """
        aircraft_list = self.ground.aircrafts
        self.aircraft_location_lookup = collections.defaultdict(list)
        self.aircraft_ahead_lookup = {}
        self.conflicts = []

        # Group the active aircraft by the link they are currently on.
        for aircraft in aircraft_list:
            itinerary = aircraft.itinerary
            if itinerary.is_completed:
                continue
            self.aircraft_location_lookup[itinerary.current_target].append(
                (aircraft, itinerary.current_distance)
            )

        # Order the aircraft on each link by their distance along that link.
        for aircraft_pairs in self.aircraft_location_lookup.values():
            aircraft_pairs.sort(key=lambda pair: pair[1])

        for aircraft in aircraft_list:
            if aircraft.itinerary.is_completed:
                continue
            try:
                found = self.__find_aircraft_ahead(aircraft)
            except NoCloseAircraftFoundError:
                # TODO: decide what the pilot should assume when nothing is in sight.
                # NOTE(review): the fronter aircraft set on an earlier tick is not
                # cleared here -- confirm whether set_fronter_aircraft(None) is needed.
                aircraft.set_fronter_info(None)
                continue
            target_speed, relative_distance, fronter_aircraft = found
            self.aircraft_ahead_lookup[aircraft] = (target_speed, relative_distance)
            aircraft.set_fronter_info((target_speed, relative_distance))
            aircraft.set_fronter_aircraft(fronter_aircraft)

    def __find_aircraft_ahead(self, aircraft):
        """Return ``(speed, relative_distance, aircraft_ahead)`` for the nearest aircraft ahead.

        Walks the remaining links of ``aircraft.itinerary`` starting at the
        current one. The first link that holds an aircraft within
        ``PILOT_VISION`` decides the result; on that link the aircraft with the
        smallest relative distance is returned. Every inspected aircraft whose
        relative distance is below ``CLOSE_NODE_THRESHOLD`` is appended to
        ``self.conflicts`` as ``(aircraft, other_aircraft)``.

        Raises:
            NoCloseAircraftFoundError: no aircraft within ``PILOT_VISION`` was
                found on any remaining link.
        """
        itinerary = aircraft.itinerary
        link_index = itinerary.current_target_index
        link_distance = itinerary.current_distance

        # Distance from `aircraft` to the start of the link being scanned. It is
        # negative on the current link because the aircraft is already
        # `link_distance` into it.
        link_offset = -link_distance
        for index in range(link_index, itinerary.length):
            link = itinerary.targets[index]
            nearest_aircraft, nearest_distance = None, None
            for item_aircraft, item_distance in self.aircraft_location_lookup.get(link, ()):
                # An aircraft is never ahead of itself (matters only if the
                # itinerary visits the same link again later).
                if item_aircraft is aircraft:
                    continue
                # On the current link, skip everything at or behind the aircraft.
                if index == link_index and item_distance <= link_distance:
                    continue
                # Computed from the untouched link offset: accumulating
                # item distances into the offset would inflate the distance of
                # every later aircraft and of every following link.
                relative_distance = link_offset + item_distance
                if relative_distance < self.CLOSE_NODE_THRESHOLD:
                    self.conflicts.append((aircraft, item_aircraft))
                if relative_distance > self.PILOT_VISION:
                    # Too far away for the pilot to see.
                    continue
                if nearest_aircraft is None or relative_distance < nearest_distance:
                    nearest_aircraft, nearest_distance = item_aircraft, relative_distance
            if nearest_aircraft is not None:
                return nearest_aircraft.speed, nearest_distance, nearest_aircraft
            link_offset += link.length
        raise NoCloseAircraftFoundError

    def __resolve_conflicts(self):
        """Overwrite the fronter info of the second aircraft of every conflict pair.

        For each ``(aircraft, aircraft_ahead)`` pair recorded during
        observation, ``set_fronter_info((-1, -1))`` is called on the second
        element.
        """
        # TODO: compare the departure time (or use self.ground.priority) and
        # prioritize the aircraft with the longer delay.
        # NOTE(review): the second element of a pair is the aircraft that was
        # found *ahead*; the meaning of (-1, -1) is defined by the aircraft
        # code -- confirm that this is the aircraft that should receive it.
        for _, aircraft_ahead in self.conflicts:
            aircraft_ahead.set_fronter_info((-1, -1))

    def get_closest_aircraft_ahead(self, aircraft):
        """Return ``(speed, relative_distance)`` of the aircraft ahead, or ``None``.

        The value comes from the most recent observation; ``None`` is returned
        for aircraft that had nothing within ``PILOT_VISION``.
        """
        return self.aircraft_ahead_lookup.get(aircraft)
class NoCloseAircraftFoundError(Exception):
    """Raised when no other aircraft is found ahead within the pilot's vision."""