From e26d0bb7fc69bbc26cbb250f076f31cb0dd1eb56 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Wed, 4 Dec 2024 18:49:12 +0800 Subject: [PATCH 1/7] fix: retry --- .../logstash/lib/logstash/outputs/doris.rb | 103 +++++++++++------- .../lib/logstash/util/retry_timer_task.rb | 31 ++++++ 2 files changed, 93 insertions(+), 41 deletions(-) create mode 100644 extension/logstash/lib/logstash/util/retry_timer_task.rb diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index 21d3ee6e752b08..f2dcfa7de03cd8 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -22,6 +22,7 @@ require "logstash/namespace" require "logstash/json" require 'logstash/util/formater' +require 'logstash/util/retry_timer_task' require "uri" require "securerandom" require "json" @@ -131,9 +132,24 @@ def register end end + # Run named Timer as daemon thread + @timer = java.util.Timer.new("Doris Output #{self.params['id']}", true) + + @retry_queue = Queue.new + retry_thread = Thread.new do + while popped = @retry_queue.pop + documents, http_headers, event_num, req_count = popped + handle_request(documents, http_headers, event_num, req_count) + end + end + print_plugin_info() end # def register + def close + @timer.cancel + end + def multi_receive(events) return if events.empty? send_events(events) @@ -157,50 +173,55 @@ def send_events(events) http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid) end - req_count = 0 - sleep_for = 1 - while true - response = make_request(documents, http_headers, @http_query, @http_hosts.sample) - - req_count += 1 - response_json = {} - begin - response_json = JSON.parse(response.body) - rescue => e - @logger.warn("doris stream load response: #{response} is not a valid JSON") - end + handle_request(documents, http_headers, event_num, 1) + end - status = response_json["Status"] + def sleep_for_attempt(attempt) + sleep_for = attempt**2 + sleep_for = sleep_for <= 60 ? sleep_for : 60 + (sleep_for/2) + (rand(0..sleep_for)/2) + end - if status == 'Label Already Exists' - @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records.") - break - end + private + def handle_request(documents, http_headers, event_num, req_count) + response = make_request(documents, http_headers, @http_query, @http_hosts.sample) + response_json = {} + begin + response_json = JSON.parse(response.body) + rescue => _ + @logger.warn("doris stream load response: #{response} is not a valid JSON") + end - if status == "Success" || status == "Publish Timeout" - @total_bytes.addAndGet(documents.size) - @total_rows.addAndGet(event_num) - break - else - @logger.warn("FAILED doris stream load response:\n#{response}") - - if @max_retries >= 0 && req_count > @max_retries - @logger.warn("DROP this batch after failed #{req_count} times.") - if @save_on_failure - @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") - save_to_disk(documents) - end - break - end - - # sleep and then retry - sleep_for = sleep_for * 2 - sleep_for = sleep_for <= 60 ? sleep_for : 60 - sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2) - @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand} secs.") - sleep(sleep_rand) + status = response_json["Status"] + + if status == 'Label Already Exists' + @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records.") + return + end + + if status == "Success" || status == "Publish Timeout" + @total_bytes.addAndGet(documents.size) + @total_rows.addAndGet(event_num) + return + end + + @logger.warn("FAILED doris stream load response:\n#{response}") + # if status is Fail, we do not retry + if status == 'Fail' || (@max_retries >= 0 && req_count > @max_retries) + @logger.warn("DROP this batch after failed #{req_count} times.") + if @save_on_failure + @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") + save_to_disk(documents) end + return end + + # add to retry_queue + sleep_for = sleep_for_attempt(req_count) + req_count += 1 + @logger.warn("Will do retry #{req_count} after #{sleep_for} secs.") + timer_task = RetryTimerTask.new(@retry_queue, [documents, http_headers, event_num, req_count]) + @timer.schedule(timer_task, sleep_for*1000) end private @@ -284,8 +305,8 @@ def save_to_disk(documents) end # This is split into a separate method mostly to help testing - def log_failure(message) - @logger.warn("[Doris Output Failure] #{message}") + def log_failure(message, data = {}) + @logger.warn("[Doris Output Failure] #{message}", data) end def make_request_headers() diff --git a/extension/logstash/lib/logstash/util/retry_timer_task.rb b/extension/logstash/lib/logstash/util/retry_timer_task.rb new file mode 100644 index 00000000000000..0d5f5eca6a2a5a --- /dev/null +++ b/extension/logstash/lib/logstash/util/retry_timer_task.rb @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require 'java' + +class RetryTimerTask < java.util.TimerTask + def initialize(retry_queue, event) + @retry_queue = retry_queue + # event style: [documents, http_headers, event_num, req_count] + @event = event + super() + end + + def run + @retry_queue << @event + end +end From 6ffacbd66b1fbca8b3819c6d2dd56e882218544b Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Wed, 11 Dec 2024 01:46:49 +0800 Subject: [PATCH 2/7] feat: add retry queue size --- extension/logstash/lib/logstash/outputs/doris.rb | 5 ++++- extension/logstash/lib/logstash/util/retry_timer_task.rb | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index f2dcfa7de03cd8..ed8f1aacff0174 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -134,6 +134,8 @@ def register # Run named Timer as daemon thread @timer = java.util.Timer.new("Doris Output #{self.params['id']}", true) + # The queue in Timer is unbounded and uncontrollable, so use a new queue to control the amount + @count_block_queue = java.util.concurrent.ArrayBlockingQueue.new(128) @retry_queue = Queue.new retry_thread = Thread.new do @@ -220,7 +222,8 @@ def handle_request(documents, http_headers, event_num, req_count) sleep_for = sleep_for_attempt(req_count) req_count += 1 @logger.warn("Will do retry #{req_count} after #{sleep_for} secs.") - timer_task = RetryTimerTask.new(@retry_queue, [documents, http_headers, event_num, req_count]) + timer_task = RetryTimerTask.new(@retry_queue, @count_block_queue, [documents, http_headers, event_num, req_count]) + @count_block_queue.add(0) @timer.schedule(timer_task, sleep_for*1000) end diff --git a/extension/logstash/lib/logstash/util/retry_timer_task.rb b/extension/logstash/lib/logstash/util/retry_timer_task.rb index 0d5f5eca6a2a5a..c686a27eb84eab 100644 --- a/extension/logstash/lib/logstash/util/retry_timer_task.rb +++ b/extension/logstash/lib/logstash/util/retry_timer_task.rb @@ -18,8 +18,9 @@ require 'java' class RetryTimerTask < java.util.TimerTask - def initialize(retry_queue, event) + def initialize(retry_queue, count_block_queue, event) @retry_queue = retry_queue + @count_block_queue = count_block_queue # event style: [documents, http_headers, event_num, req_count] @event = event super() @@ -27,5 +28,6 @@ def initialize(retry_queue, event) def run @retry_queue << @event + @count_block_queue.pull end end From 50400bce2653af68ec7987d2908f45bbbb1ec649 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Wed, 11 Dec 2024 02:20:15 +0800 Subject: [PATCH 3/7] fix: queue --- extension/logstash/lib/logstash/outputs/doris.rb | 2 +- extension/logstash/lib/logstash/util/retry_timer_task.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index ed8f1aacff0174..20f3af83eb15d0 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -223,7 +223,7 @@ def handle_request(documents, http_headers, event_num, req_count) req_count += 1 @logger.warn("Will do retry #{req_count} after #{sleep_for} secs.") timer_task = RetryTimerTask.new(@retry_queue, @count_block_queue, [documents, http_headers, event_num, req_count]) - @count_block_queue.add(0) + @count_block_queue.put(0) @timer.schedule(timer_task, sleep_for*1000) end diff --git a/extension/logstash/lib/logstash/util/retry_timer_task.rb b/extension/logstash/lib/logstash/util/retry_timer_task.rb index c686a27eb84eab..30c453a3e83bf6 100644 --- a/extension/logstash/lib/logstash/util/retry_timer_task.rb +++ b/extension/logstash/lib/logstash/util/retry_timer_task.rb @@ -28,6 +28,6 @@ def initialize(retry_queue, count_block_queue, event) def run @retry_queue << @event - @count_block_queue.pull + @count_block_queue.poll end end From 26a8726022cb383506a4571cfbeb07c3b213a801 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Tue, 17 Dec 2024 10:32:31 +0800 Subject: [PATCH 4/7] fix: retry --- extension/logstash/lib/logstash/outputs/doris.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index 20f3af83eb15d0..c2a4aad64bd186 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -208,8 +208,8 @@ def handle_request(documents, http_headers, event_num, req_count) end @logger.warn("FAILED doris stream load response:\n#{response}") - # if status is Fail, we do not retry - if status == 'Fail' || (@max_retries >= 0 && req_count > @max_retries) + # if there are data quality issues, we do not retry + if (status == 'Fail' && response_json['ErrorURL'] != nil) || (@max_retries >= 0 && req_count > @max_retries) @logger.warn("DROP this batch after failed #{req_count} times.") if @save_on_failure @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") From f6b023e0ed46524a39ca64b24879af211d24db1c Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Tue, 17 Dec 2024 17:40:51 +0800 Subject: [PATCH 5/7] refactor: avoid multiple allocations --- extension/logstash/lib/logstash/outputs/doris.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index c2a4aad64bd186..ebcdc7e7386dc6 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -159,12 +159,8 @@ def multi_receive(events) private def send_events(events) - documents = "" - event_num = 0 - events.each do |event| - documents << event_body(event) << "\n" - event_num += 1 - end + documents = events.map { |event| event_body(event) }.join("\n") + event_num = events.size # @logger.info("get event num: #{event_num}") @logger.debug("get documents: #{documents}") From 211dae0ec8f83ffd79a06f61934bb94fabee3712 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Mon, 23 Dec 2024 21:04:47 +0800 Subject: [PATCH 6/7] refactor: retry condition --- extension/logstash/lib/logstash/outputs/doris.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index ebcdc7e7386dc6..e8079d9806ca52 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -205,7 +205,7 @@ def handle_request(documents, http_headers, event_num, req_count) @logger.warn("FAILED doris stream load response:\n#{response}") # if there are data quality issues, we do not retry - if (status == 'Fail' && response_json['ErrorURL'] != nil) || (@max_retries >= 0 && req_count > @max_retries) + if (status == 'Fail' && response_json['Message'].start_with?("[DATA_QUALITY_ERROR]")) || (@max_retries >= 0 && req_count > @max_retries) @logger.warn("DROP this batch after failed #{req_count} times.") if @save_on_failure @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") From 8829f24d32232287763f4df31255e0ad2ae0abbb Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Sat, 28 Dec 2024 01:24:32 +0800 Subject: [PATCH 7/7] reactor: retry and log --- .../logstash/lib/logstash/outputs/doris.rb | 49 ++++++++++--------- .../{retry_timer_task.rb => delay_event.rb} | 31 ++++++++---- 2 files changed, 46 insertions(+), 34 deletions(-) rename extension/logstash/lib/logstash/util/{retry_timer_task.rb => delay_event.rb} (57%) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index e8079d9806ca52..eae1a367f3dffa 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -22,7 +22,7 @@ require "logstash/namespace" require "logstash/json" require 'logstash/util/formater' -require 'logstash/util/retry_timer_task' +require 'logstash/util/delay_event' require "uri" require "securerandom" require "json" @@ -44,7 +44,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base config :db, :validate => :string, :required => true # the table which data is loaded to config :table, :validate => :string, :required => true - # label prefix of a stream load requst. + # label prefix of a stream load request. config :label_prefix, :validate => :string, :default => "logstash" # user name config :user, :validate => :string, :required => true @@ -73,6 +73,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base config :log_progress_interval, :validate => :number, :default => 10 + config :retry_queue_size, :validate => :number, :default => 128 def print_plugin_info() @plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-doris/ } @@ -132,15 +133,10 @@ def register end end - # Run named Timer as daemon thread - @timer = java.util.Timer.new("Doris Output #{self.params['id']}", true) - # The queue in Timer is unbounded and uncontrollable, so use a new queue to control the amount - @count_block_queue = java.util.concurrent.ArrayBlockingQueue.new(128) - - @retry_queue = Queue.new + @retry_queue = java.util.concurrent.DelayQueue.new retry_thread = Thread.new do - while popped = @retry_queue.pop - documents, http_headers, event_num, req_count = popped + while popped = @retry_queue.take + documents, http_headers, event_num, req_count = popped.event handle_request(documents, http_headers, event_num, req_count) end end @@ -148,8 +144,14 @@ def register print_plugin_info() end # def register - def close - @timer.cancel + private + def add_event_to_retry_queue(delay_event, block) + if block + while @retry_queue.size >= @retry_queue_size + sleep(1) + end + end + @retry_queue.add(delay_event) end def multi_receive(events) @@ -187,25 +189,29 @@ def handle_request(documents, http_headers, event_num, req_count) begin response_json = JSON.parse(response.body) rescue => _ - @logger.warn("doris stream load response: #{response} is not a valid JSON") + @logger.warn("doris stream load response is not a valid JSON:\n#{response}") end status = response_json["Status"] if status == 'Label Already Exists' - @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records.") + @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records:\n#{response}") return end if status == "Success" || status == "Publish Timeout" @total_bytes.addAndGet(documents.size) @total_rows.addAndGet(event_num) + if @log_request or @logger.debug? + @logger.info("doris stream load response:\n#{response}") + end return end @logger.warn("FAILED doris stream load response:\n#{response}") # if there are data quality issues, we do not retry - if (status == 'Fail' && response_json['Message'].start_with?("[DATA_QUALITY_ERROR]")) || (@max_retries >= 0 && req_count > @max_retries) + if (status == 'Fail' && response_json['Message'].start_with?("[DATA_QUALITY_ERROR]")) || (@max_retries >= 0 && req_count-1 > @max_retries) + # if @max_retries >= 0 && req_count-1 > @max_retries @logger.warn("DROP this batch after failed #{req_count} times.") if @save_on_failure @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") @@ -217,10 +223,9 @@ def handle_request(documents, http_headers, event_num, req_count) # add to retry_queue sleep_for = sleep_for_attempt(req_count) req_count += 1 - @logger.warn("Will do retry #{req_count} after #{sleep_for} secs.") - timer_task = RetryTimerTask.new(@retry_queue, @count_block_queue, [documents, http_headers, event_num, req_count]) - @count_block_queue.put(0) - @timer.schedule(timer_task, sleep_for*1000) + @logger.warn("Will do the #{req_count-1}th retry after #{sleep_for} secs.") + delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count]) + add_event_to_retry_queue(delay_event, req_count <= 1) end private @@ -247,11 +252,7 @@ def make_request(documents, http_headers, query, host) log_failure("doris stream load request error: #{e}") end - if @log_request or @logger.debug? - @logger.info("doris stream load response:\n#{response}") - end - - return response + response end # def make_request # Format the HTTP body diff --git a/extension/logstash/lib/logstash/util/retry_timer_task.rb b/extension/logstash/lib/logstash/util/delay_event.rb similarity index 57% rename from extension/logstash/lib/logstash/util/retry_timer_task.rb rename to extension/logstash/lib/logstash/util/delay_event.rb index 30c453a3e83bf6..3cb0e51ffccd03 100644 --- a/extension/logstash/lib/logstash/util/retry_timer_task.rb +++ b/extension/logstash/lib/logstash/util/delay_event.rb @@ -17,17 +17,28 @@ require 'java' -class RetryTimerTask < java.util.TimerTask - def initialize(retry_queue, count_block_queue, event) - @retry_queue = retry_queue - @count_block_queue = count_block_queue - # event style: [documents, http_headers, event_num, req_count] - @event = event - super() +class DelayEvent + include java.util.concurrent.Delayed + + def initialize(delay, event) + @start_time = Time.now.to_i + delay + @event = event # event style: [documents, http_headers, event_num, req_count] + end + + def get_delay(unit) + delay = @start_time - Time.now.to_i + unit.convert(delay, java.util.concurrent.TimeUnit::SECONDS) + end + + def compare_to(other) + d = self.get_delay(java.util.concurrent.TimeUnit::SECONDS) - other.get_delay(java.util.concurrent.TimeUnit::SECONDS) + if d == 0 + 0 + end + d < 0 ? -1 : 1 end - def run - @retry_queue << @event - @count_block_queue.poll + def event + @event end end