-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulation.py
117 lines (102 loc) · 4.72 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from time_flow import TimeFlow
from time_class import Time
from intersection import Intersection
from events import CarArrival, LightsPhase
import numpy as np
class Simulation:
def __init__(self, expected_interval, lights_enabled, green_duration):
self.expected_interval = expected_interval
self.lights_enabled = lights_enabled
self.green_duration = green_duration
self.time = TimeFlow(lights_enabled=lights_enabled)
self.intersection = Intersection(parent_object=self,
expected_interval=expected_interval,
lights_enabled=lights_enabled,
green_duration=green_duration)
self.start_simulation()
def assign_child_object(self, object):
"""Dummy method for child objects compatibility"""
pass
def check_end(self):
"""Function checks if simulation end conditions are fulfilled"""
if self.time.check_end_condition():
return True
return False
def get_cars_awaiting(self):
"""Method returns all cars which are waiting in queues"""
return self.intersection.get_awaiting_cars()
def get_cars_created(self):
"""
Method returns all cars created during simulation
Does not include cars yet to arrive
"""
return [car for car in self.intersection.get_all_cars() if car.has_arrived()]
def get_cars_departed(self):
"""Method returns all cars which crossed the intersection"""
return self.intersection.get_departed_cars()
def get_cars_avg_time_in_system(self):
all_cars = self.get_cars_created()
if all_cars == []:
return Time()
average_time_in_system = np.average(np.array([car.calculate_time_in_system().convert_to_seconds() for car in all_cars]))
average_time_in_system = Time(seconds=average_time_in_system)
return average_time_in_system
def get_cars_avg_wait_time(self):
all_cars = self.get_cars_created()
average_wait_time = np.average(np.array([car.calculate_wait_time().convert_to_seconds() for car in all_cars]))
return Time(seconds=average_wait_time)
def get_current_time(self):
return self.time.get_current_time()
def get_expected_interval_times(self):
"""Method returns expected intervals times for all streams"""
streams = self.intersection.get_streams()
return [stream.get_expected_interval() for stream in streams]
def print_stats(self):
crossing_cars = self.intersection.get_crossing_cars()
print(f"Lights enabled : {str(self.lights_enabled)}")
print(f"Expected arrival times : {str(self.get_expected_interval_times())}")
print(f"Number of cars created : {len(self.get_cars_created())}")
print(f"Number of cars departed : {len(self.get_cars_departed())}")
print(f"Number of cars awaiting : {len(self.get_cars_awaiting())}")
print(f"Average wait time : {self.get_cars_avg_wait_time().convert_to_text()}")
print(f"Average time in system : {self.get_cars_avg_time_in_system().convert_to_text()}")
def get_results(self):
"""
Method returns simulation results as dictionary.
Contains:
'cars_created', 'cars_departed', 'cars_awaiting',
'avg_wait_time' and 'avg_time_in_system'.
"""
results = {
'cars_created': len(self.get_cars_created()),
'cars_departed': len(self.get_cars_departed()),
'cars_awaiting': len(self.get_cars_awaiting()),
'avg_wait_time': self.get_cars_avg_wait_time(),
'avg_time_in_system': self.get_cars_avg_time_in_system()
}
return results
def start_simulation(self):
"""
Functions handles simualtion start
Schedules first car for each stream.
Starts first traffic lights cycle.
"""
self.streams = self.intersection.get_streams()
for _stream in self.streams:
CarArrival(time_flow=self.time, stream=_stream)
# Start traffic lights cycles
LightsPhase(intersection=self.intersection, time_flow=self.time,
lights_remaining=[])
# Turn on first light
self.intersection.get_lights()[0].switch_lights(state=True)
self.run_simulation()
def run_simulation(self):
"""Function handles simulation flow"""
# Check end conditions
while not self.check_end():
# Try to advance time. If failed simulation ended
success = self.time.advance_time()
if not success:
return False
# Execute planned events
self.time.execute_events()