-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_mld_basic.py
166 lines (138 loc) Β· 5.07 KB
/
test_mld_basic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2018-2022 David Lamparter for NetDEF, Inc.
"""
IPv6 Multicast Listener Discovery tests.
"""
from topotato.v1 import *
from topotato.multicast import *
from topotato.scapy import ScapySend
from scapy.all import (
IPv6,
ICMPv6MLReport2,
ICMPv6MLDMultAddrRec,
IPv6ExtHdrHopByHop,
RouterAlert,
UDP,
)
@topology_fixture()
def topology(topo):
"""
[ ]-----[ h1 ]
[ ]
[ dut ]-----[ h2 ]
[ ]
[ ]-----{ lan }-----[ src ]
"""
class FRRConfigured(RouterFRR):
zebra = """
#% extends "boilerplate.conf"
#% block main
debug zebra events
debug zebra packet
debug zebra rib detailed
debug zebra nht detailed
#% endblock
"""
pim6d = """
#% extends "boilerplate.conf"
#% block main
#% if frr.has_defun("debug_mld_cmd")
debug mld
#% endif
#% if router.name in ['dut']
#% for iface in router.ifaces
interface {{ iface.ifname }}
ipv6 pim
ipv6 mld query-max-response-time 10
ipv6 mld
!
#% endfor
#% endif
#% endblock
"""
def requirements(self):
self.require_defun("interface_ipv6_mld_cmd")
class Setup(TopotatoNetwork, topo=topology):
dut: FRRConfigured
h1: Host
h2: Host
src: Host
def iter_mld_records(report):
for record in report.records:
while isinstance(record, ICMPv6MLDMultAddrRec):
yield record
record = record.payload
class MLDBasic(TestBase, AutoFixture, setup=Setup):
@topotatofunc(include_startup=True)
def prepare(self, topo, dut, h1, h2, src):
self.receiver = MulticastReceiver(h1, h1.iface_to('dut'))
# wait for query before continuing
logchecks = yield from AssertLog.make(dut, 'pim6d', '[MLD default:dut-h1] MLD query', maxwait=3.0)
@logchecks.skip_on_exception
def need_debug_mld(testitem):
testitem.instance.dut.require_defun("debug_mld_cmd")
# get out of initial reporting (prevents timing issues later)
def expect_pkt(ipv6: IPv6, report: ICMPv6MLReport2):
for record in iter_mld_records(report):
if record.rtype == 2: # IS_EX
return True
yield from AssertPacket.make("h1_dut", maxwait=2.0, pkt=expect_pkt)
@topotatofunc
def test_ssm(self, topo, dut, h1, h2, src):
"""
Join a (S,G) on MLD and try forwarding a packet on it.
"""
srcaddr = src.iface_to('lan').ip6[0].ip
yield from self.receiver.join('ff05::2345', srcaddr)
logchecks = yield from AssertLog.make(dut, 'pim6d', '[MLD default:dut-h1 (%s,ff05::2345)] NOINFO => JOIN' % srcaddr, maxwait=3.0)
@logchecks.skip_on_exception
def need_debug_mld(testitem):
testitem.instance.dut.require_defun("debug_mld_cmd")
yield from AssertVtysh.make(dut, "pim6d", "debug show mld interface %s" % (dut.iface_to('h1').ifname))
ip = IPv6(hlim=255, src=srcaddr, dst="ff05::2345")
udp = UDP(sport=9999, dport=9999)
yield from ScapySend.make(
src,
"src-lan",
pkt = ip/udp,
)
def expect_pkt(ipv6: IPv6, udp: UDP):
return ipv6.src == str(srcaddr) and ipv6.dst == 'ff05::2345' \
and udp.dport == 9999
yield from AssertPacket.make("h1_dut", maxwait=2.0, pkt=expect_pkt)
@topotatofunc
def test_asm(self, topo, dut, h1, h2, src):
yield from self.receiver.join('ff05::1234')
logchecks = yield from AssertLog.make(dut, 'pim6d', '[MLD default:dut-h1 (*,ff05::1234)] NOINFO => JOIN', maxwait=2.0)
@logchecks.skip_on_exception
def need_debug_mld(testitem):
testitem.instance.dut.require_defun("debug_mld_cmd")
yield from AssertVtysh.make(dut, "pim6d", "debug show mld interface %s" % (dut.iface_to('h1').ifname))
@topotatofunc
def test_no_rtralert(self, topo, dut, h1, h2, src):
"""
MLD code should be ignoring MLD reports without router alert option.
"""
ip = IPv6(hlim=1, src=h1.iface_to("dut").ll6, dst="ff02::16")
rec0 = ICMPv6MLDMultAddrRec(dst="ff0e::1234")
yield from ScapySend.make(
h1,
"h1-dut",
pkt = ip/ICMPv6MLReport2(records = [rec0]),
)
yield from AssertLog.make(dut, 'pim6d', 'packet without IPv6 Router Alert MLD option', maxwait=2.0)
@topotatofunc
def test_invalid_group(self, topo, dut, h1, h2, src):
"""
An unicast address is not a valid group address.
"""
ip = IPv6(hlim=1, src=h1.iface_to("dut").ll6, dst="ff02::16")
hbh = IPv6ExtHdrHopByHop(options = RouterAlert())
mfrec0 = ICMPv6MLDMultAddrRec(dst="fe80::1234")
yield from ScapySend.make(
h1,
"h1-dut",
pkt = ip/hbh/ICMPv6MLReport2(records = [mfrec0]),
)
yield from AssertLog.make(dut, 'pim6d', f"[MLD default:dut-h1 {h1.iface_to('dut').ll6}] malformed MLDv2 report (invalid group fe80::1234)", maxwait=2.0)