Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue-668] Manage all the source and sink configurations in Flink ConfigOptions #670

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright Pravega Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.pravega.connectors.flink.config;

import io.pravega.client.ClientConfig;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
* Details about each configuration could be found at {@link ClientConfig}.
*/
public final class PravegaClientConfig {
public static final String CLIENT_PREFIX = "pravega.";
public static final String CLIENT_SECURITY_PREFIX = "security.";

public static final ConfigOption<String> DEFAULT_SCOPE =
ConfigOptions.key(CLIENT_PREFIX + "defaultScope")
.stringType()
.noDefaultValue()
.withDescription("Required default scope name.");
thekingofcity marked this conversation as resolved.
Show resolved Hide resolved
public static final ConfigOption<String> CONTROLLER_URI =
ConfigOptions.key(CLIENT_PREFIX + "controllerURI")
.stringType()
.noDefaultValue()
.withDescription("Required Pravega controller URI.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "username")
.stringType()
.noDefaultValue()
.withDescription("Optional username for security.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "password")
.stringType()
.noDefaultValue()
.withDescription("Optional password for security.");
public static final ConfigOption<String> TRUST_STORE =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "trustStore")
.stringType()
.noDefaultValue()
.withDescription("Optional trust store path for Pravega client.");
public static final ConfigOption<Boolean> VALIDATE_HOST_NAME =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "validateHostName")
.booleanType()
.noDefaultValue()
.withDescription("Optional flag to decide whether to enable host name validation when TLS is enabled.");
public static final ConfigOption<Integer> MAX_CONNECTION_PER_SEGMENT_STORE =
ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore")
.intType()
.noDefaultValue()
.withDescription("Optional max number of connections per Segment store to be used by connection pooling.");
public static final ConfigOption<Boolean> ENABLE_TLS_TO_CONTROLLER =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToController")
.booleanType()
.noDefaultValue()
.withDescription("Optional flag decide whether to enable TLS for client's communication with the Controller.");
public static final ConfigOption<Boolean> ENABLE_TLS_TO_SEGMENT_STORE =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToSegmentStore")
.booleanType()
.noDefaultValue()
.withDescription("Optional flag decide whether to enable TLS for client's communication with the Controller.");
public static final ConfigOption<String> SCHEMA_REGISTRY_URI =
thekingofcity marked this conversation as resolved.
Show resolved Hide resolved
ConfigOptions.key(CLIENT_PREFIX + "schemaRegistryURI")
.stringType()
.noDefaultValue()
.withDescription("Optional Pravega schema registry URI.");

private PravegaClientConfig() {
// This is a constant class.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Copyright Pravega Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.pravega.connectors.flink.config;

import io.pravega.client.ClientConfig;
import io.pravega.shared.security.auth.DefaultCredentials;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;

import javax.annotation.Nullable;
import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;

/**
* Helper methods for {@link PravegaClientConfig}.
*/
public final class PravegaClientConfigBuilder {
thekingofcity marked this conversation as resolved.
Show resolved Hide resolved
/**
* A builder for building the Pravega {@link ClientConfig} instance.
*
* @param pravegaClientConfig The configuration.
* @return A Pravega {@link ClientConfig} instance.
*/
public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) {
ClientConfig.ClientConfigBuilder builder = ClientConfig.builder();
builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaClientConfig.CONTROLLER_URI)));
if (pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() &&
thekingofcity marked this conversation as resolved.
Show resolved Hide resolved
pravegaClientConfig.getOptional(PravegaClientConfig.PASSWORD).isPresent()) {
builder.credentials(new DefaultCredentials(
pravegaClientConfig.get(PravegaClientConfig.USERNAME),
pravegaClientConfig.get(PravegaClientConfig.PASSWORD))
);
}
pravegaClientConfig.getOptional(PravegaClientConfig.VALIDATE_HOST_NAME).ifPresent(builder::validateHostName);
pravegaClientConfig.getOptional(PravegaClientConfig.TRUST_STORE).ifPresent(builder::trustStore);
pravegaClientConfig.getOptional(PravegaClientConfig.MAX_CONNECTION_PER_SEGMENT_STORE).ifPresent(builder::maxConnectionsPerSegmentStore);
pravegaClientConfig.getOptional(PravegaClientConfig.ENABLE_TLS_TO_CONTROLLER).ifPresent(builder::enableTlsToController);
pravegaClientConfig.getOptional(PravegaClientConfig.ENABLE_TLS_TO_SEGMENT_STORE).ifPresent(builder::enableTlsToSegmentStore);
return builder.build();
}

/**
* Get configuration from command line and system environment.
*
* @param params Command line params from {@link ParameterTool#fromArgs(String[])}
* @return A Pravega client configuration.
*/
public static Configuration getConfigFromEnvironmentAndCommand(@Nullable ParameterTool params) {
Configuration pravegaClientConfig = new Configuration();

Properties properties = System.getProperties();
Map<String, String> env = System.getenv();

final class EnvOption {
thekingofcity marked this conversation as resolved.
Show resolved Hide resolved
final String parameterName;
final String propertyName;
final String variableName;
final ConfigOption<String> configOption;

EnvOption(String parameterName, String propertyName, String variableName, ConfigOption<String> configOption) {
this.parameterName = parameterName;
this.propertyName = propertyName;
this.variableName = variableName;
this.configOption = configOption;
}
}

Stream
.of(
new EnvOption("controller", "pravega.controller.uri", "PRAVEGA_CONTROLLER_URI", PravegaClientConfig.CONTROLLER_URI),
new EnvOption("scope", "pravega.scope", "PRAVEGA_SCOPE", PravegaClientConfig.DEFAULT_SCOPE),
new EnvOption("schema-registry", "pravega.schema-registry.uri", "PRAVEGA_SCHEMA_REGISTRY_URI", PravegaClientConfig.SCHEMA_REGISTRY_URI)
)
.forEach(option -> {
if (params != null && params.has(option.parameterName)) {
pravegaClientConfig.set(option.configOption, params.get(option.parameterName));
}
if (properties != null && properties.containsKey(option.propertyName)) {
pravegaClientConfig.set(option.configOption, properties.getProperty(option.propertyName));
}
if (env != null && env.containsKey(option.variableName)) {
pravegaClientConfig.set(option.configOption, env.get(option.variableName));
}
});

return pravegaClientConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.pravega.connectors.flink.source.split.PravegaSplitSerializer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
Expand All @@ -45,6 +44,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -85,10 +85,10 @@ public class PravegaSource<T>
final DeserializationSchema<T> deserializationSchema;

// the timeout for reading events from Pravega
final Time eventReadTimeout;
final Duration eventReadTimeout;

// the timeout for call that initiates the Pravega checkpoint
final Time checkpointInitiateTimeout;
final Duration checkpointInitiateTimeout;

// flag to enable/disable metrics
final boolean enableMetrics;
Expand All @@ -110,7 +110,7 @@ public class PravegaSource<T>
public PravegaSource(ClientConfig clientConfig,
ReaderGroupConfig readerGroupConfig, String scope, String readerGroupName,
DeserializationSchema<T> deserializationSchema,
Time eventReadTimeout, Time checkpointInitiateTimeout,
Duration eventReadTimeout, Duration checkpointInitiateTimeout,
boolean enableMetrics) {
this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
this.readerGroupConfig = Preconditions.checkNotNull(readerGroupConfig, "readerGroupConfig");
Expand Down
Loading