Skip to content

Commit

Permalink
fix nftables tests
Browse files Browse the repository at this point in the history
  • Loading branch information
blotus committed Apr 22, 2024
1 parent 261f693 commit 92eee0a
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions test/backends/nftables/test_nftables.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def tearDown(self):
run_cmd("nft", "delete", "table", "ip6", "crowdsec6", ignore_error=True)

def test_table_rule_set_are_created(self):
d1, d2, d3 = generate_n_decisions(3)
d4 = generate_n_decisions(1, ipv4=False)
self.lapi.ds.insert_decisions([d1, d2, d3, d4])
sleep(1)
output = json.loads(run_cmd("nft", "-j", "list", "tables"))
tables = {
Expand All @@ -48,7 +51,7 @@ def test_table_rule_set_are_created(self):
for node in output["nftables"]
if "set" in node
}
assert ("ip", "crowdsec-blacklists", "ipv4_addr") in sets
assert ("ip", "crowdsec-blacklists-script", "ipv4_addr") in sets
rules = {
node["rule"]["chain"] for node in output["nftables"] if "rule" in node
} # maybe stricter check ?
Expand All @@ -62,7 +65,7 @@ def test_table_rule_set_are_created(self):
for node in output["nftables"]
if "set" in node
}
assert ("ip6", "crowdsec6-blacklists", "ipv6_addr") in sets
assert ("ip6", "crowdsec6-blacklists-script", "ipv6_addr") in sets

rules = {
node["rule"]["chain"] for node in output["nftables"] if "rule" in node
Expand All @@ -74,7 +77,7 @@ def test_duplicate_decisions_across_decision_stream(self):
d1, d2, d3 = generate_n_decisions(3, dup_count=1)
self.lapi.ds.insert_decisions([d1])
sleep(1)
self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists"), {"0.0.0.0"})
self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script"), {"0.0.0.0"})

self.lapi.ds.insert_decisions([d2, d3])
sleep(1)
Expand All @@ -86,12 +89,12 @@ def test_duplicate_decisions_across_decision_stream(self):
self.lapi.ds.delete_decision_by_id(d1["id"])
self.lapi.ds.delete_decision_by_id(d2["id"])
sleep(1)
self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists"), set())
self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script"), set())
assert self.fb.poll() is None

self.lapi.ds.delete_decision_by_id(d3["id"])
sleep(1)
self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists"), set())
self.assertEqual(get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script"), set())
assert self.fb.poll() is None

def test_decision_insertion_deletion_ipv4(self):
Expand All @@ -100,15 +103,15 @@ def test_decision_insertion_deletion_ipv4(self):
self.lapi.ds.insert_decisions(decisions)
sleep(1) # let the bouncer insert the decisions

set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists")
set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script")
self.assertEqual(len(set_elements), total_decisions - duplicate_decisions)
assert {i["value"] for i in decisions} == set_elements
assert "0.0.0.0" in set_elements

self.lapi.ds.delete_decisions_by_ip("0.0.0.0")
sleep(1)

set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists")
set_elements = get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script")
assert {i["value"] for i in decisions if i["value"] != "0.0.0.0"} == set_elements
assert len(set_elements) == total_decisions - duplicate_decisions - 1
assert "0.0.0.0" not in set_elements
Expand All @@ -119,7 +122,7 @@ def test_decision_insertion_deletion_ipv6(self):
self.lapi.ds.insert_decisions(decisions)
sleep(1)

set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists")
set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists-script")
set_elements = set(map(ip_address, set_elements))
assert len(set_elements) == total_decisions - duplicate_decisions
assert {ip_address(i["value"]) for i in decisions} == set_elements
Expand All @@ -128,7 +131,7 @@ def test_decision_insertion_deletion_ipv6(self):
self.lapi.ds.delete_decisions_by_ip("::1:0:3")
sleep(1)

set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists")
set_elements = get_set_elements("ip6", "crowdsec6", "crowdsec6-blacklists-script")
set_elements = set(map(ip_address, set_elements))
self.assertEqual(len(set_elements), total_decisions - duplicate_decisions - 1)
assert (
Expand All @@ -154,7 +157,7 @@ def test_longest_decision_insertion(self):
]
self.lapi.ds.insert_decisions(decisions)
sleep(1)
elems = get_set_elements("ip", "crowdsec", "crowdsec-blacklists", with_timeout=True)
elems = get_set_elements("ip", "crowdsec", "crowdsec-blacklists-script", with_timeout=True)
assert len(elems) == 1
elems = list(elems)
assert elems[0][0] == "123.45.67.12"
Expand Down

0 comments on commit 92eee0a

Please sign in to comment.