Skip to content

Commit

Permalink
[catalog] Introduce catalog managed (#3936)
Browse files Browse the repository at this point in the history
* [catalog] Introduce catalog managed

* [feature] Add the catalog type parameter specific definition.

* spotless

* revert vcs.xml

* fixed sonar cloud

* [impove] fixed sonar cloud issue

* [feature][catalog] add test from check catalogtype To db
  • Loading branch information
Mrart authored Aug 7, 2024
1 parent 7706822 commit 33c64e8
Show file tree
Hide file tree
Showing 11 changed files with 771 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.enums;

/** catalog type */
public enum CatalogType {
JDBC,
HIVE,
PAIMON,
CUSTOM
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.module.scala.DefaultScalaModule;

import java.io.IOException;
import java.text.SimpleDateFormat;

/** Serialization utils */
Expand Down Expand Up @@ -56,4 +58,13 @@ public static <T> T read(String json, TypeReference<T> typeReference) throws Jso
public static String write(Object object) throws JsonProcessingException {
return MAPPER.writeValueAsString(object);
}

public static boolean isValidJson(String jsonStr) {
try {
JsonNode jsonNode = MAPPER.readTree(jsonStr);
return true;
} catch (IOException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import org.apache.streampark.common.enums.CatalogType;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.entity.FlinkCatalog;

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.annotate.JsonProperty;
import org.springframework.beans.BeanUtils;

import javax.validation.constraints.NotBlank;

import java.io.Serializable;
import java.util.Date;

@Data
@Slf4j
public class FlinkCatalogParams implements Serializable {

private Long id;

private Long teamId;

private String catalogName;

private CatalogType catalogType;

/** creator */
private Long userId;

private Date createTime;

private Date updateTime;

private FlinkJDBCCatalog flinkJDBCCatalog;

private FlinkHiveCatalog flinkHiveCatalog;

private FlinkPaimonCatalog flinkPaimonCatalog;

private String customCatalogConfig;

public static FlinkCatalogParams of(FlinkCatalog flinkCatalog) {
if (flinkCatalog == null) {
return null;
}
FlinkCatalogParams flinkCatalogParams = new FlinkCatalogParams();
BeanUtils.copyProperties(flinkCatalog, flinkCatalogParams, "configuration");
try {
switch (flinkCatalog.getCatalogType()) {
case JDBC:
flinkCatalogParams.setFlinkJDBCCatalog(
JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkJDBCCatalog.class));
break;
case HIVE:
flinkCatalogParams.setFlinkHiveCatalog(
JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkHiveCatalog.class));
break;
case PAIMON:
flinkCatalogParams.setFlinkPaimonCatalog(
JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkPaimonCatalog.class));
break;
case CUSTOM:
flinkCatalogParams.setCustomCatalogConfig(flinkCatalog.getConfiguration());
break;
}
} catch (JsonProcessingException e) {
log.error("Flink catalog params json read failed", e);
throw new RuntimeException("Flink catalog params json read failed");
}

return flinkCatalogParams;
}

@Data
public static class FlinkJDBCCatalog implements Serializable {

@NotBlank
private String type;

@NotBlank
@JsonProperty("default-database")
private String defaultDatabase;

@NotBlank
private String username;
@NotBlank
private String password;

@NotBlank
@JsonProperty("base-url")
private String baseUrl;
}

@Data
public static class FlinkHiveCatalog implements Serializable {

@NotBlank
private String type;
@NotBlank
private String name;

@JsonProperty("hive-conf-dir")
private String hiveConfDir;

@JsonProperty("default-database")
private String defaultDatabase;

@JsonProperty("hive-version")
private String hiveVersion;

@JsonProperty("hadoop-conf-dir")
private String hadoopConfDir;
}

@Data
public static class FlinkPaimonCatalog implements Serializable {

@NotBlank
private String type;
@NotBlank
private String warehouse;
@NotBlank
private String metastore; // hive filesystem
private String uri;

@JsonProperty("hive-conf-dir")
private String hiveConfDir;

@JsonProperty("hadoop-conf-dir")
private String hadoopConfDir;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.controller;

import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.bean.FlinkCatalogParams;
import org.apache.streampark.console.core.service.CatalogService;
import org.apache.streampark.console.core.util.ServiceHelper;

import org.apache.shiro.authz.annotation.RequiresPermissions;

import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

@Slf4j
@Validated
@RestController
@RequestMapping("flink/catalog")
public class CatalogController {

@Autowired
CatalogService catalogService;

@Permission(team = "#catalog.teamId")
@PostMapping("create")
@RequiresPermissions("catalog:create")
public RestResponse create(FlinkCatalogParams catalog) throws IOException {
Long userId = ServiceHelper.getUserId();
boolean saved = catalogService.create(catalog, userId);
return RestResponse.success(saved);
}

@PostMapping("list")
@Permission(team = "#app.teamId")
@RequiresPermissions("catalog:view")
public RestResponse list(FlinkCatalogParams catalog, RestRequest request) {
IPage<FlinkCatalogParams> catalogList = catalogService.page(catalog, request);
return RestResponse.success(catalogList);
}

@PostMapping("delete")
@Permission(team = "#app.teamId")
@RequiresPermissions("catalog:delete")
public RestResponse remove(FlinkCatalogParams catalog, RestRequest request) {
boolean deleted = catalogService.remove(catalog.getId());
return RestResponse.success(deleted);
}

@PostMapping("update")
@Permission(team = "#app.teamId")
@RequiresPermissions("catalog:update")
public RestResponse remove(FlinkCatalogParams catalog) {
Long userId = ServiceHelper.getUserId();
boolean updated = catalogService.update(catalog, userId);
return RestResponse.success(updated);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.enums.CatalogType;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.FlinkCatalogParams;

import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;

import java.io.Serializable;
import java.util.Date;

/** catalog store */
@Data
@TableName("t_flink_catalog")
@Slf4j
public class FlinkCatalog implements Serializable {

@TableId(type = IdType.AUTO)
private Long id;

private Long teamId;

private String catalogName;

private CatalogType catalogType;

private String configuration;

@TableField(updateStrategy = FieldStrategy.IGNORED)
private Date createTime;

private Date updateTime;

/** creator */
private Long userId;

public static FlinkCatalog of(FlinkCatalogParams flinkCatalogParams) {
if (flinkCatalogParams == null)
return null;
FlinkCatalog flinkCatalog = new FlinkCatalog();

BeanUtils.copyProperties(
flinkCatalogParams,
flinkCatalog,
"flinkJDBCCatalog",
"flinkHiveCatalog",
"flinkPaimonCatalog",
"customCatalogConfig");

try {
switch (flinkCatalogParams.getCatalogType()) {
case JDBC:
flinkCatalog.setConfiguration(
JacksonUtils.write(flinkCatalogParams.getFlinkJDBCCatalog()));
break;
case HIVE:
flinkCatalog.setConfiguration(
JacksonUtils.write(flinkCatalogParams.getFlinkHiveCatalog()));
break;
case PAIMON:
flinkCatalog.setConfiguration(
JacksonUtils.write(flinkCatalogParams.getFlinkPaimonCatalog()));
break;
case CUSTOM:
flinkCatalog.setConfiguration(flinkCatalogParams.getCustomCatalogConfig());
break;
}
} catch (JsonProcessingException e) {
log.error("Flink catalog json read failed", e);
throw new RuntimeException("Flink catalog json read failed");
}
return flinkCatalog;
}
}
Loading

0 comments on commit 33c64e8

Please sign in to comment.