Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[compat] [server] [client] [test] Global RT DIV: Chunking Support #1385

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
if (messageType.equals(MessageType.PUT)) {
Put put = (Put) message.getValue().payloadUnion;
// Select appropriate deserializers
Lazy deserializerProvider;
Lazy<RecordDeserializer> deserializerProvider;
int readerSchemaId;
if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
deserializerProvider = Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId));
Expand All @@ -783,9 +783,9 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
keyBytes,
put.getPutValue(),
message.getOffset(),
deserializerProvider,
readerSchemaId,
compressor);
compressor,
(valueBytes) -> deserializerProvider.get().deserialize(valueBytes));
if (assembledObject == null) {
// bufferAndAssembleRecord may have only buffered records and not returned anything yet because
// it's waiting for more input. In this case, just return an empty optional for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.PAUSE_TRANSITION_FROM_STANDBY_TO_LEADER;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.davinci.validation.PartitionTracker.TopicType.REALTIME_TOPIC_TYPE;
import static com.linkedin.davinci.validation.PartitionTracker.TopicType.VERSION_TOPIC_TYPE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER;
Expand All @@ -33,6 +35,7 @@
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.davinci.validation.PartitionTracker;
import com.linkedin.davinci.validation.PartitionTracker.TopicType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -1479,7 +1482,7 @@ protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState part
upstreamTopic = versionTopic;
}
if (upstreamTopic.isRealTime()) {
offsetRecord.resetUpstreamOffsetMap(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap());
offsetRecord.mergeUpstreamOffsets(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap());
Copy link
Contributor

@sixpluszero sixpluszero Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you do this change? Personally I think this makes less explicit about the offset map we are tracking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I feel that reset() is not the right word for it. It's not clearing the state or replacing the map entirely. It's updating the existing map with the values of the new map and retaining the values missing from the new map.

I can rename it to updateUpstreamOffsets().

} else {
offsetRecord.setCheckpointUpstreamVersionTopicOffset(
partitionConsumptionState.getLatestProcessedUpstreamVersionTopicOffset());
Expand Down Expand Up @@ -1619,7 +1622,6 @@ protected static void checkAndHandleUpstreamOffsetRewind(
int actualSchemaId = ByteUtils.readInt(actualValue, 0);
Put put = (Put) envelope.payloadUnion;
if (actualSchemaId == put.schemaId) {

if (put.putValue.equals(
ByteBuffer.wrap(
actualValue,
Expand Down Expand Up @@ -1904,6 +1906,10 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
}
}
}
// Global RT DIV messages should be completely ignored when leader is consuming from remote version topic
if (record.getKey().isGlobalRtDiv()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have Global RT DIV messages in the remote version topic?
There are a few cases for remote version topic consumption:

  1. NR for the venice push job.
  2. Streaming Reprocessing job.
  3. Data recovery for batch store.
    None of the use cases in theory would encounter Global RT DIV messages.
    Today, there is no support for hybrid store data recovery and it is being done via re-push.
    So can you explain the scenarios in your mind?

return false;
}
}

if (!Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, record.getTopicPartition().getPubSubTopic())
Expand Down Expand Up @@ -2281,18 +2287,17 @@ protected Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> validate
Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records,
String kafkaUrl,
PubSubTopicPartition topicPartition) {
PartitionConsumptionState partitionConsumptionState =
partitionConsumptionStateMap.get(topicPartition.getPartitionNumber());
if (partitionConsumptionState == null) {
final PartitionConsumptionState pcs = partitionConsumptionStateMap.get(topicPartition.getPartitionNumber());
if (pcs == null) {
// The partition is likely unsubscribed, will skip these messages.
LOGGER.warn(
"No partition consumption state for store version: {}, partition:{}, will filter out all the messages",
"No partition consumption state for store version: {}, partition: {}, will filter out all the messages",
kafkaVersionTopic,
topicPartition.getPartitionNumber());
return Collections.emptyList();
}
boolean isEndOfPushReceived = partitionConsumptionState.isEndOfPushReceived();
if (!shouldProduceToVersionTopic(partitionConsumptionState)) {
boolean isEndOfPushReceived = pcs.isEndOfPushReceived();
if (!shouldProduceToVersionTopic(pcs)) {
return records;
}
/**
Expand All @@ -2302,30 +2307,15 @@ protected Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> validate
Iterator<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> iter = records.iterator();
while (iter.hasNext()) {
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = iter.next();
boolean isRealTimeMsg = record.getTopicPartition().getPubSubTopic().isRealTime();
boolean isRealTimeTopic = record.getTopicPartition().getPubSubTopic().isRealTime();
try {
/**
* TODO: An improvement can be made to fail all future versions for fatal DIV exceptions after EOP.
*/
if (!isGlobalRtDivEnabled) {
validateMessage(
PartitionTracker.VERSION_TOPIC,
this.kafkaDataIntegrityValidatorForLeaders,
record,
isEndOfPushReceived,
partitionConsumptionState);
} else {
validateMessage(
PartitionTracker.TopicType.of(
isRealTimeMsg
? PartitionTracker.TopicType.REALTIME_TOPIC_TYPE
: PartitionTracker.TopicType.VERSION_TOPIC_TYPE,
kafkaUrl),
this.kafkaDataIntegrityValidatorForLeaders,
record,
isEndOfPushReceived,
partitionConsumptionState);
}
final TopicType topicType = (isGlobalRtDivEnabled)
? TopicType.of(isRealTimeTopic ? REALTIME_TOPIC_TYPE : VERSION_TOPIC_TYPE, kafkaUrl)
: PartitionTracker.VERSION_TOPIC;
validateMessage(topicType, kafkaDataIntegrityValidatorForLeaders, record, isEndOfPushReceived, pcs);
versionedDIVStats.recordSuccessMsg(storeName, versionNumber);
} catch (FatalDataValidationException e) {
if (!isEndOfPushReceived) {
Expand All @@ -2342,7 +2332,7 @@ protected Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> validate
"Skipping a duplicate record from: {} offset: {} for replica: {}",
record.getTopicPartition(),
record.getOffset(),
partitionConsumptionState.getReplicaId());
pcs.getReplicaId());
iter.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.GLOBAL_RT_DIV_STATE;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static com.linkedin.venice.utils.Utils.getReplicaId;
import static java.util.Comparator.comparingInt;
Expand Down Expand Up @@ -48,6 +49,7 @@
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
Expand Down Expand Up @@ -184,6 +186,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
private static final int MAX_KILL_CHECKING_ATTEMPTS = 10;
private static final int CHUNK_MANIFEST_SCHEMA_ID =
AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
private static final int GLOBAL_RT_DIV_STATE_SCHEMA_ID =
AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion();

protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
RedundantExceptionFilter.getRedundantExceptionFilter();
Expand Down Expand Up @@ -245,6 +249,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
* flushed to the metadata partition of the storage engine regularly in {@link #syncOffset(String, PartitionConsumptionState)}
*/
private final KafkaDataIntegrityValidator kafkaDataIntegrityValidator;

protected final HostLevelIngestionStats hostLevelIngestionStats;
protected final AggVersionedDIVStats versionedDIVStats;
protected final AggVersionedIngestionStats versionedIngestionStats;
Expand Down Expand Up @@ -317,6 +322,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final IngestionNotificationDispatcher ingestionNotificationDispatcher;

protected final ChunkAssembler chunkAssembler;
protected final ChunkAssembler divChunkAssembler;
private final Optional<ObjectCacheBackend> cacheBackend;
private DaVinciRecordTransformer recordTransformer;

Expand Down Expand Up @@ -467,6 +473,8 @@ public StoreIngestionTask(
new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion);
this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic));
this.chunkAssembler = new ChunkAssembler(storeName);
this.divChunkAssembler =
builder.getDivChunkAssembler() != null ? builder.getDivChunkAssembler() : new ChunkAssembler(storeName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems ChunkAssembler is store specific, so can we use the instance from the builder, which typically only contains the sharable objects?

this.cacheBackend = cacheBackend;

if (recordTransformerFunction != null) {
Expand Down Expand Up @@ -1145,6 +1153,12 @@ private int handleSingleMessage(
record.getTopicPartition().getPartitionNumber(),
partitionConsumptionStateMap.get(topicPartition.getPartitionNumber()));
}
} else if (record.getKey().isGlobalRtDiv()) {
// TODO: This is a placeholder for the actual implementation.
if (isGlobalRtDivEnabled) {
processGlobalRtDivMessage(record); // This is a global realtime topic data integrity validator snapshot
}
return 0;
}

// This function may modify the original record in KME and it is unsafe to use the payload from KME directly after
Expand Down Expand Up @@ -1189,6 +1203,28 @@ private int handleSingleMessage(
return record.getPayloadSize();
}

void processGlobalRtDivMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record) {
KafkaKey key = record.getKey();
KafkaMessageEnvelope value = record.getValue();
Put put = (Put) value.getPayloadUnion();

Object assembledObject = divChunkAssembler.bufferAndAssembleRecord(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this function is using in-memory storage engine as a temp storage, which will increase the memory usage, which is different from the regular data chunk handling, which would minimize the memory usage, and I think this is also what we agreed during the design review.
cc @lluwm

record.getTopicPartition(),
put.getSchemaId(),
key.getKey(),
put.getPutValue(),
record.getOffset(),
put.getSchemaId(),
new NoopCompressor(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we always use a certain compression algo for GLOBAL RT DIV messages?
We know DIV is large sometimes, I wondered whether we can always enable gzip compression or not to reduce the Kafka usage.

(valueBytes) -> GLOBAL_RT_DIV_STATE.getSerializer()
.deserialize(ByteUtils.extractByteArray(valueBytes), GLOBAL_RT_DIV_STATE_SCHEMA_ID));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we update InternalAvroSpecificSerializer to support a ByteBuffer as the input?


if (assembledObject == null) {
return; // the message value only contained one data chunk, so the Global RT DIV cannot yet be fully assembled
}
// TODO: We will add the code to process Global RT DIV message later in here.
}

/**
* This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}.
*
Expand Down Expand Up @@ -2397,6 +2433,12 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
return false;
}

// Global RT DIV messages should only be in version topics, not realtime topics. Skip it and log a warning.
if (record.getKey().isGlobalRtDiv() && record.getTopic().isRealTime()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the scenarios that the RT topic would contain Global RT DIV messages?

LOGGER.warn("Skipping Global RT DIV message from realtime topic partition: {}", record.getTopicPartition());
return false;
}

if (partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.isBatchOnly()) {
KafkaKey key = record.getKey();
KafkaMessageEnvelope value = record.getValue();
Expand Down Expand Up @@ -3786,7 +3828,7 @@ private int processKafkaDataMessage(
private void waitReadyToProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record)
throws InterruptedException {
KafkaMessageEnvelope kafkaValue = record.getValue();
if (record.getKey().isControlMessage() || kafkaValue == null) {
if (record.getKey().isControlMessage() || record.getKey().isGlobalRtDiv() || kafkaValue == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can Global RT DIV be one type of Control Message?

return;
}

Expand Down Expand Up @@ -4561,4 +4603,8 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
protected boolean isDaVinciClient() {
return isDaVinciClient;
}

ChunkAssembler getDivChunkAssembler() {
return this.divChunkAssembler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.davinci.utils.ChunkAssembler;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -126,6 +127,7 @@ public static class Builder {
private PubSubTopicRepository pubSubTopicRepository;
private Runnable runnableForKillIngestionTasksForNonCurrentVersions;
private ExecutorService aaWCWorkLoadProcessingThreadPool;
private ChunkAssembler divChunkAssembler;

private interface Setter {
void apply();
Expand Down Expand Up @@ -336,5 +338,13 @@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi
public ExecutorService getAAWCWorkLoadProcessingThreadPool() {
return this.aaWCWorkLoadProcessingThreadPool;
}

public Builder setDivChunkAssembler(ChunkAssembler divChunkAssembler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChunkAssember has a store name field, and in theory, it is not sharable among store ingestion tasks with different stores.
Can you explain your thinking here?

return set(() -> this.divChunkAssembler = divChunkAssembler);
}

public ChunkAssembler getDivChunkAssembler() {
return divChunkAssembler;
}
}
}
Loading
Loading