From 3b3a5a5a877dc3ba4c6d83da994a651dd302c9bc Mon Sep 17 00:00:00 2001 From: wallezhang Date: Tue, 23 Jan 2018 15:35:25 +0800 Subject: [PATCH 1/9] add TNonblockingServer --- tests/test_nonblocking.py | 251 +++++++++++++++++++++++++ thriftpy/nonblockingserver.py | 345 ++++++++++++++++++++++++++++++++++ 2 files changed, 596 insertions(+) create mode 100644 tests/test_nonblocking.py create mode 100644 thriftpy/nonblockingserver.py diff --git a/tests/test_nonblocking.py b/tests/test_nonblocking.py new file mode 100644 index 0000000..31c3584 --- /dev/null +++ b/tests/test_nonblocking.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import + +import multiprocessing +import os +import time +import socket +import ssl + +import pytest + +import thriftpy + +thriftpy.install_import_hook() + +from thriftpy._compat import PY3 # noqa +from thriftpy.rpc import make_server, client_context # noqa +from thriftpy.transport import TFramedTransportFactory # noqa +from thriftpy.transport import TTransportException # noqa + +addressbook = thriftpy.load(os.path.join(os.path.dirname(__file__), + "addressbook.thrift")) +unix_sock = "/tmp/thriftpy_test.sock" +SSL_PORT = 50441 + + +class Dispatcher(object): + def __init__(self): + self.ab = addressbook.AddressBook() + self.ab.people = {} + + def ping(self): + print('ping is called!!') + return True + + def hello(self, name): + return "hello " + name + + def add(self, person): + self.ab.people[person.name] = person + return True + + def remove(self, name): + try: + self.ab.people.pop(name) + return True + except KeyError: + raise addressbook.PersonNotExistsError( + "{} not exists".format(name)) + + def get(self, name): + try: + return self.ab.people[name] + except KeyError: + raise addressbook.PersonNotExistsError( + "{} not exists".format(name)) + + def book(self): + return self.ab + + def get_phonenumbers(self, name, count): + p = [self.ab.people[name].phones[0]] if name in self.ab.people else [] + return p * count + + def get_phones(self, name): + phone_numbers = self.ab.people[name].phones + return dict((p.type, p.number) for p in phone_numbers) + + def sleep(self, ms): + time.sleep(ms / 1000.0) + return True + + +@pytest.fixture(scope="module") +def server(request): + server = make_server(addressbook.AddressBookService, Dispatcher(), + unix_socket=unix_sock, blocking=False) + ps = multiprocessing.Process(target=server.serve) + ps.start() + + time.sleep(0.1) + + def fin(): + if ps.is_alive(): + ps.terminate() + try: + os.remove(unix_sock) + except IOError: + pass + + request.addfinalizer(fin) + + +@pytest.fixture(scope="module") +def ssl_server(request): + ssl_server = make_server(addressbook.AddressBookService, Dispatcher(), + host='localhost', port=SSL_PORT, + certfile="ssl/server.pem", blocking=False) + ps = multiprocessing.Process(target=ssl_server.serve) + ps.start() + + time.sleep(0.1) + + def fin(): + if ps.is_alive(): + ps.terminate() + + request.addfinalizer(fin) + + +@pytest.fixture(scope="module") +def person(): + phone1 = addressbook.PhoneNumber() + phone1.type = addressbook.PhoneType.MOBILE + phone1.number = '555-1212' + phone2 = addressbook.PhoneNumber() + phone2.type = addressbook.PhoneType.HOME + phone2.number = '555-1234' + + # empty struct + phone3 = addressbook.PhoneNumber() + + alice = addressbook.Person() + alice.name = "Alice" + alice.phones = [phone1, phone2, phone3] + alice.created_at = int(time.time()) + + return alice + + +def client(timeout=3000): + return client_context(addressbook.AddressBookService, + unix_socket=unix_sock, timeout=timeout, + trans_factory=TFramedTransportFactory()) + + +def ssl_client(timeout=3000): + return client_context(addressbook.AddressBookService, + host='localhost', port=SSL_PORT, + timeout=timeout, + cafile="ssl/CA.pem", certfile="ssl/client.crt", + keyfile="ssl/client.key", + trans_factory=TFramedTransportFactory()) + + +def test_void_api(server): + with client() as c: + assert c.ping() is None + + +def test_void_api_with_ssl(ssl_server): + with ssl_client() as c: + assert c.ping() is None + + +def test_string_api(server): + with client() as c: + assert c.hello("world") == "hello world" + + +def test_string_api_with_ssl(ssl_server): + with ssl_client() as c: + assert c.hello("world") == "hello world" + + +def test_huge_res(server): + with client() as c: + big_str = "world" * 100000 + assert c.hello(big_str) == "hello " + big_str + + +def test_huge_res_with_ssl(ssl_server): + with ssl_client() as c: + big_str = "world" * 100000 + assert c.hello(big_str) == "hello " + big_str + + +def test_tstruct_req(person): + with client() as c: + assert c.add(person) is True + + +def test_tstruct_req_with_ssl(person): + with ssl_client() as c: + assert c.add(person) is True + + +def test_tstruct_res(person): + with client() as c: + assert person == c.get("Alice") + + +def test_tstruct_res_with_ssl(person): + with ssl_client() as c: + assert person == c.get("Alice") + + +def test_complex_tstruct(): + with client() as c: + assert len(c.get_phonenumbers("Alice", 0)) == 0 + assert len(c.get_phonenumbers("Alice", 1000)) == 1000 + + +def test_complex_tstruct_with_ssl(): + with ssl_client() as c: + assert len(c.get_phonenumbers("Alice", 0)) == 0 + assert len(c.get_phonenumbers("Alice", 1000)) == 1000 + + +def test_exception(): + with pytest.raises(addressbook.PersonNotExistsError): + with client() as c: + c.remove("Bob") + + +def test_exception_iwth_ssl(): + with pytest.raises(addressbook.PersonNotExistsError): + with ssl_client() as c: + c.remove("Bob") + + +def test_client_timeout(): + with pytest.raises(socket.timeout): + with client(timeout=500) as c: + c.sleep(1000) + + +def test_client_socket_timeout(): + with pytest.raises(socket.timeout): + with client_context(addressbook.AddressBookService, + unix_socket=unix_sock, + socket_timeout=500, + trans_factory=TFramedTransportFactory()) as c: + c.sleep(1000) + + +def test_client_connect_timeout(): + with pytest.raises(TTransportException): + with client_context(addressbook.AddressBookService, + unix_socket='/tmp/test.sock', + connect_timeout=1000) as c: + c.hello('test') + + +def test_ssl_client_timeout(): + # SSL socket timeout raises socket.timeout since Python 3.2. + # http://bugs.python.org/issue10272 + with pytest.raises(socket.timeout if PY3 else ssl.SSLError): + with ssl_client(timeout=500) as c: + c.sleep(1000) diff --git a/thriftpy/nonblockingserver.py b/thriftpy/nonblockingserver.py new file mode 100644 index 0000000..e215bee --- /dev/null +++ b/thriftpy/nonblockingserver.py @@ -0,0 +1,345 @@ +# -*- coding: utf-8 -*- + +"""Implementation of non-blocking server. + +The main idea of the server is to receive and send requests +only from the main thread. + +The thread poool should be sized for concurrent tasks, not +maximum connections +""" + +from __future__ import absolute_import + +import logging +import select +import socket +import struct +import threading +import ssl + +from six.moves import queue + +from thriftpy.protocol import TBinaryProtocolFactory +from thriftpy.transport import TMemoryBuffer + +__all__ = ['TNonblockingServer'] + +logger = logging.getLogger(__name__) + + +class Worker(threading.Thread): + """Worker is a small helper to process incoming connection.""" + + def __init__(self, queue): + threading.Thread.__init__(self) + self.queue = queue + + def run(self): + """Process queries from task queue, stop if processor is None.""" + while True: + try: + processor, iprot, oprot, otrans, callback = self.queue.get() + if processor is None: + break + processor.process(iprot, oprot) + callback(True, otrans.getvalue()) + except Exception: + logger.exception("Exception while processing request") + callback(False, b'') + + +WAIT_LEN = 0 +WAIT_MESSAGE = 1 +WAIT_PROCESS = 2 +SEND_ANSWER = 3 +CLOSED = 4 + + +def locked(func): + """Decorator which locks self.lock.""" + + def nested(self, *args, **kwargs): + self.lock.acquire() + try: + return func(self, *args, **kwargs) + finally: + self.lock.release() + + return nested + + +def socket_exception(func): + """Decorator close object on socket.error.""" + + def read(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except socket.error: + self.close() + + return read + + +class Connection(object): + """Basic class is represented connection. + + It can be in state: + WAIT_LEN --- connection is reading request len. + WAIT_MESSAGE --- connection is reading request. + WAIT_PROCESS --- connection has just read whole request and + waits for call ready routine. + SEND_ANSWER --- connection is sending answer string (including length + of answer). + CLOSED --- socket was closed and connection should be deleted. + """ + + def __init__(self, new_socket, wake_up): + self.socket = new_socket + self.socket.setblocking(False) + self.status = WAIT_LEN + self.len = 0 + self.message = b'' + self.lock = threading.Lock() + self.wake_up = wake_up + + def _read_len(self): + """Reads length of request. + + It's a safer alternative to self.socket.recv(4) + """ + read = self.socket.recv(4 - len(self.message)) + if len(read) == 0: + # if we read 0 bytes and self.message is empty, then + # the client closed the connection + if len(self.message) != 0: + logger.error("can't read frame size from socket") + self.close() + return + self.message += read + if len(self.message) == 4: + self.len, = struct.unpack('!i', self.message) + if self.len < 0: + logger.error("negative frame size, it seems client " + "doesn't use FramedTransport") + self.close() + elif self.len == 0: + logger.error("empty frame, it's really strange") + self.close() + else: + self.message = b'' + self.status = WAIT_MESSAGE + + @socket_exception + def read(self): + """Reads data from stream and switch state.""" + assert self.status in (WAIT_LEN, WAIT_MESSAGE) + if self.status == WAIT_LEN: + self._read_len() + # go back to the main loop here for simplicity instead of + # falling through, even though there is a good chance that + # the message is already available + elif self.status == WAIT_MESSAGE: + read = self.socket.recv(self.len - len(self.message)) + if len(read) == 0: + logger.error("can't read frame from socket (get %d of " + "%d bytes)" % (len(self.message), self.len)) + self.close() + return + self.message += read + if len(self.message) == self.len: + self.status = WAIT_PROCESS + + @socket_exception + def write(self): + """Writes data from socket and switch state.""" + assert self.status == SEND_ANSWER + sent = self.socket.send(self.message) + if sent == len(self.message): + self.status = WAIT_LEN + self.message = b'' + self.len = 0 + else: + self.message = self.message[sent:] + + @locked + def ready(self, all_ok, message): + """Callback function for switching state and waking up main thread. + + This function is the only function witch can be called asynchronous. + + The ready can switch Connection to three states: + WAIT_LEN if request was oneway. + SEND_ANSWER if request was processed in normal way. + CLOSED if request throws unexpected exception. + + The one wakes up main thread. + """ + assert self.status == WAIT_PROCESS + if not all_ok: + self.close() + self.wake_up() + return + self.len = 0 + if len(message) == 0: + # it was a oneway request, do not write answer + self.message = b'' + self.status = WAIT_LEN + else: + self.message = struct.pack('!i', len(message)) + message + self.status = SEND_ANSWER + self.wake_up() + + @locked + def is_writeable(self): + """Return True if connection should be added to write list of select""" + return self.status == SEND_ANSWER + + # it's not necessary, but... + @locked + def is_readable(self): + """Return True if connection should be added to read list of select""" + return self.status in (WAIT_LEN, WAIT_MESSAGE) + + @locked + def is_closed(self): + """Returns True if connection is closed.""" + return self.status == CLOSED + + def fileno(self): + """Returns the file descriptor of the associated socket.""" + return self.socket.fileno() + + def close(self): + """Closes connection""" + self.status = CLOSED + self.socket.close() + + +class TNonblockingServer(object): + """Non-blocking server.""" + + def __init__(self, + processor, + lsocket, + inputProtocolFactory=None, + outputProtocolFactory=None, + threads=10): + self.processor = processor + self.socket = lsocket + self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() + self.out_protocol = outputProtocolFactory or self.in_protocol + self.threads = int(threads) + self.clients = {} + self.tasks = queue.Queue() + self._read, self._write = socket.socketpair() + self.prepared = False + self._stop = False + + def setNumThreads(self, num): + """Set the number of worker threads that should be created.""" + # implement ThreadPool interface + assert not self.prepared, "Can't change number of threads after start" + self.threads = num + + def prepare(self): + """Prepares server for serve requests.""" + if self.prepared: + return + self.socket.listen() + for _ in range(self.threads): + thread = Worker(self.tasks) + thread.setDaemon(True) + thread.start() + self.prepared = True + + def wake_up(self): + """Wake up main thread. + + The server usually waits in select call in we should terminate one. + The simplest way is using socketpair. + + Select always wait to read from the first socket of socketpair. + + In this case, we can just write anything to the second socket from + socketpair. + """ + self._write.send(b'1') + + def stop(self): + """Stop the server. + + This method causes the serve() method to return. stop() may be invoked + from within your handler, or from another thread. + + After stop() is called, serve() will return but the server will still + be listening on the socket. serve() may then be called again to resume + processing requests. Alternatively, close() may be called after + serve() returns to close the server socket and shutdown all worker + threads. + """ + self._stop = True + self.wake_up() + + def _select(self): + """Does select on open connections.""" + readable = [self.socket.sock.fileno(), self._read.fileno()] + writable = [] + for i, connection in list(self.clients.items()): + if connection.is_readable(): + readable.append(connection.fileno()) + if connection.is_writeable(): + writable.append(connection.fileno()) + if connection.is_closed(): + del self.clients[i] + return select.select(readable, writable, readable) + + def handle(self): + """Handle requests. + + WARNING! You must call prepare() BEFORE calling handle() + """ + assert self.prepared, "You have to call prepare before handle" + rset, wset, xset = self._select() + for readable in rset: + if readable == self._read.fileno(): + # don't care i just need to clean readable flag + self._read.recv(1024) + elif readable == self.socket.sock.fileno(): + client = self.socket.accept().sock + self.clients[client.fileno()] = Connection(client, + self.wake_up) + else: + connection = self.clients[readable] + connection.read() + if isinstance(connection.socket, ssl.SSLSocket) and connection.status in (WAIT_LEN, WAIT_MESSAGE): + connection.read() + if connection.status == WAIT_PROCESS: + itransport = TMemoryBuffer(connection.message) + otransport = TMemoryBuffer() + iprot = self.in_protocol.get_protocol(itransport) + oprot = self.out_protocol.get_protocol(otransport) + self.tasks.put([self.processor, iprot, oprot, + otransport, connection.ready]) + for writeable in wset: + self.clients[writeable].write() + for oob in xset: + self.clients[oob].close() + del self.clients[oob] + + def close(self): + """Closes the server.""" + for _ in range(self.threads): + self.tasks.put([None, None, None, None, None]) + self.socket.close() + self.prepared = False + + def serve(self): + """Serve requests. + + Serve requests forever, or until stop() is called. + """ + self._stop = False + self.prepare() + while not self._stop: + self.handle() From 540abd3e1439d72f9655721e3f48447869ea3224 Mon Sep 17 00:00:00 2001 From: wallezhang Date: Tue, 23 Jan 2018 17:10:12 +0800 Subject: [PATCH 2/9] add parameters to make_server() --- thriftpy/rpc.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/thriftpy/rpc.py b/thriftpy/rpc.py index 389ad76..3d0e2f2 100644 --- a/thriftpy/rpc.py +++ b/thriftpy/rpc.py @@ -3,10 +3,11 @@ from __future__ import absolute_import import contextlib -import warnings +import warnings, multiprocessing from thriftpy.protocol import TBinaryProtocolFactory from thriftpy.server import TThreadedServer +from thriftpy.nonblockingserver import TNonblockingServer from thriftpy.thrift import TProcessor, TClient from thriftpy.transport import ( TBufferedTransportFactory, @@ -47,7 +48,8 @@ def make_server(service, handler, host="localhost", port=9090, unix_socket=None, proto_factory=TBinaryProtocolFactory(), trans_factory=TBufferedTransportFactory(), - client_timeout=3000, certfile=None): + client_timeout=3000, certfile=None, blocking=True, + threads=multiprocessing.cpu_count()): processor = TProcessor(service, handler) if unix_socket: @@ -65,9 +67,13 @@ def make_server(service, handler, else: raise ValueError("Either host/port or unix_socket must be provided.") - server = TThreadedServer(processor, server_socket, - iprot_factory=proto_factory, - itrans_factory=trans_factory) + if blocking: + server = TThreadedServer(processor, server_socket, + iprot_factory=proto_factory, + itrans_factory=trans_factory) + else: + server = TNonblockingServer(processor, server_socket, threads, + proto_factory, proto_factory) return server From 36b8da0fedb823941c30d786c0abf54ab631fcc9 Mon Sep 17 00:00:00 2001 From: wallezhang Date: Tue, 23 Jan 2018 17:11:11 +0800 Subject: [PATCH 3/9] fix broken pipe while read 0 byte --- thriftpy/nonblockingserver.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/thriftpy/nonblockingserver.py b/thriftpy/nonblockingserver.py index e215bee..7aa2b86 100644 --- a/thriftpy/nonblockingserver.py +++ b/thriftpy/nonblockingserver.py @@ -17,6 +17,7 @@ import struct import threading import ssl +import multiprocessing from six.moves import queue @@ -222,9 +223,9 @@ class TNonblockingServer(object): def __init__(self, processor, lsocket, + threads, inputProtocolFactory=None, - outputProtocolFactory=None, - threads=10): + outputProtocolFactory=None): self.processor = processor self.socket = lsocket self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() @@ -312,7 +313,7 @@ def handle(self): else: connection = self.clients[readable] connection.read() - if isinstance(connection.socket, ssl.SSLSocket) and connection.status in (WAIT_LEN, WAIT_MESSAGE): + while isinstance(connection.socket, ssl.SSLSocket) and connection.socket.pending() > 0: connection.read() if connection.status == WAIT_PROCESS: itransport = TMemoryBuffer(connection.message) From 81805661f8db826f6541c78475d9f40537324d55 Mon Sep 17 00:00:00 2001 From: wallezhang Date: Mon, 19 Mar 2018 09:23:10 +0800 Subject: [PATCH 4/9] Add getvalue() function to cyframed.pyx --- thriftpy/transport/framed/cyframed.pyx | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/thriftpy/transport/framed/cyframed.pyx b/thriftpy/transport/framed/cyframed.pyx index 250f4de..a0e2700 100644 --- a/thriftpy/transport/framed/cyframed.pyx +++ b/thriftpy/transport/framed/cyframed.pyx @@ -1,13 +1,12 @@ -from libc.stdlib cimport malloc, free -from libc.string cimport memcpy -from libc.stdint cimport int32_t +from libc.stdint cimport -from thriftpy.transport.cybase cimport ( - TCyBuffer, - CyTransportBase, - DEFAULT_BUFFER, - STACK_STRING_LEN -) +int32_t +from libc.stdlib cimport + +malloc, free +from libc.string cimport + +memcpy from .. import TTransportException @@ -120,6 +119,9 @@ cdef class TCyFramedTransport(CyTransportBase): self.rframe_buf.clean() self.wframe_buf.clean() + def getvalue(self): + return self._trans.getvalue() + class TCyFramedTransportFactory(object): def get_transport(self, trans): From bae9e0411b944f758dfd7af259cc7cfc7b1eda0a Mon Sep 17 00:00:00 2001 From: wallezhang Date: Mon, 19 Mar 2018 10:39:53 +0800 Subject: [PATCH 5/9] Fix compile error --- thriftpy/transport/framed/cyframed.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thriftpy/transport/framed/cyframed.pyx b/thriftpy/transport/framed/cyframed.pyx index a0e2700..e20bf15 100644 --- a/thriftpy/transport/framed/cyframed.pyx +++ b/thriftpy/transport/framed/cyframed.pyx @@ -125,4 +125,4 @@ cdef class TCyFramedTransport(CyTransportBase): class TCyFramedTransportFactory(object): def get_transport(self, trans): - return TCyFramedTransport(trans) + return TCyFramedTransport(trans) \ No newline at end of file From 658b161a0aa92d2dcf2d2986dcd2c44eab08fe9c Mon Sep 17 00:00:00 2001 From: wallezhang Date: Mon, 19 Mar 2018 10:46:14 +0800 Subject: [PATCH 6/9] Fix compile error --- thriftpy/transport/framed/cyframed.pyx | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/thriftpy/transport/framed/cyframed.pyx b/thriftpy/transport/framed/cyframed.pyx index e20bf15..bde6a78 100644 --- a/thriftpy/transport/framed/cyframed.pyx +++ b/thriftpy/transport/framed/cyframed.pyx @@ -1,12 +1,13 @@ -from libc.stdint cimport - -int32_t -from libc.stdlib cimport - -malloc, free -from libc.string cimport - -memcpy +from libc.stdlib cimport malloc, free +from libc.string cimport memcpy +from libc.stdint cimport int32_t + +from thriftpy.transport.cybase cimport ( + TCyBuffer, + CyTransportBase, + DEFAULT_BUFFER, + STACK_STRING_LEN +) from .. import TTransportException From e9a90f740b04731e2f57500f5017b5e896f922ce Mon Sep 17 00:00:00 2001 From: wallezhang Date: Mon, 7 May 2018 14:59:12 +0800 Subject: [PATCH 7/9] Separate business handler thread from IO thread --- thriftpy/nonblockingserver.py | 38 +++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/thriftpy/nonblockingserver.py b/thriftpy/nonblockingserver.py index 7aa2b86..40bb050 100644 --- a/thriftpy/nonblockingserver.py +++ b/thriftpy/nonblockingserver.py @@ -17,7 +17,7 @@ import struct import threading import ssl -import multiprocessing +from errno import EINTR from six.moves import queue @@ -101,6 +101,7 @@ def __init__(self, new_socket, wake_up): self.status = WAIT_LEN self.len = 0 self.message = b'' + self.message_to_send = b'' self.lock = threading.Lock() self.wake_up = wake_up @@ -151,17 +152,12 @@ def read(self): if len(self.message) == self.len: self.status = WAIT_PROCESS + @locked @socket_exception def write(self): """Writes data from socket and switch state.""" - assert self.status == SEND_ANSWER - sent = self.socket.send(self.message) - if sent == len(self.message): - self.status = WAIT_LEN - self.message = b'' - self.len = 0 - else: - self.message = self.message[sent:] + sent = self.socket.send(self.message_to_send) + self.message_to_send = self.message_to_send[sent:] @locked def ready(self, all_ok, message): @@ -176,25 +172,19 @@ def ready(self, all_ok, message): The one wakes up main thread. """ - assert self.status == WAIT_PROCESS if not all_ok: self.close() self.wake_up() return self.len = 0 - if len(message) == 0: - # it was a oneway request, do not write answer - self.message = b'' - self.status = WAIT_LEN - else: - self.message = struct.pack('!i', len(message)) + message - self.status = SEND_ANSWER + if len(message) > 0: + self.message_to_send += struct.pack('!i', len(message)) + message self.wake_up() @locked def is_writeable(self): """Return True if connection should be added to write list of select""" - return self.status == SEND_ANSWER + return len(self.message_to_send) > 0 # it's not necessary, but... @locked @@ -301,7 +291,14 @@ def handle(self): WARNING! You must call prepare() BEFORE calling handle() """ assert self.prepared, "You have to call prepare before handle" - rset, wset, xset = self._select() + try: + rset, wset, xset = self._select() + except select.error as err: + if err.args[0] != EINTR: + raise + else: + return + for readable in rset: if readable == self._read.fileno(): # don't care i just need to clean readable flag @@ -322,6 +319,9 @@ def handle(self): oprot = self.out_protocol.get_protocol(otransport) self.tasks.put([self.processor, iprot, oprot, otransport, connection.ready]) + # receive next data packet + connection.status = WAIT_LEN + connection.message = b'' for writeable in wset: self.clients[writeable].write() for oob in xset: From 7ce7ba292764f0e48dbabea4fec10c4b14c60d26 Mon Sep 17 00:00:00 2001 From: wallezhang Date: Fri, 18 May 2018 10:01:58 +0800 Subject: [PATCH 8/9] Fix handle loop exit by catching EBADF error --- thriftpy/nonblockingserver.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/thriftpy/nonblockingserver.py b/thriftpy/nonblockingserver.py index 40bb050..240b66a 100644 --- a/thriftpy/nonblockingserver.py +++ b/thriftpy/nonblockingserver.py @@ -17,7 +17,7 @@ import struct import threading import ssl -from errno import EINTR +from errno import EINTR, EBADF from six.moves import queue @@ -277,12 +277,19 @@ def _select(self): readable = [self.socket.sock.fileno(), self._read.fileno()] writable = [] for i, connection in list(self.clients.items()): - if connection.is_readable(): - readable.append(connection.fileno()) - if connection.is_writeable(): - writable.append(connection.fileno()) - if connection.is_closed(): - del self.clients[i] + try: + if connection.is_readable(): + readable.append(connection.fileno()) + if connection.is_writeable(): + writable.append(connection.fileno()) + if connection.is_closed(): + del self.clients[i] + except socket.error as err: + if err.args[0] == EBADF: + logger.error('connection %s is closed already! err: %s' % (connection, err)) + del self.clients[i] + else: + logger.exception(err.message) return select.select(readable, writable, readable) def handle(self): From 5ea23f700338ccd007d024c5db227e732ae5a65f Mon Sep 17 00:00:00 2001 From: wallezhang Date: Mon, 21 May 2018 11:25:40 +0800 Subject: [PATCH 9/9] Fix negative buffer size in recv function --- thriftpy/nonblockingserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thriftpy/nonblockingserver.py b/thriftpy/nonblockingserver.py index 240b66a..53c6ec7 100644 --- a/thriftpy/nonblockingserver.py +++ b/thriftpy/nonblockingserver.py @@ -176,7 +176,6 @@ def ready(self, all_ok, message): self.close() self.wake_up() return - self.len = 0 if len(message) > 0: self.message_to_send += struct.pack('!i', len(message)) + message self.wake_up() @@ -329,6 +328,7 @@ def handle(self): # receive next data packet connection.status = WAIT_LEN connection.message = b'' + connection.len = 0 for writeable in wset: self.clients[writeable].write() for oob in xset: