Skip to content

Commit

Permalink
add start_id to msgstore and API
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin on chickenita committed Oct 8, 2024
1 parent 1e88bdd commit 710c5c4
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 16 deletions.
50 changes: 43 additions & 7 deletions pypeman/msgstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,18 @@ async def is_txt_in_msg(self, id, text):
"""

async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, end_dt=None,
text=None, rtext=None):
text=None, rtext=None, start_id=None):
"""
Return a list of message with store specific `id` and processed status.
:param start: First element.
:param start: (DEPRECATED: use start_id instead) First element.
:param count: Count of elements since first element.
:param order_by: Message order. Allowed values : ['timestamp', 'status'].
:param start_dt: (optional) Isoformat start date(time) to filter with
:param end_dt: (optional) Isoformat end date(time) to filter with
:param text: (optional) String to search in message content
:param rtext: (optional) String regex to search in message content
:param start_id: (optional): If set, start search from this id (excluded from rslt list)
:return: A list of dict `{'id':<message_id>, 'state': <message_state>, 'message': <message_object>}`.
"""

Expand Down Expand Up @@ -297,8 +298,9 @@ async def is_txt_in_msg(self, id, text):
return text in msg.payload

async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, end_dt=None,
text=None, rtext=None):

text=None, rtext=None, start_id=None):
if start and start_id:
raise ValueError("`start` and `start_id` cannot both be set")
if order_by.startswith('-'):
reverse = True
sort_key = order_by[1:]
Expand Down Expand Up @@ -330,8 +332,30 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
if await self.is_regex_in_msg(val["id"], rtext):
found_values.append(val)
values = found_values
for value in islice(sorted(values, key=lambda x: x[sort_key], reverse=reverse),
start, start + count):

ordered_list = sorted(values, key=lambda x: x[sort_key], reverse=reverse)
if start:
filtered_iterator = islice(
ordered_list,
start, start + count
)
elif start_id:
for idx, msgdict in enumerate(ordered_list):
if msgdict["id"] == start_id:
start_id_idx = idx + 1
break
else:
raise IndexError("Cannot found start_id %r in filtered results", start_id)
filtered_iterator = islice(
ordered_list,
start_id_idx, start_id_idx + count
)
else:
filtered_iterator = islice(
ordered_list,
0, count
)
for value in filtered_iterator:
resp = dict(value)
resp['message'] = Message.from_dict(resp['message'])
result.append(resp)
Expand Down Expand Up @@ -558,7 +582,9 @@ async def is_txt_in_msg(self, id, text):
return text in msg.payload

async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, end_dt=None,
text=None, rtext=None):
text=None, rtext=None, start_id=None):
if start and start_id:
raise ValueError("`start` and `start_id` cannot both be set")
# TODO better performance for slicing by counting file in dirs ?
if order_by.startswith('-'):
reverse = True
Expand All @@ -576,6 +602,7 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
result = []
end = start + count
position = 0
start_id_found = False
for year in await self.sorted_list_directories(
os.path.join(self.base_path), reverse=reverse):
for month in await self.sorted_list_directories(
Expand All @@ -594,6 +621,11 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
os.listdir(os.path.join(
self.base_path, year, month, day)),
reverse=reverse):
if not start_id_found and start_id and msg_id != start_id:
continue
elif not start_id_found and start_id and msg_id == start_id:
start_id_found = True
continue
found = self.msg_re.match(msg_id)
if found:
msg_str_time = found.groupdict()["msg_time"]
Expand All @@ -619,7 +651,11 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
# TODO: add filter here
# TODO: can we transfoer into a generator?
result.append(await self.get(msg_id))
elif position >= end:
break
position += 1
if start_id and not start_id_found:
raise IndexError("Cannot found start_id %r in filtered results", start_id)
return result

async def total(self):
Expand Down
15 changes: 11 additions & 4 deletions pypeman/plugins/remoteadmin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ async def list_msgs(request, ws=None):
:params channelname: The channel name.
:queryparams:
start (int, default=0): the start indexof msgs to list
start (int, default=0): OBSOLETE (use start_id instead) the start indexof msgs to list
start_id (str, default=None): the start id from which search starts (start_d excluded from results)
count (count, default=10): The maximum returned msgs
order_by (str, default="timestamp"): the message attribute to use for sorting
start_dt (str): iso datetime string to use for filter messages
Expand All @@ -103,15 +104,20 @@ async def list_msgs(request, ws=None):

args = request.rel_url.query
start = int(args.get("start", 0))
start_id = args.get("start_id", None)
count = int(args.get("count", 10))
order_by = args.get("order_by", "-timestamp")
start_dt = args.get("start_dt", None)
end_dt = args.get("end_dt", None)
text = args.get("text", None)
rtext = args.get("rtext", None)
messages = await chan.message_store.search(
start=start, count=count, order_by=order_by, start_dt=start_dt, end_dt=end_dt,
text=text, rtext=rtext) or []
try:
messages = await chan.message_store.search(
start=start, count=count, order_by=order_by, start_dt=start_dt, end_dt=end_dt,
text=text, rtext=rtext, start_id=start_id) or []
except Exception:
logger.exception("Cannot search messages")
messages = []

for res in messages:
res["id"] = res["id"]
Expand Down Expand Up @@ -287,6 +293,7 @@ async def backport_old_client(request):
"end_dt": params[5],
"text": params[6],
"rtext": params[7],
"start_id": params[8],
}
query_params = {k: v for k, v in query_params.items() if v is not None}
query_url = request.rel_url.with_query(query_params)
Expand Down
14 changes: 9 additions & 5 deletions pypeman/remoteadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def stop_channel(self, channel):

@method
async def list_msgs(self, channel, start=0, count=10, order_by='timestamp', start_dt=None, end_dt=None,
text=None, rtext=None):
text=None, rtext=None, start_id=None):
"""
List first `count` messages from message store of specified channel.
Expand All @@ -152,7 +152,7 @@ async def list_msgs(self, channel, start=0, count=10, order_by='timestamp', star

messages = await chan.message_store.search(
start=start, count=count, order_by=order_by, start_dt=start_dt, end_dt=end_dt,
text=text, rtext=rtext) or []
text=text, rtext=rtext, start_id=start_id) or []

