Skip to content

Commit

Permalink
pythongh-123309: Add more tests for the pickletools module
Browse files Browse the repository at this point in the history
Add tests for genops() and dis().
  • Loading branch information
serhiy-storchaka committed Aug 26, 2024
1 parent dbc1752 commit f41de40
Showing 1 changed file with 310 additions and 0 deletions.
310 changes: 310 additions & 0 deletions Lib/test/test_pickletools.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import pickle
import pickletools
from test import support
Expand Down Expand Up @@ -62,6 +63,315 @@ def test_optimize_binput_and_memoize(self):
self.assertNotIn(pickle.BINPUT, pickled2)


class SimpleReader:
def __init__(self, data):
self.data = data
self.pos = 0

def read(self, n):
data = self.data[self.pos: self.pos + n]
self.pos += n
return data

def readline(self):
nl = self.data.find(b'\n', self.pos) + 1
if not nl:
nl = len(self.data)
data = self.data[self.pos: nl]
self.pos = nl
return data


class GenopsTests(unittest.TestCase):
def test_genops(self):
it = pickletools.genops(b'(I123\nK\x12J\x12\x34\x56\x78t.')
self.assertEqual([(item[0].name,) + item[1:] for item in it], [
('MARK', None, 0),
('INT', 123, 1),
('BININT1', 0x12, 6),
('BININT', 0x78563412, 8),
('TUPLE', None, 13),
('STOP', None, 14),
])

def test_from_file(self):
f = io.BytesIO(b'prefix(I123\nK\x12J\x12\x34\x56\x78t.suffix')
self.assertEqual(f.read(6), b'prefix')
it = pickletools.genops(f)
self.assertEqual([(item[0].name,) + item[1:] for item in it], [
('MARK', None, 6),
('INT', 123, 7),
('BININT1', 0x12, 12),
('BININT', 0x78563412, 14),
('TUPLE', None, 19),
('STOP', None, 20),
])
self.assertEqual(f.read(), b'suffix')

def test_without_pos(self):
f = SimpleReader(b'(I123\nK\x12J\x12\x34\x56\x78t.')
it = pickletools.genops(f)
self.assertEqual([(item[0].name,) + item[1:] for item in it], [
('MARK', None, None),
('INT', 123, None),
('BININT1', 0x12, None),
('BININT', 0x78563412, None),
('TUPLE', None, None),
('STOP', None, None),
])

def test_no_stop(self):
it = pickletools.genops(b'N')
item = next(it)
self.assertEqual(item[0].name, 'NONE')
with self.assertRaisesRegex(ValueError,
'pickle exhausted before seeing STOP'):
next(it)

def test_truncated_data(self):
it = pickletools.genops(b'I123')
with self.assertRaisesRegex(ValueError,
'no newline found when trying to read stringnl'):
next(it)
it = pickletools.genops(b'J\x12\x34')
with self.assertRaisesRegex(ValueError,
'not enough data in stream to read int4'):
next(it)

def test_unknown_opcode(self):
it = pickletools.genops(b'N\xff')
item = next(it)
self.assertEqual(item[0].name, 'NONE')
with self.assertRaisesRegex(ValueError,
r"at position 1, opcode b'\\xff' unknown"):
next(it)

def test_unknown_opcode_without_pos(self):
f = SimpleReader(b'N\xff')
it = pickletools.genops(f)
item = next(it)
self.assertEqual(item[0].name, 'NONE')
with self.assertRaisesRegex(ValueError,
r"at position <unknown>, opcode b'\\xff' unknown"):
next(it)


class DisTests(unittest.TestCase):
maxDiff = None

def check_dis(self, data, expected, **kwargs):
out = io.StringIO()
pickletools.dis(data, out=out, **kwargs)
self.assertEqual(out.getvalue(), expected)

def check_dis_error(self, data, expected, expected_error, **kwargs):
out = io.StringIO()
with self.assertRaisesRegex(ValueError, expected_error):
pickletools.dis(data, out=out, **kwargs)
self.assertEqual(out.getvalue(), expected)

def test_mark(self):
self.check_dis(b'(N(tl.', '''\
0: ( MARK
1: N NONE
2: ( MARK
3: t TUPLE (MARK at 2)
4: l LIST (MARK at 0)
5: . STOP
highest protocol among opcodes = 0
''')

def test_indentlevel(self):
self.check_dis(b'(N(tl.', '''\
0: ( MARK
1: N NONE
2: ( MARK
3: t TUPLE (MARK at 2)
4: l LIST (MARK at 0)
5: . STOP
highest protocol among opcodes = 0
''', indentlevel=2)

def test_mark_without_pos(self):
self.check_dis(SimpleReader(b'(N(tl.'), '''\
( MARK
N NONE
( MARK
t TUPLE (MARK at unknown opcode offset)
l LIST (MARK at unknown opcode offset)
. STOP
highest protocol among opcodes = 0
''')

def test_no_mark(self):
self.check_dis_error(b'Nt.', '''\
0: N NONE
1: t TUPLE no MARK exists on stack
''', 'no MARK exists on stack')

