diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/ScalarFunctionExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/ScalarFunctionExample.java new file mode 100644 index 000000000000..7d45f249641c --- /dev/null +++ b/example/udf/src/main/java/org/apache/iotdb/udf/ScalarFunctionExample.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf; + +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +/** This is an internal example of the ScalarFunction implementation. */ +public class ScalarFunctionExample implements ScalarFunction { + /** + * CREATE DATABASE test; + * + *

USE test; + * + *

CREATE TABLE t1(device_id STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT); + * + *

INSERT INTO t1(time, device_id, s1, s2) VALUES (1, 'd1', 'a', 1), (2, 'd1', null, 2), (3, + * 'd1', 'c', null); + * + *

CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample'; + * + *

SHOW FUNCTIONS; + * + *

SELECT time, device_id, s1, s2, contain_null(s1, s2) as contain_null, contain_null(s1) as + * s1_isnull, contain_null(s2) as s2_isnull FROM t1; + */ + @Override + public void validate(FunctionParameters parameters) throws UDFException { + if (parameters.getChildExpressionsSize() < 1) { + throw new UDFParameterNotValidException("At least one parameter is required."); + } + } + + @Override + public void beforeStart(FunctionParameters parameters, ScalarFunctionConfig configurations) { + configurations.setOutputDataType(Type.BOOLEAN); + } + + @Override + public Object evaluate(Record input) throws UDFException { + for (int i = 0; i < input.size(); i++) { + if (input.isNull(i)) { + return true; + } + } + return false; + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/AllSum.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/AllSum.java new file mode 100644 index 000000000000..d9754d954f4c --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/AllSum.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.example.relational; + +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +import java.util.HashSet; +import java.util.Set; + +/** Calculate the sum of all parameters. Only support inputs of INT32,INT64,DOUBLE,FLOAT type. */ +public class AllSum implements ScalarFunction { + + private Type outputDataType; + + @Override + public void validate(FunctionParameters parameters) throws UDFException { + if (parameters.getChildExpressionsSize() < 1) { + throw new UDFParameterNotValidException("At least one parameter is required."); + } + for (int i = 0; i < parameters.getChildExpressionsSize(); i++) { + if (parameters.getDataType(i) != Type.INT32 + && parameters.getDataType(i) != Type.INT64 + && parameters.getDataType(i) != Type.FLOAT + && parameters.getDataType(i) != Type.DOUBLE) { + throw new UDFParameterNotValidException( + "Only support inputs of INT32,INT64,DOUBLE,FLOAT type."); + } + } + } + + @Override + public void beforeStart(FunctionParameters parameters, ScalarFunctionConfig configurations) { + Set inputTypeSet = new HashSet<>(); + for (int i = 0; i < parameters.getChildExpressionsSize(); i++) { + inputTypeSet.add(parameters.getDataType(i)); + } + if (inputTypeSet.contains(Type.DOUBLE)) { + outputDataType = Type.DOUBLE; + } else if (inputTypeSet.contains(Type.FLOAT)) { + outputDataType = Type.FLOAT; + } else if (inputTypeSet.contains(Type.INT64)) { + outputDataType = Type.INT64; + } else { + outputDataType = Type.INT32; + } + configurations.setOutputDataType(outputDataType); + } + + @Override + public Object evaluate(Record input) { + double res = 0; + for (int i = 0; i < input.size(); i++) { + if (!input.isNull(i)) { + switch (input.getDataType(i)) { + case INT32: + res += input.getInt(i); + break; + case INT64: + res += input.getLong(i); + break; + case FLOAT: + res += input.getFloat(i); + break; + case DOUBLE: + res += input.getDouble(i); + break; + } + } + } + switch (outputDataType) { + case INT32: + return (int) res; + case INT64: + return (long) res; + case FLOAT: + return (float) res; + case DOUBLE: + return res; + default: + throw new RuntimeException("Unexpected output type."); + } + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/ContainNull.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/ContainNull.java new file mode 100644 index 000000000000..463de569eecc --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/ContainNull.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.example.relational; + +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +public class ContainNull implements ScalarFunction { + @Override + public void validate(FunctionParameters parameters) throws UDFException { + if (parameters.getChildExpressionsSize() < 1) { + throw new UDFParameterNotValidException("At least one parameter is required."); + } + } + + @Override + public void beforeStart(FunctionParameters parameters, ScalarFunctionConfig configurations) { + configurations.setOutputDataType(Type.BOOLEAN); + } + + @Override + public Object evaluate(Record input) { + for (int i = 0; i < input.size(); i++) { + if (input.isNull(i)) { + return true; + } + } + return false; + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/DatePlusOne.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/DatePlusOne.java new file mode 100644 index 000000000000..09d69ba30964 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/DatePlusOne.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.example.relational; + +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +import java.time.LocalDate; + +public class DatePlusOne implements ScalarFunction { + @Override + public void validate(FunctionParameters parameters) throws UDFException { + if (parameters.getChildExpressionsSize() != 2) { + throw new UDFParameterNotValidException("Only two parameter is required."); + } + if (parameters.getDataType(0) != Type.DATE) { + throw new UDFParameterNotValidException("The first parameter should be DATE type."); + } + if (parameters.getDataType(1) != Type.INT32 && parameters.getDataType(1) != Type.INT64) { + throw new UDFParameterNotValidException("The second parameter should be INT type."); + } + } + + @Override + public void beforeStart(FunctionParameters parameters, ScalarFunctionConfig configurations) { + configurations.setOutputDataType(Type.DATE); + } + + @Override + public Object evaluate(Record input) { + LocalDate date = input.getLocalDate(0); + int days = input.getInt(1); + return date.plusDays(days); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java index a37f5c29a8fe..54599cfc809e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.it; +import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; @@ -288,6 +289,7 @@ private List createUDF(SyncConfigNodeIServiceClient client) TCreateFunctionReq createFunctionReq1 = new TCreateFunctionReq("test1", "org.apache.iotdb.udf.UDTFExample", true) .setModel(Model.TREE) + .setFunctionType(FunctionType.NONE) .setJarName(jarName) .setJarFile(jarFile) .setJarMD5(jarMD5); @@ -295,6 +297,7 @@ private List createUDF(SyncConfigNodeIServiceClient client) TCreateFunctionReq createFunctionReq2 = new TCreateFunctionReq("test2", "org.apache.iotdb.udf.UDTFExample", true) .setModel(Model.TREE) + .setFunctionType(FunctionType.NONE) .setJarName(jarName) .setJarFile(jarFile) .setJarMD5(jarMD5); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java new file mode 100644 index 000000000000..a2f6e37c9724 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.relational.it.db.it.udf; + +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBSQLFunctionManagementIT { + + private static final int BUILTIN_SCALAR_FUNCTIONS_COUNT = + TableBuiltinScalarFunction.getBuiltInScalarFunctionName().size(); + private static final int BUILTIN_AGGREGATE_FUNCTIONS_COUNT = + TableBuiltinAggregationFunction.values().length; + + private static final String UDF_LIB_PREFIX = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator; + + private static final String UDF_JAR_PREFIX = new File(UDF_LIB_PREFIX).toURI().toString(); + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testCreateShowDropScalarFunction() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'"); + + try (ResultSet resultSet = statement.executeQuery("show functions")) { + assertEquals(4, resultSet.getMetaData().getColumnCount()); + int count = 0; + while (resultSet.next()) { + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); ++i) { + stringBuilder.append(resultSet.getString(i)).append(","); + } + String result = stringBuilder.toString(); + if (result.contains("FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC")) { + Assert.assertEquals( + String.format( + "udsf,%s,org.apache.iotdb.db.query.udf.example.relational.ContainNull,AVAILABLE,", + FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC), + result); + } + ++count; + } + Assert.assertEquals( + 1 + BUILTIN_AGGREGATE_FUNCTIONS_COUNT + BUILTIN_SCALAR_FUNCTIONS_COUNT, count); + } + statement.execute("drop function udsf"); + try (ResultSet resultSet = statement.executeQuery("show functions")) { + assertEquals(4, resultSet.getMetaData().getColumnCount()); + int count = 0; + while (resultSet.next()) { + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); ++i) { + stringBuilder.append(resultSet.getString(i)).append(","); + } + String result = stringBuilder.toString(); + if (result.contains("FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC")) { + Assert.fail(); + } + ++count; + } + Assert.assertEquals( + BUILTIN_AGGREGATE_FUNCTIONS_COUNT + BUILTIN_SCALAR_FUNCTIONS_COUNT, count); + } + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + } + + @Test + public void testCreateFunctionWithBuiltinFunctionName() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + try { + statement.execute( + "create function COS as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'"); + fail(); + } catch (SQLException throwable) { + assertTrue( + throwable + .getMessage() + .contains("the given function name conflicts with the built-in function name")); + } + try { + statement.execute( + "create function aVg as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'"); + fail(); + } catch (SQLException throwable) { + assertTrue( + throwable + .getMessage() + .contains("the given function name conflicts with the built-in function name")); + } + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + } + + @Test + public void testCreateFunctionTwice() throws SQLException { // create function twice + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'"); + try { + statement.execute( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'"); + fail(); + } catch (SQLException throwable) { + assertTrue(throwable.getMessage().contains("the same name UDF has been created")); + } + } + } + + @Test + public void testCreateInvalidFunction() throws SQLException { // create function twice + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + try { + statement.execute( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull123'"); + fail(); + } catch (SQLException throwable) { + assertTrue(throwable.getMessage().contains("invalid")); + } + } + } + + @Test + public void testCreateFunctionWithURI() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull' using URI '%s'", + UDF_JAR_PREFIX + "udf-example.jar")); + statement.execute( + String.format( + "create function udsf2 as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull' using URI '%s'", + UDF_JAR_PREFIX + "udf-example.jar")); + + try (ResultSet resultSet = statement.executeQuery("show functions")) { + int count = 0; + while (resultSet.next()) { + ++count; + } + Assert.assertEquals( + 2 + BUILTIN_AGGREGATE_FUNCTIONS_COUNT + BUILTIN_SCALAR_FUNCTIONS_COUNT, count); + assertEquals(4, resultSet.getMetaData().getColumnCount()); + } catch (Exception e) { + fail(); + } + } + } + + @Test + public void testCreateFunctionWithInvalidURI() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + try { + statement.execute( + String.format( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull' using URI '%s'", + "")); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("URI")); + } + + try { + statement.execute( + String.format( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull' using URI '%s'", + "file:///data/udf/upload-test.jar")); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("URI")); + } + } catch (SQLException throwable) { + fail(); + } + } + + @Test + public void testDropFunctionTwice() throws SQLException { // create + drop twice + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "create function udsf as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'"); + statement.execute("drop function udsf"); + + try { + // drop UDF that does not exist will not throw exception now. + statement.execute("drop function udsf"); + } catch (SQLException throwable) { + assertTrue(throwable.getMessage().contains("this UDF has not been created")); + } + } + } + + @Test + public void testDropNotExistFunction() { // drop + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + // drop UDF that does not exist will not throw exception now. + statement.execute("drop function udsf"); + } catch (SQLException throwable) { + assertTrue(throwable.getMessage().contains("this UDF has not been created")); + } + } + + @Test + public void testDropBuiltInFunction() throws SQLException { // drop + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + try { + statement.execute("drop function abs"); + fail(); + } catch (SQLException throwable) { + assertTrue( + throwable.getMessage().contains("Built-in function ABS can not be deregistered")); + } + // ensure that abs is not dropped + statement.execute("CREATE DATABASE db"); + statement.execute("USE db"); + statement.execute("CREATE TABLE table0 (device string id, s1 INT32)"); + statement.execute("INSERT INTO table0 (time, device, s1) VALUES (1, 'd1', -10)"); + try (ResultSet rs = statement.executeQuery("SELECT time, ABS(s1) FROM table0")) { + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getLong(1)); + Assert.assertEquals(10, rs.getInt(2)); + Assert.assertFalse(rs.next()); + } + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/scalar/IoTDBScalarFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/scalar/IoTDBScalarFunctionIT.java new file mode 100644 index 000000000000..7d7f1cbbe357 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/scalar/IoTDBScalarFunctionIT.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it.udf.scalar; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.Types; +import java.text.DecimalFormat; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBScalarFunctionIT { + private static String[] sqls = + new String[] { + "CREATE DATABASE test", + "USE test", + "CREATE TABLE vehicle (device_id string id, s1 INT32 measurement, s2 INT64 measurement, s3 FLOAT MEASUREMENT, s4 DOUBLE MEASUREMENT, s5 BOOLEAN MEASUREMENT)", + "insert into vehicle(time, device_id, s1, s2, s3, s4, s5) values (1, 'd0', 1, 1, 1.1, 1.1, true)", + "insert into vehicle(time, device_id, s1, s2, s3, s4, s5) values (2, 'd0', null, 2, 2.2, 2.2, true)", + "insert into vehicle(time, device_id, s1, s2, s3, s4, s5) values (3, 'd0', 3, 3, null, null, false)", + "insert into vehicle(time, device_id, s5) values (5, 'd0', true)", + "CREATE FUNCTION contain_null as 'org.apache.iotdb.db.query.udf.example.relational.ContainNull'", + "CREATE FUNCTION all_sum as 'org.apache.iotdb.db.query.udf.example.relational.AllSum'", + "CREATE TABLE t2 (device_id string id, s1 DATE measurement)", + "insert into t2(time, device_id, s1) values (1, 'd0', '2024-02-28')", + "insert into t2(time, device_id, s1) values (2, 'd0', '2024-02-29')", + "insert into t2(time, device_id, s1) values (3, 'd0', '2024-03-01')", + "CREATE FUNCTION date_plus as 'org.apache.iotdb.db.query.udf.example.relational.DatePlusOne'" + }; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + private static void insertData() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : sqls) { + System.out.println(sql); + statement.execute(sql); + } + } catch (Exception e) { + fail("insertData failed."); + } + } + + @Test + public void testIllegalInput() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE test"); + try { + statement.execute("select contain_null() from vehicle"); + fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("At least one parameter is required")); + } + try { + statement.execute("select all_sum(s1,s2,s3,s4,s5) from vehicle"); + fail(); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("Only support inputs of INT32,INT64,DOUBLE,FLOAT type")); + } + + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testNormalQuery() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE test"); + List expectedResult = + Arrays.asList("1,false,false", "2,true,true", "3,true,false", "5,true,true"); + int row = 0; + try (ResultSet resultSet = + statement.executeQuery( + "select time, contain_null(s1,s2,s3,s4,s5) as contain_null, contain_null(s1) as s1_null from vehicle")) { + while (resultSet.next()) { + Assert.assertEquals( + expectedResult.get(row), + resultSet.getLong(1) + "," + resultSet.getBoolean(2) + "," + resultSet.getBoolean(3)); + row++; + } + assertEquals(4, row); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testPolymorphicQuery() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE test"); + List expectedResult = + Arrays.asList( + "1,1,1,1.1,1.1,2,3.1,4.2,2.1", + "2,0,2,2.2,2.2,2,4.2,6.4,4.2", + "3,3,3,.0,.0,6,6.0,6.0,3.0", + "5,0,0,.0,.0,0,.0,.0,.0"); + int row = 0; + try (ResultSet resultSet = + statement.executeQuery( + "select time, all_sum(s1) as s1, all_sum(s2) as s2, all_sum(s3) as s3, all_sum(s4) as s4, all_sum(s1,s2) as s12, all_sum(s1,s2,s3) as s123, all_sum(s1,s2,s3,s4) as s1234, all_sum(s2,s3) as s23 from vehicle")) { + Assert.assertEquals(Types.TIMESTAMP, resultSet.getMetaData().getColumnType(1)); + Assert.assertEquals(Types.INTEGER, resultSet.getMetaData().getColumnType(2)); + Assert.assertEquals(Types.BIGINT, resultSet.getMetaData().getColumnType(3)); + Assert.assertEquals(Types.FLOAT, resultSet.getMetaData().getColumnType(4)); + Assert.assertEquals(Types.DOUBLE, resultSet.getMetaData().getColumnType(5)); + Assert.assertEquals(Types.BIGINT, resultSet.getMetaData().getColumnType(6)); + Assert.assertEquals(Types.FLOAT, resultSet.getMetaData().getColumnType(7)); + Assert.assertEquals(Types.DOUBLE, resultSet.getMetaData().getColumnType(8)); + Assert.assertEquals(Types.FLOAT, resultSet.getMetaData().getColumnType(9)); + DecimalFormat df = new DecimalFormat("#.0"); + while (resultSet.next()) { + Assert.assertEquals( + expectedResult.get(row), + resultSet.getLong(1) + + "," + + resultSet.getInt(2) + + "," + + resultSet.getLong(3) + + "," + + df.format(resultSet.getFloat(4)) + + "," + + df.format(resultSet.getDouble(5)) + + "," + + resultSet.getLong(6) + + "," + + df.format(resultSet.getFloat(7)) + + "," + + df.format(resultSet.getDouble(8)) + + "," + + df.format(resultSet.getFloat(9))); + row++; + } + assertEquals(4, row); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testDateFunction() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE test"); + List expectedResult = + Arrays.asList( + LocalDate.of(2024, 2, 29), LocalDate.of(2024, 3, 1), LocalDate.of(2024, 3, 2)); + int row = 0; + try (ResultSet resultSet = statement.executeQuery("select date_plus(s1, 1) from t2")) { + while (resultSet.next()) { + Assert.assertEquals(expectedResult.get(row), resultSet.getDate(1).toLocalDate()); + row++; + } + assertEquals(3, row); + } + expectedResult = + Arrays.asList( + LocalDate.of(2024, 3, 1), LocalDate.of(2024, 3, 2), LocalDate.of(2024, 3, 3)); + row = 0; + try (ResultSet resultSet = statement.executeQuery("select date_plus(s1, 2) from t2")) { + while (resultSet.next()) { + Assert.assertEquals(expectedResult.get(row), resultSet.getDate(1).toLocalDate()); + row++; + } + assertEquals(3, row); + } + + } catch (Exception e) { + fail(e.getMessage()); + } + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/config/ScalarFunctionConfig.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/config/ScalarFunctionConfig.java new file mode 100644 index 000000000000..c237e099b0dd --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/config/ScalarFunctionConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.customizer.config; + +import org.apache.iotdb.udf.api.type.Type; + +public class ScalarFunctionConfig extends UDFConfigurations { + + /** + * Set the output data type of the scalar function. + * + * @param outputDataType the output data type of the scalar function + * @return this + */ + public ScalarFunctionConfig setOutputDataType(Type outputDataType) { + this.outputDataType = outputDataType; + return this; + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/parameter/FunctionParameters.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/parameter/FunctionParameters.java new file mode 100644 index 000000000000..c80ea4788671 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/parameter/FunctionParameters.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.customizer.parameter; + +import org.apache.iotdb.udf.api.type.Type; + +import java.util.List; +import java.util.Map; + +/** + * FunctionParameters is used to provide the information of the function parameters to the UDF + * implementation. It contains the data types of the child expressions, system attributes, etc. + */ +public class FunctionParameters { + private final List childExpressionDataTypes; + private final Map systemAttributes; + + public FunctionParameters( + List childExpressionDataTypes, Map systemAttributes) { + this.childExpressionDataTypes = childExpressionDataTypes; + this.systemAttributes = systemAttributes; + } + + /** + * Get the data types of the input children expressions. + * + * @return a list of data types of the input children expressions + */ + public List getChildExpressionDataTypes() { + return childExpressionDataTypes; + } + + /** + * Get the number of the input children expressions. + * + * @return the number of the input children expressions + */ + public int getChildExpressionsSize() { + return childExpressionDataTypes.size(); + } + + /** + * Get the data type of the input child expression at the specified index. + * + * @param index column index + * @return the data type of the input child expression at the specified index + */ + public Type getDataType(int index) { + return childExpressionDataTypes.get(index); + } + + /** + * Check if the system attribute exists. + * + * @param attributeKey the key of the system attribute + * @return true if the system attribute exists, false otherwise + */ + public boolean hasSystemAttribute(String attributeKey) { + return systemAttributes.containsKey(attributeKey); + } + + /** + * Get all the system attributes. + * + * @return a map of the system attributes + */ + public Map getSystemAttributes() { + return systemAttributes; + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregationFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java similarity index 93% rename from iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregationFunction.java rename to iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java index e831d5b5292b..24942afd010e 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregationFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java @@ -19,4 +19,4 @@ package org.apache.iotdb.udf.api.relational; -public interface AggregationFunction extends SQLFunction {} +public interface AggregateFunction extends SQLFunction {} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java index 996d6994138b..adea68e58e4c 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java @@ -19,4 +19,51 @@ package org.apache.iotdb.udf.api.relational; -public interface ScalarFunction extends SQLFunction {} +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.access.Record; + +public interface ScalarFunction extends SQLFunction { + + /** + * This method is used to validate {@linkplain FunctionParameters}. + * + * @param parameters parameters used to validate + * @throws UDFException if any parameter is not valid + */ + void validate(FunctionParameters parameters) throws UDFException; + + /** + * This method is mainly used to initialize {@linkplain ScalarFunction} and set the output data + * type. In this method, the user need to do the following things: + * + *

+ * + *

This method is called after the ScalarFunction is instantiated and before the beginning of + * the transformation process. + * + * @param parameters used to parse the input parameters entered by the user + * @param configurations used to set the required properties in the ScalarFunction + */ + void beforeStart(FunctionParameters parameters, ScalarFunctionConfig configurations); + + /** + * This method will be called to process the transformation. In a single UDF query, this method + * may be called multiple times. + * + * @param input original input data row + * @throws UDFException the user can throw errors if necessary + * @throws UnsupportedOperationException if the user does not override this method + */ + Object evaluate(Record input) throws UDFException; + + /** This method is mainly used to release the resources used in the SQLFunction. */ + default void beforeDestroy() { + // do nothing + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java index 98d3b6b80c50..558a12cb69c7 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java @@ -22,7 +22,7 @@ import org.apache.iotdb.udf.api.type.Binary; import org.apache.iotdb.udf.api.type.Type; -import java.io.IOException; +import java.time.LocalDate; public interface Record { /** @@ -32,20 +32,19 @@ public interface Record { * * @param columnIndex index of the specified column * @return the int value at the specified column in this row - * @throws IOException if an I/O error occurs */ - int getInt(int columnIndex) throws IOException; + int getInt(int columnIndex); /** * Returns the long value at the specified column in this row. * - *

Users need to ensure that the data type of the specified column is {@code TSDataType.INT64}. + *

Users need to ensure that the data type of the specified column is {@code TSDataType.INT64} + * or {@code TSDataType.TIMESTAMP}. * * @param columnIndex index of the specified column * @return the long value at the specified column in this row - * @throws IOException if an I/O error occurs */ - long getLong(int columnIndex) throws IOException; + long getLong(int columnIndex); /** * Returns the float value at the specified column in this row. @@ -54,9 +53,8 @@ public interface Record { * * @param columnIndex index of the specified column * @return the float value at the specified column in this row - * @throws IOException if an I/O error occurs */ - float getFloat(int columnIndex) throws IOException; + float getFloat(int columnIndex); /** * Returns the double value at the specified column in this row. @@ -66,9 +64,8 @@ public interface Record { * * @param columnIndex index of the specified column * @return the double value at the specified column in this row - * @throws IOException if an I/O error occurs */ - double getDouble(int columnIndex) throws IOException; + double getDouble(int columnIndex); /** * Returns the boolean value at the specified column in this row. @@ -78,31 +75,40 @@ public interface Record { * * @param columnIndex index of the specified column * @return the boolean value at the specified column in this row - * @throws IOException if an I/O error occurs */ - boolean getBoolean(int columnIndex) throws IOException; + boolean getBoolean(int columnIndex); /** * Returns the Binary value at the specified column in this row. * - *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}. + *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}, + * {@code TSDataType.STRING} or {@code TSDataType.BLOB}. * * @param columnIndex index of the specified column * @return the Binary value at the specified column in this row - * @throws IOException if an I/O error occurs */ - Binary getBinary(int columnIndex) throws IOException; + Binary getBinary(int columnIndex); /** * Returns the String value at the specified column in this row. * - *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}. + *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT} + * or {@code TSDataType.STRING}. * * @param columnIndex index of the specified column * @return the String value at the specified column in this row - * @throws IOException if an I/O error occurs */ - String getString(int columnIndex) throws IOException; + String getString(int columnIndex); + + /** + * Returns the String value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code TSDataType.DATE}. + * + * @param columnIndex index of the specified column + * @return the String value at the specified column in this row + */ + LocalDate getLocalDate(int columnIndex); /** * Returns the actual data type of the value at the specified column in this row. @@ -118,7 +124,7 @@ public interface Record { * @param columnIndex index of the specified column * @return {@code true} if the value of the specified column is null */ - boolean isNull(int columnIndex) throws IOException; + boolean isNull(int columnIndex); /** * Returns the number of columns. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index 810d1426a8ee..6c6477e24181 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -19,11 +19,13 @@ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; @@ -88,11 +90,17 @@ public TSStatus createFunction(TCreateFunctionReq req) { final String jarName = req.getJarName(); final byte[] jarFile = req.getJarFile(); final Model model = req.getModel(); + final FunctionType functionType = req.getFunctionType(); udfInfo.validate(model, udfName, jarName, jarMD5); UDFInformation udfInformation = new UDFInformation( - udfName, req.getClassName(), model, false, isUsingURI, jarName, jarMD5); + udfName, + req.getClassName(), + UDFType.of(model, functionType, false), + isUsingURI, + jarName, + jarMD5); final boolean needToSaveJar = isUsingURI && udfInfo.needToSaveJar(jarName); @@ -111,7 +119,13 @@ public TSStatus createFunction(TCreateFunctionReq req) { return preCreateStatus; } udfInformation = - new UDFInformation(udfName, req.getClassName(), model, true, isUsingURI, jarName, jarMD5); + new UDFInformation( + udfName, + req.getClassName(), + UDFType.of(model, functionType, true), + isUsingURI, + jarName, + jarMD5); LOGGER.info( "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]", udfName, needToSaveJar); final TSStatus dataNodesStatus = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 4e357225c14b..ba2f4371f630 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.request; +import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; @@ -60,6 +61,7 @@ import org.apache.iotdb.commons.sync.TsFilePipeInfo; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; @@ -1476,7 +1478,13 @@ public void UpdateTriggerLocationPlanTest() throws IOException { @Test public void CreateFunctionPlanTest() throws IOException { UDFInformation udfInformation = - new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"); + new UDFInformation( + "test1", + "test1", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test1.jar", + "12345"); CreateFunctionPlan createFunctionPlan0 = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); CreateFunctionPlan createFunctionPlan1 = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java index 8d8eadb93afb..8d264ed00094 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.response; +import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; @@ -27,6 +28,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.confignode.consensus.response.function.FunctionTableResp; import org.apache.iotdb.confignode.consensus.response.trigger.TransferringTriggersResp; import org.apache.iotdb.confignode.consensus.response.trigger.TriggerLocationResp; @@ -52,9 +54,20 @@ public void convertFunctionRespTest() throws IOException { new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), ImmutableList.of( - new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"), new UDFInformation( - "test2", "test2", Model.TREE, true, true, "test2.jar", "12342"))); + "test1", + "test1", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test1.jar", + "12345"), + new UDFInformation( + "test2", + "test2", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test2.jar", + "12342"))); TGetUDFTableResp tGetUDFTableResp = functionTableResp.convertToThriftResponse(); Assert.assertEquals(functionTableResp.getStatus(), tGetUDFTableResp.status); Assert.assertEquals( @@ -118,9 +131,20 @@ public void convertJarRespTest() throws IOException { new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), ImmutableList.of( - new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"), new UDFInformation( - "test2", "test2", Model.TREE, true, true, "test2.jar", "12342"))); + "test1", + "test1", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test1.jar", + "12345"), + new UDFInformation( + "test2", + "test2", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test2.jar", + "12342"))); TGetUDFTableResp tGetUDFTableResp = functionTableResp.convertToThriftResponse(); Assert.assertEquals(functionTableResp.getStatus(), tGetUDFTableResp.status); Assert.assertEquals( diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java index dc8b577f28e5..d59759ff7d6a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java @@ -18,9 +18,11 @@ */ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; import org.apache.commons.io.FileUtils; @@ -62,14 +64,26 @@ public static void cleanup() throws IOException { @Test public void testSnapshot() throws TException, IOException, IllegalPathException { UDFInformation udfInformation = - new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"); + new UDFInformation( + "test1", + "test1", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test1.jar", + "12345"); CreateFunctionPlan createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); udfInfo.addUDFInTable(createFunctionPlan); udfInfoSaveBefore.addUDFInTable(createFunctionPlan); udfInformation = - new UDFInformation("test2", "test2", Model.TREE, true, true, "test2.jar", "123456"); + new UDFInformation( + "test2", + "test2", + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + "test2.jar", + "123456"); createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); udfInfo.addUDFInTable(createFunctionPlan); udfInfoSaveBefore.addUDFInTable(createFunctionPlan); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index 8a44437ad9e3..125fa7563029 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -44,8 +44,8 @@ import java.util.Map; import static com.google.common.base.Preconditions.checkState; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.FIRST_BY; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.LAST_BY; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST_BY; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST_BY; import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn; public class AccumulatorFactory { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java index 9f9658c9b05b..ee3f459f9e9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java @@ -19,6 +19,10 @@ package org.apache.iotdb.db.queryengine.execution.relational; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.commons.udf.utils.TableUDFUtils; +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; @@ -94,6 +98,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.column.multi.LogicalOrMultiColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.ternary.BetweenColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.ternary.Like3ColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.udf.UserDefineScalarFunctionTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.IsNullColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.LikeColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.LogicNotColumnTransformer; @@ -145,13 +150,15 @@ import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubString2ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubString3ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringColumnTransformer; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TanColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TanhColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Trim2ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TrimColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TryCastFunctionColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.UpperColumnTransformer; +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.regexp.LikePattern; @@ -170,6 +177,7 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -996,8 +1004,30 @@ private ColumnTransformer getFunctionColumnTransformer( source, ((LongLiteral) children.get(3)).getParsedValue(), context.sessionInfo.getZoneId()); + } else { + // user defined function + if (TableUDFUtils.isScalarFunction(functionName)) { + ScalarFunction scalarFunction = TableUDFUtils.getScalarFunction(functionName); + List childrenColumnTransformer = + children.stream().map(child -> process(child, context)).collect(Collectors.toList()); + FunctionParameters parameters = + new FunctionParameters( + childrenColumnTransformer.stream() + .map(i -> UDFDataTypeTransformer.transformReadTypeToUDFDataType(i.getType())) + .collect(Collectors.toList()), + Collections.emptyMap()); + ScalarFunctionConfig config = new ScalarFunctionConfig(); + scalarFunction.beforeStart(parameters, config); + Type returnType = + UDFDataTypeTransformer.transformUDFDataTypeToReadType(config.getOutputDataType()); + return new UserDefineScalarFunctionTransformer( + returnType, scalarFunction, childrenColumnTransformer); + } } - throw new IllegalArgumentException(String.format("Unknown function: %s", functionName)); + throw new IllegalArgumentException( + String.format( + "Unknown function %s on DataNode: %d.", + functionName, IoTDBDescriptor.getInstance().getConfig().getDataNodeId())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index ded116bcd670..05c00e5322e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -59,11 +59,13 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.KillQuery; @@ -80,6 +82,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables; @@ -397,7 +400,10 @@ private IQueryExecution createQueryExecutionForTableModel( || statement instanceof ShowVariables || statement instanceof ShowClusterId || statement instanceof ShowCurrentTimestamp - || statement instanceof KillQuery) { + || statement instanceof KillQuery + || statement instanceof CreateFunction + || statement instanceof DropFunction + || statement instanceof ShowFunctions) { return new ConfigExecution( queryContext, null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 915dd49ffd60..5c6178830c32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.schema.table.TsTable; @@ -30,10 +31,13 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreateFunctionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreatePipePluginTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropFunctionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropPipePluginTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowVariablesTask; @@ -88,6 +92,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; @@ -98,6 +103,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; @@ -124,6 +130,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions; @@ -840,4 +847,22 @@ protected IConfigTask visitKillQuery(KillQuery node, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); return new KillQueryTask(node); } + + @Override + protected IConfigTask visitCreateFunction(CreateFunction node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new CreateFunctionTask(node); + } + + @Override + protected IConfigTask visitShowFunctions(ShowFunctions node, MPPQueryContext context) { + context.setQueryType(QueryType.READ); + return new ShowFunctionsTask(Model.TABLE); + } + + @Override + protected IConfigTask visitDropFunction(DropFunction node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new DropFunctionTask(Model.TABLE, node.getUdfName()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 092a9fec9ee1..286bfe8b5785 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -275,6 +276,9 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.trigger.api.Trigger; import org.apache.iotdb.trigger.api.enums.FailureStrategy; +import org.apache.iotdb.udf.api.relational.AggregateFunction; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.TableFunction; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.codec.digest.DigestUtils; @@ -572,11 +576,23 @@ public SettableFuture createFunction( jarFileName.substring(jarFileName.lastIndexOf(".") + 1))); } + FunctionType functionType = FunctionType.NONE; // try to create instance, this request will fail if creation is not successful try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) { // ensure that jar file contains the class and the class is a UDF Class clazz = Class.forName(className, true, classLoader); - baseClazz.cast(clazz.getDeclaredConstructor().newInstance()); + Object o = baseClazz.cast(clazz.getDeclaredConstructor().newInstance()); + if (Model.TABLE.equals(model)) { + // we check function type for table model + if (o instanceof ScalarFunction) { + functionType = FunctionType.SCALAR; + } else if (o instanceof AggregateFunction) { + functionType = FunctionType.AGGREGATE; + } else if (o instanceof TableFunction) { + functionType = FunctionType.TABLE; + } + } + tCreateFunctionReq.setFunctionType(functionType); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException @@ -655,7 +671,7 @@ public SettableFuture showFunctions(Model model) { return future; } // convert UDFTable and buildTsBlock - ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future); + ShowFunctionsTask.buildTsBlock(model, getUDFTableResp.getAllUDFInformation(), future); } catch (ClientManagerException | TException e) { future.setException(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java index c6220151dd63..0e57412517db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java @@ -27,6 +27,9 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.commons.udf.utils.TableUDFUtils; import org.apache.iotdb.commons.udf.utils.TreeUDFUtils; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; @@ -44,6 +47,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -52,12 +56,18 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_STATE_AVAILABLE; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_STATE_UNAVAILABLE; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_AGG_FUNC; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_SCALAR; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_SCALAR_FUNC; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_TABLE_FUNC; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_UNKNOWN; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_AGG_FUNC; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC; public class ShowFunctionsTask implements IConfigTask { @@ -69,9 +79,27 @@ public class ShowFunctionsTask implements IConfigTask { BINARY_MAP.put(FUNCTION_TYPE_EXTERNAL_UDTF, BytesUtils.valueOf(FUNCTION_TYPE_EXTERNAL_UDTF)); BINARY_MAP.put(FUNCTION_TYPE_EXTERNAL_UDAF, BytesUtils.valueOf(FUNCTION_TYPE_EXTERNAL_UDAF)); BINARY_MAP.put(FUNCTION_TYPE_BUILTIN_SCALAR, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_SCALAR)); + + BINARY_MAP.put( + FUNCTION_TYPE_BUILTIN_SCALAR_FUNC, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_SCALAR_FUNC)); + BINARY_MAP.put( + FUNCTION_TYPE_BUILTIN_AGG_FUNC, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_AGG_FUNC)); + BINARY_MAP.put( + FUNCTION_TYPE_BUILTIN_TABLE_FUNC, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_TABLE_FUNC)); + BINARY_MAP.put( + FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC, + BytesUtils.valueOf(FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC)); + BINARY_MAP.put( + FUNCTION_TYPE_USER_DEFINED_AGG_FUNC, + BytesUtils.valueOf(FUNCTION_TYPE_USER_DEFINED_AGG_FUNC)); + BINARY_MAP.put( + FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC, + BytesUtils.valueOf(FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC)); + BINARY_MAP.put(FUNCTION_TYPE_UNKNOWN, BytesUtils.valueOf(FUNCTION_TYPE_UNKNOWN)); BINARY_MAP.put(FUNCTION_STATE_AVAILABLE, BytesUtils.valueOf(FUNCTION_STATE_AVAILABLE)); BINARY_MAP.put(FUNCTION_STATE_UNAVAILABLE, BytesUtils.valueOf(FUNCTION_STATE_UNAVAILABLE)); + BINARY_MAP.put("", BytesUtils.valueOf("")); } @@ -87,6 +115,48 @@ public ListenableFuture execute(IConfigTaskExecutor configTask } public static void buildTsBlock( + Model model, List allUDFInformation, SettableFuture future) { + if (Model.TREE.equals(model)) { + buildTreeModelTsBlock(allUDFInformation, future); + } else { + buildTableModelTsBlock(allUDFInformation, future); + } + } + + private static void buildTableModelTsBlock( + List allUDFInformation, SettableFuture future) { + List outputDataTypes = + ColumnHeaderConstant.showFunctionsColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList()); + TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes); + List udfInformations = new ArrayList<>(); + if (allUDFInformation != null && !allUDFInformation.isEmpty()) { + for (ByteBuffer udfInformationByteBuffer : allUDFInformation) { + UDFInformation udfInformation = UDFInformation.deserialize(udfInformationByteBuffer); + udfInformations.add(udfInformation); + } + } + + udfInformations.sort(Comparator.comparing(UDFInformation::getFunctionName)); + for (UDFInformation udfInformation : udfInformations) { + appendUDFInformation(builder, udfInformation); + } + appendFunctions( + builder, + TableBuiltinScalarFunction.getBuiltInScalarFunctionName(), + BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_SCALAR_FUNC), + BINARY_MAP.get(FUNCTION_STATE_AVAILABLE)); + appendFunctions( + builder, + TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName(), + BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_AGG_FUNC), + BINARY_MAP.get(FUNCTION_STATE_AVAILABLE)); + DatasetHeader datasetHeader = DatasetHeaderFactory.getShowFunctionsHeader(); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader)); + } + + private static void buildTreeModelTsBlock( List allUDFInformation, SettableFuture future) { List outputDataTypes = ColumnHeaderConstant.showFunctionsColumnHeaders.stream() @@ -118,10 +188,25 @@ private static void appendUDFInformation(TsBlockBuilder builder, UDFInformation builder.getColumnBuilder(1).writeBinary(getFunctionType(udfInformation)); builder.getColumnBuilder(2).writeBinary(BytesUtils.valueOf(udfInformation.getClassName())); builder.getColumnBuilder(3).writeBinary(getFunctionState(udfInformation)); - builder.declarePosition(); } + private static void appendFunctions( + TsBlockBuilder builder, + Collection functionNames, + Binary functionType, + Binary functionState) { + final Binary className = BINARY_MAP.get(""); + for (String functionName : functionNames) { + builder.getTimeColumnBuilder().writeLong(0L); + builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(functionName.toUpperCase())); + builder.getColumnBuilder(1).writeBinary(functionType); + builder.getColumnBuilder(2).writeBinary(className); + builder.getColumnBuilder(3).writeBinary(functionState); + builder.declarePosition(); + } + } + private static void appendBuiltInTimeSeriesGeneratingFunctions(TsBlockBuilder builder) { final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_UDTF); final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); @@ -175,6 +260,14 @@ private static Binary getFunctionType(UDFInformation udfInformation) { } else if (TreeUDFUtils.isUDAF(udfInformation.getFunctionName())) { return BINARY_MAP.get(FUNCTION_TYPE_EXTERNAL_UDAF); } + } else { + if (TableUDFUtils.isScalarFunction(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC); + } else if (TableUDFUtils.isAggregateFunction(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_USER_DEFINED_AGG_FUNC); + } else if (TableUDFUtils.isTableFunction(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC); + } } } return BINARY_MAP.get(FUNCTION_TYPE_UNKNOWN); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 14fb41ebaa4c..7728bf7895a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -194,6 +194,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.MEASUREMENT; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.getAggregationTypeByFuncName; import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; import static org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaContentSupplierFactory.getSupplier; @@ -206,7 +207,6 @@ import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.UNKNOWN_DATATYPE; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.getLinearFill; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.getPreviousFill; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.getAggregationTypeByFuncName; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST; import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java index 9407de3ce473..f786bb1ceffb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultExpressionTraversalVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; @@ -93,7 +93,7 @@ public static QualifiedName asQualifiedName(Expression expression) { static boolean isAggregationFunction(String functionName) { // TODO consider UDAF - return TableBuiltinAggregationFunction.getNativeFunctionNames() + return TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName() .contains(functionName.toLowerCase()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index c8838de59423..fe3cb9bff434 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -188,6 +188,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.iotdb.commons.schema.table.TsTable.TABLE_ALLOWED_PROPERTIES; import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.queryengine.execution.warnings.StandardWarningCode.REDUNDANT_ORDER_BY; import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifyOrderByAggregations; @@ -204,7 +205,6 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.RIGHT; import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil.preOrder; import static org.apache.iotdb.db.queryengine.plan.relational.utils.NodeUtils.getSortItemsFromOrderBy; -import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS; import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/InterpretedFunctionInvoker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/InterpretedFunctionInvoker.java index 66b6631271f3..c85605233cbd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/InterpretedFunctionInvoker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/InterpretedFunctionInvoker.java @@ -19,8 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.relational.function; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.db.queryengine.common.SessionInfo; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction; import org.apache.tsfile.read.common.type.Type; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index 4e1b6293825a..c88a73d87c9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -24,6 +24,10 @@ import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.schema.table.InformationSchemaTable; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.commons.udf.utils.TableUDFUtils; +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; @@ -44,9 +48,11 @@ import org.apache.iotdb.db.queryengine.plan.relational.type.TypeManager; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; +import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.type.BlobType; @@ -633,7 +639,27 @@ && isIntegerNumber(argumentTypes.get(2)))) { // ignore } - // TODO scalar UDF function + // User-defined scalar function + + if (TableUDFUtils.isScalarFunction(functionName)) { + ScalarFunction scalarFunction = TableUDFUtils.getScalarFunction(functionName); + FunctionParameters functionParameters = + new FunctionParameters( + argumentTypes.stream() + .map(UDFDataTypeTransformer::transformReadTypeToUDFDataType) + .collect(Collectors.toList()), + Collections.emptyMap()); + try { + scalarFunction.validate(functionParameters); + ScalarFunctionConfig config = new ScalarFunctionConfig(); + scalarFunction.beforeStart(functionParameters, config); + return UDFDataTypeTransformer.transformUDFDataTypeToReadType(config.getOutputDataType()); + } catch (Exception e) { + throw new SemanticException("Invalid function parameters: " + e.getMessage()); + } finally { + scalarFunction.beforeDestroy(); + } + } // TODO UDAF @@ -643,7 +669,7 @@ && isIntegerNumber(argumentTypes.get(2)))) { @Override public boolean isAggregationFunction( final SessionInfo session, final String functionName, final AccessControl accessControl) { - return TableBuiltinAggregationFunction.getNativeFunctionNames() + return TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName() .contains(functionName.toLowerCase(Locale.ENGLISH)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 5c3eed2ad31f..c0d5cd2e55c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -86,13 +86,13 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TransformSortToStreamSort.isOrderByAllIdsAndTime; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.split; -import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.tsfile.utils.Preconditions.checkArgument; /** This class is used to generate distributed plan for table model. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java index 7ac914e354cc..26e571b31e7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java @@ -14,6 +14,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -28,7 +29,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction; import com.google.common.collect.ImmutableSet; import org.apache.tsfile.utils.Pair; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java index 9a2c9cb55e86..844744dbe79f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index a3bbd8349a5d..fcf65be1eb3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -50,6 +51,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin; @@ -67,6 +69,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin; @@ -139,6 +142,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes; @@ -177,7 +181,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction; import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlBaseVisitor; import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlLexer; import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser; @@ -226,13 +229,13 @@ import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ID; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.MEASUREMENT; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED; import static org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseDateTimeFormat; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.CUBE; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.EXPLICIT; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.ROLLUP; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName.mapIdentifier; -import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION; import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_AGGREGATION; @@ -734,7 +737,14 @@ public Node visitUpdateAssignment(RelationalSqlParser.UpdateAssignmentContext ct @Override public Node visitCreateFunctionStatement(RelationalSqlParser.CreateFunctionStatementContext ctx) { - return super.visitCreateFunctionStatement(ctx); + final String udfName = ((Identifier) visit(ctx.udfName)).getValue(); + final String className = ((Identifier) visit(ctx.className)).getValue(); + if (ctx.uriClause() == null) { + return new CreateFunction(getLocation(ctx), udfName, className); + } else { + String uriString = parseAndValidateURI(ctx.uriClause()); + return new CreateFunction(getLocation(ctx), udfName, className, uriString); + } } @Override @@ -744,12 +754,13 @@ public Node visitUriClause(RelationalSqlParser.UriClauseContext ctx) { @Override public Node visitDropFunctionStatement(RelationalSqlParser.DropFunctionStatementContext ctx) { - return super.visitDropFunctionStatement(ctx); + final String udfName = ((Identifier) visit(ctx.udfName)).getValue(); + return new DropFunction(getLocation(ctx), udfName); } @Override public Node visitShowFunctionsStatement(RelationalSqlParser.ShowFunctionsStatementContext ctx) { - return super.visitShowFunctionsStatement(ctx); + return new ShowFunctions(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java index e36a272b363c..d6222564ec7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java @@ -89,10 +89,10 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.ReservedIdentifiers.reserved; import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.SqlFormatter.formatName; import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.SqlFormatter.formatSql; -import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN; public final class ExpressionFormatter { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java new file mode 100644 index 000000000000..279e3c06fd49 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.transformation.dag.column.udf; + +import org.apache.iotdb.commons.udf.access.RecordIterator; +import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.access.Record; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.read.common.type.Type; + +import java.util.List; +import java.util.stream.Collectors; + +public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer { + + private final ScalarFunction scalarFunction; + private final List inputTypes; + + public UserDefineScalarFunctionTransformer( + Type returnType, + ScalarFunction scalarFunction, + List childrenTransformers) { + super(returnType, childrenTransformers); + this.scalarFunction = scalarFunction; + this.inputTypes = + childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList()); + } + + @Override + protected void doTransform( + List childrenColumns, ColumnBuilder builder, int positionCount) { + RecordIterator iterator = new RecordIterator(childrenColumns, inputTypes, positionCount); + while (iterator.hasNext()) { + try { + Object result = scalarFunction.evaluate(iterator.next()); + if (result == null) { + builder.appendNull(); + } else { + builder.writeObject(result); + } + } catch (Exception e) { + throw new RuntimeException( + "Error occurs when evaluating user-defined scalar function " + + scalarFunction.getClass().getName(), + e); + } + } + } + + @Override + protected void doTransform( + List childrenColumns, ColumnBuilder builder, int positionCount, boolean[] selection) { + RecordIterator iterator = new RecordIterator(childrenColumns, inputTypes, positionCount); + int i = 0; + while (iterator.hasNext()) { + try { + Record input = iterator.next(); + if (selection[i++]) { + builder.appendNull(); + continue; + } + Object result = scalarFunction.evaluate(input); + if (result == null) { + builder.appendNull(); + } else { + builder.writeObject(result); + } + } catch (Throwable e) { + throw new RuntimeException( + "Error occurs when evaluating user-defined scalar function " + + scalarFunction.getClass().getName(), + e); + } + } + } + + @Override + public void close() { + super.close(); + scalarFunction.beforeDestroy(); + } + + @Override + protected void checkType() { + // do nothing + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSTest.java index 472393b7ba3e..9a0417131205 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSTest.java @@ -38,6 +38,7 @@ import java.util.Optional; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction; @@ -58,7 +59,6 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.LESS_THAN; -import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN; @Ignore // TODO public class TSBSTest { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 0455038ed149..c86900e5f992 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -179,12 +179,23 @@ private IoTDBConstant() {} public static final String COLUMN_SCHEMA_TEMPLATE = "template name"; + // for tree model public static final String FUNCTION_TYPE_NATIVE = "native"; public static final String FUNCTION_TYPE_BUILTIN_SCALAR = "built-in scalar"; public static final String FUNCTION_TYPE_BUILTIN_UDAF = "built-in UDAF"; public static final String FUNCTION_TYPE_BUILTIN_UDTF = "built-in UDTF"; public static final String FUNCTION_TYPE_EXTERNAL_UDAF = "external UDAF"; public static final String FUNCTION_TYPE_EXTERNAL_UDTF = "external UDTF"; + // for table model + public static final String FUNCTION_TYPE_BUILTIN_SCALAR_FUNC = "built-in scalar function"; + public static final String FUNCTION_TYPE_BUILTIN_AGG_FUNC = "built-in aggregate function"; + public static final String FUNCTION_TYPE_BUILTIN_TABLE_FUNC = "built-in table function"; + public static final String FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC = + "user-defined scalar function"; + public static final String FUNCTION_TYPE_USER_DEFINED_AGG_FUNC = + "user-defined aggregate function"; + public static final String FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC = "user-defined table function"; + // common public static final String FUNCTION_TYPE_UNKNOWN = "UNKNOWN"; public static final String FUNCTION_STATE_AVAILABLE = "AVAILABLE"; public static final String FUNCTION_STATE_UNAVAILABLE = "UNAVAILABLE"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java index 0b03fd01db8b..57203b574b76 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java @@ -19,8 +19,6 @@ package org.apache.iotdb.commons.udf; -import org.apache.iotdb.common.rpc.thrift.Model; - import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -46,8 +44,7 @@ private UDFInformation() {} public UDFInformation( String functionName, String className, - Model model, - boolean available, + UDFType udfType, boolean isUsingURI, String jarName, String jarMD5) { @@ -56,13 +53,7 @@ public UDFInformation( this.isUsingURI = isUsingURI; this.jarName = jarName; this.jarMD5 = jarMD5; - if (Model.TREE.equals(model)) { - this.udfType = available ? UDFType.TREE_AVAILABLE : UDFType.TREE_UNAVAILABLE; - } else if (Model.TABLE.equals(model)) { - this.udfType = available ? UDFType.TABLE_AVAILABLE : UDFType.TABLE_UNAVAILABLE; - } else { - throw new IllegalArgumentException("Unknown UDF type: " + model); - } + this.udfType = udfType; } // Only used for built-in UDF @@ -122,11 +113,7 @@ public void setUsingURI(boolean usingURI) { } public void setAvailable(boolean available) { - if (this.udfType.isTreeModel()) { - this.udfType = available ? UDFType.TREE_AVAILABLE : UDFType.TREE_UNAVAILABLE; - } else { - this.udfType = available ? UDFType.TABLE_AVAILABLE : UDFType.TABLE_UNAVAILABLE; - } + this.udfType = this.udfType.setAvailable(available); } public boolean isAvailable() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java index 979866519b2f..1340357c2eda 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java @@ -19,6 +19,9 @@ package org.apache.iotdb.commons.udf; +import org.apache.iotdb.common.rpc.thrift.FunctionType; +import org.apache.iotdb.common.rpc.thrift.Model; + import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -27,45 +30,132 @@ /** UDFType is an enum class that represents the type of UDF. */ public enum UDFType { - TREE_AVAILABLE((byte) 0), - /** - * TREE_BUILT_IN will not appear in the snapshot file or raft log. It is just a placeholder for - * some unforeseen circumstances. - */ - TREE_BUILT_IN((byte) 1), - TREE_UNAVAILABLE((byte) 2), - TABLE_AVAILABLE((byte) 3), - TABLE_UNAVAILABLE((byte) 4); + TREE_AVAILABLE(Model.TREE, FunctionType.NONE, Available.AVAILABLE), + TREE_UNAVAILABLE(Model.TREE, FunctionType.NONE, Available.UNAVAILABLE), + + TABLE_AVAILABLE_SCALAR(Model.TABLE, FunctionType.SCALAR, Available.AVAILABLE), + TABLE_AVAILABLE_AGGREGATE(Model.TABLE, FunctionType.AGGREGATE, Available.AVAILABLE), + TABLE_AVAILABLE_TABLE(Model.TABLE, FunctionType.TABLE, Available.AVAILABLE), + TABLE_UNAVAILABLE_SCALAR(Model.TABLE, FunctionType.SCALAR, Available.UNAVAILABLE), + TABLE_UNAVAILABLE_AGGREGATE(Model.TABLE, FunctionType.AGGREGATE, Available.UNAVAILABLE), + TABLE_UNAVAILABLE_TABLE(Model.TABLE, FunctionType.TABLE, Available.UNAVAILABLE); + + private static final int MODEL_SHIFT = 7; + private static final int TYPE_SHIFT = 2; + private static final int AVAILABLE_SHIFT = 1; + + private static final byte MODEL_MASK = (byte) (0b10000000); + private static final byte TYPE_MASK = (byte) 0b01111100; + private static final byte AVAILABLE_MASK = (byte) 0b00000010; + + private final Model model; - private final byte type; + /** For Tree Model: none. For Table Model: scalar, aggregate, table. */ + private final FunctionType type; - UDFType(byte type) { + private final Available available; + + UDFType(Model model, FunctionType type, Available available) { + this.model = model; this.type = type; + this.available = available; } + public static UDFType of(Model model, FunctionType type, boolean available) { + if (model == Model.TREE) { + return available ? TREE_AVAILABLE : TREE_UNAVAILABLE; + } else { + switch (type) { + case SCALAR: + return available ? TABLE_AVAILABLE_SCALAR : TABLE_UNAVAILABLE_SCALAR; + case AGGREGATE: + return available ? TABLE_AVAILABLE_AGGREGATE : TABLE_UNAVAILABLE_AGGREGATE; + case TABLE: + return available ? TABLE_AVAILABLE_TABLE : TABLE_UNAVAILABLE_TABLE; + default: + throw new IllegalArgumentException("Unknown FunctionType: " + type); + } + } + } + + /** + * |Model(0) | FunctionType(1-5)| Available(6) | placeholder(7) | + * + *

+ */ public void serialize(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(type, stream); + byte value = 0; + value |= (byte) (model.getValue() << MODEL_SHIFT); + value |= (byte) (type.getValue() << TYPE_SHIFT); + value |= (byte) (available.getValue() << AVAILABLE_SHIFT); + stream.writeByte(value); } public static UDFType deserialize(ByteBuffer buffer) { - byte type = ReadWriteIOUtils.readByte(buffer); - for (UDFType udfType : UDFType.values()) { - if (udfType.type == type) { + byte readByte = ReadWriteIOUtils.readByte(buffer); + for (UDFType udfType : values()) { + if ((byte) (udfType.model.getValue() << MODEL_SHIFT) == (readByte & MODEL_MASK) + && (byte) (udfType.type.getValue() << TYPE_SHIFT) == (readByte & TYPE_MASK) + && (byte) (udfType.available.getValue() << AVAILABLE_SHIFT) + == (readByte & AVAILABLE_MASK)) { return udfType; } } - throw new IllegalArgumentException("Unknown UDFType: " + type); + throw new IllegalArgumentException( + "Unknown UDFType:" + + String.format("%8s", Integer.toBinaryString(readByte & 0xFF)).replace(' ', '0')); } public boolean isTreeModel() { - return this == TREE_AVAILABLE || this == TREE_BUILT_IN || this == TREE_UNAVAILABLE; + return this.model == Model.TREE; } public boolean isTableModel() { - return this == TABLE_AVAILABLE || this == TABLE_UNAVAILABLE; + return this.model == Model.TABLE; } public boolean isAvailable() { - return this == TREE_AVAILABLE || this == TREE_BUILT_IN || this == TABLE_AVAILABLE; + return this.available == Available.AVAILABLE; + } + + public FunctionType getType() { + return type; + } + + public UDFType setAvailable(boolean available) { + if (this.isTreeModel()) { + return available ? TREE_AVAILABLE : TREE_UNAVAILABLE; + } else { + switch (this.type) { + case SCALAR: + return available ? TABLE_AVAILABLE_SCALAR : TABLE_UNAVAILABLE_SCALAR; + case AGGREGATE: + return available ? TABLE_AVAILABLE_AGGREGATE : TABLE_UNAVAILABLE_AGGREGATE; + case TABLE: + return available ? TABLE_AVAILABLE_TABLE : TABLE_UNAVAILABLE_TABLE; + default: + throw new IllegalArgumentException("Unknown FunctionType: " + type); + } + } + } + + private enum Available { + AVAILABLE((byte) 0), + UNAVAILABLE((byte) 1); + + private final byte value; + + Available(byte value) { + this.value = value; + } + + public byte getValue() { + return value; + } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java new file mode 100644 index 000000000000..1155e865bb56 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.access; + +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Binary; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.utils.DateUtils; + +import java.time.LocalDate; +import java.util.Iterator; +import java.util.List; + +public class RecordIterator implements Iterator { + + private final List childrenColumns; + private final List dataTypes; + private final int positionCount; + private int currentIndex; + + public RecordIterator( + List childrenColumns, + List dataTypes, + int positionCount) { + this.childrenColumns = childrenColumns; + this.dataTypes = dataTypes; + this.positionCount = positionCount; + if (childrenColumns.size() != dataTypes.size()) { + throw new IllegalArgumentException( + "The size of childrenColumns and dataTypes should be the same."); + } + } + + @Override + public boolean hasNext() { + return currentIndex < positionCount; + } + + @Override + public Record next() { + final int index = currentIndex++; + return new Record() { + @Override + public int getInt(int columnIndex) { + return childrenColumns.get(columnIndex).getInt(index); + } + + @Override + public long getLong(int columnIndex) { + return childrenColumns.get(columnIndex).getLong(index); + } + + @Override + public float getFloat(int columnIndex) { + return childrenColumns.get(columnIndex).getFloat(index); + } + + @Override + public double getDouble(int columnIndex) { + return childrenColumns.get(columnIndex).getDouble(index); + } + + @Override + public boolean getBoolean(int columnIndex) { + return childrenColumns.get(columnIndex).getBoolean(index); + } + + @Override + public Binary getBinary(int columnIndex) { + return new Binary(childrenColumns.get(columnIndex).getBinary(index).getValues()); + } + + @Override + public String getString(int columnIndex) { + return childrenColumns.get(columnIndex).getBinary(index).toString(); + } + + @Override + public LocalDate getLocalDate(int columnIndex) { + return DateUtils.parseIntToLocalDate(childrenColumns.get(columnIndex).getInt(index)); + } + + @Override + public Type getDataType(int columnIndex) { + return UDFDataTypeTransformer.transformReadTypeToUDFDataType(dataTypes.get(columnIndex)); + } + + @Override + public boolean isNull(int columnIndex) { + return childrenColumns.get(columnIndex).isNull(index); + } + + @Override + public int size() { + return childrenColumns.size(); + } + }; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java similarity index 91% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java index b070d78b9772..49db27110c7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.plan.relational.metadata; +package org.apache.iotdb.commons.udf.builtin.relational; import org.apache.iotdb.common.rpc.thrift.TAggregationType; @@ -66,14 +66,14 @@ public String getFunctionName() { return functionName; } - private static final Set NATIVE_FUNCTION_NAMES = + private static final Set BUILT_IN_AGGREGATE_FUNCTION_NAME = new HashSet<>( Arrays.stream(TableBuiltinAggregationFunction.values()) .map(TableBuiltinAggregationFunction::getFunctionName) .collect(Collectors.toList())); - public static Set getNativeFunctionNames() { - return NATIVE_FUNCTION_NAMES; + public static Set getBuiltInAggregateFunctionName() { + return BUILT_IN_AGGREGATE_FUNCTION_NAME; } public static Type getIntermediateType(String name, List originalArgumentTypes) { @@ -108,7 +108,7 @@ public static Type getIntermediateType(String name, List originalArgumentT } public static TAggregationType getAggregationTypeByFuncName(String funcName) { - if (NATIVE_FUNCTION_NAMES.contains(funcName)) { + if (BUILT_IN_AGGREGATE_FUNCTION_NAME.contains(funcName)) { return TAggregationType.valueOf(funcName.toUpperCase()); } else { // fallback to UDAF if no enum found diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TableBuiltinScalarFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinScalarFunction.java similarity index 76% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TableBuiltinScalarFunction.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinScalarFunction.java index c58182a3f27e..7b3282bdebed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TableBuiltinScalarFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinScalarFunction.java @@ -17,7 +17,12 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; +package org.apache.iotdb.commons.udf.builtin.relational; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; public enum TableBuiltinScalarFunction { DIFF("diff"), @@ -70,4 +75,14 @@ public enum TableBuiltinScalarFunction { public String getFunctionName() { return functionName; } + + private static final Set BUILT_IN_SCALAR_FUNCTION_NAME = + new HashSet<>( + Arrays.stream(TableBuiltinScalarFunction.values()) + .map(TableBuiltinScalarFunction::getFunctionName) + .collect(Collectors.toList())); + + public static Set getBuiltInScalarFunctionName() { + return BUILT_IN_SCALAR_FUNCTION_NAME; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java index 74a0d5e0e7bc..fe62cb4d291d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java @@ -26,8 +26,11 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.udf.api.UDF; +import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.exception.UDFManagementException; import org.apache.iotdb.udf.api.relational.SQLFunction; @@ -58,7 +61,7 @@ private UDFManagementService() { new UDFInformation( functionName.toUpperCase(), builtinTimeSeriesGeneratingFunction.getClassName(), - UDFType.TREE_BUILT_IN)); + UDFType.TREE_AVAILABLE)); udfTable.addFunctionAndClass( Model.TREE, functionName, builtinTimeSeriesGeneratingFunction.getFunctionClass()); } @@ -114,8 +117,10 @@ public boolean checkIsBuiltInFunctionName(Model model, String functionName) .contains(functionName.toUpperCase()) || BuiltinScalarFunction.getNativeFunctionNames().contains(functionName.toLowerCase()); } else { - // TODO: Table model UDF - return false; + return TableBuiltinScalarFunction.getBuiltInScalarFunctionName() + .contains(functionName.toLowerCase()) + || TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName() + .contains(functionName.toLowerCase()); } } @@ -233,15 +238,12 @@ public T reflect(String functionName, Class clazz) { "Failed to reflect UDF instance, because UDF %s has not been registered.", functionName.toUpperCase()); LOGGER.warn(errorMessage); - throw new RuntimeException(errorMessage); + throw new UDFException(errorMessage); } try { return clazz.cast( - udfTable - .getFunctionClass(Model.TREE, functionName) - .getDeclaredConstructor() - .newInstance()); + udfTable.getFunctionClass(model, functionName).getDeclaredConstructor().newInstance()); } catch (InstantiationException | InvocationTargetException | NoSuchMethodException @@ -251,7 +253,7 @@ public T reflect(String functionName, Class clazz) { "Failed to reflect UDF %s(%s) instance, because %s", functionName, information.getClassName(), e); LOGGER.warn(errorMessage, e); - throw new RuntimeException(errorMessage); + throw new UDFException(errorMessage); } } @@ -259,6 +261,10 @@ public UDFInformation[] getUDFInformation(Model model) { return udfTable.getUDFInformationList(model).toArray(new UDFInformation[0]); } + public UDFInformation getUDFInformation(Model model, String functionName) { + return udfTable.getUDFInformation(model, functionName); + } + @TestOnly public void deregisterAll() throws UDFManagementException { for (UDFInformation information : getUDFInformation(Model.TREE)) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java new file mode 100644 index 000000000000..97548ec105b5 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.utils; + +import org.apache.iotdb.common.rpc.thrift.FunctionType; +import org.apache.iotdb.common.rpc.thrift.Model; +import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.AggregateFunction; +import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.TableFunction; + +public class TableUDFUtils { + + private TableUDFUtils() { + // private constructor + } + + public static boolean isScalarFunction(String functionName) { + UDFInformation information = + UDFManagementService.getInstance().getUDFInformation(Model.TABLE, functionName); + return FunctionType.SCALAR.equals(information.getUdfType().getType()); + } + + public static boolean isAggregateFunction(String functionName) { + UDFInformation information = + UDFManagementService.getInstance().getUDFInformation(Model.TABLE, functionName); + return FunctionType.AGGREGATE.equals(information.getUdfType().getType()); + } + + public static boolean isTableFunction(String functionName) { + UDFInformation information = + UDFManagementService.getInstance().getUDFInformation(Model.TABLE, functionName); + return FunctionType.TABLE.equals(information.getUdfType().getType()); + } + + public static ScalarFunction getScalarFunction(String functionName) throws UDFException { + return UDFManagementService.getInstance().reflect(functionName, ScalarFunction.class); + } + + public static AggregateFunction getAggregateFunction(String functionName) throws UDFException { + return UDFManagementService.getInstance().reflect(functionName, AggregateFunction.class); + } + + public static TableFunction getTableFunction(String functionName) throws UDFException { + return UDFManagementService.getInstance().reflect(functionName, TableFunction.class); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java index eb5878433a2e..1cae52ea5b9b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java @@ -21,6 +21,12 @@ import org.apache.iotdb.udf.api.type.Type; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.type.BinaryType; +import org.apache.tsfile.read.common.type.BooleanType; +import org.apache.tsfile.read.common.type.DoubleType; +import org.apache.tsfile.read.common.type.FloatType; +import org.apache.tsfile.read.common.type.IntType; +import org.apache.tsfile.read.common.type.LongType; import java.util.List; import java.util.stream.Collectors; @@ -46,6 +52,93 @@ public static List transformToUDFDataTypeList(List tsDataTypeL .collect(Collectors.toList()); } + public static TSDataType transformReadTypeToTSDataType( + org.apache.tsfile.read.common.type.Type type) { + if (type == null) { + return null; + } + switch (type.getTypeEnum()) { + case BOOLEAN: + return TSDataType.BOOLEAN; + case INT32: + return TSDataType.INT32; + case INT64: + return TSDataType.INT64; + case FLOAT: + return TSDataType.FLOAT; + case DOUBLE: + return TSDataType.DOUBLE; + case TEXT: + return TSDataType.TEXT; + case TIMESTAMP: + return TSDataType.TIMESTAMP; + case DATE: + return TSDataType.DATE; + case BLOB: + return TSDataType.BLOB; + case STRING: + return TSDataType.STRING; + default: + throw new IllegalArgumentException("Invalid input: " + type); + } + } + + public static Type transformReadTypeToUDFDataType(org.apache.tsfile.read.common.type.Type type) { + if (type == null) { + return null; + } + switch (type.getTypeEnum()) { + case BOOLEAN: + return Type.BOOLEAN; + case INT32: + return Type.INT32; + case INT64: + return Type.INT64; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case TEXT: + return Type.TEXT; + case TIMESTAMP: + return Type.TIMESTAMP; + case DATE: + return Type.DATE; + case BLOB: + return Type.BLOB; + case STRING: + return Type.STRING; + default: + throw new IllegalArgumentException("Invalid input: " + type); + } + } + + public static org.apache.tsfile.read.common.type.Type transformUDFDataTypeToReadType(Type type) { + if (type == null) { + return null; + } + switch (type) { + case BOOLEAN: + return BooleanType.BOOLEAN; + case INT32: + case DATE: + return IntType.INT32; + case INT64: + case TIMESTAMP: + return LongType.INT64; + case FLOAT: + return FloatType.FLOAT; + case DOUBLE: + return DoubleType.DOUBLE; + case TEXT: + case BLOB: + case STRING: + return BinaryType.TEXT; + default: + throw new IllegalArgumentException("Invalid input: " + type); + } + } + private static Type getUDFDataType(byte type) { switch (type) { case 0: diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/UDFTypeTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/UDFTypeTest.java new file mode 100644 index 000000000000..a9f3acb97d01 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/UDFTypeTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf; + +import org.apache.iotdb.common.rpc.thrift.FunctionType; +import org.apache.iotdb.common.rpc.thrift.Model; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +public class UDFTypeTest { + + @Test + public void testSerializationAndDeserialization() throws IOException { + // Testing serialization and deserialization for each UDFType + for (UDFType udfType : UDFType.values()) { + // Serialize the UDFType into a byte array + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + udfType.serialize(dataOutputStream); + + // Convert the byte array into a ByteBuffer for deserialization + ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); + UDFType deserializedUdfType = UDFType.deserialize(byteBuffer); + + // Assert that the deserialized UDFType matches the original + Assert.assertEquals(udfType, deserializedUdfType); + } + } + + @Test + public void testPublicInterfaces() { + List tree = + Arrays.asList( + UDFType.of(Model.TREE, FunctionType.NONE, true), + UDFType.of(Model.TREE, FunctionType.NONE, false)); + List table = + Arrays.asList( + UDFType.of(Model.TABLE, FunctionType.SCALAR, true), + UDFType.of(Model.TABLE, FunctionType.SCALAR, false), + UDFType.of(Model.TABLE, FunctionType.AGGREGATE, true), + UDFType.of(Model.TABLE, FunctionType.AGGREGATE, false), + UDFType.of(Model.TABLE, FunctionType.TABLE, true), + UDFType.of(Model.TABLE, FunctionType.TABLE, false)); + // Testing public methods for all UDFType values + for (UDFType udfType : tree) { + Assert.assertTrue(udfType.isTreeModel()); + Assert.assertFalse(udfType.isTableModel()); + Assert.assertEquals(FunctionType.NONE, udfType.getType()); + } + for (UDFType udfType : table) { + Assert.assertFalse(udfType.isTreeModel()); + Assert.assertTrue(udfType.isTableModel()); + Assert.assertNotEquals(FunctionType.NONE, udfType.getType()); + } + } +} diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index 6494922890ec..c46c4c0a65a0 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -304,6 +304,13 @@ enum TrainingState { } enum Model{ - TREE, - TABLE + TREE=0, + TABLE=1 +} + +enum FunctionType{ + NONE=0, + SCALAR=1, + AGGREGATE=2, + TABLE=3 } \ No newline at end of file diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index dc6479554432..fdcd323292a4 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -444,6 +444,7 @@ struct TCreateFunctionReq { 5: optional binary jarFile 6: optional string jarMD5 7: optional common.Model model + 8: optional common.FunctionType functionType } struct TDropFunctionReq { diff --git a/pom.xml b/pom.xml index a0eeb5cacb13..ad0eaea6bcfd 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ 0.14.1 1.9 1.5.6-3 - 1.2.0-241128-SNAPSHOT + 1.2.0-241129-SNAPSHOT