Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #517 - TCP Support #540

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions ait/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ def deprecated_func(*args, **kwargs):

sys.modules["ait"].SERVER_DEFAULT_XSUB_URL = "tcp://*:5559" # type: ignore[attr-defined]
sys.modules["ait"].SERVER_DEFAULT_XPUB_URL = "tcp://*:5560" # type: ignore[attr-defined]

sys.modules["ait"].MIN_PORT = 1024 # type: ignore[attr-defined]
sys.modules["ait"].MAX_PORT = 65535 # type: ignore[attr-defined]
7 changes: 6 additions & 1 deletion ait/core/server/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ait.core.server
from ait.core import log
from .config import ZmqConfig
from .utils import is_valid_address_spec


class Broker(gevent.Greenlet):
Expand Down Expand Up @@ -69,7 +70,11 @@ def _subscribe_all(self):
"""
for stream in self.inbound_streams + self.outbound_streams:
for input_ in stream.inputs:
if not type(input_) is int and input_ is not None:
if (
not type(input_) is int
and input_ is not None
and not is_valid_address_spec(input_)
):
Broker.subscribe(stream, input_)

for plugin in self.plugins:
Expand Down
282 changes: 272 additions & 10 deletions ait/core/server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ def _run(self):
raise (e)


class PortOutputClient(ZMQInputClient):
class UDPOutputClient(ZMQInputClient):
"""
This is the parent class for all outbound streams which publish
to a port. It opens a UDP port to publish to and publishes
to a UDP port. It opens a UDP port to publish to and publishes
outgoing message data to this port.
"""

Expand All @@ -131,20 +131,74 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):
super(PortOutputClient, self).__init__(
super(UDPOutputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
self.out_port = kwargs["output"]
if "output" in kwargs:
output = kwargs["output"]
if type(output) is int:
self.addr_spec = ("localhost", output)
elif utils.is_valid_address_spec(output):
protocol, hostname, port = output.split(":")
if protocol.lower() != "udp":
raise (
ValueError(f"UDPOutputClient: Invalid Specification {output}")
)
self.addr_spec = (hostname, int(port))
else:
raise (ValueError(f"UDPOutputClient: Invalid Specification {output}"))
else:
raise (ValueError("UDPOutputClient: Invalid Specification"))

self.context = zmq_context
# override pub to be udp socket
self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def publish(self, msg):
self.pub.sendto(msg, ("localhost", int(self.out_port)))
self.pub.sendto(msg, self.addr_spec)
log.debug("Published message from {}".format(self))


class PortInputClient(ZMQClient, gs.DatagramServer):
class TCPOutputClient(ZMQInputClient):
"""
This is the parent class for all outbound streams which publish
to a TCP port. It opens a TCP connection to publish to and publishes
outgoing message data to this port.
"""

def __init__(
self,
zmq_context,
zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL,
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):
super(TCPOutputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
if "output" in kwargs:
output = kwargs["output"]
if utils.is_valid_address_spec(output):
protocol, hostname, port = output.split(":")
if protocol.lower() != "tcp":
raise (
ValueError(f"TCPOutputClient: Invalid Specification {output}")
)
self.addr_spec = (hostname, int(port))
else:
raise (ValueError(f"TCPOutputClient: Invalid Specification {output}"))
else:
raise (ValueError("TCPOutputClient: Invalid Specification"))

self.context = zmq_context
self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def publish(self, msg):
self.pub.connect(self.addr_spec)
self.pub.sendall(msg)


class UDPInputServer(ZMQClient, gs.DatagramServer):
"""
This is the parent class for all inbound streams which receive messages
on a port. It opens a UDP port for receiving messages, listens for them,
Expand All @@ -158,15 +212,31 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):
if "input" in kwargs and type(kwargs["input"][0]) is int:
super(PortInputClient, self).__init__(
if "input" in kwargs:
input = kwargs["input"]
if type(input) is int:
host_spec = input
elif utils.is_valid_address_spec(input):
protocol, hostname, port = input.split(":")
if protocol.lower() != "udp":
raise (ValueError(f"UDPInputServer: Invalid Specification {input}"))
if hostname in ["127.0.0.1", "localhost"]:
host_spec = port
elif hostname in ["0.0.0.0", "server"]:
host_spec = f"0.0.0.0:{port}"
else:
raise (ValueError(f"UDPInputServer: Invalid Specification {input}"))

else:
raise (ValueError(f"UDPInputServer: Invalid Specification {input}"))
super(UDPInputServer, self).__init__(
zmq_context,
zmq_proxy_xsub_url,
zmq_proxy_xpub_url,
listener=int(kwargs["input"][0]),
listener=host_spec,
)
else:
raise (ValueError("Input must be port in order to create PortInputClient"))
raise (ValueError("UDPInputServer: Invalid Specification"))

# open sub socket
self.sub = gevent.socket.socket(gevent.socket.AF_INET, gevent.socket.SOCK_DGRAM)
Expand All @@ -175,3 +245,195 @@ def handle(self, packet, address):
# This function provided for gs.DatagramServer class
log.debug("{} received message from port {}".format(self, address))
self.process(packet)


class TCPInputServer(ZMQClient, gs.StreamServer):
"""
This class is similar to UDPInputServer except its TCP instead of UDP.
"""

def __init__(
self,
zmq_context,
zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL,
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
buffer=1024,
**kwargs,
):
self.cur_socket = None
self.buffer = buffer
if "input" in kwargs:
input = kwargs["input"]
if not utils.is_valid_address_spec(input):
raise (ValueError(f"TCPInputServer: Invalid Specification {input}"))
protocol, hostname, port = input.split(":")
if protocol.lower() != "tcp" or hostname not in [
"127.0.0.1",
"localhost",
"server",
"0.0.0.0",
]:
raise (ValueError(f"TCPInputServer: Invalid Specification {input}"))

self.sub = gevent.socket.socket(
gevent.socket.AF_INET, gevent.socket.SOCK_STREAM
)
hostname = (
"127.0.0.1" if hostname in ["127.0.0.1", "localhost"] else "0.0.0.0"
)
super(TCPInputServer, self).__init__(
zmq_context,
zmq_proxy_xsub_url,
zmq_proxy_xpub_url,
listener=(hostname, int(port)),
)
else:
raise (ValueError("TCPInputServer: Invalid Specification"))

def handle(self, socket, address):
self.cur_socket = socket
with socket:
while True:
data = socket.recv(self.buffer)
if not data:
break
log.debug("{} received message from port {}".format(self, address))
self.process(data)
gevent.sleep(0) # pass control back


class TCPInputClient(ZMQClient):
"""
This class creates a TCP input client. Unlike TCPInputServer and UDPInputServer,
this class will proactively initiate a connection with an input source and begin
receiving data from that source. This class does not inherit directly from gevent
servers and thus implements its own housekeeping functions. It also implements a
start function that spawns a process to stay consistent with the behavior of
TCPInputServer and UDPInputServer.

"""

def __init__(
self,
zmq_context,
zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL,
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
connection_reattempts=5,
buffer=1024,
**kwargs,
):
self.connection_reattempts = connection_reattempts
self.buffer = buffer
self.connection_status = -1
self.proc = None
self.protocol = gevent.socket.SOCK_STREAM

if "buffer" in kwargs and type(kwargs["buffer"]) == int:
self.buffer = kwargs["buffer"]

if "input" in kwargs:
input = kwargs["input"]
if not utils.is_valid_address_spec(input):
raise (ValueError(f"TCPInputClient: Invalid Specification {input}"))
protocol, hostname, port = input.split(":")
if protocol.lower() != "tcp":
raise (ValueError(f"TCPInputClient: Invalid Specification {input}"))
super(TCPInputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)

self.sub = gevent.socket.socket(gevent.socket.AF_INET, self.protocol)

self.hostname = hostname
self.port = int(port)
self.address = (hostname, int(port))

else:
raise (ValueError("TCPInputClient: Invalid Specification"))

def __exit__(self, exc_type, exc_val, exc_tb):
try:
if self.sub:
self.sub.close()
if self.proc:
self.proc.kill()
except Exception as e:
log.error(e)

def __del__(self):
try:
if self.sub:
self.sub.close()
if self.proc:
self.proc.kill()
except Exception as e:
log.error(e)

def __repr__(self):
return "<%s at %s %s>" % (
type(self).__name__,
hex(id(self)),
self._formatinfo(),
)

def __str__(self):
return "<%s %s>" % (type(self).__name__, self._formatinfo())

def start(self):
self.proc = gevent.spawn(self._client)

def _connect(self):
while self.connection_reattempts:
try:
res = self.sub.connect_ex((self.hostname, self.port))
if res == 0:
self.connection_reattempts = 5
return res
else:
self.connection_reattempts -= 1
gevent.sleep(1)
except Exception as e:
log.error(e)
self.connection_reattempts -= 1
gevent.sleep(1)

def _exit(self):
try:
if self.sub:
self.sub.close()
if self.proc:
self.proc.kill()
except Exception as e:
log.error(e)

def _client(self):
self.connection_status = self._connect()
if self.connection_status != 0:
log.error(
f"Unable to connect to client: {self.address[0]}:{self.address[1]}"
)
self._exit()
while True:
packet = self.sub.recv(self.buffer)
if not packet:
gevent.sleep(1)
log.info(
f"Trying to reconnect to client: {self.address[0]}:{self.address[1]}"
)
if self._connect() != 0:
log.error(
f"Unable to connect to client: {self.address[0]}:{self.address[1]}"
)
self._exit()
self.process(packet)

def _formatinfo(self):
result = ""
try:
if isinstance(self.address, tuple) and len(self.address) == 2:
result += "address=%s:%s" % self.address
else:
result += "address=%s" % (self.address,)
except Exception as ex:
result += str(ex) or "<error>"
return result
Loading
Loading