From 177082303b04c5b16e658fb468976e07d7322259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Tue, 11 Apr 2017 16:29:47 +0200 Subject: [PATCH 01/13] fix redirect-port instruction --- dataplicity/m2mmanager.py | 4 ++- dataplicity/portforward.py | 15 ++++++++---- tests/dataplicity/test_portforward.py | 35 +++++++++++++++++++++++++-- tox.ini | 3 ++- 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/dataplicity/m2mmanager.py b/dataplicity/m2mmanager.py index 708e866..c847dff 100644 --- a/dataplicity/m2mmanager.py +++ b/dataplicity/m2mmanager.py @@ -153,7 +153,9 @@ def on_instruction(self, sender, data): elif action == 'open-portredirect': device_port = data['device_port'] m2m_port = data['m2m_port'] - self.client.port_forward.redirect_port(device_port, m2m_port) + self.client.port_forward.redirect_port( + device_port=device_port, m2m_port=m2m_port + ) elif action == 'reboot-device': self.reboot() elif action == 'read-file': diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index ede21e8..76332c7 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -255,6 +255,7 @@ def __init__(self, client): self._client = weakref.ref(client) self._services = {} self._ports = {} + self._dynamic_ports = {} self._close_event = threading.Event() @property @@ -317,9 +318,13 @@ def open(self, m2m_port, service=None, port=None): return service.connect(m2m_port) - def redirect_service(self, m2m_port, device_port): - service = Service( - manager=self, name='port-{}'.format(device_port), - port=device_port, host='127.0.0.1' - ) + def redirect_port(self, m2m_port, device_port): + if device_port not in self._dynamic_ports: + service = Service( + manager=self, name='port-{}'.format(device_port), + port=device_port, host='127.0.0.1' + ) + self._dynamic_ports[device_port] = service + service = self._dynamic_ports[device_port] service.connect(m2m_port) + return service diff --git a/tests/dataplicity/test_portforward.py b/tests/dataplicity/test_portforward.py index cfd46eb..ee2b81b 100644 --- a/tests/dataplicity/test_portforward.py +++ b/tests/dataplicity/test_portforward.py @@ -1,6 +1,9 @@ +import weakref + import pytest -from mock import patch +from dataplicity.m2mmanager import M2MManager from dataplicity.portforward import PortForwardManager +from mock import call, patch class FakeClient(object): @@ -32,9 +35,37 @@ def test_open_service_which_doesnt_exist_results_in_noop(manager, route): def test_redirect_service(manager, route): with patch('dataplicity.portforward.Service.connect') as connect: - manager.redirect_service(9999, 22) + manager.redirect_port(9999, 22) + + assert connect.called + +def test_calling_redirect_service_from_m2mmanager_works(): + with patch( + 'dataplicity.portforward.PortForwardManager.redirect_port' + ) as redirect_port: + client = FakeClient() + client.port_forward = PortForwardManager(client) + m2m_manager = M2MManager(client=client, url='ws://localhost/') + m2m_manager.on_instruction( + 'sender', { + 'action': 'open-portredirect', + 'device_port': 22, + 'm2m_port': 1234 + } + ) + assert redirect_port.call_args == call(device_port=22, m2m_port=1234) + + +def test_service_is_not_garbage_collected_after_leaving_redirect_port_fn( + manager +): + with patch('dataplicity.portforward.Service.connect') as connect: + service = manager.redirect_port(1234, 22) assert connect.called + _ref = weakref.ref(service) + del service + assert _ref() is not None def test_can_open_service_by_name(manager): diff --git a/tox.ini b/tox.ini index e2ed27a..13208f9 100644 --- a/tox.ini +++ b/tox.ini @@ -2,10 +2,11 @@ envlist = py{27,35,36} [testenv] +usedevelop = true passenv = CIRCLE_ARTIFACTS setenv = PYTHONPATH={toxinidir}/tests deps = -rrequirements-tests.txt commands = py.test --cov-config {toxinidir}/.coveragerc \ - --cov={envsitepackagesdir}/dataplicity \ + --cov={toxinidir}/dataplicity \ --cov-report html:{env:CIRCLE_ARTIFACTS:reports}/{envname} \ {posargs:tests/} From a405cd09a27dbc04b080fdb0f849d636dc7192c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Thu, 20 Apr 2017 02:21:18 +0200 Subject: [PATCH 02/13] fix for frozing connections, when reusing the port number --- dataplicity/portforward.py | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index 76332c7..bb0a491 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -194,6 +194,7 @@ def __init__(self, manager, name, port, host="127.0.0.1"): self.name = name self.port = port self.host = host + self.m2m_port = None self._connect_index = 0 self._connections = {} self._lock = threading.RLock() @@ -229,6 +230,7 @@ def url(self): def connect(self, port_no): """Add a new connection.""" + self.m2m_port = port_no log.debug('new %r connection on port %s', self, port_no) with self._lock: connection_id = self._connect_index = self._connect_index + 1 @@ -246,6 +248,8 @@ def on_connection_complete(self, connection_id): """Called by a connection when it is finished.""" with self._lock: self.remove_connection(connection_id) + self.manager.remove_service( + m2m_port=self.m2m_port, device_port=self.port) class PortForwardManager(object): @@ -319,12 +323,34 @@ def open(self, m2m_port, service=None, port=None): service.connect(m2m_port) def redirect_port(self, m2m_port, device_port): + # we need to store the reference to the Service somewhere so that + # when the Connection starts in thread it wouldn't loose the value + # of service variable. However, we have to remember that there may + # be numerous connections to the same local port. + # for instance, one could be ssh'ed into a machine twice, so we + # shan't confuse these two connections. + # therefore, an easy way is to store these in a dict, so that the + # lookup would be quick + # + # if there are no redirects on this port, create a hash if device_port not in self._dynamic_ports: - service = Service( - manager=self, name='port-{}'.format(device_port), - port=device_port, host='127.0.0.1' - ) - self._dynamic_ports[device_port] = service - service = self._dynamic_ports[device_port] + self._dynamic_ports[device_port] = {} + # create the service which will handle traffic trough Connection + service = Service( + manager=self, name='port-{}'.format(device_port), + port=device_port, host='127.0.0.1' + ) + # store the reference, so that it doesn't deref in Connection thread + self._dynamic_ports[device_port][m2m_port] = service + # connect to M2M service.connect(m2m_port) + # the below line is merely a shortcut for nicer-looking unit-tests. return service + + def remove_service(self, m2m_port, device_port): + # great. The connection was closed, so we need to clean up. However + # this function would also be executed after a forward to port 80 + # (which is noto dynamically redirected) would finish. + if device_port in self._dynamic_ports: + # ok, we know it is a dynamic one, so remove the object + del self._dynamic_ports[device_port][m2m_port] From 3195f86991459137204dd4dedfdc50b0ed9e9bda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Sat, 29 Apr 2017 00:42:06 +0200 Subject: [PATCH 03/13] WIP - direct call to Connection thread on dynamic port redirect --- dataplicity/portforward.py | 42 ++++++++------------------------------ 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index bb0a491..e2d90b1 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -24,10 +24,10 @@ class Connection(threading.Thread): # Max to read at-a-time BUFFER_SIZE = 1024 * 32 - def __init__(self, service, connection_id, channel): + def __init__(self, close_event, connection_id, channel): """Initialize the connection, set up callbacks.""" super(Connection, self).__init__() - self._service = weakref.ref(service) + self._close_event = close_event self.connection_id = connection_id self.channel = channel @@ -39,15 +39,10 @@ def __init__(self, service, connection_id, channel): self.on_channel_close, self.on_channel_control) - @property - def service(self): - """Get the parent service object (weak reference, may return None).""" - return self._service() - @property def close_event(self): """Get a threading.Event object.""" - return self.service.close_event + return self._close_event def run(self): """Main loop, connects to local server, reads data, and writes it to an m2m channel.""" @@ -235,7 +230,7 @@ def connect(self, port_no): with self._lock: connection_id = self._connect_index = self._connect_index + 1 channel = self.m2m.m2m_client.get_channel(port_no) - connection = Connection(self, connection_id, channel) + connection = Connection(self.close_event, connection_id, channel) self._connections[connection_id] = connection connection.start() return connection_id @@ -248,8 +243,6 @@ def on_connection_complete(self, connection_id): """Called by a connection when it is finished.""" with self._lock: self.remove_connection(connection_id) - self.manager.remove_service( - m2m_port=self.m2m_port, device_port=self.port) class PortForwardManager(object): @@ -332,25 +325,8 @@ def redirect_port(self, m2m_port, device_port): # therefore, an easy way is to store these in a dict, so that the # lookup would be quick # - # if there are no redirects on this port, create a hash - if device_port not in self._dynamic_ports: - self._dynamic_ports[device_port] = {} - # create the service which will handle traffic trough Connection - service = Service( - manager=self, name='port-{}'.format(device_port), - port=device_port, host='127.0.0.1' - ) - # store the reference, so that it doesn't deref in Connection thread - self._dynamic_ports[device_port][m2m_port] = service - # connect to M2M - service.connect(m2m_port) - # the below line is merely a shortcut for nicer-looking unit-tests. - return service - - def remove_service(self, m2m_port, device_port): - # great. The connection was closed, so we need to clean up. However - # this function would also be executed after a forward to port 80 - # (which is noto dynamically redirected) would finish. - if device_port in self._dynamic_ports: - # ok, we know it is a dynamic one, so remove the object - del self._dynamic_ports[device_port][m2m_port] + Connection( + close_event=self.close_event, + connection_id=-1, + channel=self.m2m.m2m_client.get_channel(m2m_port) + ).start() From ab5af9c02829e4614f07e5218be0762c244afd28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Mon, 1 May 2017 14:34:02 +0200 Subject: [PATCH 04/13] fix unit tests; backward-compatible implementation of Connection thread --- dataplicity/portforward.py | 46 ++++++++++++++++++++++----- tests/dataplicity/test_portforward.py | 23 ++++++-------- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index e2d90b1..7151612 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -24,12 +24,19 @@ class Connection(threading.Thread): # Max to read at-a-time BUFFER_SIZE = 1024 * 32 - def __init__(self, close_event, connection_id, channel): + def __init__(self, close_event, connection_id, channel, + service=None, host_port=None): """Initialize the connection, set up callbacks.""" super(Connection, self).__init__() self._close_event = close_event self.connection_id = connection_id self.channel = channel + self._service = None + # this is for backward compatibility purposes + if service: + self._service = weakref.ref(service) + # host_port tuple will be used when there will be no service + self._host_port = host_port self._lock = threading.RLock() self.socket = None @@ -39,6 +46,12 @@ def __init__(self, close_event, connection_id, channel): self.on_channel_close, self.on_channel_control) + @property + def service(self): + if self._service: + return self._service() + return None + @property def close_event(self): """Get a threading.Event object.""" @@ -84,8 +97,10 @@ def run(self): finally: log.debug("left recv loop (read %s bytes)", bytes_written) # Tell service we're done with this connection - self.service.on_connection_complete(self.connection_id) - # These close methods are a null operation if the objects are already closed + if self.service: + self.service.on_connection_complete(self.connection_id) + # These close methods are a null operation if the objects are + # already closed self.channel.close() self._shutdown_read() @@ -98,6 +113,12 @@ def _shutdown_read(self): except: pass + @property + def host_port(self): + if self.service: + return self.service.host_port + return self._host_port + def _shutdown_write(self): """Shutdown writing.""" with self._lock: @@ -133,9 +154,10 @@ def _connect(self): # Set the timeout for initial connect, as default is too high _socket.settimeout(5.0) - log.debug('connecting to %s', self.service.url) + if self.service: + log.debug('connecting to %s', self.service.url) try: - _socket.connect(self.service.host_port) + _socket.connect(self.host_port) except socket.timeout: log.error('timed out connecting to server') return False @@ -146,7 +168,9 @@ def _connect(self): log.exception('error connecting') return False else: - log.debug("connected to %s", self.service.url) + if self.service: + log.debug("connected to %s", self.service.url) + _socket.setblocking(0) # set non-blocking self.socket = _socket self._flush_buffer() return True @@ -230,7 +254,12 @@ def connect(self, port_no): with self._lock: connection_id = self._connect_index = self._connect_index + 1 channel = self.m2m.m2m_client.get_channel(port_no) - connection = Connection(self.close_event, connection_id, channel) + connection = Connection( + self.close_event, + connection_id, + channel, + service=self + ) self._connections[connection_id] = connection connection.start() return connection_id @@ -328,5 +357,6 @@ def redirect_port(self, m2m_port, device_port): Connection( close_event=self.close_event, connection_id=-1, - channel=self.m2m.m2m_client.get_channel(m2m_port) + channel=self.m2m.m2m_client.get_channel(m2m_port), + host_port=('127.0.0.1', device_port) ).start() diff --git a/tests/dataplicity/test_portforward.py b/tests/dataplicity/test_portforward.py index ee2b81b..be1e102 100644 --- a/tests/dataplicity/test_portforward.py +++ b/tests/dataplicity/test_portforward.py @@ -6,6 +6,9 @@ from mock import call, patch +_weakref_table = {} + + class FakeClient(object): pass @@ -13,7 +16,9 @@ class FakeClient(object): @pytest.fixture def manager(): client = FakeClient() - return PortForwardManager.init(client=client) + _weakref_table['client'] = client + yield PortForwardManager.init(client=client) + del _weakref_table['client'] @pytest.fixture @@ -34,10 +39,11 @@ def test_open_service_which_doesnt_exist_results_in_noop(manager, route): def test_redirect_service(manager, route): - with patch('dataplicity.portforward.Service.connect') as connect: + manager.client.m2m = M2MManager.init('ws://localhost/') + with patch('dataplicity.portforward.Connection.start') as connection_start: manager.redirect_port(9999, 22) - assert connect.called + assert connection_start.called def test_calling_redirect_service_from_m2mmanager_works(): @@ -57,17 +63,6 @@ def test_calling_redirect_service_from_m2mmanager_works(): assert redirect_port.call_args == call(device_port=22, m2m_port=1234) -def test_service_is_not_garbage_collected_after_leaving_redirect_port_fn( - manager -): - with patch('dataplicity.portforward.Service.connect') as connect: - service = manager.redirect_port(1234, 22) - assert connect.called - _ref = weakref.ref(service) - del service - assert _ref() is not None - - def test_can_open_service_by_name(manager): with patch('dataplicity.portforward.Service.connect') as connect: manager.open(1234, service='web') From a8824bcba604348687d6cdfaaca426853c3bed69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Mon, 1 May 2017 16:14:05 +0200 Subject: [PATCH 05/13] remove unused import --- tests/dataplicity/test_portforward.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/dataplicity/test_portforward.py b/tests/dataplicity/test_portforward.py index be1e102..4381cb6 100644 --- a/tests/dataplicity/test_portforward.py +++ b/tests/dataplicity/test_portforward.py @@ -1,11 +1,8 @@ -import weakref - import pytest from dataplicity.m2mmanager import M2MManager from dataplicity.portforward import PortForwardManager from mock import call, patch - _weakref_table = {} From 0b3a8d92bb73b3d54fca55b871e3770687f0f2b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Tue, 5 Sep 2017 11:51:57 +0200 Subject: [PATCH 06/13] remove unused _dynamic_ports attribute --- dataplicity/portforward.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index 7151612..7711683 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -281,7 +281,6 @@ def __init__(self, client): self._client = weakref.ref(client) self._services = {} self._ports = {} - self._dynamic_ports = {} self._close_event = threading.Event() @property From 39ff3428003227e06759e9a47876256ed59edf6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Tue, 5 Sep 2017 11:56:51 +0200 Subject: [PATCH 07/13] update circle.yml to include python 3.6.2 --- circle.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/circle.yml b/circle.yml index 696a18f..fce64d7 100644 --- a/circle.yml +++ b/circle.yml @@ -2,7 +2,7 @@ dependencies: override: - pip install -r requirements-tests.txt - pip install tox tox-pyenv - - pyenv local 2.7.11 3.5.2 3.6.0 + - pyenv local 2.7.11 3.5.2 3.6.2 test: override: - tox From 1e01980477b8e33cbdc12a6e85561e473bc99378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Wed, 6 Sep 2017 11:47:26 +0200 Subject: [PATCH 08/13] remove .setnonblocking call (merge from master) --- dataplicity/portforward.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index 7711683..97f8eea 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -170,7 +170,6 @@ def _connect(self): else: if self.service: log.debug("connected to %s", self.service.url) - _socket.setblocking(0) # set non-blocking self.socket = _socket self._flush_buffer() return True From 6558e2b3fd4fa3cbd9009f3dd3bc255789a4bc4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Wed, 6 Sep 2017 11:52:08 +0200 Subject: [PATCH 09/13] reverse order for redirect_port call --- dataplicity/m2mmanager.py | 4 +--- tests/dataplicity/test_portforward.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/dataplicity/m2mmanager.py b/dataplicity/m2mmanager.py index c847dff..a9c2218 100644 --- a/dataplicity/m2mmanager.py +++ b/dataplicity/m2mmanager.py @@ -153,9 +153,7 @@ def on_instruction(self, sender, data): elif action == 'open-portredirect': device_port = data['device_port'] m2m_port = data['m2m_port'] - self.client.port_forward.redirect_port( - device_port=device_port, m2m_port=m2m_port - ) + self.client.port_forward.redirect_port(m2m_port, device_port) elif action == 'reboot-device': self.reboot() elif action == 'read-file': diff --git a/tests/dataplicity/test_portforward.py b/tests/dataplicity/test_portforward.py index 4381cb6..dea9041 100644 --- a/tests/dataplicity/test_portforward.py +++ b/tests/dataplicity/test_portforward.py @@ -57,7 +57,7 @@ def test_calling_redirect_service_from_m2mmanager_works(): 'm2m_port': 1234 } ) - assert redirect_port.call_args == call(device_port=22, m2m_port=1234) + assert redirect_port.call_args == call(1234, 22) def test_can_open_service_by_name(manager): From 90b21952be1e339cd1838d977c6d7ef5ab18a3d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Wed, 6 Sep 2017 14:58:56 +0200 Subject: [PATCH 10/13] unify Connection thread for static services and dynamic port redirects --- dataplicity/portforward.py | 52 +++++++++++++------------------------- 1 file changed, 18 insertions(+), 34 deletions(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index 97f8eea..2f96a68 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -25,18 +25,14 @@ class Connection(threading.Thread): BUFFER_SIZE = 1024 * 32 def __init__(self, close_event, connection_id, channel, - service=None, host_port=None): + host_port, connection_complete_callback=None): """Initialize the connection, set up callbacks.""" super(Connection, self).__init__() self._close_event = close_event + self.connection_complete_callback = connection_complete_callback self.connection_id = connection_id self.channel = channel - self._service = None - # this is for backward compatibility purposes - if service: - self._service = weakref.ref(service) - # host_port tuple will be used when there will be no service - self._host_port = host_port + self.host_port = host_port self._lock = threading.RLock() self.socket = None @@ -46,19 +42,16 @@ def __init__(self, close_event, connection_id, channel, self.on_channel_close, self.on_channel_control) - @property - def service(self): - if self._service: - return self._service() - return None - @property def close_event(self): """Get a threading.Event object.""" return self._close_event def run(self): - """Main loop, connects to local server, reads data, and writes it to an m2m channel.""" + """ + Main loop, connects to local server, reads data, and writes it to an + m2m channel. + """ bytes_written = 0 try: # Connect to remote host @@ -69,9 +62,12 @@ def run(self): # Read all the data we can and write it to the channel # TODO: Rework this loop to not use the timeout while not self.close_event.is_set(): - # Block for a period of time until the socket becomes readable, or there is an error + # Block for a period of time until the socket becomes readable, + # or there is an error try: - readable, _, exceptional = select.select([self.socket], [], [self.socket], 5.0) + readable, _, exceptional = select.select( + [self.socket], [], [self.socket], 5.0 + ) except Exception as e: # For paranoia only. log.warning('error %s in select', e) @@ -97,8 +93,8 @@ def run(self): finally: log.debug("left recv loop (read %s bytes)", bytes_written) # Tell service we're done with this connection - if self.service: - self.service.on_connection_complete(self.connection_id) + if self.connection_complete_callback: + self.connection_complete_callback(self.connection_id) # These close methods are a null operation if the objects are # already closed self.channel.close() @@ -113,12 +109,6 @@ def _shutdown_read(self): except: pass - @property - def host_port(self): - if self.service: - return self.service.host_port - return self._host_port - def _shutdown_write(self): """Shutdown writing.""" with self._lock: @@ -154,8 +144,7 @@ def _connect(self): # Set the timeout for initial connect, as default is too high _socket.settimeout(5.0) - if self.service: - log.debug('connecting to %s', self.service.url) + log.debug('connecting to %s', ':'.join(map(str, self.host_port))) try: _socket.connect(self.host_port) except socket.timeout: @@ -168,8 +157,7 @@ def _connect(self): log.exception('error connecting') return False else: - if self.service: - log.debug("connected to %s", self.service.url) + log.debug("connected to %s", ':'.join(map(str, self.host_port))) self.socket = _socket self._flush_buffer() return True @@ -241,11 +229,6 @@ def host_port(self): """A tuple of (host, port) as a convenience for socket.connect.""" return (self.host, self.port) - @property - def url(self): - """URL of server we're connecting to.""" - return "http://{0}:{1}".format(self.host, self.port) - def connect(self, port_no): """Add a new connection.""" self.m2m_port = port_no @@ -257,7 +240,8 @@ def connect(self, port_no): self.close_event, connection_id, channel, - service=self + self.host_port(), + self.on_connection_complete ) self._connections[connection_id] = connection connection.start() From 22dc0ef3c2bcc481630074f67b4b5dc09297e2b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Wed, 6 Sep 2017 15:04:43 +0200 Subject: [PATCH 11/13] self.host_port is a property, not callable --- dataplicity/portforward.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index 2f96a68..59171f2 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -240,7 +240,7 @@ def connect(self, port_no): self.close_event, connection_id, channel, - self.host_port(), + self.host_port, self.on_connection_complete ) self._connections[connection_id] = connection From 7a69738a44baad244ee8376ce57ba836e0f94705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Thu, 7 Sep 2017 11:34:56 +0200 Subject: [PATCH 12/13] make the log.debug call way cleaner - thanks Will! --- dataplicity/portforward.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index 59171f2..a99e5a0 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -144,7 +144,7 @@ def _connect(self): # Set the timeout for initial connect, as default is too high _socket.settimeout(5.0) - log.debug('connecting to %s', ':'.join(map(str, self.host_port))) + log.debug('connecting to %s:%d', *self.host_port) try: _socket.connect(self.host_port) except socket.timeout: @@ -157,7 +157,7 @@ def _connect(self): log.exception('error connecting') return False else: - log.debug("connected to %s", ':'.join(map(str, self.host_port))) + log.debug("connected to %s:%d", *self.host_port) self.socket = _socket self._flush_buffer() return True From 4b1ee2ed67123f3973dacab1d46e5b4a8d415a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Miko=C5=82ajczyk?= Date: Thu, 7 Sep 2017 15:03:02 +0200 Subject: [PATCH 13/13] simplify Connection constructor --- dataplicity/portforward.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/dataplicity/portforward.py b/dataplicity/portforward.py index a99e5a0..71ae528 100644 --- a/dataplicity/portforward.py +++ b/dataplicity/portforward.py @@ -24,13 +24,10 @@ class Connection(threading.Thread): # Max to read at-a-time BUFFER_SIZE = 1024 * 32 - def __init__(self, close_event, connection_id, channel, - host_port, connection_complete_callback=None): + def __init__(self, close_event, channel, host_port): """Initialize the connection, set up callbacks.""" super(Connection, self).__init__() self._close_event = close_event - self.connection_complete_callback = connection_complete_callback - self.connection_id = connection_id self.channel = channel self.host_port = host_port @@ -92,9 +89,6 @@ def run(self): break finally: log.debug("left recv loop (read %s bytes)", bytes_written) - # Tell service we're done with this connection - if self.connection_complete_callback: - self.connection_complete_callback(self.connection_id) # These close methods are a null operation if the objects are # already closed self.channel.close() @@ -202,7 +196,6 @@ def __init__(self, manager, name, port, host="127.0.0.1"): self.host = host self.m2m_port = None self._connect_index = 0 - self._connections = {} self._lock = threading.RLock() def __repr__(self): @@ -234,27 +227,13 @@ def connect(self, port_no): self.m2m_port = port_no log.debug('new %r connection on port %s', self, port_no) with self._lock: - connection_id = self._connect_index = self._connect_index + 1 channel = self.m2m.m2m_client.get_channel(port_no) connection = Connection( self.close_event, - connection_id, channel, self.host_port, - self.on_connection_complete ) - self._connections[connection_id] = connection connection.start() - return connection_id - - def remove_connection(self, connection_id): - with self._lock: - self._connections.pop(connection_id, None) - - def on_connection_complete(self, connection_id): - """Called by a connection when it is finished.""" - with self._lock: - self.remove_connection(connection_id) class PortForwardManager(object): @@ -338,7 +317,6 @@ def redirect_port(self, m2m_port, device_port): # Connection( close_event=self.close_event, - connection_id=-1, channel=self.m2m.m2m_client.get_channel(m2m_port), host_port=('127.0.0.1', device_port) ).start()