forked from apache/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpushed_notifications_test.py
486 lines (409 loc) · 22.4 KB
/
pushed_notifications_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
import time
import pytest
import logging
from datetime import datetime
from distutils.version import LooseVersion
from threading import Event
from cassandra import ConsistencyLevel as CL
from cassandra import ReadFailure
from cassandra.query import SimpleStatement
from ccmlib.node import Node, TimeoutError
from dtest import Tester, get_ip_from_node, get_port_from_node, create_ks
# Shorthand for the `since` pytest mark; tests in this module apply it as
# `@since("<version>")`.
# NOTE(review): presumably the dtest pytest plugin uses this mark to skip tests
# on Cassandra versions older than the given one -- confirm against the plugin.
since = pytest.mark.since
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class NotificationWaiter(object):
    """
    A helper class for waiting for pushed notifications from
    Cassandra over the native protocol.

    On construction it opens a new connection to the given node and registers
    `handle_notification` as the watcher for every requested notification
    type. Accepted notifications are accumulated in `self.notifications`
    and `self.event` is set each time one is accepted.
    """

    def __init__(self, tester, node, notification_types, keyspace=None):
        """
        `tester` must provide `patient_exclusive_cql_connection(node, protocol_version=...)`
        `node` should be a ccmlib.node.Node instance
        `notification_types` should be a list of
        "TOPOLOGY_CHANGE", "STATUS_CHANGE", and "SCHEMA_CHANGE".
        `keyspace`, when set, makes the waiter ignore notifications that name
        a different keyspace.
        """
        self.node = node
        self.address = node.network_interfaces['binary'][0]
        self.notification_types = notification_types
        self.keyspace = keyspace

        # get a single, new connection
        version = 5 if node.get_cassandra_version() >= LooseVersion('4.0') else None
        session = tester.patient_exclusive_cql_connection(node, protocol_version=version)
        connection = session.cluster.connection_factory(self.address, is_control_connection=True)

        # coordinate with an Event
        self.event = Event()

        # the pushed notifications, in the order they were accepted
        self.notifications = []

        # register a callback for each notification type
        for notification_type in notification_types:
            connection.register_watcher(notification_type, self.handle_notification, register_timeout=5.0)

    def handle_notification(self, notification):
        """
        Called when a notification is pushed from Cassandra.

        The notification is recorded and the event is set, unless a keyspace
        filter is configured and the notification names a different keyspace.
        """
        logger.debug("Got {} from {} at {}".format(notification, self.address, datetime.now()))

        # Use .get() so that a notification without a 'keyspace' entry is
        # accepted instead of raising KeyError when a keyspace filter is set.
        notification_keyspace = notification.get('keyspace')
        if self.keyspace and notification_keyspace and self.keyspace != notification_keyspace:
            return  # we are not interested in this schema change

        self.notifications.append(notification)
        self.event.set()

    def wait_for_notifications(self, timeout, num_notifications=1):
        """
        Waits up to `timeout` seconds for notifications from Cassandra. If
        passed `num_notifications`, stop waiting when that many notifications
        are observed.

        Returns the list of notifications collected so far, which may hold
        fewer than `num_notifications` entries if the timeout expired.
        """
        # monotonic clock: the deadline must not move if the wall clock is adjusted
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            self.event.wait(remaining)
            # re-arm before checking the count so a later notification wakes the next wait
            self.event.clear()
            if len(self.notifications) >= num_notifications:
                break

        return self.notifications

    def clear_notifications(self):
        """
        Forgets all notifications collected so far and resets the event.
        """
        logger.debug("Clearing notifications...")
        self.notifications = []
        self.event.clear()
