-
Notifications
You must be signed in to change notification settings - Fork 9
/
router.py
71 lines (57 loc) · 1.94 KB
/
router.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import logging
from abc import ABC, abstractmethod
from simulator import clock, schedule_event, cancel_event, reschedule_event
class Router(ABC):
    """
    Abstract base for objects that route Requests to Application Schedulers.

    The router remembers every application registered with it, maps each
    application id to that application's scheduler, and keeps each request it
    handles in one of three lists: pending, executing or completed.
    Concrete routers supply the routing policy by implementing `route`.
    """

    def __init__(self,
                 cluster,
                 overheads):
        """
        Keep the given collaborators and start with empty bookkeeping.

        Args:
            cluster: stored as `self.cluster`; this class does not use it further.
            overheads: stored as `self.overheads` for subclasses to consult.
        """
        # request queues
        self.pending_queue = []
        self.executing_queue = []
        self.completed_queue = []
        # registered applications, plus their schedulers keyed by application id
        self.applications = []
        self.schedulers = {}
        self.overheads = overheads
        self.cluster = cluster

    def add_application(self, application):
        """
        Register `application` and index its scheduler by `application_id`.

        Registering a second application with the same id replaces the
        scheduler stored for that id.
        """
        self.applications.append(application)
        app_id = application.application_id
        self.schedulers[app_id] = application.scheduler

    def run(self):
        """Do nothing; the base router has no work of its own to perform."""
        return None

    @abstractmethod
    def route(self, *args):
        """
        Main routing logic; must be implemented by every concrete router.

        `route_request` calls this with the request as the single argument.
        """
        raise NotImplementedError

    def request_arrival(self, request):
        """
        Accept a new request: notify it, queue it as pending, then route it.
        """
        request.arrive_at_router()
        self.pending_queue.append(request)
        self.route_request(request)

    def request_completion(self, request):
        """
        Retire a finished request: notify it and move it to the completed queue.

        Raises:
            ValueError: if `request` is not in the executing queue.
        """
        request.complete_at_router()
        self.executing_queue.remove(request)
        self.completed_queue.append(request)

    def route_request(self, request):
        """
        Route `request`, then move it from the pending to the executing queue.

        The request is still in the pending queue while `route` runs; the
        queues are only updated once `route` has returned.

        Raises:
            ValueError: if `request` is not in the pending queue.
        """
        self.route(request)
        self.pending_queue.remove(request)
        self.executing_queue.append(request)

    def save_results(self):
        """Do nothing for now; result export is not implemented."""
        # TODO: the disabled implementation gathered
        # request.time_per_instance_type() for every completed request and
        # wrote the rows to "router.csv" via utils.save_dict_as_csv.
        return None
class NoOpRouter(Router):
    """
    Forwards each request to the scheduler registered for its application.

    No routing decision is made: the hand-off is queued with `schedule_event`,
    passing `overheads.routing_delay` as its first argument.
    """

    def route(self, request):
        """
        Schedule delivery of `request` to its application's scheduler.

        Raises:
            KeyError: if no scheduler is registered for `request.application_id`.
        """
        target = self.schedulers[request.application_id]

        # Bind the scheduler and request as parameter defaults so the callback
        # carries its own references and can be invoked with no arguments.
        def deliver(scheduler=target, request=request):
            return scheduler.request_arrival(request)

        schedule_event(self.overheads.routing_delay, deliver)