Skip to content

Commit

Permalink
Merge pull request #305 from ydb-platform/query_table_client
Browse files Browse the repository at this point in the history
TableClient implementation with QueryClient under hood
  • Loading branch information
alex268 authored Aug 20, 2024
2 parents fd38171 + bfbba60 commit f86645f
Show file tree
Hide file tree
Showing 9 changed files with 1,305 additions and 6 deletions.
6 changes: 6 additions & 0 deletions query/src/main/java/tech/ydb/query/QueryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.query.impl.QueryClientImpl;
import tech.ydb.query.impl.TableClientImpl;
import tech.ydb.table.TableClient;

/**
*
Expand All @@ -22,6 +24,10 @@ static Builder newClient(@WillNotClose GrpcTransport transport) {
return QueryClientImpl.newClient(transport);
}

static TableClient.Builder newTableClient(@WillNotClose GrpcTransport transport) {
return TableClientImpl.newClient(transport);
}

/**
* Return a future with {@link QuerySession} for further work. The session will be taken from the session pool if
* it has any idle session or will be created on the fly
Expand Down
4 changes: 4 additions & 0 deletions query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public void updatePoolMaxSize(int maxSize) {
pool.updateMaxSize(maxSize);
}

SessionPool getSessionPool() {
return pool;
}

@Override
public void close() {
pool.close();
Expand Down
3 changes: 1 addition & 2 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ abstract class StreamImpl implements QueryStream {
}

abstract void handleTxMeta(YdbQuery.TransactionMeta meta);
void handleCompletion(Status status, Throwable th) {
}
void handleCompletion(Status status, Throwable th) { }

@Override
public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
Expand Down
102 changes: 100 additions & 2 deletions query/src/main/java/tech/ydb/query/impl/SessionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;

import org.slf4j.Logger;
Expand All @@ -26,6 +27,7 @@
import tech.ydb.query.settings.AttachSessionSettings;
import tech.ydb.query.settings.CreateSessionSettings;
import tech.ydb.query.settings.DeleteSessionSettings;
import tech.ydb.table.SessionPoolStats;
import tech.ydb.table.impl.pool.WaitingQueue;


Expand Down Expand Up @@ -54,6 +56,7 @@ class SessionPool implements AutoCloseable {
private final ScheduledExecutorService scheduler;
private final WaitingQueue<PooledQuerySession> queue;
private final ScheduledFuture<?> cleanerFuture;
private final StatsImpl stats = new StatsImpl();

SessionPool(Clock clock, QueryServiceRpc rpc, ScheduledExecutorService scheduler, int minSize, int maxSize,
Duration idleDuration) {
Expand All @@ -79,6 +82,10 @@ public void updateMaxSize(int maxSize) {
this.queue.updateLimits(maxSize);
}

SessionPoolStats getStats() {
return stats;
}

@Override
public void close() {
logger.info("closing QuerySession pool");
Expand Down Expand Up @@ -140,6 +147,7 @@ private boolean tryComplete(CompletableFuture<Result<QuerySession>> future, Pool
return false;
}

stats.acquired.increment();
return true;
}

Expand All @@ -155,6 +163,7 @@ private class PooledQuerySession extends SessionImpl {
super(rpc, response);
this.lastActive = clock.instant();
this.attachStream = attach(ATTACH_SETTINGS);
stats.created.increment();
}

@Override
Expand Down Expand Up @@ -240,6 +249,7 @@ public void destroy() {

@Override
public void close() {
stats.released.increment();
if (isBroken || isStopped) {
queue.delete(this);
} else {
Expand All @@ -257,15 +267,22 @@ private class Handler implements WaitingQueue.Handler<PooledQuerySession> {

@Override
public CompletableFuture<PooledQuerySession> create() {
stats.requested.increment();
return SessionImpl
.createSession(rpc, CREATE_SETTINGS, true)
.thenApply(Result::getValue)
.thenCompose(resp -> new PooledQuerySession(rpc, resp).start())
.thenCompose(r -> {
if (!r.isSuccess()) {
stats.failed.increment();
throw new UnexpectedResultException("create session problem", r.getStatus());
}
return new PooledQuerySession(rpc, r.getValue()).start();
})
.thenApply(Result::getValue);
}

@Override
public void destroy(PooledQuerySession session) {
stats.deleted.increment();
session.destroy();
}
}
Expand Down Expand Up @@ -296,6 +313,87 @@ public void run() {
}
}

private class StatsImpl implements SessionPoolStats {
private final LongAdder acquired = new LongAdder();
private final LongAdder released = new LongAdder();

private final LongAdder requested = new LongAdder();
private final LongAdder failed = new LongAdder();
private final LongAdder created = new LongAdder();
private final LongAdder deleted = new LongAdder();

@Override
public int getMinSize() {
return minSize;
}

@Override
public int getMaxSize() {
return queue.getTotalLimit();
}

@Override
public int getIdleCount() {
return queue.getIdleCount();
}

@Override
public int getAcquiredCount() {
return queue.getUsedCount();
}

@Override
public int getPendingAcquireCount() {
return queue.getWaitingCount() + queue.getPendingCount();
}

@Override
public long getAcquiredTotal() {
return acquired.sum();
}

@Override
public long getReleasedTotal() {
return released.sum();
}

@Override
public long getRequestedTotal() {
return requested.sum();
}

@Override
public long getCreatedTotal() {
return created.sum();
}

@Override
public long getFailedTotal() {
return failed.sum();
}

@Override
public long getDeletedTotal() {
return deleted.sum();
}

@Override
public String toString() {
return "SessionPoolStats{minSize=" + getMinSize()
+ ", maxSize=" + getMaxSize()
+ ", idleCount=" + getIdleCount()
+ ", acquiredCount=" + getAcquiredCount()
+ ", pendingAcquireCount=" + getPendingAcquireCount()
+ ", acquiredTotal=" + getAcquiredTotal()
+ ", releasedTotal=" + getReleasedTotal()
+ ", requestsTotal=" + getRequestedTotal()
+ ", createdTotal=" + getCreatedTotal()
+ ", failedTotal=" + getFailedTotal()
+ ", deletedTotal=" + getDeletedTotal()
+ "}";
}
}

/**
* This is the part based on the code written by Doug Lea with assistance from members
* of JCP JSR-166 Expert Group and released to the public domain, as explained at
Expand Down
Loading

0 comments on commit f86645f

Please sign in to comment.