diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala index 1aea5e5e74..a2b677f1df 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.api.SqlDialect.{DEFAULT, HIVE} import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories -import scala.util.{Failure, Try} +import scala.util.{Failure, Success, Try} object FlinkSqlValidator extends Logger { @@ -45,10 +45,11 @@ object FlinkSqlValidator extends Logger { def getConfig(sqlDialect: SqlDialect): Config = { val conformance = sqlDialect match { case HIVE => - try { - FlinkSqlConformance.HIVE - } catch { - case _ => FlinkSqlConformance.DEFAULT + Try(FlinkSqlConformance.HIVE) match { + case Success(v) => v + // for flink 1.18+ + case Failure(_: NoSuchFieldError) => FlinkSqlConformance.DEFAULT + case Failure(e) => throw new IllegalArgumentException(e) } case DEFAULT => FlinkSqlConformance.DEFAULT case _ => throw new UnsupportedOperationException(s"Unsupported sqlDialect: $sqlDialect") diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala index d0e2299703..65f715c752 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala @@ -154,6 +154,7 @@ class StreamTableContext( extraDetails: ExplainDetail*): String = tableEnv.explainSql(statement, format, extraDetails: _*) + /** @since 1.18 */ override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = { tableEnv.createCatalog(catalog, catalogDescriptor) } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index afcbc4715a..e8f704f393 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -95,6 +95,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: extraDetails: ExplainDetail*): String = tableEnv.explainSql(statement, format, extraDetails: _*) + /** @since 1.18 */ override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = { tableEnv.createCatalog(catalog, catalogDescriptor) }