-
Notifications
You must be signed in to change notification settings - Fork 129
/
json2pb.py
125 lines (106 loc) · 3.71 KB
/
json2pb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# vim: sts=4 sw=4 et
"""
Copyright (c) 2008-2013 Pavel Shramov <[email protected]>
json2pb is free software; you can redistribute it and/or modify
it under the terms of the MIT license. See LICENSE for details.
json2pb was originally part of the pbufrpc project
"""
"""
Patch Message class with two functions:
SerializeToJSON which works like SerializeToString but results in JSON message
ParseFromJSON which parses JSON message and fills object
Import this module to add these methods to Message.
They are also available as the module-level functions json_encode and json_decode.
"""
from google.protobuf.message import Message
__all__ = ['json_encode', 'json_decode']


def _serialize_to_json(s):
    """Return message *s* encoded as JSON text (delegates to json_encode)."""
    return json_encode(s)


def _parse_from_json(s, j):
    """Reset message *s* and overwrite it with the decoded JSON text *j*.

    The message is cleared before *j* is decoded, so a decoding error
    leaves *s* empty.  Returns whatever ``CopyFrom`` returns.
    """
    s.Clear()
    return s.CopyFrom(json_decode(s, j))


# Importing this module attaches both helpers to the Message class as methods.
Message.SerializeToJSON = _serialize_to_json
Message.ParseFromJSON = _parse_from_json
def _load_simplejson():
    """Return a ``(dumps, loads)`` pair from simplejson or the stdlib.

    simplejson is preferred when it is installed.  Otherwise the standard
    library ``json`` module, which exposes the same ``dumps``/``loads``
    API, is used so that a working codec is available without any
    third-party package.

    Returns:
        ``(dumps, loads)`` on success, or ``(None, None)`` when neither
        module can be imported.
    """
    try:
        from simplejson import dumps, loads
    except ImportError:
        try:
            # Fallback: without it the caller has no JSON backend at all
            # on interpreters that lack simplejson.
            from json import dumps, loads
        except ImportError:
            return None, None
    return dumps, loads
def _load_cjson():
    """Return cjson's ``(encode, decode)`` pair, or ``(None, None)``.

    The returned encoder wraps cjson's ``encode`` and rewrites every
    backslash-escaped forward slash in its output to a plain ``/``.
    ``(None, None)`` is returned when cjson cannot be imported.
    """
    try:
        from cjson import encode as raw_encode, decode
    except ImportError:
        return None, None

    def encode(*a):
        # XXX: Workaround #593302
        return raw_encode(*a).replace('\\/', '/')

    return encode, decode
# Module-level JSON codec pair; bound lazily by _load().
_j_encode = None
_j_decode = None


def _load():
    """Bind the module-level JSON encoder/decoder on first use.

    Does nothing when an encoder is already bound.  Otherwise the
    backends are tried in order -- ``_load_cjson`` first, then
    ``_load_simplejson`` -- and the first one that yields an encoder wins.

    Raises:
        ImportError: no backend produced an encoder.
    """
    global _j_encode, _j_decode
    if _j_encode:
        return
    for backend in (_load_cjson, _load_simplejson):
        _j_encode, _j_decode = backend()
        if _j_encode:
            return
    raise ImportError("No JSON implementation found")
def json_encode(obj):
    """Serialize a protobuf Message to JSON text.

    The message is converted to a plain dict with pb2jd and that dict is
    passed to the JSON encoder selected by _load().
    """
    _load()
    as_dict = pb2jd(obj)
    return _j_encode(as_dict)
def json_decode(obj, str):
    """Build a message of *obj*'s type from the JSON text *str*.

    Only ``obj.DESCRIPTOR`` is read here; the decoded dict is handed to
    jd2pb together with that descriptor.  A falsy *str* (``None`` or an
    empty string) is decoded as the empty object ``'{}'``.
    """
    _load()
    descriptor = obj.DESCRIPTOR
    text = str if str else '{}'
    parsed = _j_decode(text)
    return jd2pb(descriptor, parsed)
