Skip to content

Commit

Permalink
Fix issues with queries that are bigger than 2GB (#266)
Browse files Browse the repository at this point in the history
* Update QuerySource hashing functions

* Update hash tests

* Add query size limitation for reading queries as string

* Put query indices in record class instead of long array

* Fix test

* Delegate QueryIndex to its own class

* Make InMemQueryList store queries in byte arrays instead of strings

* Add shallow copy for BigByteArrayOutputStream

* Add available methods for BigByteArrayInputStream

* Fix shallow copy

* Fix available method

* Make BigByteArrayInputStream use a ByteBuffer

* Change RequestFactory

* Make StreamEntityProducer to not fully read in stream

* Fix test

* Try to speedup indexing

* Remove hash code file

* Remove unnecessary exception

* Change ByteArrayListInputStream

* Use ByteArrayListInputStreams for storing queries instead of ByteArrayOutputStreams

* Fix tests

* Speed up FileUtils again

* Add doc mentioning limitations

* Fix function

* Remove unused methods and add comments

* Use constants

* Change title and move paragraph

* Complete last commit

* Fix test
  • Loading branch information
nck-mlcnv authored Sep 13, 2024
1 parent 9dc1b50 commit f1761cf
Show file tree
Hide file tree
Showing 16 changed files with 402 additions and 221 deletions.
5 changes: 5 additions & 0 deletions docs/configuration/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ WHERE {
```
The `separator` property should be set to `"\n###\n"`. (be aware of different line endings on different operating systems)

## Huge Query Strings
When working with large queries (queries larger than 2³¹ bytes, i.e. roughly 2 GB),
be aware that only the request types `post query` and `update query`
support them.

## Example
```yaml
tasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOExc
return new QueryStringWrapper(queryIndex, queryList.getQuery(queryIndex));
}

public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) throws IOException {
public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) {
final var queryIndex = querySelector.getNextIndex();
return new QueryStreamWrapper(queryIndex, config.caching(), () -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import org.aksw.iguana.cc.query.list.QueryList;
import org.aksw.iguana.cc.query.source.QuerySource;
import org.aksw.iguana.commons.io.ByteArrayListInputStream;
import org.aksw.iguana.commons.io.ByteArrayListOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -21,21 +23,45 @@ public class InMemQueryList extends QueryList {

private static final Logger LOGGER = LoggerFactory.getLogger(InMemQueryList.class);

private final List<byte[]> queries;
private final List<ByteArrayListOutputStream> queries = new ArrayList<>();

/**
 * Reads every query from the given source into memory. <br/>
 * Each query is stored in a {@code ByteArrayListOutputStream}, which spreads the content
 * over multiple byte arrays, so a single stored query is not limited to 2^31 bytes.
 *
 * @param querySource the source to read the queries from
 * @throws IOException if a query can't be read from the source
 */
public InMemQueryList(QuerySource querySource) throws IOException {
    super(querySource);
    LOGGER.info("Reading queries from the source with the hash code {} into memory.", querySource.hashCode());
    for (int i = 0; i < querySource.size(); i++) {
        try (InputStream queryStream = querySource.getQueryStream(i)) {
            ByteArrayListOutputStream balos = new ByteArrayListOutputStream();
            byte[] currentBuffer;
            do {
                // read in chunks of at most Integer.MAX_VALUE / 2 bytes,
                // because a single Java byte array can't hold ~2^31 bytes
                currentBuffer = queryStream.readNBytes(Integer.MAX_VALUE / 2);
                balos.write(currentBuffer);
            } while (currentBuffer.length == Integer.MAX_VALUE / 2); // a short read means end of stream
            balos.close();
            queries.add(balos);
        }
    }
}

@Override
public String getQuery(int index) {
    final var queryStream = queries.get(index);
    // Java strings are backed by a single array, so a query larger than
    // roughly Integer.MAX_VALUE bytes can't be materialized as a String.
    if (queryStream.size() > Integer.MAX_VALUE - 8) {
        throw new OutOfMemoryError("Query is too large to be read into a string object.");
    }

    byte[] buffer;
    try {
        buffer = queryStream.toInputStream().readNBytes(Integer.MAX_VALUE - 8);
    } catch (IOException e) {
        // best effort: log the cause and return an empty string instead of failing the run
        LOGGER.error("Could not read query into string.", e);
        return "";
    }
    return new String(buffer, StandardCharsets.UTF_8);
}

@Override
public InputStream getQueryStream(int index) {
    // Stream over the stored byte buffers; unlike getQuery, this path is not
    // limited by the maximum String/array size.
    return new ByteArrayListInputStream(queries.get(index).getBuffers());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public List<String> getAllQueries() throws IOException {

@Override
public int hashCode() {
    // Hash the whole directory (all top-level files), so a change in any
    // contained file changes the hash, not just a change in the first file.
    return FileUtils.getHashcodeFromDirectory(this.path);
}
}
146 changes: 106 additions & 40 deletions src/main/java/org/aksw/iguana/cc/utils/files/FileUtils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.aksw.iguana.cc.utils.files;

import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -14,20 +19,62 @@
*
*/
public class FileUtils {
private static final XXHashFactory hasherFactory = XXHashFactory.fastestJavaInstance();
private static final int BUFFER_SIZE = 8192;

/**
* This method calculates the hashcode of the content of a file. <br/>
* The hashcode is calculated using the XXHash64 algorithm.
*
* @param filepath the path of the file
* @return the hashcode of the file content
*/
public static int getHashcodeFromFileContent(Path filepath) {
int hashcode;
try {
String fileContents = readFile(filepath);
hashcode = Math.abs(fileContents.hashCode());
try (StreamingXXHash64 hasher = hasherFactory.newStreamingHash64(0);
InputStream is = new BufferedInputStream(Files.newInputStream(filepath), BUFFER_SIZE)) {
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = (is.read(buffer))) != -1) {
hasher.update(buffer, 0, bytesRead);
}
hashcode = (int) hasher.getValue();
} catch (IOException e) {
hashcode = 0;
return 0;
}

return hashcode;
}

public static String readFile(Path path) throws IOException {
return Files.readString(path, StandardCharsets.UTF_8);
/**
* This method calculated the hashcode of a directory by hashing the content of all files in the directory. <br/>
* Only top-level files are considered, subdirectories are ignored. <br/>
* The hashcode is calculated using the XXHash64 algorithm.
*
* @param directory the path of the directory
* @return the hashcode of the directory content
*/
public static int getHashcodeFromDirectory(Path directory) {

int hashcode;
try (StreamingXXHash64 hasher = hasherFactory.newStreamingHash64(0)) {
for (Path file : Files.list(directory).sorted().toArray(Path[]::new)) {
if (Files.isRegularFile(file)) {
try (InputStream is = new BufferedInputStream(Files.newInputStream(file), BUFFER_SIZE)) {
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = (is.read(buffer))) != -1) {
hasher.update(buffer, 0, bytesRead);
}
}
}
}
hashcode = (int) hasher.getValue();
} catch (IOException e) {
return 0;
}

return hashcode;
}

/**
Expand All @@ -48,16 +95,24 @@ public static String readFile(Path path) throws IOException {
public static String getLineEnding(Path filepath) throws IOException {
if (filepath == null)
throw new IllegalArgumentException("Filepath must not be null.");
try(BufferedReader br = Files.newBufferedReader(filepath)) {
try (BufferedReader br = Files.newBufferedReader(filepath)) {
CharBuffer buffer = CharBuffer.allocate(8192);
char c;
while ((c = (char) br.read()) != (char) -1) {
if (c == '\n')
return "\n";
else if (c == '\r') {
if ((char) br.read() == '\n')
return "\r\n";
return "\r";
while (br.read(buffer) != -1) {
buffer.flip();
while (buffer.hasRemaining()) {
c = buffer.get();
if (c == '\n')
return "\n";
else if (c == '\r') {
if (!buffer.hasRemaining() && br.read(buffer) == -1)
return "\r";
if (buffer.hasRemaining() && buffer.get() == '\n')
return "\r\n";
return "\r";
}
}
buffer.clear();
}
}

Expand All @@ -84,45 +139,56 @@ private static int[] computePrefixTable(byte[] pattern) {
return prefixTable;
}

public static List<long[]> indexStream(String separator, InputStream is) throws IOException {
public static List<QueryIndex> indexStream(String separator, InputStream is) throws IOException {
// basically Knuth-Morris-Pratt
List<long[]> indices = new ArrayList<>();
List<QueryIndex> indices = new ArrayList<>();


final byte[] sepArray = separator.getBytes(StandardCharsets.UTF_8);
final int[] prefixTable = computePrefixTable(sepArray);

long itemStart = 0;

long byteOffset = 0;
int patternIndex = 0;
byte[] currentByte = new byte[1];
while (is.read(currentByte) == 1) {
// skipping fast-forward with the prefixTable
while (patternIndex > 0 && currentByte[0] != sepArray[patternIndex]) {
patternIndex = prefixTable[patternIndex - 1];
}


if (currentByte[0] == sepArray[patternIndex]) {
patternIndex++;

if (patternIndex == sepArray.length) { // match found
patternIndex = 0;
final long itemEnd = byteOffset - sepArray.length + 1;
final long len = itemEnd - itemStart;
indices.add(new long[]{itemStart, len});

itemStart = byteOffset + 1;
}
}

byteOffset++;
}
// read from the stream in chunks, because the BufferedInputStream is synchronized, so single byte reads are
// slow, which is especially bad for large files
byte[] buffer = new byte[8192];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
byte currentByte;
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
byteBuffer.limit(bytesRead);
while (byteBuffer.hasRemaining()) {
currentByte = byteBuffer.get();
// skipping fast-forward with the prefixTable
while (patternIndex > 0 && currentByte != sepArray[patternIndex]) {
patternIndex = prefixTable[patternIndex - 1];
}


if (currentByte == sepArray[patternIndex]) {
patternIndex++;

if (patternIndex == sepArray.length) { // match found
patternIndex = 0;
final long itemEnd = byteOffset - sepArray.length + 1;
final long len = itemEnd - itemStart;
indices.add(new QueryIndex(itemStart, len));

itemStart = byteOffset + 1;
}
}

byteOffset++;
}
byteBuffer.clear();

}

final long itemEnd = byteOffset;
final long len = itemEnd - itemStart;
indices.add(new long[]{itemStart, len});
indices.add(new QueryIndex(itemStart, len));

return indices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class IndexedQueryReader {
/**
* This list stores the start position and the length of each indexed content.
*/
private final List<long[]> indices;
private final List<QueryIndex> indices;

/**
* The file whose content should be indexed.
Expand Down Expand Up @@ -96,12 +96,19 @@ private IndexedQueryReader(Path filepath, String separator) throws IOException {
* @throws IOException
*/
public String readQuery(int index) throws IOException {
    final int size;
    try {
        // Math.toIntExact throws ArithmeticException if the length overflows an int
        size = Math.toIntExact(indices.get(index).queryLength());
    } catch (ArithmeticException e) {
        throw new OutOfMemoryError("Can't read a query that's bigger than 2^31 bytes (~2GB) into a string");
    }
    // Indexed queries can't be larger than ~2GB
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
        final byte[] buffer = new byte[size]; // it's supposedly faster to manually create a byte array than a ByteBuffer
        final ByteBuffer bufferWrapper = ByteBuffer.wrap(buffer);
        final var read = channel.read(bufferWrapper, indices.get(index).filePosition());
        assert read == indices.get(index).queryLength();
        return new String(buffer, StandardCharsets.UTF_8);
    }
}

Expand All @@ -111,8 +118,8 @@ public InputStream streamQuery(int index) throws IOException {
new BoundedInputStream(
Channels.newInputStream(
FileChannel.open(path, StandardOpenOption.READ)
.position(this.indices.get(index)[0] /* offset */)),
this.indices.get(index)[1] /* length */)));
.position(this.indices.get(index).filePosition() /* offset */)),
this.indices.get(index).queryLength() /* length */)));
}

/**
Expand Down Expand Up @@ -146,11 +153,11 @@ public int size() {
* @return the Indexes
* @throws IOException
*/
private static List<long[]> indexFile(Path filepath, String separator) throws IOException {
private static List<QueryIndex> indexFile(Path filepath, String separator) throws IOException {
try (InputStream fi = Files.newInputStream(filepath, StandardOpenOption.READ);
BufferedInputStream bis = new BufferedInputStream(fi)) {
return FileUtils.indexStream(separator, bis)
.stream().filter((long[] e) -> e[1] > 0 /* Only elements with length > 0 */).collect(Collectors.toList());
.stream().filter(e -> e.queryLength() > 0 /* Only elements with length > 0 */).collect(Collectors.toList());
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/aksw/iguana/cc/utils/files/QueryIndex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.aksw.iguana.cc.utils.files;

/**
 * Describes the location of a single indexed query inside a file.
 *
 * @param filePosition the byte offset at which the query starts
 * @param queryLength  the length of the query in bytes
 */
public record QueryIndex(long filePosition, long queryLength) {
}
Loading

0 comments on commit f1761cf

Please sign in to comment.