class TestPushedNotifications(Tester):
    """
    Tests for pushed native protocol notification from Cassandra.
    """

    @pytest.mark.no_vnodes
    def test_move_single_node(self):
        """
        @jira_ticket CASSANDRA-8516
        Moving a token should result in MOVED_NODE notifications.
        """
        self.cluster.populate(3).start()

        waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
                   for node in self.cluster.nodes.values()]

        logger.debug("Issuing move command....")
        node1 = list(self.cluster.nodes.values())[0]
        node1.move("123")

        for waiter in waiters:
            # poll each waiter in turn, they should all receive a MOVED_NODE notification for node1
            # and at most one NEW_NODE for each node in the cluster, which we don't care about here.
            # Whether nodes send the NEW_NODE depends on whether that node learns of the new node
            # (either through gossip or TCM) before or after the listener is established.
            logger.debug("Checking notifications from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(10.0, 4)
            logger.debug("Received {}".format(notifications))
            assert 1 <= len(notifications) <= 4, notifications

            # only the most recent notification is checked; it must be the move
            notification = notifications[-1]
            change_type = notification["change_type"]
            address, _ = notification["address"]
            assert "MOVED_NODE" == change_type
            assert get_ip_from_node(node1) == address

    @pytest.mark.no_vnodes
    def test_move_single_node_localhost(self):
        """
        Test that we don't get NODE_MOVED notifications from nodes other than the local one,
        when rpc_address is set to localhost (127.0.0.1) Pre 4.0.

        Test that we get NODE_MOVED notifications from nodes other than the local one,
        when rpc_address is set to localhost (127.0.0.1) Post 4.0.
        @jira_ticket CASSANDRA-10052
        @jira_ticket CASSANDRA-15677

        To set-up this test we override the rpc_address to "localhost (127.0.0.1)" for all nodes, and
        therefore we must change the rpc port or else processes won't start.
        """
        cluster = self.cluster
        cluster.populate(3)

        self.change_rpc_address_to_localhost()

        cluster.start()

        waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
                   for node in self.cluster.nodes.values()]

        # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
        # late due to network delays let's block a bit longer
        logger.debug("Waiting for unwanted notifications...")
        waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
        waiters[0].clear_notifications()

        logger.debug("Issuing move command....")
        node1 = list(self.cluster.nodes.values())[0]
        node1.move("123")

        version = self.cluster.cassandra_version()
        for waiter in waiters:
            logger.debug("Waiting for notification from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(30.0)
            if version >= '4.0':
                # CASSANDRA-15677 Post 4.0 we'll get the notifications. Check that they are for the right node.
                assert 1 == len(notifications), notifications
                notification = notifications[0]
                change_type = notification["change_type"]
                address, port = notification["address"]
                assert "MOVED_NODE" == change_type
                assert get_ip_from_node(node1) == address
                assert get_port_from_node(node1) == port
            else:
                # Compute the expected count first: written inline as
                # `1 if ... else 0 == len(...)`, the comparison binds tighter than the
                # conditional expression and the node1 case would always pass.
                expected_count = 1 if waiter.node is node1 else 0
                assert expected_count == len(notifications), notifications

    def test_restart_node(self):
        """
        @jira_ticket CASSANDRA-7816
        Restarting a node should generate exactly one DOWN and one UP notification
        """
        self.cluster.populate(2).start()
        node1, node2 = self.cluster.nodelist()

        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
        # don't confuse the state below.
        logger.debug("Waiting for unwanted notifications...")
        waiter.wait_for_notifications(timeout=30, num_notifications=2)
        waiter.clear_notifications()

        # On versions prior to 2.2, an additional NEW_NODE notification is sent when a node
        # is restarted. This bug was fixed in CASSANDRA-11038 (see also CASSANDRA-11360)
        version = self.cluster.cassandra_version()
        expected_notifications = 2 if version >= '2.2' else 3
        for _ in range(5):
            logger.debug("Restarting second node...")
            node2.stop(wait_other_notice=True)
            node2.start()
            logger.debug("Waiting for notifications from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=expected_notifications)
            # compare the count itself; the received notifications are the failure message
            assert expected_notifications == len(notifications), notifications
            for notification in notifications:
                assert get_ip_from_node(node2) == notification["address"][0]
            assert "DOWN" == notifications[0]["change_type"]
            if version >= '2.2':
                assert "UP" == notifications[1]["change_type"]
            else:
                # pre 2.2, we'll receive both a NEW_NODE and an UP notification,
                # but the order is not guaranteed
                assert {"NEW_NODE", "UP"} == {n["change_type"] for n in notifications[1:]}

            waiter.clear_notifications()

    def test_restart_node_localhost(self):
        """
        Test that we don't get client notifications when rpc_address is set to localhost Pre 4.0.
        Test that we get correct client notifications when rpc_address is set to localhost Post 4.0.
        @jira_ticket CASSANDRA-10052
        @jira_ticket CASSANDRA-15677

        To set-up this test we override the rpc_address to "localhost" for all nodes, and
        therefore we must change the rpc port or else processes won't start.
        """
        cluster = self.cluster
        cluster.populate(2)
        node1, node2 = cluster.nodelist()

        self.change_rpc_address_to_localhost()

        cluster.start()

        # register for notification with node1
        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        version = self.cluster.cassandra_version()
        if version >= '4.0':
            # >=4.0 we wait for the NEW_NODE and UP notifications to reach us
            waiter.wait_for_notifications(timeout=30.0, num_notifications=2)
            waiter.clear_notifications()

        # restart node 2
        logger.debug("Restarting second node...")
        node2.stop(wait_other_notice=True)
        node2.start()

        # pre 4.0: check that node1 did not send UP or DOWN notification for node2
        # 4.0 and later: check that it sent exactly those, for the right node
        logger.debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=30.0, num_notifications=2)
        if version >= '4.0':
            # CASSANDRA-15677 Post 4.0 we'll get the notifications. Check that they are for the right node.
            # Fail with the received notifications rather than an IndexError below
            # if the wait timed out before both arrived.
            assert len(notifications) >= 2, notifications
            for notification in notifications:
                address, port = notification["address"]
                assert get_ip_from_node(node2) == address
                assert get_port_from_node(node2) == port
            assert "DOWN" == notifications[0]["change_type"], notifications
            assert "UP" == notifications[1]["change_type"], notifications
        else:
            assert 0 == len(notifications), notifications

    @since("2.2")
    def test_add_and_remove_node(self):
        """
        Test that NEW_NODE and REMOVED_NODE are sent correctly as nodes join and leave.
        @jira_ticket CASSANDRA-11038
        """
        self.cluster.populate(1).start()
        node1 = self.cluster.nodelist()[0]

        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
        # don't confuse the state below
        logger.debug("Waiting for unwanted notifications...")
        waiter.wait_for_notifications(timeout=30, num_notifications=2)
        waiter.clear_notifications()

        session = self.patient_cql_connection(node1)
        # reduce system_distributed and system_traces RF to 1 so we don't require forceful decommission
        session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")
        session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")

        logger.debug("Adding second node...")
        node2 = Node('node2', self.cluster, True, None, ('127.0.0.2', 7000), '7200', '0', None, binary_interface=('127.0.0.2', 9042))
        self.cluster.add(node2, False, data_center="dc1")
        node2.start()
        logger.debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=120.0, num_notifications=2)
        assert 2 == len(notifications), notifications
        for notification in notifications:
            assert get_ip_from_node(node2) == notification["address"][0]
        assert "NEW_NODE" == notifications[0]["change_type"]
        assert "UP" == notifications[1]["change_type"]

        logger.debug("Removing second node...")
        waiter.clear_notifications()
        node2.decommission()
        node2.stop(gently=False)
        logger.debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=120.0, num_notifications=2)
        assert 2 == len(notifications), notifications
        for notification in notifications:
            assert get_ip_from_node(node2) == notification["address"][0]
        assert "REMOVED_NODE" == notifications[0]["change_type"]
        assert "DOWN" == notifications[1]["change_type"]

    def change_rpc_address_to_localhost(self):
        """
        change node's 'rpc_address' from '127.0.0.x' to 'localhost (127.0.0.1)', increase port numbers

        Every node ends up on 127.0.0.1, so the ports of the n-th node (counting from 0)
        are shifted by 2 * n to keep them distinct.
        """
        cluster = self.cluster

        for index, node in enumerate(cluster.nodelist()):
            port_offset = 2 * index
            logger.debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml')
            # the thrift interface is only rewritten for clusters older than 4
            if cluster.version() < '4':
                node.network_interfaces['thrift'] = ('127.0.0.1', node.network_interfaces['thrift'][1] + port_offset)
            node.network_interfaces['binary'] = ('127.0.0.1', node.network_interfaces['binary'][1] + port_offset)
            node.import_config_files()  # this regenerates the yaml file and sets 'rpc_address' to the 'thrift' address
            node.set_configuration_options(values={'rpc_address': 'localhost'})
            logger.debug(node.show())

    @since("3.0")
    def test_schema_changes(self):
        """
        @jira_ticket CASSANDRA-10328
        Creating, updating and dropping a keyspace, a table and a materialized view
        will generate the correct schema change notifications.
        """
        self.cluster.set_configuration_options({'enable_materialized_views': 'true'})
        self.cluster.populate(2).start()
        node1, node2 = self.cluster.nodelist()

        session = self.patient_cql_connection(node1)
        waiter = NotificationWaiter(self, node2, ["SCHEMA_CHANGE"], keyspace='ks')

        create_ks(session, 'ks', 3)
        session.execute("create TABLE t (k int PRIMARY KEY , v int)")
        session.execute("alter TABLE t add v1 int;")

        session.execute("create MATERIALIZED VIEW mv as select * from t WHERE v IS NOT NULL AND k IS NOT NULL PRIMARY KEY (v, k)")
        session.execute(" alter materialized view mv with min_index_interval = 100")

        session.execute("drop MATERIALIZED VIEW mv")
        session.execute("drop TABLE t")
        session.execute("drop KEYSPACE ks")

        # one entry per statement above, in execution order
        expected_notifications = [
            {'change_type': 'CREATED', 'target_type': 'KEYSPACE'},
            {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 't'},
            {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 't'},
            {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 'mv'},
            {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 'mv'},
            {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 'mv'},
            {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 't'},
            {'change_type': 'DROPPED', 'target_type': 'KEYSPACE'},
        ]

        logger.debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=8)
        assert 8 == len(notifications), notifications

        for position, (expected, actual) in enumerate(zip(expected_notifications, notifications)):
            # assert dict contains subset: every expected key must be present with the expected value
            matched = {key: actual[key] for key in expected if key in actual}
            assert matched == expected, (position, actual)
