[enhancement](plugin) logstash: add retry queue without blocking tasks #44999
base: master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
LGTM
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
end

def run
  @retry_queue << @event
- Is there any limit on the length of retry_queue?
- Why not just add the event to retry_queue directly, instead of using a task and a timer?
- No, because there should not be too many failures.
- Because we need a timer to make the event wait for a certain amount of time before being re-queued and retried, rather than retrying immediately.
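The delayed-retry idea above can be sketched in plain Ruby (the plugin itself runs under JRuby and uses `java.util.Timer`; the `DelayedRetry` class and its method names here are hypothetical stand-ins, with a background thread playing the timer's role):

```ruby
require 'thread'

# Minimal sketch: a background thread sleeps for the backoff delay and then
# re-enqueues the event, so the calling pipeline thread is never blocked.
class DelayedRetry
  def initialize
    @retry_queue = Queue.new
  end

  attr_reader :retry_queue

  # Schedule `event` to be pushed onto the retry queue after `delay` seconds.
  def schedule(event, delay)
    Thread.new do
      sleep(delay)
      @retry_queue << event
    end
  end
end

dr = DelayedRetry.new
dr.schedule({ doc: "x" }, 0.05)
event = dr.retry_queue.pop # blocks until the timer thread fires
```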
run buildall
run buildall
run buildall
run buildall
PR approved by at least one committer and no changes requested.
# Run named Timer as daemon thread
@timer = java.util.Timer.new("Doris Output #{self.params['id']}", true)
# The queue in Timer is unbounded and uncontrollable, so use a new queue to control the amount
@count_block_queue = java.util.concurrent.ArrayBlockingQueue.new(128)
I think you should use LinkedBlockingQueue here, since inserts and deletes happen very often.
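For readers less familiar with the bounded-queue pattern being discussed: plain Ruby's `SizedQueue` is a rough stand-in for the Java `ArrayBlockingQueue.new(128)` above. It blocks the producer once capacity is reached, which is exactly how the plugin caps the number of pending retry tasks sitting in the timer (a sketch, not the plugin's actual code):

```ruby
require 'thread'

# SizedQueue blocks the pushing thread when the capacity is reached,
# capping how many retry tasks can be outstanding at once.
capacity = 2
pending = SizedQueue.new(capacity)

pending << :task1
pending << :task2
# A third push would block until a consumer pops; check non-destructively:
full = (pending.size == pending.max)
```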
documents << event_body(event) << "\n"
event_num += 1
end
documents = events.map { |event| event_body(event) }.join("\n")
Is this just a refactor?
It reduces the number of string allocations.
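The two forms can be compared side by side. This is a self-contained sketch (the `event_body` helper here is a hypothetical stand-in for the plugin's method); `map` + `join` allocates the final string once instead of repeatedly growing `documents` in the loop. Note one subtle behavioral difference: the `join` version has no trailing newline after the last document.

```ruby
events = [{ "a" => 1 }, { "a" => 2 }]

# Hypothetical stand-in for the plugin's event_body(event).
def event_body(event)
  event.to_s
end

# Loop version (original): grows the buffer append by append.
documents_loop = +""
events.each { |event| documents_loop << event_body(event) << "\n" }

# Refactored version: builds each body, then joins once.
documents_join = events.map { |event| event_body(event) }.join("\n")
```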
begin
  response_json = JSON.parse(response.body)
rescue => _
  @logger.warn("doris stream load response: #{response} is not a valid JSON")
It should return or take some other action instead of just continuing.
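One possible shape for the reviewer's suggestion (a hedged sketch, not the plugin's actual fix; the helper name is hypothetical): isolate the parse and return `nil` on bad JSON, so the caller can treat an unparsable response as a retriable failure instead of falling through with `response_json` unset.

```ruby
require 'json'

# Returns the parsed response hash, or nil when the body is not valid JSON.
# The caller should schedule a retry on nil rather than proceed.
def parse_stream_load_response(body)
  JSON.parse(body)
rescue JSON::ParserError
  nil
end
```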
@logger.warn("FAILED doris stream load response:\n#{response}")
# if there are data quality issues, we do not retry
if (status == 'Fail' && response_json['Message'].start_with?("[DATA_QUALITY_ERROR]")) || (@max_retries >= 0 && req_count > @max_retries)
It's not necessary to do a special check for DATA_QUALITY_ERROR, since it should be handled by the max_filter_ratio config.
req_count += 1
@logger.warn("Will do retry #{req_count} after #{sleep_for} secs.")
timer_task = RetryTimerTask.new(@retry_queue, @count_block_queue, [documents, http_headers, event_num, req_count])
@count_block_queue.put(0)
Why put 0 into count_block_queue?
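The pattern being questioned is a bounded queue used as a counting semaphore: the value pushed (0 here) is a throwaway token, and only the queue's occupancy matters. A plain-Ruby sketch of the idea (class and method names are hypothetical; the plugin uses a Java `ArrayBlockingQueue`):

```ruby
require 'thread'

# Each scheduled retry acquires a slot (pushing a dummy token) and releases
# it when the retry fires, so at most `capacity` retries are in flight.
class RetrySlots
  def initialize(capacity)
    @slots = SizedQueue.new(capacity)
  end

  def acquire; @slots << 0; end  # blocks when all slots are taken
  def release; @slots.pop; end
  def in_flight; @slots.size; end
end

slots = RetrySlots.new(2)
slots.acquire
slots.acquire
```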
run buildall
req_count += 1
@logger.warn("Will do the #{req_count-1}th retry after #{sleep_for} secs.")
delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count])
add_event_to_retry_queue(delay_event, req_count <= 1)
Why block only when req_count <= 1?
@@ -72,6 +73,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base

config :log_progress_interval, :validate => :number, :default => 10

config :retry_queue_size, :validate => :number, :default => 128
128 may be too large if the request batch size is large, e.g. 100 MB. So you should limit on queued bytes instead of item count.
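A byte-bounded queue as the reviewer suggests could look roughly like this in plain Ruby (a hedged sketch under the assumption that queued items are document strings; the class name is hypothetical). Producers block while admitting a new batch would exceed the byte budget, except that a single oversized batch is still admitted when the queue is empty, so it can never deadlock:

```ruby
require 'thread'

# Bounds the retry queue by total queued bytes rather than item count.
class ByteBoundedQueue
  def initialize(max_bytes)
    @max_bytes = max_bytes
    @bytes = 0
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  def push(doc)
    @mutex.synchronize do
      # Wait while over budget; an oversized doc is admitted if queue is empty.
      @not_full.wait(@mutex) while @bytes + doc.bytesize > @max_bytes && !@items.empty?
      @items << doc
      @bytes += doc.bytesize
      @not_empty.signal
    end
  end

  def pop
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty?
      doc = @items.shift
      @bytes -= doc.bytesize
      @not_full.signal
      doc
    end
  end

  def queued_bytes
    @mutex.synchronize { @bytes }
  end
end
```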
sleep(sleep_rand)
@logger.warn("FAILED doris stream load response:\n#{response}")
# if there are data quality issues, we do not retry
if (status == 'Fail' && response_json['Message'].start_with?("[DATA_QUALITY_ERROR]")) || (@max_retries >= 0 && req_count-1 > @max_retries)
DATA_QUALITY_ERROR should be handled by setting max_filter_ratio instead of being hard-coded here.
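For context on the suggestion: Doris stream load accepts a `max_filter_ratio` header, which tolerates rows failing quality checks up to the given ratio instead of failing the whole load, removing the need for a special DATA_QUALITY_ERROR branch. A sketch of how the header might be set (the ratio value is illustrative, and the exact header hash the plugin builds may differ):

```ruby
# Stream load headers for one request; max_filter_ratio lets Doris filter
# up to this fraction of bad rows instead of failing the load outright.
http_headers = {
  "format"           => "json",
  "max_filter_ratio" => "0.1"  # tolerate up to 10% bad rows (illustrative value)
}
```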
begin
  response_json = JSON.parse(response.body)
rescue => _
  @logger.warn("doris stream load response is not a valid JSON:\n#{response}")
Do more exception handling instead of just logging a warning.
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)