Skip to content

Commit

Permalink
commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
s7monk committed Jan 22, 2024
1 parent bf46f5b commit 64586e5
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public interface Executor {
*
* @param jobId The unique identifier of the job to stop.
* @param withSavepoint If true, the job will create a savepoint before stopping.
* @return true if the job was successfully stopped, false otherwise.
* @throws Exception if the job cannot be stopped or savepoint cannot be created.
*/
boolean stop(String jobId, boolean withSavepoint) throws Exception;
void stop(String jobId, boolean withSavepoint) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class SqlGatewayClient {
private static final String DEFAULT_SESSION_NAME_PREFIX = "FLINK_SQL_GATEWAY_SESSION";
private static final int REQUEST_WAITE_TIME = 1000;
private static final int ACTIVE_STATUS = 1;
private static final int INACTIVE_STATUS = 0;

private final SqlGateWayRestClient restClient;
private final String sqlGatewayHost;
Expand Down Expand Up @@ -127,32 +126,23 @@ public void configureSession(String sessionId, String statement, Long timeout)
.get();
}

public boolean closeSession(String sessionId) {
try {
restClient
.sendRequest(
CloseSessionHeaders.getInstance(),
new SessionMessageParameters(buildSessionHandleBySessionId(sessionId)),
EmptyRequestBody.getInstance())
.get();
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
public String closeSession(String sessionId) throws Exception {
return restClient
.sendRequest(
CloseSessionHeaders.getInstance(),
new SessionMessageParameters(buildSessionHandleBySessionId(sessionId)),
EmptyRequestBody.getInstance())
.get()
.getStatus();
}

public int triggerSessionHeartbeat(String sessionId) {
try {
restClient
.sendRequest(
TriggerSessionHeartbeatHeaders.getInstance(),
new SessionMessageParameters(buildSessionHandleBySessionId(sessionId)),
EmptyRequestBody.getInstance())
.get();
return ACTIVE_STATUS;
} catch (Exception e) {
return INACTIVE_STATUS;
}
public void triggerSessionHeartbeat(String sessionId) throws Exception {
restClient
.sendRequest(
TriggerSessionHeartbeatHeaders.getInstance(),
new SessionMessageParameters(buildSessionHandleBySessionId(sessionId)),
EmptyRequestBody.getInstance())
.get();
}

public String executeStatement(String sessionId, String statement, @Nullable Long timeout)
Expand Down Expand Up @@ -209,7 +199,7 @@ public FetchResultsResponseBody fetchResults(String sessionId, String operationI
return fetchResultsResponseBody;
}

private String getOperationStatus(String sessionId, String operationId) throws Exception {
public String getOperationStatus(String sessionId, String operationId) throws Exception {
return restClient
.sendRequest(
GetOperationStatusHeaders.getInstance(),
Expand All @@ -221,36 +211,28 @@ private String getOperationStatus(String sessionId, String operationId) throws E
.getStatus();
}

public boolean cancelOperation(String sessionId, String operationId) {
try {
restClient
.sendRequest(
CancelOperationHeaders.getInstance(),
new OperationMessageParameters(
buildSessionHandleBySessionId(sessionId),
buildOperationHandleByOperationId(operationId)),
EmptyRequestBody.getInstance())
.get();
return true;
} catch (Exception e) {
return false;
}
public String cancelOperation(String sessionId, String operationId) throws Exception {
return restClient
.sendRequest(
CancelOperationHeaders.getInstance(),
new OperationMessageParameters(
buildSessionHandleBySessionId(sessionId),
buildOperationHandleByOperationId(operationId)),
EmptyRequestBody.getInstance())
.get()
.getStatus();
}

public boolean closeOperation(String sessionId, String operationId) {
try {
restClient
.sendRequest(
CloseOperationHeaders.getInstance(),
new OperationMessageParameters(
buildSessionHandleBySessionId(sessionId),
buildOperationHandleByOperationId(operationId)),
EmptyRequestBody.getInstance())
.get();
return true;
} catch (Exception e) {
return false;
}
public String closeOperation(String sessionId, String operationId) throws Exception {
return restClient
.sendRequest(
CloseOperationHeaders.getInstance(),
new OperationMessageParameters(
buildSessionHandleBySessionId(sessionId),
buildOperationHandleByOperationId(operationId)),
EmptyRequestBody.getInstance())
.get()
.getStatus();
}

private SessionHandle buildSessionHandleBySessionId(String sessionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -42,8 +40,6 @@
/** The flink sql gateway implementation of the {@link Executor}. */
public class FlinkSqlGatewayExecutor implements Executor {

private static final Logger log = LoggerFactory.getLogger(FlinkSqlGatewayExecutor.class);

private static final Long DEFAULT_FETCH_TOKEN = 0L;
private static final String STOP_JOB_BASE_SQL = "STOP JOB '%s'";
private static final String WITH_SAVEPOINT = " WITH SAVEPOINT";
Expand Down Expand Up @@ -161,18 +157,11 @@ public ExecutionResult fetchResults(FetchResultParams params) throws Exception {
}

@Override
public boolean stop(String jobId, boolean withSavepoint) throws Exception {
try {
StringBuilder sqlBuilder = new StringBuilder(String.format(STOP_JOB_BASE_SQL, jobId));
if (withSavepoint) {
sqlBuilder.append(WITH_SAVEPOINT);
}
client.executeStatement(session.getSessionId(), sqlBuilder.toString(), null);
return true;
} catch (Exception e) {
log.error(
"Failed to stop job with job ID: {}. Savepoint: {}.", jobId, withSavepoint, e);
throw new SqlExecutionException("Failed to stop job with job ID: " + jobId, e);
public void stop(String jobId, boolean withSavepoint) throws Exception {
StringBuilder sqlBuilder = new StringBuilder(String.format(STOP_JOB_BASE_SQL, jobId));
if (withSavepoint) {
sqlBuilder.append(WITH_SAVEPOINT);
}
client.executeStatement(session.getSessionId(), sqlBuilder.toString(), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,17 @@

package org.apache.paimon.web.engine.flink.sql.gateway.utils;

import java.util.List;

/**
* Utility class for formatting SQL exception messages. This class provides static methods to format
* exception messages for SQL queries and batch SQL statements, ensuring that long SQL texts are
* truncated to a predefined maximum length.
* exception messages for SQL statements, ensuring that long SQL texts are truncated to a predefined
* maximum length.
*/
public class FormatSqlExceptionUtil {

private static final int MAX_SQL_DISPLAY_LENGTH = 500;

public static String formatSqlExceptionMessage(String sql) {
return String.format("Failed to execute query statement: '%s'", formatSql(sql));
}

public static String formatSqlBatchExceptionMessage(List<String> sqlStatements) {
String combinedStatements = String.join("; ", sqlStatements);
return String.format(
"Failed to execute insert statements: %s", formatSql(combinedStatements));
return String.format("Failed to execute sql statement: '%s'", formatSql(sql));
}

private static String formatSql(String sql) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,89 @@
import org.apache.paimon.web.engine.flink.sql.gataway.TestBase;
import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
import org.junit.jupiter.api.BeforeAll;

import org.apache.commons.collections.MapUtils;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Test for {@link SqlGatewayClient}. */
public class SqlGatewayClientTest extends TestBase {

static SqlGatewayClient client;
SqlGatewayClient client;
SessionEntity session;

private static final String SESSION_NAME = "test_session";

@BeforeAll
static void before () throws Exception {
@BeforeEach
void before() throws Exception {
client = new SqlGatewayClient(targetAddress, port);
session = client.openSession(SESSION_NAME);
}

@Test
public void testGetSessionConfig() throws Exception {
Map<String, String> sessionConfig = client.getSessionConfig(session.getSessionId());
assertTrue(MapUtils.isNotEmpty(sessionConfig));
}

@Test
public void testCloseSession() throws Exception {
String status = client.closeSession(session.getSessionId());
assertEquals("CLOSED", status);
}

@Test
public void testExecuteStatement() throws Exception {
String operationHandle = client.executeStatement(session.getSessionId(), "SELECT 1", null);
assertNotNull(operationHandle);
}

@Test
public void testOpenSession() throws Exception {
SessionEntity session = client.openSession("test");
assertNotNull(session);
public void testCompleteStatementHints() throws Exception {
List<String> list = client.completeStatementHints(session.getSessionId(), "CREATE TA");
assertFalse(list.isEmpty());
}

@Test
public void testFetchResults() throws Exception {
String operationHandle = client.executeStatement(session.getSessionId(), "SELECT 1", null);
FetchResultsResponseBody fetchResultsResponseBody =
client.fetchResults(session.getSessionId(), operationHandle, 0);
assertNotNull(fetchResultsResponseBody);
assertEquals("PAYLOAD", fetchResultsResponseBody.getResultType().name());
FetchResultsResponseBody fetchResultsResponseBodyNext =
client.fetchResults(session.getSessionId(), operationHandle, 1);
assertNotNull(fetchResultsResponseBodyNext);
assertEquals("EOS", fetchResultsResponseBodyNext.getResultType().name());
}

@Test
public void testGetOperationStatus() throws Exception {
String operationHandle = client.executeStatement(session.getSessionId(), "SELECT 1", null);
String operationStatus = client.getOperationStatus(session.getSessionId(), operationHandle);
assertNotNull(operationStatus);
}

@Test
public void testCancelOperation() throws Exception {
String operationHandle = client.executeStatement(session.getSessionId(), "SELECT 1", null);
String status = client.cancelOperation(session.getSessionId(), operationHandle);
assertEquals("CANCELED", status);
}

@Test
public void testCloseOperation() throws Exception {
String operationHandle = client.executeStatement(session.getSessionId(), "SELECT 1", null);
String status = client.closeOperation(session.getSessionId(), operationHandle);
assertEquals("CLOSED", status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,31 @@

package org.apache.paimon.web.engine.flink.sql.gataway.executor;

import org.apache.paimon.web.engine.flink.sql.gataway.TestBase;
import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
import org.apache.paimon.web.engine.flink.sql.gateway.executor.FlinkSqlGatewayExecutor;
import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/** Test for {@link FlinkSqlGatewayExecutor}. */
public class FlinkSqlGatewayExecutorTest {}
public class FlinkSqlGatewayExecutorTest extends TestBase {

SqlGatewayClient client;
FlinkSqlGatewayExecutor executor;

private static final String SESSION_NAME = "test_session";

@BeforeEach
void before() throws Exception {
client = new SqlGatewayClient(targetAddress, port);
SessionEntity session = client.openSession(SESSION_NAME);
executor = new FlinkSqlGatewayExecutor(session);
}

@Test
public void testStop() throws Exception {
executor.stop("1", false);
}
}

0 comments on commit 64586e5

Please sign in to comment.