diff --git a/.gitignore b/.gitignore index 14cbb881..60eaea7a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ sophia # Vim Swap files *.sw[a-z] +.idea \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 26676fc7..2217c63d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,6 @@ env: - OS=ubuntu DIST=trusty - OS=ubuntu DIST=xenial - OS=ubuntu DIST=bionic - - OS=ubuntu DIST=disco - OS=debian DIST=jessie - OS=debian DIST=stretch - OS=debian DIST=buster diff --git a/appveyor.yml b/appveyor.yml index e0485939..b72c5e82 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -2,20 +2,20 @@ environment: matrix: - PYTHON: "C:\\Python27" - PYTHON: "C:\\Python27-x64" - - PYTHON: "C:\\Python34" - - PYTHON: "C:\\Python34-x64" - PYTHON: "C:\\Python35" - PYTHON: "C:\\Python35-x64" - PYTHON: "C:\\Python36" - PYTHON: "C:\\Python36-x64" - PYTHON: "C:\\Python37" - PYTHON: "C:\\Python37-x64" + - PYTHON: "C:\\Python38" + - PYTHON: "C:\\Python38-x64" install: # install runtime dependencies - "%PYTHON%\\python.exe -m pip install -r requirements.txt" # install testing dependencies - - "%PYTHON%\\python.exe -m pip install pyyaml" + - "%PYTHON%\\python.exe -m pip install pyyaml dbapi-compliance==1.15.0" build: off diff --git a/tarantool/__init__.py b/tarantool/__init__.py index a9838321..b4ed81d4 100644 --- a/tarantool/__init__.py +++ b/tarantool/__init__.py @@ -75,4 +75,4 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None, __all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema', 'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning', - 'SchemaError'] + 'SchemaError', 'dbapi'] diff --git a/tarantool/connection.py b/tarantool/connection.py index 6f330896..afb7f23f 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -11,6 +11,7 @@ import ctypes import ctypes.util + try: from ctypes import c_ssize_t except ImportError: @@ -34,7 +35,8 @@ RequestSubscribe, RequestUpdate, RequestUpsert, - RequestAuthenticate + RequestAuthenticate, + RequestExecute ) from tarantool.space import Space from tarantool.const import ( @@ -49,11 +51,18 @@ ITERATOR_ALL ) from tarantool.error import ( + Error, NetworkError, DatabaseError, InterfaceError, SchemaError, NetworkWarning, + OperationalError, + DataError, + IntegrityError, + InternalError, + ProgrammingError, + NotSupportedError, SchemaReloadException, warn ) @@ -76,11 +85,20 @@ class Connection(object): Also this class provides low-level interface to data manipulation (insert/delete/update/select). ''' - Error = tarantool.error + # DBAPI Extension: supply exceptions as attributes on the connection + Error = Error DatabaseError = DatabaseError InterfaceError = InterfaceError SchemaError = SchemaError NetworkError = NetworkError + Warning = Warning + DataError = DataError + OperationalError = OperationalError + IntegrityError = IntegrityError + InternalError = InternalError + ProgrammingError = ProgrammingError + NotSupportedError = NotSupportedError + ImproperlyConfigured = Exception def __init__(self, host, port, user=None, @@ -91,6 +109,7 @@ def __init__(self, host, port, connect_now=True, encoding=ENCODING_DEFAULT, call_16=False, + use_list=True, connection_timeout=CONNECTION_TIMEOUT): ''' Initialize a connection to the server. @@ -123,6 +142,7 @@ def __init__(self, host, port, self._socket = None self.connected = False self.error = True + self.use_list = use_list self.encoding = encoding self.call_16 = call_16 self.connection_timeout = connection_timeout @@ -260,7 +280,7 @@ def _send_request_wo_reconnect(self, request): while True: try: self._socket.sendall(bytes(request)) - response = Response(self, self._read_response()) + response = Response(self, self._read_response(), self.use_list) break except SchemaReloadException as e: self.update_schema(e.schema_version) @@ -292,13 +312,12 @@ def check(): # Check that connection is alive retbytes = self._sys_recv(sock_fd, buf, 1, flag) err = 0 - if os.name!= 'nt': + if os.name != 'nt': err = ctypes.get_errno() else: err = ctypes.get_last_error() self._socket.setblocking(True) - WWSAEWOULDBLOCK = 10035 if (retbytes < 0) and (err == errno.EAGAIN or err == errno.EWOULDBLOCK or @@ -445,7 +464,7 @@ def _join_v16(self, server_uuid): self._socket.sendall(bytes(request)) while True: - resp = Response(self, self._read_response()) + resp = Response(self, self._read_response(), self.use_list) yield resp if resp.code == REQUEST_TYPE_OK or resp.code >= REQUEST_TYPE_ERROR: return @@ -459,7 +478,7 @@ class JoinState: self._socket.sendall(bytes(request)) state = JoinState.Handshake while True: - resp = Response(self, self._read_response()) + resp = Response(self, self._read_response(), self.use_list) yield resp if resp.code >= REQUEST_TYPE_ERROR: return @@ -488,7 +507,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None): request = RequestSubscribe(self, cluster_uuid, server_uuid, vclock) self._socket.sendall(bytes(request)) while True: - resp = Response(self, self._read_response()) + resp = Response(self, self._read_response(), self.use_list) yield resp if resp.code >= REQUEST_TYPE_ERROR: return @@ -785,3 +804,23 @@ def generate_sync(self): Need override for async io connection ''' return 0 + + def execute(self, query, params=None): + ''' + Execute SQL request. + Execute SQL query in database. + + :param query: SQL syntax query + :type query: str + + :param params: Bind values to use in query + :type params: list, dict + + :return: query result data + :rtype: list + ''' + if not params: + params = [] + request = RequestExecute(self, query, params) + response = self._send_request(request) + return response \ No newline at end of file diff --git a/tarantool/const.py b/tarantool/const.py index 9d175974..3798004e 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -29,6 +29,12 @@ # IPROTO_DATA = 0x30 IPROTO_ERROR = 0x31 +# +IPROTO_METADATA = 0x32 +IPROTO_SQL_TEXT = 0x40 +IPROTO_SQL_BIND = 0x41 +IPROTO_SQL_INFO = 0x42 + IPROTO_GREETING_SIZE = 128 IPROTO_BODY_MAX_LEN = 2147483648 @@ -44,6 +50,7 @@ REQUEST_TYPE_EVAL = 8 REQUEST_TYPE_UPSERT = 9 REQUEST_TYPE_CALL = 10 +REQUEST_TYPE_EXECUTE = 11 REQUEST_TYPE_PING = 64 REQUEST_TYPE_JOIN = 65 REQUEST_TYPE_SUBSCRIBE = 66 diff --git a/tarantool/dbapi.py b/tarantool/dbapi.py new file mode 100644 index 00000000..26663fe2 --- /dev/null +++ b/tarantool/dbapi.py @@ -0,0 +1,312 @@ +from __future__ import unicode_literals + +import re + +from tarantool.error import InterfaceError +from .connection import Connection as BaseConnection + + +update_insert_pattern = re.compile(r'^UPDATE|^INSERT', re.IGNORECASE) + + +class Cursor: + _lastrowid = 0 + _rowcount = 0 + description = None + position = 0 + arraysize = 1 + autocommit = True + closed = False + rows = None + + def __init__(self, conn): + self._c = conn + + def callproc(self, procname, *params): # TODO + """ + Call a stored database procedure with the given name. The sequence of + parameters must contain one entry for each argument that the + procedure expects. The result of the call is returned as modified + copy of the input sequence. Input parameters are left untouched, + output and input/output parameters replaced with possibly new values. + + The procedure may also provide a result set as output. This must then + be made available through the standard .fetch*() methods. + """ + + def close(self): + """ + Close the cursor now (rather than whenever __del__ is called). + + The cursor will be unusable from this point forward; an Error (or + subclass) exception will be raised if any operation is attempted with + the cursor. + """ + self._c = None + + @staticmethod + def _convert_param(p): + if p is None: + return "NULL" + if isinstance(p, bool): + return str(p) + if isinstance(p, str): + return "'%s'" % p.replace("'", "''") + return "'%s'" % p + + @staticmethod + def _extract_last_row_id(body): # TODO: Need to be checked + try: + val = tuple(tuple(body.items())[0][-1].items())[-1][-1][0] + except TypeError: + val = -1 + return val + + def execute(self, query, params=None): + """ + Prepare and execute a database operation (query or command). + + Parameters may be provided as sequence or mapping and will be bound + to variables in the operation. Variables are specified in a + database-specific notation (see the module's paramstyle attribute for + details). + + A reference to the operation will be retained by the cursor. If the + same operation object is passed in again, then the cursor can + optimize its behavior. This is most effective for algorithms where + the same operation is used, but different parameters are bound to it + (many times). + + For maximum efficiency when reusing an operation, it is best to use + the .setinputsizes() method to specify the parameter types and sizes + ahead of time. It is legal for a parameter to not match the + predefined information; the implementation should compensate, + possibly with a loss of efficiency. + + The parameters may also be specified as list of tuples to e.g. insert + multiple rows in a single operation, but this kind of usage is + deprecated: .executemany() should be used instead. + + Return values are not defined. + """ + if self.closed: + raise self._c.ProgrammingError + + if params: + query = query % tuple( + self._convert_param(param) for param in params) + + response = self._c.execute(query) + + self.rows = tuple(response.body.values())[1] if len( + response.body) > 1 else None + + if update_insert_pattern.match(query): + try: + self._rowcount = response.rowcount + except InterfaceError: + self._rowcount = 1 + else: + self._rowcount = -1 + + if query.upper().startswith('INSERT'): + self._lastrowid = self._extract_last_row_id(response.body) + return response + + def executemany(self, query, param_sets): + rowcounts = [] + for params in param_sets: + self.execute(query, params) + rowcounts.append(self.rowcount) + + self._rowcount = -1 if -1 in rowcounts else sum(rowcounts) + return self + + @property + def lastrowid(self): + """ + This read-only attribute provides the rowid of the last modified row + (most databases return a rowid only when a single INSERT operation is + performed). If the operation does not set a rowid or if the database + does not support rowids, this attribute should be set to None. + + The semantics of .lastrowid are undefined in case the last executed + statement modified more than one row, e.g. when using INSERT with + .executemany(). + + Warning Message: "DB-API extension cursor.lastrowid used" + """ + return self._lastrowid + + @property + def rowcount(self): + """ + This read-only attribute specifies the number of rows that the last + .execute*() produced (for DQL statements like SELECT) or affected ( + for DML statements like UPDATE or INSERT). + + The attribute is -1 in case no .execute*() has been performed on the + cursor or the rowcount of the last operation is cannot be determined + by the interface. + + Note: + Future versions of the DB API specification could redefine the latter + case to have the object return None instead of -1. + """ + return self._rowcount + + def fetchone(self): + """ + Fetch the next row of a query result set, returning a single + sequence, or None when no more data is available. + + An Error (or subclass) exception is raised if the previous call to + .execute*() did not produce any result set or no call was issued yet. + """ + if self.rows is None: + raise self._c.ProgrammingError('Nothing to fetch') + return self.fetchmany(1)[0] if len(self.rows) else None + + def fetchmany(self, size=None): + """ + Fetch the next set of rows of a query result, returning a sequence of + sequences (e.g. a list of tuples). An empty sequence is returned when + no more rows are available. + + The number of rows to fetch per call is specified by the parameter. + If it is not given, the cursor's arraysize determines the number of + rows to be fetched. The method should try to fetch as many rows as + indicated by the size parameter. If this is not possible due to the + specified number of rows not being available, fewer rows may be + returned. + + An Error (or subclass) exception is raised if the previous call to + .execute*() did not produce any result set or no call was issued yet. + + Note there are performance considerations involved with the size + parameter. For optimal performance, it is usually best to use the + .arraysize attribute. If the size parameter is used, then it is best + for it to retain the same value from one .fetchmany() call to the next. + """ + size = size or self.arraysize + + if self.rows is None: + raise self._c.ProgrammingError('Nothing to fetch') + + if len(self.rows) < size: + items = self.rows + self.rows = [] + else: + items, self.rows = self.rows[:size], self.rows[size:] + + return items if len(items) else [] + + def fetchall(self): + """Fetch all (remaining) rows of a query result, returning them as a + sequence of sequences (e.g. a list of tuples). Note that the cursor's + arraysize attribute can affect the performance of this operation. + + An Error (or subclass) exception is raised if the previous call to + .execute*() did not produce any result set or no call was issued yet. + """ + if self.rows is None: + raise self._c.ProgrammingError('Nothing to fetch') + + items = self.rows[:] + self.rows = [] + return items + + def setinputsizes(self, sizes): + """This can be used before a call to .execute*() to predefine memory + areas for the operation's parameters. + + sizes is specified as a sequence - one item for each input parameter. + The item should be a Type Object that corresponds to the input that + will be used, or it should be an integer specifying the maximum + length of a string parameter. If the item is None, then no predefined + memory area will be reserved for that column (this is useful to avoid + predefined areas for large inputs). + + This method would be used before the .execute*() method is invoked. + + Implementations are free to have this method do nothing and users are + free to not use it.""" + + def setoutputsize(self, size, column=None): + """Set a column buffer size for fetches of large columns (e.g. LONGs, + BLOBs, etc.). The column is specified as an index into the result + sequence. Not specifying the column will set the default size for all + large columns in the cursor. + + This method would be used before the .execute*() method is invoked. + + Implementations are free to have this method do nothing and users are + free to not use it.""" + + +class Connection(BaseConnection): + _cursor = None + paramstyle = 'format' + apilevel = "2.0" + threadsafety = 0 + + server_version = 2 + + def connect(self): + BaseConnection.connect(self) + return self + + def commit(self): + """ + Commit any pending transaction to the database. + + Note that if the database supports an auto-commit feature, this must + be initially off. An interface method may be provided to turn it back + on. + + Database modules that do not support transactions should implement + this method with void functionality. + """ + if self._socket is None: + raise self.ProgrammingError + + def rollback(self): + """ + In case a database does provide transactions this method causes the + database to roll back to the start of any pending transaction. + Closing a connection without committing the changes first will cause + an implicit rollback to be performed. + """ + if self._socket is None: + raise self.ProgrammingError + + def execute(self, query, params=None): + if self._socket is None: + raise self.ProgrammingError('Can not execute on closed connection') + return super().execute(query, params) + + def close(self): + """ + Close the connection now (rather than whenever .__del__() is called). + + The connection will be unusable from this point forward; an Error (or + subclass) exception will be raised if any operation is attempted with + the connection. The same applies to all cursor objects trying to use + the connection. Note that closing a connection without committing the + changes first will cause an implicit rollback to be performed. + """ + if self._socket: + self._socket.close() + self._socket = None + else: + raise self.ProgrammingError('Connection already closed') + + def cursor(self, params=None): + """ + Return a new Cursor Object using the connection. + + If the database does not provide a direct cursor concept, the module + will have to emulate cursors using other means to the extent needed + by this specification. + """ + return Cursor(self) diff --git a/tarantool/error.py b/tarantool/error.py index cc66e8c5..8bdb5714 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -49,6 +49,30 @@ class ConfigurationError(Error): ''' +class InternalError(DatabaseError): + pass + + +class OperationalError(DatabaseError): + pass + + +class ProgrammingError(DatabaseError): + pass + + +class IntegrityError(DatabaseError): + pass + + +class DataError(DatabaseError): + pass + + +class NotSupportedError(DatabaseError): + pass + + # Monkey patch os.strerror for win32 if sys.platform == "win32": # Windows Sockets Error Codes (not all, but related on network errors) diff --git a/tarantool/request.py b/tarantool/request.py index 2e45c8d9..532218a4 100644 --- a/tarantool/request.py +++ b/tarantool/request.py @@ -7,7 +7,6 @@ import msgpack import hashlib - from tarantool.const import ( IPROTO_CODE, IPROTO_SYNC, @@ -27,6 +26,8 @@ IPROTO_OPS, # IPROTO_INDEX_BASE, IPROTO_SCHEMA_ID, + IPROTO_SQL_TEXT, + IPROTO_SQL_BIND, REQUEST_TYPE_OK, REQUEST_TYPE_PING, REQUEST_TYPE_SELECT, @@ -37,6 +38,7 @@ REQUEST_TYPE_UPSERT, REQUEST_TYPE_CALL16, REQUEST_TYPE_CALL, + REQUEST_TYPE_EXECUTE, REQUEST_TYPE_EVAL, REQUEST_TYPE_AUTHENTICATE, REQUEST_TYPE_JOIN, @@ -47,6 +49,7 @@ binary_types ) + class Request(object): ''' Represents a single request to the server in compliance with the @@ -135,6 +138,16 @@ def sha1(values): IPROTO_TUPLE: ("chap-sha1", scramble)}) self._body = request_body + def header(self, length): + self._sync = self.conn.generate_sync() + # Set IPROTO_SCHEMA_ID: 0 to avoid SchemaReloadException + # It is ok to use 0 in auth every time + header = msgpack.dumps({IPROTO_CODE: self.request_type, + IPROTO_SYNC: self._sync, + IPROTO_SCHEMA_ID: 0}) + + return msgpack.dumps(length + len(header)) + header + def header(self, length): self._sync = self.conn.generate_sync() # Set IPROTO_SCHEMA_ID: 0 to avoid SchemaReloadException @@ -332,3 +345,19 @@ def __init__(self, conn, sync): request_body = msgpack.dumps({IPROTO_CODE: self.request_type, IPROTO_SYNC: sync}) self._body = request_body + + +class RequestExecute(Request): + ''' + Represents EXECUTE request + ''' + request_type = REQUEST_TYPE_EXECUTE + + # pylint: disable=W0231 + def __init__(self, conn, sql, args): + super(RequestExecute, self).__init__(conn) + + request_body = msgpack.dumps({IPROTO_SQL_TEXT: sql, + IPROTO_SQL_BIND: args}) + + self._body = request_body \ No newline at end of file diff --git a/tarantool/response.py b/tarantool/response.py index 9516cd39..2edcc70f 100644 --- a/tarantool/response.py +++ b/tarantool/response.py @@ -35,7 +35,7 @@ class Response(Sequence): and parses binary packet received from the server. ''' - def __init__(self, conn, response): + def __init__(self, conn, response, use_list=True): ''' Create an instance of `Response` using data received from the server. @@ -54,11 +54,11 @@ def __init__(self, conn, response): # Get rid of the following warning. # > PendingDeprecationWarning: encoding is deprecated, # > Use raw=False instead. - unpacker = msgpack.Unpacker(use_list=True, raw=False) + unpacker = msgpack.Unpacker(use_list=use_list, raw=False) elif conn.encoding is not None: - unpacker = msgpack.Unpacker(use_list=True, encoding=conn.encoding) + unpacker = msgpack.Unpacker(use_list=use_list, encoding=conn.encoding) else: - unpacker = msgpack.Unpacker(use_list=True) + unpacker = msgpack.Unpacker(use_list=use_list) unpacker.feed(response) header = unpacker.unpack() @@ -244,4 +244,6 @@ def __str__(self): output.pop() return ''.join(output) - __repr__ = __str__ + def __repr__(self): + return self.__str__() + diff --git a/test.sh b/test.sh index a8c1b577..14aa4495 100755 --- a/test.sh +++ b/test.sh @@ -3,15 +3,11 @@ set -exu # Strict shell (w/o -o pipefail) # Install tarantool. -curl http://download.tarantool.org/tarantool/2x/gpgkey | sudo apt-key add - -release=`lsb_release -c -s` -echo "deb http://download.tarantool.org/tarantool/2x/ubuntu/ ${release} main" | sudo tee /etc/apt/sources.list.d/tarantool_2x.list -sudo apt-get update > /dev/null -sudo apt-get -q -y install tarantool +curl -L https://tarantool.io/installer.sh | VER=2.4 sudo -E bash # Install testing dependencies. pip install -r requirements.txt -pip install pyyaml +pip install pyyaml dbapi-compliance==1.15.0 # Run tests. python setup.py test diff --git a/unit/suites/__init__.py b/unit/suites/__init__.py index ead75297..b4d1cf2b 100644 --- a/unit/suites/__init__.py +++ b/unit/suites/__init__.py @@ -9,9 +9,10 @@ from .test_protocol import TestSuite_Protocol from .test_reconnect import TestSuite_Reconnect from .test_mesh import TestSuite_Mesh +from .test_dbapi import TestSuite_DBAPI test_cases = (TestSuite_Schema, TestSuite_Request, TestSuite_Protocol, - TestSuite_Reconnect, TestSuite_Mesh) + TestSuite_Reconnect, TestSuite_Mesh, TestSuite_DBAPI) def load_tests(loader, tests, pattern): suite = unittest.TestSuite() diff --git a/unit/suites/test_dbapi.py b/unit/suites/test_dbapi.py new file mode 100644 index 00000000..fd3b6fe2 --- /dev/null +++ b/unit/suites/test_dbapi.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function + +import sys +import unittest + +import dbapi20 + +import tarantool +from tarantool.dbapi import Connection +from .lib.tarantool_server import TarantoolServer + + +class TestSuite_DBAPI(dbapi20.DatabaseAPI20Test): + table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables + + ddl1 = 'create table %sbooze (name varchar(20) primary key)' % table_prefix + ddl2 = 'create table %sbarflys (name varchar(20) primary key, ' \ + 'drink varchar(30))' % table_prefix + + @classmethod + def setUpClass(self): + print(' SCHEMA '.center(70, '='), file=sys.stderr) + print('-' * 70, file=sys.stderr) + self.srv = TarantoolServer() + self.srv.script = 'unit/suites/box.lua' + self.srv.start() + self.con = tarantool.Connection(self.srv.host, self.srv.args['primary']) + self.driver = Connection(self.srv.host, self.srv.args['primary']) + + def setUp(self): + # prevent a remote tarantool from clean our session + if self.srv.is_started(): + self.srv.touch_lock() + self.con.flush_schema() + + # grant full access to guest + self.srv.admin("box.schema.user.grant('guest', 'create,read,write," + "execute', 'universe')") + + @classmethod + def tearDownClass(self): + self.con.close() + self.srv.stop() + self.srv.clean() + + @unittest.skip('Not implemented') + def test_Binary(self): + pass + + @unittest.skip('Not implemented') + def test_STRING(self): + pass + + @unittest.skip('Not implemented') + def test_BINARY(self): + pass + + @unittest.skip('Not implemented') + def test_NUMBER(self): + pass + + @unittest.skip('Not implemented') + def test_DATETIME(self): + pass + + @unittest.skip('Not implemented') + def test_ROWID(self): + pass + + @unittest.skip('Not implemented') + def test_Date(self): + pass + + @unittest.skip('Not implemented') + def test_Time(self): + pass + + @unittest.skip('Not implemented') + def test_Timestamp(self): + pass + + @unittest.skip('Not implemented as optional.') + def test_nextset(self): + pass + + @unittest.skip('To do') + def test_callproc(self): + pass + + def test_setoutputsize(self): # Do nothing + pass + + @unittest.skip('To do') + def test_description(self): + pass