-
Notifications
You must be signed in to change notification settings - Fork 0
/
driver.py
130 lines (94 loc) · 2.91 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -*- coding: UTF-8 -*-
# Default value of the ``batch_size`` argument accepted by
# ``BaseQuerySet.__init__``.
DEFAULT_FETCH_SIZE = 4096
class Operation(object):
    """
    A single unit of work that a driver can be asked to execute.

    ``oper_type`` holds one of the ``OPER_*`` constants below and ``data``
    holds the payload the operation carries (``BaseDriver.build_delta``
    passes the source row here).
    """

    # Recognised operation kinds.
    OPER_INSERT = 1
    OPER_UPDATE = 2

    # Class-level fallbacks; every instance overrides both in __init__.
    oper_type = None
    data = None

    def __init__(self, oper_type, data):
        self.data = data
        self.oper_type = oper_type

    def _with_type(self, oper_type):
        """
        Build a new Operation of the given type sharing this one's data.
        """
        return Operation(oper_type, self.data)

    def as_insert(self):
        """
        Return a new insert operation carrying the same data object.
        """
        return self._with_type(self.OPER_INSERT)

    def as_update(self):
        """
        Return a new update operation carrying the same data object.
        """
        return self._with_type(self.OPER_UPDATE)
class Delta(object):
    """
    An ordered batch of operations bound to one target driver.

    Operations are queued with ``add_operation`` and later handed, in
    insertion order, to the driver's ``execute_operation`` by ``apply``.
    """

    # Class-level fallbacks; every instance overrides both in __init__.
    driver = None
    operations = None

    def __init__(self, driver):
        # A fresh list per instance, so deltas never share a queue.
        self.operations = []
        self.driver = driver

    def add_operation(self, operation):
        """
        Append ``operation`` to the end of the queue.
        """
        self.operations.append(operation)

    def apply(self):
        """
        Pass every queued operation, in order, to the driver.

        The queue is left as it is afterwards.
        """
        for queued in self.operations:
            self.driver.execute_operation(queued)
class BaseDriver(object):
    """
    Base class for drivers.

    ``get_queryset``, ``get_record`` and ``execute_operation`` are
    placeholders that raise ``NotImplementedError``; ``build_delta`` is
    written on top of the first two.
    """

    def get_queryset(self):
        """
        Return an iterable over every record of this driver.
        """
        raise NotImplementedError

    def get_record(self, id):
        """
        Return the record identified by ``id``.

        ``build_delta`` treats a ``None`` return value as "no such record".
        """
        raise NotImplementedError

    def execute_operation(self, operation):
        """
        Execute the given operation on this driver.
        """
        raise NotImplementedError

    def build_delta(self, another_driver):
        """
        Compute the delta that brings ``another_driver`` up to date with
        this driver.

        Every row of this driver's queryset is looked up in
        ``another_driver`` by ``row["id"]``. A row with no counterpart is
        queued as an insert; a row whose ``"last_modified"`` value is
        greater than the counterpart's is queued as an update; any other
        row is skipped. The returned ``Delta`` is bound to
        ``another_driver`` and is not applied here.
        """
        delta = Delta(another_driver)
        for row in self.get_queryset():
            counterpart = another_driver.get_record(row["id"])
            if counterpart is None:
                kind = Operation.OPER_INSERT
            elif row["last_modified"] > counterpart["last_modified"]:
                kind = Operation.OPER_UPDATE
            else:
                # Counterpart exists and is not older: nothing to queue.
                continue
            delta.add_operation(Operation(kind, row))
        return delta
class BaseQuerySet(object):
    """
    Base class for implementing driver's querysets. This allows us to read
    tons of rows without consuming much memory, as the result is paginated.

    The object is its own iterator: each step returns whatever
    ``_fetch_row`` returns, so iteration only ends when ``_fetch_row``
    raises ``StopIteration``. Both the Python 2 (``next``) and Python 3
    (``__next__``) spellings of the iterator protocol are provided.
    """

    # Name-mangled placeholder; not referenced anywhere within this class.
    __data = None

    def __init__(self, batch_size=DEFAULT_FETCH_SIZE):
        """
        Remember the requested page size.

        :param batch_size: value stored as ``self.batch_size`` for
            subclasses to use; this base class does not read it itself.
        """
        # Previously the argument was accepted and then dropped, leaving
        # subclasses no way to see the value the caller asked for.
        self.batch_size = batch_size

    def __iter__(self):
        """
        Return ``self``; the queryset is its own iterator.
        """
        return self

    def next(self):
        """
        Return the next row (Python 2 iterator protocol).
        """
        return self._fetch_row()

    def __next__(self):
        """
        Return the next row (Python 3 iterator protocol).

        Delegates to ``next`` so that subclasses overriding ``next`` keep
        working under Python 3 as well.
        """
        return self.next()

    def _fetch_row(self):
        """
        This is where the main driver logic for fetching rows is implemented.

        Implementations return one row per call and raise
        ``StopIteration`` once there are no more rows.
        """
        raise NotImplementedError()