-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Follow-up changes based on #3768 #3773
Open
gautamguptabasant
wants to merge
1
commit into
apache:master
Choose a base branch
from
gautamguptabasant:3768-followup
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 0 additions & 33 deletions
33
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,23 +75,20 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { | |
public static final String USE_ALL_OBJECTS = "use.all.objects"; | ||
public static final boolean DEFAULT_USE_ALL_OBJECTS = false; | ||
|
||
@VisibleForTesting | ||
static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; | ||
static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize"; | ||
public static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; | ||
public static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize"; | ||
static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000; | ||
|
||
@VisibleForTesting | ||
static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; | ||
@VisibleForTesting | ||
static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit"; | ||
public static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; | ||
public static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit"; | ||
private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = DEFAULT_MIN_TARGET_PARTITION_SIZE * 4; | ||
|
||
static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss"; | ||
|
||
private boolean isEarlyStopped = false; | ||
protected SalesforceConnector salesforceConnector = null; | ||
|
||
private SalesforceHistogramService salesforceHistogramService; | ||
private RecordModTimeHistogramService histogramService; | ||
|
||
public SalesforceSource() { | ||
this.lineageInfo = Optional.absent(); | ||
|
@@ -103,9 +100,9 @@ public SalesforceSource() { | |
} | ||
|
||
@VisibleForTesting | ||
SalesforceSource(SalesforceHistogramService salesforceHistogramService) { | ||
SalesforceSource(RecordModTimeHistogramService histogramService) { | ||
this.lineageInfo = Optional.absent(); | ||
this.salesforceHistogramService = salesforceHistogramService; | ||
this.histogramService = histogramService; | ||
} | ||
|
||
@Override | ||
|
@@ -133,11 +130,11 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity | |
} | ||
@Override | ||
protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { | ||
SalesforceConnector connector = getConnector(state); | ||
|
||
SfConfig sfConfig = new SfConfig(state.getProperties()); | ||
if (salesforceHistogramService == null) { | ||
salesforceHistogramService = new SalesforceHistogramService(sfConfig, connector); | ||
if (histogramService == null) { | ||
salesforceConnector = getConnector(state); | ||
histogramService = new RecordModTimeHistogramService(sfConfig, getConnector(state)); | ||
} | ||
|
||
List<WorkUnit> workUnits; | ||
|
@@ -294,7 +291,7 @@ List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st | |
|
||
Partition partition = partitioner.getGlobalPartition(previousWatermark); | ||
Histogram histogram = | ||
salesforceHistogramService.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); | ||
histogramService.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); | ||
|
||
// we should look if the count is too big, cut off early if count exceeds the limit, or bucket size is too large | ||
|
||
|
@@ -303,7 +300,7 @@ List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st | |
// TODO: we should consider move this logic into getRefinedHistogram so that we can early terminate the search | ||
if (isEarlyStopEnabled(state)) { | ||
histogramAdjust = new Histogram(); | ||
for (HistogramGroup group : histogram.getGroups()) { | ||
for (Histogram.Group group : histogram.getGroups()) { | ||
histogramAdjust.add(group); | ||
long earlyStopRecordLimit = state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT); | ||
if (histogramAdjust.getTotalRecordCount() > earlyStopRecordLimit) { | ||
|
@@ -316,7 +313,7 @@ List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st | |
|
||
long expectedHighWatermark = partition.getHighWatermark(); | ||
if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) { | ||
HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size()); | ||
Histogram.Group lastPlusOne = histogram.get(histogramAdjust.getGroups().size()); | ||
long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT)); | ||
log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", | ||
state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark); | ||
|
@@ -354,13 +351,13 @@ String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSi | |
log.info("maxPartitions: " + maxPartitions); | ||
log.info("interval: " + interval); | ||
|
||
List<HistogramGroup> groups = histogram.getGroups(); | ||
List<Histogram.Group> groups = histogram.getGroups(); | ||
List<String> partitionPoints = new ArrayList<>(); | ||
DescriptiveStatistics statistics = new DescriptiveStatistics(); | ||
|
||
int count = 0; | ||
HistogramGroup group; | ||
Iterator<HistogramGroup> it = groups.iterator(); | ||
Histogram.Group group; | ||
Iterator<Histogram.Group> it = groups.iterator(); | ||
|
||
while (it.hasNext()) { | ||
group = it.next(); | ||
|
@@ -427,18 +424,18 @@ protected Set<SourceEntity> getSourceEntities(State state) { | |
return super.getSourceEntities(state); | ||
} | ||
|
||
SalesforceConnector connector = getConnector(state); | ||
salesforceConnector = getConnector(state); | ||
try { | ||
if (!connector.connect()) { | ||
if (!salesforceConnector.connect()) { | ||
throw new RuntimeException("Failed to connect."); | ||
} | ||
} catch (RestApiConnectionException e) { | ||
throw new RuntimeException("Failed to connect.", e); | ||
} | ||
|
||
List<Command> commands = RestApiConnector.constructGetCommand(connector.getFullUri("/sobjects")); | ||
List<Command> commands = RestApiConnector.constructGetCommand(salesforceConnector.getFullUri("/sobjects")); | ||
try { | ||
CommandOutput<?, ?> response = connector.getResponse(commands); | ||
CommandOutput<?, ?> response = salesforceConnector.getResponse(commands); | ||
Iterator<String> itr = (Iterator<String>) response.getResults().values().iterator(); | ||
if (itr.hasNext()) { | ||
String next = itr.next(); | ||
|
@@ -462,9 +459,9 @@ private static Set<SourceEntity> getSourceEntities(String response) { | |
} | ||
|
||
protected SalesforceConnector getConnector(State state) { | ||
if (this.salesforceConnector == null) { | ||
this.salesforceConnector = new SalesforceConnector(state); | ||
if (salesforceConnector == null) { | ||
salesforceConnector = new SalesforceConnector(state); | ||
Comment on lines
+462
to
+463
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not thread-safe... should it be |
||
} | ||
return this.salesforceConnector; | ||
return salesforceConnector; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add comment on why another
getConnector(state)
, rather than reusingedit: reading on I see it doesn't create a new connector... just looks that way