Skip to content

Commit

Permalink
[INLONG-10067][DataProxy] Simplify the configuration and acquisition …
Browse files Browse the repository at this point in the history
…of the Manager address (apache#10068)
  • Loading branch information
gosonzhang authored Apr 25, 2024
1 parent 4db62df commit 5a0ae62
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,56 @@
*/
public class CommonConfigHolder {

public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class);
private static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class);
// configure file name
public static final String COMMON_CONFIG_FILE_NAME = "common.properties";
private static final String COMMON_CONFIG_FILE_NAME = "common.properties";
// list split separator
private static final String VAL_CONFIG_ITEMS_SEPARATOR = ",|\\s+";
// **** allowed keys and default value, begin
// cluster tag
public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
public static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
private static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
// cluster name
public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy";
private static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
private static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy";
// cluster incharges
public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
public static final String VAL_DEF_CLUSTER_INCHARGES = "admin";
@Deprecated
private static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
private static final String KEY_PROXY_CLUSTER_INCHARGESV2 = "proxy.cluster.incharges";
private static final String VAL_DEF_CLUSTER_INCHARGES = "admin";
// cluster exttag,
public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
@Deprecated
private static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
private static final String KEY_PROXY_CLUSTER_EXT_TAGV2 = "proxy.cluster.ext.tag";
// predefined format of ext tag: {key}={value}
public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true";
// manager type
public static final String KEY_MANAGER_TYPE = "manager.type";
public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName();
private static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true";
// manager hosts
public static final String KEY_MANAGER_HOSTS = "manager.hosts";
public static final String KEY_MANAGER_HOSTS_SEPARATOR = ",";
private static final String KEY_MANAGER_HOSTS = "manager.hosts";
private static final String KEY_MANAGER_HOST_PORT_SEPARATOR = ":";
// manager auth secret id
public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
@Deprecated
private static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
private static final String KEY_MANAGER_AUTH_SECRET_IDV2 = "manager.auth.secret.id";
// manager auth secret key
public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
@Deprecated
private static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
private static final String KEY_MANAGER_AUTH_SECRET_KEYV2 = "manager.auth.secret.key";
// configure file check interval
private static final String KEY_META_CONFIG_SYNC_INTERVAL_MS = "meta.config.sync.interval.ms";
@Deprecated
private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval";
public static final long VAL_DEF_CONFIG_SYNC_INTERVAL_MS = 60000L;
public static final long VAL_MIN_CONFIG_SYNC_INTERVAL_MS = 10000L;
private static final String KEY_META_CONFIG_SYNC_INTERVAL_MS = "meta.config.sync.interval.ms";
private static final long VAL_DEF_CONFIG_SYNC_INTERVAL_MS = 60000L;
private static final long VAL_MIN_CONFIG_SYNC_INTERVAL_MS = 10000L;
// max allowed wait duration
private static final String KEY_META_CONFIG_SYNC_WAST_ALARM_MS = "meta.config.sync.wast.alarm.ms";
public static final long VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS = 30000L;
private static final long VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS = 40000L;
// whether to startup using the local metadata.json file without connecting to the Manager
@Deprecated
private static final String KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE =
"startup.using.local.meta.file.enable";
public static final boolean VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE = false;
private static final String KEY_ENABLE_STARTUP_USING_LOCAL_META_FILEV2 =
"meta.config.startup.using.local.file.enable";
private static final boolean VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE = false;
// whether to accept messages without mapping between groupId/streamId and topic
public static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = "id2topic.unconfigured.accept.enable";
public static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = false;
Expand Down Expand Up @@ -158,12 +169,13 @@ public class CommonConfigHolder {
private static volatile boolean isInit = false;
private Map<String, String> props;
// pre-read field values
// node setting
private String clusterTag = VAL_DEF_CLUSTER_TAG;
private String clusterName = VAL_DEF_CLUSTER_NAME;
private String clusterIncharges = VAL_DEF_CLUSTER_INCHARGES;
private String clusterExtTag = VAL_DEF_CLUSTER_EXT_TAG;
private String managerType = VAL_DEF_MANAGER_TYPE;
private IManagerIpListParser ipListParser = null;
// manager setting
private final List<String> managerIpList = new ArrayList<>();
private String managerAuthSecretId = "";
private String managerAuthSecretKey = "";
private boolean enableStartupUsingLocalMetaFile = VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE;
Expand Down Expand Up @@ -281,12 +293,8 @@ public boolean isEnableWhiteList() {
return this.enableWhiteList;
}

public String getManagerType() {
return managerType;
}

public List<String> getManagerHosts() {
return this.ipListParser.getIpList();
return this.managerIpList;
}

public String getManagerAuthSecretId() {
Expand Down Expand Up @@ -406,27 +414,8 @@ private void preReadFields() {
if (StringUtils.isNotEmpty(tmpValue)) {
this.clusterExtTag = tmpValue.trim();
}
// read configure sync interval
tmpValue = this.props.get(KEY_META_CONFIG_SYNC_INTERVAL_MS);
if (StringUtils.isBlank(tmpValue)) {
tmpValue = this.props.get(KEY_CONFIG_CHECK_INTERVAL_MS);
}
if (StringUtils.isNotEmpty(tmpValue)) {
long tmpSyncInvMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_CONFIG_SYNC_INTERVAL_MS);
if (tmpSyncInvMs >= VAL_MIN_CONFIG_SYNC_INTERVAL_MS) {
this.metaConfigSyncInvlMs = tmpSyncInvMs;
}
}
// read configure sync wast alarm ms
tmpValue = this.props.get(KEY_META_CONFIG_SYNC_WAST_ALARM_MS);
if (StringUtils.isNotBlank(tmpValue)) {
this.metaConfigWastAlarmMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS);
}
// read enable startup using local meta file
tmpValue = this.props.get(KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE);
if (StringUtils.isNotEmpty(tmpValue)) {
this.enableStartupUsingLocalMetaFile = "TRUE".equalsIgnoreCase(tmpValue.trim());
}
// read the manager setting
this.preReadManagerSetting();
// read whether accept msg without id2topic configure
tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
if (StringUtils.isNotEmpty(tmpValue)) {
Expand All @@ -452,21 +441,6 @@ private void preReadFields() {
if (StringUtils.isNotEmpty(tmpValue)) {
this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
}
// read manager type
tmpValue = this.props.get(KEY_MANAGER_TYPE);
if (StringUtils.isNotBlank(tmpValue)) {
this.managerType = tmpValue.trim();
}
// read manager auth secret id
tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_ID);
if (StringUtils.isNotBlank(tmpValue)) {
this.managerAuthSecretId = tmpValue.trim();
}
// read manager auth secret key
tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_KEY);
if (StringUtils.isNotBlank(tmpValue)) {
this.managerAuthSecretKey = tmpValue.trim();
}
// read whether enable file metric
tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
if (StringUtils.isNotEmpty(tmpValue)) {
Expand Down Expand Up @@ -587,29 +561,68 @@ private void preReadFields() {
this.maxRetriesAfterFailure = retries;
}
}
// initial ip parser
try {
Class<? extends IManagerIpListParser> ipListParserClass =
(Class<? extends IManagerIpListParser>) Class.forName(this.managerType);
this.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance();
this.ipListParser.setCommonProperties(this.props);
} catch (Throwable t) {
LOG.error("Initial ipListParser Class {} failure, exit!", this.managerType, t);
System.exit(6);
}
}

