From 7d22f1f3dc2202586131245304b3d8b5e82d07e2 Mon Sep 17 00:00:00 2001 From: Francesco Date: Fri, 23 Feb 2024 17:55:09 +0100 Subject: [PATCH] Fixed error on table compaction --- build.gradle | 2 +- .../tomfran/lsm/tree/LSMTreeAddBenchmark.java | 2 +- src/main/java/com/tomfran/lsm/Main.java | 4 +- .../java/com/tomfran/lsm/sstable/SSTable.java | 4 +- .../java/com/tomfran/lsm/tree/LSMTree.java | 50 +++++++++---------- .../comparator/ByteArrayComparatorTest.java | 2 +- .../com/tomfran/lsm/sstable/SSTableTest.java | 4 +- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/build.gradle b/build.gradle index ecc29dc..a1d5f6a 100644 --- a/build.gradle +++ b/build.gradle @@ -27,7 +27,7 @@ tasks.test { jmh { fork = 1 warmupIterations = 3 - iterations = 10 + iterations = 5 benchmarkMode = ['thrpt'] includes = ['LSMTreeAddBenchmark*'] jmhTimeout = '15s' diff --git a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java index 9c09361..0faa5e8 100644 --- a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java +++ b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java @@ -14,7 +14,7 @@ public class LSMTreeAddBenchmark { static final Path DIR = Path.of("tree_add_benchmark"); static final int NUM_ITEMS = 1000000; static final int MEMTABLE_SIZE = 1024 * 1024 * 256; - static final int LEVEL_SIZE = 2; + static final int LEVEL_SIZE = 4; static int index = 0; LSMTree tree; diff --git a/src/main/java/com/tomfran/lsm/Main.java b/src/main/java/com/tomfran/lsm/Main.java index 614f59e..9f431a8 100644 --- a/src/main/java/com/tomfran/lsm/Main.java +++ b/src/main/java/com/tomfran/lsm/Main.java @@ -20,7 +20,7 @@ public static void main(String[] args) throws InterruptedException { if (new File(DIRECTORY).exists()) deleteDir(); - LSMTree tree = new LSMTree(1024 * 1024 * 5, 2, DIRECTORY); + LSMTree tree = new LSMTree(1024 * 512, 2, DIRECTORY); Scanner scanner = new Scanner(System.in); scanner.useDelimiter("\n"); @@ -64,7 +64,7 @@ public static void main(String[] args) throws InterruptedException { } case "r", "rng" -> { IntStream.range(Integer.parseInt(parts[1]), Integer.parseInt(parts[2])) - .forEach(i -> tree.add(new ByteArrayPair(intToBytes(i), intToBytes(r.nextInt())))); + .forEach(i -> tree.add(new ByteArrayPair(intToBytes(i), intToBytes(r.nextInt())))); System.out.println("ok"); } diff --git a/src/main/java/com/tomfran/lsm/sstable/SSTable.java b/src/main/java/com/tomfran/lsm/sstable/SSTable.java index 61625a3..57361f5 100644 --- a/src/main/java/com/tomfran/lsm/sstable/SSTable.java +++ b/src/main/java/com/tomfran/lsm/sstable/SSTable.java @@ -102,8 +102,8 @@ public static ObjectArrayList sortedRun(String dataDir, long sstMaxSize */ public byte[] get(byte[] key) { if (ByteArrayComparator.compare(key, minKey) == -1 || - ByteArrayComparator.compare(key, maxKey) == 1 || - !bloomFilter.mightContain(key)) + ByteArrayComparator.compare(key, maxKey) == 1 || + !bloomFilter.mightContain(key)) return null; int offsetIndex = getCandidateOffsetIndex(key); diff --git a/src/main/java/com/tomfran/lsm/tree/LSMTree.java b/src/main/java/com/tomfran/lsm/tree/LSMTree.java index 07bdcfa..260380e 100644 --- a/src/main/java/com/tomfran/lsm/tree/LSMTree.java +++ b/src/main/java/com/tomfran/lsm/tree/LSMTree.java @@ -5,6 +5,7 @@ import com.tomfran.lsm.types.ByteArrayPair; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import javax.management.relation.RoleUnresolvedList; import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedList; @@ -29,7 +30,7 @@ public class LSMTree { static final long DEFAULT_MEMTABLE_MAX_BYTE_SIZE = 1024 * 1024 * 32; static final int DEFAULT_LEVEL_ZERO_MAX_SIZE = 2; - static final int LEVEL_INCR_FACTOR = 2; + static final double LEVEL_INCR_FACTOR = 1.75; static final String DEFAULT_DATA_DIRECTORY = "LSM-data"; @@ -168,7 +169,7 @@ private void flushMemtable() { SSTable table = new SSTable(dataDir, memtableToFlush.iterator(), mutableMemtableMaxSize * 2); synchronized (tableLock) { - levels.get(0).add(table); + levels.get(0).add(0, table); } synchronized (immutableMemtablesLock) { @@ -186,32 +187,31 @@ private void levelCompaction() { for (int i = 0; i < n; i++) { ObjectArrayList level = levels.get(i); - if (level.size() <= maxLevelSize) - continue; + if (level.size() > maxLevelSize) { + // add new level if needed + if (i == n - 1) + levels.add(new ObjectArrayList<>()); - // add new level if needed - if (i == n - 1) - levels.add(new ObjectArrayList<>()); + // take all tables from the current and next level + ObjectArrayList nextLevel = levels.get(i + 1); + ObjectArrayList merge = new ObjectArrayList<>(); + merge.addAll(level); + merge.addAll(nextLevel); - // take all tables from the current and next level - ObjectArrayList nextLevel = levels.get(i + 1); - ObjectArrayList merge = new ObjectArrayList<>(); - merge.addAll(level); - merge.addAll(nextLevel); + // perform a sorted run and replace the next level + var sortedRun = SSTable.sortedRun(dataDir, sstMaxSize, merge.toArray(SSTable[]::new)); - // perform a sorted run and replace the next level - var sortedRun = SSTable.sortedRun(dataDir, sstMaxSize, merge.toArray(SSTable[]::new)); + // delete previous tables + level.forEach(SSTable::closeAndDelete); + level.clear(); + nextLevel.forEach(SSTable::closeAndDelete); + nextLevel.clear(); - // delete previous tables - level.forEach(SSTable::closeAndDelete); - level.clear(); - nextLevel.forEach(SSTable::closeAndDelete); - nextLevel.clear(); + nextLevel.addAll(sortedRun); + } - nextLevel.addAll(sortedRun); - - maxLevelSize *= LEVEL_INCR_FACTOR; - sstMaxSize *= LEVEL_INCR_FACTOR; + maxLevelSize = (int) (maxLevelSize * LEVEL_INCR_FACTOR); + sstMaxSize = (int) (sstMaxSize * LEVEL_INCR_FACTOR); } } } @@ -241,8 +241,8 @@ public String toString() { for (var level : levels) { s.append(String.format("\t\t- %d: ", i)); level.stream() - .map(st -> String.format("[ %s, size: %d ] ", st.filename, st.size)) - .forEach(s::append); + .map(st -> String.format("[ %s, size: %d ] ", st.filename, st.size)) + .forEach(s::append); s.append("\n"); i += 1; } diff --git a/src/test/java/com/tomfran/lsm/comparator/ByteArrayComparatorTest.java b/src/test/java/com/tomfran/lsm/comparator/ByteArrayComparatorTest.java index 646f2f1..7f680a0 100644 --- a/src/test/java/com/tomfran/lsm/comparator/ByteArrayComparatorTest.java +++ b/src/test/java/com/tomfran/lsm/comparator/ByteArrayComparatorTest.java @@ -15,7 +15,7 @@ static Stream shouldCompare() { Arguments.of(new byte[]{1, 2}, new byte[]{1, 3}, -1), Arguments.of(new byte[]{1, 2}, new byte[]{1, 1}, 1), Arguments.of(new byte[]{1, 2}, new byte[]{1, 2}, 0) - ); + ); } @ParameterizedTest diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java index fb31367..bbd153a 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java @@ -36,8 +36,8 @@ public static void setup() { // sort and divide into inserted and skipped var items = l.stream() - .sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key())) - .toList(); + .sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key())) + .toList(); inserted = new ObjectArrayList<>(); skipped = new ObjectArrayList<>();