Skip to content

Commit

Permalink
Add SSL support for DoIP sockets (#4327)
Browse files Browse the repository at this point in the history
* Add SSL functionality to DoIP sockets

* Cleanup DoIP Sockets to not have tons of different objects
  • Loading branch information
polybassa authored May 1, 2024
1 parent 19eeee5 commit 8cea357
Show file tree
Hide file tree
Showing 2 changed files with 491 additions and 65 deletions.
243 changes: 178 additions & 65 deletions scapy/contrib/automotive/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import struct
import socket
import time
import ssl

from scapy.contrib.automotive import log_automotive
from scapy.fields import (
Expand All @@ -27,7 +28,7 @@
XStrField,
)
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.supersocket import StreamSocket
from scapy.supersocket import StreamSocket, SSLStreamSocket
from scapy.layers.inet import TCP, UDP
from scapy.contrib.automotive.uds import UDS
from scapy.data import MTU
Expand All @@ -39,6 +40,7 @@
Optional,
)


# ISO 13400-2 sect 9.2


Expand Down Expand Up @@ -247,8 +249,8 @@ def post_build(self, pkt, pay):
This will set the Field 'payload_length' to the correct value.
"""
if self.payload_length is None:
pkt = pkt[:4] + struct.pack("!I", len(pay) + len(pkt) - 8) + \
pkt[8:]
pkt = pkt[:4] + struct.pack(
"!I", len(pay) + len(pkt) - 8) + pkt[8:]
return pkt + pay

def extract_padding(self, s):
Expand All @@ -259,13 +261,24 @@ def extract_padding(self, s):
return b"", None


class DoIPSocket(StreamSocket):
""" Custom StreamSocket for DoIP communication. This sockets automatically
sends a routing activation request as soon as a TCP connection is
bind_bottom_up(UDP, DoIP, sport=13400)
bind_bottom_up(UDP, DoIP, dport=13400)
bind_layers(UDP, DoIP, sport=13400, dport=13400)

bind_layers(TCP, DoIP, sport=13400)
bind_layers(TCP, DoIP, dport=13400)

bind_layers(DoIP, UDS, payload_type=0x8001)


class DoIPSocket(SSLStreamSocket):
"""Socket for DoIP communication. This sockets automatically
sends a routing activation request as soon as a TCP or TLS connection is
established.
:param ip: IP address of destination
:param port: destination port, usually 13400
:param tls_port: destination port for TLS connection, usually 3496
:param activate_routing: If true, routing activation request is
automatically sent
:param source_address: DoIP source address
Expand All @@ -275,84 +288,158 @@ class DoIPSocket(StreamSocket):
the routing activation request
:param reserved_oem: Optional parameter to set value for reserved_oem field
of routing activation request
:param force_tls: Skip establishing of a TCP connection and directly try to
connect via SSL/TLS
:param context: Optional ssl.SSLContext object for initialization of ssl socket
connections.
Example:
>>> socket = DoIPSocket("169.254.0.131")
>>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
""" # noqa: E501
def __init__(self, ip='127.0.0.1', port=13400, activate_routing=True,
source_address=0xe80, target_address=0,
activation_type=0, reserved_oem=b""):
# type: (str, int, bool, int, int, int, bytes) -> None

def __init__(self,
ip='127.0.0.1', # type: str
port=13400, # type: int
tls_port=3496, # type: int
activate_routing=True, # type: bool
source_address=0xe80, # type: int
target_address=0, # type: int
activation_type=0, # type: int
reserved_oem=b"", # type: bytes
force_tls=False, # type: bool
context=None # type: Optional[ssl.SSLContext]
): # type: (...) -> None
self.ip = ip
self.port = port
self.tls_port = tls_port
self.activate_routing = activate_routing
self.source_address = source_address
self.target_address = target_address
self.activation_type = activation_type
self.reserved_oem = reserved_oem
self.buffer = b""
self._init_socket()

if activate_routing:
self._activate_routing(
source_address, target_address, activation_type, reserved_oem)
self.force_tls = force_tls
self.context = context
try:
self._init_socket(socket.AF_INET)
except Exception:
self.close()
raise

