diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java index 22d0326165c..aacb83b88f4 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java @@ -48,7 +48,7 @@ public void writeString(Utf8 utf8) throws IOException { @Override public void writeString(String string) throws IOException { - if (0 == string.length()) { + if (string.isEmpty()) { writeZero(); return; } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java new file mode 100644 index 00000000000..aaea657c3e6 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * An {@link Encoder} for Avro's binary encoding that does not buffer output. + *

+ * This encoder does not buffer writes, and as a result is slower than + * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when + * the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is + * very short-lived. + *

+ * To construct, use + * {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)} + *

+ * DirectBinaryEncoder is not thread-safe + * + * @see BinaryEncoder + * @see EncoderFactory + * @see Encoder + * @see Decoder + */ +public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { + private OutputStream originalStream; + + private final ByteArrayOutputStream buffer; + + private boolean inBlock = false; + + private long blockItemCount; + + /** + * Create a writer that sends its output to the underlying stream + * out. + * + * @param out The Outputstream to write to + */ + public BlockingDirectBinaryEncoder(OutputStream out) { + super(out); + buffer = new ByteArrayOutputStream(); + } + + private void startBlock() { + if (inBlock) { + throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder"); + } + originalStream = out; + out = buffer; + inBlock = true; + } + + private void endBlock() { + if (!inBlock) { + throw new RuntimeException("Called endBlock, while not buffering a block"); + } + out = originalStream; + if (blockItemCount > 0) { + try { + // Make it negative, so the reader knows that the number of bytes is coming + writeLong(-blockItemCount); + writeLong(buffer.size()); + writeBytes(buffer.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + inBlock = false; + buffer.reset(); + } + + @Override + public void setItemCount(long itemCount) throws IOException { + blockItemCount = itemCount; + } + + @Override + public void writeArrayStart() throws IOException { + startBlock(); + } + + @Override + public void writeArrayEnd() throws IOException { + endBlock(); + // Writes another zero to indicate that this is the last block + super.writeArrayEnd(); + } + + @Override + public void writeMapStart() throws IOException { + startBlock(); + } + + @Override + public void writeMapEnd() throws IOException { + endBlock(); + // Writes another zero to indicate that this is the last block + super.writeMapEnd(); + } +} diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java index 62b2a482627..8b485170ed7 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java @@ -27,20 +27,20 @@ * This encoder does not buffer writes, and as a result is slower than * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when * the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is - * very short lived. + * very short-lived. *

* To construct, use * {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)} *

* DirectBinaryEncoder is not thread-safe - * + * * @see BinaryEncoder * @see EncoderFactory * @see Encoder * @see Decoder */ public class DirectBinaryEncoder extends BinaryEncoder { - private OutputStream out; + protected OutputStream out; // the buffer is used for writing floats, doubles, and large longs. private final byte[] buf = new byte[12]; @@ -48,7 +48,7 @@ public class DirectBinaryEncoder extends BinaryEncoder { * Create a writer that sends its output to the underlying stream * out. **/ - DirectBinaryEncoder(OutputStream out) { + public DirectBinaryEncoder(OutputStream out) { configure(out); } @@ -69,7 +69,7 @@ public void writeBoolean(boolean b) throws IOException { } /* - * buffering is slower for ints that encode to just 1 or two bytes, and and + * buffering is slower for ints that encode to just 1 or two bytes, and * faster for large ones. (Sun JRE 1.6u22, x64 -server) */ @Override diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java index 055ef9541d9..1cf3c4a5406 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java @@ -217,6 +217,48 @@ public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse) } } + /** + * Creates or reinitializes a {@link BlockingDirectBinaryEncoder} with the OutputStream + * provided as the destination for written data. If reuse is provided, an + * attempt will be made to reconfigure reuse rather than construct a new + * instance, but this is not guaranteed, a new instance may be returned. + *

+ * The {@link BinaryEncoder} implementation returned does not buffer its output, + * calling {@link Encoder#flush()} will simply cause the wrapped OutputStream to + * be flushed. + *

+ * The {@link BlockingDirectBinaryEncoder} will write the block sizes for the + * arrays and maps so efficient skipping can be done. + *

+ * Performance of unbuffered writes can be significantly slower than buffered + * writes. {@link #binaryEncoder(OutputStream, BinaryEncoder)} returns + * BinaryEncoder instances that are tuned for performance but may buffer output. + * The unbuffered, 'direct' encoder may be desired when buffering semantics are + * problematic, or if the lifetime of the encoder is so short that the buffer + * would not be useful. + *

+ * {@link BinaryEncoder} instances returned by this method are not thread-safe. + * + * @param out The OutputStream to initialize to. Cannot be null. + * @param reuse The BinaryEncoder to attempt to reuse given the factory + * configuration. A BinaryEncoder implementation may not be + * compatible with reuse, causing a new instance to be returned. If + * null, a new instance is returned. + * @return A BinaryEncoder that uses out as its data output. If + * reuse is null, this will be a new instance. If reuse is + * not null, then the returned instance may be a new instance or + * reuse reconfigured to use out. + * @see DirectBinaryEncoder + * @see Encoder + */ + public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder reuse) { + if (null == reuse || !reuse.getClass().equals(BlockingDirectBinaryEncoder.class)) { + return new BlockingDirectBinaryEncoder(out); + } else { + return ((DirectBinaryEncoder) reuse).configure(out); + } + } + /** * Creates or reinitializes a {@link BinaryEncoder} with the OutputStream * provided as the destination for written data. If reuse is provided, an diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java index 1d9009aacb1..f620f7e5abd 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java @@ -181,6 +181,50 @@ void directBinaryEncoder() throws IOException { assertArrayEquals(complexdata, result2); } + @Test + void blockingDirectBinaryEncoder() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder e = factory.blockingDirectBinaryEncoder(baos, null); + generateData(e, true); + + byte[] result = baos.toByteArray(); + assertEquals(legacydata.length, result.length); + assertArrayEquals(legacydata, result); + baos.reset(); + + generateComplexData(e); + byte[] result2 = baos.toByteArray(); + // blocking will cause different length, should be four bytes larger + assertEquals(complexdata.length + 4, result2.length); + // the first byte is the array start, with the count of items negative + assertEquals(complexdata[0] >>> 1, result2[0]); + baos.reset(); + + e.writeArrayStart(); + e.setItemCount(1); + e.startItem(); + e.writeInt(1); + e.writeArrayEnd(); + + // 1: 1 element in the array + // 2: 2 bytes for the int + // 2-3: data + // 4: 0 elements in the next block + assertArrayEquals(baos.toByteArray(), new byte[]{1, 2, 2, 2, 0}); + baos.reset(); + + e.writeArrayStart(); + e.setItemCount(0); + e.writeArrayEnd(); + + // This is correct + // 0: 0 elements in the block + assertArrayEquals(baos.toByteArray(), new byte[]{0}); + baos.reset(); + + baos.reset(); + } + @Test void blockingBinaryEncoder() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java index dbed64d6a18..14930ebce7b 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java @@ -54,7 +54,7 @@ public class TestEncoders { private static final int ENCODER_BUFFER_SIZE = 32; private static final int EXAMPLE_DATA_SIZE = 17; - private static EncoderFactory factory = EncoderFactory.get(); + private static final EncoderFactory FACTORY = EncoderFactory.get(); @TempDir public File DIR; @@ -62,14 +62,14 @@ public class TestEncoders { @Test void binaryEncoderInit() throws IOException { OutputStream out = new ByteArrayOutputStream(); - BinaryEncoder enc = factory.binaryEncoder(out, null); - assertSame(enc, factory.binaryEncoder(out, enc)); + BinaryEncoder enc = FACTORY.binaryEncoder(out, null); + assertSame(enc, FACTORY.binaryEncoder(out, enc)); } @Test void badBinaryEncoderInit() { assertThrows(NullPointerException.class, () -> { - factory.binaryEncoder(null, null); + FACTORY.binaryEncoder(null, null); }); } @@ -77,29 +77,43 @@ void badBinaryEncoderInit() { void blockingBinaryEncoderInit() throws IOException { OutputStream out = new ByteArrayOutputStream(); BinaryEncoder reuse = null; - reuse = factory.blockingBinaryEncoder(out, reuse); - assertSame(reuse, factory.blockingBinaryEncoder(out, reuse)); + reuse = FACTORY.blockingBinaryEncoder(out, reuse); + assertSame(reuse, FACTORY.blockingBinaryEncoder(out, reuse)); // comparison } @Test void badBlockintBinaryEncoderInit() { assertThrows(NullPointerException.class, () -> { - factory.binaryEncoder(null, null); + FACTORY.binaryEncoder(null, null); }); } @Test void directBinaryEncoderInit() throws IOException { OutputStream out = new ByteArrayOutputStream(); - BinaryEncoder enc = factory.directBinaryEncoder(out, null); - assertSame(enc, factory.directBinaryEncoder(out, enc)); + BinaryEncoder enc = FACTORY.directBinaryEncoder(out, null); + assertSame(enc, FACTORY.directBinaryEncoder(out, enc)); } @Test void badDirectBinaryEncoderInit() { assertThrows(NullPointerException.class, () -> { - factory.directBinaryEncoder(null, null); + FACTORY.directBinaryEncoder(null, null); + }); + } + + @Test + void blockingDirectBinaryEncoderInit() throws IOException { + OutputStream out = new ByteArrayOutputStream(); + BinaryEncoder enc = FACTORY.blockingDirectBinaryEncoder(out, null); + assertSame(enc, FACTORY.blockingDirectBinaryEncoder(out, enc)); + } + + @Test + void badBlockingDirectBinaryEncoderInit() { + assertThrows(NullPointerException.class, () -> { + FACTORY.blockingDirectBinaryEncoder(null, null); }); } @@ -107,22 +121,22 @@ void badDirectBinaryEncoderInit() { void jsonEncoderInit() throws IOException { Schema s = new Schema.Parser().parse("\"int\""); OutputStream out = new ByteArrayOutputStream(); - factory.jsonEncoder(s, out); - JsonEncoder enc = factory.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8)); + FACTORY.jsonEncoder(s, out); + JsonEncoder enc = FACTORY.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8)); enc.configure(out); } @Test void badJsonEncoderInitOS() throws IOException { assertThrows(NullPointerException.class, () -> { - factory.jsonEncoder(Schema.create(Type.INT), (OutputStream) null); + FACTORY.jsonEncoder(Schema.create(Type.INT), (OutputStream) null); }); } @Test void badJsonEncoderInit() throws IOException { assertThrows(NullPointerException.class, () -> { - factory.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null); + FACTORY.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null); }); } @@ -130,7 +144,7 @@ void badJsonEncoderInit() throws IOException { void jsonEncoderNewlineDelimited() throws IOException { OutputStream out = new ByteArrayOutputStream(); Schema ints = Schema.create(Type.INT); - Encoder e = factory.jsonEncoder(ints, out); + Encoder e = FACTORY.jsonEncoder(ints, out); String separator = System.getProperty("line.separator"); GenericDatumWriter writer = new GenericDatumWriter<>(ints); writer.write(1, e); @@ -169,8 +183,8 @@ void jsonEncoderWhenIncludeNamespaceOptionIsTrue() throws IOException { void validatingEncoderInit() throws IOException { Schema s = new Schema.Parser().parse("\"int\""); OutputStream out = new ByteArrayOutputStream(); - Encoder e = factory.directBinaryEncoder(out, null); - factory.validatingEncoder(s, e).configure(e); + Encoder e = FACTORY.directBinaryEncoder(out, null); + FACTORY.validatingEncoder(s, e).configure(e); } @Test @@ -324,7 +338,7 @@ private String fromAvroToJson(byte[] avroBytes, Schema schema, boolean includeNa DatumWriter writer = new GenericDatumWriter<>(schema); ByteArrayOutputStream output = new ByteArrayOutputStream(); - JsonEncoder encoder = factory.jsonEncoder(schema, output); + JsonEncoder encoder = FACTORY.jsonEncoder(schema, output); encoder.setIncludeNamespace(includeNamespace); Decoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null); Object datum = reader.read(null, decoder); @@ -340,7 +354,7 @@ public void testJsonEncoderInitAutoFlush() throws IOException { Schema s = new Schema.Parser().parse("\"int\""); OutputStream baos = new ByteArrayOutputStream(); OutputStream out = new BufferedOutputStream(baos); - JsonEncoder enc = factory.jsonEncoder(s, out, false); + JsonEncoder enc = FACTORY.jsonEncoder(s, out, false); enc.configure(out, false); enc.writeInt(24); enc.flush(); @@ -354,7 +368,7 @@ public void testJsonEncoderInitAutoFlushDisabled() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStream out = new BufferedOutputStream(baos); Schema ints = Schema.create(Type.INT); - Encoder e = factory.jsonEncoder(ints, out, false, false); + Encoder e = FACTORY.jsonEncoder(ints, out, false, false); String separator = System.getProperty("line.separator"); GenericDatumWriter writer = new GenericDatumWriter(ints); writer.write(1, e);