private void chkRequiredFields(String requiredFieldKey) {
String fieldVal = props.get(requiredFieldKey);
if (fieldVal == null) {
LOG.error("Missing mandatory field {} in {}, exit!",
requiredFieldKey, COMMON_CONFIG_FILE_NAME);
System.exit(4);
private void preReadManagerSetting() {
String tmpValue;
// read manager hosts
String managerHosts = this.props.get(KEY_MANAGER_HOSTS);
if (StringUtils.isBlank(managerHosts)) {
LOG.error("Value of {} is required in {}, exit!", KEY_MANAGER_HOSTS, COMMON_CONFIG_FILE_NAME);
System.exit(2);
}
if (StringUtils.isBlank(fieldVal)) {
LOG.error("Required {} field value is blank in {}, exit!",
requiredFieldKey, COMMON_CONFIG_FILE_NAME);
System.exit(5);
managerHosts = managerHosts.trim();
String[] hostPort;
String[] hostPortList = managerHosts.split(VAL_CONFIG_ITEMS_SEPARATOR);
for (String tmpItem : hostPortList) {
if (StringUtils.isBlank(tmpItem)) {
continue;
}
hostPort = tmpItem.split(KEY_MANAGER_HOST_PORT_SEPARATOR);
if (hostPort.length != 2
|| StringUtils.isBlank(hostPort[0])
|| StringUtils.isBlank(hostPort[1])) {
continue;
}
managerIpList.add(hostPort[0].trim() + KEY_MANAGER_HOST_PORT_SEPARATOR + hostPort[1].trim());
}
if (managerIpList.isEmpty()) {
LOG.error("Invalid value {} in configure item {}, exit!", managerHosts, KEY_MANAGER_HOSTS);
System.exit(2);
}
// read manager auth secret id
tmpValue = compatGetValue(this.props,
KEY_MANAGER_AUTH_SECRET_IDV2, KEY_MANAGER_AUTH_SECRET_ID);
if (StringUtils.isNotBlank(tmpValue)) {
this.managerAuthSecretId = tmpValue.trim();
}
// read manager auth secret key
tmpValue = compatGetValue(this.props,
KEY_MANAGER_AUTH_SECRET_KEYV2, KEY_MANAGER_AUTH_SECRET_KEY);
if (StringUtils.isNotBlank(tmpValue)) {
this.managerAuthSecretKey = tmpValue.trim();
}
// read enable startup using local meta file
tmpValue = compatGetValue(this.props,
KEY_ENABLE_STARTUP_USING_LOCAL_META_FILEV2, KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE);
if (StringUtils.isNotEmpty(tmpValue)) {
this.enableStartupUsingLocalMetaFile = "TRUE".equalsIgnoreCase(tmpValue.trim());
}
// read configure sync interval
tmpValue = compatGetValue(this.props,
KEY_META_CONFIG_SYNC_INTERVAL_MS, KEY_CONFIG_CHECK_INTERVAL_MS);
if (StringUtils.isNotEmpty(tmpValue)) {
long tmpSyncInvMs = NumberUtils.toLong(
tmpValue.trim(), VAL_DEF_CONFIG_SYNC_INTERVAL_MS);
if (tmpSyncInvMs >= VAL_MIN_CONFIG_SYNC_INTERVAL_MS) {
this.metaConfigSyncInvlMs = tmpSyncInvMs;
}
}
// read configure sync wast alarm ms
tmpValue = this.props.get(KEY_META_CONFIG_SYNC_WAST_ALARM_MS);
if (StringUtils.isNotBlank(tmpValue)) {
this.metaConfigWastAlarmMs = NumberUtils.toLong(
tmpValue.trim(), VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS);
}
}

Expand Down Expand Up @@ -644,7 +657,7 @@ private boolean loadConfigFile() {
} catch (Throwable e) {
LOG.error("Fail to load properties from {}, exit!",
COMMON_CONFIG_FILE_NAME, e);
System.exit(2);
System.exit(1);
return false;
} finally {
if (null != inStream) {
Expand All @@ -653,10 +666,21 @@ private boolean loadConfigFile() {
} catch (IOException e) {
LOG.error("Fail to InputStream.close() for file {}, exit!",
COMMON_CONFIG_FILE_NAME, e);
System.exit(3);
System.exit(1);
}
}
}
return true;
}

private String compatGetValue(Map<String, String> attrs, String newKey, String depKey) {
String tmpValue = attrs.get(newKey);
if (StringUtils.isBlank(tmpValue)) {
tmpValue = attrs.get(depKey);
if (StringUtils.isNotEmpty(tmpValue)) {
LOG.warn("** Deprecated configure key {}, replaced by {} **", depKey, newKey);
}
}
return tmpValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.inlong.dataproxy.config.ConfigManager.CONFIG_HOLDER_LIST;
import static org.apache.inlong.dataproxy.config.ConfigManager.CONFIG_HOLDER_MAP;

public abstract class ConfigHolder {

Expand All @@ -50,7 +50,7 @@ public abstract class ConfigHolder {
public ConfigHolder(String fileName) {
this.fileName = fileName;
setFilePath(fileName);
CONFIG_HOLDER_LIST.add(this);
CONFIG_HOLDER_MAP.put(this, System.currentTimeMillis());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.slf4j.LoggerFactory;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,7 +60,7 @@ public class ConfigManager {

private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);

public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
public static final Map<ConfigHolder, Long> CONFIG_HOLDER_MAP = new ConcurrentHashMap<>();
// whether handshake manager ok
public static final AtomicBoolean handshakeManagerOk = new AtomicBoolean(false);
private static volatile boolean isInit = false;
Expand Down Expand Up @@ -91,7 +90,7 @@ public static ConfigManager getInstance() {
synchronized (ConfigManager.class) {
if (!isInit) {
instance = new ConfigManager();
for (ConfigHolder holder : CONFIG_HOLDER_LIST) {
for (ConfigHolder holder : CONFIG_HOLDER_MAP.keySet()) {
holder.loadFromFileToHolder();
}
ReloadConfigWorker reloadProperties = ReloadConfigWorker.create(instance);
Expand Down Expand Up @@ -249,29 +248,19 @@ public void run() {
long count = 0;
long startTime;
long wstTime;
boolean fisrtCheck = true;
LOG.info("Reload-Config Worker started!");
while (isRunning) {
count += 1;
startTime = System.currentTimeMillis();
try {
// check and load local configure files
for (ConfigHolder holder : CONFIG_HOLDER_LIST) {
for (ConfigHolder holder : CONFIG_HOLDER_MAP.keySet()) {
if (holder.checkAndUpdateHolder()) {
holder.executeCallbacks();
}
}
// connect to manager
if (fisrtCheck) {
fisrtCheck = false;
// connect to manager for updating remote config
if (count % 3 == 0) {
checkRemoteConfig();
count = 0;
} else {
// wait for 3 * check-time to update remote config
if (count % 3 == 0) {
checkRemoteConfig();
count = 0;
}
}
// check processing time
wstTime = System.currentTimeMillis() - startTime;
Expand All @@ -286,6 +275,8 @@ public void run() {
break;
} catch (Throwable ex2) {
LOG.error("Reload-Config Worker encounters exception, continue process", ex2);
} finally {
count++;
}
}
LOG.info("Reload-Config Worker existed!");
Expand All @@ -310,24 +301,12 @@ private int getRandom(int min, int max) {
}

private void checkRemoteConfig() {
String proxyClusterName = CommonConfigHolder.getInstance().getClusterName();
String proxyClusterTag = CommonConfigHolder.getInstance().getClusterTag();
if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(proxyClusterTag)) {
LOG.error("Found {} or {} is blank in {}, can't quest remote configure!",
CommonConfigHolder.KEY_PROXY_CLUSTER_NAME,
CommonConfigHolder.KEY_PROXY_CLUSTER_TAG,
CommonConfigHolder.COMMON_CONFIG_FILE_NAME);
return;
}
List<String> managerIpList = CommonConfigHolder.getInstance().getManagerHosts();
if (managerIpList == null || managerIpList.size() == 0) {
LOG.error("Found manager ip list are empty, can't quest remote configure!");
return;
}
int managerIpSize = managerIpList.size();
for (int i = 0; i < managerIpList.size(); i++) {
String host = managerIpList.get(Math.abs(managerIpListIndex.getAndIncrement()) % managerIpSize);
if (this.reloadDataProxyConfig(proxyClusterName, proxyClusterTag, host)) {
if (this.reloadDataProxyConfig(CommonConfigHolder.getInstance().getClusterName(),
CommonConfigHolder.getInstance().getClusterTag(), host)) {
break;
}
}
Expand Down
Loading

0 comments on commit 5a0ae62

Please sign in to comment.