-
Notifications
You must be signed in to change notification settings - Fork 0
/
storage.py
440 lines (356 loc) · 17.6 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
#!/usr/bin/env python
import argparse
import ast
import configparser
import logging
import random
import re
from dataclasses import dataclass
from random import expovariate
from typing import Optional, List

import matplotlib.pyplot as plt
import numpy as np

# the humanfriendly library (https://humanfriendly.readthedocs.io/en/latest/) lets us pass parameters in human-readable
# format (e.g., "500 KiB" or "5 days"). You can safely remove this if you don't want to install it on your system, but
# then you'll need to handle sizes in bytes and time spans in seconds--or write your own alternative.
# It should be trivial to install (e.g., apt install python3-humanfriendly or conda/pip install humanfriendly).
from humanfriendly import format_timespan, parse_size, parse_timespan

from discrete_event_sim import Simulation, Event
def exp_rv(mean):
    """Draw an exponentially distributed random value with the given expectation."""
    rate = 1 / mean  # expovariate takes the rate (lambda), not the mean
    return expovariate(rate)
class DataLost(Exception):
    """Raised when redundancy is insufficient and data is irrecoverably lost.

    Raising this exception stops the simulation.
    """
class Backup(Simulation):
    """Discrete-event simulation of the peer-to-peer backup system."""

    def __init__(self, nodes: List['Node']):
        """Seed the event queue with every node's first Online and first Fail event."""
        super().__init__()
        self.nodes = nodes
        for node in nodes:
            # each node joins at its arrival time and, independently,
            # crashes after an exponentially distributed lifetime
            self.schedule(node.arrival_time, Online(node))
            self.schedule(node.arrival_time + exp_rv(node.average_lifetime), Fail(node))

    def schedule_transfer(self, uploader: 'Node', downloader: 'Node', block_id: int, restore: bool):
        """Helper called by `Node.schedule_next_upload` and `Node.schedule_next_download`.

        If `restore` is true, we are restoring a block owned by the downloader;
        otherwise, we are saving one owned by the uploader.
        """
        assert uploader.current_upload is None
        assert downloader.current_download is None
        if restore:
            size = downloader.block_size
            event = BlockRestoreComplete(uploader, downloader, block_id)
        else:
            size = uploader.block_size
            event = BlockBackupComplete(uploader, downloader, block_id)
        # the transfer is limited by the slower of the two endpoints
        bottleneck = min(uploader.upload_speed, downloader.download_speed)
        delay = size / bottleneck
        self.schedule(delay, event)
        # mark both endpoints busy with this transfer
        uploader.current_upload = downloader.current_download = event
        self.log_info(f"scheduled {event.__class__.__name__} from {uploader} to {downloader}"
                      f" in {format_timespan(delay)}")

    def log_info(self, msg):
        """Override to render the simulated time in human-friendly form."""
        logging.info(f'{format_timespan(self.t)}: {msg}')
