Fix: Crash when creating a new cublet for a single cube (#149)
* fix: NullPointerException when a new cublet is created

* fix: wrong data chunk offset due to not resetting after cublet change

* refactor: new data chunk instead of inplace setting

* fix: formatting

---------

Co-authored-by: hugy718 <[email protected]>
Authored by tinyAdapter and hugy718 on Aug 7, 2024
Parent: db69825 · Commit: 1d45e31
Showing 4 changed files with 50 additions and 14 deletions.
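
All of the code fixes below trace back to state that survived a cublet rollover: the writer's running byte offset and the per-field invariant index were carried from the finished cublet into the new one. A minimal sketch of that bug pattern, using hypothetical names rather than the project's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the bug fixed in this commit: a writer that
// rolls over to a new output file must reset its running offset, or the
// first chunk of the new file records a stale position.
class RollingWriter {
  private long offset = 0;                  // bytes written into the current file
  private final List<Long> chunkOffsets = new ArrayList<>();

  void writeChunk(byte[] chunk) {
    chunkOffsets.add(offset);               // record where this chunk starts
    offset += chunk.length;
  }

  void rollToNewFile() {
    // Without these resets, the next file's first chunk would inherit
    // the previous file's end offset.
    offset = 0;
    chunkOffsets.clear();
  }
}
```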
10 changes: 10 additions & 0 deletions CubletRS.java

@@ -31,6 +31,8 @@
 import java.util.List;
 import lombok.Getter;
 import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Read cublet store
@@ -50,6 +52,8 @@
  */
 public class CubletRS implements Input {
 
+  static final Logger logger = LoggerFactory.getLogger(CubletRS.class);
+
   /**
    * MetaChunk for this cublet.
    */
@@ -82,6 +86,8 @@ public CubletRS(TableSchema schema) {
    */
   @Override
   public void readFrom(ByteBuffer buffer) {
+    logger.debug("readFrom: buffer.limit()=" + buffer.limit());
+
     // Read header offset
     int end = buffer.limit();
     this.limit = end;
@@ -105,12 +111,15 @@ public void readFrom(ByteBuffer buffer) {
       }
     }
 
+    logger.debug("headOffset=" + headOffset);
+
     // Get #chunk and chunk offsets
     buffer.position(headOffset);
     int chunks = buffer.getInt();
     int[] chunkOffsets = new int[chunks];
     for (int i = 0; i < chunks; i++) {
       chunkOffsets[i] = buffer.getInt();
+      logger.debug("chunkOffsets[" + i + "]=" + chunkOffsets[i]);
     }
 
     // read the metaChunk, which is the last one in #chunks
@@ -124,6 +133,7 @@ public void readFrom(ByteBuffer buffer) {
     for (int i = 0; i < chunks - 1; i++) {
       buffer.position(chunkOffsets[i]);
       chunkHeadOffset = buffer.getInt();
+      logger.debug("chunkHeadOffset[" + i + "]=" + chunkHeadOffset);
       buffer.position(chunkHeadOffset);
       ChunkRS chunk = new ChunkRS(this.schema, this.metaChunk);
       chunk.readFrom(buffer);
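
The CubletRS changes are purely diagnostic: they log the offsets recovered while parsing a cublet, which is what makes a bad offset visible. Condensed, the parsing pattern the new debug lines trace looks roughly like the sketch below (the helper and its signature are illustrative, not the project's API; only java.nio.ByteBuffer is real):

```java
import java.nio.ByteBuffer;

final class CubletOffsets {
  // Jump to the header section, read the chunk count, then read one
  // absolute chunk offset per chunk: the values echoed by logger.debug.
  static int[] readChunkOffsets(ByteBuffer buffer, int headOffset) {
    buffer.position(headOffset);
    int chunks = buffer.getInt();
    int[] chunkOffsets = new int[chunks];
    for (int i = 0; i < chunks; i++) {
      chunkOffsets[i] = buffer.getInt();
    }
    return chunkOffsets;
  }
}
```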
24 changes: 14 additions & 10 deletions MetaUserFieldWS.java

@@ -1,5 +1,6 @@
 package com.nus.cool.core.io.writestore;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.nus.cool.core.field.FieldValue;
 import com.nus.cool.core.field.HashField;
@@ -14,6 +15,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +40,12 @@ public class MetaUserFieldWS extends MetaHashFieldWS {
   public MetaUserFieldWS(FieldType type, Charset charset, MetaChunkWS metaChunkWS) {
     super(type, charset);
     this.metaChunkWS = metaChunkWS;
-    for (int invariantIdx : metaChunkWS.getTableSchema().getInvariantFieldIdxs()) {
+    this.resetInvariantIdxToValueList();
+  }
+
+  private void resetInvariantIdxToValueList() {
+    this.invariantIdxToValueList.clear();
+    for (int invariantIdx : this.metaChunkWS.getTableSchema().getInvariantFieldIdxs()) {
       this.invariantIdxToValueList.put(invariantIdx, new LinkedList<>());
     }
   }
@@ -47,7 +54,7 @@ public MetaUserFieldWS(FieldType type, Charset charset, MetaChunkWS metaChunkWS)
   public void put(FieldValue[] tuple, int idx) throws IllegalArgumentException {
     if (!(tuple[idx] instanceof HashField)) {
       throw new IllegalArgumentException(
-        "Illegal argument for MetaUserFieldWS (HashField required).");
+          "Illegal argument for MetaUserFieldWS (HashField required).");
     }
     HashField user = (HashField) tuple[idx];
     int hashKey = user.getInt();
@@ -89,15 +96,12 @@ public int count() {
 
   @Override
   public void cleanForNextCublet() {
-    this.fingerToGid.clear();
-    this.invariantIdxToValueList.clear();
-    this.valueList.clear();
-    this.nextGid = 0; // a field can have different id across cublet.
+    super.cleanForNextCublet();
+    this.resetInvariantIdxToValueList();
   }
 
   @Override
   public int writeTo(DataOutput out) throws IOException {
-
     int bytesWritten = writeFingersAndGids(out);
 
     TableSchema tableSchema = this.metaChunkWS.getTableSchema();
@@ -121,13 +125,13 @@ public int writeTo(DataOutput out) throws IOException {
           .numOfValues(values.size())
           .build();
       bytesWritten += OutputCompressor.writeTo(CompressType.Value, hist,
-        values, out);
+          values, out);
     }
 
     // Write value
     bytesWritten += OutputCompressor.writeTo(CompressType.KeyString,
-      Histogram.builder().charset(charset).build(),
-      valueList, out);
+        Histogram.builder().charset(charset).build(),
+        valueList, out);
 
     return bytesWritten;
   }
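
This file is where the NullPointerException originated: the old cleanForNextCublet() cleared invariantIdxToValueList but never re-inserted an empty list per invariant field, so the first put() of the next cublet called add(...) on a null entry. A minimal reproduction of that failure mode, with hypothetical names standing in for the project's classes:

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical reduction: a map of per-key lists that is cleared
// between cublets must also be repopulated, or lookups return null.
class InvariantIndex {
  private final Map<Integer, List<String>> idxToValues = new HashMap<>();
  private final int[] invariantIdxs;

  InvariantIndex(int[] invariantIdxs) {
    this.invariantIdxs = invariantIdxs;
    reset();
  }

  // Mirrors the fix: one reset helper, called from both the constructor
  // and the per-cublet cleanup, puts a fresh empty list under every key.
  void reset() {
    idxToValues.clear();
    for (int idx : invariantIdxs) {
      idxToValues.put(idx, new LinkedList<>());
    }
  }

  void put(int idx, String value) {
    // With clear() alone, get(idx) returns null for the next cublet,
    // and this add(...) throws NullPointerException.
    idxToValues.get(idx).add(value);
  }
}
```

Routing the shared clears through super.cleanForNextCublet() also removes cleanup logic duplicated from the parent class.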
22 changes: 18 additions & 4 deletions NativeDataWriter.java

@@ -13,13 +13,17 @@
 import java.util.List;
 import javax.validation.constraints.NotNull;
 import lombok.RequiredArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Native data writer writes a set of records in cool storage format.
  */
 @RequiredArgsConstructor
 public class NativeDataWriter implements DataWriter {
 
+  static final Logger logger = LoggerFactory.getLogger(NativeDataWriter.class);
+
   @NotNull
   private final TableSchema tableSchema;
 
@@ -107,6 +111,7 @@ private boolean maybeSwitchChunk(FieldValue curUser) throws IOException {
     }
     finishChunk();
     // create a new data chunk, init tuple Count
+    logger.debug("newDataChunk: offset=" + offset);
     dataChunk = DataChunkWS.newDataChunk(tableSchema, metaChunk.getMetaFields(), offset);
     tupleCount = 0;
     return true;
@@ -134,6 +139,8 @@ private void finishCublet() throws IOException {
     // 4. flush after writing whole Cublet.
     out.flush();
     out.close();
+
+    this.offset = 0;
   }
 
   /**
@@ -146,20 +153,23 @@ private DataOutputStream newCublet() throws IOException {
     System.out.println("[*] A new cublet " + fileName + " is created!");
     File cublet = new File(outputDir, fileName);
     DataOutputStream out = new DataOutputStream(new FileOutputStream(cublet));
-    offset = 0;
     chunkHeaderOffsets.clear();
     return out;
   }
 
   /**
    * Switch a new cublet File once meet 1GB.
    */
-  private void maybeSwitchCublet() throws IOException {
+  private boolean maybeSwitchCublet() throws IOException {
     if (offset < cubletSize) {
-      return;
+      return false;
     }
     finishCublet();
+    logger.debug("switching cublet...");
+
     out = newCublet();
+
+    return true;
   }
 
   @Override
@@ -170,7 +180,11 @@ public boolean add(FieldValue[] tuple) throws IOException {
     }
     // start a new chunk
     if (maybeSwitchChunk(curUser)) {
-      maybeSwitchCublet();
+      if (maybeSwitchCublet()) {
+        // create a new data chunk with offset 0
+        this.dataChunk = DataChunkWS.newDataChunk(
+            this.tableSchema, this.metaChunk.getMetaFields(), 0);
+      }
     }
     lastUser = curUser;
     // update metachunk / metafield
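
The ordering hazard in NativeDataWriter is subtle: add() creates the next data chunk inside maybeSwitchChunk(), before it can know whether a cublet rollover will follow, so the chunk captures the old file's offset. Making maybeSwitchCublet() return a boolean lets the caller detect the rollover and rebuild the chunk at offset 0, and moving the offset reset into finishCublet() keeps the counter correct from the moment the old file is closed. A self-contained sketch of the corrected flow (types are placeholders; the real writer uses DataChunkWS and FieldValue):

```java
// Simplified stand-in for the corrected control flow; method names mirror
// the real ones but the types are placeholders.
class WriterFlow {
  private long offset = 0;
  private final long cubletSize = 1L << 30;  // roll over at 1 GB
  private Object dataChunk;

  private Object newDataChunk(long atOffset) {
    return "chunk@" + atOffset;              // stand-in for DataChunkWS.newDataChunk(...)
  }

  private boolean maybeSwitchCublet() {
    if (offset < cubletSize) {
      return false;                          // stay in the current cublet file
    }
    offset = 0;                              // finishCublet() now resets the counter
    return true;                             // caller must react to the rollover
  }

  void onChunkSwitch() {
    dataChunk = newDataChunk(offset);        // may capture a stale offset...
    if (maybeSwitchCublet()) {
      dataChunk = newDataChunk(0);           // ...so rebuild it for the fresh file
    }
  }
}
```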
8 changes: 8 additions & 0 deletions cool-core/src/main/java/com/nus/cool/model/CoolModel.java
@@ -143,12 +143,20 @@ public synchronized void reload(String cube) throws IOException {
     CubeRS cubeRS = new CubeRS(schema);
 
     File[] cubletFiles = currentVersion.listFiles((file, s) -> s.endsWith(".dz"));
+    Arrays.sort(cubletFiles);
+
     logger.info("Cube " + cube + ", Use version: " + currentVersion.getName());
     storePath.put(cube, currentVersion);
 
+    logger.debug("cublet files: ");
+    for (int i = 0; i < cubletFiles.length; i++) {
+      logger.debug(cubletFiles[i].getPath());
+    }
+
     // Load all cubes under latest version
     checkNotNull(cubletFiles);
     for (File cubletFile : cubletFiles) {
+      logger.debug("loading cublet file: " + cubletFile);
       cubeRS.addCublet(cubletFile);
     }
 
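
Finally, File.listFiles(...) makes no ordering guarantee, so without the added Arrays.sort the cublets of a cube could be loaded in filesystem-dependent order. A small sketch of the deterministic-listing pattern (the helper is hypothetical; java.io.File sorts lexicographically by pathname because it implements Comparable):

```java
import java.io.File;
import java.util.Arrays;

final class CubletListing {
  // List *.dz cublets in a stable, lexicographic order. listFiles(...)
  // returns null when the directory is missing or unreadable, hence the
  // guard before sorting.
  static File[] listSorted(File dir) {
    File[] files = dir.listFiles((d, name) -> name.endsWith(".dz"));
    if (files == null) {
      return new File[0];
    }
    Arrays.sort(files);
    return files;
  }
}
```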
