-
Notifications
You must be signed in to change notification settings - Fork 5
/
meta_module.py
487 lines (373 loc) · 17.7 KB
/
meta_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# Copyright (c) Charl P. Botha, TU Delft.
# All rights reserved.
# See COPYRIGHT for details.
import counter
class MetaModule:
"""Class used to store module-related information.
Every instance is contained in a single MetaModule. This is why the
cycle-proof module split has not been implemented as MetaModules.
@todo: at the moment, some interfaces work with a real module instance
as well as a MetaModule. Should be consistent and use all MetaModules.
@todo: document all timestamp logic here, link to scheduler
documentation
@author: Charl P. Botha <http://cpbotha.net/>
"""
def __init__(self, instance, instance_name, module_name,
partsToInputs=None, partsToOutputs=None):
"""Instance is the actual class instance and instance_name is a unique
name that has been chosen by the user or automatically.
@param module_name: the full spec of the module of which the instance
is encapsulated by this meta_module, for example
modules.filters.blaat. One could also get at this with
instance.__class__.__module__, but that relies on the convention that
the name of the module and contained class is the same.
"""
if instance is None:
raise Exception(
'instance is None during MetaModule instantiation.')
self.instance = instance
self.instance_name = instance_name
self.module_name = module_name
# init blocked ivar
self.blocked = False
# determine number of module parts based on parts to indices mappings
maxPart = 0
if not partsToInputs is None:
maxPart = max(partsToInputs.keys())
if not partsToOutputs is None:
max2 = max(partsToOutputs.keys())
maxPart = max(maxPart, max2)
self.numParts = maxPart + 1
# number of parts has been determined ###############################
# time when module was last brought up to date
# default to 0.0; that will guarantee an initial execution
self.execute_times = self.numParts * [0]
# we use this to record all the times the hybrid scheduler
# touches a streaming module but does not call its
# streaming_execute (because it's NOT terminating)
# this is necessary for the transfer output caching to work;
# we compare touch time to output transfer time
self.streaming_touch_times = self.numParts * [0]
# time when module was last invalidated (through parameter changes)
# default is current time. Along with 0.0 executeTime, this will
# guarantee initial execution.
self.modifiedTimes = self.numParts * [counter.counter()]
# derive partsTo dictionaries #######################################
self._inputsToParts = {}
if not partsToInputs is None:
for part, inputs in partsToInputs.items():
for inp in inputs:
self._inputsToParts[inp] = part
self._outputsToParts = {}
if not partsToOutputs is None:
for part, outputs in partsToOutputs.items():
for outp in outputs:
self._outputsToParts[outp] = part
# partsTo dicts derived ############################################
# to the time when data was last transferred from the encapsulated
# instance through this path
self.transferTimes = {}
# dict containing timestamps when last transfer was done from
# the encapsulated module to each of its consumer connections
self.streaming_transfer_times = {}
# this will create self.inputs, self.outputs
self.reset_inputsOutputs()
def close(self):
del self.instance
del self.inputs
del self.outputs
def view_to_config(self):
"""Wrapper method that transfers info from the module view to its
config data structure. The method also takes care of updating the
modified time if necessary.
"""
res = self.instance.view_to_config()
if res is None or res:
must_modify = True
else:
must_modify = False
if must_modify:
for part in range(self.numParts):
self.modify(part)
def config_to_logic(self):
"""Wrapper method that transfers info from the module config to its
underlying logic. The method also takes care of updating the
modified time if necessary.
"""
res = self.instance.config_to_logic()
if res is None or res:
must_modify = True
else:
must_modify = False
if must_modify:
for part in range(self.numParts):
self.modify(part)
def applyViewToLogic_DEPRECATED(self):
"""Transfer information from module view to its underlying logic
(model) and all the way back up.
The reason for the two-way transfer is so that other logic-linked
view variables get the opportunity to update themselves. This method
will also take care of adapting the modifiedTime.
At the moment this is only called by the event handlers for the
standard ECASH interface devices.
"""
vtc_res = self.instance.view_to_config()
ctl_res = self.instance.config_to_logic()
mustModify = True
if vtc_res is None and ctl_res is None:
# this is an old-style module, we assume that it's made changes
mustModify = True
elif not vtc_res and not ctl_res:
# this means both are false, i.e. NO changes were made to
# the config and no changes were made to the logic... this
# means we need not modify
mustModify = False
else:
# all other cases (for a new style module) means we have to mod
mustModify = True
if mustModify:
# set modified time to now
for part in range(self.numParts):
self.modify(part)
self.instance.logic_to_config()
self.instance.config_to_view()
def findConsumerInOutputConnections(
self, output_idx, consumerInstance, consumerInputIdx=-1):
"""Find the given consumer module and its input index in the
list for the given output index.
@param consumerInputIdx: input index on consumer module. If this is
-1, the code will only check for the correct consumerInstance and
will return the first occurrence.
@return: index of given instance if found, -1 otherwise.
"""
ol = self.outputs[output_idx]
found = False
for i in range(len(ol)):
ci, cii = ol[i]
if ci == consumerInstance and \
(consumerInputIdx == -1 or cii == consumerInputIdx):
found = True
break
#import pdb
#pdb.set_trace()
if found:
return i
else:
return -1
def getPartForInput(self, input_idx):
"""Return module part that takes input input_idx.
"""
if self.numParts > 1:
return self._inputsToParts[input_idx]
else:
return 0
def getPartForOutput(self, output_idx):
"""Return module part that produces output output_idx.
"""
if self.numParts > 1:
return self._outputsToParts[output_idx]
else:
return 0
def connectInput(self, input_idx, producerModule, producerOutputIdx):
"""Record connection on the specified input_idx.
This is one half of recording a complete connection: the supplier
module should also record the connection of this consumer.
@raise Exception: if input is already connected.
@return: Nothing.
"""
# check that the given input is not already connected
if self.inputs[input_idx] is not None:
raise Exception, \
"%d'th input of module %s already connected." % \
(input_idx, self.instance.__class__.__name__)
# record the input connection
self.inputs[input_idx] = (producerModule, producerOutputIdx)
def disconnectInput(self, input_idx):
"""Record disconnection on the given input of the encapsulated
instance.
@return: Nothing.
"""
# if this is a new disconnect, we have to register that the
# module is now modified. For connected data transfers, this
# is done by the scheduler during network execution (when the
# data is actually transferred), but for disconnects, we have
# to modify the module immediately.
if not self.inputs[input_idx] is None:
part = self.getPartForInput(input_idx)
self.modify(part)
self.inputs[input_idx] = None
def connectOutput(self, output_idx, consumerInstance, consumerInputIdx):
"""Record connection on the given output of the encapsulated module.
@return: True if connection recorded, False if not (for example if
connection already exists)
"""
if self.findConsumerInOutputConnections(
output_idx, consumerInstance, consumerInputIdx) >= 0:
# this connection has already been made, bail.
return
# do the connection
ol = self.outputs[output_idx]
ol.append((consumerInstance, consumerInputIdx))
# this is a new connection, so set the transfer times to 0
self.transferTimes[
(output_idx, consumerInstance, consumerInputIdx)] = 0
# also reset streaming transfers
self.streaming_transfer_times[
(output_idx, consumerInstance, consumerInputIdx)] = 0
def disconnectOutput(self, output_idx, consumerInstance, consumerInputIdx):
"""Record disconnection on the given output of the encapsulated module.
"""
# find index of the given consumerInstance and consumerInputIdx
# in the list of consumers connected to producer port output_idx
cidx = self.findConsumerInOutputConnections(
output_idx, consumerInstance, consumerInputIdx)
# if this is a valid index, nuke it
if cidx >= 0:
ol = self.outputs[output_idx]
del ol[cidx]
# also remove the relevant slot from our transferTimes
del self.transferTimes[
(output_idx, consumerInstance, consumerInputIdx)]
del self.streaming_transfer_times[
(output_idx, consumerInstance, consumerInputIdx)]
else:
# consumer not found, the connection didn't exist
raise Exception, \
"Attempt to disconnect output which isn't connected."
def reset_inputsOutputs(self):
numIns = len(self.instance.get_input_descriptions())
numOuts = len(self.instance.get_output_descriptions())
# numIns list of tuples of (supplierModule, supplierOutputIdx)
# if the input is not connected, that position in the list is None
# supplierModule is a module instance, not a MetaModule
self.inputs = [None] * numIns
# numOuts list of lists of tuples of (consumerModule,
# consumerInputIdx); consumerModule is an instance, not a MetaModule
# be careful with list concatenation, it makes copies, which are mostly
# shallow!!!
self.outputs = [[] for _ in range(numOuts)]
def streaming_touch_timestamp_module(self, part=0):
"""
@todo: should change the name of this timestamp.
"""
self.streaming_touch_times[part] = counter.counter()
print "streaming touch stamped:", self.streaming_touch_times[part]
def execute_module(self, part=0, streaming=False):
"""Used by ModuleManager to execute module.
This method also takes care of timestamping the execution time if
execution was successful.
"""
if self.instance:
# this is the actual user function.
# if something goes wrong, an exception will be thrown and
# correctly handled by the invoking module manager
if streaming:
if part == 0:
self.instance.streaming_execute_module()
else:
self.instance.streaming_execute_module(part)
self.execute_times[part] = counter.counter()
print "streaming exec stamped:", self.execute_times[part]
else:
if part == 0:
self.instance.execute_module()
else:
self.instance.execute_module(part)
# if we get here, everything is okay and we can record
# the execution time of this part
self.execute_times[part] = counter.counter()
print "exec stamped:", self.execute_times[part]
def modify(self, part=0):
"""Used by the ModuleManager to timestamp the modified time.
This should be called whenever module state has changed in such a way
as to invalidate the current state of the module. At the moment,
this is called by L{applyViewToLogic()} as well as by the
ModuleManager.
@param part: indicates the part that has to be modified.
"""
self.modifiedTimes[part] = counter.counter()
def shouldExecute(self, part=0):
"""Determine whether the encapsulated module needs to be executed.
"""
print "?? mod > exec? :", self.modifiedTimes[part], self.execute_times[part]
return self.modifiedTimes[part] > self.execute_times[part]
def should_touch(self, part=0):
print "?? mod > touch? :", self.modifiedTimes[part], self.streaming_touch_times[part]
return self.modifiedTimes[part] > self.streaming_touch_times[part]
def should_transfer_output(
self, output_idx, consumer_meta_module, consumer_input_idx,
streaming=False):
"""Determine whether output should be transferred through
the given output index to the input index on the given consumer
module.
If the transferTime is older than executeTime, we should
transfer to our consumer. This means that if we (the producer
module) have executed after the previous transfer, we should
transfer again. When a new consumer module is connected to
us, transfer time is set to 0, so it will always get a
transfer initially. Semantics with viewer modules (internal
division into source and sink modules by the scheduler) are
taken care of by the scheduler.
@param output_idx: index of output of this module through which
output would be transferred
@param consumer_meta_module: the META MODULE associated with the
consumer that's connected to us.
@param consumer_input_idx: the input connection on the consumer
module that we want to transfer to
@param streaming: indicates whether this is to be a streaming
transfer. If so, a different transfer timestamp is used.
"""
consumer_instance = consumer_meta_module.instance
# first double check that we're actually connected on this output
# to the given consumerModule
if self.findConsumerInOutputConnections(
output_idx, consumer_instance, consumer_input_idx) >= 0:
consumerFound = True
else:
consumerFound = False
if consumerFound:
# determine which part is responsible for this output
part = self.getPartForOutput(output_idx)
if streaming:
tTime = self.streaming_transfer_times[
(output_idx, consumer_instance, consumer_input_idx)]
# note that we compare with the touch time, and not
# the execute time, since only the terminating module
# is actually executed.
print "?? streaming transfer? ttime ", tTime, "< touchtime", self.streaming_touch_times[part]
should_transfer = bool(tTime <
self.streaming_touch_times[part])
else:
tTime = self.transferTimes[
(output_idx, consumer_instance, consumer_input_idx)]
print "?? transfer? ttime ", tTime, "< executetime", self.execute_times[part]
should_transfer = bool(tTime < self.execute_times[part])
return should_transfer
else:
return False
def timeStampTransferTime(
self, outputIndex, consumerInstance, consumerInputIdx,
streaming=False):
"""Timestamp given transfer time with current time.
This method is called right after a successful transfer has
been made. Depending on the scheduling mode (event-driven or
hybrid) and whether it's a streaming module that's being
transferred to, the timestamps are set differently to make
sure that after a switch between modes, all transfers are
redone. Please see the documentation in the scheduler module.
@param streaming: determines whether a streaming transfer or a
normal transfer has just occurred.
"""
if streaming:
streaming_transfer_time = counter.counter()
transfer_time = 0
else:
transfer_time = counter.counter()
streaming_transfer_time = 0
# and set the timestamp
self.transferTimes[
(outputIndex, consumerInstance, consumerInputIdx)] = \
transfer_time
self.streaming_transfer_times[
(outputIndex, consumerInstance, consumerInputIdx)] = \
streaming_transfer_time