diff --git a/pypeman/message.py b/pypeman/message.py
index f527560..dd77b0d 100644
--- a/pypeman/message.py
+++ b/pypeman/message.py
@@ -88,16 +88,24 @@ def add_context(self, key, msg):
             payload=copy.deepcopy(msg.payload),
         )

-    def to_dict(self):
+    def to_dict(self, encode_payload=True):
         """
-        Convert the current message object to a dict. Payload is pickled.
+        Convert the current message object to a dict.
+        Payload is pickled and base64 encoded unless encode_payload is set to False.

         :return: A dict with an equivalent of message
         """
         result = {}
         result['timestamp'] = self.timestamp.strftime(DATE_FORMAT)
         result['uuid'] = self.uuid
-        result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
+        if encode_payload:
+            result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
+        else:
+            try:
+                result['payload'] = str(self.payload)
+            except Exception:
+                default_logger.warning("Cannot convert payload %r to string, pickling it", self.payload)
+                result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
         result['meta'] = self.meta
         result['ctx'] = {}
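For illustration, here is a minimal sketch (not part of the patch, and not pypeman API; decode_payload is a hypothetical helper name) of how a consumer of Message.to_dict() could reverse the default pickle + base64 encoding:

    import base64
    import pickle

    def decode_payload(msg_dict):
        # to_dict() stores pickle.dumps(payload) as ASCII base64;
        # reverse both steps to recover the original object.
        raw = base64.b64decode(msg_dict["payload"].encode("ascii"))
        return pickle.loads(raw)

    # With encode_payload=False the stored value is simply str(payload),
    # so no decoding step applies.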
diff --git a/pypeman/msgstore.py b/pypeman/msgstore.py
index eb02a3e..edec8cf 100644
--- a/pypeman/msgstore.py
+++ b/pypeman/msgstore.py
@@ -349,7 +349,7 @@ async def delete(self, id):
 class FileMessageStoreFactory(MessageStoreFactory):
     """
     Generate a FileMessageStore message store instance.
-    Store a file in `<base_path>/<year>/<month>/<day>/` hierachy.
+    Store a file in `<base_path>/<store_id>/<year>/<month>/<day>/` hierarchy.
     """

     # TODO add an option to reguraly archive old file or delete them
@@ -364,7 +364,7 @@ def get_store(self, store_id):

 class FileMessageStore(MessageStore):
-    """ Store a file in `<base_path>/<year>/<month>/<day>/` hierachy."""
+    """ Store a file in `<base_path>/<store_id>/<year>/<month>/<day>/` hierarchy."""

     # TODO file access should be done in another thread. Waiting for file backend.

     def __init__(self, path, store_id):
@@ -374,7 +374,7 @@ def __init__(self, path, store_id):

         # Match msg file name
         self.msg_re = re.compile(
-            r'^(?P<msg_date>[0-9]{8})_(?P<msg_time>[0-9]{2}[0-9]{2})_(?P<msg_uid>[0-9abcdef]*)$')
+            r'^(?P<msg_date>[0-9]{8})_(?P<msg_time>[0-9]{2}[0-9]{2})_(?P<msg_uid>[0-9a-zA-Z]*)$')

         try:
             # Try to make dirs if necessary
@@ -384,6 +384,17 @@ def __init__(self, path, store_id):

         self._total = 0

+    def id2path(self, id):
+        match = self.msg_re.match(id)
+        if not match:
+            raise ValueError(f"Id '{id}' is not a valid message id")
+        msg_str_date = match.groupdict()["msg_date"]
+        year = msg_str_date[:4]
+        month = msg_str_date[4:6]
+        day = msg_str_date[6:8]
+        msg_path = Path(self.base_path) / year / month / day / id
+        return msg_path
+
     async def start(self):
         self._total = await self.count_msgs()

@@ -393,27 +404,18 @@ async def store(self, msg):
         # The filename is the file id
         filename = "{}_{}".format(msg.timestamp.strftime(DATE_FORMAT), msg.uuid)

-        dirs = os.path.join(str(msg.timestamp.year),
-                            "%02d" % msg.timestamp.month,
-                            "%02d" % msg.timestamp.day)
-
-        try:
-            # Try to make dirs if necessary
-            os.makedirs(os.path.join(self.base_path, dirs))
-        except FileExistsError:
-            pass
-
-        file_path = os.path.join(dirs, filename)
+        msg_path = self.id2path(filename)
+        msg_path.parent.mkdir(parents=True, exist_ok=True)

         # Write message to file
-        with open(os.path.join(self.base_path, file_path), "w") as f:
+        with msg_path.open("w") as f:
             f.write(msg.to_json())

-        await self.change_message_state(file_path, Message.PENDING)
+        await self.change_message_state(filename, Message.PENDING)

         self._total += 1

-        return file_path
+        return filename

     def _is_json_meta(self, id):
         """Check if the message meta file is a json.
         If it's not,
@@ -423,7 +425,8 @@ def _is_json_meta(self, id):
         Args:
             id (str): The id of the message
         """
-        meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
+        msg_fpath = self.id2path(id)
+        meta_fpath = msg_fpath.with_suffix(".meta")
         if not meta_fpath.exists():
             return False
         with meta_fpath.open("r") as fin:
@@ -439,7 +442,8 @@ def _convert_meta_to_json(self, id):
         Args:
             id (str): The id of the message to convert
         """
-        meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
+        msg_fpath = self.id2path(id)
+        meta_fpath = msg_fpath.with_suffix(".meta")
         with meta_fpath.open("r") as fin:
             state = fin.read()
         meta_data = {"state": state}
@@ -448,7 +452,8 @@
         return meta_data

     async def get_message_meta_infos(self, id, meta_info_name=None):
-        meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
+        msg_fpath = self.id2path(id)
+        meta_fpath = msg_fpath.with_suffix(".meta")
         if meta_fpath.exists():
             is_new_meta_file = self._is_json_meta(id)
             if not is_new_meta_file:
@@ -464,7 +469,8 @@
             return meta_data

     async def add_message_meta_infos(self, id, meta_info_name, info):
-        meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
+        msg_fpath = self.id2path(id)
+        meta_fpath = msg_fpath.with_suffix(".meta")
         if meta_fpath.exists():
             is_new_meta_file = self._is_json_meta(id)
             if not is_new_meta_file:
@@ -486,10 +492,11 @@ async def get_message_state(self, id):
         return await self.get_message_meta_infos(id, "state")

     async def get(self, id):
-        if not os.path.exists(os.path.join(self.base_path, id)):
+        fpath = self.id2path(id)
+        if not fpath.exists():
             raise IndexError

-        with open(os.path.join(self.base_path, id), "rb") as f:
+        with fpath.open("rb") as f:
             msg = Message.from_json(f.read().decode('utf-8'))

         return {
             'id': id,
@@ -527,8 +534,8 @@ async def count_msgs(self):
         for year in await self.sorted_list_directories(os.path.join(self.base_path)):
             for month in await self.sorted_list_directories(os.path.join(self.base_path, year)):
                 for day in await self.sorted_list_directories(os.path.join(self.base_path, year, month)):
-                    for msg_name in sorted(os.listdir(os.path.join(self.base_path, year, month, day))):
-                        found = self.msg_re.match(msg_name)
+                    for msg_id in sorted(os.listdir(os.path.join(self.base_path, year, month, day))):
+                        found = self.msg_re.match(msg_id)
                         if found:
                             count += 1
         return count
@@ -583,18 +590,17 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
                     if end_dt:
                         if msg_date > end_dt.date():
                             continue
-                    for msg_name in sorted(
+                    for msg_id in sorted(
                             os.listdir(os.path.join(
                                 self.base_path, year, month, day)),
                             reverse=reverse):
-                        found = self.msg_re.match(msg_name)
+                        found = self.msg_re.match(msg_id)
                         if found:
                             msg_str_time = found.groupdict()["msg_time"]
                             hour = int(msg_str_time[:2])
                             minute = int(msg_str_time[2:4])
                             msg_time = datetime.time(hour=hour, minute=minute, second=0)
                             msg_dt = datetime.datetime.combine(msg_date, msg_time)
-                            mid = os.path.join(year, month, day, msg_name)
                             if start_dt:
                                 if msg_dt < start_dt:
                                     continue
@@ -602,17 +608,17 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
                             if msg_dt > end_dt:
                                 continue
                             if text:
-                                if not await self.is_txt_in_msg(mid, text):
+                                if not await self.is_txt_in_msg(msg_id, text):
                                     continue
                             if rtext:
-                                if not await self.is_regex_in_msg(mid, rtext):
+                                if not await self.is_regex_in_msg(msg_id, rtext):
                                     continue
                             if start <= position < end:
                                 # TODO: need to do processing of payload
                                 # before filtering (HL7 / json-str)
                                 # TODO: add filter here
                                 # TODO: can we transfoer into a generator?
-                                result.append(await self.get(mid))
+                                result.append(await self.get(msg_id))
                             position += 1
         return result
@@ -620,11 +626,12 @@
     async def total(self):
         return self._total

     async def _delete_meta_file(self, id):
-        meta_fpath = (Path(self.base_path) / str(id)).with_suffix(".meta")
+        msg_path = self.id2path(id)
+        meta_fpath = msg_path.with_suffix(".meta")
         meta_fpath.unlink()

     async def delete(self, id):
-        fpath = Path(self.base_path) / str(id)
+        fpath = self.id2path(id)
         if not fpath.exists():
             raise IndexError
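To make the new id-to-path mapping concrete, this standalone sketch mirrors the patched msg_re and id2path() logic (the function name and base path here are illustrative, not the store's API):

    import re
    from pathlib import Path

    MSG_RE = re.compile(
        r'^(?P<msg_date>[0-9]{8})_(?P<msg_time>[0-9]{2}[0-9]{2})_(?P<msg_uid>[0-9a-zA-Z]*)$')

    def id_to_path(base_path, msg_id):
        # Derive the year/month/day directories from the id's date prefix.
        match = MSG_RE.match(msg_id)
        if not match:
            raise ValueError(f"Id '{msg_id}' is not a valid message id")
        date = match.groupdict()["msg_date"]
        return Path(base_path) / date[:4] / date[4:6] / date[6:8] / msg_id

    print(id_to_path("/var/msgstore", "19821128_1235_abc123"))
    # -> /var/msgstore/1982/11/28/19821128_1235_abc123

This is why store() can now return the bare filename as the message id: the storage directory is recomputed from the id on every lookup instead of being embedded in the id itself.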
diff --git a/pypeman/plugins/remoteadmin/views.py b/pypeman/plugins/remoteadmin/views.py
index 9924c30..d389efa 100644
--- a/pypeman/plugins/remoteadmin/views.py
+++ b/pypeman/plugins/remoteadmin/views.py
@@ -133,13 +133,18 @@ async def replay_msg(request, ws=None):

     :params channel: The channel name.
     :params msg_id: The message id to replay.
+
+    :queryparams:
+        encode_payload (bool, default=False): whether to pickle and base64 encode the payload
     """
     channelname = request.match_info['channelname']
     message_id = request.match_info['message_id']
+    args = request.rel_url.query
+    encode_payload = args.get("encode_payload", "false").lower() == "true"
     chan = get_channel(channelname)
     try:
         msg_res = await chan.replay(message_id)
-        result = msg_res.to_dict()
+        result = msg_res.to_dict(encode_payload=encode_payload)
     except IndexError:
         message = f"Cannot replay msg, id {message_id} probably doesn't exists"
         logger.error(message)
@@ -160,16 +165,22 @@ async def view_msg(request, ws=None):

     :params channelname: The channel name.
     :params message_id: The message id to view
+
+    :queryparams:
+        encode_payload (bool, default=False): whether to pickle and base64 encode the payload
     """
     channelname = request.match_info['channelname']
     message_id = request.match_info['message_id']
+    args = request.rel_url.query
+    encode_payload = args.get("encode_payload", "false").lower() == "true"
+
     chan = get_channel(channelname)
     try:
         msg_res = await chan.message_store.get_msg_content(message_id)
-        result = msg_res.to_dict()
+        result = msg_res.to_dict(encode_payload=encode_payload)
     except IndexError:
         message = f"Cannot view msg, id {message_id} probably doesn't exists"
         logger.error(message)
@@ -190,14 +201,20 @@ async def preview_msg(request, ws=None):

     :params channelname: The channel name.
     :params message_id: The message id to preview
+
+    :queryparams:
+        encode_payload (bool, default=False): whether to pickle and base64 encode the payload
     """
     channelname = request.match_info['channelname']
     message_id = request.match_info['message_id']
+    args = request.rel_url.query
+    encode_payload = args.get("encode_payload", "false").lower() == "true"
+
     chan = get_channel(channelname)
     try:
         msg_res = await chan.message_store.get_preview_str(message_id)
-        result = msg_res.to_dict()
+        result = msg_res.to_dict(encode_payload=encode_payload)
     except IndexError:
         message = f"Cannot preview msg, id {message_id} probably doesn't exists"
         logger.error(message)
@@ -248,11 +265,15 @@ async def backport_old_client(request):
         elif cmd_method == "preview_msg":
             message_id = params[1]
             request.match_info["message_id"] = message_id
-            await preview_msg(request, ws=ws)
+            query_url = request.rel_url.with_query({"encode_payload": "True"})
+            new_req = request.clone(rel_url=query_url)
+            await preview_msg(request=new_req, ws=ws)
         elif cmd_method == "view_msg":
             message_id = params[1]
             request.match_info["message_id"] = message_id
-            await view_msg(request=request, ws=ws)
+            query_url = request.rel_url.with_query({"encode_payload": "True"})
+            new_req = request.clone(rel_url=query_url)
+            await view_msg(request=new_req, ws=ws)
         elif cmd_method == "replay_msg":
             message_id = params[1]
             request.match_info["message_id"] = message_id
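A hypothetical client call against the patched view_msg handler might look like the sketch below (the route and port are illustrative; check the plugin's URL configuration for the real path). Query-string values reach the handler as text, which is why the handler normalizes "true"/"false" strings rather than expecting a boolean:

    import asyncio
    import aiohttp

    async def fetch_message(channel, message_id):
        # encode_payload is sent as a string; the server parses it.
        url = f"http://localhost:8091/channels/{channel}/messages/{message_id}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params={"encode_payload": "true"}) as resp:
                return await resp.json()

    # asyncio.run(fetch_message("mychannel", "19821128_1235_abc123"))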
diff --git a/pypeman/tests/test_msgstore.py b/pypeman/tests/test_msgstore.py
index d7dce75..18392d6 100644
--- a/pypeman/tests/test_msgstore.py
+++ b/pypeman/tests/test_msgstore.py
@@ -288,12 +288,12 @@ def test_file_message_store(self):

         # Test processed message
         dict_msg = self.loop.run_until_complete(
-            chan.message_store.get('1982/11/28/19821128_1235_%s' % msg3.uuid))
+            chan.message_store.get('19821128_1235_%s' % msg3.uuid))
         self.assertEqual(dict_msg['state'], 'processed',
                          "Message %s should be in processed state!" % msg3)

         # Test failed message
         dict_msg = self.loop.run_until_complete(
-            chan.message_store.get('1982/11/12/19821112_1435_%s' % msg5.uuid))
+            chan.message_store.get('19821112_1435_%s' % msg5.uuid))
         self.assertEqual(dict_msg['state'], 'error',
                          "Message %s should be in error state!" % msg5)

         self.assertTrue(os.path.exists("%s/%s/1982/11/28/19821128_1235_%s"
@@ -320,12 +320,12 @@ def test_file_message_store(self):

         # Test view message
         msg_content = self.loop.run_until_complete(chan.message_store.get_msg_content(
-            '1982/11/12/19821112_1435_%s' % msg5.uuid))
+            '19821112_1435_%s' % msg5.uuid))
         self.assertEqual(msg_content.payload, msg5.payload,
                          "Failure of message %s view!" % msg5)

         # Test preview message
         msg_content = self.loop.run_until_complete(chan.message_store.get_preview_str(
-            '1982/11/12/19821112_1435_%s' % msg5.uuid))
+            '19821112_1435_%s' % msg5.uuid))
         self.assertEqual(msg_content.payload, msg5.payload[:1000], "Failure of message %s preview!"
                         % msg5)

         self.clean_loop()

@@ -340,13 +340,20 @@ def test_file_message_store_meta(self):
         new_meta_tst_path = data_tst_path / "new_meta.meta"
         with new_meta_tst_path.open("r") as fin:
             new_meta_data = json.load(fin)
-        msg_id = "msgid"
+        msg_uid = "msgid"
+        msg_year = "2024"
+        msg_month = "06"
+        msg_day = "13"
+        msg_date = f"{msg_year}{msg_month}{msg_day}"
+        msg_time = "0000"
+        msg_id = f"{msg_date}_{msg_time}_{msg_uid}"
         with tempfile.TemporaryDirectory() as tempdir:
-            print(tempdir)
             store_factory = msgstore.FileMessageStoreFactory(path=tempdir)
             store = store_factory.get_store(store_id="")
-            meta_dst_path = Path(tempdir) / f"{msg_id}.meta"
+            meta_dst_folder_path = Path(tempdir) / msg_year / msg_month / msg_day
+            meta_dst_folder_path.mkdir(parents=True, exist_ok=True)
+            meta_dst_path = meta_dst_folder_path / f"{msg_id}.meta"
             shutil.copy(old_meta_tst_path, meta_dst_path)
             # Tests that msgstore could read an old meta file
             msg_state = asyncio.run(store.get_message_state(msg_id))
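As the test changes above show, message ids are now flat: the year/month/day prefix is gone from the public id. A rough sketch of adapting an old-style id, assuming ids produced by the previous layout:

    # Old callers passed 'year/month/day/<id>' to store.get(); the store now
    # expects the bare id and derives the directory itself via id2path().
    old_style = "1982/11/28/19821128_1235_abc123"
    new_style = old_style.rsplit("/", 1)[-1]
    assert new_style == "19821128_1235_abc123"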