Dvl/klaus/webadm readable msg #85

Open
wants to merge 9 commits into
base: master
23 changes: 23 additions & 0 deletions pypeman/channels.py
@@ -12,6 +12,8 @@
from pypeman.helpers.sleeper import Sleeper


from pypeman.helpers.serializers import JsonableEncoder

logger = logging.getLogger(__name__)

# List all channel registered
@@ -469,6 +471,27 @@ def graph_dot(self, end=''):
        for end, sub in after:
            sub.graph_dot(end=end)

    def jsonable_msg_info_for_admin(self, msg):
Collaborator

Why is this not a Message method?

Collaborator

Each channel handles different kinds of messages.
I imagine the use case where a channel wants to have a different representation of a message.

Let's say an MLLP channel would perhaps prefer a partially pre-parsed representation for the web interface.
A file watcher channel might not display the contents of the file, but rather stats about the file.

An alternative would of course be to put this method into the Message class and let certain endpoints / channels subclass the Message object. (Though this might imply more data copies.)
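
A minimal sketch of the per-channel idea described above. The classes `BaseChannel`, `FileStatsChannel`, `SegmentedChannel` and the reduced `Message` are hypothetical stand-ins, not pypeman classes; only the method name `jsonable_msg_info_for_admin` comes from this PR.

```python
import os


class Message:
    """Reduced stand-in for a message (hypothetical, payload and uuid only)."""

    def __init__(self, payload, uuid='0'):
        self.payload = payload
        self.uuid = uuid


class BaseChannel:
    """Stand-in channel: by default the payload is shown unchanged."""

    def jsonable_msg_info_for_admin(self, msg):
        return {'uuid': msg.uuid, 'payload': msg.payload}


class FileStatsChannel(BaseChannel):
    """Hypothetical file watcher: the payload is assumed to be a path."""

    def jsonable_msg_info_for_admin(self, msg):
        # Report stats about the file instead of its contents.
        info = os.stat(msg.payload)
        return {
            'uuid': msg.uuid,
            'payload': {'path': msg.payload, 'size': info.st_size},
        }


class SegmentedChannel(BaseChannel):
    """Hypothetical MLLP-like channel: payload is a CR-separated string."""

    def jsonable_msg_info_for_admin(self, msg):
        # Pre-split the payload into segments for the web interface.
        return {'uuid': msg.uuid, 'payload': msg.payload.split('\r')}
```

Each channel decides what the admin interface sees, at the cost of keeping the representation logic outside the message itself.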

Collaborator

Seems fair.

New proposal: add the method to the Message class, but allow the default payload_encoder to be overridden with an argument. Then add a method to the channel that calls the Message method with the appropriate encoder. Later (i.e. in another PR) we can add a way to customize the encoder through a channel argument.

What do you think?

I think you can use the current .to_dict() method and add the appropriate argument to achieve that.

Collaborator

After more thought, some more details:

  • You can create (at least) two message serializers in the serializer file: b64picklerEncoder and JsonableEncoder,
  • Add a payload_encoder option to the Message.to_json() method with b64picklerEncoder as the default encoder (to keep compatibility) and propagate the option to the .to_dict() method,
  • Add a message_payload_encoder property to the channel, with b64encoder as its default value,
  • Use this encoder in .list_message() when calling .to_json() to get your formatted message.

Am I missing something?
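
The four steps above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `Message` and `Channel` are reduced stand-ins, `ReprEncoder` is a hypothetical readable encoder, and `stored` replaces the real message store.

```python
import base64
import json
import pickle


class B64PickleEncoder:
    """Default encoder: pickle, then base64 (keeps the old behaviour)."""

    def encode(self, obj):
        return base64.b64encode(pickle.dumps(obj)).decode('ascii')

    def decode(self, data):
        return pickle.loads(base64.b64decode(data.encode('ascii')))


class ReprEncoder:
    """Hypothetical human-readable encoder (encode only)."""

    def encode(self, obj):
        return repr(obj)


class Message:
    """Reduced stand-in for the message class."""

    def __init__(self, payload=None):
        self.payload = payload
        self.meta = {}

    def to_dict(self, payload_encoder=None):
        # Fall back to the pickle based encoder when none is given.
        encoder = payload_encoder or B64PickleEncoder()
        return {'payload': encoder.encode(self.payload), 'meta': self.meta}

    def to_json(self, payload_encoder=None):
        # The option is simply propagated to to_dict().
        return json.dumps(self.to_dict(payload_encoder=payload_encoder))


class Channel:
    """Reduced stand-in channel holding its messages in a list."""

    def __init__(self, message_payload_encoder=None):
        self.message_payload_encoder = (
            message_payload_encoder or B64PickleEncoder())
        self.stored = []

    def list_message(self):
        # The channel's encoder decides how payloads are rendered.
        return [m.to_json(payload_encoder=self.message_payload_encoder)
                for m in self.stored]
```

With this shape, existing callers that pass no encoder keep getting base64 pickles, while a channel configured with another encoder returns readable payloads.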

Collaborator

@feenes feenes Jun 6, 2018

Should the param for message_payload_encoder be
a class, an instance (which must have an encode method), or just a function that encodes?

I will start with an instance, but I am open.

The more I think about it, the more I wonder why we should not just pass a function, unless we want to group the encode and decode functions in the same object.

In that case we should perhaps call the param
payload_codec.
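
One possible shape for such a grouped parameter, as a sketch only: `PayloadCodec`, `b64pickle_codec` and the module-level `to_dict` / `from_dict` helpers are hypothetical names that do not appear in this PR.

```python
import base64
import pickle
from typing import Any, Callable, NamedTuple


class PayloadCodec(NamedTuple):
    """Hypothetical pairing of an encode and a decode function."""
    encode: Callable[[Any], str]
    decode: Callable[[str], Any]


def _b64pickle_encode(obj):
    return base64.b64encode(pickle.dumps(obj)).decode('ascii')


def _b64pickle_decode(data):
    return pickle.loads(base64.b64decode(data.encode('ascii')))


# Default codec: both directions travel together in one object.
b64pickle_codec = PayloadCodec(_b64pickle_encode, _b64pickle_decode)


def to_dict(payload, payload_codec=b64pickle_codec):
    """Encode a payload with the codec's encode function."""
    return {'payload': payload_codec.encode(payload)}


def from_dict(data, payload_codec=b64pickle_codec):
    """Decode a payload with the same codec's decode function."""
    return payload_codec.decode(data['payload'])
```

Plain functions stay usable (any two callables can be wrapped), while the pairing makes it harder to encode with one scheme and decode with another.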

Collaborator Author

My bad. I pushed only to the local GitLab server and not to GitHub.

Just check klausfmh@297b219

Collaborator Author

In fact, it is best to recheck all the deltas of this MR.

Collaborator

Reminder to continue the review :-)
I think I handled all the feedback. Please correct me if I am wrong.

Collaborator

Yes, that's what I had in mind for the message, but this part:

  • Add a message_payload_encoder property to the channel, with b64encoder as its default value,
  • Use this encoder in .list_message() when calling .to_json() to get your formatted message.

is not done yet?

Collaborator

I think the jsonable_msg_info_for_admin channel method should no longer exist in the end.

jrmi marked this conversation as resolved.
        """ creates jsonable info representing the message for admin interfaces.
        (Perhaps there's a better method name?)
        """

        encoder = JsonableEncoder()

        result = {}
        result['timestamp'] = msg.timestamp.strftime(message.DATE_FORMAT)
        result['uuid'] = msg.uuid
        result['payload'] = encoder.encode(msg.payload)
        result['meta'] = msg.meta
        result['ctx'] = {}

        for k, ctx_msg in msg.ctx.items():
            result['ctx'][k] = {}
            result['ctx'][k]['payload'] = encoder.encode(ctx_msg['payload'])
            result['ctx'][k]['meta'] = dict(ctx_msg['meta'])

        return result

    def __str__(self):
        return "<chan: %s>" % self.name

100 changes: 100 additions & 0 deletions pypeman/helpers/serializers.py
@@ -0,0 +1,100 @@
"""
serializers, that might be helpful for pypeman projects
"""

import json
import logging
import pickle

from base64 import b64decode
from base64 import b64encode


logger = logging.getLogger(__name__)


# attempt to create serializers allowing to encode objects for non
# Python applications.
class JsonableEncoder:
    """ Initial simple encoder/decoder for objects.
    returns: a dict with type, value and repr
        type: 'asis', 'bytes', 'utf8bytes', 'b64' or 'repr'
        value:
        - the object itself if it can be json encoded (basetype or
          struct of base types)
        - the decoded string if obj is of type bytes and ASCII or UTF-8
        - a b64 encoded string if obj is of type bytes and not UTF-8
        (- a slightly transformed object, that can be json encoded
           (To be implemented))
        - None if only a repr string of the object (which can normally
          not be decoded) is available; the string is stored in 'repr'
    """
    def encode(self, obj):
        logger.debug("try to encode type %s (%r)", type(obj), obj)
        result = {
            'type': 'asis',
            'value': obj,
            'repr': '',
        }
        if isinstance(obj, bytes):
            try:
                result['type'] = "bytes"
                result['value'] = obj.decode('ascii')  # OR UTF-8?
                return result
            except UnicodeDecodeError:
                pass
            try:
                result['type'] = "utf8bytes"
                # TODO could merge asis and utf8
                result['value'] = obj.decode('utf-8')
                return result
            except UnicodeDecodeError:
                result['type'] = "b64"
                result['value'] = b64encode(obj).decode()
                result['repr'] = obj.decode('utf-8', 'ignore')
                return result
        try:
            # logger.info("try to dump %s", repr(obj))
            json_str = json.dumps(obj)  # noqa this line raises if not jsonable
            # logger.info("can be dumped as json %s", json_str)
            return result
        except TypeError:
            pass
        result['type'] = "repr"
        # TODO: might add a base64 pickle for the repr case
        result['value'] = None
        result['repr'] = repr(obj)
        return result

    def decode(self, encoded_obj):
        enc_type = encoded_obj['type']
        data = encoded_obj['value']
        if enc_type == 'asis':
            return data
        elif enc_type in ('bytes', 'utf8bytes'):
            return data.encode('utf-8')
        elif enc_type == 'b64':
            return b64decode(data.encode('ascii'))
        # any other type (e.g. 'repr') cannot be decoded: returns None


class B64PickleEncoder:
    """ Initial simple encoder for objects.
    objects are encoded, such, that they can be stored / transferred as
    an ASCII string.

    drawback: the encoded object can only be decoded if the
        application handles pickle. This limits this decoding mostly
        to python3 applications.
    """

    def encode(self, obj):
        """ the encoding function
        :param obj: the object to be encoded
        returns: a b64 encoded string of the pickled bytes of the passed
            object
        """
        return b64encode(pickle.dumps(obj)).decode('ascii')

    def decode(self, encoded_obj):
        """ the decoding function
        :param encoded_obj: the encoded_object to be decoded
        """
        return pickle.loads(b64decode(encoded_obj.encode('ascii')))
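
To make the bytes handling of `JsonableEncoder.encode()` easier to follow, here is a reduced sketch of only that branch. The helper name `encode_bytes` is hypothetical; it is meant to mirror the ASCII, then UTF-8, then base64 fallback order of the diff above, not to replace it.

```python
from base64 import b64decode, b64encode


def encode_bytes(obj):
    """Return a type/value/repr dict for a bytes object."""
    # Try the two text decodings first, in the same order as the encoder.
    for codec, tag in (('ascii', 'bytes'), ('utf-8', 'utf8bytes')):
        try:
            return {'type': tag, 'value': obj.decode(codec), 'repr': ''}
        except UnicodeDecodeError:
            pass
    # Not valid text: fall back to base64, which is always JSON safe.
    return {
        'type': 'b64',
        'value': b64encode(obj).decode('ascii'),
        'repr': obj.decode('utf-8', 'ignore'),
    }


ascii_result = encode_bytes(b'abc')
binary_result = encode_bytes(b'\xff\x00')
# The base64 form can be turned back into the original bytes.
restored = b64decode(binary_result['value'])
```

Here `ascii_result['type']` is `'bytes'` and `binary_result['type']` is `'b64'`, so a non-Python client can tell from the `type` field how to interpret `value`.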
56 changes: 43 additions & 13 deletions pypeman/message.py
@@ -3,14 +3,16 @@
import uuid
import copy
from uuid import UUID
import pickle
import json
import base64
import logging

from pypeman.helpers.serializers import B64PickleEncoder

default_logger = logging.getLogger(__name__)

DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
DEFAULT_ENCODER_CLS = B64PickleEncoder



class Message():
@@ -88,51 +90,79 @@ def add_context(self, key, msg):
            payload=copy.deepcopy(msg.payload),
        )

    def to_dict(self):
    def to_dict(self, payload_encoder=None):
        """
        Convert the current message object to a dict. Payload is pickled.
        Convert the current message object to a dict, that can be converted
        to a json.
        Therefore the payload and all the context is pickled and base64 encoded.

        Warning: the returned dict cannot be converted to json if meta contains
jrmi marked this conversation as resolved.
        objects, that cannot be dumped as json.

        :param payload_encoder: Encoder object that will be used to encode the payload
            if set to None B64PickleEncoder() will be used, which just pickles an object
            and encodes with base64.

        :return: A dict with an equivalent of message
        """

        payload_encoder = (payload_encoder if payload_encoder
                           else DEFAULT_ENCODER_CLS())

        encode = payload_encoder.encode
        result = {}
        result['timestamp'] = self.timestamp.strftime(DATE_FORMAT)
        result['uuid'] = self.uuid
        result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
        result['payload'] = encode(self.payload)
        result['meta'] = self.meta
        result['ctx'] = {}

        for k, ctx_msg in self.ctx.items():
            result['ctx'][k] = {}
            result['ctx'][k]['payload'] = base64.b64encode(pickle.dumps(ctx_msg['payload'])).decode('ascii')
            result['ctx'][k]['payload'] = encode(ctx_msg['payload'])
            result['ctx'][k]['meta'] = dict(ctx_msg['meta'])

        return result

    def to_json(self):
    def to_json(self, payload_encoder=None):
        """
        Create json string for current message.

        :param payload_encoder: Encoder object that will be used to encode the payload.
            more info at documentation of to_dict()

        :return: a json string equivalent for message.
        """
        return json.dumps(self.to_dict())
        return json.dumps(self.to_dict(payload_encoder=payload_encoder))

    @staticmethod
    def from_dict(data):
    def from_dict(data, payload_encoder=None):
        """
        Convert the input dict previously converted with `.as_dict()` method in Message object.
        Converts an input dict previously converted with `.to_dict()` method
        to a Message object.

        :param data: The input dict.
        :return: The message message object correponding to given data.

        :param payload_encoder: Encoder object that will be used to decode the payload
            if set to None B64PickleEncoder() will be used, which decodes an
            object that was pickled and then base64 encoded.

        :return: The message object corresponding to given data.
        """
        payload_encoder = (payload_encoder if payload_encoder
                           else DEFAULT_ENCODER_CLS())

        decode = payload_encoder.decode

        result = Message()
        result.timestamp = datetime.datetime.strptime(data['timestamp'], DATE_FORMAT)
        result.uuid = UUID(data['uuid']).hex
        result.payload = pickle.loads(base64.b64decode(data['payload'].encode('ascii')))
        result.payload = decode(data['payload'])
        result.meta = data['meta']

        for k, ctx_msg in data['ctx'].items():
            result.ctx[k] = {}
            result.ctx[k]['payload'] = pickle.loads(base64.b64decode(ctx_msg['payload'].encode('ascii')))
            result.ctx[k]['payload'] = decode(ctx_msg['payload'])
            result.ctx[k]['meta'] = dict(ctx_msg['meta'])

        return result
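
The warning in the `to_dict()` docstring (meta is passed through unencoded) can be illustrated with a small standalone check. The helper names `is_jsonable` and `jsonable_meta` and the sample meta dicts are hypothetical; converting values with `default=str` is only one possible workaround and loses the original types.

```python
import datetime
import json

# Meta containing only base types can be dumped as JSON.
meta_ok = {'source': 'file', 'retries': 2}
# A datetime is not JSON serializable by default.
meta_bad = {'received': datetime.datetime(2018, 6, 6)}


def is_jsonable(obj):
    """Return True if json.dumps() accepts the object."""
    try:
        json.dumps(obj)
        return True
    except TypeError:
        return False


def jsonable_meta(meta):
    """Lossy workaround: stringify anything json cannot handle."""
    return json.loads(json.dumps(meta, default=str))
```

A message whose meta looks like `meta_bad` would make `to_json()` raise a `TypeError`, whatever payload encoder is used.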
6 changes: 5 additions & 1 deletion pypeman/msgstore.py
@@ -57,7 +57,7 @@ async def get(self, id):

    async def search(self, start=0, count=10, order_by='timestamp'):
        """
        Return a list of message with store specific `id` and processed status.
        Return a list of messages with store specific `id` and processed status.

        :param start: First element.
        :param count: Count of elements since first element.
@@ -252,6 +252,10 @@ async def get(self, id):
            raise IndexError

        with open(os.path.join(self.base_path, id), "rb") as f:
            # TODO: we might implement an async version for huge files
            # - use either https://github.com/Tinche/aiofiles
            # - read chunks + add sleep(0) (not good for blocking network file systems)

            msg = Message.from_json(f.read().decode('utf-8'))
            return {'id': id, 'state': await self.get_message_state(id), 'message': msg}

27 changes: 22 additions & 5 deletions pypeman/remoteadmin.py
@@ -141,19 +141,34 @@ async def stop_channel(self, channel):
            'status': channels.BaseChannel.status_id_to_str(chan.status)
        }

    async def list_msg(self, channel, start=0, count=10, order_by='timestamp'):
    async def list_msg(self, channel, start=0, count=10, order_by='timestamp', mk_b64pickle=False):
        """
        List first `count` messages from message store of specified channel.

        :params channel: The channel name.
        :param mk_b64pickle: if True, yield payload / ctx fields as b64 encoded pickles.
            if False, a dict with a field type, a field value and a field repr
            will be created.
            Field types depend on the channel's jsonable_msg_info_for_admin() method
            typical values are:
            "asis": value is the object 'asis', as the object could be dumped as json
            "bytes": bytes just as ASCII string
            "utf8bytes": bytes as UTF8 string
            "b64": base 64 encoded bytes
            "repr": a repr() string of the object is returned
            "b64pickle": all fields b64 encoded pickles of the related python object
        """
        chan = self.get_channel(channel)

        messages = await chan.message_store.search(start=start, count=count, order_by=order_by)

        for res in messages:
            res['timestamp'] = res['message'].timestamp_str()
            res['message'] = res['message'].to_json()
            if mk_b64pickle:
                res['message'] = res['message'].to_json()
            else:
                msg = res['message']
                res['message'] = chan.jsonable_msg_info_for_admin(msg)
Collaborator

Again, why is this not a message method?

Collaborator Author

See comment above.


        return {'messages': messages, 'total': await chan.message_store.total()}

@@ -180,7 +195,7 @@ async def push_msg(self, channel, text):
        Push a message in the channel.

        :params channel: The channel name.
        :params msg_ids: The text added to the payload.
        :params text: The text added to the payload.
        """
        chan = self.get_channel(channel)
        msg = message.Message(payload=text)
@@ -248,7 +263,8 @@ def stop(self, channel):
        """
        return self.send_command('stop_channel', [channel])

    def list_msg(self, channel, start=0, count=10, order_by='timestamp'):
    def list_msg(self, channel, start=0, count=10, order_by='timestamp',
                 mk_b64pickle=True):
        """
        List first 10 messages on specified channel from remote instance.

@@ -258,7 +274,8 @@ def list_msg(self, channel, start=0, count=10, order_by='timestamp'):
        :params order_by: Message order. only 'timestamp' and '-timestamp' handled for now.
        :returns: list of message with status.
        """
        result = self.send_command('list_msg', [channel, start, count, order_by])
        result = self.send_command('list_msg', [channel, start, count,
                                                order_by, mk_b64pickle])

        for m in result['messages']:
            m['message'] = message.Message.from_json(m['message'])