Skip to content

Commit

Permalink
Fix memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
JohannesLichtenberger committed Sep 23, 2024
1 parent bfa8ee3 commit d98608e
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 76 deletions.
20 changes: 5 additions & 15 deletions bundles/sirix-core/src/main/java/io/sirix/io/AbstractReader.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.sirix.io;

import io.sirix.access.ResourceConfiguration;
import io.sirix.api.PageReadOnlyTrx;
import io.sirix.io.bytepipe.ByteHandler;
import io.sirix.page.PagePersister;
import io.sirix.page.PageReference;
Expand Down Expand Up @@ -28,32 +29,21 @@ public abstract class AbstractReader implements Reader {
*/
protected final PagePersister pagePersister;

private final byte[] bytes = new byte[130_000];

public AbstractReader(ByteHandler byteHandler, PagePersister pagePersister, SerializationType type) {
this.byteHandler = byteHandler;
this.pagePersister = pagePersister;
this.type = type;
}

public Page deserialize(ResourceConfiguration resourceConfiguration, byte[] page, int uncompressedLength)
throws IOException {
public Page deserialize(ResourceConfiguration resourceConfiguration, byte[] page) throws IOException {
// perform byte operations
byte[] bytes;
try (final var inputStream = byteHandler.deserialize(new ByteArrayInputStream(page))) {
int bytesRead = 0;
while (bytesRead < uncompressedLength) {
int read = inputStream.read(bytes, bytesRead, uncompressedLength - bytesRead);
if (read == -1) {
throw new IOException("Unexpected end of stream while reading decompressed data.");
}
bytesRead += read;
}
assert bytesRead == uncompressedLength : "Read bytes mismatch: expected " + uncompressedLength + " but got " + bytesRead;
bytes = inputStream.readAllBytes();
}
wrappedForRead.write(bytes, 0, uncompressedLength);
wrappedForRead.write(bytes);
final var deserializedPage = pagePersister.deserializePage(resourceConfiguration, wrappedForRead, type);
wrappedForRead.clear();
assert !deserializedPage.isClosed();
return deserializedPage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,19 @@ public Page read(final @NonNull PageReference reference, final @Nullable Resourc
ByteBuffer buffer = ByteBuffer.allocateDirect(IOStorage.OTHER_BEACON).order(ByteOrder.nativeOrder());

final long position = reference.getKey();

dataFileChannel.read(buffer, position);
buffer.flip();
final int uncompressedLength = buffer.getInt();
buffer.flip();
dataFileChannel.read(buffer, position + 4);

buffer.flip();
final int dataLength = buffer.getInt();

buffer = ByteBuffer.allocate(dataLength).order(ByteOrder.nativeOrder());
dataFileChannel.read(buffer, position + 8);

dataFileChannel.read(buffer, position + 4);
buffer.flip();
final byte[] page = buffer.array();

// Perform byte operations.
return deserialize(resourceConfiguration, page, uncompressedLength);
return deserialize(resourceConfiguration, page);
} catch (final IOException e) {
throw new SirixIOException(e);
}
Expand All @@ -124,25 +121,21 @@ public PageReference readUberPageReference() {
@Override
public RevisionRootPage readRevisionRootPage(final int revision, final ResourceConfiguration resourceConfiguration) {
try {
final var dataFileOffset = cache.get(revision, _ -> getRevisionFileData(revision)).offset();
final var dataFileOffset = cache.get(revision, (unused) -> getRevisionFileData(revision)).offset();

ByteBuffer buffer = ByteBuffer.allocateDirect(4).order(ByteOrder.nativeOrder());
dataFileChannel.read(buffer, dataFileOffset);
buffer.flip();
final int uncompressedDataLength = buffer.getInt();
buffer.flip();
dataFileChannel.read(buffer, dataFileOffset + 4);
buffer.flip();
final int dataLength = buffer.getInt();

buffer = ByteBuffer.allocateDirect(dataLength).order(ByteOrder.nativeOrder());
dataFileChannel.read(buffer, dataFileOffset + 8);
dataFileChannel.read(buffer, dataFileOffset + 4);
buffer.flip();
final byte[] page = new byte[dataLength];
buffer.get(page);

// Perform byte operations.
return (RevisionRootPage) deserialize(resourceConfiguration, page, uncompressedDataLength);
return (RevisionRootPage) deserialize(resourceConfiguration, page);
} catch (IOException e) {
throw new SirixIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,11 @@ private FileChannelWriter writePageReference(final ResourceConfiguration resourc
pagePersister.serializePage(resourceConfiguration, byteBufferBytes, page, serializationType);

final byte[] serializedPage;
final int uncompressedLength;

final var byteArray = byteBufferBytes.toByteArray();
if (page instanceof KeyValueLeafPage) {
uncompressedLength = Writer.bytesToIntLittleEndian(byteArray[0], byteArray[1], byteArray[2], byteArray[3]);
serializedPage = Arrays.copyOfRange(byteArray, 4, byteArray.length);;
serializedPage = byteArray;
} else {
uncompressedLength = byteArray.length;
try (final ByteArrayOutputStream output = new ByteArrayOutputStream(byteArray.length)) {
try (final DataOutputStream dataOutput = new DataOutputStream(reader.getByteHandler().serialize(output))) {
dataOutput.write(byteArray);
Expand Down Expand Up @@ -196,7 +193,6 @@ private FileChannelWriter writePageReference(final ResourceConfiguration resourc
bufferedBytes.writePosition(bufferedBytes.writePosition() + offsetToAdd);
}

bufferedBytes.writeInt(uncompressedLength);
bufferedBytes.writeInt(serializedPage.length);
bufferedBytes.write(serializedPage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@
import io.sirix.access.ResourceConfiguration;
import io.sirix.api.PageReadOnlyTrx;
import io.sirix.exception.SirixIOException;
import io.sirix.io.AbstractReader;
import io.sirix.io.IOStorage;
import io.sirix.io.Reader;
import io.sirix.io.RevisionFileData;
import io.sirix.io.bytepipe.ByteHandler;
import io.sirix.page.PagePersister;
import io.sirix.page.PageReference;
import io.sirix.page.RevisionRootPage;
import io.sirix.page.SerializationType;
import io.sirix.page.interfaces.Page;
import one.jasyncfio.AsyncFile;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import io.sirix.io.AbstractReader;
import io.sirix.io.IOStorage;
import io.sirix.io.Reader;
import io.sirix.io.RevisionFileData;
import io.sirix.page.interfaces.Page;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -113,22 +114,19 @@ private Page readPageFragment(@NonNull PageReference reference,

final long position = reference.getKey();
dataFile.read(buffer, position).join();
buffer.flip();
final int uncompressedDataLength = buffer.getInt();

dataFile.read(buffer, position + 4).join();
buffer.flip();
final int dataLength = buffer.getInt();

buffer = ByteBuffer.allocateDirect(dataLength).order(ByteOrder.nativeOrder());

dataFile.read(buffer, position + Integer.BYTES + Integer.BYTES).join();
dataFile.read(buffer, position + Integer.BYTES).join();
buffer.flip();
final byte[] page = new byte[dataLength];
buffer.get(page);

// Perform byte operations.
return deserialize(resourceConfiguration, page, uncompressedDataLength);
return deserialize(resourceConfiguration, page);
} catch (final IOException e) {
throw new SirixIOException(e);
}
Expand All @@ -137,33 +135,29 @@ private Page readPageFragment(@NonNull PageReference reference,
@Override
public RevisionRootPage readRevisionRootPage(final int revision, final ResourceConfiguration resourceConfiguration) {
try {
final var dataFileOffset = cache.get(revision, _ -> getRevisionFileData(revision)).offset();
final var dataFileOffset = cache.get(revision, (unused) -> getRevisionFileData(revision)).offset();

ByteBuffer buffer = ByteBuffer.allocateDirect(Integer.BYTES).order(ByteOrder.nativeOrder());
dataFile.read(buffer, dataFileOffset).join();
buffer.flip();
final int uncompressedDataLength = buffer.getInt();

dataFile.read(buffer, dataFileOffset + Integer.BYTES).join();
buffer.flip();
final int dataLength = buffer.getInt();

buffer = ByteBuffer.allocateDirect(dataLength).order(ByteOrder.nativeOrder());
dataFile.read(buffer, dataFileOffset + Integer.BYTES + Integer.BYTES).join();
dataFile.read(buffer, dataFileOffset + Integer.BYTES).join();
buffer.flip();
final byte[] page = new byte[dataLength];
buffer.get(page);

// Perform byte operations.
return (RevisionRootPage) deserialize(resourceConfiguration, page, uncompressedDataLength);
return (RevisionRootPage) deserialize(resourceConfiguration, page);
} catch (IOException e) {
throw new SirixIOException(e);
}
}

@Override
public Instant readRevisionRootPageCommitTimestamp(int revision) {
return cache.get(revision, _ -> getRevisionFileData(revision)).timestamp();
return cache.get(revision, (unused) -> getRevisionFileData(revision)).timestamp();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,16 @@ public MMFileReader(final MemorySegment dataFileSegment, final MemorySegment rev
}

@Override
public Page read(final @NonNull PageReference reference,
final @Nullable ResourceConfiguration resourceConfiguration) {
public Page read(final @NonNull PageReference reference, final @Nullable ResourceConfiguration resourceConfiguration) {
try {
final long offset = reference.getKey() + LAYOUT_INT.byteSize() + LAYOUT_INT.byteSize();
final int uncompressedDataLength = dataFileSegment.get(LAYOUT_INT, reference.getKey());
final int dataLength = dataFileSegment.get(LAYOUT_INT, reference.getKey() + LAYOUT_INT.byteSize());
final long offset = reference.getKey() + LAYOUT_INT.byteSize();
final int dataLength = dataFileSegment.get(LAYOUT_INT, reference.getKey());

final byte[] page = new byte[dataLength];

MemorySegment.copy(dataFileSegment, LAYOUT_BYTE, offset, page, 0, dataLength);

return deserialize(resourceConfiguration, page, uncompressedDataLength);
return deserialize(resourceConfiguration, page);
} catch (final IOException e) {
throw new SirixIOException(e);
}
Expand All @@ -100,21 +98,15 @@ public Page read(final @NonNull PageReference reference,
public RevisionRootPage readRevisionRootPage(final int revision, final ResourceConfiguration resourceConfiguration) {
try {
//noinspection DataFlowIssue
final var dataFileOffset = cache.get(revision, _ -> getRevisionFileData(revision)).offset();
final var dataFileOffset = cache.get(revision, (unused) -> getRevisionFileData(revision)).offset();

final int uncompressedDataLength = dataFileSegment.get(LAYOUT_INT, dataFileOffset);
final int dataLength = dataFileSegment.get(LAYOUT_INT, dataFileOffset + LAYOUT_INT.byteSize());
final int dataLength = dataFileSegment.get(LAYOUT_INT, dataFileOffset);

final byte[] page = new byte[dataLength];

MemorySegment.copy(dataFileSegment,
LAYOUT_BYTE,
dataFileOffset + LAYOUT_INT.byteSize() + LAYOUT_INT.byteSize(),
page,
0,
dataLength);
MemorySegment.copy(dataFileSegment, LAYOUT_BYTE, dataFileOffset + LAYOUT_INT.byteSize(), page, 0, dataLength);

return (RevisionRootPage) deserialize(resourceConfiguration, page, uncompressedDataLength);
return (RevisionRootPage) deserialize(resourceConfiguration, page);
} catch (final IOException e) {
throw new SirixIOException(e);
}
Expand All @@ -123,7 +115,7 @@ public RevisionRootPage readRevisionRootPage(final int revision, final ResourceC
@Override
public Instant readRevisionRootPageCommitTimestamp(int revision) {
//noinspection DataFlowIssue
return cache.get(revision, _ -> getRevisionFileData(revision)).timestamp();
return cache.get(revision, (unused) -> getRevisionFileData(revision)).timestamp();
}

@Override
Expand Down
8 changes: 1 addition & 7 deletions bundles/sirix-core/src/main/java/io/sirix/page/PageKind.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ public void serializePage(final ResourceConfiguration resourceConfig, final Byte

if (bytes != null) {
sink.write(bytes.bytesForWrite());
keyValueLeafPage.clear();
bytes.clear();
return;
}

Expand Down Expand Up @@ -298,11 +296,7 @@ public void serializePage(final ResourceConfiguration resourceConfig, final Byte
throw new UncheckedIOException(e);
}

VanillaBytes<Void> bytesToSet = Bytes.allocateDirect(Integer.BYTES + serializedPage.length);
bytesToSet.writeInt(uncompressedLength);
bytesToSet.write(serializedPage);

keyValueLeafPage.setBytes(bytesToSet);
keyValueLeafPage.setBytes(Bytes.wrapForRead(serializedPage));
}
},

Expand Down

0 comments on commit d98608e

Please sign in to comment.