class TestVariousNotifications(Tester):
    """
    Tests for various notifications/messages from Cassandra.
    """

    @since('2.2')
    def test_tombstone_failure_threshold_message(self):
        """
        Ensure nodes return an error message in case of TombstoneOverwhelmingExceptions rather
        than dropping the request. A drop makes the coordinator wait for the specified
        read_request_timeout_in_ms.
        @jira_ticket CASSANDRA-7886
        """
        have_v5_protocol = self.supports_v5_protocol(self.cluster.version())

        self.fixture_dtest_setup.allow_log_errors = True
        opts = {'read_request_timeout_in_ms': 30000,  # 30 seconds
                'range_request_timeout_in_ms': 40000}
        if self.supports_guardrails():
            opts['guardrails'] = {'tombstone_warn_threshold': -1,
                                  'tombstone_failure_threshold': 500}
        else:
            opts['tombstone_warn_threshold'] = -1
            opts['tombstone_failure_threshold'] = 500

        # TODO this can be simplified when we are up-to-date with 5.0-rc1
        cluster_version = self.cluster.version()
        # explicit grouping: (4.1.6 <= version < 4.2) or (version >= 5.0-rc1)
        if ((cluster_version >= LooseVersion('4.1.6') and cluster_version < LooseVersion('4.2'))
                or cluster_version >= LooseVersion('5.0-rc1')):
            opts['native_transport_timeout'] = '30s'

        self.cluster.set_configuration_options(values=opts)
        self.cluster.populate(3).start()
        node1, node2, node3 = self.cluster.nodelist()
        proto_version = 5 if have_v5_protocol else None
        session = self.patient_cql_connection(node1, protocol_version=proto_version)

        create_ks(session, 'test', 3)
        session.execute(
            "CREATE TABLE test ( "
            "id int, mytext text, col1 int, col2 int, col3 int, "
            "PRIMARY KEY (id, mytext) )"
        )

        if self.supports_guardrails():
            # cell tombstones are not counted towards the threshold, so we delete rows
            query = "delete from test where id = 1 and mytext = '{}'"
        else:
            # Add data with tombstones
            query = "insert into test (id, mytext, col1) values (1, '{}', null)"
        # 1000 tombstones, twice the failure threshold of 500 configured above
        for i in range(1000):
            session.execute(SimpleStatement(query.format(i), consistency_level=CL.ALL))

        failure_msg = ("Scanned over.* (tombstones|tombstone rows).* query aborted")

        # NOTE(review): the timeout marks on the nested helpers below are kept as a record of
        # the intended bounds (below the 30s/40s request timeouts set above). Presumably they
        # are not enforced, since pytest does not collect nested functions -- confirm.
        @pytest.mark.timeout(25)
        def read_failure_query():
            try:
                session.execute(SimpleStatement("select * from test where id in (1,2,3,4,5)", consistency_level=CL.ALL))
            except ReadFailure as exc:
                if have_v5_protocol:
                    # at least one replica should have responded with a tombstone error
                    assert exc.error_code_map is not None
                    assert 0x0001 == list(exc.error_code_map.values())[0]
            else:
                # any other exception propagates unchanged; no exception at all is a failure
                pytest.fail(reason='Expected ReadFailure')

        read_failure_query()

        # In almost all cases, we should find the failure message on node1 within a few seconds.
        # If it is not on node1, we grep all logs, as it *absolutely* should be somewhere.
        # If we still cannot find it then, we fail the test, as this is a problem.
        try:
            node1.watch_log_for(failure_msg, timeout=5)
        except TimeoutError:
            failure = (node1.grep_log(failure_msg) or
                       node2.grep_log(failure_msg) or
                       node3.grep_log(failure_msg))

            assert failure, "Cannot find tombstone failure threshold error in log after failed query"

        # only log lines written after these marks count for the range query below
        mark1 = node1.mark_log()
        mark2 = node2.mark_log()
        mark3 = node3.mark_log()

        @pytest.mark.timeout(35)
        def range_request_failure_query():
            try:
                session.execute(SimpleStatement("select * from test", consistency_level=CL.ALL))
            except ReadFailure as exc:
                if have_v5_protocol:
                    # at least one replica should have responded with a tombstone error
                    assert exc.error_code_map is not None
                    assert 0x0001 == list(exc.error_code_map.values())[0]
            else:
                # any other exception propagates unchanged; no exception at all is a failure
                pytest.fail(reason='Expected ReadFailure')

        range_request_failure_query()

        # In almost all cases, we should find the failure message on node1 within a few seconds.
        # If it is not on node1, we grep all logs, as it *absolutely* should be somewhere.
        # If we still cannot find it then, we fail the test, as this is a problem.
        try:
            node1.watch_log_for(failure_msg, from_mark=mark1, timeout=5)
        except TimeoutError:
            failure = (node1.grep_log(failure_msg, from_mark=mark1) or
                       node2.grep_log(failure_msg, from_mark=mark2) or
                       node3.grep_log(failure_msg, from_mark=mark3))

            # assert on the grep result, with the text as the failure message
            # (comparing the result to the message string could never succeed)
            assert failure, "Cannot find tombstone failure threshold error in log after range_request_timeout_query"