Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Async #288

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/semantic_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require "semantic_logger/appender"
require "semantic_logger/appenders"
require "semantic_logger/processor"
require "semantic_logger/queue_processor"
require "semantic_logger/sync_processor"
require "semantic_logger/logger"
require "semantic_logger/debug_as_trace_logger"
Expand Down
49 changes: 23 additions & 26 deletions lib/semantic_logger/appender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,29 @@ module Appender
# @formatter:on

# Returns [SemanticLogger::Subscriber] appender for the supplied options
def self.factory(async: false, batch: nil,
max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30,
batch_size: 300, batch_seconds: 5,
def self.factory(async: false,
max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30, async_max_retries: 100,
batch: nil, batch_size: 300, batch_seconds: 5,
**args,
&block)
appender = build(**args, &block)

# If appender implements #batch, then it should use the batch proxy by default.
# If appender implements #batch, then it should use the async batch processor by default.
batch = true if batch.nil? && appender.respond_to?(:batch)

if batch == true
Appender::AsyncBatch.new(
appender: appender,
max_queue_size: max_queue_size,
lag_threshold_s: lag_threshold_s,
batch_size: batch_size,
batch_seconds: batch_seconds
)
elsif async == true
Appender::Async.new(
appender: appender,
max_queue_size: max_queue_size,
lag_check_interval: lag_check_interval,
lag_threshold_s: lag_threshold_s
)
else
appender
end
return appender unless batch == true || async == true

# Run appender asynchronously in a separate thread.
Appender::Async.new(
appender: appender,
max_queue_size: max_queue_size,
lag_check_interval: lag_check_interval,
lag_threshold_s: lag_threshold_s,
async_max_retries: async_max_retries,
batch: batch,
batch_size: batch_size,
batch_seconds: batch_seconds
)
end

# Returns [Subscriber] instance from the supplied options.
Expand All @@ -67,18 +62,20 @@ def self.build(io: nil, file_name: nil, appender: nil, metric: nil, logger: nil,
elsif logger
SemanticLogger::Appender::Wrapper.new(logger: logger, **args, &block)
elsif appender
if appender.is_a?(Symbol)
case appender
when Symbol
SemanticLogger::Utils.constantize_symbol(appender).new(**args)
elsif appender.is_a?(Subscriber)
when Subscriber
appender
else
raise(ArgumentError,
"Parameter :appender must be either a Symbol or an object derived from SemanticLogger::Subscriber, not: #{appender.inspect}")
end
elsif metric
if metric.is_a?(Symbol)
case metric
when Symbol
SemanticLogger::Utils.constantize_symbol(metric, "SemanticLogger::Metric").new(**args)
elsif metric.is_a?(Subscriber)
when Subscriber
metric
else
raise(ArgumentError,
Expand Down
210 changes: 56 additions & 154 deletions lib/semantic_logger/appender/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,203 +6,105 @@ module Appender
class Async
extend Forwardable

attr_accessor :lag_check_interval, :lag_threshold_s
attr_reader :queue, :appender, :max_queue_size
attr_reader :appender

# Forward methods that can be called directly
def_delegator :@appender, :name
def_delegator :@appender, :should_log?
def_delegator :@appender, :filter
def_delegator :@appender, :host
def_delegator :@appender, :application
def_delegator :@appender, :environment
def_delegator :@appender, :level
def_delegator :@appender, :level=
def_delegator :@appender, :logger
def_delegator :@appender, :logger=
def_delegators :@appender, :name, :should_log?, :filter,
:host, :application, :environment,
:level, :level=, :logger, :logger=
def_delegators :@processor, :active?, :capped?, :max_queue_size,
:lag_check_interval, :lag_threshold_s, :async_max_retries,
:batch?, :batch_size, :batch_seconds, :queue

# Appender proxy to allow an existing appender to run asynchronously in a separate thread.
#
# Parameters:
# appender: [SemanticLogger::Subscriber]
# The appender to log to in a separate thread.
#
# max_queue_size: [Integer]
# The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
# -1: The queue size is uncapped and will never block no matter how long the queue is.
# Default: 10,000
#
# lag_threshold_s [Float]
# lag_threshold_s: [Float]
# Log a warning when a log message has been on the queue for longer than this period in seconds.
# Default: 30
#
# lag_check_interval: [Integer]
# Number of messages to process before checking for slow logging.
# Default: 1,000
def initialize(appender:,
max_queue_size: 10_000,
lag_check_interval: 1_000,
lag_threshold_s: 30)

@appender = appender
@lag_check_interval = lag_check_interval
@lag_threshold_s = lag_threshold_s
@thread = nil
@max_queue_size = max_queue_size
create_queue
thread
# Note: Not applicable when batch: `true`.
#
# async_max_retries: [Integer]
# Maximum number of consecutive failed attempts when trying to log to an appender.
# Default: 100
#
# batch: [true|false]
# Process log messages in batches for appenders that support batches.
# Note: Appenders must implement the `batch` method.
#
# batch_size: [Integer]
# Maximum number of messages to batch up before sending.
# Default: 300
# Note: Only applicable when batch: `true`
#
# batch_seconds: [Integer]
# Maximum number of seconds between sending batches.
# Default: 5
# Note: Only applicable when batch: `true`
def initialize(appender:, **args)
@appender = appender
@args = args
create_processor
end

# Re-open appender after a fork
def reopen
# Workaround CRuby crash on fork by recreating queue on reopen
# https://github.com/reidmorrison/semantic_logger/issues/103
@queue&.close
create_queue
# When #reopen is called without forking, cleanup running thread.
close if !closed? && active?

appender.reopen if appender.respond_to?(:reopen)

@thread&.kill if @thread&.alive?
@thread = Thread.new { process }
end

# Returns [true|false] if the queue has a capped size.
def capped?
@capped
end

# Returns [Thread] the worker thread.
#
# Starts the worker thread if not running.
def thread
return @thread if @thread&.alive?

@thread = Thread.new { process }
end

# Returns true if the worker thread is active
def active?
@thread&.alive?
create_processor
true
end

# Add log message for processing.
def log(log)
queue << log
return false if closed?

processor.log(log)
end

    # Flush all queued log entries to disk, database, etc.
# All queued log messages are written and then each appender is flushed in turn.
def flush
submit_request(:flush)
return false if closed?

processor.flush
end

# Close all appenders and flush any outstanding messages.
# Ignores multiple close requests.
# Only returns once the messages have been flushed to disk.
def close
# TODO: Prevent new close requests once this appender has been closed.
submit_request(:close)
end
return false if closed?

private

def create_queue
if max_queue_size == -1
@queue = Queue.new
@capped = false
else
@queue = SizedQueue.new(max_queue_size)
@capped = true
end
@closed = true
processor.close
end

# Separate thread for batching up log messages before writing.
def process
# This thread is designed to never go down unless the main thread terminates
# or the appender is closed.
Thread.current.name = logger.name
logger.trace "Async: Appender thread active"
begin
process_messages
rescue StandardError => e
# This block may be called after the file handles have been released by Ruby
begin
logger.error("Async: Restarting due to exception", e)
rescue StandardError
nil
end
retry
rescue Exception => e
# This block may be called after the file handles have been released by Ruby
begin
logger.error("Async: Stopping due to fatal exception", e)
rescue StandardError
nil
end
ensure
@thread = nil
# This block may be called after the file handles have been released by Ruby
begin
logger.trace("Async: Thread has stopped")
rescue StandardError
nil
end
end
def closed?
@closed
end

def process_messages
count = 0
while (message = queue.pop)
if message.is_a?(Log)
appender.log(message)
count += 1
# Check every few log messages whether this appender thread is falling behind
if count > lag_check_interval
check_lag(message)
count = 0
end
else
break unless process_message(message)
end
end
logger.trace "Async: Queue Closed"
end

# Returns false when message processing should be stopped
def process_message(message)
case message[:command]
when :flush
appender.flush
message[:reply_queue] << true if message[:reply_queue]
when :close
appender.close
message[:reply_queue] << true if message[:reply_queue]
return false
else
logger.warn "Async: Appender thread: Ignoring unknown command: #{message[:command]}"
end
true
end

def check_lag(log)
diff = Time.now - log.time
return unless diff > lag_threshold_s
private

logger.warn "Async: Appender thread has fallen behind by #{diff} seconds with #{queue.size} messages queued up. Consider reducing the log level or changing the appenders"
end
attr_reader :args, :processor

# Submit command and wait for reply
def submit_request(command)
return false unless active?

queue_size = queue.size
msg = "Async: Queued log messages: #{queue_size}, running command: #{command}"
if queue_size > 1_000
logger.warn msg
elsif queue_size > 100
logger.info msg
elsif queue_size.positive?
logger.trace msg
end

reply_queue = Queue.new
queue << {command: command, reply_queue: reply_queue}
reply_queue.pop
def create_processor
@processor = QueueProcessor.start(appender: appender, **args)
@closed = false
end
end
end
Expand Down
Loading
Loading