@dataclass(eq=False)  # auto-generated __init__ from the fields below; nodes compare by identity
class Node:
    """Configuration and mutable runtime state of a single node.

    The dataclass fields below are the per-class configuration read from the
    config file; `__post_init__` derives the rest of the state.
    """

    name: str  # the node's name
    n: int  # number of blocks in which the data is encoded
    k: int  # number of blocks sufficient to recover the whole node's data
    data_size: int  # amount of data to back up (in bytes)
    storage_size: int  # storage space devoted to storing remote data (in bytes)
    upload_speed: float  # node's upload speed, in bytes per second
    download_speed: float  # download speed
    average_uptime: float  # average time spent online
    average_downtime: float  # average time spent offline
    average_lifetime: float  # average time before a crash and data loss
    average_recover_time: float  # average time after a data loss
    arrival_time: float  # time at which the node will come online
    without_access: str  # Python list literal of node-class names this node may not exchange data with

    def __post_init__(self):
        """Compute data dependent on config values and set up the initial state."""
        # whether this node is online; all nodes start offline
        self.online: bool = False
        # whether this node is currently under repairs; all nodes are ok at start
        self.failed: bool = False
        # size of each encoded block (guard against k == 0 to avoid ZeroDivisionError)
        self.block_size: int = self.data_size // self.k if self.k > 0 else 0
        # free space for others' data -- we always reserve room for our own n blocks
        self.free_space: int = self.storage_size - self.block_size * self.n
        assert self.free_space >= 0, "Node without enough space to hold its own data"
        # local_blocks[block_id] is True if we hold that block locally
        self.local_blocks: list[bool] = [True] * self.n
        # backed_up_blocks[block_id] is the peer storing that block remotely,
        # or None if it's not backed up yet; we start with no blocks backed up
        self.backed_up_blocks: list[Optional['Node']] = [None] * self.n
        # (owner -> block_id) mapping for the remote blocks we store for others
        self.remote_blocks_held: dict['Node', int] = {}
        # in-flight transfers, stored as references to their TransferComplete events
        self.current_upload: Optional['TransferComplete'] = None
        self.current_download: Optional['TransferComplete'] = None
        # parse the access blacklist; unlike eval(), ast.literal_eval only accepts
        # Python literals and cannot execute arbitrary code from the config file
        try:
            self.access_denied = ast.literal_eval(self.without_access)
        except (ValueError, SyntaxError):
            # literal_eval raises ValueError for non-literal input, SyntaxError for bad syntax
            print("syntax error in configuration file, without_access must be a list")
            raise SystemExit(1)

    def find_block_to_back_up(self):
        """Return the id of a block that needs backing up, or None if there is none.

        A block needs backing up when we hold it locally but no peer stores it.
        """
        for block_id, (held_locally, peer) in enumerate(zip(self.local_blocks, self.backed_up_blocks)):
            if held_locally and peer is None:
                return block_id
        return None

    def schedule_next_upload(self, sim: 'Backup'):
        """Schedule the next upload, if any (restores first, then backups)."""
        assert self.online
        if self.current_upload is not None:
            return  # already uploading
        # restores have priority: serve any owner whose block we hold and who lost it,
        # provided the owner is online and idle on downloads
        for peer, block_id in self.remote_blocks_held.items():
            if peer.online and peer.current_download is None and not peer.local_blocks[block_id]:
                sim.schedule_transfer(self, peer, block_id, restore=True)
                return  # one transfer at a time
        # otherwise, try to back up one of our own blocks on some peer
        block_id = self.find_block_to_back_up()
        if block_id is None:
            return
        sim.log_info(f"{self} is looking for somebody to back up block {block_id}")
        # peers already storing one of our blocks (at most one block per peer)
        remote_owners = set(node for node in self.backed_up_blocks if node is not None)
        for peer in sim.nodes:
            # candidate: another node, online, not already holding one of our blocks,
            # idle on downloads, with room for the block, and whose class we may access
            # (stripping the "-<index>" suffix from the name yields the class name)
            if (peer is not self and peer.online and peer not in remote_owners
                    and peer.current_download is None
                    and peer.free_space >= self.block_size
                    and re.sub(r'-(.*)', '', peer.name) not in self.access_denied):
                sim.schedule_transfer(self, peer, block_id, restore=False)
                return

    def schedule_next_download(self, sim: 'Backup'):
        """Schedule the next download, if any (own restores first, then hosting backups)."""
        assert self.online
        if self.current_download is not None:
            return  # already downloading
        # first, try to restore one of our own missing blocks from its holder
        for block_id, (held_locally, peer) in enumerate(zip(self.local_blocks, self.backed_up_blocks)):
            if not held_locally and peer is not None and peer.online and peer.current_upload is None:
                sim.schedule_transfer(peer, self, block_id, restore=True)
                sim.log_info(f"schedule_next_download on {self}")
                return
        # otherwise, offer to store a block for some remote node
        for peer in sim.nodes:
            if (peer is not self and peer.online and peer.current_upload is None
                    and peer not in self.remote_blocks_held
                    and self.free_space >= peer.block_size
                    and re.sub(r'-(.*)', '', peer.name) not in self.access_denied):
                block_id = peer.find_block_to_back_up()
                if block_id is not None:
                    sim.schedule_transfer(peer, self, block_id, restore=False)
                    sim.log_info(f"schedule_next_download on {self}")
                    return

    def __hash__(self):
        """Allow `Node`s as dict keys / set items; each node is only equal to itself."""
        return id(self)

    def __str__(self):
        """Used when converting to a string (e.g., when logging or printing)."""
        return self.name
@dataclass
class NodeEvent(Event):
    """Base class for events that concern a single node."""

    node: Node  # the node this event refers to

    def process(self, sim: Simulation):
        """Subclasses implement the actual effect of the event."""
        raise NotImplementedError
