diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FibonacciFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FibonacciFunction.java new file mode 100644 index 00000000000..59f4ce81096 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FibonacciFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * FibonacciFunction + * description: fibonacci(numeric)--returns the nth Fibonacci number + */ + +public class FibonacciFunction implements ValueParser { + + private final ValueParser numberParser; + + /** + * Constructor + * + * @param expr + */ + public FibonacciFunction(Function expr) { + numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + /** + * parse + * + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + int n = numberValue.intValue(); + return fibonacci(n); + } + + /** + * Calculate the nth Fibonacci number. + * + * @param n the position in the Fibonacci sequence + * @return the nth Fibonacci number + */ + private long fibonacci(int n) { + if (n <= 1) + return n; + long prev = 0, curr = 1; + for (int i = 2; i <= n; i++) { + long temp = curr; + curr = curr + prev; + prev = temp; + } + return curr; + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 9982b374189..59b10bdb485 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -17,9 +17,31 @@ package org.apache.inlong.sdk.transform.process.operator; -import org.apache.inlong.sdk.transform.process.function.FunctionTools; +import org.apache.inlong.sdk.transform.process.function.AbsFunction; +import org.apache.inlong.sdk.transform.process.function.CeilFunction; +import org.apache.inlong.sdk.transform.process.function.ConcatFunction; +import org.apache.inlong.sdk.transform.process.function.CosFunction; +import org.apache.inlong.sdk.transform.process.function.DateFormatFunction; +import org.apache.inlong.sdk.transform.process.function.ExpFunction; +import org.apache.inlong.sdk.transform.process.function.FibonacciFunction; +import org.apache.inlong.sdk.transform.process.function.FloorFunction; +import org.apache.inlong.sdk.transform.process.function.FromUnixTimeFunction; +import org.apache.inlong.sdk.transform.process.function.LnFunction; +import org.apache.inlong.sdk.transform.process.function.LocateFunction; +import org.apache.inlong.sdk.transform.process.function.Log10Function; +import org.apache.inlong.sdk.transform.process.function.Log2Function; +import org.apache.inlong.sdk.transform.process.function.LogFunction; +import org.apache.inlong.sdk.transform.process.function.NowFunction; +import org.apache.inlong.sdk.transform.process.function.PowerFunction; +import org.apache.inlong.sdk.transform.process.function.RoundFunction; +import org.apache.inlong.sdk.transform.process.function.SinFunction; +import org.apache.inlong.sdk.transform.process.function.SinhFunction; +import org.apache.inlong.sdk.transform.process.function.SqrtFunction; +import org.apache.inlong.sdk.transform.process.function.SubstringFunction; +import org.apache.inlong.sdk.transform.process.function.ToDateFunction; +import org.apache.inlong.sdk.transform.process.function.ToTimestampFunction; +import org.apache.inlong.sdk.transform.process.function.UnixTimestampFunction; import org.apache.inlong.sdk.transform.process.parser.ColumnParser; -import org.apache.inlong.sdk.transform.process.parser.ParserTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import com.google.common.collect.Maps; @@ -27,15 +49,12 @@ import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; import org.apache.commons.lang.ObjectUtils; -import org.reflections.Reflections; -import org.reflections.scanners.Scanners; -import java.lang.reflect.Constructor; import java.math.BigDecimal; import java.sql.Date; import java.sql.Timestamp; +import java.util.HashMap; import java.util.Map; -import java.util.Set; /** * OperatorTools @@ -52,55 +71,35 @@ public class OperatorTools { public static final String CHILD_KEY = "$child"; - static { - init(); - } - - private static void init() { - Reflections reflections = new Reflections(OPERATOR_PATH, Scanners.TypesAnnotated); - Set> clazzSet = reflections.getTypesAnnotatedWith(TransformOperator.class); - for (Class clazz : clazzSet) { - if (ExpressionOperator.class.isAssignableFrom(clazz)) { - TransformOperator annotation = clazz.getAnnotation(TransformOperator.class); - if (annotation == null) { - continue; - } - Class[] values = annotation.values(); - for (Class value : values) { - operatorMap.compute(value, (key, former) -> { - if (former != null) { - log.warn("find a conflict for parser class [{}], the former one is [{}], new one is [{}]", - key, former.getName(), clazz.getName()); - } - return clazz; - }); - } - } - } - } + private static final Map> functionMap = new HashMap<>(); - public static ExpressionOperator getTransformOperator(Expression expr) { - Class clazz = operatorMap.get(expr.getClass()); - if (clazz == null) { - return null; - } - try { - Constructor constructor = clazz.getDeclaredConstructor(expr.getClass()); - return (ExpressionOperator) constructor.newInstance(expr); - } catch (NoSuchMethodException e) { - log.error("transform operator {} needs one constructor that accept one params whose type is {}", - clazz.getName(), expr.getClass().getName(), e); - throw new RuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + static { + functionMap.put("concat", ConcatFunction::new); + functionMap.put("now", NowFunction::new); + functionMap.put("power", PowerFunction::new); + functionMap.put("abs", AbsFunction::new); + functionMap.put("sqrt", SqrtFunction::new); + functionMap.put("ln", LnFunction::new); + functionMap.put("log10", Log10Function::new); + functionMap.put("log2", Log2Function::new); + functionMap.put("log", LogFunction::new); + functionMap.put("exp", ExpFunction::new); + functionMap.put("substring", SubstringFunction::new); + functionMap.put("locate", LocateFunction::new); + functionMap.put("to_date", ToDateFunction::new); + functionMap.put("date_format", DateFormatFunction::new); + functionMap.put("ceil", CeilFunction::new); + functionMap.put("floor", FloorFunction::new); + functionMap.put("sin", SinFunction::new); + functionMap.put("sinh", SinhFunction::new); + functionMap.put("cos", CosFunction::new); + + functionMap.put("round", RoundFunction::new); + functionMap.put("from_unixtime", FromUnixTimeFunction::new); + functionMap.put("unix_timestamp", UnixTimestampFunction::new); + functionMap.put("to_timestamp", ToTimestampFunction::new); + functionMap.put("fibonacci", FibonacciFunction::new); - public static ExpressionOperator buildOperator(Expression expr) { - if (expr != null) { - return getTransformOperator(expr); - } - return null; } public static ValueParser buildParser(Expression expr) { @@ -109,14 +108,23 @@ public static ValueParser buildParser(Expression expr) { if (exprString.startsWith(ROOT_KEY) || exprString.startsWith(CHILD_KEY)) { return new ColumnParser((Function) expr); } else { - return FunctionTools.getTransformFunction((Function) expr); + // TODO + Function func = (Function) expr; + java.util.function.Function valueParserConstructor = functionMap + .get(func.getName().toLowerCase()); + if (valueParserConstructor != null) { + return valueParserConstructor.apply(func); + } else { + return new ColumnParser(func); + } } } - return ParserTools.getTransformParser(expr); + return null; } /** * parseBigDecimal + * * @param value * @return */ @@ -150,6 +158,7 @@ public static Timestamp parseTimestamp(Object value) { /** * compareValue + * * @param left * @param right * @return diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index b6254eb35d5..cc3c4423289 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -709,6 +709,35 @@ public void testLog10Function() throws Exception { Assert.assertEquals(output2.get(0), "result=3.0"); } + @Test + public void testFibonacciFunction() throws Exception { + String transformSql = "select fibonacci(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: fibonacci(0) + List output1 = processor.transform("0|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + + // case2: fibonacci(1) + List output2 = processor.transform("1|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + // case3: fibonacci(7) + List output3 = processor.transform("7|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=13"); + + // case4: fibonacci(10) + List output4 = processor.transform("10|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=55"); + } + @Test public void testLog2Function() throws Exception { String transformSql = "select log2(numeric1) from source"; diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/binlog/InLongBinlog.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/binlog/InLongBinlog.java index 487fc6483e4..c8efd07dd7e 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/binlog/InLongBinlog.java +++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/binlog/InLongBinlog.java @@ -1,19 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ // Generated by the protocol buffer compiler. DO NOT EDIT! // source: InLongBinlog.proto