Skip to content

Commit

Permalink
Compress term dictionary with Zstd
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 25, 2024
1 parent c4b1773 commit 450faed
Showing 9 changed files with 117 additions and 24 deletions.
5 changes: 5 additions & 0 deletions gradle/verification-metadata.xml
Original file line number Diff line number Diff line change
@@ -411,6 +411,11 @@
<sha256 value="084197555590a53bb21b59508a3330559f536ddb448eafd1ec675f5462036fcf" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.github.luben" name="zstd-jni" version="1.5.5-11">
<artifact name="zstd-jni-1.5.5-11.jar">
<sha256 value="d75b2ced6059f81ad23e021c554259b906b6c4f2991cb772409827569ead4c1a" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.github.spotbugs" name="spotbugs-annotations" version="4.0.2">
<artifact name="spotbugs-annotations-4.0.2.jar">
<sha256 value="3ef6c9f822b601aa151e10e123b49e5604243a4a99bcc47e4e1f9eea9781dc63" origin="Generated by Gradle"/>
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ dependencies {
api "org.apache.logging.log4j:log4j-core:${versions.log4j}"

api "net.java.dev.jna:jna:${versions.jna}"
api 'com.github.luben:zstd-jni:1.5.5-11'

api "co.elastic.logging:log4j2-ecs-layout:${versions.ecsLogging}"
api "co.elastic.logging:ecs-logging-core:${versions.ecsLogging}"
Empty file.
26 changes: 26 additions & 0 deletions server/licenses/zstd-jni-LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Zstd-jni: JNI bindings to Zstd Library

Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.

BSD License

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1 change: 1 addition & 0 deletions server/licenses/zstd-jni-NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
d75b2ced6059f81ad23e021c554259b906b6c4f2991cb772409827569ead4c1a
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
requires java.management;
requires jdk.unsupported;
requires java.net.http; // required by ingest-geoip's dependency maxmind.geoip2 https://github.com/elastic/elasticsearch/issues/93553
requires com.github.luben.zstd_jni;

requires org.elasticsearch.cli;
requires org.elasticsearch.base;
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@

package org.elasticsearch.index.codec.postings;

import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.NormsProducer;
@@ -19,6 +21,7 @@
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
@@ -29,6 +32,7 @@
import org.elasticsearch.lucene.util.BitStreamOutput;

import java.io.IOException;
import java.io.OutputStream;

import static org.elasticsearch.index.codec.postings.ES814InlinePostingsFormat.INVERTED_INDEX_CODEC;
import static org.elasticsearch.index.codec.postings.ES814InlinePostingsFormat.INVERTED_INDEX_EXTENSION;
@@ -116,6 +120,7 @@ public void write(Fields fields, NormsProducer norms) throws IOException {

ByteBuffersDataOutput termsOut = new ByteBuffersDataOutput();
ByteBuffersDataOutput postingsOut = new ByteBuffersDataOutput();
ByteBuffersDataOutput sparse = new ByteBuffersDataOutput();

PostingsWriter writer = new PostingsWriter(hasFreqs, hasPositions, hasOffsets, hasPayloads);
TermIndexWriter termIndexWriter = new TermIndexWriter(termIndex, index.getFilePointer());
@@ -148,14 +153,14 @@ public void write(Fields fields, NormsProducer norms) throws IOException {
++numTerms;
if (numPending >= MAX_BLOCK_TERMS || (termsOut.size() + postingsOut.size()) >= MAX_BLOCK_BYTES) {
termIndexWriter.addTerm(term, index.getFilePointer());
flushBlock(numPending, termsOut, postingsOut);
flushBlock(numPending, termsOut, postingsOut, sparse);
numPending = 0;
++numBlocks;
}
}
termIndexWriter.finish(index.getFilePointer());
if (numPending > 0) {
flushBlock(numPending, termsOut, postingsOut);
flushBlock(numPending, termsOut, postingsOut, sparse);
++numBlocks;
}
meta.writeLong(numTerms);
@@ -182,15 +187,18 @@ public void write(Fields fields, NormsProducer norms) throws IOException {
}
}

private void flushBlock(int numPending, ByteBuffersDataOutput termsOut, ByteBuffersDataOutput postingsOut) throws IOException {
private void flushBlock(int numPending, ByteBuffersDataOutput termsOut, ByteBuffersDataOutput postingsOut, ByteBuffersDataOutput sparse)
throws IOException {
index.writeVInt(numPending); // num terms
index.writeVLong(postingsOut.size()); // postings size
final long postingPointer = index.getFilePointer() + termsOut.size() + Long.BYTES;
index.writeLong(postingPointer);
termsOut.copyTo(index);
termsOut.reset();
compressTerms(termsOut, sparse);
index.writeVLong(termsOut.size()); // original term size
index.writeVLong(sparse.size()); // compressed terms byte
index.writeVLong(postingsOut.size()); // postings bytes
sparse.copyTo(index);
postingsOut.copyTo(index);
postingsOut.reset();
termsOut.reset();
sparse.reset();
}

private class PostingsWriter {
@@ -405,6 +413,32 @@ void finish(long blockAddress) throws IOException {
}
}

/**
 * Zstd-compresses the contents of {@code in} into {@code out}.
 * <p>
 * Neither buffer is reset here; callers reset them after copying the result.
 * The zstd stream carries a native context and intentionally has no
 * finalizer, so it is closed via try-with-resources to guarantee release
 * even if copying throws.
 *
 * @param in  uncompressed term bytes to compress
 * @param out receives the compressed bytes (appended)
 * @throws IOException if compression fails
 */
static void compressTerms(ByteBuffersDataOutput in, ByteBuffersDataOutput out) throws IOException {
    // Adapter so zstd writes its compressed output straight into `out`.
    OutputStream target = new OutputStream() {
        @Override
        public void write(int b) {
            out.writeByte((byte) b);
        }

        @Override
        public void write(byte[] b, int off, int len) {
            out.writeBytes(b, off, len);
        }
    };
    try (ZstdOutputStreamNoFinalizer zstream = new ZstdOutputStreamNoFinalizer(target)) {
        // Adapter feeding the buffered term bytes into the zstd stream.
        in.copyTo(new DataOutput() {
            @Override
            public void writeByte(byte b) throws IOException {
                zstream.write(b);
            }

            @Override
            public void writeBytes(byte[] b, int offset, int length) throws IOException {
                zstream.write(b, offset, length);
            }
        });
    } // close() flushes the zstd frame and frees the native context
}

@Override
public void close() throws IOException {
IOUtils.close(meta, index, termIndex, prox);
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@

package org.elasticsearch.index.codec.postings;

import com.github.luben.zstd.Zstd;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.index.BaseTermsEnum;
@@ -22,7 +24,9 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ArrayUtil;
@@ -293,6 +297,11 @@ private class InlineTermsEnum extends BaseTermsEnum {
private final BytesRefBuilder term;
private final ES814InlinePostingsFormat.InlineTermState state = new ES814InlinePostingsFormat.InlineTermState();

// term buffers for zstd
private byte[] zCompressedTerms = new byte[1024];
private byte[] zDecompressedTerms = new byte[1024];
private DataInput termsReader;

InlineTermsEnum(Meta meta) throws IOException {
this.meta = meta;
this.term = new BytesRefBuilder();
@@ -325,8 +334,8 @@ private void reset() throws IOException {
state.blockIndex = -1;
state.termIndexInBlock = 0;
state.numTermsInBlock = -1;
state.postingsFilePointer = 0;
state.postingsSize = 0;
state.postingsFP = 0;
state.postingsBytes = 0;
}

private long findBlockIndex(BytesRef target) throws IOException {
@@ -424,34 +433,48 @@ public BytesRef next() throws IOException {
return null; // exhausted
}
if (state.blockIndex > 0) {
index.seek(state.postingsFilePointer + state.postingsSize);
index.seek(state.postingsFP + state.postingsBytes);
}
loadFrame();
}
scanNextTermInCurrentFrame();
return term.get();
}

/**
 * Reads {@code compressedBytes} from the index input and zstd-decompresses
 * them, then positions {@code termsReader} over the decompressed term block.
 *
 * @param compressedBytes on-disk size of the compressed terms region
 * @param originalBytes   expected size after decompression (from the block header)
 * @throws IOException if the decompressed size does not match the recorded
 *                     original size, which indicates a corrupted block
 */
private void decompressTerms(int compressedBytes, int originalBytes) throws IOException {
    // growNoCopy: buffers are scratch space, previous contents are irrelevant.
    zCompressedTerms = ArrayUtil.growNoCopy(zCompressedTerms, compressedBytes);
    zDecompressedTerms = ArrayUtil.growNoCopy(zDecompressedTerms, originalBytes);
    index.readBytes(zCompressedTerms, 0, compressedBytes);
    final long actual = Zstd.decompressByteArray(zDecompressedTerms, 0, originalBytes, zCompressedTerms, 0, compressedBytes);
    if (actual != originalBytes) {
        // Fail loudly in production too; an assert would be a no-op without -ea
        // and a short read here would silently garble every term in the block.
        throw new IOException("decompressed " + actual + " bytes but expected " + originalBytes);
    }
    termsReader = new ByteArrayDataInput(zDecompressedTerms, 0, originalBytes);
}

/**
 * Positions this enum at the start of the next on-disk block: reads the
 * block header, records where the raw postings begin, and decompresses the
 * term bytes into {@code termsReader}.
 */
private void loadFrame() throws IOException {
    state.termIndexInBlock = 0;
    state.numTermsInBlock = index.readVInt();
    final long originalTermsBytes = index.readVLong(); // terms size before compression
    final long termBytes = index.readVLong(); // terms size after compression
    state.postingsBytes = index.readVLong();
    long termsFP = index.getFilePointer();
    // Postings are stored immediately after the compressed terms, so the
    // postings pointer is derived rather than stored on disk.
    state.postingsFP = termsFP + termBytes;
    decompressTerms((int) termBytes, (int) originalTermsBytes);
}

/**
 * Decodes the next term and its metadata from the decompressed term block
 * ({@code termsReader}) into {@code term} and {@code state}.
 * <p>
 * Per-term layout: vInt term length, term bytes, int docFreq, then — only at
 * the index options that store them — long totalTermFreq and long proxOffset,
 * followed by a long doc-offset delta relative to the block's postings start.
 */
private void scanNextTermInCurrentFrame() throws IOException {
    term.setLength(termsReader.readVInt());
    termsReader.readBytes(term.bytes(), 0, term.length());
    state.docFreq = termsReader.readInt();

    if (meta.options.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0) {
        state.totalTermFreq = termsReader.readLong();
    } else {
        // Freqs not indexed: each doc counts once, so ttf equals docFreq.
        state.totalTermFreq = state.docFreq;
    }
    if (meta.options.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0) {
        state.proxOffset = termsReader.readLong();
    }
    // Doc offsets are stored relative to the block's postings start.
    state.docOffset = state.postingsFP + termsReader.readLong();
    state.termIndexInBlock++;
}

@@ -742,7 +765,7 @@ private void readPosition() throws IOException {
if (p == 0) {
refillPositions();
}
pos += posBuffer[p];
pos += (int) posBuffer[p];
if (options.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0) {
startOffset = endOffset + (int) offsetBuffer[p];
endOffset = startOffset + (int) lengthBuffer[p];
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@ public final class ES814InlinePostingsFormat extends PostingsFormat {

static final int TERM_INDEX_BLOCK_SIZE = 4096;

static final int ZSTD_COMPRESSION_LEVEL = 3;

/**
* Sole constructor.
*/
@@ -62,8 +64,8 @@ public static final class InlineTermState extends BlockTermState {
public long termIndexInBlock;
public long numTermsInBlock;

public long postingsFilePointer;
public long postingsSize;
public long postingsFP;
public long postingsBytes;

public long docOffset;
public long proxOffset;
@@ -75,8 +77,8 @@ public void copyFrom(TermState _other) {
this.blockIndex = state.blockIndex;
this.termIndexInBlock = state.termIndexInBlock;
this.numTermsInBlock = state.numTermsInBlock;
this.postingsFilePointer = state.postingsFilePointer;
this.postingsSize = state.postingsSize;
this.postingsFP = state.postingsFP;
this.postingsBytes = state.postingsBytes;
this.docOffset = state.docOffset;
this.proxOffset = state.proxOffset;
}

0 comments on commit 450faed

Please sign in to comment.