Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue when connection object is not released after error or tim… #9

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ Options to use when creating pool, defaults match those used by node-pool.
- `max_connections` - Default: `1` - Max number of connections to keep open at any given time
- `min_connections` - Default: `0` - Min number of connections to keep open at any given time
- `idle_timeout` - Default: `30000` - Time (ms) to wait until closing idle connections
- `ttl` - Default: `undefined` - Time (+/-50%) (ms) past which to destroy the connection. For more explanation, see below
- `ssl` - Default: `false` - If true, SSL/TLS connection will be used.

### ttl

When using load balancers such as Amazon's ELB service, a common deploy strategy is to deploy new instances with new versions of your code, attach them to the load balancer, and detach the old ones.
This doesn't work very well with persistent connections, however.
New connections go to the new servers, but any open connections to the old servers remain and are eventually closed by the server when the servers turn off.
This causes any in-flight requests on those connections to error with `Error: Thrift-pool: Connection closed`.

The `ttl` option treats any connection older than `ttl` (+/-50%) as invalid, so if something tries to acquite it, they'll get a more recent connection instead.
If you set this time lower than the [connection drain timeout](http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/config-conn-drain.html) of your load balancer, you can guarantee that all connections to the old servers are closed before the servers are stopped.

We use some amount of randomness (+/- 50%) of this time to ensure that all of your connections are not marked invalid simultaneously.

As an example, we have our connection drain timeout set to five minutes, so when you're removing instances from the ELB, any connections older than five minutes are just cut off.
Based on that, we set our `ttl` to two minutes, and guarantee that all connections will be destroyed sometime after one to three minutes.

=======


## Thrift options - optional
All thrift options are supported and can be passed in as an object in addition to the pooling options.
Expand Down
18 changes: 16 additions & 2 deletions lib/index.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ CLOSE_MESSAGE = "Thrift-pool: Connection closed"
# @param thrift_options, passed to thrift connection,
create_cb = (thrift, pool_options, thrift_options, cb) ->
cb = _.once cb
connection = thrift.createConnection pool_options.host, pool_options.port, thrift_options
pool_options.ssl ?= false
if pool_options.ssl
connection = thrift.createSSLConnection pool_options.host, pool_options.port, thrift_options
else
connection = thrift.createConnection pool_options.host, pool_options.port, thrift_options
connection.__ended = false
if pool_options.ttl?
connection.__reap_time = Date.now() + _.random (pool_options.ttl / 2), (pool_options.ttl * 1.5)
connection.on "connect", ->
debug "in connect callback"
connection.connection.setKeepAlive(true)
Expand Down Expand Up @@ -50,7 +56,9 @@ create_pool = (thrift, pool_options = {}, thrift_options = {}) ->
connection.end()
validate: (connection) ->
debug "in validate"
not connection.__ended
return false if connection.__ended
return true unless pool_options.ttl?
connection.__reap_time < Date.now()
log: pool_options.log
max: pool_options.max_connections
min: pool_options.min_connections
Expand Down Expand Up @@ -106,12 +114,18 @@ module.exports = (thrift, service, pool_options = {}, thrift_options = {}) ->
cb = _.once cb
cb_error = (err) ->
debug "in error callback, post-acquire listener"
remove_listeners connection, cb_error, cb_timeout, cb_close
pool.release connection
cb err
cb_timeout = ->
debug "in timeout callback, post-acquire listener"
remove_listeners connection, cb_error, cb_timeout, cb_close
pool.release connection
cb new Error TIMEOUT_MESSAGE
cb_close = ->
debug "in close callback, post-acquire listener"
remove_listeners connection, cb_error, cb_timeout, cb_close
pool.release connection
cb new Error CLOSE_MESSAGE
add_listeners connection, cb_error, cb_timeout, cb_close
client = thrift.createClient service, connection
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-thrift-pool",
"version": "1.1.0",
"version": "1.2.0",
"description": "A module that wraps thrift interfaces in connection pooling logic to make them more resilient.",
"main": "index.js",
"scripts": {
Expand Down
13 changes: 12 additions & 1 deletion test/unit.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,29 @@ describe 'create_pool unit', ->
# - connection is valid, it is returned
# - connection is not valid, it is destroyed, new connection is returned.
it 'properly validates a connection', (done) ->
pool = _private.create_pool @thrift, @options
pool = _private.create_pool @thrift, _.extend {ttl: 100}, @options
async.series [
(cb) =>
# Connection is valid and is released
@mock_connection.end.reset()
setImmediate => @mock_connection.emit "connect"
@thrift.createConnection.reset()
@mock_connection.__reap_time = Date.now() - 1
pool.acquire (err, connection) =>
@assert_valid err, connection
assert.equal @mock_connection.end.called, false
pool.release connection
cb()
(cb) =>
# Connection is invalid due to TTL and is destroyed and a
# new connection is created and returned
@mock_connection.end.reset()
setImmediate => @mock_connection.emit "connect"
@thrift.createConnection.reset()
@mock_connection.__reap_time = Date.now() + 1
assert.equal pool.getPoolSize(), 1
assert.equal pool.availableObjectsCount(), 1
@acquire_destroys pool, cb
(cb) =>
# Connection in pool is marked invalid, destroyed, and a
# new connection is created and returned
Expand Down