diff --git a/lib/semantic_logger.rb b/lib/semantic_logger.rb index 0728e6f3..d8cc2bb9 100644 --- a/lib/semantic_logger.rb +++ b/lib/semantic_logger.rb @@ -13,6 +13,7 @@ require "semantic_logger/appender" require "semantic_logger/appenders" require "semantic_logger/processor" +require "semantic_logger/queue_processor" require "semantic_logger/sync_processor" require "semantic_logger/logger" require "semantic_logger/debug_as_trace_logger" diff --git a/lib/semantic_logger/appender.rb b/lib/semantic_logger/appender.rb index 6b950027..3a8dc1b7 100644 --- a/lib/semantic_logger/appender.rb +++ b/lib/semantic_logger/appender.rb @@ -28,34 +28,29 @@ module Appender # @formatter:on # Returns [SemanticLogger::Subscriber] appender for the supplied options - def self.factory(async: false, batch: nil, - max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30, - batch_size: 300, batch_seconds: 5, + def self.factory(async: false, + max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30, async_max_retries: 100, + batch: nil, batch_size: 300, batch_seconds: 5, **args, &block) appender = build(**args, &block) - # If appender implements #batch, then it should use the batch proxy by default. + # If appender implements #batch, then it should use the async batch processor by default. batch = true if batch.nil? && appender.respond_to?(:batch) - if batch == true - Appender::AsyncBatch.new( - appender: appender, - max_queue_size: max_queue_size, - lag_threshold_s: lag_threshold_s, - batch_size: batch_size, - batch_seconds: batch_seconds - ) - elsif async == true - Appender::Async.new( - appender: appender, - max_queue_size: max_queue_size, - lag_check_interval: lag_check_interval, - lag_threshold_s: lag_threshold_s - ) - else - appender - end + return appender unless batch == true || async == true + + # Run appender asynchronously in a separate thread. + Appender::Async.new( + appender: appender, + max_queue_size: max_queue_size, + lag_check_interval: lag_check_interval, + lag_threshold_s: lag_threshold_s, + async_max_retries: async_max_retries, + batch: batch, + batch_size: batch_size, + batch_seconds: batch_seconds + ) end # Returns [Subscriber] instance from the supplied options. @@ -67,18 +62,20 @@ def self.build(io: nil, file_name: nil, appender: nil, metric: nil, logger: nil, elsif logger SemanticLogger::Appender::Wrapper.new(logger: logger, **args, &block) elsif appender - if appender.is_a?(Symbol) + case appender + when Symbol SemanticLogger::Utils.constantize_symbol(appender).new(**args) - elsif appender.is_a?(Subscriber) + when Subscriber appender else raise(ArgumentError, "Parameter :appender must be either a Symbol or an object derived from SemanticLogger::Subscriber, not: #{appender.inspect}") end elsif metric - if metric.is_a?(Symbol) + case metric + when Symbol SemanticLogger::Utils.constantize_symbol(metric, "SemanticLogger::Metric").new(**args) - elsif metric.is_a?(Subscriber) + when Subscriber metric else raise(ArgumentError, diff --git a/lib/semantic_logger/appender/async.rb b/lib/semantic_logger/appender/async.rb index b5ca7748..3eb3ad67 100644 --- a/lib/semantic_logger/appender/async.rb +++ b/lib/semantic_logger/appender/async.rb @@ -6,203 +6,105 @@ module Appender class Async extend Forwardable - attr_accessor :lag_check_interval, :lag_threshold_s - attr_reader :queue, :appender, :max_queue_size + attr_reader :appender # Forward methods that can be called directly - def_delegator :@appender, :name - def_delegator :@appender, :should_log? - def_delegator :@appender, :filter - def_delegator :@appender, :host - def_delegator :@appender, :application - def_delegator :@appender, :environment - def_delegator :@appender, :level - def_delegator :@appender, :level= - def_delegator :@appender, :logger - def_delegator :@appender, :logger= + def_delegators :@appender, :name, :should_log?, :filter, + :host, :application, :environment, + :level, :level=, :logger, :logger= + def_delegators :@processor, :active?, :capped?, :max_queue_size, + :lag_check_interval, :lag_threshold_s, :async_max_retries, + :batch?, :batch_size, :batch_seconds, :queue # Appender proxy to allow an existing appender to run asynchronously in a separate thread. # # Parameters: + # appender: [SemanticLogger::Subscriber] + # The appender to log to in a separate thread. + # # max_queue_size: [Integer] # The maximum number of log messages to hold on the queue before blocking attempts to add to the queue. # -1: The queue size is uncapped and will never block no matter how long the queue is. # Default: 10,000 # - # lag_threshold_s [Float] + # lag_threshold_s: [Float] # Log a warning when a log message has been on the queue for longer than this period in seconds. # Default: 30 # # lag_check_interval: [Integer] # Number of messages to process before checking for slow logging. # Default: 1,000 - def initialize(appender:, - max_queue_size: 10_000, - lag_check_interval: 1_000, - lag_threshold_s: 30) - - @appender = appender - @lag_check_interval = lag_check_interval - @lag_threshold_s = lag_threshold_s - @thread = nil - @max_queue_size = max_queue_size - create_queue - thread + # Note: Not applicable when batch: `true`. + # + # async_max_retries: [Integer] + # Maximum number of consecutive failed attempts when trying to log to an appender. + # Default: 100 + # + # batch: [true|false] + # Process log messages in batches for appenders that support batches. + # Note: Appenders must implement the `batch` method. + # + # batch_size: [Integer] + # Maximum number of messages to batch up before sending. + # Default: 300 + # Note: Only applicable when batch: `true` + # + # batch_seconds: [Integer] + # Maximum number of seconds between sending batches. + # Default: 5 + # Note: Only applicable when batch: `true` + def initialize(appender:, **args) + @appender = appender + @args = args + create_processor end # Re-open appender after a fork def reopen - # Workaround CRuby crash on fork by recreating queue on reopen - # https://github.com/reidmorrison/semantic_logger/issues/103 - @queue&.close - create_queue + # When #reopen is called without forking, cleanup running thread. + close if !closed? && active? appender.reopen if appender.respond_to?(:reopen) - - @thread&.kill if @thread&.alive? - @thread = Thread.new { process } - end - - # Returns [true|false] if the queue has a capped size. - def capped? - @capped - end - - # Returns [Thread] the worker thread. - # - # Starts the worker thread if not running. - def thread - return @thread if @thread&.alive? - - @thread = Thread.new { process } - end - - # Returns true if the worker thread is active - def active? - @thread&.alive? + create_processor + true end # Add log message for processing. def log(log) - queue << log + return false if closed? + + processor.log(log) end # Flush all queued log entries disk, database, etc. # All queued log messages are written and then each appender is flushed in turn. def flush - submit_request(:flush) + return false if closed? + + processor.flush end # Close all appenders and flush any outstanding messages. + # Ignores multiple close requests. + # Only returns once the messages have been flushed to disk. def close - # TODO: Prevent new close requests once this appender has been closed. - submit_request(:close) - end + return false if closed? - private - - def create_queue - if max_queue_size == -1 - @queue = Queue.new - @capped = false - else - @queue = SizedQueue.new(max_queue_size) - @capped = true - end + @closed = true + processor.close end - # Separate thread for batching up log messages before writing. - def process - # This thread is designed to never go down unless the main thread terminates - # or the appender is closed. - Thread.current.name = logger.name - logger.trace "Async: Appender thread active" - begin - process_messages - rescue StandardError => e - # This block may be called after the file handles have been released by Ruby - begin - logger.error("Async: Restarting due to exception", e) - rescue StandardError - nil - end - retry - rescue Exception => e - # This block may be called after the file handles have been released by Ruby - begin - logger.error("Async: Stopping due to fatal exception", e) - rescue StandardError - nil - end - ensure - @thread = nil - # This block may be called after the file handles have been released by Ruby - begin - logger.trace("Async: Thread has stopped") - rescue StandardError - nil - end - end + def closed? + @closed end - def process_messages - count = 0 - while (message = queue.pop) - if message.is_a?(Log) - appender.log(message) - count += 1 - # Check every few log messages whether this appender thread is falling behind - if count > lag_check_interval - check_lag(message) - count = 0 - end - else - break unless process_message(message) - end - end - logger.trace "Async: Queue Closed" - end - - # Returns false when message processing should be stopped - def process_message(message) - case message[:command] - when :flush - appender.flush - message[:reply_queue] << true if message[:reply_queue] - when :close - appender.close - message[:reply_queue] << true if message[:reply_queue] - return false - else - logger.warn "Async: Appender thread: Ignoring unknown command: #{message[:command]}" - end - true - end - - def check_lag(log) - diff = Time.now - log.time - return unless diff > lag_threshold_s + private - logger.warn "Async: Appender thread has fallen behind by #{diff} seconds with #{queue.size} messages queued up. Consider reducing the log level or changing the appenders" - end + attr_reader :args, :processor - # Submit command and wait for reply - def submit_request(command) - return false unless active? - - queue_size = queue.size - msg = "Async: Queued log messages: #{queue_size}, running command: #{command}" - if queue_size > 1_000 - logger.warn msg - elsif queue_size > 100 - logger.info msg - elsif queue_size.positive? - logger.trace msg - end - - reply_queue = Queue.new - queue << {command: command, reply_queue: reply_queue} - reply_queue.pop + def create_processor + @processor = QueueProcessor.start(appender: appender, **args) + @closed = false end end end diff --git a/lib/semantic_logger/appender/async_batch.rb b/lib/semantic_logger/appender/async_batch.rb deleted file mode 100644 index cfe5e45f..00000000 --- a/lib/semantic_logger/appender/async_batch.rb +++ /dev/null @@ -1,94 +0,0 @@ -module SemanticLogger - module Appender - # Log asynchronously in batches using a separate thread. - # - # Log messages are grouped up and only logged when: - # * The number of queued messages is exceeded. - # * Or, the appropriate amount of time has passed since the last batch was sent. - class AsyncBatch < Async - attr_accessor :batch_size, :batch_seconds - attr_reader :signal - - # Batching Appender proxy for appenders that support batches. - # - # Parameters: - # batch_size: [Integer] - # Maximum number of messages to batch up before sending. - # Default: 300 - # - # batch_seconds: [Integer] - # Maximum number of seconds between sending batches. - # Default: 5 - # - # See SemanticLogger::Appender::Async for other paramaters - # - # Note: - # * `lag_check_interval` is not applicable to batches, since the first message of every batch - # is the oldest and is always checked to see if the lag interval has been exceeded. - def initialize(appender:, - max_queue_size: 10_000, - lag_threshold_s: 30, - batch_size: 300, - batch_seconds: 5) - - @batch_size = batch_size - @batch_seconds = batch_seconds - @signal = Concurrent::Event.new - super( - appender: appender, - max_queue_size: max_queue_size, - lag_threshold_s: lag_threshold_s - ) - - return if appender.respond_to?(:batch) - - raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch") - end - - # Add log message for processing. - def log(log) - result = super(log) - # Wake up the processing thread since the number of queued messages has been exceeded. - signal.set if queue.size >= batch_size - result - end - - private - - # Separate thread for batching up log messages before writing. - def process_messages - loop do - # Wait for batch interval or number of messages to be exceeded. - signal.wait(batch_seconds) - - logs = [] - messages = [] - first = true - message_count = queue.length - message_count.times do - # Queue#pop(true) raises an exception when there are no more messages, which is considered expensive. - message = queue.pop - if message.is_a?(Log) - logs << message - if first - check_lag(message) - first = false - end - else - messages << message - end - end - appender.batch(logs) if logs.size.positive? - messages.each { |message| process_message(message) } - signal.reset unless queue.size >= batch_size - end - end - - def submit_request(command) - # Wake up the processing thread to process this command immediately. - signal.set - super - end - end - end -end diff --git a/lib/semantic_logger/processor.rb b/lib/semantic_logger/processor.rb index 5dd45fa5..f62bcf4a 100644 --- a/lib/semantic_logger/processor.rb +++ b/lib/semantic_logger/processor.rb @@ -27,13 +27,5 @@ def initialize(max_queue_size: -1) @appenders = Appenders.new(self.class.logger.dup) super(appender: @appenders, max_queue_size: max_queue_size) end - - # Start the appender thread - def start - return false if active? - - thread - true - end end end diff --git a/lib/semantic_logger/queue_processor.rb b/lib/semantic_logger/queue_processor.rb new file mode 100644 index 00000000..4a9cfc15 --- /dev/null +++ b/lib/semantic_logger/queue_processor.rb @@ -0,0 +1,201 @@ +require "forwardable" + +module SemanticLogger + # Internal class to process log messages from a queue. + class QueueProcessor + attr_reader :appender, :max_queue_size, :logger, + :lag_check_interval, :lag_threshold_s, :async_max_retries, :batch_size, :batch_seconds, + :queue, :thread, :retry_count, :signal + + def self.start(**args) + processor = new(**args) + processor.send(:create_thread) + processor + end + + def initialize(appender:, max_queue_size: 10_000, logger: nil, + lag_check_interval: 1_000, lag_threshold_s: 30, async_max_retries: 100, + batch: false, batch_size: 300, batch_seconds: 5) + @appender = appender + @capped = max_queue_size != -1 + @queue = @capped ? SizedQueue.new(max_queue_size) : Queue.new + @lag_check_interval = lag_check_interval + @lag_threshold_s = lag_threshold_s + @async_max_retries = async_max_retries + @logger = logger || appender.logger + @retry_count = 0 + @batch = batch + @batch_size = batch_size + @batch_seconds = batch_seconds + @signal = Concurrent::Event.new + + return unless batch && !appender.respond_to?(:batch) + + raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch") + end + + def log(log) + # Freeze the log entry to prevent subsequent changes since it is being shared across threads. + queue << log.freeze + + # For batches wake up the processing thread once the number of queued messages has been exceeded. + signal.set if batch? && (queue.size >= batch_size) + + true + end + + def flush + send_command(:flush) + end + + def close + send_command(:close) + end + + def active? + thread&.alive? + end + + # Returns [true|false] whether the messages queue is capped. + def capped? + @capped + end + + # Returns [true|false] whether the messages are being processed in batches. + def batch? + @batch + end + + private + + attr_writer :thread, :retry_count + + # Submit command and wait for reply + def send_command(command) + return false unless active? + + queue_size = queue.size + msg = "QueueProcessor: Queued log messages: #{queue_size}, running command: #{command}" + if queue_size > 1_000 + logger.warn msg + elsif queue_size > 100 + logger.info msg + elsif queue_size.positive? + logger.trace msg + end + + # Wake up the processing thread to process this command immediately. + signal.set if batch? + + reply_queue = Queue.new + queue << {command: command, reply_queue: reply_queue} + reply_queue.pop + end + + def create_thread + self.thread = Thread.new do + Thread.current.name = logger.name + process + end + end + + # Process messages from the queue. + def process + # This thread is designed to never go down unless the main thread terminates + # or the appender is closed. + logger.trace "QueueProcessor: Processing messages" + begin + batch? ? process_messages_in_batches : process_messages + rescue StandardError => e + if retry_count < async_max_retries + self.retry_count += 1 + safe_log(:warn, "QueueProcessor: Sleeping #{retry_count} second(s). Retry: #{retry_count}", e) + sleep(retry_count) + retry + else + safe_log(:error, "QueueProcessor: Stopping, exhausted #{retry_count} retries", e) + end + rescue Exception => e + safe_log(:error, "QueueProcessor: Stopping due to a fatal exception", e) + end + safe_log(:trace, "QueueProcessor: Stopped processing messages") + end + + def process_messages + count = 0 + while (message = queue.pop) + if message.is_a?(Log) + appender.log(message) + self.retry_count = 0 + count += 1 + # Check every few log messages whether this appender thread is falling behind + if count > lag_check_interval + check_lag(message) + count = 0 + end + else + return unless process_command(message) + end + end + end + + def process_messages_in_batches + loop do + # Wait for batch interval or number of messages to be exceeded. + signal.wait(batch_seconds) + + logs = [] + commands = [] + message_count = queue.length + message_count.times do + # Queue#pop(true) raises an exception when there are no more messages, which is considered expensive. + message = queue.pop + if message.is_a?(Log) + logs << message + if logs.size >= batch_size + appender.batch(logs) + logs = [] + check_lag(message) + end + else + commands << message + end + end + appender.batch(logs) if logs.size.positive? + self.retry_count = 0 + commands.each { |message| return unless process_command(message) } + signal.reset unless queue.size >= batch_size + end + end + + # Returns false when message processing should be stopped + def process_command(message) + case message[:command] + when :flush + appender.flush + message[:reply_queue] << true if message[:reply_queue] + when :close + appender.close + message[:reply_queue] << true if message[:reply_queue] + return false + else + logger.warn "QueueProcessor: Ignoring unknown command: #{message[:command]}" + end + true + end + + def check_lag(log) + diff = Time.now - log.time + return unless diff > lag_threshold_s + + logger.warn "QueueProcessor: Fallen behind by #{diff} seconds with #{queue.size} messages queued up. Consider reducing the log level or changing the appenders" + end + + def safe_log(level, message, exception = nil) + # In case the file handles has already been released by Ruby + logger.public_send(level, message, exception) + rescue StandardError + nil + end + end +end diff --git a/lib/semantic_logger/semantic_logger.rb b/lib/semantic_logger/semantic_logger.rb index 5b9b1183..a36d35b0 100644 --- a/lib/semantic_logger/semantic_logger.rb +++ b/lib/semantic_logger/semantic_logger.rb @@ -163,10 +163,7 @@ def self.environment=(environment) # logger.info "Hello World" # logger.debug("Login time", user: 'Joe', duration: 100, ip_address: '127.0.0.1') def self.add_appender(**args, &block) - appender = appenders.add(**args, &block) - # Start appender thread if it is not already running - Logger.processor.start - appender + appenders.add(**args, &block) end # Remove an existing appender diff --git a/lib/semantic_logger/sync_processor.rb b/lib/semantic_logger/sync_processor.rb index aebbd4d4..20a461e9 100644 --- a/lib/semantic_logger/sync_processor.rb +++ b/lib/semantic_logger/sync_processor.rb @@ -50,9 +50,5 @@ def initialize(appenders = nil) @monitor = Monitor.new @appenders = appenders || Appenders.new(self.class.logger.dup) end - - def start - # NOP - end end end diff --git a/lib/semantic_logger/test/capture_log_events.rb b/lib/semantic_logger/test/capture_log_events.rb index 2c5a7392..c8ade1af 100644 --- a/lib/semantic_logger/test/capture_log_events.rb +++ b/lib/semantic_logger/test/capture_log_events.rb @@ -25,7 +25,7 @@ class CaptureLogEvents < SemanticLogger::Subscriber # By default collect all log levels, and collect metric only log events. def initialize(level: :trace, metrics: true) super(level: level, metrics: true) - @events = [] + @events = [] end def log(log) @@ -33,11 +33,6 @@ def log(log) @events << log end - # Supports batching of log events - def batch(logs) - @events += log - end - def clear @events.clear end diff --git a/test/appender/async_batch_test.rb b/test/appender/async_batch_test.rb deleted file mode 100644 index c3fe9fa5..00000000 --- a/test/appender/async_batch_test.rb +++ /dev/null @@ -1,60 +0,0 @@ -require_relative "../test_helper" - -module Appender - class AsyncBatchTest < Minitest::Test - describe SemanticLogger::Appender::Async do - include InMemoryAppenderHelper - - let :appender do - InMemoryBatchAppender.new - end - - describe "with default batch_size" do - let :added_appender do - SemanticLogger.add_appender(appender: appender, batch: true) - end - - it "uses the batch proxy" do - assert_instance_of SemanticLogger::Appender::AsyncBatch, added_appender - end - - it "logs messages after a flush" do - logger.info("hello world1") - refute appender.message - - logger.info("hello world2") - refute appender.message - - logger.info("hello world3") - refute appender.message - - # Calls flush - assert logs = log_message - assert_equal 3, logs.size, logs - assert_equal "hello world1", logs[0].message - assert_equal "hello world2", logs[1].message - assert_equal "hello world3", logs[2].message - end - end - - # :batch_size, :batch_seconds - describe "with batch_size 1" do - let :added_appender do - SemanticLogger.add_appender(appender: appender, batch: true, batch_size: 1) - end - - it "uses the batch proxy" do - assert_instance_of SemanticLogger::Appender::AsyncBatch, added_appender - end - - it "logs message immediately" do - logger.info("hello world") - - assert logs = log_message - assert_equal 1, logs.size, logs - assert_equal "hello world", logs.first.message - end - end - end - end -end diff --git a/test/appender/async_test.rb b/test/appender/async_test.rb index a5fd6e1b..79882e40 100644 --- a/test/appender/async_test.rb +++ b/test/appender/async_test.rb @@ -3,40 +3,80 @@ module Appender class AsyncTest < Minitest::Test describe SemanticLogger::Appender::Async do - include InMemoryAppenderHelper + let(:log) do + log = SemanticLogger::Log.new("User", :info) + log.message = "hello world" + log.level = :info + log + end - describe "with capped queue" do - let :added_appender do - SemanticLogger.add_appender(appender: appender, async: true) - end + let(:logger) { SemanticLogger::Test::CaptureLogEvents.new } + + let(:appender) do + appender = SemanticLogger::Test::CaptureLogEvents.new + appender.logger = logger + appender + end + + let(:subject) do + SemanticLogger::Appender::Async.new(appender: appender, async_max_retries: 2) + end - it "uses the async proxy" do - assert_instance_of SemanticLogger::Appender::Async, added_appender + describe ".new" do + it "starts a processor" do + assert_equal true, subject.active? end - it "logs message immediately" do - logger.info("hello world") + it "is not closed" do + assert_equal false, subject.closed? + end + end - assert log = log_message - assert_equal "hello world", log.message + describe "#log" do + it "calls log on the processor" do + assert_equal true, subject.log(log) + assert_equal log, subject.queue.pop end + end - it "uses an capped queue" do - assert_instance_of SizedQueue, added_appender.queue + describe "#flush" do + it "calls flush on the processor" do + subject.send(:processor).stub(:flush, true) do + assert_equal true, subject.flush + end + assert_equal false, subject.closed? end end - describe "with uncapped queue" do - let :added_appender do - SemanticLogger.add_appender(appender: appender, async: true, max_queue_size: -1) + describe "#close" do + it "calls close on the processor" do + subject.send(:processor).stub(:close, true) do + assert_equal true, subject.close + end + assert_equal true, subject.closed? end + end - it "uses the async proxy" do - assert_instance_of SemanticLogger::Appender::Async, added_appender + describe "#reopen" do + it "closes before reopening" do + called = false + subject.send(:processor).stub(:close, -> { called = true }) do + assert_equal true, subject.reopen + end + assert called, "Did not call close on processor first" + assert_equal false, subject.closed? + assert_equal true, subject.active? end - it "uses an uncapped queue" do - assert_instance_of Queue, added_appender.queue + it "reopens a closed processor" do + called = nil + subject.close + subject.send(:processor).stub(:close, -> { called = true }) do + assert_equal true, subject.reopen + end + refute called, "Called close on closed processor" + assert_equal false, subject.closed? + assert_equal true, subject.active? end end end diff --git a/test/appenders_test.rb b/test/appenders_test.rb index 6ffdc01b..f0ec7f45 100644 --- a/test/appenders_test.rb +++ b/test/appenders_test.rb @@ -1,6 +1,14 @@ require_relative "test_helper" class AppendersTest < Minitest::Test + class BatchAppender < SemanticLogger::Subscriber + attr_accessor :batches + + def batch(events) + (@batches ||= []) << events + end + end + describe SemanticLogger::Appenders do let(:capture_logger) { SemanticLogger::Test::CaptureLogEvents.new } let(:appenders) { SemanticLogger::Appenders.new(capture_logger) } @@ -91,14 +99,14 @@ class AppendersTest < Minitest::Test end it "adds batch proxy" do - appender = appenders.add(appender: logger, batch: true) - assert_instance_of SemanticLogger::Appender::AsyncBatch, appender + appender = appenders.add(appender: BatchAppender.new, batch: true) + assert_instance_of SemanticLogger::Appender::Async, appender end - # it "adds async proxy" do - # appender = appenders.add(appender: logger, async: true) - # assert_instance_of SemanticLogger::Appender::Async, appender - # end + it "adds async proxy" do + appender = appenders.add(appender: logger, async: true) + assert_instance_of SemanticLogger::Appender::Async, appender + end end end end diff --git a/test/in_memory_appender.rb b/test/in_memory_appender.rb deleted file mode 100644 index 09360826..00000000 --- a/test/in_memory_appender.rb +++ /dev/null @@ -1,8 +0,0 @@ -# Store in memory the last log message received. -class InMemoryAppender < SemanticLogger::Subscriber - attr_accessor :message - - def log(log) - self.message = log - end -end diff --git a/test/in_memory_appender_helper.rb b/test/in_memory_appender_helper.rb deleted file mode 100644 index b29705f4..00000000 --- a/test/in_memory_appender_helper.rb +++ /dev/null @@ -1,43 +0,0 @@ -require "minitest/shared_description" - -InMemoryAppenderHelper = shared_description do - let :log_message do - SemanticLogger.flush - appender.message - end - - let :log_filter do - nil - end - - let :appender do - InMemoryAppender.new - end - - let :thread_name do - Thread.current.name - end - - let :payload do - {session_id: "HSSKLEU@JDK767", tracking_number: 12_345, message: "Message from payload"} - end - - let :logger do - SemanticLogger::Logger.new("TestLogger", nil, log_filter) - end - - let :added_appender do - SemanticLogger.add_appender(appender: appender) - end - - before do - SemanticLogger.default_level = :trace - SemanticLogger.backtrace_level = :trace - SemanticLogger.flush - added_appender - end - - after do - SemanticLogger.appenders.each { |appender| SemanticLogger.remove_appender(appender) } - end -end diff --git a/test/in_memory_batch_appender.rb b/test/in_memory_batch_appender.rb deleted file mode 100644 index e4a31c74..00000000 --- a/test/in_memory_batch_appender.rb +++ /dev/null @@ -1,8 +0,0 @@ -# Store in memory the last log message received. -class InMemoryBatchAppender < SemanticLogger::Subscriber - attr_accessor :message - - def batch(logs) - self.message = logs - end -end diff --git a/test/in_memory_metrics_appender.rb b/test/in_memory_metrics_appender.rb deleted file mode 100644 index 31ec09f6..00000000 --- a/test/in_memory_metrics_appender.rb +++ /dev/null @@ -1,13 +0,0 @@ -# Store in memory the last log message received. -class InMemoryMetricsAppender < SemanticLogger::Subscriber - attr_accessor :message - - def log(log) - self.message = log - end - - # Only forward log entries that contain metrics. - def should_log?(log) - log.metric && meets_log_level?(log) && !filtered?(log) - end -end diff --git a/test/queue_processor_test.rb b/test/queue_processor_test.rb new file mode 100644 index 00000000..38ec203b --- /dev/null +++ b/test/queue_processor_test.rb @@ -0,0 +1,328 @@ +require_relative "test_helper" + +class QueueProcessorTest < Minitest::Test + class SimpleAppender < SemanticLogger::Test::CaptureLogEvents + def close + @closed = true + end + + def closed? + @closed + end + + def flush + @flushed = true + end + + def flushed? + @flushed + end + end + + class BatchAppender < SimpleAppender + attr_accessor :batches + + def batch(events) + (@batches ||= []) << events + end + end + + describe SemanticLogger::QueueProcessor do + let(:logger) { SemanticLogger::Test::CaptureLogEvents.new } + let(:appender) do + appender = SimpleAppender.new + appender.logger = logger + appender + end + let(:lag_check_interval) { 1_000 } + let(:lag_threshold_s) { 30 } + let(:log_time) { Time.now - 1 } + let(:max_queue_size) { 1_000 } + let(:batch) { false } + let(:log) do + log = SemanticLogger::Log.new("User", :info) + log.message = "hello world" + log.time = log_time + log.level = :info + log + end + + let(:subject) do + SemanticLogger::QueueProcessor.new( + appender: appender, + lag_check_interval: lag_check_interval, + lag_threshold_s: lag_threshold_s, + async_max_retries: 2, + batch: batch, + batch_size: 3, + batch_seconds: 1, + max_queue_size: max_queue_size + ) + end + + describe ".new" do + describe "batch" do + let(:batch) { true } + + it "checks for non-batch appender attempts" do + assert_raises(ArgumentError) { subject } + end + end + + it "creates a capped queue" do + assert_instance_of SizedQueue, subject.queue + assert_equal true, subject.capped? + end + + describe "infinite max_queue_size" do + let(:max_queue_size) { -1 } + + it "creates a capped queue" do + assert_instance_of Queue, subject.queue + assert_equal false, subject.capped? + end + end + end + + describe "log" do + it "adds the message to queue" do + subject.log(log) + assert message = subject.queue.pop + assert_equal message, log + end + end + + describe "#process" do + it "processes messages from the queue" do + subject.queue << log + subject.queue << {command: :close} + subject.send(:process) + + assert_equal 1, appender.events.count + end + + it "logs messages" do + subject.queue << log + subject.queue << {command: :close} + subject.send(:process) + + assert messages = logger.events.collect(&:message) + assert_equal 2, messages.count, messages + assert_includes messages[0], "Processing messages" + assert_includes messages[1], "Stopped processing messages" + end + + it "retries on standard error" do + subject.stub(:process_messages, -> { raise StandardError, "Standard" }) do + subject.send(:process) + end + assert messages = logger.events.collect(&:message) + assert_equal 5, messages.count, messages + assert_includes messages[0], "Processing messages" + assert_includes messages[1], "Sleeping 1 second(s). Retry: 1" + assert_includes messages[2], "Sleeping 2 second(s). Retry: 2" + assert_includes messages[3], "Stopping, exhausted 2 retries" + assert_includes messages[4], "Stopped processing messages" + end + + it "exits on exception" do + subject.stub(:process_messages, -> { raise Exception, "Exception" }) do + subject.send(:process) + end + assert messages = logger.events.collect(&:message) + assert_equal 3, messages.count, messages + assert_includes messages[0], "Processing messages" + assert_includes messages[1], "Stopping due to a fatal exception" + assert_includes messages[2], "Stopped processing messages" + end + end + + describe "#process_messages" do + it "processes close command" do + subject.queue << {command: :close} + subject.send(:process_messages) + assert appender.closed? + + assert logger.events.empty? + end + + it "processes messages" do + subject.queue << log + subject.queue << {command: :close} + subject.send(:process_messages) + + assert_equal 1, appender.events.count + assert log = appender.events.first + assert_equal :info, log.level, -> { log.ai } + assert_includes log.message, "hello world" + end + + it "processes multiple messages" do + subject.queue << log + log2 = log.dup + log2.level = :warn + log2.message = "oh no" + subject.queue << log2 + subject.queue << {command: :close} + subject.send(:process_messages) + + assert_equal 2, appender.events.count + assert log = appender.events.last + assert_equal :warn, log.level, -> { log.ai } + assert_includes log.message, "oh no" + end + + it "resets retry count" do + subject.instance_variable_set(:@retry_count, 30) + assert_equal 30, subject.retry_count + subject.queue << log + subject.queue << {command: :close} + subject.send(:process_messages) + assert_equal 0, subject.retry_count + end + end + + describe "#flush" do + it "successful when running" do + subject.send(:create_thread) + assert_equal true, subject.flush + assert appender.flushed? + end + + it "false if not running" do + assert_equal false, subject.flush + refute appender.flushed? + end + end + + describe "#close" do + it "successful when running" do + subject.send(:create_thread) + assert_equal true, subject.close + assert appender.closed? + end + + it "false if not running" do + assert_equal false, subject.close + refute appender.closed? + end + end + + describe "#process_command" do + it "logs invalid command" do + assert_equal true, subject.send(:process_command, command: :blah) + assert log = logger.events.first + assert_equal :warn, log.level + assert_includes log.message, "Ignoring unknown command: blah" + end + end + + describe "#check_lag" do + it "logs warning when lag is exceeded" do + subject.send(:check_lag, log) + assert logger.events.empty? + end + + describe "with very old message" do + let(:log_time) { Time.now - 40 } + + it "logs warning when lag is exceeded" do + subject.send(:check_lag, log) + assert log = logger.events.first + assert_equal :warn, log.level + assert_includes log.message, "Fallen behind by" + end + end + end + + describe "#logger" do + it "uses appenders loggeapr" do + assert_equal logger, subject.logger + end + end + + describe "batch appender" do + let(:batch) { true } + let(:appender) do + appender = BatchAppender.new + appender.logger = logger + appender + end + + describe "#batch?" do + it "is a batch" do + assert_equal true, subject.batch? + end + end + + describe "#log" do + it "adds the message to queue" do + subject.log(log) + assert message = subject.queue.pop + assert_equal message, log + end + + it "raises signal at batch_size" do + subject.log(log) + refute subject.signal.set? + + subject.log(log) + refute subject.signal.set? + + subject.log(log) + assert subject.signal.set? + end + end + + describe "#process_messages_in_batches" do + it "sends a partial batch" do + subject.queue << log + subject.queue << {command: :close} + subject.send(:process_messages_in_batches) + + assert_equal [[log]], appender.batches + end + + it "sends a full batch" do + 3.times { subject.queue << log } + subject.queue << {command: :close} + subject.send(:process_messages_in_batches) + + assert_equal [[log, log, log]], appender.batches + end + + it "sends full and partial batch" do + 4.times { subject.queue << log } + subject.queue << {command: :close} + subject.send(:process_messages_in_batches) + + assert_equal [[log, log, log], [log]], appender.batches + end + + it "sends 2 full batches" do + 6.times { subject.queue << log } + subject.queue << {command: :close} + subject.send(:process_messages_in_batches) + + assert_equal [[log, log, log], [log, log, log]], appender.batches + end + end + + describe "#send_command" do + it "wakes up thread immediately" do + assert_equal true, subject.batch? + thread = Thread.new do + message = subject.queue.pop + message[:reply_queue] << true + end + subject.stub(:active?, true) do + subject.send(:send_command, command: :close) + end + + assert subject.signal.set? + thread.join + end + end + end + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index b63fae91..8f7c7dd0 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -12,10 +12,6 @@ else require "semantic_logger" end -require_relative "in_memory_appender" -require_relative "in_memory_batch_appender" -require_relative "in_memory_metrics_appender" -require_relative "in_memory_appender_helper" require "amazing_print" # Add Semantic Logger helpers for Minitest