Skip to content

Commit

Permalink
Fixed error on table compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
tomfran committed Feb 23, 2024
1 parent 77c7b72 commit 7d22f1f
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tasks.test {
jmh {
fork = 1
warmupIterations = 3
iterations = 10
iterations = 5
benchmarkMode = ['thrpt']
includes = ['LSMTreeAddBenchmark*']
jmhTimeout = '15s'
Expand Down
2 changes: 1 addition & 1 deletion src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class LSMTreeAddBenchmark {
static final Path DIR = Path.of("tree_add_benchmark");
static final int NUM_ITEMS = 1000000;
static final int MEMTABLE_SIZE = 1024 * 1024 * 256;
static final int LEVEL_SIZE = 2;
static final int LEVEL_SIZE = 4;
static int index = 0;

LSMTree tree;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/tomfran/lsm/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static void main(String[] args) throws InterruptedException {
if (new File(DIRECTORY).exists())
deleteDir();

LSMTree tree = new LSMTree(1024 * 1024 * 5, 2, DIRECTORY);
LSMTree tree = new LSMTree(1024 * 512, 2, DIRECTORY);

Scanner scanner = new Scanner(System.in);
scanner.useDelimiter("\n");
Expand Down Expand Up @@ -64,7 +64,7 @@ public static void main(String[] args) throws InterruptedException {
}
case "r", "rng" -> {
IntStream.range(Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))
.forEach(i -> tree.add(new ByteArrayPair(intToBytes(i), intToBytes(r.nextInt()))));
.forEach(i -> tree.add(new ByteArrayPair(intToBytes(i), intToBytes(r.nextInt()))));

System.out.println("ok");
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/tomfran/lsm/sstable/SSTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public static ObjectArrayList<SSTable> sortedRun(String dataDir, long sstMaxSize
*/
public byte[] get(byte[] key) {
if (ByteArrayComparator.compare(key, minKey) == -1 ||
ByteArrayComparator.compare(key, maxKey) == 1 ||
!bloomFilter.mightContain(key))
ByteArrayComparator.compare(key, maxKey) == 1 ||
!bloomFilter.mightContain(key))
return null;

int offsetIndex = getCandidateOffsetIndex(key);
Expand Down
50 changes: 25 additions & 25 deletions src/main/java/com/tomfran/lsm/tree/LSMTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.tomfran.lsm.types.ByteArrayPair;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;

import javax.management.relation.RoleUnresolvedList;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
Expand All @@ -29,7 +30,7 @@ public class LSMTree {

static final long DEFAULT_MEMTABLE_MAX_BYTE_SIZE = 1024 * 1024 * 32;
static final int DEFAULT_LEVEL_ZERO_MAX_SIZE = 2;
static final int LEVEL_INCR_FACTOR = 2;
static final double LEVEL_INCR_FACTOR = 1.75;

static final String DEFAULT_DATA_DIRECTORY = "LSM-data";

Expand Down Expand Up @@ -168,7 +169,7 @@ private void flushMemtable() {
SSTable table = new SSTable(dataDir, memtableToFlush.iterator(), mutableMemtableMaxSize * 2);

synchronized (tableLock) {
levels.get(0).add(table);
levels.get(0).add(0, table);
}

synchronized (immutableMemtablesLock) {
Expand All @@ -186,32 +187,31 @@ private void levelCompaction() {
for (int i = 0; i < n; i++) {
ObjectArrayList<SSTable> level = levels.get(i);

if (level.size() <= maxLevelSize)
continue;
if (level.size() > maxLevelSize) {
// add new level if needed
if (i == n - 1)
levels.add(new ObjectArrayList<>());

// add new level if needed
if (i == n - 1)
levels.add(new ObjectArrayList<>());
// take all tables from the current and next level
ObjectArrayList<SSTable> nextLevel = levels.get(i + 1);
ObjectArrayList<SSTable> merge = new ObjectArrayList<>();
merge.addAll(level);
merge.addAll(nextLevel);

// take all tables from the current and next level
ObjectArrayList<SSTable> nextLevel = levels.get(i + 1);
ObjectArrayList<SSTable> merge = new ObjectArrayList<>();
merge.addAll(level);
merge.addAll(nextLevel);
// perform a sorted run and replace the next level
var sortedRun = SSTable.sortedRun(dataDir, sstMaxSize, merge.toArray(SSTable[]::new));

// perform a sorted run and replace the next level
var sortedRun = SSTable.sortedRun(dataDir, sstMaxSize, merge.toArray(SSTable[]::new));
// delete previous tables
level.forEach(SSTable::closeAndDelete);
level.clear();
nextLevel.forEach(SSTable::closeAndDelete);
nextLevel.clear();

// delete previous tables
level.forEach(SSTable::closeAndDelete);
level.clear();
nextLevel.forEach(SSTable::closeAndDelete);
nextLevel.clear();
nextLevel.addAll(sortedRun);
}

nextLevel.addAll(sortedRun);

maxLevelSize *= LEVEL_INCR_FACTOR;
sstMaxSize *= LEVEL_INCR_FACTOR;
maxLevelSize = (int) (maxLevelSize * LEVEL_INCR_FACTOR);
sstMaxSize = (int) (sstMaxSize * LEVEL_INCR_FACTOR);
}
}
}
Expand Down Expand Up @@ -241,8 +241,8 @@ public String toString() {
for (var level : levels) {
s.append(String.format("\t\t- %d: ", i));
level.stream()
.map(st -> String.format("[ %s, size: %d ] ", st.filename, st.size))
.forEach(s::append);
.map(st -> String.format("[ %s, size: %d ] ", st.filename, st.size))
.forEach(s::append);
s.append("\n");
i += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static Stream<Arguments> shouldCompare() {
Arguments.of(new byte[]{1, 2}, new byte[]{1, 3}, -1),
Arguments.of(new byte[]{1, 2}, new byte[]{1, 1}, 1),
Arguments.of(new byte[]{1, 2}, new byte[]{1, 2}, 0)
);
);
}

@ParameterizedTest
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/tomfran/lsm/sstable/SSTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public static void setup() {

// sort and divide into inserted and skipped
var items = l.stream()
.sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key()))
.toList();
.sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key()))
.toList();

inserted = new ObjectArrayList<>();
skipped = new ObjectArrayList<>();
Expand Down

0 comments on commit 7d22f1f

Please sign in to comment.