Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add minSize to pool to maintain a minimum # of connections #1319

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setMaxWaitQueueSize(((Number)member.getValue()).intValue());
}
break;
case "minSize":
if (member.getValue() instanceof Number) {
obj.setMinSize(((Number)member.getValue()).intValue());
}
break;
case "name":
if (member.getValue() instanceof String) {
obj.setName((String)member.getValue());
Expand Down Expand Up @@ -104,6 +109,7 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
}
json.put("maxSize", obj.getMaxSize());
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
json.put("minSize", obj.getMinSize());
if (obj.getName() != null) {
json.put("name", obj.getName());
}
Expand Down
31 changes: 31 additions & 0 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
@DataObject(generateConverter = true)
public class PoolOptions {

/**
* The default minimum number of connections a client will keep open in the pool = 0
*/
public static final int DEFAULT_MIN_SIZE = 0;

/**
* The default maximum number of connections a client will pool = 4
*/
Expand Down Expand Up @@ -92,6 +97,7 @@ public class PoolOptions {
*/
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;

private int minSize = DEFAULT_MIN_SIZE;
private int maxSize = DEFAULT_MAX_SIZE;
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
Expand All @@ -113,6 +119,7 @@ public PoolOptions(JsonObject json) {
}

public PoolOptions(PoolOptions other) {
minSize = other.minSize;
maxSize = other.maxSize;
maxWaitQueueSize = other.maxWaitQueueSize;
idleTimeout = other.idleTimeout;
Expand All @@ -122,6 +129,30 @@ public PoolOptions(PoolOptions other) {
eventLoopSize = other.eventLoopSize;
}

/**
* @return the minimum pool size
*/
public int getMinSize() {
return minSize;
}

/**
* Set the minimum pool size
*
* @param minSize the minimum pool size
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setMinSize(int minSize) {
if (minSize < 0) {
throw new IllegalArgumentException("Min size cannot be negative");
}
if (minSize > maxSize) {
throw new IllegalArgumentException("Min size cannot be greater than max size");
}
this.minSize = minSize;
return this;
}

/**
* @return the maximum pool size
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public PoolImpl(VertxInternal vertx,
this.timerID = -1L;
this.pipelined = pipelined;
this.vertx = vertx;
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer,
afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMinSize(), poolOptions.getMaxSize(),
pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.closeFuture = closeFuture;
}

Expand All @@ -77,10 +79,11 @@ public Pool init() {
if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) {
synchronized (this) {
timerID = vertx.setTimer(cleanerPeriod, id -> {
runEviction();
runInvariantsCheck();
});
}
}
pool.checkMin(connectionTimeout);
return this;
}

Expand All @@ -92,17 +95,17 @@ public Pool connectionProvider(Function<Context, Future<SqlConnection>> connecti
return this;
}

private void runEviction() {
private void runInvariantsCheck() {
synchronized (this) {
if (timerID == -1) {
// Cancelled
return;
}
timerID = vertx.setTimer(cleanerPeriod, id -> {
runEviction();
runInvariantsCheck();
});
}
pool.evict();
pool.checkInvariants(connectionTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class SqlConnectionPool {
private final boolean pipelined;
private final long idleTimeout;
private final long maxLifetime;
private final int minSize;
private final int maxSize;

public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
Expand All @@ -63,10 +64,17 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
VertxInternal vertx,
long idleTimeout,
long maxLifetime,
int minSize,
int maxSize,
boolean pipelined,
int maxWaitQueueSize,
int eventLoopSize) {
if (minSize < 0) {
throw new IllegalArgumentException("Pool min size must be > 0");
}
if (minSize > maxSize) {
throw new IllegalArgumentException("Pool min size must be <= max size");
}
if (maxSize < 1) {
throw new IllegalArgumentException("Pool max size must be > 0");
}
Expand All @@ -78,6 +86,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
this.pipelined = pipelined;
this.idleTimeout = idleTimeout;
this.maxLifetime = maxLifetime;
this.minSize = minSize;
this.maxSize = maxSize;
this.hook = hook;
this.connectionProvider = connectionProvider;
Expand Down Expand Up @@ -145,18 +154,32 @@ public int size() {
return pool.size();
}

public void evict() {
public void checkInvariants(long connectionTimeout) {
long now = System.currentTimeMillis();
pool.evict(conn -> conn.shouldEvict(now), ar -> {
if (ar.succeeded()) {
List<PooledConnection> res = ar.result();
for (PooledConnection conn : res) {
conn.close(Promise.promise());
}
checkMin(connectionTimeout);
}
});
}

public void checkMin(long connectionTimeout) {
if (pool.size() < minSize) {
ContextInternal context = vertx.getOrCreateContext();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue here is that we might create connections with a context that is not best for the application and force context switching

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you're referring to vertx.getOrCreateContext(). How would you solve this? Pass in a context?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @tsegismont @vietj, any comments?

for (int i = 0; i < minSize; ++i) {
acquire(context, connectionTimeout, (ar) -> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if there should be a flavor of acquire that forces new connection creation. Thinking of the following scenario:

If the maxPoolSize is 16, minSize is 8, and let's say steady state connection usage is 4 (so pool.size() will be approximately 4). In this case, is it possible that some/all needed calls to acquire (i.e. needed = 8 - 4 = 4) end up getting one or more of the steady state connections when they are not in use, and as a result, < 4 (or even 0) new connections get created, and pool.size() doesn't reach close to 8 (or worse, remains at 4)?

@tsegismont is this a valid concern?

Copy link
Contributor Author

@kdubb kdubb Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over time (e.g a period of 10 - 30 seconds) this brings up the required number of connections and keeps it there.

Even though I currently don't see this behavior your example and concern may be valid. There is a simpler fix though, just pass the required minimum to the evict function; it can easily stop the eviction test when the minimum would be breached.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires a change to vert.x core.

Copy link
Contributor Author

@kdubb kdubb Apr 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the PR with a simpler fix. Instead of calculating a needed value, it now checks if the pool is "low" and acquires minSize connections immediately (in parallel).

This seems to work pretty well and ensures that minSize will be honored at all times. The downside is that it may start more than minSize connections depending on if any of the connections are in use.

In practice this "downside" doesn't appear that often. Remember that to trigger the acquisition of connections the pool has to be in some kind of "idle" state because connections are idling and falling below the pool minimum. Worst case a few extra connections are activated for a short period of before idling away.

In the end the implementation is somewhere between a minSize and minAvailable depending on the state of the pool and I'm pretty ok with that.

One thing to remember when discussing this is that the pool does wait a beat to see if a connection becomes available; it doesn't immediately start a connection when an acquisition request cannot be immediately fulfilled.

Looking back to your scenario of current pool size of 4 and a minSize of 8. The pool wold attempt to acquire 8 connections. In the case where all 4 connections are in-use, the pool would attempt to acquire 8 connections, potentially bumping it to 12 connections; but only if none of the connections are recycled while the pool is acquiring the connections.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I think this should work for most cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the asynchronous nature of acquire, we shall wait for all operations to complete before recycling the lease. Otherwise, there is no guarantee that some of the operations will complete before all operations are submitted.

if (ar.succeeded()) {
ar.result().cleanup(Promise.promise());
}
});
}
}
}

public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
Promise<Lease<PooledConnection>> p = context.promise();
pool.acquire(context, 0, p);
Expand Down