Skip to content

Commit

Permalink
Speedup writing TranslogHeader (elastic#95790)
Browse files Browse the repository at this point in the history
We were doing single byte writes to the translog output
channel via the output stream wrapper. This is very slow
since there's no buffering involved here.
-> just build the header that is of known size anyway up
front and write it in one go.
  • Loading branch information
original-brownbear authored May 4, 2023
1 parent bedaf3c commit f951ed2
Showing 1 changed file with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.ByteUtils;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.zip.CRC32;

/**
* Each translog file is started with a translog header then followed by translog operations.
Expand Down Expand Up @@ -148,25 +153,36 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
}
}

private static final byte[] TRANSLOG_HEADER;

static {
var out = new ByteArrayOutputStream();
try {
CodecUtil.writeHeader(new OutputStreamDataOutput(out), TRANSLOG_CODEC, CURRENT_VERSION);
TRANSLOG_HEADER = out.toByteArray();
} catch (IOException e) {
throw new AssertionError(e);
}
}

/**
* Writes this header with the latest format into the file channel
*/
void write(final FileChannel channel) throws IOException {
// This output is intentionally not closed because closing it will close the FileChannel.
@SuppressWarnings({ "IOResourceOpenedButNotSafelyClosed", "resource" })
final BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(
new OutputStreamStreamOutput(java.nio.channels.Channels.newOutputStream(channel))
);
CodecUtil.writeHeader(new OutputStreamDataOutput(out), TRANSLOG_CODEC, CURRENT_VERSION);
// Write uuid
final BytesRef uuid = new BytesRef(translogUUID);
out.writeInt(uuid.length);
out.writeBytes(uuid.bytes, uuid.offset, uuid.length);
final byte[] buffer = Arrays.copyOf(TRANSLOG_HEADER, headerSizeInBytes);
// Write uuid and leave 4 bytes for its length
final int uuidOffset = TRANSLOG_HEADER.length + Integer.BYTES;
int offset = UnicodeUtil.UTF16toUTF8(translogUUID, 0, translogUUID.length(), buffer, uuidOffset);
// write uuid length before uuid
ByteUtils.writeIntBE(offset - uuidOffset, buffer, TRANSLOG_HEADER.length);
// Write primary term
out.writeLong(primaryTerm);
ByteUtils.writeLongBE(primaryTerm, buffer, offset);
offset += Long.BYTES;
final CRC32 crc32 = new CRC32();
crc32.update(buffer, 0, offset);
// Checksum header
out.writeInt((int) out.getChecksum());
out.flush();
ByteUtils.writeIntBE((int) crc32.getValue(), buffer, offset);
Channels.writeToChannel(buffer, channel);
channel.force(true);
assert channel.position() == headerSizeInBytes
: "Header is not fully written; header size [" + headerSizeInBytes + "], channel position [" + channel.position() + "]";
Expand Down

0 comments on commit f951ed2

Please sign in to comment.