for res in messages:
timestamp = res['timestamp']
Expand Down Expand Up @@ -293,7 +293,7 @@ def stop(self, channel):
return self.send_command('stop_channel', [channel])

def list_msgs(self, channel, start=0, count=10, order_by='timestamp', start_dt=None, end_dt=None,
text=None, rtext=None):
text=None, rtext=None, start_id=None):
"""
List first 10 messages on specified channel from remote instance.
Expand All @@ -307,7 +307,7 @@ def list_msgs(self, channel, start=0, count=10, order_by='timestamp', start_dt=N
:params order_by: Message order. only 'timestamp' and '-timestamp' handled for now.
:returns: list of message with status.
"""
list_msg_args = [channel, start, count, order_by, start_dt, end_dt, text, rtext]
list_msg_args = [channel, start, count, order_by, start_dt, end_dt, text, rtext, start_id]
result = self.send_command('list_msgs', list_msg_args)

for m in result['messages']:
Expand Down Expand Up @@ -471,6 +471,7 @@ def do_list(self, channel, arg):
end_dt = None
text = None
rtext = None
start_id = None

args_copy = [i for i in args]
# Parsing of naming args
Expand All @@ -488,6 +489,9 @@ def do_list(self, channel, arg):
if arg.startswith("rtext="):
rtext = arg.split("=", 1)[1]
args.remove(arg)
if arg.startswith("start_id="):
start_id = arg.split("=", 1)[1]
args.remove(arg)

# Parsing of common args
if args:
Expand All @@ -498,7 +502,7 @@ def do_list(self, channel, arg):
order_by = args[2]
result = self.client.list_msgs(
channel, start, end, order_by, start_dt=start_dt, end_dt=end_dt,
text=text, rtext=rtext)
text=text, rtext=rtext, start_id=start_id)

if not result['total']:
print('No message yet.')
Expand Down

0 comments on commit 710c5c4

Please sign in to comment.