def test_put(self):
self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\
0: N NONE
1: p PUT 0
4: q BINPUT 1
6: r LONG_BINPUT 2
11: \\x94 MEMOIZE (as 3)
12: . STOP
highest protocol among opcodes = 4
''')

def test_put_redefined(self):
self.check_dis_error(b'Np1\np1\n.', '''\
0: N NONE
1: p PUT 1
4: p PUT 1
''', 'memo key 1 already defined')
self.check_dis_error(b'Np1\nq\x01.', '''\
0: N NONE
1: p PUT 1
4: q BINPUT 1
''', 'memo key 1 already defined')
self.check_dis_error(b'Np1\nr\x01\x00\x00\x00.', '''\
0: N NONE
1: p PUT 1
4: r LONG_BINPUT 1
''', 'memo key 1 already defined')
self.check_dis_error(b'Np1\n\x94.', '''\
0: N NONE
1: p PUT 1
4: \\x94 MEMOIZE (as 1)
''', 'memo key None already defined')

def test_put_empty_stack(self):
self.check_dis_error(b'p0\n', '''\
0: p PUT 0
''', "stack is empty -- can't store into memo")

def test_put_markobject(self):
self.check_dis_error(b'(p0\n', '''\
0: ( MARK
1: p PUT 0
''', "can't store markobject in the memo")

def test_get(self):
self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\
0: ( MARK
1: N NONE
2: p PUT 1
5: g GET 1
8: h BINGET 1
10: j LONG_BINGET 1
15: t TUPLE (MARK at 0)
16: . STOP
highest protocol among opcodes = 1
''')

def test_get_without_put(self):
self.check_dis_error(b'g1\n.', '''\
0: g GET 1
''', 'memo key 1 has never been stored into')
self.check_dis_error(b'h\x01.', '''\
0: h BINGET 1
''', 'memo key 1 has never been stored into')
self.check_dis_error(b'j\x01\x00\x00\x00.', '''\
0: j LONG_BINGET 1
''', 'memo key 1 has never been stored into')

def test_memo(self):
memo = {}
self.check_dis(b'Np1\n.', '''\
0: N NONE
1: p PUT 1
4: . STOP
highest protocol among opcodes = 0
''', memo=memo)
self.check_dis(b'g1\n.', '''\
0: g GET 1
3: . STOP
highest protocol among opcodes = 0
''', memo=memo)

def test_mark_pop(self):
self.check_dis(b'(N00N.', '''\
0: ( MARK
1: N NONE
2: 0 POP
3: 0 POP (MARK at 0)
4: N NONE
5: . STOP
highest protocol among opcodes = 0
''')

def test_too_small_stack(self):
self.check_dis_error(b'a', '''\
0: a APPEND
''', 'tries to pop 2 items from stack with only 0 items')
self.check_dis_error(b']a', '''\
0: ] EMPTY_LIST
1: a APPEND
''', 'tries to pop 2 items from stack with only 1 items')

def test_no_stop(self):
self.check_dis_error(b'N', '''\
0: N NONE
''', 'pickle exhausted before seeing STOP')

def test_truncated_data(self):
self.check_dis_error(b'NI123', '''\
0: N NONE
''', 'no newline found when trying to read stringnl')
self.check_dis_error(b'NJ\x12\x34', '''\
0: N NONE
''', 'not enough data in stream to read int4')

def test_unknown_opcode(self):
self.check_dis_error(b'N\xff', '''\
0: N NONE
''', r"at position 1, opcode b'\\xff' unknown")

def test_stop_not_empty_stack(self):
self.check_dis_error(b']N.', '''\
0: ] EMPTY_LIST
1: N NONE
2: . STOP
highest protocol among opcodes = 1
''', r'stack not empty after STOP: \[list\]')

def test_annotate(self):
self.check_dis(b'(Nt.', '''\
0: ( MARK Push markobject onto the stack.
1: N NONE Push None on the stack.
2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
3: . STOP Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=1)
self.check_dis(b'(Nt.', '''\
0: ( MARK Push markobject onto the stack.
1: N NONE Push None on the stack.
2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
3: . STOP Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=20)
self.check_dis(b'(((((((ttttttt.', '''\
0: ( MARK Push markobject onto the stack.
1: ( MARK Push markobject onto the stack.
2: ( MARK Push markobject onto the stack.
3: ( MARK Push markobject onto the stack.
4: ( MARK Push markobject onto the stack.
5: ( MARK Push markobject onto the stack.
6: ( MARK Push markobject onto the stack.
7: t TUPLE (MARK at 6) Build a tuple out of the topmost stack slice, after markobject.
8: t TUPLE (MARK at 5) Build a tuple out of the topmost stack slice, after markobject.
9: t TUPLE (MARK at 4) Build a tuple out of the topmost stack slice, after markobject.
10: t TUPLE (MARK at 3) Build a tuple out of the topmost stack slice, after markobject.
11: t TUPLE (MARK at 2) Build a tuple out of the topmost stack slice, after markobject.
12: t TUPLE (MARK at 1) Build a tuple out of the topmost stack slice, after markobject.
13: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
14: . STOP Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=20)


class MiscTestCase(unittest.TestCase):
def test__all__(self):
not_exported = {
Expand Down

0 comments on commit f41de40

Please sign in to comment.