class Online(NodeEvent):
    """Event: a node comes online."""

    def process(self, sim: Backup):
        node = self.node
        # a crashed node must wait for its Recover event; an online node needs nothing
        if node.failed or node.online:
            return
        node.online = True
        # start transferring immediately, if there is anything to do
        node.schedule_next_upload(sim)
        node.schedule_next_download(sim)
        # stay online for an exponentially distributed uptime
        sim.schedule(exp_rv(node.average_uptime), Offline(node))
class Recover(Online):
    """A node goes online after recovering from a failure."""

    def process(self, sim: Backup):
        node = self.node
        sim.log_info(f"{node} recovers")
        node.failed = False
        # the crash wiped the remote blocks we held for others (see `Fail`), so the
        # storage dedicated to them is free again; restore the accounting BEFORE
        # going online, otherwise the transfers scheduled by `Online.process` would
        # be decided on a stale, too-small `free_space` value
        node.free_space = node.storage_size - node.block_size * node.n
        super().process(sim)
        # schedule this node's next crash
        sim.schedule(exp_rv(node.average_lifetime), Fail(node))
class Disconnection(NodeEvent):
    """Common machinery for `Offline` and `Fail`: both take a node off the network."""

    def process(self, sim: Simulation):
        """Subclasses implement the actual effect of the event."""
        raise NotImplementedError

    def disconnect(self):
        """Mark the node offline and cancel its in-flight transfers, if any."""
        node = self.node
        node.online = False
        upload = node.current_upload
        if upload is not None:
            # void the pending completion event and free the other endpoint
            upload.canceled = True
            upload.downloader.current_download = None
            node.current_upload = None
        download = node.current_download
        if download is not None:
            download.canceled = True
            download.uploader.current_upload = None
            node.current_download = None
class Offline(Disconnection):
    """Event: a node disconnects temporarily (no data is lost)."""

    def process(self, sim: Backup):
        node = self.node
        # ignore if the node already left the network (crashed or offline)
        if node.failed or not node.online:
            return
        assert node.online
        self.disconnect()
        # come back after an exponentially distributed downtime
        sim.schedule(exp_rv(node.average_downtime), Online(node))
class Fail(Disconnection):
    """Event: a node crashes, losing its local data and the data it hosted.

    Note: a failure can also hit a node that is currently offline.
    """

    def process(self, sim: Backup):
        sim.log_info(f"{self.node} fails")
        self.disconnect()
        node = self.node
        node.failed = True
        # every local block is gone
        node.local_blocks = [False] * node.n
        # the blocks we stored for other owners are gone too: tell each owner,
        # who may immediately start re-uploading the block somewhere else
        for owner, block_id in node.remote_blocks_held.items():
            owner.backed_up_blocks[block_id] = None
            if owner.online and owner.current_upload is None:
                owner.schedule_next_upload(sim)
        node.remote_blocks_held.clear()
        # repairs take an exponentially distributed time, then the node recovers
        sim.schedule(exp_rv(node.average_recover_time), Recover(node))
@dataclass
class TransferComplete(Event):
    """A block transfer (one upload/download pair) has finished."""

    uploader: Node
    downloader: Node
    block_id: int
    canceled: bool = False  # set by Disconnection.disconnect when an endpoint leaves

    def __post_init__(self):
        assert self.uploader is not self.downloader

    def process(self, sim: Backup):
        sim.log_info(f"{self.__class__.__name__} from {self.uploader} to {self.downloader}")
        if self.canceled:
            # an endpoint disconnected mid-transfer: nothing actually happened
            return
        uploader, downloader = self.uploader, self.downloader
        assert uploader.online and downloader.online
        self.update_block_state()
        # both endpoints are idle again and may start new transfers
        uploader.current_upload = downloader.current_download = None
        uploader.schedule_next_upload(sim)
        downloader.schedule_next_download(sim)
        for node in (uploader, downloader):
            sim.log_info(f"{node}: {sum(node.local_blocks)} local blocks, "
                         f"{sum(peer is not None for peer in node.backed_up_blocks)} backed up blocks, "
                         f"{len(node.remote_blocks_held)} remote blocks held")

    def update_block_state(self):
        """Specified by the subclasses, `BlockBackupComplete` and `BlockRestoreComplete`."""
        raise NotImplementedError
class BlockBackupComplete(TransferComplete):
    """A backup finished: the downloader now stores one of the uploader's blocks."""

    def update_block_state(self):
        owner, peer = self.uploader, self.downloader
        # the peer dedicates part of its storage to the owner's block
        peer.free_space -= owner.block_size
        assert peer.free_space >= 0
        # record the placement on both sides
        owner.backed_up_blocks[self.block_id] = peer
        peer.remote_blocks_held[owner] = self.block_id