def recv(self, x=MTU, **kwargs):
# type: (Optional[int], **Any) -> Optional[Packet]
if self.buffer:
len_data = self.buffer[:8]
else:
len_data = self.ins.recv(8, socket.MSG_PEEK)
if len(len_data) != 8:
return None
if len(self.buffer) < 8:
self.buffer += self.ins.recv(8)
if len(self.buffer) < 8:
return None
len_data = self.buffer[:8]

len_int = struct.unpack(">I", len_data[4:8])[0]
len_int += 8
self.buffer += self.ins.recv(len_int - len(self.buffer))

if len(self.buffer) != len_int:
self.buffer += self.ins.recv(len_int - len(self.buffer))
if len(self.buffer) < len_int:
return None
pktbuf = self.buffer[:len_int]
self.buffer = self.buffer[len_int:]

pkt = self.basecls(self.buffer, **kwargs) # type: Packet
self.buffer = b""
pkt = self.basecls(pktbuf, **kwargs) # type: Packet
return pkt

def _init_socket(self, sock_family=socket.AF_INET):
# type: (int) -> None
connected = False
s = socket.socket(sock_family, socket.SOCK_STREAM)
s.settimeout(5)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP)
s.connect(addrinfo[0][-1])
StreamSocket.__init__(self, s, DoIP)

def _activate_routing(self,
source_address, # type: int
target_address, # type: int
activation_type, # type: int
reserved_oem=b"" # type: bytes
): # type: (...) -> None

if not self.force_tls:
addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP)
s.connect(addrinfo[0][-1])
connected = True
SSLStreamSocket.__init__(self, s, DoIP)

if not self.activate_routing:
return

activation_return = self._activate_routing()
else:
# Let's overwrite activation_return to force TLS Connection
activation_return = 0x07

if activation_return == 0x10:
# Routing successfully activated.
return
elif activation_return == 0x07:
# Routing activation denied because the specified activation
# type requires a secure TLS TCP_DATA socket.
if self.context is None:
raise ValueError("SSLContext 'context' can not be None")
if connected:
s.close()
s = socket.socket(sock_family, socket.SOCK_STREAM)
s.settimeout(5)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

ss = self.context.wrap_socket(s)
addrinfo = socket.getaddrinfo(
self.ip, self.tls_port, proto=socket.IPPROTO_TCP)
ss.connect(addrinfo[0][-1])
SSLStreamSocket.__init__(self, ss, DoIP)

if not self.activate_routing:
return

activation_return = self._activate_routing()
if activation_return == 0x10:
# Routing successfully activated.
return
else:
raise Exception(
"DoIPSocket activate_routing failed with "
"routing_activation_response 0x%x" % activation_return)

elif activation_return == -1:
raise Exception("DoIPSocket._activate_routing failed")
else:
raise Exception(
"DoIPSocket activate_routing failed with "
"routing_activation_response 0x%x!" % activation_return)

def _activate_routing(self): # type: (...) -> int
resp = self.sr1(
DoIP(payload_type=0x5, activation_type=activation_type,
source_address=source_address, reserved_oem=reserved_oem),
DoIP(payload_type=0x5, activation_type=self.activation_type,
source_address=self.source_address, reserved_oem=self.reserved_oem),
verbose=False, timeout=1)
if resp and resp.payload_type == 0x6 and \
resp.routing_activation_response == 0x10:
self.target_address = target_address or \
resp.logical_address_doip_entity
self.target_address = (
self.target_address or resp.logical_address_doip_entity)
log_automotive.info(
"Routing activation successful! Target address set to: 0x%x",
self.target_address)
else:
log_automotive.error(
"Routing activation failed! Response: %s", repr(resp))

if resp and resp.payload_type == 0x6:
return resp.routing_activation_response
else:
return -1


