diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index 21d3ee6e752b08..bafafc40969f6b 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -22,6 +22,7 @@
 require "logstash/namespace"
 require "logstash/json"
 require 'logstash/util/formater'
+require 'logstash/util/delay_event'
 require "uri"
 require "securerandom"
 require "json"
@@ -43,7 +44,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
   config :db, :validate => :string, :required => true
   # the table which data is loaded to
   config :table, :validate => :string, :required => true
-  # label prefix of a stream load requst.
+  # label prefix of a stream load request.
   config :label_prefix, :validate => :string, :default => "logstash"
   # user name
   config :user, :validate => :string, :required => true
@@ -72,6 +73,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
 
   config :log_progress_interval, :validate => :number, :default => 10
 
+  # max retry queue size in MB; defaults to half of the JVM max memory
+  config :max_retry_queue_size, :validate => :number, :default => java.lang.Runtime.get_runtime.max_memory / 1024 / 1024 / 2
 
   def print_plugin_info()
     @plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-doris/ }
@@ -131,9 +134,36 @@ def register
       end
     end
 
+    if @max_retry_queue_size <= 0
+      @max_retry_queue_size = java.lang.Runtime.get_runtime.max_memory / 1024 / 1024 / 2
+    end
+    @logger.info("max retry queue size: #{@max_retry_queue_size}MB")
+
+    @retry_queue = java.util.concurrent.DelayQueue.new
+    # retry queue size in bytes
+    @retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
+    retry_thread = Thread.new do
+      while popped = @retry_queue.take
+        documents, http_headers, event_num, req_count = popped.event
+        handle_request(documents, http_headers, event_num, req_count)
+      end
+    end
+
     print_plugin_info()
   end # def register
 
+  private
+  def add_event_to_retry_queue(delay_event)
+    event_size = delay_event.documents.size
+    if delay_event.first_retry
+      while @retry_queue_bytes.get + event_size > @max_retry_queue_size * 1024 * 1024
+        sleep(1)
+      end
+      @retry_queue_bytes.addAndGet(event_size)
+    end
+    @retry_queue.add(delay_event)
+  end
+
   def multi_receive(events)
     return if events.empty?
     send_events(events)
@@ -141,12 +171,8 @@ def multi_receive(events)
   end
   private
   def send_events(events)
-    documents = ""
-    event_num = 0
-    events.each do |event|
-      documents << event_body(event) << "\n"
-      event_num += 1
-    end
+    documents = events.map { |event| event_body(event) }.join("\n")
+    event_num = events.size
 
     # @logger.info("get event num: #{event_num}")
     @logger.debug("get documents: #{documents}")
@@ -157,50 +183,67 @@ def send_events(events)
       http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
     end
 
-    req_count = 0
-    sleep_for = 1
-    while true
-      response = make_request(documents, http_headers, @http_query, @http_hosts.sample)
-
-      req_count += 1
-      response_json = {}
-      begin
-        response_json = JSON.parse(response.body)
-      rescue => e
-        @logger.warn("doris stream load response: #{response} is not a valid JSON")
-      end
+    handle_request(documents, http_headers, event_num, 1)
+  end
+
+  def sleep_for_attempt(attempt)
+    sleep_for = attempt**2
+    sleep_for = sleep_for <= 60 ? sleep_for : 60
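+    # half of the capped backoff is fixed, half is random jitter to avoid synchronized retries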
+    (sleep_for/2) + (rand(0..sleep_for)/2)
+  end
+
+  private
+  def handle_request(documents, http_headers, event_num, req_count)
+    response = make_request(documents, http_headers, @http_query, @http_hosts.sample)
+    response_json = {}
+    begin
+      response_json = JSON.parse(response.body)
+    rescue => _
+      @logger.warn("doris stream load response is not a valid JSON:\n#{response}")
+    end
+
+    status = response_json["Status"]
+
+    need_retry = true
 
-      status = response_json["Status"]
+    if status == 'Label Already Exists'
+      @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records:\n#{response}")
+      need_retry = false
 
-      if status == 'Label Already Exists'
-        @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records.")
-        break
+    elsif status == "Success" || status == "Publish Timeout"
+      @total_bytes.addAndGet(documents.size)
+      @total_rows.addAndGet(event_num)
+      if @log_request or @logger.debug?
+        @logger.info("doris stream load response:\n#{response}")
       end
+      need_retry = false
+
+    # do not retry on data quality errors, nor after exceeding max_retries
+    elsif (status == 'Fail' && response_json['Message'].start_with?("[DATA_QUALITY_ERROR]")) || (@max_retries >= 0 && req_count - 1 > @max_retries)
+      @logger.warn("FAILED doris stream load response:\n#{response}")
+      @logger.warn("DROP this batch after failed #{req_count} times.")
+      if @save_on_failure
+        @logger.warn("Try to save to disk. Disk file path: #{@save_dir}/#{@table}_#{@save_file}")
+        save_to_disk(documents)
+      end
+      need_retry = false
+    end
 
-      if status == "Success" || status == "Publish Timeout"
-        @total_bytes.addAndGet(documents.size)
-        @total_rows.addAndGet(event_num)
-        break
-      else
-        @logger.warn("FAILED doris stream load response:\n#{response}")
-
-        if @max_retries >= 0 && req_count > @max_retries
-          @logger.warn("DROP this batch after failed #{req_count} times.")
-          if @save_on_failure
-            @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}")
-            save_to_disk(documents)
-          end
-          break
-        end
-
-        # sleep and then retry
-        sleep_for = sleep_for * 2
-        sleep_for = sleep_for <= 60 ? sleep_for : 60
-        sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2)
-        @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand} secs.")
-        sleep(sleep_rand)
+    if !need_retry
+      if req_count > 1
+        @retry_queue_bytes.addAndGet(-documents.size)
       end
+      return
     end
+
+    # add to retry_queue
+    sleep_for = sleep_for_attempt(req_count)
+    @logger.warn("FAILED doris stream load response:\n#{response}")
+    @logger.warn("Will do retry ##{req_count} after #{sleep_for} secs.")
+    delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count + 1])
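+    # for a first retry this may block until the queue drops below max_retry_queue_size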
+    add_event_to_retry_queue(delay_event)
   end
 
   private
@@ -227,11 +270,7 @@ def make_request(documents, http_headers, query, host)
       log_failure("doris stream load request error: #{e}")
     end
 
-    if @log_request or @logger.debug?
-      @logger.info("doris stream load response:\n#{response}")
-    end
-
-    return response
+    response
   end # def make_request
 
   # Format the HTTP body
@@ -284,8 +323,8 @@ def save_to_disk(documents)
   end
 
   # This is split into a separate method mostly to help testing
-  def log_failure(message)
-    @logger.warn("[Doris Output Failure] #{message}")
+  def log_failure(message, data = {})
+    @logger.warn("[Doris Output Failure] #{message}", data)
   end
 
   def make_request_headers()
diff --git a/extension/logstash/lib/logstash/util/delay_event.rb b/extension/logstash/lib/logstash/util/delay_event.rb
new file mode 100644
index 00000000000000..18b481bebe6ad3
--- /dev/null
+++ b/extension/logstash/lib/logstash/util/delay_event.rb
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'java'
+
+# An element of java.util.concurrent.DelayQueue that becomes available for take() once its delay elapses.
+class DelayEvent
+  include java.util.concurrent.Delayed
+
+  def initialize(delay, event)
+    @start_time = Time.now.to_i + delay
+    @event = event # event style: [documents, http_headers, event_num, req_count]
+  end
+
+  def get_delay(unit)
+    delay = @start_time - Time.now.to_i
+    unit.convert(delay, java.util.concurrent.TimeUnit::SECONDS)
+  end
+
+  def compare_to(other)
+    d = self.get_delay(java.util.concurrent.TimeUnit::SECONDS) - other.get_delay(java.util.concurrent.TimeUnit::SECONDS)
+    return 0 if d == 0
+    d < 0 ? -1 : 1
+  end
+
+  def event
+    @event
+  end
+
+  def documents
+    @event[0]
+  end
+
+  def first_retry
+    # req_count is bumped to 2 when a batch is first queued for retry
+    @event[3] == 2
+  end
+end
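For reference, a minimal JRuby sketch of the DelayQueue semantics the new retry thread relies on (not part of the patch; the sample batches are made up, and it assumes delay_event.rb from this patch is on the load path): `DelayQueue#take` blocks until the event with the nearest deadline is due, regardless of insertion order, so a single retry thread can service all pending batches without busy-waiting.

```ruby
require 'java'
require 'logstash/util/delay_event'

queue = java.util.concurrent.DelayQueue.new
# event style: [documents, http_headers, event_num, req_count]
queue.add(DelayEvent.new(3, ["batch-a", {}, 10, 2]))  # due in ~3s
queue.add(DelayEvent.new(1, ["batch-b", {}, 20, 2]))  # due in ~1s

2.times do
  popped = queue.take  # blocks until the earliest deadline passes
  documents, _http_headers, event_num, req_count = popped.event
  puts "retrying #{documents}: #{event_num} events, attempt #{req_count}"
end
# prints batch-b first (after ~1s), then batch-a (after ~3s)
```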