Add delay on entity re-ingestion on failed attempt #298
Changes from 1 commit
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,6 +148,21 @@ def parse_options(argv) | |
opts[:queue_name] = queue | ||
end | ||
|
||
o.on('-i', '--ingestion_prefix PREFIX', | ||
'Prefix for keys used in counting the number of failed ingestion attempts') do |prefix| | ||
opts[:ingestion_prefix] = prefix | ||
end | ||
|
||
o.on('-x', '--ingestion_attempts NUMBER', Integer, | ||
'Max number of attempts to try ingesting an entity') do |ingestion_attempts| | ||
opts[:ingestion_attempts] = ingestion_attempts | ||
end | ||
|
||
o.on('-f', '--first_failed_wait NUMBER', Integer, | ||
'Time in seconds to wait after first failed deposit. Time will double every failed attempt') do |failed_wait| | ||
opts[:first_failed_wait] = failed_wait | ||
end | ||
|
||
o.separator '' | ||
o.separator 'Common options:' | ||
|
||
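The three new flags can be tried out in isolation. The following is a minimal sketch using Ruby's standard `OptionParser`; the helper name `parse_retry_options` is hypothetical and is not part of the PR, and the real `parse_options` handles many more options than these.

```ruby
require 'optparse'

# Hypothetical helper: parses only the three retry-related flags added in this PR.
def parse_retry_options(argv)
  opts = {}
  parser = OptionParser.new do |o|
    o.on('-i', '--ingestion_prefix PREFIX',
         'Prefix for keys used in counting the number of failed ingestion attempts') do |prefix|
      opts[:ingestion_prefix] = prefix
    end

    # Passing Integer makes OptionParser coerce the argument and reject non-numeric input.
    o.on('-x', '--ingestion_attempts NUMBER', Integer,
         'Max number of attempts to try ingesting an entity') do |attempts|
      opts[:ingestion_attempts] = attempts
    end

    o.on('-f', '--first_failed_wait NUMBER', Integer,
         'Time in seconds to wait after the first failed deposit') do |wait|
      opts[:first_failed_wait] = wait
    end
  end

  parser.parse(argv) # non-destructive: argv itself is left untouched
  opts
end

options = parse_retry_options(['--ingestion_prefix', 'prefix',
                               '--ingestion_attempts', '20',
                               '-f', '20'])
```

With these arguments, `options[:ingestion_attempts]` and `options[:first_failed_wait]` come back as integers rather than strings, which is what the spec later in this PR expects.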
|
@@ -183,16 +198,10 @@ def rotate_logs | |
|
||
def run_preservation_cycle | ||
begin | ||
entity_json = queue.wait_next_item | ||
return unless entity_json | ||
|
||
# jupiter is submitting the entries to reddis in a hash format using fat arrows. We need to change them to colons | ||
# in order to parse them correctly from json | ||
entity = JSON.parse(entity_json.gsub('=>', ':'), { symbolize_names: true }) | ||
Comment on lines -189 to -191
Is this no longer an issue?
To my understanding, this was an old error where the items were being added to the queue in an incorrect manner that added the fat arrows. The new versions of jupiter and pmpy will be adding correct JSON.
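To make the change discussed above concrete, here is a small sketch of the two parsing paths: well-formed JSON parsed directly, and the old fat-arrow payload that needed the `gsub` workaround. The payload strings are illustrative values, not data taken from a real queue.

```ruby
require 'json'

# Well-formed JSON, as the newer producers are expected to submit it.
payload = '{"uuid":"123e4567-e89b-12d3-a456-426614174000","type":"items"}'
entity = JSON.parse(payload, symbolize_names: true)

# Legacy payload with fat arrows: not valid JSON until '=>' is replaced by ':'.
legacy_payload = '{"uuid"=>"123e4567-e89b-12d3-a456-426614174000", "type"=>"items"}'
legacy_entity = JSON.parse(legacy_payload.gsub('=>', ':'), symbolize_names: true)

# Serializing only the two fields the queue cares about, as the PR does on re-queue.
requeued = entity.slice(:uuid, :type).to_json
```

Both paths produce the same hash with symbol keys, and `requeued` round-trips back to the original JSON string.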
||
return unless entity[:type].present? && entity[:uuid].present? | ||
entity = queue.wait_next_item | ||
return unless entity && entity[:type].present? && entity[:uuid].present? | ||
rescue StandardError => e | ||
Rollbar.error(e) | ||
logger.error(e) | ||
log_exception(e) | ||
end | ||
|
||
# add additional information about the error context to errors that occur while processing this item. | ||
|
@@ -209,7 +218,11 @@ def run_preservation_cycle | |
# readding it to the queue as it will always fail | ||
rescue PushmiPullyu::AIP::EntityInvalid => e | ||
rescue StandardError => e | ||
queue.add_entity_json(entity_json) | ||
begin | ||
queue.add_entity_in_timeframe(entity) | ||
rescue MaxDepositAttemptsReached => e | ||
log_exception(e) | ||
end | ||
Comment on lines +221 to +225
This bit of code will log when an entity reaches the max number of attempts for ingestion.
@ConnorSheremeta do you think this would be enough to keep track of the failed items? It would mean going through the logs though, not sure if this is the best approach. Always open to ideas!
Filtering through the logs could suffice although it may not be ideal, I suppose these failures could be logged to another file but that may not be worth doing as this information can be filtered as needed
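For reference, the "log to another file" alternative mentioned in this thread could look roughly like the sketch below. It is not what the PR implements: the helper names `build_failure_logger` and `record_exhausted_entity` and the message layout are assumptions, and a `StringIO` stands in for a real log file so the example runs anywhere.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: a dedicated logger for entities that exhausted their attempts.
def build_failure_logger(io)
  logger = Logger.new(io)
  # Keep each entry on one line so it is easy to grep or filter.
  logger.formatter = proc { |severity, _time, _progname, msg| "#{severity} #{msg}\n" }
  logger
end

# Hypothetical helper: records which entity gave up and after how many attempts.
def record_exhausted_entity(failure_logger, entity, attempts)
  failure_logger.error(
    "max deposit attempts reached uuid=#{entity[:uuid]} type=#{entity[:type]} attempts=#{attempts}"
  )
end

io = StringIO.new # stand-in for a file such as File.open('failed_entities.log', 'a')
failure_logger = build_failure_logger(io)
record_exhausted_entity(failure_logger,
                        { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' },
                        15)
```

Whether a separate file is worth the extra configuration is the open question in the thread; filtering the main log, as the PR does, avoids it.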
||
|
||
# rubocop:disable Lint/RescueException | ||
# Something other than a StandardError exception means something happened which we were not expecting! | ||
|
@@ -218,8 +231,7 @@ def run_preservation_cycle | |
raise e | ||
# rubocop:enable Lint/RescueException | ||
ensure | ||
Rollbar.error(e) | ||
logger.error(e) | ||
log_exception(e) | ||
end | ||
end | ||
|
||
|
@@ -294,4 +306,9 @@ def start_server_as_daemon | |
end | ||
end | ||
|
||
def log_exception(exception) | ||
Rollbar.error(exception) | ||
logger.error(exception) | ||
end | ||
|
||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
class PushmiPullyu::PreservationQueue | ||
|
||
class ConnectionError < StandardError; end | ||
class MaxDepositAttemptsReached < StandardError; end | ||
|
||
def initialize(redis_url: 'redis://localhost:6379', | ||
pool_opts: { size: 1, timeout: 5 }, | ||
|
@@ -50,7 +51,8 @@ def next_item | |
rd.multi do |tx| | ||
tx.zrem(@queue_name, element) # remove the top element transactionally | ||
end | ||
return element | ||
|
||
return JSON.parse(element, { symbolize_names: true }) | ||
else | ||
rd.unwatch # cancel the transaction since there was nothing in the queue | ||
return nil | ||
|
@@ -68,10 +70,29 @@ def wait_next_item | |
end | ||
end | ||
|
||
def add_entity_json(entity_json) | ||
def add_entity_in_timeframe(entity) | ||
entity_attempts_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}" | ||
|
||
@redis.with do |connection| | ||
connection.zadd @queue_name, Time.now.to_f, entity_json | ||
# separate information for priority information and queue | ||
deposit_attempt = connection.incr entity_attempts_key | ||
What would happen if we deploy this and there are items already in the queue which wouldn't have an
from https://devdocs.io/redis/incr
That is correct, the normal behaviour would continue
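The behaviour relied on here is that Redis `INCR` treats a key that does not exist as 0 before incrementing, so an entity queued before this change simply starts at attempt 1. The sketch below mimics that with an in-memory stand-in; `FakeCounterStore` is hypothetical and only imitates the documented `INCR` and `DEL` semantics, it does not talk to Redis.

```ruby
# Hypothetical in-memory stand-in for the two Redis commands used on the attempt counter.
class FakeCounterStore
  def initialize
    @data = {}
  end

  # Like Redis INCR: a missing key is treated as 0 before the increment.
  def incr(key)
    @data[key] = @data.fetch(key, 0) + 1
  end

  # Like Redis DEL for a single key: returns the number of keys removed.
  def del(key)
    @data.delete(key) ? 1 : 0
  end
end

store = FakeCounterStore.new
key = 'prod:pmpy_ingest_attempt:123e4567-e89b-12d3-a456-426614174000'

first_attempt = store.incr(key)  # entity had no counter yet
second_attempt = store.incr(key) # counter now exists
removed = store.del(key)         # counter cleared once the maximum is reached
after_reset = store.incr(key)    # counting starts over from a missing key
```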
||
|
||
if deposit_attempt <= PushmiPullyu.options[:ingestion_attempts] | ||
connection.zadd @queue_name, Time.now.to_f + self.class.extra_wait_time(deposit_attempt), | ||
entity.slice(:uuid, :type).to_json | ||
else | ||
connection.del entity_attempts_key | ||
raise MaxDepositAttemptsReached | ||
end | ||
end | ||
end | ||
|
||
def self.extra_wait_time(deposit_attempt) | ||
extra = 0 | ||
deposit_attempt.times do |n| | ||
extra += (2**n) * PushmiPullyu.options[:first_failed_wait] | ||
end | ||
extra | ||
end | ||
This doesn't appear to match the stated specifications of: "doubles the time of each attempt"
With a PushmiPullyu.options[:first_failed_wait] of 10 it appears that this is the added wait time of each attempt:
If doubling added wait on each attempt starting with first_failed_wait is what we're trying to accomplish it looks like this would give that:
The numbers vary slightly so this is a small nit in reality
You are right! This code should work: (2**deposit_attempt) * PushmiPullyu.options[:first_failed_wait] Will make the changes
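The formulas in this thread can be compared directly. The sketch below reimplements the loop from the diff with the wait passed in as an argument, alongside the expression from the reply. The third method, `doubling_wait_from_first`, is a hypothetical variant added only to show a sequence that starts exactly at `first_failed_wait`; it is not taken from the PR or from the review.

```ruby
# The loop from the diff: sums 2**n * wait for n = 0..attempt-1, i.e. (2**attempt - 1) * wait.
def cumulative_wait(deposit_attempt, first_failed_wait)
  extra = 0
  deposit_attempt.times { |n| extra += (2**n) * first_failed_wait }
  extra
end

# The expression proposed in the reply: strict doubling, starting at 2 * wait.
def doubling_wait(deposit_attempt, first_failed_wait)
  (2**deposit_attempt) * first_failed_wait
end

# Hypothetical variant: strict doubling that starts at exactly first_failed_wait.
# Assumes deposit_attempt >= 1, which holds for a counter produced by INCR.
def doubling_wait_from_first(deposit_attempt, first_failed_wait)
  (2**(deposit_attempt - 1)) * first_failed_wait
end

attempts = (1..5).to_a
cumulative = attempts.map { |a| cumulative_wait(a, 10) }          # [10, 30, 70, 150, 310]
doubling   = attempts.map { |a| doubling_wait(a, 10) }            # [20, 40, 80, 160, 320]
from_first = attempts.map { |a| doubling_wait_from_first(a, 10) } # [10, 20, 40, 80, 160]
```

The original loop grows slightly slower than strict doubling because each value is one `first_failed_wait` short of the next power of two, which is why the difference is described as a small nit.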
||
|
||
protected | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
require 'spec_helper' | ||
require 'tempfile' | ||
require 'timecop' | ||
|
||
RSpec.describe PushmiPullyu::CLI do | ||
let(:cli) { PushmiPullyu::CLI.instance } | ||
|
@@ -213,14 +214,20 @@ | |
expect(PushmiPullyu.options[:queue_name]).to eq 'test:pmpy_queue' | ||
expect(PushmiPullyu.options[:swift][:auth_url]).to eq 'http://127.0.0.1:8080/auth/v1.0' | ||
expect(PushmiPullyu.options[:rollbar][:token]).to eq 'abc123xyz' | ||
expect(PushmiPullyu.options[:ingestion_prefix]).to eq 'prod:pmpy_ingest_attempt:' | ||
expect(PushmiPullyu.options[:ingestion_attempts]).to eq 15 | ||
expect(PushmiPullyu.options[:first_failed_wait]).to eq 10 | ||
end | ||
|
||
it 'still allows command line arguments to take precedence' do | ||
cli.parse(['start', | ||
'-C', 'spec/fixtures/config.yml', | ||
'--logdir', 'path/to/random', | ||
'--minimum-age', '5', | ||
'--piddir', 'path/to/piddir']) | ||
'--piddir', 'path/to/piddir', | ||
'--ingestion_prefix', 'prefix', | ||
'--ingestion_attempts', '20', | ||
'--first_failed_wait', '20']) | ||
|
||
expect(PushmiPullyu.options[:daemonize]).to be_truthy | ||
expect(PushmiPullyu.options[:config_file]).to eq 'spec/fixtures/config.yml' | ||
|
@@ -230,6 +237,9 @@ | |
expect(PushmiPullyu.options[:piddir]).to eq 'path/to/piddir' | ||
expect(PushmiPullyu.options[:process_name]).to eq 'test_pushmi_pullyu' | ||
expect(PushmiPullyu.options[:minimum_age]).to be 5.0 | ||
expect(PushmiPullyu.options[:ingestion_prefix]).to eq 'prefix' | ||
expect(PushmiPullyu.options[:ingestion_attempts]).to eq 20 | ||
expect(PushmiPullyu.options[:first_failed_wait]).to eq 20 | ||
end | ||
end | ||
|
||
|
@@ -273,11 +283,44 @@ | |
expect(old_options).to eq new_options | ||
end | ||
|
||
it 'makes sure an entities information is readded to reddis when deposit fails' do | ||
it 'delays the repeated attempts when deposits fail' do | ||
# Lets substract 10 seconds to avoid waiting for the item to be processed | ||
# expect((readded_entity_score.to_i - test_time).to_i).to eq 10 | ||
cli.parse(['-C', 'spec/fixtures/config_wrong_swift.yml']) | ||
redis = Redis.new | ||
redis.zadd(PushmiPullyu.options[:queue_name], 10, | ||
'{"uuid": "123e4567-e89b-12d3-a456-426614174000", "type": "items"}') | ||
entity = { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' } | ||
|
||
start_time = Time.now - 10 | ||
attempt_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}" | ||
redis.zadd(PushmiPullyu.options[:queue_name], start_time.to_f, entity.to_json) | ||
deposit_attempt = 0 | ||
redis.set(attempt_key, deposit_attempt) | ||
Timecop.freeze do | ||
while deposit_attempt < PushmiPullyu.options[:ingestion_attempts] | ||
VCR.use_cassette('aip_download_and_swift_upload') do | ||
PushmiPullyu::Logging.logger.fatal! | ||
cli.send(:run_preservation_cycle) | ||
PushmiPullyu::Logging.initialize_logger | ||
time_now = Time.now.to_i | ||
_readded_entity, readded_entity_score = redis.zrange(PushmiPullyu.options[:queue_name], | ||
0, 0, with_scores: true).first | ||
deposit_attempt = redis.get(attempt_key).to_i | ||
Might be nice to assert that
||
extra_wait_time = PushmiPullyu::PreservationQueue.extra_wait_time(deposit_attempt) | ||
expect(readded_entity_score.to_i - time_now).to eq extra_wait_time | ||
# We dont want to wait for defined minimum age so we add it to the time travel shenanigans | ||
Timecop.travel(extra_wait_time + PushmiPullyu.options[:minimum_age]) | ||
end | ||
end | ||
end | ||
end | ||
|
||
it 'makes sure an entities information is readded to redis when deposit fails' do | ||
cli.parse(['-C', 'spec/fixtures/config_wrong_swift.yml']) | ||
redis = Redis.new | ||
entity = { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' } | ||
|
||
redis.zadd(PushmiPullyu.options[:queue_name], 10, entity.to_json) | ||
redis.set("#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}", 0) | ||
|
||
original_entity_information, original_entity_score = redis.zrange(PushmiPullyu.options[:queue_name], | ||
0, 0, with_scores: true).first | ||
|
@@ -291,10 +334,12 @@ | |
PushmiPullyu::Logging.initialize_logger | ||
readded_entity, readded_entity_score = redis.zrange(PushmiPullyu.options[:queue_name], | ||
0, 0, with_scores: true).first | ||
readded_attempt = redis.get("#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}") | ||
expect(original_entity_information).to eq readded_entity | ||
expect(original_entity_score).not_to eq readded_entity_score | ||
expect(readded_attempt).to eq '1' | ||
end | ||
redis.del(PushmiPullyu.options[:queue_name]) | ||
redis.flushall | ||
end | ||
end | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,20 +8,24 @@ | |
|
||
before do | ||
PushmiPullyu.server_running = true | ||
redis.zadd 'test:pmpy_queue', 1, 'noid1' | ||
redis.zadd 'test:pmpy_queue', 3, 'noid3' | ||
redis.zadd 'test:pmpy_queue', 4, 'noid2' | ||
Comment on lines -11 to -13
Are
These values were just used for this test.
||
redis.zadd 'test:pmpy_queue', 10, 'noid1' | ||
redis.zadd 'test:pmpy_queue', 1, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' | ||
redis.zadd 'test:pmpy_queue', 3, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c3","type":"items"}' | ||
redis.zadd 'test:pmpy_queue', 4, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c2","type":"items"}' | ||
redis.zadd 'test:pmpy_queue', 10, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' | ||
end | ||
|
||
after do | ||
redis.del 'test:pmpy_queue' | ||
end | ||
|
||
it 'retrieves 3 items in priority order' do | ||
expect(queue.wait_next_item).to eq 'noid3' | ||
expect(queue.wait_next_item).to eq 'noid2' | ||
expect(queue.wait_next_item).to eq 'noid1' | ||
next_item = queue.wait_next_item | ||
|
||
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c3' | ||
next_item = queue.wait_next_item | ||
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c2' | ||
next_item = queue.wait_next_item | ||
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1' | ||
end | ||
end | ||
|
||
|
@@ -32,7 +36,7 @@ | |
let!(:redis) { Redis.new } | ||
|
||
before do | ||
redis.zadd 'test:pmpy_queue', Time.now.to_f, 'noid1' | ||
redis.zadd 'test:pmpy_queue', Time.now.to_f, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' | ||
end | ||
|
||
after do | ||
|
@@ -50,7 +54,7 @@ | |
expect(queue.next_item).to be_nil | ||
|
||
Timecop.travel(now + 15.minutes) | ||
expect(queue.next_item).to eq 'noid1' | ||
expect(queue.next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1' | ||
end | ||
end | ||
end |
Probably worth adding these new options to the README
They are there, do let me know if I can clarify it though.
https://github.com/ualbertalib/pushmi_pullyu/pull/298/files#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5