Skip to content

Commit

Permalink
Merge pull request #54 from wildfoundry/tag-list
Browse files Browse the repository at this point in the history
Send meta tag list
  • Loading branch information
stuartkmarsh authored Mar 12, 2019
2 parents 215bf87 + f1760b5 commit b9879e9
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 83 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ virtualenv -qq .build -p Python2.7
source .build/bin/activate
pip -q install pex==1.2.13
echo building ./bin/dataplicity
pex dataplicity==0.4.32 --pre -r requirements.txt -o bin/dataplicity -m dataplicity.app:main
pex dataplicity==0.4.33 --pre -r requirements.txt -o bin/dataplicity -m dataplicity.app:main
deactivate
echo built dataplicity agent v`./bin/dataplicity version`
2 changes: 1 addition & 1 deletion dataplicity/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.32"
__version__ = "0.4.33"
201 changes: 128 additions & 73 deletions dataplicity/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
from .disk_tools import disk_usage
from .m2mmanager import M2MManager
from .portforward import PortForwardManager
from .tags import get_tag_list, TagException
import six

log = logging.getLogger('agent')
log = logging.getLogger("agent")


class Client(object):
Expand All @@ -32,18 +33,19 @@ def __init__(self, rpc_url=None, m2m_url=None, serial=None, auth_token=None):
self._sent_meta = False
self.exit_event = Event()
self._init()
self._tag_list = None

@classmethod
def _read(cls, path):
"""Read contents of a file, strip whitespace."""
with open(path, 'rt') as fh:
with open(path, "rt") as fh:
data = fh.read().strip()
return data

def _init(self):
try:
log.info('dataplicity %s', __version__)
log.info('uname=%s', ' '.join(platform.uname()))
log.info("dataplicity %s", __version__)
log.info("uname=%s", " ".join(platform.uname()))

self.remote = jsonrpc.JSONRPC(self.rpc_url)
self.serial = self.serial or self._read(constants.SERIAL_LOCATION)
Expand All @@ -52,16 +54,16 @@ def _init(self):
self.disk_poll_rate_seconds = 60 * 60
self.next_disk_poll_time = time.time()

log.info('m2m=%s', self.m2m_url)
log.info('api=%s', self.rpc_url)
log.info('serial=%s', self.serial)
log.info('poll=%s', self.poll_rate_seconds)
log.info("m2m=%s", self.m2m_url)
log.info("api=%s", self.rpc_url)
log.info("serial=%s", self.serial)
log.info("poll=%s", self.poll_rate_seconds)

self.m2m = M2MManager.init(self, m2m_url=self.m2m_url)
self.port_forward = PortForwardManager.init(self)

except:
log.exception('failed to initialize client')
log.exception("failed to initialize client")
raise

def run_forever(self):
Expand All @@ -74,17 +76,17 @@ def run_forever(self):
while not self.exit_event.wait(self.poll_rate_seconds):
self.poll()
except SystemExit:
log.debug('exit requested')
log.debug("exit requested")
return
except KeyboardInterrupt:
log.debug('user exit')
log.debug("user exit")
return
finally:
clock_check_thread.running = False
clock_check_thread.join()
log.debug('closing')
log.debug("closing")
self.close()
log.debug('goodbye')
log.debug("goodbye")

def exit(self):
"""Exit the agent."""
Expand All @@ -95,31 +97,78 @@ def disk_poll(self):

if now >= self.next_disk_poll_time:
self.next_disk_poll_time = now + self.disk_poll_rate_seconds
disk_space = disk_usage('/')
disk_space = disk_usage("/")

with self.remote.batch() as batch:
batch.call_with_id(
'authenticate_result',
'device.check_auth',
device_class='tuxtunnel',
"authenticate_result",
"device.check_auth",
device_class="tuxtunnel",
serial=self.serial,
auth_token=self.auth_token
auth_token=self.auth_token,
)
batch.call_with_id(
'set_disk_space_result',
'device.set_disk_space',
"set_disk_space_result",
"device.set_disk_space",
disk_capacity=disk_space.total,
disk_used=disk_space.used
disk_used=disk_space.used,
)

def tag_poll(self):
"""Gets the tag list for get_tag_list() and sends to the server"""
try:
tag_list = get_tag_list()
except TagException:
return

try:
if tag_list != self._tag_list:
with self.remote.batch() as batch:
batch.call_with_id(
"authenticate_result",
"device.check_auth",
device_class="tuxtunnel",
serial=self.serial,
auth_token=self.auth_token,
)
batch.call_with_id(
"set_machine_defined_tags_result",
"device.set_machine_defined_tags",
tag_list=tag_list,
)
batch.get_result("set_machine_defined_tags_result")
except jsonrpc.JSONRPCError as error:
log.error(
'unable to set tag list ("%s"=%s, "%s")',
error.method,
error.code,
error.message,
)
return None
except jsonrpc.ServerUnreachableError as error:
log.debug("set tag list failed: %s", error)
return None
except Exception as error:
log.error("set tag list failed: %s", error)
return None
else:
# Success! Set cached tag list
self._tag_list = tag_list

def poll(self):
"""Called at regular intervals."""
t = time.time()
log.debug('poll t=%.02fs', t)
log.debug("poll t=%.02fs", t)
try:
self.disk_poll()
except Exception as e:
log.error("disk poll failed %s", e)

