From 0a36984d1ae15dd141bbe5dbfcd59b8d0b0c5f96 Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Tue, 24 Oct 2023 17:49:26 +0800
Subject: [PATCH] [Feature] dev-2.1.2 Support Flink 1.18

---
 .../streampark/common/conf/FlinkVersion.scala |   2 +-
 .../console/core/runner/EnvInitializer.java   |   2 +-
 .../streampark-flink-shims/pom.xml            |   1 +
 .../flink/core/FlinkSqlValidator.scala        |   9 +-
 .../streampark-flink-shims_flink-1.18/pom.xml | 154 +++++++++++++++++
 .../flink/core/FlinkClusterClient.scala       |  49 ++++++
 .../flink/core/FlinkKubernetesClient.scala    |  31 ++++
 .../flink/core/StreamTableContext.scala       | 161 ++++++++++++++++++
 .../streampark/flink/core/TableContext.scala  | 103 +++++++++++
 .../streampark/flink/core/TableExt.scala      |  42 +++++
 10 files changed, 551 insertions(+), 3 deletions(-)
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 3dd78d51ec..b6e5e6a908 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(1, v, _) if v >= 12 && v <= 17 => true
+      case Array(1, v, _) if v >= 12 && v <= 18 => true
       case _ =>
         if (throwException) {
           throw new UnsupportedOperationException(s"Unsupported flink version: $version")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c2f4053436..5707e8e64e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -69,7 +69,7 @@ public class EnvInitializer implements ApplicationRunner {
 
   private static final Pattern PATTERN_FLINK_SHIMS_JAR =
       Pattern.compile(
-          "^streampark-flink-shims_flink-(1.1[2-7])_(2.11|2.12)-(.*).jar$",
+          "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
           Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
   @Override
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index 47dd29be61..b67ab24837 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -44,6 +44,7 @@
         <module>streampark-flink-shims_flink-1.15</module>
         <module>streampark-flink-shims_flink-1.16</module>
         <module>streampark-flink-shims_flink-1.17</module>
+        <module>streampark-flink-shims_flink-1.18</module>
     </modules>
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 721a143def..70101672ef 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -53,7 +53,14 @@ object FlinkSqlValidator extends Logger {
         TableConfigOptions.TABLE_SQL_DIALECT,
         sqlDialect.name().toLowerCase())
       val conformance = sqlDialect match {
-        case HIVE => FlinkSqlConformance.HIVE
+        case HIVE =>
+          try {
+            FlinkSqlConformance.HIVE
+          } catch {
+            // Flink 1.18+ removed FlinkSqlConformance.HIVE, so fall back to DEFAULT
+            case _: NoSuchFieldError => FlinkSqlConformance.DEFAULT
+            case e: Throwable => throw new IllegalArgumentException("Failed to initialize Flink SQL dialect", e)
+          }
         case DEFAULT => FlinkSqlConformance.DEFAULT
         case _ => throw new UnsupportedOperationException(s"Unsupported sqlDialect: $sqlDialect")
       }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
new file mode 100644
index 0000000000..1c8ee265a3
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-shims</artifactId>
+        <version>2.1.2</version>
+    </parent>
+
+    <artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.18</name>
+
+    <properties>
+        <flink.version>1.18.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 0000000000..4f6336f5a1
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
+  }
+
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      SavepointFormatType.DEFAULT)
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 0000000000..f388c8e9f4
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  override def getService(serviceName: String): Optional[KubernetesService] = {
+    kubeClient.getService(serviceName)
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 0000000000..65f715c752
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 0000000000..e8f704f393
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(catalogName: String, databaseName: String): Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 0000000000..cab368e361
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}
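
For reviewers, a minimal standalone sketch of the version gate as it behaves after this patch; the match is lifted verbatim from FlinkVersion.checkVersion, while the object wrapper and sample inputs are illustrative only:

object VersionGateSketch {
  // Accepts 1.12.x through 1.18.x and rejects everything else.
  def accepts(version: String): Boolean =
    version.split("\\.").map(_.trim.toInt) match {
      case Array(1, v, _) if v >= 12 && v <= 18 => true
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    println(accepts("1.18.0")) // true
    println(accepts("1.11.6")) // false
  }
}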
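The FlinkSqlValidator change works because Flink 1.18 dropped the HIVE constant from FlinkSqlConformance: shims-base is compiled against an older planner, so touching the field on a 1.18 classpath fails at link time with NoSuchFieldError rather than a regular exception. A minimal sketch of that fallback pattern, assuming a pre-1.18 flink-table-planner on the compile classpath; the object and method names are illustrative:

import org.apache.flink.sql.parser.validate.FlinkSqlConformance

object DialectFallbackSketch {
  // Resolves HIVE conformance, degrading to DEFAULT when the field no
  // longer exists at runtime (Flink 1.18+).
  def hiveConformance: FlinkSqlConformance =
    try FlinkSqlConformance.HIVE
    catch { case _: NoSuchFieldError => FlinkSqlConformance.DEFAULT }
}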
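All three savepoint entry points in the new FlinkClusterClient forward to the Flink 1.18 ClusterClient overloads that take a SavepointFormatType, pinning the default (canonical) format. A usage sketch; the client, job id, and target directory are assumed inputs, not part of this patch:

import java.util.concurrent.CompletableFuture

import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient

import org.apache.streampark.flink.core.FlinkClusterClient

object SavepointSketch {
  def trigger[T](client: ClusterClient[T], jobId: JobID): CompletableFuture[String] = {
    val shim = new FlinkClusterClient[T](client)
    // Forwards to triggerSavepoint(jobId, dir, SavepointFormatType.DEFAULT).
    shim.triggerSavepoint(jobId, "file:///tmp/savepoints")
  }
}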
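Both StreamTableContext and TableContext surface createCatalog(String, CatalogDescriptor), new in Flink 1.18, which registers a catalog from configuration so it is instantiated lazily on first use rather than eagerly. A sketch against the plain Table API; the catalog name and the generic_in_memory type key are illustrative assumptions:

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.CatalogDescriptor

object CreateCatalogSketch {
  def main(args: Array[String]): Unit = {
    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    val options = new Configuration()
    options.setString("type", "generic_in_memory")
    // The catalog is created from the descriptor on first access.
    tableEnv.createCatalog("demo_catalog", CatalogDescriptor.of("demo_catalog", options))
  }
}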
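Finally, TableExt layers symbolic sugar over the scala bridge: -> delegates to Table.as, \\ to toDataStream, and >> to toAppendStream while flagging the context as converted. A usage sketch under the assumption that the shims jar is on the classpath; the input table and field names are illustrative:

import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.{Table => FlinkTable}
import org.apache.flink.types.Row

import org.apache.streampark.flink.core.TableExt

object TableExtSketch {
  def toRenamedStream(table: FlinkTable): DataStream[Row] = {
    val renamed = new TableExt.Table(table) -> ("id", "name") // Table.as("id", "name")
    new TableExt.TableConversions(renamed).\\ // scala-bridge toDataStream
  }
}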