Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Treat timeout error before pool shutting down error #359

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/moped/failover.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ module Failover
Errors::ConnectionFailure => Retry,
Errors::CursorNotFound => Ignore,
Errors::OperationFailure => Reconfigure,
Errors::QueryFailure => Reconfigure
Errors::QueryFailure => Reconfigure,
Errors::PoolTimeout => Retry
}.freeze

# Get the appropriate failover handler given the provided exception.
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Failover
module Retry
extend self

# Executes the failover strategy. In the case of retyr, we disconnect and
# Executes the failover strategy. In the case of retry, we disconnect and
# reconnect, then try the operation one more time.
#
# @example Execute the retry strategy.
Expand All @@ -24,11 +24,13 @@ module Retry
#
# @since 2.0.0
def execute(exception, node)
node.disconnect
node.disconnect unless exception.is_a?(Errors::PoolTimeout)
begin
node.connection do |conn|
yield(conn) if block_given?
end
rescue Errors::PoolTimeout => e
raise Errors::ConnectionFailure.new e
rescue Exception => e
node.down!
raise(e)
Expand Down
16 changes: 14 additions & 2 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,14 @@ def connected?
#
# @since 2.0.0
def connection
pool.with do |conn|
yield(conn)
connection_acquired = false
begin
pool.with do |conn|
connection_acquired = true
yield(conn)
end
rescue Timeout::Error => e
raise connection_acquired ? e : Errors::PoolTimeout.new(e)
end
end

Expand Down Expand Up @@ -156,6 +162,12 @@ def down!
Connection::Manager.shutdown(self)
end

def flush_connection_credentials
connection do |conn|
conn.credentials.clear
end
end

# Yields the block if a connection can be established, retrying when a
# connection error is raised.
#
Expand Down
5 changes: 3 additions & 2 deletions lib/moped/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))
authentication_error = e.is_a?(Errors::PotentialReconfiguration) && e.message.match(/not (master|primary|authorized)/i)
raise e if e.is_a?(Errors::PotentialReconfiguration) && !authentication_error

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.nodes.each { |node| node.flush_connection_credentials } if authentication_error
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
Expand Down
43 changes: 43 additions & 0 deletions spec/moped/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,47 @@
end
end
end

describe "#connection" do
let(:node) do
described_class.new("127.0.0.1:27017", pool_size: 1, pool_timeout: 0.1)
end

context "when take a long time to get a connection from pool" do
it "raise a Errors::PoolTimeout error" do
expect {

exception = nil
100.times.map do |i|
Thread.new do
begin
node.connection do |conn|
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
end
rescue => e
exception = e if exception.nil?
end
end
end.each {|t| t.join }
raise exception unless exception.nil?

}.to raise_error(Moped::Errors::PoolTimeout)
end
end

context "when the timeout happens after get a connection from pool" do
it "raise a Timeout::Error" do
expect {
node.connection do |conn|
Timeout::timeout(0.01) do
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
sleep(0.1) # just to simulate a long block which raise a timeout
end
end
}.to raise_error(Timeout::Error)
end
end
end
end
20 changes: 20 additions & 0 deletions spec/moped/session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,24 @@
nodes.last.should be_down
end
end

context "when connections on pool are busy" do
let(:session) do
Moped::Session.new([ "127.0.0.1:27017" ], database: "moped_test", pool_size: 1, pool_timeout: 0.2, max_retries: 30, retry_interval: 1)
end

it "should retry the operation" do
session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})

results = []

300.times.map do |i|
Thread.new do
results.push session[:test].find({ name: "test_counter" }).first["cnt"]
end
end.each {|t| t.join }

expect(results.count).to eql(300)
end
end
end