try:
self.tag_poll()
except Exception as error:
log.error("tag poll failed %s", error)

self.sync()

def close(self):
Expand All @@ -129,9 +178,8 @@ def close(self):
@classmethod
def make_sync_id(cls):
"""Make a random sync ID."""
sync_id = ''.join(
random.choice('abcdefghijklmnopqrstuvwxyz')
for _ in six.moves.xrange(12)
sync_id = "".join(
random.choice("abcdefghijklmnopqrstuvwxyz") for _ in six.moves.xrange(12)
)
return sync_id

Expand All @@ -153,74 +201,73 @@ def _sync(self):
if not self._sent_meta:
with self.remote.batch() as batch:
batch.call_with_id(
'authenticate_result',
'device.check_auth',
device_class='tuxtunnel',
"authenticate_result",
"device.check_auth",
device_class="tuxtunnel",
serial=self.serial,
auth_token=self.auth_token,
sync_id=sync_id
sync_id=sync_id,
)
self._sync_meta(batch)
batch.get_result('authenticate_result')
batch.get_result("authenticate_result")
self._check_meta(batch)

finally:
elapsed = time.time() - start
log.debug('sync complete %0.2fs', elapsed)
log.debug("sync complete %0.2fs", elapsed)

def _sync_meta(self, batch):
"""Sync meta information regarding host device."""
try:
meta = device_meta.get_meta()
log.debug("syncing meta %r", meta)
except:
log.exception('error getting meta')
log.exception("error getting meta")
else:
batch.call_with_id(
'set_agent_version_result',
'device.set_agent_version',
agent_version=meta['agent_version']
"set_agent_version_result",
"device.set_agent_version",
agent_version=meta["agent_version"],
)
batch.call_with_id(
'set_machine_revision_result',
'device.set_machine_revision',
revision_code=meta['machine_revision']
"set_machine_revision_result",
"device.set_machine_revision",
revision_code=meta["machine_revision"],
)
batch.call_with_id(
'set_os_version_result',
'device.set_os_version',
os_version=meta['os_version']
"set_os_version_result",
"device.set_os_version",
os_version=meta["os_version"],
)
batch.call_with_id(
'set_uname_result',
'device.set_uname',
uname=meta['uname']
"set_uname_result", "device.set_uname", uname=meta["uname"]
)
batch.call_with_id(
'set_ip_addresses',
'device.set_ip_addresses',
ip_list=meta['ip_list']
"set_ip_addresses_result",
"device.set_ip_addresses",
ip_list=meta["ip_list"],
)

def _check_meta(self, batch):
"""Check previously sent meta information."""
log.debug('checking meta')
log.debug("checking meta")
if self._sent_meta:
log.debug('meta was previously sent')
log.debug("meta was previously sent")
return
try:
batch.check(
'set_agent_version_result',
'set_machine_revision_result',
'set_os_version_result',
'set_uname_result'
"set_agent_version_result",
"set_machine_revision_result",
"set_os_version_result",
"set_uname_result",
"set_ip_addresses_result",
)
except Exception as e:
log.warning('failed to set device meta (%s)', e)
log.warning("failed to set device meta (%s)", e)
else:
# Success! Don't send again.
self._sent_meta = True
log.debug('sent meta')
log.debug("sent meta")

def set_m2m_identity(self, identity):
"""
Expand All @@ -232,32 +279,40 @@ def set_m2m_identity(self, identity):
return None

try:
log.debug('notifying server (%s) of m2m identity (%s)',
self.remote.url,
identity or '<None>')
log.debug(
"notifying server (%s) of m2m identity (%s)",
self.remote.url,
identity or "<None>",
)
with self.remote.batch() as batch:
batch.call_with_id('authenticate_result',
'device.check_auth',
device_class='tuxtunnel',
serial=self.serial,
auth_token=self.auth_token)
batch.call_with_id('associate_result',
'm2m.associate',
identity=identity or '')
batch.call_with_id(
"authenticate_result",
"device.check_auth",
device_class="tuxtunnel",
serial=self.serial,
auth_token=self.auth_token,
)
batch.call_with_id(
"associate_result", "m2m.associate", identity=identity or ""
)
# These methods may potentially throw JSONRPCErrors
batch.get_result('authenticate_result')
batch.get_result('associate_result')
batch.get_result("authenticate_result")
batch.get_result("associate_result")
except jsonrpc.JSONRPCError as e:
log.error('unable to associate m2m identity ("%s"=%s, "%s")',
e.method, e.code, e.message)
log.error(
'unable to associate m2m identity ("%s"=%s, "%s")',
e.method,
e.code,
e.message,
)
return None
except jsonrpc.ServerUnreachableError as e:
log.debug('set m2m identity failed, %s', e)
log.debug("set m2m identity failed, %s", e)
return None
except Exception as error:
log.error('unable to set m2m identity: %s', error)
log.error("unable to set m2m identity: %s", error)
return None
else:
# If we made it here the server has acknowledged it received the identity
log.debug('server received m2m identity %s', identity)
log.debug("server received m2m identity %s", identity)
return identity
Loading

0 comments on commit b9879e9

Please sign in to comment.