Skip to content

Commit

Permalink
[Feature][Transform-V2] add UUID Transform (#3)
Browse files Browse the repository at this point in the history
[Improve][Transform-V2] Fix UUID Transform for 2.3.2

[Improve][Transform-V2] Fix UUID Transform for 2.3.2 #modify import
  • Loading branch information
tedshim authored and 4chicat committed Aug 30, 2023
1 parent aafd41c commit 1233aa0
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public class ZetaSQLFunction {
public static final String COALESCE = "COALESCE";
public static final String IFNULL = "IFNULL";
public static final String NULLIF = "NULLIF";
public static final String UUID = "UUID";

private final SeaTunnelRowType inputRowType;
private final ZetaSQLType zetaSQLType;
Expand Down Expand Up @@ -406,6 +407,8 @@ public Object executeFunctionExpr(String functionName, List<Object> args) {
return SystemFunction.ifnull(args);
case NULLIF:
return SystemFunction.nullif(args);
case UUID:
return SystemFunction.uuid();
default:
for (ZetaUDF udf : udfList) {
if (udf.functionName().equalsIgnoreCase(functionName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ private SeaTunnelDataType<?> getFunctionType(Function function) {
case ZetaSQLFunction.DAYNAME:
case ZetaSQLFunction.MONTHNAME:
case ZetaSQLFunction.FORMATDATETIME:
case ZetaSQLFunction.UUID:
return BasicType.STRING_TYPE;
case ZetaSQLFunction.ASCII:
case ZetaSQLFunction.LOCATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.UUID;

public class SystemFunction {
public static Object coalesce(List<Object> args) {
Expand Down Expand Up @@ -122,4 +123,9 @@ public static Object castAs(List<Object> args) {
CommonErrorCode.UNSUPPORTED_OPERATION,
String.format("Unsupported CAST AS type: %s", v2));
}

public static String uuid() {
UUID uuid = UUID.randomUUID();
return uuid.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform.uuid;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

@Slf4j
@AutoService(SeaTunnelTransform.class)
@NoArgsConstructor
public class UUIDTransform extends SingleFieldOutputTransform {
public static final String PLUGIN_NAME = "UUID";

private UUIDTransformConfig uuidTransformConfig;
private String uuidField;

public UUIDTransform(
@NonNull UUIDTransformConfig uuidTransformConfig,
@NonNull CatalogTable catalogTable) {
super(catalogTable);
this.uuidTransformConfig = uuidTransformConfig;
uuidField = uuidTransformConfig.getUuidField();
if (uuidField.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find [" + uuidField + "] field");
}
this.outputCatalogTable = getProducedCatalogTable();
}

@Override
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
protected void setConfig(Config pluginConfig) {
ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
.validate(new UUIDTransformFactory().optionRule());
this.uuidTransformConfig = UUIDTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
}

@Override
protected void setInputRowType(SeaTunnelRowType rowType) {}

@Override
protected String getOutputFieldName() {
return uuidTransformConfig.getUuidField();
}

@Override
protected SeaTunnelDataType getOutputFieldDataType() {
return BasicType.STRING_TYPE;
}

@Override
protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
return getUUID();
}

@Override
protected Column getOutputColumn() {
List<Column> columns = inputCatalogTable.getTableSchema().getColumns();
List<Column> collect =
columns.stream()
.filter(
column ->
column.getName()
.equals(
uuidTransformConfig.getUuidField()))
.collect(Collectors.toList());
return collect.get(0).copy();
}

private Object getUUID() {
UUID uuid = UUID.randomUUID();
return uuid.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.seatunnel.transform.uuid;

import lombok.Getter;
import lombok.Setter;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import java.io.Serializable;

@Getter
@Setter
public class UUIDTransformConfig implements Serializable {

public static final Option<String> UUID_FIELD =
Options.key("uuid_field")
.stringType()
.noDefaultValue()
.withDescription("This is the field for UUID");

private String uuidField;

public static UUIDTransformConfig of(ReadonlyConfig config) {
UUIDTransformConfig uuidTransformConfig = new UUIDTransformConfig();
uuidTransformConfig.setUuidField(config.get(UUID_FIELD));
return uuidTransformConfig;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform.uuid;

import com.google.auto.service.AutoService;
import lombok.NonNull;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;

@AutoService(Factory.class)
public class UUIDTransformFactory implements TableTransformFactory {

@Override
public String factoryIdentifier() {
return "UUID";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(UUIDTransformConfig.UUID_FIELD).build();
}

@Override
public TableTransform createTransform(@NonNull TableFactoryContext context) {
UUIDTransformConfig uuidTransformConfig = UUIDTransformConfig.of(context.getOptions());
CatalogTable catalogTable = context.getCatalogTable();
return () -> new UUIDTransform(uuidTransformConfig, catalogTable);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.seatunnel.transform;

import org.apache.seatunnel.transform.uuid.UUIDTransformFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class UUIDTransformFactoryTest {

@Test
public void testOptionRule() throws Exception {
UUIDTransformFactory uuidTransformFactory = new UUIDTransformFactory();
Assertions.assertNotNull(uuidTransformFactory.optionRule());
}
}

0 comments on commit 1233aa0

Please sign in to comment.