diff --git a/scapy/contrib/automotive/doip.py b/scapy/contrib/automotive/doip.py index ba66ef8a84d..f81049ca96e 100644 --- a/scapy/contrib/automotive/doip.py +++ b/scapy/contrib/automotive/doip.py @@ -11,6 +11,7 @@ import struct import socket import time +import ssl from scapy.contrib.automotive import log_automotive from scapy.fields import ( @@ -27,7 +28,7 @@ XStrField, ) from scapy.packet import Packet, bind_layers, bind_bottom_up -from scapy.supersocket import StreamSocket +from scapy.supersocket import StreamSocket, SSLStreamSocket from scapy.layers.inet import TCP, UDP from scapy.contrib.automotive.uds import UDS from scapy.data import MTU @@ -39,6 +40,7 @@ Optional, ) + # ISO 13400-2 sect 9.2 @@ -247,8 +249,8 @@ def post_build(self, pkt, pay): This will set the Field 'payload_length' to the correct value. """ if self.payload_length is None: - pkt = pkt[:4] + struct.pack("!I", len(pay) + len(pkt) - 8) + \ - pkt[8:] + pkt = pkt[:4] + struct.pack( + "!I", len(pay) + len(pkt) - 8) + pkt[8:] return pkt + pay def extract_padding(self, s): @@ -259,13 +261,24 @@ def extract_padding(self, s): return b"", None -class DoIPSocket(StreamSocket): - """ Custom StreamSocket for DoIP communication. This sockets automatically - sends a routing activation request as soon as a TCP connection is +bind_bottom_up(UDP, DoIP, sport=13400) +bind_bottom_up(UDP, DoIP, dport=13400) +bind_layers(UDP, DoIP, sport=13400, dport=13400) + +bind_layers(TCP, DoIP, sport=13400) +bind_layers(TCP, DoIP, dport=13400) + +bind_layers(DoIP, UDS, payload_type=0x8001) + + +class DoIPSocket(SSLStreamSocket): + """Socket for DoIP communication. This sockets automatically + sends a routing activation request as soon as a TCP or TLS connection is established. :param ip: IP address of destination :param port: destination port, usually 13400 + :param tls_port: destination port for TLS connection, usually 3496 :param activate_routing: If true, routing activation request is automatically sent :param source_address: DoIP source address @@ -275,69 +288,137 @@ class DoIPSocket(StreamSocket): the routing activation request :param reserved_oem: Optional parameter to set value for reserved_oem field of routing activation request + :param force_tls: Skip establishing of a TCP connection and directly try to + connect via SSL/TLS + :param context: Optional ssl.SSLContext object for initialization of ssl socket + connections. Example: >>> socket = DoIPSocket("169.254.0.131") >>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000]) >>> resp = socket.sr1(pkt, timeout=1) """ # noqa: E501 - def __init__(self, ip='127.0.0.1', port=13400, activate_routing=True, - source_address=0xe80, target_address=0, - activation_type=0, reserved_oem=b""): - # type: (str, int, bool, int, int, int, bytes) -> None + + def __init__(self, + ip='127.0.0.1', # type: str + port=13400, # type: int + tls_port=3496, # type: int + activate_routing=True, # type: bool + source_address=0xe80, # type: int + target_address=0, # type: int + activation_type=0, # type: int + reserved_oem=b"", # type: bytes + force_tls=False, # type: bool + context=None # type: Optional[ssl.SSLContext] + ): # type: (...) -> None self.ip = ip self.port = port + self.tls_port = tls_port + self.activate_routing = activate_routing self.source_address = source_address + self.target_address = target_address + self.activation_type = activation_type + self.reserved_oem = reserved_oem self.buffer = b"" - self._init_socket() - - if activate_routing: - self._activate_routing( - source_address, target_address, activation_type, reserved_oem) + self.force_tls = force_tls + self.context = context + try: + self._init_socket(socket.AF_INET) + except Exception: + self.close() + raise def recv(self, x=MTU, **kwargs): # type: (Optional[int], **Any) -> Optional[Packet] - if self.buffer: - len_data = self.buffer[:8] - else: - len_data = self.ins.recv(8, socket.MSG_PEEK) - if len(len_data) != 8: - return None + if len(self.buffer) < 8: + self.buffer += self.ins.recv(8) + if len(self.buffer) < 8: + return None + len_data = self.buffer[:8] len_int = struct.unpack(">I", len_data[4:8])[0] len_int += 8 - self.buffer += self.ins.recv(len_int - len(self.buffer)) - if len(self.buffer) != len_int: + self.buffer += self.ins.recv(len_int - len(self.buffer)) + if len(self.buffer) < len_int: return None + pktbuf = self.buffer[:len_int] + self.buffer = self.buffer[len_int:] - pkt = self.basecls(self.buffer, **kwargs) # type: Packet - self.buffer = b"" + pkt = self.basecls(pktbuf, **kwargs) # type: Packet return pkt def _init_socket(self, sock_family=socket.AF_INET): # type: (int) -> None + connected = False s = socket.socket(sock_family, socket.SOCK_STREAM) + s.settimeout(5) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP) - s.connect(addrinfo[0][-1]) - StreamSocket.__init__(self, s, DoIP) - - def _activate_routing(self, - source_address, # type: int - target_address, # type: int - activation_type, # type: int - reserved_oem=b"" # type: bytes - ): # type: (...) -> None + + if not self.force_tls: + addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP) + s.connect(addrinfo[0][-1]) + connected = True + SSLStreamSocket.__init__(self, s, DoIP) + + if not self.activate_routing: + return + + activation_return = self._activate_routing() + else: + # Let's overwrite activation_return to force TLS Connection + activation_return = 0x07 + + if activation_return == 0x10: + # Routing successfully activated. + return + elif activation_return == 0x07: + # Routing activation denied because the specified activation + # type requires a secure TLS TCP_DATA socket. + if self.context is None: + raise ValueError("SSLContext 'context' can not be None") + if connected: + s.close() + s = socket.socket(sock_family, socket.SOCK_STREAM) + s.settimeout(5) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + ss = self.context.wrap_socket(s) + addrinfo = socket.getaddrinfo( + self.ip, self.tls_port, proto=socket.IPPROTO_TCP) + ss.connect(addrinfo[0][-1]) + SSLStreamSocket.__init__(self, ss, DoIP) + + if not self.activate_routing: + return + + activation_return = self._activate_routing() + if activation_return == 0x10: + # Routing successfully activated. + return + else: + raise Exception( + "DoIPSocket activate_routing failed with " + "routing_activation_response 0x%x" % activation_return) + + elif activation_return == -1: + raise Exception("DoIPSocket._activate_routing failed") + else: + raise Exception( + "DoIPSocket activate_routing failed with " + "routing_activation_response 0x%x!" % activation_return) + + def _activate_routing(self): # type: (...) -> int resp = self.sr1( - DoIP(payload_type=0x5, activation_type=activation_type, - source_address=source_address, reserved_oem=reserved_oem), + DoIP(payload_type=0x5, activation_type=self.activation_type, + source_address=self.source_address, reserved_oem=self.reserved_oem), verbose=False, timeout=1) if resp and resp.payload_type == 0x6 and \ resp.routing_activation_response == 0x10: - self.target_address = target_address or \ - resp.logical_address_doip_entity + self.target_address = ( + self.target_address or resp.logical_address_doip_entity) log_automotive.info( "Routing activation successful! Target address set to: 0x%x", self.target_address) @@ -345,14 +426,20 @@ def _activate_routing(self, log_automotive.error( "Routing activation failed! Response: %s", repr(resp)) + if resp and resp.payload_type == 0x6: + return resp.routing_activation_response + else: + return -1 + class DoIPSocket6(DoIPSocket): - """ Custom StreamSocket for DoIP communication over IPv6. - This sockets automatically sends a routing activation request as soon as - a TCP connection is established. + """Socket for DoIP communication. This sockets automatically + sends a routing activation request as soon as a TCP or TLS connection is + established. :param ip: IPv6 address of destination :param port: destination port, usually 13400 + :param tls_port: destination port for TLS connection, usually 3496 :param activate_routing: If true, routing activation request is automatically sent :param source_address: DoIP source address @@ -362,6 +449,10 @@ class DoIPSocket6(DoIPSocket): the routing activation request :param reserved_oem: Optional parameter to set value for reserved_oem field of routing activation request + :param force_tls: Skip establishing of a TCP connection and directly try to + connect via SSL/TLS + :param context: Optional ssl.SSLContext object for initialization of ssl socket + connections. Example: >>> socket = DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9") @@ -369,22 +460,38 @@ class DoIPSocket6(DoIPSocket): >>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000]) >>> resp = socket.sr1(pkt, timeout=1) """ # noqa: E501 - def __init__(self, ip='::1', port=13400, activate_routing=True, - source_address=0xe80, target_address=0, - activation_type=0, reserved_oem=b""): - # type: (str, int, bool, int, int, int, bytes) -> None + + def __init__(self, + ip='::1', # type: str + port=13400, # type: int + tls_port=3496, # type: int + activate_routing=True, # type: bool + source_address=0xe80, # type: int + target_address=0, # type: int + activation_type=0, # type: int + reserved_oem=b"", # type: bytes + force_tls=False, # type: bool + context=None # type: Optional[ssl.SSLContext] + ): # type: (...) -> None self.ip = ip self.port = port + self.tls_port = tls_port + self.activate_routing = activate_routing self.source_address = source_address + self.target_address = target_address + self.activation_type = activation_type + self.reserved_oem = reserved_oem self.buffer = b"" - super(DoIPSocket6, self)._init_socket(socket.AF_INET6) - - if activate_routing: - super(DoIPSocket6, self)._activate_routing( - source_address, target_address, activation_type, reserved_oem) + self.force_tls = force_tls + self.context = context + try: + self._init_socket(socket.AF_INET6) + except Exception: + self.close() + raise -class UDS_DoIPSocket(DoIPSocket): +class _UDS_DoIPSocketBase(StreamSocket): """ Application-Layer socket for DoIP endpoints. This socket takes care about the encapsulation of UDS packets into DoIP packets. @@ -394,11 +501,14 @@ class UDS_DoIPSocket(DoIPSocket): >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000]) >>> resp = socket.sr1(pkt, timeout=1) """ + def send(self, x): # type: (Union[Packet, bytes]) -> int if isinstance(x, UDS): - pkt = DoIP(payload_type=0x8001, source_address=self.source_address, - target_address=self.target_address) / x + pkt = DoIP(payload_type=0x8001, + source_address=self.source_address, # type: ignore + target_address=self.target_address # type: ignore + ) / x else: pkt = x @@ -407,35 +517,38 @@ def send(self, x): except AttributeError: pass - return super(UDS_DoIPSocket, self).send(pkt) + return super().send(pkt) def recv(self, x=MTU, **kwargs): # type: (Optional[int], **Any) -> Optional[Packet] - pkt = super(UDS_DoIPSocket, self).recv(x, **kwargs) + pkt = super().recv(x, **kwargs) if pkt and pkt.payload_type == 0x8001: return pkt.payload else: return pkt -class UDS_DoIPSocket6(DoIPSocket6, UDS_DoIPSocket): +class UDS_DoIPSocket(_UDS_DoIPSocketBase, DoIPSocket): """ Application-Layer socket for DoIP endpoints. This socket takes care about the encapsulation of UDS packets into DoIP packets. Example: - >>> socket = UDS_DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9") + >>> socket = UDS_DoIPSocket("169.254.117.238") >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000]) >>> resp = socket.sr1(pkt, timeout=1) """ pass -bind_bottom_up(UDP, DoIP, sport=13400) -bind_bottom_up(UDP, DoIP, dport=13400) -bind_layers(UDP, DoIP, sport=13400, dport=13400) - -bind_layers(TCP, DoIP, sport=13400) -bind_layers(TCP, DoIP, dport=13400) +class UDS_DoIPSocket6(_UDS_DoIPSocketBase, DoIPSocket6): + """ + Application-Layer socket for DoIP endpoints. This socket takes care about + the encapsulation of UDS packets into DoIP packets. -bind_layers(DoIP, UDS, payload_type=0x8001) + Example: + >>> socket = UDS_DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9") + >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000]) + >>> resp = socket.sr1(pkt, timeout=1) + """ + pass diff --git a/test/contrib/automotive/doip.uts b/test/contrib/automotive/doip.uts index 51f8d5e7364..74d095162ad 100644 --- a/test/contrib/automotive/doip.uts +++ b/test/contrib/automotive/doip.uts @@ -381,6 +381,13 @@ assert req.hashret() == resp.hashret() assert resp[3].answers(req[3]) assert not req[3].answers(resp[3]) ++ DoIP Communication tests + += Load libraries +import base64 +import ssl +import tempfile + = Test DoIPSocket ~ automotive_comm @@ -407,8 +414,75 @@ server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) +assert len(pkts) == 2 + + += Test DoIPSocket 2 +~ automotive_comm + +server_up = threading.Event() +def server(): + buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('127.0.0.1', 13400)) + sock.listen(1) + server_up.set() + connection, address = sock.accept() + for i in range(len(buffer)): + connection.send(buffer[i:i+1]) + time.sleep(0.01) + connection.close() + finally: + sock.close() + + +server_thread = threading.Thread(target=server) +server_thread.start() +server_up.wait(timeout=1) +sock = DoIPSocket(activate_routing=False) + +pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) +assert len(pkts) == 2 + += Test DoIPSocket 3 +~ automotive_comm + +server_up = threading.Event() +def server(): + buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('127.0.0.1', 13400)) + sock.listen(1) + server_up.set() + connection, address = sock.accept() + while buffer: + randlen = random.randint(0, len(buffer)) + connection.send(buffer[:randlen]) + buffer = buffer[randlen:] + time.sleep(0.01) + connection.close() + finally: + sock.close() + + +server_thread = threading.Thread(target=server) +server_thread.start() +server_up.wait(timeout=1) +sock = DoIPSocket(activate_routing=False) + +pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) assert len(pkts) == 2 + = Test DoIPSocket6 ~ automotive_comm @@ -435,4 +509,243 @@ server_up.wait(timeout=1) sock = DoIPSocket6(activate_routing=False) pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) +assert len(pkts) == 2 + += Test DoIPSslSocket +~ automotive_comm + +certstring = """ +LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB +QVNDQktZd2dnU2lBZ0VBQW9JQkFRRFUvK0hRbVpzSDl2QVcKQ3ZMQjRxalpnZFJSSXE1b2JBanB4 +YUhoUGxCVEMvUlBzMHIxRVF0V0FtbXNEZFE3UGlLaCtYa1hES3pNY3lJSQp1a0ZpNThUQW1idGFj +N0U5VmJHSnNlTWp2RkJKSkFqQXVtbFdRZk5XcSs2TkZhdmRkTDQrSTNBTVJ5TldJTkJYCjhHMzRo +dldIbDdTOGhhSFFZN0FXcUZWVTNVL2xKR2pubnF3MEJraEIvVGRCTWIwM0habzkrVjIrWU9RZmk5 +QWsKTVRSRXpSeWVObWJqT0sxbHpXdFJXWkZZU0RnMEtqUVh4SkdFNVc5MzFPWitHL1NkbytTM1ZW +SVRPdWxQbHRmVwpXMEdjeCsvZERSNFIxNG5mcUl5L1daMElHUVNXMlRsQytmeGJ0dURDUkFqelRz +b0J3YjJ0cnpoR0VtYVFveUtNCnpBKzVSUHNyQWdNQkFBRUNnZ0VBRUJHaEoyWm5OVHh5YVY5TnZY +QjI1NDNZQnRUMGVSUHBhanJLMXg0bk1OU3oKNE9LNFVzWlo1MnBnTHRHT1EzZm1aS0l0cEo1WlY1 +cVBUejdwN3VjUzhnQWNZUnNJUnpCMHA5d3FpWExMK3h0RApxUjB4dnR4VDJpUGlFblVNNndudHpr +SHpKK0g0QkZLT2FvdjNaK3Fha2E1UmFCcmhheGRuaDBDNklLQmZtM3cyCm5zUWI2N0lCYWwrSnBs +L1g5TENWRkdRT2owb0lmVWI5ZFp3OWQ3MCthSGVVb2xvMGdYZmxxcXFFcnl3ZDlPN2QKNnp4dGlx +cnRyZUJhK1IraWs3NE1SK0xvaFNVR3o2VTRQaXhWQ3l1SnQ2U0hvRHR2L3dtSnltWDd2a0FRS2w1 +RQplK1JqUGVyakpUWTNzNXNXbEd2V21UTEtEbnVyS2pBYzZUOHhKb0pXWlFLQmdRRHdsd2RRdmww +S28wNHhDUmtiCklYRGVJZE1jZkp2ejRGZEtka1BmVnZVT2xHVEpNZkRzbWNoUzZhcEJCQUdQMUU2 +VkN2VzJmUFdjaXhScHE3MW8KR2xtbWZ5RnlJRW0rL08yamMvSFRXWHp6Qjdoc0JISEltQklHczFU +TC9iWFU3amhVQW5kWDdMK3RSRDBKNWRGVwpiN1VOOXNxaWdtRG42REJWZkxaUHgxRnlWUUtCZ1FE +aXBIT1BhNmVMSlk5R1FZdkw3OTIyTHNoU3ZYSUFVMERGCjBabTlqbjM2b3ZIY0kvWEZDdHVXank2 +WG9wbk9pbjlycmtUY2FDUnBvSEFNb00ycHdiR0tFY0dVVEY2RHQ3akYKRHVnd2srR21sbDkrbjM2 +M3Iwb09YNktSbWFhRStiZHoyNjNQVEhMaktYUnFyc3h5WEtMT3ZyTXhVNWNzMXJCeQpTMWI2ZGhr +M2Z3S0JnRjlONUliMnNkS3ArQ3B5aVRCM0ljZk1yRjBuZTN1ekRjRWdjaWlCd05lQ3J4NElHNEVP +Ck5nMnFKRmhXNXV0NzFaa3kyenpyNlR1VzJJSTNsdk1ySlFKUWNBWk9oZ2dURjJ2ZFhSazA1TXM4 +N3JCVFhtTncKNGdzbmROck42UDZ0VTBEc0xTeDJTME91dVdNM1Y2S2U0NkRoZDBuQ3pmSnZ4dDNH +WmszYURnaDFBb0dBWFhIcQpoNDZlZEx1V3VDUGNUTWhvUkc1RGdBSEdHQ1k3UlpTbTY4WHRZVUov +c0FGUG10OWdMRko2cG1DUFE5NU1yUXdjCkxqZnVFM0xuMy8wSTd0NENvbWV4eGNBN0U5blRIOFNH +clVpN3QrQzJITklNQUJZUTFaNU91L042K2Nhd0FkL28KYU5rZllWTzlRU015L2svOWZIcWFEVk5t +dUVFSVhRZDlKQ1UvUG1jQ2dZQWI0RTBRWTdDZmlrV293OFIzSlhoZgo0MHFVVkdud09QKzJNbXE5 +d2ZmWkpTRHNFSTQvb2g0VGRnN0sybHNNazVsWnRaMyszTjljSDVUc1pMYlJtd2FMCm9sRVl6K1BB +WU91MlMrY1l2bFlNL0V2WmlpRHJybjZuTStNbTNnaXJPYkNwMzcxd1ZxRFVsUnB4OUlwWVdYcnAK +T3YxUXFHdXkwODdyQkk1cStWL3hqQT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0KLS0tLS1C +RUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ3VENDQXRXZ0F3SUJBZ0lVVTNsendsTVNSa294Tkdk +SFJzZllIcUtxcDAwd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2dZVXhDekFKQmdOVkJBWVRBa1JGTVJN +d0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSApEQU5TUlVjeEVUQVBCZ05WQkFv +TUNHUnBjM05sWTNSdk1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NCkJGUkZVMVF4SXpB +aEJna3Foa2lHOXcwQkNRRVdGR052Ym5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUI0WERUSTAKTURN +eE9ERTVNek13TlZvWERUSTBNRFF4TnpFNU16TXdOVm93Z1lVeEN6QUpCZ05WQkFZVEFrUkZNUk13 +RVFZRApWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSERBTlNSVWN4RVRBUEJnTlZCQW9N +Q0dScGMzTmxZM1J2Ck1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NQkZSRlUxUXhJekFo +QmdrcWhraUc5dzBCQ1FFV0ZHTnYKYm5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUlJQklqQU5CZ2tx +aGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBMVAvaDBKbWJCL2J3RmdyeXdlS28yWUhV +VVNLdWFHd0k2Y1doNFQ1UVV3djBUN05LOVJFTFZnSnByQTNVCk96NGlvZmw1Rnd5c3pITWlDTHBC +WXVmRXdKbTdXbk94UFZXeGliSGpJN3hRU1NRSXdMcHBWa0h6VnF2dWpSV3IKM1hTK1BpTndERWNq +VmlEUVYvQnQrSWIxaDVlMHZJV2gwR093RnFoVlZOMVA1U1JvNTU2c05BWklRZjAzUVRHOQpOeDJh +UGZsZHZtRGtINHZRSkRFMFJNMGNualptNHppdFpjMXJVVm1SV0VnNE5DbzBGOFNSaE9WdmQ5VG1m +aHYwCm5hUGt0MVZTRXpycFQ1YlgxbHRCbk1mdjNRMGVFZGVKMzZpTXYxbWRDQmtFbHRrNVF2bjhX +N2Jnd2tRSTgwN0sKQWNHOXJhODRSaEpta0tNaWpNd1B1VVQ3S3dJREFRQUJvMU13VVRBZEJnTlZI +UTRFRmdRVVZhbUFkUjR1ZW8zQgpmV0RjUlMyUkQ3OEtlZXd3SHdZRFZSMGpCQmd3Rm9BVVZhbUFk +UjR1ZW8zQmZXRGNSUzJSRDc4S2Vld3dEd1lEClZSMFRBUUgvQkFVd0F3RUIvekFOQmdrcWhraUc5 +dzBCQVFzRkFBT0NBUUVBRjE1TTNvL3RyUVdYeHdHamlxZjgKNXBUTEM0bHJwQkZaTFZDbStQdHd4 +aENlN1ZSd2dLMElBb01EMW0vSjNEYnVJSjVURXlTVElnR2N0WHVNbG5pWgpsY3IwekZOZVVhQ08w +YkdhaExYUXpCWTRxSkhTTUNWNnhiNXNqUDlEdk9HYnFxbHVTbk51ZFJ5UWNIbkd4SE0rCk1adXpO +WUNseklOMEtYbFJuSTZqRXUrcG9XZ0pEMGN1NFM2b1lwT2R3bElRYmtaNnIrUE1jQ3hpRmhRd3E2 +em4KcE1nQzB0WlpSM3pCOEpVcTJwRHlGVy9jVlFjWkp5YUhnQkkwWlJWWG5wbDFqYng2YlNIOCts +cnMxVk1xZDlkcQozd1BMcjBheWI2VkpNa29WMjNWSXAzLzlYQVpTR3Z6Y0dadnM2VThSUTdFbUtx +akJibWxudm1CTkpUMk9xbFFRCllRPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=""" + +certstring = certstring.replace('\n', '') + +def _load_certificate_chain(context) -> None: + with tempfile.NamedTemporaryFile(delete=False) as fp: + fp.write(base64.b64decode(certstring)) + fp.close() + context.load_cert_chain(fp.name) + + +server_up = threading.Event() +def server(): + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + _load_certificate_chain(context) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock = context.wrap_socket(sock) + try: + ssock.bind(('127.0.0.1', 3496)) + ssock.listen(1) + server_up.set() + connection, address = ssock.accept() + connection.send(buffer) + connection.close() + finally: + ssock.close() + + +server_thread = threading.Thread(target=server) +server_thread.start() +server_up.wait(timeout=1) +context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +context.check_hostname = False +context.verify_mode = ssl.CERT_NONE +sock = DoIPSocket(activate_routing=False, force_tls=True, context=context) + +pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) +assert len(pkts) == 2 + += Test DoIPSslSocket6 +~ automotive_comm + +server_up = threading.Event() +def server(): + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + _load_certificate_chain(context) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock = context.wrap_socket(sock) + try: + ssock.bind(('::1', 3496)) + ssock.listen(1) + server_up.set() + connection, address = ssock.accept() + connection.send(buffer) + connection.close() + finally: + ssock.close() + + +server_thread = threading.Thread(target=server) +server_thread.start() +server_up.wait(timeout=1) +context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +context.check_hostname = False +context.verify_mode = ssl.CERT_NONE +sock = DoIPSocket6(activate_routing=False, force_tls=True, context=context) + +pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) +assert len(pkts) == 2 + += Test UDS_DoIPSslSocket6 +~ automotive_comm + +server_up = threading.Event() +def server(): + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + _load_certificate_chain(context) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock = context.wrap_socket(sock) + try: + ssock.bind(('::1', 3496)) + ssock.listen(1) + server_up.set() + connection, address = ssock.accept() + connection.send(buffer) + connection.close() + finally: + ssock.close() + + +server_thread = threading.Thread(target=server) +server_thread.start() +server_up.wait(timeout=1) +context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +context.check_hostname = False +context.verify_mode = ssl.CERT_NONE +sock = UDS_DoIPSocket6(activate_routing=False, force_tls=True, context=context) + +pkts = sock.sniff(timeout=1, count=2) +server_thread.join(timeout=1) assert len(pkts) == 2 + += Test UDS_DualDoIPSslSocket6 +~ automotive_comm + +server_tcp_up = threading.Event() +server_tls_up = threading.Event() +def server_tls(): + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + _load_certificate_chain(context) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + buffer = bytes.fromhex("02fd0006000000090e8011061000000000") + buffer += b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock = context.wrap_socket(sock) + try: + ssock.bind(('::1', 3496)) + ssock.listen(1) + server_tls_up.set() + connection, address = ssock.accept() + connection.send(buffer) + connection.close() + finally: + ssock.close() + +def server_tcp(): + buffer = bytes.fromhex("02fd0006000000090e8011060700000000") + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('::1', 13400)) + sock.listen(1) + server_tcp_up.set() + connection, address = sock.accept() + connection.send(buffer) + connection.shutdown() + connection.close() + finally: + sock.close() + + +server_tcp_thread = threading.Thread(target=server_tcp) +server_tcp_thread.start() +server_tcp_up.wait(timeout=1) +server_tls_thread = threading.Thread(target=server_tls) +server_tls_thread.start() +server_tls_up.wait(timeout=1) +context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +context.check_hostname = False +context.verify_mode = ssl.CERT_NONE + + +sock = UDS_DoIPSocket6(ip="::1", context=context) + +pkts = sock.sniff(timeout=1, count=2) +server_tcp_thread.join(timeout=1) +server_tls_thread.join(timeout=1) +assert len(pkts) == 2 \ No newline at end of file