Skip to content

Commit

Permalink
Merge pull request #127 from peter-wangxu/add_VACUUM
Browse files Browse the repository at this point in the history
Add VACUUM for SQLiteAckQueue
  • Loading branch information
peter-wangxu authored Mar 9, 2020
2 parents 7bdbcde + 84f7148 commit 321197b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 25 deletions.
20 changes: 14 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ and `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle
This project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_
and `queuelib <https://github.com/scrapy/queuelib>`_

Slack channels
^^^^^^^^^^^^^^

Join `persist-queue <https://join.slack
.com/t/persist-queue/shared_invite
/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel


Requirements
------------
* Python 2.7 or Python 3.x.
Expand Down Expand Up @@ -193,6 +201,12 @@ The core functions:
- ``ack``: mark item as acked
- ``nack``: there might be something wrong with current consumer, so mark item as ready and new consumer will get it
- ``ack_failed``: there might be something wrong during process, so just mark item as failed.
- ``clear_acked_data``: perform a sql delete agaist sqlite, it remove the
latest 1000 items whose status is ``AckStatus.acked`` (note: this does not
shrink the file size on disk)
- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild
the database file, this usually takes long time and frees a lot of disk space
after ``clear_acked_data``

.. code-block:: python
Expand Down Expand Up @@ -480,12 +494,6 @@ Contribution
Simply fork this repo and send PR for your code change(also tests to cover your change), remember to give a title and description of your PR. I am willing to
enhance this project with you :).

Slack channels
^^^^^^^^^^^^^^

Join `persist-queue <https://join.slack
.com/t/persist-queue/shared_invite
/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel

License
-------
Expand Down
43 changes: 24 additions & 19 deletions persistqueue/sqlackqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ class SQLiteAckQueue(sqlbase.SQLiteBase):
'{key_column} INTEGER PRIMARY KEY AUTOINCREMENT, '
'data BLOB, timestamp FLOAT, status INTEGER)')
# SQL to insert a record
_SQL_INSERT = 'INSERT INTO {table_name} (data, timestamp, status)'\
' VALUES (?, ?, %s)' % AckStatus.inited
_SQL_INSERT = 'INSERT INTO {table_name} (data, timestamp, status)' \
' VALUES (?, ?, %s)' % AckStatus.inited
# SQL to select a record
_SQL_SELECT = ('SELECT {key_column}, data, status FROM {table_name} '
'WHERE status < %s '
'ORDER BY {key_column} ASC LIMIT 1' % AckStatus.unack)
_SQL_MARK_ACK_UPDATE = 'UPDATE {table_name} SET status = ?'\
' WHERE {key_column} = ?'
_SQL_SELECT_WHERE = 'SELECT {key_column}, data FROM {table_name}'\
' WHERE status < %s AND' \
' {column} {op} ? ORDER BY {key_column} ASC'\
' LIMIT 1 ' % AckStatus.unack
_SQL_MARK_ACK_UPDATE = 'UPDATE {table_name} SET status = ?' \
' WHERE {key_column} = ?'
_SQL_SELECT_WHERE = 'SELECT {key_column}, data FROM {table_name}' \
' WHERE status < %s AND' \
' {column} {op} ? ORDER BY {key_column} ASC' \
' LIMIT 1 ' % AckStatus.unack

def __init__(self, path, auto_resume=True, **kwargs):
super(SQLiteAckQueue, self).__init__(path, **kwargs)
Expand All @@ -65,9 +65,9 @@ def resume_unack_tasks(self):
unack_count = self.unack_count()
if unack_count:
log.warning("resume %d unack tasks", unack_count)
sql = 'UPDATE {} set status = ?'\
' WHERE status = ?'.format(self._table_name)
return sql, (AckStatus.ready, AckStatus.unack, )
sql = 'UPDATE {} set status = ?' \
' WHERE status = ?'.format(self._table_name)
return sql, (AckStatus.ready, AckStatus.unack,)

def put(self, item):
obj = self._serializer.dumps(item)
Expand All @@ -82,17 +82,17 @@ def _init(self):
self.total = self._count()

def _count(self):
sql = 'SELECT COUNT({}) FROM {}'\
' WHERE status < ?'.format(self._key_column,
self._table_name)
sql = 'SELECT COUNT({}) FROM {}' \
' WHERE status < ?'.format(self._key_column,
self._table_name)
row = self._getter.execute(sql, (AckStatus.unack,)).fetchone()
return row[0] if row else 0

def _ack_count_via_status(self, status):
sql = 'SELECT COUNT({}) FROM {}'\
' WHERE status = ?'.format(self._key_column,
self._table_name)
row = self._getter.execute(sql, (status, )).fetchone()
sql = 'SELECT COUNT({}) FROM {}' \
' WHERE status = ?'.format(self._key_column,
self._table_name)
row = self._getter.execute(sql, (status,)).fetchone()
return row[0] if row else 0

def unack_count(self):
Expand All @@ -109,7 +109,7 @@ def ack_failed_count(self):

@sqlbase.with_conditional_transaction
def _mark_ack_status(self, key, status):
return self._sql_mark_ack_status, (status, key, )
return self._sql_mark_ack_status, (status, key,)

@sqlbase.with_conditional_transaction
def clear_acked_data(self):
Expand All @@ -123,6 +123,11 @@ def clear_acked_data(self):
max_acked_length=self._MAX_ACKED_LENGTH)
return sql, AckStatus.acked

@sqlbase.with_conditional_transaction
def shrink_disk_usage(self):
sql = """VACUUM"""
return sql, ()

@property
def _sql_mark_ack_status(self):
return self._SQL_MARK_ACK_UPDATE.format(table_name=self._table_name,
Expand Down
1 change: 1 addition & 0 deletions persistqueue/tests/test_sqlackqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def test_ack_and_clear(self):
self.assertEqual(q.acked_count(), 100)
q.clear_acked_data()
self.assertEqual(q.acked_count(), 10)
q.shrink_disk_usage()

def test_ack_unknown_item(self):
q = SQLiteAckQueue(path=self.path)
Expand Down

0 comments on commit 321197b

Please sign in to comment.