-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathServ.py
306 lines (251 loc) · 9.66 KB
/
Serv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import random
import socket  # used by Serv for local address resolution; was not imported explicitly
import threading
import time
from collections import deque
from socketserver import ThreadingMixIn
from threading import Thread
from xmlrpc.server import SimpleXMLRPCServer

from Api import Api
from Clock import Clock
from Util import *
class SimpleThreadedXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
    """XML-RPC server that handles every incoming request in its own thread.

    ``ThreadingMixIn`` must precede ``SimpleXMLRPCServer`` in the bases so
    that its request-dispatch override takes effect.
    """
class Serv(Thread):
    """A network node that runs an XML-RPC server on its own thread.

    The node keeps a list of known member addresses (``"ip:port"`` strings),
    takes part in master election (``run_bully``) and appends words to a
    shared "master string" stored on the elected master.  Access to that
    string is serialised either through a request queue kept on the master
    (``append_master_string_request`` / ``apppend_master_string_release``)
    or through the timestamp exchange in the ``agrawala_*`` methods.

    Peers are reached through ``get_node_by_addr``, which is not defined in
    this file (presumably supplied by the ``Util`` star import and returning
    a proxy to the peer's ``Api`` -- confirm against ``Util``).  The camelCase
    calls made on the objects it returns are calls on those peer objects.
    """

    # How long one have_fun() session keeps producing words, in seconds.
    FUN_DURATION_SECONDS = 20
    # Inclusive upper bound of the random pause before each word, in seconds.
    MAX_PAUSE_SECONDS = 20
    # Pause before a node re-checks whether somebody announced victory.
    ELECTION_RETRY_DELAY_SECONDS = 1
    # Sleep between two polls in the busy-wait loops.
    POLL_INTERVAL_SECONDS = 1

    def __init__(self, port: int) -> None:
        """Create the XML-RPC server and initialise the node state.

        Args:
            port: TCP port the XML-RPC server is bound to.
        """
        super().__init__()
        ip = self._local_ip()
        self.server = SimpleThreadedXMLRPCServer((ip, port), allow_none=True)
        self.server.register_introspection_functions()
        self.server.register_multicall_functions()
        # SECURITY NOTE: the standard library documents allow_dotted_names=True
        # as unsafe outside a closed, trusted network (it lets callers walk
        # attributes of the registered instance).  Kept for compatibility.
        self.server.register_instance(Api(self), allow_dotted_names=True)
        self.ctr = 0
        self.addr = ip + ":" + str(port)
        self.net_members = [self.addr]
        self.string_queue = deque()
        self.status = False
        self.master_string = ""
        self.master_address = ""
        self.is_interested_in_master_string = False
        self.is_victory_broadcasted = False
        self.clock = Clock(self.addr)
        self.wordListToCheck = []
        print("Serv created")

    @staticmethod
    def _local_ip() -> str:
        """Return the IP address the local host name resolves to."""
        return socket.gethostbyname(socket.gethostname())

    def _reset_victory_flags(self) -> None:
        """Clear the "victory broadcasted" flag on every known member."""
        for node in self.net_members:
            get_node_by_addr(node).setVictoryBroadcasted(False)

    def run(self) -> None:
        """Thread entry point: serve XML-RPC requests until shutdown."""
        # serve_forever() blocks until the server is shut down, so the start
        # message has to be printed before entering it.
        print("Serv started")
        self.server.serve_forever()

    def incr(self) -> None:
        """Increment the internal counter and print its new value."""
        self.ctr += 1
        print(self.ctr)

    def get_clock(self):
        """Return this node's ``Clock`` instance."""
        return self.clock

    def leave(self) -> None:
        """Leave the network and reset this node to a stand-alone state.

        Every other member is asked to delete this node.  If this node was the
        master, an election is started on the first remaining member and the
        victory flags are cleared afterwards.

        Raises:
            ValueError: if this node's own address is missing from
                ``net_members``.
        """
        self.net_members.remove(self.addr)
        self.set_status(False)
        for node in self.net_members:
            get_node_by_addr(node).deleteNode(self.addr)
        # NOTE(review): self was already removed above, so "> 1" skips the
        # election when exactly one peer remains; that peer keeps pointing at
        # the departed master.  Confirm whether "> 0" was intended.
        if self.master_address == self.addr and len(self.net_members) > 1:
            get_node_by_addr(self.net_members[0]).runBully()
            self._reset_victory_flags()
        self.net_members = [self.addr]
        self.master_address = ""
        # The original assigned to a misspelled attribute (masterString), which
        # left the real master_string untouched.
        self.master_string = ""
        self.is_victory_broadcasted = False

    def join_net(self, addr: str) -> None:
        """Join the network that the node at ``addr`` belongs to.

        Args:
            addr: ``"host:port"`` of an existing member.  A loopback host
                (``127.0.0.1`` / ``localhost``) is replaced by this machine's
                resolved IP so that it matches the addresses peers announce.
        """
        if addr.startswith(("127.0.0.1", "localhost")):
            port = addr.split(":")[1]
            addr = self._local_ip() + ":" + port
        dest_api = get_node_by_addr(addr)
        for node in dest_api.getNetworkMembers():
            if node not in self.net_members:
                self.net_members.append(node)
        for node in self.net_members:
            if self.addr != node:
                get_node_by_addr(node).appendNode(self.addr)
        # Ask once: two separate calls could observe different masters.
        current_master = dest_api.getMasterAddress()
        if current_master == "":
            self.run_bully()
            self._reset_victory_flags()
        else:
            self.master_address = current_master

    def set_status(self, st) -> None:
        """Store the node status flag."""
        self.status = st

    def get_status(self):
        """Return the node status flag."""
        return self.status

    def get_net_members(self):
        """Return the list of known member addresses (the live list, not a copy)."""
        return self.net_members

    def set_net_members(self, members) -> None:
        """Replace the list of known member addresses."""
        self.net_members = members

    def append_node(self, addr: str) -> None:
        """Mark this node as active and remember ``addr`` if it is new."""
        self.status = True
        if addr not in self.net_members:
            print("Adding node " + addr)
            self.net_members.append(addr)

    def delete_node(self, address: str) -> None:
        """Forget ``address``; the status flag is cleared when one member is left."""
        if address in self.net_members:
            self.net_members.remove(address)
        if len(self.net_members) == 1:
            self.status = False

    def get_master_string(self):
        """Return the master string, fetching it from the master if remote."""
        if self.master_address == self.addr:
            return self.master_string
        return get_node_by_addr(self.master_address).getMasterString()

    def get_master_address(self) -> str:
        """Return the address of the current master ("" when none is set)."""
        return self.master_address

    def have_fun(self, isAgrawala) -> None:
        """Append random words to the master string for a limited time.

        Each round sleeps a random number of seconds, picks a word with
        ``get_random_word`` and appends it to the master string.  Every word
        is also recorded in ``wordListToCheck`` for ``checkAppend``.

        Args:
            isAgrawala: when true, ``agrawala_append_string_request`` is used;
                otherwise access goes through the master's request queue.
        """
        self.wordListToCheck = []
        print("Having fun")
        master = get_node_by_addr(self.master_address)
        started = time.time()
        while (time.time() - started) < self.FUN_DURATION_SECONDS:
            time.sleep(random.randint(1, self.MAX_PAUSE_SECONDS))
            word = get_random_word()
            self.wordListToCheck.append(word)
            if isAgrawala:
                self.agrawala_append_string_request(word)
                continue
            print("Sent request for access critical section")
            master.appendMasterStringRequest(self.addr)
            try:
                print("Entering critical section")
                ms = master.getMasterString()
                print("Got master string " + ms)
                master.setMasterString(ms + word)
                print("Appending word " + word)
            finally:
                # Always give the slot back, otherwise a failure in the
                # critical section would block every other requester.
                master.appendMasterStringRelease(self.addr)
            print("Left critical section")

    def adventure_time(self, isAgrawala) -> None:
        """Run ``haveFun`` on all members in parallel, then verify the result.

        Args:
            isAgrawala: forwarded unchanged to every member's ``haveFun``.
        """
        print("Adventure time!")
        self.check_master_avaliability()
        workers = []
        for node in self.net_members:
            peer = get_node_by_addr(node)
            worker = threading.Thread(target=peer.haveFun, args=(isAgrawala,))
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()
        for node in self.net_members:
            get_node_by_addr(node).checkAppend()

    def checkAppend(self) -> None:
        """Print whether every word from ``wordListToCheck`` is in the master string.

        The check is a substring test against the current master string.
        """
        ms = self.get_master_string()
        print("Node: " + self.addr + ", master string: " + ms)
        failed_words = [word for word in self.wordListToCheck if word not in ms]
        if not failed_words:
            print("All the appended words are in the master string: " + ','.join(self.wordListToCheck))
        else:
            print("The following words were not appended into master string " + ','.join(failed_words))

    def run_bully(self) -> None:
        """Run an election round; the member with the greatest address wins.

        Every member whose address string compares greater than this node's
        is asked to run its own election.  If there is no such member, this
        node announces victory to all members and installs itself as master.
        Otherwise it waits and repeats the round until ``is_victory_broadcasted``
        has been set.
        """
        # A loop instead of the original self-recursion: same retry behaviour,
        # but without growing the call stack while waiting for a winner.
        while True:
            challengers = []
            for node in self.net_members:
                if node > self.addr:
                    try:
                        peer = get_node_by_addr(node)
                        worker = threading.Thread(target=peer.runBully)
                        worker.start()
                        challengers.append(worker)
                    except ValueError as e:
                        print(e)
            for worker in challengers:
                worker.join()
            if not challengers:
                for node in self.net_members:
                    get_node_by_addr(node).setVictoryBroadcasted(True)
                self.set_master(self.addr)
                return
            time.sleep(self.ELECTION_RETRY_DELAY_SECONDS)
            if self.is_victory_broadcasted:
                return

    def election_broadcast(self, source_addr) -> bool:
        """Start an election in a background thread and return immediately.

        Args:
            source_addr: address of the caller; currently unused.

        Returns:
            Always ``True``.
        """
        print("Broadcasting elections")
        threading.Thread(target=self.run_bully).start()
        return True

    def set_victory_broadcasted(self, status) -> None:
        """Store the "victory broadcasted" flag used by ``run_bully``."""
        print("broadcasting victory")
        self.is_victory_broadcasted = status

    def agrawala_append_string_request(self, word: str) -> None:
        """Ask all other members for permission, then append ``word``.

        The local clock is ticked around every step.  Each other member gets
        the clock value taken right before its request is sent; the method
        proceeds once every ``agrawalaAppendHandle`` call has returned.

        Args:
            word: text appended to the master string.
        """
        # NOTE(review): is_interested_in_master_string is only ever set to
        # False in this class, so agrawala_append_handler never defers a reply.
        # Enabling it safely also requires comparing against the timestamp of
        # the pending request rather than the live clock -- confirm with Clock.
        self.clock.tick()
        workers = []
        for node in self.net_members:
            if node == self.addr:
                continue
            peer = get_node_by_addr(node)
            self.clock.tick()
            stamp = self.clock.get_val()
            worker = threading.Thread(target=peer.agrawalaAppendHandle, args=(stamp,))
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()
        print("Node is in critical section")
        self.clock.tick()
        master = get_node_by_addr(self.master_address)
        print("Want to append word " + word)
        self.clock.tick()
        ms = master.getMasterString()
        print("Got master string:" + ms)
        self.clock.tick()
        master.setMasterString(ms + word)
        self.clock.tick()
        self.is_interested_in_master_string = False
        print("Left critical section")
        self.clock.tick()

    def agrawala_append_handler(self, timestamp: dict) -> None:
        """Handle a peer's access request; returning means permission granted.

        The local clock is moved past the received value when it is behind,
        otherwise just ticked.  The call then blocks while this node is
        flagged as interested and ``clock.compareTo(timestamp)`` is negative.

        Args:
            timestamp: the requester's clock value; read via ``.get("val")``
                and passed to ``Clock.compareTo``.
        """
        self.clock.tick()
        if self.clock.get_val().get("val") < timestamp.get("val"):
            self.clock.set_clock(timestamp.get("val") + 1)
        else:
            self.clock.tick()
        while self.is_interested_in_master_string and self.clock.compareTo(timestamp) < 0:
            time.sleep(self.POLL_INTERVAL_SECONDS)
        self.clock.tick()

    def append_master_string_request(self, source_addr: str) -> None:
        """Queue ``source_addr`` and block until it is at the head of the queue.

        New entries are added on the left and the head is the rightmost
        element, so requests are served in arrival order.
        """
        self.string_queue.appendleft(source_addr)
        while self.string_queue[-1] != source_addr:
            time.sleep(self.POLL_INTERVAL_SECONDS)

    def set_master_string(self, s: str) -> None:
        """Overwrite the locally stored master string."""
        self.master_string = s

    def append_master_string(self, s: str) -> None:
        """Append ``s`` to the locally stored master string."""
        self.master_string += s

    def set_master_addr(self, addr: str) -> None:
        """Record ``addr`` as the current master."""
        self.master_address = addr

    def set_master(self, newIp: str) -> None:
        """Install ``newIp`` as master and move the master string to it.

        The string is read from (and cleared on) the previous master on a
        best-effort basis; if that fails the new master starts with "".

        Args:
            newIp: address of the newly elected master.
        """
        print("New master elected with addr " + newIp)
        # Get old params.
        old_string = ""
        try:
            old_master = get_node_by_addr(self.master_address)
            old_string = old_master.getMasterString()
            old_master.setMasterString("")
        except Exception as exc:
            # Deliberately best effort: there may be no previous master or it
            # may be unreachable.  Report instead of swallowing silently.
            print("Could not fetch master string from previous master: " + str(exc))
        # Find new master.
        new_master = get_node_by_addr(newIp)
        new_master.setMasterString(old_string)
        # Set master ip for others.
        for node in self.net_members:
            get_node_by_addr(node).setMasterAddress(newIp)
        # TODO may be it is worth setting isVictoryBroadcasted to false here and in Java

    def get_addr(self) -> str:
        """Return this node's own ``"ip:port"`` address."""
        return self.addr

    def check_master_avaliability(self) -> None:
        """Probe the master and elect a new one when the probe fails.

        On failure the master is removed locally and on every remaining
        member, an election is run, and the victory flags are cleared again
        (as ``leave`` and ``join_net`` do after their elections).
        """
        # Checking if master is alive.
        try:
            get_node_by_addr(self.master_address).getStatus()
        except Exception:
            # The master may be unset ("") or already gone from the list;
            # an unconditional remove() would raise ValueError here.
            if self.master_address in self.net_members:
                self.net_members.remove(self.master_address)
            for node in self.net_members:
                get_node_by_addr(node).deleteNode(self.master_address)
            self.run_bully()
            self._reset_victory_flags()

    def get_word_list_to_check(self):
        """Return the words recorded by the latest ``have_fun`` session."""
        return self.wordListToCheck

    def apppend_master_string_release(self, sourceAddr: str) -> None:
        """Remove ``sourceAddr`` from the request queue if it is at the head.

        Does nothing when the queue is empty or another requester is at the
        head.  The misspelled name is kept because external callers may use it.
        """
        # todo check popping queue
        if self.string_queue and self.string_queue[-1] == sourceAddr:
            self.string_queue.pop()

    # Correctly spelled alias; the original name stays for existing callers.
    append_master_string_release = apppend_master_string_release