class DoIPSocket6(DoIPSocket):
""" Custom StreamSocket for DoIP communication over IPv6.
This sockets automatically sends a routing activation request as soon as
a TCP connection is established.
"""Socket for DoIP communication. This sockets automatically
sends a routing activation request as soon as a TCP or TLS connection is
established.
:param ip: IPv6 address of destination
:param port: destination port, usually 13400
:param tls_port: destination port for TLS connection, usually 3496
:param activate_routing: If true, routing activation request is
automatically sent
:param source_address: DoIP source address
Expand All @@ -362,29 +449,49 @@ class DoIPSocket6(DoIPSocket):
the routing activation request
:param reserved_oem: Optional parameter to set value for reserved_oem field
of routing activation request
:param force_tls: Skip establishing of a TCP connection and directly try to
connect via SSL/TLS
:param context: Optional ssl.SSLContext object for initialization of ssl socket
connections.
Example:
>>> socket = DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9")
>>> socket_link_local = DoIPSocket6("fe80::30e8:80ff:fe07:6d43%eth1")
>>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
""" # noqa: E501
def __init__(self, ip='::1', port=13400, activate_routing=True,
source_address=0xe80, target_address=0,
activation_type=0, reserved_oem=b""):
# type: (str, int, bool, int, int, int, bytes) -> None

def __init__(self,
ip='::1', # type: str
port=13400, # type: int
tls_port=3496, # type: int
activate_routing=True, # type: bool
source_address=0xe80, # type: int
target_address=0, # type: int
activation_type=0, # type: int
reserved_oem=b"", # type: bytes
force_tls=False, # type: bool
context=None # type: Optional[ssl.SSLContext]
): # type: (...) -> None
self.ip = ip
self.port = port
self.tls_port = tls_port
self.activate_routing = activate_routing
self.source_address = source_address
self.target_address = target_address
self.activation_type = activation_type
self.reserved_oem = reserved_oem
self.buffer = b""
super(DoIPSocket6, self)._init_socket(socket.AF_INET6)

if activate_routing:
super(DoIPSocket6, self)._activate_routing(
source_address, target_address, activation_type, reserved_oem)
self.force_tls = force_tls
self.context = context
try:
self._init_socket(socket.AF_INET6)
except Exception:
self.close()
raise


class UDS_DoIPSocket(DoIPSocket):
class _UDS_DoIPSocketBase(StreamSocket):
"""
Application-Layer socket for DoIP endpoints. This socket takes care about
the encapsulation of UDS packets into DoIP packets.
Expand All @@ -394,11 +501,14 @@ class UDS_DoIPSocket(DoIPSocket):
>>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
"""

def send(self, x):
# type: (Union[Packet, bytes]) -> int
if isinstance(x, UDS):
pkt = DoIP(payload_type=0x8001, source_address=self.source_address,
target_address=self.target_address) / x
pkt = DoIP(payload_type=0x8001,
source_address=self.source_address, # type: ignore
target_address=self.target_address # type: ignore
) / x
else:
pkt = x

Expand All @@ -407,35 +517,38 @@ def send(self, x):
except AttributeError:
pass

return super(UDS_DoIPSocket, self).send(pkt)
return super().send(pkt)

def recv(self, x=MTU, **kwargs):
# type: (Optional[int], **Any) -> Optional[Packet]
pkt = super(UDS_DoIPSocket, self).recv(x, **kwargs)
pkt = super().recv(x, **kwargs)
if pkt and pkt.payload_type == 0x8001:
return pkt.payload
else:
return pkt


class UDS_DoIPSocket6(DoIPSocket6, UDS_DoIPSocket):
class UDS_DoIPSocket(_UDS_DoIPSocketBase, DoIPSocket):
"""
Application-Layer socket for DoIP endpoints. This socket takes care about
the encapsulation of UDS packets into DoIP packets.
Example:
>>> socket = UDS_DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9")
>>> socket = UDS_DoIPSocket("169.254.117.238")
>>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
"""
pass


bind_bottom_up(UDP, DoIP, sport=13400)
bind_bottom_up(UDP, DoIP, dport=13400)
bind_layers(UDP, DoIP, sport=13400, dport=13400)

bind_layers(TCP, DoIP, sport=13400)
bind_layers(TCP, DoIP, dport=13400)
class UDS_DoIPSocket6(_UDS_DoIPSocketBase, DoIPSocket6):
"""
Application-Layer socket for DoIP endpoints. This socket takes care about
the encapsulation of UDS packets into DoIP packets.
bind_layers(DoIP, UDS, payload_type=0x8001)
Example:
>>> socket = UDS_DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9")
>>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
"""
pass
Loading

0 comments on commit 8cea357

Please sign in to comment.