diff --git a/lib/qless/lua/qless-lib.lua b/lib/qless/lua/qless-lib.lua index 08c7ae7d..b8daf8b0 100644 --- a/lib/qless/lua/qless-lib.lua +++ b/lib/qless/lua/qless-lib.lua @@ -1,4 +1,4 @@ --- Current SHA: 525c39000dc71df53a3502491cb4daf0e1128f1d +-- Current SHA: 36199bfcabc3216b754bb75fa9912a1d48f0b1b4 -- This is a generated file ------------------------------------------------------------------------------- -- Forward declarations to make everything happy @@ -68,14 +68,14 @@ end -- If no group is provided, this returns a JSON blob of the counts of the -- various groups of failures known. If a group is provided, it will report up -- to `limit` from `start` of the jobs affected by that issue. --- +-- -- # If no group, then... -- { -- 'group1': 1, -- 'group2': 5, -- ... -- } --- +-- -- # If a group is provided, then... -- { -- 'total': 20, @@ -121,9 +121,9 @@ end ------------------------------------------------------------------------------- -- Return all the job ids currently considered to be in the provided state -- in a particular queue. The response is a list of job ids: --- +-- -- [ --- jid1, +-- jid1, -- jid2, -- ... -- ] @@ -169,7 +169,7 @@ end -- associated with that id, and 'untrack' stops tracking it. In this context, -- tracking is nothing more than saving the job to a list of jobs that are -- considered special. --- +-- -- { -- 'jobs': [ -- { @@ -254,18 +254,18 @@ function Qless.tag(now, command, ...) tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + -- Otherwise, add the job to the sorted set with that tags for i=2,#arg do local tag = arg[i] - if _tags[tag] == nil then + if _tags[tag] == nil or _tags[tag] == false then _tags[tag] = true table.insert(tags, tag) end redis.call('zadd', 'ql:t:' .. tag, now, jid) redis.call('zincrby', 'ql:tags', 1, tag) end - + redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags)) return tags else @@ -280,7 +280,7 @@ function Qless.tag(now, command, ...) 
tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + -- Otherwise, add the job to the sorted set with that tags for i=2,#arg do local tag = arg[i] @@ -288,10 +288,10 @@ function Qless.tag(now, command, ...) redis.call('zrem', 'ql:t:' .. tag, jid) redis.call('zincrby', 'ql:tags', -1, tag) end - + local results = {} for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end - + redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results)) return results else @@ -333,7 +333,7 @@ function Qless.cancel(...) -- make sure that this operation will be ok for i, jid in ipairs(arg) do for j, dep in ipairs(dependents[jid]) do - if dependents[dep] == nil then + if dependents[dep] == nil or dependents[dep] == false then error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep .. ' but is not mentioned to be canceled') end @@ -418,7 +418,7 @@ function Qless.cancel(...) redis.call('del', QlessJob.ns .. jid .. '-history') end end - + return arg end @@ -535,26 +535,26 @@ end -- Complete a job and optionally put it in another queue, either scheduled or -- to be considered waiting immediately. It can also optionally accept other --- jids on which this job will be considered dependent before it's considered +-- jids on which this job will be considered dependent before it's considered -- valid. -- -- The variable-length arguments may be pairs of the form: --- +-- -- ('next' , queue) : The queue to advance it to next -- ('delay' , delay) : The delay for the next queue -- ('depends', : Json of jobs it depends on in the new queue -- '["jid1", "jid2", ...]') --- -function QlessJob:complete(now, worker, queue, data, ...) +function QlessJob:complete(now, worker, queue, raw_data, ...) assert(worker, 'Complete(): Arg "worker" missing') assert(queue , 'Complete(): Arg "queue" missing') - data = assert(cjson.decode(data), - 'Complete(): Arg "data" missing or not JSON: ' .. 
tostring(data)) + local data = assert(cjson.decode(raw_data), + 'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data)) -- Read in all the optional parameters local options = {} for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end - + -- Sanity check on optional args local nextq = options['next'] local delay = assert(tonumber(options['delay'] or 0)) @@ -581,14 +581,15 @@ function QlessJob:complete(now, worker, queue, data, ...) 'priority', 'retries', 'queue')) if lastworker == false then - error('Complete(): Job does not exist') + error('Complete(): Job ' .. self.jid .. ' does not exist') elseif (state ~= 'running') then - error('Complete(): Job is not currently running: ' .. state) + error('Complete(): Job ' .. self.jid .. ' is not currently running: ' .. + state) elseif lastworker ~= worker then - error('Complete(): Job has been handed out to another worker: ' .. - tostring(lastworker)) + error('Complete(): Job ' .. self.jid .. + ' has been handed out to another worker: ' .. tostring(lastworker)) elseif queue ~= current_queue then - error('Complete(): Job running in another queue: ' .. + error('Complete(): Job ' .. self.jid .. ' running in another queue: ' .. tostring(current_queue)) end @@ -600,8 +601,8 @@ function QlessJob:complete(now, worker, queue, data, ...) -- update history self:history(now, 'done') - if data then - redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data)) + if raw_data then + redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data) end -- Remove the job from the previous queue @@ -647,7 +648,7 @@ function QlessJob:complete(now, worker, queue, data, ...) if redis.call('zscore', 'ql:queues', nextq) == false then redis.call('zadd', 'ql:queues', now, nextq) end - + redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'waiting', 'worker', '', @@ -655,7 +656,7 @@ function QlessJob:complete(now, worker, queue, data, ...) 
'queue', nextq, 'expires', 0, 'remaining', tonumber(retries)) - + if (delay > 0) and (#depends == 0) then queue_obj.scheduled.add(now + delay, self.jid) return 'scheduled' @@ -703,18 +704,18 @@ function QlessJob:complete(now, worker, queue, data, ...) 'queue', '', 'expires', 0, 'remaining', tonumber(retries)) - + -- Do the completion dance local count = Qless.config.get('jobs-history-count') local time = Qless.config.get('jobs-history') - + -- These are the default values count = tonumber(count or 50000) time = tonumber(time or 7 * 24 * 60 * 60) - + -- Schedule this job for destructination eventually redis.call('zadd', 'ql:completed', now, self.jid) - + -- Now look at the expired job data. First, based on the current time local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time) -- Any jobs that need to be expired... delete @@ -730,7 +731,7 @@ function QlessJob:complete(now, worker, queue, data, ...) end -- And now remove those from the queued-for-cleanup queue redis.call('zremrangebyscore', 'ql:completed', 0, now - time) - + -- Now take the all by the most recent 'count' ids jids = redis.call('zrange', 'ql:completed', 0, (-1-count)) for index, jid in ipairs(jids) do @@ -744,7 +745,7 @@ function QlessJob:complete(now, worker, queue, data, ...) redis.call('del', QlessJob.ns .. jid .. '-history') end redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count)) - + -- Alright, if this has any dependents, then we should go ahead -- and unstick those guys. for i, j in ipairs(redis.call( @@ -768,10 +769,10 @@ function QlessJob:complete(now, worker, queue, data, ...) end end end - + -- Delete our dependents key redis.call('del', QlessJob.ns .. self.jid .. '-dependents') - + return 'complete' end end @@ -782,14 +783,14 @@ end -- specific message. By `group`, we mean some phrase that might be one of -- several categorical modes of failure. The `message` is something more -- job-specific, like perhaps a traceback. 
--- +-- -- This method should __not__ be used to note that a job has been dropped or -- has failed in a transient way. This method __should__ be used to note that -- a job has something really wrong with it that must be remedied. --- +-- -- The motivation behind the `group` is so that similar errors can be grouped -- together. Optionally, updated data can be provided for the job. A job in --- any state can be marked as failed. If it has been given to a worker as a +-- any state can be marked as failed. If it has been given to a worker as a -- job, then its subsequent requests to heartbeat or complete that job will -- fail. Failed jobs are kept until they are canceled or completed. -- @@ -821,11 +822,12 @@ function QlessJob:fail(now, worker, group, message, data) -- If the job has been completed, we cannot fail it if not state then - error('Fail(): Job does not exist') + error('Fail(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Fail(): Job not currently running: ' .. state) + error('Fail(): Job ' .. self.jid .. ' not currently running: ' .. state) elseif worker ~= oldworker then - error('Fail(): Job running with another worker: ' .. oldworker) + error('Fail(): Job ' .. self.jid .. ' running with another worker: ' .. + oldworker) end -- Send out a log message @@ -860,7 +862,7 @@ function QlessJob:fail(now, worker, group, message, data) queue_obj.locks.remove(self.jid) queue_obj.scheduled.remove(self.jid) - -- The reason that this appears here is that the above will fail if the + -- The reason that this appears here is that the above will fail if the -- job doesn't exist if data then redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data)) end @@ -897,7 +899,7 @@ end -- Throws an exception if: -- - the worker is not the worker with a lock on the job -- - the job is not actually running --- +-- -- Otherwise, it returns the number of retries remaining. 
If the allowed -- retries have been exhausted, then it is automatically failed, and a negative -- number is returned. @@ -910,7 +912,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message) assert(worker, 'Retry(): Arg "worker" missing') delay = assert(tonumber(delay or 0), 'Retry(): Arg "delay" not a number: ' .. tostring(delay)) - + -- Let's see what the old priority, and tags were local oldqueue, state, retries, oldworker, priority, failure = unpack( redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state', @@ -918,11 +920,13 @@ function QlessJob:retry(now, queue, worker, delay, group, message) -- If this isn't the worker that owns if oldworker == false then - error('Retry(): Job does not exist') + error('Retry(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Retry(): Job is not currently running: ' .. state) + error('Retry(): Job ' .. self.jid .. ' is not currently running: ' .. + state) elseif oldworker ~= worker then - error('Retry(): Job has been given to another worker: ' .. oldworker) + error('Retry(): Job ' .. self.jid .. + ' has been given to another worker: ' .. oldworker) end -- For each of these, decrement their retries. If any of them @@ -943,7 +947,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message) -- queue it's in local group = group or 'failed-retries-' .. queue self:history(now, 'failed', {['group'] = group}) - + redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed', 'worker', '', 'expires', '') @@ -967,7 +971,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message) ['worker'] = unpack(self:data('worker')) })) end - + -- Add this type of failure to the list of failures redis.call('sadd', 'ql:failures', group) -- And add this particular instance to the failed types @@ -1103,11 +1107,14 @@ function QlessJob:heartbeat(now, worker, data) redis.call('hmget', QlessJob.ns .. 
self.jid, 'worker', 'state')) if job_worker == false then -- This means the job doesn't exist - error('Heartbeat(): Job does not exist') + error('Heartbeat(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Heartbeat(): Job not currently running: ' .. state) + error( + 'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state) elseif job_worker ~= worker or #job_worker == 0 then - error('Heartbeat(): Job given out to another worker: ' .. job_worker) + error( + 'Heartbeat(): Job ' .. self.jid .. + ' given out to another worker: ' .. job_worker) else -- Otherwise, optionally update the user data, and the heartbeat if data then @@ -1119,11 +1126,11 @@ function QlessJob:heartbeat(now, worker, data) redis.call('hmset', QlessJob.ns .. self.jid, 'expires', expires, 'worker', worker) end - + -- Update hwen this job was last updated on that worker -- Add this job to the list of jobs handled by this worker redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid) - + -- And now we should just update the locks local queue = Qless.queue( redis.call('hget', QlessJob.ns .. self.jid, 'queue')) @@ -1144,7 +1151,7 @@ function QlessJob:priority(priority) -- Get the queue the job is currently in, if any local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue') - if queue == nil then + if queue == nil or queue == false then -- If the job doesn't exist, throw an error error('Priority(): Job ' .. self.jid .. ' does not exist') elseif queue == '' then @@ -1177,8 +1184,8 @@ end function QlessJob:timeout(now) local queue_name, state, worker = unpack(redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker')) - if queue_name == nil then - error('Timeout(): Job does not exist') + if queue_name == nil or queue_name == false then + error('Timeout(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then error('Timeout(): Job ' .. self.jid .. 
' not running') else @@ -1261,7 +1268,7 @@ function QlessJob:history(now, what, item) -- We'll always keep the first item around local obj = redis.call('lpop', QlessJob.ns .. self.jid .. '-history') redis.call('ltrim', QlessJob.ns .. self.jid .. '-history', -count + 2, -1) - if obj ~= nil then + if obj ~= nil and obj ~= false then redis.call('lpush', QlessJob.ns .. self.jid .. '-history', obj) end end @@ -1453,11 +1460,11 @@ function QlessQueue:stats(now, date) local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue local count, mean, vk = unpack(redis.call('hmget', key, 'total', 'mean', 'vk')) - + count = tonumber(count) or 0 mean = tonumber(mean) or 0 vk = tonumber(vk) - + results.count = count or 0 results.mean = mean or 0 results.histogram = {} @@ -1507,8 +1514,8 @@ function QlessQueue:peek(now, count) -- Now we've checked __all__ the locks for this queue the could -- have expired, and are no more than the number requested. If - -- we still need values in order to meet the demand, then we - -- should check if any scheduled items, and if so, we should + -- we still need values in order to meet the demand, then we + -- should check if any scheduled items, and if so, we should -- insert them to ensure correctness when pulling off the next -- unit of work. self:check_scheduled(now, count - #jids) @@ -1582,8 +1589,8 @@ function QlessQueue:pop(now, worker, count) -- look for all the recurring jobs that need jobs run self:check_recurring(now, count - #jids) - -- If we still need values in order to meet the demand, then we - -- should check if any scheduled items, and if so, we should + -- If we still need values in order to meet the demand, then we + -- should check if any scheduled items, and if so, we should -- insert them to ensure correctness when pulling off the next -- unit of work. self:check_scheduled(now, count - #jids) @@ -1605,19 +1612,19 @@ function QlessQueue:pop(now, worker, count) self:stat(now, 'wait', waiting) redis.call('hset', QlessJob.ns .. 
jid, 'time', string.format("%.20f", now)) - + -- Add this job to the list of jobs handled by this worker redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid) - + -- Update the jobs data, and add its locks, and return the job job:update({ worker = worker, expires = expires, state = 'running' }) - + self.locks.add(expires, jid) - + local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false if tracked then Qless.publish('popped', jid) @@ -1668,7 +1675,7 @@ function QlessQueue:stat(now, stat, val) redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1) else -- days redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1) - end + end redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk) end @@ -1728,8 +1735,8 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) -- Now find what's in the original, but not the new local original = redis.call( 'smembers', QlessJob.ns .. jid .. '-dependencies') - for _, dep in pairs(original) do - if new[dep] == nil then + for _, dep in pairs(original) do + if new[dep] == nil or new[dep] == false then -- Remove k as a dependency redis.call('srem', QlessJob.ns .. dep .. '-dependents' , jid) redis.call('srem', QlessJob.ns .. jid .. '-dependencies', dep) @@ -1851,7 +1858,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) end -- Lastly, we're going to make sure that this item is in the - -- set of known queues. We should keep this sorted by the + -- set of known queues. We should keep this sorted by the -- order in which we saw each of these queues if redis.call('zscore', 'ql:queues', self.name) == false then redis.call('zadd', 'ql:queues', now, self.name) @@ -1921,7 +1928,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) if #arg % 2 == 1 then error('Odd number of additional args: ' .. 
tostring(arg)) end - + -- Read in all the optional parameters local options = {} for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end @@ -1941,12 +1948,12 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) local count, old_queue = unpack(redis.call('hmget', 'ql:r:' .. jid, 'count', 'queue')) count = count or 0 - -- If it has previously been in another queue, then we should remove + -- If it has previously been in another queue, then we should remove -- some information about it if old_queue then Qless.queue(old_queue).recurring.remove(jid) end - + -- Do some insertions redis.call('hmset', 'ql:r:' .. jid, 'jid' , jid, @@ -1964,14 +1971,14 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) 'backlog' , options.backlog) -- Now, we should schedule the next run of the job self.recurring.add(now + offset, jid) - + -- Lastly, we're going to make sure that this item is in the - -- set of known queues. We should keep this sorted by the + -- set of known queues. We should keep this sorted by the -- order in which we saw each of these queues if redis.call('zscore', 'ql:queues', self.name) == false then redis.call('zadd', 'ql:queues', now, self.name) end - + return jid else error('Recur(): schedule type "' .. tostring(spec) .. '" unknown') @@ -2017,22 +2024,22 @@ function QlessQueue:check_recurring(now, count) ) end end - - -- We're saving this value so that in the history, we can accurately + + -- We're saving this value so that in the history, we can accurately -- reflect when the job would normally have been scheduled while (score <= now) and (moved < count) do local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1) moved = moved + 1 local child_jid = jid .. '-' .. count - + -- Add this job to the list of jobs tagged with whatever tags were -- supplied for i, tag in ipairs(_tags) do redis.call('zadd', 'ql:t:' .. 
tag, now, child_jid) redis.call('zincrby', 'ql:tags', 1, tag) end - + -- First, let's save its data redis.call('hmset', QlessJob.ns .. child_jid, 'jid' , child_jid, @@ -2049,12 +2056,12 @@ function QlessQueue:check_recurring(now, count) 'time' , string.format("%.20f", score), 'spawned_from_jid', jid) Qless.job(child_jid):history(score, 'put', {q = self.name}) - + -- Now, if a delay was provided, and if it's in the future, -- then we'll have to schedule it. Otherwise, we're just -- going to add it to the work queue. self.work.add(score, priority, child_jid) - + score = score + interval self.recurring.add(score, jid) end @@ -2069,7 +2076,7 @@ function QlessQueue:check_scheduled(now, count) -- insert into the work queue local scheduled = self.scheduled.ready(now, 0, count) for index, jid in ipairs(scheduled) do - -- With these in hand, we'll have to go out and find the + -- With these in hand, we'll have to go out and find the -- priorities of these jobs, and then we'll insert them -- into the work queue and then when that's complete, we'll -- remove them from the scheduled queue @@ -2154,7 +2161,7 @@ function QlessQueue:invalidate_locks(now, count) -- See how many remaining retries the job has local remaining = tonumber(redis.call( 'hincrby', QlessJob.ns .. jid, 'remaining', -1)) - + -- This is where we actually have to time out the work if remaining < 0 then -- Now remove the instance from the schedule, and work queues @@ -2162,7 +2169,7 @@ function QlessQueue:invalidate_locks(now, count) self.work.remove(jid) self.locks.remove(jid) self.scheduled.remove(jid) - + local group = 'failed-retries-' .. 
Qless.job(jid):data()['queue'] local job = Qless.job(jid) job:history(now, 'failed', {group = group}) @@ -2178,12 +2185,12 @@ function QlessQueue:invalidate_locks(now, count) ['when'] = now, ['worker'] = unpack(job:data('worker')) })) - + -- Add this type of failure to the list of failures redis.call('sadd', 'ql:failures', group) -- And add this particular instance to the failed types redis.call('lpush', 'ql:f:' .. group, jid) - + if redis.call('zscore', 'ql:tracked', jid) ~= false then Qless.publish('failed', jid) end @@ -2260,11 +2267,11 @@ function QlessRecurringJob:data() local job = redis.call( 'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue', 'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog') - + if not job[1] then return nil end - + return { jid = job[1], klass = job[2], @@ -2287,7 +2294,7 @@ end -- - data -- - klass -- - queue --- - backlog +-- - backlog function QlessRecurringJob:update(now, ...) local options = {} -- Make sure that the job exists @@ -2345,10 +2352,10 @@ function QlessRecurringJob:tag(...) tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + -- Otherwise, add the job to the sorted set with that tags - for i=1,#arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end - + for i=1,#arg do if _tags[arg[i]] == nil or _tags[arg[i]] == false then table.insert(tags, arg[i]) end end + tags = cjson.encode(tags) redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) return tags @@ -2404,7 +2411,7 @@ end -- Provide data about all the workers, or if a specific worker is provided, -- then which jobs that worker is responsible for. If no worker is provided, -- expect a response of the form: --- +-- -- [ -- # This is sorted by the recency of activity from that worker -- { @@ -2415,9 +2422,9 @@ end -- ... 
-- } -- ] --- +-- -- If a worker id is provided, then expect a response of the form: --- +-- -- { -- 'jobs': [ -- jid1, diff --git a/lib/qless/lua/qless.lua b/lib/qless/lua/qless.lua index ae140ce8..dd1f6871 100644 --- a/lib/qless/lua/qless.lua +++ b/lib/qless/lua/qless.lua @@ -1,4 +1,4 @@ --- Current SHA: 525c39000dc71df53a3502491cb4daf0e1128f1d +-- Current SHA: 36199bfcabc3216b754bb75fa9912a1d48f0b1b4 -- This is a generated file local Qless = { ns = 'ql:' @@ -147,17 +147,17 @@ function Qless.tag(now, command, ...) tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + for i=2,#arg do local tag = arg[i] - if _tags[tag] == nil then + if _tags[tag] == nil or _tags[tag] == false then _tags[tag] = true table.insert(tags, tag) end redis.call('zadd', 'ql:t:' .. tag, now, jid) redis.call('zincrby', 'ql:tags', 1, tag) end - + redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags)) return tags else @@ -170,17 +170,17 @@ function Qless.tag(now, command, ...) tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + for i=2,#arg do local tag = arg[i] _tags[tag] = nil redis.call('zrem', 'ql:t:' .. tag, jid) redis.call('zincrby', 'ql:tags', -1, tag) end - + local results = {} for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end - + redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results)) return results else @@ -214,7 +214,7 @@ function Qless.cancel(...) for i, jid in ipairs(arg) do for j, dep in ipairs(dependents[jid]) do - if dependents[dep] == nil then + if dependents[dep] == nil or dependents[dep] == false then error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep .. ' but is not mentioned to be canceled') end @@ -282,7 +282,7 @@ function Qless.cancel(...) redis.call('del', QlessJob.ns .. jid .. '-history') end end - + return arg end @@ -376,15 +376,15 @@ function QlessJob:data(...) 
end end -function QlessJob:complete(now, worker, queue, data, ...) +function QlessJob:complete(now, worker, queue, raw_data, ...) assert(worker, 'Complete(): Arg "worker" missing') assert(queue , 'Complete(): Arg "queue" missing') - data = assert(cjson.decode(data), - 'Complete(): Arg "data" missing or not JSON: ' .. tostring(data)) + local data = assert(cjson.decode(raw_data), + 'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data)) local options = {} for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end - + local nextq = options['next'] local delay = assert(tonumber(options['delay'] or 0)) local depends = assert(cjson.decode(options['depends'] or '[]'), @@ -405,21 +405,22 @@ function QlessJob:complete(now, worker, queue, data, ...) 'priority', 'retries', 'queue')) if lastworker == false then - error('Complete(): Job does not exist') + error('Complete(): Job ' .. self.jid .. ' does not exist') elseif (state ~= 'running') then - error('Complete(): Job is not currently running: ' .. state) + error('Complete(): Job ' .. self.jid .. ' is not currently running: ' .. + state) elseif lastworker ~= worker then - error('Complete(): Job has been handed out to another worker: ' .. - tostring(lastworker)) + error('Complete(): Job ' .. self.jid .. + ' has been handed out to another worker: ' .. tostring(lastworker)) elseif queue ~= current_queue then - error('Complete(): Job running in another queue: ' .. + error('Complete(): Job ' .. self.jid .. ' running in another queue: ' .. tostring(current_queue)) end self:history(now, 'done') - if data then - redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data)) + if raw_data then + redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data) end local queue_obj = Qless.queue(queue) @@ -454,7 +455,7 @@ function QlessJob:complete(now, worker, queue, data, ...) if redis.call('zscore', 'ql:queues', nextq) == false then redis.call('zadd', 'ql:queues', now, nextq) end - + redis.call('hmset', QlessJob.ns .. 
self.jid, 'state', 'waiting', 'worker', '', @@ -462,7 +463,7 @@ function QlessJob:complete(now, worker, queue, data, ...) 'queue', nextq, 'expires', 0, 'remaining', tonumber(retries)) - + if (delay > 0) and (#depends == 0) then queue_obj.scheduled.add(now + delay, self.jid) return 'scheduled' @@ -505,15 +506,15 @@ function QlessJob:complete(now, worker, queue, data, ...) 'queue', '', 'expires', 0, 'remaining', tonumber(retries)) - + local count = Qless.config.get('jobs-history-count') local time = Qless.config.get('jobs-history') - + count = tonumber(count or 50000) time = tonumber(time or 7 * 24 * 60 * 60) - + redis.call('zadd', 'ql:completed', now, self.jid) - + local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time) for index, jid in ipairs(jids) do local tags = cjson.decode( @@ -526,7 +527,7 @@ function QlessJob:complete(now, worker, queue, data, ...) redis.call('del', QlessJob.ns .. jid .. '-history') end redis.call('zremrangebyscore', 'ql:completed', 0, now - time) - + jids = redis.call('zrange', 'ql:completed', 0, (-1-count)) for index, jid in ipairs(jids) do local tags = cjson.decode( @@ -539,7 +540,7 @@ function QlessJob:complete(now, worker, queue, data, ...) redis.call('del', QlessJob.ns .. jid .. '-history') end redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count)) - + for i, j in ipairs(redis.call( 'smembers', QlessJob.ns .. self.jid .. '-dependents')) do redis.call('srem', QlessJob.ns .. j .. '-dependencies', self.jid) @@ -561,9 +562,9 @@ function QlessJob:complete(now, worker, queue, data, ...) end end end - + redis.call('del', QlessJob.ns .. self.jid .. '-dependents') - + return 'complete' end end @@ -583,11 +584,12 @@ function QlessJob:fail(now, worker, group, message, data) 'hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker')) if not state then - error('Fail(): Job does not exist') + error('Fail(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Fail(): Job not currently running: ' .. 
state) + error('Fail(): Job ' .. self.jid .. ' not currently running: ' .. state) elseif worker ~= oldworker then - error('Fail(): Job running with another worker: ' .. oldworker) + error('Fail(): Job ' .. self.jid .. ' running with another worker: ' .. + oldworker) end Qless.publish('log', cjson.encode({ @@ -641,17 +643,19 @@ function QlessJob:retry(now, queue, worker, delay, group, message) assert(worker, 'Retry(): Arg "worker" missing') delay = assert(tonumber(delay or 0), 'Retry(): Arg "delay" not a number: ' .. tostring(delay)) - + local oldqueue, state, retries, oldworker, priority, failure = unpack( redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'retries', 'worker', 'priority', 'failure')) if oldworker == false then - error('Retry(): Job does not exist') + error('Retry(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Retry(): Job is not currently running: ' .. state) + error('Retry(): Job ' .. self.jid .. ' is not currently running: ' .. + state) elseif oldworker ~= worker then - error('Retry(): Job has been given to another worker: ' .. oldworker) + error('Retry(): Job ' .. self.jid .. + ' has been given to another worker: ' .. oldworker) end local remaining = tonumber(redis.call( @@ -665,7 +669,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message) if remaining < 0 then local group = group or 'failed-retries-' .. queue self:history(now, 'failed', {['group'] = group}) - + redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed', 'worker', '', 'expires', '') @@ -688,7 +692,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message) ['worker'] = unpack(self:data('worker')) })) end - + redis.call('sadd', 'ql:failures', group) redis.call('lpush', 'ql:f:' .. group, self.jid) local bin = now - (now % 86400) @@ -793,11 +797,14 @@ function QlessJob:heartbeat(now, worker, data) local job_worker, state = unpack( redis.call('hmget', QlessJob.ns .. 
self.jid, 'worker', 'state')) if job_worker == false then - error('Heartbeat(): Job does not exist') + error('Heartbeat(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Heartbeat(): Job not currently running: ' .. state) + error( + 'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state) elseif job_worker ~= worker or #job_worker == 0 then - error('Heartbeat(): Job given out to another worker: ' .. job_worker) + error( + 'Heartbeat(): Job ' .. self.jid .. + ' given out to another worker: ' .. job_worker) else if data then redis.call('hmset', QlessJob.ns .. self.jid, 'expires', @@ -806,9 +813,9 @@ function QlessJob:heartbeat(now, worker, data) redis.call('hmset', QlessJob.ns .. self.jid, 'expires', expires, 'worker', worker) end - + redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid) - + local queue = Qless.queue( redis.call('hget', QlessJob.ns .. self.jid, 'queue')) queue.locks.add(expires, self.jid) @@ -823,7 +830,7 @@ function QlessJob:priority(priority) local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue') - if queue == nil then + if queue == nil or queue == false then error('Priority(): Job ' .. self.jid .. ' does not exist') elseif queue == '' then redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority) @@ -850,8 +857,8 @@ end function QlessJob:timeout(now) local queue_name, state, worker = unpack(redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker')) - if queue_name == nil then - error('Timeout(): Job does not exist') + if queue_name == nil or queue_name == false then + error('Timeout(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then error('Timeout(): Job ' .. self.jid .. ' not running') else @@ -921,7 +928,7 @@ function QlessJob:history(now, what, item) if count > 0 then local obj = redis.call('lpop', QlessJob.ns .. self.jid .. '-history') redis.call('ltrim', QlessJob.ns .. self.jid .. 
'-history', -count + 2, -1) - if obj ~= nil then + if obj ~= nil and obj ~= false then redis.call('lpush', QlessJob.ns .. self.jid .. '-history', obj) end end @@ -1065,11 +1072,11 @@ function QlessQueue:stats(now, date) local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue local count, mean, vk = unpack(redis.call('hmget', key, 'total', 'mean', 'vk')) - + count = tonumber(count) or 0 mean = tonumber(mean) or 0 vk = tonumber(vk) - + results.count = count or 0 results.mean = mean or 0 results.histogram = {} @@ -1174,17 +1181,17 @@ function QlessQueue:pop(now, worker, count) self:stat(now, 'wait', waiting) redis.call('hset', QlessJob.ns .. jid, 'time', string.format("%.20f", now)) - + redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid) - + job:update({ worker = worker, expires = expires, state = 'running' }) - + self.locks.add(expires, jid) - + local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false if tracked then Qless.publish('popped', jid) @@ -1224,7 +1231,7 @@ function QlessQueue:stat(now, stat, val) redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1) else -- days redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1) - end + end redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk) end @@ -1266,8 +1273,8 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) local original = redis.call( 'smembers', QlessJob.ns .. jid .. '-dependencies') - for _, dep in pairs(original) do - if new[dep] == nil then + for _, dep in pairs(original) do + if new[dep] == nil or new[dep] == false then redis.call('srem', QlessJob.ns .. dep .. '-dependents' , jid) redis.call('srem', QlessJob.ns .. jid .. '-dependencies', dep) end @@ -1421,7 +1428,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) if #arg % 2 == 1 then error('Odd number of additional args: ' .. 
tostring(arg)) end - + local options = {} for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end options.tags = assert(cjson.decode(options.tags or '{}'), @@ -1443,7 +1450,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) if old_queue then Qless.queue(old_queue).recurring.remove(jid) end - + redis.call('hmset', 'ql:r:' .. jid, 'jid' , jid, 'klass' , klass, @@ -1458,11 +1465,11 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) 'retries' , options.retries, 'backlog' , options.backlog) self.recurring.add(now + offset, jid) - + if redis.call('zscore', 'ql:queues', self.name) == false then redis.call('zadd', 'ql:queues', now, self.name) end - + return jid else error('Recur(): schedule type "' .. tostring(spec) .. '" unknown') @@ -1493,18 +1500,18 @@ function QlessQueue:check_recurring(now, count) ) end end - + while (score <= now) and (moved < count) do local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1) moved = moved + 1 local child_jid = jid .. '-' .. count - + for i, tag in ipairs(_tags) do redis.call('zadd', 'ql:t:' .. tag, now, child_jid) redis.call('zincrby', 'ql:tags', 1, tag) end - + redis.call('hmset', QlessJob.ns .. child_jid, 'jid' , child_jid, 'klass' , klass, @@ -1520,9 +1527,9 @@ function QlessQueue:check_recurring(now, count) 'time' , string.format("%.20f", score), 'spawned_from_jid', jid) Qless.job(child_jid):history(score, 'put', {q = self.name}) - + self.work.add(score, priority, child_jid) - + score = score + interval self.recurring.add(score, jid) end @@ -1587,12 +1594,12 @@ function QlessQueue:invalidate_locks(now, count) local remaining = tonumber(redis.call( 'hincrby', QlessJob.ns .. jid, 'remaining', -1)) - + if remaining < 0 then self.work.remove(jid) self.locks.remove(jid) self.scheduled.remove(jid) - + local group = 'failed-retries-' .. 
Qless.job(jid):data()['queue'] local job = Qless.job(jid) job:history(now, 'failed', {group = group}) @@ -1607,10 +1614,10 @@ function QlessQueue:invalidate_locks(now, count) ['when'] = now, ['worker'] = unpack(job:data('worker')) })) - + redis.call('sadd', 'ql:failures', group) redis.call('lpush', 'ql:f:' .. group, jid) - + if redis.call('zscore', 'ql:tracked', jid) ~= false then Qless.publish('failed', jid) end @@ -1669,11 +1676,11 @@ function QlessRecurringJob:data() local job = redis.call( 'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue', 'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog') - + if not job[1] then return nil end - + return { jid = job[1], klass = job[2], @@ -1739,9 +1746,9 @@ function QlessRecurringJob:tag(...) tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - - for i=1,#arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end - + + for i=1,#arg do if _tags[arg[i]] == nil or _tags[arg[i]] == false then table.insert(tags, arg[i]) end end + tags = cjson.encode(tags) redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) return tags diff --git a/lib/qless/qless-core b/lib/qless/qless-core index 525c3900..36199bfc 160000 --- a/lib/qless/qless-core +++ b/lib/qless/qless-core @@ -1 +1 @@ -Subproject commit 525c39000dc71df53a3502491cb4daf0e1128f1d +Subproject commit 36199bfcabc3216b754bb75fa9912a1d48f0b1b4