From 454dc979e14a3eedbbd5fb5750da1cec5e14455a Mon Sep 17 00:00:00 2001 From: Wynn Vonnegut Date: Wed, 28 Jun 2017 11:52:55 -0700 Subject: [PATCH 1/3] initial commit of asyncio-based client/server --- examples/asyncio/ping_client.py | 23 ++++ examples/asyncio/ping_server.py | 29 ++++++ examples/asyncio/pingpong.thrift | 7 ++ setup.py | 7 +- tests/conftest.py | 5 + tests/test_asyncio.py | 89 ++++++++++++++++ thriftpy/contrib/async.py | 173 +++++++++++++++++++++++++++++++ tox.ini | 2 + 8 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 examples/asyncio/ping_client.py create mode 100644 examples/asyncio/ping_server.py create mode 100644 examples/asyncio/pingpong.thrift create mode 100644 tests/conftest.py create mode 100644 tests/test_asyncio.py create mode 100644 thriftpy/contrib/async.py diff --git a/examples/asyncio/ping_client.py b/examples/asyncio/ping_client.py new file mode 100644 index 0000000..1e3ceb8 --- /dev/null +++ b/examples/asyncio/ping_client.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +import thriftpy +from thriftpy.contrib.async import make_client + +import asyncio + + +pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift") + +@asyncio.coroutine +def main(): + c = yield from make_client(pp_thrift.PingService) + + pong = yield from c.ping() + print(pong) + + c.close() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/asyncio/ping_server.py b/examples/asyncio/ping_server.py new file mode 100644 index 0000000..2954935 --- /dev/null +++ b/examples/asyncio/ping_server.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +import thriftpy +import asyncio +from thriftpy.contrib.async import make_server + + +pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift") + + +class Dispatcher(object): + @asyncio.coroutine + def ping(self): + print("ping pong!") + return 'pong' + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + server = loop.run_until_complete( + make_server(pp_thrift.PingService, Dispatcher())) + + try: + loop.run_forever() + except KeyboardInterrupt: + pass + + server.close() + loop.run_until_complete(server.wait_closed()) + loop.close() diff --git a/examples/asyncio/pingpong.thrift b/examples/asyncio/pingpong.thrift new file mode 100644 index 0000000..0bb9c85 --- /dev/null +++ b/examples/asyncio/pingpong.thrift @@ -0,0 +1,7 @@ +# ping service demo +service PingService { + /* + * Sexy c style comment + */ + string ping(), +} diff --git a/setup.py b/setup.py index a8243b5..092fe56 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,10 @@ "toro>=0.6" ] +asyncio_requires = [ + "async_timeout>=1.2.1" +] + dev_requires = [ "cython>=0.23", "flake8>=2.5", @@ -81,7 +85,8 @@ tests_require=tornado_requires, extras_require={ "dev": dev_requires, - "tornado": tornado_requires + "tornado": tornado_requires, + "asyncio": asyncio_requires }, cmdclass=cmdclass, ext_modules=ext_modules, diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..19b837a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,5 @@ +import sys + +collect_ignore = ["setup.py"] +if sys.version_info < (3, 5): + collect_ignore.append("test_asyncio.py") diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py new file mode 100644 index 0000000..bbfc1de --- /dev/null +++ b/tests/test_asyncio.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import + +from os import path + +import thriftpy +from thriftpy.contrib.async import make_client, make_server + +import pytest +import asyncio + +pytestmark = pytest.mark.asyncio + +addressbook = thriftpy.load(path.join(path.dirname(__file__), + "addressbook.thrift")) + + +class Dispatcher(object): + def __init__(self): + self.registry = {} + + @asyncio.coroutine + def add(self, person): + """ + bool add(1: Person person); + """ + if person.name in self.registry: + return False + self.registry[person.name] = person + return True + + @asyncio.coroutine + def get(self, name): + """ + Person get(1: string name) throws (1: PersonNotExistsError not_exists); + """ + if name not in self.registry: + raise addressbook.PersonNotExistsError( + 'Person "{0}" does not exist!'.format(name)) + return self.registry[name] + + @asyncio.coroutine + def remove(self, name): + """ + bool remove(1: string name) throws (1: PersonNotExistsError not_exists) + """ + # delay action for later + yield from asyncio.sleep(.1) + if name not in self.registry: + raise addressbook.PersonNotExistsError( + 'Person "{0}" does not exist!'.format(name)) + del self.registry[name] + return True + + +@pytest.fixture +async def client(request): + server = await make_server(addressbook.AddressBookService, Dispatcher()) + client = await make_client(addressbook.AddressBookService) + + def teardown(): + client.close() + server.close() + loop = asyncio.get_event_loop() + loop.run_until_complete(server.wait_closed()) + request.addfinalizer(teardown) + + return client + + +async def test_async_result(client): + dennis = addressbook.Person(name='Dennis Ritchie') + success = await client.add(dennis) + assert success + success = await client.add(dennis) + assert not success + person = await client.get(dennis.name) + assert person.name == dennis.name + + +async def test_async_exception(client): + exc = None + try: + await client.get('Brian Kernighan') + except Exception as e: + exc = e + + assert isinstance(exc, addressbook.PersonNotExistsError) diff --git a/thriftpy/contrib/async.py b/thriftpy/contrib/async.py new file mode 100644 index 0000000..bbf160c --- /dev/null +++ b/thriftpy/contrib/async.py @@ -0,0 +1,173 @@ +from thriftpy.thrift import TType, TMessageType, TApplicationException, TProcessor, TClient, args2kwargs +from thriftpy.transport import TMemoryBuffer +from thriftpy.protocol import TBinaryProtocolFactory + +import asyncio +import async_timeout +import struct + +import logging +LOG = logging.getLogger(__name__) + + +class TAsyncTransport(TMemoryBuffer): + def __init__(self, trans): + super().__init__() + self._trans = trans + self._io_lock = asyncio.Lock() + + def flush(self): + buf = self.getvalue() + self._trans.write(struct.pack("!i", len(buf)) + buf) + self.setvalue(b'') + + @asyncio.coroutine + def read_frame(self): + # do not yield the event loop on a single reader + # between reading the frame_size and the buffer + with (yield from self._io_lock): + buff = yield from self._trans.readexactly(4) + sz, = struct.unpack('!i', buff) + + frame = yield from self._trans.readexactly(sz) + self.setvalue(frame) + + @asyncio.coroutine + def drain(self): + # drain cannot be called concurrently + with (yield from self._io_lock): + yield from self._trans.drain() + + +class TAsyncProcessor(TProcessor): + def __init__(self, service, handler): + self._service = service + self._handler = handler + + @asyncio.coroutine + def process(self, iprot, oprot): + # the standard thrift protocol packs a single request per frame + # note that chunked requests are not supported, and would require + # additional sequence information + yield from iprot.trans.read_frame() + api, seqid, result, call = self.process_in(iprot) + + if isinstance(result, TApplicationException): + self.send_exception(oprot, api, result, seqid) + yield from oprot.trans.drain() + + try: + result.success = yield from call() + except Exception as e: + # raise if api don't have throws + self.handle_exception(e, result) + + if not result.oneway: + self.send_result(oprot, api, result, seqid) + yield from oprot.trans.drain() + + +class TAsyncServer(object): + def __init__(self, processor, + iprot_factory=None, + oprot_factory=None, + timeout=None): + self.processor = processor + self.iprot_factory = iprot_factory or TBinaryProtocolFactory() + self.oprot_factory = oprot_factory or self.iprot_factory + self.timeout = timeout + + @asyncio.coroutine + def __call__(self, reader, writer): + itrans = TAsyncTransport(reader) + iproto = self.iprot_factory.get_protocol(itrans) + + otrans = TAsyncTransport(writer) + oproto = self.oprot_factory.get_protocol(otrans) + + while not reader.at_eof(): + try: + with async_timeout.timeout(self.timeout): + yield from self.processor.process(iproto, oproto) + except ConnectionError: + LOG.debug('client has closed the connection') + writer.close() + except asyncio.TimeoutError: + LOG.debug('timeout when processing the client request') + writer.close() + except asyncio.IncompleteReadError: + LOG.debug('client has closed the connection') + writer.close() + except Exception: + # app exception + LOG.exception('unhandled app exception') + writer.close() + writer.close() + + +class TAsyncClient(TClient): + def __init__(self, *args, timeout=None, **kwargs): + super().__init__(*args, **kwargs) + self.timeout = timeout + + @asyncio.coroutine + def _req(self, _api, *args, **kwargs): + with async_timeout.timeout(self.timeout): + args_cls = getattr(self._service, _api + "_args") + _kw = args2kwargs(args_cls.thrift_spec, *args) + + kwargs.update(_kw) + result_cls = getattr(self._service, _api + "_result") + + self._send(_api, **kwargs) + yield from self._oprot.trans.drain() + + # wait result only if non-oneway + if not getattr(result_cls, "oneway"): + yield from self._iprot.trans.read_frame() + return self._recv(_api) + + def close(self): + self._oprot.trans.close() + + +@asyncio.coroutine +def make_server( + service, + handler, + host = 'localhost', + port = 9090, + proto_factory = TBinaryProtocolFactory(), + loop = None, + timeout = None + ): + """ + create a thrift server running on an asyncio event-loop. + """ + processor = TAsyncProcessor(service, handler) + if loop is None: + loop = asyncio.get_event_loop() + server = yield from asyncio.start_server( + TAsyncServer(processor, proto_factory, timeout=timeout), host, port, loop=loop) + return server + + +@asyncio.coroutine +def make_client(service, + host = 'localhost', + port = 9090, + proto_factory = TBinaryProtocolFactory(), + timeout = None, + loop = None): + if loop is None: + loop = asyncio.get_event_loop() + + reader, writer = yield from asyncio.open_connection( + host, port, loop=loop) + + itrans = TAsyncTransport(reader) + iproto = proto_factory.get_protocol(itrans) + + otrans = TAsyncTransport(writer) + oproto = proto_factory.get_protocol(otrans) + return TAsyncClient(service, iproto, oproto) diff --git a/tox.ini b/tox.ini index c488270..cd38704 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,8 @@ deps = toro cython py26: ordereddict + py35: pytest_asyncio + py35: async_timeout [testenv:flake8] deps = flake8 From 2a07d7081ab6f545f4beac66bb59846628340f20 Mon Sep 17 00:00:00 2001 From: Wynn Vonnegut Date: Wed, 28 Jun 2017 15:10:42 -0700 Subject: [PATCH 2/3] make sure to send/acknowlege EOF when a connection is closed --- tests/test_asyncio.py | 102 +++++++++++++++++++++++++++----------- thriftpy/contrib/async.py | 21 ++++++-- 2 files changed, 91 insertions(+), 32 deletions(-) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index bbfc1de..b4b9297 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -6,11 +6,12 @@ import thriftpy from thriftpy.contrib.async import make_client, make_server +from thriftpy.rpc import make_client as make_sync_client +from thriftpy.transport import TFramedTransportFactory import pytest import asyncio - -pytestmark = pytest.mark.asyncio +import threading addressbook = thriftpy.load(path.join(path.dirname(__file__), "addressbook.thrift")) @@ -54,36 +55,81 @@ def remove(self, name): return True -@pytest.fixture -async def client(request): - server = await make_server(addressbook.AddressBookService, Dispatcher()) - client = await make_client(addressbook.AddressBookService) +class Server(threading.Thread): + def __init__(self): + self.loop = loop = asyncio.new_event_loop() + self.server = loop.run_until_complete(make_server( + service=addressbook.AddressBookService, + handler=Dispatcher(), + loop=loop + )) + super().__init__() - def teardown(): - client.close() - server.close() - loop = asyncio.get_event_loop() - loop.run_until_complete(server.wait_closed()) - request.addfinalizer(teardown) + def run(self): + loop = self.loop + server = self.server + asyncio.set_event_loop(loop) - return client + loop.run_forever() + server.close() + loop.run_until_complete(server.wait_closed()) -async def test_async_result(client): - dennis = addressbook.Person(name='Dennis Ritchie') - success = await client.add(dennis) - assert success - success = await client.add(dennis) - assert not success - person = await client.get(dennis.name) - assert person.name == dennis.name + loop.close() + def stop(self): + self.loop.call_soon_threadsafe(self.loop.stop) + self.join() -async def test_async_exception(client): - exc = None - try: - await client.get('Brian Kernighan') - except Exception as e: - exc = e - assert isinstance(exc, addressbook.PersonNotExistsError) +@pytest.fixture +def server(): + server = Server() + server.start() + yield server + server.stop() + + +class TestAsyncClient: + @pytest.fixture + async def client(self, request, server): + client = await make_client(addressbook.AddressBookService) + request.addfinalizer(client.close) + return client + + @pytest.mark.asyncio + async def test_result(self, client): + dennis = addressbook.Person(name='Dennis Ritchie') + success = await client.add(dennis) + assert success + success = await client.add(dennis) + assert not success + person = await client.get(dennis.name) + assert person.name == dennis.name + + @pytest.mark.asyncio + async def test_exception(self, client): + with pytest.raises(addressbook.PersonNotExistsError): + await client.get('Brian Kernighan') + + +class TestSyncClient: + @pytest.fixture + async def client(self, request, server): + client = make_sync_client(addressbook.AddressBookService, + trans_factory=TFramedTransportFactory()) + request.addfinalizer(client.close) + return client + + def test_result(self, client): + dennis = addressbook.Person(name='Dennis Ritchie') + success = client.add(dennis) + assert success + success = client.add(dennis) + assert not success + person = client.get(dennis.name) + assert person.name == dennis.name + + def test_exception(self, client): + with pytest.raises(addressbook.PersonNotExistsError): + client.get('Brian Kernighan') diff --git a/thriftpy/contrib/async.py b/thriftpy/contrib/async.py index bbf160c..b04a545 100644 --- a/thriftpy/contrib/async.py +++ b/thriftpy/contrib/async.py @@ -39,6 +39,18 @@ def drain(self): yield from self._trans.drain() +class TAsyncReader(TAsyncTransport): + def close(self): + self._trans.feed_eof() + super().close() + + +class TAsyncWriter(TAsyncTransport): + def close(self): + self._trans.write_eof() + super().close() + + class TAsyncProcessor(TProcessor): def __init__(self, service, handler): self._service = service @@ -79,10 +91,10 @@ def __init__(self, processor, @asyncio.coroutine def __call__(self, reader, writer): - itrans = TAsyncTransport(reader) + itrans = TAsyncReader(reader) iproto = self.iprot_factory.get_protocol(itrans) - otrans = TAsyncTransport(writer) + otrans = TAsyncWriter(writer) oproto = self.oprot_factory.get_protocol(otrans) while not reader.at_eof(): @@ -128,6 +140,7 @@ def _req(self, _api, *args, **kwargs): return self._recv(_api) def close(self): + self._iprot.trans.close() self._oprot.trans.close() @@ -165,9 +178,9 @@ def make_client(service, reader, writer = yield from asyncio.open_connection( host, port, loop=loop) - itrans = TAsyncTransport(reader) + itrans = TAsyncReader(reader) iproto = proto_factory.get_protocol(itrans) - otrans = TAsyncTransport(writer) + otrans = TAsyncWriter(writer) oproto = proto_factory.get_protocol(otrans) return TAsyncClient(service, iproto, oproto) From d0780d55770eb309dba2027893993a105883cbe2 Mon Sep 17 00:00:00 2001 From: Wynn Vonnegut Date: Wed, 5 Jul 2017 14:42:56 -0700 Subject: [PATCH 3/3] remove async_timeout as a dependency --- setup.py | 7 +------ thriftpy/contrib/async.py | 32 ++++++++++++++++++-------------- tox.ini | 3 +-- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/setup.py b/setup.py index 092fe56..a8243b5 100644 --- a/setup.py +++ b/setup.py @@ -22,10 +22,6 @@ "toro>=0.6" ] -asyncio_requires = [ - "async_timeout>=1.2.1" -] - dev_requires = [ "cython>=0.23", "flake8>=2.5", @@ -85,8 +81,7 @@ tests_require=tornado_requires, extras_require={ "dev": dev_requires, - "tornado": tornado_requires, - "asyncio": asyncio_requires + "tornado": tornado_requires }, cmdclass=cmdclass, ext_modules=ext_modules, diff --git a/thriftpy/contrib/async.py b/thriftpy/contrib/async.py index b04a545..f0ab6ec 100644 --- a/thriftpy/contrib/async.py +++ b/thriftpy/contrib/async.py @@ -3,7 +3,6 @@ from thriftpy.protocol import TBinaryProtocolFactory import asyncio -import async_timeout import struct import logging @@ -99,8 +98,8 @@ def __call__(self, reader, writer): while not reader.at_eof(): try: - with async_timeout.timeout(self.timeout): - yield from self.processor.process(iproto, oproto) + fut = self.processor.process(iproto, oproto) + yield from asyncio.wait_for(fut, self.timeout) except ConnectionError: LOG.debug('client has closed the connection') writer.close() @@ -124,20 +123,25 @@ def __init__(self, *args, timeout=None, **kwargs): @asyncio.coroutine def _req(self, _api, *args, **kwargs): - with async_timeout.timeout(self.timeout): - args_cls = getattr(self._service, _api + "_args") - _kw = args2kwargs(args_cls.thrift_spec, *args) + fut = self._req_impl(_api, *args, **kwargs) + result = yield from asyncio.wait_for(fut, self.timeout) + return result - kwargs.update(_kw) - result_cls = getattr(self._service, _api + "_result") + @asyncio.coroutine + def _req_impl(self, _api, *args, **kwargs): + args_cls = getattr(self._service, _api + "_args") + _kw = args2kwargs(args_cls.thrift_spec, *args) + + kwargs.update(_kw) + result_cls = getattr(self._service, _api + "_result") - self._send(_api, **kwargs) - yield from self._oprot.trans.drain() + self._send(_api, **kwargs) + yield from self._oprot.trans.drain() - # wait result only if non-oneway - if not getattr(result_cls, "oneway"): - yield from self._iprot.trans.read_frame() - return self._recv(_api) + # wait result only if non-oneway + if not getattr(result_cls, "oneway"): + yield from self._iprot.trans.read_frame() + return self._recv(_api) def close(self): self._iprot.trans.close() diff --git a/tox.ini b/tox.ini index cd38704..ac5c120 100644 --- a/tox.ini +++ b/tox.ini @@ -14,8 +14,7 @@ deps = toro cython py26: ordereddict - py35: pytest_asyncio - py35: async_timeout + py,py35: pytest_asyncio [testenv:flake8] deps = flake8