diff --git a/pypeman/msgstore.py b/pypeman/msgstore.py index 6633d15f..2bdaa9b0 100644 --- a/pypeman/msgstore.py +++ b/pypeman/msgstore.py @@ -6,6 +6,7 @@ from itertools import islice from collections import OrderedDict +from pathlib import Path from pypeman.message import Message @@ -112,6 +113,17 @@ async def total(self): :return: total count of messages """ + async def delete(self, id): + """ + Delete one message in the store corresponding to given `id` with his status. + Useful for tests and maybe in the future to permits cleanup of message store + + !CAUTION! : cannot be undone + + :param id: Message id. Message store dependant. + :return: A dict `{'id':, 'state': , 'message': }`. + """ + class NullMessageStoreFactory(MessageStoreFactory): """ Return an NullMessageStore that do nothing at all. """ @@ -140,6 +152,9 @@ async def search(self, **kwargs): async def total(self): return 0 + async def delete(self, id): + return None + class FakeMessageStoreFactory(MessageStoreFactory): """ Return an Fake message store """ @@ -175,6 +190,13 @@ async def search(self, **kwargs): async def total(self): return 0 + async def delete(self, id): + """ + we delete nothing here, but return what would have been deleted + (for testing) + """ + return {'id': id, 'state': 'processed', 'message': None} + class MemoryMessageStoreFactory(MessageStoreFactory): """ Return a Memory message store. All message are lost at pypeman stop. """ @@ -281,6 +303,11 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e async def total(self): return len(self.messages) + async def delete(self, id): + resp = dict(self.messages.pop(id)) + resp['message'] = Message.from_dict(resp['message']) + return resp + class FileMessageStoreFactory(MessageStoreFactory): """ @@ -484,3 +511,20 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e async def total(self): return self._total + + async def _delete_meta_file(self, id): + meta_fpath = (Path(self.base_path) / str(id)).with_suffix(".meta") + meta_fpath.unlink() + + async def delete(self, id): + fpath = Path(self.base_path) / str(id) + if not fpath.exists(): + raise IndexError + + with fpath.open("rb") as f: + msg = Message.from_json(f.read().decode('utf-8')) + + data_to_return = {'id': id, 'state': await self.get_message_state(id), 'message': msg} + fpath.unlink() + self._delete_meta_file(id=id) + return data_to_return diff --git a/pypeman/plugin_mgr.py b/pypeman/plugin_mgr.py index fab862ea..01f6fa04 100644 --- a/pypeman/plugin_mgr.py +++ b/pypeman/plugin_mgr.py @@ -51,10 +51,10 @@ def init_plugins(self): if self.plugins: for plugin in self.plugins: if plugin.status == plugin.STARTED: - self.plugin.do_stop() + plugin.do_stop() for plugin in self.plugins: if plugin.status == plugin.STOPPED: - self.plugin.do_destroy() + plugin.do_destroy() # instantiate plugins self.plugins = [] diff --git a/pypeman/plugins/base.py b/pypeman/plugins/base.py index 0dd64b45..bb814366 100644 --- a/pypeman/plugins/base.py +++ b/pypeman/plugins/base.py @@ -130,5 +130,5 @@ def do_destroy(self): self.destroy() def destroy(self): - assert self.status in (self.INITIAL, self.STOPPED) + assert self.status in (self.INITIALIZED, self.STOPPED) self.status = self.DESTROYED diff --git a/pypeman/plugins/remoteadmin/__init__.py b/pypeman/plugins/remoteadmin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pypeman/plugins/remoteadmin/plugin.py b/pypeman/plugins/remoteadmin/plugin.py new file mode 100644 index 00000000..75aa7dc7 --- /dev/null +++ b/pypeman/plugins/remoteadmin/plugin.py @@ -0,0 +1,26 @@ +from aiohttp import web + +from pypeman.plugins.base import BasePlugin +from pypeman.plugins.remoteadmin import urls + + +class RemoteAdminPlugin(BasePlugin): + + def __init__(self, host="127.0.0.1", port=8091, url_prefix=""): + super().__init__() + self.app = web.Application() + self.host = host + self.port = port + self.url_prefix = url_prefix + self.runner = web.AppRunner(self.app) + + def ready(self): + urls.init_urls(self.app, prefix=self.url_prefix) + + async def start(self): + await self.runner.setup() + site = web.TCPSite(self.runner, self.host, self.port) + await site.start() + + async def stop(self): + await self.runner.cleanup() diff --git a/pypeman/plugins/remoteadmin/urls.py b/pypeman/plugins/remoteadmin/urls.py new file mode 100644 index 00000000..436cfaad --- /dev/null +++ b/pypeman/plugins/remoteadmin/urls.py @@ -0,0 +1,25 @@ +from aiohttp import web + +from pypeman.plugins.remoteadmin import views + + +def init_urls(app, prefix=""): + """ + Create the pypeman remoteadmin routing + + Args: + app (aiohttp.web.Application): The aiohttp web app where the + url routings have to be added + """ + app.add_routes([ + # API : + web.get(prefix + '/channels', views.list_channels), + web.get(prefix + '/channels/{channelname}/start', views.start_channel), + web.get(prefix + '/channels/{channelname}/stop', views.stop_channel), + web.get(prefix + '/channels/{channelname}/messages', views.list_msgs), + web.get(prefix + '/channels/{channelname}/messages/{message_id}/replay', views.replay_msg), + web.get(prefix + '/channels/{channelname}/messages/{message_id}/view', views.view_msg), + web.get(prefix + '/channels/{channelname}/messages/{message_id}/preview', views.preview_msg), + # WEBSOCKETS : + web.get(prefix + '/', views.backport_old_client), + ]) diff --git a/pypeman/plugins/remoteadmin/views.py b/pypeman/plugins/remoteadmin/views.py new file mode 100644 index 00000000..e11eb2e0 --- /dev/null +++ b/pypeman/plugins/remoteadmin/views.py @@ -0,0 +1,261 @@ +import json +import logging + +from aiohttp import web +from jsonrpcserver.response import SuccessResponse + +from pypeman import channels + +logger = logging.getLogger(__name__) + + +def get_channel(name): + """ + returns channel by name + """ + for chan in channels.all_channels: + if chan.name == name: + return chan + return None + + +async def list_channels(request, ws=None): + """ + Returns a list of available channels. + """ + + chans = [] + for chan in channels.all_channels: + if not chan.parent: + chan_dict = chan.to_dict() + chan_dict['subchannels'] = chan.subchannels() + + chans.append(chan_dict) + + if ws is not None: + await ws.send_jsonrpcresp(chans) + return ws + return web.json_response(chans) + + +async def start_channel(request, ws=None): + """ + Start the specified channel + + :params channelname: The channel name to start. + """ + channelname = request.match_info['channelname'] + chan = get_channel(channelname) + await chan.start() + resp_dict = { + 'name': chan.name, + 'status': channels.BaseChannel.status_id_to_str(chan.status) + } + if ws is not None: + await ws.send_jsonrpcresp(resp_dict) + return ws + return web.json_response(resp_dict) + + +async def stop_channel(request, ws=None): + """ + Stop the specified channel + + :params channelname: The channel name to stop. + """ + channelname = request.match_info['channelname'] + + chan = get_channel(channelname) + await chan.stop() + + resp_dict = { + 'name': chan.name, + 'status': channels.BaseChannel.status_id_to_str(chan.status) + } + if ws is not None: + await ws.send_jsonrpcresp(resp_dict) + return ws + return web.json_response(resp_dict) + + +async def list_msgs(request, ws=None): + """ + List first `count` messages from message store of specified channel. + + :params channelname: The channel name. + + :queryparams: + start (int, default=0): the start indexof msgs to list + count (count, default=10): The maximum returned msgs + order_by (str, default="timestamp"): the message attribute to use for sorting + start_dt (str): iso datetime string to use for filter messages + end_dt (str): iso datetime string to use for filter messages + text (str): search this text in messages and return only messages that contains + this text + rtext (str): same as 'text' param but for regex + """ + + channelname = request.match_info['channelname'] + + chan = get_channel(channelname) + + args = request.rel_url.query + start = int(args.get("start", 0)) + count = int(args.get("count", 10)) + order_by = args.get("order_by", "timestamp") + start_dt = args.get("start_dt", None) + end_dt = args.get("end_dt", None) + text = args.get("text", None) + rtext = args.get("rtext", None) + messages = await chan.message_store.search( + start=start, count=count, order_by=order_by, start_dt=start_dt, end_dt=end_dt, + text=text, rtext=rtext) or [] + + for res in messages: + res['timestamp'] = res['message'].timestamp_str() + res['message'] = res['message'].to_json() + + resp_dict = {'messages': messages, 'total': await chan.message_store.total()} + if ws is not None: + await ws.send_jsonrpcresp(resp_dict) + return ws + return web.json_response(resp_dict) + + +async def replay_msg(request, ws=None): + """ + Replay messages from message store. + + :params channel: The channel name. + :params msg_ids: The message ids list to replay. + """ + channelname = request.match_info['channelname'] + message_id = request.match_info['message_id'] + + chan = get_channel(channelname) + result = [] + try: + msg_res = await chan.replay(message_id) + result.append(msg_res.to_dict()) + except Exception as exc: + result.append({'error': str(exc)}) + + if ws is not None: + await ws.send_jsonrpcresp(result) + return ws + return web.json_response(result) + + +async def view_msg(request, ws=None): + """ + Permit to get the content of a message + + :params channelname: The channel name. + :params message_id: The message id to view + """ + + channelname = request.match_info['channelname'] + message_id = request.match_info['message_id'] + + chan = get_channel(channelname) + result = [] + try: + msg_res = await chan.message_store.get_msg_content(message_id) + result.append(msg_res.to_dict()) + except Exception as exc: + result.append({'error': str(exc)}) + + if ws is not None: + await ws.send_jsonrpcresp(result) + return ws + return web.json_response(result) + + +async def preview_msg(request, ws=None): + """ + Permits to get the first 1000 chars of a message payload + + :params channelname: The channel name. + :params message_id: The message id to preview + """ + channelname = request.match_info['channelname'] + message_id = request.match_info['message_id'] + + chan = get_channel(channelname) + result = [] + try: + msg_res = await chan.message_store.get_preview_str(message_id) + result.append(msg_res.to_dict()) + except Exception as exc: + result.append({'error': str(exc)}) + + if ws is not None: + await ws.send_jsonrpcresp(result) + return ws + return web.json_response(result) + + +class RPCWebSocketResponse(web.WebSocketResponse): + """ + Mocked aiohttp.web.WebSocketResponse to return JSOn RPC responses + Workaround to have a backport compatibility with old json rpc client + """ + + def set_rpc_attrs(self, request_data): + self.rpc_data = request_data + + async def send_jsonrpcresp(self, message): + message = SuccessResponse(message, id=self.rpc_data["id"]) + await super().send_str(str(message)) + + +async def backport_old_client(request): + ws = RPCWebSocketResponse() + await ws.prepare(request) + async for msg in ws: + try: + cmd_data = json.loads(msg.data) + except Exception: + await ws.send_str(f"cannot parse ws json data ({msg.data})") + return ws + ws.set_rpc_attrs(cmd_data) + cmd_method = cmd_data.pop("method") + params = cmd_data.get("params", [None]) + channelname = params[0] + if channelname: + request.match_info["channelname"] = channelname + + if cmd_method == "channels": + await list_channels(request, ws=ws) + elif cmd_method == "preview_msg": + message_id = params[1] + request.match_info["message_id"] = message_id + await preview_msg(request, ws=ws) + elif cmd_method == "view_msg": + message_id = params[1] + request.match_info["message_id"] = message_id + await view_msg(request=request, ws=ws) + elif cmd_method == "replay_msg": + message_id = params[1] + request.match_info["message_id"] = message_id + await replay_msg(request=request, ws=ws) + elif cmd_method == "list_msgs": + query_params = { + "start": params[1], + "count": params[2], + "order_by": params[3], + "start_dt": params[4], + "end_dt": params[5], + "text": params[6], + "rtext": params[7], + } + query_params = {k: v for k, v in query_params.items() if v is not None} + query_url = request.rel_url.with_query(query_params) + new_req = request.clone(rel_url=query_url) + await list_msgs(request=new_req, ws=ws) + elif cmd_method == "start_channel": + await start_channel(request=request, ws=ws) + elif cmd_method == "stop_channel": + await stop_channel(request=request, ws=ws) + else: + await ws.send_str(f"{cmd_method} is not a valid method") diff --git a/pypeman/plugins/tests/test_remoteadmin.py b/pypeman/plugins/tests/test_remoteadmin.py new file mode 100644 index 00000000..e6c05d00 --- /dev/null +++ b/pypeman/plugins/tests/test_remoteadmin.py @@ -0,0 +1,253 @@ +import asyncio +import json + +from aiohttp import web + +import pytest + +from pypeman import msgstore +from pypeman import channels +from pypeman import nodes +from pypeman.channels import BaseChannel +from pypeman.plugin_mgr import manager +from pypeman.plugins.remoteadmin.plugin import RemoteAdminPlugin +from pypeman.plugins.remoteadmin.urls import init_urls +from pypeman.remoteadmin import RemoteAdminClient +from pypeman.tests.common import generate_msg +from pypeman.tests.common import TstNode + + +@pytest.fixture +def jsonrpcremoteclient(event_loop): + client = RemoteAdminClient(loop=event_loop, url="ws://localhost:%d" % 8091) + client.init() + asyncio.set_event_loop(None) + return client + + +@pytest.fixture +def webremoteclient(event_loop, aiohttp_client): + app = web.Application() + init_urls(app) + return event_loop.run_until_complete(aiohttp_client(app)) + + +@pytest.fixture(autouse=True) +def remoteserver(event_loop): + manager.plugin_classes = [RemoteAdminPlugin] + manager.loop = event_loop + manager.init_plugins() + manager.ready_plugins() + yield manager.start_plugins() + manager.stop_plugins() + + +class RemoteAdminBaseMixin: + + @classmethod + def setup_class(self): + store_factory = msgstore.MemoryMessageStoreFactory() + self.loop = asyncio.new_event_loop() + self.loop.set_debug(True) + self.chan_name = "test_remote050" + self.chan = BaseChannel(name=self.chan_name, loop=self.loop, message_store_factory=store_factory) + + n = TstNode() + n2 = TstNode(name="sub") + n3 = TstNode(name="sub1") + n4 = TstNode(name="sub2") + + self.chan.add(n) + + sub = self.chan.fork(name="subchannel") + sub.append(n2, n3, n4) + + for chan in channels.all_channels: + asyncio.run(chan.start()) + + self.msg = generate_msg(with_context=True) + self.msg2 = generate_msg(timestamp=(1982, 11, 27, 12, 35), message_content="message content2") + self.msg3 = generate_msg(timestamp=(1982, 11, 28, 12, 35), message_content="message_content3") + self.msg4 = generate_msg(timestamp=(1982, 11, 28, 14, 35), message_content="message content4") + + asyncio.run(self.chan.message_store.store(self.msg)) + asyncio.run(self.chan.message_store.store(self.msg2)) + asyncio.run(self.chan.message_store.store(self.msg3)) + asyncio.run(self.chan.message_store.store(self.msg4)) + + @classmethod + def teardown_class(self): + + for chan in channels.all_channels: + asyncio.run(chan.stop()) + channels.reset_pypeman_channels() + nodes.reset_pypeman_nodes() + self.loop.stop() + self.loop.close() + + def setup_method(self, method): + """ + Re-start the channel if the stop/start test fails + """ + if self.chan.status != BaseChannel.WAITING: + asyncio.run(self.chan.start()) + + +class TestRemoteAdminPlugin(RemoteAdminBaseMixin): + + async def test_list_channels(self, webremoteclient): + + # Channel remote listing working + resp = await webremoteclient.get("/channels") + + assert resp.status == 200 + json_resp = json.loads(await resp.text()) + assert len(json_resp) == 1 + assert json_resp[0]["name"] == "test_remote050" + assert json_resp[0]['subchannels'][0]['name'] == 'test_remote050.subchannel' + + async def test_stop_n_start_channel(self, webremoteclient): + # Channel stop working + resp = await webremoteclient.get(f"/channels/{self.chan_name}/stop") + assert resp.status == 200 + + assert self.chan.status == BaseChannel.STOPPED + + # Start channel + resp = await webremoteclient.get(f"/channels/{self.chan_name}/start") + assert resp.status == 200 + + assert self.chan.status == BaseChannel.WAITING + + async def test_search_messages(self, webremoteclient): + params = { + "start": 2, + "count": 5, + "order_by": "-timestamp" + } + resp = await webremoteclient.get(f"/channels/{self.chan_name}/messages", params=params) + + assert resp.status == 200 + json_resp = json.loads(await resp.text()) + assert json_resp['total'] == 4 + assert len(json_resp['messages']) == 2 + assert json_resp['messages'][0]['id'] == self.msg3.uuid + + async def test_search_messages_by_date(self, webremoteclient): + params = { + "start_dt": "1982-11-27", + "end_dt": "1982-11-28T13:00:00", + } + resp = await webremoteclient.get(f"/channels/{self.chan_name}/messages", params=params) + assert resp.status == 200 + json_resp = json.loads(await resp.text()) + assert json_resp['total'] == 4 + assert len(json_resp['messages']) == 2 + assert json_resp['messages'][0]['id'] == self.msg2.uuid + assert json_resp['messages'][1]['id'] == self.msg3.uuid + + async def test_search_messages_with_text_flt(self, webremoteclient): + params = { + "text": "sage_c", + } + resp = await webremoteclient.get(f"/channels/{self.chan_name}/messages", params=params) + assert resp.status == 200 + json_resp = json.loads(await resp.text()) + assert len(json_resp['messages']) == 1 + assert json_resp['messages'][0]['id'] == self.msg3.uuid + + async def test_search_messages_with_rtext_flt(self, webremoteclient): + params = { + "rtext": r"\w+_\w+", + } + resp = await webremoteclient.get(f"/channels/{self.chan_name}/messages", params=params) + assert resp.status == 200 + json_resp = json.loads(await resp.text()) + assert len(json_resp['messages']) == 1 + assert json_resp['messages'][0]['id'] == self.msg3.uuid + + async def test_replay_message(self, webremoteclient): + resp = await webremoteclient.get(f"/channels/{self.chan_name}/messages/{self.msg3.uuid}/replay") + assert resp.status == 200 + + json_resp = json.loads(await resp.text()) + assert len(json_resp) == 1 + assert await self.chan.message_store.total() == 5 + + # Clean + await self.chan.message_store.delete(json_resp[0]["uuid"]) + + async def test_push_message(self, webremoteclient): + pass # actually not implemented + + +# TODO: Next tests (backport url) not working because the jsonrpcremoteclient call loop.run_until_complete but +# the loop is already running in the main thread (channels + remote server are running) +# and asyncio can't have 2 loops in same thread. +# Possible solutions: +# - refactor RemoteAdminClient to have async list/start/stop/replay funcs that can be called +# in a running event_loop. This refactoring could necessit to have a reusable websocket client. +# To meditate +# - ? +# +# Uncomment next class when the problem is resolved + +# class TestRemoteAdminPluginBackportUrl(RemoteAdminBaseMixin): + +# async def test_list_channels(self, jsonrpcremoteclient): +# # Channel remote listing working +# chans = jsonrpcremoteclient.channels() + +# assert len(chans) == 1 +# assert chans[0]["name"] == "test_remote050" +# assert chans[0]['subchannels'][0]['name'] == 'test_remote050.subchannel' + +# async def test_stop_n_start_channel(self, jsonrpcremoteclient): +# # Channel stop working +# jsonrpcremoteclient.stop('test_remote050') + +# assert self.chan.status == BaseChannel.STOPPED + +# # Start channel +# jsonrpcremoteclient.start('test_remote050') + +# assert self.chan.status == BaseChannel.WAITING + +# async def test_search_messages(self, jsonrpcremoteclient): +# resp = jsonrpcremoteclient.list_msgs( +# channel='test_remote050', start=2, count=5, order_by='-timestamp') + +# assert resp['total'] == 4 +# assert len(resp['messages']) == 2 +# assert resp['messages'][0]['id'] == self.msg3.uuid + +# async def test_search_messages_by_date(self, jsonrpcremoteclient): +# resp = jsonrpcremoteclient.list_msgs( +# channel='test_remote050', start_dt="1982-11-27", end_dt="1982-11-28T13:00:00") +# assert resp['total'] == 4 +# assert len(resp['messages']) == 2 +# assert resp['messages'][0]['id'] == self.msg2.uuid +# assert resp['messages'][1]['id'] == self.msg3.uuid + +# async def test_search_messages_with_text_flt(self, jsonrpcremoteclient): +# resp = jsonrpcremoteclient.list_msgs( +# channel='test_remote050', text="sage_c") +# assert len(resp['messages']) == 1 +# assert resp['messages'][0]['id'] == self.msg3.uuid + +# async def test_search_messages_with_rtext_flt(self, jsonrpcremoteclient): +# resp = jsonrpcremoteclient.list_msgs( +# channel='test_remote050', rtext=r"\w+_\w+") +# assert len(resp['messages']) == 1 +# assert resp['messages'][0]['id'] == self.msg3.uuid + +# async def test_replay_message(self, jsonrpcremoteclient): +# resp = jsonrpcremoteclient.replay_msg('test_remote050', [self.msg3.uuid]) +# assert len(resp) == 1 +# assert await self.chan.message_store.total() == 5 + +# # Clean +# await self.chan.message_store.delete(resp[0]["uuid"]) + +# async def test_push_message(self, jsonrpcremoteclient): +# pass # actually not implemented diff --git a/pypeman/remoteadmin.py b/pypeman/remoteadmin.py index f984ee2d..1ec3f465 100644 --- a/pypeman/remoteadmin.py +++ b/pypeman/remoteadmin.py @@ -3,7 +3,6 @@ import functools import json import logging -import os import re import sys @@ -574,7 +573,7 @@ def __init__(self, loop, host, port, ssl): async def start(self): - client_dir = os.path.join(os.path.dirname(os.path.join(__file__)), 'client/dist') + # client_dir = os.path.join(os.path.dirname(os.path.join(__file__)), 'client/dist') app = web.Application() app.router.add_get( '/configs.js', @@ -584,11 +583,11 @@ async def start(self): # redirect to index.html app.router.add_get('/', self.redirect_to_index) - app.router.add_static( - '/', - path=os.path.join(client_dir), - name='static' - ) + # app.router.add_static( + # '/', + # path=os.path.join(client_dir), + # name='static' + # ) await self.loop.create_server( protocol_factory=app.make_handler(), diff --git a/pytest.ini b/pytest.ini index 330bfec0..c2df502e 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,4 +4,4 @@ testpaths = pypeman norecursedirs = pypeman/client addopts = --junit-xml=nosetests.xml --continue-on-collection-errors # https://github.com/pytest-dev/pytest-asyncio -asyncio_mode = strict +asyncio_mode = auto diff --git a/requirements.txt b/requirements.txt index eebafa93..0b311c85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,3 @@ jsonrpcclient requests # For jsonrpcclient ipython sqlitedict>=2.1 # Python 3.10 DeprecationWarning: setDaemon() is deprecated - diff --git a/requirements_test.txt b/requirements_test.txt index fb138cfc..f674407a 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -4,4 +4,5 @@ xmltodict aiohttp aiocron pytest-asyncio +pytest-aiohttp pytest-cov