diff --git a/byteman/guardrails/disk_usage_full.btm b/byteman/guardrails/disk_usage_full.btm
new file mode 100644
index 0000000000..bbdf8ddca9
--- /dev/null
+++ b/byteman/guardrails/disk_usage_full.btm
@@ -0,0 +1,8 @@
+RULE return FULL disk usage
+CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
+METHOD getState
+AT EXIT
+IF TRUE
+DO
+    return org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+ENDRULE
\ No newline at end of file
diff --git a/byteman/guardrails/disk_usage_stuffed.btm b/byteman/guardrails/disk_usage_stuffed.btm
new file mode 100644
index 0000000000..3256211304
--- /dev/null
+++ b/byteman/guardrails/disk_usage_stuffed.btm
@@ -0,0 +1,8 @@
+RULE return STUFFED disk usage
+CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
+METHOD getState
+AT EXIT
+IF TRUE
+DO
+    return org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+ENDRULE
\ No newline at end of file
diff --git a/client_request_metrics_test.py b/client_request_metrics_test.py
index 6c5ef4dd9a..1900ddb099 100644
--- a/client_request_metrics_test.py
+++ b/client_request_metrics_test.py
@@ -42,7 +42,7 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
         fixture_dtest_setup.ignore_log_patterns = (
             'Testing write failures',  # The error to simulate a write failure
             'ERROR WRITE_FAILURE',  # Logged in DEBUG mode for write failures
-            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query"  # Caused by the read failure tests
+            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query"  # Caused by the read failure tests
         )
 
     def setup_once(self):
@@ -50,6 +50,7 @@ def setup_once(self):
         cluster.set_configuration_options({'read_request_timeout_in_ms': 3000,
                                            'write_request_timeout_in_ms': 3000,
                                            'phi_convict_threshold': 12,
+                                           'tombstone_warn_threshold': -1,
                                            'tombstone_failure_threshold': TOMBSTONE_FAILURE_THRESHOLD,
                                            'enable_materialized_views': 'true'})
         cluster.populate(2, debug=True)
diff --git a/compaction_test.py b/compaction_test.py
index d84b5dcd0f..ba5486d0eb 100644
--- a/compaction_test.py
+++ b/compaction_test.py
@@ -339,7 +339,10 @@ def test_large_compaction_warning(self):
         Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb
         """
         cluster = self.cluster
-        cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
+        if self.supports_guardrails:
+            cluster.set_configuration_options({'guardrails': {'partition_size_warn_threshold_in_mb': 1}})
+        else:
+            cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
         cluster.populate(1).start()
         [node] = cluster.nodelist()
 
@@ -361,7 +364,10 @@ def test_large_compaction_warning(self):
         node.nodetool('compact ks large')
         verb = 'Writing' if self.cluster.version() > '2.2' else 'Compacting'
         sizematcher = '\d+ bytes' if self.cluster.version() < LooseVersion('3.6') else '\d+\.\d{3}(K|M|G)iB'
-        node.watch_log_for('{} large partition ks/large:user \({}'.format(verb, sizematcher), from_mark=mark, timeout=180)
+        log_message = '{} large partition ks/large:user \({}'.format(verb, sizematcher)
+        if self.supports_guardrails:
+            log_message = "Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size \(1MB\)"
+        node.watch_log_for(log_message, from_mark=mark, timeout=180)
 
         ret = list(session.execute("SELECT properties from ks.large where userid = 'user'"))
         assert_length_equal(ret, 1)
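
# The shape repeated throughout this diff: thresholds that moved under the
# 'guardrails' section of cassandra.yaml in 4.0 are nested one level deeper
# when the cluster supports guardrails, and stay flat on older versions.
# A minimal sketch of that nesting (the helper name is hypothetical, not part
# of the diff):
def nest_under_guardrails(supports_guardrails, opts):
    """Wrap threshold options under 'guardrails' for 4.0+ clusters."""
    return {'guardrails': opts} if supports_guardrails else opts

# nest_under_guardrails(True, {'batch_size_warn_threshold_in_kb': '10'})
# -> {'guardrails': {'batch_size_warn_threshold_in_kb': '10'}}
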
diff --git a/cqlsh_tests/test_cqlsh.py b/cqlsh_tests/test_cqlsh.py
index 7b683cad31..30598ca60b 100644
--- a/cqlsh_tests/test_cqlsh.py
+++ b/cqlsh_tests/test_cqlsh.py
@@ -1810,8 +1810,11 @@ def test_client_warnings(self):
         """
         max_partitions_per_batch = 5
         self.cluster.populate(3)
-        self.cluster.set_configuration_options({
-            'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)})
+
+        config_opts = {'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)}
+        if self.supports_guardrails:
+            config_opts = {"guardrails": config_opts}
+        self.cluster.set_configuration_options(config_opts)
 
         self.cluster.start()
 
diff --git a/cqlsh_tests/test_cqlsh_copy.py b/cqlsh_tests/test_cqlsh_copy.py
index 6ba54360db..de30d46e25 100644
--- a/cqlsh_tests/test_cqlsh_copy.py
+++ b/cqlsh_tests/test_cqlsh_copy.py
@@ -2475,8 +2475,12 @@ def test_bulk_round_trip_blogposts(self):
 
         @jira_ticket CASSANDRA-9302
         """
+        config_opts = {'batch_size_warn_threshold_in_kb': '10'}
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': config_opts}
+
         self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
-                                   configuration_options={'batch_size_warn_threshold_in_kb': '10'},
+                                   configuration_options=config_opts,
                                    profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
                                    stress_table='stresscql.blogposts')
 
@@ -2489,9 +2493,16 @@ def test_bulk_round_trip_blogposts_with_max_connections(self):
 
         @jira_ticket CASSANDRA-10938
         """
+        batch_size_warn_threshold_in_kb = '10'
+        native_transport_max_concurrent_connections = '12'
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb},
+                           'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections}
+        else:
+            config_opts = {'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections,
+                           'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb}
         self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
-                                   configuration_options={'native_transport_max_concurrent_connections': '12',
-                                                          'batch_size_warn_threshold_in_kb': '10'},
+                                   configuration_options=config_opts,
                                    profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
                                    stress_table='stresscql.blogposts',
                                    copy_to_options={'NUMPROCESSES': 5, 'MAXATTEMPTS': 20},
@@ -2821,8 +2832,13 @@ def test_copy_from_with_large_cql_rows(self):
         @jira_ticket CASSANDRA-11474
         """
         num_records = 100
-        self.prepare(nodes=1, configuration_options={'batch_size_warn_threshold_in_kb': '1',   # warn with 1kb and fail
-                                                     'batch_size_fail_threshold_in_kb': '5'})  # with 5kb size batches
+        batch_size_warn_threshold_in_kb = '1'  # warn with 1kb and fail
+        batch_size_fail_threshold_in_kb = '5'  # with 5kb size batches
+        config_opts = {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb,
+                       'batch_size_fail_threshold_in_kb': batch_size_fail_threshold_in_kb}
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': config_opts}
+        self.prepare(nodes=1, configuration_options=config_opts)
 
         logger.debug('Running stress')
         stress_table_name = 'standard1'
diff --git a/dtest_setup.py b/dtest_setup.py
index d04fb001bc..009fb220f6 100644
--- a/dtest_setup.py
+++ b/dtest_setup.py
@@ -332,6 +332,10 @@ def dump_jfr_recording(self, nodes):
     def supports_v5_protocol(self, cluster_version):
         return cluster_version >= LooseVersion('4.0')
 
+    @property
+    def supports_guardrails(self):
+        return self.cluster.version() >= LooseVersion('4.0')
+
     def cleanup_last_test_dir(self):
         if os.path.exists(self.last_test_dir):
             os.remove(self.last_test_dir)
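
# Illustrative use of the version gate added above, composed from names this
# diff already uses elsewhere (a sketch, not an excerpt from any single test):
def configure_tombstone_thresholds(tester, cluster):
    config_opts = {'tombstone_failure_threshold': 500}
    if tester.supports_guardrails:
        # On 4.0+ the warn threshold is disabled so that only the failure
        # threshold fires during the test.
        config_opts = {'guardrails': {'tombstone_warn_threshold': -1,
                                      'tombstone_failure_threshold': 500}}
    cluster.set_configuration_options(values=config_opts)
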
diff --git a/guardrails_test.py b/guardrails_test.py
new file mode 100644
index 0000000000..bf883bba98
--- /dev/null
+++ b/guardrails_test.py
@@ -0,0 +1,99 @@
+import logging
+import time
+import pytest
+import re
+
+from cassandra import InvalidRequest
+
+from dtest import Tester, create_ks
+from tools.assertions import assert_one
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+class BaseGuardrailsTester(Tester):
+
+    def prepare(self, rf=1, options=None, nodes=3, install_byteman=False, extra_jvm_args=None, **kwargs):
+        if options is None:
+            options = {}
+
+        if extra_jvm_args is None:
+            extra_jvm_args = []
+
+        cluster = self.cluster
+        cluster.set_log_level('TRACE')
+        cluster.populate(nodes, install_byteman=install_byteman)
+        if options:
+            cluster.set_configuration_options(values=options)
+
+        cluster.start(jvm_args=extra_jvm_args)
+        node1 = cluster.nodelist()[0]
+
+        session = self.patient_cql_connection(node1, **kwargs)
+        create_ks(session, 'ks', rf)
+
+        return session
+
+
+@since('4.0')
+class TestGuardrails(BaseGuardrailsTester):
+
+    def test_disk_usage_guardrail(self):
+        """
+        Test that the disk usage guardrail warns when usage exceeds the warn threshold and rejects writes when it exceeds the failure threshold
+        """
+
+        self.fixture_dtest_setup.ignore_log_patterns = ["Write request failed because disk usage exceeds failure threshold"]
+        guardrails_config = {'guardrails': {'disk_usage_percentage_warn_threshold': 98,
+                                            'disk_usage_percentage_failure_threshold': 99}}
+
+        logger.debug("prepare 2-node cluster with rf=1 and guardrails enabled")
+        session = self.prepare(rf=1, nodes=2, options=guardrails_config, extra_jvm_args=['-Dcassandra.disk_usage.monitor_interval_ms=100'], install_byteman=True)
+        node1, node2 = self.cluster.nodelist()
+        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
+
+        logger.debug("Inject FULL to node1, expect node1 to log the state and node2 to reject writes")
+        mark = node1.mark_log(filename='debug.log')
+        self.disk_usage_injection(node1, "full", False)
+        node1.watch_log_for("Adding state DISK_USAGE: FULL", filename='debug.log', from_mark=mark, timeout=10)
+
+        # verify that node2 rejects writes when node1 is the replica
+        session2 = self.patient_exclusive_cql_connection(node2, keyspace="ks")
+        rows = 100
+        failed = 0
+        for x in range(rows):
+            try:
+                session2.execute("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
+            except InvalidRequest as e:
+                assert re.search("Write request failed because disk usage exceeds failure threshold", str(e))
+                failed += 1
+
+        assert rows != failed, "Expected node2 to reject only some writes, but it rejected all of them"
+        assert 0 != failed, "Expected node2 to reject some writes, but it rejected none"
+        assert_one(session2, "SELECT COUNT(*) FROM t", [rows - failed])
+
+        logger.debug("Inject STUFFED to node1, node2 should warn the client")
+        session2.execute("TRUNCATE t")
+        mark = node1.mark_log(filename='debug.log')
+        self.disk_usage_injection(node1, "stuffed")
+        node1.watch_log_for("Adding state DISK_USAGE: STUFFED", filename='debug.log', from_mark=mark, timeout=10)
+
+        warnings = 0
+        for x in range(rows):
+            fut = session2.execute_async("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
+            fut.result()
+            if fut.warnings:
+                assert ["Replica disk usage exceeds warn threshold"] == fut.warnings
+                warnings += 1
+
+        assert rows != warnings, "Expected node2 to warn on only some writes, but it warned on all of them"
+        assert 0 != warnings, "Expected node2 to warn on some writes, but it warned on none"
+        assert_one(session2, "SELECT COUNT(*) FROM t", [rows])
+
+        session.cluster.shutdown()
+        session2.cluster.shutdown()
+
+    def disk_usage_injection(self, node, state, clear_byteman=True):
+        if clear_byteman:
+            node.byteman_submit(['-u'])
+        node.byteman_submit(["./byteman/guardrails/disk_usage_{}.btm".format(state)])
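
# How the injection above works: each .btm rule rewrites
# DiskUsageMonitor.getState at exit so the node reports a fixed disk state,
# and ccm's byteman_submit pushes or unloads rules on a live node. A condensed
# sketch of the submit-and-verify cycle the test performs (the function name
# is hypothetical; the calls and log line come from this diff):
def force_disk_state(node, state, clear_byteman=True):
    mark = node.mark_log(filename='debug.log')
    if clear_byteman:
        node.byteman_submit(['-u'])  # unload previously injected rules first
    node.byteman_submit(["./byteman/guardrails/disk_usage_{}.btm".format(state)])
    node.watch_log_for("Adding state DISK_USAGE: {}".format(state.upper()),
                       filename='debug.log', from_mark=mark, timeout=10)
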
diff --git a/paging_test.py b/paging_test.py
index 6983d3f05c..6d666df871 100644
--- a/paging_test.py
+++ b/paging_test.py
@@ -18,6 +18,7 @@
                                assert_one, assert_lists_equal_ignoring_order)
 from tools.data import rows_to_list
 from tools.datahelp import create_rows, flatten_into_set, parse_data_into_dicts
+from tools.misc import restart_cluster_and_update_config
 from tools.paging import PageAssertionMixin, PageFetcher
 
 since = pytest.mark.since
@@ -3423,19 +3424,26 @@ def test_failure_threshold_deletions(self):
         supports_v5_protocol = self.supports_v5_protocol(self.cluster.version())
 
         self.fixture_dtest_setup.allow_log_errors = True
-        self.cluster.set_configuration_options(
-            values={'tombstone_failure_threshold': 500}
-        )
+        if self.supports_guardrails:
+            config_opts = {'guardrails': {'tombstone_failure_threshold': 500,
+                                          'tombstone_warn_threshold': -1,
+                                          'write_consistency_levels_disallowed': {}}}
+        else:
+            config_opts = {'tombstone_failure_threshold': 500}
+        restart_cluster_and_update_config(self.cluster, config_opts)
         self.session = self.prepare()
         self.setup_data()
 
-        # Add more data
+        if self.supports_guardrails:
+            # cell tombstones are not counted towards the threshold, so we delete rows
+            query = "delete from paging_test where id = 1 and mytext = '{}'"
+        else:
+            # Add more data
+            query = "insert into paging_test (id, mytext, col1) values (1, '{}', null)"
+
         values = [uuid.uuid4() for i in range(3000)]
         for value in values:
-            self.session.execute(SimpleStatement(
-                "insert into paging_test (id, mytext, col1) values (1, '{}', null) ".format(
-                    value
-                ),
+            self.session.execute(SimpleStatement(query.format(value),
                 consistency_level=CL.ALL
             ))
 
@@ -3456,7 +3464,7 @@ def test_failure_threshold_deletions(self):
             failure_msg = ("Scanned over.* tombstones in test_paging_size."
                            "paging_test.* query aborted")
         else:
-            failure_msg = ("Scanned over.* tombstones during query.* query aborted")
+            failure_msg = ("Scanned over.* (tombstones|tombstone rows) during query.* query aborted")
 
         self.cluster.wait_for_any_log(failure_msg, 25)
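
# Why the write shape changes above: inserting an explicit null creates a cell
# tombstone, while deleting the row creates a row tombstone, and the 4.0
# guardrail counts tombstone rows rather than cell tombstones. Only row
# deletes can therefore push a read over tombstone_failure_threshold. A sketch
# of the two statement shapes (the helper name is hypothetical):
import uuid

def tombstone_statements(supports_guardrails, count=3000):
    if supports_guardrails:
        # row tombstones: counted by the 4.0 guardrail
        template = "delete from paging_test where id = 1 and mytext = '{}'"
    else:
        # cell tombstones: counted by the pre-guardrail threshold
        template = "insert into paging_test (id, mytext, col1) values (1, '{}', null)"
    return [template.format(uuid.uuid4()) for _ in range(count)]
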
"paging_test.* query aborted") else: - failure_msg = ("Scanned over.* tombstones during query.* query aborted") + failure_msg = ("Scanned over.* (tombstones|tombstone rows) during query.* query aborted") self.cluster.wait_for_any_log(failure_msg, 25) diff --git a/pushed_notifications_test.py b/pushed_notifications_test.py index 6ab311e7ef..cb307f8b51 100644 --- a/pushed_notifications_test.py +++ b/pushed_notifications_test.py @@ -388,13 +388,18 @@ def test_tombstone_failure_threshold_message(self): have_v5_protocol = self.supports_v5_protocol(self.cluster.version()) self.fixture_dtest_setup.allow_log_errors = True - self.cluster.set_configuration_options( - values={ - 'tombstone_failure_threshold': 500, - 'read_request_timeout_in_ms': 30000, # 30 seconds - 'range_request_timeout_in_ms': 40000 - } - ) + + if self.supports_guardrails: + config_options = {'guardrails': {'tombstone_warn_threshold': -1, + 'tombstone_failure_threshold': 500}, + 'read_request_timeout_in_ms': 30000, # 30 seconds + 'range_request_timeout_in_ms': 40000} + else: + config_options = {'tombstone_failure_threshold': 500, + 'read_request_timeout_in_ms': 30000, # 30 seconds + 'range_request_timeout_in_ms': 40000} + + self.cluster.set_configuration_options(values=config_options) self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() proto_version = 5 if have_v5_protocol else None @@ -407,17 +412,17 @@ def test_tombstone_failure_threshold_message(self): "PRIMARY KEY (id, mytext) )" ) - # Add data with tombstones + if self.supports_guardrails: + # cell tombstones are not counted towards the threshold, so we delete rows + query = "delete from test where id = 1 and mytext = '{}'" + else: + # Add data with tombstones + query = "insert into test (id, mytext, col1) values (1, '{}', null)" values = [str(i) for i in range(1000)] for value in values: - session.execute(SimpleStatement( - "insert into test (id, mytext, col1) values (1, '{}', null) ".format( - value - ), - consistency_level=CL.ALL - )) - - failure_msg = ("Scanned over.* tombstones.* query aborted") + session.execute(SimpleStatement(query.format(value),consistency_level=CL.ALL)) + + failure_msg = ("Scanned over.* (tombstones|tombstone rows).* query aborted") @pytest.mark.timeout(25) def read_failure_query(): diff --git a/read_failures_test.py b/read_failures_test.py index 475f27815d..664ca70ff4 100644 --- a/read_failures_test.py +++ b/read_failures_test.py @@ -4,6 +4,7 @@ from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout from cassandra.policies import FallthroughRetryPolicy from cassandra.query import SimpleStatement +from distutils.version import LooseVersion from dtest import Tester @@ -21,7 +22,9 @@ class TestReadFailures(Tester): @pytest.fixture(autouse=True) def fixture_add_additional_log_patterns(self, fixture_dtest_setup): fixture_dtest_setup.ignore_log_patterns = ( - "Scanned over [1-9][0-9]* tombstones", # This is expected when testing read failures due to tombstones + # These are expected when testing read failures due to tombstones, + "Scanned over [1-9][0-9]* tombstones", + "Scanned over [1-9][0-9]* tombstone rows", ) return fixture_dtest_setup @@ -33,9 +36,15 @@ def fixture_dtest_setup_params(self): self.expected_expt = ReadFailure def _prepare_cluster(self): - self.cluster.set_configuration_options( - values={'tombstone_failure_threshold': self.tombstone_failure_threshold} - ) + if self.supports_guardrails: + self.cluster.set_configuration_options( + values={'guardrails': {'tombstone_warn_threshold': -1, + 
diff --git a/read_failures_test.py b/read_failures_test.py
index 475f27815d..664ca70ff4 100644
--- a/read_failures_test.py
+++ b/read_failures_test.py
@@ -4,6 +4,7 @@
 from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout
 from cassandra.policies import FallthroughRetryPolicy
 from cassandra.query import SimpleStatement
+from distutils.version import LooseVersion
 
 from dtest import Tester
 
@@ -21,7 +22,9 @@ class TestReadFailures(Tester):
     @pytest.fixture(autouse=True)
     def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
         fixture_dtest_setup.ignore_log_patterns = (
-            "Scanned over [1-9][0-9]* tombstones",  # This is expected when testing read failures due to tombstones
+            # These are expected when testing read failures due to tombstones
+            "Scanned over [1-9][0-9]* tombstones",
+            "Scanned over [1-9][0-9]* tombstone rows",
         )
         return fixture_dtest_setup
 
@@ -33,9 +36,15 @@ def fixture_dtest_setup_params(self):
         self.expected_expt = ReadFailure
 
     def _prepare_cluster(self):
-        self.cluster.set_configuration_options(
-            values={'tombstone_failure_threshold': self.tombstone_failure_threshold}
-        )
+        if self.supports_guardrails:
+            self.cluster.set_configuration_options(
+                values={'guardrails': {'tombstone_warn_threshold': -1,
+                                       'tombstone_failure_threshold': self.tombstone_failure_threshold}}
+            )
+        else:
+            self.cluster.set_configuration_options(
+                values={'tombstone_failure_threshold': self.tombstone_failure_threshold}
+            )
         self.cluster.populate(3)
         self.cluster.start()
         self.nodes = list(self.cluster.nodes.values())
diff --git a/tools/misc.py b/tools/misc.py
index 542a889a5a..d746a9947e 100644
--- a/tools/misc.py
+++ b/tools/misc.py
@@ -157,3 +157,14 @@ def add_skip(cls, reason=""):
     else:
         cls.pytestmark = [pytest.mark.skip(reason)]
     return cls
+
+
+def restart_cluster_and_update_config(cluster, config):
+    """
+    Stops the cluster, applies the new config, and starts it again; a restart
+    is required for the changes to take effect. This intentionally operates on
+    the whole cluster: to change the config of a single node, use JMX instead.
+    """
+    cluster.stop()
+    cluster.set_configuration_options(values=config)
+    cluster.start()
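
# Typical call site for the helper above, patterned after the paging_test.py
# change earlier in this diff: reconfigure an already-running cluster and
# bounce it so the new thresholds take effect (a sketch; the wrapper function
# name is hypothetical):
from tools.misc import restart_cluster_and_update_config

def tighten_tombstone_thresholds(cluster):
    restart_cluster_and_update_config(cluster, {
        'guardrails': {'tombstone_warn_threshold': -1,
                       'tombstone_failure_threshold': 500}})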