diff --git a/test/backends/nftables/test_nftables.py b/test/backends/nftables/test_nftables.py index 92ab7f04..014fd7eb 100644 --- a/test/backends/nftables/test_nftables.py +++ b/test/backends/nftables/test_nftables.py @@ -31,6 +31,9 @@ def tearDown(self): run_cmd("nft", "delete", "table", "ip6", "crowdsec6", ignore_error=True) def test_table_rule_set_are_created(self): + d1, d2, d3 = generate_n_decisions(3) + d4 = generate_n_decisions(1, ipv4=False) + self.lapi.ds.insert_decisions([d1, d2, d3, d4]) sleep(1) output = json.loads(run_cmd("nft", "-j", "list", "tables")) tables = { @@ -48,7 +51,7 @@ def test_table_rule_set_are_created(self): for node in output["nftables"] if "set" in node } - assert ("ip", "crowdsec-blacklists", "ipv4_addr") in sets + assert ("ip", "crowdsec-blacklists-script", "ipv4_addr") in sets rules = { node["rule"]["chain"] for node in output["nftables"] if "rule" in node } # maybe stricter check ? @@ -62,7 +65,7 @@ def test_table_rule_set_are_created(self): for node in output["nftables"] if "set" in node } - assert ("ip6", "crowdsec6-blacklists", "ipv6_addr") in sets + assert ("ip6", "crowdsec6-blacklists-script", "ipv6_addr") in sets rules = { node["rule"]["chain"] for node in output["nftables"] if "rule" in node @@ -74,7 +77,7 @@ def test_duplicate_decisions_across_decision_stream(self): d1, d2, d3 = generate_n_decisions(3, dup_count=1) self.lapi.ds.insert_decisions([d1]) sleep(1) - self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists"), {"0.0.0.0"}) + self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script"), {"0.0.0.0"}) self.lapi.ds.insert_decisions([d2, d3]) sleep(1) @@ -86,12 +89,12 @@ def test_duplicate_decisions_across_decision_stream(self): self.lapi.ds.delete_decision_by_id(d1["id"]) self.lapi.ds.delete_decision_by_id(d2["id"]) sleep(1) - self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists"), set()) + self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script"), set()) assert self.fb.poll() is None self.lapi.ds.delete_decision_by_id(d3["id"]) sleep(1) - self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists"), set()) + self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script"), set()) assert self.fb.poll() is None def test_decision_insertion_deletion_ipv4(self): @@ -100,7 +103,7 @@ def test_decision_insertion_deletion_ipv4(self): self.lapi.ds.insert_decisions(decisions) sleep(1) # let the bouncer insert the decisions - set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists") + set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script") self.assertEqual(len(set_elements), total_decisions - duplicate_decisions) assert {i["value"] for i in decisions} == set_elements assert "0.0.0.0" in set_elements @@ -108,7 +111,7 @@ def test_decision_insertion_deletion_ipv4(self): self.lapi.ds.delete_decisions_by_ip("0.0.0.0") sleep(1) - set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists") + set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script") assert {i["value"] for i in decisions if i["value"] != "0.0.0.0"} == set_elements assert len(set_elements) == total_decisions - duplicate_decisions - 1 assert "0.0.0.0" not in set_elements @@ -119,7 +122,7 @@ def test_decision_insertion_deletion_ipv6(self): self.lapi.ds.insert_decisions(decisions) sleep(1) - set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists") + set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists-script") set_elements = set(map(ip_address, set_elements)) assert len(set_elements) == total_decisions - duplicate_decisions assert {ip_address(i["value"]) for i in decisions} == set_elements @@ -128,7 +131,7 @@ def test_decision_insertion_deletion_ipv6(self): self.lapi.ds.delete_decisions_by_ip("::1:0:3") sleep(1) - set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists") + set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists-script") set_elements = set(map(ip_address, set_elements)) self.assertEqual(len(set_elements), total_decisions - duplicate_decisions - 1) assert ( @@ -154,7 +157,7 @@ def test_longest_decision_insertion(self): ] self.lapi.ds.insert_decisions(decisions) sleep(1) - elems = get_set_elements("ip", "crowdsec", "crowdsec-blacklists", with_timeout=True) + elems = get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script", with_timeout=True) assert len(elems) == 1 elems = list(elems) assert elems[0][0] == "123.45.67.12"