Skip to content

Commit

Permalink
klaus comments (change ws to normal http resp)
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin on chickenita committed Aug 2, 2023
1 parent 8970c57 commit 145d4cf
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 50 deletions.
5 changes: 3 additions & 2 deletions pypeman/plugins/remoteadmin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

class WSRemoteAdminPlugin(BasePlugin):

def __init__(self, host="127.0.0.1", port=8091):
def __init__(self, host="127.0.0.1", port=8091, url_prefix=""):
super().__init__()
self.app = web.Application()
self.host = host
self.port = port
self.url_prefix = url_prefix
self.runner = web.AppRunner(self.app)

def ready(self):
urls.init_urls(self.app)
urls.init_urls(self.app, prefix=self.url_prefix)

async def start(self):
await self.runner.setup()
Expand Down
24 changes: 13 additions & 11 deletions pypeman/plugins/remoteadmin/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
from pypeman.plugins.remoteadmin import views


def init_urls(app):
def init_urls(app, prefix=""):
"""
Create the pypeman remoteadmin routing
Create the pypeman remoteadmin routing
Args:
app (aiohttp.web.Application): The aiohttp web app where the
url routing have to be added
url routings have to be added
"""
app.add_routes([
web.get('/', views.backport_old_client),
web.get('/channels', views.list_channels),
web.get(r'/channels/{channelname}/start', views.list_channels),
web.get(r'/channels/{channelname}/stop', views.stop_channel),
web.get(r'/channels/{channelname}/messages', views.list_msgs),
web.get(r'/channels/{channelname}/messages/{message_id}/replay', views.replay_msg),
web.get(r'/channels/{channelname}/messages/{message_id}/view', views.view_msg),
web.get(r'/channels/{channelname}/messages/{message_id}/preview', views.preview_msg),
# API :
web.get(prefix + '/channels', views.list_channels),
web.get(prefix + '/channels/{channelname}/start', views.list_channels),
web.get(prefix + '/channels/{channelname}/stop', views.stop_channel),
web.get(prefix + '/channels/{channelname}/messages', views.list_msgs),
web.get(prefix + '/channels/{channelname}/messages/{message_id}/replay', views.replay_msg),
web.get(prefix + '/channels/{channelname}/messages/{message_id}/view', views.view_msg),
web.get(prefix + '/channels/{channelname}/messages/{message_id}/preview', views.preview_msg),
# WEBSOCKETS :
web.get(prefix + '/', views.backport_old_client),
])
92 changes: 55 additions & 37 deletions pypeman/plugins/remoteadmin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

def get_channel(name):
"""
return channel by is name.all_channels
returns channel by name
"""
for chan in channels.all_channels:
if chan.name == name:
Expand All @@ -21,11 +21,8 @@ def get_channel(name):

async def list_channels(request, ws=None):
"""
Return a list of available channels.
Returns a list of available channels.
"""
if ws is None:
ws = web.WebSocketResponse()
await ws.prepare(request)

chans = []
for chan in channels.all_channels:
Expand All @@ -35,55 +32,67 @@ async def list_channels(request, ws=None):

chans.append(chan_dict)

resp_message = json.dumps(chans)
await ws.send_str(resp_message)
if ws is not None:
await ws.send_jsonrpcresp(chans)
return ws
return web.json_response(chans)


async def start_channel(request, channelname, ws=None):
"""
Start the specified channel
:params channel: The channel name to start.
:params channelname: The channel name to start.
"""
if ws is None:
ws = web.WebSocketResponse()
await ws.prepare(request)

chan = get_channel(channelname)
await chan.start()

resp_message = json.dumps({
resp_dict = {
'name': chan.name,
'status': channels.BaseChannel.status_id_to_str(chan.status)
})
await ws.send_str(resp_message)
}
if ws is not None:
await ws.send_jsonrpcresp(resp_dict)
return ws
return web.json_response(resp_dict)


