From dd513aabcda9d1e9f0d39de72cee9be0c040b207 Mon Sep 17 00:00:00 2001 From: Charles Leifer Date: Tue, 8 Oct 2024 07:36:15 -0500 Subject: [PATCH] Use FOR UPDATE SKIP LOCKED when supported by the database. Replaces #817 --- huey/contrib/sql_huey.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/huey/contrib/sql_huey.py b/huey/contrib/sql_huey.py index 5503c2ed..cee9a39c 100644 --- a/huey/contrib/sql_huey.py +++ b/huey/contrib/sql_huey.py @@ -33,6 +33,23 @@ def __init__(self, name='huey', database=None, **kwargs): self.KV, self.Schedule, self.Task = self.create_models() self.create_tables() + # Check for FOR UPDATE SKIP LOCKED support. + if isinstance(self.database, PostgresqlDatabase): + self.for_update = 'FOR UPDATE SKIP LOCKED' + elif isinstance(self.database, MySQLDatabase): + self.for_update = 'FOR UPDATE SKIP LOCKED' # Assume support. + # Try to determine if we're using MariaDB or MySQL. + version, = self.database.execute_sql('select version()').fetchone() + if 'mariadb' in str(version).lower(): + # MariaDB added support in 10.6.0. + if self.database.server_version < (10, 6): + self.for_update = 'FOR UPDATE' + elif self.database.server_version < (8, 0, 1): + # MySQL added support in 8.0.1. + self.for_update = 'FOR UPDATE' + else: + self.for_update = None + def create_models(self): class Base(Model): class Meta: @@ -96,8 +113,8 @@ def dequeue(self): query = (self.tasks(self.Task.id, self.Task.data) .order_by(self.Task.priority.desc(), self.Task.id) .limit(1)) - if self.database.for_update: - query = query.for_update() + if self.for_update: + query = query.for_update(self.for_update) with self.database.atomic(): try: @@ -131,8 +148,8 @@ def read_schedule(self, timestamp): query = (self.schedule(self.Schedule.id, self.Schedule.data) .where(self.Schedule.timestamp <= timestamp) .tuples()) - if self.database.for_update: - query = query.for_update() + if self.for_update: + query = query.for_update(self.for_update) with self.database.atomic(): results = list(query) @@ -185,8 +202,8 @@ def peek_data(self, key): def pop_data(self, key): self.check_conn() query = self.kv().where(self.KV.key == key) - if self.database.for_update: - query = query.for_update() + if self.for_update: + query = query.for_update(self.for_update) with self.database.atomic(): try: