Skip to content

Commit

Permalink
Merge pull request #29 from wildfoundry/services
Browse files Browse the repository at this point in the history
Services
  • Loading branch information
willmcgugan authored Jul 12, 2017
2 parents a09f440 + 2ac64c7 commit e394607
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dataplicity/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.22"
__version__ = "0.4.23"
91 changes: 91 additions & 0 deletions dataplicity/m2m/commandservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
An M2M Service to run commands.
The stdout is sent a line at a time, until the command is finished,
and the channel is closed.
"""

from __future__ import print_function
from __future__ import unicode_literals

import logging
import os
import select
import subprocess
import threading

from lomond.errors import WebSocketError


log = logging.getLogger('m2m')


class CommandService(threading.Thread):
"""Runs a command and sends the stdout over m2m."""

CHUNK_SIZE = 4096

def __init__(self, channel, command):
self._repr = "CommandService({!r}, {!r})".format(channel, command)
super(CommandService, self).__init__(
args=(channel, command),
target=self.run_service
)
self.start()

def __repr__(self):
return self._repr

def run_service(self, channel, command):
"""Run the thread and log exceptions."""
try:
self._run_service(channel, command)
except Exception:
log.exception("error running %r", self)

def _run_service(self, channel, command):
"""Run command and send stdout over m2m."""
log.debug("%r started", self)
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
shell=True
)
except OSError as error:
log.warning('%r command failed; %s', self, error)
return

bytes_sent = 0
fh = process.stdout.fileno()
try:
while True:
if channel.is_closed:
log.debug("%r channel closed", self)
break
readable, _, _ = select.select([fh], [], [], 0.5)
if readable:
chunk = os.read(fh, self.CHUNK_SIZE)
if not chunk:
log.debug('%r EOF', self)
break
channel.write(chunk)
bytes_sent += len(chunk)
else:
log.debug('%r complete', self)

except WebSocketError as websocket_error:
log.warning('%r websocket error (%s)', self, websocket_error)
except Exception as error:
log.exception('%r error', self)
else:
log.info(
'read %s byte(s) from command "%s"',
bytes_sent,
command
)
finally:
channel.close()
process.terminate()
77 changes: 77 additions & 0 deletions dataplicity/m2m/fileservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
An M2M Service to retrieve files.
File data is sent in chunks.
"""

from __future__ import print_function
from __future__ import unicode_literals

from functools import partial
import logging
import threading

from lomond.errors import WebSocketError


log = logging.getLogger('m2m')


class FileService(threading.Thread):
"""A thread the sends data from a file over m2m."""

# A word on lifetime management of this and similar objects...
# There does not need to be any other references to these objects;
# the thread maintains a reference to a m2m channel in the run
# function, so the entire object will be garbage collected when
# the thread exits.

CHUNK_SIZE = 4096

def __init__(self, channel, path):
self._repr = "FileService({!r}, {!r})".format(channel, path)
super(FileService, self).__init__(
args=(channel, path),
target=self.run_service
)
self.start()

def __repr__(self):
return self._repr

def run_service(self, channel, path):
"""Run the thread and log exceptions."""
try:
self._run_service(channel, path)
except Exception:
log.exception("error running %r", self)

def _run_service(self, channel, path):
"""Send a file over a port."""
log.debug("%r started", self)
bytes_sent = 0
try:
with open(path, 'rb') as read_file:
read = partial(read_file.read, self.CHUNK_SIZE)
for chunk in iter(read, b''):
if channel.is_closed:
log.warning('%r m2m closed prematurely', self)
break
channel.write(chunk)
bytes_sent += len(chunk)
except IOError:
log.debug('unable to read file "%s"', path)
except WebSocketError as websocket_error:
log.warning('websocket error (%s)', websocket_error)
except Exception:
log.exception('error in file service')
else:
log.info(
'read %s byte(s) from "%s"',
bytes_sent,
path
)
finally:
channel.close()
23 changes: 22 additions & 1 deletion dataplicity/m2mmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from . import constants
from .m2m import WSClient, EchoService
from .m2m.fileservice import FileService
from .m2m.commandservice import CommandService
from .m2m.remoteprocess import RemoteProcess


Expand Down Expand Up @@ -153,8 +155,17 @@ def on_instruction(self, sender, data):
m2m_port = data['m2m_port']
self.client.port_forward.redirect_port(device_port, m2m_port)
elif action == 'reboot-device':
log.debug('reboot requested')
self.reboot()
elif action == 'read-file':
self.open_file_service(
data['port'],
data['path']
)
elif action == 'run-command':
self.open_command_service(
data['port'],
data['command']
)
# Unrecognized instructions are ignored

def open_terminal(self, name, port, size=None):
Expand Down Expand Up @@ -182,3 +193,13 @@ def reboot(self):
# Why not subprocess.call? Because it will block this process and prevent graceful exit!
pid = subprocess.Popen(command.split()).pid
log.debug('opened reboot process %s', pid)

def open_file_service(self, port, path):
"""Open a file service, to send a file over a port."""
channel = self.m2m_client.get_channel(port)
FileService(channel, path)

def open_command_service(self, port, command):
"""Open a service that runs a command and sends the stdout over m2m."""
channel = self.m2m_client.get_channel(port)
CommandService(channel, command)

0 comments on commit e394607

Please sign in to comment.