diff --git a/pypeman/plugins/remoteadmin/plugin.py b/pypeman/plugins/remoteadmin/plugin.py index 7afa411..36bd3a0 100644 --- a/pypeman/plugins/remoteadmin/plugin.py +++ b/pypeman/plugins/remoteadmin/plugin.py @@ -6,15 +6,16 @@ class WSRemoteAdminPlugin(BasePlugin): - def __init__(self, host="127.0.0.1", port=8091): + def __init__(self, host="127.0.0.1", port=8091, url_prefix=""): super().__init__() self.app = web.Application() self.host = host self.port = port + self.url_prefix = url_prefix self.runner = web.AppRunner(self.app) def ready(self): - urls.init_urls(self.app) + urls.init_urls(self.app, prefix=self.url_prefix) async def start(self): await self.runner.setup() diff --git a/pypeman/plugins/remoteadmin/urls.py b/pypeman/plugins/remoteadmin/urls.py index d0dc8e4..06b2881 100644 --- a/pypeman/plugins/remoteadmin/urls.py +++ b/pypeman/plugins/remoteadmin/urls.py @@ -3,21 +3,23 @@ from pypeman.plugins.remoteadmin import views -def init_urls(app): +def init_urls(app, prefix=""): """ - Create the pypeman remoteadmin routing + Create the pypeman remoteadmin routing Args: app (aiohttp.web.Application): The aiohttp web app where the - url routing have to be added + url routings have to be added """ app.add_routes([ - web.get('/', views.backport_old_client), - web.get('/channels', views.list_channels), - web.get(r'/channels/{channelname}/start', views.list_channels), - web.get(r'/channels/{channelname}/stop', views.stop_channel), - web.get(r'/channels/{channelname}/messages', views.list_msgs), - web.get(r'/channels/{channelname}/messages/{message_id}/replay', views.replay_msg), - web.get(r'/channels/{channelname}/messages/{message_id}/view', views.view_msg), - web.get(r'/channels/{channelname}/messages/{message_id}/preview', views.preview_msg), + # API : + web.get(prefix + '/channels', views.list_channels), + web.get(prefix + '/channels/{channelname}/start', views.list_channels), + web.get(prefix + '/channels/{channelname}/stop', views.stop_channel), + web.get(prefix + '/channels/{channelname}/messages', views.list_msgs), + web.get(prefix + '/channels/{channelname}/messages/{message_id}/replay', views.replay_msg), + web.get(prefix + '/channels/{channelname}/messages/{message_id}/view', views.view_msg), + web.get(prefix + '/channels/{channelname}/messages/{message_id}/preview', views.preview_msg), + # WEBSOCKETS : + web.get(prefix + '/', views.backport_old_client), ]) diff --git a/pypeman/plugins/remoteadmin/views.py b/pypeman/plugins/remoteadmin/views.py index 88d27c8..b0b5dbd 100644 --- a/pypeman/plugins/remoteadmin/views.py +++ b/pypeman/plugins/remoteadmin/views.py @@ -11,7 +11,7 @@ def get_channel(name): """ - return channel by is name.all_channels + returns channel by name """ for chan in channels.all_channels: if chan.name == name: @@ -21,11 +21,8 @@ def get_channel(name): async def list_channels(request, ws=None): """ - Return a list of available channels. + Returns a list of available channels. """ - if ws is None: - ws = web.WebSocketResponse() - await ws.prepare(request) chans = [] for chan in channels.all_channels: @@ -35,55 +32,67 @@ async def list_channels(request, ws=None): chans.append(chan_dict) - resp_message = json.dumps(chans) - await ws.send_str(resp_message) + if ws is not None: + await ws.send_jsonrpcresp(chans) + return ws + return web.json_response(chans) async def start_channel(request, channelname, ws=None): """ Start the specified channel - :params channel: The channel name to start. + :params channelname: The channel name to start. """ - if ws is None: - ws = web.WebSocketResponse() - await ws.prepare(request) chan = get_channel(channelname) await chan.start() - resp_message = json.dumps({ + resp_dict = { 'name': chan.name, 'status': channels.BaseChannel.status_id_to_str(chan.status) - }) - await ws.send_str(resp_message) + } + if ws is not None: + await ws.send_jsonrpcresp(resp_dict) + return ws + return web.json_response(resp_dict) async def stop_channel(request, channelname, ws=None): """ Stop the specified channel - :params channel: The channel name to stop. + :params channelname: The channel name to stop. """ - if ws is None: - ws = web.WebSocketResponse() - await ws.prepare(request) chan = get_channel(channelname) await chan.stop() - resp_message = json.dumps({ + resp_dict = { 'name': chan.name, 'status': channels.BaseChannel.status_id_to_str(chan.status) - }) - await ws.send_str(resp_message) + } + if ws is not None: + await ws.send_jsonrpcresp(resp_dict) + return ws + return web.json_response(resp_dict) async def list_msgs(request, channelname, ws=None): """ List first `count` messages from message store of specified channel. - :params channel: The channel name. + :params channelname: The channel name. + + :queryparams: + start (int, default=0): the start indexof msgs to list + count (count, default=10): The maximum returned msgs + order_by (str, default="timestamp"): the message attribute to use for sorting + start_dt (str): iso datetime string to use for filter messages + end_dt (str): iso datetime string to use for filter messages + text (str): search this text in messages and return only messages that contains + this text + rtext (str): same as 'text' param but for regex """ if ws is None: ws = web.WebSocketResponse() @@ -108,8 +117,11 @@ async def list_msgs(request, channelname, ws=None): res['timestamp'] = res['message'].timestamp_str() res['message'] = res['message'].to_json() - resp_message = json.dumps({'messages': messages, 'total': await chan.message_store.total()}) - await ws.send_str(resp_message) + resp_dict = {'messages': messages, 'total': await chan.message_store.total()} + if ws is not None: + await ws.send_jsonrpcresp(resp_dict) + return ws + return web.json_response(resp_dict) async def replay_msg(request, channelname, message_id, ws=None): @@ -131,16 +143,18 @@ async def replay_msg(request, channelname, message_id, ws=None): except Exception as exc: result.append({'error': str(exc)}) - resp_message = json.dumps(result) - await ws.send_str(resp_message) + if ws is not None: + await ws.send_jsonrpcresp(result) + return ws + return web.json_response(result) async def view_msg(request, channelname, message_id, ws=None): """ Permit to get the content of a message - :params channel: The channel name. - :params msg_ids: The message ids list to replay. + :params channelname: The channel name. + :params message_id: The message id to view """ if ws is None: ws = web.WebSocketResponse() @@ -154,16 +168,18 @@ async def view_msg(request, channelname, message_id, ws=None): except Exception as exc: result.append({'error': str(exc)}) - resp_message = json.dumps(result) - await ws.send_str(resp_message) + if ws is not None: + await ws.send_jsonrpcresp(result) + return ws + return web.json_response(result) async def preview_msg(request, channelname, message_id, ws=None): """ - Permits to get the 1000 chars of a message payload + Permits to get the first 1000 chars of a message payload - :params channel: The channel name. - :params msg_ids: The message ids list to replay. + :params channelname: The channel name. + :params message_id: The message id to preview """ if ws is None: ws = web.WebSocketResponse() @@ -177,8 +193,10 @@ async def preview_msg(request, channelname, message_id, ws=None): except Exception as exc: result.append({'error': str(exc)}) - resp_message = json.dumps(result) - await ws.send_str(resp_message) + if ws is not None: + await ws.send_jsonrpcresp(result) + return ws + return web.json_response(result) class RPCWebSocketResponse(web.WebSocketResponse): @@ -190,8 +208,8 @@ class RPCWebSocketResponse(web.WebSocketResponse): def set_rpc_attrs(self, request_data): self.rpc_data = request_data - async def send_str(self, message): - message = SuccessResponse(json.loads(message), id=self.rpc_data["id"]) + async def send_jsonrpcresp(self, message): + message = SuccessResponse(message, id=self.rpc_data["id"]) await super().send_str(str(message))