From 014236145360aa4e82810fedf9294c280de4001c Mon Sep 17 00:00:00 2001 From: quentin on chickenita Date: Tue, 1 Aug 2023 13:49:37 +0200 Subject: [PATCH] backport for old remote admin client --- pypeman/plugins/remoteadmin/plugin.py | 2 +- pypeman/plugins/remoteadmin/urls.py | 1 + pypeman/plugins/remoteadmin/views.py | 97 +++++++++++++++++++++------ 3 files changed, 77 insertions(+), 23 deletions(-) diff --git a/pypeman/plugins/remoteadmin/plugin.py b/pypeman/plugins/remoteadmin/plugin.py index 2887900..7afa411 100644 --- a/pypeman/plugins/remoteadmin/plugin.py +++ b/pypeman/plugins/remoteadmin/plugin.py @@ -6,7 +6,7 @@ class WSRemoteAdminPlugin(BasePlugin): - def __init__(self, host="127.0.0.1", port=8000): + def __init__(self, host="127.0.0.1", port=8091): super().__init__() self.app = web.Application() self.host = host diff --git a/pypeman/plugins/remoteadmin/urls.py b/pypeman/plugins/remoteadmin/urls.py index 9cababb..d0dc8e4 100644 --- a/pypeman/plugins/remoteadmin/urls.py +++ b/pypeman/plugins/remoteadmin/urls.py @@ -12,6 +12,7 @@ def init_urls(app): url routing have to be added """ app.add_routes([ + web.get('/', views.backport_old_client), web.get('/channels', views.list_channels), web.get(r'/channels/{channelname}/start', views.list_channels), web.get(r'/channels/{channelname}/stop', views.stop_channel), diff --git a/pypeman/plugins/remoteadmin/views.py b/pypeman/plugins/remoteadmin/views.py index 6a04218..dc9cabd 100644 --- a/pypeman/plugins/remoteadmin/views.py +++ b/pypeman/plugins/remoteadmin/views.py @@ -1,6 +1,7 @@ import json from aiohttp import web +from jsonrpcserver.response import SuccessResponse from pypeman import channels @@ -15,12 +16,13 @@ def get_channel(self, name): return None -async def list_channels(request): +async def list_channels(request, ws=None): """ Return a list of available channels. """ - ws = web.WebSocketResponse() - await ws.prepare(request) + if ws is None: + ws = web.WebSocketResponse() + await ws.prepare(request) chans = [] print(channels.all_channels) @@ -36,14 +38,15 @@ async def list_channels(request): await ws.send_str(resp_message) -async def start_channel(request, channelname): +async def start_channel(request, channelname, ws=None): """ Start the specified channel :params channel: The channel name to start. """ - ws = web.WebSocketResponse() - await ws.prepare(request) + if not ws: + ws = web.WebSocketResponse() + await ws.prepare(request) chan = get_channel(channelname) await chan.start() @@ -55,14 +58,15 @@ async def start_channel(request, channelname): await ws.send_str(resp_message) -async def stop_channel(request, channelname): +async def stop_channel(request, channelname, ws=None): """ Stop the specified channel :params channel: The channel name to stop. """ - ws = web.WebSocketResponse() - await ws.prepare(request) + if not ws: + ws = web.WebSocketResponse() + await ws.prepare(request) chan = get_channel(channelname) await chan.stop() @@ -74,14 +78,15 @@ async def stop_channel(request, channelname): await ws.send_str(resp_message) -async def list_msgs(request, channelname): +async def list_msgs(request, channelname, ws=None): """ List first `count` messages from message store of specified channel. :params channel: The channel name. """ - ws = web.WebSocketResponse() - await ws.prepare(request) + if ws is None: + ws = web.WebSocketResponse() + await ws.prepare(request) chan = get_channel(channelname) @@ -106,15 +111,16 @@ async def list_msgs(request, channelname): await ws.send_str(resp_message) -async def replay_msg(request, channelname, message_id): +async def replay_msg(request, channelname, message_id, ws=None): """ Replay messages from message store. :params channel: The channel name. :params msg_ids: The message ids list to replay. """ - ws = web.WebSocketResponse() - await ws.prepare(request) + if not ws: + ws = web.WebSocketResponse() + await ws.prepare(request) chan = get_channel(channelname) result = [] @@ -128,15 +134,16 @@ async def replay_msg(request, channelname, message_id): await ws.send_str(resp_message) -async def view_msg(request, channelname, message_id): +async def view_msg(request, channelname, message_id, ws=None): """ Permit to get the content of a message :params channel: The channel name. :params msg_ids: The message ids list to replay. """ - ws = web.WebSocketResponse() - await ws.prepare(request) + if not ws: + ws = web.WebSocketResponse() + await ws.prepare(request) chan = get_channel(channelname) result = [] @@ -150,16 +157,16 @@ async def view_msg(request, channelname, message_id): await ws.send_str(resp_message) -async def preview_msg(request, channelname, message_id): +async def preview_msg(request, channelname, message_id, ws=None): """ Permits to get the 1000 chars of a message payload :params channel: The channel name. :params msg_ids: The message ids list to replay. """ - print(vars(request)) - ws = web.WebSocketResponse() - await ws.prepare(request) + if not ws: + ws = web.WebSocketResponse() + await ws.prepare(request) chan = get_channel(channelname) result = [] @@ -171,3 +178,49 @@ async def preview_msg(request, channelname, message_id): resp_message = json.dumps(result) await ws.send_str(resp_message) + + +class RPCWebSocketResponse(web.WebSocketResponse): + """ + Mocked aiohttp.web.WebSocketResponse to return JSOn RPC responses + Workaround to have a backport compatibility with old json rpc client + """ + + def set_rpc_attrs(self, request_data): + self.rpc_data = request_data + + async def send_str(self, message): + message = SuccessResponse(message, id=self.rpc_data["id"]) + await super().send_str(str(message)) + + +async def backport_old_client(request): + ws = RPCWebSocketResponse() + await ws.prepare(request) + async for msg in ws: + try: + cmd_data = json.loads(msg.data) + print(cmd_data) + except Exception: + await ws.send_str(f"cannot parse ws json data ({msg.data})") + return ws + ws.set_rpc_attrs(cmd_data) + cmd_method = cmd_data.pop("method") + + if cmd_method == "channels": + await list_channels(request, ws=ws) + elif cmd_method == "preview_msg": + channelname = cmd_data[""] + await preview_msg(request, channelname=channelname, ws=ws) + elif cmd_method == "view_msg": + pass + elif cmd_method == "replay_msg": + pass + elif cmd_method == "list_msgs": + pass + elif cmd_method == "start_channel": + pass + elif cmd_method == "stop_channel": + pass + else: + await ws.send_str(f"{cmd_method} is not a valid method")