
Commit

commit.
s7monk committed Jul 10, 2024
1 parent 1d4e874 commit 9f511b4
Showing 9 changed files with 307 additions and 21 deletions.
Executor.java
@@ -31,10 +31,11 @@ public interface Executor {
      * Executes an SQL statement.
      *
      * @param statement The SQL statement to be executed.
+     * @param maxRows The maximum number of rows to return in the result set.
      * @return ExecutionResult containing information about the execution result.
      * @throws Exception if there is an error executing the SQL statement.
      */
-    ExecutionResult executeSql(String statement) throws Exception;
+    ExecutionResult executeSql(String statement, int maxRows) throws Exception;
 
     /**
      * Fetches the results of a previously submitted SQL statement execution.
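
To make the new contract concrete, here is a minimal caller-side sketch (not part of the commit). How an Executor instance is obtained is outside this diff, so the method simply takes one as a parameter; the fallback behavior for non-positive maxRows is how FlinkSqlGatewayExecutor below interprets it.

// Hypothetical caller; `executor` stands in for any Executor implementation.
static void runCapped(Executor executor) throws Exception {
    // Cap unbounded SELECT results at 100 rows.
    ExecutionResult capped = executor.executeSql("SELECT * FROM t_order", 100);
    // A non-positive maxRows makes FlinkSqlGatewayExecutor (below) fall back
    // to the parser's default limit of 500 rows.
    ExecutionResult defaulted = executor.executeSql("SELECT * FROM t_order", 0);
}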
CustomSqlParser.java
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.common.parser;

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

/**
 * A custom SQL parser that parses a statement list and applies a row limit to
 * SELECT statements that do not already define one.
 */
public class CustomSqlParser {

    private static final SqlParser.Config config;
    private final SqlParser parser;
    private final int limit;

    private static final int DEFAULT_LIMIT = 500;

    static {
        config =
                SqlParser.config()
                        .withParserFactory(FlinkSqlParserImpl.FACTORY)
                        .withConformance(FlinkSqlConformance.DEFAULT)
                        .withLex(Lex.JAVA)
                        .withIdentifierMaxLength(256);
    }

    public CustomSqlParser(String sql) {
        this(sql, DEFAULT_LIMIT);
    }

    public CustomSqlParser(String sql, int limit) {
        this.parser = SqlParser.create(sql, config);
        this.limit = limit;
    }

    /**
     * Parses the statement list and appends a fetch clause (row limit) to every
     * plain SELECT that has no aggregation, no GROUP BY, and no explicit limit.
     */
    public SqlNodeList parseStmtList() throws SqlParseException {
        SqlNodeList nodeList = parser.parseStmtList();
        for (SqlNode node : nodeList) {
            if (node instanceof SqlSelect) {
                SqlSelect select = (SqlSelect) node;
                if (!hasAggregateOrGroupBy(select) && select.getFetch() == null) {
                    SqlLiteral sqlLiteral =
                            SqlLiteral.createExactNumeric(
                                    String.valueOf(limit), SqlParserPos.ZERO);
                    select.setFetch(sqlLiteral);
                }
            }
        }
        return nodeList;
    }

    private boolean hasAggregateOrGroupBy(SqlSelect select) {
        if (select.getGroup() != null && !select.getGroup().isEmpty()) {
            return true;
        }
        return containsComplexOperations(select.getSelectList());
    }

    /** Treats any select item that is not a simple identifier as a complex operation. */
    private boolean containsComplexOperations(SqlNodeList nodes) {
        if (nodes != null) {
            for (SqlNode node : nodes) {
                if (!(node instanceof SqlIdentifier)) {
                    return true;
                }
            }
        }
        return false;
    }
}
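
A short usage sketch of the parser (not part of the commit). It assumes the example class lives in the same package as CustomSqlParser; the expected output follows the FETCH NEXT unparse format asserted in the tests below.

package org.apache.paimon.web.engine.flink.common.parser;

import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;

/** Illustrative usage of CustomSqlParser; hypothetical example class. */
public class CustomSqlParserExample {
    public static void main(String[] args) throws SqlParseException {
        // Explicit limit of 100 rows; the single-argument constructor
        // would use DEFAULT_LIMIT (500) instead.
        CustomSqlParser parser = new CustomSqlParser("SELECT * FROM t_order", 100);
        SqlNodeList nodes = parser.parseStmtList();
        // Expected unparse: SELECT * FROM `t_order` FETCH NEXT 100 ROWS ONLY
        System.out.println(nodes.get(0));
    }
}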
CustomSqlParserTest.java
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.common.parser;

import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.junit.jupiter.api.Test;

import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement1;
import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement2;
import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement3;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests of {@link CustomSqlParser}. */
public class CustomSqlParserTest {

    @Test
    public void testParse() throws SqlParseException {
        CustomSqlParser customSqlParser = new CustomSqlParser(statement1);
        SqlNodeList sqlNodeList = customSqlParser.parseStmtList();
        assertThat(sqlNodeList.size()).isEqualTo(5);
    }

    @Test
    public void testSelectLimit() throws SqlParseException {
        CustomSqlParser customSqlParser = new CustomSqlParser(statement2);
        String actual = customSqlParser.parseStmtList().get(2).toString();
        assertThat(actual)
                .isEqualToIgnoringWhitespace("SELECT * FROM `t_order` FETCH NEXT 500 ROWS ONLY");
    }

    @Test
    public void testSelectWithoutLimit() throws SqlParseException {
        CustomSqlParser customSqlParser = new CustomSqlParser(statement3);
        String actual = customSqlParser.parseStmtList().get(2).toString();
        assertThat(actual).isEqualToIgnoringWhitespace("SELECT COUNT(*) FROM `t_order`");
    }
}
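
One behavior the parser implements but these tests do not cover directly: a SELECT that already carries a limit is left untouched, because CustomSqlParser only sets a fetch clause when none is present. A test along these lines could be added to the class above; the expected unparse output is an assumption extrapolated from testSelectLimit.

@Test
public void testExplicitLimitIsPreserved() throws SqlParseException {
    // The user's own LIMIT 10 should survive; the default of 500 must not override it.
    CustomSqlParser customSqlParser = new CustomSqlParser("SELECT * FROM t_order LIMIT 10");
    String actual = customSqlParser.parseStmtList().get(0).toString();
    assertThat(actual)
            .isEqualToIgnoringWhitespace("SELECT * FROM `t_order` FETCH NEXT 10 ROWS ONLY");
}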
StatementsConstant.java
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.common.parser;

/** SQL statement fixtures used by the parser tests. */
public class StatementsConstant {

    public static String statement1 =
            "DROP TABLE IF EXISTS t_order;\n"
                    + "CREATE TABLE IF NOT EXISTS t_order(\n"
                    + " --order id\n"
                    + " `order_id` BIGINT,\n"
                    + " --product\n"
                    + " `product` BIGINT,\n"
                    + " --amount\n"
                    + " `amount` BIGINT,\n"
                    + " --payment time\n"
                    + " `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),\n"
                    + " --WATERMARK\n"
                    + " WATERMARK FOR order_time AS order_time-INTERVAL '2' SECOND\n"
                    + ") WITH(\n"
                    + " 'connector' = 'datagen',\n"
                    + " 'rows-per-second' = '1',\n"
                    + " 'fields.order_id.min' = '1',\n"
                    + " 'fields.order_id.max' = '2',\n"
                    + " 'fields.amount.min' = '1',\n"
                    + " 'fields.amount.max' = '10',\n"
                    + " 'fields.product.min' = '1',\n"
                    + " 'fields.product.max' = '2'\n"
                    + ");\n"
                    + "-- SELECT * FROM t_order LIMIT 10;\n"
                    + "DROP TABLE IF EXISTS sink_table;\n"
                    + "CREATE TABLE IF NOT EXISTS sink_table(\n"
                    + " --product\n"
                    + " `product` BIGINT,\n"
                    + " --amount\n"
                    + " `amount` BIGINT,\n"
                    + " --payment time\n"
                    + " `order_time` TIMESTAMP(3),\n"
                    + " `one_minute_sum` BIGINT\n"
                    + ") WITH('connector' = 'print');\n"
                    + "\n"
                    + "INSERT INTO\n"
                    + " sink_table\n"
                    + "SELECT\n"
                    + " product,\n"
                    + " amount,\n"
                    + " order_time,\n"
                    + " 0 as one_minute_sum\n"
                    + "FROM\n"
                    + " t_order;";

    public static String statement2 =
            "DROP TABLE IF EXISTS t_order;\n"
                    + "CREATE TABLE IF NOT EXISTS t_order(\n"
                    + " --order id\n"
                    + " `order_id` BIGINT,\n"
                    + " --product\n"
                    + " `product` BIGINT,\n"
                    + " --amount\n"
                    + " `amount` BIGINT,\n"
                    + " --payment time\n"
                    + " `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),\n"
                    + " --WATERMARK\n"
                    + " WATERMARK FOR order_time AS order_time-INTERVAL '2' SECOND\n"
                    + ") WITH(\n"
                    + " 'connector' = 'datagen',\n"
                    + " 'rows-per-second' = '1',\n"
                    + " 'fields.order_id.min' = '1',\n"
                    + " 'fields.order_id.max' = '2',\n"
                    + " 'fields.amount.min' = '1',\n"
                    + " 'fields.amount.max' = '10',\n"
                    + " 'fields.product.min' = '1',\n"
                    + " 'fields.product.max' = '2'\n"
                    + ");\n"
                    + "SELECT * FROM t_order;";

    public static String statement3 =
            "DROP TABLE IF EXISTS t_order;\n"
                    + "CREATE TABLE IF NOT EXISTS t_order(\n"
                    + " --order id\n"
                    + " `order_id` BIGINT,\n"
                    + " --product\n"
                    + " `product` BIGINT,\n"
                    + " --amount\n"
                    + " `amount` BIGINT,\n"
                    + " --payment time\n"
                    + " `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),\n"
                    + " --WATERMARK\n"
                    + " WATERMARK FOR order_time AS order_time-INTERVAL '2' SECOND\n"
                    + ") WITH(\n"
                    + " 'connector' = 'datagen',\n"
                    + " 'rows-per-second' = '1',\n"
                    + " 'fields.order_id.min' = '1',\n"
                    + " 'fields.order_id.max' = '2',\n"
                    + " 'fields.amount.min' = '1',\n"
                    + " 'fields.amount.max' = '10',\n"
                    + " 'fields.product.min' = '1',\n"
                    + " 'fields.product.max' = '2'\n"
                    + ");\n"
                    + "SELECT count(*) FROM t_order;";
}
FlinkSqlGatewayExecutor.java
@@ -20,14 +20,16 @@
 
 import org.apache.paimon.web.engine.flink.common.executor.Executor;
 import org.apache.paimon.web.engine.flink.common.operation.FlinkSqlOperationType;
-import org.apache.paimon.web.engine.flink.common.parser.StatementParser;
+import org.apache.paimon.web.engine.flink.common.parser.CustomSqlParser;
 import org.apache.paimon.web.engine.flink.common.result.ExecutionResult;
 import org.apache.paimon.web.engine.flink.common.result.FetchResultParams;
 import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
 import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
 import org.apache.paimon.web.engine.flink.sql.gateway.utils.CollectResultUtil;
 import org.apache.paimon.web.engine.flink.sql.gateway.utils.FlinkSqlStatementSetBuilder;
 
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
 
@@ -53,16 +55,21 @@ public FlinkSqlGatewayExecutor(SessionEntity session) throws Exception {
     }
 
     @Override
-    public ExecutionResult executeSql(String multiStatement) throws Exception {
-        String[] statements = StatementParser.parse(multiStatement);
+    public ExecutionResult executeSql(String multiStatement, int maxRows) throws Exception {
+        CustomSqlParser customSqlParser =
+                maxRows > 0
+                        ? new CustomSqlParser(multiStatement, maxRows)
+                        : new CustomSqlParser(multiStatement);
+        SqlNodeList sqlNodeList = customSqlParser.parseStmtList();
         List<String> insertStatements = new ArrayList<>();
         ExecutionResult executionResult = null;
 
-        for (String statement : statements) {
-            FlinkSqlOperationType operationType = FlinkSqlOperationType.getOperationType(statement);
+        for (SqlNode sqlNode : sqlNodeList) {
+            FlinkSqlOperationType operationType =
+                    FlinkSqlOperationType.getOperationType(sqlNode.toString());
 
             if (operationType == null) {
-                String operationTypeString = extractSqlOperationType(statement);
+                String operationTypeString = extractSqlOperationType(sqlNode.toString());
                 throw new UnsupportedOperationException(
                         "Unsupported operation type: " + operationTypeString);
             }
@@ -73,17 +80,17 @@ public ExecutionResult executeSql(String multiStatement) throws Exception {
                         throw new UnsupportedOperationException(
                                 "Cannot execute DQL statement with pending INSERT statements.");
                     }
-                    executionResult = executeDqlStatement(statement, operationType);
+                    executionResult = executeDqlStatement(sqlNode.toString(), operationType);
                     break;
                 case DML:
                     if (operationType.getType().equals(FlinkSqlOperationType.INSERT.getType())) {
-                        insertStatements.add(statement);
+                        insertStatements.add(sqlNode.toString());
                     } else {
-                        executionResult = executeDmlStatement(statement);
+                        executionResult = executeDmlStatement(sqlNode.toString());
                    }
                     break;
                 default:
-                    client.executeStatement(session.getSessionId(), statement, null);
+                    client.executeStatement(session.getSessionId(), sqlNode.toString(), null);
                     break;
             }
 
(The remaining 4 changed files in this commit are not shown.)
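
To summarize the control flow that executeSql now follows, here is a simplified, self-contained sketch. It is illustrative only: keyword matching stands in for FlinkSqlOperationType, and nothing here talks to the SQL Gateway. Non-INSERT statements execute as they appear, INSERT statements are collected so they can be submitted together, and a query is rejected while INSERTs are still pending.

import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of the executeSql dispatch order (not real gateway code). */
public class DispatchSketch {
    public static void main(String[] args) {
        String[] statements = {
            "DROP TABLE IF EXISTS t_order",
            "CREATE TABLE t_order (id BIGINT)",
            "SELECT * FROM t_order"
        };
        List<String> insertStatements = new ArrayList<>();
        for (String stmt : statements) {
            String keyword = stmt.trim().split("\\s+")[0].toUpperCase();
            switch (keyword) {
                case "SELECT": // DQL: refuse to run while INSERTs are pending
                    if (!insertStatements.isEmpty()) {
                        throw new UnsupportedOperationException(
                                "Cannot execute DQL statement with pending INSERT statements.");
                    }
                    System.out.println("run query: " + stmt);
                    break;
                case "INSERT": // DML: batch for a single statement-set submission
                    insertStatements.add(stmt);
                    break;
                default: // DDL and everything else executes immediately
                    System.out.println("execute: " + stmt);
            }
        }
    }
}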