-
Notifications
You must be signed in to change notification settings - Fork 466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Embed StreamConfig within ShardInfo #1304
base: master
Are you sure you want to change the base?
Conversation
when(leaseCoordinator.getAssignments()).thenReturn( | ||
Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber) | ||
.map(SchedulerTest::constructLease) | ||
.collect(Collectors.toList())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually changes the behavior - returns all three leases in each call to getAssignments
, rather than returning one new lease for each of the three subsequent calls (as intended by the original code); will revert this portion in next push.
@Getter(AccessLevel.NONE) | ||
private final boolean isMultiStreamMode; | ||
@Getter(AccessLevel.NONE) | ||
private final String streamIdentifierStr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need these two member variables if they are just extracted out from streamConfig? I see you are using them in some functions in this class for comparisons but would it be more clean to extract it from streamConfig individually instead?
/* | ||
* NOTE: RecordsPublisher#createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory) is deprecated. | ||
* RecordsPublisher#createGetRecordsCache(ShardInfo, MetricsFactory) will be called directly in the future. | ||
*/ | ||
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the reason we are not changing this now is that there could be some clients which pass in their own interface of RetrievalFactory and may not implement the new function yet? Wouldn't that still mean its backwards incompatible if we ever change its, so it would have to be a major version upgrade?
* @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved. | ||
* @param metricsFactory The {@link MetricsFactory} for recording metrics. | ||
* @return A {@link RecordsPublisher} instance for retrieving records from the shard. | ||
*/ | ||
RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting this was deprecated before and we are undeprecating it. Was there a reason this was deprecated before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes seems like it was deprecated before but this also suggests that ShardInfo was in a way a public contract too, the custom implementation may depend on shardInfo.streamIdentifierSerOpt :( So we cannot remove it, have to mark it deprecated, keep it same as before but dont use it in any of KCL default implementation and only use StreamConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass, still need to review the tests
if (streamIdentifierStr.isPresent()) { | ||
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()); | ||
final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier(); | ||
if (streamIdentifier.isMultiStreamInstance()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this check? Can it just be if StringUtils.isBlank(shardInfo.streamConfig().consumerArn()) then use defaultConsumerArn?
The other difference between the two is whether we pass streamIdentifier.serialize() or not. Id say, always pass streamIdentifier, this value is only used for logging, and I think its trying to be smart about whether to only log shardId or streamSer+shardId. I dont get why that is needed, I think its ok to log streamIdentifier.toString()+shardId and for single stream case it would be streamName+shardId, its probably redundant information but we should not be complicating code and having 2 constructors and adding if statements and exposing a public method from streamIdentifier to achieve it. Its an overkill.
At this point, it maybe possible that customer may have script to parse our logs, thats the only time we should be concerned about changing logs. But I highly doubt if we need to be worried about it. We can ask around, Ill check with abhit to see what he thinks about changing logs, I think it should be fine. We should simplify it. But ill double check to make sure im not missing anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got confirmed its ok to change logs
* | ||
* @return true if this is a multi-stream instance, false otherwise. | ||
*/ | ||
public boolean isMultiStreamInstance() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont think this needs to be public, see my comment on the usage of this in during the construction of FanOutRecordsPublisher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also if you make the other change and we probably dont need a method because its used in only one place. Also I'd like to avoid saying a streamIdentifier is single stream instance or multi stream instance. What does it even mean? You can create a streamIdentifier from streamName, streamArn or streamSer, and its just an ID. I know this is existing code, it was probably meant to force customers to use a streamIdentifier constructed using streamArn or streamArn in multistream mode, but why? If is use just streamName in multi-stream mode, it should still work just fine?
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty(); | ||
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization); | ||
|
||
final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally these should be encapsulated in the class itself. lease.shardId() should give leaseKey if its Lease and shardId if its MultiStreamLease.
final boolean isMultiStreamLease = lease instanceof MultiStreamLease; | ||
|
||
final Optional<String> streamIdentifierSerialization = isMultiStreamLease ? | ||
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as below, encapsulate this inside the Lease and MultiStreamLease classes so you dont need these if checks, you can do
Optional.ofNullable(lease.streamIdentifier), make Lease.streamIdentifier return null
|
||
private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) { | ||
if (!streamIdentifierSerialization.isPresent()) { | ||
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: double negations are always hard to read, can you use Validate.isFalse(isMultistreamMod, "...")
@@ -139,7 +134,7 @@ public boolean equals(Object obj) { | |||
ShardInfo other = (ShardInfo) obj; | |||
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) | |||
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) | |||
.append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals(); | |||
.append(streamIdentifierStr, other.streamIdentifierStr).isEquals(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh you want to compare StreamConfig instead you no longer need the streamIdentifierStr
private final String shardId; | ||
private final String concurrencyToken; | ||
// Sorted list of parent shardIds. | ||
private final List<String> parentShardIds; | ||
private final ExtendedSequenceNumber checkpoint; | ||
private final StreamConfig streamConfig; | ||
|
||
@Getter(AccessLevel.NONE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes thank you, this is necessary because this is class exposes a public contract which cannot be violated and we should not expose more than what is necessary. But I do agree with lucien that we probably dont need them and get it from streamConfig when needed.
@@ -120,8 +119,7 @@ public TaskResult call() { | |||
*/ | |||
final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION); | |||
final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); | |||
shardInfo.streamIdentifierSerOpt() | |||
.ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId))); | |||
MetricsUtil.addStreamId(shardScope, shardInfo.streamConfig().streamIdentifier()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay we dont want to do this by default, for single stream application, this can change dimension and may break public contract on the metrics exposed by KCL.
Seems like then we need some method from streamIdentifier, you had isMultiStreamInstance. We could use it, but I dint want to expose that method based on the comment I left there. Lets think if there is another way.
@@ -218,8 +216,7 @@ private void callProcessRecords(ProcessRecordsInput input, List<KinesisClientRec | |||
.millisBehindLatest(input.millisBehindLatest()).build(); | |||
|
|||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); | |||
shardInfo.streamIdentifierSerOpt() | |||
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); | |||
MetricsUtil.addStreamId(scope, shardInfo.streamConfig().streamIdentifier()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
@Override | ||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:( dont know why this was part of the public interface, thanks for marking it deprecated, its never called from outside and its only called from within the class so it dint need to be part of the public interface, could just have been a private method.
private ShardInfo constructShardInfoFromLease(final Lease lease) { | ||
final boolean isMultiStreamLease = lease instanceof MultiStreamLease; | ||
|
||
final Optional<String> streamIdentifierSerialization = isMultiStreamLease ? | ||
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty(); | ||
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization); | ||
|
||
final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey(); | ||
return new ShardInfo( | ||
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig); | ||
} | ||
|
||
private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) { | ||
if (!streamIdentifierSerialization.isPresent()) { | ||
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); | ||
final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next(); | ||
Validate.notNull(streamConfig, "StreamConfig should not be null"); | ||
return streamConfig; | ||
} | ||
|
||
final StreamIdentifier streamIdentifier = | ||
StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get()); | ||
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private ShardInfo constructShardInfoFromLease(final Lease lease) { | |
final boolean isMultiStreamLease = lease instanceof MultiStreamLease; | |
final Optional<String> streamIdentifierSerialization = isMultiStreamLease ? | |
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty(); | |
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization); | |
final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey(); | |
return new ShardInfo( | |
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig); | |
} | |
private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) { | |
if (!streamIdentifierSerialization.isPresent()) { | |
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); | |
final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next(); | |
Validate.notNull(streamConfig, "StreamConfig should not be null"); | |
return streamConfig; | |
} | |
final StreamIdentifier streamIdentifier = | |
StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get()); | |
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); | |
} | |
private ShardInfo constructShardInfoFromLease(final Lease lease) { | |
Optional<MultiStreamLease> msl = Optional.of(lease).filter(l->l instanceof MultiStreamLease).map(l->(MultiStreamLease) l); | |
String shardId = msl.map(MultiStreamLease::shardId).orElse(lease.leaseKey()); | |
StreamConfig streamConfig = msl.map(MultiStreamLease::streamIdentifier).map(streamIdentifierSerialization -> { | |
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierSerialization); | |
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); | |
}).orElseGet(()->{ | |
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); | |
return Optional.ofNullable(currentStreamConfigMap.values().iterator().next()).get(); | |
}); | |
return new ShardInfo( | |
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig); | |
} |
This makes it a little more clear that the empty optional represents an uncastable Lease.
when(leaseCoordinator.getAssignments()).thenReturn( | ||
Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber) | ||
.map(SchedulerTest::constructLease) | ||
.collect(Collectors.toList())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when(leaseCoordinator.getAssignments()).thenReturn( | |
Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber) | |
.map(SchedulerTest::constructLease) | |
.collect(Collectors.toList())); | |
when(leaseCoordinator.getAssignments()).thenReturn( | |
ImmutableList.of( | |
constructLease(firstSequenceNumber), | |
constructLease(secondSequenceNumber), | |
constructLease(finalSequenceNumber))); |
Avoid control flow in unit tests.
Issue #, if available:
N/A.
Description of changes:
Add
StreamConfig
as a field withinShardInfo
.Previously,
StreamIdentifier
instances were reconstructed using the optionalstreamIdentifierSerOpt
field withinShardInfo
(which has since been removed).Now,
ShardInfo
maintains a reference to theStreamIdentifier
throughStreamConfig
, eliminating the need for reconstruction.This change encompasses any additional refactoring necessitated by embedding
StreamConfig
withinShardInfo
.For example, there's no longer a need to explicitly provide
initialPositionInStreamExtended
(a field ofStreamConfig
) whenShardInfo
is already available.By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.