From 026a020dec174b91977d893644866d1ff373622b Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 17 Sep 2024 12:11:17 +0100 Subject: [PATCH 1/2] Added removing session from the pool, if client cancelled the stream --- query/src/main/java/tech/ydb/query/impl/SessionImpl.java | 6 ++++++ query/src/main/java/tech/ydb/query/impl/SessionPool.java | 1 + 2 files changed, 7 insertions(+) diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 37cc9d08..ae0ef980 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -58,6 +58,11 @@ abstract class SessionImpl implements QuerySession { YdbQuery.DeleteSessionResponse::getStatus, YdbQuery.DeleteSessionResponse::getIssuesList ); + private static final Status CANCELLED = Status.of( + StatusCode.CLIENT_CANCELLED, + Issue.of("Stream was cancelled by client, session will be removed", Issue.Severity.WARNING) + ); + private final QueryServiceRpc rpc; private final String sessionId; private final long nodeID; @@ -313,6 +318,7 @@ public CompletableFuture> execute(PartsHandler handler) { @Override public void cancel() { + updateSessionState(CANCELLED); grpcStream.cancel(); } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index b0162300..ab316a77 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -175,6 +175,7 @@ public void updateSessionState(Status status) { status.getCode() == StatusCode.INTERNAL_ERROR || status.getCode() == StatusCode.CLIENT_DEADLINE_EXCEEDED || status.getCode() == StatusCode.CLIENT_DEADLINE_EXPIRED || + status.getCode() == StatusCode.CLIENT_CANCELLED || status.getCode() == StatusCode.TRANSPORT_UNAVAILABLE; if (isStatusBroken) { logger.warn("QuerySession[{}] broken with status {}", getId(), status); From 4f0f5fd873f28728ceec8f85e02e412abaf44a0f Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 17 Sep 2024 12:11:28 +0100 Subject: [PATCH 2/2] Updated query service tests --- .../ydb/query/impl/QueryIntegrationTest.java | 209 ++++++++++-------- 1 file changed, 121 insertions(+), 88 deletions(-) diff --git a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java index c6ddd0aa..452e7290 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java @@ -21,6 +21,7 @@ import tech.ydb.core.StatusCode; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryStream; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryResultPart; @@ -70,8 +71,6 @@ public Entity(int id, String name, byte[] payload, boolean isValid) { @ClassRule public final static GrpcTransportRule ydbTransport = new GrpcTransportRule(); - private static QueryClient queryClient = null; - @BeforeClass public static void initSchema() { logger.info("Prepare database..."); @@ -98,95 +97,128 @@ public static void initSchema() { retryCtx.supplyStatus(session -> session.createTable(tablePath, tableDescription)).join(); retryCtx.supplyStatus(session -> session.createTable(table2Path, table2Description)).join(); logger.info("Prepare database OK"); - - queryClient = QueryClient.newClient(ydbTransport).build(); } @AfterClass public static void dropAll() { - try { - logger.info("Clean database..."); - String tablePath = ydbTransport.getDatabase() + "/" + TEST_TABLE; - String table2Path = ydbTransport.getDatabase() + "/" + TEST_DOUBLE_TABLE; - - SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(ydbTransport)).build(); - SessionRetryContext retryCtx = SessionRetryContext.create(client).build(); - retryCtx.supplyStatus(session -> session.dropTable(tablePath)).join().isSuccess(); - retryCtx.supplyStatus(session -> session.dropTable(table2Path)).join().isSuccess(); - logger.info("Clean database OK"); - } finally { - queryClient.close(); - } + logger.info("Clean database..."); + String tablePath = ydbTransport.getDatabase() + "/" + TEST_TABLE; + String table2Path = ydbTransport.getDatabase() + "/" + TEST_DOUBLE_TABLE; + + SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(ydbTransport)).build(); + SessionRetryContext retryCtx = SessionRetryContext.create(client).build(); + retryCtx.supplyStatus(session -> session.dropTable(tablePath)).join().isSuccess(); + retryCtx.supplyStatus(session -> session.dropTable(table2Path)).join().isSuccess(); + logger.info("Clean database OK"); } @Test public void testSimpleSelect() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - QueryReader reader = QueryReader.readFrom( - session.createQuery("SELECT 2 + 3;", TxMode.SERIALIZABLE_RW) - ).join().getValue(); + try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { + try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + QueryReader reader = QueryReader.readFrom( + session.createQuery("SELECT 2 + 3;", TxMode.SERIALIZABLE_RW) + ).join().getValue(); - Assert.assertEquals(1, reader.getResultSetCount()); - ResultSetReader rs = reader.getResultSet(0); + Assert.assertEquals(1, reader.getResultSetCount()); + ResultSetReader rs = reader.getResultSet(0); - Assert.assertTrue(rs.next()); - Assert.assertEquals(1, rs.getColumnCount()); - Assert.assertEquals("column0", rs.getColumnName(0)); - Assert.assertEquals(5, rs.getColumn(0).getInt32()); + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumnCount()); + Assert.assertEquals("column0", rs.getColumnName(0)); + Assert.assertEquals(5, rs.getColumn(0).getInt32()); - Assert.assertFalse(rs.next()); + Assert.assertFalse(rs.next()); + } } } @Test @Ignore public void testSimplePrepare() { - String query = "" - + "DECLARE $id AS Int32?;\n" - + "UPSERT INTO `" + TEST_TABLE + "` (id) " - + "VALUES ($id)"; - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder() - .withExecMode(QueryExecMode.PARSE) - .build(); + try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { + String query = "" + + "DECLARE $id AS Int32?;\n" + + "UPSERT INTO `" + TEST_TABLE + "` (id) " + + "VALUES ($id)"; + try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder() + .withExecMode(QueryExecMode.PARSE) + .build(); - QueryReader reader = QueryReader.readFrom( - session.createQuery(query, TxMode.NONE, Params.empty(), settings) - ).join().getValue(); + QueryReader reader = QueryReader.readFrom( + session.createQuery(query, TxMode.NONE, Params.empty(), settings) + ).join().getValue(); - Assert.assertEquals(1, reader.getResultSetCount()); - ResultSetReader rs = reader.getResultSet(0); + Assert.assertEquals(1, reader.getResultSetCount()); + ResultSetReader rs = reader.getResultSet(0); - Assert.assertTrue(rs.next()); - Assert.assertEquals(1, rs.getColumnCount()); - Assert.assertEquals("column0", rs.getColumnName(0)); - Assert.assertEquals(5, rs.getColumn(0).getInt32()); + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumnCount()); + Assert.assertEquals("column0", rs.getColumnName(0)); + Assert.assertEquals(5, rs.getColumn(0).getInt32()); - Assert.assertFalse(rs.next()); + Assert.assertFalse(rs.next()); + } } } @Test public void testErrors() { - SessionImpl s1 = (SessionImpl)queryClient.createSession(Duration.ofSeconds(5)).join().getValue(); - String id = s1.getId(); - s1.close(); - - SessionImpl s2 = (SessionImpl)queryClient.createSession(Duration.ofSeconds(5)).join().getValue(); - Assert.assertEquals(id, s2.getId()); - s2.updateSessionState(Status.of(StatusCode.ABORTED)); - s2.close(); - - SessionImpl s3 = (SessionImpl)queryClient.createSession(Duration.ofSeconds(5)).join().getValue(); - Assert.assertEquals(id, s3.getId()); - s3.updateSessionState(Status.of(StatusCode.BAD_SESSION)); - s3.close(); - - SessionImpl s4 = (SessionImpl)queryClient.createSession(Duration.ofSeconds(5)).join().getValue(); - Assert.assertNotEquals(id, s4.getId()); - s4.close(); + try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { + SessionImpl s1 = (SessionImpl)client.createSession(Duration.ofSeconds(5)).join().getValue(); + String id = s1.getId(); + s1.close(); + + SessionImpl s2 = (SessionImpl)client.createSession(Duration.ofSeconds(5)).join().getValue(); + Assert.assertEquals(id, s2.getId()); + s2.updateSessionState(Status.of(StatusCode.ABORTED)); + s2.close(); + + SessionImpl s3 = (SessionImpl)client.createSession(Duration.ofSeconds(5)).join().getValue(); + Assert.assertEquals(id, s3.getId()); + s3.updateSessionState(Status.of(StatusCode.BAD_SESSION)); + s3.close(); + + SessionImpl s4 = (SessionImpl)client.createSession(Duration.ofSeconds(5)).join().getValue(); + Assert.assertNotEquals(id, s4.getId()); + s4.close(); + } + } + + @Test + public void testCancelStream() { + try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { + QuerySession s1 = client.createSession(Duration.ofSeconds(5)).join().getValue(); + String id = s1.getId(); + s1.close(); + + try (QuerySession s2 = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + Assert.assertEquals(id, s2.getId()); + s2.createQuery("SELECT 2 + 2;", TxMode.SNAPSHOT_RO).execute(this::printQuerySetPart) + .join().getStatus().expectSuccess("cannot execute query"); + } + + try (QuerySession s3 = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + Assert.assertEquals(id, s3.getId()); + final QueryStream query = s3.createQuery("SELECT 2 + 2;", TxMode.SNAPSHOT_RO); + final CompletableFuture stop = new CompletableFuture<>(); + CompletableFuture> future = query.execute(part -> { + stop.join(); + printQuerySetPart(part); + }); + query.cancel(); + stop.complete(null); + Result result = future.join(); + Assert.assertEquals(StatusCode.CLIENT_CANCELLED, result.getStatus().getCode()); + } + + try (QuerySession s4 = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + Assert.assertNotEquals(id, s4.getId()); + } + } } @Test @@ -198,35 +230,37 @@ public void testSimpleCRUD() { entities.add(new Entity(3, "dublicate", BYTES_LEN5, true)); entities.add(new Entity(5, "entity 5", BYTES_LEN2, false)); - for (Entity entity: entities) { - String query = "UPSERT INTO `" + TEST_TABLE + "` (id, name, payload, is_valid) " - + "VALUES ($id, $name, $payload, $is_valid)"; - - Params params = Params.of( - "$id", PrimitiveValue.newInt32(entity.id), - "$name", PrimitiveValue.newText(entity.name), - "$payload", PrimitiveValue.newBytes(entity.payload), - "$is_valid", PrimitiveValue.newBool(entity.isValid) - ); + try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { + for (Entity entity: entities) { + String query = "UPSERT INTO `" + TEST_TABLE + "` (id, name, payload, is_valid) " + + "VALUES ($id, $name, $payload, $is_valid)"; + + Params params = Params.of( + "$id", PrimitiveValue.newInt32(entity.id), + "$name", PrimitiveValue.newText(entity.name), + "$payload", PrimitiveValue.newBytes(entity.payload), + "$is_valid", PrimitiveValue.newBool(entity.isValid) + ); + + try (QuerySession session = client.createSession(SESSION_TIMEOUT).join().getValue()) { + session.createQuery(query, TxMode.SERIALIZABLE_RW, params) + .execute(this::printQuerySetPart) + .join().getStatus().expectSuccess(); + } + } - try (QuerySession session = queryClient.createSession(SESSION_TIMEOUT).join().getValue()) { - session.createQuery(query, TxMode.SERIALIZABLE_RW, params) + try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + String query = "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;"; + session.createQuery(query, TxMode.SERIALIZABLE_RW) .execute(this::printQuerySetPart) .join().getStatus().expectSuccess(); } - } - - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - String query = "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;"; - session.createQuery(query, TxMode.SERIALIZABLE_RW) - .execute(this::printQuerySetPart) - .join().getStatus().expectSuccess(); - } - try (QuerySession session = queryClient.createSession(SESSION_TIMEOUT).join().getValue()) { - session.createQuery("DELETE FROM " + TEST_TABLE, TxMode.SERIALIZABLE_RW) - .execute(this::printQuerySetPart) - .join().getStatus().expectSuccess(); + try (QuerySession session = client.createSession(SESSION_TIMEOUT).join().getValue()) { + session.createQuery("DELETE FROM " + TEST_TABLE, TxMode.SERIALIZABLE_RW) + .execute(this::printQuerySetPart) + .join().getStatus().expectSuccess(); + } } } @@ -238,7 +272,6 @@ public void printQuerySetPart(QueryResultPart part) { } @Test - @Ignore public void updateMultipleTablesInOneTransaction() { try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {