Fix wrong offset increment for Sarama message set (streamnative#1992)
### Motivation

When Sarama producers send messages to KoP using the message set format, e.g.

```golang
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_2_2
conf.Producer.Compression = sarama.CompressionLZ4
producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
```

The `PartitionLog#analyzeAndValidateRecords` method could compute a wrong
`LogAppendInfo#numMessages` result, because this method uses
`RecordBatch#lastOffset` to get the latest offset.

https://github.com/streamnative/kop/blob/3b22e79764ca22107228ec2f74590f0769bd9fd9/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java#L1028

However, when the Sarama client encodes the message set (v0 or v1 records),
it only writes an offset of zero:

https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/produce_request.go#L107

In this case, every batch is counted as 1 message, which might result
in wrong offsets. Assuming there are 2 batches of 5 messages each, the
offsets written in the entries could be:

- Entry 0: index=0
- Entry 1: index=1
- LEO: 2

The correct offsets should be:

- Entry 0: index=0
- Entry 1: index=5
- LEO: 10

The wrong LEO could make the offset check in
`PartitionLog#checkOffsetOutOfRange` fail, and then consumers will reset
their offsets and consume duplicated messages.
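For illustration, here is a minimal sketch of the pre-fix counting logic (the wrapper class and method names are made up for this example; the offset arithmetic is the line removed by this patch):

```java
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

final class OffsetCountSketch {

    // Pre-fix behavior: the per-batch message count is derived purely from offsets.
    // A Sarama v0/v1 compressed message set carries a zero wrapper offset, so
    // lastOffset() - baseOffset() + 1 evaluates to 1 no matter how many records
    // the batch actually contains.
    static int countMessagesPreFix(MemoryRecords records) {
        int numMessages = 0;
        for (RecordBatch batch : records.batches()) {
            numMessages += (int) (batch.lastOffset() - batch.baseOffset() + 1);
        }
        // Two 5-message batches therefore yield 2 instead of 10, which is how the
        // wrong LEO of 2 in the example above arises.
        return numMessages;
    }
}
```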

### Modifications

When the message count derived from `lastOffset` is not greater than 1 (as is
the case when Sarama writes a zero offset), iterate over the records in the
batch to compute the number of messages.

Ideally we should add a Golang client test to prevent a regression. In this
patch, we only add a `SaramaCompressedV1Records` class to simulate the
behavior of Sarama, and add unit tests based on it.

(cherry picked from commit 0241e43)
(cherry picked from commit f3b2b30)
BewareMyPower committed Aug 4, 2023
1 parent 4cb7c5d commit 18934ff
Showing 3 changed files with 118 additions and 1 deletion.
@@ -72,6 +72,7 @@
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
@@ -997,7 +998,17 @@ public LogAppendInfo analyzeAndValidateRecords(MemoryRecords records) {
batch.ensureValid();
shallowMessageCount += 1;
validBytesCount += batchSize;
numMessages += (batch.lastOffset() - batch.baseOffset() + 1);

int numMessagesInBatch = (int) (batch.lastOffset() - batch.baseOffset() + 1);
if (numMessagesInBatch <= 1) {
// The lastOffset field might not be set (e.g. Sarama message sets write a zero offset).
// We need to iterate the records to count them.
for (Record record : batch) {
numMessages++;
}
} else {
numMessages += numMessagesInBatch;
}

isTransaction = batch.isTransactional();
isControlBatch = batch.isControlBatch();

@@ -68,6 +68,24 @@ public void testAnalyzeAndValidateRecords(CompressionType compressionType) {
Assert.assertFalse(appendInfo.isTransaction());
}

@DataProvider(name = "compressionTypesForSarama")
public static Object[] compressionTypesForSarama() {
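// Exclude NONE (the helper requires compression) and ZSTD (not supported by v0/v1 message sets).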
return Arrays.stream(CompressionType.values()).filter(t ->
t.id > CompressionType.NONE.id && t.id < CompressionType.ZSTD.id
).map(x -> (Object) x).toArray();
}

@Test(dataProvider = "compressionTypesForSarama")
public void testAnalyzeSaramaV1CompressedRecords(CompressionType compressionType) throws Exception {
final SaramaCompressedV1Records builder = new SaramaCompressedV1Records(compressionType);
for (int i = 0; i < 3; i++) {
builder.appendLegacyRecord(i, "msg-" + i);
}
final MemoryRecords records = builder.build();
PartitionLog.LogAppendInfo appendInfo = PARTITION_LOG.analyzeAndValidateRecords(records);
Assert.assertEquals(appendInfo.numMessages(), 3);
}

@Test
public void testAnalyzeAndValidateEmptyRecords() {
MemoryRecords memoryRecords = buildMemoryRecords(new int[]{}, CompressionType.NONE, 0);
@@ -0,0 +1,88 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.storage;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

/**
* Simulate the behavior of the Golang Sarama library for v0 and v1 message sets with compression enabled.
*
* See https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/produce_set.go#L181-L184 and
* https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/message.go#L82. The `Message.encode`
* method does not set the last offset field.
*/
public class SaramaCompressedV1Records {

// See https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/async_producer.go#L446
private static final byte MAGIC = RecordBatch.MAGIC_VALUE_V1;
private static final TimestampType TIMESTAMP_TYPE = TimestampType.LOG_APPEND_TIME;
private final ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(ByteBuffer.allocate(1024 * 10));
private final DataOutputStream appendStream;
private final CompressionType compressionType;
private long timestamp;

public SaramaCompressedV1Records(CompressionType compressionType) {
if (compressionType == CompressionType.NONE) {
throw new IllegalArgumentException("CompressionType should not be NONE");
}
this.compressionType = compressionType;
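// Reserve space for the wrapper (outer) record header, which is written later in close().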
this.bufferStream.position(AbstractRecords.recordBatchHeaderSizeInBytes(MAGIC, compressionType));
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, MAGIC));
}

public void appendLegacyRecord(long offset, String value) throws IOException {
// Only test null key and current timestamp
timestamp = System.currentTimeMillis();
int size = LegacyRecord.recordSize(MAGIC, 0, value.getBytes().length);
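// Each inner record is framed as [offset][size][legacy record] and written through the compression stream.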
appendStream.writeLong(offset);
appendStream.writeInt(size);
LegacyRecord.write(appendStream, MAGIC, timestamp, null, ByteBuffer.wrap(value.getBytes()),
CompressionType.NONE, TimestampType.LOG_APPEND_TIME);
}

public MemoryRecords build() throws IOException {
close();
ByteBuffer buffer = bufferStream.buffer().duplicate();
buffer.flip();
buffer.position(0);
return MemoryRecords.readableRecords(buffer.slice());
}

private void close() throws IOException {
appendStream.close();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
buffer.position(0);

// NOTE: This is the core difference between Sarama and the official Kafka Java client. Sarama does not write
// the last offset.
buffer.putLong(0L);

int wrapperSize = pos - Records.LOG_OVERHEAD;
buffer.putInt(wrapperSize);
LegacyRecord.writeCompressedRecordHeader(buffer, MAGIC, wrapperSize, timestamp, compressionType,
TIMESTAMP_TYPE);
buffer.position(pos);
}
}