class BlockRestoreComplete(TransferComplete):
    """A restore finished: the downloader got back one of its own blocks."""

    def update_block_state(self):
        owner = self.downloader
        owner.local_blocks[self.block_id] = True
        # once k blocks are available the erasure code recovers the whole data,
        # so the node can regenerate all n of its blocks locally
        if sum(owner.local_blocks) == owner.k:
            owner.local_blocks = [True] * owner.n
# count the blocks that got lost during the simulation
def lostBlocks(nodes, sim):
    """Return the number of blocks that are neither held locally nor backed up.

    Also sets the module-level `total_blocks` counter, later read by
    `createHistogram`. Logs every node's state as a side effect.
    """
    global total_blocks
    total_blocks = 0
    lost_blocks = 0
    for node in nodes:
        # print the state of every node
        sim.log_info(f"{node}: {sum(node.local_blocks)} local blocks, {sum(peer is not None for peer in node.backed_up_blocks)} backed up blocks,"
                     f"{len(node.remote_blocks_held)} remote blocks held online {node.online}, failed {node.failed}")  # DEBUG
        total_blocks += node.n
        # a block is lost when it is gone locally and has no remote copy
        lost_blocks += sum(1 for local, remote in zip(node.local_blocks, node.backed_up_blocks)
                           if not local and remote is None)
    print(f"lost blocks = {lost_blocks} of {total_blocks}\n")  # DEBUG
    return lost_blocks
# create a bar chart of the fraction of blocks lost in each simulation run
def createHistogram(lost):
    """Plot, for each of the `n_test` runs, the percentage of blocks lost.

    Relies on the module-level `n_test` (number of runs) and `total_blocks`
    (set by `lostBlocks`). The original version stored the percentages into
    an integer array obtained from `np.histogram`, silently truncating them;
    here they are computed directly as floats.
    """
    # percentage of lost blocks in each run
    vals = [lost[i] * 100 / total_blocks for i in range(n_test)]
    positions = np.arange(n_test)
    _, ax = plt.subplots(1, 1)
    # one bar per run, centered on the run index
    ax.bar(positions + 1 / 2, vals, 1)
    ax.set_title("Percentage of blocks lost per run")
    ax.set_xticks(list(range(0, n_test + 5, 5)))
    ax.set_yticks(list(range(0, 110, 10)))
    ax.set_xlabel('simulation run')
    ax.set_ylabel('% of lost blocks')
    plt.show()
def main():
    """Run one simulation: parse CLI and config, build the nodes, simulate, count losses."""
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="configuration file")
    parser.add_argument("--max-t", default="100 years")
    parser.add_argument("--seed", help="random seed")
    parser.add_argument("--verbose", action='store_true')
    args = parser.parse_args()
    if args.seed:
        # seeding the RNG makes experiments repeatable
        random.seed(args.seed)
    if args.verbose:
        # emit INFO-level log lines on stdout
        logging.basicConfig(format='{levelname}:{message}', level=logging.INFO, style='{')
    # (name, parser) pairs for every parameter of a node-class section
    parsing_functions = [
        ('n', int), ('k', int),
        ('data_size', parse_size), ('storage_size', parse_size),
        ('upload_speed', parse_size), ('download_speed', parse_size),
        ('average_uptime', parse_timespan), ('average_downtime', parse_timespan),
        ('average_lifetime', parse_timespan), ('average_recover_time', parse_timespan),
        ('arrival_time', parse_timespan), ('without_access', str),
    ]
    config = configparser.ConfigParser()
    config.read(args.config)
    nodes = []  # the node list handed over to the Backup simulation
    for node_class in config.sections():
        class_config = config[node_class]
        # parse every parameter of the current node class
        cfg = [parse(class_config[name]) for name, parse in parsing_functions]
        # instantiate `number` nodes of this class, named e.g. "client-0", "client-1", ...
        # (the `Node(p1, *cfg)` idiom expands cfg into positional arguments)
        nodes.extend(Node(f"{node_class}-{i}", *cfg) for i in range(class_config.getint('number')))
    sim = Backup(nodes)
    sim.run(parse_timespan(args.max_t))
    sim.log_info(f"Simulation over")
    return lostBlocks(nodes, sim)
# number of independent simulation runs (also read by createHistogram)
n_test = 40

if __name__ == '__main__':
    results = []
    for run_idx in range(n_test):
        print(run_idx)  # progress indicator
        results.append(main())
    createHistogram(results)