diff --git a/CHANGELOG.md b/CHANGELOG.md index e06511d..3b0e1f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and releases in PushmiPullyu adheres to [Semantic Versioning](https://semver.org - Add rescue block to catch exceptions while waiting for next item [#280](https://github.com/ualbertalib/pushmi_pullyu/issues/280) - Add logic to fetch new community and collection information from jupiter and create their AIPS. [#255](https://github.com/ualbertalib/pushmi_pullyu/issues/255) +- Add delay to re-ingestion attempts to allow for problems to be fixed [#297](https://github.com/ualbertalib/pushmi_pullyu/issues/297) - Bump git from 1.9.1 to 1.13.0 ## [2.0.4] - 2022-11-22 diff --git a/README.md b/README.md index 56c5b9d..115c9ba 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,10 @@ Specific options: -W, --workdir PATH Path for directory where AIP creation work takes place in -N, --process_name NAME Name of the application process -m, --monitor Start monitor process for a deamon - -q, --queue NAME Name of the queue to read from + -q, --queue NAME Name of the queue to read from + -i, --ingestion_prefix PREFIX Prefix for keys used in counting the number of failed ingestion attempts + -x, --ingestion_attempts NUMBER Max number of attempts to try ingesting an entity + -f, --first_failed_wait NUMBER Time in seconds to wait after first failed entity deposit. 
This time will double every failed attempt Common options: -v, --version Show version diff --git a/examples/pushmi_pullyu.yml b/examples/pushmi_pullyu.yml index 3a07a35..3c6bc13 100644 --- a/examples/pushmi_pullyu.yml +++ b/examples/pushmi_pullyu.yml @@ -16,6 +16,9 @@ piddir: tmp/pids workdir: tmp/work process_name: pushmi_pullyu queue_name: dev:pmpy_queue +ingestion_prefix: 'prod:pmpy_ingest_attempt:' +ingestion_attempts: 15 +first_failed_wait: 10 minimum_age: 0 redis: diff --git a/lib/pushmi_pullyu.rb b/lib/pushmi_pullyu.rb index 83a923d..c7de3f3 100644 --- a/lib/pushmi_pullyu.rb +++ b/lib/pushmi_pullyu.rb @@ -26,6 +26,9 @@ module PushmiPullyu workdir: 'tmp/work', process_name: 'pushmi_pullyu', queue_name: 'dev:pmpy_queue', + ingestion_prefix: 'prod:pmpy_ingest_attempt:', + ingestion_attempts: 15, + first_failed_wait: 10, redis: { url: 'redis://localhost:6379' }, diff --git a/lib/pushmi_pullyu/cli.rb b/lib/pushmi_pullyu/cli.rb index 5a62989..927d982 100644 --- a/lib/pushmi_pullyu/cli.rb +++ b/lib/pushmi_pullyu/cli.rb @@ -148,6 +148,21 @@ def parse_options(argv) opts[:queue_name] = queue end + o.on('-i', '--ingestion_prefix PREFIX', + 'Prefix for keys used in counting the number of failed ingestion attempts') do |prefix| + opts[:ingestion_prefix] = prefix + end + + o.on('-x', '--ingestion_attempts NUMBER', Integer, + 'Max number of attempts to try ingesting an entity') do |ingestion_attempts| + opts[:ingestion_attempts] = ingestion_attempts + end + + o.on('-f', '--first_failed_wait NUMBER', Integer, + 'Time in seconds to wait after first failed deposit. Time will double every failed attempt') do |failed_wait| + opts[:first_failed_wait] = failed_wait + end + o.separator '' o.separator 'Common options:' @@ -183,16 +198,10 @@ def rotate_logs def run_preservation_cycle begin - entity_json = queue.wait_next_item - return unless entity_json - - # jupiter is submitting the entries to reddis in a hash format using fat arrows. 
We need to change them to colons - # in order to parse them correctly from json - entity = JSON.parse(entity_json.gsub('=>', ':'), { symbolize_names: true }) - return unless entity[:type].present? && entity[:uuid].present? + entity = queue.wait_next_item + return unless entity && entity[:type].present? && entity[:uuid].present? rescue StandardError => e - Rollbar.error(e) - logger.error(e) + log_exception(e) end # add additional information about the error context to errors that occur while processing this item. @@ -209,7 +218,11 @@ def run_preservation_cycle # readding it to the queue as it will always fail rescue PushmiPullyu::AIP::EntityInvalid => e rescue StandardError => e - queue.add_entity_json(entity_json) + begin + queue.add_entity_in_timeframe(entity) + rescue PushmiPullyu::PreservationQueue::MaxDepositAttemptsReached => e + log_exception(e) + end # rubocop:disable Lint/RescueException # Something other than a StandardError exception means something happened which we were not expecting! @@ -218,8 +231,7 @@ def run_preservation_cycle raise e # rubocop:enable Lint/RescueException ensure - Rollbar.error(e) - logger.error(e) + log_exception(e) end end @@ -294,4 +306,9 @@ def start_server_as_daemon end end + def log_exception(exception) + Rollbar.error(exception) + logger.error(exception) + end + end diff --git a/lib/pushmi_pullyu/preservation_queue.rb b/lib/pushmi_pullyu/preservation_queue.rb index 236c160..5ecd679 100644 --- a/lib/pushmi_pullyu/preservation_queue.rb +++ b/lib/pushmi_pullyu/preservation_queue.rb @@ -20,6 +20,7 @@ class PushmiPullyu::PreservationQueue class ConnectionError < StandardError; end + class MaxDepositAttemptsReached < StandardError; end def initialize(redis_url: 'redis://localhost:6379', pool_opts: { size: 1, timeout: 5 }, @@ -50,7 +51,8 @@ def next_item rd.multi do |tx| tx.zrem(@queue_name, element) # remove the top element transactionally end - return element + + return JSON.parse(element, { symbolize_names: true }) else rd.unwatch # cancel the transaction since 
there was nothing in the queue return nil @@ -68,12 +70,27 @@ def wait_next_item end end - def add_entity_json(entity_json) + def add_entity_in_timeframe(entity) + entity_attempts_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}" + @redis.with do |connection| - connection.zadd @queue_name, Time.now.to_f, entity_json + # separate information for priority information and queue + deposit_attempt = connection.incr entity_attempts_key + + if deposit_attempt <= PushmiPullyu.options[:ingestion_attempts] + connection.zadd @queue_name, Time.now.to_f + self.class.extra_wait_time(deposit_attempt), + entity.slice(:uuid, :type).to_json + else + connection.del entity_attempts_key + raise MaxDepositAttemptsReached + end end end + def self.extra_wait_time(deposit_attempt) + (2**(deposit_attempt - 1)) * PushmiPullyu.options[:first_failed_wait] + end + protected def connected? diff --git a/spec/fixtures/config.yml b/spec/fixtures/config.yml index 31d986d..a3811f0 100644 --- a/spec/fixtures/config.yml +++ b/spec/fixtures/config.yml @@ -7,6 +7,9 @@ monitor: false piddir: tmp/spec/pids workdir: tmp/spec/work process_name: test_pushmi_pullyu +ingestion_prefix: 'prod:pmpy_ingest_attempt:' +ingestion_attempts: 15 +first_failed_wait: 10 minimum_age: 1 queue_name: test:pmpy_queue rollbar: diff --git a/spec/integration/acceptance_spec.rb b/spec/integration/acceptance_spec.rb index 5786428..0d823b2 100644 --- a/spec/integration/acceptance_spec.rb +++ b/spec/integration/acceptance_spec.rb @@ -39,11 +39,7 @@ cli = PushmiPullyu::CLI.instance cli.parse(['-C', 'spec/fixtures/config.yml', '-W', workdir]) - entity_json = JSON.parse(cli.send(:queue).wait_next_item) - entity = { - type: entity_json['type'], - uuid: entity_json['uuid'] - } + entity = cli.send(:queue).wait_next_item expect(entity[:uuid]).to eq uuid diff --git a/spec/pushmi_pullyu/cli_spec.rb b/spec/pushmi_pullyu/cli_spec.rb index 24e70e8..190905e 100644 --- a/spec/pushmi_pullyu/cli_spec.rb +++ b/spec/pushmi_pullyu/cli_spec.rb 
@@ -1,5 +1,6 @@ require 'spec_helper' require 'tempfile' +require 'timecop' RSpec.describe PushmiPullyu::CLI do let(:cli) { PushmiPullyu::CLI.instance } @@ -213,6 +214,9 @@ expect(PushmiPullyu.options[:queue_name]).to eq 'test:pmpy_queue' expect(PushmiPullyu.options[:swift][:auth_url]).to eq 'http://127.0.0.1:8080/auth/v1.0' expect(PushmiPullyu.options[:rollbar][:token]).to eq 'abc123xyz' + expect(PushmiPullyu.options[:ingestion_prefix]).to eq 'prod:pmpy_ingest_attempt:' + expect(PushmiPullyu.options[:ingestion_attempts]).to eq 15 + expect(PushmiPullyu.options[:first_failed_wait]).to eq 10 end it 'still allows command line arguments to take precedence' do @@ -220,7 +224,10 @@ '-C', 'spec/fixtures/config.yml', '--logdir', 'path/to/random', '--minimum-age', '5', - '--piddir', 'path/to/piddir']) + '--piddir', 'path/to/piddir', + '--ingestion_prefix', 'prefix', + '--ingestion_attempts', '20', + '--first_failed_wait', '20']) expect(PushmiPullyu.options[:daemonize]).to be_truthy expect(PushmiPullyu.options[:config_file]).to eq 'spec/fixtures/config.yml' @@ -230,6 +237,9 @@ expect(PushmiPullyu.options[:piddir]).to eq 'path/to/piddir' expect(PushmiPullyu.options[:process_name]).to eq 'test_pushmi_pullyu' expect(PushmiPullyu.options[:minimum_age]).to be 5.0 + expect(PushmiPullyu.options[:ingestion_prefix]).to eq 'prefix' + expect(PushmiPullyu.options[:ingestion_attempts]).to eq 20 + expect(PushmiPullyu.options[:first_failed_wait]).to eq 20 end end @@ -273,11 +283,47 @@ expect(old_options).to eq new_options end - it 'makes sure an entities information is readded to reddis when deposit fails' do + it 'delays the repeated attempts when deposits fail' do + # Lets substract 10 seconds to avoid waiting for the item to be processed + # expect((readded_entity_score.to_i - test_time).to_i).to eq 10 cli.parse(['-C', 'spec/fixtures/config_wrong_swift.yml']) redis = Redis.new - redis.zadd(PushmiPullyu.options[:queue_name], 10, - '{"uuid": "123e4567-e89b-12d3-a456-426614174000", "type": 
"items"}') + entity = { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' } + + start_time = Time.now - 10 + attempt_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}" + redis.zadd(PushmiPullyu.options[:queue_name], start_time.to_f, entity.to_json) + deposit_attempt = 0 + redis.set(attempt_key, deposit_attempt) + Timecop.freeze do + while deposit_attempt < PushmiPullyu.options[:ingestion_attempts] + VCR.use_cassette('aip_download_and_swift_upload') do + PushmiPullyu::Logging.logger.fatal! + cli.send(:run_preservation_cycle) + PushmiPullyu::Logging.initialize_logger + time_now = Time.now.to_i + _readded_entity, readded_entity_score = redis.zrange(PushmiPullyu.options[:queue_name], + 0, 0, with_scores: true).first + new_deposit_attempt = redis.get(attempt_key).to_i + extra_wait_time = PushmiPullyu::PreservationQueue.extra_wait_time(new_deposit_attempt) + # Make sure the deposits attempts are increasing by 1 + expect(new_deposit_attempt).to eq deposit_attempt + 1 + expect(readded_entity_score.to_i - time_now).to eq extra_wait_time + # We dont want to wait for defined minimum age so we add it to the time travel shenanigans + deposit_attempt = new_deposit_attempt + Timecop.travel(extra_wait_time + PushmiPullyu.options[:minimum_age]) + end + end + end + end + + it 'makes sure an entities information is readded to redis when deposit fails' do + cli.parse(['-C', 'spec/fixtures/config_wrong_swift.yml']) + redis = Redis.new + entity = { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' } + + redis.zadd(PushmiPullyu.options[:queue_name], 10, entity.to_json) + redis.set("#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}", 0) original_entity_information, original_entity_score = redis.zrange(PushmiPullyu.options[:queue_name], 0, 0, with_scores: true).first @@ -291,10 +337,12 @@ PushmiPullyu::Logging.initialize_logger readded_entity, readded_entity_score = redis.zrange(PushmiPullyu.options[:queue_name], 0, 0, with_scores: true).first + 
readded_attempt = redis.get("#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}") expect(original_entity_information).to eq readded_entity expect(original_entity_score).not_to eq readded_entity_score + expect(readded_attempt).to eq '1' end - redis.del(PushmiPullyu.options[:queue_name]) + redis.flushall end end end diff --git a/spec/pushmi_pullyu/preservation_queue_spec.rb b/spec/pushmi_pullyu/preservation_queue_spec.rb index 03b437e..ce399f0 100644 --- a/spec/pushmi_pullyu/preservation_queue_spec.rb +++ b/spec/pushmi_pullyu/preservation_queue_spec.rb @@ -8,10 +8,10 @@ before do PushmiPullyu.server_running = true - redis.zadd 'test:pmpy_queue', 1, 'noid1' - redis.zadd 'test:pmpy_queue', 3, 'noid3' - redis.zadd 'test:pmpy_queue', 4, 'noid2' - redis.zadd 'test:pmpy_queue', 10, 'noid1' + redis.zadd 'test:pmpy_queue', 1, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' + redis.zadd 'test:pmpy_queue', 3, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c3","type":"items"}' + redis.zadd 'test:pmpy_queue', 4, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c2","type":"items"}' + redis.zadd 'test:pmpy_queue', 10, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' end after do @@ -19,9 +19,13 @@ end it 'retrieves 3 items in priority order' do - expect(queue.wait_next_item).to eq 'noid3' - expect(queue.wait_next_item).to eq 'noid2' - expect(queue.wait_next_item).to eq 'noid1' + next_item = queue.wait_next_item + + expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c3' + next_item = queue.wait_next_item + expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c2' + next_item = queue.wait_next_item + expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1' end end @@ -32,7 +36,7 @@ let!(:redis) { Redis.new } before do - redis.zadd 'test:pmpy_queue', Time.now.to_f, 'noid1' + redis.zadd 'test:pmpy_queue', Time.now.to_f, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' end after do @@ -50,7 +54,7 
@@ expect(queue.next_item).to be_nil Timecop.travel(now + 15.minutes) - expect(queue.next_item).to eq 'noid1' + expect(queue.next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1' end end end