Skip to content

Commit

Permalink
[FLINK-34316] Reduce instantiation of ScanRuntimeProvider in streamin…
Browse files Browse the repository at this point in the history
…g mode
  • Loading branch information
twalthr committed Jan 31, 2024
1 parent ff9a3c8 commit 5035808
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,12 @@ private static void validateScanSource(
ScanTableSource scanSource,
boolean isBatchMode,
ReadableConfig config) {
final ScanRuntimeProvider provider =
scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
final ChangelogMode changelogMode = scanSource.getChangelogMode();

validateWatermarks(tableDebugName, schema);

if (isBatchMode) {
validateScanSourceForBatch(tableDebugName, changelogMode, provider);
validateScanSourceForBatch(tableDebugName, scanSource, changelogMode);
} else {
validateScanSourceForStreaming(
tableDebugName, schema, scanSource, changelogMode, config);
Expand Down Expand Up @@ -558,7 +556,9 @@ private static void validateScanSourceForStreaming(
}

private static void validateScanSourceForBatch(
String tableDebugName, ChangelogMode changelogMode, ScanRuntimeProvider provider) {
String tableDebugName, ScanTableSource scanSource, ChangelogMode changelogMode) {
final ScanRuntimeProvider provider =
scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
// batch only supports bounded source
if (!provider.isBounded()) {
throw new ValidationException(
Expand Down

0 comments on commit 5035808

Please sign in to comment.