def pb2jd(o):
    """Convert a protobuf message into a JSON-serializable dict.

    Fields are visited in descriptor order and keyed by field name:

    * repeated fields always produce a list (possibly empty);
    * optional fields are emitted only when ``HasField`` reports them set;
    * required fields must be set.

    Nested messages are converted recursively and bytes values are emitted
    as base64 text.

    Raises:
        ValueError: a required field is not set.
        TypeError: a field carries a label other than repeated, optional
            or required.
    """
    # Function-local import so the block does not depend on the file header.
    import base64

    def _pbe(d, obj):
        # Encode a single value according to its field descriptor ``d``.
        if d.type == d.TYPE_MESSAGE:
            return pb2jd(obj)
        if d.type == d.TYPE_BYTES:
            # The 'base64' codec via .encode() exists only on Python 2;
            # the base64 module works on both major versions.  The output
            # has no line breaks, which base64 decoders still accept.
            return base64.b64encode(obj).decode('ascii')
        return obj

    d = {}
    for f in o.DESCRIPTOR.fields:
        if f.label == f.LABEL_REPEATED:
            d[f.name] = [_pbe(f, x) for x in getattr(o, f.name)]
        elif f.label == f.LABEL_OPTIONAL:
            if o.HasField(f.name):
                d[f.name] = _pbe(f, getattr(o, f.name))
        elif f.label == f.LABEL_REQUIRED:
            if not o.HasField(f.name):
                raise ValueError("Required field %s is not found" % (f.name))
            d[f.name] = _pbe(f, getattr(o, f.name))
        else:
            raise TypeError("Unknown label type %s in %s" % (f.label, f.full_name))
    return d
def jd2pb(descriptor, jdict):
    """Build a protobuf message from a decoded JSON dict.

    A new message is created from ``descriptor._concrete_class()`` and
    filled field by field from *jdict*, looked up by field name:

    * repeated fields take every element of the list (a missing key is
      treated as an empty list);
    * optional fields are skipped when their key is missing;
    * required fields must be present.

    Nested messages are built recursively from ``message_type``; bytes
    values are expected as base64 text.

    Raises:
        ValueError: a required field is missing from *jdict*.
        TypeError: a field carries a label other than repeated, optional
            or required.
    """
    # Function-local import so the block does not depend on the file header.
    import base64

    def _pbd(d, obj):
        # Decode a single JSON value according to its field descriptor ``d``.
        if d.type == d.TYPE_MESSAGE:
            return jd2pb(d.message_type, obj)
        if d.type == d.TYPE_BYTES:
            # The 'base64' codec via .decode() exists only on Python 2.
            # b64decode discards characters outside the alphabet by
            # default, so newline-wrapped input is accepted as well.
            return base64.b64decode(obj)
        return obj

    o = descriptor._concrete_class()
    for f in descriptor.fields:
        if f.label == f.LABEL_REPEATED:
            for x in jdict.get(f.name, []):
                if f.type == f.TYPE_MESSAGE:
                    # Message elements are appended via add() and then filled.
                    getattr(o, f.name).add().CopyFrom(_pbd(f, x))
                else:
                    getattr(o, f.name).append(_pbd(f, x))
        elif f.label in (f.LABEL_OPTIONAL, f.LABEL_REQUIRED):
            if f.name not in jdict:
                if f.label == f.LABEL_REQUIRED:
                    raise ValueError("Required field %s is not found" % (f.name))
                continue
            x = _pbd(f, jdict[f.name])
            if f.type == f.TYPE_MESSAGE:
                # Sub-messages are filled in place with CopyFrom rather
                # than assigned like scalar fields.
                getattr(o, f.name).CopyFrom(x)
            else:
                setattr(o, f.name, x)
        else:
            raise TypeError("Unknown label type %s in %s" % (f.label, f.full_name))
    return o