From 11751da90c3223534722b89dfadb321ec649ad4a Mon Sep 17 00:00:00 2001
From: Wandenberg
Date: Sun, 22 Mar 2015 18:09:32 -0300
Subject: [PATCH 1/3] return a Moped::Errors::PoolTimeout when a timeout
 happens while getting a connection from the pool

---
 lib/moped/node.rb       | 10 ++++++++--
 spec/moped/node_spec.rb | 43 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/lib/moped/node.rb b/lib/moped/node.rb
index 0cafd4e..b8cc073 100644
--- a/lib/moped/node.rb
+++ b/lib/moped/node.rb
@@ -111,8 +111,14 @@ def connected?
     #
     # @since 2.0.0
     def connection
-      pool.with do |conn|
-        yield(conn)
+      connection_acquired = false
+      begin
+        pool.with do |conn|
+          connection_acquired = true
+          yield(conn)
+        end
+      rescue Timeout::Error => e
+        raise connection_acquired ? e : Errors::PoolTimeout.new(e)
       end
     end

diff --git a/spec/moped/node_spec.rb b/spec/moped/node_spec.rb
index aa88688..1013c2f 100644
--- a/spec/moped/node_spec.rb
+++ b/spec/moped/node_spec.rb
@@ -486,4 +486,47 @@
       end
     end
   end
+
+  describe "#connection" do
+    let(:node) do
+      described_class.new("127.0.0.1:27017", pool_size: 1, pool_timeout: 0.1)
+    end
+
+    context "when it takes a long time to get a connection from the pool" do
+      it "raises an Errors::PoolTimeout error" do
+        expect {
+
+          exception = nil
+          100.times.map do |i|
+            Thread.new do
+              begin
+                node.connection do |conn|
+                  conn.apply_credentials({})
+                  node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
+                end
+              rescue => e
+                exception = e if exception.nil?
+              end
+            end
+          end.each {|t| t.join }
+          raise exception unless exception.nil?
+
+        }.to raise_error(Moped::Errors::PoolTimeout)
+      end
+    end
+
+    context "when the timeout happens after getting a connection from the pool" do
+      it "raises a Timeout::Error" do
+        expect {
+          node.connection do |conn|
+            Timeout::timeout(0.01) do
+              conn.apply_credentials({})
+              node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
+              sleep(0.1) # just to simulate a long block which raises a timeout
+            end
+          end
+        }.to raise_error(Timeout::Error)
+      end
+    end
+  end
 end
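Before the next patch builds on it, note the contract patch 1 establishes: a
Timeout::Error raised while waiting to check a connection out of the pool now
surfaces as Moped::Errors::PoolTimeout, while a timeout that fires after the
connection was acquired (inside the yielded block) is re-raised untouched. A
minimal caller-side sketch of that distinction, assuming patch 1 alone (patch
2 below teaches the failover layer to retry PoolTimeout internally); the
host, database, and collection names are illustrative:

    require "moped"

    session = Moped::Session.new(["127.0.0.1:27017"],
      database: "moped_test", pool_size: 1, pool_timeout: 0.1)

    begin
      session[:test].find({ name: "test_counter" }).first
    rescue Moped::Errors::PoolTimeout
      # The wait for a pooled connection expired before any work started,
      # so the operation never reached the server and can simply be retried.
      sleep(0.1)
      retry
    rescue Timeout::Error
      # The timeout fired after a connection was acquired (inside the
      # yielded block); patch 1 deliberately re-raises this case unchanged.
      raise
    end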
From 49f65ef3c68a9858b37f39c7f0774abd6c75e2da Mon Sep 17 00:00:00 2001
From: Wandenberg
Date: Sun, 22 Mar 2015 23:17:27 -0300
Subject: [PATCH 2/3] fix retry for PoolTimeout error

---
 lib/moped/failover.rb       |  3 ++-
 lib/moped/failover/retry.rb |  6 ++++--
 spec/moped/session_spec.rb  | 20 ++++++++++++++++++++
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/lib/moped/failover.rb b/lib/moped/failover.rb
index d893adc..37d2ce8 100644
--- a/lib/moped/failover.rb
+++ b/lib/moped/failover.rb
@@ -21,7 +21,8 @@ module Failover
       Errors::ConnectionFailure => Retry,
       Errors::CursorNotFound => Ignore,
       Errors::OperationFailure => Reconfigure,
-      Errors::QueryFailure => Reconfigure
+      Errors::QueryFailure => Reconfigure,
+      Errors::PoolTimeout => Retry
     }.freeze

     # Get the appropriate failover handler given the provided exception.

diff --git a/lib/moped/failover/retry.rb b/lib/moped/failover/retry.rb
index bb8b091..6de08e1 100644
--- a/lib/moped/failover/retry.rb
+++ b/lib/moped/failover/retry.rb
@@ -9,7 +9,7 @@ module Failover
   module Retry
     extend self

-    # Executes the failover strategy. In the case of retyr, we disconnect and
+    # Executes the failover strategy. In the case of retry, we disconnect and
     # reconnect, then try the operation one more time.
     #
     # @example Execute the retry strategy.
@@ -24,11 +24,13 @@ module Retry
     #
     # @since 2.0.0
     def execute(exception, node)
-      node.disconnect
+      node.disconnect unless exception.is_a?(Errors::PoolTimeout)
       begin
         node.connection do |conn|
           yield(conn) if block_given?
         end
+      rescue Errors::PoolTimeout => e
+        raise Errors::ConnectionFailure.new e
       rescue Exception => e
         node.down!
         raise(e)

diff --git a/spec/moped/session_spec.rb b/spec/moped/session_spec.rb
index 6d4005f..4706171 100644
--- a/spec/moped/session_spec.rb
+++ b/spec/moped/session_spec.rb
@@ -347,4 +347,24 @@
       nodes.last.should be_down
     end
   end
+
+  context "when connections in the pool are busy" do
+    let(:session) do
+      Moped::Session.new([ "127.0.0.1:27017" ], database: "moped_test", pool_size: 1, pool_timeout: 0.2, max_retries: 30, retry_interval: 1)
+    end
+
+    it "should retry the operation" do
+      session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})
+
+      results = []
+
+      300.times.map do |i|
+        Thread.new do
+          results.push session[:test].find({ name: "test_counter" }).first["cnt"]
+        end
+      end.each {|t| t.join }
+
+      expect(results.count).to eql(300)
+    end
+  end
 end

From 372f22aca10cbd28d8652798d85875d05def67aa Mon Sep 17 00:00:00 2001
From: Wandenberg
Date: Tue, 14 Apr 2015 16:04:15 -0300
Subject: [PATCH 3/3] flush connection credentials to force a new login after
 an authentication/authorization error

---
 lib/moped/node.rb      | 6 ++++++
 lib/moped/retryable.rb | 5 +++--
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/lib/moped/node.rb b/lib/moped/node.rb
index b8cc073..e8f0f3a 100644
--- a/lib/moped/node.rb
+++ b/lib/moped/node.rb
@@ -162,6 +162,12 @@ def down!
       Connection::Manager.shutdown(self)
     end

+    def flush_connection_credentials
+      connection do |conn|
+        conn.credentials.clear
+      end
+    end
+
     # Yields the block if a connection can be established, retrying when a
     # connection error is raised.
     #

diff --git a/lib/moped/retryable.rb b/lib/moped/retryable.rb
index 906b094..251adc7 100644
--- a/lib/moped/retryable.rb
+++ b/lib/moped/retryable.rb
@@ -29,12 +29,13 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
       begin
         block.call
       rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
-        raise e if e.is_a?(Errors::PotentialReconfiguration) &&
-          ! (e.message.include?("not master") || e.message.include?("Not primary"))
+        authentication_error = e.is_a?(Errors::PotentialReconfiguration) && e.message.match(/not (master|primary|authorized)/i)
+        raise e if e.is_a?(Errors::PotentialReconfiguration) && !authentication_error

         if retries > 0
           Loggable.warn("  MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
           sleep(cluster.retry_interval)
+          cluster.nodes.each { |node| node.flush_connection_credentials } if authentication_error
           cluster.refresh
           with_retry(cluster, retries - 1, &block)
         else
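Taken together, the series makes pool exhaustion retryable and authentication
failures recoverable: Errors::PoolTimeout is routed to the Retry handler,
which skips the disconnect because the pool itself is healthy; a repeated
PoolTimeout is re-raised as ConnectionFailure so that with_retry applies
retry_interval and max_retries; and a failure matching
/not (master|primary|authorized)/i clears every node's cached credentials
before the retry, forcing each connection to log in afresh. A usage sketch
mirroring the session_spec added in patch 2 (the host and collection names
are illustrative, and a running local mongod is assumed, as in the specs):

    require "moped"

    session = Moped::Session.new(["127.0.0.1:27017"],
      database:       "moped_test",
      pool_size:      1,    # a single shared connection forces contention
      pool_timeout:   0.2,  # seconds each thread may wait to check it out
      max_retries:    30,   # retry budget once PoolTimeout becomes ConnectionFailure
      retry_interval: 1)

    session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})

    threads = 300.times.map do
      Thread.new do
        # Readers briefly exhaust the pool; the Retry/with_retry path above
        # absorbs the resulting PoolTimeout, so every read should succeed.
        session[:test].find({ name: "test_counter" }).first["cnt"]
      end
    end
    threads.each(&:join)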