Revert #3224 to resolve CP Packaging 7.9.x build (#3373)
GunalKupta authored Oct 22, 2024
1 parent 75fe003 commit 58e664a
Showing 12 changed files with 662 additions and 402 deletions.
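Most of the hunks below make one recurring change to how the formatter tests build a ConsumerRecord: the Headers/Optional leader-epoch overload introduced by #3224 is swapped back for the older overload that takes a checksum argument, and the now-unused RecordHeaders and Optional imports are dropped. As a reading aid, here is a minimal sketch of the restored call shape, assuming a kafka-clients line that still ships the deprecated checksum overload; the class name and byte values are illustrative and not part of the commit.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;

public class RevertedConstructorSketch {
  public static void main(String[] args) {
    byte[] value = new byte[] {0x0, 0x1, 0x2};

    // Post-revert argument order: topic, partition, offset, timestamp, timestamp type,
    // checksum (0 here), serialized key size, serialized value size, key, value.
    // No Headers or Optional<Integer> leader-epoch arguments.
    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
        "topic1", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME,
        0L, 0, value.length, null, value);

    System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
  }
}

The checksum overload was deprecated upstream and later removed, while the Headers/Optional overload used before this revert exists only in newer kafka-clients; presumably the CP Packaging 7.9.x build compiles these tests against a client that still has only the older constructor, which is why the revert was needed.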
@@ -18,6 +18,7 @@
import static org.junit.Assert.assertEquals;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
@@ -27,13 +28,13 @@
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.After;
import org.junit.Before;
@@ -83,7 +84,7 @@ public void tearDown() {

@Test
public void testDeserializeBytesIssue506() throws IOException, RestClientException {
formatter.init(props);
formatter.init(props);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
@@ -195,8 +196,8 @@ protected ConsumerRecord<byte[], byte[]> createConsumerRecord(boolean includeKey
System.arraycopy(SOME_BYTES, 0, value, 1 + SCHEMA_ID_BYTES.length, SOME_BYTES.length);

return new ConsumerRecord<>(
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME,
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0,
0, value.length,
includeKey ? key : null, value, new RecordHeaders(), Optional.empty());
includeKey ? key : null, value);
}
}
@@ -25,7 +25,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
@@ -100,8 +99,8 @@ public void testKafkaAvroValueFormatter() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, serializedValue.length,
null, serializedValue, message.headers(), Optional.empty());
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, 0, serializedValue.length,
null, serializedValue);
formatter.writeTo(crecord, ps);
String outputJson = baos.toString();

@@ -125,8 +124,8 @@ public void testKafkaAvroKeyValueFormatter() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, serializedKey.length,
serializedValue.length, serializedKey, serializedValue, message.headers(), Optional.empty());
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, serializedKey.length,
serializedValue.length, serializedKey, serializedValue);
formatter.writeTo(crecord, ps);
String outputJson = baos.toString();

@@ -154,8 +153,8 @@ public void testKafkaAvroValueWithTimestampFormatter() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, timestamp, timestampType, 0, serializedValue.length,
null, serializedValue, message.headers(), Optional.empty());
"topic1", 0, 200, timestamp, timestampType, 0, 0, serializedValue.length,
null, serializedValue);
formatter.writeTo(crecord, ps);
String outputJson = baos.toString();

@@ -197,8 +196,8 @@ public void testStringKey() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, serializedKey.length,
serializedValue.length, serializedKey, serializedValue, message.headers(), Optional.empty());
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, serializedKey.length,
serializedValue.length, serializedKey, serializedValue);
formatter.writeTo(crecord, ps);
String outputJson = baos.toString();

@@ -229,8 +228,8 @@ public void testStringKeyWithTimestamp() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, timestamp, timestampType, serializedKey.length,
serializedValue.length, serializedKey, serializedValue, new RecordHeaders(), Optional.empty());
"topic1", 0, 200, timestamp, timestampType, 0, serializedKey.length,
serializedValue.length, serializedKey, serializedValue);
formatter.writeTo(crecord, ps);
String outputJson = baos.toString();

@@ -254,8 +253,8 @@ public void testKafkaAvroValueUsingLatestVersion() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, serializedValue.length,
null, serializedValue, message.headers(), Optional.empty());
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, 0, serializedValue.length,
null, serializedValue);
formatter.writeTo(crecord, ps);
String outputJson = baos.toString();

@@ -305,8 +304,8 @@ public void testUsingTopicRecordNameStrategy() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, serializedValue.length,
null, serializedValue, new RecordHeaders(), Optional.empty());
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, 0, serializedValue.length,
null, serializedValue);

avroMessageFormatter.writeTo(crecord, ps);

@@ -351,8 +350,8 @@ public void testUsingSubjectNameStrategy() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
topicName, 0, 200, 1000, TimestampType.LOG_APPEND_TIME, serializedKey.length, serializedValue.length,
serializedKey, serializedValue, new RecordHeaders(), Optional.empty());
topicName, 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, serializedKey.length, serializedValue.length,
serializedKey, serializedValue);

avroMessageFormatter.writeTo(crecord, ps);
String outputJson = baos.toString();
@@ -398,7 +397,7 @@ public void testUsingHeaders() throws Exception {
byte[] serializedValue = message.value();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<byte[], byte[]>(
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
topicName, 0, 200, 1000, TimestampType.LOG_APPEND_TIME, serializedKey.length, serializedValue.length,
serializedKey, serializedValue, message.headers(), Optional.empty());

@@ -448,7 +447,7 @@ public void testUsingNull() throws Exception {
byte[] serializedValue = message.value();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<byte[], byte[]>(
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
topicName, 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, 0,
serializedKey, serializedValue, message.headers(), Optional.empty());
