From 0723317b080603056e7452b472872fa986614731 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Fri, 1 Apr 2022 16:32:41 +0800 Subject: [PATCH 1/8] [Issue-668] Manage configurations in Flink ConfigOptions Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../connectors/flink/PravegaOptions.java | 111 +++++++ .../flink/source/PravegaSourceBuilder.java | 278 +++++++++++++++++- .../source/FlinkPravegaSourceITCase.java | 2 +- .../connectors/flink/utils/SetupUtils.java | 13 + 4 files changed, 391 insertions(+), 13 deletions(-) create mode 100644 src/main/java/io/pravega/connectors/flink/PravegaOptions.java diff --git a/src/main/java/io/pravega/connectors/flink/PravegaOptions.java b/src/main/java/io/pravega/connectors/flink/PravegaOptions.java new file mode 100644 index 00000000..f57e1bab --- /dev/null +++ b/src/main/java/io/pravega/connectors/flink/PravegaOptions.java @@ -0,0 +1,111 @@ +/** + * Copyright Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.pravega.connectors.flink; + +import io.pravega.client.ClientConfig; +import io.pravega.shared.security.auth.DefaultCredentials; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; +import java.net.URI; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; + +/** + * Details about each configuration could be found at {@link ClientConfig}. + */ +public class PravegaOptions { + public static final String CONTROLLER_URI = "controllerURI"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String TRUST_STORE = "trustStore"; + public static final String VALIDATE_HOST_NAME = "validateHostName"; + public static final String MAX_CONNECTION_PER_SEGMENT_STORE = "maxConnectionsPerSegmentStore"; + public static final String ENABLE_TLS_TO_CONTROLLER = "enableTlsToController"; + public static final String ENABLE_TLS_TO_SEGMENT_STORE = "enableTlsToSegmentStore"; + /** + * Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups. + */ + public static final String DEFAULT_SCOPE = "defaultScope"; + + public static Properties getPropertiesFromEnvironmentAndCommand(@Nullable ParameterTool params) { + Properties pravegaClientConfig = new Properties(); + + Properties properties = System.getProperties(); + Map env = System.getenv(); + + Stream + .of(PravegaOptions.class.getFields()) + .filter(field -> field.getType().equals(String.class)) + .map(field -> { + try { + return (String) field.get(null); + } catch (IllegalAccessException e) { + // Should never happen. + return ""; + } + }) + .forEach(optionName -> { + if (params != null && params.has(optionName)) { + pravegaClientConfig.put(optionName, params.get(optionName)); + } + if (properties != null && properties.containsKey(optionName)) { + pravegaClientConfig.put(optionName, properties.getProperty(optionName)); + } + if (env != null && env.containsKey(optionName)) { + pravegaClientConfig.put(optionName, env.get(optionName)); + } + }); + + return pravegaClientConfig; + } + + public static ClientConfig buildClientConfigFromProperties(Properties pravegaClientConfig) { + ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); + Preconditions.checkState(pravegaClientConfig.containsKey(PravegaOptions.CONTROLLER_URI), "Controller uri must be provided!"); + builder.controllerURI(URI.create(pravegaClientConfig.getProperty(PravegaOptions.CONTROLLER_URI))); + Preconditions.checkState( + (pravegaClientConfig.containsKey(PravegaOptions.USERNAME) && pravegaClientConfig.containsKey(PravegaOptions.PASSWORD)) || + (!pravegaClientConfig.containsKey(PravegaOptions.USERNAME) && !pravegaClientConfig.containsKey(PravegaOptions.PASSWORD)), + "Username and password must be provided together or not!" + ); + if (pravegaClientConfig.containsKey(PravegaOptions.USERNAME) && pravegaClientConfig.containsKey(PravegaOptions.PASSWORD)) { + builder.credentials(new DefaultCredentials( + pravegaClientConfig.getProperty(PravegaOptions.USERNAME), + pravegaClientConfig.getProperty(PravegaOptions.PASSWORD)) + ); + } + if (pravegaClientConfig.containsKey(PravegaOptions.VALIDATE_HOST_NAME)) { + builder.validateHostName(Boolean.parseBoolean(pravegaClientConfig.getProperty(PravegaOptions.VALIDATE_HOST_NAME))); + } + if (pravegaClientConfig.containsKey(PravegaOptions.TRUST_STORE)) { + builder.trustStore(pravegaClientConfig.getProperty(PravegaOptions.TRUST_STORE)); + } + if (pravegaClientConfig.containsKey(PravegaOptions.MAX_CONNECTION_PER_SEGMENT_STORE)) { + builder.maxConnectionsPerSegmentStore(Integer.parseInt(pravegaClientConfig.getProperty(PravegaOptions.MAX_CONNECTION_PER_SEGMENT_STORE))); + } + if (pravegaClientConfig.containsKey(PravegaOptions.ENABLE_TLS_TO_CONTROLLER)) { + builder.enableTlsToController(Boolean.parseBoolean(pravegaClientConfig.getProperty(PravegaOptions.ENABLE_TLS_TO_CONTROLLER))); + } + if (pravegaClientConfig.containsKey(PravegaOptions.ENABLE_TLS_TO_SEGMENT_STORE)) { + builder.enableTlsToSegmentStore(Boolean.parseBoolean(pravegaClientConfig.getProperty(PravegaOptions.ENABLE_TLS_TO_SEGMENT_STORE))); + } + return builder.build(); + } +} diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java index ed4064a9..b6d590e1 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java @@ -16,15 +16,31 @@ package io.pravega.connectors.flink.source; -import io.pravega.connectors.flink.AbstractStreamingReaderBuilder; +import io.pravega.client.stream.ReaderGroupConfig; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamCut; +import io.pravega.connectors.flink.PravegaConfig; +import io.pravega.connectors.flink.PravegaOptions; +import io.pravega.connectors.flink.util.FlinkPravegaUtils; import io.pravega.connectors.flink.watermark.AssignerWithTimeWindows; +import org.apache.commons.lang3.tuple.Triple; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static io.pravega.connectors.flink.PravegaOptions.buildClientConfigFromProperties; +import static io.pravega.connectors.flink.PravegaOptions.getPropertiesFromEnvironmentAndCommand; /** *The @builder class for {@link PravegaSource} to make it easier for the users to construct a {@link @@ -32,10 +48,22 @@ * * @param the element type. */ -public class PravegaSourceBuilder extends AbstractStreamingReaderBuilder> { +public class PravegaSourceBuilder { private DeserializationSchema deserializationSchema; - private SerializedValue> assignerWithTimeWindows; + private @Nullable SerializedValue> assignerWithTimeWindows; + /** + * The internal Pravega client configuration. See {@link PravegaOptions}. + */ + private final Properties pravegaClientConfig = new Properties(); + private final List> streams = new ArrayList<>(1); + private boolean enableMetrics = true; + private Time checkpointInitiateTimeout = Time.seconds(5); + private Time eventReadTimeout = Time.seconds(1); + private @Nullable String readerGroupScope; + private @Nullable String readerGroupName; + private @Nullable Time readerGroupRefreshTime; + private int maxOutstandingCheckpointRequest = 3; protected PravegaSourceBuilder builder() { return this; @@ -58,7 +86,6 @@ public PravegaSourceBuilder withDeserializationSchema(DeserializationSchema withTimestampAssigner(AssignerWithTimeWindows assignerWithTimeWindows) { try { ClosureCleaner.clean(assignerWithTimeWindows, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); @@ -69,17 +96,242 @@ public PravegaSourceBuilder withTimestampAssigner(AssignerWithTimeWindows return this; } - @Override protected DeserializationSchema getDeserializationSchema() { Preconditions.checkState(deserializationSchema != null, "Deserialization schema must not be null."); return deserializationSchema; } - @Override protected SerializedValue> getAssignerWithTimeWindows() { return assignerWithTimeWindows; } + public PravegaSourceBuilder withEnvironmentAndParameter(@Nullable ParameterTool params) { + this.pravegaClientConfig.putAll(getPropertiesFromEnvironmentAndCommand(params)); + return this; + } + + public PravegaSourceBuilder withPravegaClientConfig(Properties pravegaClientConfig) { + Preconditions.checkNotNull(pravegaClientConfig, "pravegaClientConfig"); + this.pravegaClientConfig.putAll(pravegaClientConfig); + return this; + } + + /** + * enable/disable pravega reader metrics (default: enabled). + * + * @param enable boolean + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder enableMetrics(boolean enable) { + this.enableMetrics = enable; + return this; + } + + /** + * Configures the reader group scope for synchronization purposes. + *

+ * The default value is taken from the {@link PravegaConfig} {@code defaultScope} property. + * + * @param scope the scope name. + * @return A builder to configure and create a streaming reader. + */ + public PravegaSourceBuilder withReaderGroupScope(String scope) { + this.readerGroupScope = Preconditions.checkNotNull(scope); + return this; + } + + /** + * Configures the reader group name. + * + * @param readerGroupName the reader group name. + * @return A builder to configure and create a streaming reader. + */ + public PravegaSourceBuilder withReaderGroupName(String readerGroupName) { + this.readerGroupName = Preconditions.checkNotNull(readerGroupName); + return this; + } + + /** + * Sets the group refresh time, with a default of 1 second. + * + * @param groupRefreshTime The group refresh time + * @return A builder to configure and create a streaming reader. + */ + public PravegaSourceBuilder withReaderGroupRefreshTime(Time groupRefreshTime) { + this.readerGroupRefreshTime = groupRefreshTime; + return this; + } + + /** + * Sets the timeout for initiating a checkpoint in Pravega. + * + * @param checkpointInitiateTimeout The timeout + * @return A builder to configure and create a streaming reader. + */ + public PravegaSourceBuilder withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) { + Preconditions.checkArgument(checkpointInitiateTimeout.getSize() > 0, "timeout must be > 0"); + this.checkpointInitiateTimeout = checkpointInitiateTimeout; + return this; + } + + /** + * Sets the timeout for the call to read events from Pravega. After the timeout + * expires (without an event being returned), another call will be made. + * + * @param eventReadTimeout The timeout + * @return A builder to configure and create a streaming reader. + */ + public PravegaSourceBuilder withEventReadTimeout(Time eventReadTimeout) { + Preconditions.checkArgument(eventReadTimeout.getSize() > 0, "timeout must be > 0"); + this.eventReadTimeout = eventReadTimeout; + return this; + } + + /** + * Configures the maximum outstanding checkpoint requests to Pravega (default=3). + * Upon requesting more checkpoints than the specified maximum, + * (say a checkpoint request times out on the ReaderCheckpointHook but Pravega is still working on it), + * this configurations allows Pravega to limit any further checkpoint request being made to the ReaderGroup. + * This configuration is particularly relevant when multiple checkpoint requests need to be honored (e.g., frequent savepoint requests being triggered concurrently). + * + * @param maxOutstandingCheckpointRequest maximum outstanding checkpoint request. + * @return A builder to configure and create a streaming reader. + */ + public PravegaSourceBuilder withMaxOutstandingCheckpointRequest(int maxOutstandingCheckpointRequest) { + this.maxOutstandingCheckpointRequest = maxOutstandingCheckpointRequest; + return this; + } + + /** + * Add a stream to be read by the source, from the earliest available position in the stream. + * + * @param streamSpec the unqualified or qualified name of the stream. + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder forStream(final String streamSpec) { + return forStream(streamSpec, StreamCut.UNBOUNDED, StreamCut.UNBOUNDED); + } + + /** + * Add a stream to be read by the source, from the given start position in the stream. + * + * @param streamSpec the unqualified or qualified name of the stream. + * @param startStreamCut Start {@link StreamCut} + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder forStream(final String streamSpec, final StreamCut startStreamCut) { + return forStream(streamSpec, startStreamCut, StreamCut.UNBOUNDED); + } + + /** + * Add a stream to be read by the source, from the given start position in the stream. + * + * @param streamSpec the unqualified or qualified name of the stream. + * @param startStreamCut Start {@link StreamCut} + * @param endStreamCut End {@link StreamCut} + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder forStream(final String streamSpec, final StreamCut startStreamCut, final StreamCut endStreamCut) { + Preconditions.checkNotNull(streamSpec, "streamSpec"); + Preconditions.checkNotNull(startStreamCut, "from"); + Preconditions.checkNotNull(endStreamCut, "to"); + streams.add(Triple.of(streamSpec, startStreamCut, endStreamCut)); + return this; + } + + /** + * Add a stream to be read by the source, from the earliest available position in the stream. + * + * @param stream Stream. + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder forStream(final Stream stream) { + return forStream(stream, StreamCut.UNBOUNDED, StreamCut.UNBOUNDED); + } + + /** + * Add a stream to be read by the source, from the given start position in the stream. + * + * @param stream Stream. + * @param startStreamCut Start {@link StreamCut} + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder forStream(final Stream stream, final StreamCut startStreamCut) { + return forStream(stream, startStreamCut, StreamCut.UNBOUNDED); + } + + /** + * Add a stream to be read by the source, from the given start position in the stream to the given end position. + * + * @param stream Stream. + * @param startStreamCut Start {@link StreamCut} + * @param endStreamCut End {@link StreamCut} + * @return A builder to configure and create a reader. + */ + public PravegaSourceBuilder forStream(final Stream stream, final StreamCut startStreamCut, final StreamCut endStreamCut) { + Preconditions.checkNotNull(stream, "streamSpec"); + Preconditions.checkNotNull(startStreamCut, "from"); + Preconditions.checkNotNull(endStreamCut, "to"); + streams.add(Triple.of(stream.getScopedName(), startStreamCut, endStreamCut)); + return this; + } + + /** + * Resolves the given stream name. + * + * The scope name is resolved in the following order: + * 1. from the stream name (if fully-qualified) + * 2. from the program argument {@code --scope} (if program arguments were provided to the {@link PravegaConfig}) + * 3. from the system property {@code pravega.scope} + * 4. from the system environment variable {@code PRAVEGA_SCOPE} + * + * @param streamSpec a qualified or unqualified stream name + * @return a fully-qualified stream name + * @throws IllegalStateException if an unqualified stream name is supplied but the scope is not configured. + */ + public Stream resolve(String streamSpec) { + Preconditions.checkNotNull(streamSpec, "streamSpec"); + String[] split = streamSpec.split("/", 2); + if (split.length == 1) { + // unqualified + Preconditions.checkState(pravegaClientConfig.containsKey(PravegaOptions.DEFAULT_SCOPE), "The default scope is not configured."); + return Stream.of(pravegaClientConfig.getProperty(PravegaOptions.DEFAULT_SCOPE), split[0]); + } else { + // qualified + assert split.length == 2; + return Stream.of(split[0], split[1]); + } + } + + /** + * Build reader group configuration from streams and defaultScope. + * + * @return rgConfig, rgScope, and rgName. + */ + public Triple buildReaderGroupInfo() { + // rgConfig + ReaderGroupConfig.ReaderGroupConfigBuilder rgConfigBuilder = ReaderGroupConfig + .builder() + .maxOutstandingCheckpointRequest(maxOutstandingCheckpointRequest) + .disableAutomaticCheckpoints(); + if (readerGroupRefreshTime != null) { + rgConfigBuilder.groupRefreshTimeMillis(readerGroupRefreshTime.toMilliseconds()); + } + Preconditions.checkState(!streams.isEmpty(), "At least one stream must be supplied."); + streams.forEach(s -> rgConfigBuilder.stream(resolve(s.getLeft()), s.getMiddle(), s.getRight())); + final ReaderGroupConfig rgConfig = rgConfigBuilder.build(); + + // rgScope + final String rgScope = Optional.ofNullable(readerGroupScope).orElseGet(() -> { + Preconditions.checkState(pravegaClientConfig.containsKey(PravegaOptions.DEFAULT_SCOPE), "A reader group scope or default scope must be configured"); + return pravegaClientConfig.getProperty(PravegaOptions.DEFAULT_SCOPE); + }); + + // rgName + final String rgName = Optional.ofNullable(readerGroupName).orElseGet(FlinkPravegaUtils::generateRandomReaderGroupName); + return Triple.of(rgConfig, rgScope, rgName); + } + /** * Builds a {@link PravegaSource} based on the configuration. * @@ -87,16 +339,18 @@ protected SerializedValue> getAssignerWithTimeWindows * @return an uninitiailized reader as a source function. */ private PravegaSource buildSource() { - PravegaSourceBuilder.ReaderGroupInfo readerGroupInfo = buildReaderGroupInfo(); + // get rgConfig, rgScope, and rgName from streams and defaultScope. + Triple readerGroupInfo = buildReaderGroupInfo(); + return new PravegaSource<>( - getPravegaConfig().getClientConfig(), - readerGroupInfo.getReaderGroupConfig(), - readerGroupInfo.getReaderGroupScope(), - readerGroupInfo.getReaderGroupName(), + buildClientConfigFromProperties(this.pravegaClientConfig), + readerGroupInfo.getLeft(), + readerGroupInfo.getMiddle(), + readerGroupInfo.getRight(), getDeserializationSchema(), this.eventReadTimeout, this.checkpointInitiateTimeout, - isMetricsEnabled()); + this.enableMetrics); } /** diff --git a/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java b/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java index 006c7442..fe7bed3e 100644 --- a/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java +++ b/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java @@ -132,7 +132,7 @@ private static void runTest( final PravegaSource pravegaSource = PravegaSource.builder() .forStream(streamName) .enableMetrics(false) - .withPravegaConfig(SETUP_UTILS.getPravegaConfig()) + .withPravegaClientConfig(SETUP_UTILS.getPravegaClientConfig()) .withReaderGroupName(readerGroupName) .withDeserializationSchema(new IntegerDeserializationSchema()) .build(); diff --git a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java index 9b0852fc..ad807112 100644 --- a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java +++ b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java @@ -28,6 +28,7 @@ import io.pravega.client.stream.Stream; import io.pravega.client.stream.StreamConfiguration; import io.pravega.connectors.flink.PravegaConfig; +import io.pravega.connectors.flink.PravegaOptions; import io.pravega.local.InProcPravegaCluster; import io.pravega.shared.security.auth.DefaultCredentials; import org.apache.commons.lang3.RandomStringUtils; @@ -39,6 +40,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -211,6 +213,17 @@ public PravegaConfig getPravegaConfig() { .withTrustStore(getPathFromResource(CLIENT_TRUST_STORE_FILE)); } + public Properties getPravegaClientConfig() { + final Properties properties = new Properties(); + properties.put(PravegaOptions.CONTROLLER_URI, getControllerUri().toString()); + properties.put(PravegaOptions.DEFAULT_SCOPE, getScope()); + properties.put(PravegaOptions.USERNAME, PRAVEGA_PASSWORD); + properties.put(PravegaOptions.PASSWORD, PRAVEGA_USERNAME); + properties.put(PravegaOptions.VALIDATE_HOST_NAME, enableHostNameValidation); + properties.put(PravegaOptions.TRUST_STORE, getPathFromResource(CLIENT_TRUST_STORE_FILE)); + return properties; + } + /** * Create the test stream. * From 1f851dc57719fb2063b5ca37e3e729602107c9e1 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Sat, 2 Apr 2022 14:40:48 +0800 Subject: [PATCH 2/8] Flink ConfigOptions instead of Properties Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../connectors/flink/PravegaOptions.java | 162 ++++++++++++------ .../flink/source/PravegaSourceBuilder.java | 18 +- .../connectors/flink/utils/SetupUtils.java | 20 +-- 3 files changed, 129 insertions(+), 71 deletions(-) diff --git a/src/main/java/io/pravega/connectors/flink/PravegaOptions.java b/src/main/java/io/pravega/connectors/flink/PravegaOptions.java index f57e1bab..aad870c0 100644 --- a/src/main/java/io/pravega/connectors/flink/PravegaOptions.java +++ b/src/main/java/io/pravega/connectors/flink/PravegaOptions.java @@ -19,93 +19,151 @@ import io.pravega.client.ClientConfig; import io.pravega.shared.security.auth.DefaultCredentials; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.util.Preconditions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.description.Description; import javax.annotation.Nullable; import java.net.URI; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.stream.Stream; /** * Details about each configuration could be found at {@link ClientConfig}. */ -public class PravegaOptions { - public static final String CONTROLLER_URI = "controllerURI"; - public static final String USERNAME = "username"; - public static final String PASSWORD = "password"; - public static final String TRUST_STORE = "trustStore"; - public static final String VALIDATE_HOST_NAME = "validateHostName"; - public static final String MAX_CONNECTION_PER_SEGMENT_STORE = "maxConnectionsPerSegmentStore"; - public static final String ENABLE_TLS_TO_CONTROLLER = "enableTlsToController"; - public static final String ENABLE_TLS_TO_SEGMENT_STORE = "enableTlsToSegmentStore"; - /** - * Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups. - */ - public static final String DEFAULT_SCOPE = "defaultScope"; +public final class PravegaOptions { + public static final String CLIENT_PREFIX = "pravega."; - public static Properties getPropertiesFromEnvironmentAndCommand(@Nullable ParameterTool params) { - Properties pravegaClientConfig = new Properties(); + public static final ConfigOption DEFAULT_SCOPE = + ConfigOptions.key(CLIENT_PREFIX + "defaultScope") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups.") + .build()); + public static final ConfigOption CONTROLLER_URI = + ConfigOptions.key(CLIENT_PREFIX + "controllerURI") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("Service URL provider for Pravega service.") + .build()); + public static final ConfigOption USERNAME = + ConfigOptions.key(CLIENT_PREFIX + "username") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("The username to access Pravega.") + .build()); + public static final ConfigOption PASSWORD = + ConfigOptions.key(CLIENT_PREFIX + "password") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("The password to access Pravega.") + .build()); + public static final ConfigOption TRUST_STORE = + ConfigOptions.key(CLIENT_PREFIX + "trustStore") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("The password to access Pravega.") + .build()); + public static final ConfigOption VALIDATE_HOST_NAME = + ConfigOptions.key(CLIENT_PREFIX + "validateHostName") + .booleanType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("Path to an optional truststore. If this is null or empty, the default JVM trust store is used.") + .linebreak() + .text("This is currently expected to be a signing certificate for the certification authority.") + .build()); + public static final ConfigOption MAX_CONNECTION_PER_SEGMENT_STORE = + ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore") + .intType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("An optional property representing whether to enable TLS for client's communication with the Controller.") + .build()); + public static final ConfigOption ENABLE_TLS_TO_CONTROLLER = + ConfigOptions.key(CLIENT_PREFIX + "enableTlsToController") + .booleanType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("Maximum number of connections per Segment store to be used by connection pooling.") + .build()); + public static final ConfigOption ENABLE_TLS_TO_SEGMENT_STORE = + ConfigOptions.key(CLIENT_PREFIX + "enableTlsToSegmentStore") + .booleanType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("Maximum number of connections per Segment store to be used by connection pooling.") + .build()); + + private PravegaOptions() { + // This is a constant class. + } + + public static Configuration getPropertiesFromEnvironmentAndCommand(@Nullable ParameterTool params) { + Configuration pravegaClientConfig = new Configuration(); Properties properties = System.getProperties(); Map env = System.getenv(); Stream .of(PravegaOptions.class.getFields()) - .filter(field -> field.getType().equals(String.class)) + .filter(field -> field.getType().equals(ConfigOption.class)) .map(field -> { try { - return (String) field.get(null); + return (ConfigOption) field.get(null); } catch (IllegalAccessException e) { // Should never happen. - return ""; + return null; } }) - .forEach(optionName -> { - if (params != null && params.has(optionName)) { - pravegaClientConfig.put(optionName, params.get(optionName)); + .filter(Objects::nonNull) + .forEach(option -> { + if (params != null && params.has(option.key())) { + pravegaClientConfig.set(option, params.get(option.key())); } - if (properties != null && properties.containsKey(optionName)) { - pravegaClientConfig.put(optionName, properties.getProperty(optionName)); + if (properties != null && properties.containsKey(option.key())) { + pravegaClientConfig.set(option, properties.getProperty(option.key())); } - if (env != null && env.containsKey(optionName)) { - pravegaClientConfig.put(optionName, env.get(optionName)); + if (env != null && env.containsKey(option.key())) { + pravegaClientConfig.set(option, env.get(option.key())); } }); return pravegaClientConfig; } - public static ClientConfig buildClientConfigFromProperties(Properties pravegaClientConfig) { + public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) { ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); - Preconditions.checkState(pravegaClientConfig.containsKey(PravegaOptions.CONTROLLER_URI), "Controller uri must be provided!"); - builder.controllerURI(URI.create(pravegaClientConfig.getProperty(PravegaOptions.CONTROLLER_URI))); - Preconditions.checkState( - (pravegaClientConfig.containsKey(PravegaOptions.USERNAME) && pravegaClientConfig.containsKey(PravegaOptions.PASSWORD)) || - (!pravegaClientConfig.containsKey(PravegaOptions.USERNAME) && !pravegaClientConfig.containsKey(PravegaOptions.PASSWORD)), - "Username and password must be provided together or not!" - ); - if (pravegaClientConfig.containsKey(PravegaOptions.USERNAME) && pravegaClientConfig.containsKey(PravegaOptions.PASSWORD)) { + builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaOptions.CONTROLLER_URI))); + if (pravegaClientConfig.getOptional(PravegaOptions.USERNAME).isPresent() && + pravegaClientConfig.getOptional(PravegaOptions.PASSWORD).isPresent()) { builder.credentials(new DefaultCredentials( - pravegaClientConfig.getProperty(PravegaOptions.USERNAME), - pravegaClientConfig.getProperty(PravegaOptions.PASSWORD)) + pravegaClientConfig.get(PravegaOptions.USERNAME), + pravegaClientConfig.get(PravegaOptions.PASSWORD)) ); } - if (pravegaClientConfig.containsKey(PravegaOptions.VALIDATE_HOST_NAME)) { - builder.validateHostName(Boolean.parseBoolean(pravegaClientConfig.getProperty(PravegaOptions.VALIDATE_HOST_NAME))); - } - if (pravegaClientConfig.containsKey(PravegaOptions.TRUST_STORE)) { - builder.trustStore(pravegaClientConfig.getProperty(PravegaOptions.TRUST_STORE)); - } - if (pravegaClientConfig.containsKey(PravegaOptions.MAX_CONNECTION_PER_SEGMENT_STORE)) { - builder.maxConnectionsPerSegmentStore(Integer.parseInt(pravegaClientConfig.getProperty(PravegaOptions.MAX_CONNECTION_PER_SEGMENT_STORE))); - } - if (pravegaClientConfig.containsKey(PravegaOptions.ENABLE_TLS_TO_CONTROLLER)) { - builder.enableTlsToController(Boolean.parseBoolean(pravegaClientConfig.getProperty(PravegaOptions.ENABLE_TLS_TO_CONTROLLER))); - } - if (pravegaClientConfig.containsKey(PravegaOptions.ENABLE_TLS_TO_SEGMENT_STORE)) { - builder.enableTlsToSegmentStore(Boolean.parseBoolean(pravegaClientConfig.getProperty(PravegaOptions.ENABLE_TLS_TO_SEGMENT_STORE))); - } + pravegaClientConfig.getOptional(PravegaOptions.VALIDATE_HOST_NAME).ifPresent(builder::validateHostName); + pravegaClientConfig.getOptional(PravegaOptions.TRUST_STORE).ifPresent(builder::trustStore); + pravegaClientConfig.getOptional(PravegaOptions.MAX_CONNECTION_PER_SEGMENT_STORE).ifPresent(builder::maxConnectionsPerSegmentStore); + pravegaClientConfig.getOptional(PravegaOptions.ENABLE_TLS_TO_CONTROLLER).ifPresent(builder::enableTlsToController); + pravegaClientConfig.getOptional(PravegaOptions.ENABLE_TLS_TO_SEGMENT_STORE).ifPresent(builder::enableTlsToSegmentStore); return builder.build(); } } diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java index b6d590e1..6c7751aa 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.Properties; import static io.pravega.connectors.flink.PravegaOptions.buildClientConfigFromProperties; import static io.pravega.connectors.flink.PravegaOptions.getPropertiesFromEnvironmentAndCommand; @@ -55,7 +55,7 @@ public class PravegaSourceBuilder { /** * The internal Pravega client configuration. See {@link PravegaOptions}. */ - private final Properties pravegaClientConfig = new Properties(); + private final Configuration pravegaClientConfig = new Configuration(); private final List> streams = new ArrayList<>(1); private boolean enableMetrics = true; private Time checkpointInitiateTimeout = Time.seconds(5); @@ -106,13 +106,13 @@ protected SerializedValue> getAssignerWithTimeWindows } public PravegaSourceBuilder withEnvironmentAndParameter(@Nullable ParameterTool params) { - this.pravegaClientConfig.putAll(getPropertiesFromEnvironmentAndCommand(params)); + this.pravegaClientConfig.addAll(getPropertiesFromEnvironmentAndCommand(params)); return this; } - public PravegaSourceBuilder withPravegaClientConfig(Properties pravegaClientConfig) { + public PravegaSourceBuilder withPravegaClientConfig(Configuration pravegaClientConfig) { Preconditions.checkNotNull(pravegaClientConfig, "pravegaClientConfig"); - this.pravegaClientConfig.putAll(pravegaClientConfig); + this.pravegaClientConfig.addAll(pravegaClientConfig); return this; } @@ -294,8 +294,8 @@ public Stream resolve(String streamSpec) { String[] split = streamSpec.split("/", 2); if (split.length == 1) { // unqualified - Preconditions.checkState(pravegaClientConfig.containsKey(PravegaOptions.DEFAULT_SCOPE), "The default scope is not configured."); - return Stream.of(pravegaClientConfig.getProperty(PravegaOptions.DEFAULT_SCOPE), split[0]); + Preconditions.checkState(pravegaClientConfig.getOptional(PravegaOptions.DEFAULT_SCOPE).isPresent(), "The default scope is not configured."); + return Stream.of(pravegaClientConfig.get(PravegaOptions.DEFAULT_SCOPE), split[0]); } else { // qualified assert split.length == 2; @@ -323,8 +323,8 @@ public Triple buildReaderGroupInfo() { // rgScope final String rgScope = Optional.ofNullable(readerGroupScope).orElseGet(() -> { - Preconditions.checkState(pravegaClientConfig.containsKey(PravegaOptions.DEFAULT_SCOPE), "A reader group scope or default scope must be configured"); - return pravegaClientConfig.getProperty(PravegaOptions.DEFAULT_SCOPE); + Preconditions.checkState(pravegaClientConfig.getOptional(PravegaOptions.DEFAULT_SCOPE).isPresent(), "A reader group scope or default scope must be configured"); + return pravegaClientConfig.get(PravegaOptions.DEFAULT_SCOPE); }); // rgName diff --git a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java index ad807112..5a868e24 100644 --- a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java +++ b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java @@ -32,6 +32,7 @@ import io.pravega.local.InProcPravegaCluster; import io.pravega.shared.security.auth.DefaultCredentials; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,6 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Base64; -import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -213,15 +213,15 @@ public PravegaConfig getPravegaConfig() { .withTrustStore(getPathFromResource(CLIENT_TRUST_STORE_FILE)); } - public Properties getPravegaClientConfig() { - final Properties properties = new Properties(); - properties.put(PravegaOptions.CONTROLLER_URI, getControllerUri().toString()); - properties.put(PravegaOptions.DEFAULT_SCOPE, getScope()); - properties.put(PravegaOptions.USERNAME, PRAVEGA_PASSWORD); - properties.put(PravegaOptions.PASSWORD, PRAVEGA_USERNAME); - properties.put(PravegaOptions.VALIDATE_HOST_NAME, enableHostNameValidation); - properties.put(PravegaOptions.TRUST_STORE, getPathFromResource(CLIENT_TRUST_STORE_FILE)); - return properties; + public Configuration getPravegaClientConfig() { + final Configuration pravegaClientConfig = new Configuration(); + pravegaClientConfig.set(PravegaOptions.CONTROLLER_URI, getControllerUri().toString()); + pravegaClientConfig.set(PravegaOptions.DEFAULT_SCOPE, getScope()); + pravegaClientConfig.set(PravegaOptions.USERNAME, PRAVEGA_PASSWORD); + pravegaClientConfig.set(PravegaOptions.PASSWORD, PRAVEGA_USERNAME); + pravegaClientConfig.set(PravegaOptions.VALIDATE_HOST_NAME, enableHostNameValidation); + pravegaClientConfig.set(PravegaOptions.TRUST_STORE, getPathFromResource(CLIENT_TRUST_STORE_FILE)); + return pravegaClientConfig; } /** From 162c0e43cecd1f37f0bb3f837ab15d9f9213ac29 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Wed, 6 Apr 2022 17:18:35 +0800 Subject: [PATCH 3/8] use configBuilder and sourceOptions Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../PravegaClientConfig.java} | 88 +++--------------- .../config/PravegaClientConfigBuilder.java | 86 ++++++++++++++++++ .../flink/source/PravegaSource.java | 8 +- .../flink/source/PravegaSourceBuilder.java | 89 ++++++++----------- .../flink/source/PravegaSourceOptions.java | 54 ++++++++++- .../source/FlinkPravegaSourceITCase.java | 1 - .../connectors/flink/utils/SetupUtils.java | 14 +-- 7 files changed, 203 insertions(+), 137 deletions(-) rename src/main/java/io/pravega/connectors/flink/{PravegaOptions.java => config/PravegaClientConfig.java} (56%) create mode 100644 src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java diff --git a/src/main/java/io/pravega/connectors/flink/PravegaOptions.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java similarity index 56% rename from src/main/java/io/pravega/connectors/flink/PravegaOptions.java rename to src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java index aad870c0..fb478ada 100644 --- a/src/main/java/io/pravega/connectors/flink/PravegaOptions.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java @@ -14,28 +14,19 @@ * limitations under the License. */ -package io.pravega.connectors.flink; +package io.pravega.connectors.flink.config; import io.pravega.client.ClientConfig; -import io.pravega.shared.security.auth.DefaultCredentials; -import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.description.Description; -import javax.annotation.Nullable; -import java.net.URI; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.stream.Stream; - /** * Details about each configuration could be found at {@link ClientConfig}. */ -public final class PravegaOptions { +public final class PravegaClientConfig { public static final String CLIENT_PREFIX = "pravega."; + public static final String CLIENT_SECURITY_PREFIX = "security."; public static final ConfigOption DEFAULT_SCOPE = ConfigOptions.key(CLIENT_PREFIX + "defaultScope") @@ -54,7 +45,7 @@ public final class PravegaOptions { .text("Service URL provider for Pravega service.") .build()); public static final ConfigOption USERNAME = - ConfigOptions.key(CLIENT_PREFIX + "username") + ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "username") .stringType() .noDefaultValue() .withDescription( @@ -62,7 +53,7 @@ public final class PravegaOptions { .text("The username to access Pravega.") .build()); public static final ConfigOption PASSWORD = - ConfigOptions.key(CLIENT_PREFIX + "password") + ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "password") .stringType() .noDefaultValue() .withDescription( @@ -70,22 +61,22 @@ public final class PravegaOptions { .text("The password to access Pravega.") .build()); public static final ConfigOption TRUST_STORE = - ConfigOptions.key(CLIENT_PREFIX + "trustStore") + ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "trustStore") .stringType() .noDefaultValue() .withDescription( Description.builder() - .text("The password to access Pravega.") + .text("Path to an optional truststore. If this is null or empty, the default JVM trust store is used.") + .linebreak() + .text("This is currently expected to be a signing certificate for the certification authority.") .build()); public static final ConfigOption VALIDATE_HOST_NAME = - ConfigOptions.key(CLIENT_PREFIX + "validateHostName") + ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "validateHostName") .booleanType() .noDefaultValue() .withDescription( Description.builder() - .text("Path to an optional truststore. If this is null or empty, the default JVM trust store is used.") - .linebreak() - .text("This is currently expected to be a signing certificate for the certification authority.") + .text("Whether to enable host name validation or not.") .build()); public static final ConfigOption MAX_CONNECTION_PER_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore") @@ -96,7 +87,7 @@ public final class PravegaOptions { .text("An optional property representing whether to enable TLS for client's communication with the Controller.") .build()); public static final ConfigOption ENABLE_TLS_TO_CONTROLLER = - ConfigOptions.key(CLIENT_PREFIX + "enableTlsToController") + ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToController") .booleanType() .noDefaultValue() .withDescription( @@ -104,7 +95,7 @@ public final class PravegaOptions { .text("Maximum number of connections per Segment store to be used by connection pooling.") .build()); public static final ConfigOption ENABLE_TLS_TO_SEGMENT_STORE = - ConfigOptions.key(CLIENT_PREFIX + "enableTlsToSegmentStore") + ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToSegmentStore") .booleanType() .noDefaultValue() .withDescription( @@ -112,58 +103,7 @@ public final class PravegaOptions { .text("Maximum number of connections per Segment store to be used by connection pooling.") .build()); - private PravegaOptions() { + private PravegaClientConfig() { // This is a constant class. } - - public static Configuration getPropertiesFromEnvironmentAndCommand(@Nullable ParameterTool params) { - Configuration pravegaClientConfig = new Configuration(); - - Properties properties = System.getProperties(); - Map env = System.getenv(); - - Stream - .of(PravegaOptions.class.getFields()) - .filter(field -> field.getType().equals(ConfigOption.class)) - .map(field -> { - try { - return (ConfigOption) field.get(null); - } catch (IllegalAccessException e) { - // Should never happen. - return null; - } - }) - .filter(Objects::nonNull) - .forEach(option -> { - if (params != null && params.has(option.key())) { - pravegaClientConfig.set(option, params.get(option.key())); - } - if (properties != null && properties.containsKey(option.key())) { - pravegaClientConfig.set(option, properties.getProperty(option.key())); - } - if (env != null && env.containsKey(option.key())) { - pravegaClientConfig.set(option, env.get(option.key())); - } - }); - - return pravegaClientConfig; - } - - public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) { - ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); - builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaOptions.CONTROLLER_URI))); - if (pravegaClientConfig.getOptional(PravegaOptions.USERNAME).isPresent() && - pravegaClientConfig.getOptional(PravegaOptions.PASSWORD).isPresent()) { - builder.credentials(new DefaultCredentials( - pravegaClientConfig.get(PravegaOptions.USERNAME), - pravegaClientConfig.get(PravegaOptions.PASSWORD)) - ); - } - pravegaClientConfig.getOptional(PravegaOptions.VALIDATE_HOST_NAME).ifPresent(builder::validateHostName); - pravegaClientConfig.getOptional(PravegaOptions.TRUST_STORE).ifPresent(builder::trustStore); - pravegaClientConfig.getOptional(PravegaOptions.MAX_CONNECTION_PER_SEGMENT_STORE).ifPresent(builder::maxConnectionsPerSegmentStore); - pravegaClientConfig.getOptional(PravegaOptions.ENABLE_TLS_TO_CONTROLLER).ifPresent(builder::enableTlsToController); - pravegaClientConfig.getOptional(PravegaOptions.ENABLE_TLS_TO_SEGMENT_STORE).ifPresent(builder::enableTlsToSegmentStore); - return builder.build(); - } } diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java new file mode 100644 index 00000000..ad6b1451 --- /dev/null +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java @@ -0,0 +1,86 @@ +/** + * Copyright Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.pravega.connectors.flink.config; + +import io.pravega.client.ClientConfig; +import io.pravega.shared.security.auth.DefaultCredentials; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nullable; +import java.net.URI; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Stream; + +/** + * A builder for building the Pravega {@link ClientConfig} instance. + */ +public final class PravegaClientConfigBuilder { + public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) { + ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); + builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaClientConfig.CONTROLLER_URI))); + if (pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() && + pravegaClientConfig.getOptional(PravegaClientConfig.PASSWORD).isPresent()) { + builder.credentials(new DefaultCredentials( + pravegaClientConfig.get(PravegaClientConfig.USERNAME), + pravegaClientConfig.get(PravegaClientConfig.PASSWORD)) + ); + } + pravegaClientConfig.getOptional(PravegaClientConfig.VALIDATE_HOST_NAME).ifPresent(builder::validateHostName); + pravegaClientConfig.getOptional(PravegaClientConfig.TRUST_STORE).ifPresent(builder::trustStore); + pravegaClientConfig.getOptional(PravegaClientConfig.MAX_CONNECTION_PER_SEGMENT_STORE).ifPresent(builder::maxConnectionsPerSegmentStore); + pravegaClientConfig.getOptional(PravegaClientConfig.ENABLE_TLS_TO_CONTROLLER).ifPresent(builder::enableTlsToController); + pravegaClientConfig.getOptional(PravegaClientConfig.ENABLE_TLS_TO_SEGMENT_STORE).ifPresent(builder::enableTlsToSegmentStore); + return builder.build(); + } + + public static Configuration getConfigFromEnvironmentAndCommand(@Nullable ParameterTool params) { + Configuration pravegaClientConfig = new Configuration(); + + Properties properties = System.getProperties(); + Map env = System.getenv(); + + Stream + .of(PravegaClientConfig.class.getFields()) + .filter(field -> field.getType().equals(ConfigOption.class)) + .map(field -> { + try { + return (ConfigOption) field.get(null); + } catch (IllegalAccessException e) { + // Should never happen. + return null; + } + }) + .filter(Objects::nonNull) + .forEach(option -> { + if (params != null && params.has(option.key())) { + pravegaClientConfig.set(option, params.get(option.key())); + } + if (properties != null && properties.containsKey(option.key())) { + pravegaClientConfig.set(option, properties.getProperty(option.key())); + } + if (env != null && env.containsKey(option.key())) { + pravegaClientConfig.set(option, env.get(option.key())); + } + }); + + return pravegaClientConfig; + } +} diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java index e788ab52..970a4468 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java @@ -29,7 +29,6 @@ import io.pravega.connectors.flink.source.split.PravegaSplitSerializer; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.function.Supplier; /** @@ -85,10 +85,10 @@ public class PravegaSource final DeserializationSchema deserializationSchema; // the timeout for reading events from Pravega - final Time eventReadTimeout; + final Duration eventReadTimeout; // the timeout for call that initiates the Pravega checkpoint - final Time checkpointInitiateTimeout; + final Duration checkpointInitiateTimeout; // flag to enable/disable metrics final boolean enableMetrics; @@ -110,7 +110,7 @@ public class PravegaSource public PravegaSource(ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig, String scope, String readerGroupName, DeserializationSchema deserializationSchema, - Time eventReadTimeout, Time checkpointInitiateTimeout, + Duration eventReadTimeout, Duration checkpointInitiateTimeout, boolean enableMetrics) { this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig"); this.readerGroupConfig = Preconditions.checkNotNull(readerGroupConfig, "readerGroupConfig"); diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java index 6c7751aa..deab224b 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java @@ -20,13 +20,12 @@ import io.pravega.client.stream.Stream; import io.pravega.client.stream.StreamCut; import io.pravega.connectors.flink.PravegaConfig; -import io.pravega.connectors.flink.PravegaOptions; +import io.pravega.connectors.flink.config.PravegaClientConfig; import io.pravega.connectors.flink.util.FlinkPravegaUtils; import io.pravega.connectors.flink.watermark.AssignerWithTimeWindows; import org.apache.commons.lang3.tuple.Triple; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; @@ -35,12 +34,12 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Optional; -import static io.pravega.connectors.flink.PravegaOptions.buildClientConfigFromProperties; -import static io.pravega.connectors.flink.PravegaOptions.getPropertiesFromEnvironmentAndCommand; +import static io.pravega.connectors.flink.config.PravegaClientConfigBuilder.buildClientConfigFromProperties; +import static io.pravega.connectors.flink.config.PravegaClientConfigBuilder.getConfigFromEnvironmentAndCommand; /** *The @builder class for {@link PravegaSource} to make it easier for the users to construct a {@link @@ -53,17 +52,14 @@ public class PravegaSourceBuilder { private DeserializationSchema deserializationSchema; private @Nullable SerializedValue> assignerWithTimeWindows; /** - * The internal Pravega client configuration. See {@link PravegaOptions}. + * The internal Pravega client configuration. See {@link PravegaClientConfig}. */ private final Configuration pravegaClientConfig = new Configuration(); + /** + * The Pravega source configuration. See {@link PravegaSourceOptions}. + */ + private final Configuration pravegaSourceOptions = new Configuration(); private final List> streams = new ArrayList<>(1); - private boolean enableMetrics = true; - private Time checkpointInitiateTimeout = Time.seconds(5); - private Time eventReadTimeout = Time.seconds(1); - private @Nullable String readerGroupScope; - private @Nullable String readerGroupName; - private @Nullable Time readerGroupRefreshTime; - private int maxOutstandingCheckpointRequest = 3; protected PravegaSourceBuilder builder() { return this; @@ -106,7 +102,7 @@ protected SerializedValue> getAssignerWithTimeWindows } public PravegaSourceBuilder withEnvironmentAndParameter(@Nullable ParameterTool params) { - this.pravegaClientConfig.addAll(getPropertiesFromEnvironmentAndCommand(params)); + this.pravegaClientConfig.addAll(getConfigFromEnvironmentAndCommand(params)); return this; } @@ -116,17 +112,6 @@ public PravegaSourceBuilder withPravegaClientConfig(Configuration pravegaClie return this; } - /** - * enable/disable pravega reader metrics (default: enabled). - * - * @param enable boolean - * @return A builder to configure and create a reader. - */ - public PravegaSourceBuilder enableMetrics(boolean enable) { - this.enableMetrics = enable; - return this; - } - /** * Configures the reader group scope for synchronization purposes. *

@@ -136,7 +121,8 @@ public PravegaSourceBuilder enableMetrics(boolean enable) { * @return A builder to configure and create a streaming reader. */ public PravegaSourceBuilder withReaderGroupScope(String scope) { - this.readerGroupScope = Preconditions.checkNotNull(scope); + this.pravegaSourceOptions.set(PravegaSourceOptions.READER_GROUP_SCOPE, + Preconditions.checkNotNull(scope)); return this; } @@ -147,18 +133,19 @@ public PravegaSourceBuilder withReaderGroupScope(String scope) { * @return A builder to configure and create a streaming reader. */ public PravegaSourceBuilder withReaderGroupName(String readerGroupName) { - this.readerGroupName = Preconditions.checkNotNull(readerGroupName); + this.pravegaSourceOptions.set(PravegaSourceOptions.READER_GROUP_NAME, + Preconditions.checkNotNull(readerGroupName)); return this; } /** - * Sets the group refresh time, with a default of 1 second. + * Sets the group refresh time. * * @param groupRefreshTime The group refresh time * @return A builder to configure and create a streaming reader. */ - public PravegaSourceBuilder withReaderGroupRefreshTime(Time groupRefreshTime) { - this.readerGroupRefreshTime = groupRefreshTime; + public PravegaSourceBuilder withReaderGroupRefreshTime(Duration groupRefreshTime) { + this.pravegaSourceOptions.set(PravegaSourceOptions.READER_GROUP_REFRESH_TIME, groupRefreshTime); return this; } @@ -168,9 +155,9 @@ public PravegaSourceBuilder withReaderGroupRefreshTime(Time groupRefreshTime) * @param checkpointInitiateTimeout The timeout * @return A builder to configure and create a streaming reader. */ - public PravegaSourceBuilder withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) { - Preconditions.checkArgument(checkpointInitiateTimeout.getSize() > 0, "timeout must be > 0"); - this.checkpointInitiateTimeout = checkpointInitiateTimeout; + public PravegaSourceBuilder withCheckpointInitiateTimeout(Duration checkpointInitiateTimeout) { + Preconditions.checkArgument(checkpointInitiateTimeout.getNano() > 0, "timeout must be > 0"); + this.pravegaSourceOptions.set(PravegaSourceOptions.CHECKPOINT_INITIATE_TIMEOUT, checkpointInitiateTimeout); return this; } @@ -181,9 +168,9 @@ public PravegaSourceBuilder withCheckpointInitiateTimeout(Time checkpointInit * @param eventReadTimeout The timeout * @return A builder to configure and create a streaming reader. */ - public PravegaSourceBuilder withEventReadTimeout(Time eventReadTimeout) { - Preconditions.checkArgument(eventReadTimeout.getSize() > 0, "timeout must be > 0"); - this.eventReadTimeout = eventReadTimeout; + public PravegaSourceBuilder withEventReadTimeout(Duration eventReadTimeout) { + Preconditions.checkArgument(eventReadTimeout.getNano() > 0, "timeout must be > 0"); + this.pravegaSourceOptions.set(PravegaSourceOptions.EVENT_READ_TIMEOUT, eventReadTimeout); return this; } @@ -198,7 +185,7 @@ public PravegaSourceBuilder withEventReadTimeout(Time eventReadTimeout) { * @return A builder to configure and create a streaming reader. */ public PravegaSourceBuilder withMaxOutstandingCheckpointRequest(int maxOutstandingCheckpointRequest) { - this.maxOutstandingCheckpointRequest = maxOutstandingCheckpointRequest; + this.pravegaSourceOptions.set(PravegaSourceOptions.MAX_OUTSTANDING_CHECKPOINT_REQUEST, maxOutstandingCheckpointRequest); return this; } @@ -294,8 +281,8 @@ public Stream resolve(String streamSpec) { String[] split = streamSpec.split("/", 2); if (split.length == 1) { // unqualified - Preconditions.checkState(pravegaClientConfig.getOptional(PravegaOptions.DEFAULT_SCOPE).isPresent(), "The default scope is not configured."); - return Stream.of(pravegaClientConfig.get(PravegaOptions.DEFAULT_SCOPE), split[0]); + Preconditions.checkState(pravegaClientConfig.getOptional(PravegaClientConfig.DEFAULT_SCOPE).isPresent(), "The default scope is not configured."); + return Stream.of(pravegaClientConfig.get(PravegaClientConfig.DEFAULT_SCOPE), split[0]); } else { // qualified assert split.length == 2; @@ -312,23 +299,25 @@ public Triple buildReaderGroupInfo() { // rgConfig ReaderGroupConfig.ReaderGroupConfigBuilder rgConfigBuilder = ReaderGroupConfig .builder() - .maxOutstandingCheckpointRequest(maxOutstandingCheckpointRequest) + .maxOutstandingCheckpointRequest(pravegaSourceOptions.get(PravegaSourceOptions.MAX_OUTSTANDING_CHECKPOINT_REQUEST)) .disableAutomaticCheckpoints(); - if (readerGroupRefreshTime != null) { - rgConfigBuilder.groupRefreshTimeMillis(readerGroupRefreshTime.toMilliseconds()); - } + pravegaSourceOptions + .getOptional(PravegaSourceOptions.READER_GROUP_REFRESH_TIME) + .ifPresent(readerGroupRefreshTime -> rgConfigBuilder.groupRefreshTimeMillis(readerGroupRefreshTime.toMillis())); Preconditions.checkState(!streams.isEmpty(), "At least one stream must be supplied."); streams.forEach(s -> rgConfigBuilder.stream(resolve(s.getLeft()), s.getMiddle(), s.getRight())); final ReaderGroupConfig rgConfig = rgConfigBuilder.build(); // rgScope - final String rgScope = Optional.ofNullable(readerGroupScope).orElseGet(() -> { - Preconditions.checkState(pravegaClientConfig.getOptional(PravegaOptions.DEFAULT_SCOPE).isPresent(), "A reader group scope or default scope must be configured"); - return pravegaClientConfig.get(PravegaOptions.DEFAULT_SCOPE); + final String rgScope = pravegaSourceOptions.getOptional(PravegaSourceOptions.READER_GROUP_SCOPE).orElseGet(() -> { + Preconditions.checkState(pravegaClientConfig.getOptional(PravegaClientConfig.DEFAULT_SCOPE).isPresent(), + "A reader group scope or default scope must be configured"); + return pravegaClientConfig.get(PravegaClientConfig.DEFAULT_SCOPE); }); // rgName - final String rgName = Optional.ofNullable(readerGroupName).orElseGet(FlinkPravegaUtils::generateRandomReaderGroupName); + final String rgName = pravegaSourceOptions.getOptional(PravegaSourceOptions.READER_GROUP_NAME) + .orElseGet(FlinkPravegaUtils::generateRandomReaderGroupName); return Triple.of(rgConfig, rgScope, rgName); } @@ -348,9 +337,9 @@ private PravegaSource buildSource() { readerGroupInfo.getMiddle(), readerGroupInfo.getRight(), getDeserializationSchema(), - this.eventReadTimeout, - this.checkpointInitiateTimeout, - this.enableMetrics); + pravegaSourceOptions.get(PravegaSourceOptions.EVENT_READ_TIMEOUT), + pravegaSourceOptions.get(PravegaSourceOptions.CHECKPOINT_INITIATE_TIMEOUT), + false); } /** diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java index 55462ad2..8c5fa70b 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java @@ -18,16 +18,68 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; +import java.time.Duration; import java.util.Properties; import java.util.function.Function; public class PravegaSourceOptions { + public static final String SOURCE_PREFIX = "pravega.source."; + public static final ConfigOption READER_TIMEOUT_MS = - ConfigOptions.key("reader.timeout.ms") + ConfigOptions.key(SOURCE_PREFIX + "timeout.ms") .longType() .defaultValue(1000L) .withDescription("The max time to wait when closing components."); + public static final ConfigOption READER_GROUP_NAME = + ConfigOptions.key(SOURCE_PREFIX + "readerGroupName") + .stringType() + .noDefaultValue() + .withDescription("Configures the reader group name."); + public static final ConfigOption READER_GROUP_SCOPE = + ConfigOptions.key(SOURCE_PREFIX + "readerGroupScope") + .stringType() + .noDefaultValue() + .withDescription("Configures the reader group scope for synchronization purposes."); + public static final ConfigOption READER_GROUP_REFRESH_TIME = + ConfigOptions.key(SOURCE_PREFIX + "readerGroupRefreshTime") + .durationType() + .noDefaultValue() + .withDescription("Sets the group refresh time."); + public static final ConfigOption CHECKPOINT_INITIATE_TIMEOUT = + ConfigOptions.key(SOURCE_PREFIX + "checkpointInitiateTimeout") + .durationType() + .defaultValue(Duration.ofSeconds(5)) + .withDescription("Sets the timeout for initiating a checkpoint in Pravega."); + public static final ConfigOption EVENT_READ_TIMEOUT = + ConfigOptions.key(SOURCE_PREFIX + "eventReadTimeout") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription( + Description.builder() + .text("Sets the timeout for the call to read events from Pravega. After the timeout") + .linebreak() + .text("expires (without an event being returned), another call will be made.") + .build()); + public static final ConfigOption MAX_OUTSTANDING_CHECKPOINT_REQUEST = + ConfigOptions.key(SOURCE_PREFIX + "maxOutstandingCheckpointRequest") + .intType() + .defaultValue(3) + .withDescription( + Description.builder() + .text("Configures the maximum outstanding checkpoint requests to Pravega.") + .linebreak() + .text("Upon requesting more checkpoints than the specified maximum,") + .linebreak() + .text("(say a checkpoint request times out on the ReaderCheckpointHook but Pravega is still working on it),") + .linebreak() + .text("this configurations allows Pravega to limit any further checkpoint request being made to the ReaderGroup.") + .linebreak() + .text("This configuration is particularly relevant when multiple checkpoint requests need to be honored.") + .linebreak() + .text("(e.g., frequent savepoint requests being triggered concurrently)") + .build()); public static T getOption( Properties props, ConfigOption configOption, Function parser) { diff --git a/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java b/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java index fe7bed3e..148e998f 100644 --- a/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java +++ b/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java @@ -131,7 +131,6 @@ private static void runTest( // the Pravega reader final PravegaSource pravegaSource = PravegaSource.builder() .forStream(streamName) - .enableMetrics(false) .withPravegaClientConfig(SETUP_UTILS.getPravegaClientConfig()) .withReaderGroupName(readerGroupName) .withDeserializationSchema(new IntegerDeserializationSchema()) diff --git a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java index 5a868e24..808e4f4c 100644 --- a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java +++ b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java @@ -28,7 +28,7 @@ import io.pravega.client.stream.Stream; import io.pravega.client.stream.StreamConfiguration; import io.pravega.connectors.flink.PravegaConfig; -import io.pravega.connectors.flink.PravegaOptions; +import io.pravega.connectors.flink.config.PravegaClientConfig; import io.pravega.local.InProcPravegaCluster; import io.pravega.shared.security.auth.DefaultCredentials; import org.apache.commons.lang3.RandomStringUtils; @@ -215,12 +215,12 @@ public PravegaConfig getPravegaConfig() { public Configuration getPravegaClientConfig() { final Configuration pravegaClientConfig = new Configuration(); - pravegaClientConfig.set(PravegaOptions.CONTROLLER_URI, getControllerUri().toString()); - pravegaClientConfig.set(PravegaOptions.DEFAULT_SCOPE, getScope()); - pravegaClientConfig.set(PravegaOptions.USERNAME, PRAVEGA_PASSWORD); - pravegaClientConfig.set(PravegaOptions.PASSWORD, PRAVEGA_USERNAME); - pravegaClientConfig.set(PravegaOptions.VALIDATE_HOST_NAME, enableHostNameValidation); - pravegaClientConfig.set(PravegaOptions.TRUST_STORE, getPathFromResource(CLIENT_TRUST_STORE_FILE)); + pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, getControllerUri().toString()); + pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, getScope()); + pravegaClientConfig.set(PravegaClientConfig.USERNAME, PRAVEGA_PASSWORD); + pravegaClientConfig.set(PravegaClientConfig.PASSWORD, PRAVEGA_USERNAME); + pravegaClientConfig.set(PravegaClientConfig.VALIDATE_HOST_NAME, enableHostNameValidation); + pravegaClientConfig.set(PravegaClientConfig.TRUST_STORE, getPathFromResource(CLIENT_TRUST_STORE_FILE)); return pravegaClientConfig; } From 2effec179a5a7ec182ede77e2079ab36d0a89945 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Wed, 6 Apr 2022 17:46:37 +0800 Subject: [PATCH 4/8] update get env Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../flink/config/PravegaClientConfig.java | 45 ++++++------------- .../config/PravegaClientConfigBuilder.java | 43 ++++++++++-------- 2 files changed, 38 insertions(+), 50 deletions(-) diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java index fb478ada..1bdf1657 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java @@ -32,34 +32,22 @@ public final class PravegaClientConfig { ConfigOptions.key(CLIENT_PREFIX + "defaultScope") .stringType() .noDefaultValue() - .withDescription( - Description.builder() - .text("Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups.") - .build()); + .withDescription("Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups."); public static final ConfigOption CONTROLLER_URI = ConfigOptions.key(CLIENT_PREFIX + "controllerURI") .stringType() .noDefaultValue() - .withDescription( - Description.builder() - .text("Service URL provider for Pravega service.") - .build()); + .withDescription("Service URL provider for Pravega service."); public static final ConfigOption USERNAME = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "username") .stringType() .noDefaultValue() - .withDescription( - Description.builder() - .text("The username to access Pravega.") - .build()); + .withDescription("The username to access Pravega."); public static final ConfigOption PASSWORD = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "password") .stringType() .noDefaultValue() - .withDescription( - Description.builder() - .text("The password to access Pravega.") - .build()); + .withDescription("The password to access Pravega."); public static final ConfigOption TRUST_STORE = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "trustStore") .stringType() @@ -74,34 +62,27 @@ public final class PravegaClientConfig { ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "validateHostName") .booleanType() .noDefaultValue() - .withDescription( - Description.builder() - .text("Whether to enable host name validation or not.") - .build()); + .withDescription("Whether to enable host name validation or not."); public static final ConfigOption MAX_CONNECTION_PER_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore") .intType() .noDefaultValue() - .withDescription( - Description.builder() - .text("An optional property representing whether to enable TLS for client's communication with the Controller.") - .build()); + .withDescription("An optional property representing whether to enable TLS for client's communication with the Controller."); public static final ConfigOption ENABLE_TLS_TO_CONTROLLER = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToController") .booleanType() .noDefaultValue() - .withDescription( - Description.builder() - .text("Maximum number of connections per Segment store to be used by connection pooling.") - .build()); + .withDescription("Maximum number of connections per Segment store to be used by connection pooling."); public static final ConfigOption ENABLE_TLS_TO_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToSegmentStore") .booleanType() .noDefaultValue() - .withDescription( - Description.builder() - .text("Maximum number of connections per Segment store to be used by connection pooling.") - .build()); + .withDescription("Maximum number of connections per Segment store to be used by connection pooling."); + public static final ConfigOption SCHEMA_REGISTRY_URI = + ConfigOptions.key(CLIENT_PREFIX + "schemaRegistryURI") + .stringType() + .noDefaultValue() + .withDescription("Configures the Pravega schema registry URI."); private PravegaClientConfig() { // This is a constant class. diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java index ad6b1451..3b1f29b1 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java @@ -25,7 +25,6 @@ import javax.annotation.Nullable; import java.net.URI; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.stream.Stream; @@ -57,27 +56,35 @@ public static Configuration getConfigFromEnvironmentAndCommand(@Nullable Paramet Properties properties = System.getProperties(); Map env = System.getenv(); + final class EnvOption { + final String parameterName; + final String propertyName; + final String variableName; + final ConfigOption configOption; + + EnvOption(String parameterName, String propertyName, String variableName, ConfigOption configOption) { + this.parameterName = parameterName; + this.propertyName = propertyName; + this.variableName = variableName; + this.configOption = configOption; + } + } + Stream - .of(PravegaClientConfig.class.getFields()) - .filter(field -> field.getType().equals(ConfigOption.class)) - .map(field -> { - try { - return (ConfigOption) field.get(null); - } catch (IllegalAccessException e) { - // Should never happen. - return null; - } - }) - .filter(Objects::nonNull) + .of( + new EnvOption("controller", "pravega.controller.uri", "PRAVEGA_CONTROLLER_URI", PravegaClientConfig.CONTROLLER_URI), + new EnvOption("scope", "pravega.scope", "PRAVEGA_SCOPE", PravegaClientConfig.DEFAULT_SCOPE), + new EnvOption("schema-registry", "pravega.schema-registry.uri", "PRAVEGA_SCHEMA_REGISTRY_URI", PravegaClientConfig.SCHEMA_REGISTRY_URI) + ) .forEach(option -> { - if (params != null && params.has(option.key())) { - pravegaClientConfig.set(option, params.get(option.key())); + if (params != null && params.has(option.parameterName)) { + pravegaClientConfig.set(option.configOption, params.get(option.parameterName)); } - if (properties != null && properties.containsKey(option.key())) { - pravegaClientConfig.set(option, properties.getProperty(option.key())); + if (properties != null && properties.containsKey(option.propertyName)) { + pravegaClientConfig.set(option.configOption, properties.getProperty(option.propertyName)); } - if (env != null && env.containsKey(option.key())) { - pravegaClientConfig.set(option, env.get(option.key())); + if (env != null && env.containsKey(option.variableName)) { + pravegaClientConfig.set(option.configOption, env.get(option.variableName)); } }); From c27fb91017aefb66b39e0f629e9e9458aa70d86a Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Thu, 7 Apr 2022 17:49:08 +0800 Subject: [PATCH 5/8] add docs Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../flink/config/PravegaClientConfig.java | 26 ++-- .../config/PravegaClientConfigBuilder.java | 14 +- .../flink/source/PravegaSourceBuilder.java | 135 ++++++++++++++++++ .../flink/source/PravegaSourceOptions.java | 33 +---- 4 files changed, 165 insertions(+), 43 deletions(-) diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java index 1bdf1657..c443ae58 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java @@ -19,7 +19,6 @@ import io.pravega.client.ClientConfig; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.description.Description; /** * Details about each configuration could be found at {@link ClientConfig}. @@ -32,57 +31,52 @@ public final class PravegaClientConfig { ConfigOptions.key(CLIENT_PREFIX + "defaultScope") .stringType() .noDefaultValue() - .withDescription("Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups."); + .withDescription("Required default scope name."); public static final ConfigOption CONTROLLER_URI = ConfigOptions.key(CLIENT_PREFIX + "controllerURI") .stringType() .noDefaultValue() - .withDescription("Service URL provider for Pravega service."); + .withDescription("Required Pravega controller URI."); public static final ConfigOption USERNAME = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "username") .stringType() .noDefaultValue() - .withDescription("The username to access Pravega."); + .withDescription("Optional username for security."); public static final ConfigOption PASSWORD = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "password") .stringType() .noDefaultValue() - .withDescription("The password to access Pravega."); + .withDescription("Optional password for security."); public static final ConfigOption TRUST_STORE = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "trustStore") .stringType() .noDefaultValue() - .withDescription( - Description.builder() - .text("Path to an optional truststore. If this is null or empty, the default JVM trust store is used.") - .linebreak() - .text("This is currently expected to be a signing certificate for the certification authority.") - .build()); + .withDescription("Optional trust store path for Pravega client."); public static final ConfigOption VALIDATE_HOST_NAME = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "validateHostName") .booleanType() .noDefaultValue() - .withDescription("Whether to enable host name validation or not."); + .withDescription("Optional flag to decide whether to enable host name validation when TLS is enabled."); public static final ConfigOption MAX_CONNECTION_PER_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore") .intType() .noDefaultValue() - .withDescription("An optional property representing whether to enable TLS for client's communication with the Controller."); + .withDescription("Optional max number of connections per Segment store to be used by connection pooling."); public static final ConfigOption ENABLE_TLS_TO_CONTROLLER = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToController") .booleanType() .noDefaultValue() - .withDescription("Maximum number of connections per Segment store to be used by connection pooling."); + .withDescription("Optional flag decide whether to enable TLS for client's communication with the Controller."); public static final ConfigOption ENABLE_TLS_TO_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToSegmentStore") .booleanType() .noDefaultValue() - .withDescription("Maximum number of connections per Segment store to be used by connection pooling."); + .withDescription("Optional flag decide whether to enable TLS for client's communication with the Controller."); public static final ConfigOption SCHEMA_REGISTRY_URI = ConfigOptions.key(CLIENT_PREFIX + "schemaRegistryURI") .stringType() .noDefaultValue() - .withDescription("Configures the Pravega schema registry URI."); + .withDescription("Optional Pravega schema registry URI."); private PravegaClientConfig() { // This is a constant class. diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java index 3b1f29b1..b30bef70 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java @@ -29,9 +29,15 @@ import java.util.stream.Stream; /** - * A builder for building the Pravega {@link ClientConfig} instance. + * Helper methods for {@link PravegaClientConfig}. */ public final class PravegaClientConfigBuilder { + /** + * A builder for building the Pravega {@link ClientConfig} instance. + * + * @param pravegaClientConfig The configuration. + * @return A Pravega {@link ClientConfig} instance. + */ public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) { ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaClientConfig.CONTROLLER_URI))); @@ -50,6 +56,12 @@ public static ClientConfig buildClientConfigFromProperties(Configuration pravega return builder.build(); } + /** + * Get configuration from command line and system environment. + * + * @param params Command line params from {@link ParameterTool#fromArgs(String[])} + * @return A Pravega client configuration. + */ public static Configuration getConfigFromEnvironmentAndCommand(@Nullable ParameterTool params) { Configuration pravegaClientConfig = new Configuration(); diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java index deab224b..0c874b49 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java @@ -101,17 +101,151 @@ protected SerializedValue> getAssignerWithTimeWindows return assignerWithTimeWindows; } + /** + * Set the Pravega client configuration, which includes connection info, security info, and a default scope + * from command line and system environments. + * + * @param params The command arguments executing the program. + * @return Builder instance. + */ public PravegaSourceBuilder withEnvironmentAndParameter(@Nullable ParameterTool params) { this.pravegaClientConfig.addAll(getConfigFromEnvironmentAndCommand(params)); return this; } + /** + * Set the Pravega client configuration, which includes connection info, security info, and a default scope. + * + * @param pravegaClientConfig The configuration to use. + * @return Builder instance. + */ public PravegaSourceBuilder withPravegaClientConfig(Configuration pravegaClientConfig) { Preconditions.checkNotNull(pravegaClientConfig, "pravegaClientConfig"); this.pravegaClientConfig.addAll(pravegaClientConfig); return this; } + /** + * Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups. + * + * @param defaultScope The default Scope. + * @return Builder instance. + */ + public PravegaSourceBuilder withDefaultScope(String defaultScope) { + this.pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, + Preconditions.checkNotNull(defaultScope)); + return this; + } + + /** + * Service URL provider for Pravega service. + * + * @param controllerURI The controller RPC URI. + * @return Builder instance. + */ + public PravegaSourceBuilder withControllerURI(String controllerURI) { + this.pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, + Preconditions.checkNotNull(controllerURI)); + return this; + } + + /** + * The username to access Pravega. + * + * @param username The username. + * @return Builder instance. + */ + public PravegaSourceBuilder withUsername(String username) { + this.pravegaClientConfig.set(PravegaClientConfig.USERNAME, + Preconditions.checkNotNull(username)); + return this; + } + + /** + * The password to access Pravega. + * + * @param password The password. + * @return Builder instance. + */ + public PravegaSourceBuilder withPassword(String password) { + this.pravegaClientConfig.set(PravegaClientConfig.PASSWORD, + Preconditions.checkNotNull(password)); + return this; + } + + /** + * Path to an optional truststore. If this is null or empty, the default JVM trust store is used. + * This is currently expected to be a signing certificate for the certification authority. + * + * @param trustStore Path to an optional truststore. + * @return Builder instance. + */ + public PravegaSourceBuilder withTrustStore(String trustStore) { + this.pravegaClientConfig.set(PravegaClientConfig.PASSWORD, + Preconditions.checkNotNull(trustStore)); + return this; + } + + /** + * Whether to enable host name validation or not. + * + * @param validateHostName Flag to decide whether to enable host name validation or not. + * @return Builder instance. + */ + public PravegaSourceBuilder withValidateHostName(Boolean validateHostName) { + this.pravegaClientConfig.set(PravegaClientConfig.VALIDATE_HOST_NAME, + Preconditions.checkNotNull(validateHostName)); + return this; + } + + /** + * Maximum number of connections per Segment store to be used by connection pooling. + * + * @param maxConnectionsPerSegmentStore Maximum number of connections per Segment store. + * @return Builder instance. + */ + public PravegaSourceBuilder withMaxConnectionsPerSegmentStore(Integer maxConnectionsPerSegmentStore) { + this.pravegaClientConfig.set(PravegaClientConfig.MAX_CONNECTION_PER_SEGMENT_STORE, + Preconditions.checkNotNull(maxConnectionsPerSegmentStore)); + return this; + } + + /** + * An optional property representing whether to enable TLS for client's communication with the Controller. + * + * @param enableTlsToController Flag to decide whether to enable TLS with the Controller or not. + * @return Builder instance. + */ + public PravegaSourceBuilder withEnableTlsToController(Boolean enableTlsToController) { + this.pravegaClientConfig.set(PravegaClientConfig.ENABLE_TLS_TO_CONTROLLER, + Preconditions.checkNotNull(enableTlsToController)); + return this; + } + + /** + * An optional property representing whether to enable TLS for client's communication with the Controller. + * + * @param enableTlsToSegmentStore Flag to decide whether to enable TLS with the Controller or not. + * @return Builder instance. + */ + public PravegaSourceBuilder withEnableTlsToSegmentStore(Boolean enableTlsToSegmentStore) { + this.pravegaClientConfig.set(PravegaClientConfig.ENABLE_TLS_TO_SEGMENT_STORE, + Preconditions.checkNotNull(enableTlsToSegmentStore)); + return this; + } + + /** + * Configures the Pravega schema registry URI. + * + * @param schemaRegistryURI The schema registry URI. + * @return Builder instance. + */ + public PravegaSourceBuilder withSchemaRegistryURI(String schemaRegistryURI) { + this.pravegaClientConfig.set(PravegaClientConfig.SCHEMA_REGISTRY_URI, + Preconditions.checkNotNull(schemaRegistryURI)); + return this; + } + /** * Configures the reader group scope for synchronization purposes. *

@@ -145,6 +279,7 @@ public PravegaSourceBuilder withReaderGroupName(String readerGroupName) { * @return A builder to configure and create a streaming reader. */ public PravegaSourceBuilder withReaderGroupRefreshTime(Duration groupRefreshTime) { + Preconditions.checkArgument(groupRefreshTime.getNano() > 0, "refreshtime must be > 0"); this.pravegaSourceOptions.set(PravegaSourceOptions.READER_GROUP_REFRESH_TIME, groupRefreshTime); return this; } diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java index 8c5fa70b..06afa2a1 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java @@ -18,7 +18,6 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.description.Description; import java.time.Duration; import java.util.Properties; @@ -31,55 +30,37 @@ public class PravegaSourceOptions { ConfigOptions.key(SOURCE_PREFIX + "timeout.ms") .longType() .defaultValue(1000L) - .withDescription("The max time to wait when closing components."); + .withDescription("Optional max time to wait when closing components."); public static final ConfigOption READER_GROUP_NAME = ConfigOptions.key(SOURCE_PREFIX + "readerGroupName") .stringType() .noDefaultValue() - .withDescription("Configures the reader group name."); + .withDescription("Required Pravega reader group name."); public static final ConfigOption READER_GROUP_SCOPE = ConfigOptions.key(SOURCE_PREFIX + "readerGroupScope") .stringType() .noDefaultValue() - .withDescription("Configures the reader group scope for synchronization purposes."); + .withDescription("Optional Pravega reader group scope for synchronization purposes."); public static final ConfigOption READER_GROUP_REFRESH_TIME = ConfigOptions.key(SOURCE_PREFIX + "readerGroupRefreshTime") .durationType() .noDefaultValue() - .withDescription("Sets the group refresh time."); + .withDescription("Optional reader group refresh time."); public static final ConfigOption CHECKPOINT_INITIATE_TIMEOUT = ConfigOptions.key(SOURCE_PREFIX + "checkpointInitiateTimeout") .durationType() .defaultValue(Duration.ofSeconds(5)) - .withDescription("Sets the timeout for initiating a checkpoint in Pravega."); + .withDescription("Optional timeout for initiating a checkpoint in Pravega."); public static final ConfigOption EVENT_READ_TIMEOUT = ConfigOptions.key(SOURCE_PREFIX + "eventReadTimeout") .durationType() .defaultValue(Duration.ofSeconds(1)) - .withDescription( - Description.builder() - .text("Sets the timeout for the call to read events from Pravega. After the timeout") - .linebreak() - .text("expires (without an event being returned), another call will be made.") - .build()); + .withDescription("Optional timeout for the call to read events from Pravega."); public static final ConfigOption MAX_OUTSTANDING_CHECKPOINT_REQUEST = ConfigOptions.key(SOURCE_PREFIX + "maxOutstandingCheckpointRequest") .intType() .defaultValue(3) - .withDescription( - Description.builder() - .text("Configures the maximum outstanding checkpoint requests to Pravega.") - .linebreak() - .text("Upon requesting more checkpoints than the specified maximum,") - .linebreak() - .text("(say a checkpoint request times out on the ReaderCheckpointHook but Pravega is still working on it),") - .linebreak() - .text("this configurations allows Pravega to limit any further checkpoint request being made to the ReaderGroup.") - .linebreak() - .text("This configuration is particularly relevant when multiple checkpoint requests need to be honored.") - .linebreak() - .text("(e.g., frequent savepoint requests being triggered concurrently)") - .build()); + .withDescription("Optional max outstanding checkpoint requests to Pravega."); public static T getOption( Properties props, ConfigOption configOption, Function parser) { From dbae4ad4be38323801a1c5c607454d74671eca61 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Tue, 12 Apr 2022 11:57:26 +0800 Subject: [PATCH 6/8] fix comments Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../flink/config/PravegaClientConfig.java | 41 +++++++++----- ...der.java => PravegaClientConfigUtils.java} | 55 ++++++++----------- .../flink/source/PravegaSourceBuilder.java | 42 +------------- .../source/FlinkPravegaSourceITCase.java | 7 ++- .../connectors/flink/utils/SetupUtils.java | 31 ++++++++++- 5 files changed, 86 insertions(+), 90 deletions(-) rename src/main/java/io/pravega/connectors/flink/config/{PravegaClientConfigBuilder.java => PravegaClientConfigUtils.java} (61%) diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java index c443ae58..e68201b3 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfig.java @@ -19,6 +19,7 @@ import io.pravega.client.ClientConfig; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; /** * Details about each configuration could be found at {@link ClientConfig}. @@ -31,52 +32,64 @@ public final class PravegaClientConfig { ConfigOptions.key(CLIENT_PREFIX + "defaultScope") .stringType() .noDefaultValue() - .withDescription("Required default scope name."); + .withDescription("Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups."); public static final ConfigOption CONTROLLER_URI = ConfigOptions.key(CLIENT_PREFIX + "controllerURI") .stringType() .noDefaultValue() - .withDescription("Required Pravega controller URI."); + .withDescription(Description.builder() + .text("Configures the Pravega controller RPC URI.") + .linebreak() + .text("This can be of 2 types:") + .linebreak() + .text("1. tcp://ip1:port1,ip2:port2,...") + .linebreak() + .text("This is used if the controller endpoints are static and can be directly accessed.") + .linebreak() + .text("2. pravega://ip1:port1,ip2:port2,...") + .linebreak() + .text("This is used to autodiscovery the controller endpoints from an initial controller list.") + .build()); public static final ConfigOption USERNAME = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "username") .stringType() .noDefaultValue() - .withDescription("Optional username for security."); + .withDescription("Username passed to Pravega for authentication and authorizing the access."); public static final ConfigOption PASSWORD = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "password") .stringType() .noDefaultValue() - .withDescription("Optional password for security."); + .withDescription("Password passed to Pravega for authentication and authorizing the access."); public static final ConfigOption TRUST_STORE = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "trustStore") .stringType() .noDefaultValue() - .withDescription("Optional trust store path for Pravega client."); + .withDescription(Description.builder() + .text("Path to an optional truststore. If this is null or empty, the default JVM trust store is used.") + .linebreak() + .text("This is currently expected to be a signing certificate for the certification authority.") + .build() + ); public static final ConfigOption VALIDATE_HOST_NAME = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "validateHostName") .booleanType() .noDefaultValue() - .withDescription("Optional flag to decide whether to enable host name validation when TLS is enabled."); + .withDescription("Flag to decide whether to validate the hostname on incoming requests."); public static final ConfigOption MAX_CONNECTION_PER_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore") .intType() .noDefaultValue() - .withDescription("Optional max number of connections per Segment store to be used by connection pooling."); + .withDescription("Maximum number of connections per Segment store to be used by connection pooling."); public static final ConfigOption ENABLE_TLS_TO_CONTROLLER = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToController") .booleanType() .noDefaultValue() - .withDescription("Optional flag decide whether to enable TLS for client's communication with the Controller."); + .withDescription("Flag to decide whether to enable TLS for client's communication with the Controller."); public static final ConfigOption ENABLE_TLS_TO_SEGMENT_STORE = ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToSegmentStore") .booleanType() .noDefaultValue() - .withDescription("Optional flag decide whether to enable TLS for client's communication with the Controller."); - public static final ConfigOption SCHEMA_REGISTRY_URI = - ConfigOptions.key(CLIENT_PREFIX + "schemaRegistryURI") - .stringType() - .noDefaultValue() - .withDescription("Optional Pravega schema registry URI."); + .withDescription("Flag to decide whether to enable TLS for client's communication with the Controller."); private PravegaClientConfig() { // This is a constant class. diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java similarity index 61% rename from src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java rename to src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java index b30bef70..f356f97b 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java @@ -19,19 +19,19 @@ import io.pravega.client.ClientConfig; import io.pravega.shared.security.auth.DefaultCredentials; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import javax.annotation.Nullable; import java.net.URI; import java.util.Map; import java.util.Properties; -import java.util.stream.Stream; + +import static io.pravega.connectors.flink.util.FlinkPravegaUtils.isCredentialsLoadDynamic; /** * Helper methods for {@link PravegaClientConfig}. */ -public final class PravegaClientConfigBuilder { +public final class PravegaClientConfigUtils { /** * A builder for building the Pravega {@link ClientConfig} instance. * @@ -41,7 +41,8 @@ public final class PravegaClientConfigBuilder { public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) { ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaClientConfig.CONTROLLER_URI))); - if (pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() && + if (isCredentialsLoadDynamic() && + pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() && pravegaClientConfig.getOptional(PravegaClientConfig.PASSWORD).isPresent()) { builder.credentials(new DefaultCredentials( pravegaClientConfig.get(PravegaClientConfig.USERNAME), @@ -68,37 +69,25 @@ public static Configuration getConfigFromEnvironmentAndCommand(@Nullable Paramet Properties properties = System.getProperties(); Map env = System.getenv(); - final class EnvOption { - final String parameterName; - final String propertyName; - final String variableName; - final ConfigOption configOption; - - EnvOption(String parameterName, String propertyName, String variableName, ConfigOption configOption) { - this.parameterName = parameterName; - this.propertyName = propertyName; - this.variableName = variableName; - this.configOption = configOption; - } + if (params != null && params.has("controller")) { + pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, params.get("controller")); + } + if (properties != null && properties.containsKey("pravega.controller.uri")) { + pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, properties.getProperty("pravega.controller.uri")); + } + if (env != null && env.containsKey("PRAVEGA_CONTROLLER_URI")) { + pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, env.get("PRAVEGA_CONTROLLER_URI")); } - Stream - .of( - new EnvOption("controller", "pravega.controller.uri", "PRAVEGA_CONTROLLER_URI", PravegaClientConfig.CONTROLLER_URI), - new EnvOption("scope", "pravega.scope", "PRAVEGA_SCOPE", PravegaClientConfig.DEFAULT_SCOPE), - new EnvOption("schema-registry", "pravega.schema-registry.uri", "PRAVEGA_SCHEMA_REGISTRY_URI", PravegaClientConfig.SCHEMA_REGISTRY_URI) - ) - .forEach(option -> { - if (params != null && params.has(option.parameterName)) { - pravegaClientConfig.set(option.configOption, params.get(option.parameterName)); - } - if (properties != null && properties.containsKey(option.propertyName)) { - pravegaClientConfig.set(option.configOption, properties.getProperty(option.propertyName)); - } - if (env != null && env.containsKey(option.variableName)) { - pravegaClientConfig.set(option.configOption, env.get(option.variableName)); - } - }); + if (params != null && params.has("scope")) { + pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, params.get("scope")); + } + if (properties != null && properties.containsKey("pravega.scope")) { + pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, properties.getProperty("pravega.scope")); + } + if (env != null && env.containsKey("PRAVEGA_SCOPE")) { + pravegaClientConfig.set( PravegaClientConfig.DEFAULT_SCOPE, env.get("PRAVEGA_CONTROLLER_URI")); + } return pravegaClientConfig; } diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java index 0c874b49..bd47c04f 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java @@ -22,24 +22,19 @@ import io.pravega.connectors.flink.PravegaConfig; import io.pravega.connectors.flink.config.PravegaClientConfig; import io.pravega.connectors.flink.util.FlinkPravegaUtils; -import io.pravega.connectors.flink.watermark.AssignerWithTimeWindows; import org.apache.commons.lang3.tuple.Triple; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import static io.pravega.connectors.flink.config.PravegaClientConfigBuilder.buildClientConfigFromProperties; -import static io.pravega.connectors.flink.config.PravegaClientConfigBuilder.getConfigFromEnvironmentAndCommand; +import static io.pravega.connectors.flink.config.PravegaClientConfigUtils.buildClientConfigFromProperties; +import static io.pravega.connectors.flink.config.PravegaClientConfigUtils.getConfigFromEnvironmentAndCommand; /** *The @builder class for {@link PravegaSource} to make it easier for the users to construct a {@link @@ -50,7 +45,6 @@ public class PravegaSourceBuilder { private DeserializationSchema deserializationSchema; - private @Nullable SerializedValue> assignerWithTimeWindows; /** * The internal Pravega client configuration. See {@link PravegaClientConfig}. */ @@ -76,31 +70,11 @@ public PravegaSourceBuilder withDeserializationSchema(DeserializationSchema withTimestampAssigner(AssignerWithTimeWindows assignerWithTimeWindows) { - try { - ClosureCleaner.clean(assignerWithTimeWindows, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - this.assignerWithTimeWindows = new SerializedValue<>(assignerWithTimeWindows); - } catch (IOException e) { - throw new IllegalArgumentException("The given assigner is not serializable", e); - } - return this; - } - protected DeserializationSchema getDeserializationSchema() { Preconditions.checkState(deserializationSchema != null, "Deserialization schema must not be null."); return deserializationSchema; } - protected SerializedValue> getAssignerWithTimeWindows() { - return assignerWithTimeWindows; - } - /** * Set the Pravega client configuration, which includes connection info, security info, and a default scope * from command line and system environments. @@ -234,18 +208,6 @@ public PravegaSourceBuilder withEnableTlsToSegmentStore(Boolean enableTlsToSe return this; } - /** - * Configures the Pravega schema registry URI. - * - * @param schemaRegistryURI The schema registry URI. - * @return Builder instance. - */ - public PravegaSourceBuilder withSchemaRegistryURI(String schemaRegistryURI) { - this.pravegaClientConfig.set(PravegaClientConfig.SCHEMA_REGISTRY_URI, - Preconditions.checkNotNull(schemaRegistryURI)); - return this; - } - /** * Configures the reader group scope for synchronization purposes. *

diff --git a/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java b/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java index 148e998f..dd32fcd9 100644 --- a/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java +++ b/src/test/java/io/pravega/connectors/flink/source/FlinkPravegaSourceITCase.java @@ -131,7 +131,12 @@ private static void runTest( // the Pravega reader final PravegaSource pravegaSource = PravegaSource.builder() .forStream(streamName) - .withPravegaClientConfig(SETUP_UTILS.getPravegaClientConfig()) + .withControllerURI(SETUP_UTILS.getControllerUri().toString()) + .withDefaultScope(SETUP_UTILS.getScope()) + .withUsername(SetupUtils.getUsername()) + .withPassword(SetupUtils.getPassword()) + .withValidateHostName(SETUP_UTILS.isEnableHostNameValidation()) + .withTrustStore(SetupUtils.getPathFromResource(SetupUtils.getTrustStoreFile())) .withReaderGroupName(readerGroupName) .withDeserializationSchema(new IntegerDeserializationSchema()) .build(); diff --git a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java index 808e4f4c..552776fe 100644 --- a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java +++ b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java @@ -152,13 +152,40 @@ public void stopAllServices() throws Exception { } /** - * Get resources path from resource + * Get Username. + * + * @return Username. + */ + static public String getUsername() { + return PRAVEGA_USERNAME; + } + + /** + * Get Password. + * + * @return Password. + */ + static public String getPassword() { + return PRAVEGA_PASSWORD; + } + + /** + * Get TrustStoreFile. + * + * @return TrustStoreFile. + */ + static public String getTrustStoreFile() { + return CLIENT_TRUST_STORE_FILE; + } + + /** + * Get resources path from resource. * * @param resourceName Name of the resource. * * @return Path of the resource file. */ - static String getPathFromResource(String resourceName) { + public static String getPathFromResource(String resourceName) { return SetupUtils.class.getClassLoader().getResource(resourceName).getPath(); } From 62cc8b6bf74161df37d9bbac8be47bfb9aefbf9c Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Tue, 12 Apr 2022 15:30:43 +0800 Subject: [PATCH 7/8] bug fixes Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../connectors/flink/config/PravegaClientConfigUtils.java | 2 +- .../pravega/connectors/flink/source/PravegaSourceBuilder.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java index f356f97b..55e831f3 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java @@ -41,7 +41,7 @@ public final class PravegaClientConfigUtils { public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) { ClientConfig.ClientConfigBuilder builder = ClientConfig.builder(); builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaClientConfig.CONTROLLER_URI))); - if (isCredentialsLoadDynamic() && + if (!isCredentialsLoadDynamic() && pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() && pravegaClientConfig.getOptional(PravegaClientConfig.PASSWORD).isPresent()) { builder.credentials(new DefaultCredentials( diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java index bd47c04f..5da031e7 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceBuilder.java @@ -155,7 +155,7 @@ public PravegaSourceBuilder withPassword(String password) { * @return Builder instance. */ public PravegaSourceBuilder withTrustStore(String trustStore) { - this.pravegaClientConfig.set(PravegaClientConfig.PASSWORD, + this.pravegaClientConfig.set(PravegaClientConfig.TRUST_STORE, Preconditions.checkNotNull(trustStore)); return this; } From c7697af9125009b9585874e0aa57c690fc24eec5 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:28:50 +0800 Subject: [PATCH 8/8] update source param Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .../config/PravegaClientConfigUtils.java | 4 ++-- .../flink/source/PravegaSource.java | 10 ++++---- .../flink/source/PravegaSourceOptions.java | 5 ---- .../enumerator/PravegaSplitEnumerator.java | 23 ++++++++++++------- .../source/reader/PravegaSplitReader.java | 15 ++++++++---- .../FlinkPravegaSplitEnumeratorTest.java | 4 +++- .../reader/FlinkPravegaSourceReaderTest.java | 3 ++- .../reader/FlinkPravegaSplitReaderTest.java | 4 +++- .../connectors/flink/utils/SetupUtils.java | 4 ++-- 9 files changed, 44 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java index 55e831f3..54df98f9 100644 --- a/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java +++ b/src/main/java/io/pravega/connectors/flink/config/PravegaClientConfigUtils.java @@ -45,8 +45,8 @@ public static ClientConfig buildClientConfigFromProperties(Configuration pravega pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() && pravegaClientConfig.getOptional(PravegaClientConfig.PASSWORD).isPresent()) { builder.credentials(new DefaultCredentials( - pravegaClientConfig.get(PravegaClientConfig.USERNAME), - pravegaClientConfig.get(PravegaClientConfig.PASSWORD)) + pravegaClientConfig.get(PravegaClientConfig.PASSWORD), + pravegaClientConfig.get(PravegaClientConfig.USERNAME)) ); } pravegaClientConfig.getOptional(PravegaClientConfig.VALIDATE_HOST_NAME).ifPresent(builder::validateHostName); diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java index 970a4468..94cc23a6 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSource.java @@ -131,8 +131,8 @@ public Boundedness getBoundedness() { public SourceReader createReader(SourceReaderContext readerContext) { Supplier splitReaderSupplier = () -> - new PravegaSplitReader(scope, clientConfig, - readerGroupName, readerContext.getIndexOfSubtask()); + new PravegaSplitReader(scope, clientConfig, readerGroupName, + readerContext.getIndexOfSubtask(), this.eventReadTimeout); return new PravegaSourceReader<>( splitReaderSupplier, @@ -150,7 +150,8 @@ public SplitEnumerator createEnumerator( this.readerGroupName, this.clientConfig, this.readerGroupConfig, - null); + null, + this.checkpointInitiateTimeout); } @Override @@ -163,7 +164,8 @@ public SplitEnumerator restoreEnumerator( this.readerGroupName, this.clientConfig, this.readerGroupConfig, - checkpoint); + checkpoint, + this.checkpointInitiateTimeout); } diff --git a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java index 06afa2a1..7fc55226 100644 --- a/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java +++ b/src/main/java/io/pravega/connectors/flink/source/PravegaSourceOptions.java @@ -26,11 +26,6 @@ public class PravegaSourceOptions { public static final String SOURCE_PREFIX = "pravega.source."; - public static final ConfigOption READER_TIMEOUT_MS = - ConfigOptions.key(SOURCE_PREFIX + "timeout.ms") - .longType() - .defaultValue(1000L) - .withDescription("Optional max time to wait when closing components."); public static final ConfigOption READER_GROUP_NAME = ConfigOptions.key(SOURCE_PREFIX + "readerGroupName") .stringType() diff --git a/src/main/java/io/pravega/connectors/flink/source/enumerator/PravegaSplitEnumerator.java b/src/main/java/io/pravega/connectors/flink/source/enumerator/PravegaSplitEnumerator.java index a770ae4d..da5bcc74 100644 --- a/src/main/java/io/pravega/connectors/flink/source/enumerator/PravegaSplitEnumerator.java +++ b/src/main/java/io/pravega/connectors/flink/source/enumerator/PravegaSplitEnumerator.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -74,18 +75,22 @@ public class PravegaSplitEnumerator implements SplitEnumerator context, @@ -93,7 +98,8 @@ public PravegaSplitEnumerator( String readerGroupName, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig, - Checkpoint checkpoint) { + Checkpoint checkpoint, + Duration checkpointInitiateTimeout) { this.enumContext = context; this.scope = scope; this.readerGroupName = readerGroupName; @@ -101,6 +107,7 @@ public PravegaSplitEnumerator( this.readerGroupConfig = readerGroupConfig; this.checkpoint = checkpoint; this.scheduledExecutorService = Executors.newScheduledThreadPool(DEFAULT_CHECKPOINT_THREAD_POOL_SIZE); + this.checkpointInitiateTimeout = checkpointInitiateTimeout; } // initiate reader group manager, reader group and reset the group to the checkpoint position if checkpoint isn't null @@ -158,7 +165,7 @@ public Checkpoint snapshotState(long chkPtID) throws Exception { final CompletableFuture checkpointResult = this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService); try { - this.checkpoint = checkpointResult.get(5, TimeUnit.SECONDS); + this.checkpoint = checkpointResult.get(checkpointInitiateTimeout.toMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { LOG.error("Pravega checkpoint met error.", e); throw e; diff --git a/src/main/java/io/pravega/connectors/flink/source/reader/PravegaSplitReader.java b/src/main/java/io/pravega/connectors/flink/source/reader/PravegaSplitReader.java index de586510..aecb89c8 100644 --- a/src/main/java/io/pravega/connectors/flink/source/reader/PravegaSplitReader.java +++ b/src/main/java/io/pravega/connectors/flink/source/reader/PravegaSplitReader.java @@ -22,7 +22,6 @@ import io.pravega.client.stream.EventStreamReader; import io.pravega.client.stream.ReaderConfig; import io.pravega.client.stream.TruncatedDataException; -import io.pravega.connectors.flink.source.PravegaSourceOptions; import io.pravega.connectors.flink.source.split.PravegaSplit; import io.pravega.connectors.flink.util.FlinkPravegaUtils; import org.apache.flink.configuration.Configuration; @@ -36,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.time.Duration; /** * A {@link SplitReader} implementation that reads records from Pravega. @@ -77,6 +77,11 @@ public class PravegaSplitReader */ private final EventStreamClientFactory eventStreamClientFactory; + /** + * Timeout for the call to read events from Pravega. + */ + private final Duration eventReadTimeout; + /** * Creates a new Pravega Split Reader instance which can read event from Pravega stream. * The Pravega Split Reader is actually an instance of a {@link EventStreamReader}. @@ -85,16 +90,19 @@ public class PravegaSplitReader * @param clientConfig The Pravega client configuration. * @param readerGroupName The reader group name. * @param subtaskId The subtaskId of source reader. + * @param eventReadTimeout The timeout for the call to read events from Pravega. */ public PravegaSplitReader( String scope, ClientConfig clientConfig, String readerGroupName, - int subtaskId) { + int subtaskId, + Duration eventReadTimeout) { this.subtaskId = subtaskId; this.options = new Configuration(); this.readerGroupName = readerGroupName; this.eventStreamClientFactory = EventStreamClientFactory.withScope(scope, clientConfig); + this.eventReadTimeout = eventReadTimeout; this.pravegaReader = FlinkPravegaUtils.createPravegaReader( PravegaSplit.splitId(subtaskId), readerGroupName, @@ -110,8 +118,7 @@ public RecordsWithSplitIds> fetch() { EventRead eventRead = null; do { try { - eventRead = pravegaReader.readNextEvent( - options.getLong(PravegaSourceOptions.READER_TIMEOUT_MS)); + eventRead = pravegaReader.readNextEvent(eventReadTimeout.toMillis()); LOG.debug("read event: {} on reader {}", eventRead.getEvent(), subtaskId); } catch (TruncatedDataException e) { continue; diff --git a/src/test/java/io/pravega/connectors/flink/source/enumerator/FlinkPravegaSplitEnumeratorTest.java b/src/test/java/io/pravega/connectors/flink/source/enumerator/FlinkPravegaSplitEnumeratorTest.java index 872ce91f..90babbea 100644 --- a/src/test/java/io/pravega/connectors/flink/source/enumerator/FlinkPravegaSplitEnumeratorTest.java +++ b/src/test/java/io/pravega/connectors/flink/source/enumerator/FlinkPravegaSplitEnumeratorTest.java @@ -33,6 +33,7 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.time.Duration; import java.util.Collections; /** Unit tests for {@link PravegaSplitEnumerator}. */ @@ -170,6 +171,7 @@ private PravegaSplitEnumerator createEnumerator(MockSplitEnumeratorContext createReader( } return new PravegaSourceReader<>( () -> new PravegaSplitReader(SETUP_UTILS.getScope(), SETUP_UTILS.getClientConfig(), - readerGroupName, subtaskId), + readerGroupName, subtaskId, Duration.ofMillis(1000)), emitter, new Configuration(), new TestingReaderContext()); diff --git a/src/test/java/io/pravega/connectors/flink/source/reader/FlinkPravegaSplitReaderTest.java b/src/test/java/io/pravega/connectors/flink/source/reader/FlinkPravegaSplitReaderTest.java index 3e79fa2b..57504952 100644 --- a/src/test/java/io/pravega/connectors/flink/source/reader/FlinkPravegaSplitReaderTest.java +++ b/src/test/java/io/pravega/connectors/flink/source/reader/FlinkPravegaSplitReaderTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -154,7 +155,8 @@ private PravegaSplitReader createSplitReader(int subtaskId, String readerGroupNa SETUP_UTILS.getScope(), SETUP_UTILS.getClientConfig(), readerGroupName, - subtaskId); + subtaskId, + Duration.ofMillis(1000)); } private static void createReaderGroup(String readerGroupName, String streamName) throws Exception { diff --git a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java index 552776fe..ad06bf6f 100644 --- a/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java +++ b/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java @@ -244,8 +244,8 @@ public Configuration getPravegaClientConfig() { final Configuration pravegaClientConfig = new Configuration(); pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, getControllerUri().toString()); pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, getScope()); - pravegaClientConfig.set(PravegaClientConfig.USERNAME, PRAVEGA_PASSWORD); - pravegaClientConfig.set(PravegaClientConfig.PASSWORD, PRAVEGA_USERNAME); + pravegaClientConfig.set(PravegaClientConfig.USERNAME, PRAVEGA_USERNAME); + pravegaClientConfig.set(PravegaClientConfig.PASSWORD, PRAVEGA_PASSWORD); pravegaClientConfig.set(PravegaClientConfig.VALIDATE_HOST_NAME, enableHostNameValidation); pravegaClientConfig.set(PravegaClientConfig.TRUST_STORE, getPathFromResource(CLIENT_TRUST_STORE_FILE)); return pravegaClientConfig;