async def stop_channel(request, channelname, ws=None):
"""
Stop the specified channel
:params channel: The channel name to stop.
:params channelname: The channel name to stop.
"""
if ws is None:
ws = web.WebSocketResponse()
await ws.prepare(request)

chan = get_channel(channelname)
await chan.stop()

resp_message = json.dumps({
resp_dict = {
'name': chan.name,
'status': channels.BaseChannel.status_id_to_str(chan.status)
})
await ws.send_str(resp_message)
}
if ws is not None:
await ws.send_jsonrpcresp(resp_dict)
return ws
return web.json_response(resp_dict)


async def list_msgs(request, channelname, ws=None):
"""
List first `count` messages from message store of specified channel.
:params channel: The channel name.
:params channelname: The channel name.
:queryparams:
start (int, default=0): the start indexof msgs to list
count (count, default=10): The maximum returned msgs
order_by (str, default="timestamp"): the message attribute to use for sorting
start_dt (str): iso datetime string to use for filter messages
end_dt (str): iso datetime string to use for filter messages
text (str): search this text in messages and return only messages that contains
this text
rtext (str): same as 'text' param but for regex
"""
if ws is None:
ws = web.WebSocketResponse()
Expand All @@ -108,8 +117,11 @@ async def list_msgs(request, channelname, ws=None):
res['timestamp'] = res['message'].timestamp_str()
res['message'] = res['message'].to_json()

resp_message = json.dumps({'messages': messages, 'total': await chan.message_store.total()})
await ws.send_str(resp_message)
resp_dict = {'messages': messages, 'total': await chan.message_store.total()}
if ws is not None:
await ws.send_jsonrpcresp(resp_dict)
return ws
return web.json_response(resp_dict)


async def replay_msg(request, channelname, message_id, ws=None):
Expand All @@ -131,16 +143,18 @@ async def replay_msg(request, channelname, message_id, ws=None):
except Exception as exc:
result.append({'error': str(exc)})

resp_message = json.dumps(result)
await ws.send_str(resp_message)
if ws is not None:
await ws.send_jsonrpcresp(result)
return ws
return web.json_response(result)


async def view_msg(request, channelname, message_id, ws=None):
"""
Permit to get the content of a message
:params channel: The channel name.
:params msg_ids: The message ids list to replay.
:params channelname: The channel name.
:params message_id: The message id to view
"""
if ws is None:
ws = web.WebSocketResponse()
Expand All @@ -154,16 +168,18 @@ async def view_msg(request, channelname, message_id, ws=None):
except Exception as exc:
result.append({'error': str(exc)})

resp_message = json.dumps(result)
await ws.send_str(resp_message)
if ws is not None:
await ws.send_jsonrpcresp(result)
return ws
return web.json_response(result)


async def preview_msg(request, channelname, message_id, ws=None):
"""
Permits to get the 1000 chars of a message payload
Permits to get the first 1000 chars of a message payload
:params channel: The channel name.
:params msg_ids: The message ids list to replay.
:params channelname: The channel name.
:params message_id: The message id to preview
"""
if ws is None:
ws = web.WebSocketResponse()
Expand All @@ -177,8 +193,10 @@ async def preview_msg(request, channelname, message_id, ws=None):
except Exception as exc:
result.append({'error': str(exc)})

resp_message = json.dumps(result)
await ws.send_str(resp_message)
if ws is not None:
await ws.send_jsonrpcresp(result)
return ws
return web.json_response(result)


class RPCWebSocketResponse(web.WebSocketResponse):
Expand All @@ -190,8 +208,8 @@ class RPCWebSocketResponse(web.WebSocketResponse):
def set_rpc_attrs(self, request_data):
self.rpc_data = request_data

async def send_str(self, message):
message = SuccessResponse(json.loads(message), id=self.rpc_data["id"])
async def send_jsonrpcresp(self, message):
message = SuccessResponse(message, id=self.rpc_data["id"])
await super().send_str(str(message))


Expand Down

0 comments on commit 145d4cf

Please sign in to comment.