From 33c64e8f85261a699bc8e5e73bdad42784345897 Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Wed, 7 Aug 2024 15:35:42 +0800 Subject: [PATCH] [catalog] Introduce catalog managed (#3936) * [catalog] Introduce catalog managed * [feature] Add the catalog type parameter specific definition. * spotless * revert vcs.xml * fixed sonar cloud * [impove] fixed sonar cloud issue * [feature][catalog] add test from check catalogtype To db --- .../streampark/common/enums/CatalogType.java | 26 +++ .../console/base/util/JacksonUtils.java | 11 ++ .../console/core/bean/FlinkCatalogParams.java | 152 ++++++++++++++++++ .../core/controller/CatalogController.java | 81 ++++++++++ .../console/core/entity/FlinkCatalog.java | 99 ++++++++++++ .../console/core/mapper/CatalogMapper.java | 34 ++++ .../console/core/service/CatalogService.java | 58 +++++++ .../core/service/impl/CatalogServiceImpl.java | 123 ++++++++++++++ .../src/main/resources/db/schema-h2.sql | 15 ++ .../resources/mapper/core/CatalogMapper.xml | 53 ++++++ .../core/service/CatalogServiceTest.java | 119 ++++++++++++++ 11 files changed, 771 insertions(+) create mode 100644 streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java create mode 100644 streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java new file mode 100644 index 0000000000..fb2caab555 --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.enums; + +/** catalog type */ +public enum CatalogType { + JDBC, + HIVE, + PAIMON, + CUSTOM +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java index c712c6eddb..ee136c1b19 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.module.scala.DefaultScalaModule; +import java.io.IOException; import java.text.SimpleDateFormat; /** Serialization utils */ @@ -56,4 +58,13 @@ public static T read(String json, TypeReference typeReference) throws Jso public static String write(Object object) throws JsonProcessingException { return MAPPER.writeValueAsString(object); } + + public static boolean isValidJson(String jsonStr) { + try { + JsonNode jsonNode = MAPPER.readTree(jsonStr); + return true; + } catch (IOException e) { + return false; + } + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java new file mode 100644 index 0000000000..46a8210d80 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import org.apache.streampark.common.enums.CatalogType; +import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.core.entity.FlinkCatalog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.jackson.annotate.JsonProperty; +import org.springframework.beans.BeanUtils; + +import javax.validation.constraints.NotBlank; + +import java.io.Serializable; +import java.util.Date; + +@Data +@Slf4j +public class FlinkCatalogParams implements Serializable { + + private Long id; + + private Long teamId; + + private String catalogName; + + private CatalogType catalogType; + + /** creator */ + private Long userId; + + private Date createTime; + + private Date updateTime; + + private FlinkJDBCCatalog flinkJDBCCatalog; + + private FlinkHiveCatalog flinkHiveCatalog; + + private FlinkPaimonCatalog flinkPaimonCatalog; + + private String customCatalogConfig; + + public static FlinkCatalogParams of(FlinkCatalog flinkCatalog) { + if (flinkCatalog == null) { + return null; + } + FlinkCatalogParams flinkCatalogParams = new FlinkCatalogParams(); + BeanUtils.copyProperties(flinkCatalog, flinkCatalogParams, "configuration"); + try { + switch (flinkCatalog.getCatalogType()) { + case JDBC: + flinkCatalogParams.setFlinkJDBCCatalog( + JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkJDBCCatalog.class)); + break; + case HIVE: + flinkCatalogParams.setFlinkHiveCatalog( + JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkHiveCatalog.class)); + break; + case PAIMON: + flinkCatalogParams.setFlinkPaimonCatalog( + JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkPaimonCatalog.class)); + break; + case CUSTOM: + flinkCatalogParams.setCustomCatalogConfig(flinkCatalog.getConfiguration()); + break; + } + } catch (JsonProcessingException e) { + log.error("Flink catalog params json read failed", e); + throw new RuntimeException("Flink catalog params json read failed"); + } + + return flinkCatalogParams; + } + + @Data + public static class FlinkJDBCCatalog implements Serializable { + + @NotBlank + private String type; + + @NotBlank + @JsonProperty("default-database") + private String defaultDatabase; + + @NotBlank + private String username; + @NotBlank + private String password; + + @NotBlank + @JsonProperty("base-url") + private String baseUrl; + } + + @Data + public static class FlinkHiveCatalog implements Serializable { + + @NotBlank + private String type; + @NotBlank + private String name; + + @JsonProperty("hive-conf-dir") + private String hiveConfDir; + + @JsonProperty("default-database") + private String defaultDatabase; + + @JsonProperty("hive-version") + private String hiveVersion; + + @JsonProperty("hadoop-conf-dir") + private String hadoopConfDir; + } + + @Data + public static class FlinkPaimonCatalog implements Serializable { + + @NotBlank + private String type; + @NotBlank + private String warehouse; + @NotBlank + private String metastore; // hive filesystem + private String uri; + + @JsonProperty("hive-conf-dir") + private String hiveConfDir; + + @JsonProperty("hadoop-conf-dir") + private String hadoopConfDir; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java new file mode 100644 index 0000000000..78847211f6 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.controller; + +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.core.annotation.Permission; +import org.apache.streampark.console.core.bean.FlinkCatalogParams; +import org.apache.streampark.console.core.service.CatalogService; +import org.apache.streampark.console.core.util.ServiceHelper; + +import org.apache.shiro.authz.annotation.RequiresPermissions; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; + +@Slf4j +@Validated +@RestController +@RequestMapping("flink/catalog") +public class CatalogController { + + @Autowired + CatalogService catalogService; + + @Permission(team = "#catalog.teamId") + @PostMapping("create") + @RequiresPermissions("catalog:create") + public RestResponse create(FlinkCatalogParams catalog) throws IOException { + Long userId = ServiceHelper.getUserId(); + boolean saved = catalogService.create(catalog, userId); + return RestResponse.success(saved); + } + + @PostMapping("list") + @Permission(team = "#app.teamId") + @RequiresPermissions("catalog:view") + public RestResponse list(FlinkCatalogParams catalog, RestRequest request) { + IPage catalogList = catalogService.page(catalog, request); + return RestResponse.success(catalogList); + } + + @PostMapping("delete") + @Permission(team = "#app.teamId") + @RequiresPermissions("catalog:delete") + public RestResponse remove(FlinkCatalogParams catalog, RestRequest request) { + boolean deleted = catalogService.remove(catalog.getId()); + return RestResponse.success(deleted); + } + + @PostMapping("update") + @Permission(team = "#app.teamId") + @RequiresPermissions("catalog:update") + public RestResponse remove(FlinkCatalogParams catalog) { + Long userId = ServiceHelper.getUserId(); + boolean updated = catalogService.update(catalog, userId); + return RestResponse.success(updated); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java new file mode 100644 index 0000000000..b8de5a2e55 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.entity; + +import org.apache.streampark.common.enums.CatalogType; +import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.core.bean.FlinkCatalogParams; + +import com.baomidou.mybatisplus.annotation.FieldStrategy; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.core.JsonProcessingException; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; + +import java.io.Serializable; +import java.util.Date; + +/** catalog store */ +@Data +@TableName("t_flink_catalog") +@Slf4j +public class FlinkCatalog implements Serializable { + + @TableId(type = IdType.AUTO) + private Long id; + + private Long teamId; + + private String catalogName; + + private CatalogType catalogType; + + private String configuration; + + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Date createTime; + + private Date updateTime; + + /** creator */ + private Long userId; + + public static FlinkCatalog of(FlinkCatalogParams flinkCatalogParams) { + if (flinkCatalogParams == null) + return null; + FlinkCatalog flinkCatalog = new FlinkCatalog(); + + BeanUtils.copyProperties( + flinkCatalogParams, + flinkCatalog, + "flinkJDBCCatalog", + "flinkHiveCatalog", + "flinkPaimonCatalog", + "customCatalogConfig"); + + try { + switch (flinkCatalogParams.getCatalogType()) { + case JDBC: + flinkCatalog.setConfiguration( + JacksonUtils.write(flinkCatalogParams.getFlinkJDBCCatalog())); + break; + case HIVE: + flinkCatalog.setConfiguration( + JacksonUtils.write(flinkCatalogParams.getFlinkHiveCatalog())); + break; + case PAIMON: + flinkCatalog.setConfiguration( + JacksonUtils.write(flinkCatalogParams.getFlinkPaimonCatalog())); + break; + case CUSTOM: + flinkCatalog.setConfiguration(flinkCatalogParams.getCustomCatalogConfig()); + break; + } + } catch (JsonProcessingException e) { + log.error("Flink catalog json read failed", e); + throw new RuntimeException("Flink catalog json read failed"); + } + return flinkCatalog; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java new file mode 100644 index 0000000000..7053bee5bc --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.FlinkCatalog; + +import org.apache.ibatis.annotations.Param; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** catalog mapper */ +public interface CatalogMapper extends BaseMapper { + + boolean existsByCatalogName(@Param("catalogName") String catalogName); + + IPage selectPage(Page page, @Param("catalog") FlinkCatalog catalog); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java new file mode 100644 index 0000000000..c19d467b41 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.core.bean.FlinkCatalogParams; +import org.apache.streampark.console.core.entity.FlinkCatalog; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.service.IService; + +/** This interface is use to managed catalog */ +public interface CatalogService extends IService { + + /** + * Create Catalog + * + * @param catalog The {@link FlinkCatalogParams} object containing the search criteria. + */ + boolean create(FlinkCatalogParams catalog, Long userId); + + /** + * Remove Catalog + * + * @param id The {@link FlinkCatalogParams} object containing the search criteria. + */ + boolean remove(Long id); + + /** + * Retrieves a page of applications based on the provided parameters. Params: catalog – The + * Catalog object to be used for filtering the results. request – The REST request object + * containing additional parameters or headers. Returns: A page of Catalog objects based on the + * provided parameters. + */ + IPage page(FlinkCatalogParams catalog, RestRequest request); + + /** + * update Catalog + * + * @param catalog The {@link FlinkCatalogParams} object containing the search criteria. + */ + boolean update(FlinkCatalogParams catalog, long userId); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java new file mode 100644 index 0000000000..199220fb06 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.exception.AlertException; +import org.apache.streampark.console.base.exception.ApiAlertException; +import org.apache.streampark.console.base.mybatis.pager.MybatisPager; +import org.apache.streampark.console.core.bean.FlinkCatalogParams; +import org.apache.streampark.console.core.entity.FlinkCatalog; +import org.apache.streampark.console.core.mapper.CatalogMapper; +import org.apache.streampark.console.core.service.CatalogService; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.regex.Pattern; + +/** catalog manage */ +@Service +@Slf4j +@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class) +public class CatalogServiceImpl extends ServiceImpl + implements + CatalogService { + + private static final String CATALOG_REGEX = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?$"; + + @Override + public boolean create(FlinkCatalogParams catalog, Long userId) { + AlertException.throwIfNull( + catalog.getTeamId(), "The teamId can't be null. Create catalog failed."); + AlertException.throwIfFalse( + validateCatalogName(catalog.getCatalogName()), + "Catalog Name only lowercase letters, numbers, and -,.. Symbol composition, cannot end with a symbol."); + AlertException.throwIfTrue( + existsByCatalogName(catalog.getCatalogName()), "Catalog name already exists."); + FlinkCatalog flinkCatalog = FlinkCatalog.of(catalog); + Date date = new Date(); + flinkCatalog.setCreateTime(date); + flinkCatalog.setUpdateTime(date); + return this.save(flinkCatalog); + } + + @Override + public boolean remove(Long id) { + FlinkCatalog catalog = getById(id); + ApiAlertException.throwIfNull(catalog, "Catalog not exist, please check."); + return this.removeById(id); + } + + @Override + public IPage page(FlinkCatalogParams catalog, RestRequest request) { + AlertException.throwIfNull( + catalog.getTeamId(), "The teamId can't be null. List catalog failed."); + + Page page = MybatisPager.getPage(request); + this.baseMapper.selectPage(page, FlinkCatalog.of(catalog)); + Page paramsPage = new Page<>(); + BeanUtils.copyProperties(page, paramsPage, "records"); + List paramList = new ArrayList<>(); + page.getRecords() + .forEach( + record -> { + paramList.add(FlinkCatalogParams.of(record)); + }); + paramsPage.setRecords(paramList); + return paramsPage; + } + + @Override + public boolean update(FlinkCatalogParams catalogParam, long userId) { + AlertException.throwIfNull( + catalogParam.getTeamId(), "The teamId can't be null. List catalog failed."); + FlinkCatalog catalog = getById(catalogParam.getId()); + FlinkCatalog flinkCatalog = FlinkCatalog.of(catalogParam); + AlertException.throwIfFalse( + catalogParam.getCatalogName().equalsIgnoreCase(catalog.getCatalogName()), + "The catalog name cannot be modified."); + log.debug( + "Catalog {} has modify from {} to {}", + catalog.getCatalogName(), + catalog.getConfiguration(), + flinkCatalog.getConfiguration()); + catalog.setConfiguration(flinkCatalog.getConfiguration()); + catalog.setUpdateTime(new Date()); + catalog.setUserId(userId); + return this.updateById(catalog); + } + + public Boolean existsByCatalogName(String catalogName) { + return this.baseMapper.existsByCatalogName(catalogName); + } + + /** validate catalog name */ + private boolean validateCatalogName(String catalogName) { + return Pattern.compile(CATALOG_REGEX).matcher(catalogName).matches(); + } +} diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index e3fca631f5..037a4ab0e3 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -575,3 +575,18 @@ create table if not exists `t_spark_app` ( `hadoop_user` varchar(500) default null, primary key(`id`) ); + + -- ---------------------------- + -- Table structure for t_flink_app + -- ---------------------------- + create table if not exists t_flink_catalog ( + `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1), + `team_id` bigint not null, + `user_id` bigint default null, + `catalog_type` varchar(255) not NULL, + `catalog_name` VARCHAR(255) NOT NULL, + `configuration` text, + `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL, + `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL, + CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`) + ); diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml new file mode 100644 index 0000000000..9d1ffde45f --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml @@ -0,0 +1,53 @@ + + + + + + + + + + + + + + + + + + + diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java new file mode 100644 index 0000000000..23decd3bd7 --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.common.enums.CatalogType; +import org.apache.streampark.console.SpringUnitTestBase; +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.core.bean.FlinkCatalogParams; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.assertj.core.api.Assertions.assertThat; + +/** CatalogService Tests */ +public class CatalogServiceTest extends SpringUnitTestBase { + + @Autowired + private CatalogService catalogService; + + @AfterEach + void cleanTestRecordsInDatabase() { + catalogService.remove(new QueryWrapper<>()); + } + + @Test + @Order(1) + public void create() { + FlinkCatalogParams catalog = new FlinkCatalogParams(); + catalog.setTeamId(1L); + catalog.setCatalogType(CatalogType.JDBC); + catalog.setCatalogName("catalog-name"); + FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog = + new FlinkCatalogParams.FlinkJDBCCatalog(); + flinkJDBCCatalog.setType("jdbc"); + flinkJDBCCatalog.setDefaultDatabase("aa"); + flinkJDBCCatalog.setPassword("11"); + flinkJDBCCatalog.setUsername("user"); + flinkJDBCCatalog.setBaseUrl("url"); + catalog.setFlinkJDBCCatalog(flinkJDBCCatalog); + + boolean create = catalogService.create(catalog, 1L); + assertThat(create).isTrue(); + } + + @Test + @Order(2) + public void update() { + FlinkCatalogParams catalog = new FlinkCatalogParams(); + catalog.setTeamId(1L); + catalog.setCatalogType(CatalogType.JDBC); + catalog.setCatalogName("catalog-name"); + FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog = + new FlinkCatalogParams.FlinkJDBCCatalog(); + flinkJDBCCatalog.setType("jdbc"); + flinkJDBCCatalog.setDefaultDatabase("aa"); + flinkJDBCCatalog.setPassword("11"); + flinkJDBCCatalog.setUsername("user"); + flinkJDBCCatalog.setBaseUrl("url1"); + catalog.setFlinkJDBCCatalog(flinkJDBCCatalog); + RestRequest request = new RestRequest(); + catalogService.create(catalog, 1L); + + IPage catalogIPage = catalogService.page(catalog, request); + FlinkCatalogParams catalogs = catalogIPage.getRecords().get(0); + catalogs.getFlinkJDBCCatalog().setBaseUrl("url2"); + catalogService.update(catalogs, 2L); + + IPage catalogResult = catalogService.page(catalog, request); + + assertThat( + catalogResult.getRecords().get(0).getFlinkJDBCCatalog().getBaseUrl().contains("url2")) + .isTrue(); + assertThat(catalogResult.getRecords().get(0).getUserId().equals(2L)).isTrue(); + assertThat(catalogResult.getRecords().get(0).getCatalogType().equals(CatalogType.JDBC)) + .isTrue(); + } + + @Test + @Order(3) + public void remove() { + FlinkCatalogParams catalog = new FlinkCatalogParams(); + catalog.setTeamId(1L); + catalog.setCatalogType(CatalogType.JDBC); + catalog.setCatalogName("catalog-name"); + FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog = + new FlinkCatalogParams.FlinkJDBCCatalog(); + flinkJDBCCatalog.setType("jdbc"); + flinkJDBCCatalog.setDefaultDatabase("aa"); + flinkJDBCCatalog.setPassword("11"); + flinkJDBCCatalog.setUsername("user"); + flinkJDBCCatalog.setBaseUrl("url"); + catalog.setFlinkJDBCCatalog(flinkJDBCCatalog); + catalogService.create(catalog, 1L); + RestRequest request = new RestRequest(); + IPage catalogIPage = catalogService.page(catalog, request); + boolean deleted = catalogService.remove(catalogIPage.getRecords().get(0).getId()); + assertThat(deleted).isTrue(); + } +}